Skip to content

Commit 43b120e

Browse files
authored
Fix Problems when frontend and database are in a different timezone (#34)
The problem was twofold: a) the frontend generated timestamps without timezones, which PostgreSQL then interpreted as UTC but which were actually in the timezone in which the Python code ran, and b) the closing of open runs after an exception that kills the ETL runner itself (not individual tasks) happened to insert a timestamp with the timezone of the PostgreSQL server. -> The error only occurs when an exception triggers this code path, i.e. when something shuts down the runner (e.g. the Docker instance is stopped or Ctrl+C is pressed on the command line). Should fix mara/mara-example-project-2#12
1 parent 5ab58be commit 43b120e

File tree

3 files changed

+16
-12
lines changed

3 files changed

+16
-12
lines changed

CHANGELOG.md

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
# Changelog
22

3+
## 2.8.1 (2020-04-27)
4+
5+
- Fix Problems when frontend and database are in a different timezone (#34)
6+
37
## 2.8.0 (2020-03-25)
48

59
- Implement pipeline notifications via Microsoft Teams #28
610
- Make it possible to disable output coloring in command line etl runs (#31)
711

8-
912
## 2.7.0 (2020-03-05)
1013

1114
- Make event handlers configurable: this allows for e.g. adding your own notifier for specific events

data_integration/execution.py

+11-10
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""
55

66
import datetime
7+
from datetime import timezone as tz
78
import functools
89
import multiprocessing
910
import os
@@ -99,7 +100,7 @@ def with_all_upstreams(nodes: {pipelines.Node}):
99100
queue([pipeline])
100101

101102
# book keeping
102-
run_start_time = datetime.datetime.now()
103+
run_start_time = datetime.datetime.now(tz.utc)
103104
# all nodes that already ran or that won't be run anymore
104105
processed_nodes: {pipelines.Node} = set()
105106
# running pipelines with start times and number of running children
@@ -134,10 +135,10 @@ def track_finished_pipelines():
134135
succeeded = running_pipeline not in failed_pipelines
135136
event_queue.put(pipeline_events.Output(
136137
node_path=running_pipeline.path(), format=logger.Format.ITALICS, is_error=not succeeded,
137-
message=f'{"succeeded" if succeeded else "failed"}, {logger.format_time_difference(run_start_time, datetime.datetime.now())}'))
138+
message=f'{"succeeded" if succeeded else "failed"}, {logger.format_time_difference(run_start_time, datetime.datetime.now(tz.utc))}'))
138139
event_queue.put(pipeline_events.NodeFinished(
139140
node_path=running_pipeline.path(), start_time=start_time,
140-
end_time=datetime.datetime.now(), is_pipeline=True, succeeded=succeeded))
141+
end_time=datetime.datetime.now(tz.utc), is_pipeline=True, succeeded=succeeded))
141142
del running_pipelines[running_pipeline]
142143
processed_nodes.add(running_pipeline)
143144

@@ -180,7 +181,7 @@ def track_finished_pipelines():
180181
queue(list(next_node.nodes.values()))
181182

182183
# book keeping and event emission
183-
pipeline_start_time = datetime.datetime.now()
184+
pipeline_start_time = datetime.datetime.now(tz.utc)
184185
running_pipelines[next_node] = [pipeline_start_time, 0]
185186
event_queue.put(pipeline_events.NodeStarted(next_node.path(), pipeline_start_time, True))
186187
event_queue.put(pipeline_events.Output(
@@ -190,7 +191,7 @@ def track_finished_pipelines():
190191

191192
elif isinstance(next_node, pipelines.ParallelTask):
192193
# create sub tasks and queue them
193-
task_start_time = datetime.datetime.now()
194+
task_start_time = datetime.datetime.now(tz.utc)
194195
try:
195196
logger.redirect_output(event_queue, next_node.path())
196197
logger.log('☆ Launching tasks', format=logger.Format.ITALICS)
@@ -207,7 +208,7 @@ def track_finished_pipelines():
207208
format=pipeline_events.Output.Format.VERBATIM, is_error=True)
208209
event_queue.put(pipeline_events.NodeFinished(
209210
node_path=next_node.path(), start_time=task_start_time,
210-
end_time=datetime.datetime.now(), is_pipeline=True, succeeded=False))
211+
end_time=datetime.datetime.now(tz.utc), is_pipeline=True, succeeded=False))
211212

212213
failed_pipelines.add(next_node.parent)
213214
processed_nodes.add(next_node)
@@ -219,7 +220,7 @@ def track_finished_pipelines():
219220
if next_node.parent in running_pipelines:
220221
running_pipelines[next_node.parent][1] += 1
221222
event_queue.put(
222-
pipeline_events.NodeStarted(next_node.path(), datetime.datetime.now(), False))
223+
pipeline_events.NodeStarted(next_node.path(), datetime.datetime.now(tz.utc), False))
223224
event_queue.put(pipeline_events.Output(
224225
node_path=next_node.path(), format=logger.Format.ITALICS,
225226
message='★ ' + node_cost.format_duration(
@@ -246,7 +247,7 @@ def track_finished_pipelines():
246247
for parent in task_process.task.parents()[:-1]:
247248
failed_pipelines.add(parent)
248249

249-
end_time = datetime.datetime.now()
250+
end_time = datetime.datetime.now(tz.utc)
250251
event_queue.put(
251252
pipeline_events.Output(task_process.task.path(),
252253
('succeeded' if succeeded else 'failed') + ', '
@@ -273,7 +274,7 @@ def track_finished_pipelines():
273274
statistics_process.join()
274275

275276
# run finished
276-
event_queue.put(pipeline_events.RunFinished(node_path=pipeline.path(), end_time=datetime.datetime.now(),
277+
event_queue.put(pipeline_events.RunFinished(node_path=pipeline.path(), end_time=datetime.datetime.now(tz.utc),
277278
succeeded=not failed_pipelines,
278279
interactively_started=interactively_started))
279280

@@ -326,7 +327,7 @@ def __init__(self, task: pipelines.Task, event_queue: multiprocessing.Queue, sta
326327
self.task = task
327328
self.event_queue = event_queue
328329
self.status_queue = status_queue
329-
self.start_time = datetime.datetime.now()
330+
self.start_time = datetime.datetime.now(tz.utc)
330331

331332
def run(self):
332333
# redirect stdout and stderr to queue

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ def get_long_description():
77

88
setup(
99
name='data-integration',
10-
version='2.8.0',
10+
version='2.8.1',
1111

1212
description='Opinionated lightweight ETL pipeline framework',
1313

0 commit comments

Comments
 (0)