# LIGHTNING FUNCTIONS import json import requests from Crypto.Cipher import AES from bech32 import bech32_decode, convertbits from nostr_sdk import nostr_sdk, PublicKey, SecretKey, Event from utils.dvmconfig import DVMConfig from utils.nostr_utils import get_event_by_id def parse_amount_from_bolt11_invoice(bolt11_invoice: str) -> int: def get_index_of_first_letter(ip): index = 0 for c in ip: if c.isalpha(): return index else: index = index + 1 return len(ip) remaining_invoice = bolt11_invoice[4:] index = get_index_of_first_letter(remaining_invoice) identifier = remaining_invoice[index] number_string = remaining_invoice[:index] number = float(number_string) if identifier == 'm': number = number * 100000000 * 0.001 elif identifier == 'u': number = number * 100000000 * 0.000001 elif identifier == 'n': number = number * 100000000 * 0.000000001 elif identifier == 'p': number = number * 100000000 * 0.000000000001 return int(number) def parse_zap_event_tags(zap_event, keys, name, client, config): zapped_event = None invoice_amount = 0 anon = False sender = zap_event.pubkey() for tag in zap_event.tags(): if tag.as_vec()[0] == 'bolt11': invoice_amount = parse_amount_from_bolt11_invoice(tag.as_vec()[1]) elif tag.as_vec()[0] == 'e': zapped_event = get_event_by_id(tag.as_vec()[1], client=client, config=config) elif tag.as_vec()[0] == 'description': zap_request_event = Event.from_json(tag.as_vec()[1]) sender = check_for_zapplepay(zap_request_event.pubkey().to_hex(), zap_request_event.content()) for z_tag in zap_request_event.tags(): if z_tag.as_vec()[0] == 'anon': if len(z_tag.as_vec()) > 1: print("[" + name + "] Private Zap received.") decrypted_content = decrypt_private_zap_message(z_tag.as_vec()[1], keys.secret_key(), zap_request_event.pubkey()) decrypted_private_event = Event.from_json(decrypted_content) if decrypted_private_event.kind() == 9733: sender = decrypted_private_event.pubkey().to_hex() message = decrypted_private_event.content() if message != "": print("Zap Message: " + message) else: anon = True print( "[" + name + "] Anonymous Zap received. Unlucky, I don't know from whom, and never will") return invoice_amount, zapped_event, sender, anon def create_bolt11_ln_bits(sats: int, config: DVMConfig) -> (str, str): url = config.LNBITS_URL + "/api/v1/payments" data = {'out': False, 'amount': sats, 'memo': "Nostr-DVM " + config.NIP89.name} headers = {'X-API-Key': config.LNBITS_INVOICE_KEY, 'Content-Type': 'application/json', 'charset': 'UTF-8'} try: res = requests.post(url, json=data, headers=headers) obj = json.loads(res.text) return obj["payment_request"], obj["payment_hash"] except Exception as e: print("LNBITS: " + str(e)) return None, None def check_bolt11_ln_bits_is_paid(payment_hash: str, config: DVMConfig): url = config.LNBITS_URL + "/api/v1/payments/" + payment_hash headers = {'X-API-Key': config.LNBITS_INVOICE_KEY, 'Content-Type': 'application/json', 'charset': 'UTF-8'} try: res = requests.get(url, headers=headers) obj = json.loads(res.text) return obj["paid"] #TODO cast except Exception as e: return None # DECRYPT ZAPS def check_for_zapplepay(pubkey_hex: str, content: str): try: # Special case Zapplepay if (pubkey_hex == PublicKey.from_bech32("npub1wxl6njlcgygduct7jkgzrvyvd9fylj4pqvll6p32h59wyetm5fxqjchcan") .to_hex()): real_sender_bech32 = content.replace("From: nostr:", "") pubkey_hex = PublicKey.from_bech32(real_sender_bech32).to_hex() return pubkey_hex except Exception as e: print(e) return pubkey_hex def decrypt_private_zap_message(msg: str, privkey: SecretKey, pubkey: PublicKey): shared_secret = nostr_sdk.generate_shared_key(privkey, pubkey) if len(shared_secret) != 16 and len(shared_secret) != 32: return "invalid shared secret size" parts = msg.split("_") if len(parts) != 2: return "invalid message format" try: _, encrypted_msg = bech32_decode(parts[0]) encrypted_bytes = convertbits(encrypted_msg, 5, 8, False) _, iv = bech32_decode(parts[1]) iv_bytes = convertbits(iv, 5, 8, False) except Exception as e: return e try: cipher = AES.new(bytearray(shared_secret), AES.MODE_CBC, bytearray(iv_bytes)) decrypted_bytes = cipher.decrypt(bytearray(encrypted_bytes)) plaintext = decrypted_bytes.decode("utf-8") decoded = plaintext.rsplit("}", 1)[0] + "}" # weird symbols at the end return decoded except Exception as ex: return str(ex)