diff --git a/src/ORM/address-settings/address-settings.entity.ts b/src/ORM/address-settings/address-settings.entity.ts index bf1fcf3..5520597 100644 --- a/src/ORM/address-settings/address-settings.entity.ts +++ b/src/ORM/address-settings/address-settings.entity.ts @@ -17,5 +17,8 @@ export class AddressSettingsEntity extends TrackedEntity { @Column({ nullable: true }) miscCoinbaseScriptData: string; + @Column({ nullable: true }) + bestDifficultyUserAgent: string; + } diff --git a/src/ORM/address-settings/address-settings.service.ts b/src/ORM/address-settings/address-settings.service.ts index ec3baab..9df9273 100644 --- a/src/ORM/address-settings/address-settings.service.ts +++ b/src/ORM/address-settings/address-settings.service.ts @@ -22,8 +22,16 @@ export class AddressSettingsService { return settings; } - public async updateBestDifficulty(address: string, bestDifficulty: number) { - return await this.addressSettingsRepository.update({ address }, { bestDifficulty }); + public async updateBestDifficulty(address: string, bestDifficulty: number, bestDifficultyUserAgent: string) { + return await this.addressSettingsRepository.update({ address }, { bestDifficulty, bestDifficultyUserAgent }); + } + + public async getHighScores() { + return await this.addressSettingsRepository.createQueryBuilder() + .select('"updatedAt", "bestDifficulty", "bestDifficultyUserAgent"') + .orderBy('"bestDifficulty"', 'DESC') + .limit(10) + .execute(); } public async createNew(address: string) { diff --git a/src/ORM/client-statistics/client-statistics.service.ts b/src/ORM/client-statistics/client-statistics.service.ts index 2dfdfb4..1ab53d1 100644 --- a/src/ORM/client-statistics/client-statistics.service.ts +++ b/src/ORM/client-statistics/client-statistics.service.ts @@ -17,29 +17,23 @@ export class ClientStatisticsService { } - public async save(clientStatistic: Partial) { - // Attempt to update the existing record - const updateResult = await this.clientStatisticsRepository - .createQueryBuilder() - .update(ClientStatisticsEntity) - .set({ - shares: () => `"shares" + :sharesIncrement`, - acceptedCount: () => `"acceptedCount" + 1` - }) - .where('address = :address AND clientName = :clientName AND sessionId = :sessionId AND time = :time', { - address: clientStatistic.address, - clientName: clientStatistic.clientName, - sessionId: clientStatistic.sessionId, - time: clientStatistic.time, - sharesIncrement: clientStatistic.shares - }) - .execute(); + public async update(clientStatistic: Partial) { - // Check if the update affected any rows - if (updateResult.affected === 0) { - // If no rows were updated, insert a new record - await this.clientStatisticsRepository.insert(clientStatistic); - } + await this.clientStatisticsRepository.update({ + address: clientStatistic.address, + clientName: clientStatistic.clientName, + sessionId: clientStatistic.sessionId, + time: clientStatistic.time + }, + { + shares: clientStatistic.shares, + acceptedCount: clientStatistic.acceptedCount + }); + + } + public async insert(clientStatistic: Partial) { + // If no rows were updated, insert a new record + await this.clientStatisticsRepository.insert(clientStatistic); } public async deleteOldStatistics() { diff --git a/src/ORM/rpc-block/rpc-block.service.ts b/src/ORM/rpc-block/rpc-block.service.ts index 71b5b6c..f6de1db 100644 --- a/src/ORM/rpc-block/rpc-block.service.ts +++ b/src/ORM/rpc-block/rpc-block.service.ts @@ -25,4 +25,19 @@ export class RpcBlockService { public saveBlock(blockHeight: number, data: string) { return this.rpcBlockRepository.update(blockHeight, { data }) } + + public async deleteOldBlocks() { + const result = await this.rpcBlockRepository.createQueryBuilder('entity') + .select('MAX(entity.blockHeight)', 'maxNumber') + .getRawOne(); + + const newestBlock = result ? result.maxNumber : null; + + await this.rpcBlockRepository.createQueryBuilder() + .delete() + .where('"blockHeight" < :newestBlock', { newestBlock }) + .execute(); + + return; + } } \ No newline at end of file diff --git a/src/app.controller.ts b/src/app.controller.ts index 7cb2f74..24ea040 100644 --- a/src/app.controller.ts +++ b/src/app.controller.ts @@ -3,6 +3,7 @@ import { Controller, Get, Inject } from '@nestjs/common'; import { Cache } from 'cache-manager'; import { firstValueFrom } from 'rxjs'; +import { AddressSettingsService } from './ORM/address-settings/address-settings.service'; import { BlocksService } from './ORM/blocks/blocks.service'; import { ClientStatisticsService } from './ORM/client-statistics/client-statistics.service'; import { ClientService } from './ORM/client/client.service'; @@ -18,7 +19,8 @@ export class AppController { private readonly clientService: ClientService, private readonly clientStatisticsService: ClientStatisticsService, private readonly blocksService: BlocksService, - private readonly bitcoinRpcService: BitcoinRpcService + private readonly bitcoinRpcService: BitcoinRpcService, + private readonly addressSettingsService: AddressSettingsService, ) { } @Get('info') @@ -35,10 +37,12 @@ export class AppController { const blockData = await this.blocksService.getFoundBlocks(); const userAgents = await this.clientService.getUserAgents(); + const highScores = await this.addressSettingsService.getHighScores(); const data = { blockData, userAgents, + highScores, uptime: this.uptime }; @@ -65,7 +69,7 @@ export class AppController { const totalMiners = userAgents.reduce((acc, userAgent) => acc + parseInt(userAgent.count), 0); const blockHeight = (await firstValueFrom(this.bitcoinRpcService.newBlock$)).blocks; const blocksFound = await this.blocksService.getFoundBlocks(); - + const data = { totalHashRate, blockHeight, diff --git a/src/models/StratumV1Client.ts b/src/models/StratumV1Client.ts index 2726b88..be55195 100644 --- a/src/models/StratumV1Client.ts +++ b/src/models/StratumV1Client.ts @@ -83,7 +83,9 @@ export class StratumV1Client { public async destroy() { - await this.clientService.delete(this.extraNonceAndSessionId); + if (this.extraNonceAndSessionId) { + await this.clientService.delete(this.extraNonceAndSessionId); + } if (this.stratumSubscription != null) { this.stratumSubscription.unsubscribe(); @@ -373,11 +375,6 @@ export class StratumV1Client { }, 60 * 1000) ); - this.backgroundWork.push( - setInterval(async () => { - await this.statistics.saveShares(this.entity); - }, 60 * 1000) - ); } private async sendNewMiningJob(jobTemplate: IJobTemplate) { @@ -515,7 +512,7 @@ export class StratumV1Client { } } try { - await this.statistics.addShares(this.sessionDifficulty); + await this.statistics.addShares(this.entity, this.sessionDifficulty); const now = new Date(); // only update every minute if (this.entity.updatedAt == null || now.getTime() - this.entity.updatedAt.getTime() > 1000 * 60) { @@ -541,7 +538,7 @@ export class StratumV1Client { await this.clientService.updateBestDifficulty(this.extraNonceAndSessionId, submissionDifficulty); this.entity.bestDifficulty = submissionDifficulty; if (submissionDifficulty > (await this.addressSettingsService.getSettings(this.clientAuthorization.address, true)).bestDifficulty) { - await this.addressSettingsService.updateBestDifficulty(this.clientAuthorization.address, submissionDifficulty); + await this.addressSettingsService.updateBestDifficulty(this.clientAuthorization.address, submissionDifficulty, this.entity.userAgent); } } diff --git a/src/models/StratumV1ClientStatistics.ts b/src/models/StratumV1ClientStatistics.ts index 134a671..2ea97c5 100644 --- a/src/models/StratumV1ClientStatistics.ts +++ b/src/models/StratumV1ClientStatistics.ts @@ -6,10 +6,14 @@ const TARGET_SUBMISSION_PER_SECOND = 10; const MIN_DIFF = 0.00001; export class StratumV1ClientStatistics { - private shareBacklog: number = 0; + private shares: number = 0; + private acceptedCount: number = 0; private submissionCacheStart: Date; - private submissionCache = []; + private submissionCache: { time: Date, difficulty: number }[] = []; + + private currentTimeSlot: number = null; + private lastSave: number = null; constructor( private readonly clientStatisticsService: ClientStatisticsService @@ -18,42 +22,61 @@ export class StratumV1ClientStatistics { } - public async saveShares(client: ClientEntity) { + private async saveShares(client: ClientEntity) { - if (client == null || client.address == null || client.clientName == null || client.sessionId == null) { - return; - } - // 10 min - var coeff = 1000 * 60 * 10; - var date = new Date(); - var rounded = new Date(Math.floor(date.getTime() / coeff) * coeff); - - await this.clientStatisticsService.save({ - time: rounded.getTime(), - shares: this.shareBacklog, - address: client.address, - clientName: client.clientName, - sessionId: client.sessionId - }); - - this.shareBacklog = 0; } // We don't want to save them here because it can be DB intensive, stead do it every once in // awhile with saveShares() - public async addShares(targetDifficulty: number) { + public async addShares(client: ClientEntity, targetDifficulty: number) { + + // 10 min + var coeff = 1000 * 60 * 10; + var date = new Date(); + var timeSlot = new Date(Math.floor(date.getTime() / coeff) * coeff).getTime(); if (this.submissionCache.length > CACHE_SIZE) { this.submissionCache.shift(); } - this.submissionCache.push({ - time: new Date(), + time: date, difficulty: targetDifficulty, }); - this.shareBacklog += targetDifficulty; + + + if (this.currentTimeSlot == null) { + this.currentTimeSlot = timeSlot; + } + + if (this.currentTimeSlot != timeSlot) { + await this.clientStatisticsService.insert({ + time: this.currentTimeSlot, + shares: this.shares, + acceptedCount: this.acceptedCount, + address: client.address, + clientName: client.clientName, + sessionId: client.sessionId + }); + this.shares = 0; + this.acceptedCount = 0; + this.currentTimeSlot = timeSlot; + } else if ((date.getTime() - 60 * 1000) > this.lastSave) { + await this.clientStatisticsService.update({ + time: this.currentTimeSlot, + shares: this.shares, + acceptedCount: this.acceptedCount, + address: client.address, + clientName: client.clientName, + sessionId: client.sessionId + }); + this.lastSave = new Date().getTime(); + } + + + this.shares += targetDifficulty; + this.acceptedCount++; } diff --git a/src/services/app.service.ts b/src/services/app.service.ts index 78fc73f..e2181f1 100644 --- a/src/services/app.service.ts +++ b/src/services/app.service.ts @@ -1,9 +1,9 @@ import { Injectable, OnModuleInit } from '@nestjs/common'; -import { Interval } from '@nestjs/schedule'; import { DataSource } from 'typeorm'; import { ClientStatisticsService } from '../ORM/client-statistics/client-statistics.service'; import { ClientService } from '../ORM/client/client.service'; +import { RpcBlockService } from '../ORM/rpc-block/rpc-block.service'; @Injectable() export class AppService implements OnModuleInit { @@ -11,7 +11,8 @@ export class AppService implements OnModuleInit { constructor( private readonly clientStatisticsService: ClientStatisticsService, private readonly clientService: ClientService, - private readonly dataSource: DataSource + private readonly dataSource: DataSource, + private readonly rpcBlockService: RpcBlockService, ) { } @@ -28,25 +29,38 @@ export class AppService implements OnModuleInit { await this.dataSource.query(`PRAGMA synchronous = off;`); // //6Gb // await this.dataSource.query(`PRAGMA mmap_size = 6000000000;`); + + if (process.env.ENABLE_SOLO == 'true' && (process.env.NODE_APP_INSTANCE == null || process.env.NODE_APP_INSTANCE == '0')) { + + setInterval(async () => { + await this.deleteOldStatistics(); + }, 1000 * 60 * 60); + + setInterval(async () => { + console.log('Killing dead clients'); + await this.clientService.killDeadClients(); + }, 1000 * 60 * 5); + + setInterval(async () => { + console.log('Deleting Old Blocks'); + await this.rpcBlockService.deleteOldBlocks(); + }, 1000 * 60 * 60 * 24); + + + + } + } - @Interval(1000 * 60 * 60) private async deleteOldStatistics() { console.log('Deleting statistics'); - if (process.env.ENABLE_SOLO == 'true' && (process.env.NODE_APP_INSTANCE == null || process.env.NODE_APP_INSTANCE == '0')) { - const deletedStatistics = await this.clientStatisticsService.deleteOldStatistics(); - console.log(`Deleted ${deletedStatistics.affected} old statistics`); - const deletedClients = await this.clientService.deleteOldClients(); - console.log(`Deleted ${deletedClients.affected} old clients`); - } + + const deletedStatistics = await this.clientStatisticsService.deleteOldStatistics(); + console.log(`Deleted ${deletedStatistics.affected} old statistics`); + const deletedClients = await this.clientService.deleteOldClients(); + console.log(`Deleted ${deletedClients.affected} old clients`); + } - @Interval(1000 * 60 * 5) - private async killDeadClients() { - console.log('Killing dead clients'); - if (process.env.ENABLE_SOLO == 'true' && (process.env.NODE_APP_INSTANCE == null || process.env.NODE_APP_INSTANCE == '0')) { - await this.clientService.killDeadClients(); - } - } } \ No newline at end of file diff --git a/src/services/bitcoin-rpc.service.ts b/src/services/bitcoin-rpc.service.ts index dff9883..3627eeb 100644 --- a/src/services/bitcoin-rpc.service.ts +++ b/src/services/bitcoin-rpc.service.ts @@ -1,15 +1,15 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, OnModuleInit } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { RPCClient } from 'rpc-bitcoin'; import { BehaviorSubject, filter, shareReplay } from 'rxjs'; import { RpcBlockService } from 'src/ORM/rpc-block/rpc-block.service'; -import * as zmq from 'zeromq/v5-compat'; +import * as zmq from 'zeromq'; import { IBlockTemplate } from '../models/bitcoin-rpc/IBlockTemplate'; import { IMiningInfo } from '../models/bitcoin-rpc/IMiningInfo'; @Injectable() -export class BitcoinRpcService { +export class BitcoinRpcService implements OnModuleInit { private blockHeight = 0; private client: RPCClient; @@ -20,7 +20,9 @@ export class BitcoinRpcService { private readonly configService: ConfigService, private rpcBlockService: RpcBlockService ) { + } + async onModuleInit() { const url = this.configService.get('BITCOIN_RPC_URL'); const user = this.configService.get('BITCOIN_RPC_USER'); const pass = this.configService.get('BITCOIN_RPC_PASSWORD'); @@ -36,19 +38,36 @@ export class BitcoinRpcService { }); if (this.configService.get('BITCOIN_ZMQ_HOST')) { - const sock = zmq.socket("sub"); - sock.connect(this.configService.get('BITCOIN_ZMQ_HOST')); - sock.subscribe("rawblock"); - sock.on("message", async (topic: Buffer, message: Buffer) => { - console.log("new block zmq"); - await this.pollMiningInfo(); + console.log('Using ZMQ'); + const sock = new zmq.Subscriber; + + + sock.connectTimeout = 1000; + sock.events.on('connect', () => { + console.log('ZMQ Connected'); }); - this.pollMiningInfo().then(() => { }); + sock.events.on('connect:retry', () => { + console.log('ZMQ Unable to connect, Retrying'); + }); + + sock.connect(this.configService.get('BITCOIN_ZMQ_HOST')); + sock.subscribe('rawblock'); + // Don't await this, otherwise it will block the rest of the program + this.listenForNewBlocks(sock); + await this.pollMiningInfo(); + } else { setInterval(this.pollMiningInfo.bind(this), 500); } } + private async listenForNewBlocks(sock: zmq.Subscriber) { + for await (const [topic, msg] of sock) { + console.log("New Block"); + await this.pollMiningInfo(); + } + } + public async pollMiningInfo() { const miningInfo = await this.getMiningInfo(); if (miningInfo != null && miningInfo.blocks > this.blockHeight) {