diff --git a/backend/mempool-config.sample.json b/backend/mempool-config.sample.json index b29e8cb7a..5a91ee182 100644 --- a/backend/mempool-config.sample.json +++ b/backend/mempool-config.sample.json @@ -139,6 +139,8 @@ "ENABLED": false, "AUDIT": false, "AUDIT_START_HEIGHT": 774000, + "STATISTICS": false, + "STATISTICS_START_TIME": 1481932800, "SERVERS": [ "list", "of", diff --git a/backend/src/__fixtures__/mempool-config.template.json b/backend/src/__fixtures__/mempool-config.template.json index f81e889ca..86fb69fda 100644 --- a/backend/src/__fixtures__/mempool-config.template.json +++ b/backend/src/__fixtures__/mempool-config.template.json @@ -131,6 +131,8 @@ "ENABLED": false, "AUDIT": false, "AUDIT_START_HEIGHT": 774000, + "STATISTICS": false, + "STATISTICS_START_TIME": 1481932800, "SERVERS": [] }, "MEMPOOL_SERVICES": { diff --git a/backend/src/__tests__/config.test.ts b/backend/src/__tests__/config.test.ts index 18f72f88c..256d8f931 100644 --- a/backend/src/__tests__/config.test.ts +++ b/backend/src/__tests__/config.test.ts @@ -135,6 +135,8 @@ describe('Mempool Backend Config', () => { ENABLED: false, AUDIT: false, AUDIT_START_HEIGHT: 774000, + STATISTICS: false, + STATISTICS_START_TIME: 1481932800, SERVERS: [] }); diff --git a/backend/src/api/statistics/statistics-api.ts b/backend/src/api/statistics/statistics-api.ts index c7c3f37b0..2d66d69d9 100644 --- a/backend/src/api/statistics/statistics-api.ts +++ b/backend/src/api/statistics/statistics-api.ts @@ -64,7 +64,7 @@ class StatisticsApi { } } - public async $create(statistics: Statistic): Promise { + public async $create(statistics: Statistic, convertToDatetime = false): Promise { try { const query = `INSERT INTO statistics( added, @@ -114,7 +114,7 @@ class StatisticsApi { vsize_1800, vsize_2000 ) - VALUES (${statistics.added}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + VALUES (${convertToDatetime ? 
/**
 * Expands rows in the compact `OptimizedStatistic` shape into full
 * `Statistic` rows.
 *
 * - `count` becomes `unconfirmed_transactions`.
 * - `tx_per_second` is always 0 and `fee_data` is always an empty string.
 * - `mempool_byte_weight` and `total_fee` fall back to 0 when falsy.
 * - `vsizes[i]` is copied positionally into the 38 `vsize_*` columns
 *   (`vsize_1` … `vsize_2000`).
 *
 * @param statistic rows to convert
 * @returns one `Statistic` per input row, in the same order
 */
public mapOptimizedStatisticToStatistic(statistic: OptimizedStatistic[]): Statistic[] {
  const rows: Statistic[] = [];
  for (const entry of statistic) {
    const buckets = entry.vsizes;
    rows.push({
      added: entry.added,
      unconfirmed_transactions: entry.count,
      tx_per_second: 0,
      vbytes_per_second: entry.vbytes_per_second,
      mempool_byte_weight: entry.mempool_byte_weight || 0,
      total_fee: entry.total_fee || 0,
      min_fee: entry.min_fee,
      fee_data: '',
      vsize_1: buckets[0],
      vsize_2: buckets[1],
      vsize_3: buckets[2],
      vsize_4: buckets[3],
      vsize_5: buckets[4],
      vsize_6: buckets[5],
      vsize_8: buckets[6],
      vsize_10: buckets[7],
      vsize_12: buckets[8],
      vsize_15: buckets[9],
      vsize_20: buckets[10],
      vsize_30: buckets[11],
      vsize_40: buckets[12],
      vsize_50: buckets[13],
      vsize_60: buckets[14],
      vsize_70: buckets[15],
      vsize_80: buckets[16],
      vsize_90: buckets[17],
      vsize_100: buckets[18],
      vsize_125: buckets[19],
      vsize_150: buckets[20],
      vsize_175: buckets[21],
      vsize_200: buckets[22],
      vsize_250: buckets[23],
      vsize_300: buckets[24],
      vsize_350: buckets[25],
      vsize_400: buckets[26],
      vsize_500: buckets[27],
      vsize_600: buckets[28],
      vsize_700: buckets[29],
      vsize_800: buckets[30],
      vsize_900: buckets[31],
      vsize_1000: buckets[32],
      vsize_1200: buckets[33],
      vsize_1400: buckets[34],
      vsize_1600: buckets[35],
      vsize_1800: buckets[36],
      vsize_2000: buckets[37],
    });
  }
  return rows;
}
STATISTICS_START_TIME: number | string; SERVERS: string[]; }, MEMPOOL_SERVICES: { @@ -298,6 +300,8 @@ const defaults: IConfig = { 'ENABLED': false, 'AUDIT': false, 'AUDIT_START_HEIGHT': 774000, + 'STATISTICS': false, + 'STATISTICS_START_TIME': 1481932800, 'SERVERS': [], }, 'MEMPOOL_SERVICES': { diff --git a/backend/src/indexer.ts b/backend/src/indexer.ts index bc169630f..ab2e0678d 100644 --- a/backend/src/indexer.ts +++ b/backend/src/indexer.ts @@ -8,6 +8,7 @@ import priceUpdater from './tasks/price-updater'; import PricesRepository from './repositories/PricesRepository'; import config from './config'; import auditReplicator from './replication/AuditReplication'; +import statisticsReplicator from './replication/StatisticsReplication'; import AccelerationRepository from './repositories/AccelerationRepository'; export interface CoreIndex { @@ -188,6 +189,7 @@ class Indexer { await blocks.$generateCPFPDatabase(); await blocks.$generateAuditStats(); await auditReplicator.$sync(); + await statisticsReplicator.$sync(); await AccelerationRepository.$indexPastAccelerations(); // do not wait for classify blocks to finish blocks.$classifyBlocks(); diff --git a/backend/src/mempool.interfaces.ts b/backend/src/mempool.interfaces.ts index 0fcddc45a..884ae5c1b 100644 --- a/backend/src/mempool.interfaces.ts +++ b/backend/src/mempool.interfaces.ts @@ -422,6 +422,7 @@ export interface Statistic { export interface OptimizedStatistic { added: string; + count: number; vbytes_per_second: number; total_fee: number; mempool_byte_weight: number; diff --git a/backend/src/replication/StatisticsReplication.ts b/backend/src/replication/StatisticsReplication.ts new file mode 100644 index 000000000..f3ebb5a97 --- /dev/null +++ b/backend/src/replication/StatisticsReplication.ts @@ -0,0 +1,228 @@ +import DB from '../database'; +import logger from '../logger'; +import { $sync } from './replicator'; +import config from '../config'; +import { Common } from '../api/common'; +import statistics from 
/** Timespans of the `/api/v1/statistics/<timespan>` endpoints queried by this replicator. */
type StatisticsInterval = '24h' | '1w' | '1m' | '3m' | '6m' | '2y' | 'all';

/** A window to inspect: `[start, end, timespan]`, with start/end in unix seconds. */
type IntervalWindow = [number, number, StatisticsInterval];

/** For each timespan, the (step-aligned) unix timestamps for which no local row exists. */
interface MissingStatistics {
  '24h': Set<number>;
  '1w': Set<number>;
  '1m': Set<number>;
  '3m': Set<number>;
  '6m': Set<number>;
  '2y': Set<number>;
  'all': Set<number>;
}

/** Outcome of replicating one timespan. */
interface SyncStatisticsResult {
  success: boolean;
  synced: number;
  missed: number;
}

/** Expected spacing, in seconds, between two consecutive rows of each timespan. */
const steps: Record<StatisticsInterval, number> = {
  '24h': 60,
  '1w': 300,
  '1m': 1800,
  '3m': 7200,
  '6m': 10800,
  '2y': 28800,
  'all': 43200,
};

const DAY = 60 * 60 * 24;

/** Lower bound for any numeric start time, and the start time used for 'all' (2016-12-17T00:00:00Z). */
const EARLIEST_START_TIME = 1481932800;

/** How far back from "now" each relative STATISTICS_START_TIME label reaches, in seconds. */
const START_TIME_OFFSETS: Record<Exclude<StatisticsInterval, 'all'>, number> = {
  '24h': DAY,
  '1w': DAY * 7,
  '1m': DAY * 30,
  '3m': DAY * 90,
  '6m': DAY * 180,
  '2y': DAY * 365 * 2,
};

/** True when `value` is one of the known timespan labels. */
function isStatisticsInterval(value: unknown): value is StatisticsInterval {
  // hasOwnProperty rather than `in`, so inherited names such as 'toString' are rejected
  return typeof value === 'string' && Object.prototype.hasOwnProperty.call(steps, value);
}

/** Rounds `time` to the closest multiple of `step`; exact halves round up. */
function roundToNearestStep(time: number, step: number): number {
  const remainder = time % step;
  return remainder < step / 2 ? time - remainder : time + (step - remainder);
}

/**
 * Turns the configured STATISTICS_START_TIME into a unix timestamp.
 *
 * A timespan label is resolved relative to `now` ('all' maps to EARLIEST_START_TIME).
 * Anything else is read as a number and clamped to EARLIEST_START_TIME. Values that
 * are not finite numbers fall back to EARLIEST_START_TIME instead of yielding NaN,
 * which would make every later comparison false and silently disable replication.
 */
function resolveStartTime(configured: unknown, now: number): number {
  if (isStatisticsInterval(configured)) {
    return configured === 'all' ? EARLIEST_START_TIME : now - START_TIME_OFFSETS[configured];
  }
  const timestamp = Number(configured);
  return Number.isFinite(timestamp) ? Math.max(timestamp, EARLIEST_START_TIME) : EARLIEST_START_TIME;
}

/**
 * Lists the step-aligned timestamps in `[max(startTime, start), end)` that have no
 * matching entry in `existingTimes` (after rounding each to the nearest step).
 * Returns an empty set when fewer than 0.5% of the expected steps are missing.
 */
function findMissingTimes(existingTimes: number[], start: number, end: number, step: number, startTime: number): Set<number> {
  const from = Math.max(startTime, start);
  const expected: number[] = [];
  for (let time = from - from % step; time < end; time += step) {
    expected.push(time);
  }
  if (expected.length === 0) {
    return new Set();
  }

  const present = new Set(existingTimes.map(time => roundToNearestStep(time, step)));
  const missing = new Set(expected.filter(time => !present.has(time)));

  // Don't bother fetching if very few rows are missing
  if (missing.size < expected.length * 0.005) {
    return new Set();
  }
  return missing;
}

/**
 * Syncs missing statistics data from trusted servers
 */
class StatisticsReplication {
  inProgress: boolean = false;

  /**
   * Finds gaps in the local `statistics` table and fills them from the statistics
   * endpoints fetched through the replicator's `$sync`.
   *
   * Does nothing unless REPLICATION.ENABLED, REPLICATION.STATISTICS and
   * STATISTICS.ENABLED are all set, or while a previous run is still in progress.
   *
   * @throws whatever the gap detection or the fetch throws; `inProgress` is reset
   *         in every case so a failed run cannot block the following ones.
   */
  public async $sync(): Promise<void> {
    if (!config.REPLICATION.ENABLED || !config.REPLICATION.STATISTICS || !config.STATISTICS.ENABLED) {
      // replication not enabled, or statistics not enabled
      return;
    }
    if (this.inProgress) {
      logger.info(`StatisticsReplication sync already in progress`, 'Replication');
      return;
    }
    this.inProgress = true;

    try {
      const missingStatistics = await this.$getMissingStatistics();
      const missingIntervals = (Object.keys(missingStatistics) as StatisticsInterval[])
        .filter(key => missingStatistics[key].size > 0);
      const totalMissing = missingIntervals.reduce((total, key) => total + missingStatistics[key].size, 0);

      if (totalMissing === 0) {
        logger.info(`Statistics table is complete, no replication needed`, 'Replication');
        return;
      }

      for (const interval of missingIntervals) {
        logger.debug(`Missing ${missingStatistics[interval].size} statistics rows in '${interval}' timespan`, 'Replication');
      }
      logger.debug(`Fetching ${missingIntervals.join(', ')} statistics endpoints from trusted servers to fill ${totalMissing} rows missing in statistics`, 'Replication');

      let totalSynced = 0;
      let totalMissed = 0;

      for (const interval of missingIntervals) {
        const results = await this.$syncStatistics(interval, missingStatistics[interval]);
        totalSynced += results.synced;
        totalMissed += results.missed;

        logger.info(`Found ${totalSynced} / ${totalSynced + totalMissed} of ${totalMissing} missing statistics rows`, 'Replication');
        await Common.sleep$(3000);
      }

      logger.debug(`Synced ${totalSynced} statistics rows, ${totalMissed} still missing`, 'Replication');
    } finally {
      // always release the guard, including when a step above throws
      this.inProgress = false;
    }
  }

  /**
   * Fetches one timespan endpoint and inserts every returned row whose rounded
   * timestamp is in `missingTimes`. A failed insert is logged and skipped.
   *
   * @returns whether data was fetched, how many missing slots were filled, and how many remain
   */
  private async $syncStatistics(interval: StatisticsInterval, missingTimes: Set<number>): Promise<SyncStatisticsResult> {
    let success = false;
    let synced = 0;
    const missed = new Set(missingTimes);

    const syncResult = await $sync(`/api/v1/statistics/${interval}`);
    if (syncResult && syncResult.data?.length) {
      success = true;
      logger.info(`Fetched /api/v1/statistics/${interval} from ${syncResult.server}`);

      for (const stat of syncResult.data) {
        // Number(): a numeric string would otherwise be concatenated, not added, when rounding up
        const time = this.roundToNearestStep(Number(stat.added), steps[interval]);
        if (!missingTimes.has(time)) {
          continue;
        }
        try {
          await statistics.$create(statistics.mapOptimizedStatisticToStatistic([stat])[0], true);
          // several remote rows can round to the same slot; count each slot once
          if (missed.delete(time)) {
            synced++;
          }
        } catch (e) {
          logger.err(`Failed to insert statistics row at ${stat.added} (${interval}) from ${syncResult.server}. Reason: ` + (e instanceof Error ? e.message : String(e)));
        }
      }
    } else {
      logger.warn(`An error occured when trying to fetch /api/v1/statistics/${interval}`);
    }

    return { success, synced, missed: missed.size };
  }

  /**
   * Computes the missing timestamps of every timespan that is newer than the
   * configured start time. The last 24 hours are always inspected.
   */
  private async $getMissingStatistics(): Promise<MissingStatistics> {
    try {
      const now = Math.floor(Date.now() / 1000);
      const startTime = this.getStartTimeFromConfig();

      const missingStatistics: MissingStatistics = {
        '24h': new Set(),
        '1w': new Set(),
        '1m': new Set(),
        '3m': new Set(),
        '6m': new Set(),
        '2y': new Set(),
        'all': new Set(),
      };

      const windows: IntervalWindow[] = [ // [start, end, label]
        [now - DAY, now - 60, '24h'],                       // from 24 hours ago to now = 1 minute granularity
        [now - DAY * 7, now - DAY, '1w'],                   // from 1 week ago to 24 hours ago = 5 minutes granularity
        [now - DAY * 30, now - DAY * 7, '1m'],              // from 1 month ago to 1 week ago = 30 minutes granularity
        [now - DAY * 90, now - DAY * 30, '3m'],             // from 3 months ago to 1 month ago = 2 hours granularity
        [now - DAY * 180, now - DAY * 90, '6m'],            // from 6 months ago to 3 months ago = 3 hours granularity
        [now - DAY * 365 * 2, now - DAY * 180, '2y'],       // from 2 years ago to 6 months ago = 8 hours granularity
        [startTime, now - DAY * 365 * 2, 'all'],            // from start of statistics to 2 years ago = 12 hours granularity
      ];

      for (const window of windows) {
        const [, end, label] = window;
        // every window except '24h' is skipped when it ends before the start time
        if (label !== '24h' && !(startTime < end)) {
          continue;
        }
        missingStatistics[label] = await this.$getMissingStatisticsInterval(window, startTime);
      }

      return missingStatistics;
    } catch (e) {
      logger.err(`Cannot fetch missing statistics times from db. Reason: ` + (e instanceof Error ? e.message : String(e)));
      throw e;
    }
  }

  /**
   * Queries the rows stored inside one window (one row per step bucket) and
   * returns the expected timestamps that none of them covers.
   */
  private async $getMissingStatisticsInterval(interval: IntervalWindow, startTime: number): Promise<Set<number>> {
    try {
      const [start, end, label] = interval;
      const step = steps[label];

      const [rows]: any[] = await DB.query(`
        SELECT UNIX_TIMESTAMP(added) as added
        FROM statistics
        WHERE added >= FROM_UNIXTIME(?) AND added <= FROM_UNIXTIME(?)
        GROUP BY UNIX_TIMESTAMP(added) DIV ${step} ORDER BY statistics.added DESC
      `, [start, end]);

      const existingTimes: number[] = rows.map((row: { added: number | string }) => Number(row.added));
      return findMissingTimes(existingTimes, start, end, step, startTime);
    } catch (e) {
      logger.err(`Cannot fetch missing statistics times from db. Reason: ` + (e instanceof Error ? e.message : String(e)));
      throw e;
    }
  }

  /** Rounds `time` to the closest multiple of `step`; exact halves round up. */
  private roundToNearestStep(time: number, step: number): number {
    return roundToNearestStep(time, step);
  }

  /**
   * Reads REPLICATION.STATISTICS_START_TIME (a timespan label or a unix timestamp)
   * and resolves it to a unix timestamp, warning when the value is unusable.
   */
  private getStartTimeFromConfig(): number {
    const configured: unknown = config.REPLICATION.STATISTICS_START_TIME;
    if (typeof configured === 'string' && !isStatisticsInterval(configured) && !Number.isFinite(Number(configured))) {
      logger.warn(`Invalid REPLICATION.STATISTICS_START_TIME '${configured}', using ${EARLIEST_START_TIME} instead`, 'Replication');
    }
    return resolveStartTime(configured, Math.floor(Date.now() / 1000));
  }

}

export default new StatisticsReplication();
"s!__REPLICATION_SERVERS__!${__REPLICATION_SERVERS__}!g" mempool-config.json # MEMPOOL_SERVICES diff --git a/production/mempool-config.mainnet.json b/production/mempool-config.mainnet.json index 385f8cbdc..ae6c7ae46 100644 --- a/production/mempool-config.mainnet.json +++ b/production/mempool-config.mainnet.json @@ -97,6 +97,8 @@ "ENABLED": true, "AUDIT": true, "AUDIT_START_HEIGHT": 774000, + "STATISTICS": true, + "STATISTICS_START_TIME": "24h", "SERVERS": [ "node201.fmt.mempool.space", "node202.fmt.mempool.space",