DataIterator.py
#
# Copyright (c) 2018, Edgewise Networks Inc. All rights reserved.
#
import os, sys
import gzip
from itertools import repeat
from Netflows import Netflow
from random import random, randint, sample
import multiprocessing
from datetime import datetime


def iterdir( dirname ):
    """Recursively yield the paths of all regular files under dirname, sorted per directory."""
    try:
        for fname in sorted(map(os.path.join, repeat(dirname), os.listdir(dirname))):
            if os.path.isfile(fname):
                yield fname
            elif os.path.isdir(fname):
                for p in iterdir(fname):
                    yield p
    except OSError:
        # Skip directories that cannot be listed (missing or permission denied).
        pass


def iterNetflows(fname):
    with gzip.open(fname, 'r') as f:
        for x in f:
            yield Netflow( *x.decode('utf8').strip().split('\t') )


def iterateData(pathname, siteId):
    dirname = pathname % siteId
    for fname in iterdir(dirname):
        if fname.endswith(".txt.gz"):
            for nf in iterNetflows(fname):
                yield nf


def iterateNetworkDataImpl(pathname, siteId, maxCount=None):
    cnt = 0
    dirname = pathname % siteId
    for fname in iterdir(dirname):
        if fname.endswith(".txt.gz"):
            for nf in iterNetflows(fname):
                if maxCount is not None and cnt >= maxCount:
                    print("iterated %i netflows - terminated" % cnt)
                    # Return to end the generator cleanly; raising StopIteration inside
                    # a generator is a RuntimeError under PEP 479.
                    return
                if nf.inNetwork():
                    cnt += 1
                    yield nf
    print("iterated %i netflows - completed" % cnt)


def iterateNetworkData(pathname, siteId, maxCount=None):
    # Shift every timestamp by a fixed offset so the first netflow appears to start "now".
    startTime = datetime.now().timestamp()
    it = iterateNetworkDataImpl(pathname, siteId, maxCount)
    try:
        nf = next(it)
    except StopIteration:
        # No netflows at all; end the generator instead of letting the
        # StopIteration escape (a RuntimeError under PEP 479).
        return
    offset = startTime - nf.timestamp
    nf.timestamp = startTime
    yield nf
    for nf in it:
        nf.timestamp += offset
        yield nf


def processFile(fname):
    print("start processFile:", fname)
    return [nf for nf in iterNetflows(fname) if nf.inNetwork()]


def iterateNetworkDataParallel(pathname, siteId, maxCount=None):
    cnt = 0
    dirname = pathname % siteId
    fnames = [fname for fname in iterdir(dirname) if fname.endswith(".txt.gz")]
    numprocs = 8
    # The context manager terminates the worker processes whether the loop
    # finishes normally or exits early once maxCount is reached.
    with multiprocessing.Pool(processes=numprocs) as pool:
        for netflowList in pool.imap(processFile, fnames, 1):
            if maxCount is not None and cnt >= maxCount:
                break
            cnt += len(netflowList)
            for nf in netflowList:
                yield nf
    print("iterated %i netflows - completed" % cnt)


def iterateNetworkDataWithPortScanning(pathname, siteId, insertionFreq=0.01, maxCount=None):
    """Yield the real netflow stream, occasionally injecting synthetic flows from a simulated port scanner."""
    portScanner = "10.10.10.10"
    it = iterateNetworkData(pathname, siteId, maxCount)
    ipPortSet = set()
    for nf in it:
        srcip = nf.srcip
        dstip = nf.dstip
        srcport = nf.srcport
        dstport = nf.dstport
        ipPortSet.add( (srcip, srcport) )
        ipPortSet.add( (dstip, dstport) )
        yield nf
        if len(ipPortSet) > 100 and random() < insertionFreq:
            ts = nf.timestamp
            srcport = randint(0, 65535)  # valid port numbers are 0-65535
            # random.sample() requires a sequence (sets are rejected in Python 3.11+)
            dstip, dstport = sample(tuple(ipPortSet), 1)[0]
            flowCount = randint(1, 100)
            scanFlow = Netflow(ts, portScanner, srcport, dstip, dstport, flowCount)
            yield scanFlow
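

# A minimal usage sketch, assuming the netflow files live under a per-site
# directory addressed by a "%s"-style path template. The path template, the
# site id, and the maxCount value below are hypothetical placeholders.
if __name__ == "__main__":
    pathTemplate = "/data/netflows/%s"  # hypothetical template; "%s" is filled with the site id
    siteId = "site-001"                 # hypothetical site id
    for nf in iterateNetworkDataWithPortScanning(pathTemplate, siteId, insertionFreq=0.01, maxCount=1000):
        print(nf.timestamp, nf.srcip, nf.srcport, nf.dstip, nf.dstport)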