This commit is contained in:
Ben Arc
2022-01-31 00:12:59 +00:00
parent 228717195b
commit a147a07bab
4 changed files with 612 additions and 361 deletions

View File

@ -29,6 +29,7 @@ uvicorn = {extras = ["standard"], version = "*"}
sse-starlette = "*" sse-starlette = "*"
jinja2 = "3.0.1" jinja2 = "3.0.1"
pyngrok = "*" pyngrok = "*"
secp256k1 = "*"
[dev-packages] [dev-packages]
black = "==20.8b1" black = "==20.8b1"

840
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -2,11 +2,14 @@ import bitstring # type: ignore
import re import re
import hashlib import hashlib
from typing import List, NamedTuple, Optional from typing import List, NamedTuple, Optional
from bech32 import bech32_decode, CHARSET # type: ignore from bech32 import bech32_encode, bech32_decode, CHARSET
from ecdsa import SECP256k1, VerifyingKey # type: ignore from ecdsa import SECP256k1, VerifyingKey # type: ignore
from ecdsa.util import sigdecode_string # type: ignore from ecdsa.util import sigdecode_string # type: ignore
from binascii import unhexlify from binascii import unhexlify
import time
from decimal import Decimal
import embit
import secp256k1
class Route(NamedTuple): class Route(NamedTuple):
pubkey: str pubkey: str
@ -138,23 +141,23 @@ def encode(options):
if options.fallback: if options.fallback:
addr.tags.append(("f", options.fallback)) addr.tags.append(("f", options.fallback))
if options.route:
for r in options.route: for r in options.route:
splits = r.split("/") splits = r.split("/")
route = [] route = []
while len(splits) >= 5: while len(splits) >= 5:
route.append( route.append(
( (
unhexlify(splits[0]), unhexlify(splits[0]),
unhexlify(splits[1]), unhexlify(splits[1]),
int(splits[2]), int(splits[2]),
int(splits[3]), int(splits[3]),
int(splits[4]), int(splits[4]),
)
) )
) splits = splits[5:]
splits = splits[5:] assert len(splits) == 0
assert len(splits) == 0 addr.tags.append(("r", route))
addr.tags.append(("r", route))
return lnencode(addr, options.privkey) return lnencode(addr, options.privkey)
@ -171,7 +174,7 @@ def lnencode(addr, privkey):
else: else:
amount = addr.currency if addr.currency else "" amount = addr.currency if addr.currency else ""
hrp = "ln" + amount hrp = "ln" + amount + "0n"
# Start with the timestamp # Start with the timestamp
data = bitstring.pack("uint:35", addr.date) data = bitstring.pack("uint:35", addr.date)
@ -264,6 +267,19 @@ class LnAddr(object):
) )
def shorten_amount(amount):
""" Given an amount in bitcoin, shorten it
"""
# Convert to pico initially
amount = int(amount * 10**12)
units = ['p', 'n', 'u', 'm', '']
for unit in units:
if amount % 1000 == 0:
amount //= 1000
else:
break
return str(amount) + unit
def _unshorten_amount(amount: str) -> int: def _unshorten_amount(amount: str) -> int:
"""Given a shortened amount, return millisatoshis""" """Given a shortened amount, return millisatoshis"""
# BOLT #11: # BOLT #11:
@ -294,6 +310,24 @@ def _pull_tagged(stream):
return (CHARSET[tag], stream.read(length * 5), stream) return (CHARSET[tag], stream.read(length * 5), stream)
def is_p2pkh(currency, prefix):
return prefix == base58_prefix_map[currency][0]
def is_p2sh(currency, prefix):
return prefix == base58_prefix_map[currency][1]
# Tagged field containing BitArray
def tagged(char, l):
# Tagged fields need to be zero-padded to 5 bits.
while l.len % 5 != 0:
l.append('0b0')
return bitstring.pack("uint:5, uint:5, uint:5",
CHARSET.find(char),
(l.len / 5) / 32, (l.len / 5) % 32) + l
def tagged_bytes(char, l):
return tagged(char, bitstring.BitArray(l))
def _trim_to_bytes(barr): def _trim_to_bytes(barr):
# Adds a byte if necessary. # Adds a byte if necessary.
b = barr.tobytes() b = barr.tobytes()
@ -315,3 +349,11 @@ def _u5_to_bitarray(arr: List[int]) -> bitstring.BitArray:
for a in arr: for a in arr:
ret += bitstring.pack("uint:5", a) ret += bitstring.pack("uint:5", a)
return ret return ret
def bitarray_to_u5(barr):
assert barr.len % 5 == 0
ret = []
s = bitstring.ConstBitStream(barr)
while s.pos != s.len:
ret.append(s.read(5).uint)
return ret

View File

@ -4,6 +4,9 @@ import httpx
from os import getenv from os import getenv
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Optional, Dict, AsyncGenerator from typing import Optional, Dict, AsyncGenerator
import random
import string
from lnbits.helpers import urlsafe_short_hash
import hashlib import hashlib
from ..bolt11 import encode from ..bolt11 import encode
from .base import ( from .base import (
@ -15,43 +18,48 @@ from .base import (
) )
class FakeWallet(Wallet): class FakeWallet(Wallet):
"""https://github.com/lnbits/lnbits""" def __init__(self):
self.amount = 0
self.timestamp = 0
self.currency = "bc"
self.paymenthash = ""
self.privkey = getenv("FAKE_WALLET_KEY")
self.memo = ""
self.description_hashed = ""
self.description = ""
self.fallback = None
self.expires = None
self.route = None
async def status(self) -> StatusResponse:
print( print(
"The FakeWallet backend is for using LNbits as a centralised, stand-alone payment system." "The FakeWallet backend is for using LNbits as a centralised, stand-alone payment system."
) )
return StatusResponse(None, 21000000000) return StatusResponse(None, 21000000000)
async def create_invoice( async def create_invoice(
self, self,
amount: int, amount: int,
memo: Optional[str] = None, memo: Optional[str] = None,
description_hash: Optional[bytes] = None, description_hash: Optional[bytes] = None,
) -> InvoiceResponse: ) -> InvoiceResponse:
class options: print(self.privkey)
def __init__(self, amount, timestamp, payments_hash, privkey, memo, ): self.amount = amount
self.name = name self.timestamp = datetime.now().timestamp()
self.age = age
async def status(self) -> StatusResponse:
randomHash = hashlib.sha256(b"some random data").hexdigest()
options = {
"amount": amount,
"timestamp": datetime.now().timestamp(),
"payments_hash": randomHash,
"privkey": "v3qrevqrevm39qin0vq3r0ivmrewvmq3rimq03ig",
"memo": "",
"description_hashed": "",
}
if description_hash: if description_hash:
options.description_hashed = description_hash self.tags_set = {"h"}
self.description_hashed = description_hash
else: else:
options.memo = memo self.tags_set = {"d"}
payment_request = encode(options) self.memo = memo
self.description = memo
letters = string.ascii_lowercase
randomHash = hashlib.sha256(str(random.getrandbits(256)).encode('utf-8')).hexdigest()
self.paymenthash = randomHash
payment_request = encode(self)
print(payment_request) print(payment_request)
checking_id = randomHash checking_id = randomHash
return InvoiceResponse(ok, checking_id, payment_request) return InvoiceResponse(True, checking_id, payment_request)
async def pay_invoice(self, bolt11: str) -> PaymentResponse: async def pay_invoice(self, bolt11: str) -> PaymentResponse: