-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathUmrIcal.py
executable file
·255 lines (225 loc) · 9.07 KB
/
UmrIcal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#imports UMR
from UmrConf import gconfig
import UmrConf
import UmrMail
#system & debug
import os.path
import logging
#date manipulation
from datetime import datetime, timezone, timedelta
from dateutil import rrule
from icalendar import Calendar, parser_tools, Event, vDatetime
#misc
import hashlib, json
def get_events(pathname, maxfuture = 60, limit = 200):
"""Read ics file and get <limit> events for the <maxfuture> days"""
assert type(pathname)==str
assert type(maxfuture)==int
assert type(limit)==int
logging.info("Getting max %d future events from %s in less than %d", limit, pathname, maxfuture)
#time limit
futuredate = datetime.now(timezone.utc)+timedelta(days=maxfuture)
now = datetime.now(timezone.utc)+timedelta(days=0)
oldestdate = datetime.now(timezone.utc)+timedelta(days=-maxfuture)
#open calendar
g = open(pathname,'rb')
logging.debug("Import %s events", pathname)
gcal = Calendar.from_ical(g.read())
listevents = [] #for events to return
for component in gcal.walk(): #for each event
if limit == 0 :
break #enough !
if component.name == "VEVENT": #OK, this is a meeting
dtstart = component.get('dtstart') #start of event
dt = dtstart.dt.replace(tzinfo=timezone.utc)
if component.has_key('rrule'): #event with repeat
rule = component.get('rrule')
try :
until = rule['UNTIL'][0]
except :
logging.info("rrule without UNTIL in %s %s", pathname, dtstart)
continue
ruleset = rrule.rruleset()
ruleset.rrule(rrule.rrulestr(rule.to_ical().decode('UTF-8'), dtstart=dtstart.dt))
exdate = component.get('EXDATE')
if exdate is not None : #exclude dates
for edate in exdate :
ruleset.exdate(edate.dts[0].dt)
target = None
for devent in ruleset: #for each date
if devent > now and (target is None or devent < target):
target = devent #this one is the nearest in the future
#FIXME : what if we want to keep recurreing event ?
if target is not None :
del(component['dtstart'])
component.add('dtstart', target)#] = vDatetime(target).to_ical()
listevents.append(component) #old, but repeat recently
logging.debug("Keeping %s, %s", until, component.get('summary'))
limit -= 1 #one more
elif dt > oldestdate and dt < futuredate:
listevents.append(component)
logging.debug("Keeping %s, %s", dt, component.get('summary'))
limit -= 1
g.close()
return(listevents)
def body_event_from_data(data):
"""Make event body from dict"""
#FIXME : eventually, do it in html
assert 'dtstart' in data
assert 'summary' in data
bodylist = ["%s" % datetime.utcfromtimestamp(int(data['dtstart'])).strftime('%d-%m-%Y à %H:%M'), data['summary']]
if 'location' in data and data['location'] != "" :
bodylist.append(data['location'])
bodylist.append(body_plan_from_data(data))
body = "\n".join(bodylist)
return(body)
def body_plan_from_data(data) :
"""Make plan text from data"""
if "meetingplan" in data and data['meetingplan'] != "" :
plan = data['meetingplan']
elif 'description' in data and data['description'] != "" :
plan = data['description']
else :
plan = "GOAL:\nTODO:\n" #FIXME : Default plan in config ?
return("===\n%s\n===\n" % plan)
def subject_from_data(data):
"""Make Title from dict"""
time_to_meeting = int(data['dtstart']) - ts_from_datetime()
if time_to_meeting < 3600*24*7 and time_to_meeting > 0 :
date_format = '%A %d/%m à %H:%M'
else :
date_format = '%d/%m à %H:%M'
dtiso = datetime.utcfromtimestamp(int(data['dtstart'])).strftime(date_format)
#FIXME : optionnaly, take it from config file
return("%s - %s - UMR/%s" % (data['summary'], dtiso, data['uid']) )
def send_event_from_uid(uid, prefix = None):
"""Send mail for uid with data from store"""
data = get_data_from_store(uid) #get data
logging.debug("Send event for %s" % uid)
subject = subject_from_data(data) #subject
text = body_event_from_data(data)
if prefix is not None :
text = "%s\n%s" % (prefix, text)
#Send
UmrMail.sendOneMail([gconfig.get('Mail', 'reminded')], subject, text)
def ts_from_datetime(dt = None) :
if dt == None :
dt = datetime.now()
return(int(dt.replace(tzinfo=timezone.utc).timestamp()))
def make_data_from_event(event):
"""Make data dict from ics event"""
ret = {}
ret['summary'] = event.get('summary').strip()
ret['location'] = event.get('location').strip()
ret['description'] = event.get('description').strip()
ret['uid'] = event.get('uid').strip()
ret['dtstart'] = ts_from_datetime(event.get('dtstart').dt)
#FIXME : Duration ?
#FIXME : RRULE RECUR ?
#FIXME : DT END
return(ret)
def get_data_from_store(uid):
"""Read data from store with uid"""
fpath = get_store_pathname(uid)
if os.path.exists(fpath) : #already known
return(get_data_from_file(fpath))
else :
#FIXME : should be NONE ?
return({})
def get_data_from_file(fpath):
"""Read json file"""
with open(fpath, 'r') as h :
data = json.load(h) #load existing datas
data['read'] = ts_from_datetime()
logging.debug("Existing datas for uid %s loaded from %s", data['uid'], fpath)
return(data)
def get_store_pathname(uid):
"""Build meeting store pathname (uid md5 hash)"""
return(os.path.join(UmrConf.get_path('store'), hashlib.md5(uid.encode()).hexdigest()+".json"))
def write_store_with_data(data):
"""Write meeting store entry from data dict"""
assert 'dtstart' in data #mandatory
assert 'uid' in data #mandatory
fpath = get_store_pathname(data['uid'])
data['written'] = ts_from_datetime()
with open(fpath, 'w') as h:
json.dump(data, h, indent=2)
logging.debug("Writing %s for uid %s", fpath, data['uid'])
return(data)
def update_store_with_data(data):
"""Update meeting store entry from data dict"""
logging.info("Updating store for uid %s", data['uid'])
ret = get_data_from_store(data['uid'])
ret.update(data) #replace existing data
ret = write_store_with_data(ret)
return(ret)
def update_store_from_ical():
"""Update meeting store from ical source"""
icspath = gconfig.get('Ical', 'path')
logging.debug("Update meeting store from %s", icspath)
levents = get_events(icspath, 90) #get events #FIXME : Conf ?
counter = 0
uidlist = []
for e in levents : #for each event
data = make_data_from_event(e)
update_store_with_data(data) #update
uidlist.append(data['uid']) #prepare removal
counter += 1
for uid in get_events_from_store():
if uid not in uidlist : #not in ics
remove_event_from_store(uid) #remove
logging.info("%d events read and created/updated", counter)
return(counter)
def get_events_from_store():
"""Read events from store"""
storepath = UmrConf.get_path('store')
events = {}
for f in os.listdir(storepath): #list files in store
fpath = os.path.join(storepath, f)
if os.path.isfile(fpath): #this is a file
data = get_data_from_file(fpath)
events[data['uid']] = data
return(events)
def remove_event_from_store(uid):
"""Delete event from store"""
fpath = get_store_pathname(uid)
try:
send_event_from_uid(uid, "REMOVED") #send removal
os.remove(fpath) #FIXME : Shall copy in backup dir ?
logging.debug("%s removed", uid)
except :
logging.debug("Could not remove %s", uid)
def remind_event_from_store(uid):
"""Remind event - and update data store"""
send_event_from_uid(uid, 'REMINDER')
data = {'reminded' : ts_from_datetime(), 'uid' : uid}
return(update_store_with_data(data))
def to_be_reminded(data):
"""Check if event has to be reminded"""
assert 'dtstart' in data
updated = data.get('updated', 0)
reminded = data.get('reminded', 0)
#rule of thumb : remind if half way between last update and the meeting
halfway = float(max(updated, reminded) + data['dtstart'])/2.0
now = ts_from_datetime()
return(now > halfway and now < data['dtstart'])
def remind_events(events):
"""Remind events if necessary"""
logging.info('Remind events')
reminded = []
for uid in events : #for each event
data = events[uid]
if to_be_reminded(data): #something to do
remind_event_from_store(uid)
reminded.append(data)
return(reminded)
def send_reminders():
events = get_events_from_store()
remind_events(events)
def print_events(events):
for e in events :
print(json.dumps(e, indent=2))
if __name__ == '__main__':
print("Test UmrIcal")