Source code for erc7730.convert.convert_erc7730_input_to_resolved

import os
from typing import final, override

import requests
from pydantic import AnyUrl, RootModel

from erc7730.common.output import OutputAdder
from erc7730.convert import ERC7730Converter
from erc7730.model.abi import ABI
from erc7730.model.context import EIP712JsonSchema
from erc7730.model.display import (
    AddressNameParameters,
    CallDataParameters,
    DateParameters,
    FieldFormat,
    NftNameParameters,
    TokenAmountParameters,
    UnitParameters,
)
from erc7730.model.input.context import InputContract, InputContractContext, InputEIP712, InputEIP712Context
from erc7730.model.input.descriptor import InputERC7730Descriptor
from erc7730.model.input.display import (
    InputDisplay,
    InputEnumParameters,
    InputField,
    InputFieldDefinition,
    InputFieldDescription,
    InputFieldParameters,
    InputFormat,
    InputNestedFields,
    InputReference,
)
from erc7730.model.resolved.context import (
    ResolvedContract,
    ResolvedContractContext,
    ResolvedEIP712,
    ResolvedEIP712Context,
)
from erc7730.model.resolved.descriptor import ResolvedERC7730Descriptor
from erc7730.model.resolved.display import (
    ResolvedDisplay,
    ResolvedEnumParameters,
    ResolvedField,
    ResolvedFieldDefinition,
    ResolvedFieldDescription,
    ResolvedFieldParameters,
    ResolvedFormat,
    ResolvedNestedFields,
)


[docs] @final class ERC7730InputToResolved(ERC7730Converter[InputERC7730Descriptor, ResolvedERC7730Descriptor]): """ Converts ERC-7730 descriptor input to resolved form. After conversion, the descriptor is in resolved form: - URLs have been fetched - Contract addresses have been normalized to lowercase (TODO not implemented) - References have been inlined (TODO not implemented) - Constants have been inlined (TODO not implemented) - Field definitions have been inlined (TODO not implemented) - Selectors have been converted to 4 bytes form (TODO not implemented) """
[docs] @override def convert(self, descriptor: InputERC7730Descriptor, out: OutputAdder) -> ResolvedERC7730Descriptor | None: context = self._convert_context(descriptor.context, out) display = self._convert_display(descriptor.display, out) if context is None or display is None: return None return ResolvedERC7730Descriptor.model_validate( {"$schema": descriptor.schema_, "context": context, "metadata": descriptor.metadata, "display": display} )
@classmethod def _convert_context( cls, context: InputContractContext | InputEIP712Context, out: OutputAdder ) -> ResolvedContractContext | ResolvedEIP712Context | None: if isinstance(context, InputContractContext): return cls._convert_context_contract(context, out) if isinstance(context, InputEIP712Context): return cls._convert_context_eip712(context, out) return out.error( title="Invalid context type", message=f"Descriptor has an invalid context type: {type(context)}. Context type should be either contract" f"or eip712.", ) @classmethod def _convert_context_contract( cls, context: InputContractContext, out: OutputAdder ) -> ResolvedContractContext | None: contract = cls._convert_contract(context.contract, out) if contract is None: return None return ResolvedContractContext(contract=contract) @classmethod def _convert_contract(cls, contract: InputContract, out: OutputAdder) -> ResolvedContract | None: abi = cls._convert_abis(contract.abi, out) if abi is None: return None return ResolvedContract( abi=abi, deployments=contract.deployments, addressMatcher=contract.addressMatcher, factory=contract.factory ) @classmethod def _convert_abis(cls, abis: list[ABI] | AnyUrl, out: OutputAdder) -> list[ABI] | None: if isinstance(abis, AnyUrl): return cls._get_abi_from_url(abis) if isinstance(abis, list): return abis return out.error( title="Invalid ABIs type", message=f"Descriptor contains invalid value for ABIs: {type(abis)}, it should either be an URL or a JSON" f"representation of the ABIs.", ) @classmethod def _convert_context_eip712(cls, context: InputEIP712Context, out: OutputAdder) -> ResolvedEIP712Context | None: eip712 = cls._convert_eip712(context.eip712, out) if eip712 is None: return None return ResolvedEIP712Context(eip712=eip712) @classmethod def _convert_eip712(cls, eip712: InputEIP712, out: OutputAdder) -> ResolvedEIP712 | None: schemas = cls._convert_schemas(eip712.schemas, out) if schemas is None: return None return ResolvedEIP712( domain=eip712.domain, schemas=schemas, domainSeparator=eip712.domainSeparator, deployments=eip712.deployments, ) @classmethod def _convert_schemas( cls, schemas: list[EIP712JsonSchema | AnyUrl], out: OutputAdder ) -> list[EIP712JsonSchema] | None: resolved_schemas = [] for schema in schemas: if (resolved_schema := cls._convert_schema(schema, out)) is not None: resolved_schemas.append(resolved_schema) return resolved_schemas @classmethod def _convert_schema(cls, schema: EIP712JsonSchema | AnyUrl, out: OutputAdder) -> EIP712JsonSchema | None: if isinstance(schema, AnyUrl): return cls._get_schema_from_url(schema) if isinstance(schema, EIP712JsonSchema): return schema return out.error( title="Invalid EIP-712 schema type", message=f"Descriptor contains invalid value for EIP-712 schema: {type(schema)}, it should either be an URL" f"or a JSON representation of the schema.", ) @classmethod def _convert_display(cls, display: InputDisplay, out: OutputAdder) -> ResolvedDisplay | None: if display.definitions is None: definitions = None else: definitions = {} for definition_key, definition in display.definitions.items(): if (resolved_definition := cls._convert_field_definition(definition, out)) is not None: definitions[definition_key] = resolved_definition formats = {} for format_key, format in display.formats.items(): if (resolved_format := cls._convert_format(format, out)) is not None: formats[format_key] = resolved_format return ResolvedDisplay(definitions=definitions, formats=formats) @classmethod def _convert_field_definition( cls, definition: InputFieldDefinition, out: OutputAdder ) -> ResolvedFieldDefinition | None: params = cls._convert_field_parameters(definition.params, out) if definition.params is not None else None return ResolvedFieldDefinition.model_validate( { "$id": definition.id, "label": definition.label, "format": FieldFormat(definition.format) if definition.format is not None else None, "params": params, } ) @classmethod def _convert_field_description( cls, definition: InputFieldDescription, out: OutputAdder ) -> ResolvedFieldDescription | None: params = cls._convert_field_parameters(definition.params, out) if definition.params is not None else None return ResolvedFieldDescription.model_validate( { "$id": definition.id, "path": definition.path, "label": definition.label, "format": FieldFormat(definition.format) if definition.format is not None else None, "params": params, } ) @classmethod def _convert_field_parameters( cls, params: InputFieldParameters, out: OutputAdder ) -> ResolvedFieldParameters | None: if isinstance(params, AddressNameParameters): return params if isinstance(params, CallDataParameters): return params if isinstance(params, TokenAmountParameters): return params if isinstance(params, NftNameParameters): return params if isinstance(params, DateParameters): return params if isinstance(params, UnitParameters): return params if isinstance(params, InputEnumParameters): return cls._convert_enum_parameters(params, out) return out.error(title="Invalid field parameters", message=f"Invalid field parameters type: {type(params)}") @classmethod def _convert_enum_parameters(cls, params: InputEnumParameters, out: OutputAdder) -> ResolvedEnumParameters | None: return ResolvedEnumParameters.model_validate({"$ref": params.ref}) # TODO must inline here @classmethod def _convert_format(cls, format: InputFormat, error: OutputAdder) -> ResolvedFormat | None: fields = cls._convert_fields(format.fields, error) if fields is None: return None return ResolvedFormat.model_validate( { "$id": format.id, "intent": format.intent, "fields": fields, "required": format.required, "excluded": format.excluded, "screens": format.screens, } ) @classmethod def _convert_fields(cls, fields: list[InputField], out: OutputAdder) -> list[ResolvedField] | None: resolved_fields = [] for input_format in fields: if (resolved_field := cls._convert_field(input_format, out)) is not None: resolved_fields.append(resolved_field) return resolved_fields @classmethod def _convert_field(cls, field: InputField, out: OutputAdder) -> ResolvedField | None: if isinstance(field, InputReference): return cls._convert_reference(field, out) if isinstance(field, InputFieldDescription): return cls._convert_field_description(field, out) if isinstance(field, InputNestedFields): return cls._convert_nested_fields(field, out) return out.error(title="Invalid field type", message=f"Invalid field type: {type(field)}") @classmethod def _convert_nested_fields(cls, fields: InputNestedFields, out: OutputAdder) -> ResolvedNestedFields | None: resolved_fields = cls._convert_fields(fields.fields, out) if resolved_fields is None: return None return ResolvedNestedFields(path=fields.path, fields=resolved_fields) @classmethod def _convert_reference(cls, reference: InputReference, out: OutputAdder) -> ResolvedField | None: raise NotImplementedError("_convert_reference is not implemented") # TODO @classmethod def _adapt_github_uri(cls, url: AnyUrl) -> AnyUrl: if url.host == "github.com": return AnyUrl( str(url).replace("https://github.com/", "https://raw.githubusercontent.com/").replace("/blob/", "/") ) else: return url @classmethod def _adapt_etherscan_uri(cls, url: AnyUrl) -> AnyUrl: if (api_key := os.environ.get("ETHERSCAN_API_KEY")) is not None: return AnyUrl(f"{url}&apikey={api_key}") else: return url @classmethod def _get_schema_from_url(cls, url: AnyUrl) -> EIP712JsonSchema: resp = requests.get(cls._adapt_github_uri(url), timeout=10) # type:ignore resp.raise_for_status() return EIP712JsonSchema.model_validate(resp.json()) @classmethod def _get_abi_from_url(cls, url: AnyUrl) -> list[ABI]: match url.host: case "api.etherscan.io": # TODO use client? Do we want to parse URL? Or use deployments? resp = requests.get(cls._adapt_etherscan_uri(url), timeout=10) # type:ignore resp.raise_for_status() return RootModel[list[ABI]].model_validate_json(resp.json()["result"]).root case _: resp = requests.get(cls._adapt_github_uri(url), timeout=10) # type:ignore resp.raise_for_status() return RootModel[list[ABI]].model_validate(resp.json()).root