diff --git a/src/ORM/_migrations/UniqueNonceIndex.ts b/src/ORM/_migrations/UniqueNonceIndex.ts new file mode 100644 index 0000000..3182338 --- /dev/null +++ b/src/ORM/_migrations/UniqueNonceIndex.ts @@ -0,0 +1,15 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class UniqueNonceIndex implements MigrationInterface { + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE UNIQUE INDEX "IDX_unique_nonce" ON "client_entity" ("sessionId") WHERE "deletedAt" IS NOT NULL` + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX "IDX_unique_nonce"`); + } + +} \ No newline at end of file diff --git a/src/ORM/address-settings/address-settings.entity.ts b/src/ORM/address-settings/address-settings.entity.ts index 00fcd4e..8d287e0 100644 --- a/src/ORM/address-settings/address-settings.entity.ts +++ b/src/ORM/address-settings/address-settings.entity.ts @@ -14,6 +14,9 @@ export class AddressSettingsEntity extends TrackedEntity { @Column({ type: 'decimal', default: 0 }) bestDifficulty: number; + @Column({ nullable: true }) + bestDifficultyUserAgent: string; + @Column({ nullable: true }) miscCoinbaseScriptData: string; diff --git a/src/ORM/address-settings/address-settings.service.ts b/src/ORM/address-settings/address-settings.service.ts index ec3baab..4246668 100644 --- a/src/ORM/address-settings/address-settings.service.ts +++ b/src/ORM/address-settings/address-settings.service.ts @@ -17,13 +17,18 @@ export class AddressSettingsService { public async getSettings(address: string, createIfNotFound: boolean) { const settings = await this.addressSettingsRepository.findOne({ where: { address } }); if (createIfNotFound == true && settings == null) { - return await this.createNew(address); + // It's possible to have a race condition here so if we get a PK violation, fetch it + try { + return await this.createNew(address); + } catch (e) { + return await this.addressSettingsRepository.findOne({ where: { address } }); + } } return settings; } - public async updateBestDifficulty(address: string, bestDifficulty: number) { - return await this.addressSettingsRepository.update({ address }, { bestDifficulty }); + public async updateBestDifficulty(address: string, bestDifficulty: number, bestDifficultyUserAgent: string) { + return await this.addressSettingsRepository.update({ address }, { bestDifficulty, bestDifficultyUserAgent }); } public async createNew(address: string) { @@ -46,4 +51,12 @@ export class AddressSettingsService { bestDifficulty: 0 }); } + + public async getHighScores() { + return await this.addressSettingsRepository.createQueryBuilder() + .select('"updatedAt", "bestDifficulty", "bestDifficultyUserAgent"') + .orderBy('"bestDifficulty"', 'DESC') + .limit(10) + .execute(); + } } \ No newline at end of file diff --git a/src/ORM/client-statistics/client-statistics.entity.ts b/src/ORM/client-statistics/client-statistics.entity.ts index 272fc08..a449505 100644 --- a/src/ORM/client-statistics/client-statistics.entity.ts +++ b/src/ORM/client-statistics/client-statistics.entity.ts @@ -1,12 +1,11 @@ -import { Column, Entity, Index, PrimaryGeneratedColumn } from 'typeorm'; +import { Column, Entity, Index, ManyToOne, PrimaryGeneratedColumn } from 'typeorm'; +import { ClientEntity } from '../client/client.entity'; import { TrackedEntity } from '../utils/TrackedEntity.entity'; @Entity() -//Index for getHashRateForSession -@Index(["address", "clientName", "sessionId"]) //Index for statistics save -@Index(["address", "clientName", "sessionId", "time"]) +@Index(["clientId", "time"]) export class ClientStatisticsEntity extends TrackedEntity { @PrimaryGeneratedColumn() @@ -32,4 +31,17 @@ export class ClientStatisticsEntity extends TrackedEntity { acceptedCount: number; + + @ManyToOne( + () => ClientEntity, + clientEntity => clientEntity.statistics, + { nullable: false, } + ) + client: ClientEntity; + + @Index() + @Column({ name: 'clientId' }) + public clientId: string; + + } \ No newline at end of file diff --git a/src/ORM/client-statistics/client-statistics.service.ts b/src/ORM/client-statistics/client-statistics.service.ts index efc8b87..18d67c0 100644 --- a/src/ORM/client-statistics/client-statistics.service.ts +++ b/src/ORM/client-statistics/client-statistics.service.ts @@ -19,25 +19,15 @@ export class ClientStatisticsService { } public async save(clientStatistic: Partial) { - // Attempt to update the existing record - const updateResult = await this.clientStatisticsRepository - .createQueryBuilder() - .update(ClientStatisticsEntity) - .set({ - shares: () => `"shares" + :sharesIncrement`, - acceptedCount: () => `"acceptedCount" + 1` + // // Attempt to update the existing record + const result = await this.clientStatisticsRepository.update({ clientId: clientStatistic.clientId, time: clientStatistic.time }, + { + shares: clientStatistic.shares, + acceptedCount: clientStatistic.acceptedCount }) - .where('address = :address AND clientName = :clientName AND sessionId = :sessionId AND time = :time', { - address: clientStatistic.address, - clientName: clientStatistic.clientName, - sessionId: clientStatistic.sessionId, - time: clientStatistic.time, - sharesIncrement: clientStatistic.shares - }) - .execute(); // Check if the update affected any rows - if (updateResult.affected === 0) { + if (result.affected === 0) { // If no rows were updated, insert a new record await this.clientStatisticsRepository.insert(clientStatistic); } @@ -139,7 +129,7 @@ export class ClientStatisticsService { } - public async getHashRateForSession(address: string, clientName: string, sessionId: string) { + public async getHashRateForSession(clientId: string) { const query = ` SELECT @@ -149,12 +139,12 @@ export class ClientStatisticsService { FROM client_statistics_entity AS entry WHERE - entry.address = $1 AND entry."clientName" = $2 AND entry."sessionId" = $3 + entry."clientId" = $1 ORDER BY time DESC LIMIT 2; `; - const result = await this.clientStatisticsRepository.query(query, [address, clientName, sessionId]); + const result = await this.clientStatisticsRepository.query(query, [clientId]); if (result.length < 1) { return 0; @@ -179,7 +169,7 @@ export class ClientStatisticsService { } - public async getChartDataForSession(address: string, clientName: string, sessionId: string) { + public async getChartDataForSession(clientId: string) { var yesterday = new Date(new Date().getTime() - (24 * 60 * 60 * 1000)); const query = ` @@ -189,7 +179,7 @@ export class ClientStatisticsService { FROM client_statistics_entity AS entry WHERE - entry.address = $1 AND entry."clientName" = $2 AND entry."sessionId" = $3 AND entry.time > ${yesterday.getTime()} + entry."clientId" = $1 AND entry.time > ${yesterday.getTime()} GROUP BY time ORDER BY @@ -197,7 +187,7 @@ export class ClientStatisticsService { LIMIT 144; `; - const result = await this.clientStatisticsRepository.query(query, [address, clientName, sessionId]); + const result = await this.clientStatisticsRepository.query(query, [clientId]); return result.map(res => { res.label = new Date(parseInt(res.label)).toISOString(); diff --git a/src/ORM/client/client.entity.ts b/src/ORM/client/client.entity.ts index 921b674..8566890 100644 --- a/src/ORM/client/client.entity.ts +++ b/src/ORM/client/client.entity.ts @@ -1,21 +1,24 @@ -import { Column, Entity, Index, PrimaryColumn } from 'typeorm'; +import { Column, Entity, Index, OneToMany, PrimaryGeneratedColumn } from 'typeorm'; +import { ClientStatisticsEntity } from '../client-statistics/client-statistics.entity'; import { TrackedEntity } from '../utils/TrackedEntity.entity'; @Entity() -@Index(['address', 'clientName', 'sessionId'], { unique: true }) +@Index("IDX_unique_nonce", { synchronize: false }) export class ClientEntity extends TrackedEntity { - - @PrimaryColumn({ length: 62, type: 'varchar' }) - address: string; - - @PrimaryColumn({ length: 64, type: 'varchar' }) - clientName: string; + @PrimaryGeneratedColumn('uuid') + id: string; @Index() - @PrimaryColumn({ length: 8, type: 'varchar' }) + @Column({ length: 62, type: 'varchar' }) + address: string; + + @Column({ length: 64, type: 'varchar' }) + clientName: string; + + @Column({ length: 8, type: 'varchar', }) sessionId: string; @@ -23,7 +26,6 @@ export class ClientEntity extends TrackedEntity { userAgent: string; - @Column({ type: 'timestamp' }) startTime: Date; @@ -33,5 +35,11 @@ export class ClientEntity extends TrackedEntity { @Column({ default: 0, type: 'decimal' }) hashRate: number; + @OneToMany( + () => ClientStatisticsEntity, + clientStatisticsEntity => clientStatisticsEntity.client + ) + statistics: ClientStatisticsEntity[] + } diff --git a/src/ORM/client/client.service.ts b/src/ORM/client/client.service.ts index 14b6539..05d597c 100644 --- a/src/ORM/client/client.service.ts +++ b/src/ORM/client/client.service.ts @@ -28,8 +28,8 @@ export class ClientService { .execute(); } - public async heartbeat(address: string, clientName: string, sessionId: string, hashRate: number, updatedAt: Date) { - return await this.clientRepository.update({ address, clientName, sessionId }, { hashRate, deletedAt: null, updatedAt }); + public async heartbeat(id, hashRate: number, updatedAt: Date) { + return await this.clientRepository.update({ id }, { hashRate, deletedAt: null, updatedAt }); } // public async save(client: Partial) { @@ -48,8 +48,8 @@ export class ClientService { return client as ClientEntity; } - public async delete(sessionId: string) { - return await this.clientRepository.softDelete({ sessionId }); + public async delete(id: string) { + return await this.clientRepository.softDelete({ id }); } public async deleteOldClients() { @@ -65,8 +65,8 @@ export class ClientService { } - public async updateBestDifficulty(sessionId: string, bestDifficulty: number) { - return await this.clientRepository.update({ sessionId }, { bestDifficulty }); + public async updateBestDifficulty(id: string, bestDifficulty: number) { + return await this.clientRepository.update({ id }, { bestDifficulty }); } public async connectedClientCount(): Promise { return await this.clientRepository.count(); @@ -111,7 +111,7 @@ export class ClientService { .addSelect('MAX(client.bestDifficulty)', 'bestDifficulty') .addSelect('SUM(client.hashRate)', 'totalHashRate') .groupBy('client.userAgent') - .orderBy('count', 'DESC') + .orderBy('"totalHashRate"', 'DESC') .getRawMany(); return result; } diff --git a/src/app.controller.ts b/src/app.controller.ts index d1115a6..de24379 100644 --- a/src/app.controller.ts +++ b/src/app.controller.ts @@ -3,6 +3,7 @@ import { Controller, Get, Inject } from '@nestjs/common'; import { Cache } from 'cache-manager'; import { firstValueFrom } from 'rxjs'; +import { AddressSettingsService } from './ORM/address-settings/address-settings.service'; import { BlocksService } from './ORM/blocks/blocks.service'; import { ClientStatisticsService } from './ORM/client-statistics/client-statistics.service'; import { ClientService } from './ORM/client/client.service'; @@ -20,7 +21,8 @@ export class AppController { private readonly clientStatisticsService: ClientStatisticsService, private readonly blocksService: BlocksService, private readonly bitcoinRpcService: BitcoinRpcService, - private readonly homeGraphService: HomeGraphService + private readonly homeGraphService: HomeGraphService, + private readonly addressSettingsService: AddressSettingsService ) { } @Get('info') @@ -37,10 +39,12 @@ export class AppController { const blockData = await this.blocksService.getFoundBlocks(); const userAgents = await this.clientService.getUserAgents(); + const highScores = await this.addressSettingsService.getHighScores(); const data = { blockData, userAgents, + highScores, uptime: this.uptime }; diff --git a/src/app.module.ts b/src/app.module.ts index 443ac68..434cae8 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -9,6 +9,7 @@ import { AppController } from './app.controller'; import { AddressController } from './controllers/address/address.controller'; import { ClientController } from './controllers/client/client.controller'; import { BitcoinAddressValidator } from './models/validators/bitcoin-address.validator'; +import { UniqueNonceIndex } from './ORM/_migrations/UniqueNonceIndex'; import { AddressSettingsEntity } from './ORM/address-settings/address-settings.entity'; import { AddressSettingsModule } from './ORM/address-settings/address-settings.module'; import { BlocksEntity } from './ORM/blocks/blocks.entity'; @@ -68,7 +69,10 @@ const ORMModules = [ ], synchronize: configService.get('PRODUCTION') != 'true', logging: false, - poolSize: 30 + poolSize: 30, + migrations: [ + UniqueNonceIndex + ] } }, imports: [ConfigModule], diff --git a/src/controllers/client/client.controller.ts b/src/controllers/client/client.controller.ts index 80bbb96..b90574b 100644 --- a/src/controllers/client/client.controller.ts +++ b/src/controllers/client/client.controller.ts @@ -75,7 +75,7 @@ export class ClientController { if (worker == null) { return new NotFoundException(); } - const chartData = await this.clientStatisticsService.getChartDataForSession(worker.address, worker.clientName, worker.sessionId); + const chartData = await this.clientStatisticsService.getChartDataForSession(worker.id); return { sessionId: worker.sessionId, diff --git a/src/models/StratumV1Client.ts b/src/models/StratumV1Client.ts index 68a7ee1..ebceb75 100644 --- a/src/models/StratumV1Client.ts +++ b/src/models/StratumV1Client.ts @@ -43,7 +43,7 @@ export class StratumV1Client { private usedSuggestedDifficulty = false; private sessionDifficulty: number = 16384; - private entity: ClientEntity; + private clientEntity: ClientEntity; private creatingEntity: Promise; public extraNonceAndSessionId: string; @@ -83,7 +83,7 @@ export class StratumV1Client { public async destroy() { - await this.clientService.delete(this.extraNonceAndSessionId); + await this.clientService.delete(this.clientEntity.id); if (this.stratumSubscription != null) { this.stratumSubscription.unsubscribe(); @@ -373,11 +373,11 @@ export class StratumV1Client { }, 60 * 1000) ); - this.backgroundWork.push( - setInterval(async () => { - await this.statistics.saveShares(this.entity); - }, 60 * 1000) - ); + // this.backgroundWork.push( + // setInterval(async () => { + // await this.statistics.saveShares(this.clientEntity); + // }, 60 * 1000) + // ); } private async sendNewMiningJob(jobTemplate: IJobTemplate) { @@ -386,8 +386,8 @@ export class StratumV1Client { const devFeeAddress = this.configService.get('DEV_FEE_ADDRESS'); //50Th/s this.noFee = false; - if (this.entity) { - this.hashRate = await this.clientStatisticsService.getHashRateForSession(this.clientAuthorization.address, this.clientAuthorization.worker, this.extraNonceAndSessionId); + if (this.clientEntity) { + this.hashRate = await this.clientStatisticsService.getHashRateForSession(this.clientEntity.id); this.noFee = this.hashRate != 0 && this.hashRate < 50000000000000; } if (this.noFee || devFeeAddress == null || devFeeAddress.length < 1) { @@ -438,11 +438,11 @@ export class StratumV1Client { private async handleMiningSubmission(submission: MiningSubmitMessage) { - if (this.entity == null) { + if (this.clientEntity == null) { if (this.creatingEntity == null) { this.creatingEntity = new Promise(async (resolve, reject) => { try { - this.entity = await this.clientService.insert({ + this.clientEntity = await this.clientService.insert({ sessionId: this.extraNonceAndSessionId, address: this.clientAuthorization.address, clientName: this.clientAuthorization.worker, @@ -515,12 +515,12 @@ export class StratumV1Client { } } try { - await this.statistics.addShares(this.sessionDifficulty); + await this.statistics.addShares(this.clientEntity, this.sessionDifficulty); const now = new Date(); // only update every minute - if (this.entity.updatedAt == null || now.getTime() - this.entity.updatedAt.getTime() > 1000 * 60) { - await this.clientService.heartbeat(this.entity.address, this.entity.clientName, this.entity.sessionId, this.hashRate, now); - this.entity.updatedAt = now; + if (this.clientEntity.updatedAt == null || now.getTime() - this.clientEntity.updatedAt.getTime() > 1000 * 60) { + await this.clientService.heartbeat(this.clientEntity.id, this.hashRate, now); + this.clientEntity.updatedAt = now; } } catch (e) { @@ -537,11 +537,11 @@ export class StratumV1Client { return false; } - if (submissionDifficulty > this.entity.bestDifficulty) { - await this.clientService.updateBestDifficulty(this.extraNonceAndSessionId, submissionDifficulty); - this.entity.bestDifficulty = submissionDifficulty; + if (submissionDifficulty > this.clientEntity.bestDifficulty) { + await this.clientService.updateBestDifficulty(this.clientEntity.id, submissionDifficulty); + this.clientEntity.bestDifficulty = submissionDifficulty; if (submissionDifficulty > (await this.addressSettingsService.getSettings(this.clientAuthorization.address, true)).bestDifficulty) { - await this.addressSettingsService.updateBestDifficulty(this.clientAuthorization.address, submissionDifficulty); + await this.addressSettingsService.updateBestDifficulty(this.clientAuthorization.address, submissionDifficulty, this.clientEntity.userAgent); } } diff --git a/src/models/StratumV1ClientStatistics.ts b/src/models/StratumV1ClientStatistics.ts index 134a671..f6b66a9 100644 --- a/src/models/StratumV1ClientStatistics.ts +++ b/src/models/StratumV1ClientStatistics.ts @@ -6,10 +6,14 @@ const TARGET_SUBMISSION_PER_SECOND = 10; const MIN_DIFF = 0.00001; export class StratumV1ClientStatistics { - private shareBacklog: number = 0; + private shares: number = 0; + private acceptedCount: number = 0; private submissionCacheStart: Date; - private submissionCache = []; + private submissionCache: { time: Date, difficulty: number }[] = []; + + private currentTimeSlot: number = null; + private lastSave: number = null; constructor( private readonly clientStatisticsService: ClientStatisticsService @@ -20,40 +24,60 @@ export class StratumV1ClientStatistics { public async saveShares(client: ClientEntity) { - if (client == null || client.address == null || client.clientName == null || client.sessionId == null) { - return; - } - - // 10 min - var coeff = 1000 * 60 * 10; - var date = new Date(); - var rounded = new Date(Math.floor(date.getTime() / coeff) * coeff); + // if (client == null || client.address == null || client.clientName == null || client.sessionId == null) { + // return; + // } await this.clientStatisticsService.save({ - time: rounded.getTime(), - shares: this.shareBacklog, + time: this.currentTimeSlot, + clientId: client.id, + shares: this.shares, + acceptedCount: this.acceptedCount, address: client.address, clientName: client.clientName, sessionId: client.sessionId }); - this.shareBacklog = 0; + + } - // We don't want to save them here because it can be DB intensive, stead do it every once in + // We don't want to save them here because it can be DB intensive, instead do it every once in // awhile with saveShares() - public async addShares(targetDifficulty: number) { + public async addShares(client: ClientEntity, targetDifficulty: number) { + + // 10 min + var coeff = 1000 * 60 * 10; + var date = new Date(); + var timeSlot = new Date(Math.floor(date.getTime() / coeff) * coeff).getTime(); if (this.submissionCache.length > CACHE_SIZE) { this.submissionCache.shift(); } - this.submissionCache.push({ - time: new Date(), + time: date, difficulty: targetDifficulty, }); - this.shareBacklog += targetDifficulty; + + + if (this.currentTimeSlot == null) { + this.currentTimeSlot = timeSlot; + } + + if (this.currentTimeSlot != timeSlot) { + await this.saveShares(client); + this.shares = 0; + this.acceptedCount = 0; + this.currentTimeSlot = timeSlot; + } else if ((date.getTime() - 60 * 1000) > this.lastSave) { + await this.saveShares(client); + this.lastSave = new Date().getTime(); + } + + + this.shares += targetDifficulty; + this.acceptedCount++; } diff --git a/src/services/bitcoin-rpc.service.ts b/src/services/bitcoin-rpc.service.ts index dff9883..147082e 100644 --- a/src/services/bitcoin-rpc.service.ts +++ b/src/services/bitcoin-rpc.service.ts @@ -1,15 +1,15 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, OnModuleInit } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { RPCClient } from 'rpc-bitcoin'; import { BehaviorSubject, filter, shareReplay } from 'rxjs'; import { RpcBlockService } from 'src/ORM/rpc-block/rpc-block.service'; -import * as zmq from 'zeromq/v5-compat'; +import * as zmq from 'zeromq'; import { IBlockTemplate } from '../models/bitcoin-rpc/IBlockTemplate'; import { IMiningInfo } from '../models/bitcoin-rpc/IMiningInfo'; @Injectable() -export class BitcoinRpcService { +export class BitcoinRpcService implements OnModuleInit { private blockHeight = 0; private client: RPCClient; @@ -21,6 +21,10 @@ export class BitcoinRpcService { private rpcBlockService: RpcBlockService ) { + + } + + async onModuleInit() { const url = this.configService.get('BITCOIN_RPC_URL'); const user = this.configService.get('BITCOIN_RPC_USER'); const pass = this.configService.get('BITCOIN_RPC_PASSWORD'); @@ -36,19 +40,36 @@ export class BitcoinRpcService { }); if (this.configService.get('BITCOIN_ZMQ_HOST')) { - const sock = zmq.socket("sub"); - sock.connect(this.configService.get('BITCOIN_ZMQ_HOST')); - sock.subscribe("rawblock"); - sock.on("message", async (topic: Buffer, message: Buffer) => { - console.log("new block zmq"); - await this.pollMiningInfo(); + console.log('Using ZMQ'); + const sock = new zmq.Subscriber; + + + sock.connectTimeout = 1000; + sock.events.on('connect', () => { + console.log('ZMQ Connected'); }); - this.pollMiningInfo().then(() => { }); + sock.events.on('connect:retry', () => { + console.log('ZMQ Unable to connect, Retrying'); + }); + + sock.connect(this.configService.get('BITCOIN_ZMQ_HOST')); + sock.subscribe('rawblock'); + // Don't await this, otherwise it will block the rest of the program + this.listenForNewBlocks(sock); + await this.pollMiningInfo(); + } else { setInterval(this.pollMiningInfo.bind(this), 500); } } + private async listenForNewBlocks(sock: zmq.Subscriber) { + for await (const [topic, msg] of sock) { + console.log("New Block"); + await this.pollMiningInfo(); + } + } + public async pollMiningInfo() { const miningInfo = await this.getMiningInfo(); if (miningInfo != null && miningInfo.blocks > this.blockHeight) {