1
+ import os , time , subprocess
2
+ import datetime
3
+ from os import listdir
4
+ from os .path import isfile , join
5
+
6
+ from .celery import app
7
+ from celery .contrib .abortable import AbortableTask
8
+ from django_celery_results .models import TaskResult
9
+
10
+ from django .contrib .auth .models import User
11
+ from django .conf import settings
12
+ from celery .exceptions import Ignore , TaskError
13
+
14
+
15
+ def get_scripts ():
16
+ """
17
+ Returns all scripts from 'ROOT_DIR/celery_scripts'
18
+ """
19
+ raw_scripts = []
20
+ scripts = []
21
+ ignored_ext = ['db' , 'txt' ]
22
+
23
+ try :
24
+ raw_scripts = [f for f in listdir (settings .CELERY_SCRIPTS_DIR ) if isfile (join (settings .CELERY_SCRIPTS_DIR , f ))]
25
+ except Exception as e :
26
+ return None , 'Error CELERY_SCRIPTS_DIR: ' + str ( e )
27
+
28
+ for filename in raw_scripts :
29
+
30
+ ext = filename .split ("." )[- 1 ]
31
+ if ext not in ignored_ext :
32
+ scripts .append ( filename )
33
+
34
+ return scripts , None
35
+
36
+ def write_to_log_file (logs , script_name ):
37
+ """
38
+ Writes logs to a log file with formatted name in the CELERY_LOGS_DIR directory.
39
+ """
40
+ script_base_name = os .path .splitext (script_name )[0 ] # Remove the .py extension
41
+ current_time = datetime .datetime .now ().strftime ("%y%m%d-%H%M%S" )
42
+ log_file_name = f"{ script_base_name } -{ current_time } .log"
43
+ log_file_path = os .path .join (settings .CELERY_LOGS_DIR , log_file_name )
44
+
45
+ with open (log_file_path , 'w' ) as log_file :
46
+ log_file .write (logs )
47
+
48
+ return log_file_path
49
+
50
+ @app .task (bind = True , base = AbortableTask )
51
+ def execute_script (self , data : dict ):
52
+ """
53
+ This task executes scripts found in settings.CELERY_SCRIPTS_DIR and logs are later generated and stored in settings.CELERY_LOGS_DIR
54
+ :param data dict: contains data needed for task execution. Example `input` which is the script to be executed.
55
+ :rtype: None
56
+ """
57
+ script = data .get ("script" )
58
+ args = data .get ("args" )
59
+
60
+ print ( '> EXEC [' + script + '] -> (' + args + ')' )
61
+
62
+ scripts , ErrInfo = get_scripts ()
63
+
64
+ if script and script in scripts :
65
+ # Executing related script
66
+ script_path = os .path .join (settings .CELERY_SCRIPTS_DIR , script )
67
+ process = subprocess .Popen (
68
+ f"python { script_path } { args } " , shell = True , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
69
+ time .sleep (8 )
70
+
71
+ exit_code = process .wait ()
72
+ error = False
73
+ status = "STARTED"
74
+ if exit_code == 0 : # If script execution successfull
75
+ logs = process .stdout .read ().decode ()
76
+ status = "SUCCESS"
77
+ else :
78
+ logs = process .stderr .read ().decode ()
79
+ error = True
80
+ status = "FAILURE"
81
+
82
+
83
+ log_file = write_to_log_file (logs , script )
84
+
85
+ return {"logs" : logs , "input" : script , "error" : error , "output" : "" , "status" : status , "log_file" : log_file }
0 commit comments