diff --git a/backend/src/api/services/stratum.ts b/backend/src/api/services/stratum.ts new file mode 100644 index 000000000..5fa3dd7a3 --- /dev/null +++ b/backend/src/api/services/stratum.ts @@ -0,0 +1,101 @@ +import { WebSocket } from 'ws'; +import logger from '../../logger'; +import websocketHandler from '../websocket-handler'; + +export interface StratumJob { + pool: number; + height: number; + coinbase: string; + scriptsig: string; + reward: number; + jobId: string; + extraNonce: string; + extraNonce2Size: number; + prevHash: string; + coinbase1: string; + coinbase2: string; + merkleBranches: string[]; + version: string; + bits: string; + time: string; + timestamp: number; + cleanJobs: boolean; + received: number; +} + +function isStratumJob(obj: any): obj is StratumJob { + return obj + && typeof obj === 'object' + && 'pool' in obj + && 'prevHash' in obj + && 'height' in obj + && 'received' in obj + && 'version' in obj + && 'timestamp' in obj + && 'bits' in obj + && 'merkleBranches' in obj + && 'cleanJobs' in obj; +} + +class StratumApi { + private ws: WebSocket | null = null; + private runWebsocketLoop: boolean = false; + private startedWebsocketLoop: boolean = false; + private websocketConnected: boolean = false; + private jobs: Record = {}; + + public constructor() {} + + public getJobs(): Record { + return this.jobs; + } + + private handleWebsocketMessage(msg: any): void { + if (isStratumJob(msg)) { + this.jobs[msg.pool] = msg; + websocketHandler.handleNewStratumJob(this.jobs[msg.pool]); + } + } + + public async connectWebsocket(): Promise { + this.runWebsocketLoop = true; + if (this.startedWebsocketLoop) { + return; + } + while (this.runWebsocketLoop) { + this.startedWebsocketLoop = true; + if (!this.ws) { + this.ws = new WebSocket(`http://localhost:3333`); + this.websocketConnected = true; + + this.ws.on('open', () => { + logger.info('Stratum websocket opened'); + }); + + this.ws.on('error', (error) => { + logger.err('Stratum websocket error: ' + error); + this.ws = null; + this.websocketConnected = false; + }); + + this.ws.on('close', () => { + logger.info('Stratum websocket closed'); + this.ws = null; + this.websocketConnected = false; + }); + + this.ws.on('message', (data, isBinary) => { + try { + const parsedMsg = JSON.parse((isBinary ? data : data.toString()) as string); + this.handleWebsocketMessage(parsedMsg); + } catch (e) { + logger.warn('Failed to parse stratum websocket message: ' + (e instanceof Error ? e.message : e)); + } + }); + } + await new Promise(resolve => setTimeout(resolve, 5000)); + } + } +} + +export default new StratumApi(); \ No newline at end of file diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index 13e27c360..390896caa 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -38,6 +38,7 @@ interface AddressTransactions { import bitcoinSecondClient from './bitcoin/bitcoin-second-client'; import { calculateMempoolTxCpfp } from './cpfp'; import { getRecentFirstSeen } from '../utils/file-read'; +import stratumApi, { StratumJob } from './services/stratum'; // valid 'want' subscriptions const wantable = [ @@ -403,6 +404,16 @@ class WebsocketHandler { delete client['track-mempool']; } + if (parsedMessage && parsedMessage['track-stratum'] != null) { + if (parsedMessage['track-stratum']) { + const sub = parsedMessage['track-stratum']; + client['track-stratum'] = sub; + response['stratumJobs'] = this.socketData['stratumJobs']; + } else { + client['track-stratum'] = false; + } + } + if (Object.keys(response).length) { client.send(this.serializeResponse(response)); } @@ -1384,6 +1395,23 @@ class WebsocketHandler { await statistics.runStatistics(); } + public handleNewStratumJob(job: StratumJob): void { + this.updateSocketDataFields({ 'stratumJobs': stratumApi.getJobs() }); + + for (const server of this.webSocketServers) { + server.clients.forEach((client) => { + if (client.readyState !== WebSocket.OPEN) { + return; + } + if (client['track-stratum'] && (client['track-stratum'] === 'all' || client['track-stratum'] === job.pool)) { + client.send(JSON.stringify({ + 'stratumJob': job + })); + } + }); + } + } + // takes a dictionary of JSON serialized values // and zips it together into a valid JSON object private serializeResponse(response): string { diff --git a/backend/src/index.ts b/backend/src/index.ts index c179b66bc..53c4c2f22 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -48,6 +48,7 @@ import accelerationRoutes from './api/acceleration/acceleration.routes'; import aboutRoutes from './api/about.routes'; import mempoolBlocks from './api/mempool-blocks'; import walletApi from './api/services/wallets'; +import stratumApi from './api/services/stratum'; class Server { private wss: WebSocket.Server | undefined; @@ -320,6 +321,7 @@ class Server { loadingIndicators.setProgressChangedCallback(websocketHandler.handleLoadingChanged.bind(websocketHandler)); accelerationApi.connectWebsocket(); + stratumApi.connectWebsocket(); } setUpHttpApiRoutes(): void { diff --git a/frontend/src/app/components/stratum/stratum-list/stratum-list.component.html b/frontend/src/app/components/stratum/stratum-list/stratum-list.component.html new file mode 100644 index 000000000..6132035be --- /dev/null +++ b/frontend/src/app/components/stratum/stratum-list/stratum-list.component.html @@ -0,0 +1,45 @@ +
+

Stratum Jobs

+ +
+ +
+ + + + + + + + + + + @for (row of rows; track row.job.pool) { + + + + @for (cell of row.merkleCells; track $index) { + + } + + + } + +
HeightReward + Merkle Branches + Pool
+ {{ row.job.height }} + + + +
+
+ @if (pools[row.job.pool]) { + + + {{ pools[row.job.pool].name}} + + } +
+
+
diff --git a/frontend/src/app/components/stratum/stratum-list/stratum-list.component.scss b/frontend/src/app/components/stratum/stratum-list/stratum-list.component.scss new file mode 100644 index 000000000..da0e63967 --- /dev/null +++ b/frontend/src/app/components/stratum/stratum-list/stratum-list.component.scss @@ -0,0 +1,101 @@ +.stratum-table { + width: 100%; +} + +td { + position: relative; + height: 2em; + + &.height, &.reward { + padding: 0 5px; + } + + &.pool { + padding-left: 5px; + padding-right: 20px; + } + + &.merkle { + width: 100px; + .pipe-segment { + position: absolute; + border-color: white; + box-sizing: content-box; + + &.vertical { + top: 0; + right: 0; + width: 50%; + height: 100%; + border-left: solid 4px; + } + &.horizontal { + bottom: 0; + left: 0; + width: 100%; + height: 50%; + border-top: solid 4px; + } + &.branch-top { + bottom: 0; + right: 0; + width: 100%; + height: 50%; + border-top: solid 4px; + &::after { + content: ""; + position: absolute; + box-sizing: content-box; + top: -4px; + right: 0px; + bottom: 0; + width: 50%; + border-top: solid 4px; + border-left: solid 4px; + border-top-left-radius: 5px; + } + } + &.branch-mid { + bottom: 0; + right: 0px; + width: 50%; + height: 100%; + border-left: solid 4px; + &::after { + content: ""; + position: absolute; + box-sizing: content-box; + top: -4px; + left: -4px; + width: 100%; + height: 50%; + border-bottom: solid 4px; + border-left: solid 4px; + border-bottom-left-radius: 5px; + } + } + &.branch-end { + top: -4px; + right: 0; + width: 50%; + height: 50%; + border-bottom-left-radius: 5px; + border-bottom: solid 4px; + border-left: solid 4px; + } + } + } +} + +.badge { + position: relative; + color: #FFF; +} + +.pool-logo { + width: 15px; + height: 15px; + position: relative; + top: -1px; + margin-right: 2px; +} \ No newline at end of file diff --git a/frontend/src/app/components/stratum/stratum-list/stratum-list.component.ts b/frontend/src/app/components/stratum/stratum-list/stratum-list.component.ts new file mode 100644 index 000000000..25bb42d1c --- /dev/null +++ b/frontend/src/app/components/stratum/stratum-list/stratum-list.component.ts @@ -0,0 +1,185 @@ +import { Component, OnInit, ChangeDetectionStrategy, OnDestroy, ChangeDetectorRef } from '@angular/core'; +import { StateService } from '../../../services/state.service'; +import { WebsocketService } from '../../../services/websocket.service'; +import { map, Observable } from 'rxjs'; +import { StratumJob } from '../../../interfaces/websocket.interface'; +import { MiningService } from '../../../services/mining.service'; +import { SinglePoolStats } from '../../../interfaces/node-api.interface'; + +type MerkleCellType = ' ' | '┬' | '├' | '└' | '│' | '─' | 'leaf'; + +interface MerkleCell { + hash: string; + type: MerkleCellType; + job?: StratumJob; +} + +interface MerkleTree { + hash?: string; + job: string; + size: number; + children?: MerkleTree[]; +} + +interface PoolRow { + job: StratumJob; + merkleCells: MerkleCell[]; +} + +@Component({ + selector: 'app-stratum-list', + templateUrl: './stratum-list.component.html', + styleUrls: ['./stratum-list.component.scss'], + changeDetection: ChangeDetectionStrategy.OnPush, +}) +export class StratumList implements OnInit, OnDestroy { + rows$: Observable; + pools: { [id: number]: SinglePoolStats } = {}; + poolsReady: boolean = false; + + constructor( + private stateService: StateService, + private websocketService: WebsocketService, + private miningService: MiningService, + private cd: ChangeDetectorRef, + ) {} + + ngOnInit(): void { + this.miningService.getPools().subscribe(pools => { + this.pools = {}; + for (const pool of pools) { + this.pools[pool.unique_id] = pool; + } + this.poolsReady = true; + this.cd.markForCheck(); + }); + this.rows$ = this.stateService.stratumJobs$.pipe( + map((jobs) => this.processJobs(jobs)), + ); + this.websocketService.startTrackStratum('all'); + } + + processJobs(jobs: Record): PoolRow[] { + if (Object.keys(jobs).length === 0) { + return []; + } + + const numBranches = Math.max(...Object.values(jobs).map(job => job.merkleBranches.length)); + + let trees: MerkleTree[] = Object.keys(jobs).map(job => ({ + job, + size: 1, + })); + + // build tree from bottom up + for (let col = numBranches - 1; col >= 0; col--) { + const groups: Record = {}; + for (const tree of trees) { + const hash = jobs[tree.job].merkleBranches[col]; + if (!groups[hash]) { + groups[hash] = []; + } + groups[hash].push(tree); + } + trees = Object.values(groups).map(group => ({ + hash: jobs[group[0].job].merkleBranches[col], + job: group[0].job, + children: group, + size: group.reduce((acc, tree) => acc + tree.size, 0), + })); + } + + // initialize grid of cells + const rows: (MerkleCell | null)[][] = []; + for (let i = 0; i < Object.keys(jobs).length; i++) { + const row: (MerkleCell | null)[] = []; + for (let j = 0; j <= numBranches; j++) { + row.push(null); + } + rows.push(row); + } + + // fill in the cells + let colTrees = [trees.sort((a, b) => { + if (a.size !== b.size) { + return b.size - a.size; + } + return a.job.localeCompare(b.job); + })]; + for (let col = 0; col <= numBranches; col++) { + let row = 0; + const nextTrees: MerkleTree[][] = []; + for (let g = 0; g < colTrees.length; g++) { + for (let t = 0; t < colTrees[g].length; t++) { + const tree = colTrees[g][t]; + const isFirstTree = (t === 0); + const isLastTree = (t === colTrees[g].length - 1); + for (let i = 0; i < tree.size; i++) { + const isFirstCell = (i === 0); + const isLeaf = (col === numBranches); + rows[row][col] = { + hash: tree.hash, + job: isLeaf ? jobs[tree.job] : undefined, + type: 'leaf', + }; + if (col > 0) { + rows[row][col - 1].type = getCellType(isFirstCell, isFirstTree, isLastTree); + } + row++; + } + if (tree.children) { + nextTrees.push(tree.children.sort((a, b) => { + if (a.size !== b.size) { + return b.size - a.size; + } + return a.job.localeCompare(b.job); + })); + } + } + } + colTrees = nextTrees; + } + return rows.map(row => ({ + job: row[row.length - 1].job, + merkleCells: row.slice(0, -1), + })); + } + + pipeToClass(type: MerkleCellType): string { + return { + ' ': 'empty', + '┬': 'branch-top', + '├': 'branch-mid', + '└': 'branch-end', + '│': 'vertical', + '─': 'horizontal', + 'leaf': 'leaf' + }[type]; + } + + ngOnDestroy(): void { + this.websocketService.stopTrackStratum(); + } +} + +function getCellType(isFirstCell, isFirstTree, isLastTree): MerkleCellType { + if (isFirstCell) { + if (isFirstTree) { + if (isLastTree) { + return '─'; + } else { + return '┬'; + } + } else if (isLastTree) { + return '└'; + } else { + return '├'; + } + } else { + if (isLastTree) { + return ' '; + } else { + return '│'; + } + } +} diff --git a/frontend/src/app/interfaces/websocket.interface.ts b/frontend/src/app/interfaces/websocket.interface.ts index d61610a2e..9281f0fc7 100644 --- a/frontend/src/app/interfaces/websocket.interface.ts +++ b/frontend/src/app/interfaces/websocket.interface.ts @@ -21,6 +21,8 @@ export interface WebsocketResponse { rbfInfo?: RbfTree; rbfLatest?: RbfTree[]; rbfLatestSummary?: ReplacementInfo[]; + stratumJob?: StratumJob; + stratumJobs?: Record; utxoSpent?: object; transactions?: TransactionStripped[]; loadingIndicators?: ILoadingIndicators; @@ -37,6 +39,7 @@ export interface WebsocketResponse { 'track-rbf-summary'?: boolean; 'track-accelerations'?: boolean; 'track-wallet'?: string; + 'track-stratum'?: string | number; 'watch-mempool'?: boolean; 'refresh-blocks'?: boolean; } @@ -150,3 +153,24 @@ export interface HealthCheckHost { electrs?: string; } } + +export interface StratumJob { + pool: number; + height: number; + coinbase: string; + scriptsig: string; + reward: number; + jobId: string; + extraNonce: string; + extraNonce2Size: number; + prevHash: string; + coinbase1: string; + coinbase2: string; + merkleBranches: string[]; + version: string; + bits: string; + time: string; + timestamp: number; + cleanJobs: boolean; + received: number; +} diff --git a/frontend/src/app/master-page.module.ts b/frontend/src/app/master-page.module.ts index 2ee2e0bd8..35b632ab5 100644 --- a/frontend/src/app/master-page.module.ts +++ b/frontend/src/app/master-page.module.ts @@ -10,9 +10,10 @@ import { TestTransactionsComponent } from '@components/test-transactions/test-tr import { CalculatorComponent } from '@components/calculator/calculator.component'; import { BlocksList } from '@components/blocks-list/blocks-list.component'; import { RbfList } from '@components/rbf-list/rbf-list.component'; +import { StratumList } from '@components/stratum/stratum-list/stratum-list.component'; import { ServerHealthComponent } from '@components/server-health/server-health.component'; import { ServerStatusComponent } from '@components/server-health/server-status.component'; -import { FaucetComponent } from '@components/faucet/faucet.component' +import { FaucetComponent } from '@components/faucet/faucet.component'; const browserWindow = window || {}; // @ts-ignore @@ -56,6 +57,10 @@ const routes: Routes = [ path: 'rbf', component: RbfList, }, + { + path: 'stratum', + component: StratumList, + }, { path: 'terms-of-service', loadChildren: () => import('@components/terms-of-service/terms-of-service.module').then(m => m.TermsOfServiceModule), diff --git a/frontend/src/app/services/mining.service.ts b/frontend/src/app/services/mining.service.ts index 760ce93cb..a181ef771 100644 --- a/frontend/src/app/services/mining.service.ts +++ b/frontend/src/app/services/mining.service.ts @@ -64,8 +64,8 @@ export class MiningService { ); } } - - /** + + /** * Get names and slugs of all pools */ public getPools(): Observable { @@ -75,7 +75,6 @@ export class MiningService { return this.poolsData; }) ); - } /** * Set the hashrate power of ten we want to display diff --git a/frontend/src/app/services/state.service.ts b/frontend/src/app/services/state.service.ts index 0d006b552..9d2c2ed7b 100644 --- a/frontend/src/app/services/state.service.ts +++ b/frontend/src/app/services/state.service.ts @@ -1,7 +1,7 @@ import { Inject, Injectable, PLATFORM_ID, LOCALE_ID } from '@angular/core'; import { ReplaySubject, BehaviorSubject, Subject, fromEvent, Observable } from 'rxjs'; -import { AddressTxSummary, Transaction } from '@interfaces/electrs.interface'; -import { AccelerationDelta, HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockUpdate, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo, isMempoolState } from '@interfaces/websocket.interface'; +import { Transaction } from '@interfaces/electrs.interface'; +import { AccelerationDelta, HealthCheckHost, IBackendInfo, MempoolBlock, MempoolBlockUpdate, MempoolInfo, Recommendedfees, ReplacedTransaction, ReplacementInfo, StratumJob, isMempoolState } from '@interfaces/websocket.interface'; import { Acceleration, AccelerationPosition, BlockExtended, CpfpInfo, DifficultyAdjustment, MempoolPosition, OptimizedMempoolStats, RbfTree, TransactionStripped } from '@interfaces/node-api.interface'; import { Router, NavigationStart } from '@angular/router'; import { isPlatformBrowser } from '@angular/common'; @@ -159,6 +159,8 @@ export class StateService { liveMempoolBlockTransactions$: Observable<{ block: number, transactions: { [txid: string]: TransactionStripped} }>; accelerations$ = new Subject(); liveAccelerations$: Observable; + stratumJobUpdate$ = new Subject<{ state: Record } | { job: StratumJob }>(); + stratumJobs$ = new BehaviorSubject>({}); txConfirmed$ = new Subject<[string, BlockExtended]>(); txReplaced$ = new Subject(); txRbfInfo$ = new Subject(); @@ -303,6 +305,24 @@ export class StateService { map((accMap) => Object.values(accMap).sort((a,b) => b.added - a.added)) ); + this.stratumJobUpdate$.pipe( + scan((acc: Record, update: { state: Record } | { job: StratumJob }) => { + if ('state' in update) { + // Replace the entire state + return update.state; + } else { + // Update or create a single job entry + return { + ...acc, + [update.job.pool]: update.job + }; + } + }, {}), + shareReplay(1) + ).subscribe(val => { + this.stratumJobs$.next(val); + }); + this.networkChanged$.subscribe((network) => { this.transactions$ = new BehaviorSubject(null); this.blocksSubject$.next([]); diff --git a/frontend/src/app/services/websocket.service.ts b/frontend/src/app/services/websocket.service.ts index 0f5368244..b82b32dd5 100644 --- a/frontend/src/app/services/websocket.service.ts +++ b/frontend/src/app/services/websocket.service.ts @@ -36,6 +36,7 @@ export class WebsocketService { private isTrackingAccelerations: boolean = false; private isTrackingWallet: boolean = false; private trackingWalletName: string; + private isTrackingStratum: string | number | false = false; private trackingMempoolBlock: number; private trackingMempoolBlockNetwork: string; private stoppingTrackMempoolBlock: any | null = null; @@ -143,6 +144,9 @@ export class WebsocketService { if (this.isTrackingWallet) { this.startTrackingWallet(this.trackingWalletName); } + if (this.isTrackingStratum !== false) { + this.startTrackStratum(this.isTrackingStratum); + } this.stateService.connectionState$.next(2); } @@ -289,6 +293,18 @@ export class WebsocketService { } } + startTrackStratum(pool: number | string) { + this.websocketSubject.next({ 'track-stratum': pool }); + this.isTrackingStratum = pool; + } + + stopTrackStratum() { + if (this.isTrackingStratum) { + this.websocketSubject.next({ 'track-stratum': null }); + this.isTrackingStratum = false; + } + } + fetchStatistics(historicalDate: string) { this.websocketSubject.next({ historicalDate }); } @@ -512,6 +528,14 @@ export class WebsocketService { this.stateService.previousRetarget$.next(response.previousRetarget); } + if (response.stratumJobs) { + this.stateService.stratumJobUpdate$.next({ state: response.stratumJobs }); + } + + if (response.stratumJob) { + this.stateService.stratumJobUpdate$.next({ job: response.stratumJob }); + } + if (response['tomahawk']) { this.stateService.serverHealth$.next(response['tomahawk']); } diff --git a/frontend/src/app/shared/shared.module.ts b/frontend/src/app/shared/shared.module.ts index 2bcfc1255..1d4f5fd99 100644 --- a/frontend/src/app/shared/shared.module.ts +++ b/frontend/src/app/shared/shared.module.ts @@ -83,6 +83,7 @@ import { AmountShortenerPipe } from '@app/shared/pipes/amount-shortener.pipe'; import { DifficultyAdjustmentsTable } from '@components/difficulty-adjustments-table/difficulty-adjustments-table.components'; import { BlocksList } from '@components/blocks-list/blocks-list.component'; import { RbfList } from '@components/rbf-list/rbf-list.component'; +import { StratumList } from '@components/stratum/stratum-list/stratum-list.component'; import { RewardStatsComponent } from '@components/reward-stats/reward-stats.component'; import { DataCyDirective } from '@app/data-cy.directive'; import { LoadingIndicatorComponent } from '@components/loading-indicator/loading-indicator.component'; @@ -201,6 +202,7 @@ import { OnlyVsizeDirective, OnlyWeightDirective } from '@app/shared/components/ DifficultyAdjustmentsTable, BlocksList, RbfList, + StratumList, DataCyDirective, RewardStatsComponent, LoadingIndicatorComponent, @@ -345,6 +347,7 @@ import { OnlyVsizeDirective, OnlyWeightDirective } from '@app/shared/components/ AmountShortenerPipe, DifficultyAdjustmentsTable, BlocksList, + StratumList, DataCyDirective, RewardStatsComponent, LoadingIndicatorComponent,