-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathVE_Concept_Creation.py
More file actions
2197 lines (1865 loc) · 114 KB
/
VE_Concept_Creation.py
File metadata and controls
2197 lines (1865 loc) · 114 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# ve_concept_creation.py
import autogen
import copy
from pydantic import BaseModel, ValidationError
from pydantic.json_schema import model_json_schema
from autogen import Agent, ConversableAgent, AssistantAgent, OpenAIWrapper, UserProxyAgent, gather_usage_summary
from autogen.io.websockets import IOWebsockets
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from threading import Thread
from queue import Queue
from bson import ObjectId # Import ObjectId to use in your query
import asyncio
import uuid
import uvicorn
from pymongo import MongoClient, DESCENDING
import os
import json
import logging
import traceback
from motor.motor_asyncio import AsyncIOMotorClient
from shared_app import app, add_initialization_coroutine, shared_websocket_manager
from datetime import datetime
import re
from typing import Any, Dict, List, Optional, Tuple, Union
try:
from termcolor import colored
except ImportError:
def colored(x, *args, **kwargs):
return x
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
# MongoDB connection (using Motor for async operations)
connection_string = "mongodb+srv://dev:OQaCVOXwkwGyaBJu@mozaiks.d82loyy.mongodb.net/"
mongo_client = AsyncIOMotorClient(connection_string)
# Connect to your Mozaiks database
db1 = mongo_client['MozaiksDB'] # Database name
enterprises_collection = db1['Enterprises'] # Collection name.
# Connect to your Agents database
db2 = mongo_client['autogen_ai_agents'] # Database name
concepts_collection = db2['Concepts'] # Collection name
llm_config_collection = db2['LLMConfig'] # Collection name
# Global variables
client = None
llm_config = None
llm_config_vision = None
llm_config_concept = None
ConceptVerificationConvo = None
enterprise_id = None
user_id = None
# Define structured models for features and third-party integrations
class SuggestedFeature(BaseModel):
feature_title: str
description: str
class ThirdPartyIntegration(BaseModel):
technology_title: str
description: str
# Update VisionResponse to reflect the structured data
class VisionResponse(BaseModel):
core_focus: str
monetization: int
suggested_features: List[SuggestedFeature] # Updated to list of objects
third_party_integrations: List[ThirdPartyIntegration] # Updated to list of objects
liked_apps: List[str]
# Ensure that VisionAgentOutputs correctly stores multiple VisionResponse objects
class VisionAgentOutputs(BaseModel):
output_response: List[VisionResponse]
class FeatureOutput(BaseModel):
feature_title: str
description: str
class ConceptRefinementResponse(BaseModel):
tagline: str
narrative: str
features: List[FeatureOutput]
important_note: str
status: int # 0 for continuing, 1 for ending the chat
class ConceptRefinementOutputs(BaseModel):
output_response: List[ConceptRefinementResponse]
# Modify your load_config function
async def load_config():
llm_config_doc = await llm_config_collection.find_one()
if not llm_config_doc:
raise ValueError("No LLM configuration found in the database")
openai_ApiKey = llm_config_doc.get('ApiKey')
model_name = llm_config_doc.get('Model', 'o3-mini')
if not openai_ApiKey:
raise ValueError("OpenAI API key not found in the LLM configuration")
model_pricing = {
"o3-mini": [0.0011, 0.0044]
}
price = model_pricing.get(model_name)
if not price:
raise ValueError(f"Pricing information for model '{model_name}' is not available")
config_list = [{
"model": model_name,
"api_key": openai_ApiKey,
"price": price
}]
client = OpenAIWrapper(config_list=config_list)
return client, config_list
async def cc_initialize():
global client, llm_config, llm_config_vision, llm_config_concept
try:
client, config_list = await load_config()
client, config_list_vision = await load_config()
client, config_list_concept = await load_config()
except ValueError as e:
logger.error(f"Error in load_config: {e}")
return
# Add response formats to config_list
for config in config_list_vision:
config["response_format"] = VisionAgentOutputs
# Add response formats to config_list
for config in config_list_concept:
config["response_format"] = ConceptRefinementOutputs
llm_config = {
"timeout": 600,
"cache_seed": 163,
"config_list": config_list
}
llm_config_vision = {
"timeout": 600,
"cache_seed": 163,
"config_list": config_list_vision
}
llm_config_concept = {
"timeout": 600,
"cache_seed": 163,
"config_list": config_list_concept
}
logger.info("CC initialization completed.")
async def load_concept_verification(enterprise_id=None):
try:
# Create query based on whether enterprise_id is provided
query = {}
if enterprise_id:
enterprise_id_str = str(enterprise_id)
query["enterprise_id"] = enterprise_id_str
# Load latest concept verification conversation
latest_concept = await concepts_collection.find_one(
query,
sort=[("ConceptCode", DESCENDING)]
)
if latest_concept:
ConceptVerificationConvo = latest_concept.get('ConceptVerificationConvo', [])
logger.info(f"Loaded concept verification conversation for concept code: {latest_concept.get('ConceptCode')}")
else:
logger.warning("No existing concept verification analysis found. Using empty list.")
ConceptVerificationConvo = []
return ConceptVerificationConvo
except Exception as e:
logger.error(f"Error in load_concept_verification: {str(e)}")
logger.error(traceback.format_exc())
raise
class AsyncConversableWrapper(ConversableAgent):
async def a_respond(self, recipient, messages, sender, config, last_message=None):
logger.info(f"a_respond called with last_message: {last_message}")
message_content = None
should_append = True
# Try to extract status from last_message with improved error handling
if last_message:
try:
if isinstance(last_message, str):
# Only attempt to parse as JSON if it looks like JSON
if last_message.strip().startswith('{') and last_message.strip().endswith('}'):
try:
message_content = json.loads(last_message)
except json.JSONDecodeError:
message_content = last_message # Use as plain text if parsing fails
else:
message_content = last_message # Not JSON-formatted, use as is
else:
message_content = last_message # Already an object, not a string
# Check status only if we have a dictionary
if isinstance(message_content, dict) and message_content.get('status') == 1:
should_append = False
logger.info("Skipping append due to status = 1")
except Exception as e:
logger.warning(f"Error processing last_message: {str(e)}")
message_content = last_message # Fallback to original message
# Ensure each message has a role and name
for message in messages:
if "role" not in message:
message["role"] = "user" if sender == "user_proxy" else "assistant"
if "name" not in message or not message["name"].strip():
message["name"] = "unknown"
# Agent-specific behavior overrides
if self.name == "Data_Agent" and isinstance(message_content, dict) and message_content.get('status') == 1:
should_append = False
logger.info(f"Skipping append due to status = 1 from {self.name}")
elif self.name == "Concept_Refinement_Agent":
should_append = True
# Conditionally append last message
if last_message and last_message not in messages and should_append:
logger.info(f"Appending last_message: {last_message}")
messages.append({"role": "assistant", "content": last_message, "name": self.name})
else:
logger.info(f"Skipping appending last_message: {last_message}")
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(None, self.generate_reply, messages, sender)
return True, response
class UserProxyWebAgent(UserProxyAgent):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.iostream: Optional[IOWebsockets] = None
self.message_queue: Optional[asyncio.Queue] = None
self.groupchat_manager: Optional[AsyncGroupChatManager] = None
self.input_event = asyncio.Event()
self.last_input = None
async def a_initiate_chat(self, recipient, message):
logger.info(f"a_initiate_chat called by {self.__class__.__name__} with recipient: {recipient.__class__.__name__}")
if isinstance(recipient, AsyncGroupChatManager):
await recipient.a_receive(message, self)
else:
logger.info(f"Unexpected recipient type: {type(recipient)}")
async def a_get_human_input(self, prompt: str) -> str:
logger.info(f"a_get_human_input called with prompt: {prompt}")
if self.iostream and not self.iostream.websocket.client_state.DISCONNECTED:
try:
self.input_event.clear()
await self.input_event.wait()
return self.last_input
except Exception as e:
logger.info(f"Unable to get human input: {str(e)}")
return ""
else:
logger.info("WebSocket not connected. Skipping human input.")
return ""
async def receive_input(self, input_data: str):
try:
# Try to parse as JSON
reply_json = json.loads(input_data)
content = reply_json.get("content", "")
logger.info(f"Parsed JSON input: {content}")
except json.JSONDecodeError:
# If not JSON, use the raw input
content = input_data
logger.info(f"Using raw input: {content}")
self.last_input = content
self.input_event.set()
async def a_respond(self, recipient, messages, sender, config, last_message=None):
logger.info(f"a_respond called by {self.__class__.__name__} with sender: {sender.name if hasattr(sender, 'name') else 'Unknown'} and recipient: {recipient.__class__.__name__ if recipient else 'NoneType'}")
if hasattr(sender, 'name') and sender.name == "Feedback_Agent":
logger.info(f"user_proxy awaiting user input after Feedback_Agent's message.")
try:
reply = await self.a_get_human_input("")
if not reply:
logger.info("No human input received. Skipping user_proxy response.")
return False, None # Skip the response and don't trigger the next agent
if reply.lower() == "exit":
logger.info("User chose to exit. Ending session.")
return True, None # End the chat session if 'exit' is typed
logger.info(f"User feedback received: {reply}")
return True, reply
except WebSocketDisconnect:
logger.info("WebSocket disconnected while waiting for user input.")
return False, None # Don't end the chat session, just skip this response
logger.info(f"Sender {sender.name if hasattr(sender, 'name') else 'Unknown'} is not Feedback_Agent, proceeding with default behavior.")
return False, None
class WorkflowManager:
def __init__(self, llm_config, websocket: WebSocket, chat_id: str, user_id: str, enterprise_id: str, client):
self.llm_config = llm_config # Single unified config
self.websocket = websocket
self.chat_id = chat_id
self.user_id = user_id
self.enterprise_id = enterprise_id
self.client = client
self.AgentHistory = []
self.message_queue = asyncio.Queue()
self.agents = self.create_agents()
self.groupchat = self.create_groupchat()
self.groupchat_manager = self.create_groupchat_manager()
self.autogengroupchatmanager = autogen.GroupChatManager(groupchat=self.groupchat, llm_config=llm_config)
self.IterationCount = 0
self.Concept_Refinement_Agent_count = 0
self.UserFeedbackCount = 0
self.agent_dict = {agent.name: agent for agent in self.agents}
self.LastSpeaker = None
self.SessionID = str(uuid.uuid4()) # Generate a unique session ID
self.cumulative_PromptTokens = 0
self.cumulative_CompletionTokens = 0
self.cumulative_TotalTokens = 0
self.cumulative_TotalCost = 0.0
def initialize_new_session_for_tracking(self):
self.SessionID = str(uuid.uuid4())
self.cumulative_PromptTokens = 0
self.cumulative_CompletionTokens = 0
self.cumulative_TotalTokens = 0
self.cumulative_TotalCost = 0.0
logger.info(f"Initialized a new tracking session with SessionID: {self.SessionID}")
def initialize_new_chat(self):
self.AgentHistory = []
self.IterationCount = 0
self.Concept_Refinement_Agent_count = 0
self.UserFeedbackCount = 0
self.LastSpeaker = None
self.initialize_new_session_for_tracking()
logger.info(f"Initialized a new chat and tracking session")
async def update_CreationChatStatus(self, status):
try:
# Ensure enterprise_id is string for concepts_collection
enterprise_id_str = str(self.enterprise_id)
latest_concept = await concepts_collection.find_one(
{"enterprise_id": enterprise_id_str},
sort=[("ConceptCode", DESCENDING)]
)
if latest_concept:
current_status = latest_concept.get('CreationChatStatus')
if current_status != status:
update_result = await concepts_collection.update_one(
{'_id': latest_concept['_id']},
{'$set': {'CreationChatStatus': status}}
)
if update_result.modified_count > 0:
logger.info(f"Updated CreationChatStatus to '{status}'.")
else:
logger.warning(f"No documents were updated. The status might already be '{status}'.")
else:
logger.warning(f"No concept document found for enterprise_id: {enterprise_id_str} to update status.")
except Exception as e:
logger.error(f"Error updating CreationChatStatus: {str(e)}")
async def update_ConceptCreationConvo(self, sender: str, content: str):
logger.info(f"Updating concept creation convo with sender: {sender}, content: {content[:50]}...")
try:
# Force status 0 for first Concept_Refinement_Agent response
if sender == 'Concept_Refinement_Agent':
content = await self.ensure_initial_status_zero(content)
message = {
'timestamp': datetime.utcnow().isoformat(),
'sender': sender,
'content': content,
'role': 'user' if sender == 'user_proxy' else 'assistant',
'name': sender
}
# Check if this message is already in the history
if not any(msg['content'] == content and msg['sender'] == sender for msg in self.AgentHistory):
self.AgentHistory.append(message)
# Ensure enterprise_id is string for concepts_collection
enterprise_id_str = str(self.enterprise_id)
# Fetch the latest concept document
latest_concept = await concepts_collection.find_one(
{"enterprise_id": enterprise_id_str},
sort=[("ConceptCode", DESCENDING)]
)
if latest_concept:
# Initialize update data with the message push
update_data = {'$push': {'ConceptCreationConvo': message}}
update_data['$set'] = {}
# Extract data based on the agent type
if sender == 'Users_Vision_Analyst':
try:
# Clean the message content
clean_message = content.replace('```json', '').replace('```', '').strip()
vision_data = json.loads(clean_message)
# Try to parse as VisionAgentOutputs
if isinstance(vision_data, dict):
# Handle both direct response and wrapped response
vision_response = (
vision_data.get('output_response', [{}])[0]
if 'output_response' in vision_data
else vision_data
)
# Map the fields to your database schema
update_data['$set'].update({
'CoreFocus': vision_response.get('core_focus', ''),
'ThirdPartyIntegrations': vision_response.get('third_party_integrations', []),
'SuggestedFeatures': vision_response.get('suggested_features', []),
'Monetization': vision_response.get('monetization', 0),
'LikedApps': vision_response.get('liked_apps', [])
})
logger.info(f"Extracted vision data: {update_data['$set']}")
except json.JSONDecodeError:
logger.error(f"Failed to parse Users_Vision_Analyst output: {content}")
except Exception as e:
logger.error(f"Error processing Users_Vision_Analyst data: {str(e)}")
elif sender == 'Concept_Refinement_Agent':
try:
# Clean the message content
clean_message = content.replace('```json', '').replace('```', '').strip()
concept_data = json.loads(clean_message)
# Try to parse as ConceptRefinementOutputs
if isinstance(concept_data, dict):
# Handle both direct response and wrapped response
concept_response = (
concept_data.get('output_response', [{}])[0]
if 'output_response' in concept_data
else concept_data
)
# Extract features in the correct format
features = concept_response.get('features', [])
formatted_features = []
for feature in features:
if isinstance(feature, dict):
formatted_features.append({
'feature_title': feature.get('feature_title', ''),
'description': feature.get('description', '')
})
# Map the fields to your database schema
update_data['$set'].update({
'Blueprint': {
'Overview': concept_response.get('narrative', ''),
'Features': formatted_features,
'Tagline': concept_response.get('tagline', ''),
'Note': concept_response.get('important_note', ''),
'status': concept_response.get('status', 0)
}
})
logger.info(f"Extracted concept refinement data: {update_data['$set']}")
except json.JSONDecodeError:
logger.error(f"Failed to parse Concept_Refinement_Agent output: {content}")
except Exception as e:
logger.error(f"Error processing Concept_Refinement_Agent data: {str(e)}")
# Perform the database update
if update_data['$set']:
result = await concepts_collection.update_one(
{'_id': latest_concept['_id']},
update_data
)
logger.info(f"Updated concept document. Modified count: {result.modified_count}")
else:
# Create new concept document if none exists
new_ConceptCode = await self.get_next_ConceptCode()
new_concept = {
"ConceptCode": new_ConceptCode,
"enterprise_id": enterprise_id_str,
"ConceptCreationConvo": [message],
"CoreFocus": "",
"ThirdPartyIntegrations": [],
"SuggestedFeatures": [],
"Monetization": 0,
"LikedApps": [],
"UsedTokens": {
"ConceptCreationAnalysis": {
"PromptTokens": {},
"CompletionTokens": {},
"TotalTokens": {}
}
},
"TotalCost": {
"ConceptCreationAnalysis": {}
},
"Blueprint": {
"Overview": "",
"Features": [],
"Tagline": "",
"Note": "",
"status": 0
}
}
result = await concepts_collection.insert_one(new_concept)
logger.info(f"Created new concept document with id: {result.inserted_id}")
except Exception as e:
logger.error(f"Error updating concept creation convo: {str(e)}")
logger.error(traceback.format_exc())
async def get_next_ConceptCode(self):
# Ensure enterprise_id is string for concepts_collection
enterprise_id_str = str(self.enterprise_id)
latest_concept = await concepts_collection.find_one(
{"enterprise_id": enterprise_id_str, "ConceptCode": {"$exists": True}},
sort=[("ConceptCode", DESCENDING)]
)
if latest_concept:
return latest_concept["ConceptCode"] + 1
else:
return 1
async def save_chat_state(self):
try:
# Ensure enterprise_id is string for concepts_collection
enterprise_id_str = str(self.enterprise_id)
latest_concept = await concepts_collection.find_one(
{"enterprise_id": enterprise_id_str},
sort=[("ConceptCode", DESCENDING)]
)
if latest_concept:
# Retrieve the existing CreationChatState
existing_chat_state = latest_concept.get('CreationChatState', {})
# Prepare the new state to save
state_to_save = {
'SessionID': self.SessionID,
'AgentHistory': self.AgentHistory,
'IterationCount': self.IterationCount,
'Concept_Refinement_Agent_count': self.Concept_Refinement_Agent_count,
'UserFeedbackCount': self.UserFeedbackCount,
'LastSpeaker': "Feedback_Agent"
}
# Preserve the existing SessionTotals
if 'SessionTotals' in existing_chat_state:
state_to_save['SessionTotals'] = existing_chat_state['SessionTotals']
# Update the document, merging the new state with existing data
result = await concepts_collection.update_one(
{'_id': latest_concept['_id']},
{'$set': {
'CreationChatState': {
**existing_chat_state, # Preserve existing data
**state_to_save # Update with new data
}
}}
)
logger.info(f"Chat state updated in latest concept document. Modified count: {result.modified_count}")
else:
logger.warning("No concept document found to update chat state")
except Exception as e:
logger.error(f"Error saving chat state: {str(e)}")
logger.error(traceback.format_exc())
async def calculate_and_update_usage(self):
try:
# Gather usage summary for all agents
agent_summary = gather_usage_summary(self.groupchat.agents)
# Calculate session usage
session_PromptTokens = 0
session_CompletionTokens = 0
session_TotalTokens = 0
session_TotalCost = 0.0
for Model_name, Model_data in agent_summary["usage_including_cached_inference"].items():
if Model_name != 'total_cost':
session_PromptTokens += Model_data.get('prompt_tokens', 0)
session_CompletionTokens += Model_data.get('completion_tokens', 0)
session_TotalTokens += Model_data.get('total_tokens', 0)
session_TotalCost += Model_data.get('cost', 0.0)
# Log the updates
logger.info(f"Session usage - Prompt: {session_PromptTokens}, Completion: {session_CompletionTokens}, Total: {session_TotalTokens}, Cost: {session_TotalCost}")
# Skip saving if all usage and costs are zero
if session_TotalTokens == 0 and session_TotalCost == 0.0:
logger.info("No usage or cost incurred. Skipping database update.")
return
# Update the database with session data
try:
await self.update_database_usage(
self.SessionID,
session_PromptTokens,
session_CompletionTokens,
session_TotalTokens,
session_TotalCost
)
except Exception as e:
logger.error(f"Error updating database usage: {str(e)}")
logger.error(traceback.format_exc())
# You might want to add more specific error handling here if needed
except Exception as e:
logger.error(f"Error in calculate_and_update_usage: {str(e)}")
logger.error(traceback.format_exc())
async def update_database_usage(self, SessionID, session_PromptTokens, session_CompletionTokens, session_TotalTokens, session_TotalCost):
logger.info(f"Updating database usage for enterprise_id: {self.enterprise_id}")
try:
# Convert self.enterprise_id to string if it's not already
enterprise_id_str = str(self.enterprise_id)
latest_concept = await concepts_collection.find_one(
{"enterprise_id": enterprise_id_str},
sort=[("ConceptCode", DESCENDING)]
)
if latest_concept:
# Ensure 'SessionTotals' exists in 'CreationChatState'
if 'CreationChatState' not in latest_concept or 'SessionTotals' not in latest_concept['CreationChatState']:
await concepts_collection.update_one(
{'_id': latest_concept['_id']},
{'$set': {
'CreationChatState.SessionTotals': {}
}}
)
# Refetch the document after update
latest_concept = await concepts_collection.find_one(
{"enterprise_id": enterprise_id_str},
sort=[("ConceptCode", DESCENDING)]
)
# Calculate new tokens used in this interaction
previous_session_data = latest_concept['CreationChatState']['SessionTotals'].get(f'session_{SessionID}', {})
previous_TotalTokens = previous_session_data.get('TotalTokens', 0)
new_tokens_used = session_TotalTokens - previous_TotalTokens
# Update the current session data in CreationChatState
session_data = {
'PromptTokens': session_PromptTokens,
'CompletionTokens': session_CompletionTokens,
'TotalTokens': session_TotalTokens,
'TotalCost': session_TotalCost
}
# Use $set to update the specific session without overwriting the entire SessionTotals
await concepts_collection.update_one(
{'_id': latest_concept['_id']},
{'$set': {
f'CreationChatState.SessionTotals.session_{SessionID}': session_data,
'last_updated': datetime.utcnow()
}}
)
# Re-fetch the updated document to ensure we have the latest structure
latest_concept = await concepts_collection.find_one(
{"enterprise_id": enterprise_id_str},
sort=[("ConceptCode", DESCENDING)]
)
# Calculate combined totals based on all session data
combined_PromptTokens = sum(session.get('PromptTokens', 0) for session in latest_concept['CreationChatState']['SessionTotals'].values())
combined_CompletionTokens = sum(session.get('CompletionTokens', 0) for session in latest_concept['CreationChatState']['SessionTotals'].values())
combined_TotalTokens = sum(session.get('TotalTokens', 0) for session in latest_concept['CreationChatState']['SessionTotals'].values())
combined_TotalCost = sum(session.get('TotalCost', 0.0) for session in latest_concept['CreationChatState']['SessionTotals'].values())
# Update combined totals for creation analysis
await concepts_collection.update_one(
{'_id': latest_concept['_id']},
{'$set': {
'UsedTokens.ConceptCreationAnalysis.PromptTokens': combined_PromptTokens,
'UsedTokens.ConceptCreationAnalysis.CompletionTokens': combined_CompletionTokens,
'UsedTokens.ConceptCreationAnalysis.TotalTokens': combined_TotalTokens,
'TotalCost.ConceptCreationAnalysis': combined_TotalCost
}}
)
# Calculate overall combined totals (verification + creation)
verification_totals = latest_concept.get('UsedTokens', {}).get('ConceptVerificationAnalysis', {})
overall_combined_totals = {
'PromptTokens': verification_totals.get('PromptTokens', 0) + combined_PromptTokens,
'CompletionTokens': verification_totals.get('CompletionTokens', 0) + combined_CompletionTokens,
'TotalTokens': verification_totals.get('TotalTokens', 0) + combined_TotalTokens
}
overall_combined_cost = (
latest_concept.get('TotalCost', {}).get('ConceptVerificationAnalysis', 0.0) +
combined_TotalCost
)
# Update overall combined totals
await concepts_collection.update_one(
{'_id': latest_concept['_id']},
{'$set': {
'UsedTokens.CombinedAnalysis': overall_combined_totals,
'TotalCost.CombinedAnalysis': overall_combined_cost
}}
)
# Update the enterprise's token balance
if new_tokens_used > 0:
try:
logger.info(f"Updating token balance for Enterprise_ID {enterprise_id_str}.")
result = await enterprises_collection.update_one(
{"_id": ObjectId(self.enterprise_id)},
{"$inc": {"AvailableTokens": -new_tokens_used}}
)
if result.modified_count == 0:
logger.warning(f"Failed to update token balance for enterprise {enterprise_id_str}")
else:
logger.info(f"Updated token balance for enterprise {enterprise_id_str}. Deducted {new_tokens_used} tokens.")
except Exception as e:
logger.error(f"Error updating enterprise token balance: {str(e)}")
else:
logger.info(f"No new tokens used for enterprise {enterprise_id_str}. Skipping balance update.")
logger.info("Updated combined totals with creation tokens and cost.")
else:
logger.warning(f"No concept document found for enterprise_id: {enterprise_id_str}")
except Exception as e:
logger.error(f"Error in update_database_usage: {str(e)}")
logger.error(traceback.format_exc())
async def display_token_usage(self):
# Ensure enterprise_id is string for concepts_collection
enterprise_id_str = str(self.enterprise_id)
latest_concept = await concepts_collection.find_one(
{"enterprise_id": enterprise_id_str},
sort=[("ConceptCode", DESCENDING)]
)
if latest_concept:
CreationChatState = latest_concept.get('CreationChatState', {})
SessionTotals = CreationChatState.get('SessionTotals', {})
if isinstance(SessionTotals, dict):
# Display individual session totals
for SessionID, data in SessionTotals.items():
print(f"Token Usage for Session (ID: {SessionID}):")
print(f" Prompt Tokens: {data.get('PromptTokens', 0)}")
print(f" Completion Tokens: {data.get('CompletionTokens', 0)}")
print(f" Total Tokens: {data.get('TotalTokens', 0)}")
print(f" Total Cost: ${data.get('TotalCost', 0.0):.5f}")
# Display creation totals
creation_totals = latest_concept.get('UsedTokens', {}).get('ConceptCreationAnalysis', {})
if creation_totals:
print("\nCreation Totals Across All Sessions:")
print(f" Creation Prompt Tokens: {creation_totals.get('PromptTokens', 0)}")
print(f" Creation Completion Tokens: {creation_totals.get('CompletionTokens', 0)}")
print(f" Creation Total Tokens: {creation_totals.get('TotalTokens', 0)}")
print(f" Creation Total Cost: ${latest_concept.get('TotalCost', {}).get('ConceptCreationAnalysis', 0.0):.5f}")
# Display overall combined totals
combined_totals = latest_concept.get('UsedTokens', {}).get('CombinedAnalysis', {})
if combined_totals:
print("\nOverall Combined Totals (Verification + Creation):")
print(f" Combined Prompt Tokens: {combined_totals.get('PromptTokens', 0)}")
print(f" Combined Completion Tokens: {combined_totals.get('CompletionTokens', 0)}")
print(f" Combined Total Tokens: {combined_totals.get('TotalTokens', 0)}")
print(f" Combined Total Cost: ${latest_concept.get('TotalCost', {}).get('CombinedAnalysis', 0.0):.5f}")
else:
print("No session totals data available or the structure is not a dictionary.")
else:
print(f"No token usage data available for enterprise_id: {enterprise_id_str}")
async def send_to_websocket(self, sender: str, content: str):
try:
if isinstance(content, dict):
content = content.get('content', '')
# Ensure enterprise_id is string for concepts_collection
enterprise_id_str = str(self.enterprise_id)
latest_concept = await concepts_collection.find_one(
{"enterprise_id": enterprise_id_str, "ConceptCode": {"$exists": True}},
sort=[("ConceptCode", DESCENDING)]
)
CreationChatStatus = latest_concept.get('CreationChatStatus', {})
BluePrintReport = latest_concept.get('Blueprint', {}) if latest_concept else {}
await self.websocket.send_text(json.dumps({
"sender": sender,
"content": content,
"CreationChatStatus": CreationChatStatus,
"BluePrintReport": BluePrintReport
}))
except WebSocketDisconnect:
logger.info(f"WebSocket already disconnected for {self.chat_id}. Skipping message send.")
except Exception as e:
logger.error(f"Error sending message to WebSocket: {str(e)}")
async def resume_group_chat(self):
try:
logger.info("Attempting to resume group chat...")
# Ensure enterprise_id is string for concepts_collection
enterprise_id_str = str(self.enterprise_id)
# Find the latest concept document for this enterprise
latest_concept = await concepts_collection.find_one(
{"enterprise_id": enterprise_id_str},
sort=[("ConceptCode", DESCENDING)]
)
if latest_concept and 'CreationChatState' in latest_concept:
saved_state = latest_concept['CreationChatState']
self.AgentHistory = saved_state.get('AgentHistory', [])
self.IterationCount = saved_state.get('IterationCount', 0)
self.Concept_Refinement_Agent_count = saved_state.get('Concept_Refinement_Agent_count', 0)
self.UserFeedbackCount = saved_state.get('UserFeedbackCount', 0)
self.SessionID = saved_state.get('SessionID', str(uuid.uuid4()))
# Set the last speaker based on the saved state
LastSpeaker_name = saved_state.get('LastSpeaker')
self.LastSpeaker = next((agent for agent in self.agents if agent.name == LastSpeaker_name), None)
if not self.LastSpeaker:
logger.warning(f"Last speaker {LastSpeaker_name} not found in agents list. Defaulting to Feedback_Agent.")
self.LastSpeaker = next((agent for agent in self.agents if agent.name == "Feedback_Agent"), None)
logger.info("Group chat resumed successfully")
return True, self.LastSpeaker, self.AgentHistory[-1] if self.AgentHistory else None
else:
logger.info(f"No saved state found for enterprise_id: {enterprise_id_str}. Starting a new chat.")
return False, None, None
except Exception as e:
logger.error(f"Error resuming group chat: {str(e)}")
logger.error(traceback.format_exc())
return False, None, None
async def handle_disconnection(self):
if not hasattr(self, '_disconnection_handled'):
self._disconnection_handled = True
try:
logger.info(f"Handling disconnection for chat_id {self.chat_id}")
# Ensure AgentHistory is accessed properly
last_message = self.AgentHistory[-1] if self.AgentHistory else None
current_agent = self.LastSpeaker
while current_agent and hasattr(current_agent, 'name') and current_agent.name != "user_proxy":
next_agent = await self.state_transition(current_agent, self.groupchat_manager.groupchat, last_message)
if not next_agent:
logger.info("No next agent found. Ending chat flow.")
break
logger.info(f"Continuing chat with agent: {next_agent.name}")
try:
response = await asyncio.wait_for(
next_agent.a_respond(None, self.AgentHistory, current_agent, None),
timeout=30
)
if response and response[1]:
self.AgentHistory.append({
"role": "assistant",
"content": response[1],
"name": next_agent.name
})
logger.info(f"Response from {next_agent.name}: {response[1][:100]}...") # Log first 100 chars
# Try to send to WebSocket; if it fails, skip sending
try:
await self.send_to_websocket(next_agent.name, response[1])
except WebSocketDisconnect:
logger.info(f"WebSocket disconnected for {self.chat_id}. Saving state and halting flow.")
await self.save_chat_state()
return # End the flow here
else:
logger.info(f"No response from {next_agent.name}")
except asyncio.TimeoutError:
logger.warning(f"Timeout waiting for response from {next_agent.name}")
except Exception as e:
logger.error(f"Error getting response from {next_agent.name}: {str(e)}")
current_agent = next_agent
self.LastSpeaker = current_agent
if current_agent and current_agent.name == "user_proxy":
logger.info(f"user_proxy reached, saving state and halting the chat flow.")
await self.save_chat_state()
logger.info(f"WebSocket disconnected for {self.chat_id}. Chat state saved.")
except Exception as e:
logger.error(f"Error handling disconnection: {str(e)}")
logger.error(traceback.format_exc())
async def finalize_conversation(self):
"""Finalize the conversation and update the status in the database."""
if not hasattr(self, '_conversation_finalized'):
self._conversation_finalized = True
logger.info("Finalizing the conversation.")
try:
# Ensure enterprise_id is string for concepts_collection
enterprise_id_str = str(self.enterprise_id)
# Find the latest concept document for this enterprise
latest_concept = await concepts_collection.find_one(
{"enterprise_id": enterprise_id_str, "CreationChatStatus": 0},
sort=[("ConceptCode", DESCENDING)]
)
if latest_concept:
# Ensure the conversation status is updated to 1
current_status = latest_concept.get('CreationChatStatus', '')
if current_status == 0:
update_result = await concepts_collection.update_one(
{'_id': latest_concept['_id']},
{'$set': {'CreationChatStatus': 1}}
)
if update_result.modified_count > 0:
logger.info(f"Updated CreationChatStatus to 1 for concept code: {latest_concept['ConceptCode']}.")
else:
logger.warning("No documents were updated. The document might already be in 1 state.")
else:
logger.info(f"Chat already in '{current_status}' state. No update needed.")
else:
logger.warning(f"No in-progress concept found for enterprise_id {enterprise_id_str} to finalize.")
except Exception as e:
logger.error(f"Error finalizing conversation: {str(e)}")
def create_agents(self):
user_proxy = UserProxyWebAgent(
name="user_proxy",
human_input_mode="ALWAYS",
code_execution_config={"use_docker": False},
)
user_proxy.workflow_manager = self
# Agent 1: Users_Vision_Analyst
Users_Vision_Analyst = AsyncConversableWrapper(
name="Users_Vision_Analyst",
llm_config=llm_config_vision,
system_message="""
[ROLE] Vision Analyst
[OBJECTIVE] Analyze the entire conversation with the user regarding their app idea and extract all critical information to form a clear, structured summary.
[CONTEXT]
A full conversation (referred to as the “User's Vision” conversation) where the user has described their app concept. This conversation may include ideas, inspirations, potential features, and references to existing applications or technologies.
[GUIDELINES] You must follow these guidelines strictly for legal reasons. Do not stray from them:
- Output Compliance: You must adhere to the specified "Output Structure" and its instructions. Do not include any additional commentary in your output. Do not forget the 'Important Note' at the end.
- Avoid Using Names: Do not name the concept.
- Read the complete conversation carefully.
- Identify and capture only the essential details.
- If an expected element is missing, use "None" for strings or an empty list for arrays.
- Use clear, direct language without extraneous commentary.
[INSTRUCTIONS]
1. Extract the following elements from the User's Vision conversation:
a. **CoreFocus**: What is the app’s primary purpose or category? (Examples: "budget tracking", "fitness coaching"). Provide a short answer (maximum 5 words).
b. **Monetization**: Indicate whether the app is intended to generate revenue. Respond with "1" for revenue-generating intent, or "0" for personal use. Use integers.
c. **SuggestedFeatures**: Extract all key features mentioned by the user.
- **Each feature must be formatted as an object in an array.**
- **Do not summarize multiple features in one object. Each feature must be its own object.**
- **Follow the exact format:** Each feature must contain a `"FeatureTitle"` and `"Description"`.
d. **ThirdPartyIntegrations**: Identify external services, APIs, or integrations.
- **Each integration must be formatted as an object in an array.**
- **Do not combine multiple technologies in a single object. Each integration must be its own object.**
- **Follow the exact format:** Each integration must contain a `"TechnologyTitle"` and `"Description"`.
e. **LikedApps**: Note any existing applications mentioned as inspiration.