This commit is contained in:
Ben 2023-09-30 08:42:15 -04:00
parent cd93fc388a
commit 398cc61fbc

@ -1,6 +1,8 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { BehaviorSubject, lastValueFrom } from 'rxjs';
import { FindOptionsWhere, ObjectId, Repository } from 'typeorm';
import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity';
import { ClientEntity } from './client.entity';
@ -9,13 +11,54 @@ import { ClientEntity } from './client.entity';
@Injectable()
export class ClientService {
private insertQueue: { client: Partial<ClientEntity>, subject: BehaviorSubject<any> }[] = [];
private heartbeatQueue: {
criteria: string | string[] | number | number[] | Date | Date[] | ObjectId | ObjectId[] | FindOptionsWhere<ClientEntity>,
partialEntity: QueryDeepPartialEntity<ClientEntity>,
subject: BehaviorSubject<any>
}[] = [];
constructor(
@InjectRepository(ClientEntity)
private clientRepository: Repository<ClientEntity>
) {
setInterval(async () => {
if (this.heartbeatQueue.length < 1) {
return;
}
const heartbeatToInsert = [...this.heartbeatQueue];
this.heartbeatQueue = [];
await this.clientRepository.manager.transaction(async transactionalEntityManager => {
for (let i = 0; i < heartbeatToInsert.length; i++) {
await this.clientRepository.update(heartbeatToInsert[i].criteria, heartbeatToInsert[i].partialEntity);
}
});
heartbeatToInsert.forEach((val, index) => {
heartbeatToInsert[index].subject.next(null);
heartbeatToInsert[index].subject.complete();
});
}, 5000);
setInterval(async () => {
if (this.insertQueue.length < 1) {
return;
}
const clientsToInsert = [...this.insertQueue];
this.insertQueue = [];
const results = await this.clientRepository.insert(clientsToInsert.map(i => i.client));
results.generatedMaps.forEach((val, index) => {
clientsToInsert[index].subject.next({ ...clientsToInsert[index].client, ...val });
clientsToInsert[index].subject.complete();
});
}, 5000);
}
@ -31,7 +74,15 @@ export class ClientService {
}
public async heartbeat(address: string, clientName: string, sessionId: string, hashRate: number) {
return await this.clientRepository.update({ address, clientName, sessionId }, { hashRate, deletedAt: null, updatedAt: new Date() });
const subject$ = new BehaviorSubject<ClientEntity>(null);
this.heartbeatQueue.push({
criteria: { address, clientName, sessionId },
partialEntity: { hashRate, deletedAt: null, updatedAt: new Date() },
subject: subject$
}
);
return lastValueFrom(subject$);
}
// public async save(client: Partial<ClientEntity>) {
@ -40,14 +91,10 @@ export class ClientService {
public async insert(partialClient: Partial<ClientEntity>): Promise<ClientEntity> {
const insertResult = await this.clientRepository.insert(partialClient);
const client = {
...partialClient,
...insertResult.generatedMaps[0]
};
return client as ClientEntity;
const subject$ = new BehaviorSubject<ClientEntity>(null);
this.insertQueue.push({ client: partialClient, subject: subject$ });
return lastValueFrom(subject$);
}
public async delete(sessionId: string) {