-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmulticast_observer.py
More file actions
112 lines (91 loc) · 3.9 KB
/
multicast_observer.py
File metadata and controls
112 lines (91 loc) · 3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import os
import sys
import time
import json
import requests
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from dotenv import load_dotenv
load_dotenv()
infrastructure_ip = os.getenv('INFRASTRUCTURE_IP')
print(infrastructure_ip)
base_url = "http://" + infrastructure_ip + ":8054/"
scripts_dir = os.path.dirname(os.path.abspath(__file__))
csv_module_path = os.path.join(scripts_dir, 'csv')
if csv_module_path not in sys.path:
sys.path.append(csv_module_path)
import send_rows_from_csv
class CSVChangeHandler(FileSystemEventHandler):
def __init__(self, csv_file_path):
self.csv_file_path = csv_file_path
def on_modified(self, event):
if event.src_path == self.csv_file_path:
print(f"Detected changes in {self.csv_file_path}")
self.process_csv()
def process_csv(self):
try:
url = base_url + "connections"
method = 'GET'
response = make_http_request(url, method)
if response and response.status_code == 200:
try:
response_json = response.json()
completed_connections = [entry["connection_id"] for entry in response_json.get("results", []) if entry.get("rfc23_state") == "completed"]
# Get all rows in the CSV file
total_rows = send_rows_from_csv.get_number_of_rows(self.csv_file_path)
rows_to_send = [send_rows_from_csv.get_row(self.csv_file_path, row) for row in range(1, total_rows + 1)]
if rows_to_send:
send_rows(base_url, completed_connections, rows_to_send)
else:
print("No rows to send.")
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
except Exception as e:
print(f"Error processing CSV: {e}")
def make_http_request(url, method='GET', headers=None, body=None, verify=False):
try:
if method.upper() == 'GET':
response = requests.get(url, headers=headers)
elif method.upper() == 'POST':
response = requests.post(url, headers=headers, data=body, verify=verify)
else:
print(f"Unsupported HTTP method: {method}")
return None
print(f"Response Code: {response.status_code}")
return response
except Exception as e:
print(f"Error making HTTP request: {e}")
return None
def send_rows(base_url, completed_connections, rows_to_send):
try:
for row_data in rows_to_send:
content = {k: v for k, v in row_data.items()}
print("Sending message:", content)
body_dict = {"content": content}
body = json.dumps(body_dict)
method = 'POST'
for connection in completed_connections:
url = base_url + "connections/" + connection + "/send-message"
response_post = make_http_request(url, method, body=body)
print(response_post.status_code)
print(response_post.text)
if response_post.status_code != 200:
raise Exception("Unable to send message to connection " + connection)
except Exception as e:
print(f"Error sending rows: {e}")
if __name__ == "__main__":
csv_directory = os.getenv('CSV_PATH')
if not csv_directory:
csv_directory = input("Enter the directory containing the CSV file: ")
csv_file_path = os.path.join(os.path.abspath(csv_directory), 'eventlog.csv')
print(f"Using CSV file path: {csv_file_path}")
event_handler = CSVChangeHandler(csv_file_path)
observer = Observer()
observer.schedule(event_handler, path=os.path.dirname(csv_file_path), recursive=False)
observer.start()
try:
while True:
time.sleep(0.5)
except KeyboardInterrupt:
observer.stop()
observer.join()