db refactor

This commit is contained in:
Ben Wilson 2024-02-16 17:12:36 -05:00
parent 8e977c21a1
commit 3e2de62259
13 changed files with 190 additions and 96 deletions

View File

@ -0,0 +1,15 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
export class UniqueNonceIndex implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`CREATE UNIQUE INDEX "IDX_unique_nonce" ON "client_entity" ("sessionId") WHERE "deletedAt" IS NOT NULL`
);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX "IDX_unique_nonce"`);
}
}

View File

@ -14,6 +14,9 @@ export class AddressSettingsEntity extends TrackedEntity {
@Column({ type: 'decimal', default: 0 })
bestDifficulty: number;
@Column({ nullable: true })
bestDifficultyUserAgent: string;
@Column({ nullable: true })
miscCoinbaseScriptData: string;

View File

@ -17,13 +17,18 @@ export class AddressSettingsService {
public async getSettings(address: string, createIfNotFound: boolean) {
const settings = await this.addressSettingsRepository.findOne({ where: { address } });
if (createIfNotFound == true && settings == null) {
return await this.createNew(address);
// It's possible to have a race condition here so if we get a PK violation, fetch it
try {
return await this.createNew(address);
} catch (e) {
return await this.addressSettingsRepository.findOne({ where: { address } });
}
}
return settings;
}
public async updateBestDifficulty(address: string, bestDifficulty: number) {
return await this.addressSettingsRepository.update({ address }, { bestDifficulty });
public async updateBestDifficulty(address: string, bestDifficulty: number, bestDifficultyUserAgent: string) {
return await this.addressSettingsRepository.update({ address }, { bestDifficulty, bestDifficultyUserAgent });
}
public async createNew(address: string) {
@ -46,4 +51,12 @@ export class AddressSettingsService {
bestDifficulty: 0
});
}
public async getHighScores() {
return await this.addressSettingsRepository.createQueryBuilder()
.select('"updatedAt", "bestDifficulty", "bestDifficultyUserAgent"')
.orderBy('"bestDifficulty"', 'DESC')
.limit(10)
.execute();
}
}

View File

@ -1,12 +1,11 @@
import { Column, Entity, Index, PrimaryGeneratedColumn } from 'typeorm';
import { Column, Entity, Index, ManyToOne, PrimaryGeneratedColumn } from 'typeorm';
import { ClientEntity } from '../client/client.entity';
import { TrackedEntity } from '../utils/TrackedEntity.entity';
@Entity()
//Index for getHashRateForSession
@Index(["address", "clientName", "sessionId"])
//Index for statistics save
@Index(["address", "clientName", "sessionId", "time"])
@Index(["clientId", "time"])
export class ClientStatisticsEntity extends TrackedEntity {
@PrimaryGeneratedColumn()
@ -32,4 +31,17 @@ export class ClientStatisticsEntity extends TrackedEntity {
acceptedCount: number;
@ManyToOne(
() => ClientEntity,
clientEntity => clientEntity.statistics,
{ nullable: false, }
)
client: ClientEntity;
@Index()
@Column({ name: 'clientId' })
public clientId: string;
}

View File

@ -19,25 +19,15 @@ export class ClientStatisticsService {
}
public async save(clientStatistic: Partial<ClientStatisticsEntity>) {
// Attempt to update the existing record
const updateResult = await this.clientStatisticsRepository
.createQueryBuilder()
.update(ClientStatisticsEntity)
.set({
shares: () => `"shares" + :sharesIncrement`,
acceptedCount: () => `"acceptedCount" + 1`
// // Attempt to update the existing record
const result = await this.clientStatisticsRepository.update({ clientId: clientStatistic.clientId, time: clientStatistic.time },
{
shares: clientStatistic.shares,
acceptedCount: clientStatistic.acceptedCount
})
.where('address = :address AND clientName = :clientName AND sessionId = :sessionId AND time = :time', {
address: clientStatistic.address,
clientName: clientStatistic.clientName,
sessionId: clientStatistic.sessionId,
time: clientStatistic.time,
sharesIncrement: clientStatistic.shares
})
.execute();
// Check if the update affected any rows
if (updateResult.affected === 0) {
if (result.affected === 0) {
// If no rows were updated, insert a new record
await this.clientStatisticsRepository.insert(clientStatistic);
}
@ -139,7 +129,7 @@ export class ClientStatisticsService {
}
public async getHashRateForSession(address: string, clientName: string, sessionId: string) {
public async getHashRateForSession(clientId: string) {
const query = `
SELECT
@ -149,12 +139,12 @@ export class ClientStatisticsService {
FROM
client_statistics_entity AS entry
WHERE
entry.address = $1 AND entry."clientName" = $2 AND entry."sessionId" = $3
entry."clientId" = $1
ORDER BY time DESC
LIMIT 2;
`;
const result = await this.clientStatisticsRepository.query(query, [address, clientName, sessionId]);
const result = await this.clientStatisticsRepository.query(query, [clientId]);
if (result.length < 1) {
return 0;
@ -179,7 +169,7 @@ export class ClientStatisticsService {
}
public async getChartDataForSession(address: string, clientName: string, sessionId: string) {
public async getChartDataForSession(clientId: string) {
var yesterday = new Date(new Date().getTime() - (24 * 60 * 60 * 1000));
const query = `
@ -189,7 +179,7 @@ export class ClientStatisticsService {
FROM
client_statistics_entity AS entry
WHERE
entry.address = $1 AND entry."clientName" = $2 AND entry."sessionId" = $3 AND entry.time > ${yesterday.getTime()}
entry."clientId" = $1 AND entry.time > ${yesterday.getTime()}
GROUP BY
time
ORDER BY
@ -197,7 +187,7 @@ export class ClientStatisticsService {
LIMIT 144;
`;
const result = await this.clientStatisticsRepository.query(query, [address, clientName, sessionId]);
const result = await this.clientStatisticsRepository.query(query, [clientId]);
return result.map(res => {
res.label = new Date(parseInt(res.label)).toISOString();

View File

@ -1,21 +1,24 @@
import { Column, Entity, Index, PrimaryColumn } from 'typeorm';
import { Column, Entity, Index, OneToMany, PrimaryGeneratedColumn } from 'typeorm';
import { ClientStatisticsEntity } from '../client-statistics/client-statistics.entity';
import { TrackedEntity } from '../utils/TrackedEntity.entity';
@Entity()
@Index(['address', 'clientName', 'sessionId'], { unique: true })
@Index("IDX_unique_nonce", { synchronize: false })
export class ClientEntity extends TrackedEntity {
@PrimaryColumn({ length: 62, type: 'varchar' })
address: string;
@PrimaryColumn({ length: 64, type: 'varchar' })
clientName: string;
@PrimaryGeneratedColumn('uuid')
id: string;
@Index()
@PrimaryColumn({ length: 8, type: 'varchar' })
@Column({ length: 62, type: 'varchar' })
address: string;
@Column({ length: 64, type: 'varchar' })
clientName: string;
@Column({ length: 8, type: 'varchar', })
sessionId: string;
@ -23,7 +26,6 @@ export class ClientEntity extends TrackedEntity {
userAgent: string;
@Column({ type: 'timestamp' })
startTime: Date;
@ -33,5 +35,11 @@ export class ClientEntity extends TrackedEntity {
@Column({ default: 0, type: 'decimal' })
hashRate: number;
@OneToMany(
() => ClientStatisticsEntity,
clientStatisticsEntity => clientStatisticsEntity.client
)
statistics: ClientStatisticsEntity[]
}

View File

@ -28,8 +28,8 @@ export class ClientService {
.execute();
}
public async heartbeat(address: string, clientName: string, sessionId: string, hashRate: number, updatedAt: Date) {
return await this.clientRepository.update({ address, clientName, sessionId }, { hashRate, deletedAt: null, updatedAt });
public async heartbeat(id, hashRate: number, updatedAt: Date) {
return await this.clientRepository.update({ id }, { hashRate, deletedAt: null, updatedAt });
}
// public async save(client: Partial<ClientEntity>) {
@ -48,8 +48,8 @@ export class ClientService {
return client as ClientEntity;
}
public async delete(sessionId: string) {
return await this.clientRepository.softDelete({ sessionId });
public async delete(id: string) {
return await this.clientRepository.softDelete({ id });
}
public async deleteOldClients() {
@ -65,8 +65,8 @@ export class ClientService {
}
public async updateBestDifficulty(sessionId: string, bestDifficulty: number) {
return await this.clientRepository.update({ sessionId }, { bestDifficulty });
public async updateBestDifficulty(id: string, bestDifficulty: number) {
return await this.clientRepository.update({ id }, { bestDifficulty });
}
public async connectedClientCount(): Promise<number> {
return await this.clientRepository.count();
@ -111,7 +111,7 @@ export class ClientService {
.addSelect('MAX(client.bestDifficulty)', 'bestDifficulty')
.addSelect('SUM(client.hashRate)', 'totalHashRate')
.groupBy('client.userAgent')
.orderBy('count', 'DESC')
.orderBy('"totalHashRate"', 'DESC')
.getRawMany();
return result;
}

View File

@ -3,6 +3,7 @@ import { Controller, Get, Inject } from '@nestjs/common';
import { Cache } from 'cache-manager';
import { firstValueFrom } from 'rxjs';
import { AddressSettingsService } from './ORM/address-settings/address-settings.service';
import { BlocksService } from './ORM/blocks/blocks.service';
import { ClientStatisticsService } from './ORM/client-statistics/client-statistics.service';
import { ClientService } from './ORM/client/client.service';
@ -20,7 +21,8 @@ export class AppController {
private readonly clientStatisticsService: ClientStatisticsService,
private readonly blocksService: BlocksService,
private readonly bitcoinRpcService: BitcoinRpcService,
private readonly homeGraphService: HomeGraphService
private readonly homeGraphService: HomeGraphService,
private readonly addressSettingsService: AddressSettingsService
) { }
@Get('info')
@ -37,10 +39,12 @@ export class AppController {
const blockData = await this.blocksService.getFoundBlocks();
const userAgents = await this.clientService.getUserAgents();
const highScores = await this.addressSettingsService.getHighScores();
const data = {
blockData,
userAgents,
highScores,
uptime: this.uptime
};

View File

@ -9,6 +9,7 @@ import { AppController } from './app.controller';
import { AddressController } from './controllers/address/address.controller';
import { ClientController } from './controllers/client/client.controller';
import { BitcoinAddressValidator } from './models/validators/bitcoin-address.validator';
import { UniqueNonceIndex } from './ORM/_migrations/UniqueNonceIndex';
import { AddressSettingsEntity } from './ORM/address-settings/address-settings.entity';
import { AddressSettingsModule } from './ORM/address-settings/address-settings.module';
import { BlocksEntity } from './ORM/blocks/blocks.entity';
@ -68,7 +69,10 @@ const ORMModules = [
],
synchronize: configService.get('PRODUCTION') != 'true',
logging: false,
poolSize: 30
poolSize: 30,
migrations: [
UniqueNonceIndex
]
}
},
imports: [ConfigModule],

View File

@ -75,7 +75,7 @@ export class ClientController {
if (worker == null) {
return new NotFoundException();
}
const chartData = await this.clientStatisticsService.getChartDataForSession(worker.address, worker.clientName, worker.sessionId);
const chartData = await this.clientStatisticsService.getChartDataForSession(worker.id);
return {
sessionId: worker.sessionId,

View File

@ -43,7 +43,7 @@ export class StratumV1Client {
private usedSuggestedDifficulty = false;
private sessionDifficulty: number = 16384;
private entity: ClientEntity;
private clientEntity: ClientEntity;
private creatingEntity: Promise<void>;
public extraNonceAndSessionId: string;
@ -83,7 +83,7 @@ export class StratumV1Client {
public async destroy() {
await this.clientService.delete(this.extraNonceAndSessionId);
await this.clientService.delete(this.clientEntity.id);
if (this.stratumSubscription != null) {
this.stratumSubscription.unsubscribe();
@ -373,11 +373,11 @@ export class StratumV1Client {
}, 60 * 1000)
);
this.backgroundWork.push(
setInterval(async () => {
await this.statistics.saveShares(this.entity);
}, 60 * 1000)
);
// this.backgroundWork.push(
// setInterval(async () => {
// await this.statistics.saveShares(this.clientEntity);
// }, 60 * 1000)
// );
}
private async sendNewMiningJob(jobTemplate: IJobTemplate) {
@ -386,8 +386,8 @@ export class StratumV1Client {
const devFeeAddress = this.configService.get('DEV_FEE_ADDRESS');
//50Th/s
this.noFee = false;
if (this.entity) {
this.hashRate = await this.clientStatisticsService.getHashRateForSession(this.clientAuthorization.address, this.clientAuthorization.worker, this.extraNonceAndSessionId);
if (this.clientEntity) {
this.hashRate = await this.clientStatisticsService.getHashRateForSession(this.clientEntity.id);
this.noFee = this.hashRate != 0 && this.hashRate < 50000000000000;
}
if (this.noFee || devFeeAddress == null || devFeeAddress.length < 1) {
@ -438,11 +438,11 @@ export class StratumV1Client {
private async handleMiningSubmission(submission: MiningSubmitMessage) {
if (this.entity == null) {
if (this.clientEntity == null) {
if (this.creatingEntity == null) {
this.creatingEntity = new Promise(async (resolve, reject) => {
try {
this.entity = await this.clientService.insert({
this.clientEntity = await this.clientService.insert({
sessionId: this.extraNonceAndSessionId,
address: this.clientAuthorization.address,
clientName: this.clientAuthorization.worker,
@ -515,12 +515,12 @@ export class StratumV1Client {
}
}
try {
await this.statistics.addShares(this.sessionDifficulty);
await this.statistics.addShares(this.clientEntity, this.sessionDifficulty);
const now = new Date();
// only update every minute
if (this.entity.updatedAt == null || now.getTime() - this.entity.updatedAt.getTime() > 1000 * 60) {
await this.clientService.heartbeat(this.entity.address, this.entity.clientName, this.entity.sessionId, this.hashRate, now);
this.entity.updatedAt = now;
if (this.clientEntity.updatedAt == null || now.getTime() - this.clientEntity.updatedAt.getTime() > 1000 * 60) {
await this.clientService.heartbeat(this.clientEntity.id, this.hashRate, now);
this.clientEntity.updatedAt = now;
}
} catch (e) {
@ -537,11 +537,11 @@ export class StratumV1Client {
return false;
}
if (submissionDifficulty > this.entity.bestDifficulty) {
await this.clientService.updateBestDifficulty(this.extraNonceAndSessionId, submissionDifficulty);
this.entity.bestDifficulty = submissionDifficulty;
if (submissionDifficulty > this.clientEntity.bestDifficulty) {
await this.clientService.updateBestDifficulty(this.clientEntity.id, submissionDifficulty);
this.clientEntity.bestDifficulty = submissionDifficulty;
if (submissionDifficulty > (await this.addressSettingsService.getSettings(this.clientAuthorization.address, true)).bestDifficulty) {
await this.addressSettingsService.updateBestDifficulty(this.clientAuthorization.address, submissionDifficulty);
await this.addressSettingsService.updateBestDifficulty(this.clientAuthorization.address, submissionDifficulty, this.clientEntity.userAgent);
}
}

View File

@ -6,10 +6,14 @@ const TARGET_SUBMISSION_PER_SECOND = 10;
const MIN_DIFF = 0.00001;
export class StratumV1ClientStatistics {
private shareBacklog: number = 0;
private shares: number = 0;
private acceptedCount: number = 0;
private submissionCacheStart: Date;
private submissionCache = [];
private submissionCache: { time: Date, difficulty: number }[] = [];
private currentTimeSlot: number = null;
private lastSave: number = null;
constructor(
private readonly clientStatisticsService: ClientStatisticsService
@ -20,40 +24,60 @@ export class StratumV1ClientStatistics {
public async saveShares(client: ClientEntity) {
if (client == null || client.address == null || client.clientName == null || client.sessionId == null) {
return;
}
// 10 min
var coeff = 1000 * 60 * 10;
var date = new Date();
var rounded = new Date(Math.floor(date.getTime() / coeff) * coeff);
// if (client == null || client.address == null || client.clientName == null || client.sessionId == null) {
// return;
// }
await this.clientStatisticsService.save({
time: rounded.getTime(),
shares: this.shareBacklog,
time: this.currentTimeSlot,
clientId: client.id,
shares: this.shares,
acceptedCount: this.acceptedCount,
address: client.address,
clientName: client.clientName,
sessionId: client.sessionId
});
this.shareBacklog = 0;
}
// We don't want to save them here because it can be DB intensive, stead do it every once in
// We don't want to save them here because it can be DB intensive, instead do it every once in
// awhile with saveShares()
public async addShares(targetDifficulty: number) {
public async addShares(client: ClientEntity, targetDifficulty: number) {
// 10 min
var coeff = 1000 * 60 * 10;
var date = new Date();
var timeSlot = new Date(Math.floor(date.getTime() / coeff) * coeff).getTime();
if (this.submissionCache.length > CACHE_SIZE) {
this.submissionCache.shift();
}
this.submissionCache.push({
time: new Date(),
time: date,
difficulty: targetDifficulty,
});
this.shareBacklog += targetDifficulty;
if (this.currentTimeSlot == null) {
this.currentTimeSlot = timeSlot;
}
if (this.currentTimeSlot != timeSlot) {
await this.saveShares(client);
this.shares = 0;
this.acceptedCount = 0;
this.currentTimeSlot = timeSlot;
} else if ((date.getTime() - 60 * 1000) > this.lastSave) {
await this.saveShares(client);
this.lastSave = new Date().getTime();
}
this.shares += targetDifficulty;
this.acceptedCount++;
}

View File

@ -1,15 +1,15 @@
import { Injectable } from '@nestjs/common';
import { Injectable, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { RPCClient } from 'rpc-bitcoin';
import { BehaviorSubject, filter, shareReplay } from 'rxjs';
import { RpcBlockService } from 'src/ORM/rpc-block/rpc-block.service';
import * as zmq from 'zeromq/v5-compat';
import * as zmq from 'zeromq';
import { IBlockTemplate } from '../models/bitcoin-rpc/IBlockTemplate';
import { IMiningInfo } from '../models/bitcoin-rpc/IMiningInfo';
@Injectable()
export class BitcoinRpcService {
export class BitcoinRpcService implements OnModuleInit {
private blockHeight = 0;
private client: RPCClient;
@ -21,6 +21,10 @@ export class BitcoinRpcService {
private rpcBlockService: RpcBlockService
) {
}
async onModuleInit() {
const url = this.configService.get('BITCOIN_RPC_URL');
const user = this.configService.get('BITCOIN_RPC_USER');
const pass = this.configService.get('BITCOIN_RPC_PASSWORD');
@ -36,19 +40,36 @@ export class BitcoinRpcService {
});
if (this.configService.get('BITCOIN_ZMQ_HOST')) {
const sock = zmq.socket("sub");
sock.connect(this.configService.get('BITCOIN_ZMQ_HOST'));
sock.subscribe("rawblock");
sock.on("message", async (topic: Buffer, message: Buffer) => {
console.log("new block zmq");
await this.pollMiningInfo();
console.log('Using ZMQ');
const sock = new zmq.Subscriber;
sock.connectTimeout = 1000;
sock.events.on('connect', () => {
console.log('ZMQ Connected');
});
this.pollMiningInfo().then(() => { });
sock.events.on('connect:retry', () => {
console.log('ZMQ Unable to connect, Retrying');
});
sock.connect(this.configService.get('BITCOIN_ZMQ_HOST'));
sock.subscribe('rawblock');
// Don't await this, otherwise it will block the rest of the program
this.listenForNewBlocks(sock);
await this.pollMiningInfo();
} else {
setInterval(this.pollMiningInfo.bind(this), 500);
}
}
private async listenForNewBlocks(sock: zmq.Subscriber) {
for await (const [topic, msg] of sock) {
console.log("New Block");
await this.pollMiningInfo();
}
}
public async pollMiningInfo() {
const miningInfo = await this.getMiningInfo();
if (miningInfo != null && miningInfo.blocks > this.blockHeight) {