nostrdvm/nostr_dvm/utils/zap_utils.py
Believethehype db7fb286e0 more cleanup
2024-10-17 16:12:18 +02:00

440 lines
17 KiB
Python

# LIGHTNING/ZAP FUNCTIONS
import json
import os
import random
import string
import urllib.parse
from hashlib import sha256
from pathlib import Path
import bech32
import dotenv
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from bech32 import bech32_decode, convertbits, bech32_encode
from nostr_sdk import PublicKey, SecretKey, Event, EventBuilder, Tag, Keys, generate_shared_key, Kind, \
Timestamp
from nostr_dvm.utils.nostr_utils import get_event_by_id, check_and_decrypt_own_tags, update_profile_lnaddress
# TODO tor connection to lnbits
# proxies = {
# 'http': 'socks5h://127.0.0.1:9050',
# 'https': 'socks5h://127.0.0.1:9050'
# }
proxies = {}
async def parse_zap_event_tags(zap_event, keys, name, client, config):
    """Collect the relevant information carried by the tags of a zap event.

    Tags are handled by their first element:

    * ``bolt11``      - the invoice amount is parsed from the invoice string.
    * ``e``           - the referenced event is fetched and, if found, passed
                        through ``check_and_decrypt_own_tags``.
    * ``p``           - read but not used further.
    * ``description`` - the embedded zap request is parsed; its author
                        (after the Zapplepay special case) becomes the sender.
                        An ``anon`` tag with a value is decrypted as a private
                        zap, an ``anon`` tag without a value marks the zap as
                        anonymous.

    :param zap_event: event whose tags are inspected
    :param keys: keys whose secret key is used to decrypt private zaps
    :param name: label used as prefix in log output
    :param client: passed through to ``get_event_by_id``
    :param config: passed through to ``get_event_by_id`` and
                   ``check_and_decrypt_own_tags``
    :return: tuple ``(invoice_amount, zapped_event, sender, message, anon)``
    """
    invoice_amount = 0
    zapped_event = None
    message = ""
    anon = False
    # Used as-is when the event carries no 'description' tag.
    sender = zap_event.author()

    for tag in zap_event.tags():
        fields = tag.as_vec()
        tag_name = fields[0]

        if tag_name == 'bolt11':
            invoice_amount = parse_amount_from_bolt11_invoice(fields[1])
            continue

        if tag_name == 'e':
            zapped_event = await get_event_by_id(fields[1], client=client, config=config)
            if zapped_event is not None:
                zapped_event = check_and_decrypt_own_tags(zapped_event, config)
            continue

        if tag_name == 'p':
            p_tag = fields[1]  # read for parity with the other tags, currently unused
            continue

        if tag_name != 'description':
            continue

        # The description tag embeds the zap request as JSON.
        zap_request_event = Event.from_json(fields[1])
        sender = check_for_zapplepay(zap_request_event.author().to_hex(),
                                     zap_request_event.content())

        for z_tag in zap_request_event.tags():
            z_fields = z_tag.as_vec()
            if z_fields[0] != 'anon':
                continue

            if len(z_fields) <= 1:
                # 'anon' without payload: the sender cannot be recovered.
                anon = True
                print(
                    "[" + name + "] Anonymous Zap received. Unlucky, I don't know from whom, and never will")
                continue

            # 'anon' with payload: private zap, the payload is an encrypted event.
            decrypted_content = decrypt_private_zap_message(z_fields[1],
                                                            keys.secret_key(),
                                                            zap_request_event.author())
            decrypted_private_event = Event.from_json(decrypted_content)
            if decrypted_private_event.kind().as_u16() == 9733:
                sender = decrypted_private_event.author().to_hex()
                message = decrypted_private_event.content()

    return invoice_amount, zapped_event, sender, message, anon
def parse_amount_from_bolt11_invoice(bolt11_invoice: str) -> int:
    """Return the amount encoded in a BOLT11 invoice, in whole satoshis.

    The amount lives in the human-readable part of the invoice, i.e. everything
    before the last ``1`` (the bech32 separator): ``ln`` + network prefix +
    optional digits + optional multiplier (``m``, ``u``, ``n`` or ``p``).

    Compared to slicing a fixed four characters off the front, this works for
    network prefixes of any length (``lnbc``, ``lntb``, ``lntbs``, ``lnbcrt``),
    for upper-case invoices, and for invoices without an amount. All arithmetic
    is done on integers, sub-satoshi remainders are truncated.

    :param bolt11_invoice: the invoice string, optionally prefixed with
                           ``lightning:``
    :return: amount in satoshis; ``0`` when the invoice carries no amount or
             cannot be recognised as a BOLT11 invoice
    """
    if not bolt11_invoice:
        return 0

    invoice = str(bolt11_invoice).strip().lower()
    if invoice.startswith("lightning:"):
        invoice = invoice[len("lightning:"):]

    # The bech32 data part never contains '1', so the last '1' is the separator.
    separator = invoice.rfind("1")
    if separator == -1:
        return 0
    hrp = invoice[:separator]
    if not hrp.startswith("ln"):
        return 0

    amount_part = hrp[2:]
    multiplier = ""
    if amount_part and amount_part[-1] in "munp":
        multiplier = amount_part[-1]
        amount_part = amount_part[:-1]

    # The amount is the run of ASCII digits at the end of what is left;
    # whatever precedes it is the network prefix.
    first_digit = len(amount_part)
    while first_digit > 0 and amount_part[first_digit - 1] in "0123456789":
        first_digit -= 1
    digits = amount_part[first_digit:]
    if not digits:
        return 0

    # Fraction of one bitcoin that each multiplier stands for (as a divisor).
    divisors = {"": 1, "m": 10 ** 3, "u": 10 ** 6, "n": 10 ** 9, "p": 10 ** 12}
    sats_per_btc = 100000000
    return int(digits) * sats_per_btc // divisors[multiplier]
def create_bolt11_ln_bits(sats: int, config) -> "tuple[str | None, str | None]":
    """Ask the configured LNbits instance to create an incoming invoice.

    :param sats: invoice amount, sent as ``amount`` in the request body
    :param config: object providing ``LNBITS_URL``, ``LNBITS_INVOICE_KEY`` and
                   ``NIP89.NAME`` (used in the invoice memo)
    :return: ``(payment_request, payment_hash)`` on success, ``(None, None)``
             when no LNbits URL is configured, the response lacks either field,
             or the request fails
    """
    # Covers both an empty string and an unset (None) URL.
    if not config.LNBITS_URL:
        return None, None

    url = config.LNBITS_URL + "/api/v1/payments"
    data = {'out': False, 'amount': sats, 'memo': "Nostr-DVM " + config.NIP89.NAME}
    headers = {'X-API-Key': config.LNBITS_INVOICE_KEY, 'Content-Type': 'application/json', 'charset': 'UTF-8'}
    try:
        # Route through the module-level proxies like the other LNbits calls.
        res = requests.post(url, json=data, headers=headers, proxies=proxies)
        obj = json.loads(res.text)
        if obj.get("payment_request") and obj.get("payment_hash"):
            return obj["payment_request"], obj["payment_hash"]
        print("LNBITS: " + res.text)
        return None, None
    except Exception as e:
        print("LNBITS: " + str(e))
        return None, None
def create_bolt11_lud16(lud16, amount):
    """Request a BOLT11 invoice from an LNURL-pay endpoint.

    ``lud16`` may be a lightning address (``name@domain``) or a bech32 encoded
    LNURL. The endpoint is queried for its ``callback``, which is then called
    with the amount in millisatoshis.

    :param lud16: lightning address or LNURL string
    :param amount: amount in satoshis (multiplied by 1000 for the callback)
    :return: the invoice from the ``pr`` field of the callback response, or
             ``None`` when the address is missing/invalid or any step fails
    """
    if not lud16:  # No lud16 set
        return None
    lud16 = str(lud16)

    try:
        # Check for '@' first: a bech32 LNURL never contains it, while a
        # lightning address may well start with the letters "lnurl".
        if '@' in lud16:  # LNaddress
            parts = lud16.split('@')
            url = 'https://' + parts[1] + '/.well-known/lnurlp/' + parts[0]
        elif lud16.lower().startswith("lnurl"):
            url = decode_bech32(lud16)
        else:  # format invalid
            return None

        print(url)
        response = requests.get(url)
        ob = json.loads(response.content)
        callback = ob["callback"]

        # The callback may already carry a query string.
        separator = "&" if "?" in callback else "?"
        response = requests.get(callback + separator + "amount=" + str(int(amount) * 1000))
        ob = json.loads(response.content)
        return ob["pr"]
    except Exception as e:
        # str() is required here: concatenating the exception object itself
        # raises TypeError from inside the handler.
        print("LUD16: " + str(e))
        return None
def create_lnbits_account(name):
    """Create a user and wallet through the LNbits usermanager API.

    Reads ``LNBITS_WALLET_ID`` (sent as ``admin_id``), ``LNBITS_HOST`` and
    ``LNBITS_ADMIN_KEY`` from the environment.

    :param name: used for both the wallet name and the user name
    :return: always a 5-tuple ``(invoice_key, admin_key, wallet_id, user_id,
             status)``; ``status`` is ``"success"``, otherwise the first four
             entries are empty strings and ``status`` is ``"failed"``
    """
    failure = ("", "", "", "", "failed")

    admin_id = os.getenv("LNBITS_WALLET_ID")
    if not admin_id:
        print("No admin id set, no wallet created.")
        return failure

    data = {
        'admin_id': admin_id,
        'wallet_name': name,
        'user_name': name,
    }

    try:
        url = os.getenv("LNBITS_HOST") + '/usermanager/api/v1/users'
        print(url)
        headers = {'X-API-Key': os.getenv("LNBITS_ADMIN_KEY"), 'Content-Type': 'application/json', 'charset': 'UTF-8'}
        r = requests.post(url, data=json.dumps(data), headers=headers, proxies=proxies)
        walletjson = json.loads(r.text)
        if walletjson.get("wallets"):
            # The successful response holds the wallet keys, so it is
            # deliberately not printed.
            wallet = walletjson['wallets'][0]
            return wallet['inkey'], wallet['adminkey'], wallet['id'], walletjson['id'], "success"
        print("error creating wallet")
        print(r.text)
    except Exception as e:
        print("error creating wallet")
        print(e)

    # Reached on any failure, including a response without wallets, so callers
    # can always unpack five values.
    return failure
def check_bolt11_ln_bits_is_paid(payment_hash: str, config):
    """Look up a payment on LNbits and report whether it is paid.

    :param payment_hash: hash appended to the payments endpoint
    :param config: object providing ``LNBITS_URL`` and ``LNBITS_INVOICE_KEY``
    :return: the truthy ``paid`` value from the response, ``False`` when it is
             absent or falsy, ``None`` when the request or parsing fails
    """
    endpoint = config.LNBITS_URL + "/api/v1/payments/" + payment_hash
    request_headers = {
        'X-API-Key': config.LNBITS_INVOICE_KEY,
        'Content-Type': 'application/json',
        'charset': 'UTF-8',
    }

    try:
        response = requests.get(endpoint, headers=request_headers, proxies=proxies)
        payment = json.loads(response.text)
        paid = payment.get("paid")
        return paid if paid else False
    except Exception:
        return None
def pay_bolt11_ln_bits(bolt11: str, config):
    """Pay a BOLT11 invoice from the configured LNbits wallet.

    :param bolt11: invoice to pay
    :param config: object providing ``LNBITS_URL`` and ``LNBITS_ADMIN_KEY``
    :return: the ``payment_hash`` from the response, or the string ``"Error"``
             when the response has none or the request fails
    """
    url = config.LNBITS_URL + "/api/v1/payments"
    data = {'out': True, 'bolt11': bolt11}
    headers = {'X-API-Key': config.LNBITS_ADMIN_KEY, 'Content-Type': 'application/json', 'charset': 'UTF-8'}
    try:
        # Use the module-level proxies like the other LNbits calls so an
        # outgoing payment takes the same route as the status checks.
        res = requests.post(url, json=data, headers=headers, proxies=proxies)
        obj = json.loads(res.text)
        if obj.get("payment_hash"):
            return obj["payment_hash"]
        return "Error"
    except Exception as e:
        print("LNBITS: " + str(e))
        return "Error"
# DECRYPT ZAPS
def check_for_zapplepay(pubkey_hex: str, content: str):
    """Resolve the real sender when a zap request was issued by Zapplepay.

    If ``pubkey_hex`` equals the hard-coded Zapplepay key, the sender's npub is
    taken from ``content`` (with the ``"From: nostr:"`` text removed) and
    converted to hex.

    :param pubkey_hex: hex public key of the zap request author
    :param content: content of the zap request
    :return: the resolved hex key, or ``pubkey_hex`` unchanged when the author
             is not Zapplepay or anything goes wrong (the error is printed)
    """
    try:
        # Special case Zapplepay
        zapplepay_hex = PublicKey.from_bech32(
            "npub1wxl6njlcgygduct7jkgzrvyvd9fylj4pqvll6p32h59wyetm5fxqjchcan").to_hex()
        if pubkey_hex == zapplepay_hex:
            real_sender_bech32 = content.replace("From: nostr:", "")
            pubkey_hex = PublicKey.from_bech32(real_sender_bech32).to_hex()
    except Exception as e:
        print(e)
    return pubkey_hex
def enrypt_private_zap_message(message, privatekey, publickey):
    """Encrypt a private zap message with AES-CBC under a shared key.

    The key comes from ``generate_shared_key(privatekey, publickey)``; a fresh
    16 byte IV is drawn for every call.

    :param message: text to encrypt (UTF-8 encoded and padded to the AES block
                    size before encryption)
    :param privatekey: own secret key
    :param publickey: public key of the counterpart
    :return: ``"<ciphertext>_<iv>"`` where the ciphertext is bech32 encoded
             with prefix ``pzap`` and the IV with prefix ``iv``
    """
    key = bytearray(generate_shared_key(privatekey, publickey))
    # Generate a random IV
    iv = os.urandom(16)

    # Encrypt the message
    plaintext = pad(message.encode('utf-8'), AES.block_size)
    ciphertext = AES.new(key, AES.MODE_CBC, bytearray(iv)).encrypt(plaintext)

    encoded_parts = (
        bech32_encode("pzap", convertbits(ciphertext, 8, 5, True)),
        bech32_encode("iv", convertbits(iv, 8, 5, True)),
    )
    return encoded_parts[0] + "_" + encoded_parts[1]
def decrypt_private_zap_message(msg: str, privkey: SecretKey, pubkey: PublicKey):
    """Decrypt a private zap message of the form ``<ciphertext>_<iv>``.

    Both parts are bech32 encoded; the key is the shared key of ``privkey`` and
    ``pubkey`` and the cipher is AES-CBC.

    Errors are reported through the return value rather than raised: every
    failure path returns a string describing the problem.

    :param msg: the encrypted message
    :param privkey: own secret key
    :param pubkey: public key of the counterpart
    :return: the decrypted text cut after its last ``}``, or an error string
    """
    shared_secret = generate_shared_key(privkey, pubkey)
    if len(shared_secret) not in (16, 32):
        return "invalid shared secret size"

    parts = msg.split("_")
    if len(parts) != 2:
        return "invalid message format"

    try:
        _, encrypted_msg = bech32_decode(parts[0])
        encrypted_bytes = convertbits(encrypted_msg, 5, 8, False)
        _, iv = bech32_decode(parts[1])
        iv_bytes = convertbits(iv, 5, 8, False)
    except Exception as e:
        # Return text like every other failure path, not the exception object.
        return str(e)

    try:
        cipher = AES.new(bytearray(shared_secret), AES.MODE_CBC, bytearray(iv_bytes))
        decrypted_bytes = cipher.decrypt(bytearray(encrypted_bytes))
        plaintext = decrypted_bytes.decode("utf-8")
        # The block padding is not stripped after decryption, so cut
        # everything behind the closing brace of the JSON payload.
        decoded = plaintext.rsplit("}", 1)[0] + "}"
        return decoded
    except Exception as ex:
        return str(ex)
def decode_bech32(encoded_lnurl):
    """Decode a bech32 encoded LNURL into the URL it contains.

    :param encoded_lnurl: bech32 string whose human-readable part is ``lnurl``
    :return: the decoded URL as a string
    :raises ValueError: if the human-readable part is not ``lnurl``
    """
    prefix, words = bech32.bech32_decode(encoded_lnurl)
    if prefix != 'lnurl':
        raise ValueError("Invalid human-readable part (hrp)")

    # 5-bit words -> 8-bit bytes -> UTF-8 text
    raw = bech32.convertbits(words, 5, 8, False)
    return bytes(raw).decode('utf-8')
def zaprequest(lud16: str, amount: int, content, zapped_event, zapped_user, keys, relay_list, zaptype="public"):
    """Build a zap request and exchange it for a BOLT11 invoice.

    The LNURL-pay endpoint behind ``lud16`` is queried for its ``callback``. A
    kind 9734 zap request is signed and sent to that callback together with
    the amount; the invoice is taken from the ``pr`` field of the answer.

    For ``zaptype="private"`` a kind 9733 event with the real content is signed
    first, encrypted with a key derived from the secret key and the zapped
    event (or the current time), and attached as ``anon`` tag to a request with
    empty content that is signed by the derived key.

    :param lud16: lightning address (``name@domain``) or bech32 LNURL
    :param amount: amount in satoshis
    :param content: zap message
    :param zapped_event: event being zapped, or ``None`` to zap a user
    :param zapped_user: public key of the zapped user (used when
                        ``zapped_event`` is ``None``)
    :param keys: keys used to sign the request
    :param relay_list: relays for the ``relays`` tag; an iterable of relay
                       URLs, a single URL string, or ``None``
    :param zaptype: ``"public"`` (default) or ``"private"``
    :return: the invoice string, or ``None`` if the address is missing/invalid
             or any step fails
    """
    print(lud16)
    print(str(amount))
    print(content)

    if not lud16:  # No lud16 set
        return None
    lud16 = str(lud16)

    try:
        # Check for '@' first: a bech32 LNURL never contains it, while a
        # lightning address may well start with the letters "lnurl".
        if '@' in lud16:  # LNaddress
            parts = lud16.split('@')
            url = 'https://' + parts[1] + '/.well-known/lnurlp/' + parts[0]
        elif lud16.lower().startswith("lnurl"):
            url = decode_bech32(lud16)
        else:  # format invalid
            return None

        response = requests.get(url)
        ob = json.loads(response.content)
        callback = ob["callback"]
        print(callback)

        encoded_lnurl = bech32.bech32_encode('lnurl', bech32.convertbits(url.encode(), 8, 5))

        # One conversion, used for both the amount tag and the callback query.
        millisats = int(amount) * 1000

        # The relays tag takes one element per relay, not a stringified list.
        if relay_list is None:
            relays = []
        elif isinstance(relay_list, str):
            relays = [relay_list]
        else:
            relays = [str(relay) for relay in relay_list]

        amount_tag = Tag.parse(['amount', str(millisats)])
        relays_tag = Tag.parse(['relays'] + relays)
        lnurl_tag = Tag.parse(['lnurl', encoded_lnurl])
        if zapped_event is not None:
            p_tag = Tag.parse(['p', zapped_event.author().to_hex()])
            e_tag = Tag.parse(['e', zapped_event.id().to_hex()])
            tags = [amount_tag, relays_tag, p_tag, e_tag, lnurl_tag]
        else:
            p_tag = Tag.parse(['p', zapped_user.to_hex()])
            tags = [amount_tag, relays_tag, p_tag, lnurl_tag]

        if zaptype == "private":
            if zapped_event is not None:
                key_str = keys.secret_key().to_hex() + zapped_event.id().to_hex() + str(
                    zapped_event.created_at().as_secs())
            else:
                key_str = keys.secret_key().to_hex() + str(Timestamp.now().as_secs())
            encryption_key = sha256(key_str.encode('utf-8')).hexdigest()

            # NOTE(review): from here on the request only carries the p/e tags
            # plus 'anon'; amount, relays and lnurl tags are dropped. Kept as
            # is - confirm against the private zap convention in use.
            tags = [p_tag]
            if zapped_event is not None:
                tags.append(e_tag)
            # Inner event: signed with the real keys, carries the real content.
            zap_request = EventBuilder(Kind(9733), content,
                                       tags).to_event(keys).as_json()
            # The outer request is signed by the derived one-off key.
            keys = Keys.parse(encryption_key)
            if zapped_event is not None:
                encrypted_content = enrypt_private_zap_message(zap_request, keys.secret_key(), zapped_event.author())
            else:
                encrypted_content = enrypt_private_zap_message(zap_request, keys.secret_key(), zapped_user)
            anon_tag = Tag.parse(['anon', encrypted_content])
            tags.append(anon_tag)
            content = ""

        zap_request = EventBuilder(Kind(9734), content,
                                   tags).to_event(keys).as_json()

        # The callback may already carry a query string.
        separator = "&" if "?" in callback else "?"
        response = requests.get(callback + separator + "amount=" + str(millisats) + "&nostr="
                                + urllib.parse.quote_plus(zap_request) + "&lnurl=" + encoded_lnurl)
        ob = json.loads(response.content)
        return ob["pr"]

    except Exception as e:
        print("ZAP REQUEST: " + str(e))
        return None
def get_price_per_sat(currency):
    """Return the price of one satoshi in ``currency``.

    The bitcoin price is fetched from the CoinStats API when the environment
    variable ``COINSTATSOPENAPI_KEY`` is set. Without a key, or when the
    request or the response parsing fails, a hard-coded fallback is returned.

    :param currency: currency code passed to the API as ``currency``
    :return: price per satoshi as float
    """
    fallback_price_per_sat = 0.0004

    api_key = os.getenv("COINSTATSOPENAPI_KEY")
    if not api_key:
        return fallback_price_per_sat

    url = "https://openapiv1.coinstats.app/coins/bitcoin"
    params = {"skip": 0, "limit": 1, "currency": currency}
    header = {'accept': 'application/json', 'X-API-KEY': api_key}
    try:
        response = requests.get(url, headers=header, params=params)
        bitcoin_price = response.json()["price"]
        return bitcoin_price / 100000000.0
    except Exception:
        # Best effort: any network or parsing problem falls back to the
        # default, but KeyboardInterrupt/SystemExit are no longer swallowed.
        return fallback_price_per_sat
def randomword(length):
    """Return ``length`` randomly chosen lowercase ASCII letters as a string."""
    alphabet = string.ascii_lowercase
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
def make_ln_address_nostdress(identifier, npub, pin, nostdressdomain, newname=" ", currentname=" "):
    """Register (or rename) a lightning address on a nostdress server.

    The request is posted to ``https://<nostdressdomain>/api/easy/``. If the
    first attempt does not succeed - the server answers without ``ok`` or the
    request fails - one more attempt is made with a random suffix appended to
    the name.

    :param identifier: DVM identifier; selects the
                       ``LNBITS_INVOICE_KEY_<IDENTIFIER>`` environment variable
                       and is the default name
    :param npub: public key sent along with the registration
    :param pin: pin sent along with the registration
    :param nostdressdomain: domain of the nostdress server
    :param newname: requested name; a single space means "use ``identifier``"
    :param currentname: sent as ``currentname`` in the request
    :return: always a tuple ``(lnaddress, pin)``; ``("", "")`` when both
             attempts fail
    """
    if newname == " ":
        newname = identifier

    # The invoice key is sent to the server but not printed: it is a secret.
    data = {
        'name': newname,
        'domain': nostdressdomain,
        'kind': "lnbits",
        'host': os.getenv("LNBITS_HOST"),
        'key': os.getenv("LNBITS_INVOICE_KEY_" + identifier.upper()),
        'pin': pin,
        'npub': npub,
        'currentname': currentname
    }
    url = "https://" + nostdressdomain + "/api/easy/"

    def _try_register():
        # One registration attempt with the current data; None on any failure.
        try:
            res = requests.post(url, data=data)
            print(res.text)
            obj = json.loads(res.text)
            if obj.get("ok"):
                return data["name"] + "@" + nostdressdomain, obj["pin"]
        except Exception as e:
            print("NOSTDRESS: " + str(e))
        return None

    result = _try_register()
    if result is None:
        print("Creating random name..")
        data["name"] = data["name"] + "_" + randomword(10)
        result = _try_register()

    # Callers unpack two values, so never fall through with an implicit None.
    if result is None:
        return "", ""
    return result
def check_and_set_ln_bits_keys(identifier, npub):
    """Return the LNbits keys for a DVM, creating an account if none exist.

    When ``LNBITS_INVOICE_KEY_<IDENTIFIER>`` is set, the stored values are read
    from the environment. Otherwise an LNbits account is created, a lightning
    address is registered if ``NOSTDRESS_DOMAIN`` is set and the account
    creation succeeded, and everything is written to the ``.env`` file.

    :param identifier: DVM identifier, upper-cased to build the variable names
    :param npub: public key passed on to the lightning address registration
    :return: tuple ``(invoice_key, admin_key, wallet_id, lnaddress)``
    """
    suffix = identifier.upper()

    if os.getenv("LNBITS_INVOICE_KEY_" + suffix):
        return (os.getenv("LNBITS_INVOICE_KEY_" + suffix),
                os.getenv("LNBITS_ADMIN_KEY_" + suffix),
                os.getenv("LNBITS_WALLET_ID_" + suffix),
                os.getenv("LNADDRESS_" + suffix))

    # Guard the unpacking: the helper may come back without a result.
    account = create_lnbits_account(identifier) or ("", "", "", "", "failed")
    invoicekey, adminkey, walletid, userid, success = account

    add_key_to_env_file("LNBITS_INVOICE_KEY_" + suffix, invoicekey)
    add_key_to_env_file("LNBITS_ADMIN_KEY_" + suffix, adminkey)
    # NOTE(review): the *user* id is stored under LNBITS_WALLET_ID_<ID>, while
    # this first call returns the *wallet* id below; later calls return the
    # stored value. Left unchanged - confirm which id callers expect.
    add_key_to_env_file("LNBITS_WALLET_ID_" + suffix, userid)

    lnaddress = ""
    pin = ""
    nostdress_domain = os.getenv("NOSTDRESS_DOMAIN")
    if nostdress_domain and success != "failed":
        print(nostdress_domain)
        registration = make_ln_address_nostdress(identifier, npub, " ", nostdress_domain, identifier)
        # Same guard as above: only unpack when there is something to unpack.
        if registration:
            lnaddress, pin = registration
    add_key_to_env_file("LNADDRESS_" + suffix, lnaddress)
    add_key_to_env_file("LNADDRESS_PIN_" + suffix, pin)

    return invoicekey, adminkey, walletid, lnaddress
async def change_ln_address(identifier, new_identifier, dvm_config, updateprofile=False):
    """Rename the lightning address of a DVM on the nostdress server.

    Reads ``LNADDRESS_<ID>``, ``LNADDRESS_PIN_<ID>``, ``DVM_PRIVATE_KEY_<ID>``
    and ``NOSTDRESS_DOMAIN`` from the environment. The stored address and pin
    are only replaced when the server returned a new address; on failure the
    existing values stay untouched.

    :param identifier: DVM identifier, upper-cased to build the variable names
    :param new_identifier: requested new name of the lightning address
    :param dvm_config: passed on to ``update_profile_lnaddress``
    :param updateprofile: also update the profile with the new address
    :return: ``None``
    """
    suffix = identifier.upper()
    current_address = os.getenv("LNADDRESS_" + suffix)
    private_key = os.getenv("DVM_PRIVATE_KEY_" + suffix)
    if not current_address or not private_key:
        # Nothing to rename or no key to identify with.
        print("no lnaddress or private key configured, lnaddress not changed")
        return

    previous_identifier = current_address.split("@")[0]
    pin = os.getenv("LNADDRESS_PIN_" + suffix)
    npub = Keys.parse(private_key).public_key().to_hex()

    registration = make_ln_address_nostdress(identifier, npub, pin, os.getenv("NOSTDRESS_DOMAIN"), new_identifier,
                                             currentname=previous_identifier)
    lnaddress, pin = registration if registration else ("", "")
    if not lnaddress:
        # Keep the working address and pin instead of overwriting them with
        # empty strings.
        print("could not change lnaddress")
        return

    add_key_to_env_file("LNADDRESS_" + suffix, lnaddress)
    add_key_to_env_file("LNADDRESS_PIN_" + suffix, pin)
    print("changed lnaddress")
    if updateprofile:
        await update_profile_lnaddress(private_key, dvm_config, lud16=lnaddress)
def add_key_to_env_file(value, oskey):
    """Write one entry to the ``.env`` file in the working directory.

    Note the parameter naming: ``value`` is the *name* of the variable and
    ``oskey`` is the content stored under it. Nothing happens if no ``.env``
    file exists. The file is loaded into the environment before the new entry
    is written.

    :param value: name of the variable to set
    :param oskey: content to store
    """
    dotenv_file = Path('.env')
    if not dotenv_file.is_file():
        return
    dotenv.load_dotenv(dotenv_file, verbose=True, override=True)
    dotenv.set_key(dotenv_file, value, oskey)