Skip to content

Commit b3a4562

Browse files
committed
aioservices/aiomqtt: Fix firmware match re &
* Fix pattern matching with '*' wildcards and * Allow asyncmd ota server serve local firmware files (filesystem path as name) as well as remote files (URLs paths as names) e.g.: - /path/to/firmware/build-ESP32_OTA/micropython.bin - http://myserver.local:8000/firmware/build-ESP32_OTA/micropython.bin
1 parent deec5cf commit b3a4562

File tree

3 files changed

+202
-19
lines changed

3 files changed

+202
-19
lines changed

aioservices/services/aiomqtt_service.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io
1313
import aiostats
1414
import machine
15+
import re
1516

1617

1718
class MQTTService(Service):
@@ -107,6 +108,20 @@ def _taskinfo(self):
107108
self._services_total = len(aioctl.tasks_match("*.service"))
108109
self._ctasks_total = len(aioctl.tasks_match("*.service.*"))
109110

111+
def _grep(self, patt, filename):
112+
if isinstance(patt, list):
113+
pass
114+
else:
115+
patt = [patt]
116+
_pattlst = (
117+
re.compile(_patt.replace(".", r"\.").replace("*", ".*") + "$")
118+
for _patt in patt
119+
)
120+
try:
121+
return any(_pattrn.match(filename) for _pattrn in _pattlst)
122+
except Exception:
123+
return None
124+
110125
@aioctl.aiotask
111126
async def do_action(self, action, service):
112127
if action == "status":
@@ -329,7 +344,7 @@ async def do_action(self, action, service):
329344
if self.log:
330345
self.log.info("No new OTA update")
331346
return
332-
elif not _ota_params["fwfile"].endswith(self._fwfile):
347+
elif not self._grep(self._fwfile, _ota_params["fwfile"]):
333348
if self.log:
334349
self.log.info("No new OTA update")
335350
return
@@ -347,7 +362,9 @@ async def do_action(self, action, service):
347362
{
348363
"notify": False,
349364
"sha": _ota_service._comp_sha_ota("", rtn=True),
350-
"fwfile": self._fwfile,
365+
"fwfile": self._fwfile
366+
if not self._fwf_re
367+
else _ota_params["fwfile"],
351368
"ip": (
352369
aioctl.group()
353370
.tasks["network.service"]

cli/asyncmd/async_ota.py

Lines changed: 97 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
import json
1313
import asyncio
1414
import time
15+
from urllib.parse import urlparse
16+
import urllib.request
17+
import aiohttp
18+
import tempfile
1519

1620
BLOCKLEN = 4096
1721

@@ -31,6 +35,14 @@
3135
pb = False
3236

3337

38+
def is_url(url):
39+
try:
40+
result = urlparse(url)
41+
return all([result.scheme, result.netloc])
42+
except ValueError:
43+
return False
44+
45+
3446
def do_pg_bar(
3547
fwrite,
3648
index,
@@ -165,14 +177,56 @@ def __init__(
165177
self.log.info("Serving firmware:")
166178
for fwfile in firmwares:
167179
self.log.info(f"- {fwfile}")
168-
if os.path.exists(fwfile):
169-
self.update_sha(fwfile)
180+
self.update_sha(fwfile)
170181

171182
def update_sha(self, fwfile):
172-
# self._fw_file = firmware
173-
with open(fwfile, "rb") as fwr:
174-
firmware = fwr.read()
183+
if os.path.exists(fwfile):
184+
with open(fwfile, "rb") as fwr:
185+
firmware = fwr.read()
186+
elif is_url(fwfile):
187+
try:
188+
firmware = self.http_get(fwfile)
189+
except Exception:
190+
self.log.error(f"Firmware: {fwfile} not available")
191+
return
192+
else:
193+
self.log.error(f"Firmware: {fwfile} not available")
194+
195+
return
196+
197+
self._fw_files[fwfile]["sz"] = sz = len(firmware)
198+
if is_url(fwfile):
199+
self._fw_files[fwfile]["data"] = firmware
200+
hf = hashlib.sha256(firmware)
201+
if sz % BLOCKLEN != 0:
202+
self._fw_files[fwfile]["n_blocks"] = _n_blocks = (sz // BLOCKLEN) + 1
203+
204+
hf.update(b"\xff" * ((_n_blocks * BLOCKLEN) - sz))
205+
else:
206+
self._fw_files[fwfile]["n_blocks"] = _n_blocks = sz // BLOCKLEN
207+
self.check_sha[fwfile] = self._fw_files[fwfile]["sha"] = hexlify(
208+
hf.digest()
209+
).decode()
210+
211+
async def async_update_sha(self, fwfile):
212+
if os.path.exists(fwfile):
213+
with open(fwfile, "rb") as fwr:
214+
firmware = fwr.read()
215+
elif is_url(fwfile):
216+
try:
217+
firmware = await self.aiohttp_get(fwfile)
218+
except Exception:
219+
self.log.error(f"Firmware: {fwfile} not available")
220+
return
221+
else:
222+
self.log.error(f"Firmware: {fwfile} not available")
223+
224+
return
225+
175226
self._fw_files[fwfile]["sz"] = sz = len(firmware)
227+
228+
if is_url(fwfile):
229+
self._fw_files[fwfile]["data"] = firmware
176230
hf = hashlib.sha256(firmware)
177231
if sz % BLOCKLEN != 0:
178232
self._fw_files[fwfile]["n_blocks"] = _n_blocks = (sz // BLOCKLEN) + 1
@@ -184,6 +238,22 @@ def update_sha(self, fwfile):
184238
hf.digest()
185239
).decode()
186240

241+
def http_get(self, fwfile):
242+
resp = urllib.request.urlopen(fwfile)
243+
if resp.status == 200:
244+
firmware = resp.read()
245+
resp.close()
246+
return firmware
247+
else:
248+
raise ValueError
249+
250+
async def aiohttp_get(self, fwfile):
251+
async with aiohttp.ClientSession() as client:
252+
async with client.get(fwfile) as resp:
253+
assert resp.status == 200
254+
firmware = await resp.read()
255+
return firmware
256+
187257
def find_localip(self):
188258
ip_soc = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
189259
ip_soc.connect(("8.8.8.8", 1))
@@ -363,13 +433,27 @@ async def do_async_ota(self, addr, reader, writer):
363433

364434
sz = self._fw_files[fwfile]["sz"]
365435
if not self._bg:
366-
tqdm.write(f"{os.path.relpath(fwfile)} [{sz / 1000:.2f} kB]")
436+
if not is_url(fwfile):
437+
tqdm.write(f"{os.path.relpath(fwfile)} [{sz / 1000:.2f} kB]")
438+
else:
439+
tqdm.write(f"{fwfile} [{sz / 1000:.2f} kB]")
367440
cnt = 0
368441
data = await reader.read(2)
369442
assert data == b"OK"
370443
_pb_index = self.get_pos_pgb()
371444
buff = b""
372445
t_start = time.time()
446+
tf = None
447+
448+
if is_url(fwfile):
449+
# url_path = urlparse(fwfile).path
450+
url_fwf = fwfile
451+
tf = tempfile.NamedTemporaryFile(delete=False)
452+
tf.write(self._fw_files[fwfile]["data"])
453+
tf.seek(0)
454+
fwfile = tf.name
455+
tf.close()
456+
373457
with open(fwfile, "rb") as fmwf:
374458
with cstqdm(
375459
desc=_dev,
@@ -417,7 +501,10 @@ async def do_async_ota(self, addr, reader, writer):
417501
pass
418502

419503
tqdm.write("")
420-
tqdm.write(f"{os.path.basename(fwfile)} @ {_dev} [ \033[92mOK\x1b[0m ] ")
504+
if tf is None:
505+
tqdm.write(f"{os.path.basename(fwfile)} @ {_dev} [ \033[92mOK\x1b[0m ] ")
506+
else:
507+
tqdm.write(f"{url_fwf} @ {_dev} [ \033[92mOK\x1b[0m ] ")
421508
do_pg_bar(
422509
tqdm.write,
423510
loop_index,
@@ -432,6 +519,9 @@ async def do_async_ota(self, addr, reader, writer):
432519
)
433520
tqdm.write("\n")
434521
self._busy_pos.remove(_pb_index)
522+
# Remove temp file
523+
if tf and os.path.exists(tf.name):
524+
os.remove(tf.name)
435525
if self._async:
436526
while True:
437527
data = await reader.read(2)

cli/bin/asyncmd

Lines changed: 86 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ from asyncmd.status import get_status
2020
import yaml
2121
import requests
2222
import time
23+
from urllib.parse import urlparse
2324

2425
_CONFIG_DIR = os.path.join(os.environ["HOME"], ".asyncmd")
2526
_CONFIG_FILE = os.path.join(_CONFIG_DIR, "asyncmd.config")
@@ -300,6 +301,14 @@ fh_err.setFormatter(fmt_err)
300301
log.addHandler(fh_err)
301302

302303

304+
def is_url(url):
305+
try:
306+
result = urlparse(url)
307+
return all([result.scheme, result.netloc])
308+
except ValueError:
309+
return False
310+
311+
303312
def get_sha256(file):
304313
try:
305314
with open(file, "rb") as fwf:
@@ -342,7 +351,9 @@ async def _ota(args, log, tls_params=None, ota_tls_params={}):
342351
async def async_ota(args, log, tls_params=None, ota_tls_params={}):
343352
tasks = set()
344353
interval = 5
354+
url_files = [fwfile for fwfile in args.ff if is_url(fwfile)]
345355
args.ff = [os.path.abspath(fwfile) for fwfile in args.ff if os.path.exists(fwfile)]
356+
args.ff += url_files
346357

347358
while True:
348359
try:
@@ -365,9 +376,13 @@ async def async_ota(args, log, tls_params=None, ota_tls_params={}):
365376
otaserv_sha_task = asyncio.create_task(
366377
ota_sha_check_listen(args, log, ota_server, tls_params)
367378
)
379+
check_http_sha_task = asyncio.create_task(
380+
check_http_server_fwfiles(args, log, tls_params, ota_server)
381+
)
368382
tasks.add(ota_task)
369383
tasks.add(check_sha_task)
370384
tasks.add(otaserv_sha_task)
385+
tasks.add(check_http_sha_task)
371386
# add ota_check listener
372387
await asyncio.gather(*tasks)
373388
except aiomqtt.MqttError:
@@ -417,20 +432,42 @@ async def ota_sha_check_listen(args, log, server, tls_params=None):
417432
sha_file["fwfile"] = args.ff[0]
418433

419434
if sha_file["fwfile"] not in server.check_sha:
420-
log.error(
421-
f"OTA request from {devname}: FILE "
422-
f": {sha_file['fwfile']} not available"
423-
)
424-
continue
435+
if not any(
436+
[
437+
fwfile.endswith(sha_file["fwfile"].replace("*", ""))
438+
for fwfile in server.check_sha
439+
]
440+
):
441+
log.error(
442+
f"OTA request from {devname}: FILE "
443+
f": {sha_file['fwfile']} not available"
444+
)
445+
continue
425446

426-
log.info(
427-
f"OTA request from {devname}: FILE: "
428-
f"{os.path.relpath(sha_file['fwfile'])}"
429-
)
447+
if not is_url(sha_file["fwfile"]) and not sha_file["fwfile"].startswith(
448+
"*"
449+
):
450+
log.info(
451+
f"OTA request from {devname}: FILE: "
452+
f"{os.path.relpath(sha_file['fwfile'])}"
453+
)
454+
else:
455+
log.info(
456+
f"OTA request from {devname}: FILE: " f"{sha_file['fwfile']}"
457+
)
430458
log.info(f"OTA request from {devname}: SHA: {sha_file['sha']}")
431459

432460
# get device from topic
433461
# publish to device/{name}/ota
462+
# match sha_file
463+
if sha_file["fwfile"].startswith("*"):
464+
_matchs = [
465+
fwfile
466+
for fwfile in server.check_sha
467+
if fwfile.endswith(sha_file["fwfile"].replace("*", ""))
468+
]
469+
if _matchs:
470+
sha_file["fwfile"] = _matchs[0]
434471

435472
server.register_device(devname, sha_file)
436473

@@ -501,7 +538,7 @@ async def check_fw_sha(args, log, client, server):
501538
log.info("New or modified firmware file detected")
502539
log.info(f"{os.path.relpath(fwfile)}: SHA256: {fw_sha[fwfile]}")
503540
log.info("Sending notification...")
504-
server.update_sha(fwfile) # fix this
541+
await server.async_update_sha(fwfile) # fix this
505542
await client.publish(
506543
args.t,
507544
payload=json.dumps(
@@ -523,6 +560,45 @@ async def check_fw_sha(args, log, client, server):
523560
await asyncio.sleep(5)
524561

525562

563+
async def check_http_server_fwfiles(args, log, tls_params, server):
564+
sub_topic = "asyncmd/otacheck"
565+
566+
fw_sha = {fwfile: "" for fwfile in args.ff if is_url(fwfile)}
567+
568+
async with aiomqtt.Client(
569+
hostname=args.ht, port=args.p, logger=log, tls_params=tls_params
570+
) as client:
571+
await client.subscribe(sub_topic)
572+
async with client.messages() as messages:
573+
async for message in messages:
574+
try:
575+
fw_sha_server = json.loads(message.payload.decode())
576+
for fwfile in fw_sha_server:
577+
if fw_sha.get(fwfile) != fw_sha_server.get(fwfile):
578+
fw_sha[fwfile] = fw_sha_server.get(fwfile)
579+
580+
log.info("New or modified firmware file detected")
581+
log.info(f"{fwfile}: SHA256: {fw_sha[fwfile]}")
582+
log.info("Sending notification...")
583+
await server.async_update_sha(fwfile) # fix this
584+
await client.publish(
585+
args.t,
586+
payload=json.dumps(
587+
{
588+
"host": server.host,
589+
"port": server.port,
590+
"sha": server.check_sha[fwfile],
591+
"blocks": server._fw_files[fwfile]["n_blocks"],
592+
"bg": server._bg,
593+
"fwfile": fwfile,
594+
}
595+
),
596+
)
597+
598+
except Exception as e:
599+
log.error(e)
600+
601+
526602
async def check_device_sha(args, log, tls_params):
527603
pub_topic = f"device/{args.d}/ota"
528604
_msg = "check"

0 commit comments

Comments
 (0)