bouncer: improve session & idents handling

Signed-off-by: Yonle <yonle@lecturify.net>
This commit is contained in:
Yonle
2024-05-11 10:59:10 +07:00
parent 6bf93d5be6
commit bc4784bfe0
2 changed files with 24 additions and 28 deletions

View File

@@ -26,7 +26,7 @@ if (approved_publishers?.length) {
const worker = new Worker(__dirname + "/worker_bouncer.js", { name: "Bostr (worker)" }); const worker = new Worker(__dirname + "/worker_bouncer.js", { name: "Bostr (worker)" });
const csess = {}; // this is used for relays. const csess = {}; // this is used for relays.
const ident = {}; const idents = {};
let zeroStats = { let zeroStats = {
raw_rx: 0, raw_rx: 0,
@@ -54,7 +54,7 @@ function handleConnection(ws, req) {
ws.accurateMode = parseInt(query.accurate); ws.accurateMode = parseInt(query.accurate);
ws.saveMode = parseInt(query.save); ws.saveMode = parseInt(query.save);
ident[ws.ident] = ws; idents[ws.ident] = ws;
if (noscraper || authorized_keys?.length) { if (noscraper || authorized_keys?.length) {
authKey = Date.now() + Math.random().toString(36); authKey = Date.now() + Math.random().toString(36);
@@ -96,9 +96,9 @@ function handleConnection(ws, req) {
) return ws.send(JSON.stringify(["OK", data[1]?.id, false, "rejected: unauthorized"])); ) return ws.send(JSON.stringify(["OK", data[1]?.id, false, "rejected: unauthorized"]));
if (!sessStarted) { if (!sessStarted) {
sessStarted = true;
console.log(process.pid, `>>>`, `${ws.ip} executed ${data[0]} command for the first. Initializing session`); console.log(process.pid, `>>>`, `${ws.ip} executed ${data[0]} command for the first. Initializing session`);
await getIdleSess(ws); await getIdleSess(ws);
sessStarted = true;
} }
_event(ws.id, data[1]); _event(ws.id, data[1]);
@@ -113,9 +113,9 @@ function handleConnection(ws, req) {
} }
if (!sessStarted) { if (!sessStarted) {
sessStarted = true;
console.log(process.pid, `>>>`, `${ws.ip} executed ${data[0]} command for the first. Initializing session`); console.log(process.pid, `>>>`, `${ws.ip} executed ${data[0]} command for the first. Initializing session`);
await getIdleSess(ws); await getIdleSess(ws);
sessStarted = true;
} }
_req(ws.id, data[1], data.slice(2)); _req(ws.id, data[1], data.slice(2));
@@ -144,7 +144,7 @@ function handleConnection(ws, req) {
ws.on('error', console.error); ws.on('error', console.error);
ws.on('close', _ => { ws.on('close', _ => {
delete ident[ws.ident]; delete idents[ws.ident];
console.log(process.pid, "---", `${ws.ip} disconnected`); console.log(process.pid, "---", `${ws.ip} disconnected`);
@@ -157,12 +157,12 @@ function handleConnection(ws, req) {
function handleWorker(msg) { function handleWorker(msg) {
switch (msg.type) { switch (msg.type) {
case "sessreg": { case "sessreg": {
if (!ident.hasOwnProperty(msg.ident)) return _destroy(msg.id); if (!idents.hasOwnProperty(msg.ident)) return _destroy(msg.id);
const ws = ident[msg.ident]; if (idents[msg.ident].id === msg.id) return ws.onready(); // if existing is the same as the current one, just poke ready.
const ws = idents[msg.ident];
ws.id = msg.id; ws.id = msg.id;
ws.onready(); ws.onready();
csess[msg.id] = ws; csess[msg.id] = ws;
delete ident[msg.ident];
break; break;
} }
case "upstream_msg": case "upstream_msg":
@@ -219,6 +219,7 @@ function _destroy(id) {
function getIdleSess(ws) { function getIdleSess(ws) {
const data = { const data = {
ip: ws.ip, ip: ws.ip,
ident: ws.ident,
pubkey: ws.pubkey, pubkey: ws.pubkey,
rejectKinds: ws.rejectKinds, rejectKinds: ws.rejectKinds,
acceptKinds: ws.acceptKinds, acceptKinds: ws.acceptKinds,
@@ -229,7 +230,6 @@ function getIdleSess(ws) {
worker.postMessage({ worker.postMessage({
type: "getsess", type: "getsess",
ident: ws.ident,
data data
}); });

View File

@@ -20,6 +20,7 @@ if (relays.length < max_eose_score) max_eose_score = 0;
const csess = {}; // this is used for relays. const csess = {}; // this is used for relays.
const userRelays = {}; // per ID contains Set() of <WebSocket> const userRelays = {}; // per ID contains Set() of <WebSocket>
const idleSess = new Set(); const idleSess = new Set();
const idents = {};
let stats = { let stats = {
_global: { _global: {
@@ -33,8 +34,7 @@ let stats = {
parentPort.on('message', m => { parentPort.on('message', m => {
switch (m.type) { switch (m.type) {
case "getsess": case "getsess":
// [<ident>, <user info>] getIdleSess(m.data);
getIdleSess(m.ident, m.data);
break; break;
case "req": { case "req": {
if (!csess.hasOwnProperty(m.id)) return; if (!csess.hasOwnProperty(m.id)) return;
@@ -139,6 +139,7 @@ parentPort.on('message', m => {
delete userRelays[m.id]; delete userRelays[m.id];
delete csess[m.id]; delete csess[m.id];
delete idents[m.ident];
break; break;
case "auth": case "auth":
if (!csess.hasOwnProperty(m.id)) return; if (!csess.hasOwnProperty(m.id)) return;
@@ -204,8 +205,15 @@ function bc(msg, id, toCacheOnly) {
} }
} }
function getIdleSess(ident, infos) { function getIdleSess(ws) {
const ws = {}; if (idents.hasOwnProperty(ws.ident)) {
return parentPort.postMessage({
type: "sessreg",
ident: ws.ident,
id: idents[ws.ident].id
});
}
ws.subs = {}; // contains filter submitted by clients. per subID ws.subs = {}; // contains filter submitted by clients. per subID
ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over <filter.limit> until all relays send EOSE. per subID ws.pause_subs = new Set(); // pause subscriptions from receiving events after reached over <filter.limit> until all relays send EOSE. per subID
ws.events = {}; // only to prevent the retransmit of the same event. per subID ws.events = {}; // only to prevent the retransmit of the same event. per subID
@@ -215,19 +223,6 @@ function getIdleSess(ident, infos) {
ws.fakesubalias = {}; ws.fakesubalias = {};
ws.mergedFilters = {}; ws.mergedFilters = {};
// handled in bouncer.js
ws.ip = null;
ws.pubkey = null;
ws.rejectKinds = null;
ws.acceptKinds = null;
ws.forcedLimit = null;
ws.accurateMode = 0;
ws.saveMode = 0;
for (const i in infos) {
ws[i] = infos[i];
}
if (ws.pubkey && private_keys[ws.pubkey]) { if (ws.pubkey && private_keys[ws.pubkey]) {
for (const relay of userRelays[ws.id]) { for (const relay of userRelays[ws.id]) {
for (const challenge of relay.pendingNIP42) { for (const challenge of relay.pendingNIP42) {
@@ -240,14 +235,15 @@ function getIdleSess(ident, infos) {
ws.id = idleSess.values().next().value; ws.id = idleSess.values().next().value;
idleSess.delete(ws.id); idleSess.delete(ws.id);
csess[ws.id] = ws; csess[ws.id] = ws;
idents[ws.ident] = ws;
parentPort.postMessage({ parentPort.postMessage({
type: "sessreg", type: "sessreg",
ident, ident: ws.ident,
id: ws.id id: ws.id
}); });
if (log_about_relays) console.log(threadId, "---", ws.ip, "is now using session", ws.id); console.log(threadId, "---", ws.ip, "is now using session", ws.id);
newsess(); newsess();
} }