diff --git a/src/ORM/client/client.service.ts b/src/ORM/client/client.service.ts index c952843..0399dd1 100644 --- a/src/ORM/client/client.service.ts +++ b/src/ORM/client/client.service.ts @@ -1,8 +1,6 @@ import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; -import { BehaviorSubject, lastValueFrom } from 'rxjs'; -import { FindOptionsWhere, ObjectId, Repository } from 'typeorm'; -import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity'; +import { Repository } from 'typeorm'; import { ClientEntity } from './client.entity'; @@ -11,54 +9,13 @@ import { ClientEntity } from './client.entity'; @Injectable() export class ClientService { - private insertQueue: { client: Partial, subject: BehaviorSubject }[] = []; - private heartbeatQueue: { - criteria: string | string[] | number | number[] | Date | Date[] | ObjectId | ObjectId[] | FindOptionsWhere, - partialEntity: QueryDeepPartialEntity, - subject: BehaviorSubject - }[] = []; + constructor( @InjectRepository(ClientEntity) private clientRepository: Repository ) { - setInterval(async () => { - if (this.heartbeatQueue.length < 1) { - return; - } - const heartbeatToInsert = [...this.heartbeatQueue]; - this.heartbeatQueue = []; - - - await this.clientRepository.manager.transaction(async transactionalEntityManager => { - for (let i = 0; i < heartbeatToInsert.length; i++) { - await this.clientRepository.update(heartbeatToInsert[i].criteria, heartbeatToInsert[i].partialEntity); - } - }); - - - heartbeatToInsert.forEach((val, index) => { - heartbeatToInsert[index].subject.next(null); - heartbeatToInsert[index].subject.complete(); - }); - - }, 5000); - - setInterval(async () => { - if (this.insertQueue.length < 1) { - return; - } - const clientsToInsert = [...this.insertQueue]; - this.insertQueue = []; - const results = await this.clientRepository.insert(clientsToInsert.map(i => i.client)); - - results.generatedMaps.forEach((val, index) => { - clientsToInsert[index].subject.next({ ...clientsToInsert[index].client, ...val }); - clientsToInsert[index].subject.complete(); - }); - - }, 5000); } @@ -74,15 +31,7 @@ export class ClientService { } public async heartbeat(address: string, clientName: string, sessionId: string, hashRate: number) { - - const subject$ = new BehaviorSubject(null); - this.heartbeatQueue.push({ - criteria: { address, clientName, sessionId }, - partialEntity: { hashRate, deletedAt: null, updatedAt: new Date() }, - subject: subject$ - } - ); - return lastValueFrom(subject$); + return await this.clientRepository.update({ address, clientName, sessionId }, { hashRate, deletedAt: null, updatedAt: new Date() }); } // public async save(client: Partial) { @@ -91,10 +40,14 @@ export class ClientService { public async insert(partialClient: Partial): Promise { + const insertResult = await this.clientRepository.insert(partialClient); - const subject$ = new BehaviorSubject(null); - this.insertQueue.push({ client: partialClient, subject: subject$ }); - return lastValueFrom(subject$); + const client = { + ...partialClient, + ...insertResult.generatedMaps[0] + }; + + return client as ClientEntity; } public async delete(sessionId: string) {