forked from nyaoouo/FFxivPythonTrigger2
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathBundleDecoder.py
More file actions
181 lines (155 loc) · 6.4 KB
/
Copy pathBundleDecoder.py
File metadata and controls
181 lines (155 loc) · 6.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import time
from struct import pack
from ctypes import sizeof
from threading import Lock
from traceback import format_exc
from typing import Optional
from zlib import decompress, compress, MAX_WBITS, error
from socket import ntohl
from datetime import datetime, timedelta, timezone
from queue import Queue
from FFxivPythonTrigger.Logger import Logger
from plugins.XivNetwork.Structs import FFXIVBundleHeader
_logger = Logger("XivNetwork/BundleDecoder")

# Upper bound on inner-loop iterations BundleDecoder.process performs for one
# received chunk before it gives up and clears its buffer.
SAFE_LIMIT = 100

# Value header.magic0 is compared against when validating a bundle header.
MAGIC_NUMBER = 0x41A05252
# The same value packed as a native-order unsigned int; reset_stream searches
# the byte buffer for this sequence to find the next candidate bundle start.
MAGIC_NUMBER_Array = pack('I', MAGIC_NUMBER)

# Set once decompress_message has reported an unknown encoding, so that the
# error is logged a single time only.
_encoding_error = False

# Magic fields copied from the most recent header with a non-zero magic0
# (filled in by BundleDecoder.process); pack_single passes them to the header
# constructor when it has to build a header itself. All None until then.
magics = dict.fromkeys(('magic0', 'magic1', 'magic2', 'magic3'))

# Size in bytes of the bundle header structure.
header_size = sizeof(FFXIVBundleHeader)
def decompress_message(header: FFXIVBundleHeader, buffer: bytearray) -> Optional[bytearray]:
    """Return the payload of the bundle at the start of *buffer*.

    Args:
        header: Header of the bundle; ``encoding`` selects how the payload is
            stored and ``length`` is the end offset of the bundle in *buffer*.
        buffer: Bytes starting at the bundle header.

    Returns:
        For encodings 0x0000/0x0001 the bytes between the header and
        ``header.length`` as-is. For encodings 0x0100/0x0101 the raw-deflate
        decompressed bytes, skipping the first two payload bytes. ``None`` if
        the encoding is unknown or decompression fails.
    """
    global _encoding_error

    encoding = header.encoding
    if encoding in (0x0000, 0x0001):
        return buffer[header_size:header.length]

    if encoding in (0x0100, 0x0101):
        # The two bytes right after the header are skipped and the remainder
        # is inflated as a raw deflate stream (negative wbits).
        compressed = buffer[header_size + 2:header.length]
        try:
            return bytearray(decompress(compressed, wbits=-MAX_WBITS))
        except error:
            _logger.error("Decompression error:\n", format_exc())
            return None

    # Unknown encoding: report it only the first time it is seen.
    if not _encoding_error:
        _logger.error("unknown encoding type:", hex(header.encoding))
        _encoding_error = True
    return None
def compress_message(header: FFXIVBundleHeader, message: bytearray):
    """Encode *message* the way ``header.encoding`` says the payload is stored.

    Args:
        header: Bundle header whose ``encoding`` field selects the format.
        message: Concatenated, unencoded message bytes.

    Returns:
        *message* itself for encodings 0x0000/0x0001, a new ``bytearray``
        holding the zlib stream for encodings 0x0100/0x0101, or ``None``
        (after logging) for any other encoding.
    """
    encoding = header.encoding
    if encoding in (0x0000, 0x0001):
        return message
    if encoding in (0x0100, 0x0101):
        # Keep the 2-byte zlib header: decompress_message skips exactly two
        # bytes after the bundle header before raw-inflating, so dropping
        # them here would make the packed bundle undecodable by this module.
        return bytearray(compress(message))
    _logger.error("Unknown encoding type:", hex(encoding))
    return None
def extract_single(raw: bytearray):
    """Decode one complete bundle held in *raw*.

    Args:
        raw: Writable buffer that starts with a bundle header.

    Returns:
        ``(header, messages)`` where *messages* is a list of the individual
        message byte strings, or ``None`` when *raw* is shorter than a header,
        the header fails validation, the payload cannot be decoded or is
        empty, or splitting fails.
    """
    if len(raw) < header_size:
        return None

    header = FFXIVBundleHeader.from_buffer(raw)

    # Rejected only when magic0 is not the expected value AND all four magic
    # fields are non-zero.
    magic_fields = (header.magic0, header.magic1, header.magic2, header.magic3)
    if header.magic0 != MAGIC_NUMBER and all(magic_fields):
        _logger.error("[single] Invalid magic # in header:", raw[:header_size])
        return None

    if header.length > len(raw):
        _logger.error(f"[single] Invalid msg length({len(raw)}/{header.length})")
        return None

    payload = decompress_message(header, raw)
    if not payload:
        return None

    try:
        messages = []
        cursor = 0
        for _ in range(header.msg_count):
            # Each message begins with its own total length as a 4-byte
            # little-endian integer; the slice keeps that prefix.
            size = int.from_bytes(payload[cursor:cursor + 4], byteorder='little')
            end = cursor + size
            messages.append(payload[cursor:end])
            cursor = end
        return header, messages
    except Exception:
        _logger.error("[single] Split message error:\n", format_exc())
        return None
def pack_single(header: Optional[FFXIVBundleHeader], messages: list[bytearray]):
    """Build the raw bytes of one bundle containing *messages*.

    Args:
        header: Header to reuse, or ``None`` to create one from the
            module-level ``magics`` and the current time in milliseconds.
            A supplied header is modified in place: its ``length`` and
            ``msg_count`` fields are overwritten.
        messages: Message byte strings, concatenated in order as the payload.

    Returns:
        ``bytearray`` of header followed by the encoded payload, or ``None``
        if ``compress_message`` rejects the header's encoding.
    """
    if header is None:
        # NOTE(review): ``magics`` holds None values until a bundle has been
        # decoded by BundleDecoder.process -- confirm the header constructor
        # accepts that.
        header = FFXIVBundleHeader(**magics, epoch=int(time.time() * 1000), )

    payload = compress_message(header, bytearray().join(messages))
    if payload is None:
        return None

    header.length = len(payload) + header_size
    header.msg_count = len(messages)
    return bytearray(header) + payload
def reset_stream(buffer):
    """Drop bytes from *buffer* up to the next occurrence of the magic bytes.

    The search starts at offset 1, so a magic sequence at the very start of
    the buffer is skipped. If no later occurrence exists the buffer is
    emptied. *buffer* is modified in place.
    """
    next_start = buffer.find(MAGIC_NUMBER_Array, 1)
    if next_start < 0:
        buffer.clear()
    else:
        del buffer[:next_start]
class BundleDecoder(object):
    """Reassemble bundles from queued byte chunks and hand out their messages.

    Chunks are queued with ``store_data``; ``process`` blocks on that queue,
    accumulates the bytes, and for every complete bundle calls
    ``recall(msg_time, message_bytes)`` once per contained message.
    """

    def __init__(self, recall: callable):
        """Create a decoder that reports messages through *recall*.

        Args:
            recall: Callable invoked as ``recall(msg_time, message)`` where
                *msg_time* is a ``datetime`` derived from the bundle header's
                epoch and *message* is the message bytes.
        """
        self.data = Queue()  # incoming raw chunks consumed by process()
        self.recall = recall
        self.messages = Queue()  # not used by the code in this class
        self.work = False  # loop condition of process()
        self.is_processing = False  # True while process() is running

    def store_data(self, data: bytearray):
        """Queue a chunk of received bytes for ``process``."""
        self.data.put(data)

    def stop_process(self):
        """Ask ``process`` to stop and wait until it has returned."""
        self.work = False
        # An empty chunk wakes up a process() that is blocked on the queue.
        self.data.put(bytearray())
        while self.is_processing:
            time.sleep(0.05)

    def _dispatch_messages(self, header, message) -> bool:
        """Split a decoded payload and pass each message to ``recall``.

        Returns ``True`` on success, ``False`` (after logging) if anything in
        the split or in ``recall`` raised.
        """
        try:
            msg_offset = 0
            msg_time = datetime.fromtimestamp(header.epoch / 1000)
            for _ in range(header.msg_count):
                # Each message starts with its own length, 4 bytes little-endian.
                msg_len = int.from_bytes(message[msg_offset:msg_offset + 4], byteorder='little')
                self.recall(msg_time, message[msg_offset:msg_offset + msg_len])
                msg_offset += msg_len
        except Exception:
            _logger.error("Split message error:\n", format_exc())
            return False
        return True

    def process(self):
        """Run the decode loop until ``stop_process`` is called.

        Blocks on the data queue. Any exception raised by the loop propagates
        to the caller; ``is_processing`` is cleared on every exit path so that
        ``stop_process`` can never wait forever.
        """
        self.work = True
        self.is_processing = True
        buffer = bytearray()
        try:
            while self.work:
                buffer += self.data.get()
                msg_cnt = 0
                while len(buffer):
                    msg_cnt += 1
                    if msg_cnt >= SAFE_LIMIT:
                        _logger.error("too many msg in buffer!")
                        buffer.clear()
                        break
                    if len(buffer) < header_size:
                        break  # wait for the rest of the header
                    # The slice is a copy, so the header does not pin `buffer`
                    # and the buffer can still be resized below.
                    header = FFXIVBundleHeader.from_buffer(buffer[:header_size])
                    if header.magic0 != MAGIC_NUMBER and header.magic0 and header.magic1 and header.magic2 and header.magic3:
                        _logger.error("Invalid magic in header:", header.get_data())
                        reset_stream(buffer)
                        continue
                    # The length covers the header itself, so anything shorter
                    # than a header (including zero) cannot be a valid bundle.
                    if header.length < header_size:
                        _logger.error("Invalid header length:", header.get_data())
                        reset_stream(buffer)
                        continue
                    if header.length > len(buffer):
                        break  # bundle not fully received yet
                    if header.magic0:
                        # Remember the latest magic fields for pack_single.
                        for key in magics:
                            magics[key] = getattr(header, key)
                    message = decompress_message(header, buffer)
                    if message is None:
                        reset_stream(buffer)
                        continue
                    del buffer[:header.length]
                    if message and not self._dispatch_messages(header, message):
                        buffer.clear()
                        break
        finally:
            self.is_processing = False