diff --git a/backend/src/api/audit.ts b/backend/src/api/audit.ts index 4d05870e8..705579b14 100644 --- a/backend/src/api/audit.ts +++ b/backend/src/api/audit.ts @@ -6,15 +6,16 @@ import rbfCache from './rbf-cache'; const PROPAGATION_MARGIN = 180; // in seconds, time since a transaction is first seen after which it is assumed to have propagated to all miners class Audit { - auditBlock(transactions: MempoolTransactionExtended[], projectedBlocks: MempoolBlockWithTransactions[], mempool: { [txId: string]: MempoolTransactionExtended }, useAccelerations: boolean = false) - : { censored: string[], added: string[], prioritized: string[], fresh: string[], sigop: string[], fullrbf: string[], accelerated: string[], score: number, similarity: number } { + auditBlock(height: number, transactions: MempoolTransactionExtended[], projectedBlocks: MempoolBlockWithTransactions[], mempool: { [txId: string]: MempoolTransactionExtended }) + : { unseen: string[], censored: string[], added: string[], prioritized: string[], fresh: string[], sigop: string[], fullrbf: string[], accelerated: string[], score: number, similarity: number } { if (!projectedBlocks?.[0]?.transactionIds || !mempool) { - return { censored: [], added: [], prioritized: [], fresh: [], sigop: [], fullrbf: [], accelerated: [], score: 1, similarity: 1 }; + return { unseen: [], censored: [], added: [], prioritized: [], fresh: [], sigop: [], fullrbf: [], accelerated: [], score: 1, similarity: 1 }; } const matches: string[] = []; // present in both mined block and template const added: string[] = []; // present in mined block, not in template - const prioritized: string[] = [] // present in the mined block, not in the template, but further down in the mempool + const unseen: string[] = []; // present in the mined block, not in our mempool + const prioritized: string[] = []; // higher in the block than would be expected by in-band feerate alone const fresh: string[] = []; // missing, but firstSeen or lastBoosted within PROPAGATION_MARGIN const rbf: string[] = []; // either missing or present, and either part of a full-rbf replacement, or a conflict with the mined block const accelerated: string[] = []; // prioritized by the mempool accelerator @@ -113,11 +114,16 @@ class Audit { } else { if (rbfCache.has(tx.txid)) { rbf.push(tx.txid); - } else if (!isDisplaced[tx.txid]) { + if (!mempool[tx.txid] && !rbfCache.getReplacedBy(tx.txid)) { + unseen.push(tx.txid); + } + } else { if (mempool[tx.txid]) { - prioritized.push(tx.txid); + if (isDisplaced[tx.txid]) { + added.push(tx.txid); + } } else { - added.push(tx.txid); + unseen.push(tx.txid); } } overflowWeight += tx.weight; @@ -125,6 +131,23 @@ class Audit { totalWeight += tx.weight; } + + // identify "prioritized" transactions + let lastEffectiveRate = 0; + // Iterate over the mined template from bottom to top (excluding the coinbase) + // Transactions should appear in ascending order of mining priority. + for (let i = transactions.length - 1; i > 0; i--) { + const blockTx = transactions[i]; + // If a tx has a lower in-band effective fee rate than the previous tx, + // it must have been prioritized out-of-band (in order to have a higher mining priority) + // so exclude from the analysis. + if ((blockTx.effectiveFeePerVsize || 0) < lastEffectiveRate) { + prioritized.push(blockTx.txid); + } else { + lastEffectiveRate = blockTx.effectiveFeePerVsize || 0; + } + } + // transactions missing from near the end of our template are probably not being censored let overflowWeightRemaining = overflowWeight - (config.MEMPOOL.BLOCK_WEIGHT_UNITS - totalWeight); let maxOverflowRate = 0; @@ -165,6 +188,7 @@ class Audit { const similarity = projectedWeight ? matchedWeight / projectedWeight : 1; return { + unseen, censored: Object.keys(isCensored), added, prioritized, diff --git a/backend/src/api/blocks.ts b/backend/src/api/blocks.ts index 762c81ff7..e203bbd3c 100644 --- a/backend/src/api/blocks.ts +++ b/backend/src/api/blocks.ts @@ -439,7 +439,7 @@ class Blocks { if (config.MEMPOOL.BACKEND === 'esplora') { - const txs = (await bitcoinApi.$getTxsForBlock(block.hash)).map(tx => transactionUtils.extendTransaction(tx)); + const txs = (await bitcoinApi.$getTxsForBlock(block.hash)).map(tx => transactionUtils.extendMempoolTransaction(tx)); const cpfpSummary = await this.$indexCPFP(block.hash, block.height, txs); if (cpfpSummary) { await this.$getStrippedBlockTransactions(block.hash, true, true, cpfpSummary, block.height); // This will index the block summary @@ -927,12 +927,12 @@ class Blocks { const newBlock = await this.$indexBlock(lastBlock.height - i); this.blocks.push(newBlock); this.updateTimerProgress(timer, `reindexed block`); - let cpfpSummary; + let newCpfpSummary; if (config.MEMPOOL.CPFP_INDEXING) { - cpfpSummary = await this.$indexCPFP(newBlock.id, lastBlock.height - i); + newCpfpSummary = await this.$indexCPFP(newBlock.id, lastBlock.height - i); this.updateTimerProgress(timer, `reindexed block cpfp`); } - await this.$getStrippedBlockTransactions(newBlock.id, true, true, cpfpSummary, newBlock.height); + await this.$getStrippedBlockTransactions(newBlock.id, true, true, newCpfpSummary, newBlock.height); this.updateTimerProgress(timer, `reindexed block summary`); } await mining.$indexDifficultyAdjustments(); @@ -981,7 +981,7 @@ class Blocks { // start async callbacks this.updateTimerProgress(timer, `starting async callbacks for ${this.currentBlockHeight}`); - const callbackPromises = this.newAsyncBlockCallbacks.map((cb) => cb(blockExtended, txIds, transactions)); + const callbackPromises = this.newAsyncBlockCallbacks.map((cb) => cb(blockExtended, txIds, cpfpSummary.transactions)); if (block.height % 2016 === 0) { if (Common.indexingEnabled()) { @@ -1178,7 +1178,7 @@ class Blocks { }; }), }; - summaryVersion = 1; + summaryVersion = cpfpSummary.version; } else { if (config.MEMPOOL.BACKEND === 'esplora') { const txs = (await bitcoinApi.$getTxsForBlock(hash)).map(tx => transactionUtils.extendTransaction(tx)); @@ -1397,11 +1397,11 @@ class Blocks { return this.currentBlockHeight; } - public async $indexCPFP(hash: string, height: number, txs?: TransactionExtended[]): Promise { + public async $indexCPFP(hash: string, height: number, txs?: MempoolTransactionExtended[]): Promise { let transactions = txs; if (!transactions) { if (config.MEMPOOL.BACKEND === 'esplora') { - transactions = (await bitcoinApi.$getTxsForBlock(hash)).map(tx => transactionUtils.extendTransaction(tx)); + transactions = (await bitcoinApi.$getTxsForBlock(hash)).map(tx => transactionUtils.extendMempoolTransaction(tx)); } if (!transactions) { const block = await bitcoinClient.getBlock(hash, 2); @@ -1413,7 +1413,7 @@ class Blocks { } if (transactions?.length != null) { - const summary = calculateFastBlockCpfp(height, transactions as TransactionExtended[]); + const summary = calculateFastBlockCpfp(height, transactions); await this.$saveCpfp(hash, height, summary); diff --git a/backend/src/api/common.ts b/backend/src/api/common.ts index cba39a511..13fc86147 100644 --- a/backend/src/api/common.ts +++ b/backend/src/api/common.ts @@ -1,6 +1,6 @@ import * as bitcoinjs from 'bitcoinjs-lib'; import { Request } from 'express'; -import { CpfpInfo, CpfpSummary, CpfpCluster, EffectiveFeeStats, MempoolBlockWithTransactions, TransactionExtended, MempoolTransactionExtended, TransactionStripped, WorkingEffectiveFeeStats, TransactionClassified, TransactionFlags } from '../mempool.interfaces'; +import { EffectiveFeeStats, MempoolBlockWithTransactions, TransactionExtended, MempoolTransactionExtended, TransactionStripped, WorkingEffectiveFeeStats, TransactionClassified, TransactionFlags } from '../mempool.interfaces'; import config from '../config'; import { NodeSocket } from '../repositories/NodesSocketsRepository'; import { isIP } from 'net'; diff --git a/backend/src/api/cpfp.ts b/backend/src/api/cpfp.ts index 5818eb1ea..9da11328b 100644 --- a/backend/src/api/cpfp.ts +++ b/backend/src/api/cpfp.ts @@ -6,7 +6,7 @@ import { Acceleration } from './acceleration/acceleration'; const CPFP_UPDATE_INTERVAL = 60_000; // update CPFP info at most once per 60s per transaction const MAX_CLUSTER_ITERATIONS = 100; -export function calculateFastBlockCpfp(height: number, transactions: TransactionExtended[], saveRelatives: boolean = false): CpfpSummary { +export function calculateFastBlockCpfp(height: number, transactions: MempoolTransactionExtended[], saveRelatives: boolean = false): CpfpSummary { const clusters: CpfpCluster[] = []; // list of all cpfp clusters in this block const clusterMap: { [txid: string]: CpfpCluster } = {}; // map transactions to their cpfp cluster let clusterTxs: TransactionExtended[] = []; // working list of elements of the current cluster @@ -93,6 +93,7 @@ export function calculateFastBlockCpfp(height: number, transactions: Transaction return { transactions, clusters, + version: 1, }; } @@ -159,6 +160,7 @@ export function calculateGoodBlockCpfp(height: number, transactions: MempoolTran return { transactions: transactions.map(tx => txMap[tx.txid]), clusters: clusterArray, + version: 2, }; } diff --git a/backend/src/api/mini-miner.ts b/backend/src/api/mini-miner.ts index 4a4ef5daa..0bef1a819 100644 --- a/backend/src/api/mini-miner.ts +++ b/backend/src/api/mini-miner.ts @@ -337,7 +337,7 @@ export function makeBlockTemplate(candidates: MempoolTransactionExtended[], acce let failures = 0; while (mempoolArray.length || modified.length) { // skip invalid transactions - while (mempoolArray[0].used || mempoolArray[0].modified) { + while (mempoolArray[0]?.used || mempoolArray[0]?.modified) { mempoolArray.shift(); } diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index e57b8221b..79a783f88 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -3,7 +3,7 @@ import * as WebSocket from 'ws'; import { BlockExtended, TransactionExtended, MempoolTransactionExtended, WebsocketResponse, OptimizedStatistic, ILoadingIndicators, GbtCandidates, TxTrackingInfo, - MempoolBlockDelta, MempoolDelta, MempoolDeltaTxids + MempoolDelta, MempoolDeltaTxids } from '../mempool.interfaces'; import blocks from './blocks'; import memPool from './mempool'; @@ -933,6 +933,8 @@ class WebsocketHandler { throw new Error('No WebSocket.Server have been set'); } + const blockTransactions = structuredClone(transactions); + this.printLogs(); await statistics.runStatistics(); @@ -942,7 +944,7 @@ class WebsocketHandler { let transactionIds: string[] = (memPool.limitGBT) ? Object.keys(candidates?.txs || {}) : Object.keys(_memPool); const accelerations = Object.values(mempool.getAccelerations()); - await accelerationRepository.$indexAccelerationsForBlock(block, accelerations, transactions); + await accelerationRepository.$indexAccelerationsForBlock(block, accelerations, structuredClone(transactions)); const rbfTransactions = Common.findMinedRbfTransactions(transactions, memPool.getSpendMap()); memPool.handleMinedRbfTransactions(rbfTransactions); @@ -962,7 +964,7 @@ class WebsocketHandler { } if (Common.indexingEnabled()) { - const { censored, added, prioritized, fresh, sigop, fullrbf, accelerated, score, similarity } = Audit.auditBlock(transactions, projectedBlocks, auditMempool); + const { unseen, censored, added, prioritized, fresh, sigop, fullrbf, accelerated, score, similarity } = Audit.auditBlock(block.height, blockTransactions, projectedBlocks, auditMempool); const matchRate = Math.round(score * 100 * 100) / 100; const stripped = projectedBlocks[0]?.transactions ? projectedBlocks[0].transactions : []; @@ -984,9 +986,11 @@ class WebsocketHandler { }); BlocksAuditsRepository.$saveAudit({ + version: 1, time: block.timestamp, height: block.height, hash: block.id, + unseenTxs: unseen, addedTxs: added, prioritizedTxs: prioritized, missingTxs: censored, diff --git a/backend/src/mempool.interfaces.ts b/backend/src/mempool.interfaces.ts index ed1b3b445..ccbc94bfa 100644 --- a/backend/src/mempool.interfaces.ts +++ b/backend/src/mempool.interfaces.ts @@ -385,8 +385,9 @@ export interface CpfpCluster { } export interface CpfpSummary { - transactions: TransactionExtended[]; + transactions: MempoolTransactionExtended[]; clusters: CpfpCluster[]; + version: number; } export interface Statistic { diff --git a/backend/src/replication/AuditReplication.ts b/backend/src/replication/AuditReplication.ts index 4ea629839..6f616dbbe 100644 --- a/backend/src/replication/AuditReplication.ts +++ b/backend/src/replication/AuditReplication.ts @@ -31,11 +31,11 @@ class AuditReplication { const missingAudits = await this.$getMissingAuditBlocks(); logger.debug(`Fetching missing audit data for ${missingAudits.length} blocks from trusted servers`, 'Replication'); - + let totalSynced = 0; let totalMissed = 0; let loggerTimer = Date.now(); - // process missing audits in batches of + // process missing audits in batches of BATCH_SIZE for (let i = 0; i < missingAudits.length; i += BATCH_SIZE) { const slice = missingAudits.slice(i, i + BATCH_SIZE); const results = await Promise.all(slice.map(hash => this.$syncAudit(hash))); @@ -109,9 +109,11 @@ class AuditReplication { version: 1, }); await blocksAuditsRepository.$saveAudit({ + version: auditSummary.version || 0, hash: blockHash, height: auditSummary.height, time: auditSummary.timestamp || auditSummary.time, + unseenTxs: auditSummary.unseenTxs || [], missingTxs: auditSummary.missingTxs || [], addedTxs: auditSummary.addedTxs || [], prioritizedTxs: auditSummary.prioritizedTxs || [], diff --git a/backend/src/repositories/AccelerationRepository.ts b/backend/src/repositories/AccelerationRepository.ts index 70fa78dc6..4c9896296 100644 --- a/backend/src/repositories/AccelerationRepository.ts +++ b/backend/src/repositories/AccelerationRepository.ts @@ -192,6 +192,7 @@ class AccelerationRepository { } } + // modifies block transactions public async $indexAccelerationsForBlock(block: BlockExtended, accelerations: Acceleration[], transactions: MempoolTransactionExtended[]): Promise { const blockTxs: { [txid: string]: MempoolTransactionExtended } = {}; for (const tx of transactions) { diff --git a/backend/src/repositories/BlocksAuditsRepository.ts b/backend/src/repositories/BlocksAuditsRepository.ts index cddd535de..abf26aa29 100644 --- a/backend/src/repositories/BlocksAuditsRepository.ts +++ b/backend/src/repositories/BlocksAuditsRepository.ts @@ -17,8 +17,8 @@ interface MigrationAudit { class BlocksAuditRepositories { public async $saveAudit(audit: BlockAudit): Promise { try { - await DB.query(`INSERT INTO blocks_audits(version, time, height, hash, seen_txs, missing_txs, added_txs, prioritized_txs, fresh_txs, sigop_txs, fullrbf_txs, accelerated_txs, match_rate, expected_fees, expected_weight) - VALUE (?, FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, [audit.version, audit.time, audit.height, audit.hash, JSON.stringify(audit.missingTxs), + await DB.query(`INSERT INTO blocks_audits(version, time, height, hash, unseen_txs, missing_txs, added_txs, prioritized_txs, fresh_txs, sigop_txs, fullrbf_txs, accelerated_txs, match_rate, expected_fees, expected_weight) + VALUE (?, FROM_UNIXTIME(?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, [audit.version, audit.time, audit.height, audit.hash, JSON.stringify(audit.unseenTxs), JSON.stringify(audit.missingTxs), JSON.stringify(audit.addedTxs), JSON.stringify(audit.prioritizedTxs), JSON.stringify(audit.freshTxs), JSON.stringify(audit.sigopTxs), JSON.stringify(audit.fullrbfTxs), JSON.stringify(audit.acceleratedTxs), audit.matchRate, audit.expectedFees, audit.expectedWeight]); } catch (e: any) { if (e.errno === 1062) { // ER_DUP_ENTRY - This scenario is possible upon node backend restart @@ -209,69 +209,86 @@ class BlocksAuditRepositories { */ public async $migrateAuditsV0toV1(): Promise { try { - const [toMigrate]: MigrationAudit[][] = await DB.query( - `SELECT - blocks_audits.height as height, - blocks_audits.hash as id, - UNIX_TIMESTAMP(blocks_audits.time) as timestamp, - blocks_summaries.transactions as transactions, - blocks_templates.template as template, - blocks_audits.prioritized_txs as prioritizedTxs, - blocks_audits.accelerated_txs as acceleratedTxs - FROM blocks_audits - JOIN blocks_summaries ON blocks_summaries.id = blocks_audits.hash - JOIN blocks_templates ON blocks_templates.id = blocks_audits.hash - WHERE blocks_audits.version = 0 - AND blocks_summaries.version = 2 - ORDER BY blocks_audits.height DESC - `) as any[]; + let done = false; + let processed = 0; + let lastHeight; + while (!done) { + const [toMigrate]: MigrationAudit[][] = await DB.query( + `SELECT + blocks_audits.height as height, + blocks_audits.hash as id, + UNIX_TIMESTAMP(blocks_audits.time) as timestamp, + blocks_summaries.transactions as transactions, + blocks_templates.template as template, + blocks_audits.prioritized_txs as prioritizedTxs, + blocks_audits.accelerated_txs as acceleratedTxs + FROM blocks_audits + JOIN blocks_summaries ON blocks_summaries.id = blocks_audits.hash + JOIN blocks_templates ON blocks_templates.id = blocks_audits.hash + WHERE blocks_audits.version = 0 + AND blocks_summaries.version = 2 + ORDER BY blocks_audits.height DESC + LIMIT 100 + `) as any[]; - logger.info(`migrating ${toMigrate.length} audits to version 1`); - - for (const audit of toMigrate) { - // unpack JSON-serialized transaction lists - audit.transactions = JSON.parse((audit.transactions as any as string) || '[]'); - audit.template = JSON.parse((audit.transactions as any as string) || '[]'); - - // we know transactions in the template, or marked "prioritized" or "accelerated" - // were seen in our mempool before the block was mined. - const isSeen = new Set(); - for (const tx of audit.template) { - isSeen.add(tx.txid); + if (toMigrate.length <= 0 || lastHeight === toMigrate[0].height) { + done = true; + break; } - for (const txid of audit.prioritizedTxs) { - isSeen.add(txid); - } - for (const txid of audit.acceleratedTxs) { - isSeen.add(txid); - } - const unseenTxs = audit.transactions.slice(0).map(tx => tx.txid).filter(txid => !isSeen.has(txid)); + lastHeight = toMigrate[0].height; - // identify "prioritized" transactions - const prioritizedTxs: string[] = []; - let lastEffectiveRate = 0; - // Iterate over the mined template from bottom to top (excluding the coinbase) - // Transactions should appear in ascending order of mining priority. - for (let i = audit.transactions.length - 1; i > 0; i--) { - const blockTx = audit.transactions[i]; - // If a tx has a lower in-band effective fee rate than the previous tx, - // it must have been prioritized out-of-band (in order to have a higher mining priority) - // so exclude from the analysis. - if ((blockTx.rate || 0) < lastEffectiveRate) { - prioritizedTxs.push(blockTx.txid); - } else { - lastEffectiveRate = blockTx.rate || 0; + logger.info(`migrating ${toMigrate.length} audits to version 1`); + + for (const audit of toMigrate) { + // unpack JSON-serialized transaction lists + audit.transactions = JSON.parse((audit.transactions as any as string) || '[]'); + audit.template = JSON.parse((audit.template as any as string) || '[]'); + + // we know transactions in the template, or marked "prioritized" or "accelerated" + // were seen in our mempool before the block was mined. + const isSeen = new Set(); + for (const tx of audit.template) { + isSeen.add(tx.txid); } + for (const txid of audit.prioritizedTxs) { + isSeen.add(txid); + } + for (const txid of audit.acceleratedTxs) { + isSeen.add(txid); + } + const unseenTxs = audit.transactions.slice(0).map(tx => tx.txid).filter(txid => !isSeen.has(txid)); + + // identify "prioritized" transactions + const prioritizedTxs: string[] = []; + let lastEffectiveRate = 0; + // Iterate over the mined template from bottom to top (excluding the coinbase) + // Transactions should appear in ascending order of mining priority. + for (let i = audit.transactions.length - 1; i > 0; i--) { + const blockTx = audit.transactions[i]; + // If a tx has a lower in-band effective fee rate than the previous tx, + // it must have been prioritized out-of-band (in order to have a higher mining priority) + // so exclude from the analysis. + if ((blockTx.rate || 0) < lastEffectiveRate) { + prioritizedTxs.push(blockTx.txid); + } else { + lastEffectiveRate = blockTx.rate || 0; + } + } + + // Update audit in the database + await DB.query(` + UPDATE blocks_audits SET + version = ?, + unseen_txs = ?, + prioritized_txs = ? + WHERE hash = ? + `, [1, JSON.stringify(unseenTxs), JSON.stringify(prioritizedTxs), audit.id]); } - // Update audit in the database - await DB.query(` - UPDATE blocks_audits SET - unseen_txs = ?, - prioritized_txs = ? - WHERE hash = ? - `, [JSON.stringify(unseenTxs), JSON.stringify(prioritizedTxs), audit.id]); + processed += toMigrate.length; } + + logger.info(`migrated ${processed} audits to version 1`); } catch (e: any) { logger.err(`Error while migrating audits from v0 to v1. Will try again later. Reason: ` + (e instanceof Error ? e.message : e)); }