forked from mzanders/vscp4hass
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__init__.py
More file actions
107 lines (88 loc) · 3.46 KB
/
__init__.py
File metadata and controls
107 lines (88 loc) · 3.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from homeassistant.const import (EVENT_HOMEASSISTANT_STOP,
CONF_HOST,
CONF_PORT,
CONF_USERNAME,
CONF_PASSWORD,
CONF_DISCOVERY)
from .gateway import Gateway
from .light import vscpLight
from .binary_sensor import vscpBinarySensor
import homeassistant.helpers.config_validation as cv
from .const import (DOMAIN, DEFAULT_HOST, DEFAULT_PORT, GATEWAY, SCANNER, SCANNER_TASK,
SVC_PRIORITY, SVC_TYPE, SVC_CLASS, SVC_DATA)
import voluptuous as vol
from .channel import channel_reg
import asyncio
from .vscp.event import Event
import logging
logger = logging.getLogger(__name__)
"""Support for VSCP in HASS."""
# Options accepted inside the configuration section keyed by DOMAIN.
# Only the host is mandatory; port and discovery fall back to defaults.
_DOMAIN_OPTIONS = {
    vol.Required(CONF_HOST): cv.string,
    vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
    vol.Optional(CONF_USERNAME): cv.string,
    vol.Optional(CONF_PASSWORD): cv.string,
    vol.Optional(CONF_DISCOVERY, default=False): cv.boolean,
}

# Top-level configuration schema: the DOMAIN section is validated against
# the options above, while any other top-level keys are allowed through.
CONFIG_SCHEMA = vol.Schema(
    {DOMAIN: vol.Schema(_DOMAIN_OPTIONS)},
    extra=vol.ALLOW_EXTRA,
)
# Fields required by the 'send_event' service registered in async_setup.
# The data field is a string of comma-separated integer literals.
_SEND_EVENT_FIELDS = {
    vol.Required(SVC_PRIORITY): int,
    vol.Required(SVC_CLASS): int,
    vol.Required(SVC_TYPE): int,
    vol.Required(SVC_DATA): cv.string,
}

SERVICE_SCHEMA = vol.Schema(_SEND_EVENT_FIELDS)
async def async_do_discovery(hass, config, updater):
    """Scan for VSCP HASS nodes and request loading of the entity platforms.

    A dedicated ``Gateway`` connection is opened for the scan and stored
    under ``hass.data[DOMAIN][SCANNER]``. After the scan the ``light`` and
    ``binary_sensor`` platforms are loaded, and the scanner connection is
    closed again.

    Args:
        hass: The Home Assistant instance. ``hass.data[DOMAIN]`` must
            already exist, since the scanner is stored in it.
        config: The full configuration mapping; the connection settings are
            read from its ``DOMAIN`` section.
        updater: Object handed to ``Gateway.scan``. ``async_setup`` passes
            its main gateway here.
    """
    logger.info('Starting VSCP discovery for HASS nodes.')
    conf = config.get(DOMAIN)
    scanner = Gateway(
        host=conf.get(CONF_HOST),
        port=conf.get(CONF_PORT),
        user=conf.get(CONF_USERNAME),
        password=conf.get(CONF_PASSWORD),
    )
    hass.data[DOMAIN][SCANNER] = scanner
    await scanner.connect()
    try:
        await scanner.scan(updater)
        hass.helpers.discovery.load_platform('light', DOMAIN, {}, config)
        hass.helpers.discovery.load_platform('binary_sensor', DOMAIN, {}, config)
    finally:
        # Always release the scanner connection, also when the scan fails or
        # this coroutine is cancelled (e.g. on Home Assistant shutdown).
        await scanner.close()
async def async_setup(hass, config):
    """Set up the VSCP integration.

    Connects the main gateway and starts its updates, optionally launches
    node discovery as a background task, registers a shutdown listener that
    closes the gateway, and registers the ``send_event`` service.

    Args:
        hass: The Home Assistant instance.
        config: The full configuration mapping; the connection settings are
            read from its ``DOMAIN`` section.

    Returns:
        True once setup has completed.
    """
    conf = config.get(DOMAIN)
    host = conf.get(CONF_HOST)
    port = conf.get(CONF_PORT)
    user = conf.get(CONF_USERNAME)
    password = conf.get(CONF_PASSWORD)

    hass.data[DOMAIN] = {}

    gw = Gateway(host=host, port=port, user=user, password=password)
    await gw.connect()
    await gw.start_update()
    hass.data[DOMAIN][GATEWAY] = gw

    if conf.get(CONF_DISCOVERY):
        # Discovery runs in the background so setup does not wait for it.
        hass.data[DOMAIN][SCANNER_TASK] = asyncio.create_task(
            async_do_discovery(hass, config, gw))

    async def on_hass_stop(event):
        """Close connection when hass stops."""
        await gw.close()
        task = hass.data[DOMAIN].get(SCANNER_TASK)
        if task is None:
            return
        # Task.cancel() only requests cancellation and never raises; the
        # CancelledError surfaces when the task is awaited, so that is the
        # call that has to be guarded.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, on_hass_stop)

    async def handle_send_event(call):
        """Build a VSCP event from the service call data and send it."""
        raw_data = call.data.get(SVC_DATA)
        # An empty string means "no data bytes"; int('', 0) would raise.
        tokens = raw_data.split(',') if raw_data.strip() else []
        # Base 0 lets each item be written as decimal, 0x.., 0o.. or 0b..
        payload = bytearray(int(token, 0) for token in tokens)
        await gw.send(Event(
            vscp_class=call.data.get(SVC_CLASS),
            vscp_type=call.data.get(SVC_TYPE),
            # priority * 32 == priority << 5: the value is placed in bit 5
            # and up of the head field.
            head=call.data.get(SVC_PRIORITY) * 32,
            data=payload,
        ))

    hass.services.async_register(DOMAIN, 'send_event', handle_send_event, SERVICE_SCHEMA)
    return True