From 340f199b3acef13a9be06a3b087b320e514b417f Mon Sep 17 00:00:00 2001 From: 17ms Date: Mon, 15 Jul 2024 19:54:31 +0300 Subject: [PATCH] fix: fix async operations' safety timeouts --- src/mempool.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/mempool.py b/src/mempool.py index 5539a05..ff1aca3 100644 --- a/src/mempool.py +++ b/src/mempool.py @@ -4,7 +4,7 @@ import threading import logging import websockets -from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE, WS_INTERMSG_TIMEOUT +from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE, WS_INTERMSG_TIMEOUT, QUEUE_OP_TIMEOUT class WebSocketThread(threading.Thread): @@ -39,7 +39,8 @@ class WebSocketThread(threading.Thread): if data is None: continue - await self.q.coro_put(data) + # This shouldn't really be an issue, but it's safer to set a timeout here too... + await asyncio.wait_for(self.q.coro_put(data), timeout=QUEUE_OP_TIMEOUT) except asyncio.TimeoutError: logging.debug("WebSocket receiver timed out before fetching a new message, reattempting") continue @@ -56,8 +57,6 @@ class WebSocketThread(threading.Thread): self.shutdown_event.set() break - logging.info("WebSocket thread quitting") - def handle_msg(self, msg): msg_json = json.loads(msg) @@ -91,7 +90,7 @@ class WebSocketThread(threading.Thread): finally: loop.close() - logging.info("WebSocket thread quitting without attempting to reconnect") + logging.info("WebSocket thread quitting") class QueueProcessor(threading.Thread): @@ -105,10 +104,12 @@ class QueueProcessor(threading.Thread): self.handler = handler async def process_queue(self): + logging.info("Queue operations timeout set to %d seconds", QUEUE_OP_TIMEOUT) + while not self.shutdown_event.is_set(): try: # Timeout is necessary to make sure the state of the shutdown event is checked often enough - tx_sender = await asyncio.wait_for(self.q.coro_get(), timeout=WS_INTERMSG_TIMEOUT) + tx_sender = await asyncio.wait_for(self.q.coro_get(), timeout=QUEUE_OP_TIMEOUT) await self.handler.store(tx_sender) # pylint: disable=broad-exception-caught except asyncio.TimeoutError: