diff --git a/.sqlx/query-391efd59ae4d18800d3ed5e1850083a8b5aa335fbb27bd607e546e44e9544e92.json b/.sqlx/query-391efd59ae4d18800d3ed5e1850083a8b5aa335fbb27bd607e546e44e9544e92.json new file mode 100644 index 0000000..a884970 --- /dev/null +++ b/.sqlx/query-391efd59ae4d18800d3ed5e1850083a8b5aa335fbb27bd607e546e44e9544e92.json @@ -0,0 +1,213 @@ +{ + "db_name": "PostgreSQL", + "query": "WITH\n user_command_info AS (\n SELECT DISTINCT\n ON (uca.block_id, uca.id, uca.sequence_no) uca.id,\n uca.command_type AS \"command_type: UserCommandType\",\n uca.fee_payer_id,\n uca.source_id,\n uca.receiver_id,\n uca.nonce,\n uca.amount,\n uca.fee,\n uca.valid_until,\n uca.memo,\n uca.hash,\n uca.block_id,\n uca.sequence_no,\n uca.status AS \"status: TransactionStatus\",\n uca.failure_reason,\n b.state_hash,\n b.chain_status AS \"chain_status: ChainStatus\",\n b.height\n FROM\n user_commands_aggregated AS uca\n INNER JOIN public_keys AS pk ON uca.fee_payer_id=pk.id\n OR (\n uca.status='applied'\n AND (\n uca.source_id=pk.id\n OR uca.receiver_id=pk.id\n )\n )\n INNER JOIN blocks AS b ON uca.block_id=b.id\n WHERE\n (\n b.chain_status='canonical'\n OR b.chain_status='pending'\n )\n AND (\n $1>=b.height\n OR $1 IS NULL\n )\n AND (\n $2=uca.hash\n OR $2 IS NULL\n )\n AND (\n $3=pk.value\n AND $4=''\n OR (\n $3 IS NULL\n AND $4 IS NULL\n )\n )\n AND (\n $5=uca.status\n OR $5 IS NULL\n )\n AND (\n $6=uca.status\n OR $6 IS NULL\n )\n AND (\n $7=pk.value\n OR $7 IS NULL\n )\n ),\n id_count AS (\n SELECT\n count(*) AS total_count\n FROM\n user_command_info\n )\nSELECT\n u.*,\n id_count.total_count,\n pk_payer.value AS fee_payer,\n pk_source.value AS source,\n pk_receiver.value AS receiver,\n ac.creation_fee AS \"creation_fee?\"\nFROM\n id_count,\n (\n SELECT\n *\n FROM\n user_command_info\n ORDER BY\n block_id,\n id,\n sequence_no\n LIMIT\n $8\n OFFSET\n $9\n ) AS u\n INNER JOIN public_keys AS pk_payer ON u.fee_payer_id=pk_payer.id\n INNER JOIN public_keys AS pk_source ON u.source_id=pk_source.id\n INNER JOIN public_keys AS pk_receiver ON u.receiver_id=pk_receiver.id\n /* Account creation fees are attributed to the first successful command in the\n block that mentions the account with the following LEFT JOINs */\n LEFT JOIN account_identifiers AS ai_receiver ON u.receiver_id=ai_receiver.public_key_id\n LEFT JOIN accounts_created AS ac ON u.block_id=ac.block_id\n AND ai_receiver.id=ac.account_identifier_id\n AND u.\"status: TransactionStatus\"='applied'\n AND u.sequence_no=(\n SELECT\n least(\n (\n SELECT\n min(bic2.sequence_no)\n FROM\n blocks_internal_commands AS bic2\n INNER JOIN internal_commands AS ic2 ON bic2.internal_command_id=ic2.id\n WHERE\n u.receiver_id=ic2.receiver_id\n AND bic2.block_id=u.block_id\n AND bic2.status='applied'\n ),\n (\n SELECT\n min(buc2.sequence_no)\n FROM\n blocks_user_commands AS buc2\n INNER JOIN user_commands AS uc2 ON buc2.user_command_id=uc2.id\n WHERE\n u.receiver_id=uc2.receiver_id\n AND buc2.block_id=u.block_id\n AND buc2.status='applied'\n )\n )\n )\nORDER BY\n u.block_id,\n u.id,\n u.sequence_no\n", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "command_type: UserCommandType", + "type_info": { + "Custom": { + "name": "user_command_type", + "kind": { + "Enum": [ + "payment", + "delegation" + ] + } + } + } + }, + { + "ordinal": 2, + "name": "fee_payer_id", + "type_info": "Int4" + }, + { + "ordinal": 3, + "name": "source_id", + "type_info": "Int4" + }, + { + "ordinal": 4, + "name": "receiver_id", + "type_info": "Int4" + }, + { + "ordinal": 5, + "name": "nonce", + "type_info": "Int8" + }, + { + "ordinal": 6, + "name": "amount", + "type_info": "Text" + }, + { + "ordinal": 7, + "name": "fee", + "type_info": "Text" + }, + { + "ordinal": 8, + "name": "valid_until", + "type_info": "Int8" + }, + { + "ordinal": 9, + "name": "memo", + "type_info": "Text" + }, + { + "ordinal": 10, + "name": "hash", + "type_info": "Text" + }, + { + "ordinal": 11, + "name": "block_id", + "type_info": "Int4" + }, + { + "ordinal": 12, + "name": "sequence_no", + "type_info": "Int4" + }, + { + "ordinal": 13, + "name": "status: TransactionStatus", + "type_info": { + "Custom": { + "name": "transaction_status", + "kind": { + "Enum": [ + "applied", + "failed" + ] + } + } + } + }, + { + "ordinal": 14, + "name": "failure_reason", + "type_info": "Text" + }, + { + "ordinal": 15, + "name": "state_hash", + "type_info": "Text" + }, + { + "ordinal": 16, + "name": "chain_status: ChainStatus", + "type_info": { + "Custom": { + "name": "chain_status_type", + "kind": { + "Enum": [ + "canonical", + "orphaned", + "pending" + ] + } + } + } + }, + { + "ordinal": 17, + "name": "height", + "type_info": "Int8" + }, + { + "ordinal": 18, + "name": "total_count", + "type_info": "Int8" + }, + { + "ordinal": 19, + "name": "fee_payer", + "type_info": "Text" + }, + { + "ordinal": 20, + "name": "source", + "type_info": "Text" + }, + { + "ordinal": 21, + "name": "receiver", + "type_info": "Text" + }, + { + "ordinal": 22, + "name": "creation_fee?", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Int8", + "Text", + "Text", + "Text", + { + "Custom": { + "name": "transaction_status", + "kind": { + "Enum": [ + "applied", + "failed" + ] + } + } + }, + { + "Custom": { + "name": "transaction_status", + "kind": { + "Enum": [ + "applied", + "failed" + ] + } + } + }, + "Text", + "Int8", + "Int8" + ] + }, + "nullable": [ + true, + false, + false, + false, + false, + false, + true, + false, + true, + false, + false, + false, + false, + false, + true, + false, + false, + false, + null, + false, + false, + false, + false + ] + }, + "hash": "391efd59ae4d18800d3ed5e1850083a8b5aa335fbb27bd607e546e44e9544e92" +} diff --git a/.sqlx/query-42f6465f70b094c9cfe7c9756a326d140a9b58b42ddf9ef4783ca208119b7a09.json b/.sqlx/query-42f6465f70b094c9cfe7c9756a326d140a9b58b42ddf9ef4783ca208119b7a09.json new file mode 100644 index 0000000..ad5d482 --- /dev/null +++ b/.sqlx/query-42f6465f70b094c9cfe7c9756a326d140a9b58b42ddf9ef4783ca208119b7a09.json @@ -0,0 +1,156 @@ +{ + "db_name": "PostgreSQL", + "query": "WITH\n zkapp_commands_info AS (\n SELECT\n zc.id,\n zc.memo,\n zc.hash,\n pk_fee_payer.value AS fee_payer,\n pk_update_body.value AS pk_update_body,\n zfpb.fee,\n zfpb.valid_until,\n zfpb.nonce,\n bzc.sequence_no,\n bzc.status AS \"status: TransactionStatus\",\n zaub.balance_change,\n bzc.block_id,\n b.state_hash,\n b.height,\n token_update_body.value AS token,\n ARRAY(\n SELECT\n unnest(zauf.failures)\n FROM\n zkapp_account_update_failures AS zauf\n WHERE\n zauf.id=ANY (bzc.failure_reasons_ids)\n ) AS failure_reasons\n FROM\n zkapp_commands AS zc\n INNER JOIN blocks_zkapp_commands AS bzc ON zc.id=bzc.zkapp_command_id\n INNER JOIN zkapp_fee_payer_body AS zfpb ON zc.zkapp_fee_payer_body_id=zfpb.id\n INNER JOIN public_keys AS pk_fee_payer ON zfpb.public_key_id=pk_fee_payer.id\n INNER JOIN blocks AS b ON bzc.block_id=b.id\n AND (\n b.chain_status='canonical'\n OR b.chain_status='pending'\n )\n LEFT JOIN zkapp_account_update AS zau ON zau.id=ANY (zc.zkapp_account_updates_ids)\n INNER JOIN zkapp_account_update_body AS zaub ON zau.body_id=zaub.id\n INNER JOIN account_identifiers AS ai_update_body ON zaub.account_identifier_id=ai_update_body.id\n INNER JOIN public_keys AS pk_update_body ON ai_update_body.public_key_id=pk_update_body.id\n INNER JOIN tokens AS token_update_body ON ai_update_body.token_id=token_update_body.id\n WHERE\n (\n $1>=b.height\n OR $1 IS NULL\n )\n AND (\n $2=zc.hash\n OR $2 IS NULL\n )\n AND (\n (\n (\n $3=pk_fee_payer.value\n AND $4=''\n )\n OR (\n $3=pk_update_body.value\n AND $4=token_update_body.value\n )\n )\n OR (\n $3 IS NULL\n AND $4 IS NULL\n )\n )\n AND (\n $5=bzc.status\n OR $5 IS NULL\n )\n AND (\n $6=bzc.status\n OR $6 IS NULL\n )\n AND (\n (\n $7=pk_fee_payer.value\n OR $7=pk_update_body.value\n )\n OR $7 IS NULL\n )\n ),\n zkapp_commands_ids AS (\n SELECT DISTINCT\n id,\n block_id,\n sequence_no\n FROM\n zkapp_commands_info\n ),\n id_count AS (\n SELECT\n count(*) AS total_count\n FROM\n zkapp_commands_ids\n )\nSELECT\n zc.*,\n id_count.total_count\nFROM\n id_count,\n (\n SELECT\n *\n FROM\n zkapp_commands_ids\n ORDER BY\n block_id,\n id,\n sequence_no\n LIMIT\n $8\n OFFSET\n $9\n ) AS ids\n INNER JOIN zkapp_commands_info AS zc ON ids.id=zc.id\n AND ids.block_id=zc.block_id\n AND ids.sequence_no=zc.sequence_no\nORDER BY\n ids.block_id,\n ids.id,\n ids.sequence_no\n", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "memo", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "hash", + "type_info": "Text" + }, + { + "ordinal": 3, + "name": "fee_payer", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "pk_update_body", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "fee", + "type_info": "Text" + }, + { + "ordinal": 6, + "name": "valid_until", + "type_info": "Int8" + }, + { + "ordinal": 7, + "name": "nonce", + "type_info": "Int8" + }, + { + "ordinal": 8, + "name": "sequence_no", + "type_info": "Int4" + }, + { + "ordinal": 9, + "name": "status: TransactionStatus", + "type_info": { + "Custom": { + "name": "transaction_status", + "kind": { + "Enum": [ + "applied", + "failed" + ] + } + } + } + }, + { + "ordinal": 10, + "name": "balance_change", + "type_info": "Text" + }, + { + "ordinal": 11, + "name": "block_id", + "type_info": "Int4" + }, + { + "ordinal": 12, + "name": "state_hash", + "type_info": "Text" + }, + { + "ordinal": 13, + "name": "height", + "type_info": "Int8" + }, + { + "ordinal": 14, + "name": "token", + "type_info": "Text" + }, + { + "ordinal": 15, + "name": "failure_reasons", + "type_info": "TextArray" + }, + { + "ordinal": 16, + "name": "total_count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8", + "Text", + "Text", + "Text", + { + "Custom": { + "name": "transaction_status", + "kind": { + "Enum": [ + "applied", + "failed" + ] + } + } + }, + { + "Custom": { + "name": "transaction_status", + "kind": { + "Enum": [ + "applied", + "failed" + ] + } + } + }, + "Text", + "Int8", + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + true, + false, + false, + false, + false, + false, + false, + false, + false, + null, + null + ] + }, + "hash": "42f6465f70b094c9cfe7c9756a326d140a9b58b42ddf9ef4783ca208119b7a09" +} diff --git a/.sqlx/query-cb1027d095390b97b8853e7775222974441b581f1fb8a5af7c28451b604b738e.json b/.sqlx/query-cb1027d095390b97b8853e7775222974441b581f1fb8a5af7c28451b604b738e.json new file mode 100644 index 0000000..32b1f8d --- /dev/null +++ b/.sqlx/query-cb1027d095390b97b8853e7775222974441b581f1fb8a5af7c28451b604b738e.json @@ -0,0 +1,155 @@ +{ + "db_name": "PostgreSQL", + "query": "WITH\n coinbase_receiver_info AS (\n SELECT\n bic.block_id,\n bic.internal_command_id,\n bic.sequence_no,\n bic.secondary_sequence_no,\n coinbase_receiver_pk.value AS coinbase_receiver\n FROM\n blocks_internal_commands AS bic\n INNER JOIN internal_commands AS ic ON bic.internal_command_id=ic.id\n INNER JOIN blocks_internal_commands AS bic_coinbase_receiver ON bic.block_id=bic_coinbase_receiver.block_id\n AND (\n bic.internal_command_id<>bic_coinbase_receiver.internal_command_id\n OR bic.sequence_no<>bic_coinbase_receiver.sequence_no\n OR bic.secondary_sequence_no<>bic_coinbase_receiver.secondary_sequence_no\n )\n INNER JOIN internal_commands AS ic_coinbase_receiver ON ic.command_type='fee_transfer_via_coinbase'\n AND ic_coinbase_receiver.command_type='coinbase'\n AND bic_coinbase_receiver.internal_command_id=ic_coinbase_receiver.id\n INNER JOIN public_keys AS coinbase_receiver_pk ON ic_coinbase_receiver.receiver_id=coinbase_receiver_pk.id\n ),\n internal_commands_info AS (\n SELECT DISTINCT\n ON (\n bic.block_id,\n bic.internal_command_id,\n bic.sequence_no,\n bic.secondary_sequence_no\n ) i.id,\n i.command_type AS \"command_type: InternalCommandType\",\n i.receiver_id,\n i.fee,\n i.hash,\n pk.value AS receiver,\n cri.coinbase_receiver AS \"coinbase_receiver?\",\n bic.sequence_no,\n bic.secondary_sequence_no,\n bic.block_id,\n bic.status AS \"status: TransactionStatus\",\n b.state_hash,\n b.height\n FROM\n internal_commands AS i\n INNER JOIN blocks_internal_commands AS bic ON i.id=bic.internal_command_id\n INNER JOIN public_keys AS pk ON i.receiver_id=pk.id\n INNER JOIN blocks AS b ON bic.block_id=b.id\n AND (\n b.chain_status='canonical'\n OR b.chain_status='pending'\n )\n LEFT JOIN coinbase_receiver_info AS cri ON bic.block_id=cri.block_id\n AND bic.internal_command_id=cri.internal_command_id\n AND bic.sequence_no=cri.sequence_no\n AND bic.secondary_sequence_no=cri.secondary_sequence_no\n WHERE\n (\n $1>=b.height\n OR $1 IS NULL\n )\n AND (\n $2=i.hash\n OR $2 IS NULL\n )\n AND (\n (\n $3=pk.value\n OR $3=cri.coinbase_receiver\n )\n AND $4=''\n OR (\n $3 IS NULL\n AND $4 IS NULL\n )\n )\n AND (\n $5=bic.status\n OR $5 IS NULL\n )\n AND (\n $6=bic.status\n OR $6 IS NULL\n )\n AND (\n (\n $7=pk.value\n OR $7=cri.coinbase_receiver\n )\n OR $7 IS NULL\n )\n ),\n id_count AS (\n SELECT\n count(*) AS total_count\n FROM\n internal_commands_info\n )\nSELECT\n i.*,\n id_count.total_count,\n ac.creation_fee AS \"creation_fee?\"\nFROM\n id_count,\n (\n SELECT\n *\n FROM\n internal_commands_info\n ORDER BY\n block_id,\n id,\n sequence_no,\n secondary_sequence_no\n LIMIT\n $8\n OFFSET\n $9\n ) AS i\n LEFT JOIN account_identifiers AS ai ON i.receiver_id=ai.public_key_id\n LEFT JOIN accounts_created AS ac ON ai.id=ac.account_identifier_id\n AND i.block_id=ac.block_id\n AND i.sequence_no=(\n SELECT\n least(\n (\n SELECT\n min(bic2.sequence_no)\n FROM\n blocks_internal_commands AS bic2\n INNER JOIN internal_commands AS ic2 ON bic2.internal_command_id=ic2.id\n WHERE\n i.receiver_id=ic2.receiver_id\n AND bic2.block_id=i.block_id\n AND bic2.status='applied'\n ),\n (\n SELECT\n min(buc2.sequence_no)\n FROM\n blocks_user_commands AS buc2\n INNER JOIN user_commands AS uc2 ON buc2.user_command_id=uc2.id\n WHERE\n i.receiver_id=uc2.receiver_id\n AND buc2.block_id=i.block_id\n AND buc2.status='applied'\n )\n )\n )\nORDER BY\n i.block_id,\n i.id,\n i.sequence_no,\n i.secondary_sequence_no\n", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "command_type: InternalCommandType", + "type_info": { + "Custom": { + "name": "internal_command_type", + "kind": { + "Enum": [ + "fee_transfer_via_coinbase", + "fee_transfer", + "coinbase" + ] + } + } + } + }, + { + "ordinal": 2, + "name": "receiver_id", + "type_info": "Int4" + }, + { + "ordinal": 3, + "name": "fee", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "hash", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "receiver", + "type_info": "Text" + }, + { + "ordinal": 6, + "name": "coinbase_receiver?", + "type_info": "Text" + }, + { + "ordinal": 7, + "name": "sequence_no", + "type_info": "Int4" + }, + { + "ordinal": 8, + "name": "secondary_sequence_no", + "type_info": "Int4" + }, + { + "ordinal": 9, + "name": "block_id", + "type_info": "Int4" + }, + { + "ordinal": 10, + "name": "status: TransactionStatus", + "type_info": { + "Custom": { + "name": "transaction_status", + "kind": { + "Enum": [ + "applied", + "failed" + ] + } + } + } + }, + { + "ordinal": 11, + "name": "state_hash", + "type_info": "Text" + }, + { + "ordinal": 12, + "name": "height", + "type_info": "Int8" + }, + { + "ordinal": 13, + "name": "total_count", + "type_info": "Int8" + }, + { + "ordinal": 14, + "name": "creation_fee?", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Int8", + "Text", + "Text", + "Text", + { + "Custom": { + "name": "transaction_status", + "kind": { + "Enum": [ + "applied", + "failed" + ] + } + } + }, + { + "Custom": { + "name": "transaction_status", + "kind": { + "Enum": [ + "applied", + "failed" + ] + } + } + }, + "Text", + "Int8", + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + null, + false + ] + }, + "hash": "cb1027d095390b97b8853e7775222974441b581f1fb8a5af7c28451b604b738e" +} diff --git a/deno.json b/deno.json index c94c259..0e9e69c 100644 --- a/deno.json +++ b/deno.json @@ -7,10 +7,12 @@ "pg:init": "docker run -d --name mina-archive-db -p 5432:5432 -v $(pwd)/sql_scripts:/docker-entrypoint-initdb.d -e POSTGRES_PASSWORD=whatever -e POSTGRES_USER=mina postgres", "pg:wait": "deno run -A ./tasks/pg_wait.ts", "pg:enable_logging": "deno run -A ./tasks/enable_logging.ts", + "pg:apply_optimizations": "deno run -A ./tasks/search_tx_optimizations.ts --apply", + "pg:drop_optimizations": "deno run -A ./tasks/search_tx_optimizations.ts --drop", "pg:up": "docker start mina-archive-db", "pg:down": "docker kill mina-archive-db", "pg:rm": "docker rm mina-archive-db", - "dev:init": "deno task dl:devnet && deno task pg:enable_logging && deno task pg:init && deno task pg:wait", + "dev:init": "deno task dl:devnet && deno task pg:enable_logging && deno task pg:init && deno task pg:wait && deno task pg:apply_optimizations", "dev": "cargo run serve --playground" }, "imports": { diff --git a/sql/migrations/apply_search_tx_optimizations.sql b/sql/migrations/apply_search_tx_optimizations.sql new file mode 100644 index 0000000..8156ddd --- /dev/null +++ b/sql/migrations/apply_search_tx_optimizations.sql @@ -0,0 +1,121 @@ +CREATE TABLE user_commands_aggregated ( + id INT, + command_type user_command_type NOT NULL, + fee_payer_id INT NOT NULL, + source_id INT NOT NULL, + receiver_id INT NOT NULL, + nonce BIGINT NOT NULL, + amount TEXT, + fee TEXT NOT NULL, + valid_until BIGINT, + memo TEXT NOT NULL, + hash TEXT NOT NULL, + block_id INT NOT NULL, + sequence_no INT NOT NULL, + status transaction_status NOT NULL, + failure_reason TEXT, + user_command_id INT NOT NULL, + CONSTRAINT user_commands_aggregated_unique UNIQUE (id, block_id, sequence_no) +); + +-- NEXT -- +CREATE INDEX idx_user_commands_aggregated_hash ON user_commands_aggregated (hash); + +-- NEXT -- +-- Populate the table with the existing data +INSERT INTO + user_commands_aggregated ( + id, + command_type, + fee_payer_id, + source_id, + receiver_id, + nonce, + amount, + fee, + valid_until, + memo, + hash, + block_id, + sequence_no, + status, + failure_reason, + user_command_id + ) +SELECT + u.id, + u.command_type, + u.fee_payer_id, + u.source_id, + u.receiver_id, + u.nonce, + u.amount, + u.fee, + u.valid_until, + u.memo, + u.hash, + buc.block_id, + buc.sequence_no, + buc.status, + buc.failure_reason, + buc.user_command_id +FROM + user_commands AS u + INNER JOIN blocks_user_commands AS buc ON u.id=buc.user_command_id; + +-- NEXT -- +-- Create the trigger function to insert a new row into user_commands_aggregated +CREATE +OR REPLACE function add_to_user_commands_aggregated () returns trigger AS $$ +BEGIN + -- Insert a new row into user_commands_aggregated only if the corresponding entry doesn't already exist + INSERT INTO user_commands_aggregated ( + id, + command_type, + fee_payer_id, + source_id, + receiver_id, + nonce, + amount, + fee, + valid_until, + memo, + hash, + block_id, + sequence_no, + status, + failure_reason, + user_command_id + ) + SELECT + u.id, + u.command_type, + u.fee_payer_id, + u.source_id, + u.receiver_id, + u.nonce, + u.amount, + u.fee, + u.valid_until, + u.memo, + u.hash, + NEW.block_id, + NEW.sequence_no, + NEW.status, + NEW.failure_reason, + NEW.user_command_id + FROM + user_commands AS u + WHERE u.id = NEW.user_command_id + ON CONFLICT (id, block_id, sequence_no) DO NOTHING; + + RETURN NEW; +END; +$$ language plpgsql; + +-- NEXT -- +-- Create the trigger that fires after each insert into blocks_user_commands +CREATE +OR REPLACE trigger trigger_add_to_user_commands_aggregated +AFTER insert ON blocks_user_commands FOR each ROW +EXECUTE function add_to_user_commands_aggregated (); diff --git a/sql/migrations/drop_search_tx_optimizations.sql b/sql/migrations/drop_search_tx_optimizations.sql new file mode 100644 index 0000000..46d6cd1 --- /dev/null +++ b/sql/migrations/drop_search_tx_optimizations.sql @@ -0,0 +1,11 @@ +-- Drop the triggers +DROP TRIGGER if EXISTS trigger_add_to_user_commands_aggregated ON blocks_user_commands; + +-- Drop the function +DROP FUNCTION if EXISTS update_user_commands_aggregated; + +-- Drop indexes +DROP INDEX if EXISTS idx_user_commands_aggregated_hash; + +-- Drop the table +DROP TABLE IF EXISTS user_commands_aggregated; diff --git a/sql/indexer_internal_commands.sql b/sql/queries/indexer_internal_commands.sql similarity index 100% rename from sql/indexer_internal_commands.sql rename to sql/queries/indexer_internal_commands.sql diff --git a/sql/queries/indexer_internal_commands_optimized.sql b/sql/queries/indexer_internal_commands_optimized.sql new file mode 100644 index 0000000..2fd13d5 --- /dev/null +++ b/sql/queries/indexer_internal_commands_optimized.sql @@ -0,0 +1,153 @@ +WITH + coinbase_receiver_info AS ( + SELECT + bic.block_id, + bic.internal_command_id, + bic.sequence_no, + bic.secondary_sequence_no, + coinbase_receiver_pk.value AS coinbase_receiver + FROM + blocks_internal_commands AS bic + INNER JOIN internal_commands AS ic ON bic.internal_command_id=ic.id + INNER JOIN blocks_internal_commands AS bic_coinbase_receiver ON bic.block_id=bic_coinbase_receiver.block_id + AND ( + bic.internal_command_id<>bic_coinbase_receiver.internal_command_id + OR bic.sequence_no<>bic_coinbase_receiver.sequence_no + OR bic.secondary_sequence_no<>bic_coinbase_receiver.secondary_sequence_no + ) + INNER JOIN internal_commands AS ic_coinbase_receiver ON ic.command_type='fee_transfer_via_coinbase' + AND ic_coinbase_receiver.command_type='coinbase' + AND bic_coinbase_receiver.internal_command_id=ic_coinbase_receiver.id + INNER JOIN public_keys AS coinbase_receiver_pk ON ic_coinbase_receiver.receiver_id=coinbase_receiver_pk.id + ), + internal_commands_info AS ( + SELECT DISTINCT + ON ( + bic.block_id, + bic.internal_command_id, + bic.sequence_no, + bic.secondary_sequence_no + ) i.id, + i.command_type AS "command_type: InternalCommandType", + i.receiver_id, + i.fee, + i.hash, + pk.value AS receiver, + cri.coinbase_receiver AS "coinbase_receiver?", + bic.sequence_no, + bic.secondary_sequence_no, + bic.block_id, + bic.status AS "status: TransactionStatus", + b.state_hash, + b.height + FROM + internal_commands AS i + INNER JOIN blocks_internal_commands AS bic ON i.id=bic.internal_command_id + INNER JOIN public_keys AS pk ON i.receiver_id=pk.id + INNER JOIN blocks AS b ON bic.block_id=b.id + AND ( + b.chain_status='canonical' + OR b.chain_status='pending' + ) + LEFT JOIN coinbase_receiver_info AS cri ON bic.block_id=cri.block_id + AND bic.internal_command_id=cri.internal_command_id + AND bic.sequence_no=cri.sequence_no + AND bic.secondary_sequence_no=cri.secondary_sequence_no + WHERE + ( + $1>=b.height + OR $1 IS NULL + ) + AND ( + $2=i.hash + OR $2 IS NULL + ) + AND ( + ( + $3=pk.value + OR $3=cri.coinbase_receiver + ) + AND $4='' + OR ( + $3 IS NULL + AND $4 IS NULL + ) + ) + AND ( + $5=bic.status + OR $5 IS NULL + ) + AND ( + $6=bic.status + OR $6 IS NULL + ) + AND ( + ( + $7=pk.value + OR $7=cri.coinbase_receiver + ) + OR $7 IS NULL + ) + ), + id_count AS ( + SELECT + count(*) AS total_count + FROM + internal_commands_info + ) +SELECT + i.*, + id_count.total_count, + ac.creation_fee AS "creation_fee?" +FROM + id_count, + ( + SELECT + * + FROM + internal_commands_info + ORDER BY + block_id, + id, + sequence_no, + secondary_sequence_no + LIMIT + $8 + OFFSET + $9 + ) AS i + LEFT JOIN account_identifiers AS ai ON i.receiver_id=ai.public_key_id + LEFT JOIN accounts_created AS ac ON ai.id=ac.account_identifier_id + AND i.block_id=ac.block_id + AND i.sequence_no=( + SELECT + least( + ( + SELECT + min(bic2.sequence_no) + FROM + blocks_internal_commands AS bic2 + INNER JOIN internal_commands AS ic2 ON bic2.internal_command_id=ic2.id + WHERE + i.receiver_id=ic2.receiver_id + AND bic2.block_id=i.block_id + AND bic2.status='applied' + ), + ( + SELECT + min(buc2.sequence_no) + FROM + blocks_user_commands AS buc2 + INNER JOIN user_commands AS uc2 ON buc2.user_command_id=uc2.id + WHERE + i.receiver_id=uc2.receiver_id + AND buc2.block_id=i.block_id + AND buc2.status='applied' + ) + ) + ) +ORDER BY + i.block_id, + i.id, + i.sequence_no, + i.secondary_sequence_no diff --git a/sql/indexer_user_commands.sql b/sql/queries/indexer_user_commands.sql similarity index 100% rename from sql/indexer_user_commands.sql rename to sql/queries/indexer_user_commands.sql diff --git a/sql/queries/indexer_user_commands_optimized.sql b/sql/queries/indexer_user_commands_optimized.sql new file mode 100644 index 0000000..e2b3764 --- /dev/null +++ b/sql/queries/indexer_user_commands_optimized.sql @@ -0,0 +1,135 @@ +WITH + user_command_info AS ( + SELECT DISTINCT + ON (uca.block_id, uca.id, uca.sequence_no) uca.id, + uca.command_type AS "command_type: UserCommandType", + uca.fee_payer_id, + uca.source_id, + uca.receiver_id, + uca.nonce, + uca.amount, + uca.fee, + uca.valid_until, + uca.memo, + uca.hash, + uca.block_id, + uca.sequence_no, + uca.status AS "status: TransactionStatus", + uca.failure_reason, + b.state_hash, + b.chain_status AS "chain_status: ChainStatus", + b.height + FROM + user_commands_aggregated AS uca + INNER JOIN public_keys AS pk ON uca.fee_payer_id=pk.id + OR ( + uca.status='applied' + AND ( + uca.source_id=pk.id + OR uca.receiver_id=pk.id + ) + ) + INNER JOIN blocks AS b ON uca.block_id=b.id + WHERE + ( + b.chain_status='canonical' + OR b.chain_status='pending' + ) + AND ( + $1>=b.height + OR $1 IS NULL + ) + AND ( + $2=uca.hash + OR $2 IS NULL + ) + AND ( + $3=pk.value + AND $4='' + OR ( + $3 IS NULL + AND $4 IS NULL + ) + ) + AND ( + $5=uca.status + OR $5 IS NULL + ) + AND ( + $6=uca.status + OR $6 IS NULL + ) + AND ( + $7=pk.value + OR $7 IS NULL + ) + ), + id_count AS ( + SELECT + count(*) AS total_count + FROM + user_command_info + ) +SELECT + u.*, + id_count.total_count, + pk_payer.value AS fee_payer, + pk_source.value AS source, + pk_receiver.value AS receiver, + ac.creation_fee AS "creation_fee?" +FROM + id_count, + ( + SELECT + * + FROM + user_command_info + ORDER BY + block_id, + id, + sequence_no + LIMIT + $8 + OFFSET + $9 + ) AS u + INNER JOIN public_keys AS pk_payer ON u.fee_payer_id=pk_payer.id + INNER JOIN public_keys AS pk_source ON u.source_id=pk_source.id + INNER JOIN public_keys AS pk_receiver ON u.receiver_id=pk_receiver.id + /* Account creation fees are attributed to the first successful command in the + block that mentions the account with the following LEFT JOINs */ + LEFT JOIN account_identifiers AS ai_receiver ON u.receiver_id=ai_receiver.public_key_id + LEFT JOIN accounts_created AS ac ON u.block_id=ac.block_id + AND ai_receiver.id=ac.account_identifier_id + AND u."status: TransactionStatus"='applied' + AND u.sequence_no=( + SELECT + least( + ( + SELECT + min(bic2.sequence_no) + FROM + blocks_internal_commands AS bic2 + INNER JOIN internal_commands AS ic2 ON bic2.internal_command_id=ic2.id + WHERE + u.receiver_id=ic2.receiver_id + AND bic2.block_id=u.block_id + AND bic2.status='applied' + ), + ( + SELECT + min(buc2.sequence_no) + FROM + blocks_user_commands AS buc2 + INNER JOIN user_commands AS uc2 ON buc2.user_command_id=uc2.id + WHERE + u.receiver_id=uc2.receiver_id + AND buc2.block_id=u.block_id + AND buc2.status='applied' + ) + ) + ) +ORDER BY + u.block_id, + u.id, + u.sequence_no diff --git a/sql/indexer_zkapp_commands.sql b/sql/queries/indexer_zkapp_commands.sql similarity index 100% rename from sql/indexer_zkapp_commands.sql rename to sql/queries/indexer_zkapp_commands.sql diff --git a/sql/queries/indexer_zkapp_commands_optimized.sql b/sql/queries/indexer_zkapp_commands_optimized.sql new file mode 100644 index 0000000..fa14967 --- /dev/null +++ b/sql/queries/indexer_zkapp_commands_optimized.sql @@ -0,0 +1,122 @@ +WITH + zkapp_commands_info AS ( + SELECT + zc.id, + zc.memo, + zc.hash, + pk_fee_payer.value AS fee_payer, + pk_update_body.value AS pk_update_body, + zfpb.fee, + zfpb.valid_until, + zfpb.nonce, + bzc.sequence_no, + bzc.status AS "status: TransactionStatus", + zaub.balance_change, + bzc.block_id, + b.state_hash, + b.height, + token_update_body.value AS token, + ARRAY( + SELECT + unnest(zauf.failures) + FROM + zkapp_account_update_failures AS zauf + WHERE + zauf.id=ANY (bzc.failure_reasons_ids) + ) AS failure_reasons + FROM + zkapp_commands AS zc + INNER JOIN blocks_zkapp_commands AS bzc ON zc.id=bzc.zkapp_command_id + INNER JOIN zkapp_fee_payer_body AS zfpb ON zc.zkapp_fee_payer_body_id=zfpb.id + INNER JOIN public_keys AS pk_fee_payer ON zfpb.public_key_id=pk_fee_payer.id + INNER JOIN blocks AS b ON bzc.block_id=b.id + AND ( + b.chain_status='canonical' + OR b.chain_status='pending' + ) + LEFT JOIN zkapp_account_update AS zau ON zau.id=ANY (zc.zkapp_account_updates_ids) + INNER JOIN zkapp_account_update_body AS zaub ON zau.body_id=zaub.id + INNER JOIN account_identifiers AS ai_update_body ON zaub.account_identifier_id=ai_update_body.id + INNER JOIN public_keys AS pk_update_body ON ai_update_body.public_key_id=pk_update_body.id + INNER JOIN tokens AS token_update_body ON ai_update_body.token_id=token_update_body.id + WHERE + ( + $1>=b.height + OR $1 IS NULL + ) + AND ( + $2=zc.hash + OR $2 IS NULL + ) + AND ( + ( + ( + $3=pk_fee_payer.value + AND $4='' + ) + OR ( + $3=pk_update_body.value + AND $4=token_update_body.value + ) + ) + OR ( + $3 IS NULL + AND $4 IS NULL + ) + ) + AND ( + $5=bzc.status + OR $5 IS NULL + ) + AND ( + $6=bzc.status + OR $6 IS NULL + ) + AND ( + ( + $7=pk_fee_payer.value + OR $7=pk_update_body.value + ) + OR $7 IS NULL + ) + ), + zkapp_commands_ids AS ( + SELECT DISTINCT + id, + block_id, + sequence_no + FROM + zkapp_commands_info + ), + id_count AS ( + SELECT + count(*) AS total_count + FROM + zkapp_commands_ids + ) +SELECT + zc.*, + id_count.total_count +FROM + id_count, + ( + SELECT + * + FROM + zkapp_commands_ids + ORDER BY + block_id, + id, + sequence_no + LIMIT + $8 + OFFSET + $9 + ) AS ids + INNER JOIN zkapp_commands_info AS zc ON ids.id=zc.id + AND ids.block_id=zc.block_id + AND ids.sequence_no=zc.sequence_no +ORDER BY + ids.block_id, + ids.id, + ids.sequence_no diff --git a/sql/internal_commands.sql b/sql/queries/internal_commands.sql similarity index 100% rename from sql/internal_commands.sql rename to sql/queries/internal_commands.sql diff --git a/sql/max_canonical_height.sql b/sql/queries/max_canonical_height.sql similarity index 100% rename from sql/max_canonical_height.sql rename to sql/queries/max_canonical_height.sql diff --git a/sql/maybe_account_balance_info.sql b/sql/queries/maybe_account_balance_info.sql similarity index 100% rename from sql/maybe_account_balance_info.sql rename to sql/queries/maybe_account_balance_info.sql diff --git a/sql/maybe_block.sql b/sql/queries/maybe_block.sql similarity index 100% rename from sql/maybe_block.sql rename to sql/queries/maybe_block.sql diff --git a/sql/oldest_block.sql b/sql/queries/oldest_block.sql similarity index 100% rename from sql/oldest_block.sql rename to sql/queries/oldest_block.sql diff --git a/sql/query_best.sql b/sql/queries/query_best.sql similarity index 100% rename from sql/query_best.sql rename to sql/queries/query_best.sql diff --git a/sql/query_both.sql b/sql/queries/query_both.sql similarity index 100% rename from sql/query_both.sql rename to sql/queries/query_both.sql diff --git a/sql/query_canonical.sql b/sql/queries/query_canonical.sql similarity index 100% rename from sql/query_canonical.sql rename to sql/queries/query_canonical.sql diff --git a/sql/query_hash.sql b/sql/queries/query_hash.sql similarity index 100% rename from sql/query_hash.sql rename to sql/queries/query_hash.sql diff --git a/sql/query_id.sql b/sql/queries/query_id.sql similarity index 100% rename from sql/query_id.sql rename to sql/queries/query_id.sql diff --git a/sql/query_pending.sql b/sql/queries/query_pending.sql similarity index 100% rename from sql/query_pending.sql rename to sql/queries/query_pending.sql diff --git a/sql/timing_info.sql b/sql/queries/timing_info.sql similarity index 100% rename from sql/timing_info.sql rename to sql/queries/timing_info.sql diff --git a/sql/user_commands.sql b/sql/queries/user_commands.sql similarity index 100% rename from sql/user_commands.sql rename to sql/queries/user_commands.sql diff --git a/sql/zkapp_commands.sql b/sql/queries/zkapp_commands.sql similarity index 100% rename from sql/zkapp_commands.sql rename to sql/queries/zkapp_commands.sql diff --git a/src/api/account_balance.rs b/src/api/account_balance.rs index 9179d32..c8b885e 100644 --- a/src/api/account_balance.rs +++ b/src/api/account_balance.rs @@ -30,15 +30,19 @@ impl MinaMesh { metadata: Option, PartialBlockIdentifier { index, .. }: PartialBlockIdentifier, ) -> Result { - let block = sqlx::query_file!("sql/maybe_block.sql", index) + let block = sqlx::query_file!("sql/queries/maybe_block.sql", index) .fetch_optional(&self.pg_pool) .await? .ok_or(MinaMeshError::BlockMissing(index.unwrap().to_string()))?; // has canonical height / do we really need to do a different query? - let maybe_account_balance_info = - sqlx::query_file!("sql/maybe_account_balance_info.sql", public_key, index, Wrapper(metadata).to_token_id()?) - .fetch_optional(&self.pg_pool) - .await?; + let maybe_account_balance_info = sqlx::query_file!( + "sql/queries/maybe_account_balance_info.sql", + public_key, + index, + Wrapper(metadata).to_token_id()? + ) + .fetch_optional(&self.pg_pool) + .await?; match maybe_account_balance_info { None => { Ok(AccountBalanceResponse::new(BlockIdentifier { hash: block.state_hash, index: block.height }, vec![Amount { @@ -58,7 +62,7 @@ impl MinaMesh { Some(account_balance_info) => { println!("B"); let last_relevant_command_balance = account_balance_info.balance.parse::()?; - let timing_info = sqlx::query_file!("sql/timing_info.sql", account_balance_info.timing_id) + let timing_info = sqlx::query_file!("sql/queries/timing_info.sql", account_balance_info.timing_id) .fetch_optional(&self.pg_pool) .await?; let liquid_balance = match timing_info { diff --git a/src/api/block.rs b/src/api/block.rs index fa0cba1..e0e7047 100644 --- a/src/api/block.rs +++ b/src/api/block.rs @@ -21,7 +21,7 @@ impl MinaMesh { }; let parent_block_metadata = match &metadata.parent_id { Some(parent_id) => { - sqlx::query_file_as!(BlockMetadata, "sql/query_id.sql", parent_id).fetch_optional(&self.pg_pool).await? + sqlx::query_file_as!(BlockMetadata, "sql/queries/query_id.sql", parent_id).fetch_optional(&self.pg_pool).await? } None => None, }; @@ -50,9 +50,10 @@ impl MinaMesh { // TODO: use default token value, check how to best handle this pub async fn user_commands(&self, metadata: &BlockMetadata) -> Result, MinaMeshError> { - let metadata = sqlx::query_file_as!(UserCommandMetadata, "sql/user_commands.sql", metadata.id, DEFAULT_TOKEN_ID) - .fetch_all(&self.pg_pool) - .await?; + let metadata = + sqlx::query_file_as!(UserCommandMetadata, "sql/queries/user_commands.sql", metadata.id, DEFAULT_TOKEN_ID) + .fetch_all(&self.pg_pool) + .await?; let transactions = metadata .into_iter() .map(|item| { @@ -64,7 +65,7 @@ impl MinaMesh { pub async fn internal_commands(&self, metadata: &BlockMetadata) -> Result, MinaMeshError> { let metadata = - sqlx::query_file_as!(InternalCommandMetadata, "sql/internal_commands.sql", metadata.id, DEFAULT_TOKEN_ID) + sqlx::query_file_as!(InternalCommandMetadata, "sql/queries/internal_commands.sql", metadata.id, DEFAULT_TOKEN_ID) .fetch_all(&self.pg_pool) .await?; let transactions = metadata @@ -78,7 +79,7 @@ impl MinaMesh { } pub async fn zkapp_commands(&self, metadata: &BlockMetadata) -> Result, MinaMeshError> { - let metadata = sqlx::query_file_as!(ZkAppCommand, "sql/zkapp_commands.sql", metadata.id, DEFAULT_TOKEN_ID) + let metadata = sqlx::query_file_as!(ZkAppCommand, "sql/queries/zkapp_commands.sql", metadata.id, DEFAULT_TOKEN_ID) .fetch_all(&self.pg_pool) .await?; let transactions = zkapp_commands_to_transactions(metadata); @@ -90,20 +91,22 @@ impl MinaMesh { PartialBlockIdentifier { index, hash }: &PartialBlockIdentifier, ) -> Result, sqlx::Error> { if let (Some(index), Some(hash)) = (&index, &hash) { - sqlx::query_file_as!(BlockMetadata, "sql/query_both.sql", hash.to_string(), index) + sqlx::query_file_as!(BlockMetadata, "sql/queries/query_both.sql", hash.to_string(), index) .fetch_optional(&self.pg_pool) .await } else if let Some(index) = index { - let record = sqlx::query_file!("sql/max_canonical_height.sql").fetch_one(&self.pg_pool).await?; + let record = sqlx::query_file!("sql/queries/max_canonical_height.sql").fetch_one(&self.pg_pool).await?; if index <= &record.max_canonical_height.unwrap() { - sqlx::query_file_as!(BlockMetadata, "sql/query_canonical.sql", index).fetch_optional(&self.pg_pool).await + sqlx::query_file_as!(BlockMetadata, "sql/queries/query_canonical.sql", index) + .fetch_optional(&self.pg_pool) + .await } else { - sqlx::query_file_as!(BlockMetadata, "sql/query_pending.sql", index).fetch_optional(&self.pg_pool).await + sqlx::query_file_as!(BlockMetadata, "sql/queries/query_pending.sql", index).fetch_optional(&self.pg_pool).await } } else if let Some(hash) = &hash { - sqlx::query_file_as!(BlockMetadata, "sql/query_hash.sql", hash).fetch_optional(&self.pg_pool).await + sqlx::query_file_as!(BlockMetadata, "sql/queries/query_hash.sql", hash).fetch_optional(&self.pg_pool).await } else { - sqlx::query_file_as!(BlockMetadata, "sql/query_best.sql").fetch_optional(&self.pg_pool).await + sqlx::query_file_as!(BlockMetadata, "sql/queries/query_best.sql").fetch_optional(&self.pg_pool).await } } } diff --git a/src/api/network_status.rs b/src/api/network_status.rs index 54e77c9..f9f9d2d 100644 --- a/src/api/network_status.rs +++ b/src/api/network_status.rs @@ -16,7 +16,7 @@ impl MinaMesh { let blocks = best_chain.ok_or(MinaMeshError::ChainInfoMissing)?; let first_block = blocks.first().ok_or(MinaMeshError::ChainInfoMissing)?; let Block3 { protocol_state, state_hash } = first_block; - let oldest_block = sqlx::query_file!("sql/oldest_block.sql").fetch_one(&self.pg_pool).await?; + let oldest_block = sqlx::query_file!("sql/queries/oldest_block.sql").fetch_one(&self.pg_pool).await?; Ok(NetworkStatusResponse { peers: Some(peers.into_iter().map(|peer| Peer::new(peer.peer_id)).collect()), current_block_identifier: Box::new(BlockIdentifier::new( diff --git a/src/api/search_transactions.rs b/src/api/search_transactions.rs index 71d5d40..e8b5eec 100644 --- a/src/api/search_transactions.rs +++ b/src/api/search_transactions.rs @@ -85,23 +85,41 @@ impl MinaMesh { ) -> Result, MinaMeshError> { let query_params = SearchTransactionsQueryParams::try_from(req.clone())?; - let user_commands = sqlx::query_file_as!( - UserCommand, - "sql/indexer_user_commands.sql", - query_params.max_block, - query_params.transaction_hash, - query_params.account_identifier, - query_params.token_id, - query_params.status as Option, - query_params.success_status as Option, - query_params.address, - limit, - offset, - ) - .fetch_all(&self.pg_pool) - .await?; - - Ok(user_commands) + if !self.search_tx_optimized { + let user_commands = sqlx::query_file_as!( + UserCommand, + "sql/queries/indexer_user_commands.sql", + query_params.max_block, + query_params.transaction_hash, + query_params.account_identifier, + query_params.token_id, + query_params.status as Option, + query_params.success_status as Option, + query_params.address, + limit, + offset, + ) + .fetch_all(&self.pg_pool) + .await?; + Ok(user_commands) + } else { + let user_commands = sqlx::query_file_as!( + UserCommand, + "sql/queries/indexer_user_commands_optimized.sql", + query_params.max_block, + query_params.transaction_hash, + query_params.account_identifier, + query_params.token_id, + query_params.status as Option, + query_params.success_status as Option, + query_params.address, + limit, + offset, + ) + .fetch_all(&self.pg_pool) + .await?; + Ok(user_commands) + } } pub async fn fetch_internal_commands( @@ -112,23 +130,43 @@ impl MinaMesh { ) -> Result, MinaMeshError> { let query_params = SearchTransactionsQueryParams::try_from(req.clone())?; - let internal_commands = sqlx::query_file_as!( - InternalCommand, - "sql/indexer_internal_commands.sql", - query_params.max_block, - query_params.transaction_hash, - query_params.account_identifier, - query_params.token_id, - query_params.status as Option, - query_params.success_status as Option, - query_params.address, - limit, - offset - ) - .fetch_all(&self.pg_pool) - .await?; - - Ok(internal_commands) + if !self.search_tx_optimized { + let internal_commands = sqlx::query_file_as!( + InternalCommand, + "sql/queries/indexer_internal_commands.sql", + query_params.max_block, + query_params.transaction_hash, + query_params.account_identifier, + query_params.token_id, + query_params.status as Option, + query_params.success_status as Option, + query_params.address, + limit, + offset + ) + .fetch_all(&self.pg_pool) + .await?; + + Ok(internal_commands) + } else { + let internal_commands = sqlx::query_file_as!( + InternalCommand, + "sql/queries/indexer_internal_commands_optimized.sql", + query_params.max_block, + query_params.transaction_hash, + query_params.account_identifier, + query_params.token_id, + query_params.status as Option, + query_params.success_status as Option, + query_params.address, + limit, + offset + ) + .fetch_all(&self.pg_pool) + .await?; + + Ok(internal_commands) + } } async fn fetch_zkapp_commands( @@ -139,23 +177,43 @@ impl MinaMesh { ) -> Result, MinaMeshError> { let query_params = SearchTransactionsQueryParams::try_from(req.clone())?; - let zkapp_commands = sqlx::query_file_as!( - ZkAppCommand, - "sql/indexer_zkapp_commands.sql", - query_params.max_block, - query_params.transaction_hash, - query_params.account_identifier, - query_params.token_id, - query_params.status as Option, - query_params.success_status as Option, - query_params.address, - limit, - offset - ) - .fetch_all(&self.pg_pool) - .await?; - - Ok(zkapp_commands) + if !self.search_tx_optimized { + let zkapp_commands = sqlx::query_file_as!( + ZkAppCommand, + "sql/queries/indexer_zkapp_commands.sql", + query_params.max_block, + query_params.transaction_hash, + query_params.account_identifier, + query_params.token_id, + query_params.status as Option, + query_params.success_status as Option, + query_params.address, + limit, + offset + ) + .fetch_all(&self.pg_pool) + .await?; + + Ok(zkapp_commands) + } else { + let zkapp_commands = sqlx::query_file_as!( + ZkAppCommand, + "sql/queries/indexer_zkapp_commands_optimized.sql", + query_params.max_block, + query_params.transaction_hash, + query_params.account_identifier, + query_params.token_id, + query_params.status as Option, + query_params.success_status as Option, + query_params.address, + limit, + offset + ) + .fetch_all(&self.pg_pool) + .await?; + + Ok(zkapp_commands) + } } } @@ -165,7 +223,7 @@ pub fn zkapp_commands_to_block_transactions(commands: Vec) -> Vec< for command in commands { // Group by block identifier (block index and block hash) let block_key = (command.height.unwrap_or(0), command.state_hash.clone().unwrap_or_default()); - let tx_hash = command.hash.clone(); + let tx_hash = command.hash; // Initialize or update the operation list for this transaction let operations = block_map.entry(block_key).or_default().entry(tx_hash.clone()).or_default(); diff --git a/src/bin/mina-mesh.rs b/src/bin/mina-mesh.rs index b43b0bb..fec0112 100644 --- a/src/bin/mina-mesh.rs +++ b/src/bin/mina-mesh.rs @@ -3,13 +3,14 @@ use anyhow::Result; use clap::Parser; -use mina_mesh::{FetchGenesisBlockIdentifierCommand, ServeCommand}; +use mina_mesh::{FetchGenesisBlockIdentifierCommand, SearchTxOptimizationsCommand, ServeCommand}; #[derive(Debug, Parser)] #[command(name = "mina-mesh", version, about = "A Mesh-compliant Server for Mina", propagate_version = true, author)] enum Command { Serve(ServeCommand), FetchGenesisBlockIdentifier(FetchGenesisBlockIdentifierCommand), + SearchTxOptimizations(SearchTxOptimizationsCommand), } #[tokio::main] @@ -18,5 +19,6 @@ async fn main() -> Result<()> { match Command::parse() { Command::Serve(cmd) => cmd.run().await, Command::FetchGenesisBlockIdentifier(cmd) => cmd.run().await, + Command::SearchTxOptimizations(cmd) => cmd.run().await, } } diff --git a/src/commands.rs b/src/commands.rs index 2e97c9d..890b5fa 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -1,5 +1,7 @@ mod fetch_genesis_block_identifier; +mod search_tx_optimizations; mod serve; pub use fetch_genesis_block_identifier::*; +pub use search_tx_optimizations::*; pub use serve::*; diff --git a/src/commands/search_tx_optimizations.rs b/src/commands/search_tx_optimizations.rs new file mode 100644 index 0000000..b788fda --- /dev/null +++ b/src/commands/search_tx_optimizations.rs @@ -0,0 +1,93 @@ +use anyhow::{bail, Result}; +use clap::Args; +use sqlx::{PgPool, Row}; + +#[derive(Debug, Args)] +#[command(about = "Command to apply or drop search transaction optimizations in the archive database.")] +pub struct SearchTxOptimizationsCommand { + /// URL to the archive database + #[arg(long, env = "MINAMESH_ARCHIVE_DATABASE_URL")] + archive_database_url: String, + + /// Apply optimizations + #[arg(long, conflicts_with = "drop")] + apply: bool, + + /// Drop optimizations + #[arg(long, conflicts_with = "apply")] + drop: bool, + + /// Check if optimizations are applied + #[arg(long)] + check: bool, +} + +impl SearchTxOptimizationsCommand { + pub async fn run(&self) -> Result<()> { + // Connect to the database + let pool = PgPool::connect(&self.archive_database_url).await?; + + // Check if optimizations are already applied + if self.apply && self.check_if_optimizations_applied(&pool).await? { + bail!("Search transaction optimizations are already applied."); + } else if self.drop && !self.check_if_optimizations_applied(&pool).await? { + bail!("Search transaction optimizations are not applied."); + } + + if self.apply { + self.apply_optimizations(&pool).await?; + } else if self.drop { + self.drop_optimizations(&pool).await?; + } else if self.check { + let applied = self.check_if_optimizations_applied(&pool).await?; + if applied { + println!("Search transaction optimizations are already applied."); + } else { + println!("Search transaction optimizations are not applied."); + } + } else { + bail!("You must specify either --apply or --drop or --check."); + } + Ok(()) + } + + async fn apply_optimizations(&self, pool: &PgPool) -> Result<()> { + println!("Applying search transaction optimizations on Archive Database (this may take few minutes)..."); + + // Load and execute the SQL from the file + let sql = include_str!("../../sql/migrations/apply_search_tx_optimizations.sql"); + self.execute_sql_file(pool, sql, "-- NEXT --").await?; + + println!("Optimizations applied successfully."); + Ok(()) + } + + async fn drop_optimizations(&self, pool: &PgPool) -> Result<()> { + println!("Dropping search transaction optimizations from Archive Database..."); + + // Load and execute the SQL from the file + let sql = include_str!("../../sql/migrations/drop_search_tx_optimizations.sql"); + self.execute_sql_file(pool, sql, ";").await?; + + println!("Optimizations dropped successfully."); + Ok(()) + } + + async fn check_if_optimizations_applied(&self, pool: &PgPool) -> Result { + // select if table exists + let result = sqlx::query( + "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'user_commands_aggregated')", + ) + .fetch_one(pool) + .await?; + Ok(result.get(0)) + } + + async fn execute_sql_file(&self, pool: &PgPool, file_content: &str, split_by: &str) -> Result<()> { + let statements: Vec<&str> = file_content.split(split_by).filter(|stmt| !stmt.trim().is_empty()).collect(); + for stmt in statements { + sqlx::query(stmt).execute(pool).await?; + } + Ok(()) + } +} diff --git a/src/config.rs b/src/config.rs index fc54e08..a3d2cbb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -15,6 +15,8 @@ pub struct MinaMeshConfig { pub genesis_block_identifier_height: i64, #[arg(long, env = "MINAMESH_GENESIS_BLOCK_IDENTIFIER_STATE_HASH")] pub genesis_block_identifier_state_hash: String, + #[arg(long, env = "USE_SEARCH_TX_OPTIMIZATIONS", default_value = "false")] + pub use_search_tx_optimizations: bool, } impl MinaMeshConfig { @@ -37,6 +39,7 @@ impl MinaMeshConfig { self.genesis_block_identifier_height, self.genesis_block_identifier_state_hash.to_owned(), ), + search_tx_optimized: self.use_search_tx_optimizations, }) } } diff --git a/src/error.rs b/src/error.rs index cb4886a..fc9588a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -52,7 +52,7 @@ pub enum MinaMeshError { #[error("No options provided")] NoOptionsProvided, - #[error("Exception")] + #[error("Exception {0}")] Exception(String), #[error("Invalid signature")] diff --git a/src/lib.rs b/src/lib.rs index dee15bb..4eaf07a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,4 +24,5 @@ pub struct MinaMesh { pub graphql_client: GraphQLClient, pub pg_pool: PgPool, pub genesis_block_identifier: BlockIdentifier, + pub search_tx_optimized: bool, } diff --git a/tasks/search_tx_optimizations.ts b/tasks/search_tx_optimizations.ts new file mode 100644 index 0000000..14d9f93 --- /dev/null +++ b/tasks/search_tx_optimizations.ts @@ -0,0 +1,38 @@ +import pg from "pg"; +import "@std/dotenv/load"; +import { assertExists } from "@std/assert"; +import { parseArgs } from "@std/cli"; +import * as path from "@std/path"; + +const connectionString = Deno.env.get("DATABASE_URL"); +assertExists(connectionString); + +const args = parseArgs(Deno.args, { + flags: { + apply: { type: "boolean" }, + drop: { type: "boolean" }, + }, +}); + +if (!args.apply && !args.drop) { + console.error("Allowed parameters: --apply | --drop"); + Deno.exit(1); +} + +const scriptPath = args.apply + ? path.join(path.dirname(path.fromFileUrl(import.meta.url)), "../sql/migrations/apply_search_tx_optimizations.sql") + : path.join(path.dirname(path.fromFileUrl(import.meta.url)), "../sql/migrations/drop_search_tx_optimizations.sql"); + +const client = new pg.Client({ connectionString }); +await client.connect(); + +try { + const sqlScript = await Deno.readTextFile(scriptPath); + console.log(`Executing ${args.apply ? "apply" : "drop"} script...`); + await client.query(sqlScript); + console.log(`Successfully executed ${args.apply ? "apply" : "drop"} script.`); +} catch (error) { + console.error("Error executing SQL script:", error); +} finally { + await client.end(); +}