fix(43c3865): remove asyncio timeouts to prevent queue processor from halting

This commit is contained in:
17ms 2024-07-17 20:30:24 +03:00
parent 50efda3324
commit 757910642e
Signed by untrusted user who does not match committer: ae
GPG Key ID: 995EFD5C1B532B3E
3 changed files with 19 additions and 19 deletions

16
main.py
View File

@@ -1,12 +1,15 @@
 #!/usr/bin/env python3
 import os
+import sys
 import asyncio
 import threading
 import logging
 import signal
 from collections import namedtuple
+import sqlite3
 import aioprocessing
+import websockets
 from dotenv import load_dotenv
 from src.const import DEFAULT_MODE, DEFAULT_EXPORT_INTERVAL, DEFAULT_IS_EXPORT, VERSION
@@ -65,6 +68,12 @@ def main():
     logging.info("EXPORT_INTERVAL: %d (seconds)", cfg.export_interval)
     logging.info("IS_EXPORT: %r", cfg.is_export)
+    # Information for debugging issues caused by potential version differences
+    logging.info("Python version: %s", sys.version)
+    logging.info("aioprocessing version: %s", aioprocessing.__version__)
+    logging.info("websockets version: %s", websockets.__version__)
+    logging.info("sqlite3 version: %s", sqlite3.version)
     # FIFO queue for cross-thread communications
     q = aioprocessing.AioQueue()
     handler = Handler()
@@ -93,10 +102,13 @@ def main():
     def handle_exit():
         logging.info("Shutdown procedure initialized")
         shutdown_event.set()
         shutdown_loop.run_until_complete(shutdown(shutdown_loop))
-        ws_thread.join()
+        # NOTE: It's vital to close the queue processor first so that it doesn't halt the shutdown
         qp_thread.join()
+        ws_thread.join()
         export_thread.join()

     def handle_signal(signal, _frame):
@@ -108,8 +120,8 @@ def main():
     signal.signal(signal.SIGTERM, handle_signal)

     try:
-        ws_thread.join()
         qp_thread.join()
+        ws_thread.join()
     except KeyboardInterrupt:
         logging.info("Keyboard interrupt received, shutting down threads")
         handle_exit()

View File

@@ -10,12 +10,6 @@ SUB_MSG = json.dumps({"coin": "eth", "command": "subscribe", "entity": "pending_
 # Pause before reconnecting after the WebSocket connection is accidentally dropped by either party
 WS_RECONNECT_PAUSE = 2
-# Timeout for asynchronous WebSocket reading (seconds)
-WS_INTERMSG_TIMEOUT = 1
-# Timeout for asynchronous queue operations (`coro_get` and `coro_put`, seconds)
-QUEUE_OP_TIMEOUT = 1
 # Paths inside the Docker container where data is stored/exported (should match with the mounted volume in `deploy.sh`)
 DEFAULT_DB_PATH = "./data/chainmapper.sqlite3"
 DEFAULT_EXPORT_PATH = "./data/export.json"

View File

@@ -4,7 +4,7 @@ import threading
 import logging
 import websockets
-from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE, WS_INTERMSG_TIMEOUT, QUEUE_OP_TIMEOUT
+from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE


 class WebSocketThread(threading.Thread):
@@ -20,8 +20,6 @@ class WebSocketThread(threading.Thread):
     async def connect(self):
         async with websockets.connect(WS_ADDR) as ws:
-            logging.info("Inter message timeout set to %d seconds", WS_INTERMSG_TIMEOUT)
             logging.info("WebSocket connection established successfully")
             await ws.send(self.sub_msg)
             logging.info("Subscription message sent")
@@ -32,15 +30,13 @@ class WebSocketThread(threading.Thread):
            while not self.shutdown_event.is_set():
                try:
-                    # Timeout is necessary to make sure the state of the shutdown event is checked often enough
-                    msg = await asyncio.wait_for(ws.recv(), timeout=WS_INTERMSG_TIMEOUT)
+                    msg = await ws.recv()
                    data = self.handle_msg(msg)
                    if data is None:
                        continue
-                    # This shouldn't really be an issue, but it's safer to set a timeout here too...
-                    await asyncio.wait_for(self.q.coro_put(data), timeout=QUEUE_OP_TIMEOUT)
+                    await self.q.coro_put(data)
                except asyncio.TimeoutError:
                    logging.debug("WebSocket receiver timed out before fetching a new message, reattempting")
                    continue
@@ -104,12 +100,10 @@ class QueueProcessor(threading.Thread):
         self.handler = handler

     async def process_queue(self):
-        logging.info("Queue operations timeout set to %d seconds", QUEUE_OP_TIMEOUT)
         while not self.shutdown_event.is_set():
             try:
-                # Timeout is necessary to make sure the state of the shutdown event is checked often enough
-                tx_sender = await asyncio.wait_for(self.q.coro_get(), timeout=QUEUE_OP_TIMEOUT)
+                # Might prevent a proper shutdown procedure if the queue feeder is closed before the processor
+                tx_sender = await self.q.coro_get()
                 await self.handler.store(tx_sender)
                 # pylint: disable=broad-exception-caught
             except asyncio.TimeoutError: