Usage reports (#118)

---------

Co-authored-by: amohamdy99 <a.mohamdy99@gmail.com>
Chris Weaver
2024-06-22 16:39:54 -07:00
parent 5c12a3e872
commit f0b2b57d81
16 changed files with 797 additions and 11 deletions

View File: ee/danswer/background/celery/celery_app.py

@@ -14,6 +14,7 @@ from danswer.utils.logger import setup_logger
from danswer.utils.variable_functionality import global_version
from ee.danswer.background.user_group_sync import name_user_group_sync_task
from ee.danswer.db.user_group import fetch_user_groups
from ee.danswer.server.reporting.usage_export_generation import create_new_usage_report
from ee.danswer.user_groups.sync import sync_user_groups
logger = setup_logger()
@@ -76,6 +77,20 @@ def check_for_user_groups_sync_task() -> None:
register_task(task.id, task_name, db_session)
@celery_app.task(
name="autogenerate_usage_report_task",
soft_time_limit=JOB_TIMEOUT,
)
def autogenerate_usage_report_task() -> None:
"""This generates usage report under the /admin/generate-usage/report endpoint"""
with Session(get_sqlalchemy_engine()) as db_session:
create_new_usage_report(
db_session=db_session,
user_id=None,
period=None,
)
#####
# Celery Beat (Periodic Tasks) Settings
#####
@@ -84,5 +99,9 @@ celery_app.conf.beat_schedule = {
"task": "check_for_user_groups_sync_task",
"schedule": timedelta(seconds=5),
},
"autogenerate_usage_report": {
"task": "autogenerate_usage_report_task",
"schedule": timedelta(days=30), # TODO: change this to config flag
},
**(celery_app.conf.beat_schedule or {}),
}
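
The 30-day interval above is hard-coded pending the config flag the TODO mentions. A minimal sketch of one way to wire that flag, assuming an environment variable; the name USAGE_REPORT_GENERATION_PERIOD_DAYS is hypothetical and not part of this change:

import os
from datetime import timedelta

# hypothetical config flag; name and default are assumptions, not from this PR
USAGE_REPORT_GENERATION_PERIOD_DAYS = int(
    os.environ.get("USAGE_REPORT_GENERATION_PERIOD_DAYS", "30")
)

# reuses the celery_app configured above
celery_app.conf.beat_schedule["autogenerate_usage_report"] = {
    "task": "autogenerate_usage_report_task",
    "schedule": timedelta(days=USAGE_REPORT_GENERATION_PERIOD_DAYS),
}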

View File: ee/danswer/db/query_history.py

@@ -2,6 +2,8 @@ import datetime
from typing import Literal
from sqlalchemy import asc
from sqlalchemy import BinaryExpression
from sqlalchemy import ColumnElement
from sqlalchemy import desc
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import joinedload
@@ -17,16 +19,22 @@ def fetch_chat_sessions_eagerly_by_time(
start: datetime.datetime,
end: datetime.datetime,
db_session: Session,
ascending: bool = False,
limit: int | None = 500,
initial_id: int | None = None,
) -> list[ChatSession]:
time_order = asc(ChatSession.time_created) if ascending else desc(ChatSession.time_created) # type: ignore
id_order = desc(ChatSession.id) # type: ignore
time_order = desc(ChatSession.time_created) # type: ignore
message_order = asc(ChatMessage.id) # type: ignore
filters: list[ColumnElement | BinaryExpression] = [
ChatSession.time_created.between(start, end)
]
if initial_id:
filters.append(ChatSession.id < initial_id)
subquery = (
db_session.query(ChatSession.id, ChatSession.time_created)
.filter(ChatSession.time_created.between(start, end))
.order_by(desc(ChatSession.id), time_order)
.filter(*filters)
.order_by(id_order, time_order)
.distinct(ChatSession.id)
.limit(limit)
.subquery()
@@ -34,7 +42,7 @@ def fetch_chat_sessions_eagerly_by_time(
query = (
db_session.query(ChatSession)
.join(subquery, ChatSession.id == subquery.c.id)
.join(subquery, ChatSession.id == subquery.c.id) # type: ignore
.outerjoin(ChatMessage, ChatSession.id == ChatMessage.chat_session_id)
.options(
joinedload(ChatSession.user),
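
With limit and initial_id, this function now supports keyset pagination: each call returns at most limit sessions in the period, and passing the last returned session's id as initial_id makes the next call filter ChatSession.id < initial_id. A sketch of a caller, assuming an open SQLAlchemy Session named db_session:

from datetime import datetime, timezone

from ee.danswer.db.query_history import fetch_chat_sessions_eagerly_by_time

start = datetime.fromtimestamp(0, tz=timezone.utc)
end = datetime.now(tz=timezone.utc)

initial_id = None
while True:
    page = fetch_chat_sessions_eagerly_by_time(
        start, end, db_session, limit=500, initial_id=initial_id
    )
    if not page:
        break
    for chat_session in page:
        ...  # process the eagerly-loaded session and its messages
    # resume cursor: the next page only includes ids below this one
    initial_id = page[-1].id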

View File: ee/danswer/db/usage_export.py

@@ -0,0 +1,108 @@
import uuid
from collections.abc import Generator
from datetime import datetime
from typing import IO
from fastapi_users_db_sqlalchemy import UUID_ID
from sqlalchemy.orm import Session
from danswer.configs.constants import MessageType
from danswer.db.models import UsageReport
from danswer.file_store.file_store import get_default_file_store
from ee.danswer.db.query_history import fetch_chat_sessions_eagerly_by_time
from ee.danswer.server.reporting.usage_export_models import ChatMessageSkeleton
from ee.danswer.server.reporting.usage_export_models import FlowType
from ee.danswer.server.reporting.usage_export_models import UsageReportMetadata
# Gets skeletons of all user messages in the given period
def get_empty_chat_messages_entries__paginated(
db_session: Session,
period: tuple[datetime, datetime],
limit: int | None = 1,
initial_id: int | None = None,
) -> list[ChatMessageSkeleton]:
chat_sessions = fetch_chat_sessions_eagerly_by_time(
period[0], period[1], db_session, limit=limit, initial_id=initial_id
)
message_skeletons: list[ChatMessageSkeleton] = []
for chat_session in chat_sessions:
if chat_session.one_shot:
flow_type = FlowType.SEARCH
elif chat_session.danswerbot_flow:
flow_type = FlowType.SLACK
else:
flow_type = FlowType.CHAT
for message in chat_session.messages:
# only count user messages
if message.message_type != MessageType.USER:
continue
message_skeletons.append(
ChatMessageSkeleton(
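# the session id doubles as message_id; get_all_empty_chat_message_entries below uses it as the pagination cursor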
message_id=chat_session.id,
chat_session_id=chat_session.id,
user_id=str(chat_session.user_id) if chat_session.user_id else None,
flow_type=flow_type,
time_sent=message.time_sent,
)
)
return message_skeletons
def get_all_empty_chat_message_entries(
db_session: Session,
period: tuple[datetime, datetime],
) -> Generator[list[ChatMessageSkeleton], None, None]:
initial_id = None
while True:
message_skeletons = get_empty_chat_messages_entries__paginated(
db_session, period, initial_id=initial_id
)
if not message_skeletons:
return
yield message_skeletons
initial_id = message_skeletons[-1].message_id
def get_all_usage_reports(db_session: Session) -> list[UsageReportMetadata]:
return [
UsageReportMetadata(
report_name=r.report_name,
requestor=str(r.requestor_user_id) if r.requestor_user_id else None,
time_created=r.time_created,
period_from=r.period_from,
period_to=r.period_to,
)
for r in db_session.query(UsageReport).all()
]
def get_usage_report_data(
db_session: Session,
report_name: str,
) -> IO:
file_store = get_default_file_store(db_session)
# usage report may be very large, so don't load it all into memory
return file_store.read_file(file_name=report_name, mode="b", use_tempfile=True)
def write_usage_report(
db_session: Session,
report_name: str,
user_id: uuid.UUID | UUID_ID | None,
period: tuple[datetime, datetime] | None,
) -> UsageReport:
new_report = UsageReport(
report_name=report_name,
requestor_user_id=user_id,
period_from=period[0] if period else None,
period_to=period[1] if period else None,
)
db_session.add(new_report)
db_session.commit()
return new_report
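
get_all_empty_chat_message_entries streams skeletons batch by batch, so a large chat history never has to be held in memory at once. A sketch of a consumer, assuming an open Session named db_session and an illustrative period:

from collections import Counter
from datetime import datetime, timezone

from ee.danswer.db.usage_export import get_all_empty_chat_message_entries

period = (datetime(2024, 1, 1, tzinfo=timezone.utc), datetime.now(tz=timezone.utc))
counts: Counter[str] = Counter()
for batch in get_all_empty_chat_message_entries(db_session, period):
    for skeleton in batch:
        counts[skeleton.flow_type.value] += 1
print(dict(counts))  # e.g. {"chat": 120, "search": 30, "slack": 5}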

View File: ee/danswer/main.py

@@ -33,6 +33,7 @@ from ee.danswer.server.query_and_chat.query_backend import (
basic_router as query_router,
)
from ee.danswer.server.query_history.api import router as query_history_router
from ee.danswer.server.reporting.usage_export_api import router as usage_export_router
from ee.danswer.server.saml import router as saml_router
from ee.danswer.server.seeding import seed_db
from ee.danswer.server.token_rate_limits.api import (
@@ -97,6 +98,7 @@ def get_ee_application() -> FastAPI:
application, token_rate_limit_settings_router
)
include_router_with_global_prefix_prepended(application, enterprise_settings_router)
include_router_with_global_prefix_prepended(application, usage_export_router)
# Ensure all routes have auth enabled or are explicitly marked as public
check_ee_router_auth(application)
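
include_router_with_global_prefix_prepended is existing wiring rather than part of this change; roughly, a helper of that shape wraps FastAPI's include_router so every route can live under an optional deployment-wide prefix. An illustrative sketch only, not the helper's actual implementation:

from fastapi import APIRouter, FastAPI

APP_API_PREFIX = ""  # e.g. "/api" when the backend is served under a subpath

def include_router_with_prefix(app: FastAPI, router: APIRouter) -> None:
    # normalize to a single leading slash, or use no prefix at all
    prefix = f"/{APP_API_PREFIX.strip('/')}" if APP_API_PREFIX else ""
    app.include_router(router, prefix=prefix)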

View File: ee/danswer/server/reporting/usage_export_api.py

@@ -0,0 +1,82 @@
from collections.abc import Generator
from datetime import datetime
from fastapi import APIRouter
from fastapi import Depends
from fastapi import HTTPException
from fastapi import Response
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
from danswer.auth.users import current_admin_user
from danswer.db.engine import get_session
from danswer.db.models import User
from danswer.file_store.constants import STANDARD_CHUNK_SIZE
from ee.danswer.db.usage_export import get_all_usage_reports
from ee.danswer.db.usage_export import get_usage_report_data
from ee.danswer.db.usage_export import UsageReportMetadata
from ee.danswer.server.reporting.usage_export_generation import create_new_usage_report
router = APIRouter()
class GenerateUsageReportParams(BaseModel):
period_from: str | None = None
period_to: str | None = None
@router.post("/admin/generate-usage-report")
def generate_report(
params: GenerateUsageReportParams,
user: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> UsageReportMetadata:
period = None
if params.period_from and params.period_to:
try:
period = (
datetime.fromisoformat(params.period_from),
datetime.fromisoformat(params.period_to),
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
new_report = create_new_usage_report(db_session, user.id if user else None, period)
return new_report
@router.get("/admin/usage-report/{report_name}")
def read_usage_report(
report_name: str,
_: User | None = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> Response:
try:
file = get_usage_report_data(db_session, report_name)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
def iterfile() -> Generator[bytes, None, None]:
while True:
chunk = file.read(STANDARD_CHUNK_SIZE)
if not chunk:
break
yield chunk
return StreamingResponse(
content=iterfile(),
media_type="application/zip",
headers={"Content-Disposition": f"attachment; filename={report_name}"},
)
@router.get("/admin/usage-report")
def fetch_usage_reports(
_: User | None = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> list[UsageReportMetadata]:
try:
return get_all_usage_reports(db_session)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
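
End to end, the three routes compose as: POST to generate a report, GET to list metadata, GET by name to download the zip. A sketch using the requests library; the base URL is an assumption, the session is assumed to already carry admin credentials, and any global API prefix from the app wiring would also apply:

import requests

BASE_URL = "http://localhost:8080"  # assumed deployment URL
session = requests.Session()  # assumed to hold an admin auth cookie

# kick off generation; the period fields are optional ISO dates
meta = session.post(
    f"{BASE_URL}/admin/generate-usage-report",
    json={"period_from": "2024-05-01", "period_to": "2024-06-01"},
).json()

# list metadata for all reports generated so far
all_reports = session.get(f"{BASE_URL}/admin/usage-report").json()

# stream the zip to disk chunk by chunk
with session.get(
    f"{BASE_URL}/admin/usage-report/{meta['report_name']}", stream=True
) as resp:
    with open(meta["report_name"], "wb") as f:
        for chunk in resp.iter_content(chunk_size=8192):
            f.write(chunk)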

View File: ee/danswer/server/reporting/usage_export_generation.py

@@ -0,0 +1,165 @@
import csv
import tempfile
import uuid
import zipfile
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from fastapi_users_db_sqlalchemy import UUID_ID
from sqlalchemy.orm import Session
from danswer.auth.schemas import UserStatus
from danswer.configs.constants import FileOrigin
from danswer.db.users import list_users
from danswer.file_store.constants import MAX_IN_MEMORY_SIZE
from danswer.file_store.file_store import FileStore
from danswer.file_store.file_store import get_default_file_store
from ee.danswer.db.usage_export import get_all_empty_chat_message_entries
from ee.danswer.db.usage_export import write_usage_report
from ee.danswer.server.reporting.usage_export_models import UsageReportMetadata
from ee.danswer.server.reporting.usage_export_models import UserSkeleton
def generate_chat_messages_report(
db_session: Session,
file_store: FileStore,
report_id: str,
period: tuple[datetime, datetime] | None,
) -> str:
file_name = f"{report_id}_chat_sessions"
if period is None:
period = (
datetime.fromtimestamp(0, tz=timezone.utc),
datetime.now(tz=timezone.utc),
)
else:
# time-picker sends a time which is at the beginning of the day
# so we need to add one day to the end time to make it inclusive
period = (
period[0],
period[1] + timedelta(days=1),
)
with tempfile.SpooledTemporaryFile(
max_size=MAX_IN_MEMORY_SIZE, mode="w+"
) as temp_file:
csvwriter = csv.writer(temp_file, delimiter=",")
csvwriter.writerow(["session_id", "user_id", "flow_type", "time_sent"])
for chat_message_skeleton_batch in get_all_empty_chat_message_entries(
db_session, period
):
for chat_message_skeleton in chat_message_skeleton_batch:
csvwriter.writerow(
[
chat_message_skeleton.chat_session_id,
chat_message_skeleton.user_id,
chat_message_skeleton.flow_type,
chat_message_skeleton.time_sent.isoformat(),
]
)
# after writing, seek back to the beginning of the buffer
temp_file.seek(0)
file_store.save_file(
file_name=file_name,
content=temp_file,
display_name=file_name,
file_origin=FileOrigin.OTHER,
file_type="text/csv",
)
return file_name
def generate_user_report(
db_session: Session,
file_store: FileStore,
report_id: str,
) -> str:
file_name = f"{report_id}_users"
with tempfile.SpooledTemporaryFile(
max_size=MAX_IN_MEMORY_SIZE, mode="w+"
) as temp_file:
csvwriter = csv.writer(temp_file, delimiter=",")
csvwriter.writerow(["user_id", "status"])
users = list_users(db_session)
for user in users:
user_skeleton = UserSkeleton(
user_id=str(user.id),
status=UserStatus.LIVE if user.is_active else UserStatus.DEACTIVATED,
)
csvwriter.writerow([user_skeleton.user_id, user_skeleton.status])
temp_file.seek(0)
file_store.save_file(
file_name=file_name,
content=temp_file,
display_name=file_name,
file_origin=FileOrigin.OTHER,
file_type="text/csv",
)
return file_name
def create_new_usage_report(
db_session: Session,
user_id: UUID_ID | None, # None = auto-generated
period: tuple[datetime, datetime] | None,
) -> UsageReportMetadata:
report_id = str(uuid.uuid4())
file_store = get_default_file_store(db_session)
messages_filename = generate_chat_messages_report(
db_session, file_store, report_id, period
)
users_filename = generate_user_report(db_session, file_store, report_id)
with tempfile.SpooledTemporaryFile(max_size=MAX_IN_MEMORY_SIZE) as zip_buffer:
with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED) as zip_file:
# write messages
chat_messages_tmpfile = file_store.read_file(
messages_filename, mode="b", use_tempfile=True
)
zip_file.writestr(
"chat_messages.csv",
chat_messages_tmpfile.read(),
)
# write users
users_tmpfile = file_store.read_file(
users_filename, mode="b", use_tempfile=True
)
zip_file.writestr("users.csv", users_tmpfile.read())
zip_buffer.seek(0)
# store zip blob to file_store
report_name = (
f"{datetime.now(tz=timezone.utc).strftime('%Y-%m-%d')}"
f"_{report_id}_usage_report.zip"
)
file_store.save_file(
file_name=report_name,
content=zip_buffer,
display_name=report_name,
file_origin=FileOrigin.GENERATED_REPORT,
file_type="application/zip",
)
# record the report only after the zip file has been written
new_report = write_usage_report(db_session, report_name, user_id, period)
return UsageReportMetadata(
report_name=new_report.report_name,
requestor=(
str(new_report.requestor_user_id) if new_report.requestor_user_id else None
),
time_created=new_report.time_created,
period_from=new_report.period_from,
period_to=new_report.period_to,
)
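
The finished archive always holds exactly two entries, chat_messages.csv and users.csv, with the headers written above. A sketch that unpacks a downloaded report; report.zip is an assumed local filename:

import csv
import io
import zipfile

with zipfile.ZipFile("report.zip") as zf:
    with zf.open("chat_messages.csv") as f:
        messages = list(csv.DictReader(io.TextIOWrapper(f, encoding="utf-8")))
    with zf.open("users.csv") as f:
        users = list(csv.DictReader(io.TextIOWrapper(f, encoding="utf-8")))

print(f"{len(messages)} user messages across {len(users)} users")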

View File: ee/danswer/server/reporting/usage_export_models.py

@@ -0,0 +1,33 @@
from datetime import datetime
from enum import Enum
from pydantic import BaseModel
from danswer.auth.schemas import UserStatus
class FlowType(str, Enum):
CHAT = "chat"
SEARCH = "search"
SLACK = "slack"
class ChatMessageSkeleton(BaseModel):
message_id: int
chat_session_id: int
user_id: str | None
flow_type: FlowType
time_sent: datetime
class UserSkeleton(BaseModel):
user_id: str
status: UserStatus
class UsageReportMetadata(BaseModel):
report_name: str
requestor: str | None
time_created: datetime
period_from: datetime | None # None = All time
period_to: datetime | None
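
Since FlowType subclasses str, these models serialize to plain JSON without custom encoders. A quick sketch, assuming the pydantic v1 .json() API in use at the time:

from datetime import datetime, timezone

from ee.danswer.server.reporting.usage_export_models import ChatMessageSkeleton
from ee.danswer.server.reporting.usage_export_models import FlowType

skeleton = ChatMessageSkeleton(
    message_id=1,
    chat_session_id=1,
    user_id=None,
    flow_type=FlowType.CHAT,
    time_sent=datetime.now(tz=timezone.utc),
)
print(skeleton.json())  # flow_type is dumped as the string "chat"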