feat: cleanup on library dir creation and upload endpoints (#3069)

This commit is contained in:
dni ⚡ 2025-04-01 09:27:11 +02:00 committed by GitHub
parent bafb4ddf75
commit 1323a2005b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 208 additions and 111 deletions

View File

@ -163,13 +163,13 @@ def create_app() -> FastAPI:
core_app_extra.register_new_ratelimiter = register_new_ratelimiter(app)
# register static files
static_path = Path("lnbits", "static")
static = StaticFiles(directory=static_path)
app.mount("/static", static, name="static")
images_path = os.path.abspath(os.path.join(settings.lnbits_data_folder, "images"))
os.makedirs(images_path, exist_ok=True)
app.mount("/library", StaticFiles(directory=images_path), name="library")
app.mount("/static", StaticFiles(directory=Path("lnbits", "static")), name="static")
Path(settings.lnbits_data_folder, "images").mkdir(parents=True, exist_ok=True)
app.mount(
"/library",
StaticFiles(directory=Path(settings.lnbits_data_folder, "images")),
name="library",
)
g().base_url = f"http://{settings.host}:{settings.port}"

View File

@ -46,3 +46,8 @@ class SimpleItem(BaseModel):
class DbVersion(BaseModel):
db: str
version: int
class Image(BaseModel):
filename: str
directory: str = "library"

View File

@ -31,7 +31,12 @@
class="q-pt-md q-pb-md row items-center justify-between"
>
<small
><div class="text-caption ellipsis" v-text="image.filename"></div
><div
class="text-caption ellipsis"
style="max-width: 100px"
:title="image.filename"
v-text="image.filename"
></div
></small>
<q-btn
dense
@ -43,11 +48,13 @@
><q-tooltip>Copy image link</q-tooltip></q-btn
>
<q-btn
color="negative"
icon="delete"
dense
flat
size="sm"
icon="delete"
color="negative"
@click="deleteImage(image.filename)"
:title="$t('delete')"
><q-tooltip>Delete image</q-tooltip></q-btn
>
</q-card-section>

View File

@ -1,19 +1,19 @@
import glob
import imghdr
import os
import time
from http import HTTPStatus
from io import BytesIO
from shutil import make_archive
from pathlib import Path
from shutil import make_archive, move
from subprocess import Popen
from typing import Optional
from tempfile import NamedTemporaryFile
from typing import IO, Optional
from urllib.parse import urlparse
import shortuuid
from fastapi import APIRouter, Depends, File, HTTPException, Path, UploadFile
import filetype
from fastapi import APIRouter, Depends, File, Header, HTTPException, UploadFile
from fastapi.responses import FileResponse
from lnbits.core.models import User
from lnbits.core.models.misc import Image, SimpleStatus
from lnbits.core.models.notifications import NotificationType
from lnbits.core.services import (
enqueue_notification,
@ -23,6 +23,7 @@ from lnbits.core.services import (
from lnbits.core.services.notifications import send_email_notification
from lnbits.core.services.settings import dict_to_settings
from lnbits.decorators import check_admin, check_super_user
from lnbits.helpers import safe_upload_file_path
from lnbits.server import server_restart
from lnbits.settings import AdminSettings, Settings, UpdateSettings, settings
from lnbits.tasks import invoice_listeners
@ -171,40 +172,53 @@ async def api_download_backup() -> FileResponse:
status_code=HTTPStatus.OK,
dependencies=[Depends(check_admin)],
)
async def upload_image(file: UploadFile = file_upload):
if not file or not file.filename:
raise HTTPException(status_code=400, detail="No file provided")
async def upload_image(
file: UploadFile = file_upload,
content_length: int = Header(..., le=settings.lnbits_upload_size_bytes),
) -> Image:
if not file.filename:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail="No filename provided."
)
ext = file.filename.split(".")[-1].lower()
if ext not in {"png", "jpg", "jpeg", "gif"}:
raise HTTPException(status_code=400, detail="Unsupported file type")
# validate file types
file_info = filetype.guess(file.file)
if file_info is None:
raise HTTPException(
status_code=HTTPStatus.UNSUPPORTED_MEDIA_TYPE,
detail="Unable to determine file type",
)
detected_content_type = file_info.extension.lower()
if (
file.content_type not in settings.lnbits_upload_allowed_types
or detected_content_type not in settings.lnbits_upload_allowed_types
):
raise HTTPException(HTTPStatus.UNSUPPORTED_MEDIA_TYPE, "Unsupported file type")
contents = BytesIO()
total_size = 0
max_size = 500000
while chunk := await file.read(1024 * 1024):
total_size += len(chunk)
if total_size > max_size:
# validate file name
try:
file_path = safe_upload_file_path(file.filename)
except ValueError as e:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail=f"The requested filename '{file.filename}' is forbidden.",
) from e
# validate file size
real_file_size = 0
temp: IO = NamedTemporaryFile(delete=False)
for chunk in file.file:
real_file_size += len(chunk)
if real_file_size > content_length:
raise HTTPException(
status_code=413, detail=f"File too large ({max_size / 1000} KB max)"
status_code=HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
detail=f"File too large ({content_length / 1000} KB max)",
)
contents.write(chunk)
temp.write(chunk)
temp.close()
contents.seek(0)
kind = imghdr.what(None, h=contents.read(512))
if kind not in {"png", "jpeg", "gif"}:
raise HTTPException(status_code=400, detail="Invalid image file")
contents.seek(0)
filename = f"{shortuuid.uuid()[:5]}.{ext}"
image_folder = os.path.join(settings.lnbits_data_folder, "images")
file_path = os.path.join(image_folder, filename)
with open(file_path, "wb") as f:
f.write(contents.read())
return {"filename": filename, "url": f"{settings.lnbits_baseurl}library/{filename}"}
move(temp.name, file_path)
return Image(filename=file.filename)
@admin_router.get(
@ -212,23 +226,13 @@ async def upload_image(file: UploadFile = file_upload):
status_code=HTTPStatus.OK,
dependencies=[Depends(check_admin)],
)
async def list_uploaded_images():
image_folder = os.path.join(settings.lnbits_data_folder, "images")
if not os.path.exists(image_folder):
return []
files = glob.glob(os.path.join(image_folder, "*"))
async def list_uploaded_images() -> list[Image]:
image_folder = Path(settings.lnbits_data_folder, "images")
files = image_folder.glob("*")
images = []
for file_path in files:
if os.path.isfile(file_path):
filename = os.path.basename(file_path)
images.append(
{
"filename": filename,
"url": f"{settings.lnbits_baseurl}library/{filename}",
}
)
for file in files:
if file.is_file():
images.append(Image(filename=file.name))
return images
@ -237,18 +241,17 @@ async def list_uploaded_images():
status_code=HTTPStatus.OK,
dependencies=[Depends(check_admin)],
)
async def delete_uploaded_image(
filename: str = Path(..., description="Name of the image file to delete")
):
image_folder = os.path.join(settings.lnbits_data_folder, "images")
file_path = os.path.join(image_folder, filename)
async def delete_uploaded_image(filename: str) -> SimpleStatus:
try:
file_path = safe_upload_file_path(filename)
except ValueError as e:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail=f"The requested filename '{filename}' is forbidden.",
) from e
# Prevent dir traversal attack
if not os.path.abspath(file_path).startswith(os.path.abspath(image_folder)):
raise HTTPException(status_code=400, detail="Invalid filename")
if not file_path.exists():
raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="Image not found.")
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail="Image not found")
os.remove(file_path)
return {"status": "success", "message": f"{filename} deleted"}
file_path.unlink()
return SimpleStatus(success=True, message=f"{filename} deleted")

View File

@ -347,3 +347,14 @@ def path_segments(path: str) -> list[str]:
def normalize_path(path: Optional[str]) -> str:
path = path or ""
return "/" + "/".join(path_segments(path))
def safe_upload_file_path(filename: str, directory: str = "images") -> Path:
image_folder = Path(settings.lnbits_data_folder, directory)
file_path = image_folder / filename
# Prevent dir traversal attack
if image_folder.resolve() not in file_path.resolve().parents:
raise ValueError("Unsafe filename.")
# Prevent filename with subdirectories
file_path = image_folder / filename.split("/")[-1]
return file_path.resolve()

View File

@ -271,6 +271,23 @@ class ThemesSettings(LNbitsSettings):
class OpsSettings(LNbitsSettings):
lnbits_baseurl: str = Field(default="http://127.0.0.1:5000/")
lnbits_hide_api: bool = Field(default=False)
lnbits_upload_size_bytes: int = Field(default=512_000, ge=0) # 500kb
lnbits_upload_allowed_types: list[str] = Field(
default=[
"image/png",
"image/jpeg",
"image/jpg",
"image/heic",
"image/heif",
"image/heics",
"png",
"jpeg",
"jpg",
"heic",
"heif",
"heics",
]
)
class FeeSettings(LNbitsSettings):

View File

@ -560,59 +560,57 @@ window.AdminPageLogic = {
this.uploadImage(file)
}
},
async uploadImage(file) {
uploadImage(file) {
const formData = new FormData()
formData.append('file', file)
try {
const response = await LNbits.api.request(
LNbits.api
.request(
'POST',
'/admin/api/v1/images',
this.g.user.wallets[0].adminkey,
formData,
{headers: {'Content-Type': 'multipart/form-data'}}
)
this.$q.notify({
type: 'positive',
message: 'Image uploaded!',
icon: null
.then(() => {
this.$q.notify({
type: 'positive',
message: 'Image uploaded!',
icon: null
})
this.getUploadedImages()
})
await this.getUploadedImages()
} catch (error) {
LNbits.utils.notifyApiError(error)
}
.catch(LNbits.utils.notifyApiError)
},
async getUploadedImages() {
try {
const response = await LNbits.api.request(
'GET',
'/admin/api/v1/images',
this.g.user.wallets[0].inkey
)
this.library_images = response.data
} catch (error) {
LNbits.utils.notifyApiError(error)
}
getUploadedImages() {
LNbits.api
.request('GET', '/admin/api/v1/images', this.g.user.wallets[0].inkey)
.then(response => {
this.library_images = response.data.map(image => ({
...image,
url: `${window.origin}/${image.directory}/${image.filename}`
}))
})
.catch(LNbits.utils.notifyApiError)
},
async deleteImage(filename) {
deleteImage(filename) {
LNbits.utils
.confirmDialog('Are you sure you want to delete this image?')
.onOk(async () => {
try {
await LNbits.api.request(
.onOk(() => {
LNbits.api
.request(
'DELETE',
`/admin/api/v1/images/${filename}`,
this.g.user.wallets[0].adminkey
)
this.$q.notify({
type: 'positive',
message: 'Image deleted!',
icon: null
.then(() => {
this.$q.notify({
type: 'positive',
message: 'Image deleted!',
icon: null
})
this.getUploadedImages()
})
await this.getUploadedImages()
} catch (error) {
LNbits.utils.notifyApiError(error)
}
.catch(LNbits.utils.notifyApiError)
})
},
downloadBackup() {

13
poetry.lock generated
View File

@ -1083,6 +1083,17 @@ docs = ["furo (>=2023.9.10)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1
testing = ["covdefaults (>=2.3)", "coverage (>=7.3.2)", "diff-cover (>=8)", "pytest (>=7.4.3)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)", "pytest-timeout (>=2.2)"]
typing = ["typing-extensions (>=4.8)"]
[[package]]
name = "filetype"
version = "1.2.0"
description = "Infer file type and MIME type of any file/buffer. No external dependencies."
optional = false
python-versions = "*"
files = [
{file = "filetype-1.2.0-py2.py3-none-any.whl", hash = "sha256:7ce71b6880181241cf7ac8697a2f1eb6a8bd9b429f7ad6d27b8db9ba5f1c2d25"},
{file = "filetype-1.2.0.tar.gz", hash = "sha256:66b56cd6474bf41d8c54660347d37afcc3f7d1970648de365c102ef77548aadb"},
]
[[package]]
name = "greenlet"
version = "3.0.3"
@ -3403,4 +3414,4 @@ liquid = ["wallycore"]
[metadata]
lock-version = "2.0"
python-versions = "~3.12 | ~3.11 | ~3.10"
content-hash = "96dd180aaa4fbfeb34fa6f9647c8684fce183a72b1b41d22101a9dd4b962fa2e"
content-hash = "f56154a228bfd11ca92c1818dd2b7d71ff67b218225103f4b701e6523ba499ed"

View File

@ -63,6 +63,8 @@ breez-sdk = {version = "0.6.6", optional = true}
jsonpath-ng = "^1.7.0"
pynostr = "^0.6.2"
python-multipart = "^0.0.20"
filetype = "^1.2.0"
[tool.poetry.extras]
breez = ["breez-sdk"]
liquid = ["wallycore"]
@ -145,6 +147,7 @@ module = [
"fastapi_sso.sso.*",
"json5.*",
"jsonpath_ng.*",
"filetype.*",
]
ignore_missing_imports = "True"

View File

@ -0,0 +1,42 @@
from pathlib import Path
import pytest
from lnbits.helpers import safe_upload_file_path
from lnbits.settings import settings
@pytest.mark.parametrize(
"filepath",
[
"test.txt",
"test/test.txt",
"test/test/test.txt",
"test/../test.txt",
"*/test.txt",
"test/**/test.txt",
"./test.txt",
],
)
def test_safe_upload_file_path(filepath: str):
safe_path = safe_upload_file_path(filepath)
assert safe_path.name == "test.txt"
# check if subdirectories got removed
images_folder = Path(settings.lnbits_data_folder) / "images"
assert images_folder.resolve() / "test.txt" == safe_path
@pytest.mark.parametrize(
"filepath",
[
"../test.txt",
"test/../../test.txt",
"../../test.txt",
"test/../../../test.txt",
"../../../test.txt",
],
)
def test_unsafe_upload_file_path(filepath: str):
with pytest.raises(ValueError):
safe_upload_file_path(filepath)