mirror of
https://github.com/believethehype/nostrdvm.git
synced 2025-03-18 05:41:51 +01:00
364 lines
14 KiB
Python
364 lines
14 KiB
Python
# LIGHTNING/ZAP FUNCTIONS
|
|
import json
|
|
import os
|
|
import urllib.parse
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Util.Padding import pad
|
|
from bech32 import bech32_decode, convertbits, bech32_encode
|
|
from nostr_sdk import nostr_sdk, PublicKey, SecretKey, Event, EventBuilder, Tag, Keys, generate_shared_key
|
|
|
|
from nostr_dvm.utils.nostr_utils import get_event_by_id, check_and_decrypt_own_tags
|
|
import lnurl
|
|
from hashlib import sha256
|
|
import dotenv
|
|
|
|
# TODO tor connection to lnbits
|
|
# proxies = {
|
|
# 'http': 'socks5h://127.0.0.1:9050',
|
|
# 'https': 'socks5h://127.0.0.1:9050'
|
|
# }
|
|
|
|
proxies = {}
|
|
|
|
|
|
def parse_zap_event_tags(zap_event, keys, name, client, config):
|
|
zapped_event = None
|
|
invoice_amount = 0
|
|
anon = False
|
|
message = ""
|
|
sender = zap_event.pubkey()
|
|
for tag in zap_event.tags():
|
|
if tag.as_vec()[0] == 'bolt11':
|
|
invoice_amount = parse_amount_from_bolt11_invoice(tag.as_vec()[1])
|
|
elif tag.as_vec()[0] == 'e':
|
|
zapped_event = get_event_by_id(tag.as_vec()[1], client=client, config=config)
|
|
zapped_event = check_and_decrypt_own_tags(zapped_event, config)
|
|
elif tag.as_vec()[0] == 'p':
|
|
p_tag = tag.as_vec()[1]
|
|
elif tag.as_vec()[0] == 'description':
|
|
zap_request_event = Event.from_json(tag.as_vec()[1])
|
|
sender = check_for_zapplepay(zap_request_event.pubkey().to_hex(),
|
|
zap_request_event.content())
|
|
for z_tag in zap_request_event.tags():
|
|
if z_tag.as_vec()[0] == 'anon':
|
|
if len(z_tag.as_vec()) > 1:
|
|
# print("[" + name + "] Private Zap received.")
|
|
decrypted_content = decrypt_private_zap_message(z_tag.as_vec()[1],
|
|
keys.secret_key(),
|
|
zap_request_event.pubkey())
|
|
decrypted_private_event = Event.from_json(decrypted_content)
|
|
if decrypted_private_event.kind() == 9733:
|
|
sender = decrypted_private_event.pubkey().to_hex()
|
|
message = decrypted_private_event.content()
|
|
# if message != "":
|
|
# print("Zap Message: " + message)
|
|
else:
|
|
anon = True
|
|
print(
|
|
"[" + name + "] Anonymous Zap received. Unlucky, I don't know from whom, and never will")
|
|
|
|
return invoice_amount, zapped_event, sender, message, anon
|
|
|
|
|
|
def parse_amount_from_bolt11_invoice(bolt11_invoice: str) -> int:
|
|
def get_index_of_first_letter(ip):
|
|
index = 0
|
|
for c in ip:
|
|
if c.isalpha():
|
|
return index
|
|
else:
|
|
index = index + 1
|
|
return len(ip)
|
|
|
|
remaining_invoice = bolt11_invoice[4:]
|
|
index = get_index_of_first_letter(remaining_invoice)
|
|
identifier = remaining_invoice[index]
|
|
number_string = remaining_invoice[:index]
|
|
number = float(number_string)
|
|
if identifier == 'm':
|
|
number = number * 100000000 * 0.001
|
|
elif identifier == 'u':
|
|
number = number * 100000000 * 0.000001
|
|
elif identifier == 'n':
|
|
number = number * 100000000 * 0.000000001
|
|
elif identifier == 'p':
|
|
number = number * 100000000 * 0.000000000001
|
|
|
|
return int(number)
|
|
|
|
|
|
def create_bolt11_ln_bits(sats: int, config) -> (str, str):
|
|
if config.LNBITS_URL == "":
|
|
return None, None
|
|
url = config.LNBITS_URL + "/api/v1/payments"
|
|
data = {'out': False, 'amount': sats, 'memo': "Nostr-DVM " + config.NIP89.NAME}
|
|
headers = {'X-API-Key': config.LNBITS_INVOICE_KEY, 'Content-Type': 'application/json', 'charset': 'UTF-8'}
|
|
try:
|
|
res = requests.post(url, json=data, headers=headers)
|
|
obj = json.loads(res.text)
|
|
if obj.get("payment_request") and obj.get("payment_hash"):
|
|
return obj["payment_request"], obj["payment_hash"] #
|
|
else:
|
|
print("LNBITS: " + res.text)
|
|
return None, None
|
|
except Exception as e:
|
|
print("LNBITS: " + str(e))
|
|
return None, None
|
|
|
|
|
|
def create_bolt11_lud16(lud16, amount):
|
|
if lud16.startswith("LNURL") or lud16.startswith("lnurl"):
|
|
url = lnurl.decode(lud16)
|
|
elif '@' in lud16: # LNaddress
|
|
url = 'https://' + str(lud16).split('@')[1] + '/.well-known/lnurlp/' + str(lud16).split('@')[0]
|
|
else: # No lud16 set or format invalid
|
|
return None
|
|
try:
|
|
print(url)
|
|
response = requests.get(url)
|
|
ob = json.loads(response.content)
|
|
callback = ob["callback"]
|
|
response = requests.get(callback + "?amount=" + str(int(amount) * 1000))
|
|
ob = json.loads(response.content)
|
|
return ob["pr"]
|
|
except Exception as e:
|
|
print("LUD16: " + e)
|
|
return None
|
|
|
|
|
|
def create_lnbits_account(name):
|
|
if os.getenv("LNBITS_ADMIN_ID") is None or os.getenv("LNBITS_ADMIN_ID") == "":
|
|
print("No admin id set, no wallet created.")
|
|
return "","","","", "failed"
|
|
data = {
|
|
'admin_id': os.getenv("LNBITS_ADMIN_ID"),
|
|
'wallet_name': name,
|
|
'user_name': name,
|
|
}
|
|
try:
|
|
json_object = json.dumps(data)
|
|
url = os.getenv("LNBITS_HOST") + '/usermanager/api/v1/users'
|
|
print(url)
|
|
headers = {'X-API-Key': os.getenv("LNBITS_ADMIN_KEY"), 'Content-Type': 'application/json', 'charset': 'UTF-8'}
|
|
r = requests.post(url, data=json_object, headers=headers, proxies=proxies)
|
|
walletjson = json.loads(r.text)
|
|
print(walletjson)
|
|
if walletjson.get("wallets"):
|
|
return walletjson['wallets'][0]['inkey'], walletjson['wallets'][0]['adminkey'], walletjson['wallets'][0][
|
|
'id'], walletjson['id'], "success"
|
|
except:
|
|
print("error creating wallet")
|
|
return "", "", "", "", "failed"
|
|
|
|
|
|
def check_bolt11_ln_bits_is_paid(payment_hash: str, config):
|
|
url = config.LNBITS_URL + "/api/v1/payments/" + payment_hash
|
|
headers = {'X-API-Key': config.LNBITS_INVOICE_KEY, 'Content-Type': 'application/json', 'charset': 'UTF-8'}
|
|
try:
|
|
res = requests.get(url, headers=headers, proxies=proxies)
|
|
obj = json.loads(res.text)
|
|
if obj.get("paid"):
|
|
return obj["paid"]
|
|
else:
|
|
return False
|
|
except Exception as e:
|
|
return None
|
|
|
|
|
|
def pay_bolt11_ln_bits(bolt11: str, config):
|
|
url = config.LNBITS_URL + "/api/v1/payments"
|
|
data = {'out': True, 'bolt11': bolt11}
|
|
headers = {'X-API-Key': config.LNBITS_ADMIN_KEY, 'Content-Type': 'application/json', 'charset': 'UTF-8'}
|
|
try:
|
|
res = requests.post(url, json=data, headers=headers)
|
|
obj = json.loads(res.text)
|
|
if obj.get("payment_hash"):
|
|
return obj["payment_hash"]
|
|
else:
|
|
return "Error"
|
|
except Exception as e:
|
|
print("LNBITS: " + str(e))
|
|
return "Error"
|
|
|
|
|
|
# DECRYPT ZAPS
|
|
def check_for_zapplepay(pubkey_hex: str, content: str):
|
|
try:
|
|
# Special case Zapplepay
|
|
if (pubkey_hex == PublicKey.from_bech32("npub1wxl6njlcgygduct7jkgzrvyvd9fylj4pqvll6p32h59wyetm5fxqjchcan")
|
|
.to_hex()):
|
|
real_sender_bech32 = content.replace("From: nostr:", "")
|
|
pubkey_hex = PublicKey.from_bech32(real_sender_bech32).to_hex()
|
|
return pubkey_hex
|
|
|
|
except Exception as e:
|
|
print(e)
|
|
return pubkey_hex
|
|
|
|
|
|
def enrypt_private_zap_message(message, privatekey, publickey):
|
|
# Generate a random IV
|
|
shared_secret = generate_shared_key(privatekey, publickey)
|
|
iv = os.urandom(16)
|
|
|
|
# Encrypt the message
|
|
cipher = AES.new(bytearray(shared_secret), AES.MODE_CBC, bytearray(iv))
|
|
utf8message = message.encode('utf-8')
|
|
padded_message = pad(utf8message, AES.block_size)
|
|
encrypted_msg = cipher.encrypt(padded_message)
|
|
|
|
encrypted_msg_bech32 = bech32_encode("pzap", convertbits(encrypted_msg, 8, 5, True))
|
|
iv_bech32 = bech32_encode("iv", convertbits(iv, 8, 5, True))
|
|
return encrypted_msg_bech32 + "_" + iv_bech32
|
|
|
|
|
|
def decrypt_private_zap_message(msg: str, privkey: SecretKey, pubkey: PublicKey):
|
|
shared_secret = generate_shared_key(privkey, pubkey)
|
|
if len(shared_secret) != 16 and len(shared_secret) != 32:
|
|
return "invalid shared secret size"
|
|
parts = msg.split("_")
|
|
if len(parts) != 2:
|
|
return "invalid message format"
|
|
try:
|
|
_, encrypted_msg = bech32_decode(parts[0])
|
|
encrypted_bytes = convertbits(encrypted_msg, 5, 8, False)
|
|
_, iv = bech32_decode(parts[1])
|
|
iv_bytes = convertbits(iv, 5, 8, False)
|
|
except Exception as e:
|
|
return e
|
|
try:
|
|
cipher = AES.new(bytearray(shared_secret), AES.MODE_CBC, bytearray(iv_bytes))
|
|
decrypted_bytes = cipher.decrypt(bytearray(encrypted_bytes))
|
|
plaintext = decrypted_bytes.decode("utf-8")
|
|
decoded = plaintext.rsplit("}", 1)[0] + "}" # weird symbols at the end
|
|
return decoded
|
|
except Exception as ex:
|
|
return str(ex)
|
|
|
|
|
|
def zaprequest(lud16: str, amount: int, content, zapped_event, zapped_user, keys, relay_list, zaptype="public"):
|
|
if lud16.startswith("LNURL") or lud16.startswith("lnurl"):
|
|
url = lnurl.decode(lud16)
|
|
elif '@' in lud16: # LNaddress
|
|
url = 'https://' + str(lud16).split('@')[1] + '/.well-known/lnurlp/' + str(lud16).split('@')[0]
|
|
else: # No lud16 set or format invalid
|
|
return None
|
|
try:
|
|
response = requests.get(url)
|
|
ob = json.loads(response.content)
|
|
callback = ob["callback"]
|
|
encoded_lnurl = lnurl.encode(url)
|
|
amount_tag = Tag.parse(['amount', str(amount * 1000)])
|
|
relays_tag = Tag.parse(['relays', str(relay_list)])
|
|
lnurl_tag = Tag.parse(['lnurl', encoded_lnurl])
|
|
if zapped_event is not None:
|
|
p_tag = Tag.parse(['p', zapped_event.pubkey().to_hex()])
|
|
e_tag = Tag.parse(['e', zapped_event.id().to_hex()])
|
|
tags = [amount_tag, relays_tag, p_tag, e_tag, lnurl_tag]
|
|
else:
|
|
p_tag = Tag.parse(['p', zapped_user.to_hex()])
|
|
tags = [amount_tag, relays_tag, p_tag, lnurl_tag]
|
|
|
|
|
|
if zaptype == "private":
|
|
key_str = keys.secret_key().to_hex() + zapped_event.id().to_hex() + str(zapped_event.created_at().as_secs())
|
|
encryption_key = sha256(key_str.encode('utf-8')).hexdigest()
|
|
|
|
zap_request = EventBuilder(9733, content,
|
|
[p_tag, e_tag]).to_event(keys).as_json()
|
|
keys = Keys.from_sk_str(encryption_key)
|
|
encrypted_content = enrypt_private_zap_message(zap_request, keys.secret_key(), zapped_event.pubkey())
|
|
anon_tag = Tag.parse(['anon', encrypted_content])
|
|
tags.append(anon_tag)
|
|
content = ""
|
|
|
|
zap_request = EventBuilder(9734, content,
|
|
tags).to_event(keys).as_json()
|
|
|
|
response = requests.get(callback + "?amount=" + str(int(amount) * 1000) + "&nostr=" + urllib.parse.quote_plus(
|
|
zap_request) + "&lnurl=" + encoded_lnurl)
|
|
ob = json.loads(response.content)
|
|
return ob["pr"]
|
|
|
|
except Exception as e:
|
|
print("ZAP REQUEST: " + e)
|
|
return None
|
|
|
|
def get_price_per_sat(currency):
|
|
import requests
|
|
|
|
url = "https://api.coinstats.app/public/v1/coins"
|
|
params = {"skip": 0, "limit": 1, "currency": currency}
|
|
try:
|
|
response = requests.get(url, params=params)
|
|
response_json = response.json()
|
|
|
|
bitcoin_price = response_json["coins"][0]["price"]
|
|
price_currency_per_sat = bitcoin_price / 100000000.0
|
|
except:
|
|
price_currency_per_sat = 0.0004
|
|
|
|
return price_currency_per_sat
|
|
|
|
|
|
def make_ln_address_nostdress(identifier, npub, pin, nostdressdomain):
|
|
print(os.getenv("LNBITS_INVOICE_KEY_" + identifier.upper()))
|
|
data = {
|
|
'name': identifier,
|
|
'domain': nostdressdomain,
|
|
'kind': "lnbits",
|
|
'host': os.getenv("LNBITS_HOST"),
|
|
'key': os.getenv("LNBITS_INVOICE_KEY_" + identifier.upper()),
|
|
'pin': pin,
|
|
'npub': npub,
|
|
'currentname': " "
|
|
}
|
|
|
|
try:
|
|
url = "https://" + nostdressdomain + "/api/easy/"
|
|
res = requests.post(url, data=data)
|
|
print(res.text)
|
|
obj = json.loads(res.text)
|
|
|
|
if obj.get("ok"):
|
|
return identifier + "@" + nostdressdomain, obj["pin"]
|
|
except Exception as e:
|
|
print(e)
|
|
return "", ""
|
|
|
|
|
|
|
|
|
|
def check_and_set_ln_bits_keys(identifier, npub):
|
|
if not os.getenv("LNBITS_INVOICE_KEY_" + identifier.upper()):
|
|
invoicekey, adminkey, walletid, userid, success = create_lnbits_account(identifier)
|
|
add_key_to_env_file("LNBITS_INVOICE_KEY_" + identifier.upper(), invoicekey)
|
|
add_key_to_env_file("LNBITS_ADMIN_KEY_" + identifier.upper(), adminkey)
|
|
add_key_to_env_file("LNBITS_USER_ID_" + identifier.upper(), userid)
|
|
add_key_to_env_file("LNBITS_WALLET_ID_" + identifier.upper(), userid)
|
|
|
|
lnaddress = ""
|
|
pin = ""
|
|
if os.getenv("NOSTDRESS_DOMAIN") and success != "failed":
|
|
print(os.getenv("NOSTDRESS_DOMAIN"))
|
|
lnaddress, pin = make_ln_address_nostdress(identifier, npub, " ", os.getenv("NOSTDRESS_DOMAIN"))
|
|
add_key_to_env_file("LNADDRESS_" + identifier.upper(), lnaddress)
|
|
add_key_to_env_file("LNADDRESS_PIN_" + identifier.upper(), pin)
|
|
|
|
return invoicekey, adminkey, userid, walletid, lnaddress
|
|
else:
|
|
return (os.getenv("LNBITS_INVOICE_KEY_" + identifier.upper()),
|
|
os.getenv("LNBITS_ADMIN_KEY_" + identifier.upper()),
|
|
os.getenv("LNBITS_USER_ID_" + identifier.upper()),
|
|
os.getenv("LNBITS_WALLET_ID_" + identifier.upper()),
|
|
os.getenv("LNADDRESS_" + identifier.upper()))
|
|
|
|
|
|
def add_key_to_env_file(value, oskey):
|
|
env_path = Path('.env')
|
|
if env_path.is_file():
|
|
dotenv.load_dotenv(env_path, verbose=True, override=True)
|
|
dotenv.set_key(env_path, value, oskey) |