This commit is contained in:
Ben 2023-09-30 08:41:35 -04:00
parent 0d75714c55
commit cd93fc388a

View File

@ -1,8 +1,6 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { BehaviorSubject, lastValueFrom } from 'rxjs';
import { FindOptionsWhere, ObjectId, Repository } from 'typeorm';
import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity';
import { Repository } from 'typeorm';
import { ClientEntity } from './client.entity';
@ -11,54 +9,13 @@ import { ClientEntity } from './client.entity';
@Injectable()
export class ClientService {
private insertQueue: { client: Partial<ClientEntity>, subject: BehaviorSubject<any> }[] = [];
private heartbeatQueue: {
criteria: string | string[] | number | number[] | Date | Date[] | ObjectId | ObjectId[] | FindOptionsWhere<ClientEntity>,
partialEntity: QueryDeepPartialEntity<ClientEntity>,
subject: BehaviorSubject<any>
}[] = [];
constructor(
@InjectRepository(ClientEntity)
private clientRepository: Repository<ClientEntity>
) {
setInterval(async () => {
if (this.heartbeatQueue.length < 1) {
return;
}
const heartbeatToInsert = [...this.heartbeatQueue];
this.heartbeatQueue = [];
await this.clientRepository.manager.transaction(async transactionalEntityManager => {
for (let i = 0; i < heartbeatToInsert.length; i++) {
await this.clientRepository.update(heartbeatToInsert[i].criteria, heartbeatToInsert[i].partialEntity);
}
});
heartbeatToInsert.forEach((val, index) => {
heartbeatToInsert[index].subject.next(null);
heartbeatToInsert[index].subject.complete();
});
}, 5000);
setInterval(async () => {
if (this.insertQueue.length < 1) {
return;
}
const clientsToInsert = [...this.insertQueue];
this.insertQueue = [];
const results = await this.clientRepository.insert(clientsToInsert.map(i => i.client));
results.generatedMaps.forEach((val, index) => {
clientsToInsert[index].subject.next({ ...clientsToInsert[index].client, ...val });
clientsToInsert[index].subject.complete();
});
}, 5000);
}
@ -74,15 +31,7 @@ export class ClientService {
}
public async heartbeat(address: string, clientName: string, sessionId: string, hashRate: number) {
const subject$ = new BehaviorSubject<ClientEntity>(null);
this.heartbeatQueue.push({
criteria: { address, clientName, sessionId },
partialEntity: { hashRate, deletedAt: null, updatedAt: new Date() },
subject: subject$
}
);
return lastValueFrom(subject$);
return await this.clientRepository.update({ address, clientName, sessionId }, { hashRate, deletedAt: null, updatedAt: new Date() });
}
// public async save(client: Partial<ClientEntity>) {
@ -91,10 +40,14 @@ export class ClientService {
public async insert(partialClient: Partial<ClientEntity>): Promise<ClientEntity> {
const insertResult = await this.clientRepository.insert(partialClient);
const subject$ = new BehaviorSubject<ClientEntity>(null);
this.insertQueue.push({ client: partialClient, subject: subject$ });
return lastValueFrom(subject$);
const client = {
...partialClient,
...insertResult.generatedMaps[0]
};
return client as ClientEntity;
}
public async delete(sessionId: string) {