query history pagination tests (#3700)

* dummy pr

* Update prompts.yaml

* fixed tests and added query history pagination test

* done

* fixed

* utils!
This commit is contained in:
hagen-danswer 2025-01-18 13:28:03 -08:00 committed by GitHub
parent eec3ce8162
commit 896e716d02
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 407 additions and 117 deletions

View File

@ -134,7 +134,7 @@ def get_user_chat_sessions(
@router.get("/admin/chat-session-history")
def get_chat_session_history(
page_num: int = Query(0, ge=0),
page_size: int = Query(10, ge=10),
page_size: int = Query(10, ge=1),
feedback_type: QAFeedbackType | None = None,
start_time: datetime | None = None,
end_time: datetime | None = None,

View File

@ -1,18 +1,13 @@
import json
from datetime import datetime
from urllib.parse import urlencode
from uuid import UUID
import requests
from requests.models import Response
from ee.onyx.server.query_history.models import ChatSessionMinimal
from onyx.configs.constants import QAFeedbackType
from onyx.context.search.models import RetrievalDetails
from onyx.file_store.models import FileDescriptor
from onyx.llm.override_models import LLMOverride
from onyx.llm.override_models import PromptOverride
from onyx.server.documents.models import PaginatedReturn
from onyx.server.query_and_chat.models import ChatSessionCreationRequest
from onyx.server.query_and_chat.models import CreateChatMessageRequest
from tests.integration.common_utils.constants import API_SERVER_URL
@ -140,35 +135,23 @@ class ChatSessionManager:
]
@staticmethod
def get_chat_session_history(
user_performing_action: DATestUser,
page_size: int,
page_num: int,
feedback_type: QAFeedbackType | None = None,
start_time: datetime | None = None,
end_time: datetime | None = None,
) -> PaginatedReturn[ChatSessionMinimal]:
query_params = {
"page_num": page_num,
"page_size": page_size,
"feedback_type": feedback_type if feedback_type else None,
"start_time": start_time if start_time else None,
"end_time": end_time if end_time else None,
}
# Remove None values
query_params = {
key: value for key, value in query_params.items() if value is not None
}
response = requests.get(
f"{API_SERVER_URL}/admin/chat-session-history?{urlencode(query_params, doseq=True)}",
headers=user_performing_action.headers,
def create_chat_message_feedback(
message_id: int,
is_positive: bool,
user_performing_action: DATestUser | None = None,
feedback_text: str | None = None,
predefined_feedback: str | None = None,
) -> None:
response = requests.post(
url=f"{API_SERVER_URL}/chat/create-chat-message-feedback",
json={
"chat_message_id": message_id,
"is_positive": is_positive,
"feedback_text": feedback_text,
"predefined_feedback": predefined_feedback,
},
headers=user_performing_action.headers
if user_performing_action
else GENERAL_HEADERS,
)
response.raise_for_status()
data = response.json()
return PaginatedReturn(
items=[
ChatSessionMinimal(**chat_session) for chat_session in data["items"]
],
total_items=data["total_items"],
)

View File

@ -0,0 +1,84 @@
from datetime import datetime
from urllib.parse import urlencode
from uuid import UUID
import requests
from requests.models import CaseInsensitiveDict
from ee.onyx.server.query_history.models import ChatSessionMinimal
from ee.onyx.server.query_history.models import ChatSessionSnapshot
from onyx.configs.constants import QAFeedbackType
from onyx.server.documents.models import PaginatedReturn
from tests.integration.common_utils.constants import API_SERVER_URL
from tests.integration.common_utils.constants import GENERAL_HEADERS
from tests.integration.common_utils.test_models import DATestUser
class QueryHistoryManager:
@staticmethod
def get_query_history_page(
page_num: int = 0,
page_size: int = 10,
feedback_type: QAFeedbackType | None = None,
start_time: datetime | None = None,
end_time: datetime | None = None,
user_performing_action: DATestUser | None = None,
) -> PaginatedReturn[ChatSessionMinimal]:
query_params: dict[str, str | int] = {
"page_num": page_num,
"page_size": page_size,
}
if feedback_type:
query_params["feedback_type"] = feedback_type.value
if start_time:
query_params["start_time"] = start_time.isoformat()
if end_time:
query_params["end_time"] = end_time.isoformat()
response = requests.get(
url=f"{API_SERVER_URL}/admin/chat-session-history?{urlencode(query_params, doseq=True)}",
headers=user_performing_action.headers
if user_performing_action
else GENERAL_HEADERS,
)
response.raise_for_status()
data = response.json()
return PaginatedReturn(
items=[ChatSessionMinimal(**item) for item in data["items"]],
total_items=data["total_items"],
)
@staticmethod
def get_chat_session_admin(
chat_session_id: UUID | str,
user_performing_action: DATestUser | None = None,
) -> ChatSessionSnapshot:
response = requests.get(
url=f"{API_SERVER_URL}/admin/chat-session-history/{chat_session_id}",
headers=user_performing_action.headers
if user_performing_action
else GENERAL_HEADERS,
)
response.raise_for_status()
return ChatSessionSnapshot(**response.json())
@staticmethod
def get_query_history_as_csv(
start_time: datetime | None = None,
end_time: datetime | None = None,
user_performing_action: DATestUser | None = None,
) -> tuple[CaseInsensitiveDict[str], str]:
query_params: dict[str, str | int] = {}
if start_time:
query_params["start"] = start_time.isoformat()
if end_time:
query_params["end"] = end_time.isoformat()
response = requests.get(
url=f"{API_SERVER_URL}/admin/query-history-csv?{urlencode(query_params, doseq=True)}",
headers=user_performing_action.headers
if user_performing_action
else GENERAL_HEADERS,
)
response.raise_for_status()
return response.headers, response.content.decode()

View File

@ -213,17 +213,16 @@ class UserManager:
is_active_filter: bool | None = None,
user_performing_action: DATestUser | None = None,
) -> PaginatedReturn[FullUserSnapshot]:
query_params = {
query_params: dict[str, str | list[str] | int] = {
"page_num": page_num,
"page_size": page_size,
"q": search_query if search_query else None,
"roles": [role.value for role in role_filter] if role_filter else None,
"is_active": is_active_filter if is_active_filter is not None else None,
}
# Remove None values
query_params = {
key: value for key, value in query_params.items() if value is not None
}
if search_query:
query_params["q"] = search_query
if role_filter:
query_params["roles"] = [role.value for role in role_filter]
if is_active_filter is not None:
query_params["is_active"] = is_active_filter
response = requests.get(
url=f"{API_SERVER_URL}/manage/users/accepted?{urlencode(query_params, doseq=True)}",

View File

@ -6,6 +6,7 @@ from pydantic import BaseModel
from pydantic import Field
from onyx.auth.schemas import UserRole
from onyx.configs.constants import QAFeedbackType
from onyx.context.search.enums import RecencyBiasSetting
from onyx.db.enums import AccessType
from onyx.server.documents.models import DocumentSource
@ -130,18 +131,21 @@ class DATestPersona(BaseModel):
label_ids: list[int]
#
class DATestChatMessage(BaseModel):
id: int
chat_session_id: UUID
parent_message_id: int | None
message: str
class DATestChatSession(BaseModel):
id: UUID
persona_id: int
description: str
class DATestChatMessage(BaseModel):
id: int
chat_session_id: UUID
parent_message_id: int | None
message: str
class DAQueryHistoryEntry(DATestChatSession):
feedback_type: QAFeedbackType | None
class StreamedResponse(BaseModel):

View File

@ -3,16 +3,15 @@ from datetime import timedelta
from datetime import timezone
import pytest
import requests
from onyx.configs.constants import QAFeedbackType
from onyx.configs.constants import SessionType
from tests.integration.common_utils.constants import API_SERVER_URL
from tests.integration.common_utils.managers.api_key import APIKeyManager
from tests.integration.common_utils.managers.cc_pair import CCPairManager
from tests.integration.common_utils.managers.chat import ChatSessionManager
from tests.integration.common_utils.managers.document import DocumentManager
from tests.integration.common_utils.managers.llm_provider import LLMProviderManager
from tests.integration.common_utils.managers.query_history import QueryHistoryManager
from tests.integration.common_utils.managers.user import UserManager
from tests.integration.common_utils.test_models import DATestUser
@ -69,66 +68,52 @@ def test_chat_history_endpoints(
) -> None:
admin_user, first_chat_id = setup_chat_session
response = requests.get(
f"{API_SERVER_URL}/admin/chat-session-history",
headers=admin_user.headers,
# Get chat history
history_response = QueryHistoryManager.get_query_history_page(
user_performing_action=admin_user
)
assert response.status_code == 200
history_response = response.json()
# Verify we got back the one chat session we created
assert len(history_response) == 1
assert len(history_response.items) == 1
# Verify the first chat session details
first_session = history_response[0]
assert first_session["user_email"] == admin_user.email
assert first_session["name"] == "Test chat session"
assert first_session["first_user_message"] == "What was the Q1 revenue?"
assert first_session["first_ai_message"] is not None
assert first_session["assistant_id"] == 0
assert first_session["feedback_type"] is None
assert first_session["flow_type"] == SessionType.CHAT.value
assert first_session["conversation_length"] == 4 # 2 User messages + 2 AI responses
first_session = history_response.items[0]
assert first_session.user_email == admin_user.email
assert first_session.name == "Test chat session"
assert first_session.first_user_message == "What was the Q1 revenue?"
assert first_session.first_ai_message is not None
assert first_session.assistant_id == 0
assert first_session.feedback_type is None
assert first_session.flow_type == SessionType.CHAT
assert first_session.conversation_length == 4 # 2 User messages + 2 AI responses
# Test date filtering - should return no results
past_end = datetime.now(tz=timezone.utc) - timedelta(days=1)
past_start = past_end - timedelta(days=1)
response = requests.get(
f"{API_SERVER_URL}/admin/chat-session-history",
params={
"start": past_start.isoformat(),
"end": past_end.isoformat(),
},
headers=admin_user.headers,
history_response = QueryHistoryManager.get_query_history_page(
start_time=past_start,
end_time=past_end,
user_performing_action=admin_user,
)
assert response.status_code == 200
history_response = response.json()
assert len(history_response) == 0
assert len(history_response.items) == 0
# Test get specific chat session endpoint
response = requests.get(
f"{API_SERVER_URL}/admin/chat-session-history/{first_chat_id}",
headers=admin_user.headers,
session_details = QueryHistoryManager.get_chat_session_admin(
chat_session_id=first_chat_id,
user_performing_action=admin_user,
)
assert response.status_code == 200
session_details = response.json()
# Verify the session details
assert session_details["id"] == first_chat_id
assert len(session_details["messages"]) > 0
assert session_details["flow_type"] == SessionType.CHAT.value
assert str(session_details.id) == first_chat_id
assert len(session_details.messages) > 0
assert session_details.flow_type == SessionType.CHAT
# Test filtering by feedback
response = requests.get(
f"{API_SERVER_URL}/admin/chat-session-history",
params={
"feedback_type": QAFeedbackType.LIKE.value,
},
headers=admin_user.headers,
history_response = QueryHistoryManager.get_query_history_page(
feedback_type=QAFeedbackType.LIKE,
user_performing_action=admin_user,
)
assert response.status_code == 200
history_response = response.json()
assert len(history_response) == 0
assert len(history_response.items) == 0
def test_chat_history_csv_export(
@ -137,16 +122,13 @@ def test_chat_history_csv_export(
admin_user, _ = setup_chat_session
# Test CSV export endpoint with date filtering
response = requests.get(
f"{API_SERVER_URL}/admin/query-history-csv",
headers=admin_user.headers,
headers, csv_content = QueryHistoryManager.get_query_history_as_csv(
user_performing_action=admin_user,
)
assert response.status_code == 200
assert response.headers["Content-Type"] == "text/csv; charset=utf-8"
assert "Content-Disposition" in response.headers
assert headers["Content-Type"] == "text/csv; charset=utf-8"
assert "Content-Disposition" in headers
# Verify CSV content
csv_content = response.content.decode()
csv_lines = csv_content.strip().split("\n")
assert len(csv_lines) == 3 # Header + 2 QA pairs
assert "chat_session_id" in csv_content
@ -158,15 +140,10 @@ def test_chat_history_csv_export(
# Test CSV export with date filtering - should return no results
past_end = datetime.now(tz=timezone.utc) - timedelta(days=1)
past_start = past_end - timedelta(days=1)
response = requests.get(
f"{API_SERVER_URL}/admin/query-history-csv",
params={
"start": past_start.isoformat(),
"end": past_end.isoformat(),
},
headers=admin_user.headers,
headers, csv_content = QueryHistoryManager.get_query_history_as_csv(
start_time=past_start,
end_time=past_end,
user_performing_action=admin_user,
)
assert response.status_code == 200
csv_content = response.content.decode()
csv_lines = csv_content.strip().split("\n")
assert len(csv_lines) == 1 # Only header, no data rows

View File

@ -0,0 +1,112 @@
from datetime import datetime
from onyx.configs.constants import QAFeedbackType
from tests.integration.common_utils.managers.query_history import QueryHistoryManager
from tests.integration.common_utils.test_models import DAQueryHistoryEntry
from tests.integration.common_utils.test_models import DATestUser
from tests.integration.tests.query_history.utils import (
setup_chat_sessions_with_different_feedback,
)
def _verify_query_history_pagination(
chat_sessions: list[DAQueryHistoryEntry],
page_size: int = 5,
feedback_type: QAFeedbackType | None = None,
start_time: datetime | None = None,
end_time: datetime | None = None,
user_performing_action: DATestUser | None = None,
) -> None:
retrieved_sessions: list[str] = []
for i in range(0, len(chat_sessions), page_size):
paginated_result = QueryHistoryManager.get_query_history_page(
page_num=i // page_size,
page_size=page_size,
feedback_type=feedback_type,
start_time=start_time,
end_time=end_time,
user_performing_action=user_performing_action,
)
# Verify that the total items is equal to the length of the chat sessions list
assert paginated_result.total_items == len(chat_sessions)
# Verify that the number of items in the page is equal to the page size
assert len(paginated_result.items) == min(page_size, len(chat_sessions) - i)
# Add the retrieved chat sessions to the list of retrieved sessions
retrieved_sessions.extend(
[str(session.id) for session in paginated_result.items]
)
# Create a set of all the expected chat session IDs
all_expected_sessions = set(str(session.id) for session in chat_sessions)
# Create a set of all the retrieved chat session IDs
all_retrieved_sessions = set(retrieved_sessions)
# Verify that the set of retrieved sessions is equal to the set of expected sessions
assert all_expected_sessions == all_retrieved_sessions
def test_query_history_pagination(reset: None) -> None:
(
admin_user,
chat_sessions_by_feedback_type,
) = setup_chat_sessions_with_different_feedback()
all_chat_sessions = []
for _, chat_sessions in chat_sessions_by_feedback_type.items():
all_chat_sessions.extend(chat_sessions)
# Verify basic pagination with different page sizes
print("Verifying basic pagination with page size 5")
_verify_query_history_pagination(
chat_sessions=all_chat_sessions,
page_size=5,
user_performing_action=admin_user,
)
print("Verifying basic pagination with page size 10")
_verify_query_history_pagination(
chat_sessions=all_chat_sessions,
page_size=10,
user_performing_action=admin_user,
)
print("Verifying pagination with feedback type LIKE")
liked_sessions = chat_sessions_by_feedback_type[QAFeedbackType.LIKE]
_verify_query_history_pagination(
chat_sessions=liked_sessions,
feedback_type=QAFeedbackType.LIKE,
user_performing_action=admin_user,
)
print("Verifying pagination with feedback type DISLIKE")
disliked_sessions = chat_sessions_by_feedback_type[QAFeedbackType.DISLIKE]
_verify_query_history_pagination(
chat_sessions=disliked_sessions,
feedback_type=QAFeedbackType.DISLIKE,
user_performing_action=admin_user,
)
print("Verifying pagination with feedback type MIXED")
mixed_sessions = chat_sessions_by_feedback_type[QAFeedbackType.MIXED]
_verify_query_history_pagination(
chat_sessions=mixed_sessions,
feedback_type=QAFeedbackType.MIXED,
user_performing_action=admin_user,
)
# Test with a small page size to verify handling of partial pages
print("Verifying pagination with page size 3")
_verify_query_history_pagination(
chat_sessions=all_chat_sessions,
page_size=3,
user_performing_action=admin_user,
)
# Test with a page size larger than the total number of items
print("Verifying pagination with page size 50")
_verify_query_history_pagination(
chat_sessions=all_chat_sessions,
page_size=50,
user_performing_action=admin_user,
)

View File

@ -0,0 +1,131 @@
from concurrent.futures import as_completed
from concurrent.futures import ThreadPoolExecutor
from onyx.configs.constants import QAFeedbackType
from tests.integration.common_utils.managers.api_key import APIKeyManager
from tests.integration.common_utils.managers.cc_pair import CCPairManager
from tests.integration.common_utils.managers.chat import ChatSessionManager
from tests.integration.common_utils.managers.document import DocumentManager
from tests.integration.common_utils.managers.llm_provider import LLMProviderManager
from tests.integration.common_utils.managers.user import UserManager
from tests.integration.common_utils.test_models import DAQueryHistoryEntry
from tests.integration.common_utils.test_models import DATestUser
def _create_chat_session_with_feedback(
admin_user: DATestUser,
i: int,
feedback_type: QAFeedbackType | None,
) -> tuple[QAFeedbackType | None, DAQueryHistoryEntry]:
print(f"Creating chat session {i} with feedback type {feedback_type}")
# Create chat session with timestamp spread over 30 days
chat_session = ChatSessionManager.create(
persona_id=0,
description=f"Test chat session {i}",
user_performing_action=admin_user,
)
test_session = DAQueryHistoryEntry(
id=chat_session.id,
persona_id=0,
description=f"Test chat session {i}",
feedback_type=feedback_type,
)
# First message in chat
ChatSessionManager.send_message(
chat_session_id=chat_session.id,
message=f"Question {i}?",
user_performing_action=admin_user,
)
messages = ChatSessionManager.get_chat_history(
chat_session=chat_session,
user_performing_action=admin_user,
)
if feedback_type == QAFeedbackType.MIXED or feedback_type == QAFeedbackType.DISLIKE:
ChatSessionManager.create_chat_message_feedback(
message_id=messages[-1].id,
is_positive=False,
user_performing_action=admin_user,
)
# Second message with different feedback types
ChatSessionManager.send_message(
chat_session_id=chat_session.id,
message=f"Follow up {i}?",
user_performing_action=admin_user,
parent_message_id=messages[-1].id,
)
# Get updated messages to get the ID of the second message
messages = ChatSessionManager.get_chat_history(
chat_session=chat_session,
user_performing_action=admin_user,
)
if feedback_type == QAFeedbackType.MIXED or feedback_type == QAFeedbackType.LIKE:
ChatSessionManager.create_chat_message_feedback(
message_id=messages[-1].id,
is_positive=True,
user_performing_action=admin_user,
)
return feedback_type, test_session
def setup_chat_sessions_with_different_feedback() -> (
tuple[DATestUser, dict[QAFeedbackType | None, list[DAQueryHistoryEntry]]]
):
# Create admin user and required resources
admin_user: DATestUser = UserManager.create(name="admin_user")
cc_pair = CCPairManager.create_from_scratch(user_performing_action=admin_user)
api_key = APIKeyManager.create(user_performing_action=admin_user)
LLMProviderManager.create(user_performing_action=admin_user)
# Seed a document
cc_pair.documents = []
cc_pair.documents.append(
DocumentManager.seed_doc_with_content(
cc_pair=cc_pair,
content="The company's revenue in Q1 was $1M",
api_key=api_key,
)
)
chat_sessions_by_feedback_type: dict[
QAFeedbackType | None, list[DAQueryHistoryEntry]
] = {}
# Use ThreadPoolExecutor to create chat sessions in parallel
with ThreadPoolExecutor(max_workers=5) as executor:
# Submit all tasks and store futures
j = 0
# Will result in 40 sessions
number_of_sessions = 10
futures = []
for feedback_type in [
QAFeedbackType.MIXED,
QAFeedbackType.LIKE,
QAFeedbackType.DISLIKE,
None,
]:
futures.extend(
[
executor.submit(
_create_chat_session_with_feedback,
admin_user,
(j * number_of_sessions) + i,
feedback_type,
)
for i in range(number_of_sessions)
]
)
j += 1
# Collect results in order
for future in as_completed(futures):
feedback_type, chat_session = future.result()
chat_sessions_by_feedback_type.setdefault(feedback_type, []).append(
chat_session
)
return admin_user, chat_sessions_by_feedback_type

View File

@ -43,12 +43,6 @@ def _verify_user_pagination(
assert all_expected_emails == all_retrieved_emails
def _verify_user_role_and_status(users: list) -> None:
for user in users:
assert UserManager.is_role(user, user.role)
assert UserManager.is_status(user, user.is_active)
def test_user_pagination(reset: None) -> None:
# Create an admin user to perform actions
user_performing_action: DATestUser = UserManager.create(
@ -108,7 +102,13 @@ def test_user_pagination(reset: None) -> None:
+ inactive_admins
+ searchable_curators
)
_verify_user_role_and_status(all_users)
for user in all_users:
# Verify that the user's role in the db matches
# the role in the user object
assert UserManager.is_role(user, user.role)
# Verify that the user's status in the db matches
# the status in the user object
assert UserManager.is_status(user, user.is_active)
# Verify pagination
_verify_user_pagination(