hanze/muizenval

remote/__init__.py in master
Repositories | Summary | Log | Files

__init__.py (2388B) download


 1from datetime import datetime
 2from typing import Any, Callable, Dict, Optional
 3from serial import Serial
 4from serial.tools.list_ports import comports
 5from .exception import RemoteException
 6
 7import json
 8import traceback
 9
10
# Signature of a command handler: takes the decoded JSON parameters of a
# request and returns an optional JSON-serialisable result dict.
CommandHandler = Callable[[Dict[str, Any]], Optional[Dict[str, Any]]]


class Remote:
    """Line-based command server that talks to a peer over a serial port.

    A request is one line of text: a command name, optionally followed by a
    single space and a JSON payload. The reply is one line: a status word,
    followed by a space and the JSON-encoded handler result when the handler
    ran successfully.
    """

    # Class-level default kept so ``Remote.commands`` still resolves;
    # ``__init__`` shadows it with a per-instance registry so handlers
    # registered on one Remote do not leak into another.
    commands: Dict[str, CommandHandler] = dict()
    running = True

    @staticmethod
    def list_ports():
        """Return the serial ports reported by ``comports()``."""
        return comports()

    def __init__(self, baud: int):
        """Create a remote that will open its port at ``baud`` baud."""
        self.baud = baud
        self.commands = dict()

    def command(self, name: str):
        """Return a decorator that registers a handler under ``name``.

        The decorated function is stored in this instance's registry and
        returned unchanged, so it stays usable as a normal function.
        """
        def inner(func: CommandHandler):
            self.commands[name] = func
            return func

        return inner

    @staticmethod
    def _split_line(line: str):
        """Split a request line into ``(command, raw_params)``.

        Surrounding whitespace, including the line terminator, is removed
        first so that a parameterless request such as ``"ping\\n"`` yields
        the command ``"ping"``. ``raw_params`` is ``''`` when the line
        carries no payload.
        """
        command, _, raw_params = line.strip().partition(' ')
        return command, raw_params

    def run(self, serial_path, timeout=1):
        """Serve requests from ``serial_path`` until stopped.

        Opens the port with the configured baud rate and the given read
        ``timeout``, then handles one request per line until ``stop()`` is
        called or a ``KeyboardInterrupt`` arrives while handling a request.
        The port is closed on every exit path.

        Reply statuses: ``ok`` on success, the ``name`` of a raised
        ``RemoteException`` (``bad-request`` for undecodable JSON,
        ``bad-command`` for an unregistered command), or ``unknown`` for any
        other exception, whose traceback is printed.
        """
        serial = Serial(port=serial_path, baudrate=self.baud, timeout=timeout)
        self.running = True
        try:
            while self.running:
                command = ''
                status = ''
                params = None
                response = None
                try:
                    line = serial.readline().decode(errors='ignore')
                    if line == '':
                        # Nothing was read (e.g. the read timed out): loop
                        # again so a stop() request is noticed.
                        continue

                    command, raw_params = self._split_line(line)
                    try:
                        params = json.loads(raw_params) if raw_params else dict()
                    except json.JSONDecodeError as err:
                        raise RemoteException('bad-request') from err

                    if command not in self.commands:
                        raise RemoteException('bad-command')

                    res = self.commands[command](params) or dict()

                    # Serialise before reporting success so an
                    # unserialisable result is answered with 'unknown'.
                    response = json.dumps(res)
                    status = 'ok'
                except RemoteException as err:
                    status = err.name
                except KeyboardInterrupt:
                    break
                except Exception:
                    print(f'Error handling {command} ({params}):')
                    traceback.print_exc()
                    status = 'unknown'

                print(
                    f'{datetime.now().strftime("%d/%m/%y %H:%M:%S")} | {command} -> { status }')
                # Send the whole reply line in a single write.
                reply = status if response is None else f'{status} {response}'
                serial.write(reply.encode() + b'\n')
        finally:
            serial.close()

    def stop(self):
        """Ask the loop in ``run()`` to exit after the current iteration."""
        self.running = False