from os import listdir from argparse import ArgumentParser import json import asyncio import websockets from websockets.exceptions import ConnectionClosedOK from sqlalchemy import Column, String, create_engine from sqlalchemy.orm import sessionmaker, declarative_base """ This is best served by an nginx block that should look a bit like this: location /fullnarp/ { root /home/halfnarp; index index.html index.htm; } location /fullnarp/ws/ { proxy_pass http://127.0.0.1:5009; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection 'upgrade'; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; # Set keepalive timeout proxy_read_timeout 60; # Set a read timeout to prevent connection closures proxy_send_timeout 60; # Set a send timeout to prevent connection closures } When importing talks from pretalx with halfnarp2.py -i, it creates a file in var/talks_local_fullnarp which contains non-public lectures and privacy relevant speaker availibilities. It should only be served behind some auth. """ # Global placeholders for engine and session factory engine = None SessionLocal = None Base = declarative_base() class TalkPreference(Base): __tablename__ = "talk_preference" uid = Column(String, primary_key=True) public_uid = Column(String, index=True) talk_ids = Column(String) # Shared state current_version = {} newest_version = 0 current_version_lock = asyncio.Lock() # Lock for managing access to the global state clients = {} # Key: websocket, Value: {'client_id': ..., 'last_version': ...} async def bootstrap_client(websocket): """Provide lecture list and votes count to new client""" async def notify_clients(): """Notify all connected clients of the current state.""" async with current_version_lock: # Prepare a full state update message with the current version message = { "property": "fullnarp", "current_version": newest_version, "data": current_version, } # Notify each client about their relevant updates for client, info in clients.items(): try: # Send the state update await client.send(json.dumps(message)) # Update the client's last known version info["last_version"] = newest_version print("Reply: " + json.dumps(message)) except ConnectionClosedOK: # Handle disconnected clients gracefully pass async def handle_client(websocket): """Handle incoming WebSocket connections.""" # Initialize per-connection state clients[websocket] = {"client_id": id(websocket), "last_version": 0} try: # Listen for updates from the client async for message in websocket: global newest_version try: # Parse incoming message data = json.loads(message) print("Got command " + message) if data.get("action", "") == "bootstrap": print("Got bootstrap command") with open("var/talks_local_fullnarp") as data_file: talks = json.load(data_file) message = {"property": "pretalx", "data": talks} await websocket.send(json.dumps(message)) with SessionLocal() as session: preferences = session.query(TalkPreference).all() m = [] for pref in preferences: m.append(json.loads(pref.talk_ids)) message = {"property": "halfnarp", "data": m} await websocket.send(json.dumps(message)) async with current_version_lock: message = { "property": "fullnarp", "current_version": newest_version, "data": current_version, } await websocket.send(json.dumps(message)) print("Reply: " + json.dumps(message)) elif data.get("action", "") == "reconnect": async with current_version_lock: message = { "property": "fullnarp", "current_version": newest_version, "data": current_version, } await websocket.send(json.dumps(message)) elif data.get("action", "") == "remove_event": pass elif data.get("action", "") == "set_event": eventid = data["event_id"] day = data["day"] room = data["room"] time = data["time"] lastupdate = data["lastupdate"] async with current_version_lock: newest_version += 1 # Increment the version print( "Moving event: " + eventid + " to day " + day + " at " + time + " in room " + room + " newcurrentversion " + str(newest_version) ) if not eventid in current_version or int( current_version[eventid]["lastupdate"] ) <= int(lastupdate): current_version[eventid] = { "day": day, "room": room, "time": time, "lastupdate": int(newest_version), } with open( "versions/fullnarp_" + str(newest_version).zfill(5) + ".json", "w", ) as outfile: json.dump(current_version, outfile) # Notify all clients about the updated global state await notify_clients() except json.JSONDecodeError: await websocket.send(json.dumps({"error": "Invalid JSON"})) except websockets.exceptions.ConnectionClosedError as e: print(f"Client disconnected abruptly: {e}") except ConnectionClosedOK: pass finally: # Cleanup when the client disconnects del clients[websocket] async def main(): parser = ArgumentParser(description="halfnarp2") parser.add_argument( "-c", "--config", help="Config file location", default="./config.json" ) args = parser.parse_args() global engine, SessionLocal with open(args.config, mode="r", encoding="utf-8") as json_file: config = json.load(json_file) DATABASE_URL = config.get("database-uri", "sqlite:///test.db") print("Connecting to " + DATABASE_URL) engine = create_engine(DATABASE_URL, echo=False) SessionLocal = sessionmaker(bind=engine) Base.metadata.create_all(bind=engine) # load state file newest_file = sorted(listdir("versions/"))[-1] global newest_version global current_version if newest_file: newest_version = int(newest_file.replace("fullnarp_", "").replace(".json", "")) print("Resuming from version: " + str(newest_version)) with open("versions/" + str(newest_file)) as data_file: current_version = json.load(data_file) else: current_version = {} newest_version = 0 async with websockets.serve(handle_client, "localhost", 22378): print("WebSocket server started on ws://localhost:22378") await asyncio.Future() # Run forever if __name__ == "__main__": asyncio.run(main())