bulk client stat updates

This commit is contained in:
Benjamin Wilson 2025-02-25 19:02:38 -05:00
parent 637d30f1a5
commit f465b837f5
4 changed files with 66 additions and 28 deletions

View File

@ -8,6 +8,8 @@ import { ClientStatisticsEntity } from './client-statistics.entity';
@Injectable()
export class ClientStatisticsService {
private bulkAsyncUpdates: Partial<ClientStatisticsEntity>[] = [];
constructor(
@InjectDataSource()
@ -18,16 +20,57 @@ export class ClientStatisticsService {
}
public async update(clientStatistic: Partial<ClientStatisticsEntity>) {
// public async update(clientStatistic: Partial<ClientStatisticsEntity>) {
await this.clientStatisticsRepository.update({ clientId: clientStatistic.clientId, time: clientStatistic.time },
{
shares: clientStatistic.shares,
acceptedCount: clientStatistic.acceptedCount,
updatedAt: new Date()
});
// await this.clientStatisticsRepository.update({ clientId: clientStatistic.clientId, time: clientStatistic.time },
// {
// shares: clientStatistic.shares,
// acceptedCount: clientStatistic.acceptedCount,
// updatedAt: new Date()
// });
// }
public async updateBulkAsync(clientStatistic: Partial<ClientStatisticsEntity>, lastSeenIndex: number) {
if(this.bulkAsyncUpdates.length > lastSeenIndex && this.bulkAsyncUpdates[lastSeenIndex].clientId == clientStatistic.clientId && this.bulkAsyncUpdates[lastSeenIndex].time == clientStatistic.time){
this.bulkAsyncUpdates[lastSeenIndex].shares = clientStatistic.shares;
this.bulkAsyncUpdates[lastSeenIndex].acceptedCount = clientStatistic.acceptedCount;
return lastSeenIndex;
}
return this.bulkAsyncUpdates.push(clientStatistic) - 1;
}
public async doBulkAsyncUpdate(){
// Step 1: Prepare data for bulk update
const values = this.bulkAsyncUpdates.map(stat =>
`('${stat.clientId}', ${stat.time}, ${stat.shares || 0}, ${stat.acceptedCount || 0}, NOW())`
).join(',');
// Step 2: Use a temp table and single UPDATE
await this.clientStatisticsRepository.query(`
CREATE TEMP TABLE temp_stats (
clientId UUID,
time BIGINT,
shares INT,
acceptedCount INT,
updatedAt TIMESTAMP
) ON COMMIT DROP;
INSERT INTO temp_stats (clientId, time, shares, acceptedCount, updatedAt)
VALUES ${values};
UPDATE "client_statistics_entity" cse
SET shares = ts.shares,
acceptedCount = ts.acceptedCount,
updatedAt = ts.updatedAt
FROM temp_stats ts
WHERE cse.clientId = ts.clientId AND cse.time = ts.time;
`);
this.bulkAsyncUpdates = [];
}
public async insert(clientStatistic: Partial<ClientStatisticsEntity>) {
await this.clientStatisticsRepository.insert(clientStatistic);
}

View File

@ -72,7 +72,7 @@ const ORMModules = [
],
synchronize: configService.get('PRODUCTION') != 'true',
logging: false,
poolSize: 30,
poolSize: 10,
migrations: [
UniqueNonceIndex
]

View File

@ -16,6 +16,8 @@ export class StratumV1ClientStatistics {
private currentTimeSlot: number = null;
private lastSave: number = null;
private bulkUpdateIndex = 0;
constructor(
private readonly clientStatisticsService: ClientStatisticsService,
) {
@ -60,15 +62,12 @@ export class StratumV1ClientStatistics {
} else if (this.currentTimeSlot != timeSlot) {
// Transitioning to a new time slot,
// First update the old time slot with the latest data
await this.clientStatisticsService.update({
this.bulkUpdateIndex = await this.clientStatisticsService.updateBulkAsync({
time: this.currentTimeSlot,
clientId: client.id,
shares: this.shares,
acceptedCount: this.acceptedCount,
address: client.address,
clientName: client.clientName,
sessionId: client.sessionId
});
}, this.bulkUpdateIndex);
// Set the new time slot and add incoming shares then insert it
this.currentTimeSlot = timeSlot;
this.shares = targetDifficulty;
@ -83,27 +82,18 @@ export class StratumV1ClientStatistics {
sessionId: client.sessionId
});
this.lastSave = new Date().getTime();
} else if ((date.getTime() - 60 * 1000) > this.lastSave) {
// If we haven't saved for a minute, update the table
this.shares += targetDifficulty;
this.acceptedCount++;
await this.clientStatisticsService.update({
time: this.currentTimeSlot,
clientId: client.id,
shares: this.shares,
acceptedCount: this.acceptedCount,
address: client.address,
clientName: client.clientName,
sessionId: client.sessionId
});
this.lastSave = new Date().getTime();
} else {
// Accept the shares if none of the prior conditions are met,
// saving to memory for storing later
this.shares += targetDifficulty;
this.acceptedCount++;
this.bulkUpdateIndex = await this.clientStatisticsService.updateBulkAsync({
time: this.currentTimeSlot,
clientId: client.id,
shares: this.shares,
acceptedCount: this.acceptedCount,
}, this.bulkUpdateIndex);
}
}
public getSuggestedDifficulty(clientDifficulty: number) {

View File

@ -42,6 +42,11 @@ export class AppService implements OnModuleInit {
await this.updateChart();
}, 1000 * 60 * 10);
setInterval(async () => {
console.log('Bulk update client stats');
await this.clientStatisticsService.doBulkAsyncUpdate();
}, 1000 * 60 * 1);
setInterval(async () => {
console.log('Refreshing user agent report view')