"use strict";
// Anything about talking to upstream relays is handled here.
const { parentPort, threadId } = require("worker_threads");
const { version } = require("./package.json");
const WebSocket = require("ws");
const { validateEvent, nip19, matchFilters, mergeFilters, getFilterLimit } = require("nostr-tools");
const nip42 = require("./nip42.js");

let { relays, log_about_relays, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, upstream_ratelimit_expiration, max_client_subs, idle_sessions, cache_relays, loadbalancer } = require(process.env.BOSTR_CONFIG_PATH || "./config");

// Environment variable wins over the config file for relay logging.
log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays;
loadbalancer = loadbalancer || [];
if (relays.length) loadbalancer.unshift("_me");

// CL MaxEoseScore: Set <max_eose_score> as 0 if configured relays is under of the expected number from <max_eose_score>
if (relays.length < max_eose_score) max_eose_score = 0;

const csess = {};      // this is used for relays.
const userRelays = {}; // per ID contains Set() of <WebSocket>

const idleSess = new Set();

// Per-relay traffic counters, keyed by relay address; "_global" aggregates all.
let stats = {
  _global: {
    raw_rx: 0,
    rx: 0,
    tx: 0,
    f: 0
  }
};
// Dispatch messages coming from the parent (bouncer) thread.
// Each message carries a session id (m.id) mapping to an entry in <csess>.
parentPort.on('message', m => {
  switch (m.type) {
    case "getsess":
      // [<ident>, <user info>]
      getIdleSess(m.ident, m.data);
      break;
    case "req": {
      if (!csess.hasOwnProperty(m.id)) return;
      const ws = csess[m.id];

      if ((max_client_subs !== -1) && (Object.keys(ws.subs).length > max_client_subs))
        return parentPort.postMessage({
          type: "upstream_msg",
          id: m.id,
          // BUG FIX: this previously referenced an undefined `data[1]`
          // (ReferenceError). The client's subscription ID is m.sid.
          data: JSON.stringify(["CLOSED", m.sid, "rate-limited: too many subscriptions."])
        });

      const origID = m.sid;

      // Re-REQ on an existing subscription ID: tear down the old one first.
      if (ws.fakesubalias.hasOwnProperty(origID)) {
        const faked = ws.fakesubalias[origID];
        delete ws.subs[origID];
        delete ws.events[origID];
        delete ws.pendingEOSE[origID];
        ws.pause_subs.delete(origID);
        delete ws.fakesubalias[origID];
        delete ws.subalias[faked];
        delete ws.mergedFilters[origID];
        bc(["CLOSE", faked], m.id);
      }

      // Upstream subscription ID is faked so multiple clients can share a session.
      const faked = Date.now() + Math.random().toString(36);
      let filters = m.filters;
      let filter = mergeFilters(...filters);

      // Enforce the session's kind whitelist/blacklist and forced limit on every filter.
      for (const fn in filters) {
        if (!Array.isArray(filters[fn].kinds)) {
          filters[fn].kinds = ws.acceptKinds;
          continue;
        } else {
          filters[fn].kinds = filters[fn].kinds?.filter(kind => {
            if (ws.rejectKinds && ws.rejectKinds.includes(kind)) return false;
            if (ws.acceptKinds && !ws.acceptKinds.includes(kind)) return false;
            return true;
          });
        }

        if (filters[fn].limit > ws.forcedLimit)
          filters[fn].limit = ws.forcedLimit;
      }

      ws.subs[origID] = filters;
      ws.events[origID] = new Set();
      ws.pause_subs.delete(origID);
      ws.subalias[faked] = origID;
      ws.fakesubalias[origID] = faked;

      if (!filter.since) filter.since = Math.floor(Date.now() / 1000); // Will not impact everything. Only used for handling passing pause_on_limit (or save mode)
      ws.mergedFilters[origID] = filter;

      bc(["REQ", faked, ...filters], m.id);

      // A non-positive limit can never match anything; answer EOSE immediately.
      if (filter.limit < 1) return parentPort.postMessage({
        type: "upstream_msg",
        id: m.id,
        data: JSON.stringify(["EOSE", origID])
      });

      ws.pendingEOSE[origID] = 0;
      break;
    }
    case "close": {
      if (!csess.hasOwnProperty(m.id)) return;
      const ws = csess[m.id];
      if (!ws.fakesubalias.hasOwnProperty(m.sid)) return;

      const origID = m.sid;
      const faked = ws.fakesubalias[origID];
      delete ws.subs[origID];
      delete ws.events[origID];
      delete ws.pendingEOSE[origID];
      ws.pause_subs.delete(origID);
      delete ws.fakesubalias[origID];
      delete ws.subalias[faked];
      delete ws.mergedFilters[origID];

      bc(["CLOSE", faked], m.id);
      parentPort.postMessage({
        type: "upstream_msg",
        id: m.id,
        data: JSON.stringify(["CLOSED", origID, ""])
      });
      break;
    }
    case "event": {
      if (!csess.hasOwnProperty(m.id)) return;
      const ws = csess[m.id];

      // Broadcast to upstream relays and optimistically OK the event to the client.
      bc(["EVENT", m.event], m.id);
      parentPort.postMessage({
        type: "upstream_msg",
        id: m.id,
        data: JSON.stringify(["OK", m.event.id, true, ""])
      });
      break;
    }
    case "destroy":
      if (!csess.hasOwnProperty(m.id)) return;

      for (const sock of userRelays[m.id]) {
        sock.terminate();
      }

      delete userRelays[m.id];
      delete csess[m.id];
      break;
    case "auth":
      if (!csess.hasOwnProperty(m.id)) return;
      csess[m.id].pubkey = m.pubkey;

      // Answer any NIP-42 challenges that arrived before the pubkey was known.
      if (m.pubkey && private_keys[m.pubkey]) {
        for (const relay of userRelays[m.id]) {
          for (const challenge of relay.pendingNIP42) {
            nip42(relay, m.pubkey, private_keys[m.pubkey], challenge);
            relay.pendingNIP42.delete(challenge);
          }
        }
      }
      break;
  }

  // Report counters back to the parent after every handled message.
  parentPort.postMessage({
    type: "stats",
    data: stats
  });
});
// WS - New session for client $id
// mostly for new idle session.
function newsess() {
  const id = `${Date.now()}_${threadId}_${Math.random()}`;

  // Rotate the loadbalancer list: take the head, push it back to the tail.
  const target = loadbalancer.shift();
  loadbalancer.push(target);

  userRelays[id] = new Set();
  csess[id] = null;
  idleSess.add(id);

  // Cache relays are always connected, regardless of the loadbalancer target.
  if (cache_relays) {
    for (const url of cache_relays) newConn(url, id);
  }

  if (target === "_me") {
    // "_me" means: connect to this bouncer's own configured relay list.
    for (const url of relays) newConn(url, id);
  } else {
    newConn(target, id);
  }
}
// WS - Broadcast message to every existing sockets
function bc(msg, id, toCacheOnly) {
  if (toCacheOnly && !cache_relays?.length) return;
  const payload = JSON.stringify(msg);

  for (const sock of userRelays[id]) {
    if (sock.readyState !== 1) continue;           // only OPEN sockets
    if (toCacheOnly && !sock.isCache) continue;
    // skip the ratelimit after <config.upstream_ratelimit_expiration>
    if (upstream_ratelimit_expiration > (Date.now() - sock.ratelimit)) continue;
    sock.send(payload);
  }
}
// Claim an idle upstream session for a newly connected client, copy the
// client's info (<infos>) onto it, register it with the parent thread, and
// spin up a replacement idle session.
function getIdleSess(ident, infos) {
  const ws = {};
  ws.subs = {};                    // contains filter submitted by clients. per subID
  ws.pause_subs = new Set();       // pause subscriptions from receiving events after reached over <filter.limit> until all relays send EOSE. per subID
  ws.events = {};                  // only to prevent the retransmit of the same event. per subID
  ws.pendingEOSE = {};             // each contain subID
  ws.reconnectTimeout = new Set(); // relays timeout() before reconnection. Only use after client disconnected.
  ws.subalias = {};
  ws.fakesubalias = {};
  ws.mergedFilters = {};

  // handled in bouncer.js
  ws.ip = null;
  ws.pubkey = null;
  ws.rejectKinds = null;
  ws.acceptKinds = null;
  ws.forcedLimit = null;
  ws.accurateMode = 0;
  ws.saveMode = 0;

  for (const i in infos) {
    ws[i] = infos[i];
  }

  ws.id = idleSess.values().next().value;
  idleSess.delete(ws.id);
  csess[ws.id] = ws;

  // BUG FIX: this block previously ran before ws.id was assigned, so
  // userRelays[ws.id] was userRelays[undefined] and the for...of threw.
  // Answer any NIP-42 challenges queued while the session sat idle.
  if (ws.pubkey && private_keys[ws.pubkey]) {
    for (const relay of userRelays[ws.id]) {
      for (const challenge of relay.pendingNIP42) {
        nip42(relay, ws.pubkey, private_keys[ws.pubkey], challenge);
        relay.pendingNIP42.delete(challenge);
      }
    }
  }

  parentPort.postMessage({
    type: "sessreg",
    ident,
    id: ws.id
  });

  if (log_about_relays) console.log(threadId, "---", ws.ip, "is now using session", ws.id);
  newsess();
}
// Safe wrapper around nostr-tools' matchFilters:
// it has been observed to throw from inside its own code, so treat any
// throw as "event does not match".
function _matchFilters(filters, event) {
  let matched = false;
  try {
    matched = matchFilters(filters, event);
  } catch (_err) {
    matched = false;
  }
  return matched;
}
// Classify a relay address against the configured lists.
// Returns "relay", "cache_relay", "loadbalancer", or undefined if unknown.
function relay_type(addr) {
  if (relays.includes(addr)) return "relay";
  // BUG FIX: cache_relays is optional in the config (see the `if (cache_relays)`
  // and `cache_relays?.length` guards elsewhere); a plain .includes() here threw
  // a TypeError whenever it was unset.
  if (cache_relays?.includes(addr)) return "cache_relay";
  if (loadbalancer.includes(addr)) return "loadbalancer";
}
// WS - Sessions
// Open one upstream relay socket for session <id>, wire up all event
// handlers, and register it in userRelays[id]. <reconn_t> carries the
// accumulated reconnect backoff across retries.
function newConn(addr, id, reconn_t = 0) {
  if (!csess.hasOwnProperty(id)) return;
  if (!stats[addr]) stats[addr] = { raw_rx: 0, rx: 0, tx: 0, f: 0 };
  const relay = new WebSocket(addr, {
    headers: {
      "User-Agent": `Bostr ${version}; The nostr relay bouncer; https://github.com/Yonle/bostr`,
    }
  });

  relay.isCache = relay_type(addr) === "cache_relay";
  relay.isLoadBalancer = relay_type(addr) === "loadbalancer";
  relay.ratelimit = 0;
  relay.pendingNIP42 = new Set();

  relay.on('open', _ => {
    if (!csess.hasOwnProperty(id)) return relay.terminate();
    const client = csess[id];
    reconn_t = 0; // successful connect resets the backoff
    if (log_about_relays) console.log(threadId, "---", id, "Connected to", addr, `(${relay_type(addr)})`);
    if (!client) return;

    // Replay the client's active subscriptions onto the fresh socket.
    for (const i in client.subs) {
      relay.send(JSON.stringify(["REQ", client.fakesubalias[i], ...client.subs[i]]));
    }
  });

  relay.on('message', data => {
    try {
      data = JSON.parse(data);
    } catch (error) {
      return;
    }
    const client = csess[id];
    if (!client) return;

    switch (data[0]) {
      case "EVENT": {
        stats._global.raw_rx++;
        stats[addr].raw_rx++;
        if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
        if (!client.subalias.hasOwnProperty(data[1])) return;
        data[1] = client.subalias[data[1]];

        // BUG FIX: client.events[subID] is a Set; .hasOwnProperty() never
        // matched its entries, so duplicate events were retransmitted.
        if (client.events[data[1]].has(data[2]?.id)) return; // No need to transmit once it has been transmitted before.

        if (!relay.isCache) bc(["EVENT", data[2]], id, true); // store to cache relay
        const filter = client.mergedFilters[data[1]];
        if (client.pause_subs.has(data[1]) && (filter.since > data[2].created_at) && !relay.isCache) return;

        // BUG FIX: rejectKinds holds event *kinds*; this compared event ids.
        if (client.rejectKinds && client.rejectKinds.includes(data[2]?.kind)) return;

        const filters = client.subs[data[1]];
        if (!_matchFilters(filters, data[2])) return;

        const NotInSearchQuery = "search" in filter && !data[2]?.content?.toLowerCase().includes(filter.search.toLowerCase());
        if (NotInSearchQuery) return;

        if (!relay.isLoadBalancer) client.events[data[1]].add(data[2]?.id);
        parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(data) });
        stats._global.rx++;
        stats[addr].rx++;

        // Now count for REQ limit requested by client.
        // If it's at the limit, Send EOSE to client and delete pendingEOSE of subID
        // Skip if EOSE has been omitted
        if (!client.pendingEOSE.hasOwnProperty(data[1]) || client.pause_subs.has(data[1]) || relay.isLoadBalancer) return;
        const limit = getFilterLimit(filter);
        if (limit === Infinity) return;
        if (client.events[data[1]].size >= limit) {
          // Once reached to <filter.limit>, send EOSE to client.
          parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(["EOSE", data[1]]) });
          if (!client.accurateMode && (client.saveMode || pause_on_limit)) {
            client.pause_subs.add(data[1]);
          } else {
            delete client.pendingEOSE[data[1]];
          }
        }
        break;
      }
      case "EOSE":
        if (!client.subalias.hasOwnProperty(data[1])) return;
        data[1] = client.subalias[data[1]];
        if (!client.pendingEOSE.hasOwnProperty(data[1]) && !relay.isLoadBalancer) return;
        client.pendingEOSE[data[1]]++;

        if (log_about_relays) console.log(threadId, "---", id, `got EOSE from ${addr} for ${data[1]}. There are ${client.pendingEOSE[data[1]]} EOSE received out of ${userRelays[id].size} connected relays.`);

        if (!relay.isCache && (wait_eose && ((client.pendingEOSE[data[1]] < max_eose_score) || (client.pendingEOSE[data[1]] < userRelays[id].size)))) return;
        if (relay.isCache && !client.events[data[1]].size) return; // if cache relays did not send anything but EOSE, Don't send EOSE yet.
        delete client.pendingEOSE[data[1]];

        if (client.pause_subs.has(data[1]) && !relay.isLoadBalancer) {
          client.pause_subs.delete(data[1]);
        } else {
          parentPort.postMessage({ type: "upstream_msg", id, data: JSON.stringify(data) });
        }
        break;
      case "AUTH":
        // No key / no pubkey yet: remember the challenge for later (see "auth" message handler).
        if (!private_keys || typeof(data[1]) !== "string" || !client.pubkey) return relay.pendingNIP42.add(data[1]);
        nip42(relay, client.pubkey, private_keys[client.pubkey], data[1]);
        break;
      case "NOTICE":
        if (typeof(data[1]) !== "string") return;
        if (data[1].startsWith("rate-limited")) relay.ratelimit = Date.now();
        if (log_about_relays) console.log(threadId, id, addr, data[0], data[1]);
        stats._global.f++;
        stats[addr].f++;
        break;
      case "CLOSED":
        if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "string")) return;
        if (data[2].startsWith("rate-limited")) relay.ratelimit = Date.now();
        if (log_about_relays) console.log(threadId, id, addr, data[0], data[1], data[2]);
        if (data[2].length) {
          stats._global.f++;
          stats[addr].f++;
        }
        // Count a CLOSED like an EOSE so the sub isn't stuck waiting on this relay.
        if (client.pendingEOSE.hasOwnProperty(data[1])) client.pendingEOSE[data[1]]++;
        break;
      case "OK":
        if ((typeof(data[1]) !== "string") || (typeof(data[2]) !== "boolean") || (typeof(data[3]) !== "string")) return;
        if (data[3].startsWith("rate-limited")) relay.ratelimit = Date.now();
        if (log_about_relays) console.log(threadId, id, addr, data[0], data[1], data[2], data[3]);
        switch (data[2]) {
          case true:
            stats._global.tx++;
            stats[addr].tx++;
            break; // BUG FIX: missing break made every accepted event also count as a failure
          case false:
            stats._global.f++;
            stats[addr].f++;
            break;
        }
        break;
    }
  });

  relay.on('error', _ => {
    if (log_about_relays) console.error(threadId, "-!-", id, addr, _.toString());
  });

  relay.on('close', _ => {
    if (!userRelays.hasOwnProperty(id)) return;
    userRelays[id].delete(relay); // Remove this socket session from <client.relays> list
    if (log_about_relays) console.log(threadId, "-!-", id, "Disconnected from", addr, `(${relay_type(addr)})`);

    // Linear backoff before reconnecting.
    reconn_t += reconnect_time || 5000;
    setTimeout(_ => {
      newConn(addr, id, reconn_t);
    }, reconn_t);

    stats._global.f++;
    stats[addr].f++;
  });

  relay.on('unexpected-response', (req, res) => {
    if (!userRelays.hasOwnProperty(id)) return;
    userRelays[id].delete(relay);
    // 5xx is treated as transient: fall through to the normal reconnect path.
    if (res.statusCode >= 500) return relay.emit("close", null);
    // Anything else (4xx etc.) is permanent: drop the relay from the list.
    relays = relays.filter(_ => _ != addr);
    console.log(threadId, "-!-", `${addr} give status code ${res.statusCode}. Not (re)connect with new session again.`);
    stats._global.f++;
    stats[addr].f++;
  });

  userRelays[id].add(relay); // Add this socket session to <client.relays>
}
// Pre-warm the configured number of idle upstream sessions (default: 1).
let sessionsToOpen = idle_sessions || 1;
while (sessionsToOpen-- > 0) {
  newsess();
}