better unsubscribe, startup stagger

This commit is contained in:
Ben Wilson 2023-08-05 09:57:39 -04:00
parent fe677b88f8
commit b2f5cb5806
4 changed files with 19 additions and 16 deletions

View File

@ -56,6 +56,8 @@ export class ClientService {
const client = this.clientRepository.create(partialClient);
this.bulkInsertClients.push(client);
// wait for the bulk insert to go though
// while we await node is free to service other requests
await setTimeout(2000);
return client;
}

View File

@ -3,6 +3,7 @@ import { NestFactory } from '@nestjs/core';
import { FastifyAdapter, NestFastifyApplication } from '@nestjs/platform-fastify';
import * as bitcoinjs from 'bitcoinjs-lib';
import { useContainer } from 'class-validator';
import { setTimeout } from 'timers/promises';
import * as ecc from 'tiny-secp256k1';
import { AppModule } from './app.module';
@ -20,6 +21,13 @@ async function bootstrap() {
// };
//const app = await NestFactory.create<NestFastifyApplication>(AppModule, new FastifyAdapter({ https: httpsOptions }));
// stagger startup
if (process.env.NODE_APP_INSTANCE != null) {
await setTimeout(parseInt(process.env.NODE_APP_INSTANCE) * 5000);
}
const app = await NestFactory.create<NestFastifyApplication>(AppModule, new FastifyAdapter());
app.setGlobalPrefix('api')
app.useGlobalPipes(

View File

@ -6,7 +6,7 @@ import { validate, ValidatorOptions } from 'class-validator';
import * as crypto from 'crypto';
import { Socket } from 'net';
import PromiseSocket from 'promise-socket';
import { firstValueFrom, takeUntil } from 'rxjs';
import { firstValueFrom, Subscription } from 'rxjs';
import { AddressSettingsService } from '../ORM/address-settings/address-settings.service';
import { BlocksService } from '../ORM/blocks/blocks.service';
@ -16,7 +16,6 @@ import { ClientService } from '../ORM/client/client.service';
import { BitcoinRpcService } from '../services/bitcoin-rpc.service';
import { NotificationService } from '../services/notification.service';
import { IJobTemplate, StratumV1JobsService } from '../services/stratum-v1-jobs.service';
import { EasyUnsubscribe } from '../utils/EasyUnsubscribe';
import { eRequestMethod } from './enums/eRequestMethod';
import { eResponseMethod } from './enums/eResponseMethod';
import { eStratumErrorCode } from './enums/eStratumErrorCode';
@ -30,12 +29,13 @@ import { SuggestDifficulty } from './stratum-messages/SuggestDifficultyMessage';
import { StratumV1ClientStatistics } from './StratumV1ClientStatistics';
export class StratumV1Client extends EasyUnsubscribe {
export class StratumV1Client {
private clientSubscription: SubscriptionMessage;
private clientConfiguration: ConfigurationMessage;
private clientAuthorization: AuthorizationMessage;
private clientSuggestedDifficulty: SuggestDifficulty;
private stratumSubscription: Subscription;
private statistics: StratumV1ClientStatistics;
private stratumInitialized = false;
@ -47,6 +47,7 @@ export class StratumV1Client extends EasyUnsubscribe {
public sessionStart: Date;
constructor(
public readonly promiseSocket: PromiseSocket<Socket>,
private readonly stratumV1JobsService: StratumV1JobsService,
@ -58,7 +59,6 @@ export class StratumV1Client extends EasyUnsubscribe {
private readonly configService: ConfigService,
private readonly addressSettingsService: AddressSettingsService
) {
super();
this.promiseSocket.socket.on('data', (data: Buffer) => {
data.toString()
@ -80,6 +80,10 @@ export class StratumV1Client extends EasyUnsubscribe {
console.log(`New client ID: : ${this.extraNonceAndSessionId}`);
}
public destroy() {
this.stratumSubscription.unsubscribe();
}
private getRandomHexString() {
const randomBytes = crypto.randomBytes(4); // 4 bytes = 32 bits
const randomNumber = randomBytes.readUInt32BE(0); // Convert bytes to a 32-bit unsigned integer
@ -289,8 +293,7 @@ export class StratumV1Client extends EasyUnsubscribe {
bestDifficulty: 0
});
this.stratumV1JobsService.newMiningJob$.pipe(
takeUntil(this.easyUnsubscribe)
this.stratumSubscription = this.stratumV1JobsService.newMiningJob$.pipe(
).subscribe(async (jobTemplate) => {
try {
await this.sendNewMiningJob(jobTemplate);

View File

@ -1,10 +0,0 @@
import { Subject } from 'rxjs';
export class EasyUnsubscribe {
protected easyUnsubscribe = new Subject<void>();
public destroy(): void {
this.easyUnsubscribe.next();
this.easyUnsubscribe.complete();
}
}