fix(closes #1): inter message timeouts and improved debug logging

This commit is contained in:
17ms 2024-07-14 18:58:25 +03:00
parent ba2140b11f
commit 43c38656dd
Signed by untrusted user who does not match committer: ae
GPG Key ID: 995EFD5C1B532B3E
3 changed files with 34 additions and 9 deletions

View File

@ -75,6 +75,8 @@ def main():
shutdown_loop = asyncio.new_event_loop() shutdown_loop = asyncio.new_event_loop()
export_loop = asyncio.new_event_loop() export_loop = asyncio.new_event_loop()
ws_thread = WebSocketThread(q, shutdown_event)
qp_thread = QueueProcessor(q, shutdown_event, handler)
export_thread = threading.Thread( export_thread = threading.Thread(
target=periodic_export, target=periodic_export,
args=( args=(
@ -84,12 +86,10 @@ def main():
shutdown_event, shutdown_event,
), ),
) )
export_thread.start()
ws_thread = WebSocketThread(q, shutdown_event)
qp_thread = QueueProcessor(q, shutdown_event, handler)
ws_thread.start() ws_thread.start()
qp_thread.start() qp_thread.start()
export_thread.start()
def handle_exit(): def handle_exit():
logging.info("Shutdown procedure initialized") logging.info("Shutdown procedure initialized")
@ -111,13 +111,14 @@ def main():
ws_thread.join() ws_thread.join()
qp_thread.join() qp_thread.join()
except KeyboardInterrupt: except KeyboardInterrupt:
logging.info("Keyboard interrupt received, shutting down threads.") logging.info("Keyboard interrupt received, shutting down threads")
handle_exit() handle_exit()
finally: finally:
export_loop.stop() export_loop.stop()
export_loop.close() export_loop.close()
shutdown_loop.stop() shutdown_loop.stop()
shutdown_loop.close() shutdown_loop.close()
logging.info("Shutdown sequence completed successfully!")
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,6 +1,7 @@
import sqlite3 import sqlite3
import json import json
import logging import logging
import threading
import asyncio import asyncio
from src.const import DEFAULT_EXPORT_PATH from src.const import DEFAULT_EXPORT_PATH
@ -14,6 +15,7 @@ class Handler:
# Notably `connect` automatically creates the database if it doesn't already exist # Notably `connect` automatically creates the database if it doesn't already exist
self.con = sqlite3.connect(self.database, check_same_thread=False) self.con = sqlite3.connect(self.database, check_same_thread=False)
self.cursor = self.con.cursor() self.cursor = self.con.cursor()
self.lock = threading.RLock()
# Initialize the table if necessary # Initialize the table if necessary
self.cursor.execute( self.cursor.execute(
@ -31,7 +33,9 @@ class Handler:
async def store(self, address): async def store(self, address):
"""Store a new address into the SQLite database, or increments the counter by one if the given address already exists in the database.""" """Store a new address into the SQLite database, or increments the counter by one if the given address already exists in the database."""
await asyncio.to_thread(self._store, address) with self.lock:
logging.debug("Reentrant lock acquired")
await asyncio.to_thread(self._store, address)
def _store(self, address): def _store(self, address):
self.cursor.execute( self.cursor.execute(
@ -50,7 +54,9 @@ class Handler:
async def export(self, filepath=DEFAULT_EXPORT_PATH): async def export(self, filepath=DEFAULT_EXPORT_PATH):
"""Export the addresses from the SQLite database in descending order based on the transaction counts.""" """Export the addresses from the SQLite database in descending order based on the transaction counts."""
await asyncio.to_thread(self._export, filepath) with self.lock:
logging.debug("Reentrant lock acquired")
await asyncio.to_thread(self._export, filepath)
def _export(self, filepath): def _export(self, filepath):
self.cursor.execute( self.cursor.execute(
@ -88,5 +94,7 @@ def periodic_export(loop, handler, interval, shutdown_event):
await handler.export() await handler.export()
elapsed = 0 elapsed = 0
logging.info("Periodic export thread quitting")
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
loop.run_until_complete(task(handler, interval, shutdown_event)) loop.run_until_complete(task(handler, interval, shutdown_event))

View File

@ -4,7 +4,7 @@ import threading
import logging import logging
import websockets import websockets
from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE, WS_INTERMSG_TIMEOUT
class WebSocketThread(threading.Thread): class WebSocketThread(threading.Thread):
@ -20,6 +20,8 @@ class WebSocketThread(threading.Thread):
async def connect(self): async def connect(self):
async with websockets.connect(WS_ADDR) as ws: async with websockets.connect(WS_ADDR) as ws:
logging.info("Inter message timeout set to %d seconds", WS_INTERMSG_TIMEOUT)
logging.info("WebSocket connection established successfully") logging.info("WebSocket connection established successfully")
await ws.send(self.sub_msg) await ws.send(self.sub_msg)
logging.info("Subscription message sent") logging.info("Subscription message sent")
@ -30,13 +32,17 @@ class WebSocketThread(threading.Thread):
while not self.shutdown_event.is_set(): while not self.shutdown_event.is_set():
try: try:
msg = await ws.recv() # Timeout is necessary to make sure the state of the shutdown event is checked often enough
msg = await asyncio.wait_for(ws.recv(), timeout=WS_INTERMSG_TIMEOUT)
data = self.handle_msg(msg) data = self.handle_msg(msg)
if data is None: if data is None:
continue continue
await self.q.coro_put(data) await self.q.coro_put(data)
except asyncio.TimeoutError:
logging.debug("WebSocket receiver timed out before fetching a new message, reattempting")
continue
except websockets.exceptions.ConnectionClosed: except websockets.exceptions.ConnectionClosed:
logging.info( logging.info(
"WebSocket connection closed unexpectedly, sleeping for %d seconds before rebooting the connection", "WebSocket connection closed unexpectedly, sleeping for %d seconds before rebooting the connection",
@ -50,6 +56,8 @@ class WebSocketThread(threading.Thread):
self.shutdown_event.set() self.shutdown_event.set()
break break
logging.info("WebSocket thread quitting")
def handle_msg(self, msg): def handle_msg(self, msg):
msg_json = json.loads(msg) msg_json = json.loads(msg)
@ -83,6 +91,8 @@ class WebSocketThread(threading.Thread):
finally: finally:
loop.close() loop.close()
logging.info("WebSocket thread quitting without attempting to reconnect")
class QueueProcessor(threading.Thread): class QueueProcessor(threading.Thread):
"""Handle processing of items from the cross-thread queue where the WebSocket thread feeds data into.""" """Handle processing of items from the cross-thread queue where the WebSocket thread feeds data into."""
@ -97,14 +107,20 @@ class QueueProcessor(threading.Thread):
async def process_queue(self): async def process_queue(self):
while not self.shutdown_event.is_set(): while not self.shutdown_event.is_set():
try: try:
tx_sender = await self.q.coro_get() # Waits here until new msg is available # Timeout is necessary to make sure the state of the shutdown event is checked often enough
tx_sender = await asyncio.wait_for(self.q.coro_get(), timeout=WS_INTERMSG_TIMEOUT)
await self.handler.store(tx_sender) await self.handler.store(tx_sender)
# pylint: disable=broad-exception-caught # pylint: disable=broad-exception-caught
except asyncio.TimeoutError:
logging.debug("Queue processor timed out before fetching a new message, reattempting")
continue
except Exception as e: except Exception as e:
logging.error("QueueProcessor thread crashed: %s", str(e)) logging.error("QueueProcessor thread crashed: %s", str(e))
self.shutdown_event.set() self.shutdown_event.set()
break break
logging.info("Queue processor thread quitting")
def run(self): def run(self):
"""Start the queue processing thread that'll run until it receives a shutdown message or crashes.""" """Start the queue processing thread that'll run until it receives a shutdown message or crashes."""
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()