-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexplore_device_data.py
More file actions
271 lines (223 loc) · 8.56 KB
/
explore_device_data.py
File metadata and controls
271 lines (223 loc) · 8.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
#!/usr/bin/env python3
"""
Explore INKBIRD device data points and capabilities
Step 1: Understanding what the device can do
"""
import tinytuya
import json
import time
from datetime import datetime
# Working configuration from our tests.
# create_device() passes these four values to tinytuya.Device as
# address, dev_id, local_key and version respectively.
# NOTE(review): LOCAL_KEY is a device credential committed in source —
# consider loading it from the environment or an untracked config file.
INKBIRD_IP = "192.168.1.170"
DEVICE_ID = "ebe68bf20a45e83945l5q6"
LOCAL_KEY = "MiPEx}<M:Zj@8<X!"
PROTOCOL_VERSION = 3.4
def create_device():
    """Build a tinytuya Device from the module-level connection settings.

    Returns:
        The ``tinytuya.Device`` instance, with its socket timeout set to 10
        (presumably seconds — confirm against the tinytuya documentation).
    """
    connection = {
        "dev_id": DEVICE_ID,
        "address": INKBIRD_IP,
        "local_key": LOCAL_KEY,
        "version": PROTOCOL_VERSION,
    }
    handle = tinytuya.Device(**connection)
    handle.set_socketTimeout(10)
    return handle
def explore_basic_status():
    """Query the device once and print its status and data points.

    Returns:
        Whatever ``device.status()`` returned, or ``None`` if querying or
        printing the status raised an exception.
    """
    print("🔍 Getting Basic Device Status")
    print("-" * 40)
    dev = create_device()
    try:
        snapshot = dev.status()
        pretty = json.dumps(snapshot, indent=2)
        print(f"✅ Device Status: {pretty}")
        if 'dps' in snapshot:
            print("\n📊 Available Data Points:")
            # Show each data point with the Python type of its value.
            for point_id, point_value in snapshot['dps'].items():
                type_name = type(point_value).__name__
                print(f" DPS {point_id}: {point_value} (type: {type_name})")
        return snapshot
    except Exception as err:
        # Best-effort exploration step: report the failure and carry on.
        print(f"❌ Error getting status: {err}")
        return None
def discover_all_dps():
    """Run tinytuya's data-point discovery against the device and print it.

    Returns:
        The result of ``device.detect_available_dps()``, or ``None`` if the
        discovery call or printing its result raised an exception.
    """
    print("\n🔍 Discovering All Available Data Points")
    print("-" * 40)
    dev = create_device()
    try:
        # Delegates to tinytuya's built-in DPS discovery.
        found = dev.detect_available_dps()
        rendered = json.dumps(found, indent=2)
        print(f"✅ DPS Discovery Result: {rendered}")
        return found
    except Exception as err:
        # Best-effort exploration step: report the failure and carry on.
        print(f"❌ Error discovering DPS: {err}")
        return None
def analyze_dps_meanings():
    """Print a human-readable summary of the known DPS schema.

    The schema is a fixed table embedded in this function (taken from the
    logs, per the original author's note). For every entry the name,
    description, access mode and type are printed. The ``maxlen`` and
    ``alarms`` fields are kept for reference only and are not printed.
    """
    print("\n🧠 Analyzing DPS Meanings (from APK analysis)")
    print("-" * 40)

    # Schema information recovered from the logs.
    known_schema = {
        "101": {
            "name": "real_time_data",
            "description": "当前探头温度和风扇输出 (Current probe temperature and fan output)",
            "mode": "ro",  # read-only
            "type": "raw",
            "maxlen": 128,
        },
        "107": {
            "name": "setting_para",
            "description": "设置参数 (Setting parameters)",
            "mode": "rw",  # read-write
            "type": "raw",
            "maxlen": 128,
        },
        "108": {
            "name": "alarm",
            "description": "报警 (Alarm status)",
            "mode": "ro",  # read-only
            "type": "bitmap",
            "alarms": [
                "p1_temp_ah_alm", "p2_temp_ah_alm", "p3_temp_ah_alm", "p4_temp_ah_alm",
                "p1_temp_al_alm", "p2_temp_al_alm", "p3_temp_al_alm", "p4_temp_al_alm",
                "p1_lost_alm", "p2_lost_alm", "p3_lost_alm", "p4_lost_alm",
                "in_temp_ah_alm", "grill_open_alm",
            ],
        },
        "116": {
            "name": "Probe_Preset_Extra_1",
            "description": "预设值附加数据1 (Preset additional data 1)",
            "mode": "rw",
            "type": "raw",
            "maxlen": 128,
        },
        "117": {
            "name": "Probe_Preset_Extra_2",
            "description": "预设值附加数据2 (Preset additional data 2)",
            "mode": "rw",
            "type": "raw",
            "maxlen": 128,
        },
        "120": {
            "name": "unknown_boolean",
            "description": "Unknown boolean value - possibly power/enable state",
            "mode": "rw",
            "type": "boolean",
        },
    }

    # Any mode other than "rw" is reported as read-only.
    mode_labels = {"rw": "Read/Write"}

    for point_id, entry in known_schema.items():
        access = mode_labels.get(entry["mode"], "Read-Only")
        report = (
            f"📍 DPS {point_id}: {entry['name']}",
            f" Description: {entry['description']}",
            f" Mode: {access}",
            f" Type: {entry['type']}",
        )
        for line in report:
            print(line)
        print()
def test_multiple_reads(count=5, interval=10):
    """Poll the device several times and report which data points change.

    Args:
        count: Number of status readings to attempt. Defaults to 5.
        interval: Value passed to ``time.sleep`` between consecutive
            attempts. Defaults to 10.

    Returns:
        A list with one dict per successful reading, each holding
        ``timestamp`` (ISO-8601 string from ``datetime.now()``), ``reading``
        (1-based attempt number) and ``dps`` (the ``dps`` mapping from the
        status response). Failed attempts are reported on stdout and
        omitted from the list.
    """
    print("\n📈 Taking Multiple Readings (watching for changes)")
    print("-" * 40)
    device = create_device()
    readings = []
    for i in range(count):
        try:
            status = device.status()
            timestamp = datetime.now().isoformat()
            if status and 'dps' in status:
                readings.append({
                    'timestamp': timestamp,
                    'reading': i + 1,
                    'dps': status['dps'],
                })
                print(f"📊 Reading {i+1}: {status['dps']}")
            else:
                print(f"❌ Reading {i+1} failed: {status}")
        except Exception as e:
            # Best-effort sampling: a failed attempt must not abort the run.
            print(f"❌ Reading {i+1} error: {e}")
        # Wait between attempts whether or not this one succeeded, so a
        # failing device is not retried back-to-back and samples stay
        # evenly spaced. No wait is needed after the final attempt.
        if i < count - 1:
            time.sleep(interval)

    # Analyze changes
    print(f"\n📊 Analysis of {len(readings)} readings:")
    if len(readings) > 1:
        # Consider every DPS id seen in any reading (first-seen order), not
        # only those present in the first one.
        dps_ids = dict.fromkeys(k for r in readings for k in r['dps'])
        for dps_id in dps_ids:
            values = [r['dps'].get(dps_id) for r in readings]
            # dict.fromkeys de-duplicates while keeping order of appearance,
            # so the printed list is deterministic.
            unique_values = list(dict.fromkeys(str(v) for v in values))
            if len(unique_values) > 1:
                print(f" DPS {dps_id}: CHANGING values: {unique_values}")
            else:
                print(f" DPS {dps_id}: STABLE value: {values[0]}")
    return readings
def test_simple_commands():
    """Send a few simple write commands and report whether the value changed.

    For each test command the current value of the target DPS is read, the
    new value is written with ``device.set_value``, and after a 2-second
    pause the DPS is read again and compared with the earlier value.

    NOTE: this function writes to the device. It sets DPS 120 to True and
    then to False, and does not restore the value it found.

    Any exception raised while handling one command is printed and the next
    command is tried.
    """
    print("\n🎛️ Testing Simple Commands")
    print("-" * 40)
    device = create_device()
    # Test commands based on what we know
    test_commands = [
        {"dps": 120, "value": True, "description": "Set DPS 120 to True"},
        {"dps": 120, "value": False, "description": "Set DPS 120 to False"},
        # Don't test read-only DPS like 108 (alarms) or 101 (real-time data)
    ]
    for cmd in test_commands:
        # Status payloads key data points by string id.
        dps_key = str(cmd['dps'])
        # Reset per command so a failed baseline read can neither raise
        # NameError below nor reuse the previous command's value.
        current_value = None
        try:
            print(f"🔧 Testing: {cmd['description']}")
            # Get current value first
            current_status = device.status()
            if current_status and 'dps' in current_status:
                current_value = current_status['dps'].get(dps_key)
                print(f" Current DPS {cmd['dps']}: {current_value}")
            # Send command
            result = device.set_value(cmd['dps'], cmd['value'])
            print(f" Command result: {result}")
            # Check new value after a short pause
            time.sleep(2)
            new_status = device.status()
            if new_status and 'dps' in new_status:
                new_value = new_status['dps'].get(dps_key)
                print(f" New DPS {cmd['dps']}: {new_value}")
                if str(new_value) != str(current_value):
                    print(f" ✅ Command worked! Changed from {current_value} to {new_value}")
                else:
                    print(f" ❓ Value unchanged - command may not have worked")
            print()
        except Exception as e:
            print(f" ❌ Command error: {e}")
            print()
def main():
    """Run every exploration step in order and save the results as JSON.

    Prints a banner with the device configuration, runs the five steps
    (basic status, DPS discovery, schema summary, repeated readings,
    command tests), writes the collected results to
    ``device_exploration_results.json`` in the current working directory,
    and prints a list of next steps.
    """
    banner = (
        "🚀 INKBIRD Device Data Point Exploration",
        "=" * 60,
        f"Device ID: {DEVICE_ID}",
        f"Device IP: {INKBIRD_IP}",
        f"Protocol: {PROTOCOL_VERSION}",
    )
    for line in banner:
        print(line)
    print()

    # Steps run in this fixed order; steps 3 and 5 only print.
    status_snapshot = explore_basic_status()   # Step 1: basic status
    discovered = discover_all_dps()            # Step 2: DPS discovery
    analyze_dps_meanings()                     # Step 3: analyze meanings
    samples = test_multiple_reads()            # Step 4: multiple readings
    test_simple_commands()                     # Step 5: test commands

    # Collect everything worth keeping into one JSON document.
    device_config = {
        'device_id': DEVICE_ID,
        'ip': INKBIRD_IP,
        'protocol': PROTOCOL_VERSION
    }
    results = {
        'timestamp': datetime.now().isoformat(),
        'basic_status': status_snapshot,
        'dps_discovery': discovered,
        'multiple_readings': samples,
        'device_config': device_config
    }
    with open('device_exploration_results.json', 'w') as out_file:
        json.dump(results, out_file, indent=2)

    closing_lines = (
        "💾 Results saved to device_exploration_results.json",
        "\n🎯 Next Steps:",
        "1. Analyze the results to understand temperature data format",
        "2. Test commands to control fan speed",
        "3. Build Python controller class",
        "4. Create Flask web interface",
    )
    for line in closing_lines:
        print(line)
if __name__ == "__main__":
main()