diff --git a/main.py b/main.py
index 8da80b6..52d1e29 100755
--- a/main.py
+++ b/main.py
@@ -5,16 +5,19 @@
 import asyncio
 import threading
 import logging
 import signal
+from collections import namedtuple
 
 import aioprocessing
 from dotenv import load_dotenv
 
-from src.const import DEFAULT_EXPORT_INTERVAL, DEFAULT_MODE
+from src.const import DEFAULT_MODE, DEFAULT_EXPORT_INTERVAL, DEFAULT_IS_EXPORT
 from src.mempool import WebSocketThread, QueueProcessor
 from src.db import Handler, periodic_export
 
+Config = namedtuple("Config", ["mode", "export_interval", "is_export"])
+
 
 async def shutdown(loop, signal=None):
-    """Cleanup tasks tied to the service's shutdown."""
+    """Run cleanup tasks tied to the service's shutdown."""
     if signal:
         logging.info("Received exit signal %s", signal.name)
@@ -34,40 +37,34 @@
 def load_cfg(dotenv_path=".env"):
+    """Parse configuration from environment variables located in `dotenv_path` or from defaults."""
     load_dotenv(dotenv_path)
-    cfg = {}
-    print(f"[+] Environment variables loaded from '{dotenv_path}'\n---")
 
-    cfg["MODE"] = os.getenv("MODE")
-    cfg["EXPORT_INTERVAL"] = os.getenv("EXPORT_INTERVAL")
+    mode = os.getenv("MODE", DEFAULT_MODE).lower()
+    export_interval = int(os.getenv("EXPORT_INTERVAL", DEFAULT_EXPORT_INTERVAL))
+    is_export = os.getenv("IS_EXPORT", DEFAULT_IS_EXPORT).lower() in ("true", "1", "t")
 
-    if cfg["MODE"] is None:
-        cfg["MODE"] = DEFAULT_MODE
-
-    if cfg["EXPORT_INTERVAL"] is None:
-        cfg["EXPORT_INTERVAL"] = DEFAULT_EXPORT_INTERVAL
-    else:
-        cfg["EXPORT_INTERVAL"] = int(cfg["EXPORT_INTERVAL"])
+    cfg = Config(mode, export_interval, is_export)
 
     return cfg
 
 
 def main():
     cfg = load_cfg()
-    mode = cfg["MODE"]
 
-    if mode.lower() == "production":
+    if cfg.mode == "production":
         log_level = logging.INFO
     else:
         log_level = logging.DEBUG
 
     logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=log_level)
     logging.info("Logger initialized")
-    logging.info("MODE: %s", cfg["MODE"])
-    logging.info("EXPORT_INTERVAL: %d (seconds)", cfg["EXPORT_INTERVAL"])
+    logging.info("MODE: %s", cfg.mode)
+    logging.info("EXPORT_INTERVAL: %d (seconds)", cfg.export_interval)
+    logging.info("IS_EXPORT: %r", cfg.is_export)
 
-    # FIFO queue for crosst-thread communications
+    # FIFO queue for cross-thread communications
     q = aioprocessing.AioQueue()
     handler = Handler()
     shutdown_event = threading.Event()
@@ -77,12 +74,14 @@ def main():
     ws_thread = WebSocketThread(q, shutdown_event)
     qp_thread = QueueProcessor(q, shutdown_event, handler)
+
     export_thread = threading.Thread(
         target=periodic_export,
         args=(
             export_loop,
             handler,
-            cfg["EXPORT_INTERVAL"],
+            cfg.export_interval,
+            cfg.is_export,
             shutdown_event,
         ),
     )
@@ -116,8 +115,12 @@
     finally:
         export_loop.stop()
         export_loop.close()
+        logging.info("Export loop shut down")
+
         shutdown_loop.stop()
         shutdown_loop.close()
+        logging.info("Shutdown loop shut down")
+
         logging.info("Shutdown sequence completed successfully!")
diff --git a/src/const.py b/src/const.py
index a2434f9..90561f1 100644
--- a/src/const.py
+++ b/src/const.py
@@ -1,12 +1,24 @@
 import json
 
-DEFAULT_MODE = "production"
-
+# Blockchain.com endpoint and the subscription message which initializes the "transaction stream"
 WS_ADDR = "wss://ws.blockchain.info/coins"
 # Optionally `confirmed_transaction` can be used (bursts of data instead of a steady stream, which is worse for the overall performance)
 SUB_MSG = json.dumps({"coin": "eth", "command": "subscribe", "entity": "pending_transaction"})
 
-WS_RECONNECT_PAUSE = 2  # Seconds
-WS_INTERMSG_TIMEOUT = 1  # Seconds
+# Pause before reconnecting after the WebSocket connection is unexpectedly dropped by either party (seconds)
+WS_RECONNECT_PAUSE = 2
+
+# Timeout for asynchronous WebSocket reading (seconds)
+WS_INTERMSG_TIMEOUT = 1
+
+# Timeout for asynchronous queue operations (`coro_get` and `coro_put`, seconds)
+QUEUE_OP_TIMEOUT = 1
+
+# Paths inside the Docker container where data is stored/exported (should match the mounted volume in `deploy.sh`)
+DEFAULT_DB_PATH = "./data/chainmapper.sqlite3"
 DEFAULT_EXPORT_PATH = "./data/export.json"
-DEFAULT_EXPORT_INTERVAL = 10800  # 3 hours
+
+# Defaults for environment variables (they must be strings for this reason; interval in seconds)
+DEFAULT_MODE = "production"
+DEFAULT_EXPORT_INTERVAL = "10800"
+DEFAULT_IS_EXPORT = "False"
diff --git a/src/db.py b/src/db.py
index 82156e8..0b36f56 100644
--- a/src/db.py
+++ b/src/db.py
@@ -4,13 +4,13 @@
 import logging
 import threading
 import asyncio
 
-from src.const import DEFAULT_EXPORT_PATH
+from src.const import DEFAULT_DB_PATH, DEFAULT_EXPORT_PATH
 
 
 class Handler:
     """Handle all SQLite connections required to create, update, and export the stored addresses."""
-    def __init__(self, database="chainmapper.sqlite3"):
+    def __init__(self, database=DEFAULT_DB_PATH):
         self.database = database
         # Notably `connect` automatically creates the database if it doesn't already exist
         self.con = sqlite3.connect(self.database, check_same_thread=False)
@@ -78,7 +78,9 @@
         logging.info("Data exported to '%s'", filepath)
 
 
-def periodic_export(loop, handler, interval, shutdown_event):
+def periodic_export(loop, handler, interval, is_export, shutdown_event):
+    """Create a task that exports the internal database every `interval` seconds until `shutdown_event` is set."""
+
     async def task(handler, interval, shutdown_event):
         logging.info("Scheduled export task initialized")
@@ -96,5 +98,6 @@
         logging.info("Periodic export thread quitting")
 
-    asyncio.set_event_loop(loop)
-    loop.run_until_complete(task(handler, interval, shutdown_event))
+    if is_export:
+        asyncio.set_event_loop(loop)
+        loop.run_until_complete(task(handler, interval, shutdown_event))