From 43c38656dde7b32e44f2c790a5ba33b2b0a7b671 Mon Sep 17 00:00:00 2001 From: 17ms Date: Sun, 14 Jul 2024 18:58:25 +0300 Subject: [PATCH] fix(closes #1): inter message timeouts and improved debug logging --- main.py | 9 +++++---- src/db.py | 12 ++++++++++-- src/mempool.py | 22 +++++++++++++++++++--- 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/main.py b/main.py index 383a76e..8da80b6 100755 --- a/main.py +++ b/main.py @@ -75,6 +75,8 @@ def main(): shutdown_loop = asyncio.new_event_loop() export_loop = asyncio.new_event_loop() + ws_thread = WebSocketThread(q, shutdown_event) + qp_thread = QueueProcessor(q, shutdown_event, handler) export_thread = threading.Thread( target=periodic_export, args=( @@ -84,12 +86,10 @@ def main(): shutdown_event, ), ) - export_thread.start() - ws_thread = WebSocketThread(q, shutdown_event) - qp_thread = QueueProcessor(q, shutdown_event, handler) ws_thread.start() qp_thread.start() + export_thread.start() def handle_exit(): logging.info("Shutdown procedure initialized") @@ -111,13 +111,14 @@ def main(): ws_thread.join() qp_thread.join() except KeyboardInterrupt: - logging.info("Keyboard interrupt received, shutting down threads.") + logging.info("Keyboard interrupt received, shutting down threads") handle_exit() finally: export_loop.stop() export_loop.close() shutdown_loop.stop() shutdown_loop.close() + logging.info("Shutdown sequence completed successfully!") if __name__ == "__main__": diff --git a/src/db.py b/src/db.py index f2cf64a..82156e8 100644 --- a/src/db.py +++ b/src/db.py @@ -1,6 +1,7 @@ import sqlite3 import json import logging +import threading import asyncio from src.const import DEFAULT_EXPORT_PATH @@ -14,6 +15,7 @@ class Handler: # Notably `connect` automatically creates the database if it doesn't already exist self.con = sqlite3.connect(self.database, check_same_thread=False) self.cursor = self.con.cursor() + self.lock = threading.RLock() # Initialize the table if necessary self.cursor.execute( @@ -31,7 +33,9 @@ class Handler: async def store(self, address): """Store a new address into the SQLite database, or increments the counter by one if the given address already exists in the database.""" - await asyncio.to_thread(self._store, address) + with self.lock: + logging.debug("Reentrant lock acquired") + await asyncio.to_thread(self._store, address) def _store(self, address): self.cursor.execute( @@ -50,7 +54,9 @@ class Handler: async def export(self, filepath=DEFAULT_EXPORT_PATH): """Export the addresses from the SQLite database in descending order based on the transaction counts.""" - await asyncio.to_thread(self._export, filepath) + with self.lock: + logging.debug("Reentrant lock acquired") + await asyncio.to_thread(self._export, filepath) def _export(self, filepath): self.cursor.execute( @@ -88,5 +94,7 @@ def periodic_export(loop, handler, interval, shutdown_event): await handler.export() elapsed = 0 + logging.info("Periodic export thread quitting") + asyncio.set_event_loop(loop) loop.run_until_complete(task(handler, interval, shutdown_event)) diff --git a/src/mempool.py b/src/mempool.py index 864c9d4..5539a05 100644 --- a/src/mempool.py +++ b/src/mempool.py @@ -4,7 +4,7 @@ import threading import logging import websockets -from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE +from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE, WS_INTERMSG_TIMEOUT class WebSocketThread(threading.Thread): @@ -20,6 +20,8 @@ class WebSocketThread(threading.Thread): async def connect(self): async with websockets.connect(WS_ADDR) as ws: + logging.info("Inter message timeout set to %d seconds", WS_INTERMSG_TIMEOUT) + logging.info("WebSocket connection established successfully") await ws.send(self.sub_msg) logging.info("Subscription message sent") @@ -30,13 +32,17 @@ class WebSocketThread(threading.Thread): while not self.shutdown_event.is_set(): try: - msg = await ws.recv() + # Timeout is necessary to make sure the state of the shutdown event is checked often enough + msg = await asyncio.wait_for(ws.recv(), timeout=WS_INTERMSG_TIMEOUT) data = self.handle_msg(msg) if data is None: continue await self.q.coro_put(data) + except asyncio.TimeoutError: + logging.debug("WebSocket receiver timed out before fetching a new message, reattempting") + continue except websockets.exceptions.ConnectionClosed: logging.info( "WebSocket connection closed unexpectedly, sleeping for %d seconds before rebooting the connection", @@ -50,6 +56,8 @@ class WebSocketThread(threading.Thread): self.shutdown_event.set() break + logging.info("WebSocket thread quitting") + def handle_msg(self, msg): msg_json = json.loads(msg) @@ -83,6 +91,8 @@ class WebSocketThread(threading.Thread): finally: loop.close() + logging.info("WebSocket thread quitting without attempting to reconnect") + class QueueProcessor(threading.Thread): """Handle processing of items from the cross-thread queue where the WebSocket thread feeds data into.""" @@ -97,14 +107,20 @@ class QueueProcessor(threading.Thread): async def process_queue(self): while not self.shutdown_event.is_set(): try: - tx_sender = await self.q.coro_get() # Waits here until new msg is available + # Timeout is necessary to make sure the state of the shutdown event is checked often enough + tx_sender = await asyncio.wait_for(self.q.coro_get(), timeout=WS_INTERMSG_TIMEOUT) await self.handler.store(tx_sender) # pylint: disable=broad-exception-caught + except asyncio.TimeoutError: + logging.debug("Queue processor timed out before fetching a new message, reattempting") + continue except Exception as e: logging.error("QueueProcessor thread crashed: %s", str(e)) self.shutdown_event.set() break + logging.info("Queue processor thread quitting") + def run(self): """Start the queue processing thread that'll run until it receives a shutdown message or crashes.""" loop = asyncio.new_event_loop()