diff --git a/backend/src/__tests__/api/common.ts b/backend/src/__tests__/api/common.ts index 74a7db88f..14ae3c78b 100644 --- a/backend/src/__tests__/api/common.ts +++ b/backend/src/__tests__/api/common.ts @@ -1,5 +1,5 @@ import { Common } from '../../api/common'; -import { MempoolTransactionExtended } from '../../mempool.interfaces'; +import { MempoolTransactionExtended, TransactionExtended } from '../../mempool.interfaces'; const randomTransactions = require('./test-data/transactions-random.json'); const replacedTransactions = require('./test-data/transactions-replaced.json'); @@ -10,14 +10,14 @@ describe('Common', () => { describe('RBF', () => { const newTransactions = rbfTransactions.concat(randomTransactions); test('should detect RBF transactions with fast method', () => { - const result: { [txid: string]: MempoolTransactionExtended[] } = Common.findRbfTransactions(newTransactions, replacedTransactions); + const result: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }} = Common.findRbfTransactions(newTransactions, replacedTransactions); expect(Object.values(result).length).toEqual(2); expect(result).toHaveProperty('7219d95161f3718335991ac6d967d24eedec370908c9879bb1e192e6d797d0a6'); expect(result).toHaveProperty('5387881d695d4564d397026dc5f740f816f8390b4b2c5ec8c20309122712a875'); }); test('should detect RBF transactions with scalable method', () => { - const result: { [txid: string]: MempoolTransactionExtended[] } = Common.findRbfTransactions(newTransactions, replacedTransactions, true); + const result: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }} = Common.findRbfTransactions(newTransactions, replacedTransactions, true); expect(Object.values(result).length).toEqual(2); expect(result).toHaveProperty('7219d95161f3718335991ac6d967d24eedec370908c9879bb1e192e6d797d0a6'); expect(result).toHaveProperty('5387881d695d4564d397026dc5f740f816f8390b4b2c5ec8c20309122712a875'); diff --git a/backend/src/api/common.ts b/backend/src/api/common.ts index d17068a09..50de63afc 100644 --- a/backend/src/api/common.ts +++ b/backend/src/api/common.ts @@ -79,8 +79,8 @@ export class Common { return arr; } - static findRbfTransactions(added: MempoolTransactionExtended[], deleted: MempoolTransactionExtended[], forceScalable = false): { [txid: string]: MempoolTransactionExtended[] } { - const matches: { [txid: string]: MempoolTransactionExtended[] } = {}; + static findRbfTransactions(added: MempoolTransactionExtended[], deleted: MempoolTransactionExtended[], forceScalable = false): { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }} { + const matches: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }} = {}; // For small N, a naive nested loop is extremely fast, but it doesn't scale if (added.length < 1000 && deleted.length < 50 && !forceScalable) { @@ -95,7 +95,7 @@ export class Common { addedTx.vin.some((vin) => vin.txid === deletedVin.txid && vin.vout === deletedVin.vout)); }); if (foundMatches?.length) { - matches[addedTx.txid] = [...new Set(foundMatches)]; + matches[addedTx.txid] = { replaced: [...new Set(foundMatches)], replacedBy: addedTx }; } }); } else { @@ -123,7 +123,7 @@ export class Common { foundMatches.add(deletedTx); } if (foundMatches.size) { - matches[addedTx.txid] = [...foundMatches]; + matches[addedTx.txid] = { replaced: [...foundMatches], replacedBy: addedTx }; } } } @@ -138,17 +138,17 @@ export class Common { const replaced: Set = new Set(); for (let i = 0; i < tx.vin.length; i++) { const vin = tx.vin[i]; - const match = spendMap.get(`${vin.txid}:${vin.vout}`); + const key = `${vin.txid}:${vin.vout}`; + const match = spendMap.get(key); if (match && match.txid !== tx.txid) { replaced.add(match); // remove this tx from the spendMap // prevents the same tx being replaced more than once for (const replacedVin of match.vin) { - const key = `${replacedVin.txid}:${replacedVin.vout}`; - spendMap.delete(key); + const replacedKey = `${replacedVin.txid}:${replacedVin.vout}`; + spendMap.delete(replacedKey); } } - const key = `${vin.txid}:${vin.vout}`; spendMap.delete(key); } if (replaced.size) { diff --git a/backend/src/api/disk-cache.ts b/backend/src/api/disk-cache.ts index 202f8f4cb..f2a1f2390 100644 --- a/backend/src/api/disk-cache.ts +++ b/backend/src/api/disk-cache.ts @@ -257,6 +257,7 @@ class DiskCache { trees: rbfData.rbf.trees, expiring: rbfData.rbf.expiring.map(([txid, value]) => ({ key: txid, value })), mempool: memPool.getMempool(), + spendMap: memPool.getSpendMap(), }); } } catch (e) { diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index 1f55179fb..1442b05fa 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -19,12 +19,13 @@ class Mempool { private mempoolCache: { [txId: string]: MempoolTransactionExtended } = {}; private mempoolCandidates: { [txid: string ]: boolean } = {}; private spendMap = new Map(); + private recentlyDeleted: MempoolTransactionExtended[][] = []; // buffer of transactions deleted in recent mempool updates private mempoolInfo: IBitcoinApi.MempoolInfo = { loaded: false, size: 0, bytes: 0, usage: 0, total_fee: 0, maxmempool: 300000000, mempoolminfee: Common.isLiquid() ? 0.00000100 : 0.00001000, minrelaytxfee: Common.isLiquid() ? 0.00000100 : 0.00001000 }; private mempoolChangedCallback: ((newMempool: {[txId: string]: MempoolTransactionExtended; }, newTransactions: MempoolTransactionExtended[], - deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[]) => void) | undefined; + deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[]) => void) | undefined; private $asyncMempoolChangedCallback: ((newMempool: {[txId: string]: MempoolTransactionExtended; }, mempoolSize: number, newTransactions: MempoolTransactionExtended[], - deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[], candidates?: GbtCandidates) => Promise) | undefined; + deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[], candidates?: GbtCandidates) => Promise) | undefined; private accelerations: { [txId: string]: Acceleration } = {}; private accelerationPositions: { [txid: string]: { poolId: number, pool: string, block: number, vsize: number }[] } = {}; @@ -74,12 +75,12 @@ class Mempool { } public setMempoolChangedCallback(fn: (newMempool: { [txId: string]: MempoolTransactionExtended; }, - newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[]) => void): void { + newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[]) => void): void { this.mempoolChangedCallback = fn; } public setAsyncMempoolChangedCallback(fn: (newMempool: { [txId: string]: MempoolTransactionExtended; }, mempoolSize: number, - newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[], + newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[], candidates?: GbtCandidates) => Promise): void { this.$asyncMempoolChangedCallback = fn; } @@ -362,12 +363,15 @@ class Mempool { const candidatesChanged = candidates?.added?.length || candidates?.removed?.length; - if (this.mempoolChangedCallback && (hasChange || deletedTransactions.length)) { - this.mempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions, accelerationDelta); + this.recentlyDeleted.unshift(deletedTransactions); + this.recentlyDeleted.length = Math.min(this.recentlyDeleted.length, 10); // truncate to the last 10 mempool updates + + if (this.mempoolChangedCallback && (hasChange || newTransactions.length || deletedTransactions.length)) { + this.mempoolChangedCallback(this.mempoolCache, newTransactions, this.recentlyDeleted, accelerationDelta); } - if (this.$asyncMempoolChangedCallback && (hasChange || deletedTransactions.length || candidatesChanged)) { + if (this.$asyncMempoolChangedCallback && (hasChange || newTransactions.length || deletedTransactions.length || candidatesChanged)) { this.updateTimerProgress(timer, 'running async mempool callback'); - await this.$asyncMempoolChangedCallback(this.mempoolCache, newMempoolSize, newTransactions, deletedTransactions, accelerationDelta, candidates); + await this.$asyncMempoolChangedCallback(this.mempoolCache, newMempoolSize, newTransactions, this.recentlyDeleted, accelerationDelta, candidates); this.updateTimerProgress(timer, 'completed async mempool callback'); } @@ -541,16 +545,7 @@ class Mempool { } } - public handleRbfTransactions(rbfTransactions: { [txid: string]: MempoolTransactionExtended[]; }): void { - for (const rbfTransaction in rbfTransactions) { - if (this.mempoolCache[rbfTransaction] && rbfTransactions[rbfTransaction]?.length) { - // Store replaced transactions - rbfCache.add(rbfTransactions[rbfTransaction], this.mempoolCache[rbfTransaction]); - } - } - } - - public handleMinedRbfTransactions(rbfTransactions: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }}): void { + public handleRbfTransactions(rbfTransactions: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }}): void { for (const rbfTransaction in rbfTransactions) { if (rbfTransactions[rbfTransaction].replacedBy && rbfTransactions[rbfTransaction]?.replaced?.length) { // Store replaced transactions diff --git a/backend/src/api/rbf-cache.ts b/backend/src/api/rbf-cache.ts index a087abbe0..944ad790e 100644 --- a/backend/src/api/rbf-cache.ts +++ b/backend/src/api/rbf-cache.ts @@ -44,6 +44,22 @@ interface CacheEvent { value?: any, } +/** + * Singleton for tracking RBF trees + * + * Maintains a set of RBF trees, where each tree represents a sequence of + * consecutive RBF replacements. + * + * Trees are identified by the txid of the root transaction. + * + * To maintain consistency, the following invariants must be upheld: + * - Symmetry: replacedBy(A) = B <=> A in replaces(B) + * - Unique id: treeMap(treeMap(X)) = treeMap(X) + * - Unique tree: A in replaces(B) => treeMap(A) == treeMap(B) + * - Existence: X in treeMap => treeMap(X) in rbfTrees + * - Completeness: X in replacedBy => X in treeMap, Y in replaces => Y in treeMap + */ + class RbfCache { private replacedBy: Map = new Map(); private replaces: Map = new Map(); @@ -61,6 +77,10 @@ class RbfCache { setInterval(this.cleanup.bind(this), 1000 * 60 * 10); } + /** + * Low level cache operations + */ + private addTx(txid: string, tx: MempoolTransactionExtended): void { this.txs.set(txid, tx); this.cacheQueue.push({ op: CacheOp.Add, type: 'tx', txid }); @@ -92,6 +112,12 @@ class RbfCache { this.cacheQueue.push({ op: CacheOp.Remove, type: 'exp', txid }); } + /** + * Basic data structure operations + * must uphold tree invariants + */ + + public add(replaced: MempoolTransactionExtended[], newTxExtended: MempoolTransactionExtended): void { if (!newTxExtended || !replaced?.length || this.txs.has(newTxExtended.txid)) { return; @@ -114,6 +140,10 @@ class RbfCache { if (!replacedTx.rbf) { txFullRbf = true; } + if (this.replacedBy.has(replacedTx.txid)) { + // should never happen + continue; + } this.replacedBy.set(replacedTx.txid, newTx.txid); if (this.treeMap.has(replacedTx.txid)) { const treeId = this.treeMap.get(replacedTx.txid); @@ -140,18 +170,47 @@ class RbfCache { } } newTx.fullRbf = txFullRbf; - const treeId = replacedTrees[0].tx.txid; const newTree = { tx: newTx, time: newTime, fullRbf: treeFullRbf, replaces: replacedTrees }; - this.addTree(treeId, newTree); - this.updateTreeMap(treeId, newTree); + this.addTree(newTree.tx.txid, newTree); + this.updateTreeMap(newTree.tx.txid, newTree); this.replaces.set(newTx.txid, replacedTrees.map(tree => tree.tx.txid)); } + public mined(txid): void { + if (!this.txs.has(txid)) { + return; + } + const treeId = this.treeMap.get(txid); + if (treeId && this.rbfTrees.has(treeId)) { + const tree = this.rbfTrees.get(treeId); + if (tree) { + this.setTreeMined(tree, txid); + tree.mined = true; + this.dirtyTrees.add(treeId); + this.cacheQueue.push({ op: CacheOp.Change, type: 'tree', txid: treeId }); + } + } + this.evict(txid); + } + + // flag a transaction as removed from the mempool + public evict(txid: string, fast: boolean = false): void { + this.evictionCount++; + if (this.txs.has(txid) && (fast || !this.expiring.has(txid))) { + const expiryTime = fast ? Date.now() + (1000 * 60 * 10) : Date.now() + (1000 * 86400); // 24 hours + this.addExpiration(txid, expiryTime); + } + } + + /** + * Read-only public interface + */ + public has(txId: string): boolean { return this.txs.has(txId); } @@ -232,32 +291,6 @@ class RbfCache { return changes; } - public mined(txid): void { - if (!this.txs.has(txid)) { - return; - } - const treeId = this.treeMap.get(txid); - if (treeId && this.rbfTrees.has(treeId)) { - const tree = this.rbfTrees.get(treeId); - if (tree) { - this.setTreeMined(tree, txid); - tree.mined = true; - this.dirtyTrees.add(treeId); - this.cacheQueue.push({ op: CacheOp.Change, type: 'tree', txid: treeId }); - } - } - this.evict(txid); - } - - // flag a transaction as removed from the mempool - public evict(txid: string, fast: boolean = false): void { - this.evictionCount++; - if (this.txs.has(txid) && (fast || !this.expiring.has(txid))) { - const expiryTime = fast ? Date.now() + (1000 * 60 * 10) : Date.now() + (1000 * 86400); // 24 hours - this.addExpiration(txid, expiryTime); - } - } - // is the transaction involved in a full rbf replacement? public isFullRbf(txid: string): boolean { const treeId = this.treeMap.get(txid); @@ -271,6 +304,10 @@ class RbfCache { return tree?.fullRbf; } + /** + * Cache maintenance & utility functions + */ + private cleanup(): void { const now = Date.now(); for (const txid of this.expiring.keys()) { @@ -299,10 +336,6 @@ class RbfCache { for (const tx of (replaces || [])) { // recursively remove prior versions from the cache this.replacedBy.delete(tx); - // if this is the id of a tree, remove that too - if (this.treeMap.get(tx) === tx) { - this.removeTree(tx); - } this.remove(tx); } } @@ -370,14 +403,21 @@ class RbfCache { }; } - public async load({ txs, trees, expiring, mempool }): Promise { + public async load({ txs, trees, expiring, mempool, spendMap }): Promise { try { txs.forEach(txEntry => { this.txs.set(txEntry.value.txid, txEntry.value); }); this.staleCount = 0; - for (const deflatedTree of trees) { - await this.importTree(mempool, deflatedTree.root, deflatedTree.root, deflatedTree, this.txs); + for (const deflatedTree of trees.sort((a, b) => Object.keys(b).length - Object.keys(a).length)) { + const tree = await this.importTree(mempool, deflatedTree.root, deflatedTree.root, deflatedTree, this.txs); + if (tree) { + this.addTree(tree.tx.txid, tree); + this.updateTreeMap(tree.tx.txid, tree); + if (tree.mined) { + this.evict(tree.tx.txid); + } + } } expiring.forEach(expiringEntry => { if (this.txs.has(expiringEntry.key)) { @@ -385,6 +425,31 @@ class RbfCache { } }); this.staleCount = 0; + + // connect cached trees to current mempool transactions + const conflicts: Record }> = {}; + for (const tree of this.rbfTrees.values()) { + const tx = this.getTx(tree.tx.txid); + if (!tx || tree.mined) { + continue; + } + for (const vin of tx.vin) { + const conflict = spendMap.get(`${vin.txid}:${vin.vout}`); + if (conflict && conflict.txid !== tx.txid) { + if (!conflicts[conflict.txid]) { + conflicts[conflict.txid] = { + replacedBy: conflict, + replaces: new Set(), + }; + } + conflicts[conflict.txid].replaces.add(tx); + } + } + } + for (const { replacedBy, replaces } of Object.values(conflicts)) { + this.add([...replaces.values()], replacedBy); + } + await this.checkTrees(); logger.debug(`loaded ${txs.length} txs, ${trees.length} trees into rbf cache, ${expiring.length} due to expire, ${this.staleCount} were stale`); this.cleanup(); @@ -426,6 +491,12 @@ class RbfCache { return; } + // if this tx is already in the cache, return early + if (this.treeMap.has(txid)) { + this.removeTree(deflated.key); + return; + } + // recursively reconstruct child trees for (const childId of treeInfo.replaces) { const replaced = await this.importTree(mempool, root, childId, deflated, txs, mined); @@ -457,10 +528,6 @@ class RbfCache { fullRbf: treeInfo.fullRbf, replaces, }; - this.treeMap.set(txid, root); - if (root === txid) { - this.addTree(root, tree); - } return tree; } @@ -511,6 +578,7 @@ class RbfCache { processTxs(txs); } + // evict missing transactions for (const txid of txids) { if (!found[txid]) { this.evict(txid, false); diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts index cbfa2f18b..1caade15b 100644 --- a/backend/src/api/redis-cache.ts +++ b/backend/src/api/redis-cache.ts @@ -365,6 +365,7 @@ class RedisCache { trees: rbfTrees.map(loadedTree => { loadedTree.value.key = loadedTree.key; return loadedTree.value; }), expiring: rbfExpirations, mempool: memPool.getMempool(), + spendMap: memPool.getSpendMap(), }); } diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index 79a783f88..2a047472e 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -520,8 +520,17 @@ class WebsocketHandler { } } + /** + * + * @param newMempool + * @param mempoolSize + * @param newTransactions array of transactions added this mempool update. + * @param recentlyDeletedTransactions array of arrays of transactions removed in the last N mempool updates, most recent first. + * @param accelerationDelta + * @param candidates + */ async $handleMempoolChange(newMempool: { [txid: string]: MempoolTransactionExtended }, mempoolSize: number, - newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[], + newTransactions: MempoolTransactionExtended[], recentlyDeletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[], candidates?: GbtCandidates): Promise { if (!this.webSocketServers.length) { throw new Error('No WebSocket.Server have been set'); @@ -529,6 +538,8 @@ class WebsocketHandler { this.printLogs(); + const deletedTransactions = recentlyDeletedTransactions.length ? recentlyDeletedTransactions[0] : []; + const transactionIds = (memPool.limitGBT && candidates) ? Object.keys(candidates?.txs || {}) : Object.keys(newMempool); let added = newTransactions; let removed = deletedTransactions; @@ -547,7 +558,7 @@ class WebsocketHandler { const mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas(); const mempoolInfo = memPool.getMempoolInfo(); const vBytesPerSecond = memPool.getVBytesPerSecond(); - const rbfTransactions = Common.findRbfTransactions(newTransactions, deletedTransactions); + const rbfTransactions = Common.findRbfTransactions(newTransactions, recentlyDeletedTransactions.flat()); const da = difficultyAdjustment.getDifficultyAdjustment(); const accelerations = memPool.getAccelerations(); memPool.handleRbfTransactions(rbfTransactions); @@ -578,7 +589,7 @@ class WebsocketHandler { const replacedTransactions: { replaced: string, by: TransactionExtended }[] = []; for (const tx of newTransactions) { if (rbfTransactions[tx.txid]) { - for (const replaced of rbfTransactions[tx.txid]) { + for (const replaced of rbfTransactions[tx.txid].replaced) { replacedTransactions.push({ replaced: replaced.txid, by: tx }); } } @@ -947,7 +958,7 @@ class WebsocketHandler { await accelerationRepository.$indexAccelerationsForBlock(block, accelerations, structuredClone(transactions)); const rbfTransactions = Common.findMinedRbfTransactions(transactions, memPool.getSpendMap()); - memPool.handleMinedRbfTransactions(rbfTransactions); + memPool.handleRbfTransactions(rbfTransactions); memPool.removeFromSpendMap(transactions); if (config.MEMPOOL.AUDIT && memPool.isInSync()) {