Merge pull request #5120 from mempool/mononaut/multi-pool-eta

Multi-pool ETA
This commit is contained in:
wiz 2024-06-05 14:21:34 +09:00 committed by GitHub
commit 17132ff047
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 179 additions and 16 deletions

View File

@ -1,12 +1,13 @@
import { GbtGenerator, GbtResult, ThreadTransaction as RustThreadTransaction, ThreadAcceleration as RustThreadAcceleration } from 'rust-gbt';
import logger from '../logger';
import { MempoolBlock, MempoolTransactionExtended, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor, CompactThreadTransaction, EffectiveFeeStats, TransactionClassified, TransactionCompressed, MempoolDeltaChange, GbtCandidates } from '../mempool.interfaces';
import { MempoolBlock, MempoolTransactionExtended, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor, CompactThreadTransaction, EffectiveFeeStats, TransactionClassified, TransactionCompressed, MempoolDeltaChange, GbtCandidates, PoolTag } from '../mempool.interfaces';
import { Common, OnlineFeeStatsCalculator } from './common';
import config from '../config';
import { Worker } from 'worker_threads';
import path from 'path';
import mempool from './mempool';
import { Acceleration } from './services/acceleration';
import PoolsRepository from '../repositories/PoolsRepository';
const MAX_UINT32 = Math.pow(2, 32) - 1;
@ -21,6 +22,8 @@ class MempoolBlocks {
private uidMap: Map<number, string> = new Map(); // map short numerical uids to full txids
private txidMap: Map<string, number> = new Map(); // map full txids back to short numerical uids
private pools: { [id: number]: PoolTag } = {};
public getMempoolBlocks(): MempoolBlock[] {
return this.mempoolBlocks.map((block) => {
return {
@ -42,6 +45,14 @@ class MempoolBlocks {
return this.mempoolBlockDeltas;
}
public async updatePools$(): Promise<void> {
const allPools = await PoolsRepository.$getPools();
this.pools = {};
for (const pool of allPools) {
this.pools[pool.uniqueId] = pool;
}
}
private calculateMempoolDeltas(prevBlocks: MempoolBlockWithTransactions[], mempoolBlocks: MempoolBlockWithTransactions[]): MempoolBlockDelta[] {
const mempoolBlockDeltas: MempoolBlockDelta[] = [];
for (let i = 0; i < Math.max(mempoolBlocks.length, prevBlocks.length); i++) {
@ -478,7 +489,7 @@ class MempoolBlocks {
const deltas = this.calculateMempoolDeltas(this.mempoolBlocks, mempoolBlocks);
this.mempoolBlocks = mempoolBlocks;
this.mempoolBlockDeltas = deltas;
this.updateAccelerationPositions(mempool, accelerations, mempoolBlocks);
}
return mempoolBlocks;
@ -625,6 +636,124 @@ class MempoolBlocks {
tx.acc ? 1 : 0,
];
}
// estimates and saves positions of accelerations in mining partner mempools
private updateAccelerationPositions(mempoolCache: { [txid: string]: MempoolTransactionExtended }, accelerations: { [txid: string]: Acceleration }, mempoolBlocks: MempoolBlockWithTransactions[]): void {
const accelerationPositions: { [txid: string]: { poolId: number, pool: string, block: number, vsize: number }[] } = {};
// keep track of simulated mempool blocks for each active pool
const pools: {
[pool: string]: { name: string, block: number, vsize: number, accelerations: string[], complete: boolean };
} = {};
// prepare a list of accelerations in ascending order (we'll pop items off the end of the list)
const accQueue: { acceleration: Acceleration, rate: number, vsize: number }[] = Object.values(accelerations).map(acc => {
let vsize = mempoolCache[acc.txid].vsize;
for (const ancestor of mempoolCache[acc.txid].ancestors || []) {
vsize += (ancestor.weight / 4);
}
return {
acceleration: acc,
rate: mempoolCache[acc.txid].effectiveFeePerVsize,
vsize
};
}).sort((a, b) => a.rate - b.rate);
// initialize the pool tracker
for (const { acceleration } of accQueue) {
accelerationPositions[acceleration.txid] = [];
for (const pool of acceleration.pools) {
if (!pools[pool]) {
pools[pool] = {
name: this.pools[pool]?.name || 'unknown',
block: 0,
vsize: 0,
accelerations: [],
complete: false,
};
}
pools[pool].accelerations.push(acceleration.txid);
}
for (const ancestor of mempoolCache[acceleration.txid].ancestors || []) {
accelerationPositions[ancestor.txid] = [];
}
}
for (const pool of Object.keys(pools)) {
// if any pools accepted *every* acceleration, we can just use the GBT result positions directly
if (pools[pool].accelerations.length === Object.keys(accelerations).length) {
pools[pool].complete = true;
}
}
let block = 0;
let index = 0;
let next = accQueue.pop();
// build simulated blocks for each pool by taking the best option from
// either the mempool or the list of accelerations.
while (next && block < mempoolBlocks.length) {
while (next && index < mempoolBlocks[block].transactions.length) {
const nextTx = mempoolBlocks[block].transactions[index];
if (next.rate >= (nextTx.rate || (nextTx.fee / nextTx.vsize))) {
for (const pool of next.acceleration.pools) {
if (pools[pool].vsize + next.vsize <= 999_000) {
pools[pool].vsize += next.vsize;
} else {
pools[pool].block++;
pools[pool].vsize = next.vsize;
}
// insert the acceleration into matching pool's blocks
if (pools[pool].complete && mempoolCache[next.acceleration.txid]?.position !== undefined) {
accelerationPositions[next.acceleration.txid].push({
...mempoolCache[next.acceleration.txid].position as { block: number, vsize: number },
poolId: pool,
pool: pools[pool].name
});
} else {
accelerationPositions[next.acceleration.txid].push({
poolId: pool,
pool: pools[pool].name,
block: pools[pool].block,
vsize: pools[pool].vsize - (next.vsize / 2),
});
}
// and any accelerated ancestors
for (const ancestor of mempoolCache[next.acceleration.txid].ancestors || []) {
if (pools[pool].complete && mempoolCache[ancestor.txid]?.position !== undefined) {
accelerationPositions[ancestor.txid].push({
...mempoolCache[ancestor.txid].position as { block: number, vsize: number },
poolId: pool,
pool: pools[pool].name,
});
} else {
accelerationPositions[ancestor.txid].push({
poolId: pool,
pool: pools[pool].name,
block: pools[pool].block,
vsize: pools[pool].vsize - (next.vsize / 2),
});
}
}
}
next = accQueue.pop();
} else {
// skip accelerated transactions and their CPFP ancestors
if (accelerationPositions[nextTx.txid] == null) {
// insert into all pools' blocks
for (const pool of Object.keys(pools)) {
if (pools[pool].vsize + nextTx.vsize <= 999_000) {
pools[pool].vsize += nextTx.vsize;
} else {
pools[pool].block++;
pools[pool].vsize = nextTx.vsize;
}
}
}
index++;
}
}
block++;
index = 0;
}
mempool.setAccelerationPositions(accelerationPositions);
}
}
export default new MempoolBlocks();

View File

@ -27,6 +27,7 @@ class Mempool {
deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[], candidates?: GbtCandidates) => Promise<void>) | undefined;
private accelerations: { [txId: string]: Acceleration } = {};
private accelerationPositions: { [txid: string]: { poolId: number, pool: string, block: number, vsize: number }[] } = {};
private txPerSecondArray: number[] = [];
private txPerSecond: number = 0;
@ -514,6 +515,14 @@ class Mempool {
}
}
setAccelerationPositions(positions: { [txid: string]: { poolId: number, pool: string, block: number, vsize: number }[] }): void {
this.accelerationPositions = positions;
}
getAccelerationPositions(txid: string): { [pool: number]: { poolId: number, pool: string, block: number, vsize: number } } | undefined {
return this.accelerationPositions[txid];
}
private startTimer() {
const state: any = {
start: Date.now(),

View File

@ -10,6 +10,12 @@ export interface Acceleration {
effectiveFee: number,
feeDelta: number,
pools: number[],
positions?: {
[pool: number]: {
block: number,
vbytes: number,
},
},
};
export interface AccelerationHistory {

View File

@ -206,7 +206,8 @@ class WebsocketHandler {
}
response['txPosition'] = JSON.stringify({
txid: trackTxid,
position
position,
accelerationPositions: memPool.getAccelerationPositions(tx.txid),
});
}
} else {
@ -821,7 +822,8 @@ class WebsocketHandler {
...mempoolTx.position,
accelerated: mempoolTx.acceleration || undefined,
acceleratedBy: mempoolTx.acceleratedBy || undefined,
}
},
accelerationPositions: memPool.getAccelerationPositions(mempoolTx.txid),
};
if (!mempoolTx.cpfpChecked && !mempoolTx.acceleration) {
calculateCpfp(mempoolTx, newMempool);
@ -834,7 +836,7 @@ class WebsocketHandler {
effectiveFeePerVsize: mempoolTx.effectiveFeePerVsize || null,
sigops: mempoolTx.sigops,
adjustedVsize: mempoolTx.adjustedVsize,
acceleration: mempoolTx.acceleration
acceleration: mempoolTx.acceleration,
};
}
response['txPosition'] = JSON.stringify(positionData);
@ -1137,7 +1139,8 @@ class WebsocketHandler {
...mempoolTx.position,
accelerated: mempoolTx.acceleration || undefined,
acceleratedBy: mempoolTx.acceleratedBy || undefined,
}
},
accelerationPositions: memPool.getAccelerationPositions(mempoolTx.txid),
});
}
}

View File

@ -2,8 +2,7 @@ import * as fs from 'fs';
import path from 'path';
import config from './config';
import { createPool, Pool, PoolConnection } from 'mysql2/promise';
import { LogLevel } from './logger';
import logger from './logger';
import logger, { LogLevel } from './logger';
import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } from 'mysql2/typings/mysql';
import { execSync } from 'child_process';

View File

@ -45,6 +45,7 @@ import bitcoinCoreRoutes from './api/bitcoin/bitcoin-core.routes';
import bitcoinSecondClient from './api/bitcoin/bitcoin-second-client';
import accelerationRoutes from './api/acceleration/acceleration.routes';
import aboutRoutes from './api/about.routes';
import mempoolBlocks from './api/mempool-blocks';
class Server {
private wss: WebSocket.Server | undefined;
@ -149,6 +150,7 @@ class Server {
await poolsUpdater.updatePoolsJson(); // Needs to be done before loading the disk cache because we sometimes wipe it
await syncAssets.syncAssets$();
await mempoolBlocks.updatePools$();
if (config.MEMPOOL.ENABLED) {
if (config.MEMPOOL.CACHE_ENABLED) {
await diskCache.$loadMempoolCache();

View File

@ -13,7 +13,7 @@ import {
retry
} from 'rxjs/operators';
import { Transaction } from '../../interfaces/electrs.interface';
import { of, merge, Subscription, Observable, Subject, from, throwError, combineLatest } from 'rxjs';
import { of, merge, Subscription, Observable, Subject, from, throwError } from 'rxjs';
import { StateService } from '../../services/state.service';
import { CacheService } from '../../services/cache.service';
import { WebsocketService } from '../../services/websocket.service';
@ -23,11 +23,11 @@ import { SeoService } from '../../services/seo.service';
import { StorageService } from '../../services/storage.service';
import { seoDescriptionNetwork } from '../../shared/common.utils';
import { getTransactionFlags } from '../../shared/transaction.utils';
import { Filter, toFilters, TransactionFlags } from '../../shared/filters.utils';
import { BlockExtended, CpfpInfo, RbfTree, MempoolPosition, DifficultyAdjustment, Acceleration } from '../../interfaces/node-api.interface';
import { Filter, toFilters } from '../../shared/filters.utils';
import { BlockExtended, CpfpInfo, RbfTree, MempoolPosition, DifficultyAdjustment, Acceleration, AccelerationPosition } from '../../interfaces/node-api.interface';
import { LiquidUnblinding } from './liquid-ublinding';
import { RelativeUrlPipe } from '../../shared/pipes/relative-url/relative-url.pipe';
import { Price, PriceService } from '../../services/price.service';
import { PriceService } from '../../services/price.service';
import { isFeatureActive } from '../../bitcoin.utils';
import { ServicesApiServices } from '../../services/services-api.service';
import { EnterpriseService } from '../../services/enterprise.service';
@ -62,6 +62,7 @@ export class TransactionComponent implements OnInit, AfterViewInit, OnDestroy {
txId: string;
txInBlockIndex: number;
mempoolPosition: MempoolPosition;
accelerationPositions: AccelerationPosition[];
isLoadingTx = true;
error: any = undefined;
errorUnblinded: any = undefined;
@ -372,10 +373,14 @@ export class TransactionComponent implements OnInit, AfterViewInit, OnDestroy {
this.now = Date.now();
if (txPosition && txPosition.txid === this.txId && txPosition.position) {
this.mempoolPosition = txPosition.position;
this.accelerationPositions = txPosition.accelerationPositions;
if (this.tx && !this.tx.status.confirmed) {
const txFeePerVSize = this.getUnacceleratedFeeRate(this.tx, this.tx.acceleration || this.mempoolPosition?.accelerated);
this.stateService.markBlock$.next({
txid: txPosition.txid,
mempoolPosition: this.mempoolPosition
txFeePerVSize,
mempoolPosition: this.mempoolPosition,
accelerationPositions: this.accelerationPositions,
});
this.txInBlockIndex = this.mempoolPosition.block;
@ -390,6 +395,7 @@ export class TransactionComponent implements OnInit, AfterViewInit, OnDestroy {
}
} else {
this.mempoolPosition = null;
this.accelerationPositions = null;
}
});
@ -513,11 +519,13 @@ export class TransactionComponent implements OnInit, AfterViewInit, OnDestroy {
});
this.fetchCpfp$.next(this.tx.txid);
} else {
const txFeePerVSize = this.getUnacceleratedFeeRate(this.tx, this.tx.acceleration || this.mempoolPosition?.accelerated);
if (tx.cpfpChecked) {
this.stateService.markBlock$.next({
txid: tx.txid,
txFeePerVSize: tx.effectiveFeePerVsize,
txFeePerVSize,
mempoolPosition: this.mempoolPosition,
accelerationPositions: this.accelerationPositions,
});
this.setCpfpInfo({
ancestors: tx.ancestors,
@ -791,6 +799,7 @@ export class TransactionComponent implements OnInit, AfterViewInit, OnDestroy {
this.mempoolPosition = null;
this.pool = null;
this.auditStatus = null;
this.accelerationPositions = null;
document.body.scrollTo(0, 0);
this.isAcceleration = false;
this.leaveTransaction();

View File

@ -251,6 +251,11 @@ export interface MempoolPosition {
acceleratedBy?: number[],
}
export interface AccelerationPosition extends MempoolPosition {
pool: string;
offset?: number;
}
export interface RewardStats {
startBlock: number;
endBlock: number;

View File

@ -2,7 +2,7 @@ import { Inject, Injectable, PLATFORM_ID, LOCALE_ID } from '@angular/core';
import { ReplaySubject, BehaviorSubject, Subject, fromEvent, Observable } from 'rxjs';
import { Transaction } from '../interfaces/electrs.interface';
import { AccelerationDelta, HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockUpdate, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo, isMempoolState } from '../interfaces/websocket.interface';
import { Acceleration, BlockExtended, CpfpInfo, DifficultyAdjustment, MempoolPosition, OptimizedMempoolStats, RbfTree, TransactionStripped } from '../interfaces/node-api.interface';
import { Acceleration, AccelerationPosition, BlockExtended, CpfpInfo, DifficultyAdjustment, MempoolPosition, OptimizedMempoolStats, RbfTree, TransactionStripped } from '../interfaces/node-api.interface';
import { Router, NavigationStart } from '@angular/router';
import { isPlatformBrowser } from '@angular/common';
import { filter, map, scan, shareReplay } from 'rxjs/operators';
@ -16,6 +16,7 @@ export interface MarkBlockState {
mempoolBlockIndex?: number;
txFeePerVSize?: number;
mempoolPosition?: MempoolPosition;
accelerationPositions?: AccelerationPosition[];
}
export interface ILoadingIndicators { [name: string]: number; }
@ -145,7 +146,7 @@ export class StateService {
utxoSpent$ = new Subject<object>();
difficultyAdjustment$ = new ReplaySubject<DifficultyAdjustment>(1);
mempoolTransactions$ = new Subject<Transaction>();
mempoolTxPosition$ = new Subject<{ txid: string, position: MempoolPosition, cpfp: CpfpInfo | null}>();
mempoolTxPosition$ = new Subject<{ txid: string, position: MempoolPosition, cpfp: CpfpInfo | null, accelerationPositions?: AccelerationPosition[] }>();
mempoolRemovedTransactions$ = new Subject<Transaction>();
multiAddressTransactions$ = new Subject<{ [address: string]: { mempool: Transaction[], confirmed: Transaction[], removed: Transaction[] }}>();
blockTransactions$ = new Subject<Transaction>();