Skip to content

Commit

Permalink
Merge pull request #234 from eaudeweb/232_implement_ftp_retry
Browse files Browse the repository at this point in the history
Implement ftp retry for update_ted management command #232
  • Loading branch information
dianaboiangiu authored Oct 17, 2019
2 parents 3e28fd0 + 62a4361 commit b6780d8
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 36 deletions.
2 changes: 1 addition & 1 deletion app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def content(self):
parsed = parser.from_file(self.document.path)
return parsed["content"]
except (ValueError, FileNotFoundError) as e:
logging.debug(e)
logging.warning(e)
pass

class Meta:
Expand Down
59 changes: 30 additions & 29 deletions app/parsers/ted.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ def ftp_download_latest_archives(self):
while last_date <= today:
self.download_archive(ftp, last_date, archives)


last_date += timedelta(1)

if last_year != last_date.strftime('%Y'):
Expand Down Expand Up @@ -95,11 +94,17 @@ def download_archive(self, ftp, archive_date, archives):
if not os.path.exists(self.path):
os.makedirs(self.path)
file_path = os.path.join(self.path, archive_name)
with open(file_path, 'wb') as f:
ftp.retrbinary(
'RETR %s' % archive_name, lambda data: f.write(data)
)
self.archives.append(file_path)
while True:
if not os.path.exists(file_path):
with open(file_path, "wb") as f:
ftp.retrbinary(
"RETR %s" % archive_name, lambda data: f.write(data)
)
self.archives.append(file_path)
break
logging.warning(f"File already downloaded, waiting 30 seconds: {file_path}")
time.sleep(30)
logging.warning("Retrying")

def parse_notices(self, tenders=[], set_notified=False):
changed_tenders = []
Expand All @@ -120,14 +125,14 @@ def parse_notices(self, tenders=[], set_notified=False):
try:
os.remove(archive_path)
except OSError as e:
logging.debug(e)
logging.warning(e)
pass

for folder in folders:
try:
os.rmdir(os.path.join(self.path, folder))
except OSError as e:
logging.debug(e)
logging.warning(e)
pass

return changed_tenders, tenders_count
Expand All @@ -138,16 +143,24 @@ def extract_data(archive_path, extract_path):
tf = tarfile.open(archive_path, 'r:gz')
tf.extractall(extract_path)
return tf.getnames()[0]
except FileNotFoundError as e:
logging.debug(e)
except (EOFError, FileNotFoundError) as e:
logging.warning(e)

return

@staticmethod
def ftp_login():
ftp = FTP(settings.TED_FTP_URL)
ftp.login(user=settings.TED_FTP_USER, passwd=settings.TED_FTP_PASSWORD)
return ftp
while True:
try:
ftp = FTP(settings.TED_FTP_URL)
ftp.login(user=settings.TED_FTP_USER, passwd=settings.TED_FTP_PASSWORD)
logging.warning("Logged into FTP.")
return ftp
except error_perm as e:
logging.warning(f"Cannot login to FTP, waiting 30 seconds: {e}")
time.sleep(30)
logging.warning("Retrying")
continue

@staticmethod
def get_archive_name(last_date, archives):
Expand All @@ -165,10 +178,7 @@ def add_worker_log(source, tenders_count):
@staticmethod
def last_update(source):
worker_log = (
WorkerLog.objects
.filter(source=source)
.order_by('-update')
.first()
WorkerLog.objects.filter(source=source).order_by("-update").first()
)
return worker_log.update if worker_log else None

Expand Down Expand Up @@ -486,15 +496,6 @@ def get_archives_path():

def process_daily_archive(given_date):
w = TEDWorker(given_date)
while True:
try:
w.ftp_download_daily_archives()
w.parse_notices([], True)
break
except error_perm as e:
logging.warning(f'Waiting 30 seconds: {e}')
time.sleep(30)
logging.warning('Retrying')
continue

return f'Updated {given_date} TED tenders'
w.ftp_download_daily_archives()
w.parse_notices([], True)
return f"Updated {given_date} TED tenders"
16 changes: 15 additions & 1 deletion app/tests/test_ted_parser.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from datetime import date, datetime, timedelta
from multiprocessing import Process

from django.test import override_settings
from django.utils.timezone import make_aware

from app.factories import CPVCodeFactory, TedCountryFactory
from app.models import Winner, Tender
from app.parsers.ted import TEDParser
from app.parsers.ted import TEDParser, process_daily_archive
from app.tests.base import BaseTestCase


Expand Down Expand Up @@ -149,3 +150,16 @@ def test_ted_winner_date_format(self):
_, winners = self.parser._parse_notice(f.read(), [], 'test', {}, False)

self.assertEqual(winners[0]['award_date'], date.today())

def test_multiple_update_ted_ftp_retry(self):
    """Run process_daily_archive concurrently in 4 processes and verify
    that every process finishes successfully (the FTP retry logic must
    let parallel workers coexist instead of crashing on login/file races).
    """
    def run_process_daily_archive():
        process_daily_archive(date(day=13, month=10, year=2019))

    processes = []
    for _ in range(4):
        p = Process(target=run_process_daily_archive)
        p.start()
        processes.append(p)

    # Wait for all workers; Process.join() always returns None, so the
    # result must be checked via exitcode (the original
    # `[p.join() for p in processes]` assertion was vacuously true).
    for p in processes:
        p.join()

    # exitcode 0 == clean exit; non-zero/negative means an exception or
    # a signal killed the worker.
    assert all(p.exitcode == 0 for p in processes)
11 changes: 6 additions & 5 deletions crontab
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
0 8 * * * python /var/local/scratch/manage.py update_ungm
0 8 * * * python /var/local/scratch/manage.py update_ted
0 8 * * * python /var/local/scratch/manage.py update_ungm; python /var/local/scratch/manage.py update_ted; python /var/local/scratch/manage.py notify --digest; python /var/local/scratch/manage.py notify_favorites --digest; python /var/local/scratch/manage.py notify_keywords --digest
# 0 8 * * * python /var/local/scratch/manage.py update_ungm
# 0 8 * * * python /var/local/scratch/manage.py update_ted
# 0 8 * * * python /var/local/scratch/manage.py update_winners
0 8 * * * python /var/local/scratch/manage.py notify --digest
0 8 * * * python /var/local/scratch/manage.py notify_favorites --digest
0 8 * * * python /var/local/scratch/manage.py notify_keywords --digest
# 0 8 * * * python /var/local/scratch/manage.py notify --digest
# 0 8 * * * python /var/local/scratch/manage.py notify_favorites --digest
# 0 8 * * * python /var/local/scratch/manage.py notify_keywords --digest

0 comments on commit b6780d8

Please sign in to comment.