chore: adhere to ruff's UP

basically use `list` and `type` instead of `List` and `Type`

this is save to use for python3.9 and has been deprecated. also has some
performance drawbacks.
read more here: https://docs.astral.sh/ruff/rules/non-pep585-annotation/
This commit is contained in:
dni ⚡ 2024-04-01 18:50:21 +02:00 committed by Pavol Rusnak
parent a158056b99
commit b145bff566
12 changed files with 54 additions and 52 deletions

View File

@ -83,7 +83,7 @@ def get_super_user() -> Optional[str]:
"Superuser id not found. Please check that the file " "Superuser id not found. Please check that the file "
+ f"'{superuser_file.absolute()}' exists and has read permissions." + f"'{superuser_file.absolute()}' exists and has read permissions."
) )
with open(superuser_file, "r") as file: with open(superuser_file) as file:
return file.readline() return file.readline()

View File

@ -8,7 +8,7 @@ import time
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from enum import Enum from enum import Enum
from sqlite3 import Row from sqlite3 import Row
from typing import Any, Generic, List, Literal, Optional, Type, TypeVar from typing import Any, Generic, Literal, Optional, TypeVar
from loguru import logger from loguru import logger
from pydantic import BaseModel, ValidationError, root_validator from pydantic import BaseModel, ValidationError, root_validator
@ -175,11 +175,11 @@ class Connection(Compat):
async def fetch_page( async def fetch_page(
self, self,
query: str, query: str,
where: Optional[List[str]] = None, where: Optional[list[str]] = None,
values: Optional[List[str]] = None, values: Optional[list[str]] = None,
filters: Optional[Filters] = None, filters: Optional[Filters] = None,
model: Optional[Type[TRowModel]] = None, model: Optional[type[TRowModel]] = None,
group_by: Optional[List[str]] = None, group_by: Optional[list[str]] = None,
) -> Page[TRowModel]: ) -> Page[TRowModel]:
if not filters: if not filters:
filters = Filters() filters = Filters()
@ -298,11 +298,11 @@ class Database(Compat):
async def fetch_page( async def fetch_page(
self, self,
query: str, query: str,
where: Optional[List[str]] = None, where: Optional[list[str]] = None,
values: Optional[List[str]] = None, values: Optional[list[str]] = None,
filters: Optional[Filters] = None, filters: Optional[Filters] = None,
model: Optional[Type[TRowModel]] = None, model: Optional[type[TRowModel]] = None,
group_by: Optional[List[str]] = None, group_by: Optional[list[str]] = None,
) -> Page[TRowModel]: ) -> Page[TRowModel]:
async with self.connect() as conn: async with self.connect() as conn:
return await conn.fetch_page(query, where, values, filters, model, group_by) return await conn.fetch_page(query, where, values, filters, model, group_by)
@ -370,8 +370,8 @@ class FromRowModel(BaseModel):
class FilterModel(BaseModel): class FilterModel(BaseModel):
__search_fields__: List[str] = [] __search_fields__: list[str] = []
__sort_fields__: Optional[List[str]] = None __sort_fields__: Optional[list[str]] = None
T = TypeVar("T") T = TypeVar("T")
@ -390,10 +390,10 @@ class Filter(BaseModel, Generic[TFilterModel]):
op: Operator = Operator.EQ op: Operator = Operator.EQ
values: list[Any] values: list[Any]
model: Optional[Type[TFilterModel]] model: Optional[type[TFilterModel]]
@classmethod @classmethod
def parse_query(cls, key: str, raw_values: list[Any], model: Type[TFilterModel]): def parse_query(cls, key: str, raw_values: list[Any], model: type[TFilterModel]):
# Key format: # Key format:
# key[operator] # key[operator]
# e.g. name[eq] # e.g. name[eq]
@ -443,7 +443,7 @@ class Filters(BaseModel, Generic[TFilterModel]):
the values can be validated. Otherwise, make sure to validate the inputs manually. the values can be validated. Otherwise, make sure to validate the inputs manually.
""" """
filters: List[Filter[TFilterModel]] = [] filters: list[Filter[TFilterModel]] = []
search: Optional[str] = None search: Optional[str] = None
offset: Optional[int] = None offset: Optional[int] = None
@ -452,7 +452,7 @@ class Filters(BaseModel, Generic[TFilterModel]):
sortby: Optional[str] = None sortby: Optional[str] = None
direction: Optional[Literal["asc", "desc"]] = None direction: Optional[Literal["asc", "desc"]] = None
model: Optional[Type[TFilterModel]] = None model: Optional[type[TFilterModel]] = None
@root_validator(pre=True) @root_validator(pre=True)
def validate_sortby(cls, values): def validate_sortby(cls, values):
@ -474,7 +474,7 @@ class Filters(BaseModel, Generic[TFilterModel]):
stmt += f"OFFSET {self.offset}" stmt += f"OFFSET {self.offset}"
return stmt return stmt
def where(self, where_stmts: Optional[List[str]] = None) -> str: def where(self, where_stmts: Optional[list[str]] = None) -> str:
if not where_stmts: if not where_stmts:
where_stmts = [] where_stmts = []
if self.filters: if self.filters:
@ -498,7 +498,7 @@ class Filters(BaseModel, Generic[TFilterModel]):
return f"ORDER BY {self.sortby} {self.direction or 'asc'}" return f"ORDER BY {self.sortby} {self.direction or 'asc'}"
return "" return ""
def values(self, values: Optional[List[str]] = None) -> tuple: def values(self, values: Optional[list[str]] = None) -> tuple:
if not values: if not values:
values = [] values = []
if self.filters: if self.filters:

View File

@ -29,7 +29,7 @@ class InstalledExtensionMiddleware:
await self.app(scope, receive, send) await self.app(scope, receive, send)
return return
top_path, *rest = [p for p in full_path.split("/") if p] top_path, *rest = (p for p in full_path.split("/") if p)
headers = scope.get("headers", []) headers = scope.get("headers", [])
# block path for all users if the extension is disabled # block path for all users if the extension is disabled

View File

@ -2,7 +2,7 @@ from __future__ import annotations
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from enum import Enum from enum import Enum
from typing import TYPE_CHECKING, List, Optional from typing import TYPE_CHECKING, Optional
from pydantic import BaseModel from pydantic import BaseModel
@ -212,7 +212,7 @@ class Node(ABC):
pass pass
@abstractmethod @abstractmethod
async def get_channels(self) -> List[NodeChannel]: async def get_channels(self) -> list[NodeChannel]:
pass pass
@abstractmethod @abstractmethod

View File

@ -2,7 +2,7 @@ from __future__ import annotations
import asyncio import asyncio
from http import HTTPStatus from http import HTTPStatus
from typing import TYPE_CHECKING, List, Optional from typing import TYPE_CHECKING, Optional
from fastapi import HTTPException from fastapi import HTTPException
@ -168,7 +168,7 @@ class CoreLightningNode(Node):
return info["id"] return info["id"]
@catch_rpc_errors @catch_rpc_errors
async def get_peer_ids(self) -> List[str]: async def get_peer_ids(self) -> list[str]:
peers = await self.ln_rpc("listpeers") peers = await self.ln_rpc("listpeers")
return [p["id"] for p in peers["peers"] if p["connected"]] return [p["id"] for p in peers["peers"] if p["connected"]]
@ -194,7 +194,7 @@ class CoreLightningNode(Node):
return NodePeerInfo(id=node["nodeid"]) return NodePeerInfo(id=node["nodeid"])
@catch_rpc_errors @catch_rpc_errors
async def get_channels(self) -> List[NodeChannel]: async def get_channels(self) -> list[NodeChannel]:
funds = await self.ln_rpc("listfunds") funds = await self.ln_rpc("listfunds")
nodes = await self.ln_rpc("listnodes") nodes = await self.ln_rpc("listnodes")
nodes_by_id = {n["nodeid"]: n for n in nodes["nodes"]} nodes_by_id = {n["nodeid"]: n for n in nodes["nodes"]}

View File

@ -4,7 +4,7 @@ import asyncio
import base64 import base64
import json import json
from http import HTTPStatus from http import HTTPStatus
from typing import TYPE_CHECKING, List, Optional from typing import TYPE_CHECKING, Optional
from fastapi import HTTPException from fastapi import HTTPException
from httpx import HTTPStatusError from httpx import HTTPStatusError
@ -176,7 +176,7 @@ class LndRestNode(Node):
asyncio.create_task(self._close_channel(point, force)) asyncio.create_task(self._close_channel(point, force))
async def get_channels(self) -> List[NodeChannel]: async def get_channels(self) -> list[NodeChannel]:
normal, pending, closed = await asyncio.gather( normal, pending, closed = await asyncio.gather(
self.get("/v1/channels"), self.get("/v1/channels"),
self.get("/v1/channels/pending"), self.get("/v1/channels/pending"),

View File

@ -9,7 +9,7 @@ from hashlib import sha256
from os import path from os import path
from sqlite3 import Row from sqlite3 import Row
from time import time from time import time
from typing import Any, List, Optional from typing import Any, Optional
import httpx import httpx
from loguru import logger from loguru import logger
@ -36,8 +36,8 @@ class LNbitsSettings(BaseModel):
class UsersSettings(LNbitsSettings): class UsersSettings(LNbitsSettings):
lnbits_admin_users: List[str] = Field(default=[]) lnbits_admin_users: list[str] = Field(default=[])
lnbits_allowed_users: List[str] = Field(default=[]) lnbits_allowed_users: list[str] = Field(default=[])
lnbits_allow_new_accounts: bool = Field(default=True) lnbits_allow_new_accounts: bool = Field(default=True)
@property @property
@ -46,9 +46,9 @@ class UsersSettings(LNbitsSettings):
class ExtensionsSettings(LNbitsSettings): class ExtensionsSettings(LNbitsSettings):
lnbits_admin_extensions: List[str] = Field(default=[]) lnbits_admin_extensions: list[str] = Field(default=[])
lnbits_extensions_deactivate_all: bool = Field(default=False) lnbits_extensions_deactivate_all: bool = Field(default=False)
lnbits_extensions_manifests: List[str] = Field( lnbits_extensions_manifests: list[str] = Field(
default=[ default=[
"https://raw.githubusercontent.com/lnbits/lnbits-extensions/main/extensions.json" "https://raw.githubusercontent.com/lnbits/lnbits-extensions/main/extensions.json"
] ]
@ -56,18 +56,18 @@ class ExtensionsSettings(LNbitsSettings):
class ExtensionsInstallSettings(LNbitsSettings): class ExtensionsInstallSettings(LNbitsSettings):
lnbits_extensions_default_install: List[str] = Field(default=[]) lnbits_extensions_default_install: list[str] = Field(default=[])
# required due to GitHUb rate-limit # required due to GitHUb rate-limit
lnbits_ext_github_token: str = Field(default="") lnbits_ext_github_token: str = Field(default="")
class InstalledExtensionsSettings(LNbitsSettings): class InstalledExtensionsSettings(LNbitsSettings):
# installed extensions that have been deactivated # installed extensions that have been deactivated
lnbits_deactivated_extensions: List[str] = Field(default=[]) lnbits_deactivated_extensions: list[str] = Field(default=[])
# upgraded extensions that require API redirects # upgraded extensions that require API redirects
lnbits_upgraded_extensions: List[str] = Field(default=[]) lnbits_upgraded_extensions: list[str] = Field(default=[])
# list of redirects that extensions want to perform # list of redirects that extensions want to perform
lnbits_extensions_redirects: List[Any] = Field(default=[]) lnbits_extensions_redirects: list[Any] = Field(default=[])
def extension_upgrade_path(self, ext_id: str) -> Optional[str]: def extension_upgrade_path(self, ext_id: str) -> Optional[str]:
return next( return next(
@ -85,7 +85,7 @@ class ThemesSettings(LNbitsSettings):
lnbits_site_tagline: str = Field(default="free and open-source lightning wallet") lnbits_site_tagline: str = Field(default="free and open-source lightning wallet")
lnbits_site_description: str = Field(default=None) lnbits_site_description: str = Field(default=None)
lnbits_default_wallet_name: str = Field(default="LNbits wallet") lnbits_default_wallet_name: str = Field(default="LNbits wallet")
lnbits_theme_options: List[str] = Field( lnbits_theme_options: list[str] = Field(
default=[ default=[
"classic", "classic",
"freedom", "freedom",
@ -102,7 +102,7 @@ class ThemesSettings(LNbitsSettings):
default="https://shop.lnbits.com/;/static/images/lnbits-shop-light.png;/static/images/lnbits-shop-dark.png" default="https://shop.lnbits.com/;/static/images/lnbits-shop-light.png;/static/images/lnbits-shop-dark.png"
) # sneaky sneaky ) # sneaky sneaky
lnbits_ad_space_enabled: bool = Field(default=False) lnbits_ad_space_enabled: bool = Field(default=False)
lnbits_allowed_currencies: List[str] = Field(default=[]) lnbits_allowed_currencies: list[str] = Field(default=[])
lnbits_default_accounting_currency: Optional[str] = Field(default=None) lnbits_default_accounting_currency: Optional[str] = Field(default=None)
lnbits_qr_logo: str = Field(default="/static/images/logos/lnbits.png") lnbits_qr_logo: str = Field(default="/static/images/logos/lnbits.png")
@ -122,8 +122,8 @@ class OpsSettings(LNbitsSettings):
class SecuritySettings(LNbitsSettings): class SecuritySettings(LNbitsSettings):
lnbits_rate_limit_no: str = Field(default="200") lnbits_rate_limit_no: str = Field(default="200")
lnbits_rate_limit_unit: str = Field(default="minute") lnbits_rate_limit_unit: str = Field(default="minute")
lnbits_allowed_ips: List[str] = Field(default=[]) lnbits_allowed_ips: list[str] = Field(default=[])
lnbits_blocked_ips: List[str] = Field(default=[]) lnbits_blocked_ips: list[str] = Field(default=[])
lnbits_notifications: bool = Field(default=False) lnbits_notifications: bool = Field(default=False)
lnbits_killswitch: bool = Field(default=False) lnbits_killswitch: bool = Field(default=False)
lnbits_killswitch_interval: int = Field(default=60) lnbits_killswitch_interval: int = Field(default=60)
@ -286,7 +286,7 @@ class AuthMethods(Enum):
class AuthSettings(LNbitsSettings): class AuthSettings(LNbitsSettings):
auth_token_expire_minutes: int = Field(default=525600) auth_token_expire_minutes: int = Field(default=525600)
auth_all_methods = [a.value for a in AuthMethods] auth_all_methods = [a.value for a in AuthMethods]
auth_allowed_methods: List[str] = Field( auth_allowed_methods: list[str] = Field(
default=[ default=[
AuthMethods.user_id_only.value, AuthMethods.user_id_only.value,
AuthMethods.username_and_password.value, AuthMethods.username_and_password.value,
@ -396,7 +396,7 @@ class PersistenceSettings(LNbitsSettings):
class SuperUserSettings(LNbitsSettings): class SuperUserSettings(LNbitsSettings):
lnbits_allowed_funding_sources: List[str] = Field( lnbits_allowed_funding_sources: list[str] = Field(
default=[ default=[
"VoidWallet", "VoidWallet",
"FakeWallet", "FakeWallet",
@ -452,7 +452,7 @@ class ReadOnlySettings(
class Settings(EditableSettings, ReadOnlySettings, TransientSettings, BaseSettings): class Settings(EditableSettings, ReadOnlySettings, TransientSettings, BaseSettings):
@classmethod @classmethod
def from_row(cls, row: Row) -> "Settings": def from_row(cls, row: Row) -> Settings:
data = dict(row) data = dict(row)
return cls(**data) return cls(**data)
@ -477,7 +477,7 @@ class SuperSettings(EditableSettings):
class AdminSettings(EditableSettings): class AdminSettings(EditableSettings):
is_super_user: bool is_super_user: bool
lnbits_allowed_funding_sources: Optional[List[str]] lnbits_allowed_funding_sources: Optional[list[str]]
def set_cli_settings(**kwargs): def set_cli_settings(**kwargs):

View File

@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, AsyncGenerator, Coroutine, NamedTuple, Optional, Type from typing import TYPE_CHECKING, AsyncGenerator, Coroutine, NamedTuple, Optional
if TYPE_CHECKING: if TYPE_CHECKING:
from lnbits.nodes.base import Node from lnbits.nodes.base import Node
@ -96,7 +96,7 @@ class Wallet(ABC):
async def cleanup(self): async def cleanup(self):
pass pass
__node_cls__: Optional[Type[Node]] = None __node_cls__: Optional[type[Node]] = None
@abstractmethod @abstractmethod
def status(self) -> Coroutine[None, None, StatusResponse]: def status(self) -> Coroutine[None, None, StatusResponse]:

View File

@ -37,7 +37,7 @@ class FakeWallet(Wallet):
privkey: str = hashlib.pbkdf2_hmac( privkey: str = hashlib.pbkdf2_hmac(
"sha256", "sha256",
secret.encode(), secret.encode(),
("FakeWallet").encode(), b"FakeWallet",
2048, 2048,
32, 32,
).hex() ).hex()

View File

@ -169,8 +169,10 @@ extend-exclude = [
# A - flake8-builtins # A - flake8-builtins
# C - mccabe # C - mccabe
# N - naming # N - naming
select = ["F", "E", "W", "I", "A", "C", "N"] # UP - pyupgrade
ignore = [] select = ["F", "E", "W", "I", "A", "C", "N", "UP"]
# UP007: pyupgrade: use X | Y instead of Optional. (python3.10)
ignore = ["UP007"]
# Allow autofix for all enabled rules (when `--fix`) is provided. # Allow autofix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"] fixable = ["ALL"]

View File

@ -17,7 +17,7 @@ assert os.getenv("OPENAI_API_KEY"), "OPENAI_API_KEY env var not set"
def load_language(lang: str) -> dict: def load_language(lang: str) -> dict:
s = open(f"lnbits/static/i18n/{lang}.js", "rt").read() s = open(f"lnbits/static/i18n/{lang}.js").read()
prefix = "window.localisation.%s = {\n" % lang prefix = "window.localisation.%s = {\n" % lang
assert s.startswith(prefix) assert s.startswith(prefix)
s = s[len(prefix) - 2 :] s = s[len(prefix) - 2 :]
@ -27,7 +27,7 @@ def load_language(lang: str) -> dict:
def save_language(lang: str, data) -> None: def save_language(lang: str, data) -> None:
with open(f"lnbits/static/i18n/{lang}.js", "wt") as f: with open(f"lnbits/static/i18n/{lang}.js", "w") as f:
f.write("window.localisation.%s = {\n" % lang) f.write("window.localisation.%s = {\n" % lang)
row = 0 row = 0
for k, v in data.items(): for k, v in data.items():

View File

@ -17,7 +17,7 @@ def get_translation_ids_from_source():
p2 = re.compile(r'\$t\("([^"]*)"') p2 = re.compile(r'\$t\("([^"]*)"')
ids = [] ids = []
for fn in files: for fn in files:
with open(fn, "rt") as f: with open(fn) as f:
text = f.read() text = f.read()
m1 = re.findall(p1, text) m1 = re.findall(p1, text)
m2 = re.findall(p2, text) m2 = re.findall(p2, text)
@ -30,7 +30,7 @@ def get_translation_ids_from_source():
def get_translation_ids_for_language(language): def get_translation_ids_for_language(language):
ids = [] ids = []
for line in open(f"lnbits/static/i18n/{language}.js", "rt"): for line in open(f"lnbits/static/i18n/{language}.js"):
# extract ids from lines like that start with exactly 2 spaces # extract ids from lines like that start with exactly 2 spaces
if line.startswith(" ") and not line.startswith(" "): if line.startswith(" ") and not line.startswith(" "):
m = line[2:].split(":")[0] m = line[2:].split(":")[0]