Merge pull request #1325 from lnbits/fix/mypy-bleskomat

fix bleskomat mypy issue
This commit is contained in:
calle 2023-01-11 18:24:04 +01:00 committed by GitHub
commit a813a37e68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 43 additions and 36 deletions

View File

@ -1,5 +1,6 @@
import json
import os
from typing import Callable, Dict, Union
import httpx
@ -12,7 +13,9 @@ fiat_currencies = json.load(
)
)
exchange_rate_providers = {
exchange_rate_providers: dict[
str, dict[str, Union[str, Callable[[dict, dict], str]]]
] = {
"bitfinex": {
"name": "Bitfinex",
"domain": "bitfinex.com",
@ -65,17 +68,19 @@ async def fetch_fiat_exchange_rate(currency: str, provider: str):
"to": currency.lower(),
}
url = exchange_rate_providers[provider]["api_url"]
if url:
api_url_or_none = exchange_rate_providers[provider]["api_url"]
if api_url_or_none is not None:
api_url = str(api_url_or_none)
for key in replacements.keys():
url = url.replace("{" + key + "}", replacements[key])
api_url = api_url.replace("{" + key + "}", replacements[key])
async with httpx.AsyncClient() as client:
r = await client.get(url)
r = await client.get(api_url)
r.raise_for_status()
data = r.json()
else:
data = {}
getter = exchange_rate_providers[provider]["getter"]
rate = float(getter(data, replacements))
print(getter)
if callable(getter):
rate = float(getter(data, replacements))
return rate

View File

@ -1,11 +1,11 @@
import base64
import hashlib
import hmac
import urllib
from http import HTTPStatus
from typing import Dict
from urllib import parse
from starlette.requests import Request
from fastapi import Request
def generate_bleskomat_lnurl_hash(secret: str):
@ -22,7 +22,7 @@ def generate_bleskomat_lnurl_signature(
elif api_key_encoding == "base64":
key = base64.b64decode(api_key_secret)
else:
key = bytes(f"{api_key_secret}")
key = bytes.fromhex(api_key_secret)
return hmac.new(key=key, msg=payload.encode(), digestmod=hashlib.sha256).hexdigest()
@ -57,8 +57,8 @@ class LnurlValidationError(Exception):
pass
def prepare_lnurl_params(tag: str, query: Dict[str, str]):
params = {}
def prepare_lnurl_params(tag: str, query: dict) -> dict:
params: dict = {}
if not is_supported_lnurl_subprotocol(tag):
raise LnurlValidationError(f'Unsupported subprotocol: "{tag}"')
if tag == "withdrawRequest":
@ -85,15 +85,15 @@ def query_to_signing_payload(query: Dict[str, str]) -> str:
payload = []
for key in sorted_keys:
if not key == "signature":
encoded_key = urllib.parse.quote(key, safe=encode_uri_component_safe_chars)
encoded_value = urllib.parse.quote(
encoded_key = parse.quote(key, safe=encode_uri_component_safe_chars)
encoded_value = parse.quote(
query[key], safe=encode_uri_component_safe_chars
)
payload.append(f"{encoded_key}={encoded_value}")
return "&".join(payload)
unshorten_rules = {
unshorten_rules: dict[str, dict] = {
"query": {"n": "nonce", "s": "signature", "t": "tag"},
"tags": {
"c": "channelRequest",
@ -114,7 +114,7 @@ unshorten_rules = {
}
def unshorten_lnurl_query(query: Dict[str, str]) -> Dict[str, str]:
def unshorten_lnurl_query(query: dict) -> Dict[str, str]:
new_query = {}
rules = unshorten_rules
if "tag" in query:
@ -131,9 +131,9 @@ def unshorten_lnurl_query(query: Dict[str, str]) -> Dict[str, str]:
if not tag in rules["params"]:
raise LnurlValidationError(f'Unknown tag: "{tag}"')
for key in query:
if key in rules["params"][tag]:
if key in rules["params"][str(tag)]:
short_param_key = key
long_param_key = rules["params"][tag][short_param_key]
long_param_key = rules["params"][str(tag)][short_param_key]
if short_param_key in query:
new_query[long_param_key] = query[short_param_key]
else:
@ -146,7 +146,7 @@ def unshorten_lnurl_query(query: Dict[str, str]) -> Dict[str, str]:
if short_key in query:
new_query[long_key] = query[short_key]
else:
new_query[long_key] = query[long_key]
new_query[long_key] = query[str(long_key)]
else:
# Keep unknown key/value pairs unchanged:
new_query[key] = query[key]

View File

@ -1,6 +1,5 @@
import json
import math
import traceback
from http import HTTPStatus
from loguru import logger
@ -28,7 +27,7 @@ from .helpers import (
@bleskomat_ext.get("/u", name="bleskomat.api_bleskomat_lnurl")
async def api_bleskomat_lnurl(req: Request):
try:
query = req.query_params
query = dict(req.query_params)
# Unshorten query if "s" is used instead of "signature".
if "s" in query:
@ -89,11 +88,15 @@ async def api_bleskomat_lnurl(req: Request):
# Convert to msats:
params[key] = int(amount_sats_less_fee * 1e3)
except LnurlValidationError as e:
raise LnurlHttpError(e.message, HTTPStatus.BAD_REQUEST)
raise LnurlHttpError(str(e), HTTPStatus.BAD_REQUEST)
# Create a new LNURL using the query parameters provided in the signed URL.
params = json.JSONEncoder().encode(params)
json_params = json.JSONEncoder().encode(params)
lnurl = await create_bleskomat_lnurl(
bleskomat=bleskomat, secret=secret, tag=tag, params=params, uses=1
bleskomat=bleskomat,
secret=secret,
tag=tag,
params=json_params,
uses=1,
)
# Reply with LNURL response object.

View File

@ -2,10 +2,9 @@ import json
import time
from typing import Dict
from fastapi.params import Query
from fastapi import Query, Request
from loguru import logger
from pydantic import BaseModel, validator
from starlette.requests import Request
from lnbits import bolt11
from lnbits.core.services import PaymentFailure, pay_invoice
@ -80,7 +79,7 @@ class BleskomatLnurl(BaseModel):
response["k1"] = secret
return response
def validate_action(self, query: Dict[str, str]) -> None:
def validate_action(self, query) -> None:
tag = self.tag
params = json.loads(self.params)
# Perform tag-specific checks.
@ -109,7 +108,7 @@ class BleskomatLnurl(BaseModel):
else:
raise LnurlValidationError(f'Unknown subprotocol: "{tag}"')
async def execute_action(self, query: Dict[str, str]):
async def execute_action(self, query):
self.validate_action(query)
used = False
async with db.connect() as conn:

View File

@ -1,5 +1,4 @@
from fastapi import Request
from fastapi.params import Depends
from fastapi import Depends, Request
from fastapi.templating import Jinja2Templates
from starlette.responses import HTMLResponse

View File

@ -6,7 +6,6 @@ from starlette.exceptions import HTTPException
from lnbits.core.crud import get_user
from lnbits.decorators import WalletTypeInfo, require_admin_key
from lnbits.extensions.bleskomat.models import CreateBleskomat
from . import bleskomat_ext
from .crud import (
@ -17,6 +16,7 @@ from .crud import (
update_bleskomat,
)
from .exchange_rates import fetch_fiat_exchange_rate
from .models import CreateBleskomat
@bleskomat_ext.get("/api/v1/bleskomats")
@ -27,7 +27,8 @@ async def api_bleskomats(
wallet_ids = [wallet.wallet.id]
if all_wallets:
wallet_ids = (await get_user(wallet.wallet.user)).wallet_ids
user = await get_user(wallet.wallet.user)
wallet_ids = user.wallet_ids if user else []
return [bleskomat.dict() for bleskomat in await get_bleskomats(wallet_ids)]
@ -54,9 +55,9 @@ async def api_bleskomat_create_or_update(
wallet: WalletTypeInfo = Depends(require_admin_key),
bleskomat_id=None,
):
fiat_currency = data.fiat_currency
exchange_rate_provider = data.exchange_rate_provider
try:
fiat_currency = data.fiat_currency
exchange_rate_provider = data.exchange_rate_provider
await fetch_fiat_exchange_rate(
currency=fiat_currency, provider=exchange_rate_provider
)
@ -79,6 +80,7 @@ async def api_bleskomat_create_or_update(
else:
bleskomat = await create_bleskomat(wallet_id=wallet.wallet.id, data=data)
assert bleskomat
return bleskomat.dict()

View File

@ -88,8 +88,7 @@ profile = "black"
[tool.mypy]
files = "lnbits"
exclude = """(?x)(
^lnbits/extensions/bleskomat.
| ^lnbits/extensions/boltz.
^lnbits/extensions/boltz.
| ^lnbits/wallets/lnd_grpc_files.
)"""