Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
*.py
.idea/
.vscode/
__pycache__
dist/
script/*.pcap
Expand Down
219 changes: 153 additions & 66 deletions aiocomfoconnect/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from __future__ import annotations

import asyncio
import itertools
import logging
import struct
from asyncio import StreamReader, StreamWriter
from typing import Awaitable
from typing import Awaitable, Callable, Dict, Iterator, Optional, Set

from google.protobuf.message import DecodeError
from google.protobuf.message import Message as ProtobufMessage
Expand Down Expand Up @@ -39,141 +40,219 @@ class EventBus:
"""An event bus for async replies."""

def __init__(self):
self.listeners = {}
self._listeners: Dict[int, Set[asyncio.Future]] = {}

def add_listener(self, event_name, future):
@property
def listeners(self) -> Dict[int, Set[asyncio.Future]]:
"""Expose listeners for diagnostic purposes (primarily tests)."""
return self._listeners

def add_listener(self, event_name: int, future: asyncio.Future):
"""Add a listener to the event bus."""
_LOGGER.debug("Adding listener for event %s", event_name)
if not self.listeners.get(event_name, None):
self.listeners[event_name] = {future}
else:
self.listeners[event_name].add(future)
self._listeners.setdefault(event_name, set()).add(future)

def emit(self, event_name, event):
def emit(self, event_name: int, event):
"""Emit an event to the event bus."""
_LOGGER.debug("Emitting for event %s", event_name)
futures = self.listeners.get(event_name, [])
futures = self._listeners.pop(event_name, set())
for future in futures:
if future.done():
continue
if isinstance(event, Exception):
future.set_exception(event)
else:
future.set_result(event)
del self.listeners[event_name]

def fail_all(self, exc: Exception):
"""Fail all pending listeners with the provided exception."""
pending = list(self._listeners.values())
self._listeners.clear()
for futures in pending:
for future in futures:
if future.done():
continue
future.set_exception(exc)


class Bridge:
"""ComfoConnect LAN C API."""

PORT = 56747

def __init__(self, host: str, uuid: str, loop=None):
def __init__(self, host: str, uuid: str, loop: Optional[asyncio.AbstractEventLoop] = None):
self.host: str = host
self.uuid: str = uuid
self._local_uuid: str = None
self._local_uuid: Optional[str] = None

self._reader: StreamReader = None
self._writer: StreamWriter = None
self._reference = None
self._reader: Optional[StreamReader] = None
self._writer: Optional[StreamWriter] = None
self._reference: Optional[Iterator[int]] = None

self._event_bus: EventBus = None
self._event_bus: Optional[EventBus] = None

self.__sensor_callback_fn: callable = None
self.__alarm_callback_fn: callable = None
self.__sensor_callback_fn: Optional[Callable[[int, int], None]] = None
self.__alarm_callback_fn: Optional[Callable[[int, ProtobufMessage], None]] = None

self._loop = loop or asyncio.get_running_loop()
self._loop: Optional[asyncio.AbstractEventLoop] = loop
self._read_task = None

def __repr__(self):
return f"<Bridge {self.host}, UID={self.uuid}>"

def set_sensor_callback(self, callback: callable):
def set_sensor_callback(self, callback: Optional[Callable[[int, int], None]]):
"""Set a callback to be called when a message is received."""
self.__sensor_callback_fn = callback

def set_alarm_callback(self, callback: callable):
def set_alarm_callback(self, callback: Optional[Callable[[int, ProtobufMessage], None]]):
"""Set a callback to be called when an alarm is received."""
self.__alarm_callback_fn = callback

async def _connect(self, uuid: str):
"""Connect to the bridge."""
async def connect(self, uuid: str):
"""Connect to the bridge and start reading messages."""
if self.is_connected():
_LOGGER.warning("Already connected to bridge %s", self.host)
return

# Get the running loop if not provided
if self._loop is None:
self._loop = asyncio.get_running_loop()

_LOGGER.debug("Connecting to bridge %s", self.host)
try:
self._reader, self._writer = await asyncio.wait_for(asyncio.open_connection(self.host, self.PORT), TIMEOUT)
except asyncio.TimeoutError as exc:
_LOGGER.warning("Timeout while connecting to bridge %s", self.host)
raise AioComfoConnectTimeout("Timeout while connecting to bridge") from exc

self._reference = 1
self._reference = itertools.count(1)
self._local_uuid = uuid
self._event_bus = EventBus()

async def _read_messages():
while True:
try:
# Keep processing messages until we are disconnected or shutting down
await self._process_message()
# Start background task to read messages
self._read_task = self._loop.create_task(self._read_messages())
_LOGGER.debug("Connected to bridge %s", self.host)

except asyncio.exceptions.CancelledError:
# We are shutting down. Return to stop the background task
return False
async def _read_messages(self):
"""Read messages from the bridge until disconnected or cancelled."""
try:
while True:
await self._process_message()
except asyncio.CancelledError:
_LOGGER.debug("Message reading cancelled")
raise
except AioComfoConnectNotConnected as exc:
_LOGGER.info("Disconnected from bridge")
self._notify_pending_futures(exc)
raise
except Exception as exc:
_LOGGER.error("Unexpected error reading messages: %s", exc, exc_info=True)
self._notify_pending_futures(AioComfoConnectNotConnected("Unexpected error during read"))
raise

def _notify_pending_futures(self, exc: Exception):
"""Fail all pending listeners so callers do not hang."""
if self._event_bus is None:
return
self._event_bus.fail_all(exc)

async def disconnect(self):
"""Disconnect from the bridge."""
if not self.is_connected():
return

except AioComfoConnectNotConnected as exc:
# We have been disconnected
raise AioComfoConnectNotConnected("We have been disconnected") from exc
_LOGGER.debug("Disconnecting from bridge %s", self.host)

read_task = self._loop.create_task(_read_messages())
_LOGGER.debug("Connected to bridge %s", self.host)
# Cancel the read task
if self._read_task and not self._read_task.done():
self._read_task.cancel()
try:
await self._read_task
except asyncio.CancelledError:
Copy link

Copilot AI Oct 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'except' clause does nothing but pass and there is no explanatory comment.

Suggested change
except asyncio.CancelledError:
except asyncio.CancelledError:
# Task cancellation is expected during disconnect; ignore.

Copilot uses AI. Check for mistakes.
pass

return read_task
self._notify_pending_futures(AioComfoConnectNotConnected("Disconnected"))

async def _disconnect(self):
"""Disconnect from the bridge."""
# Close the connection
if self._writer:
self._writer.close()
await self._writer.wait_closed()

# Clear state
self._reader = None
self._writer = None
self._read_task = None
self._event_bus = None
self._reference = None

def is_connected(self) -> bool:
"""Returns True if the bridge is connected."""
return self._writer is not None and not self._writer.is_closing()

async def _send(self, request, request_type, params: dict = None, reply: bool = True) -> Message:
"""Sends a command and wait for a response if the request is known to return a result."""
# Check if we are actually connected
async def _send(self, request, request_type, params: dict = None, reply: bool = True, timeout: float = None) -> Message:
"""Sends a command and wait for a response if the request is known to return a result.

Supports concurrent requests through atomic reference allocation and lock-free sending.
Multiple requests can be in-flight simultaneously, improving throughput.
"""
if not self.is_connected():
raise AioComfoConnectNotConnected
raise AioComfoConnectNotConnected("Not connected to bridge")

if timeout is None:
timeout = TIMEOUT

if self._loop is None:
self._loop = asyncio.get_running_loop()

# Construct the message
if not self.is_connected() or self._writer is None or self._reference is None:
raise AioComfoConnectNotConnected("Not connected to bridge")

# Allocate reference atomically (thread-safe)
reference = next(self._reference)

# Build command and message
cmd = zehnder_pb2.GatewayOperation() # pylint: disable=no-member
cmd.type = request_type
cmd.reference = self._reference
cmd.reference = reference

msg = request()
if params is not None:
for param in params:
if params[param] is not None:
setattr(msg, param, params[param])
for param, value in params.items():
if value is not None:
setattr(msg, param, value)

message = Message(cmd, msg, self._local_uuid, self.uuid)

# Create the future that will contain the response
fut = asyncio.Future()
# Create and register future BEFORE sending to avoid race condition
# where response arrives before listener is registered
fut = self._loop.create_future()
if reply:
self._event_bus.add_listener(self._reference, fut)
if self._event_bus is None:
raise RuntimeError("Event bus is not initialized")
self._event_bus.add_listener(reference, fut)
else:
fut.set_result(None)

# Send the message
# Send message (no lock needed - TCP writes are serialized by the OS)
_LOGGER.debug("TX %s", message)
self._writer.write(message.encode())
await self._writer.drain()

# Increase message reference for next message
self._reference += 1

try:
return await asyncio.wait_for(fut, TIMEOUT)
self._writer.write(message.encode())
await self._writer.drain()
except (ConnectionError, OSError) as exc:
send_exc = AioComfoConnectNotConnected("Connection lost while sending")
_LOGGER.warning("Failed to send message: %s", exc)
# Clean up the registered listener on send failure
if reply and self._event_bus is not None:
self._event_bus.emit(reference, send_exc)
elif not fut.done():
fut.set_exception(send_exc)
raise send_exc from exc

# Wait for response
try:
return await asyncio.wait_for(fut, timeout)
except asyncio.TimeoutError as exc:
_LOGGER.warning("Timeout while waiting for response from bridge")
await self._disconnect()
raise AioComfoConnectTimeout("Timeout while waiting for response from bridge") from exc

async def _read(self) -> Message:
Expand Down Expand Up @@ -238,21 +317,29 @@ async def _process_message(self):

elif message.cmd.type == zehnder_pb2.GatewayOperation.CloseSessionRequestType:
_LOGGER.info("The Bridge has asked us to close the connection.")
raise AioComfoConnectNotConnected("Bridge requested connection close")

elif message.cmd.reference:
elif message.cmd.reference and self._event_bus:
# Emit to the event bus
self._event_bus.emit(message.cmd.reference, message.msg)

else:
_LOGGER.warning("Unhandled message type %s: %s", message.cmd.type, message)

except asyncio.exceptions.IncompleteReadError as exc:
except asyncio.IncompleteReadError as exc:
_LOGGER.info("The connection was closed.")
await self._disconnect()
raise AioComfoConnectNotConnected("The connection was closed.") from exc
disconnect_exc = AioComfoConnectNotConnected("The connection was closed.")
self._notify_pending_futures(disconnect_exc)
raise disconnect_exc from exc

except (ConnectionError, OSError) as exc:
_LOGGER.info("Connection error: %s", exc)
disconnect_exc = AioComfoConnectNotConnected("Connection error")
self._notify_pending_futures(disconnect_exc)
raise disconnect_exc from exc

except ComfoConnectError as exc:
if exc.message.cmd.reference:
if exc.message.cmd.reference and self._event_bus:
self._event_bus.emit(exc.message.cmd.reference, exc)

except DecodeError as exc:
Expand Down
Loading