-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdynamic.py
More file actions
64 lines (57 loc) · 3.14 KB
/
dynamic.py
File metadata and controls
64 lines (57 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import sys
import joblib
import json
import pandas as pd
def _extract_features(json_data, malware_label):
    """Return the feature row for one parsed analysis report.

    The values are ordered to match ``feature_labels`` in
    ``inference_test``; ``malware_label`` is appended unchanged as the
    last element.

    Args:
        json_data: The report as a dict (result of ``json.load``).
            NOTE(review): the key layout looks like a sandbox report
            (behavior/signatures/network/target/info) — confirm the producer.
        malware_label: Ground-truth label supplied by the caller.

    Returns:
        A list of 16 values: 15 model features followed by the label.
    """
    # "generic" may be missing, null or an empty list; fall back to a single
    # empty entry so that indexing [0] cannot fail.
    generic = json_data.get("behavior", {}).get("generic") or [{}]
    summary = generic[0].get("summary", {})
    signatures = json_data.get("signatures", [])
    network = json_data.get("network", {})
    file_info = json_data.get("target", {}).get("file", {})
    info = json_data.get("info", {})

    def has_signature(name):
        # 1 if any signature entry carries this name, else 0.
        return int(any(sig.get("name") == name for sig in signatures))

    # Summary sections whose entry counts are used as features, in
    # feature order.
    summary_keys = (
        "file_opened",
        "file_read",
        "file_deleted",
        "file_recreated",
        "regkey_opened",
        "regkey_read",
        "dll_loaded",
    )
    summary_counts = [len(summary.get(key, [])) for key in summary_keys]

    return summary_counts + [
        sum(sig.get("severity", 0) for sig in signatures),  # total severity
        has_signature("checks_debugger"),
        len(network.get("udp", [])),  # number of UDP entries
        has_signature("allocates_rwx"),
        has_signature("nolookup_communication"),
        file_info.get("size", 0),
        file_info.get("type", ""),
        info.get("duration", 0),
        malware_label,
    ]


def inference_test(model_name, json_name, malware_label):
    """Classify one analysed sample and print prediction vs. actual label.

    Reads the JSON report at ``json_name``, derives the feature row,
    encodes ``file_type`` with the encoder stored in
    ``label_encoder.joblib``, scales the features with the scaler stored
    in ``knn_cb1_scaler.joblib`` (both loaded from the current working
    directory), and runs the classifier stored at ``model_name``.

    Args:
        model_name: Path to the joblib-serialised classifier.
        json_name: Path to the JSON report of the sample.
        malware_label: Known label of the sample; only echoed in the
            printed message, never fed to the model.

    Returns:
        Whatever ``clf.predict`` returns for the single-row input; its
        first element is the predicted label.

    Raises:
        OSError: If the report or one of the joblib files cannot be read.
        json.JSONDecodeError: If the report is not valid JSON.
        ValueError: Presumably raised by the encoder when ``file_type`` was
            not seen during training (scikit-learn ``LabelEncoder``
            behaviour — confirm the stored object's type).
    """
    clf_features = [
        "files_opened", "files_read", "file_deleted", "file_recreated",
        "regkey_opened", "regkey_read", "dll_loaded",
        "severity_sum", "checks_debugger", "udp_connections", "allocates_rwx",
        "nolookup_communication", "file_size",
        "file_type", "execution_duration",
    ]
    # The DataFrame carries the label as an extra, non-feature column.
    feature_labels = clf_features + ["malware"]

    with open(json_name, "r", encoding="utf-8") as json_file:
        json_data = json.load(json_file)

    feature_values = _extract_features(json_data, malware_label)
    df = pd.DataFrame([feature_values], columns=feature_labels)

    # NOTE(security): joblib.load unpickles its input and can execute
    # arbitrary code; only load model/encoder/scaler files from trusted
    # sources.
    label_encoder = joblib.load("label_encoder.joblib")
    # Use transform(), not fit_transform(): refitting on this single row
    # would discard the mapping learned at training time, so file_type
    # would no longer carry the encoding the model was trained with.
    df['file_type'] = label_encoder.transform(df['file_type'])

    scaler = joblib.load("knn_cb1_scaler.joblib")
    df[clf_features] = scaler.transform(df[clf_features])

    clf = joblib.load(model_name)
    pred = clf.predict(df[clf_features].values)
    print(f"is Malware? Predicted: {pred[0]} actual: {malware_label}")
    return pred
if __name__ == "__main__":
    # Script entry point: expects exactly three positional arguments
    # (model file, JSON report, malware label) after the script name.
    cli_args = sys.argv[1:]
    if len(cli_args) != 3:
        print('Invalid arguments')
    else:
        model_path, report_path, label = cli_args
        # inference_test prints its own summary line; its return value is
        # printed as well.
        print(inference_test(model_path, report_path, label))