|
| 1 | +from __future__ import annotations |
| 2 | +from typing import Any, Dict, Tuple |
| 3 | +from gevent import pywsgi # type: ignore |
| 4 | +from arrowhead_client.consumer import Consumer |
| 5 | +from arrowhead_client.provider import Provider |
| 6 | +from arrowhead_client.service import Service |
| 7 | +from arrowhead_client.core_services import core_service |
| 8 | +from arrowhead_client.system import ArrowheadSystem |
| 9 | +from arrowhead_client import core_service_forms as forms |
| 10 | +from arrowhead_client import core_service_responses as responses |
| 11 | + |
| 12 | + |
| 13 | +class ArrowheadApplication(): |
| 14 | + def __init__(self, |
| 15 | + system: ArrowheadSystem, |
| 16 | + consumer: Consumer, |
| 17 | + provider: Provider, |
| 18 | + logger: Any, |
| 19 | + config: Dict, |
| 20 | + server: Any = None, # type: ignore |
| 21 | + keyfile: str = '', |
| 22 | + certfile: str = '', ): |
| 23 | + self.system = system |
| 24 | + self.consumer = consumer |
| 25 | + self.provider = provider |
| 26 | + self._logger = logger |
| 27 | + self.keyfile = keyfile |
| 28 | + self.certfile = certfile |
| 29 | + self.config = config |
| 30 | + #TODO: Remove this hardcodedness |
| 31 | + self.server = pywsgi.WSGIServer((self.system.address, self.system.port), self.provider.app, |
| 32 | + keyfile=self.keyfile, certfile=self.certfile, |
| 33 | + log=self._logger) |
| 34 | + self._core_system_setup() |
| 35 | + self.add_provided_service = self.provider.add_provided_service |
| 36 | + |
| 37 | + @property |
| 38 | + def cert(self) -> Tuple[str, str]: |
| 39 | + return self.certfile, self.keyfile |
| 40 | + |
| 41 | + ''' |
| 42 | + @classmethod |
| 43 | + def from_cfg(cls, properties_file: str) -> ArrowheadHttpApplication: |
| 44 | + """ Creates a BaseArrowheadSystem from a descriptor file """ |
| 45 | +
|
| 46 | + # Parse configuration file |
| 47 | + config = configparser.ConfigParser() |
| 48 | + with open(properties_file, 'r') as properties: |
| 49 | + config.read_file(properties) |
| 50 | + config_dict = {k: v for k, v in config.items('SYSTEM')} |
| 51 | +
|
| 52 | + # Create class instance |
| 53 | + system = cls(**config_dict) |
| 54 | +
|
| 55 | + return system |
| 56 | + ''' |
| 57 | + |
| 58 | + def _core_system_setup(self) -> None: |
| 59 | + service_registry = ArrowheadSystem( |
| 60 | + 'service_registry', |
| 61 | + str(self.config['service_registry']['address']), |
| 62 | + int(self.config['service_registry']['port']), |
| 63 | + '' |
| 64 | + ) |
| 65 | + orchestrator = ArrowheadSystem( |
| 66 | + 'orchestrator', |
| 67 | + str(self.config['orchestrator']['address']), |
| 68 | + int(self.config['orchestrator']['port']), |
| 69 | + '') |
| 70 | + |
| 71 | + self._store_consumed_service(core_service('register'), service_registry, 'POST') |
| 72 | + self._store_consumed_service(core_service('unregister'), service_registry, 'DELETE') |
| 73 | + self._store_consumed_service(core_service('orchestration-service'), orchestrator, 'POST') |
| 74 | + |
| 75 | + def consume_service(self, service_definition: str, **kwargs): |
| 76 | + return self.consumer.consume_service(service_definition, **kwargs) |
| 77 | + |
| 78 | + def add_consumed_service(self, |
| 79 | + service_definition: str, |
| 80 | + http_method: str) -> None: |
| 81 | + """ Add orchestration rule for service definition """ |
| 82 | + |
| 83 | + orchestration_form = forms.OrchestrationForm(self.system.dto, service_definition) |
| 84 | + |
| 85 | + orchestration_response = self.consume_service('orchestration-service', |
| 86 | + json=orchestration_form.dto, |
| 87 | + cert=self.cert, |
| 88 | + ) |
| 89 | + #TODO: Handle orchestrator error codes |
| 90 | + |
| 91 | + orchestration_payload = orchestration_response.json() # This might change with backend |
| 92 | + |
| 93 | + (orchestrated_service, system), *_ = responses.handle_orchestration_response(orchestration_payload) |
| 94 | + |
| 95 | + #TODO: Handle response with more than 1 service |
| 96 | + # Perhaps a list of consumed services for each service definition should be stored |
| 97 | + self._store_consumed_service(orchestrated_service, system, http_method) |
| 98 | + |
| 99 | + def _store_consumed_service(self, |
| 100 | + service: Service, |
| 101 | + system: ArrowheadSystem, |
| 102 | + http_method: str) -> None: |
| 103 | + """ Register consumed services with the consumer """ |
| 104 | + |
| 105 | + self.consumer._consumed_services[service.service_definition] = (service, system, http_method) |
| 106 | + |
| 107 | + def provided_service(self, |
| 108 | + service_definition: str, |
| 109 | + service_uri: str, |
| 110 | + interface: str, |
| 111 | + method: str): |
| 112 | + def wrapped_func(func): |
| 113 | + self.provider.add_provided_service( |
| 114 | + service_definition, |
| 115 | + service_uri, |
| 116 | + interface, |
| 117 | + http_method=method, |
| 118 | + view_func=func) |
| 119 | + return func |
| 120 | + return wrapped_func |
| 121 | + |
| 122 | + def _register_service(self, service: Service): |
| 123 | + """ Registers the given service with service registry """ |
| 124 | + |
| 125 | + service_registration_form = forms.ServiceRegistrationForm( |
| 126 | + service_definition=service.service_definition, |
| 127 | + service_uri=service.service_uri, |
| 128 | + secure='CERTIFICATE', |
| 129 | + # TODO: secure should _NOT_ be hardcoded |
| 130 | + interfaces=service.interface.dto, |
| 131 | + provider_system=self.system.dto |
| 132 | + ) |
| 133 | + |
| 134 | + service_registration_response = self.consume_service( |
| 135 | + 'register', |
| 136 | + json=service_registration_form.dto, |
| 137 | + ) |
| 138 | + |
| 139 | + print(service_registration_response.status_code) |
| 140 | + # TODO: Error handling |
| 141 | + |
| 142 | + # TODO: Do logging |
| 143 | + |
| 144 | + |
| 145 | + def _register_all_services(self) -> None: |
| 146 | + """ Registers all services of the system. """ |
| 147 | + for service, _ in self.provider.provided_services.values(): |
| 148 | + self._register_service(service) |
| 149 | + |
| 150 | + |
| 151 | + def _unregister_service(self, service_definition: str) -> None: |
| 152 | + """ Unregisters the given service with the service registry. """ |
| 153 | + |
| 154 | + if service_definition not in self.provider.provided_services.keys(): |
| 155 | + raise ValueError(f'{service_definition} not provided by {self}') |
| 156 | + |
| 157 | + unregistration_payload = { |
| 158 | + 'service_definition': service_definition, |
| 159 | + 'system_name': self.system.system_name, |
| 160 | + 'address': self.system.address, |
| 161 | + 'port': self.system.port |
| 162 | + } |
| 163 | + |
| 164 | + service_unregistration_response = self.consume_service( |
| 165 | + 'unregister', |
| 166 | + params=unregistration_payload |
| 167 | + ) |
| 168 | + |
| 169 | + print(service_unregistration_response.status_code) |
| 170 | + |
| 171 | + |
| 172 | + def _unregister_all_services(self) -> None: |
| 173 | + """ Unregisters all services of the system """ |
| 174 | + |
| 175 | + for service_definition in self.provider.provided_services: |
| 176 | + self._unregister_service(service_definition) |
| 177 | + |
| 178 | + |
| 179 | + def run_forever(self) -> None: |
| 180 | + """ Start the server, publish all service, and run until interrupted. Then, unregister all services""" |
| 181 | + |
| 182 | + import warnings |
| 183 | + warnings.simplefilter('ignore') |
| 184 | + |
| 185 | + self._register_all_services() |
| 186 | + try: |
| 187 | + self._logger.info(f'Starting server') |
| 188 | + print('Started Arrowhead System') |
| 189 | + self.server.serve_forever() |
| 190 | + except KeyboardInterrupt: |
| 191 | + self._logger.info(f'Shutting down server') |
| 192 | + print('Shutting down Arrowhead system') |
| 193 | + self._unregister_all_services() |
| 194 | + finally: |
| 195 | + self._logger.info(f'Server shut down') |
| 196 | + |
| 197 | + |
| 198 | + """ |
| 199 | + def __enter__(self): |
| 200 | + '''Start server and register all services''' |
| 201 | + import warnings |
| 202 | + warnings.simplefilter('ignore') |
| 203 | +
|
| 204 | + print('Starting server') |
| 205 | + self.server.start() |
| 206 | + print('Registering services') |
| 207 | + self.register_all_services() |
| 208 | +
|
| 209 | + def __exit__(self, exc_type, exc_value, tb): |
| 210 | + '''Unregister all services and stop the server''' |
| 211 | + if exc_type != KeyboardInterrupt: |
| 212 | + print(f'Exception was raised:') |
| 213 | + print(exc_value) |
| 214 | +
|
| 215 | + print('\nSystem was stopped, unregistering services') |
| 216 | + self.unregister_all_services() |
| 217 | + print('Stopping server') |
| 218 | + self.server.stop() |
| 219 | + print('Shutdown completed') |
| 220 | +
|
| 221 | + return True |
| 222 | + """ |
| 223 | +if __name__ == '__main__': |
| 224 | + pass |
0 commit comments