hanze/muizenval

remote.py in master
Repositories | Summary | Log | Files

remote.py (3977B) download


  1from threading import Thread
  2from time import sleep
  3import tkinter as tk
  4from tkinter import Button, OptionMenu, StringVar, Tk, Label
  5from http.client import HTTPConnection
  6from typing import Optional
  7
  8from remote import Remote
  9
 10import json
 11import sys
 12import asyncio
 13import websockets
 14
 15
 16WEBSOCKET_PORT = 1612
 17host, port = 'localhost', 5000
 18
 19remote = Remote(115200)
 20token: Optional[str] = None
 21
 22
 23@remote.command("set_token")
 24def set_token(req):
 25    global token
 26    token = req['token']
 27
 28
 29@remote.command("send")
 30def send_http(params):
 31    method, endpoint, body = params["method"], params["endpoint"], params["body"]
 32
 33    print(body)
 34
 35    client = HTTPConnection(host, port)
 36    client.request(method, endpoint, json.dumps(body))
 37    res = client.getresponse()
 38    response = json.load(res)
 39
 40    print(response)
 41
 42    return dict(code=res.status, body=response)
 43
 44
 45token = 'abcdefghijklmnoq'
 46
 47
 48async def websocket_handler(ws, _):
 49    if await ws.recv() == 'token':
 50        if token:
 51            await ws.send(token)
 52    await ws.close()
 53
 54
 55class RemoteWindow(Tk):
 56    running = False
 57    closed = False
 58    disconnecting = False
 59
 60    def __init__(self):
 61        super().__init__()
 62
 63        self.title('Team Benni - Remote')
 64        self.geometry('500x100')
 65        self.protocol("WM_DELETE_WINDOW", self.on_close)
 66
 67        self.columnconfigure(0, weight=1)
 68        self.columnconfigure(1, weight=3)
 69#        self.columnconfigure(2, weight=3)
 70
 71        self.devices = Remote.list_ports()
 72        self.device_names = [
 73            f'{p.name} ({p.description})' for p in self.devices]
 74
 75        self.dev_var = StringVar(self, self.device_names[0])
 76
 77        self.label = Label(self, text='Not connected')
 78        self.label['anchor'] = tk.CENTER
 79        self.label.grid(column=0, row=0, sticky=tk.W,
 80                        padx=5, pady=5, columnspan=2)
 81
 82        self.dev_label = Label(self, text='Device:')
 83        self.dev_label.grid(column=0, row=1, sticky=tk.E, padx=5, pady=5)
 84
 85        self.dev_menu = OptionMenu(self, self.dev_var, *self.device_names)
 86        self.dev_menu.grid(column=1, row=1, sticky=tk.E, padx=5, pady=5)
 87
 88        self.connect_button = Button(
 89            self, text="Connect", command=self.on_connect)
 90        self.connect_button.grid(column=1, row=3, sticky=tk.E, padx=5, pady=5)
 91
 92    async def run_websocket(self):
 93        async with websockets.serve(websocket_handler, '0.0.0.0',  # type: ignore
 94                                    WEBSOCKET_PORT):
 95            while self.running:
 96                await asyncio.sleep(1)
 97
 98    def on_connect(self):
 99        if self.disconnecting:
100            return
101        self.running = not self.running
102        if self.running:
103            port = self.devices[self.device_names.index(self.dev_var.get())]
104
105            self.websocket_thread = Thread(
106                target=lambda: asyncio.run(self.run_websocket()))
107            self.remote_thread = Thread(
108                target=lambda: remote.run(port.device))
109
110            self.websocket_thread.start()
111            self.remote_thread.start()
112
113            self.label['text'] = f'Connected to {port.name}'
114            if port.description != 'n/a':
115                self.label['text'] += f' ({port.description})'
116            self.connect_button['text'] = 'Disconnect'
117        else:
118            remote.stop()
119            self.disconnecting = True
120
121            self.connect_button['text'] = 'Disconnecting...'
122
123    def on_close(self):
124        if self.running:
125            self.on_connect()
126
127        self.closed = True
128
129    def run(self):
130        while not self.closed or self.disconnecting:
131            if self.disconnecting and not self.remote_thread.is_alive() and not self.websocket_thread.is_alive():
132                self.label['text'] = 'Not connected'
133                self.connect_button['text'] = 'Connect'
134                self.disconnecting = False
135
136            sleep(0.1)
137            self.update()
138
139
def _main():
    """Create the remote window and run it until the user closes it."""
    window = RemoteWindow()
    window.run()


if __name__ == "__main__":
    _main()