From e9fd3f181ee496421be6e9d7dc69c366fb7b4cfb Mon Sep 17 00:00:00 2001 From: Stanislav Medvedenko Date: Tue, 5 Dec 2023 19:22:47 +0500 Subject: [PATCH 1/4] Add message queue --- src/feecc_workbench/ipfs.py | 5 +- src/feecc_workbench/rabbit_queue.py | 74 +++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 src/feecc_workbench/rabbit_queue.py diff --git a/src/feecc_workbench/ipfs.py b/src/feecc_workbench/ipfs.py index 8a190c4..a194662 100644 --- a/src/feecc_workbench/ipfs.py +++ b/src/feecc_workbench/ipfs.py @@ -1,11 +1,12 @@ import pathlib - +import asyncio import httpx from loguru import logger from .config import CONFIG from .Messenger import messenger from .translation import translation +from .rabbit_queue import rabbit_queue from .utils import async_time_execution, get_headers, service_is_up IPFS_GATEWAY_ADDRESS: str = CONFIG.ipfs_gateway.ipfs_server_uri @@ -36,6 +37,8 @@ async def publish_file(rfid_card_id: str, file_path: pathlib.Path) -> tuple[str, response = await client.post(url="/by-path", headers=headers, json=json) if response.is_error: + message=[file_path,headers,base_url,files] + asyncio.run(rabbit_queue(message)) messenger.error(translation('ErrorIPFS') +" "+ response.json().get('detail', '')) raise httpx.RequestError(response.json().get("detail", "")) diff --git a/src/feecc_workbench/rabbit_queue.py b/src/feecc_workbench/rabbit_queue.py new file mode 100644 index 0000000..44b13b6 --- /dev/null +++ b/src/feecc_workbench/rabbit_queue.py @@ -0,0 +1,74 @@ +import asyncio +import httpx +from aio_pika import Message, connect +from aio_pika import connect +from aio_pika.abc import AbstractIncomingMessage + +n=0 +file_path = '' +headers = '' +base_url = '' +files = '' + +async def send_message_to_queue(message) -> None: + # Perform connection + connection = await connect("amqp://guest:guest@localhost/") + + async with connection: + # Creating a channel + channel = await connection.channel() + + # Declaring queue + queue = await channel.declare_queue("ipfs_message") + + # Sending the message + await channel.default_exchange.publish( + Message(message.encode()), + routing_key=queue.name, + ) + + #print(f" [x] Sent {message}") + +async def send_message_to_client(file_path,headers,base_url,files): + async with httpx.AsyncClient(base_url=base_url, timeout=None) as client: + if file_path.exists(): + httpx.Response = await client.post(url="/upload-file", headers=headers, files=files) + else: + json = {"absolute_path": str(file_path)} + await client.post(url="/by-path", headers=headers, json=json) + +async def callback(message: AbstractIncomingMessage): + global n, headers,files,base_url,file_path + + message_from_queue = message.body.decode("utf-8") + match n: + case 0: file_path = message_from_queue + case 1: headers = message_from_queue + case 2: base_url = message_from_queue + case 3: files = message_from_queue + n += 1 + if n > 3: + n = 0 + send_message_to_client(file_path,headers,base_url,files) + +async def message_receiv_from_queue() -> None: + # Perform connection + connection = await connect("amqp://guest:guest@localhost/") + async with connection: + # Creating a channel + channel = await connection.channel() + + # Declaring queue + queue = await channel.declare_queue("ipfs_message") + + # Start listening the queue + await queue.consume(callback, no_ack=True) + + #print(" [*] Waiting for messages. To exit press CTRL+C") + await asyncio.Future() + +async def rabbit_queue(_file_path,_headers,_base_url,_files): + message = [_file_path,_headers,_base_url,_files] + for mess in message: + await asyncio.create_task(send_message_to_queue(mess)) + asyncio.create_task(message_receiv_from_queue()) From 5a2d429017676a0f21b1d011329e9ecefaf4ceef Mon Sep 17 00:00:00 2001 From: Stanislav Medvedenko Date: Thu, 7 Dec 2023 19:20:41 +0500 Subject: [PATCH 2/4] Change call function --- src/feecc_workbench/rabbit_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/feecc_workbench/rabbit_queue.py b/src/feecc_workbench/rabbit_queue.py index 44b13b6..998093a 100644 --- a/src/feecc_workbench/rabbit_queue.py +++ b/src/feecc_workbench/rabbit_queue.py @@ -67,8 +67,8 @@ async def message_receiv_from_queue() -> None: #print(" [*] Waiting for messages. To exit press CTRL+C") await asyncio.Future() -async def rabbit_queue(_file_path,_headers,_base_url,_files): - message = [_file_path,_headers,_base_url,_files] +async def rabbit_queue(message): + #message = [_file_path,_headers,_base_url,_files] for mess in message: await asyncio.create_task(send_message_to_queue(mess)) asyncio.create_task(message_receiv_from_queue()) From 0a94e242543f6ea1fcea148951bc18944d7fe359 Mon Sep 17 00:00:00 2001 From: Stanislav Medvedenko Date: Sun, 10 Dec 2023 13:27:13 +0500 Subject: [PATCH 3/4] Change ipfs and rabbit --- src/feecc_workbench/ipfs.py | 2 +- src/feecc_workbench/rabbit_queue.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/feecc_workbench/ipfs.py b/src/feecc_workbench/ipfs.py index a194662..ec1f6fc 100644 --- a/src/feecc_workbench/ipfs.py +++ b/src/feecc_workbench/ipfs.py @@ -38,7 +38,7 @@ async def publish_file(rfid_card_id: str, file_path: pathlib.Path) -> tuple[str, if response.is_error: message=[file_path,headers,base_url,files] - asyncio.run(rabbit_queue(message)) + await rabbit_queue(message) messenger.error(translation('ErrorIPFS') +" "+ response.json().get('detail', '')) raise httpx.RequestError(response.json().get("detail", "")) diff --git a/src/feecc_workbench/rabbit_queue.py b/src/feecc_workbench/rabbit_queue.py index 998093a..cd07524 100644 --- a/src/feecc_workbench/rabbit_queue.py +++ b/src/feecc_workbench/rabbit_queue.py @@ -65,10 +65,12 @@ async def message_receiv_from_queue() -> None: await queue.consume(callback, no_ack=True) #print(" [*] Waiting for messages. To exit press CTRL+C") - await asyncio.Future() + asyncio.Future() + + await channel.close() async def rabbit_queue(message): #message = [_file_path,_headers,_base_url,_files] for mess in message: await asyncio.create_task(send_message_to_queue(mess)) - asyncio.create_task(message_receiv_from_queue()) + await asyncio.create_task(message_receiv_from_queue()) From f054b872658501a0386e820911cf1854c7d7b134 Mon Sep 17 00:00:00 2001 From: Stanislav Medvedenko Date: Sun, 10 Dec 2023 13:29:24 +0500 Subject: [PATCH 4/4] Add rabbit module and aio_pika --- docker-compose.yml | 39 ++++++++++++++++++++++++++++++++++----- pyproject.toml | 1 + 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 267f071..968554d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,11 @@ -version: "3" +version: "3.9" services: feecc_workbench_daemon: environment: # Use these environment variables to configure your deployment - LANGUAGE_MESSAGE: "ru" - MONGODB_URI: "" # Your MongoDB connection URI - MONGODB_DB_NAME: "" # Your MongoDB DB name + LANGUAGE_MESSAGE: "ru" # Choose message language + MONGODB_URI: "mongodb://root:pass@localhost:27017/?authMechanism=DEFAULT" # Your MongoDB connection URI + MONGODB_DB_NAME: "FEECC-Academy" # Your MongoDB DB name ROBONOMICS_ENABLE_DATALOG: false # Whether to enable datalog posting or not ROBONOMICS_ACCOUNT_SEED: "" # Your Robonomics network account seed phrase ROBONOMICS_SUBSTRATE_NODE_URI: "" # Robonomics network node URI @@ -21,7 +21,7 @@ services: CAMERA_ENABLE: false # Whether to enable Cameraman or not CAMERA_FFMPEG_COMMAND: "" WORKBENCH_NUMBER: 1 # Workbench number - HID_DEVICES_RFID_READER: "" # RFID reader device name + HID_DEVICES_RFID_READER: "Sample RFID Scanner" # RFID reader device name HID_DEVICES_BARCODE_READER: "" # Barcode reader device name build: context: ./ @@ -35,3 +35,32 @@ services: - "./workbench.pem:/src/workbench.pem:ro" network_mode: host restart: always + + hid-emulator: + image: nyurik/alpine-python3-requests + container_name: feecc_academy_hid_emulator + network_mode: host + build: + context: hid-emulator + dockerfile: Dockerfile + + mongodb: + image: mongo:jammy + container_name: feecc_academy_mongoDB + network_mode: host + environment: + MONGO_INITDB_DATABASE: FEECC-Academy + MONGO_INITDB_ROOT_USERNAME: root + MONGO_INITDB_ROOT_PASSWORD: pass + volumes: + - ./mongodb-local:/docker-entrypoint-initdb.d:ro + + rabbitmq: + image: rabbitmq:3.12-management + netwok_mode: host + ports: + - 5672:5672 + - 15672:15672 + restart: always + + diff --git a/pyproject.toml b/pyproject.toml index 41988dd..b3fd220 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ robonomics-interface = "^1.2.2" ecs-logging = "^2.0.0" aioprometheus = "^22.5.0" pycups = "^2.0.1" +aio-pika = "^9.3.1" [tool.poetry.dev-dependencies] mypy = "^0.971"