-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathprotobuf_easy_decode.py
More file actions
executable file
·94 lines (83 loc) · 3.03 KB
/
protobuf_easy_decode.py
File metadata and controls
executable file
·94 lines (83 loc) · 3.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#!/usr/bin/python
#This will use a lot of code from the google protobuf project
#to generate decoders for the most general of cases.
#Once I learn a bit more about the proto and the src base
#I should be able to just use the protobuf code base to
#accomplise this task.
import sys
import binascii
class ProtobufEasyDecode:
WIRETYPE_VARINT = 0
WIRETYOE_FIXED_64 = 1
WIRETYPE_LENGTHDELIM = 2
WIRETYPE_STARTGROUP = 3
WIRETYPE_ENDGROUP = 4
WIRETYPE_FIXED_32 = 5
raw_message = ""
decoded_message = {}
def __init__(self,new_message):
self.raw_message = new_message
def decode_varint(self,buf,pos):
#pass in buffer and starting position
#return the int and the ending pos
result = 0
shifts = 0
while True:
current_byte = ord(buf[pos])
result = result | ((current_byte & 0x7f) << shifts)
pos = pos + 1
if not (current_byte & 0x80):
return (result,pos)
shifts = shifts + 7
def decode_tag_header(self, tag_header):
#returns tag_id, and tag_type
return (tag_header >> 3, tag_header & 0x07)
def decode_lengthdelim (self, buf, pos):
#pass in buffer and start pos
#return bytes and ending pos
length,pos = self.decode_varint(buf,pos)
new_pos = pos + length
return buf[pos:new_pos],new_pos
def decode_raw_message(self,message):
alls_good = True
pos = 0
temp_proto = {}
while alls_good:
current_tag_header,pos=self.decode_varint(message,pos)
current_tag_id,current_tag_type = \
self.decode_tag_header(current_tag_header)
if current_tag_type == self.WIRETYPE_LENGTHDELIM:
data,pos = self.decode_lengthdelim(message,pos)
elif current_tag_type == self.WIRETYPE_VARINT:
data,pos = self.decode_varint(message,pos)
else:
data = "ERR"
pos = len(message)
alls_good = False
temp_proto[current_tag_id] = (current_tag_type,data)
if pos == len(message):
alls_good = False
return temp_proto
def get_decoded_raw_message (self):
if self.decoded_message == {}:
self.decoded_message = self.decode_raw_message(self.raw_message)
return self.decoded_message
else:
return self.decoded_message
def decode_tag(self, tag_id):
if self.decoded_message == {}:
return
if not (tag_id in self.decoded_message):
return
if self.decoded_message[tag_id][0] == self.WIRETYPE_LENGTHDELIM:
self.decoded_message[tag_id] = \
(self.WIRETYPE_LENGTHDELIM,
self.decode_raw_message(self.decoded_message[tag_id][1]))
else:
return
x = ProtobufEasyDecode(binascii.unhexlify(sys.argv[1]))
x.get_decoded_raw_message()
x.decode_tag(3)
x.decode_tag(5)
x.decode_tag(10)
print x.get_decoded_raw_message()