Skip to content

Commit 9cbb97b

Browse files
author
Don Johnson
committed
Initial commit
create whoop
1 parent 4432902 commit 9cbb97b

File tree

1 file changed

+254
-0
lines changed

1 file changed

+254
-0
lines changed

email_parser.py

+254
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
#!/usr/bin/env python3
2+
3+
import argparse
4+
import concurrent.futures
5+
import dataclasses
6+
import datetime
7+
import email.utils
8+
import ipaddress
9+
import json
10+
import logging
11+
import pathlib
12+
import re
13+
from collections import defaultdict
14+
from dataclasses import dataclass
15+
from typing import Dict, List, Optional, Pattern, Set, Tuple
16+
17+
@dataclass
class EmailLogEntry:
    """One normalized email-log record parsed from an MTA log line.

    The trailing fields default to None because not every log format (or
    every line) carries them.
    """

    timestamp: datetime.datetime          # when the event was logged
    message_id: str                       # MTA queue/message identifier
    sender: str                           # envelope sender address
    recipients: List[str]                 # envelope recipient addresses
    subject: Optional[str]                # subject, when the line carried one
    status: str                           # delivery status token (e.g. 'sent')
    server_ip: Optional[ipaddress.IPv4Address] = None
    smtp_code: Optional[int] = None       # 3-digit SMTP reply code, if present
    size: Optional[int] = None            # message size in bytes, if present

    def to_dict(self) -> dict:
        """Return a JSON-serializable dict view of this entry.

        datetime and IPv4Address objects are not JSON-serializable, so the
        timestamp is rendered as an ISO-8601 string and the IP as its
        dotted-quad string form.
        """
        return {
            'timestamp': self.timestamp.isoformat(),
            'message_id': self.message_id,
            'sender': self.sender,
            'recipients': self.recipients,
            'subject': self.subject,
            # Explicit None check: testing an optional field by truthiness
            # is fragile; `is not None` states the intent exactly.
            'server_ip': str(self.server_ip) if self.server_ip is not None else None,
            'smtp_code': self.smtp_code,
            'size': self.size
        }
41+
42+
class LogParser:
    """Advanced log parser with support for multiple log formats.

    Each supported format (postfix, exchange, sendmail) is described by one
    compiled regex exposing the named groups timestamp, message_id, sender,
    recipients and status, plus a matching strptime format string.
    """

    def __init__(self):
        self.patterns: Dict[str, Pattern] = {
            'postfix': re.compile(
                r'(?P<timestamp>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})'
                r'.*?'
                r'(?P<message_id>[A-F0-9]{10,})'
                r'.*?'
                r'from=<(?P<sender>[^>]+)>'
                r'.*?'
                r'to=<(?P<recipients>[^>]+)>'
                r'.*?'
                r'status=(?P<status>\w+)'
            ),
            'exchange': re.compile(
                r'(?P<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})'
                r'.*?'
                r'MessageId:\s*(?P<message_id>[^\s,]+)'
                r'.*?'
                r'Sender:\s*(?P<sender>[^\s,]+)'
                r'.*?'
                r'Recipients:\s*(?P<recipients>[^\s,]+)'
                r'.*?'
                r'Status:\s*(?P<status>[^\s,]+)'
            ),
            'sendmail': re.compile(
                r'(?P<timestamp>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})'
                r'.*?'
                r't=(?P<message_id>[^,\s]+)'
                r'.*?'
                r'f=(?P<sender>[^,\s]+)'
                r'.*?'
                r'r=(?P<recipients>[^,\s]+)'
                r'.*?'
                r's=(?P<status>[^,\s]+)'
            )
        }
        # strptime format matching each pattern's timestamp group.  The
        # syslog-style formats (postfix/sendmail) carry no year.
        self.timestamp_formats = {
            'postfix': '%b %d %H:%M:%S',
            'exchange': '%Y-%m-%d %H:%M:%S',
            'sendmail': '%b %d %H:%M:%S'
        }

    def parse_line(self, line: str, year: Optional[int] = None) -> Optional[EmailLogEntry]:
        """Parse a single log line and return an EmailLogEntry if matched.

        Args:
            line: one raw log line.
            year: year to assume for timestamps that lack one (syslog-style
                formats); when falsy the parsed datetime defaults to 1900.

        Returns:
            An EmailLogEntry for the first format whose pattern matches, or
            None when no pattern matches (or the timestamp is unparsable).
        """
        for format_name, pattern in self.patterns.items():
            match = pattern.search(line)
            if not match:
                continue
            data = match.groupdict()

            # Parse timestamp, optionally appending the caller-supplied year.
            ts_str = data['timestamp']
            ts_format = self.timestamp_formats[format_name]
            try:
                if year:
                    ts_str = f"{ts_str} {year}"
                    ts_format = f"{ts_format} %Y"
                timestamp = datetime.datetime.strptime(ts_str, ts_format)
            except ValueError:
                logging.warning(f"Failed to parse timestamp: {ts_str}")
                continue

            # Multiple recipients may be comma-separated inside one group.
            recipients = [r.strip() for r in data['recipients'].split(',')]

            # Extract optional subject if present
            subject_match = re.search(r'subject=(?P<subject>[^,]+)', line)
            subject = subject_match.group('subject') if subject_match else None

            # Extract IP if present.  \d{1,3} also matches octets > 255
            # (e.g. 999.1.1.1), for which ip_address() raises ValueError;
            # treat an unparsable IP as absent rather than crashing the line.
            server_ip = None
            ip_match = re.search(r'ip=\[?(?P<ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\]?', line)
            if ip_match:
                try:
                    server_ip = ipaddress.ip_address(ip_match.group('ip'))
                except ValueError:
                    logging.warning(f"Ignoring invalid server IP: {ip_match.group('ip')}")

            # Extract SMTP code if present
            smtp_match = re.search(r'smtp=(?P<smtp>\d{3})', line)
            smtp_code = int(smtp_match.group('smtp')) if smtp_match else None

            # Extract size if present
            size_match = re.search(r'size=(?P<size>\d+)', line)
            size = int(size_match.group('size')) if size_match else None

            return EmailLogEntry(
                timestamp=timestamp,
                message_id=data['message_id'],
                sender=data['sender'],
                recipients=recipients,
                subject=subject,
                status=data['status'],
                server_ip=server_ip,
                smtp_code=smtp_code,
                size=size
            )
        return None
137+
138+
class EmailLogAnalyzer:
    """Advanced email log analyzer with statistics and filtering capabilities."""

    # Status tokens that count as failed deliveries (compared after .lower()).
    # Class-level constant so the set is not rebuilt on every call.
    FAILED_STATUSES = frozenset({'bounced', 'failed', 'deferred'})

    def __init__(self):
        self.parser = LogParser()
        self.entries: List[EmailLogEntry] = []
        self.sender_stats: Dict[str, int] = defaultdict(int)
        self.recipient_stats: Dict[str, int] = defaultdict(int)
        self.status_stats: Dict[str, int] = defaultdict(int)
        self.hourly_stats: Dict[int, int] = defaultdict(int)

    def process_file(self, file_path: pathlib.Path, year: Optional[int] = None) -> None:
        """Process a log file line-by-line and collect statistics.

        errors='replace' keeps a stray undecodable byte in a log file from
        aborting the whole run with UnicodeDecodeError.
        """
        with file_path.open('r', errors='replace') as f:
            for line in f:
                entry = self.parser.parse_line(line.strip(), year)
                if entry:
                    self.entries.append(entry)
                    self.update_stats(entry)

    def update_stats(self, entry: EmailLogEntry) -> None:
        """Update the aggregate counters for a single parsed entry."""
        self.sender_stats[entry.sender] += 1
        for recipient in entry.recipients:
            self.recipient_stats[recipient] += 1
        self.status_stats[entry.status] += 1
        self.hourly_stats[entry.timestamp.hour] += 1

    def get_failed_deliveries(self) -> List[EmailLogEntry]:
        """Get all failed delivery attempts (bounced/failed/deferred)."""
        return [e for e in self.entries if e.status.lower() in self.FAILED_STATUSES]

    def get_top_senders(self, limit: int = 10) -> List[Tuple[str, int]]:
        """Get the `limit` most frequent senders as (address, count) pairs."""
        return sorted(self.sender_stats.items(), key=lambda x: x[1], reverse=True)[:limit]

    def get_top_recipients(self, limit: int = 10) -> List[Tuple[str, int]]:
        """Get the `limit` most frequent recipients as (address, count) pairs."""
        return sorted(self.recipient_stats.items(), key=lambda x: x[1], reverse=True)[:limit]

    def get_status_summary(self) -> Dict[str, int]:
        """Get summary of email delivery statuses."""
        return dict(self.status_stats)

    def get_hourly_distribution(self) -> Dict[int, int]:
        """Get hourly (0-23) distribution of email traffic."""
        return dict(self.hourly_stats)

    def export_json(self, output_file: pathlib.Path) -> None:
        """Export all parsed entries plus aggregate stats to JSON."""
        data = {
            'entries': [e.to_dict() for e in self.entries],
            'stats': {
                'senders': dict(self.sender_stats),
                'recipients': dict(self.recipient_stats),
                'statuses': dict(self.status_stats),
                'hourly': dict(self.hourly_stats)
            }
        }
        with output_file.open('w') as f:
            json.dump(data, f, indent=2)
199+
200+
def setup_logging(verbose: bool) -> None:
    """Configure root logging: DEBUG when verbose, INFO otherwise."""
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
207+
208+
def main():
    """Command-line entry point: parse arguments, analyze logs, print a summary."""
    cli = argparse.ArgumentParser(description='Advanced Email Log Analyzer')
    cli.add_argument('log_files', nargs='+', type=pathlib.Path,
                     help='Log files to analyze')
    cli.add_argument('--year', type=int,
                     help='Year for log entries (if not in timestamp)')
    cli.add_argument('--output', type=pathlib.Path,
                     help='Output JSON file for results')
    cli.add_argument('--verbose', action='store_true',
                     help='Enable verbose output')
    opts = cli.parse_args()

    setup_logging(opts.verbose)
    analyzer = EmailLogAnalyzer()

    # Feed every readable log file into the analyzer.
    for path in opts.log_files:
        if path.exists():
            logging.info(f"Processing {path}")
            analyzer.process_file(path, opts.year)
        else:
            logging.error(f"File not found: {path}")

    # Human-readable summary on stdout.
    print("\nAnalysis Summary:")
    print("=" * 50)
    print(f"Total entries processed: {len(analyzer.entries)}")

    print("\nTop 5 Senders:")
    for sender, count in analyzer.get_top_senders(5):
        print(f" {sender}: {count}")

    print("\nDelivery Status Summary:")
    for status, count in analyzer.get_status_summary().items():
        print(f" {status}: {count}")

    print("\nHourly Distribution:")
    for hour, count in sorted(analyzer.get_hourly_distribution().items()):
        print(f" {hour:02d}:00 - {count}")

    # Optional machine-readable export.
    if opts.output:
        analyzer.export_json(opts.output)
        print(f"\nResults exported to {opts.output}")

if __name__ == '__main__':
    main()

0 commit comments

Comments
 (0)