mirror of
https://github.com/Yonle/bostr.git
synced 2025-10-05 18:32:37 +02:00
bouncer: add caching feature back.
Signed-off-by: Yonle <yonle@lecturify.net>
This commit is contained in:
13
bouncer.js
13
bouncer.js
@@ -6,7 +6,7 @@ const { validateEvent, nip19, matchFilters, mergeFilters, getFilterLimit } = req
|
|||||||
const auth = require("./auth.js");
|
const auth = require("./auth.js");
|
||||||
const nip42 = require("./nip42.js");
|
const nip42 = require("./nip42.js");
|
||||||
|
|
||||||
let { relays, approved_publishers, blocked_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs, idle_sessions } = require(process.env.BOSTR_CONFIG_PATH || "./config");
|
let { relays, approved_publishers, blocked_publishers, log_about_relays, authorized_keys, private_keys, reconnect_time, wait_eose, pause_on_limit, max_eose_score, broadcast_ratelimit, upstream_ratelimit_expiration, max_client_subs, idle_sessions, cache_relays } = require(process.env.BOSTR_CONFIG_PATH || "./config");
|
||||||
|
|
||||||
log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays;
|
log_about_relays = process.env.LOG_ABOUT_RELAYS || log_about_relays;
|
||||||
authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
|
authorized_keys = authorized_keys?.map(i => i.startsWith("npub") ? nip19.decode(i).data : i);
|
||||||
@@ -164,6 +164,7 @@ module.exports = (ws, req, onClose) => {
|
|||||||
ws.send(JSON.stringify(["CLOSED", origID, ""]));
|
ws.send(JSON.stringify(["CLOSED", origID, ""]));
|
||||||
break;
|
break;
|
||||||
case "AUTH":
|
case "AUTH":
|
||||||
|
if (authorized) return;
|
||||||
if (auth(authKey, data[1], ws, req)) {
|
if (auth(authKey, data[1], ws, req)) {
|
||||||
ws.pubkey = data[1].pubkey;
|
ws.pubkey = data[1].pubkey;
|
||||||
console.log(process.pid, "---", ws.ip, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)");
|
console.log(process.pid, "---", ws.ip, "successfully authorized as", ws.pubkey, private_keys[ws.pubkey] ? "(admin)" : "(user)");
|
||||||
@@ -201,13 +202,15 @@ function newsess() {
|
|||||||
userRelays.set(id, new Set());
|
userRelays.set(id, new Set());
|
||||||
csess.set(id, null);
|
csess.set(id, null);
|
||||||
idleSess.add(id);
|
idleSess.add(id);
|
||||||
|
cache_relays?.forEach(_ => newConn(_, id));
|
||||||
relays.forEach(_ => newConn(_, id));
|
relays.forEach(_ => newConn(_, id));
|
||||||
}
|
}
|
||||||
|
|
||||||
// WS - Broadcast message to every existing sockets
|
// WS - Broadcast message to every existing sockets
|
||||||
function bc(msg, id) {
|
function bc(msg, id, toCacheOnly) {
|
||||||
for (const sock of userRelays.get(id)) {
|
for (const sock of userRelays.get(id)) {
|
||||||
if (sock.readyState !== 1) continue;
|
if (sock.readyState !== 1) continue;
|
||||||
|
if (toCacheOnly && !sock.isCache) continue
|
||||||
|
|
||||||
// skip the ratelimit after <config.upstream_ratelimit_expiration>
|
// skip the ratelimit after <config.upstream_ratelimit_expiration>
|
||||||
if ((upstream_ratelimit_expiration) > (Date.now() - sock.ratelimit)) continue;
|
if ((upstream_ratelimit_expiration) > (Date.now() - sock.ratelimit)) continue;
|
||||||
@@ -236,6 +239,7 @@ function newConn(addr, id, reconn_t = 0) {
|
|||||||
allowSynchronousEvents: true
|
allowSynchronousEvents: true
|
||||||
});
|
});
|
||||||
|
|
||||||
|
relay.isCache = cache_relays?.includes(addr);
|
||||||
relay.ratelimit = 0;
|
relay.ratelimit = 0;
|
||||||
relay.on('open', _ => {
|
relay.on('open', _ => {
|
||||||
if (!csess.has(id)) return relay.terminate();
|
if (!csess.has(id)) return relay.terminate();
|
||||||
@@ -267,9 +271,10 @@ function newConn(addr, id, reconn_t = 0) {
|
|||||||
case "EVENT": {
|
case "EVENT": {
|
||||||
if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
|
if (data.length < 3 || typeof(data[1]) !== "string" || typeof(data[2]) !== "object") return;
|
||||||
if (!client.subalias.has(data[1])) return;
|
if (!client.subalias.has(data[1])) return;
|
||||||
|
if (!relay.isCache) bc(["EVENT", data[2]], id, true); // store to cache relay
|
||||||
data[1] = client.subalias.get(data[1]);
|
data[1] = client.subalias.get(data[1]);
|
||||||
const filter = client.mergedFilters.get(data[1]);
|
const filter = client.mergedFilters.get(data[1]);
|
||||||
if (client.pause_subs.has(data[1]) && (filter.since > data[2].created_at)) return;
|
if (client.pause_subs.has(data[1]) && (filter.since > data[2].created_at) && !relay.isCache) return;
|
||||||
|
|
||||||
if (client.rejectKinds && client.rejectKinds.includes(data[2]?.id)) return;
|
if (client.rejectKinds && client.rejectKinds.includes(data[2]?.id)) return;
|
||||||
|
|
||||||
@@ -293,6 +298,7 @@ function newConn(addr, id, reconn_t = 0) {
|
|||||||
if (client.events.get(data[1]).size >= limit) {
|
if (client.events.get(data[1]).size >= limit) {
|
||||||
// Once reached to <filter.limit>, send EOSE to client.
|
// Once reached to <filter.limit>, send EOSE to client.
|
||||||
client.send(JSON.stringify(["EOSE", data[1]]));
|
client.send(JSON.stringify(["EOSE", data[1]]));
|
||||||
|
|
||||||
if (!client.accurateMode && (client.saveMode || pause_on_limit)) {
|
if (!client.accurateMode && (client.saveMode || pause_on_limit)) {
|
||||||
client.pause_subs.add(data[1]);
|
client.pause_subs.add(data[1]);
|
||||||
} else {
|
} else {
|
||||||
@@ -306,6 +312,7 @@ function newConn(addr, id, reconn_t = 0) {
|
|||||||
data[1] = client.subalias.get(data[1]);
|
data[1] = client.subalias.get(data[1]);
|
||||||
if (!client.pendingEOSE.has(data[1])) return;
|
if (!client.pendingEOSE.has(data[1])) return;
|
||||||
client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
|
client.pendingEOSE.set(data[1], client.pendingEOSE.get(data[1]) + 1);
|
||||||
|
|
||||||
if (log_about_relays) console.log(process.pid, "---", id, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${userRelays.get(id).size} connected relays.`);
|
if (log_about_relays) console.log(process.pid, "---", id, `got EOSE from ${relay.url} for ${data[1]}. There are ${client.pendingEOSE.get(data[1])} EOSE received out of ${userRelays.get(id).size} connected relays.`);
|
||||||
|
|
||||||
if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < userRelays.get(id).size))) return;
|
if (wait_eose && ((client.pendingEOSE.get(data[1]) < max_eose_score) || (client.pendingEOSE.get(data[1]) < userRelays.get(id).size))) return;
|
||||||
|
@@ -149,5 +149,16 @@ module.exports = {
|
|||||||
"wss://example2.com",
|
"wss://example2.com",
|
||||||
// "wss://example3.com",
|
// "wss://example3.com",
|
||||||
// ...and so on
|
// ...and so on
|
||||||
|
],
|
||||||
|
|
||||||
|
// Cache relays - Store received events to cache relay(s) (Optional).
|
||||||
|
// Could improve the speed of event deliveries.
|
||||||
|
//
|
||||||
|
// Ensure that the cache relay does not has websocket ratelimit being set.
|
||||||
|
// CAUTION: - Cache relay is intensive in storing events. Watch for CPU usage.
|
||||||
|
// - Only works best with only 1 cache relay.
|
||||||
|
// - Things may not work properly if you configure more than just a single cache relays.
|
||||||
|
cache_relays: [
|
||||||
|
// "ws://localhost:3000"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user