Source code for anchorregistry.client

# SPDX-License-Identifier: BUSL-1.1
"""Public query functions for anchorregistry.

All functions compose on ``rpc._get_logs`` + ``decoder._decode_event``.
"""

from __future__ import annotations

import hashlib
from typing import Any

from eth_hash.auto import keccak

from eth_abi import decode as abi_decode

from anchorregistry.constants import KNOWN_DEPLOYMENTS, DEPLOYMENT_NETWORKS
from anchorregistry.decoder import _decode_data_fields, _decode_event
from anchorregistry.enums import ArtifactType
from anchorregistry.exceptions import AnchorNotFoundError
from anchorregistry.rpc import (
    _connect,
    _connect_all,
    _fetch_anchor_data,
    _fetch_anchor_data_batch,
    _fetch_transactions_batch,
    _get_logs,
)
from anchorregistry.utils import _address_topic, _build_topic, is_user_initiated

# Artifact types registered via registerTargeted (have a targetArId param).
_TARGETED_TYPES = {
    ArtifactType.RETRACTION,
    ArtifactType.REVIEW,
    ArtifactType.VOID,
    ArtifactType.AFFIRMED,
}

# ABI types for decoding registerTargeted calldata (after 4-byte selector).
_TARGETED_INPUT_TYPES = [
    "string",                                                    # arId
    "(uint8,string,string,string,string,string,string)",         # AnchorBase tuple
    "string",                                                    # targetArId
    "bytes",                                                     # extra
]

SOFTWARE_TYPES = {"CODE"}


# ── internal helpers ──────────────────────────────────────────────────

def _decode_target_ar_id(tx: dict) -> str:
    """Decode ``targetArId`` from a ``registerTargeted`` transaction object."""
    calldata = bytes(tx["input"])[4:]  # strip 4-byte function selector
    _, _, target_ar_id, _ = abi_decode(_TARGETED_INPUT_TYPES, calldata)
    return target_ar_id


def _fetch_target_ar_id(w3: Any, tx_hash: bytes) -> str:
    """Decode ``targetArId`` from a ``registerTargeted`` transaction.

    Fetches the transaction by hash and ABI-decodes the calldata to
    extract the ``targetArId`` parameter.
    """
    tx = w3.eth.get_transaction(tx_hash)
    return _decode_target_ar_id(tx)


def _build_record(w3: Any, contract: Any, raw_log: dict) -> dict[str, Any]:
    """Build a complete two-level record from a raw Anchored event log."""
    record = _decode_event(raw_log)
    ar_id = record["ar_id"]
    type_idx = record["artifact_type_index"]

    # Fetch and decode type-specific data from on-chain storage.
    extra = _fetch_anchor_data(contract, ar_id)
    data = _decode_data_fields(type_idx, extra)

    # For targeted types, recover targetArId from transaction calldata.
    if type_idx in _TARGETED_TYPES:
        target_ar_id = _fetch_target_ar_id(w3, raw_log["transactionHash"])
        data["target_ar_id"] = target_ar_id

    record["data"] = data
    return record


def _build_records(w3: Any, contract: Any, logs: list[dict]) -> list[dict[str, Any]]:
    """Build complete two-level records from multiple logs using batch RPC.

    Sends all ``getAnchorData`` calls in a single batch request, and
    all ``eth_getTransaction`` calls for targeted types in a second batch.
    Reduces N+1 RPC calls to 1 ``eth_getLogs`` + 1 batch ``getAnchorData``
    + 1 batch ``eth_getTransaction``.
    """
    if not logs:
        return []

    # Phase 1: decode all events (no RPC needed).
    records = [_decode_event(log) for log in logs]
    ar_ids = [r["ar_id"] for r in records]

    # Phase 2: batch-fetch all type-specific data.
    extras = _fetch_anchor_data_batch(w3, contract, ar_ids)

    # Phase 3: batch-fetch transactions for targeted types.
    targeted_indices = [
        i for i, r in enumerate(records) if r["artifact_type_index"] in _TARGETED_TYPES
    ]
    targeted_tx_hashes = [logs[i]["transactionHash"] for i in targeted_indices]
    targeted_txs = _fetch_transactions_batch(w3, targeted_tx_hashes)

    # Phase 4: assemble records.
    targeted_tx_map = dict(zip(targeted_indices, targeted_txs))

    for i, record in enumerate(records):
        type_idx = record["artifact_type_index"]
        data = _decode_data_fields(type_idx, extras[i])

        if i in targeted_tx_map:
            data["target_ar_id"] = _decode_target_ar_id(targeted_tx_map[i])

        record["data"] = data

    return records


# ── public API ────────────────────────────────────────────────────────

[docs] def get_by_arid(ar_id: str, rpc_url: str | None = None) -> dict[str, Any]: """Fetch a single anchor record by AR-ID. Uses the indexed ``arId`` topic for a targeted single-event query. Parameters ---------- ar_id: The AR-ID to look up (e.g. ``"AR-2026-Pvdp0W5"``). rpc_url: Optional RPC URL override. Returns ------- dict[str, Any] Two-level anchor record. Raises ------ AnchorNotFoundError If the AR-ID does not exist on-chain. """ # Iterate every deployment on the active network (newest first). importAnchor # was removed in V1.1-final, so an AR-ID's event lives only on the contract # where it was originally registered — this loop finds it there. topic = _build_topic(ar_id) for w3, contract, deploy_block, addr in _connect_all(rpc_url): logs = _get_logs( w3, contract.address, deploy_block or 0, "latest", topic_1=topic, early_exit_on_match=True, ) if logs: record = _build_record(w3, contract, logs[0]) record["contract_address"] = addr return record raise AnchorNotFoundError(f"AR-ID not found on any deployment: {ar_id}")
[docs] def get_by_registrant( wallet_address: str, rpc_url: str | None = None ) -> list[dict[str, Any]]: """Fetch all anchors registered by a specific wallet address. Uses the indexed ``registrant`` topic. Parameters ---------- wallet_address: Ethereum wallet address (checksummed or lowercase). rpc_url: Optional RPC URL override. Returns ------- list[dict[str, Any]] List of two-level anchor records. """ topic = _address_topic(wallet_address) merged: dict[str, dict[str, Any]] = {} for w3, contract, deploy_block, addr in _connect_all(rpc_url): logs = _get_logs( w3, contract.address, deploy_block or 0, "latest", topic_2=topic ) for record in _build_records(w3, contract, logs): record.setdefault("contract_address", addr) # newest-first iteration → first write wins on duplicate AR-IDs merged.setdefault(record["ar_id"], record) return list(merged.values())
[docs] def get_by_tree( tree_id_plain: str, rpc_url: str | None = None ) -> list[dict[str, Any]]: """Fetch all anchors belonging to a specific tree. Uses the indexed ``treeId`` topic. Parameters ---------- tree_id_plain: Human-readable treeId string (e.g. ``"ar-operator-v1"``). rpc_url: Optional RPC URL override. Returns ------- list[dict[str, Any]] List of two-level anchor records. """ topic = _build_topic(tree_id_plain) merged: dict[str, dict[str, Any]] = {} for w3, contract, deploy_block, addr in _connect_all(rpc_url): logs = _get_logs( w3, contract.address, deploy_block or 0, "latest", topic_3=topic ) for record in _build_records(w3, contract, logs): record.setdefault("contract_address", addr) merged.setdefault(record["ar_id"], record) return list(merged.values())
[docs] def get_by_type( artifact_type: ArtifactType | int, rpc_url: str | None = None ) -> list[dict[str, Any]]: """Fetch all anchors of a specific artifact type. Post-filter on decoded ``artifactType`` from event data. Parameters ---------- artifact_type: An ``ArtifactType`` enum member or its integer index. rpc_url: Optional RPC URL override. Returns ------- list[dict[str, Any]] List of two-level anchor records matching the type. """ type_index = int(artifact_type) records = get_all(rpc_url=rpc_url) return [r for r in records if r["artifact_type_index"] == type_index]
[docs] def get_all( from_block: int | None = None, to_block: int | None = None, rpc_url: str | None = None, ) -> list[dict[str, Any]]: """Fetch all anchors from the registry. Defaults to scanning from ``DEPLOY_BLOCK`` to latest. Parameters ---------- from_block: Starting block number. Defaults to ``DEPLOY_BLOCK``. to_block: Ending block number. Defaults to ``"latest"``. rpc_url: Optional RPC URL override. Returns ------- list[dict[str, Any]] List of all two-level anchor records. """ merged: dict[str, dict[str, Any]] = {} for w3, contract, deploy_block, addr in _connect_all(rpc_url): start = from_block if from_block is not None else (deploy_block or 0) end = to_block if to_block is not None else "latest" logs = _get_logs(w3, contract.address, start, end) for record in _build_records(w3, contract, logs): record.setdefault("contract_address", addr) merged.setdefault(record["ar_id"], record) return list(merged.values())
def which_contract( ar_id: str, network: str | None = None, rpc_url: str | None = None, ) -> str | None: """Find which known AnchorRegistry deployment holds an AR-ID. Probes every entry in KNOWN_DEPLOYMENTS that belongs to *network* (newest first by dict insertion order — V1B before V1A, etc.) and returns the contract address that matches. Each probe is a single early-exit eth_getLogs, so the worst case is one RPC call per known deployment on the network. Side effect: on a hit, ``configure()`` is called with the matching contract so subsequent operations in the session naturally target it. On a miss, configuration is left at the last probed contract — caller should re-configure() if that matters. Parameters ---------- ar_id: The AR-ID to locate (e.g. ``"AR-2026-Pvdp0W5"``). network: Network preset name to filter candidates by. Defaults to the currently-active network. rpc_url: Optional RPC URL override. Falls back to the network preset. Returns ------- str | None Lower-case contract address holding the anchor, or ``None`` if the AR-ID isn't on any known deployment for the network. """ # Local imports to avoid an import cycle at module load (config imports # from client via the public API surface in __init__). from anchorregistry.config import _active_network, configure target_net = network or _active_network candidates = [ addr for addr in KNOWN_DEPLOYMENTS if DEPLOYMENT_NETWORKS.get(addr) == target_net ] for addr in candidates: configure(network=target_net, contract_address=addr, rpc_url=rpc_url) try: get_by_arid(ar_id) return addr except AnchorNotFoundError: continue return None
[docs] def verify( ar_id: str, file_path: str | None = None, rpc_url: str | None = None, ) -> dict[str, Any]: """Fetch anchor record and optionally verify file integrity. If *file_path* is provided, computes SHA256 of the file and compares against ``manifest_hash`` on-chain. Parameters ---------- ar_id: The AR-ID to verify. file_path: Optional local file path for SHA256 integrity check. rpc_url: Optional RPC URL override. Returns ------- dict[str, Any] Record dict plus ``verified`` (bool) and ``hash_match`` (bool) keys. Raises ------ AnchorNotFoundError If the AR-ID does not exist on-chain. """ record = get_by_arid(ar_id, rpc_url=rpc_url) result = {**record, "verified": True, "hash_match": None} if file_path is not None: sha256 = hashlib.sha256() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): sha256.update(chunk) file_hash = sha256.hexdigest() result["hash_match"] = file_hash == record["manifest_hash"] return result
[docs] def watermark( ar_id: str, artifact_type: str | None = None, rpc_url: str | None = None, ) -> str: """Generate the correct SPDX-Anchor or DAPX-Anchor embedded tag. - ``artifact_type == "CODE"`` → ``SPDX-Anchor: anchorregistry.ai/{ar_id}`` - All other types → ``DAPX-Anchor: anchorregistry.ai/{ar_id}`` - If *artifact_type* is ``None``, resolves via ``get_by_arid()`` automatically. Parameters ---------- ar_id: The AR-ID to watermark. artifact_type: Optional type name string (e.g. ``"CODE"``). If ``None``, looked up on-chain. rpc_url: Optional RPC URL override. Returns ------- str The watermark line, e.g. ``"SPDX-Anchor: anchorregistry.ai/AR-2026-Pvdp0W5"``. """ if artifact_type is None: record = get_by_arid(ar_id, rpc_url=rpc_url) artifact_type = record["artifact_type_name"] prefix = "SPDX-Anchor" if artifact_type in SOFTWARE_TYPES else "DAPX-Anchor" return f"{prefix}: anchorregistry.ai/{ar_id}"
[docs] def authenticate_anchor( ownership_token: str, ar_id: str, rpc_url: str | None = None, ) -> dict[str, Any]: """Authenticate a single anchor by verifying its tokenCommitment on-chain. Computes ``keccak256(K || arId)`` (paper spec Section 4.2) and compares the result against the ``token_commitment`` stored in the on-chain ``Anchored`` event. Governance anchors (VOID, REVIEW, AFFIRMED, RETRACTION) carry ``bytes32(0)`` and are returned as ``authenticated: False`` immediately. Parameters ---------- ownership_token: Ownership token K = keccak256(salt), salt = 32 uniform random bytes. Stored as 0x-prefixed 64-char hex string (bytes32). Never transmitted to AnchorRegistry — known only to the token holder. ar_id: The AR-ID to authenticate (e.g. ``"AR-2026-Pvdp0W5"``). rpc_url: Optional RPC URL override. Returns ------- dict[str, Any] Result dict with keys: - ``authenticated`` — bool: True if keccak256 proof matches on-chain commitment. - ``ar_id`` — str: the AR-ID that was checked. - ``token_commitment`` — str: on-chain commitment (``0x``-prefixed bytes32 hex). - ``is_user_initiated`` — bool: False for governance anchors (bytes32(0)). - ``verified`` — bool: same as ``authenticated``. Raises ------ AnchorNotFoundError If the AR-ID does not exist on-chain. Examples -------- >>> from anchorregistry import configure, authenticate_anchor >>> configure(network="sepolia") >>> result = authenticate_anchor("0x1a2b3c4d...", "AR-2026-Pvdp0W5") >>> result["authenticated"] True """ record = get_by_arid(ar_id, rpc_url=rpc_url) user_initiated = is_user_initiated(record) if not user_initiated: return { "authenticated": False, "ar_id": ar_id, "token_commitment": record["token_commitment"], "is_user_initiated": False, "verified": False, } # Paper spec Section 4.2: Φi = H(K || Ci), H = keccak256 # K: bytes32 from 0x-prefixed hex string; Ci: AR-ID as UTF-8 bytes K_bytes = bytes.fromhex(ownership_token[2:]) Ci_bytes = ar_id.encode("utf-8") computed = "0x" + keccak(K_bytes + Ci_bytes).hex() on_chain = record["token_commitment"] verified = computed == on_chain return { "authenticated": verified, "ar_id": ar_id, "token_commitment": on_chain, "is_user_initiated": True, "verified": verified, }
[docs] def is_sealed( root_ar_id: str, rpc_url: str | None = None, ) -> dict[str, Any]: """Check if a tree root has been sealed on-chain. Parameters ---------- root_ar_id: AR-ID of the tree root anchor. rpc_url: Optional RPC URL override. Returns ------- dict[str, Any] Result dict with keys: - ``sealed`` — bool: True if the tree root is sealed. - ``continuation`` — str: new tree root if set, empty string otherwise. """ # Sealed state can live on any deployment in the network — check all, # newest first, return on first hit. for w3, contract, _deploy_block, _addr in _connect_all(rpc_url): if contract.functions.isSealed(root_ar_id).call(): try: continuation = contract.functions.sealContinuation(root_ar_id).call() except Exception: continuation = "" return {"sealed": True, "continuation": continuation} return {"sealed": False, "continuation": ""}
[docs] def authenticate_tree( ownership_token: str, root_ar_id: str, rpc_url: str | None = None, ) -> dict[str, Any]: """Authenticate a full tree by verifying ownership and all anchor commitments. If the tree is sealed, returns immediately with sealed status — the tree is authentic and complete, no further verification needed. Two-layer verification: **Layer 1 — Tree ownership:** Computes ``keccak256(K || rootArId)`` (paper spec Section 4.2) and compares against ``record["tree_id"]``. Returns ``authenticated: False`` immediately on failure. **Layer 2 — Per-anchor initiation:** Calls ``get_by_tree()`` and runs ``authenticate_anchor()`` for each user-initiated anchor. Governance anchors (bytes32(0) commitment) are counted separately and skipped from verification. Parameters ---------- ownership_token: Ownership token K = keccak256(salt), salt = 32 uniform random bytes. Stored as 0x-prefixed 64-char hex string (bytes32). root_ar_id: AR-ID of the tree root anchor. rpc_url: Optional RPC URL override. Returns ------- dict[str, Any] Result dict with keys: - ``authenticated`` — bool: True if Layer 1 passes and ``anchors_failed == 0``. - ``sealed`` — bool: True if the tree is sealed. - ``continuation`` — str: new tree root (if sealed with continuation). - ``tree_id`` — str: human-readable treeId from the root anchor. - ``root_ar_id`` — str: the root AR-ID that was checked. - ``anchor_count`` — int: total anchors in the tree. - ``anchors_verified`` — int: user-initiated anchors whose commitment matched. - ``anchors_failed`` — int: user-initiated anchors whose commitment did not match. - ``governance_count`` — int: governance anchors skipped (bytes32(0)). Raises ------ AnchorNotFoundError If ``root_ar_id`` does not exist on-chain. Examples -------- >>> from anchorregistry import configure, authenticate_tree >>> configure(network="sepolia") >>> result = authenticate_tree("0x1a2b3c4d...", "AR-2026-Pvdp0W5") >>> result["authenticated"] True """ # Check sealed status first — sealed trees are authentic and complete. seal_status = is_sealed(root_ar_id, rpc_url=rpc_url) if seal_status["sealed"]: root_record = get_by_arid(root_ar_id, rpc_url=rpc_url) return { "authenticated": False, "sealed": True, "continuation": seal_status["continuation"], "tree_id": root_record["tree_id"], "root_ar_id": root_ar_id, "anchor_count": 0, "anchors_verified": 0, "anchors_failed": 0, "governance_count": 0, "message": ( "Tree sealed — record authentic and complete. " f"Continued at {seal_status['continuation']}" if seal_status["continuation"] else "Tree sealed — record authentic and complete." ), } root_record = get_by_arid(root_ar_id, rpc_url=rpc_url) tree_id = root_record["tree_id"] # Layer 1: verify tree ownership — keccak256(K || R) == treeId # Paper spec Section 4.2: T = H(K || R), H = keccak256 K_bytes = bytes.fromhex(ownership_token[2:]) R_bytes = root_ar_id.encode("utf-8") computed_tree_id = "0x" + keccak(K_bytes + R_bytes).hex() layer1_pass = computed_tree_id == tree_id if not layer1_pass: return { "authenticated": False, "sealed": False, "continuation": "", "tree_id": tree_id, "root_ar_id": root_ar_id, "anchor_count": 0, "anchors_verified": 0, "anchors_failed": 0, "governance_count": 0, } # Layer 2: verify every user-initiated anchor in the tree. tree_records = get_by_tree(tree_id, rpc_url=rpc_url) anchors_verified = 0 anchors_failed = 0 governance_count = 0 for anchor in tree_records: if not is_user_initiated(anchor): governance_count += 1 continue result = authenticate_anchor(ownership_token, anchor["ar_id"], rpc_url=rpc_url) if result["verified"]: anchors_verified += 1 else: anchors_failed += 1 return { "authenticated": anchors_failed == 0, "sealed": False, "continuation": "", "tree_id": tree_id, "root_ar_id": root_ar_id, "anchor_count": len(tree_records), "anchors_verified": anchors_verified, "anchors_failed": anchors_failed, "governance_count": governance_count, }