Source code for erc7730.convert.resolved.convert_erc7730_input_to_resolved
from typing import assert_never, final, override
from eip712.model.schema import EIP712Type
from pydantic_string_url import HttpUrl
from erc7730.common import client
from erc7730.common.abi import reduce_signature, signature_to_selector
from erc7730.common.output import ExceptionsToOutput, OutputAdder
from erc7730.convert import ERC7730Converter
from erc7730.convert.resolved.constants import ConstantProvider, DefaultConstantProvider
from erc7730.convert.resolved.parameters import resolve_field_parameters
from erc7730.convert.resolved.references import resolve_reference
from erc7730.model.abi import ABI
from erc7730.model.context import EIP712Schema
from erc7730.model.display import (
FieldFormat,
)
from erc7730.model.input.context import (
InputContract,
InputContractContext,
InputDeployment,
InputDomain,
InputEIP712,
InputEIP712Context,
InputFactory,
)
from erc7730.model.input.descriptor import InputERC7730Descriptor
from erc7730.model.input.display import (
InputDisplay,
InputField,
InputFieldDefinition,
InputFieldDescription,
InputFormat,
InputNestedFields,
InputReference,
)
from erc7730.model.input.metadata import InputMetadata
from erc7730.model.metadata import EnumDefinition
from erc7730.model.paths import ROOT_DATA_PATH, Array, ArrayElement, ArraySlice, ContainerPath, DataPath, Field
from erc7730.model.paths.path_ops import data_or_container_path_concat, data_path_concat
from erc7730.model.resolved.context import (
ResolvedContract,
ResolvedContractContext,
ResolvedDeployment,
ResolvedDomain,
ResolvedEIP712,
ResolvedEIP712Context,
ResolvedFactory,
)
from erc7730.model.resolved.descriptor import ResolvedERC7730Descriptor
from erc7730.model.resolved.display import (
ResolvedDisplay,
ResolvedField,
ResolvedFieldDescription,
ResolvedFormat,
ResolvedNestedFields,
)
from erc7730.model.resolved.metadata import ResolvedMetadata
from erc7730.model.types import Address, Id, Selector
[docs]
@final
class ERC7730InputToResolved(ERC7730Converter[InputERC7730Descriptor, ResolvedERC7730Descriptor]):
"""
Converts ERC-7730 descriptor input to resolved form.
After conversion, the descriptor is in resolved form:
- URLs have been fetched
- Contract addresses have been normalized to lowercase
- References have been inlined
- Constants have been inlined
- Field definitions have been inlined
- Nested fields have been flattened where possible
- Selectors have been converted to 4 bytes form
"""
[docs]
@override
def convert(self, descriptor: InputERC7730Descriptor, out: OutputAdder) -> ResolvedERC7730Descriptor | None:
with ExceptionsToOutput(out):
constants = DefaultConstantProvider(descriptor)
if (context := self._resolve_context(descriptor.context, out)) is None:
return None
if (metadata := self._resolve_metadata(descriptor.metadata, out)) is None:
return None
if (display := self._resolve_display(descriptor.display, context, metadata.enums, constants, out)) is None:
return None
return ResolvedERC7730Descriptor(context=context, metadata=metadata, display=display)
# noinspection PyUnreachableCode
return None
@classmethod
def _resolve_context(
cls, context: InputContractContext | InputEIP712Context, out: OutputAdder
) -> ResolvedContractContext | ResolvedEIP712Context | None:
match context:
case InputContractContext():
return cls._resolve_context_contract(context, out)
case InputEIP712Context():
return cls._resolve_context_eip712(context, out)
case _:
assert_never(context)
@classmethod
def _resolve_metadata(cls, metadata: InputMetadata, out: OutputAdder) -> ResolvedMetadata | None:
resolved_enums = {}
if metadata.enums is not None:
for enum_id, enum in metadata.enums.items():
if (resolved_enum := cls._resolve_enum(enum, out)) is not None:
resolved_enums[enum_id] = resolved_enum
return ResolvedMetadata(
owner=metadata.owner,
info=metadata.info,
token=metadata.token,
enums=resolved_enums,
)
@classmethod
def _resolve_enum(cls, enum: HttpUrl | EnumDefinition, out: OutputAdder) -> dict[str, str] | None:
match enum:
case HttpUrl() as url:
try:
return client.get(url=url, model=EnumDefinition)
except Exception as e:
return out.error(
title="Failed to fetch enum definition from URL",
message=f'Failed to fetch enum definition from URL "{url}": {e}',
)
case dict():
return enum
case _:
assert_never(enum)
@classmethod
def _resolve_context_contract(
cls, context: InputContractContext, out: OutputAdder
) -> ResolvedContractContext | None:
if (contract := cls._resolve_contract(context.contract, out)) is None:
return None
return ResolvedContractContext(contract=contract)
@classmethod
def _resolve_contract(cls, contract: InputContract, out: OutputAdder) -> ResolvedContract | None:
if (abi := cls._resolve_abis(contract.abi, out)) is None:
return None
if (deployments := cls._resolve_deployments(contract.deployments, out)) is None:
return None
if contract.factory is None:
factory = None
elif (factory := cls._resolve_factory(contract.factory, out)) is None:
return None
return ResolvedContract(
abi=abi, deployments=deployments, addressMatcher=contract.addressMatcher, factory=factory
)
@classmethod
def _resolve_deployments(
cls, deployments: list[InputDeployment], out: OutputAdder
) -> list[ResolvedDeployment] | None:
resolved_deployments = []
for deployment in deployments:
if (resolved_deployment := cls._resolve_deployment(deployment, out)) is not None:
resolved_deployments.append(resolved_deployment)
return resolved_deployments
@classmethod
def _resolve_deployment(cls, deployment: InputDeployment, out: OutputAdder) -> ResolvedDeployment | None:
return ResolvedDeployment(chainId=deployment.chainId, address=Address(deployment.address))
@classmethod
def _resolve_factory(cls, factory: InputFactory, out: OutputAdder) -> ResolvedFactory | None:
if (deployments := cls._resolve_deployments(factory.deployments, out)) is None:
return None
return ResolvedFactory(deployments=deployments, deployEvent=factory.deployEvent)
@classmethod
def _resolve_abis(cls, abis: list[ABI] | HttpUrl, out: OutputAdder) -> list[ABI] | None:
match abis:
case HttpUrl() as url:
try:
return client.get(url=url, model=list[ABI])
except Exception as e:
return out.error(
title="Failed to fetch ABI from URL",
message=f'Failed to fetch ABI from URL "{url}": {e}',
)
case list():
return abis
case _:
assert_never(abis)
@classmethod
def _resolve_context_eip712(cls, context: InputEIP712Context, out: OutputAdder) -> ResolvedEIP712Context | None:
if (eip712 := cls._resolve_eip712(context.eip712, out)) is None:
return None
return ResolvedEIP712Context(eip712=eip712)
@classmethod
def _resolve_eip712(cls, eip712: InputEIP712, out: OutputAdder) -> ResolvedEIP712 | None:
if eip712.domain is None:
domain = None
elif (domain := cls._resolve_domain(eip712.domain, out)) is None:
return None
if (schemas := cls._resolve_schemas(eip712.schemas, out)) is None:
return None
if (deployments := cls._resolve_deployments(eip712.deployments, out)) is None:
return None
return ResolvedEIP712(
domain=domain,
schemas=schemas,
domainSeparator=eip712.domainSeparator,
deployments=deployments,
)
@classmethod
def _resolve_domain(cls, domain: InputDomain, out: OutputAdder) -> ResolvedDomain | None:
return ResolvedDomain(
name=domain.name,
version=domain.version,
chainId=domain.chainId,
verifyingContract=None if domain.verifyingContract is None else Address(domain.verifyingContract),
)
@classmethod
def _resolve_schemas(cls, schemas: list[EIP712Schema | HttpUrl], out: OutputAdder) -> list[EIP712Schema] | None:
resolved_schemas = []
for schema in schemas:
if (resolved_schema := cls._resolve_schema(schema, out)) is not None:
resolved_schemas.append(resolved_schema)
return resolved_schemas
@classmethod
def _resolve_schema(cls, schema: EIP712Schema | HttpUrl, out: OutputAdder) -> EIP712Schema | None:
match schema:
case HttpUrl() as url:
try:
return client.get(url=url, model=EIP712Schema)
except Exception as e:
return out.error(
title="Failed to fetch EIP-712 schema from URL",
message=f'Failed to fetch EIP-712 schema from URL "{url}": {e}',
)
case EIP712Schema():
return schema
case _:
assert_never(schema)
@classmethod
def _resolve_display(
cls,
display: InputDisplay,
context: ResolvedContractContext | ResolvedEIP712Context,
enums: dict[Id, EnumDefinition] | None,
constants: ConstantProvider,
out: OutputAdder,
) -> ResolvedDisplay | None:
definitions = display.definitions or {}
enums = enums or {}
formats = {}
for format_id, format in display.formats.items():
if (resolved_format_id := cls._resolve_format_id(format_id, context, out)) is None:
return None
if (resolved_format := cls._resolve_format(format, definitions, enums, constants, out)) is None:
return None
if resolved_format_id in formats:
return out.error(
title="Duplicate format",
message=f"Descriptor contains 2 formats sections for {resolved_format_id}",
)
formats[resolved_format_id] = resolved_format
return ResolvedDisplay(formats=formats)
@classmethod
def _resolve_field_description(
cls,
prefix: DataPath,
definition: InputFieldDescription,
enums: dict[Id, EnumDefinition],
constants: ConstantProvider,
out: OutputAdder,
) -> ResolvedFieldDescription | None:
match definition.format:
case None | FieldFormat.RAW | FieldFormat.AMOUNT | FieldFormat.TOKEN_AMOUNT | FieldFormat.DURATION:
pass
case (
FieldFormat.ADDRESS_NAME
| FieldFormat.CALL_DATA
| FieldFormat.NFT_NAME
| FieldFormat.DATE
| FieldFormat.UNIT
| FieldFormat.ENUM
):
if definition.params is None:
return out.error(
title="Missing parameters",
message=f"""Field format "{definition.format.value}" requires parameters to be defined, """
f"""they are missing for field "{definition.path}".""",
)
case _:
assert_never(definition.format)
params = resolve_field_parameters(prefix, definition.params, enums, constants, out)
if (path := constants.resolve_path(definition.path, out)) is None:
return None
return ResolvedFieldDescription.model_validate(
{
"$id": definition.id,
"path": data_or_container_path_concat(prefix, path),
"label": constants.resolve(definition.label, out),
"format": FieldFormat(definition.format) if definition.format is not None else None,
"params": params,
}
)
@classmethod
def _resolve_format_id(
cls,
format_id: str,
context: ResolvedContractContext | ResolvedEIP712Context,
out: OutputAdder,
) -> EIP712Type | Selector | None:
match context:
case ResolvedContractContext():
if format_id.startswith("0x"):
return Selector(format_id)
if (reduced_signature := reduce_signature(format_id)) is not None:
return Selector(signature_to_selector(reduced_signature))
return out.error(
title="Invalid selector",
message=f""""{format_id}" is not a valid function signature or selector.""",
)
case ResolvedEIP712Context():
return format_id
case _:
assert_never(context)
@classmethod
def _resolve_format(
cls,
format: InputFormat,
definitions: dict[Id, InputFieldDefinition],
enums: dict[Id, EnumDefinition],
constants: ConstantProvider,
out: OutputAdder,
) -> ResolvedFormat | None:
if (fields := cls._resolve_fields(ROOT_DATA_PATH, format.fields, definitions, enums, constants, out)) is None:
return None
return ResolvedFormat.model_validate(
{
"$id": format.id,
"intent": format.intent,
"fields": fields,
"required": format.required,
"excluded": format.excluded,
"screens": format.screens,
}
)
@classmethod
def _resolve_fields(
cls,
prefix: DataPath,
fields: list[InputField],
definitions: dict[Id, InputFieldDefinition],
enums: dict[Id, EnumDefinition],
constants: ConstantProvider,
out: OutputAdder,
) -> list[ResolvedField] | None:
resolved_fields = []
for input_format in fields:
if (resolved_field := cls._resolve_field(prefix, input_format, definitions, enums, constants, out)) is None:
return None
resolved_fields.extend(resolved_field)
return resolved_fields
@classmethod
def _resolve_field(
cls,
prefix: DataPath,
field: InputField,
definitions: dict[Id, InputFieldDefinition],
enums: dict[Id, EnumDefinition],
constants: ConstantProvider,
out: OutputAdder,
) -> list[ResolvedField] | None:
resolved_fields: list[ResolvedField] = []
match field:
case InputReference():
if (resolved_field := resolve_reference(prefix, field, definitions, enums, constants, out)) is None:
return None
resolved_fields.append(resolved_field)
case InputFieldDescription():
if (resolved_field := cls._resolve_field_description(prefix, field, enums, constants, out)) is None:
return None
resolved_fields.append(resolved_field)
case InputNestedFields():
if (
resolved_nested_fields := cls._resolve_nested_fields(
prefix, field, definitions, enums, constants, out
)
) is None:
return None
resolved_fields.extend(resolved_nested_fields)
case _:
assert_never(field)
return resolved_fields
@classmethod
def _resolve_nested_fields(
cls,
prefix: DataPath,
fields: InputNestedFields,
definitions: dict[Id, InputFieldDefinition],
enums: dict[Id, EnumDefinition],
constants: ConstantProvider,
out: OutputAdder,
) -> list[ResolvedNestedFields | ResolvedFieldDescription] | None:
path: DataPath
match constants.resolve_path(fields.path, out):
case None:
return None
case DataPath() as data_path:
path = data_path_concat(prefix, data_path)
case ContainerPath() as container_path:
return out.error(
title="Invalid path type",
message=f"Container path {container_path} cannot be used with nested fields.",
)
case _:
assert_never(fields.path)
if (
resolved_fields := cls._resolve_fields(
prefix=path, fields=fields.fields, definitions=definitions, enums=enums, constants=constants, out=out
)
) is None:
return None
match path.elements[-1]:
case Field() | ArrayElement():
return resolved_fields
case ArraySlice():
return out.error(
title="Invalid nested fields",
message="Using nested fields on an array slice is not allowed.",
)
case Array():
return [ResolvedNestedFields(path=path, fields=resolved_fields)]
case _:
assert_never(path.elements[-1])