From c97570c6b92e84d21fa9d4c98e7bea2c77290b3f Mon Sep 17 00:00:00 2001 From: Lina Date: Tue, 28 Oct 2025 10:16:00 +0100 Subject: [PATCH 01/12] Query for last chat messages --- cognition_objects/message.py | 38 ++++++++++++++++++++++++++++++++++++ enums.py | 6 ++++++ 2 files changed, 44 insertions(+) diff --git a/cognition_objects/message.py b/cognition_objects/message.py index eab633b..9a31e57 100644 --- a/cognition_objects/message.py +++ b/cognition_objects/message.py @@ -1,5 +1,7 @@ from typing import Any, Dict, List, Optional, Union, Tuple from datetime import datetime + +from submodules.model.enums import MessageType from ..business_objects import general from ..session import session from ..models import CognitionMessage @@ -605,3 +607,39 @@ def get_count_by_project_id(project_id: str) -> int: ) .count() ) + + +def get_last_chat_messages( + organization_id: str, + project_id: str, + message_type: str, + starting_from: str, +) -> List[Any]: + + project_id = prevent_sql_injection(project_id, isinstance(project_id, str)) + organization_id = prevent_sql_injection( + organization_id, isinstance(organization_id, str) + ) + message_type = prevent_sql_injection(message_type, isinstance(message_type, str)) + starting_from = prevent_sql_injection(starting_from, isinstance(starting_from, str)) + + message_type_filter = "" + + if message_type == MessageType.WITH_ERROR.value: + message_type_filter = "AND c.error IS NOT NULL" + elif message_type == MessageType.WITHOUT_ERROR.value: + message_type_filter = "AND c.error IS NULL" + + query = f""" + SELECT m.created_at, m.created_by, m.question, m.answer, m.initiated_via, c.error + FROM cognition.message m + JOIN cognition.conversation c on c.id = m.conversation_id + JOIN "user" u on m.created_by = u.id + WHERE m.project_id = '{project_id}' + AND m.created_at >= '{starting_from}' + AND u.organization_id = '{organization_id}' + {message_type_filter} + """ + print(query) + + return general.execute_all(query) diff --git a/enums.py b/enums.py index 079a6bc..26a7ab3 100644 --- a/enums.py +++ b/enums.py @@ -1012,3 +1012,9 @@ class MessageInitiationType(Enum): UI = "UI" API = "API" MACRO = "MACRO" + + +class MessageType(Enum): + WITH_ERROR = "WITH_ERROR" + WITHOUT_ERROR = "WITHOUT_ERROR" + ALL = "ALL" From 560c122d5a572ab68e91ffe7db5cdf5997eed8c7 Mon Sep 17 00:00:00 2001 From: Lina Date: Tue, 28 Oct 2025 10:59:02 +0100 Subject: [PATCH 02/12] Fix in query for last chat messages --- cognition_objects/message.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cognition_objects/message.py b/cognition_objects/message.py index 9a31e57..aefc090 100644 --- a/cognition_objects/message.py +++ b/cognition_objects/message.py @@ -631,15 +631,15 @@ def get_last_chat_messages( message_type_filter = "AND c.error IS NULL" query = f""" - SELECT m.created_at, m.created_by, m.question, m.answer, m.initiated_via, c.error + SELECT m.created_at, m.created_by, m.question, m.initiated_via, c.error FROM cognition.message m - JOIN cognition.conversation c on c.id = m.conversation_id - JOIN "user" u on m.created_by = u.id + JOIN cognition.conversation c on c.id = m.conversation_id + JOIN cognition.project cp on cp.id = m.project_id WHERE m.project_id = '{project_id}' - AND m.created_at >= '{starting_from}' - AND u.organization_id = '{organization_id}' - {message_type_filter} + AND m.created_at >= '{starting_from}' + AND cp.organization_id = '{organization_id}' + {message_type_filter} + ORDER BY m.created_at DESC + LIMIT 10 """ - print(query) - return general.execute_all(query) From 4e83daeee884e0e1506d6d0f1ebcc887ab72081b Mon Sep 17 00:00:00 2001 From: Lina Date: Tue, 28 Oct 2025 15:51:37 +0100 Subject: [PATCH 03/12] Query change for grouping orgs and projects --- cognition_objects/message.py | 41 ++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/cognition_objects/message.py b/cognition_objects/message.py index aefc090..7edc268 100644 --- a/cognition_objects/message.py +++ b/cognition_objects/message.py @@ -610,36 +610,45 @@ def get_count_by_project_id(project_id: str) -> int: def get_last_chat_messages( - organization_id: str, - project_id: str, message_type: str, starting_from: str, + ending_to: Optional[str] = None, ) -> List[Any]: - project_id = prevent_sql_injection(project_id, isinstance(project_id, str)) - organization_id = prevent_sql_injection( - organization_id, isinstance(organization_id, str) - ) message_type = prevent_sql_injection(message_type, isinstance(message_type, str)) starting_from = prevent_sql_injection(starting_from, isinstance(starting_from, str)) + if ending_to: + ending_to = prevent_sql_injection(ending_to, isinstance(ending_to, str)) message_type_filter = "" + ending_to_filter = "" if message_type == MessageType.WITH_ERROR.value: message_type_filter = "AND c.error IS NOT NULL" elif message_type == MessageType.WITHOUT_ERROR.value: message_type_filter = "AND c.error IS NULL" + if ending_to: + ending_to_filter = f"AND m.created_at <= '{ending_to}'" query = f""" - SELECT m.created_at, m.created_by, m.question, m.initiated_via, c.error - FROM cognition.message m - JOIN cognition.conversation c on c.id = m.conversation_id - JOIN cognition.project cp on cp.id = m.project_id - WHERE m.project_id = '{project_id}' - AND m.created_at >= '{starting_from}' - AND cp.organization_id = '{organization_id}' - {message_type_filter} - ORDER BY m.created_at DESC - LIMIT 10 + SELECT * + FROM ( + SELECT m.created_at, m.created_by, m.question, m.answer, m.initiated_via, c.error, cp.id AS project_id, cp.name AS project_name, cp.organization_id, o.name AS organization_name, c.id AS conversation_id, + ROW_NUMBER() OVER ( + PARTITION BY cp.organization_id, cp.id + ORDER BY m.created_at DESC + ) AS rn + FROM cognition.message m + JOIN cognition.conversation c ON c.id = m.conversation_id + JOIN cognition.project cp ON cp.id = m.project_id + JOIN organization o ON o.id = cp.organization_id + WHERE + m.created_at >= '{starting_from}' + {message_type_filter} + {ending_to_filter} + ) sub + WHERE rn <= 5 + ORDER BY organization_id, project_id, created_at DESC """ + return general.execute_all(query) From c478218e1abb67d5feed1a78d7e6ca08650edbd7 Mon Sep 17 00:00:00 2001 From: Lina Date: Thu, 30 Oct 2025 15:03:30 +0100 Subject: [PATCH 04/12] Query for etl tasks --- cognition_objects/markdown_file.py | 42 ++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/cognition_objects/markdown_file.py b/cognition_objects/markdown_file.py index 8b1e2d2..ce3dfd9 100644 --- a/cognition_objects/markdown_file.py +++ b/cognition_objects/markdown_file.py @@ -218,3 +218,45 @@ def delete_many(org_id: str, md_file_ids: List[str], with_commit: bool = True) - CognitionMarkdownFile.id.in_(md_file_ids), ).delete(synchronize_session=False) general.flush_or_commit(with_commit) + + +def get_last_etl_tasks( + states: List[str], + created_at_from: str, + created_at_to: Optional[str] = None, +) -> List[Any]: + + states = prevent_sql_injection(states, isinstance(states, list)) + created_at_from = prevent_sql_injection( + created_at_from, isinstance(created_at_from, str) + ) + if created_at_to: + created_at_to = prevent_sql_injection( + created_at_to, isinstance(created_at_to, str) + ) + created_at_to_filter = "" + + if created_at_to: + created_at_to_filter = f"AND mf.created_at <= '{created_at_to}'" + + query = f""" + SELECT * + FROM ( + SELECT mf.created_at, mf.created_by, mf.started_at, mf.finished_at, mf.file_name, mf.error, mf.state, md.id AS dataset_id, md.name AS dataset_name, md.organization_id, o.name AS organization_name, + ROW_NUMBER() OVER ( + PARTITION BY md.organization_id, md.id + ORDER BY mf.created_at DESC + ) AS rn + FROM cognition.markdown_file mf + JOIN cognition.markdown_dataset md ON md.id = mf.dataset_id + JOIN organization o ON o.id = md.organization_id + WHERE + mf.created_at >= '{created_at_from}' + AND mf.state IN ({', '.join(f"'{state}'" for state in states)}) + {created_at_to_filter} + ) sub + WHERE rn <= 5 + ORDER BY organization_id, dataset_id, created_at DESC + """ + + return general.execute_all(query) From 4d312585ba628f191ba579e8ac4e6cfa4950d390 Mon Sep 17 00:00:00 2001 From: Lina Date: Mon, 3 Nov 2025 15:00:53 +0100 Subject: [PATCH 05/12] Query for integrations tasks --- cognition_objects/integration.py | 68 +++++++++++++++++++++++++++++--- 1 file changed, 63 insertions(+), 5 deletions(-) diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py index 348045d..342cb60 100644 --- a/cognition_objects/integration.py +++ b/cognition_objects/integration.py @@ -333,12 +333,70 @@ def get_distinct_item_ids_for_all_permissions( def get_last_integrations_tasks() -> List[Dict[str, Any]]: query = f""" - SELECT * + WITH embedding_agg AS ( + SELECT + project_id, + jsonb_agg( + jsonb_build_object( + 'createdBy', e.created_by, + 'finishedAt', e.finished_at, + 'id', e.id, + 'name', e.name, + 'startedAt', e.started_at, + 'state', e.state + ) + ) AS embeddings + FROM embedding e + GROUP BY project_id + ), + + attribute_agg AS ( + SELECT + project_id, + jsonb_agg( + jsonb_build_object( + 'dataType', a.data_type, + 'finishedAt', a.finished_at, + 'id', a.id, + 'name', a.name, + 'startedAt', a.started_at, + 'state', a.state + ) + ) AS attributes + FROM "attribute" a + GROUP BY project_id + ), + + record_tokenization_task_agg AS ( + SELECT + project_id, + jsonb_agg( + jsonb_build_object( + 'finishedAt', rtt.finished_at, + 'id', rtt.id, + 'startedAt', rtt.started_at, + 'state', rtt.state, + 'type', rtt.type + ) + ) AS record_tokenization_tasks + FROM record_tokenization_task rtt + GROUP BY project_id + ) + + SELECT + i.id AS integration_id, i.name AS integration_name, i.error_message, i.started_at, i.finished_at, i.state, + o.id AS organization_id, + o.name AS organization_name, + jsonb_build_object( + 'embeddings', coalesce(ea.embeddings, '[]'::jsonb), + 'attributes', coalesce(aa.attributes, '[]'::jsonb), + 'record_tokenization_tasks', coalesce(rtt.record_tokenization_tasks, '[]'::jsonb) + ) AS full_data FROM cognition.integration i - JOIN project p ON p.id = i.project_id - JOIN embedding e ON e.project_id = p.id - JOIN "attribute" a ON a.project_id = p.id - JOIN record_tokenized rt ON rt.project_id = p.id + LEFT JOIN embedding_agg ea ON ea.project_id = i.project_id + LEFT JOIN attribute_agg aa ON aa.project_id = i.project_id + LEFT JOIN record_tokenization_task_agg rtt ON rtt.project_id = i.project_id + LEFT JOIN organization o ON o.id = i.organization_id """ return general.execute_all(query) From fbc76097572193a6bcbd3d3a666f1aecef3382cc Mon Sep 17 00:00:00 2001 From: Lina Date: Mon, 3 Nov 2025 16:42:58 +0100 Subject: [PATCH 06/12] Improved query for integrations tasks --- cognition_objects/integration.py | 130 ++++++++++++++++++------------- 1 file changed, 77 insertions(+), 53 deletions(-) diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py index 342cb60..42e969b 100644 --- a/cognition_objects/integration.py +++ b/cognition_objects/integration.py @@ -334,69 +334,93 @@ def get_distinct_item_ids_for_all_permissions( def get_last_integrations_tasks() -> List[Dict[str, Any]]: query = f""" WITH embedding_agg AS ( - SELECT - project_id, - jsonb_agg( - jsonb_build_object( - 'createdBy', e.created_by, - 'finishedAt', e.finished_at, - 'id', e.id, - 'name', e.name, - 'startedAt', e.started_at, - 'state', e.state - ) - ) AS embeddings - FROM embedding e - GROUP BY project_id + SELECT + project_id, + jsonb_agg( + jsonb_build_object( + 'createdBy', e.created_by, + 'finishedAt', e.finished_at, + 'id', e.id, + 'name', e.name, + 'startedAt', e.started_at, + 'state', e.state + ) ORDER BY e.started_at DESC + ) AS embeddings + FROM embedding e + GROUP BY project_id ), attribute_agg AS ( - SELECT - project_id, - jsonb_agg( - jsonb_build_object( - 'dataType', a.data_type, - 'finishedAt', a.finished_at, - 'id', a.id, - 'name', a.name, - 'startedAt', a.started_at, - 'state', a.state - ) - ) AS attributes - FROM "attribute" a - GROUP BY project_id + SELECT + project_id, + jsonb_agg( + jsonb_build_object( + 'dataType', a.data_type, + 'finishedAt', a.finished_at, + 'id', a.id, + 'name', a.name, + 'startedAt', a.started_at, + 'state', a.state + ) ORDER BY a.started_at DESC + ) AS attributes + FROM "attribute" a + GROUP BY project_id ), record_tokenization_task_agg AS ( - SELECT - project_id, - jsonb_agg( + SELECT + project_id, + jsonb_agg( + jsonb_build_object( + 'finishedAt', rtt.finished_at, + 'id', rtt.id, + 'startedAt', rtt.started_at, + 'state', rtt.state, + 'type', rtt.type + ) ORDER BY rtt.started_at DESC + ) AS record_tokenization_tasks + FROM record_tokenization_task rtt + GROUP BY project_id + ), + + integration_data AS ( + SELECT + i.id AS integration_id, + i.name AS integration_name, + i.error_message, + i.started_at, + i.finished_at, + i.state, + i.organization_id, jsonb_build_object( - 'finishedAt', rtt.finished_at, - 'id', rtt.id, - 'startedAt', rtt.started_at, - 'state', rtt.state, - 'type', rtt.type - ) - ) AS record_tokenization_tasks - FROM record_tokenization_task rtt - GROUP BY project_id + 'embeddings', coalesce(ea.embeddings, '[]'::jsonb), + 'attributes', coalesce(aa.attributes, '[]'::jsonb), + 'record_tokenization_tasks', coalesce(rtt.record_tokenization_tasks, '[]'::jsonb) + ) AS full_data + FROM cognition.integration i + LEFT JOIN embedding_agg ea ON ea.project_id = i.project_id + LEFT JOIN attribute_agg aa ON aa.project_id = i.project_id + LEFT JOIN record_tokenization_task_agg rtt ON rtt.project_id = i.project_id ) SELECT - i.id AS integration_id, i.name AS integration_name, i.error_message, i.started_at, i.finished_at, i.state, - o.id AS organization_id, - o.name AS organization_name, - jsonb_build_object( - 'embeddings', coalesce(ea.embeddings, '[]'::jsonb), - 'attributes', coalesce(aa.attributes, '[]'::jsonb), - 'record_tokenization_tasks', coalesce(rtt.record_tokenization_tasks, '[]'::jsonb) - ) AS full_data - FROM cognition.integration i - LEFT JOIN embedding_agg ea ON ea.project_id = i.project_id - LEFT JOIN attribute_agg aa ON aa.project_id = i.project_id - LEFT JOIN record_tokenization_task_agg rtt ON rtt.project_id = i.project_id - LEFT JOIN organization o ON o.id = i.organization_id + o.id AS organization_id, + o.name AS organization_name, + jsonb_agg( + jsonb_build_object( + 'id', integration_id, + 'name', integration_name, + 'error_message', error_message, + 'started_at', i.started_at, + 'finished_at', finished_at, + 'state', state, + 'fullData', full_data + ) ORDER BY i.started_at DESC + ) AS integrations + FROM organization o + LEFT JOIN integration_data i ON i.organization_id = o.id + GROUP BY o.id, o.name + """ return general.execute_all(query) From 922207118f837664b287f9008f9babd1b88ab5bb Mon Sep 17 00:00:00 2001 From: Lina Date: Tue, 4 Nov 2025 11:20:23 +0100 Subject: [PATCH 07/12] Added project name to integrations tasks --- cognition_objects/integration.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py index 42e969b..8b95f74 100644 --- a/cognition_objects/integration.py +++ b/cognition_objects/integration.py @@ -343,10 +343,12 @@ def get_last_integrations_tasks() -> List[Dict[str, Any]]: 'id', e.id, 'name', e.name, 'startedAt', e.started_at, - 'state', e.state + 'state', e.state, + 'projectName', p.name ) ORDER BY e.started_at DESC ) AS embeddings FROM embedding e + LEFT JOIN project p ON p.id = e.project_id GROUP BY project_id ), @@ -360,10 +362,12 @@ def get_last_integrations_tasks() -> List[Dict[str, Any]]: 'id', a.id, 'name', a.name, 'startedAt', a.started_at, - 'state', a.state + 'state', a.state, + 'projectName', p.name ) ORDER BY a.started_at DESC ) AS attributes FROM "attribute" a + LEFT JOIN project p ON p.id = a.project_id GROUP BY project_id ), @@ -376,10 +380,12 @@ def get_last_integrations_tasks() -> List[Dict[str, Any]]: 'id', rtt.id, 'startedAt', rtt.started_at, 'state', rtt.state, - 'type', rtt.type + 'type', rtt.type, + 'projectName', p.name ) ORDER BY rtt.started_at DESC ) AS record_tokenization_tasks FROM record_tokenization_task rtt + LEFT JOIN project p ON p.id = rtt.project_id GROUP BY project_id ), @@ -392,6 +398,7 @@ def get_last_integrations_tasks() -> List[Dict[str, Any]]: i.finished_at, i.state, i.organization_id, + i.project_id, jsonb_build_object( 'embeddings', coalesce(ea.embeddings, '[]'::jsonb), 'attributes', coalesce(aa.attributes, '[]'::jsonb), @@ -414,13 +421,14 @@ def get_last_integrations_tasks() -> List[Dict[str, Any]]: 'started_at', i.started_at, 'finished_at', finished_at, 'state', state, - 'fullData', full_data + 'fullData', full_data, + 'projectName', p.name ) ORDER BY i.started_at DESC ) AS integrations FROM organization o LEFT JOIN integration_data i ON i.organization_id = o.id - GROUP BY o.id, o.name - + LEFT JOIN project p ON p.id = i.project_id + GROUP BY o.id, o.name, p.name """ return general.execute_all(query) From 3b77e65c447d5a9f50bda673a120341e71648f9e Mon Sep 17 00:00:00 2001 From: Lina Date: Tue, 4 Nov 2025 16:05:36 +0100 Subject: [PATCH 08/12] Strategy info query --- cognition_objects/strategy.py | 42 ++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/cognition_objects/strategy.py b/cognition_objects/strategy.py index 298632b..0e0097f 100644 --- a/cognition_objects/strategy.py +++ b/cognition_objects/strategy.py @@ -1,6 +1,8 @@ -from typing import List, Optional +from typing import Any, List, Optional from datetime import datetime +from submodules.model.util import prevent_sql_injection + from ..business_objects import general from ..session import session from ..models import CognitionStrategy @@ -107,3 +109,41 @@ def delete_all_by_project_id(project_id: str, with_commit: bool = True) -> None: CognitionStrategy.project_id == project_id ).delete() general.flush_or_commit(with_commit) + + +def get_strategies_info( + step_types: List[str], + created_at_from: str, + created_at_to: Optional[str] = None, +) -> List[Any]: + + step_types = prevent_sql_injection(step_types, isinstance(step_types, list)) + created_at_from = prevent_sql_injection( + created_at_from, isinstance(created_at_from, str) + ) + if created_at_to: + created_at_to = prevent_sql_injection( + created_at_to, isinstance(created_at_to, str) + ) + created_at_to_filter = "" + + if created_at_to: + created_at_to_filter = f"AND ss.created_at <= '{created_at_to}'" + + query = f""" + SELECT + s.id as strategy_id, s.name as strategy_name, + ss.id as step_id, ss.created_by, ss.created_at, ss.name as step_name, ss.step_type , + p.name as project_name, p.id as project_id, + o.name as organization_name, o.id as organization_id + FROM cognition.strategy s + JOIN cognition.strategy_step ss on ss.strategy_id = s.id + JOIN cognition.project p on p.id = s.project_id + JOIN organization o on o.id = p.organization_id + WHERE ss.created_at >= '{created_at_from}' + AND ss.step_type IN ({', '.join(f"'{step_type}'" for step_type in step_types)}) + {created_at_to_filter} + ORDER BY s.id, ss.created_at DESC + """ + + return general.execute_all(query) From 7c38a50bf130ff4987c30b1da748b8168b43f40d Mon Sep 17 00:00:00 2001 From: Lina Date: Wed, 5 Nov 2025 14:14:11 +0100 Subject: [PATCH 09/12] Integrations tasks page improvements --- cognition_objects/integration.py | 136 +++++++++++++++++++++++-------- 1 file changed, 101 insertions(+), 35 deletions(-) diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py index 8b95f74..ee13e61 100644 --- a/cognition_objects/integration.py +++ b/cognition_objects/integration.py @@ -11,6 +11,7 @@ CognitionIntegrationType, ) from ..util import prevent_sql_injection +from submodules.model import enums FINISHED_STATES = [ CognitionMarkdownFileState.FINISHED.value, @@ -333,9 +334,11 @@ def get_distinct_item_ids_for_all_permissions( def get_last_integrations_tasks() -> List[Dict[str, Any]]: query = f""" - WITH embedding_agg AS ( - SELECT - project_id, + WITH embeddings_by_state AS ( + SELECT + e.project_id, + e.state, + COUNT(*) AS count, jsonb_agg( jsonb_build_object( 'createdBy', e.created_by, @@ -343,18 +346,39 @@ def get_last_integrations_tasks() -> List[Dict[str, Any]]: 'id', e.id, 'name', e.name, 'startedAt', e.started_at, - 'state', e.state, - 'projectName', p.name + 'state', e.state ) ORDER BY e.started_at DESC ) AS embeddings FROM embedding e LEFT JOIN project p ON p.id = e.project_id - GROUP BY project_id + GROUP BY e.project_id, e.state ), - attribute_agg AS ( - SELECT - project_id, + embedding_agg AS ( + SELECT + e.project_id, + ( + SELECT jsonb_object_agg( + ebs.state, + jsonb_build_object( + 'count', ebs.count, + 'embeddings', ebs.embeddings + ) + ) + FROM embeddings_by_state ebs + WHERE ebs.project_id = e.project_id + ) AS embeddings_by_state + + FROM embedding e + LEFT JOIN project p ON p.id = e.project_id + GROUP BY e.project_id + ), + + attribute_by_state AS ( + SELECT + a.project_id, + a.state, + COUNT(*) AS count, jsonb_agg( jsonb_build_object( 'dataType', a.data_type, @@ -362,31 +386,73 @@ def get_last_integrations_tasks() -> List[Dict[str, Any]]: 'id', a.id, 'name', a.name, 'startedAt', a.started_at, - 'state', a.state, - 'projectName', p.name + 'state', a.state ) ORDER BY a.started_at DESC ) AS attributes - FROM "attribute" a + FROM attribute a LEFT JOIN project p ON p.id = a.project_id - GROUP BY project_id + WHERE a.state NOT IN ('UPLOADED','AUTOMATICALLY_CREATED') + GROUP BY a.project_id, a.state ), - record_tokenization_task_agg AS ( - SELECT - project_id, + attribute_agg AS ( + SELECT + a.project_id, + ( + SELECT jsonb_object_agg( + abs.state, + jsonb_build_object( + 'count', abs.count, + 'attributes', abs.attributes + ) + ) + FROM attribute_by_state abs + WHERE abs.project_id = a.project_id + ) AS attributes_by_state + + FROM attribute a + LEFT JOIN project p ON p.id = a.project_id + WHERE a.state NOT IN ('UPLOADED','AUTOMATICALLY_CREATED') + GROUP BY a.project_id + ), + + record_tokenization_tasks_by_state AS ( + SELECT + rtt.project_id, + rtt.state, + COUNT(*) AS count, jsonb_agg( jsonb_build_object( 'finishedAt', rtt.finished_at, 'id', rtt.id, 'startedAt', rtt.started_at, 'state', rtt.state, - 'type', rtt.type, - 'projectName', p.name + 'type', rtt.type ) ORDER BY rtt.started_at DESC ) AS record_tokenization_tasks FROM record_tokenization_task rtt LEFT JOIN project p ON p.id = rtt.project_id - GROUP BY project_id + GROUP BY rtt.project_id, rtt.state + ), + + record_tokenization_task_agg AS ( + SELECT + rtt.project_id, + ( + SELECT jsonb_object_agg( + rtts.state, + jsonb_build_object( + 'count', rtts.count, + 'record_tokenization_tasks', rtts.record_tokenization_tasks + ) + ) + FROM record_tokenization_tasks_by_state rtts + WHERE rtts.project_id = rtt.project_id + ) AS record_tokenization_tasks_by_state + + FROM record_tokenization_task rtt + LEFT JOIN project p ON p.id = rtt.project_id + GROUP BY rtt.project_id ), integration_data AS ( @@ -399,10 +465,12 @@ def get_last_integrations_tasks() -> List[Dict[str, Any]]: i.state, i.organization_id, i.project_id, + i.created_by, + i.type, jsonb_build_object( - 'embeddings', coalesce(ea.embeddings, '[]'::jsonb), - 'attributes', coalesce(aa.attributes, '[]'::jsonb), - 'record_tokenization_tasks', coalesce(rtt.record_tokenization_tasks, '[]'::jsonb) + 'embeddingsByState', coalesce(ea.embeddings_by_state, '[]'::jsonb), + 'attributesByState', coalesce(aa.attributes_by_state, '[]'::jsonb), + 'recordTokenizationTasksByState', coalesce(rtt.record_tokenization_tasks_by_state, '[]'::jsonb) ) AS full_data FROM cognition.integration i LEFT JOIN embedding_agg ea ON ea.project_id = i.project_id @@ -413,22 +481,20 @@ def get_last_integrations_tasks() -> List[Dict[str, Any]]: SELECT o.id AS organization_id, o.name AS organization_name, - jsonb_agg( - jsonb_build_object( - 'id', integration_id, - 'name', integration_name, - 'error_message', error_message, - 'started_at', i.started_at, - 'finished_at', finished_at, - 'state', state, - 'fullData', full_data, - 'projectName', p.name - ) ORDER BY i.started_at DESC - ) AS integrations + i.integration_id, + i.integration_name, + i.error_message, + i.started_at, + i.finished_at, + i.state, + i.full_data, + i.created_by, + i.type, + p.name AS project_name FROM organization o LEFT JOIN integration_data i ON i.organization_id = o.id LEFT JOIN project p ON p.id = i.project_id - GROUP BY o.id, o.name, p.name + ORDER BY o.id, i.started_at DESC """ return general.execute_all(query) From 5cd521b17b82b77934d6eb3bc7eabdc101586115 Mon Sep 17 00:00:00 2001 From: Lina Date: Wed, 5 Nov 2025 16:38:20 +0100 Subject: [PATCH 10/12] Strategu steps improvements --- cognition_objects/integration.py | 2 +- cognition_objects/strategy.py | 63 +++++++++++++++++++++++++------- 2 files changed, 50 insertions(+), 15 deletions(-) diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py index ee13e61..9c7abe0 100644 --- a/cognition_objects/integration.py +++ b/cognition_objects/integration.py @@ -492,7 +492,7 @@ def get_last_integrations_tasks() -> List[Dict[str, Any]]: i.type, p.name AS project_name FROM organization o - LEFT JOIN integration_data i ON i.organization_id = o.id + JOIN integration_data i ON i.organization_id = o.id LEFT JOIN project p ON p.id = i.project_id ORDER BY o.id, i.started_at DESC """ diff --git a/cognition_objects/strategy.py b/cognition_objects/strategy.py index 0e0097f..32313e1 100644 --- a/cognition_objects/strategy.py +++ b/cognition_objects/strategy.py @@ -6,7 +6,7 @@ from ..business_objects import general from ..session import session from ..models import CognitionStrategy -from ..enums import StrategyComplexity +from ..enums import StrategyComplexity, StrategyStepType def get(project_id: str, strategy_id: str) -> CognitionStrategy: @@ -130,20 +130,55 @@ def get_strategies_info( if created_at_to: created_at_to_filter = f"AND ss.created_at <= '{created_at_to}'" + step_types_sql = ", ".join([f"'{st}'" for st in step_types]) + query = f""" - SELECT - s.id as strategy_id, s.name as strategy_name, - ss.id as step_id, ss.created_by, ss.created_at, ss.name as step_name, ss.step_type , - p.name as project_name, p.id as project_id, - o.name as organization_name, o.id as organization_id - FROM cognition.strategy s - JOIN cognition.strategy_step ss on ss.strategy_id = s.id - JOIN cognition.project p on p.id = s.project_id - JOIN organization o on o.id = p.organization_id - WHERE ss.created_at >= '{created_at_from}' - AND ss.step_type IN ({', '.join(f"'{step_type}'" for step_type in step_types)}) - {created_at_to_filter} - ORDER BY s.id, ss.created_at DESC + WITH step_data AS ( + SELECT + s.id AS strategy_id, s.name AS strategy_name, + ss.id AS step_id, ss.created_by,ss.created_at, ss.name AS step_name, ss.step_type, + p.name AS project_name, p.id AS project_id, + o.name AS organization_name,o.id AS organization_id, + st.config::jsonb AS template_config + FROM cognition.strategy s + JOIN cognition.strategy_step ss ON ss.strategy_id = s.id + JOIN cognition.project p ON p.id = s.project_id + JOIN organization o ON o.id = p.organization_id + LEFT JOIN cognition.step_templates st ON st.id = (ss.config->>'templateId')::uuid + WHERE ss.created_at >= '{created_at_from}' + {created_at_to_filter} + ) + SELECT strategy_id, strategy_name, step_id, created_by, created_at, step_name, step_type, project_name,project_id, organization_name,organization_id, + CASE + WHEN step_type = '{StrategyStepType.TEMPLATED.value}' + AND EXISTS ( + SELECT 1 + FROM jsonb_array_elements(template_config->'steps') t + WHERE t->>'stepType' IN ({step_types_sql}) + ) + THEN ( + SELECT array_agg((t->>'stepType') || ':' || (t->>'stepName')) + FROM jsonb_array_elements(template_config->'steps') t + ) + WHEN step_type IN ({step_types_sql}) + THEN ARRAY[(step_type || ':' || step_name)] + ELSE NULL + END AS templated_step_names + FROM step_data + WHERE + step_type IN ({step_types_sql}) + OR ( + step_type = '{StrategyStepType.TEMPLATED.value}' + AND EXISTS ( + SELECT 1 + FROM jsonb_array_elements(template_config->'steps') t + WHERE t->>'stepType' IN ({step_types_sql}) + ) + ) + ORDER BY strategy_id, created_at DESC """ + if len(step_types) == 0: + return [] + return general.execute_all(query) From 3b62c82cb3d538feabb27efb97c8ea5c0460b366 Mon Sep 17 00:00:00 2001 From: Lina Date: Fri, 7 Nov 2025 12:38:14 +0100 Subject: [PATCH 11/12] PR comments --- cognition_objects/integration.py | 223 +++++++++++++---------------- cognition_objects/markdown_file.py | 9 +- cognition_objects/message.py | 6 +- cognition_objects/strategy.py | 8 +- 4 files changed, 115 insertions(+), 131 deletions(-) diff --git a/cognition_objects/integration.py b/cognition_objects/integration.py index 9c7abe0..c2999c8 100644 --- a/cognition_objects/integration.py +++ b/cognition_objects/integration.py @@ -334,125 +334,97 @@ def get_distinct_item_ids_for_all_permissions( def get_last_integrations_tasks() -> List[Dict[str, Any]]: query = f""" - WITH embeddings_by_state AS ( + WITH embedding_agg AS ( SELECT - e.project_id, - e.state, - COUNT(*) AS count, - jsonb_agg( + project_id, + jsonb_object_agg( + state, jsonb_build_object( - 'createdBy', e.created_by, - 'finishedAt', e.finished_at, - 'id', e.id, - 'name', e.name, - 'startedAt', e.started_at, - 'state', e.state - ) ORDER BY e.started_at DESC - ) AS embeddings - FROM embedding e - LEFT JOIN project p ON p.id = e.project_id - GROUP BY e.project_id, e.state - ), - - embedding_agg AS ( - SELECT - e.project_id, - ( - SELECT jsonb_object_agg( - ebs.state, - jsonb_build_object( - 'count', ebs.count, - 'embeddings', ebs.embeddings - ) + 'count', count, + 'embeddings', embeddings ) - FROM embeddings_by_state ebs - WHERE ebs.project_id = e.project_id ) AS embeddings_by_state - - FROM embedding e - LEFT JOIN project p ON p.id = e.project_id - GROUP BY e.project_id - ), - - attribute_by_state AS ( - SELECT - a.project_id, - a.state, - COUNT(*) AS count, - jsonb_agg( - jsonb_build_object( - 'dataType', a.data_type, - 'finishedAt', a.finished_at, - 'id', a.id, - 'name', a.name, - 'startedAt', a.started_at, - 'state', a.state - ) ORDER BY a.started_at DESC - ) AS attributes - FROM attribute a - LEFT JOIN project p ON p.id = a.project_id - WHERE a.state NOT IN ('UPLOADED','AUTOMATICALLY_CREATED') - GROUP BY a.project_id, a.state + FROM ( + SELECT + e.project_id, + e.state, + COUNT(*) AS count, + jsonb_agg( + jsonb_build_object( + 'createdBy', e.created_by, + 'finishedAt', e.finished_at, + 'id', e.id, + 'name', e.name, + 'startedAt', e.started_at, + 'state', e.state + ) ORDER BY e.started_at DESC + ) AS embeddings + FROM embedding e + GROUP BY e.project_id, e.state + ) AS x + GROUP BY project_id ), attribute_agg AS ( SELECT - a.project_id, - ( - SELECT jsonb_object_agg( - abs.state, - jsonb_build_object( - 'count', abs.count, - 'attributes', abs.attributes - ) + project_id, + jsonb_object_agg( + state, + jsonb_build_object( + 'count', count, + 'attributes', attributes ) - FROM attribute_by_state abs - WHERE abs.project_id = a.project_id ) AS attributes_by_state - - FROM attribute a - LEFT JOIN project p ON p.id = a.project_id - WHERE a.state NOT IN ('UPLOADED','AUTOMATICALLY_CREATED') - GROUP BY a.project_id - ), - - record_tokenization_tasks_by_state AS ( - SELECT - rtt.project_id, - rtt.state, - COUNT(*) AS count, - jsonb_agg( - jsonb_build_object( - 'finishedAt', rtt.finished_at, - 'id', rtt.id, - 'startedAt', rtt.started_at, - 'state', rtt.state, - 'type', rtt.type - ) ORDER BY rtt.started_at DESC - ) AS record_tokenization_tasks - FROM record_tokenization_task rtt - LEFT JOIN project p ON p.id = rtt.project_id - GROUP BY rtt.project_id, rtt.state + FROM ( + SELECT + a.project_id, + a.state, + COUNT(*) AS count, + jsonb_agg( + jsonb_build_object( + 'dataType', a.data_type, + 'finishedAt', a.finished_at, + 'id', a.id, + 'name', a.name, + 'startedAt', a.started_at, + 'state', a.state + ) ORDER BY a.started_at DESC + ) AS attributes + FROM attribute a + WHERE a.state NOT IN ('UPLOADED','AUTOMATICALLY_CREATED') + GROUP BY a.project_id, a.state + ) AS x + GROUP BY project_id ), record_tokenization_task_agg AS ( SELECT - rtt.project_id, - ( - SELECT jsonb_object_agg( - rtts.state, - jsonb_build_object( - 'count', rtts.count, - 'record_tokenization_tasks', rtts.record_tokenization_tasks - ) + project_id, + jsonb_object_agg( + state, + jsonb_build_object( + 'count', count, + 'record_tokenization_tasks', record_tokenization_tasks ) - FROM record_tokenization_tasks_by_state rtts - WHERE rtts.project_id = rtt.project_id ) AS record_tokenization_tasks_by_state - - FROM record_tokenization_task rtt - LEFT JOIN project p ON p.id = rtt.project_id - GROUP BY rtt.project_id + FROM ( + SELECT + rtt.project_id, + rtt.state, + COUNT(*) AS count, + jsonb_agg( + jsonb_build_object( + 'finishedAt', rtt.finished_at, + 'id', rtt.id, + 'startedAt', rtt.started_at, + 'state', rtt.state, + 'type', rtt.type + ) ORDER BY rtt.started_at DESC + ) AS record_tokenization_tasks + FROM record_tokenization_task rtt + GROUP BY rtt.project_id, rtt.state + ) AS x + GROUP BY project_id ), integration_data AS ( @@ -467,34 +439,41 @@ def get_last_integrations_tasks() -> List[Dict[str, Any]]: i.project_id, i.created_by, i.type, + o.name AS organization_name, + p.name AS project_name, jsonb_build_object( 'embeddingsByState', coalesce(ea.embeddings_by_state, '[]'::jsonb), 'attributesByState', coalesce(aa.attributes_by_state, '[]'::jsonb), 'recordTokenizationTasksByState', coalesce(rtt.record_tokenization_tasks_by_state, '[]'::jsonb) ) AS full_data FROM cognition.integration i - LEFT JOIN embedding_agg ea ON ea.project_id = i.project_id - LEFT JOIN attribute_agg aa ON aa.project_id = i.project_id - LEFT JOIN record_tokenization_task_agg rtt ON rtt.project_id = i.project_id + LEFT JOIN embedding_agg ea + ON ea.project_id = i.project_id + LEFT JOIN attribute_agg aa + ON aa.project_id = i.project_id + LEFT JOIN record_tokenization_task_agg rtt + ON rtt.project_id = i.project_id + JOIN organization o + ON o.id = i.organization_id + JOIN project p + ON p.id = i.project_id ) SELECT - o.id AS organization_id, - o.name AS organization_name, - i.integration_id, - i.integration_name, - i.error_message, - i.started_at, - i.finished_at, - i.state, - i.full_data, - i.created_by, - i.type, - p.name AS project_name - FROM organization o - JOIN integration_data i ON i.organization_id = o.id - LEFT JOIN project p ON p.id = i.project_id - ORDER BY o.id, i.started_at DESC + int_data.organization_id as organization_id, + int_data.organization_name as organization_name, + int_data.integration_id, + int_data.integration_name, + int_data.error_message, + int_data.started_at, + int_data.finished_at, + int_data.state, + int_data.full_data, + int_data.created_by, + int_data.type, + int_data.project_name + FROM integration_data int_data + ORDER BY int_data.organization_id, int_data.started_at DESC """ return general.execute_all(query) diff --git a/cognition_objects/markdown_file.py b/cognition_objects/markdown_file.py index ce3dfd9..9e76aef 100644 --- a/cognition_objects/markdown_file.py +++ b/cognition_objects/markdown_file.py @@ -226,7 +226,10 @@ def get_last_etl_tasks( created_at_to: Optional[str] = None, ) -> List[Any]: - states = prevent_sql_injection(states, isinstance(states, list)) + states = [prevent_sql_injection(st, isinstance(st, str)) for st in states] + if len(states) == 0: + return [] + created_at_from = prevent_sql_injection( created_at_from, isinstance(created_at_from, str) ) @@ -239,6 +242,8 @@ def get_last_etl_tasks( if created_at_to: created_at_to_filter = f"AND mf.created_at <= '{created_at_to}'" + states_filter_sql = ", ".join([f"'{state}'" for state in states]) + query = f""" SELECT * FROM ( @@ -252,7 +257,7 @@ def get_last_etl_tasks( JOIN organization o ON o.id = md.organization_id WHERE mf.created_at >= '{created_at_from}' - AND mf.state IN ({', '.join(f"'{state}'" for state in states)}) + AND mf.state IN ({states_filter_sql}) {created_at_to_filter} ) sub WHERE rn <= 5 diff --git a/cognition_objects/message.py b/cognition_objects/message.py index 7edc268..72bdc00 100644 --- a/cognition_objects/message.py +++ b/cognition_objects/message.py @@ -610,7 +610,7 @@ def get_count_by_project_id(project_id: str) -> int: def get_last_chat_messages( - message_type: str, + message_type: MessageType, starting_from: str, ending_to: Optional[str] = None, ) -> List[Any]: @@ -623,9 +623,9 @@ def get_last_chat_messages( message_type_filter = "" ending_to_filter = "" - if message_type == MessageType.WITH_ERROR.value: + if message_type == MessageType.WITH_ERROR: message_type_filter = "AND c.error IS NOT NULL" - elif message_type == MessageType.WITHOUT_ERROR.value: + elif message_type == MessageType.WITHOUT_ERROR: message_type_filter = "AND c.error IS NULL" if ending_to: ending_to_filter = f"AND m.created_at <= '{ending_to}'" diff --git a/cognition_objects/strategy.py b/cognition_objects/strategy.py index 32313e1..3980a30 100644 --- a/cognition_objects/strategy.py +++ b/cognition_objects/strategy.py @@ -117,7 +117,10 @@ def get_strategies_info( created_at_to: Optional[str] = None, ) -> List[Any]: - step_types = prevent_sql_injection(step_types, isinstance(step_types, list)) + step_types = [prevent_sql_injection(st, isinstance(st, str)) for st in step_types] + if len(step_types) == 0: + return [] + created_at_from = prevent_sql_injection( created_at_from, isinstance(created_at_from, str) ) @@ -178,7 +181,4 @@ def get_strategies_info( ORDER BY strategy_id, created_at DESC """ - if len(step_types) == 0: - return [] - return general.execute_all(query) From c0c6e2b3e02d177d52d1fb3236bdefc7a6bc5ed6 Mon Sep 17 00:00:00 2001 From: Lina Date: Fri, 7 Nov 2025 13:20:43 +0100 Subject: [PATCH 12/12] PR comments --- cognition_objects/strategy.py | 58 ++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/cognition_objects/strategy.py b/cognition_objects/strategy.py index 3980a30..ff2ff4d 100644 --- a/cognition_objects/strategy.py +++ b/cognition_objects/strategy.py @@ -141,43 +141,45 @@ def get_strategies_info( s.id AS strategy_id, s.name AS strategy_name, ss.id AS step_id, ss.created_by,ss.created_at, ss.name AS step_name, ss.step_type, p.name AS project_name, p.id AS project_id, - o.name AS organization_name,o.id AS organization_id, - st.config::jsonb AS template_config + o.name AS organization_name, o.id AS organization_id, + st.config::jsonb AS template_config, + CASE + WHEN ss.step_type = '{StrategyStepType.TEMPLATED.value}' AND st.config IS NOT NULL + THEN ARRAY( + SELECT (t->>'stepType') || ':' || (t->>'stepName') + FROM jsonb_array_elements((st.config->'steps')::jsonb) t + ) + ELSE NULL + END AS template_step_names, + CASE + WHEN ss.step_type = '{StrategyStepType.TEMPLATED.value}' AND st.config IS NOT NULL + THEN ARRAY( + SELECT t->>'stepType' + FROM jsonb_array_elements((st.config->'steps')::jsonb) t + ) + ELSE NULL + END AS template_step_types FROM cognition.strategy s - JOIN cognition.strategy_step ss ON ss.strategy_id = s.id - JOIN cognition.project p ON p.id = s.project_id - JOIN organization o ON o.id = p.organization_id - LEFT JOIN cognition.step_templates st ON st.id = (ss.config->>'templateId')::uuid + JOIN cognition.strategy_step ss + ON ss.strategy_id = s.id + JOIN cognition.project p + ON p.id = s.project_id + JOIN organization o + ON o.id = p.organization_id + LEFT JOIN cognition.step_templates st + ON st.id = (ss.config->>'templateId')::uuid WHERE ss.created_at >= '{created_at_from}' {created_at_to_filter} ) - SELECT strategy_id, strategy_name, step_id, created_by, created_at, step_name, step_type, project_name,project_id, organization_name,organization_id, + SELECT strategy_id, strategy_name, step_id, created_by, created_at, step_name, step_type, project_name, project_id, organization_name, organization_id, CASE - WHEN step_type = '{StrategyStepType.TEMPLATED.value}' - AND EXISTS ( - SELECT 1 - FROM jsonb_array_elements(template_config->'steps') t - WHERE t->>'stepType' IN ({step_types_sql}) - ) - THEN ( - SELECT array_agg((t->>'stepType') || ':' || (t->>'stepName')) - FROM jsonb_array_elements(template_config->'steps') t - ) - WHEN step_type IN ({step_types_sql}) - THEN ARRAY[(step_type || ':' || step_name)] - ELSE NULL + WHEN step_type = '{StrategyStepType.TEMPLATED.value}' THEN template_step_names + ELSE ARRAY[step_type || ':' || step_name] END AS templated_step_names FROM step_data WHERE step_type IN ({step_types_sql}) - OR ( - step_type = '{StrategyStepType.TEMPLATED.value}' - AND EXISTS ( - SELECT 1 - FROM jsonb_array_elements(template_config->'steps') t - WHERE t->>'stepType' IN ({step_types_sql}) - ) - ) + OR (step_type = '{StrategyStepType.TEMPLATED.value}' AND template_step_types && ARRAY[{step_types_sql}]) ORDER BY strategy_id, created_at DESC """