feat: implement controller interface

This commit is contained in:
2025-10-22 23:57:23 +02:00
parent bf1935fd7e
commit f91a4e8d61
2 changed files with 204 additions and 0 deletions

23
scripts/recorder.py Normal file
View File

@@ -0,0 +1,23 @@
from PyQt6.QtWidgets import QApplication
from src.recorder import RecorderWindow
def main():
import sys
def except_hook(cls, exception, traceback):
sys.__excepthook__(cls, exception, traceback)
sys.excepthook = except_hook
app = QApplication(sys.argv)
window = RecorderWindow("localhost", 5000)
app.aboutToQuit.connect(window.shutdown)
window.show()
app.exec()
if __name__ == "__main__":
main()

181
src/recorder.py Normal file
View File

@@ -0,0 +1,181 @@
import socket
import struct
from PyQt6 import uic
from PyQt6.QtCore import QObject, Qt, QThread, QTimer, pyqtSignal, pyqtSlot
from PyQt6.QtWidgets import QMainWindow
from src.command import CarControl, Command, ControlCommand
from src.recorder_ui import Ui_Recorder
from src.snapshot import Snapshot
class RecorderClient(QObject):
DATA_CHUNK_SIZE = 4096
data_received: pyqtSignal = pyqtSignal(Snapshot)
def __init__(self, host: str, port: int) -> None:
super().__init__()
self.host: str = host
self.port: int = port
self.socket: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.timer: QTimer = QTimer(self)
self.timer.timeout.connect(self.poll_socket)
self.connected: bool = False
@pyqtSlot()
def start(self):
self.socket.connect((self.host, self.port))
self.socket.setblocking(False)
self.connected = True
self.timer.start(50)
print(f"Connected to server")
def poll_socket(self):
buffer: bytes = b""
if not self.connected:
return
try:
chunk: bytes = self.socket.recv(self.DATA_CHUNK_SIZE)
if not chunk:
return
buffer += chunk
while True:
if len(buffer) < 4:
break
msg_len: int = struct.unpack(">I", buffer[:4])[0]
msg_end: int = 4 + msg_len
if len(buffer) < msg_end:
break
message: bytes = buffer[4:msg_end]
buffer = buffer[msg_end:]
self.on_message(message)
except BlockingIOError:
pass
except Exception as e:
print(f"Socket error: {e}")
self.shutdown()
def on_message(self, message: bytes):
snapshot: Snapshot = Snapshot.unpack(message)
self.data_received.emit(snapshot)
@pyqtSlot(object)
def send_command(self, command):
if self.connected:
try:
payload: bytes = command.pack()
self.socket.sendall(struct.pack(">I", len(payload)) + payload)
except Exception as e:
print(f"An exception occured: {e}")
self.shutdown()
else:
print("Not connected")
@pyqtSlot()
def shutdown(self):
print("Shutting down client")
self.timer.stop()
self.connected = False
self.socket.close()
class RecorderWindow(Ui_Recorder, QMainWindow):
close_signal: pyqtSignal = pyqtSignal()
send_signal: pyqtSignal = pyqtSignal(object)
def __init__(self, host: str, port: int) -> None:
super().__init__()
self.host: str = host
self.port: int = port
self.client_thread: QThread = QThread()
self.client: RecorderClient = RecorderClient(self.host, self.port)
self.client.data_received.connect(self.on_snapshot_received)
self.client.moveToThread(self.client_thread)
self.client_thread.started.connect(self.client.start)
self.close_signal.connect(self.client.shutdown)
self.send_signal.connect(self.client.send_command)
uic.load_ui.loadUi("src/recorder.ui", self)
self.command_directions = {
"w": CarControl.FORWARD,
"s": CarControl.BACKWARD,
"d": CarControl.RIGHT,
"a": CarControl.LEFT,
}
self.forwardButton.pressed.connect(
lambda: self.on_car_controlled(CarControl.FORWARD, True)
)
self.forwardButton.released.connect(
lambda: self.on_car_controlled(CarControl.FORWARD, False)
)
self.backwardButton.pressed.connect(
lambda: self.on_car_controlled(CarControl.BACKWARD, True)
)
self.backwardButton.released.connect(
lambda: self.on_car_controlled(CarControl.BACKWARD, False)
)
self.rightButton.pressed.connect(
lambda: self.on_car_controlled(CarControl.RIGHT, True)
)
self.rightButton.released.connect(
lambda: self.on_car_controlled(CarControl.RIGHT, False)
)
self.leftButton.pressed.connect(
lambda: self.on_car_controlled(CarControl.LEFT, True)
)
self.leftButton.released.connect(
lambda: self.on_car_controlled(CarControl.LEFT, False)
)
self.recordDataButton.clicked.connect(self.toggle_record)
self.resetButton.clicked.connect(self.rollback)
self.autopiloting = False
self.autopilotButton.clicked.connect(self.toggle_autopilot)
self.saveRecordButton.clicked.connect(self.save_record)
self.recording = False
self.recorded_data = []
self.client_thread.start()
def on_car_controlled(self, control: CarControl, active: bool):
self.send_command(ControlCommand(control, active))
def toggle_record(self):
pass
def rollback(self):
pass
def toggle_autopilot(self):
self.autopiloting = not self.autopiloting
self.autopilotButton.setText(
"AutoPilot:\n" + ("ON" if self.autopiloting else "OFF")
)
def save_record(self):
pass
@pyqtSlot(Snapshot)
def on_snapshot_received(self, snapshot: Snapshot):
self.recorded_data.append(snapshot)
self.nbrSnapshotSaved.setText(str(len(self.recorded_data)))
def shutdown(self):
self.close_signal.emit()
def send_command(self, command: Command):
self.send_signal.emit(command)