optimized share calculation and updating

This commit is contained in:
Ben Wilson 2024-02-17 18:22:09 -05:00
parent d486c778a3
commit e35bacc648
9 changed files with 161 additions and 84 deletions

View File

@ -17,5 +17,8 @@ export class AddressSettingsEntity extends TrackedEntity {
@Column({ nullable: true })
miscCoinbaseScriptData: string;
@Column({ nullable: true })
bestDifficultyUserAgent: string;
}

View File

@ -22,8 +22,16 @@ export class AddressSettingsService {
return settings;
}
public async updateBestDifficulty(address: string, bestDifficulty: number) {
return await this.addressSettingsRepository.update({ address }, { bestDifficulty });
public async updateBestDifficulty(address: string, bestDifficulty: number, bestDifficultyUserAgent: string) {
return await this.addressSettingsRepository.update({ address }, { bestDifficulty, bestDifficultyUserAgent });
}
public async getHighScores() {
return await this.addressSettingsRepository.createQueryBuilder()
.select('"updatedAt", "bestDifficulty", "bestDifficultyUserAgent"')
.orderBy('"bestDifficulty"', 'DESC')
.limit(10)
.execute();
}
public async createNew(address: string) {

View File

@ -17,29 +17,23 @@ export class ClientStatisticsService {
}
public async save(clientStatistic: Partial<ClientStatisticsEntity>) {
// Attempt to update the existing record
const updateResult = await this.clientStatisticsRepository
.createQueryBuilder()
.update(ClientStatisticsEntity)
.set({
shares: () => `"shares" + :sharesIncrement`,
acceptedCount: () => `"acceptedCount" + 1`
})
.where('address = :address AND clientName = :clientName AND sessionId = :sessionId AND time = :time', {
address: clientStatistic.address,
clientName: clientStatistic.clientName,
sessionId: clientStatistic.sessionId,
time: clientStatistic.time,
sharesIncrement: clientStatistic.shares
})
.execute();
public async update(clientStatistic: Partial<ClientStatisticsEntity>) {
// Check if the update affected any rows
if (updateResult.affected === 0) {
// If no rows were updated, insert a new record
await this.clientStatisticsRepository.insert(clientStatistic);
}
await this.clientStatisticsRepository.update({
address: clientStatistic.address,
clientName: clientStatistic.clientName,
sessionId: clientStatistic.sessionId,
time: clientStatistic.time
},
{
shares: clientStatistic.shares,
acceptedCount: clientStatistic.acceptedCount
});
}
public async insert(clientStatistic: Partial<ClientStatisticsEntity>) {
// If no rows were updated, insert a new record
await this.clientStatisticsRepository.insert(clientStatistic);
}
public async deleteOldStatistics() {

View File

@ -25,4 +25,19 @@ export class RpcBlockService {
public saveBlock(blockHeight: number, data: string) {
return this.rpcBlockRepository.update(blockHeight, { data })
}
public async deleteOldBlocks() {
const result = await this.rpcBlockRepository.createQueryBuilder('entity')
.select('MAX(entity.blockHeight)', 'maxNumber')
.getRawOne();
const newestBlock = result ? result.maxNumber : null;
await this.rpcBlockRepository.createQueryBuilder()
.delete()
.where('"blockHeight" < :newestBlock', { newestBlock })
.execute();
return;
}
}

View File

@ -3,6 +3,7 @@ import { Controller, Get, Inject } from '@nestjs/common';
import { Cache } from 'cache-manager';
import { firstValueFrom } from 'rxjs';
import { AddressSettingsService } from './ORM/address-settings/address-settings.service';
import { BlocksService } from './ORM/blocks/blocks.service';
import { ClientStatisticsService } from './ORM/client-statistics/client-statistics.service';
import { ClientService } from './ORM/client/client.service';
@ -18,7 +19,8 @@ export class AppController {
private readonly clientService: ClientService,
private readonly clientStatisticsService: ClientStatisticsService,
private readonly blocksService: BlocksService,
private readonly bitcoinRpcService: BitcoinRpcService
private readonly bitcoinRpcService: BitcoinRpcService,
private readonly addressSettingsService: AddressSettingsService,
) { }
@Get('info')
@ -35,10 +37,12 @@ export class AppController {
const blockData = await this.blocksService.getFoundBlocks();
const userAgents = await this.clientService.getUserAgents();
const highScores = await this.addressSettingsService.getHighScores();
const data = {
blockData,
userAgents,
highScores,
uptime: this.uptime
};
@ -65,7 +69,7 @@ export class AppController {
const totalMiners = userAgents.reduce((acc, userAgent) => acc + parseInt(userAgent.count), 0);
const blockHeight = (await firstValueFrom(this.bitcoinRpcService.newBlock$)).blocks;
const blocksFound = await this.blocksService.getFoundBlocks();
const data = {
totalHashRate,
blockHeight,

View File

@ -83,7 +83,9 @@ export class StratumV1Client {
public async destroy() {
await this.clientService.delete(this.extraNonceAndSessionId);
if (this.extraNonceAndSessionId) {
await this.clientService.delete(this.extraNonceAndSessionId);
}
if (this.stratumSubscription != null) {
this.stratumSubscription.unsubscribe();
@ -373,11 +375,6 @@ export class StratumV1Client {
}, 60 * 1000)
);
this.backgroundWork.push(
setInterval(async () => {
await this.statistics.saveShares(this.entity);
}, 60 * 1000)
);
}
private async sendNewMiningJob(jobTemplate: IJobTemplate) {
@ -515,7 +512,7 @@ export class StratumV1Client {
}
}
try {
await this.statistics.addShares(this.sessionDifficulty);
await this.statistics.addShares(this.entity, this.sessionDifficulty);
const now = new Date();
// only update every minute
if (this.entity.updatedAt == null || now.getTime() - this.entity.updatedAt.getTime() > 1000 * 60) {
@ -541,7 +538,7 @@ export class StratumV1Client {
await this.clientService.updateBestDifficulty(this.extraNonceAndSessionId, submissionDifficulty);
this.entity.bestDifficulty = submissionDifficulty;
if (submissionDifficulty > (await this.addressSettingsService.getSettings(this.clientAuthorization.address, true)).bestDifficulty) {
await this.addressSettingsService.updateBestDifficulty(this.clientAuthorization.address, submissionDifficulty);
await this.addressSettingsService.updateBestDifficulty(this.clientAuthorization.address, submissionDifficulty, this.entity.userAgent);
}
}

View File

@ -6,10 +6,14 @@ const TARGET_SUBMISSION_PER_SECOND = 10;
const MIN_DIFF = 0.00001;
export class StratumV1ClientStatistics {
private shareBacklog: number = 0;
private shares: number = 0;
private acceptedCount: number = 0;
private submissionCacheStart: Date;
private submissionCache = [];
private submissionCache: { time: Date, difficulty: number }[] = [];
private currentTimeSlot: number = null;
private lastSave: number = null;
constructor(
private readonly clientStatisticsService: ClientStatisticsService
@ -18,42 +22,61 @@ export class StratumV1ClientStatistics {
}
public async saveShares(client: ClientEntity) {
private async saveShares(client: ClientEntity) {
if (client == null || client.address == null || client.clientName == null || client.sessionId == null) {
return;
}
// 10 min
var coeff = 1000 * 60 * 10;
var date = new Date();
var rounded = new Date(Math.floor(date.getTime() / coeff) * coeff);
await this.clientStatisticsService.save({
time: rounded.getTime(),
shares: this.shareBacklog,
address: client.address,
clientName: client.clientName,
sessionId: client.sessionId
});
this.shareBacklog = 0;
}
// We don't want to save them here because it can be DB intensive, stead do it every once in
// awhile with saveShares()
public async addShares(targetDifficulty: number) {
public async addShares(client: ClientEntity, targetDifficulty: number) {
// 10 min
var coeff = 1000 * 60 * 10;
var date = new Date();
var timeSlot = new Date(Math.floor(date.getTime() / coeff) * coeff).getTime();
if (this.submissionCache.length > CACHE_SIZE) {
this.submissionCache.shift();
}
this.submissionCache.push({
time: new Date(),
time: date,
difficulty: targetDifficulty,
});
this.shareBacklog += targetDifficulty;
if (this.currentTimeSlot == null) {
this.currentTimeSlot = timeSlot;
}
if (this.currentTimeSlot != timeSlot) {
await this.clientStatisticsService.insert({
time: this.currentTimeSlot,
shares: this.shares,
acceptedCount: this.acceptedCount,
address: client.address,
clientName: client.clientName,
sessionId: client.sessionId
});
this.shares = 0;
this.acceptedCount = 0;
this.currentTimeSlot = timeSlot;
} else if ((date.getTime() - 60 * 1000) > this.lastSave) {
await this.clientStatisticsService.update({
time: this.currentTimeSlot,
shares: this.shares,
acceptedCount: this.acceptedCount,
address: client.address,
clientName: client.clientName,
sessionId: client.sessionId
});
this.lastSave = new Date().getTime();
}
this.shares += targetDifficulty;
this.acceptedCount++;
}

View File

@ -1,9 +1,9 @@
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';
import { DataSource } from 'typeorm';
import { ClientStatisticsService } from '../ORM/client-statistics/client-statistics.service';
import { ClientService } from '../ORM/client/client.service';
import { RpcBlockService } from '../ORM/rpc-block/rpc-block.service';
@Injectable()
export class AppService implements OnModuleInit {
@ -11,7 +11,8 @@ export class AppService implements OnModuleInit {
constructor(
private readonly clientStatisticsService: ClientStatisticsService,
private readonly clientService: ClientService,
private readonly dataSource: DataSource
private readonly dataSource: DataSource,
private readonly rpcBlockService: RpcBlockService,
) {
}
@ -28,25 +29,38 @@ export class AppService implements OnModuleInit {
await this.dataSource.query(`PRAGMA synchronous = off;`);
// //6Gb
// await this.dataSource.query(`PRAGMA mmap_size = 6000000000;`);
if (process.env.ENABLE_SOLO == 'true' && (process.env.NODE_APP_INSTANCE == null || process.env.NODE_APP_INSTANCE == '0')) {
setInterval(async () => {
await this.deleteOldStatistics();
}, 1000 * 60 * 60);
setInterval(async () => {
console.log('Killing dead clients');
await this.clientService.killDeadClients();
}, 1000 * 60 * 5);
setInterval(async () => {
console.log('Deleting Old Blocks');
await this.rpcBlockService.deleteOldBlocks();
}, 1000 * 60 * 60 * 24);
}
}
@Interval(1000 * 60 * 60)
private async deleteOldStatistics() {
console.log('Deleting statistics');
if (process.env.ENABLE_SOLO == 'true' && (process.env.NODE_APP_INSTANCE == null || process.env.NODE_APP_INSTANCE == '0')) {
const deletedStatistics = await this.clientStatisticsService.deleteOldStatistics();
console.log(`Deleted ${deletedStatistics.affected} old statistics`);
const deletedClients = await this.clientService.deleteOldClients();
console.log(`Deleted ${deletedClients.affected} old clients`);
}
const deletedStatistics = await this.clientStatisticsService.deleteOldStatistics();
console.log(`Deleted ${deletedStatistics.affected} old statistics`);
const deletedClients = await this.clientService.deleteOldClients();
console.log(`Deleted ${deletedClients.affected} old clients`);
}
@Interval(1000 * 60 * 5)
private async killDeadClients() {
console.log('Killing dead clients');
if (process.env.ENABLE_SOLO == 'true' && (process.env.NODE_APP_INSTANCE == null || process.env.NODE_APP_INSTANCE == '0')) {
await this.clientService.killDeadClients();
}
}
}

View File

@ -1,15 +1,15 @@
import { Injectable } from '@nestjs/common';
import { Injectable, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { RPCClient } from 'rpc-bitcoin';
import { BehaviorSubject, filter, shareReplay } from 'rxjs';
import { RpcBlockService } from 'src/ORM/rpc-block/rpc-block.service';
import * as zmq from 'zeromq/v5-compat';
import * as zmq from 'zeromq';
import { IBlockTemplate } from '../models/bitcoin-rpc/IBlockTemplate';
import { IMiningInfo } from '../models/bitcoin-rpc/IMiningInfo';
@Injectable()
export class BitcoinRpcService {
export class BitcoinRpcService implements OnModuleInit {
private blockHeight = 0;
private client: RPCClient;
@ -20,7 +20,9 @@ export class BitcoinRpcService {
private readonly configService: ConfigService,
private rpcBlockService: RpcBlockService
) {
}
async onModuleInit() {
const url = this.configService.get('BITCOIN_RPC_URL');
const user = this.configService.get('BITCOIN_RPC_USER');
const pass = this.configService.get('BITCOIN_RPC_PASSWORD');
@ -36,19 +38,36 @@ export class BitcoinRpcService {
});
if (this.configService.get('BITCOIN_ZMQ_HOST')) {
const sock = zmq.socket("sub");
sock.connect(this.configService.get('BITCOIN_ZMQ_HOST'));
sock.subscribe("rawblock");
sock.on("message", async (topic: Buffer, message: Buffer) => {
console.log("new block zmq");
await this.pollMiningInfo();
console.log('Using ZMQ');
const sock = new zmq.Subscriber;
sock.connectTimeout = 1000;
sock.events.on('connect', () => {
console.log('ZMQ Connected');
});
this.pollMiningInfo().then(() => { });
sock.events.on('connect:retry', () => {
console.log('ZMQ Unable to connect, Retrying');
});
sock.connect(this.configService.get('BITCOIN_ZMQ_HOST'));
sock.subscribe('rawblock');
// Don't await this, otherwise it will block the rest of the program
this.listenForNewBlocks(sock);
await this.pollMiningInfo();
} else {
setInterval(this.pollMiningInfo.bind(this), 500);
}
}
private async listenForNewBlocks(sock: zmq.Subscriber) {
for await (const [topic, msg] of sock) {
console.log("New Block");
await this.pollMiningInfo();
}
}
public async pollMiningInfo() {
const miningInfo = await this.getMiningInfo();
if (miningInfo != null && miningInfo.blocks > this.blockHeight) {