Merge pull request #5479 from mempool/mononaut/rbf-tracking-fixes

RBF tracking fixes
This commit is contained in:
softsimon 2024-09-13 22:11:27 +08:00 committed by GitHub
commit c8719f1f1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 149 additions and 73 deletions

View File

@ -1,5 +1,5 @@
import { Common } from '../../api/common';
import { MempoolTransactionExtended } from '../../mempool.interfaces';
import { MempoolTransactionExtended, TransactionExtended } from '../../mempool.interfaces';
const randomTransactions = require('./test-data/transactions-random.json');
const replacedTransactions = require('./test-data/transactions-replaced.json');
@ -10,14 +10,14 @@ describe('Common', () => {
describe('RBF', () => {
const newTransactions = rbfTransactions.concat(randomTransactions);
test('should detect RBF transactions with fast method', () => {
const result: { [txid: string]: MempoolTransactionExtended[] } = Common.findRbfTransactions(newTransactions, replacedTransactions);
const result: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }} = Common.findRbfTransactions(newTransactions, replacedTransactions);
expect(Object.values(result).length).toEqual(2);
expect(result).toHaveProperty('7219d95161f3718335991ac6d967d24eedec370908c9879bb1e192e6d797d0a6');
expect(result).toHaveProperty('5387881d695d4564d397026dc5f740f816f8390b4b2c5ec8c20309122712a875');
});
test('should detect RBF transactions with scalable method', () => {
const result: { [txid: string]: MempoolTransactionExtended[] } = Common.findRbfTransactions(newTransactions, replacedTransactions, true);
const result: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }} = Common.findRbfTransactions(newTransactions, replacedTransactions, true);
expect(Object.values(result).length).toEqual(2);
expect(result).toHaveProperty('7219d95161f3718335991ac6d967d24eedec370908c9879bb1e192e6d797d0a6');
expect(result).toHaveProperty('5387881d695d4564d397026dc5f740f816f8390b4b2c5ec8c20309122712a875');

View File

@ -79,8 +79,8 @@ export class Common {
return arr;
}
static findRbfTransactions(added: MempoolTransactionExtended[], deleted: MempoolTransactionExtended[], forceScalable = false): { [txid: string]: MempoolTransactionExtended[] } {
const matches: { [txid: string]: MempoolTransactionExtended[] } = {};
static findRbfTransactions(added: MempoolTransactionExtended[], deleted: MempoolTransactionExtended[], forceScalable = false): { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }} {
const matches: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }} = {};
// For small N, a naive nested loop is extremely fast, but it doesn't scale
if (added.length < 1000 && deleted.length < 50 && !forceScalable) {
@ -95,7 +95,7 @@ export class Common {
addedTx.vin.some((vin) => vin.txid === deletedVin.txid && vin.vout === deletedVin.vout));
});
if (foundMatches?.length) {
matches[addedTx.txid] = [...new Set(foundMatches)];
matches[addedTx.txid] = { replaced: [...new Set(foundMatches)], replacedBy: addedTx };
}
});
} else {
@ -123,7 +123,7 @@ export class Common {
foundMatches.add(deletedTx);
}
if (foundMatches.size) {
matches[addedTx.txid] = [...foundMatches];
matches[addedTx.txid] = { replaced: [...foundMatches], replacedBy: addedTx };
}
}
}
@ -138,17 +138,17 @@ export class Common {
const replaced: Set<MempoolTransactionExtended> = new Set();
for (let i = 0; i < tx.vin.length; i++) {
const vin = tx.vin[i];
const match = spendMap.get(`${vin.txid}:${vin.vout}`);
const key = `${vin.txid}:${vin.vout}`;
const match = spendMap.get(key);
if (match && match.txid !== tx.txid) {
replaced.add(match);
// remove this tx from the spendMap
// prevents the same tx being replaced more than once
for (const replacedVin of match.vin) {
const key = `${replacedVin.txid}:${replacedVin.vout}`;
spendMap.delete(key);
const replacedKey = `${replacedVin.txid}:${replacedVin.vout}`;
spendMap.delete(replacedKey);
}
}
const key = `${vin.txid}:${vin.vout}`;
spendMap.delete(key);
}
if (replaced.size) {

View File

@ -257,6 +257,7 @@ class DiskCache {
trees: rbfData.rbf.trees,
expiring: rbfData.rbf.expiring.map(([txid, value]) => ({ key: txid, value })),
mempool: memPool.getMempool(),
spendMap: memPool.getSpendMap(),
});
}
} catch (e) {

View File

@ -19,12 +19,13 @@ class Mempool {
private mempoolCache: { [txId: string]: MempoolTransactionExtended } = {};
private mempoolCandidates: { [txid: string ]: boolean } = {};
private spendMap = new Map<string, MempoolTransactionExtended>();
private recentlyDeleted: MempoolTransactionExtended[][] = []; // buffer of transactions deleted in recent mempool updates
private mempoolInfo: IBitcoinApi.MempoolInfo = { loaded: false, size: 0, bytes: 0, usage: 0, total_fee: 0,
maxmempool: 300000000, mempoolminfee: Common.isLiquid() ? 0.00000100 : 0.00001000, minrelaytxfee: Common.isLiquid() ? 0.00000100 : 0.00001000 };
private mempoolChangedCallback: ((newMempool: {[txId: string]: MempoolTransactionExtended; }, newTransactions: MempoolTransactionExtended[],
deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[]) => void) | undefined;
deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[]) => void) | undefined;
private $asyncMempoolChangedCallback: ((newMempool: {[txId: string]: MempoolTransactionExtended; }, mempoolSize: number, newTransactions: MempoolTransactionExtended[],
deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[], candidates?: GbtCandidates) => Promise<void>) | undefined;
deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[], candidates?: GbtCandidates) => Promise<void>) | undefined;
private accelerations: { [txId: string]: Acceleration } = {};
private accelerationPositions: { [txid: string]: { poolId: number, pool: string, block: number, vsize: number }[] } = {};
@ -74,12 +75,12 @@ class Mempool {
}
public setMempoolChangedCallback(fn: (newMempool: { [txId: string]: MempoolTransactionExtended; },
newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[]) => void): void {
newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[]) => void): void {
this.mempoolChangedCallback = fn;
}
public setAsyncMempoolChangedCallback(fn: (newMempool: { [txId: string]: MempoolTransactionExtended; }, mempoolSize: number,
newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[],
newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[],
candidates?: GbtCandidates) => Promise<void>): void {
this.$asyncMempoolChangedCallback = fn;
}
@ -362,12 +363,15 @@ class Mempool {
const candidatesChanged = candidates?.added?.length || candidates?.removed?.length;
if (this.mempoolChangedCallback && (hasChange || deletedTransactions.length)) {
this.mempoolChangedCallback(this.mempoolCache, newTransactions, deletedTransactions, accelerationDelta);
this.recentlyDeleted.unshift(deletedTransactions);
this.recentlyDeleted.length = Math.min(this.recentlyDeleted.length, 10); // truncate to the last 10 mempool updates
if (this.mempoolChangedCallback && (hasChange || newTransactions.length || deletedTransactions.length)) {
this.mempoolChangedCallback(this.mempoolCache, newTransactions, this.recentlyDeleted, accelerationDelta);
}
if (this.$asyncMempoolChangedCallback && (hasChange || deletedTransactions.length || candidatesChanged)) {
if (this.$asyncMempoolChangedCallback && (hasChange || newTransactions.length || deletedTransactions.length || candidatesChanged)) {
this.updateTimerProgress(timer, 'running async mempool callback');
await this.$asyncMempoolChangedCallback(this.mempoolCache, newMempoolSize, newTransactions, deletedTransactions, accelerationDelta, candidates);
await this.$asyncMempoolChangedCallback(this.mempoolCache, newMempoolSize, newTransactions, this.recentlyDeleted, accelerationDelta, candidates);
this.updateTimerProgress(timer, 'completed async mempool callback');
}
@ -541,16 +545,7 @@ class Mempool {
}
}
public handleRbfTransactions(rbfTransactions: { [txid: string]: MempoolTransactionExtended[]; }): void {
for (const rbfTransaction in rbfTransactions) {
if (this.mempoolCache[rbfTransaction] && rbfTransactions[rbfTransaction]?.length) {
// Store replaced transactions
rbfCache.add(rbfTransactions[rbfTransaction], this.mempoolCache[rbfTransaction]);
}
}
}
public handleMinedRbfTransactions(rbfTransactions: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }}): void {
public handleRbfTransactions(rbfTransactions: { [txid: string]: { replaced: MempoolTransactionExtended[], replacedBy: TransactionExtended }}): void {
for (const rbfTransaction in rbfTransactions) {
if (rbfTransactions[rbfTransaction].replacedBy && rbfTransactions[rbfTransaction]?.replaced?.length) {
// Store replaced transactions

View File

@ -44,6 +44,22 @@ interface CacheEvent {
value?: any,
}
/**
* Singleton for tracking RBF trees
*
* Maintains a set of RBF trees, where each tree represents a sequence of
* consecutive RBF replacements.
*
* Trees are identified by the txid of the root transaction.
*
* To maintain consistency, the following invariants must be upheld:
* - Symmetry: replacedBy(A) = B <=> A in replaces(B)
* - Unique id: treeMap(treeMap(X)) = treeMap(X)
* - Unique tree: A in replaces(B) => treeMap(A) == treeMap(B)
* - Existence: X in treeMap => treeMap(X) in rbfTrees
* - Completeness: X in replacedBy => X in treeMap, Y in replaces => Y in treeMap
*/
class RbfCache {
private replacedBy: Map<string, string> = new Map();
private replaces: Map<string, string[]> = new Map();
@ -61,6 +77,10 @@ class RbfCache {
setInterval(this.cleanup.bind(this), 1000 * 60 * 10);
}
/**
* Low level cache operations
*/
private addTx(txid: string, tx: MempoolTransactionExtended): void {
this.txs.set(txid, tx);
this.cacheQueue.push({ op: CacheOp.Add, type: 'tx', txid });
@ -92,6 +112,12 @@ class RbfCache {
this.cacheQueue.push({ op: CacheOp.Remove, type: 'exp', txid });
}
/**
* Basic data structure operations
* must uphold tree invariants
*/
public add(replaced: MempoolTransactionExtended[], newTxExtended: MempoolTransactionExtended): void {
if (!newTxExtended || !replaced?.length || this.txs.has(newTxExtended.txid)) {
return;
@ -114,6 +140,10 @@ class RbfCache {
if (!replacedTx.rbf) {
txFullRbf = true;
}
if (this.replacedBy.has(replacedTx.txid)) {
// should never happen
continue;
}
this.replacedBy.set(replacedTx.txid, newTx.txid);
if (this.treeMap.has(replacedTx.txid)) {
const treeId = this.treeMap.get(replacedTx.txid);
@ -140,18 +170,47 @@ class RbfCache {
}
}
newTx.fullRbf = txFullRbf;
const treeId = replacedTrees[0].tx.txid;
const newTree = {
tx: newTx,
time: newTime,
fullRbf: treeFullRbf,
replaces: replacedTrees
};
this.addTree(treeId, newTree);
this.updateTreeMap(treeId, newTree);
this.addTree(newTree.tx.txid, newTree);
this.updateTreeMap(newTree.tx.txid, newTree);
this.replaces.set(newTx.txid, replacedTrees.map(tree => tree.tx.txid));
}
public mined(txid): void {
if (!this.txs.has(txid)) {
return;
}
const treeId = this.treeMap.get(txid);
if (treeId && this.rbfTrees.has(treeId)) {
const tree = this.rbfTrees.get(treeId);
if (tree) {
this.setTreeMined(tree, txid);
tree.mined = true;
this.dirtyTrees.add(treeId);
this.cacheQueue.push({ op: CacheOp.Change, type: 'tree', txid: treeId });
}
}
this.evict(txid);
}
// flag a transaction as removed from the mempool
public evict(txid: string, fast: boolean = false): void {
this.evictionCount++;
if (this.txs.has(txid) && (fast || !this.expiring.has(txid))) {
const expiryTime = fast ? Date.now() + (1000 * 60 * 10) : Date.now() + (1000 * 86400); // 24 hours
this.addExpiration(txid, expiryTime);
}
}
/**
* Read-only public interface
*/
public has(txId: string): boolean {
return this.txs.has(txId);
}
@ -232,32 +291,6 @@ class RbfCache {
return changes;
}
public mined(txid): void {
if (!this.txs.has(txid)) {
return;
}
const treeId = this.treeMap.get(txid);
if (treeId && this.rbfTrees.has(treeId)) {
const tree = this.rbfTrees.get(treeId);
if (tree) {
this.setTreeMined(tree, txid);
tree.mined = true;
this.dirtyTrees.add(treeId);
this.cacheQueue.push({ op: CacheOp.Change, type: 'tree', txid: treeId });
}
}
this.evict(txid);
}
// flag a transaction as removed from the mempool
public evict(txid: string, fast: boolean = false): void {
this.evictionCount++;
if (this.txs.has(txid) && (fast || !this.expiring.has(txid))) {
const expiryTime = fast ? Date.now() + (1000 * 60 * 10) : Date.now() + (1000 * 86400); // 24 hours
this.addExpiration(txid, expiryTime);
}
}
// is the transaction involved in a full rbf replacement?
public isFullRbf(txid: string): boolean {
const treeId = this.treeMap.get(txid);
@ -271,6 +304,10 @@ class RbfCache {
return tree?.fullRbf;
}
/**
* Cache maintenance & utility functions
*/
private cleanup(): void {
const now = Date.now();
for (const txid of this.expiring.keys()) {
@ -299,10 +336,6 @@ class RbfCache {
for (const tx of (replaces || [])) {
// recursively remove prior versions from the cache
this.replacedBy.delete(tx);
// if this is the id of a tree, remove that too
if (this.treeMap.get(tx) === tx) {
this.removeTree(tx);
}
this.remove(tx);
}
}
@ -370,14 +403,21 @@ class RbfCache {
};
}
public async load({ txs, trees, expiring, mempool }): Promise<void> {
public async load({ txs, trees, expiring, mempool, spendMap }): Promise<void> {
try {
txs.forEach(txEntry => {
this.txs.set(txEntry.value.txid, txEntry.value);
});
this.staleCount = 0;
for (const deflatedTree of trees) {
await this.importTree(mempool, deflatedTree.root, deflatedTree.root, deflatedTree, this.txs);
for (const deflatedTree of trees.sort((a, b) => Object.keys(b).length - Object.keys(a).length)) {
const tree = await this.importTree(mempool, deflatedTree.root, deflatedTree.root, deflatedTree, this.txs);
if (tree) {
this.addTree(tree.tx.txid, tree);
this.updateTreeMap(tree.tx.txid, tree);
if (tree.mined) {
this.evict(tree.tx.txid);
}
}
}
expiring.forEach(expiringEntry => {
if (this.txs.has(expiringEntry.key)) {
@ -385,6 +425,31 @@ class RbfCache {
}
});
this.staleCount = 0;
// connect cached trees to current mempool transactions
const conflicts: Record<string, { replacedBy: MempoolTransactionExtended, replaces: Set<MempoolTransactionExtended> }> = {};
for (const tree of this.rbfTrees.values()) {
const tx = this.getTx(tree.tx.txid);
if (!tx || tree.mined) {
continue;
}
for (const vin of tx.vin) {
const conflict = spendMap.get(`${vin.txid}:${vin.vout}`);
if (conflict && conflict.txid !== tx.txid) {
if (!conflicts[conflict.txid]) {
conflicts[conflict.txid] = {
replacedBy: conflict,
replaces: new Set(),
};
}
conflicts[conflict.txid].replaces.add(tx);
}
}
}
for (const { replacedBy, replaces } of Object.values(conflicts)) {
this.add([...replaces.values()], replacedBy);
}
await this.checkTrees();
logger.debug(`loaded ${txs.length} txs, ${trees.length} trees into rbf cache, ${expiring.length} due to expire, ${this.staleCount} were stale`);
this.cleanup();
@ -426,6 +491,12 @@ class RbfCache {
return;
}
// if this tx is already in the cache, return early
if (this.treeMap.has(txid)) {
this.removeTree(deflated.key);
return;
}
// recursively reconstruct child trees
for (const childId of treeInfo.replaces) {
const replaced = await this.importTree(mempool, root, childId, deflated, txs, mined);
@ -457,10 +528,6 @@ class RbfCache {
fullRbf: treeInfo.fullRbf,
replaces,
};
this.treeMap.set(txid, root);
if (root === txid) {
this.addTree(root, tree);
}
return tree;
}
@ -511,6 +578,7 @@ class RbfCache {
processTxs(txs);
}
// evict missing transactions
for (const txid of txids) {
if (!found[txid]) {
this.evict(txid, false);

View File

@ -365,6 +365,7 @@ class RedisCache {
trees: rbfTrees.map(loadedTree => { loadedTree.value.key = loadedTree.key; return loadedTree.value; }),
expiring: rbfExpirations,
mempool: memPool.getMempool(),
spendMap: memPool.getSpendMap(),
});
}

View File

@ -520,8 +520,17 @@ class WebsocketHandler {
}
}
/**
*
* @param newMempool
* @param mempoolSize
* @param newTransactions array of transactions added this mempool update.
* @param recentlyDeletedTransactions array of arrays of transactions removed in the last N mempool updates, most recent first.
* @param accelerationDelta
* @param candidates
*/
async $handleMempoolChange(newMempool: { [txid: string]: MempoolTransactionExtended }, mempoolSize: number,
newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[],
newTransactions: MempoolTransactionExtended[], recentlyDeletedTransactions: MempoolTransactionExtended[][], accelerationDelta: string[],
candidates?: GbtCandidates): Promise<void> {
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
@ -529,6 +538,8 @@ class WebsocketHandler {
this.printLogs();
const deletedTransactions = recentlyDeletedTransactions.length ? recentlyDeletedTransactions[0] : [];
const transactionIds = (memPool.limitGBT && candidates) ? Object.keys(candidates?.txs || {}) : Object.keys(newMempool);
let added = newTransactions;
let removed = deletedTransactions;
@ -547,7 +558,7 @@ class WebsocketHandler {
const mBlockDeltas = mempoolBlocks.getMempoolBlockDeltas();
const mempoolInfo = memPool.getMempoolInfo();
const vBytesPerSecond = memPool.getVBytesPerSecond();
const rbfTransactions = Common.findRbfTransactions(newTransactions, deletedTransactions);
const rbfTransactions = Common.findRbfTransactions(newTransactions, recentlyDeletedTransactions.flat());
const da = difficultyAdjustment.getDifficultyAdjustment();
const accelerations = memPool.getAccelerations();
memPool.handleRbfTransactions(rbfTransactions);
@ -578,7 +589,7 @@ class WebsocketHandler {
const replacedTransactions: { replaced: string, by: TransactionExtended }[] = [];
for (const tx of newTransactions) {
if (rbfTransactions[tx.txid]) {
for (const replaced of rbfTransactions[tx.txid]) {
for (const replaced of rbfTransactions[tx.txid].replaced) {
replacedTransactions.push({ replaced: replaced.txid, by: tx });
}
}
@ -947,7 +958,7 @@ class WebsocketHandler {
await accelerationRepository.$indexAccelerationsForBlock(block, accelerations, structuredClone(transactions));
const rbfTransactions = Common.findMinedRbfTransactions(transactions, memPool.getSpendMap());
memPool.handleMinedRbfTransactions(rbfTransactions);
memPool.handleRbfTransactions(rbfTransactions);
memPool.removeFromSpendMap(transactions);
if (config.MEMPOOL.AUDIT && memPool.isInSync()) {