RPC load reduction

This commit is contained in:
Ben 2023-11-24 14:56:35 -05:00
parent 2aa0ebe587
commit b9af79978f
8 changed files with 122 additions and 22 deletions

View File

@ -15,5 +15,6 @@
"Satoshis",
"submitblock",
"Tempalte"
]
],
"idf.portWin": "COM30"
}

View File

@ -0,0 +1,14 @@
import { Column, Entity, PrimaryColumn } from 'typeorm';
@Entity()
export class RpcBlockEntity {
@PrimaryColumn()
blockHeight: number;
@Column({ nullable: true })
lockedBy?: string;
@Column({ nullable: true })
data?: string;
}

View File

@ -0,0 +1,14 @@
import { Global, Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { RpcBlockEntity } from './rpc-block.entity';
import { RpcBlockService } from './rpc-block.service';
@Global()
@Module({
imports: [TypeOrmModule.forFeature([RpcBlockEntity])],
providers: [RpcBlockService],
exports: [TypeOrmModule, RpcBlockService],
})
export class RpcBlocksModule { }

View File

@ -0,0 +1,28 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { RpcBlockEntity } from './rpc-block.entity';
@Injectable()
export class RpcBlockService {
constructor(
@InjectRepository(RpcBlockEntity)
private rpcBlockRepository: Repository<RpcBlockEntity>
) {
}
public getBlock(blockHeight: number) {
return this.rpcBlockRepository.findOne({
where: { blockHeight }
});
}
public lockBlock(blockHeight: number, process: string) {
return this.rpcBlockRepository.save({ blockHeight, data: null, lockedBy: process });
}
public saveBlock(blockHeight: number, data: string) {
return this.rpcBlockRepository.update(blockHeight, { data })
}
}

View File

@ -13,6 +13,7 @@ import { AddressSettingsModule } from './ORM/address-settings/address-settings.m
import { BlocksModule } from './ORM/blocks/blocks.module';
import { ClientStatisticsModule } from './ORM/client-statistics/client-statistics.module';
import { ClientModule } from './ORM/client/client.module';
import { RpcBlocksModule } from './ORM/rpc-block/rpc-block.module';
import { TelegramSubscriptionsModule } from './ORM/telegram-subscriptions/telegram-subscriptions.module';
import { AppService } from './services/app.service';
import { BitcoinRpcService } from './services/bitcoin-rpc.service';
@ -25,12 +26,14 @@ import { StratumV1JobsService } from './services/stratum-v1-jobs.service';
import { StratumV1Service } from './services/stratum-v1.service';
import { TelegramService } from './services/telegram.service';
const ORMModules = [
ClientStatisticsModule,
ClientModule,
AddressSettingsModule,
TelegramSubscriptionsModule,
BlocksModule
BlocksModule,
RpcBlocksModule
]
@Module({

View File

@ -352,8 +352,7 @@ export class StratumV1Client {
}
this.stratumSubscription = this.stratumV1JobsService.newMiningJob$.pipe(
).subscribe(async (jobTemplate) => {
this.stratumSubscription = this.stratumV1JobsService.newMiningJob$.subscribe(async (jobTemplate) => {
try {
await this.sendNewMiningJob(jobTemplate);
} catch (e) {

View File

@ -2,10 +2,12 @@ import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { RPCClient } from 'rpc-bitcoin';
import { BehaviorSubject, filter, shareReplay } from 'rxjs';
import { RpcBlockService } from 'src/ORM/rpc-block/rpc-block.service';
import { IBlockTemplate } from '../models/bitcoin-rpc/IBlockTemplate';
import { IMiningInfo } from '../models/bitcoin-rpc/IMiningInfo';
import * as zmq from 'zeromq';
// import * as zmq from 'zeromq';
@Injectable()
export class BitcoinRpcService {
@ -15,7 +17,11 @@ export class BitcoinRpcService {
private _newBlock$: BehaviorSubject<IMiningInfo> = new BehaviorSubject(undefined);
public newBlock$ = this._newBlock$.pipe(filter(block => block != null), shareReplay({ refCount: true, bufferSize: 1 }));
constructor(private readonly configService: ConfigService) {
constructor(
private readonly configService: ConfigService,
private rpcBlockService: RpcBlockService
) {
const url = this.configService.get('BITCOIN_RPC_URL');
const user = this.configService.get('BITCOIN_RPC_USER');
const pass = this.configService.get('BITCOIN_RPC_PASSWORD');
@ -27,13 +33,13 @@ export class BitcoinRpcService {
console.log('Bitcoin RPC connected');
if (this.configService.get('BITCOIN_ZMQ_HOST')) {
const sock = zmq.socket("sub");
sock.connect(this.configService.get('BITCOIN_ZMQ_HOST'));
sock.subscribe("rawblock");
sock.on("message", async (topic: Buffer, message: Buffer) => {
console.log("new block zmq");
this.pollMiningInfo();
});
// const sock = zmq.socket("sub");
// sock.connect(this.configService.get('BITCOIN_ZMQ_HOST'));
// sock.subscribe("rawblock");
// sock.on("message", async (topic: Buffer, message: Buffer) => {
// console.log("new block zmq");
// this.pollMiningInfo();
// });
this.pollMiningInfo();
} else {
setInterval(this.pollMiningInfo.bind(this), 500);
@ -49,18 +55,53 @@ export class BitcoinRpcService {
}
}
public async getBlockTemplate(): Promise<IBlockTemplate> {
private async waitForBlock(blockHeight: number) {
while (true) {
await new Promise(r => setTimeout(r, 100));
const block = await this.rpcBlockService.getBlock(blockHeight);
if (block != null && block.data != null) {
console.log('promise loop resolved');
return Promise.resolve(JSON.parse(block.data));
}
console.log('promise loop');
}
}
public async getBlockTemplate(blockHeight: number): Promise<IBlockTemplate> {
let result: IBlockTemplate;
try {
result = await this.client.getblocktemplate({
template_request: {
rules: ['segwit'],
mode: 'template',
capabilities: ['serverlist', 'proposal']
const block = await this.rpcBlockService.getBlock(blockHeight);
if (block != null && block.data != null) {
return Promise.resolve(JSON.parse(block.data));
} else if (block == null) {
const { lockedBy } = await this.rpcBlockService.lockBlock(blockHeight, process.env.NODE_APP_INSTANCE);
if (lockedBy != process.env.NODE_APP_INSTANCE) {
await this.waitForBlock(blockHeight);
return;
}
});
result = await this.client.getblocktemplate({
template_request: {
rules: ['segwit'],
mode: 'template',
capabilities: ['serverlist', 'proposal']
}
});
await this.rpcBlockService.saveBlock(blockHeight, JSON.stringify(result));
} else {
//wait for block
await this.waitForBlock(blockHeight);
}
} catch (e) {
console.error('Error getblocktemplate:' ,e.message);
console.error('Error getblocktemplate:', e.message);
throw new Error('Error getblocktemplate');
}
console.log(`getblocktemplate tx count: ${result.transactions.length}`);

View File

@ -44,7 +44,7 @@ export class StratumV1JobsService {
this.newMiningJob$ = combineLatest([this.bitcoinRpcService.newBlock$, interval(60000).pipe(delay(this.delay), startWith(-1))]).pipe(
switchMap(([miningInfo, interval]) => {
return from(this.bitcoinRpcService.getBlockTemplate()).pipe(map((blockTemplate) => {
return from(this.bitcoinRpcService.getBlockTemplate(miningInfo.blocks)).pipe(map((blockTemplate) => {
return {
blockTemplate,
interval