-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexternal_wrapper.py
More file actions
131 lines (113 loc) · 5.45 KB
/
external_wrapper.py
File metadata and controls
131 lines (113 loc) · 5.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from os import path, mkdir
from subprocess import run, PIPE
from virustotal_python import Virustotal
from pprint import pprint
from sys import stdout, getfilesystemencoding
class ExternalTesting:
# This requires a virustotal API key
VIRUSTOTAL = 4
# This requires the installation of the Malice multiscanner along with installed scanner plugins
MALICE = 2
NONE = 0
MALICE_SCANNERS = ['malice/avast', 'malice/avg', 'malice/clamav', 'malice/comodo', 'malice/escan', 'malice/fprot',
'malice/fsecure', 'malice/mcafee', 'malice/sophos', 'malice/zoner']
# Initialize module logfile and tool output file to STDOUT, initialize output headers
def __init__(self):
self.logfile = stdout
self.output = False
self.virus_total = None
self.virus_total_output_path = ''
self.malice_output_path = ''
self.scanners = ExternalTesting.NONE
@staticmethod
def check_or_make_path(target_path):
if not path.isdir(target_path):
mkdir(target_path)
def get_scanners(self):
return self.MALICE_SCANNERS
# Specify module logfile manually
def set_logfile(self, logfile_path):
self.logfile = open(logfile_path, 'w')
# Specify module logfile directory, but use default name for logfile
def set_logfile_by_directory(self, logfile_directory):
self.logfile = open(path.join(logfile_directory, 'external_scanner.log'), 'w')
# Flush module logfile
def flush_logfile(self):
self.logfile.flush()
# Close module logfile
def close_logfile(self):
if self.logfile != stdout:
self.logfile.close()
# Specify tool output file
def set_output_directory(self, output_path):
self.check_or_make_path(output_path)
if self.scanners and self.VIRUSTOTAL:
self.virus_total_output_path = path.join(output_path, 'VirusTotal')
ExternalTesting.check_or_make_path(self.virus_total_output_path)
if self.scanners and self.MALICE:
self.malice_output_path = path.join(output_path, 'Malice')
ExternalTesting.check_or_make_path(self.malice_output_path)
self.output = True
# Flush tool output file
def flush_output(self):
self.output.flush()
# Close tool output file
def close_output(self):
if self.output != stdout:
self.output.close()
# Close all logfiles (tool output, module logs)
def close(self):
self.close_logfile()
self.close_output()
def set_virus_total_key(self, key):
self.virus_total = Virustotal(key)
def set_external_scanners(self, setting):
if setting in [ExternalTesting.NONE, ExternalTesting.MALICE, ExternalTesting.VIRUSTOTAL,
ExternalTesting.MALICE | ExternalTesting.VIRUSTOTAL]:
self.scanners = setting
return True
else:
return False
def check_virus_total(self):
if self.virus_total:
return True
else:
return False
# This function expects input in the form of destination filename, md5 sum, and path to file to scan. The output is
# a .json file (if the VirusTotal api is enabled) and an .md file (if the Malice command is enabled).
def process_file_list_multiscanner(self, md5_list):
if (self.scanners == self.NONE):
return False
if (self.scanners and self.VIRUSTOTAL) and not (self.check_virus_total()):
raise AttributeError('VirusTotal API invoked without providing a key using the virus_total_key(key) method')
for entry in md5_list:
if self.scanners and self.VIRUSTOTAL:
self.logfile.write('Invoking VirusTotal on {}.'.format(entry[2]))
destination_file_path = path.join(self.virus_total_output_path, '{}.json'.format(entry[0]))
print(destination_file_path)
virus_total_report = self.virus_total.file_report([entry[1]])
if self.output:
with open(destination_file_path, 'w') as write_file:
pprint(virus_total_report, write_file)
self.logfile.write("Wrote VirusTotal report for {} to {}".format(entry[2],
destination_file_path))
else:
pprint(virus_total_report)
if self.scanners and self.MALICE:
self.logfile.write('Invoking Malice Scanner on {}.'.format(entry[2]))
destination_file_path = path.join(self.malice_output_path, '{}.md'.format(entry[0]))
malice_output = ""
for scanner in self.MALICE_SCANNERS:
print(entry[2], scanner)
command = "docker run --rm -v {}:/malware:ro {} -t " \
"/malware/{}".format(path.dirname(entry[2]), scanner, path.basename(entry[2]))
malice = run(command, shell=True, encoding=getfilesystemencoding(), stdout=PIPE)
malice_output += malice.stdout
if self.output:
with open(destination_file_path, 'w') as write_file:
write_file.write(malice_output)
self.logfile.write(
"Wrote Malice output {} to {}".format(entry[2], destination_file_path))
else:
print(malice_output)
return True