fix: fix async operations' safety timeouts

This commit is contained in:
17ms 2024-07-15 19:54:31 +03:00
parent ccee3ecd01
commit 340f199b3a
Signed by untrusted user who does not match committer: ae
GPG Key ID: 995EFD5C1B532B3E

View File

@ -4,7 +4,7 @@ import threading
import logging import logging
import websockets import websockets
from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE, WS_INTERMSG_TIMEOUT from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE, WS_INTERMSG_TIMEOUT, QUEUE_OP_TIMEOUT
class WebSocketThread(threading.Thread): class WebSocketThread(threading.Thread):
@ -39,7 +39,8 @@ class WebSocketThread(threading.Thread):
if data is None: if data is None:
continue continue
await self.q.coro_put(data) # This shouldn't really be an issue, but it's safer to set a timeout here too...
await asyncio.wait_for(self.q.coro_put(data), timeout=QUEUE_OP_TIMEOUT)
except asyncio.TimeoutError: except asyncio.TimeoutError:
logging.debug("WebSocket receiver timed out before fetching a new message, reattempting") logging.debug("WebSocket receiver timed out before fetching a new message, reattempting")
continue continue
@ -56,8 +57,6 @@ class WebSocketThread(threading.Thread):
self.shutdown_event.set() self.shutdown_event.set()
break break
logging.info("WebSocket thread quitting")
def handle_msg(self, msg): def handle_msg(self, msg):
msg_json = json.loads(msg) msg_json = json.loads(msg)
@ -91,7 +90,7 @@ class WebSocketThread(threading.Thread):
finally: finally:
loop.close() loop.close()
logging.info("WebSocket thread quitting without attempting to reconnect") logging.info("WebSocket thread quitting")
class QueueProcessor(threading.Thread): class QueueProcessor(threading.Thread):
@ -105,10 +104,12 @@ class QueueProcessor(threading.Thread):
self.handler = handler self.handler = handler
async def process_queue(self): async def process_queue(self):
logging.info("Queue operations timeout set to %d seconds", QUEUE_OP_TIMEOUT)
while not self.shutdown_event.is_set(): while not self.shutdown_event.is_set():
try: try:
# Timeout is necessary to make sure the state of the shutdown event is checked often enough # Timeout is necessary to make sure the state of the shutdown event is checked often enough
tx_sender = await asyncio.wait_for(self.q.coro_get(), timeout=WS_INTERMSG_TIMEOUT) tx_sender = await asyncio.wait_for(self.q.coro_get(), timeout=QUEUE_OP_TIMEOUT)
await self.handler.store(tx_sender) await self.handler.store(tx_sender)
# pylint: disable=broad-exception-caught # pylint: disable=broad-exception-caught
except asyncio.TimeoutError: except asyncio.TimeoutError: