From b5987915b1b6d259387649b94fdf369614e8a45e Mon Sep 17 00:00:00 2001 From: Ricardo Arturo Cabral Mejia Date: Sun, 7 Aug 2022 19:46:48 +0000 Subject: [PATCH] chore: refactor into adapter & handlers --- .eslintrc.js | 25 +- .prettierrc | 7 - ...220524_191000_add_events_added_function.js | 18 - ..._191100_add_events_after_insert_trigger.js | 13 - package-lock.json | 106 ------ package.json | 3 - seeds/events.json | 295 +++++++++++++++ src/client.ts | 23 ++ src/database/client.ts | 21 +- src/handlers/event-message-handler.ts | 34 ++ src/handlers/subscribe-message-handler.ts | 63 ++++ src/handlers/unsubscribe-message-handler.ts | 24 ++ src/index.ts | 353 +++++++++--------- src/relay/web-server-adapter.ts | 51 +++ src/relay/web-socket-server-adapter.ts | 125 +++++++ src/repositories/event-repository.ts | 15 +- src/schemas/event-schema.ts | 1 + src/types/message-handlers.ts | 12 + src/types/messages.ts | 4 +- src/types/repositories.ts | 2 +- src/types/servers.ts | 14 + 21 files changed, 838 insertions(+), 371 deletions(-) delete mode 100644 .prettierrc delete mode 100644 migrations/20220524_191000_add_events_added_function.js delete mode 100644 migrations/20220524_191100_add_events_after_insert_trigger.js create mode 100644 src/client.ts create mode 100644 src/handlers/event-message-handler.ts create mode 100644 src/handlers/subscribe-message-handler.ts create mode 100644 src/handlers/unsubscribe-message-handler.ts create mode 100644 src/relay/web-server-adapter.ts create mode 100644 src/relay/web-socket-server-adapter.ts create mode 100644 src/types/message-handlers.ts create mode 100644 src/types/servers.ts diff --git a/.eslintrc.js b/.eslintrc.js index 34c5c23..822577f 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -1,22 +1,21 @@ module.exports = { - parser: '@typescript-eslint/parser', + parser: "@typescript-eslint/parser", parserOptions: { - sourceType: 'module', + sourceType: "module", }, - plugins: ['@typescript-eslint/eslint-plugin'], - extends: [ - 'plugin:@typescript-eslint/recommended', - 'plugin:prettier/recommended', - ], + plugins: ["@typescript-eslint/eslint-plugin"], + extends: ["plugin:@typescript-eslint/recommended"], root: true, env: { node: true, }, - ignorePatterns: ['.eslintrc.js', 'dist', 'tslint.json', 'node_modules'], + ignorePatterns: [".eslintrc.js", "dist", "tslint.json", "node_modules"], rules: { - '@typescript-eslint/interface-name-prefix': 'off', - '@typescript-eslint/no-explicit-any': 'off', - '@typescript-eslint/no-unused-vars': ['error', { argsIgnorePattern: '^_' }], - 'no-console': 'off', + "@typescript-eslint/interface-name-prefix": "off", + "@typescript-eslint/no-explicit-any": "off", + "@typescript-eslint/no-unused-vars": ["error", { argsIgnorePattern: "^_" }], + "no-console": "off", + semi: ["error", "never"], + quotes: ["error", "single"] }, -} +}; diff --git a/.prettierrc b/.prettierrc deleted file mode 100644 index c389465..0000000 --- a/.prettierrc +++ /dev/null @@ -1,7 +0,0 @@ -{ - "singleQuote": true, - "trailingComma": "all", - "arrowParens": "always", - "tabWidth": 2, - "semi": false -} diff --git a/migrations/20220524_191000_add_events_added_function.js b/migrations/20220524_191000_add_events_added_function.js deleted file mode 100644 index d934b88..0000000 --- a/migrations/20220524_191000_add_events_added_function.js +++ /dev/null @@ -1,18 +0,0 @@ -exports.up = function (knex) { - return knex.raw( - `CREATE OR REPLACE FUNCTION event_added() - RETURNS trigger - LANGUAGE plpgsql - AS $function$ - BEGIN - perform pg_notify('event_added', row_to_json(NEW)::text); - return new; - END; - $function$ - ;`, - ) -} - -exports.down = function (knex) { - return knex.raw('DROP FUNCTION IF EXISTS event_added();') -} diff --git a/migrations/20220524_191100_add_events_after_insert_trigger.js b/migrations/20220524_191100_add_events_after_insert_trigger.js deleted file mode 100644 index bbb0ec1..0000000 --- a/migrations/20220524_191100_add_events_after_insert_trigger.js +++ /dev/null @@ -1,13 +0,0 @@ -exports.up = function (knex) { - return knex.raw( - `CREATE CONSTRAINT TRIGGER events_after_insert - AFTER INSERT - ON events - DEFERRABLE INITIALLY IMMEDIATE - FOR EACH ROW EXECUTE FUNCTION event_added();`, - ) -} - -exports.down = function (knex) { - return knex.raw('DROP TRIGGER IF EXISTS events_after_insert ON events') -} diff --git a/package-lock.json b/package-lock.json index 147a65b..e5c3870 100644 --- a/package-lock.json +++ b/package-lock.json @@ -28,10 +28,7 @@ "@typescript-eslint/parser": "^5.19.0", "chai": "^4.3.6", "eslint": "^8.13.0", - "eslint-config-prettier": "^8.5.0", - "eslint-plugin-prettier": "^4.0.0", "mocha": "^9.2.2", - "prettier": "^2.6.2", "rimraf": "^3.0.2", "sinon-chai": "^3.7.0", "ts-node": "^10.7.0", @@ -1027,39 +1024,6 @@ "url": "https://opencollective.com/eslint" } }, - "node_modules/eslint-config-prettier": { - "version": "8.5.0", - "resolved": "https://registry.npmjs.org/eslint-config-prettier/-/eslint-config-prettier-8.5.0.tgz", - "integrity": "sha512-obmWKLUNCnhtQRKc+tmnYuQl0pFU1ibYJQ5BGhTVB08bHe9wC8qUeG7c08dj9XX+AuPj1YSGSQIHl1pnDHZR0Q==", - "dev": true, - "bin": { - "eslint-config-prettier": "bin/cli.js" - }, - "peerDependencies": { - "eslint": ">=7.0.0" - } - }, - "node_modules/eslint-plugin-prettier": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/eslint-plugin-prettier/-/eslint-plugin-prettier-4.0.0.tgz", - "integrity": "sha512-98MqmCJ7vJodoQK359bqQWaxOE0CS8paAz/GgjaZLyex4TTk3g9HugoO89EqWCrFiOqn9EVvcoo7gZzONCWVwQ==", - "dev": true, - "dependencies": { - "prettier-linter-helpers": "^1.0.0" - }, - "engines": { - "node": ">=6.0.0" - }, - "peerDependencies": { - "eslint": ">=7.28.0", - "prettier": ">=2.0.0" - }, - "peerDependenciesMeta": { - "eslint-config-prettier": { - "optional": true - } - } - }, "node_modules/eslint-scope": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/eslint-scope/-/eslint-scope-5.1.1.tgz", @@ -1219,12 +1183,6 @@ "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==", "dev": true }, - "node_modules/fast-diff": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/fast-diff/-/fast-diff-1.2.0.tgz", - "integrity": "sha512-xJuoT5+L99XlZ8twedaRf6Ax2TgQVxvgZOYoPKqZufmJib0tL2tegPBOZb1pVNgIhlqDlA0eO0c3wBvQcmzx4w==", - "dev": true - }, "node_modules/fast-glob": { "version": "3.2.11", "resolved": "https://registry.npmjs.org/fast-glob/-/fast-glob-3.2.11.tgz", @@ -2322,33 +2280,6 @@ "node": ">= 0.8.0" } }, - "node_modules/prettier": { - "version": "2.6.2", - "resolved": "https://registry.npmjs.org/prettier/-/prettier-2.6.2.tgz", - "integrity": "sha512-PkUpF+qoXTqhOeWL9fu7As8LXsIUZ1WYaJiY/a7McAQzxjk82OF0tibkFXVCDImZtWxbvojFjerkiLb0/q8mew==", - "dev": true, - "bin": { - "prettier": "bin-prettier.js" - }, - "engines": { - "node": ">=10.13.0" - }, - "funding": { - "url": "https://github.com/prettier/prettier?sponsor=1" - } - }, - "node_modules/prettier-linter-helpers": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/prettier-linter-helpers/-/prettier-linter-helpers-1.0.0.tgz", - "integrity": "sha512-GbK2cP9nraSSUF9N2XwUwqfzlAFlMNYYl+ShE/V+H8a9uNl/oUqB1w2EL54Jh0OlyRSd8RfWYJ3coVS4TROP2w==", - "dev": true, - "dependencies": { - "fast-diff": "^1.1.2" - }, - "engines": { - "node": ">=6.0.0" - } - }, "node_modules/punycode": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.1.1.tgz", @@ -3924,22 +3855,6 @@ } } }, - "eslint-config-prettier": { - "version": "8.5.0", - "resolved": "https://registry.npmjs.org/eslint-config-prettier/-/eslint-config-prettier-8.5.0.tgz", - "integrity": "sha512-obmWKLUNCnhtQRKc+tmnYuQl0pFU1ibYJQ5BGhTVB08bHe9wC8qUeG7c08dj9XX+AuPj1YSGSQIHl1pnDHZR0Q==", - "dev": true, - "requires": {} - }, - "eslint-plugin-prettier": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/eslint-plugin-prettier/-/eslint-plugin-prettier-4.0.0.tgz", - "integrity": "sha512-98MqmCJ7vJodoQK359bqQWaxOE0CS8paAz/GgjaZLyex4TTk3g9HugoO89EqWCrFiOqn9EVvcoo7gZzONCWVwQ==", - "dev": true, - "requires": { - "prettier-linter-helpers": "^1.0.0" - } - }, "eslint-scope": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/eslint-scope/-/eslint-scope-5.1.1.tgz", @@ -4041,12 +3956,6 @@ "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==", "dev": true }, - "fast-diff": { - "version": "1.2.0", - "resolved": "https://registry.npmjs.org/fast-diff/-/fast-diff-1.2.0.tgz", - "integrity": "sha512-xJuoT5+L99XlZ8twedaRf6Ax2TgQVxvgZOYoPKqZufmJib0tL2tegPBOZb1pVNgIhlqDlA0eO0c3wBvQcmzx4w==", - "dev": true - }, "fast-glob": { "version": "3.2.11", "resolved": "https://registry.npmjs.org/fast-glob/-/fast-glob-3.2.11.tgz", @@ -4856,21 +4765,6 @@ "integrity": "sha512-vkcDPrRZo1QZLbn5RLGPpg/WmIQ65qoWWhcGKf/b5eplkkarX0m9z8ppCat4mlOqUsWpyNuYgO3VRyrYHSzX5g==", "dev": true }, - "prettier": { - "version": "2.6.2", - "resolved": "https://registry.npmjs.org/prettier/-/prettier-2.6.2.tgz", - "integrity": "sha512-PkUpF+qoXTqhOeWL9fu7As8LXsIUZ1WYaJiY/a7McAQzxjk82OF0tibkFXVCDImZtWxbvojFjerkiLb0/q8mew==", - "dev": true - }, - "prettier-linter-helpers": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/prettier-linter-helpers/-/prettier-linter-helpers-1.0.0.tgz", - "integrity": "sha512-GbK2cP9nraSSUF9N2XwUwqfzlAFlMNYYl+ShE/V+H8a9uNl/oUqB1w2EL54Jh0OlyRSd8RfWYJ3coVS4TROP2w==", - "dev": true, - "requires": { - "fast-diff": "^1.1.2" - } - }, "punycode": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.1.1.tgz", diff --git a/package.json b/package.json index f776027..ced06a6 100644 --- a/package.json +++ b/package.json @@ -43,10 +43,7 @@ "@typescript-eslint/parser": "^5.19.0", "chai": "^4.3.6", "eslint": "^8.13.0", - "eslint-config-prettier": "^8.5.0", - "eslint-plugin-prettier": "^4.0.0", "mocha": "^9.2.2", - "prettier": "^2.6.2", "rimraf": "^3.0.2", "sinon-chai": "^3.7.0", "ts-node": "^10.7.0", diff --git a/seeds/events.json b/seeds/events.json index f269d31..b521c69 100644 --- a/seeds/events.json +++ b/seeds/events.json @@ -1,4 +1,299 @@ [ + { + "id": "92242fb2c2d2c8228fad83d54caeaea3b7b596bd2413cbc840c91763e276edcb", + "pubkey": "32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245", + "created_at": 1659850395, + "kind": 3, + "tags": [ + [ + "p", + "6cad545430904b84a8101c5783b65043f19ae29d2da1076b8fc3e64892736f03", + "wss://nostr-pub.wellorder.net" + ], + [ + "p", + "22e804d26ed16b68db5259e78449e96dab5d464c8f470bda3eb1a70467f2c793", + "wss://nostr-pub.wellorder.net" + ], + [ + "p", + "2ef93f01cd2493e04235a6b87b10d3c4a74e2a7eb7c3caf168268f6af73314b5", + "wss://nostr-pub.wellorder.net" + ], + [ + "p", + "ed1d0e1f743a7d19aa2dfb0162df73bacdbc699f67cc55bb91a98c35f7deac69", + "wss://nostr.rocks" + ], + [ + "p", + "9ec7a778167afb1d30c4833de9322da0c08ba71a69e1911d5578d3144bb56437", + "wss://nostr.bitcoiner.social" + ], + [ + "p", + "b2d670de53b27691c0c3400225b65c35a26d06093bcc41f48ffc71e0907f9d4a", + "wss://nostr-relay.untethr.me" + ], + [ + "p", + "3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d", + "wss://nostr-relay.freeberty.net" + ], + [ + "p", + "e9e4276490374a0daf7759fd5f475deff6ffb9b0fc5fa98c902b5f4b2fe3bba2", + "wss://nostr-pub.wellorder.net" + ], + [ + "p", + "7927bc6e25892729a9c02a1332c409a69b285e143b9d845c54fd9c1fe829e25e", + "wss://nostr-relay.freeberty.net" + ], + [ + "p", + "3185f1d041ce283e0eb055eae5218552864fd8d66662594180d4beb439062d59", + "wss://nostr-relay.freeberty.net" + ], + [ + "p", + "8c0da4862130283ff9e67d889df264177a508974e2feb96de139804ea66d6168", + "wss://nostr-pub.wellorder.net" + ], + [ + "p", + "7e7272c475d920ad408e7a6faf9a123aa7b882cba7151e6105a0fc9d212fb240", + "wss://nostr-pub.wellorder.net" + ], + [ + "p", + "b238e136091cb01cd21606dac1a2f503f504e7e8e7c75d98fcefd30aed084a1c", + "wss://nostr-relay.freeberty.net" + ], + [ + "p", + "4557aab9aae76a892e01568064a9e262e613690421a79e584b8cc4c5ca9afb7e", + "wss://nostr-relay.freeberty.net" + ], + ["p", "21763c71793764a2661eb10ede32a8f2312c9f8db08bc539c888bafa38dcf368"], + ["p", "9630f464cca6a5147aa8a35f0bcdd3ce485324e732fd39e09233b1d848238f31"], + [ + "p", + "b457120b6cfb2589d48718f2ab71362dd0db43e13266771725129d35cc602dbe", + "wss://nostr-relay.freeberty.net" + ], + [ + "p", + "b1dd5e8ed19644671e8693ca2445c68729249f6d4f2d2d8f072d5e1399ba7ecb", + "wss://nostr-relay.freeberty.net" + ], + [ + "p", + "1265c1c3d41f0f05bf306224ec40628231a5086a2eaa36643b3982a4eba19c9f", + "wss://nostr.bitcoiner.social" + ], + [ + "p", + "4570d7a0b49b5524797120810116a2a5c18281423b173a557056f08f15c5382d", + "wss://nostr-pub.wellorder.net" + ], + [ + "p", + "04c915daefee38317fa734444acee390a8269fe5810b2241e5e6dd343dfbecc9", + "wss://nostr-relay.freeberty.net" + ], + [ + "p", + "13fd7010166391fa485f077e57d84153613039ed9d2319b555b50458e55beff0", + "wss://nostr-relay.untethr.me" + ], + ["p", "d3646691ba5b1d796c1e1b3430df00fe1189ec9c232877adde18c8f656af18f0"], + ["p", "07c8e6897131165bb55155377b37d5dd7e0ed77f12890d250161a0ba0aabedfb"], + ["p", "34af16f1787b261331a67fa519e9edb11f0a851c0b10a15901720b0fe0a06d1d"], + ["p", "b7c66ce6f7bbe034e96be54c2ffc0adf631a889abc0834ba1431171b67c489aa"], + ["p", "2183e94758481d0f124fbd93c56ccaa45e7e545ceeb8d52848f98253f497b975"], + ["p", "8355095016fddbe31fcf1453b26f613553e9758cf2263e190eac8fd96a3d3de9"], + ["p", "3efdaebb1d8923ebd99c9e7ace3b4194ab45512e2be79c1b7d68d9243e0d2681"], + ["p", "308a1a0d38f3f5ed18cfeaa8e8f9227d9ac5f4b46141cfea78e0d1dc37171500"], + ["p", "6b0d4c8d9dc59e110d380b0429a02891f1341a0fa2ba1b1cf83a3db4d47e3964"], + ["p", "06fca9f06f74cf86a16fe4c2feec508700643e2b105b519fd93d35332c51ad53"], + ["p", "35d26e4690cbe1a898af61cc3515661eb5fa763b57bd0b42e45099c8b32fd50f"], + ["p", "dcecb5c4c228e15a1f04305c34b39b7ff67675544cb7dc74dd5c715cf62ada74"], + ["p", "b2c61317687060b2b7e9cb7f7fde04f30bab23e12bf471f8d356000ca2b12b4a"], + ["p", "0810b5bc4cddc3e7624a1f6acbdccdc95c6e9409c144ce83365ee04a3a63314e"], + ["p", "51fc7209201b1414f721c3d2d2b3430699b1e6317716c5182cc1d7945072e358"], + ["p", "e6a92d8b6c20426f78bba8510ccdc73df5122814a3bac1d553adebac67a92b27"], + ["p", "ce5061bfcc16476b9bde3f1d5b3ec7730c4361cf8c827fbd9c14eb8c7003a1de"], + ["p", "619d0c0483a311a16767b0a7999ce9f28e58e79eff66f0edafae9ca2d9054f08"], + ["p", "d7b76d02c758a62a505e03bd5f5049aaee4e7e36283d273c7f6798912692df2b"], + ["p", "8ba2a6b558eeb7fccd1862b905ae9d9408cfbc208f1680d1262733246e92d4da"], + ["p", "975bbd239f0b7e25a080675d3db5892492ea9e9c7705c819ba3dafd8de95f3d9"], + ["p", "abf12c3fad52ab1a5313fea429a5021ecb652b4e801d516c1c1f203f0c575983"], + ["p", "76f928b303b095a6f17784151acd9a5127d183cb5f989a173b00bd0c12d07e83"], + ["p", "d4d4fdde8ab4924b1e452e896709a3bd236da4c0576274b52af5992d4d34762c"], + ["p", "7e88f589d2677ea4a863c72af5d0e85fbe1d3db111667c50d33fa42196a1afc0"], + ["p", "3878d95db7b854c3a0d3b2d6b7bf9bf28b36162be64326f5521ba71cf3b45a69"], + ["p", "ed04f9c719af697ac1c045bfff5f841cdf61a0b0d2170c9970f0ce0a04f708bf"], + ["p", "d9c8c00017a2a345c2f32132436a26e1c72cb7a57e7b6b316f62dee2f8bcf8dd"], + ["p", "ea42658e9a1291a32d1b74793edaef3d8757589a32b16931cacd85ba5470ea7c"], + ["p", "aff9a9f017f32b2e8b60754a4102db9d9cf9ff2b967804b50e070780aa45c9a8"], + ["p", "ac9ec020170155f0feb347f0d777ee5fc38dd1f36353093046323646cff5169f"], + ["p", "d91191e30e00444b942c0e82cad470b32af171764c2275bee0bd99377efd4075"], + ["p", "146bda4ec6932830503ee4f8e8b626bd7b3a5784232b8240ba15c8cbff9a07cd"], + ["p", "8f6ddf42ebe9486308975de3f27cc783bed2d93d1f7b0804d5e7090c5fad93b6"], + ["p", "ea75802dd1c86933c1e20c582541bb283d44c88e3445ed90d4375fc3d973f3a0"], + ["p", "bc6f08b3630d1afbef42db40c6650ebc666de01312ddfe7c6443dc23fe9905c4"], + ["p", "41108126409bf99cb77ff16fd53f4da2e53010b0dca04b0a53ebdf46eade37aa"], + ["p", "778fdd199044a2e8dc3cfac3c274f5577ed78c22fb3b5ccb13df6956980eff4c"], + ["p", "9682c33f9024dadb1bffdf762c3156e26b4aa340de8d06c91ca537fcc0fdb3a9"], + ["p", "a8f14f05c64f9e62bdada89c21a52f09aa5d7948b47ccf52da1be16b0de9efac"], + ["p", "80482e60178c2ce996da6d67577f56a2b2c47ccb1c84c81f2b7960637cb71b78"], + ["p", "b10c0000079a83cf26815dc7538818d8d56a2983e374e30a4143e50060978457"], + ["p", "547fcc5c7e655fe7c83da5a812e6332f0a4779c87bf540d8e75a4edbbf36fe4a"], + ["p", "48bd69736fa9e0a322aa132ec7613313b84fd064319c4f0ef4fdb8a55a66ad09"], + ["p", "dd81a8bacbab0b5c3007d1672fb8301383b4e9583d431835985057223eb298a5"], + ["p", "bb1cf5250435ff475cd8b32acb23e3ee7bbe8fc38f6951704b4798513947672c"], + ["p", "1fa91680ebfc68069ec13423fc8b9b0a746e9265584e16cf7d80be7ad721de6e"], + ["p", "52b4a076bcbbbdc3a1aefa3735816cf74993b1b8db202b01c883c58be7fad8bd"], + ["p", "b832d7fdcf4f6fed87ccfc6e10426710b968d6c260206fecb24aa096879c44ce"], + ["p", "f6c3ea7afe17b41f48c0a8e509f9be3c5cdbb449d73efcc1d22e02c6f3ebf484"], + ["p", "808ef58c7b30118467399a15d7c1dda5411c8b738c37070bbb14d95339743ae3"], + ["p", "d8139f130cc1dec5d92e3a4dc49fb11f064bf5b32c65d96da107ba2389547dc7"], + ["p", "e794d71b8f7426a291004f592b758438a25d0012e5bb969e53307b3785fd5211"], + ["p", "9ac12013d20fae4f8829ba4e5ba6343e410288d3a0752d6143386d2c1af1f57e"], + ["p", "ae683cd251952448ad0d7b8ed6c2e0f8ab451578250cb35f0c977275b56b056e"], + ["p", "46fcbe3065eaf1ae7811465924e48923363ff3f526bd6f73d7c184b16bd8ce4d"], + ["p", "b370efe1aca99ad03dee35668fcb5da5b6027778cfd22383d293a2cb07dcd748"], + ["p", "a63655cea2abd24ef8523dc84ecb106ab39061cfc834dfbfe31575f605240c71"], + ["p", "4b0572ab1f9a575415326b599f1179c20134bc23040d207156e71800b4ed33fe"], + ["p", "2fa4b9ba71b6dab17c4723745bb7850dfdafcb6ae1a8642f76f9c64fa5f43436"], + ["p", "bfc6af8244dc2859efdfd0e81a6a79f4ee395bc78acb3202b6f287a1ca3a27b3"], + ["p", "898420028069bfdf1ad1ef5512dfc8fa71a2a8528028bf816c54169770cbf99b"], + ["p", "428107e3b4b05df1e13c42b3bacb3fddf54c7ed12630e91870d5d8d0b4a091de"], + ["p", "3235036bd0957dfb27ccda02d452d7c763be40c91a1ac082ba6983b25238388c"], + ["p", "954aaf69c2e7c9fb3f9998f61944ab8ab08ce3c8679ecd985e4486a6eb696217"], + ["p", "d7f0e3917c466f1e2233e9624fbd6d4bd1392dbcfcaf3574f457569d496cb731"], + ["p", "84fe3febc748470ff1a363db8a375ffa1ff86603f2653d1c3c311ad0a70b5d0c"], + ["p", "08b80da85ba68ac031885ea555ab42bb42231fde9b690bbd0f48c128dfbf8009"], + ["p", "104749bc9151a0e54b9845ee50fc4b559439dd1ada006e36a6c49ad3ea16a55c"], + ["p", "c47daa0cd21a70797fe9404f8fe0c3f679c2b46148788d1295e6424232064f1d"], + ["p", "d987084c48390a290f5d2a34603ae64f55137d9b4affced8c0eae030eb222a25"], + ["p", "e1ff3bfdd4e40315959b08b4fcc8245eaa514637e1d4ec2ae166b743341be1af"], + ["p", "8f9c45787f179b2949608ffcaae7a73b5a7d12a4ad309594c2a20273601ae70f"], + ["p", "cf9413eb6bbe55c8a3c10119ec0635e134fa266f2c50f825d7225da9b92ecc4e"], + ["p", "09e935f7c01fda340051a4700cfb9dde533202bdf56808f68cafef6bae07a5bd"], + ["p", "9c3e7a87c357f9d698e4d7167ae08e269b1b1b1433311f8cf0dfdb8c0faf52c3"], + ["p", "af4f5ea6086182e8b411368fa50d92d66ceb91a54fab6e3f7e29094ce7fbb426"], + ["p", "179744407ac4fda143a8635e7ae9c9eabf3ab107a818a4f740a9e46b39412a42"], + ["p", "bae77874946ec111f94be59aef282de092dc4baf213f8ecb8c9e15cb7ed7304e"], + ["p", "9211af4fe742043111e923a6235065b1df69acb34df4d894b50f10e5ba57de8b"], + ["p", "44bb2dd1615ed2a527946c41d854995f18866a8feffa88eb375728c20aeea30c"], + ["p", "cd6b2f16c7afb47570ab242e0cbe0b9da64e1e7c6978a23c5ef33d4bb4a1cf57"], + ["p", "6398e15e3416de093b963ca38783d2a66a9657cb08cbba4f02546cdd55b6f1a4"], + ["p", "f24ab1e41a924b40a285064c5294541b575191a0af32ded8c84fd5af4215dfaf"], + ["p", "40e162e0a8d139c9ef1d1bcba5265d1953be1381fb4acd227d8f3c391f9b9486"], + ["p", "ba67bf89761da5ed59966077c4d514e94bb11195ce81d252572e3bac14e6803b"], + ["p", "57dd4ca641c95019151e957861929a175268a604810628fdacfb831776d12d29"], + ["p", "c7eda660a6bc8270530e82b4a7712acdea2e31dc0a56f8dc955ac009efd97c86"], + ["p", "f031876048ebdb65054a46d10c988bdfa2b53fb11d8f527c7c1a9c4d67c23c67"], + ["p", "6bb28e797d075bd0a822769b0173f8d0fc876fedeba4168f238709631be41273"], + ["p", "2590201e2919a8aa6568c88900192aa54ef00e6c0974a5b0432f52614a841ec8"], + ["p", "38b07a31f3b23dbeb9f59deb7bec5b993173fb4022206980f3809d0b68abf959"], + ["p", "d474da30f6cd727889743fcdce790fbdfc6376b1984206dbf50bd0d36d066351"], + ["p", "23a2cf63ec81e65561acafc655898d2fd0ef190084653fa452413f75e5a3d5bc"], + ["p", "e9c0a9c12e3a04edd79afc77d89b6c6413cc942ef9e61c51e51283cbe9db0c8f"], + ["p", "1c6b3be353041dd9e09bb568a4a92344e240b39ef5eb390f5e9e821273f0ae6f"], + ["p", "32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"], + ["p", "47bae3a008414e24b4d91c8c170f7fce777dedc6780a462d010761dca6482327"], + ["p", "f6d31b8781b7a6a98a87c1ebf390460399cca1046d58a1790428a401184297b5"], + ["p", "0fe4703dc4851c7843d5d07bd5263f743611f45d49949c407992333abdb1c7cc"], + ["p", "6c9580ff66ac28ddabb19f0238c4db75b350ef50a5567beb5a955da4fd5f65b4"], + ["p", "0000a0fa65fcccd99e6fd32fc7870339af40f4a94703ea30999fc5c091daa222"], + ["p", "0d6c8388dcb049b8dd4fc8d3d8c3bb93de3da90ba828e4f09c8ad0f346488a33"], + ["p", "e4c47aedea8ea54255f5ba07a77053b24553e9b975435e56da343da19aec7881"], + ["p", "fed5ca89be2b53e6234d9a3656bfd39777057ab71179a8b4632bad6e20d5e079"], + ["p", "566516663d91d4fef824eaeccbf9c2631a8d8a2efee8048ca5ee6095e6e5c843"], + ["p", "11b9a89404dbf3034e7e1886ba9dc4c6d376f239a118271bd2ec567a889850ce"], + ["p", "8d233d8babe9f40f170c5b0706fd4832869e07d040cfcd6b702d57e070aad1cb"], + ["p", "651dece809a1dbf9b9eb8a3e630ea674b8ea5996dc0a7726329c0cef4efc686b"], + ["p", "0ba7c7f4a32c25bcc644ef65bf9c0d75e99c83c7eadfdac7214ff426b0af64c6"], + ["p", "4a146fa1d1cb4c93fbe4594b4f6ad770dfb9d6f461f91b67f69c8731d3aa6b74"], + ["p", "cdb55b719d18e264364d53b7826422daf05cf11af0ef6fa4076a7e2724b722a8"], + ["p", "8600f1e35c28d941b8e3303142f17960a034e10457742d2b8c03c967a4e26934"], + ["p", "ff72534233d23c9f9880c5c99a1fe978697caeea08c439b9d4b5e3778e7c05f9"], + ["p", "ab6bf7466fa6b9f355d31335b001ea33ef3127349caaf5587334a083607f92d5"], + ["p", "b99b149370e4f8533ce53d143af3f39e1f2628a39847f7fdd7544c9585da9299"], + ["p", "ac4e18391f45932c0067e28203d5083a356ce301ab60867de094c94b98358666"], + ["p", "f45efc1451e13ba933c34300fd2e017f14eadf4b14b16a60c5d5049455b09ad8"], + ["p", "a869ca735271275671864337122a99386b5630d30dfcf5a46d62c271d9113c0b"], + ["p", "0d7df18a42d41e1ba747199490764d17c80956afb0d402b2f7c5ce620bd4d7a7"], + ["p", "70fc0e65e2239286a0318d6ee5ae849af7b79154d1e3dfb45c334d1b261f2257"], + ["p", "9577439173a22c446e3cd94669abae148af638e5d27970f3b2fa44d2938ddd0e"], + ["p", "e938040374dbf27726c62a5b8b4437defdb8c363c4795f478d9ebed68bf9d22b"], + ["p", "cbf781e3b1eb3b167728cfa25bfe135562ca9993e626be6b326ff5e10d689be8"], + ["p", "4f33aefaa8e5eac5f29799bce192be267826d1d2e226737fd3a3029b9f979d6e"], + ["p", "d34a2a9a302816601b64db8f8262883c5646744427f1b218cc34003cba506690"], + ["p", "9d12fc75d4cd3ffa89e3276e060415ce7fa10ef07af552d03ec7dec5cc52aca7"], + ["p", "81ab5b5c5c36bbc3960beeddd5c0e5966fa28672498fd960ed648794e7f6db15"], + ["p", "68b9ea7860f262482dee6c32615e00c88ce09ff16b6e971d3ab9721c405d3353"], + ["p", "fc6bf3e9c044981d3cdbf146c92472716b3592712310d86fc51130dca7b52923"], + ["p", "62610847e87afb99f40c949cd67f490ac6dd539689957c0b315ed67d1d13220f"], + ["p", "4d5ce768123563bc583697db5e84841fb528f7b708d966f2e546286ce3c72077"], + ["p", "edc0120f7c50760a6a39438a187432183653f695d4595a60983dc6faaa7f94b9"], + ["p", "e0d05a5b8c7789eb83f87672f4eb0dca78f99292ab038e5c66f84d97d77b95ae"], + ["p", "86f0689bd48dcd19c67a19d994f938ee34f251d8c39976290955ff585f2db42e"], + ["p", "62903b1ff41559daf9ee98ef1ae67cc52f301bb5ce26d14baba3052f649c3f49"], + ["p", "6446d04ecf9e0bb72c5ae218df9fc6c0a273149d9ecbfbe42519c53667b4405a"], + ["p", "9a29ee8c3771573e5306bb7701182e970b188ce3552713ca68a157ebc3c0bf75"], + ["p", "57225e0adcbad1fddf8d9ba1f5f36d657f134b7e0ea7aed6c0eb7013e4ef45f1"], + ["p", "e3f0c72e7b653f395f64e03519bae3efeac184bcf0b3f38bdccb62a4d2aa5d30"], + ["p", "b175db709771d32bbe7d8599e0c41f3f8768cc3a8333603d93c6d72d41c42f76"], + ["p", "9b9f5f1ec13105c8d1c2ea16aa952e98640b170b871420980ea11b18eb1f1e03"], + ["p", "f61abb9886e1f4cd5d20419c197d5d7f3649addab24b6a32a2367124ca3194b4"], + ["p", "3ed35796636aa19b7327de00b1192fbb985e8bf05d71604237bcd9df9b8bc73c"], + ["p", "1221fd0054a6c8ebd07b39c5eeea388f7f0244409f8cd8649ac22fcd668d02f6"], + ["p", "234d39919c1bd120766c4d874e8f34df4c80981236d76cdd95e246b1d01ae10b"], + ["p", "a4cb51f4618cfcd16b2d3171c466179bed8e197c43b8598823b04de266cef110"], + ["p", "c181af1aca3a13243a9ef9c302d5e988eaec25caa60c9923e5faed097e52cd69"], + ["p", "fd66e2e83d7ac4548724e2a9d56d46cb00244a8573775a8c97e5eee4f474e99c"], + ["p", "2b36fb6ae1022d0d4eac2a9f13fc2638f3350acc9b07bdca1de43a7c63429644"], + ["p", "472f440f29ef996e92a186b8d320ff180c855903882e59d50de1b8bd5669301e"], + ["p", "9aeb3bb495f09be3799048c3ef76649917efc46a8c8a69fefc31a7d012f6eccb"], + ["p", "f5424d002fd0d48fadd6e54879387714c54bfa46535976ff2b385843aaddf8e5"], + ["p", "152311509d52ef1fd3397b2bc297f206401ce00daabb69ab9a72e08d1547b8d4"], + ["p", "2bb514ba0acd6c9e6db7a76f53fd92016f893c62053acb7ef9a9f61042ade52e"], + ["p", "1f7dfb1b51bd4fb5d15245b28d86fab670a677580e2a0633a2cf76509d02471c"], + ["p", "3707f1efc7515524dce41d3bf50bfd9fdaed3494620b5f94fcf16d2766da4ec2"], + ["p", "4d960b819ff5c4f417431e73e7bf70ad41f181136d1baef47c25d1c8b23b4de2"], + ["p", "3fde182cc7e6efa69a393b16ef41b10c03928df3b96acf4f0eb03f9fca63a09a"], + ["p", "8ff7a6132ffe1bb3600aa20496ab648f1daf6b50ceaa8054a37e6a0b1f7ee491"], + ["p", "cbc5ef6b01cbd1ffa2cb95a954f04c385a936c1a86e1bb9ccdf2cf0f4ebeaccb"], + ["p", "647fcbfef88dd2347a4f69a296f0fd6a470f96eb1cd294066e2594e95fc9480d"], + ["p", "b1576eb99a4774158a32fc5e190afa3ded4da19f51fbfa0b1a1bf6421ea5733a"], + ["p", "14347702b99786cc0ee644620a5f71bc6a88e2882491f57c372f1deaed198701"], + ["p", "e37d948a0eee45e6cd113faaad934fcf17a97de2236c655b70650d4252daa9d3"], + ["p", "42a0825e980b9f97943d2501d99c3a3859d4e68cd6028c02afe58f96ba661a9d"], + ["p", "464b1e81a178e597314b0793ee51b4db2a1f1dca6fab4895309306faab0ec4d7"], + ["p", "f00c952da33c06e02c930f76aba1085021b98075657daaff8ad119edcfde691e"], + ["p", "c697f7f5f59de8ddb93c6b74fdd759ab2dc654bc36315f39770c214607fcd65e"], + ["p", "76f5960d381e7146b7f374a4a65afa403038441b46933840c71e436facb82ae7"], + ["p", "8837f562e064282e4fb9902ae6062ee436a53236909a68c6d19564df6c208fbe"], + ["p", "f43c1f9bff677b8f27b602725ea0ad51af221344f69a6b352a74991a4479bac3"], + ["p", "3dc473414bea4de1df1f172c3196a820719dbb58baaf8764b9edaa07c6d5e9dc"], + ["p", "66e346dfe3a4e572359519f086bf45771a19224343183aa1c86b9f9e31b78ac9"], + ["p", "8c24f2bf7df33aea0f05706162176343f34389d95ca5696dba1c2768887f586f"], + ["p", "b9e76546ba06456ed301d9e52bc49fa48e70a6bf2282be7a1ae72947612023dc"], + ["p", "c47c24c8f4000a83f5efcd2f7eadaeafb8f95437027df3354fd1c01d7f852583"], + ["p", "88a2c3b420b4a027706a98600d1fd744ac6cfd12e201b74189be5ef4b2b3aa45"], + ["p", "c3557fd9f7771fc9db60d2708715996a047d0ddd1e9cb0868010e8004b21a808"], + ["p", "1f5cd0b7618dcd0b4040e0daa1e6719ae9e4b5c0822fc5f47ed55725e08b6564"], + ["p", "343558f07b07ffcb24b27b73812d74d4ff8f46e81ea903f1e7f37d30d907bcfc"], + ["p", "1d643df20fb811b33c3cfb04c72db04c6ae031dbf68f613ba45407cbdd9446b6"], + ["p", "2ab80bb67da174e8de71d2747a2c205c09fd31b8ecfeaf5825c58bd574cbf75d"] + ], + "content": "{\"wss:\\/\\/nostr.drss.io\":{\"write\":true,\"read\":true},\"wss:\\/\\/relay.damus.io\":{\"write\":true,\"read\":true},\"ws:\\/\\/monad.jb55.com:8080\":{\"write\":true,\"read\":true},\"wss:\\/\\/nostr.zaprite.io\":{\"write\":true,\"read\":true},\"wss:\\/\\/relay.nostr.info\":{\"write\":true,\"read\":true},\"wss:\\/\\/expensive-relay.fiatjaf.com\":{\"write\":true,\"read\":true},\"wss:\\/\\/nostr-relay.wlvs.space\":{\"write\":true,\"read\":true},\"wss:\\/\\/nostr-pub.wellorder.net\":{\"write\":true,\"read\":true}}", + "sig": "d532da78b22e128535320ce6547d33b17876eef8a9546741a32726fbcbdceee7cc8595ce46028cd02bd17e390f59033afe8c914a8c6b3132b404fed2b65b5e4f" + }, { "id": "cf8de9db67a1d7203512d1d81e6190f5e53abfdc0ac90275f67172b65a5b09a0", "pubkey": "e8b487c079b0f67c695ae6c4c2552a47f38adfa2533cc5926bd2c102942fdcb7", diff --git a/src/client.ts b/src/client.ts new file mode 100644 index 0000000..444bc58 --- /dev/null +++ b/src/client.ts @@ -0,0 +1,23 @@ +import { WebSocket } from 'ws' + +import { IClient } from './types/clients' +import { Message } from './types/messages' + +export class Client implements IClient { + public constructor( + private readonly websocket: WebSocket + ) { } + + public from(websocket: WebSocket): IClient { + return new Client(websocket) + } + + public isConnected(): boolean { + return this.websocket.readyState === WebSocket.OPEN + } + + public send(message: Message): void { + this.websocket.send(JSON.stringify(message)) + } + +} \ No newline at end of file diff --git a/src/database/client.ts b/src/database/client.ts index 94ef293..f2f7425 100644 --- a/src/database/client.ts +++ b/src/database/client.ts @@ -1,9 +1,7 @@ import 'pg' import knex, { Knex } from 'knex' -const createDbConfig = ( - onNotificationCallback: (event: any) => void, -): Knex.Config => ({ +const createDbConfig = (): Knex.Config => ({ client: 'pg', connection: { host: process.env.DB_HOST, @@ -16,29 +14,14 @@ const createDbConfig = ( min: 2, max: 3, idleTimeoutMillis: 10000, - afterCreate: function (connection, callback) { - connection.on('error', function (error) { - console.error('PG error', error) - }) - connection.query('LISTEN event_added') - connection.on('notification', onNotificationCallback) - callback(null, connection) - }, }, acquireConnectionTimeout: 2000, }) let client: Knex export const getDbClient = () => { - const onNotificationCallback = (event: { channel: string; payload: any }) => { - if (event.channel !== 'event_added') { - return - } - client.emit('event_added', JSON.parse(event.payload)) - } - if (!client) { - client = knex(createDbConfig(onNotificationCallback)) + client = knex(createDbConfig()) } return client diff --git a/src/handlers/event-message-handler.ts b/src/handlers/event-message-handler.ts new file mode 100644 index 0000000..be69ccc --- /dev/null +++ b/src/handlers/event-message-handler.ts @@ -0,0 +1,34 @@ +import { IMessageHandler } from '../types/message-handlers' +import { MessageType, IncomingEventMessage } from '../types/messages' +import { IWebSocketServerAdapter } from '../types/servers' +import { IEventRepository } from '../types/repositories' + +export class EventMessageHandler implements IMessageHandler { + public constructor( + private readonly adapter: IWebSocketServerAdapter, + private readonly eventRepository: IEventRepository, + ) { } + + public canHandleMessageType(messageType: MessageType): boolean { + return messageType === MessageType.EVENT + } + + public async handleMessage(message: IncomingEventMessage): Promise { + // TODO: validate + try { + const count = await this.eventRepository.create(message[1]) + if (!count) { + console.debug('Event already exists.') + return true + } + + await this.adapter.broadcastEvent(message[1]) + + return true + } catch (error) { + console.error(`Unable to add event. Reason: ${error.message}`) + + return false + } + } +} \ No newline at end of file diff --git a/src/handlers/subscribe-message-handler.ts b/src/handlers/subscribe-message-handler.ts new file mode 100644 index 0000000..9c7acb6 --- /dev/null +++ b/src/handlers/subscribe-message-handler.ts @@ -0,0 +1,63 @@ +import { inspect } from 'util' +import { WebSocket } from 'ws' + +import { createOutgoingEventMessage, createEndOfStoredEventsNoticeMessage } from '../messages' +import { IMessageHandler } from '../types/message-handlers' +import { MessageType, SubscribeMessage } from '../types/messages' +import { IWebSocketServerAdapter } from '../types/servers' +import { IEventRepository } from '../types/repositories' +import { SubscriptionId, SubscriptionFilter } from '../types/subscription' + + +export class SubscribeMessageHandler implements IMessageHandler { + public constructor( + private readonly adapter: IWebSocketServerAdapter, + private readonly eventRepository: IEventRepository, + ) { } + + public canHandleMessageType(messageType: MessageType): boolean { + return messageType === MessageType.REQ + } + + public async handleMessage(message: SubscribeMessage, client: WebSocket): Promise { + const subscriptionId = message[1] as SubscriptionId + const filters = message.slice(2) as SubscriptionFilter[] + + const exists = this.adapter.getSubscriptions(client)?.get(subscriptionId) + + this.adapter.getSubscriptions(client)?.set(subscriptionId, filters) + + console.log( + `Subscription ${subscriptionId} ${exists ? 'updated' : 'created' + } with filters:`, + inspect(filters) + ) + + // TODO: search for matching events on the DB, then send ESOE + + return this.eventRepository.findByfilters(filters).then( + (events) => { + events.forEach((event) => { + client.send( + JSON.stringify( + createOutgoingEventMessage(subscriptionId, event) + ) + ) + }) + console.debug(`Sent ${events.length} events to:`, subscriptionId) + client.send( + JSON.stringify( + createEndOfStoredEventsNoticeMessage(subscriptionId) + ) + ) + console.debug('Sent EOSE to:', subscriptionId) + return true + }, + (error) => { + console.error('Unable to find by filters: ', error) + return true + } + ) + } + +} \ No newline at end of file diff --git a/src/handlers/unsubscribe-message-handler.ts b/src/handlers/unsubscribe-message-handler.ts new file mode 100644 index 0000000..36edf03 --- /dev/null +++ b/src/handlers/unsubscribe-message-handler.ts @@ -0,0 +1,24 @@ +import { WebSocket } from 'ws' + +import { IMessageHandler } from '../types/message-handlers' +import { MessageType, UnsubscribeMessage } from '../types/messages' +import { IWebSocketServerAdapter } from '../types/servers' + + +export class UnsubscribeMessageHandler implements IMessageHandler { + public constructor( + private readonly adapter: IWebSocketServerAdapter, + ) { } + + public canHandleMessageType(messageType: MessageType): boolean { + return messageType === MessageType.CLOSE + } + + public async handleMessage(message: UnsubscribeMessage, client: WebSocket): Promise { + const subscriptionId = message[1] + + this.adapter.getSubscriptions(client)?.delete(subscriptionId) + + return true + } +} \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 7feca2e..b8ac004 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,223 +1,208 @@ import * as http from 'http' -import { WebSocket, WebSocketServer } from 'ws' -import { applySpec, prop, pipe } from 'ramda' -import Joi from 'joi' -import util from 'util' - -import { - createEndOfStoredEventsNoticeMessage, - createOutgoingEventMessage, -} from './messages' -import packageJson from '../package.json' -import { Settings } from './settings' -import { Message, MessageType } from './types/messages' -import { SubscriptionFilter, SubscriptionId } from './types/subscription' +import { WebSocketServer } from 'ws' import { getDbClient } from './database/client' -import { messageSchema } from './schemas/message-schema' -import { Event } from './types/event' -import { isEventMatchingFilter } from './event' import { EventRepository } from './repositories/event-repository' +import { WebSocketServerAdapter } from './relay/web-socket-server-adapter' +import { SubscribeMessageHandler } from './handlers/subscribe-message-handler' +import { UnsubscribeMessageHandler } from './handlers/unsubscribe-message-handler' +import { EventMessageHandler } from './handlers/event-message-handler' -const inspect = (myObject) => - util.inspect(myObject, { showHidden: false, depth: null, colors: true }) +// const inspect = (myObject) => +// util.inspect(myObject, { showHidden: false, depth: null, colors: true }) -const WSS_CLIENT_HEALTH_PROBE_INTERVAL = 30000 +// const WSS_CLIENT_HEALTH_PROBE_INTERVAL = 30000 const server = http.createServer() const wss = new WebSocketServer({ server, maxPayload: 1024 * 1024 }) const dbClient = getDbClient() const eventRepository = new EventRepository(dbClient) -dbClient.raw('SELECT 1=1').then(() => void 0) +const adapter = new WebSocketServerAdapter( + server, + wss, +) +adapter.addMessageHandler(new SubscribeMessageHandler(adapter, eventRepository)) +adapter.addMessageHandler(new UnsubscribeMessageHandler(adapter)) +adapter.addMessageHandler(new EventMessageHandler(adapter, eventRepository)) -const stripEscape = (flerp) => flerp.slice(2) +// const subscriptions = new WeakMap< +// WebSocket, +// Map +// >() -const createEventFromDb = applySpec({ - id: pipe(prop('event_id'), stripEscape), - pubkey: pipe(prop('event_pubkey'), stripEscape), - created_at: prop('event_created_at'), - kind: prop('event_kind'), - tags: prop('event_tags'), - content: prop('event_content'), - sig: pipe(prop('event_signature'), stripEscape), -}) +// function broadcastEvent(event: Event) { +// wss.clients.forEach((ws) => { +// if (ws.readyState !== WebSocket.OPEN) { +// return +// } +// subscriptions.get(ws)?.forEach((filters, subscriptionId) => { +// if ( +// !filters.map(isEventMatchingFilter).some((isMatch) => isMatch(event)) +// ) { +// return +// } -dbClient.on('event_added', (event) => { - const nostrEvent = createEventFromDb(event) as Event +// console.log('Event sent', event.id) - wss.clients.forEach((ws) => { - if (ws.readyState !== WebSocket.OPEN) { - return - } - Object.entries( - (ws as any).subscriptions as { - [subscriptionId: SubscriptionId]: SubscriptionFilter[] - }, - ).forEach(([subscriptionId, filters]) => { - if ( - !filters - .map(isEventMatchingFilter) - .some((isMatch) => isMatch(nostrEvent)) - ) { - return - } - console.log( - `Broadcasting to client with subscription ${subscriptionId}`, - inspect(filters), - inspect(nostrEvent), - ) +// ws.send( +// JSON.stringify(createOutgoingEventMessage(subscriptionId, event)) +// ) +// }) +// }) +// } - ws.send( - JSON.stringify(createOutgoingEventMessage(subscriptionId, nostrEvent)), - ) - }) - }) -}) +// function heartbeat() { +// this.isAlive = true +// } -function heartbeat() { - this.isAlive = true -} +// wss.on('connection', function (ws, _req) { +// subscriptions.set(ws, new Map()) +// ws['isAlive'] = true -wss.on('connection', function (ws, _req) { - ws['subscriptions'] = {} - ws['isAlive'] = true +// ws.on('message', function onMessage(raw) { +// let message: Message - ws.on('message', function onMessage(raw) { - let message: Message +// try { +// message = Joi.attempt(JSON.parse(raw.toString('utf8')), messageSchema, { +// stripUnknown: true, +// abortEarly: true, +// }) as Message +// } catch (error) { +// console.error('Invalid message', error, raw.toString('utf8')) +// return +// } - try { - message = Joi.attempt(JSON.parse(raw.toString('utf8')), messageSchema, { - stripUnknown: true, - abortEarly: true, - }) as Message - } catch (error) { - console.error('Invalid message', error, JSON.stringify(raw)) - return - } +// const command = message[0] +// switch (command) { +// case MessageType.EVENT: +// { +// eventRepository.create(message[1]).then( +// (count) => { +// if (!count) { +// console.debug('Event already exists.') +// return +// } +// broadcastEvent(message[1] as Event) +// }, +// (error) => { +// console.error(`Unable to add event. Reason: ${error.message}`) +// } +// ) +// } +// break +// case MessageType.REQ: +// { +// const subscriptionId = message[1] as SubscriptionId +// const filters = message.slice(2) as SubscriptionFilter[] - const command = message[0] - switch (command) { - case MessageType.EVENT: - { - if (message[1] === null || typeof message[1] !== 'object') { - // ws.send(JSON.stringify(createNotice(`Invalid event`))) - return - } +// const exists = subscriptions.get(ws)?.get(subscriptionId) - eventRepository.create(message[1]).catch((error) => { - console.error(`Unable to add event. Reason: ${error.message}`) - }) - } - break - case MessageType.REQ: - { - const subscriptionId = message[1] as SubscriptionId - const filters = message.slice(2) as SubscriptionFilter[] +// subscriptions.get(ws)?.set(subscriptionId, filters) - const exists = subscriptionId in ws['subscriptions'] +// console.log( +// `Subscription ${subscriptionId} ${exists ? 'updated' : 'created' +// } with filters:`, +// inspect(filters) +// ) - ws['subscriptions'][subscriptionId] = filters +// // TODO: search for matching events on the DB, then send ESOE - console.log( - `Subscription ${subscriptionId} ${ - exists ? 'updated' : 'created' - } with filters:`, - inspect(filters), - ) +// eventRepository.findByfilters(filters).then( +// (events) => { +// events.forEach((event) => { +// ws.send( +// JSON.stringify( +// createOutgoingEventMessage(subscriptionId, event) +// ) +// ) +// }) +// console.debug(`Sent ${events.length} events to:`, subscriptionId) +// ws.send( +// JSON.stringify( +// createEndOfStoredEventsNoticeMessage(subscriptionId) +// ) +// ) +// console.debug('Sent EOSE to:', subscriptionId) +// }, +// (error) => { +// console.error('Unable to find by filters: ', error) +// } +// ) +// } +// break +// case MessageType.CLOSE: +// { +// const subscriptionId = message[1] as SubscriptionId - // TODO: search for matching events on the DB, then send ESOE +// subscriptions.get(ws)?.delete(subscriptionId) +// } +// break +// case MessageType.EOSE: +// break +// } +// }) - eventRepository.findByfilters(filters).then( - (events) => { - events.forEach((event) => { - ws.send( - JSON.stringify( - createOutgoingEventMessage(subscriptionId, event), - ), - ) - }) - ws.send( - JSON.stringify( - createEndOfStoredEventsNoticeMessage(subscriptionId), - ), - ) - console.log(`Found ${events.length} events matching filter.`) - }, - (error) => { - console.error('Unable to find by filters: ', error) - }, - ) - } - break - case MessageType.CLOSE: - { - const subscriptionId = message[1] as SubscriptionId - delete ws['subscriptions'][subscriptionId] - } - break - } - }) +// ws.on('pong', heartbeat) - ws.on('pong', heartbeat) +// ws.on('close', function onClose(code) { +// const clientSubs = subscriptions.get(this) +// clientSubs?.clear() +// if (clientSubs) { +// subscriptions.delete(this) +// } +// console.log('disconnected %s', code) +// }) +// }) - ws.on('close', function onClose(code) { - Object.keys(ws['subscriptions']).forEach( - (subscriptionId) => delete ws['subscriptions'][subscriptionId], - ) - delete ws['subscriptions'] - // TODO: Clean up subscriptions - console.log('disconnected %s', code) - }) -}) +// const heartbeatInterval = setInterval(function ping() { +// wss.clients.forEach(function each(ws) { +// if (!ws['isAlive']) { +// return ws.terminate() +// } -const heartbeatInterval = setInterval(function ping() { - wss.clients.forEach(function each(ws) { - if (!ws['isAlive']) { - return ws.terminate() - } +// ws['isAlive'] = false +// ws.ping() +// }) +// }, WSS_CLIENT_HEALTH_PROBE_INTERVAL) - ws['isAlive'] = false - ws.ping() - }) -}, WSS_CLIENT_HEALTH_PROBE_INTERVAL) +// wss.on('close', function close() { +// clearInterval(heartbeatInterval) +// }) -wss.on('close', function close() { - clearInterval(heartbeatInterval) -}) +// server.on('request', async (req, res) => { +// if (req.headers['accept'] === 'application/nostr+json') { +// const { +// info: { name, description, pubkey, contact }, +// } = Settings -server.on('request', async (req, res) => { - if (req.headers['accept'] === 'application/nostr+json') { - const { - info: { name, description, pubkey, contact }, - } = Settings +// const relayInformationDocument = { +// name, +// description, +// pubkey, +// contact, +// supported_nips: [11], +// software: packageJson.repository.url, +// version: packageJson.version, +// } - const relayInformationDocument = { - name, - description, - pubkey, - contact, - supported_nips: [11], - software: packageJson.repository.url, - version: packageJson.version, - } +// res.setHeader('content-type', 'application/nostr+json') +// res.end(JSON.stringify(relayInformationDocument)) +// } else { +// res.end() +// } +// }) - res.setHeader('content-type', 'application/nostr+json') - res.end(JSON.stringify(relayInformationDocument)) - } else { - res.end() - } -}) +// server.on('clientError', (err, socket) => { +// if (err['code'] === 'ECONNRESET' || !socket.writable) { +// return +// } +// socket.end('HTTP/1.1 400 Bad Request\r\n\r\n') +// }) -server.on('clientError', (err, socket) => { - if (err['code'] === 'ECONNRESET' || !socket.writable) { - return - } - socket.end('HTTP/1.1 400 Bad Request\r\n\r\n') -}) - -const port = process.env.SERVER_PORT ?? 8008 -console.log(`Listening on port: ${port}`) -server.listen(port) +const port = Number(process.env.SERVER_PORT) || 8008 +adapter.listen(port) +// console.log(`Listening on port: ${port}`) +// server.listen(port) process.on('SIGINT', function () { console.log('Caught interrupt signal') diff --git a/src/relay/web-server-adapter.ts b/src/relay/web-server-adapter.ts new file mode 100644 index 0000000..33c7c3a --- /dev/null +++ b/src/relay/web-server-adapter.ts @@ -0,0 +1,51 @@ +import { IncomingMessage, Server, ServerResponse } from 'http' +import { Duplex } from 'stream' + +import packageJson from '../../package.json' +import { Settings } from '../settings' +import { IWebServerAdapter } from '../types/servers' + +export class WebServerAdapter implements IWebServerAdapter { + + public constructor( + private readonly webServer: Server, + ) { + this.webServer.on('request', this.onWebServerRequest.bind(this)) + this.webServer.on('clientError', this.onWebServerSocketError.bind(this)) + } + + public listen(port: number): void { + console.log('Listening on port:', port) + this.webServer.listen(port) + } + + private onWebServerRequest(request: IncomingMessage, response: ServerResponse) { + if (request.headers['accept'] === 'application/nostr+json') { + const { + info: { name, description, pubkey, contact }, + } = Settings + + const relayInformationDocument = { + name, + description, + pubkey, + contact, + supported_nips: [11], + software: packageJson.repository.url, + version: packageJson.version, + } + + response.setHeader('content-type', 'application/nostr+json') + response.end(JSON.stringify(relayInformationDocument)) + } else { + response.end() + } + } + + private onWebServerSocketError(error: Error, socket: Duplex) { + if (error['code'] === 'ECONNRESET' || !socket.writable) { + return + } + socket.end('HTTP/1.1 400 Bad Request\r\n\r\n') + } +} \ No newline at end of file diff --git a/src/relay/web-socket-server-adapter.ts b/src/relay/web-socket-server-adapter.ts new file mode 100644 index 0000000..e8afe90 --- /dev/null +++ b/src/relay/web-socket-server-adapter.ts @@ -0,0 +1,125 @@ +import { Server } from 'http' +import WebSocket, { WebSocketServer } from 'ws' + +import { isEventMatchingFilter } from '../event' +import { createOutgoingEventMessage } from '../messages' +import { Event } from '../types/event' +import { IMessageHandler } from '../types/message-handlers' +import { Message } from '../types/messages' +import { IWebSocketServerAdapter } from '../types/servers' +import { SubscriptionId, SubscriptionFilter } from '../types/subscription' +import { WebServerAdapter } from './web-server-adapter' + +const WSS_CLIENT_HEALTH_PROBE_INTERVAL = 30000 + +export class WebSocketServerAdapter extends WebServerAdapter implements IWebSocketServerAdapter { + private subscriptions: WeakMap< + WebSocket, + Map + > + private heartbeats: WeakMap + + private readonly handlers: IMessageHandler[] = [] + + private heartbeatInterval: NodeJS.Timer + + public constructor( + webServer: Server, + private readonly webSocketServer: WebSocketServer, + ) { + super(webServer) + + this.subscriptions = new WeakMap>() + this.heartbeats = new WeakMap() + + this.webSocketServer.on('connection', this.onWebSocketServerConnection.bind(this)) + this.webSocketServer.on('close', this.onWebSocketServerClose.bind(this)) + this.heartbeatInterval = setInterval(this.onWebSocketServerHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL) + } + + public addMessageHandler(messageHandler: IMessageHandler): void { + this.handlers.push(messageHandler) + } + + public getSubscriptions(client: WebSocket): Map | undefined { + return this.subscriptions.get(client) + } + + public async broadcastEvent(event: Event): Promise { + this.webSocketServer.clients.forEach((client: WebSocket) => { + if (client.readyState !== WebSocket.OPEN) { + return + } + this.subscriptions.get(client)?.forEach((filters, subscriptionId) => { + if ( + !filters.map(isEventMatchingFilter).some((isMatch) => isMatch(event)) + ) { + return + } + + console.log('Event sent', event.id) + + client.send( + JSON.stringify(createOutgoingEventMessage(subscriptionId, event)) + ) + }) + }) + } + + private onWebSocketServerConnection(client: WebSocket) { + this.subscriptions.set(client, new Map()) + + client.on('message', (raw: WebSocket.RawData) => { + try { + const message = JSON.parse(raw.toString('utf-8')) + this.onWebSocketClientMessage(client, message) + } catch (error) { + console.error('Unable to parse message', raw.toString('utf-8')) + } + }) + + client.on('close', (_code: number) => { + this.onWebSocketClientClose(client) + }) + + client.on('pong', () => this.onWebSocketClientPong.bind(this)(client)) + } + + private async onWebSocketClientMessage(client: WebSocket, message: Message) { + for (const handler of this.handlers) { + if (handler.canHandleMessageType(message[0])) { + const handled = await handler.handleMessage(message, client) + if (handled) { + break + } + } + } + } + + private onWebSocketClientPong(client: WebSocket) { + this.heartbeats.set(client, true) + } + + private onWebSocketServerHeartbeat() { + this.webSocketServer.clients.forEach((client) => { + if (!this.heartbeats.get(client)) { + client.terminate() + return + } + + this.heartbeats.set(client, false) + client.ping() + }) + } + + private onWebSocketServerClose() { + clearInterval(this.heartbeatInterval) + } + + private onWebSocketClientClose(client: WebSocket) { + this.subscriptions.delete(client) + + client.removeAllListeners() + } +} \ No newline at end of file diff --git a/src/repositories/event-repository.ts b/src/repositories/event-repository.ts index 101c1b6..6d66eff 100644 --- a/src/repositories/event-repository.ts +++ b/src/repositories/event-repository.ts @@ -47,7 +47,7 @@ export class EventRepository implements IEventRepository { const [query, ...subqueries] = queries if (subqueries.length) { - query.union(subqueries) + query.union(subqueries, true) } console.log('Query', query.toString()) @@ -68,7 +68,7 @@ export class EventRepository implements IEventRepository { ) } - public async create(event: Event): Promise { + public async create(event: Event): Promise { console.log('Creating event', event) const toJSON = (input: any) => JSON.stringify(input) @@ -87,8 +87,13 @@ export class EventRepository implements IEventRepository { .insert(row) .onConflict('event_id') .ignore() - .then((number) => { - console.log(`Rows added`, (number as any).rowCount) - }) + .then( + (result) => { + return (result as any).rowCount + }, + (error) => { + console.error('mistakes were made', error) + }, + ) } } diff --git a/src/schemas/event-schema.ts b/src/schemas/event-schema.ts index 1af8cdc..dbcb822 100644 --- a/src/schemas/event-schema.ts +++ b/src/schemas/event-schema.ts @@ -30,6 +30,7 @@ export const eventSchema = Schema.object({ tags: Schema.array().items(tagSchema).required(), content: Schema.string() .max(64 * 1024) // 64 kB + .allow('') .required(), sig: signatureSchema.required(), }) diff --git a/src/types/message-handlers.ts b/src/types/message-handlers.ts new file mode 100644 index 0000000..4d1cad4 --- /dev/null +++ b/src/types/message-handlers.ts @@ -0,0 +1,12 @@ +import { WebSocket } from 'ws' + +import { Message, MessageType } from './messages' + +export interface IMessageHandler { + canHandleMessageType(messageType: MessageType): boolean + handleMessage(message: Message, client: WebSocket): Promise +} + +export interface IMessageProcessor { + process(message: Message, client: WebSocket): Promise +} \ No newline at end of file diff --git a/src/types/messages.ts b/src/types/messages.ts index 4c56539..e74c95f 100644 --- a/src/types/messages.ts +++ b/src/types/messages.ts @@ -11,13 +11,13 @@ export enum MessageType { } export type Message = - | SubscriptionMessage + | SubscribeMessage | IncomingEventMessage | UnsubscribeMessage | Notice | EndOfStoredEventsNotice -export type SubscriptionMessage = { +export type SubscribeMessage = { [index in Range<2, 100>]: SubscriptionFilter } & { 0: MessageType.REQ diff --git a/src/types/repositories.ts b/src/types/repositories.ts index 1a24fa9..8294e81 100644 --- a/src/types/repositories.ts +++ b/src/types/repositories.ts @@ -2,6 +2,6 @@ import { Event } from './event' import { SubscriptionFilter } from './subscription' export interface IEventRepository { - create(event: Event): Promise + create(event: Event): Promise findByfilters(filters: SubscriptionFilter[]): Promise } diff --git a/src/types/servers.ts b/src/types/servers.ts new file mode 100644 index 0000000..7f1d051 --- /dev/null +++ b/src/types/servers.ts @@ -0,0 +1,14 @@ +import { WebSocket } from 'ws' +import { Event } from './event' +import { IMessageHandler } from './message-handlers' +import { SubscriptionFilter, SubscriptionId } from './subscription' + +export interface IWebSocketServerAdapter { + addMessageHandler(messageHandler: IMessageHandler): void + getSubscriptions(client: WebSocket): Map | undefined + broadcastEvent(event: Event): Promise +} + +export interface IWebServerAdapter { + listen(port: number) +} \ No newline at end of file