diff --git a/main.py b/main.py index 9a5ec69..ecf6133 100755 --- a/main.py +++ b/main.py @@ -1,12 +1,15 @@ #!/usr/bin/env python3 import os +import sys import asyncio import threading import logging import signal from collections import namedtuple +import sqlite3 import aioprocessing +import websockets from dotenv import load_dotenv from src.const import DEFAULT_MODE, DEFAULT_EXPORT_INTERVAL, DEFAULT_IS_EXPORT, VERSION @@ -65,6 +68,12 @@ def main(): logging.info("EXPORT_INTERVAL: %d (seconds)", cfg.export_interval) logging.info("IS_EXPORT: %r", cfg.is_export) + # Information for debugging issues caused by potential version differences + logging.info("Python version: %s", sys.version) + logging.info("aioprocessing version: %s", aioprocessing.__version__) + logging.info("websockets version: %s", websockets.__version__) + logging.info("sqlite3 version: %s", sqlite3.version) + # FIFO queue for cross-thread communications q = aioprocessing.AioQueue() handler = Handler() @@ -93,10 +102,13 @@ def main(): def handle_exit(): logging.info("Shutdown procedure initialized") + shutdown_event.set() shutdown_loop.run_until_complete(shutdown(shutdown_loop)) - ws_thread.join() + + # NOTE: It's vital to close the queue processor first so that it doesn't halt the shutdown qp_thread.join() + ws_thread.join() export_thread.join() def handle_signal(signal, _frame): @@ -108,8 +120,8 @@ def main(): signal.signal(signal.SIGTERM, handle_signal) try: - ws_thread.join() qp_thread.join() + ws_thread.join() except KeyboardInterrupt: logging.info("Keyboard interrupt received, shutting down threads") handle_exit() diff --git a/src/const.py b/src/const.py index 6a9cd65..7d06148 100644 --- a/src/const.py +++ b/src/const.py @@ -10,12 +10,6 @@ SUB_MSG = json.dumps({"coin": "eth", "command": "subscribe", "entity": "pending_ # Pause before reconnecting after the WebSocket connection is accidentally dropped by either party WS_RECONNECT_PAUSE = 2 -# Timeout for asynchronous WebSocket reading (seconds) -WS_INTERMSG_TIMEOUT = 1 - -# Timeout for asynchronous queue operations (`coro_get` and `coro_put`, seconds) -QUEUE_OP_TIMEOUT = 1 - # Paths inside the Docker container where data is stored/exported (should match with the mounted volume in `deploy.sh`) DEFAULT_DB_PATH = "./data/chainmapper.sqlite3" DEFAULT_EXPORT_PATH = "./data/export.json" diff --git a/src/mempool.py b/src/mempool.py index ff1aca3..2ea2b3a 100644 --- a/src/mempool.py +++ b/src/mempool.py @@ -4,7 +4,7 @@ import threading import logging import websockets -from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE, WS_INTERMSG_TIMEOUT, QUEUE_OP_TIMEOUT +from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE class WebSocketThread(threading.Thread): @@ -20,8 +20,6 @@ class WebSocketThread(threading.Thread): async def connect(self): async with websockets.connect(WS_ADDR) as ws: - logging.info("Inter message timeout set to %d seconds", WS_INTERMSG_TIMEOUT) - logging.info("WebSocket connection established successfully") await ws.send(self.sub_msg) logging.info("Subscription message sent") @@ -32,15 +30,13 @@ class WebSocketThread(threading.Thread): while not self.shutdown_event.is_set(): try: - # Timeout is necessary to make sure the state of the shutdown event is checked often enough - msg = await asyncio.wait_for(ws.recv(), timeout=WS_INTERMSG_TIMEOUT) + msg = await ws.recv() data = self.handle_msg(msg) if data is None: continue - # This shouldn't really be an issue, but it's safer to set a timeout here too... - await asyncio.wait_for(self.q.coro_put(data), timeout=QUEUE_OP_TIMEOUT) + await self.q.coro_put(data) except asyncio.TimeoutError: logging.debug("WebSocket receiver timed out before fetching a new message, reattempting") continue @@ -104,12 +100,10 @@ class QueueProcessor(threading.Thread): self.handler = handler async def process_queue(self): - logging.info("Queue operations timeout set to %d seconds", QUEUE_OP_TIMEOUT) - while not self.shutdown_event.is_set(): try: - # Timeout is necessary to make sure the state of the shutdown event is checked often enough - tx_sender = await asyncio.wait_for(self.q.coro_get(), timeout=QUEUE_OP_TIMEOUT) + # Might prevent a proper shutdown procedure if the queue feeder is closed before the processor + tx_sender = await self.q.coro_get() await self.handler.store(tx_sender) # pylint: disable=broad-exception-caught except asyncio.TimeoutError: