feat: optional export task and expandable config parsing with named tuples

17ms 2024-07-15 19:53:35 +03:00
parent 43c38656dd
commit ccee3ecd01
Signed by untrusted user who does not match committer: ae
GPG Key ID: 995EFD5C1B532B3E
3 changed files with 47 additions and 29 deletions
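At its core, the flat cfg dict is replaced by an immutable Config named tuple, so adding a setting later only means extending the field list and the matching os.getenv() call in load_cfg. A minimal sketch of the idea, using the names introduced in the diff below:

from collections import namedtuple

# Fields mirror what load_cfg() parses; extending the config later only means
# adding a field here plus one os.getenv() line in load_cfg().
Config = namedtuple("Config", ["mode", "export_interval", "is_export"])

cfg = Config("production", 10800, False)
print(cfg.export_interval)  # 10800 -- attribute access instead of cfg["EXPORT_INTERVAL"]
# cfg.mode = "debug"        # would raise AttributeError: named tuples are immutable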

main.py

@@ -5,16 +5,19 @@ import asyncio
 import threading
 import logging
 import signal
+from collections import namedtuple

 import aioprocessing
 from dotenv import load_dotenv

-from src.const import DEFAULT_EXPORT_INTERVAL, DEFAULT_MODE
+from src.const import DEFAULT_MODE, DEFAULT_EXPORT_INTERVAL, DEFAULT_IS_EXPORT
 from src.mempool import WebSocketThread, QueueProcessor
 from src.db import Handler, periodic_export

+Config = namedtuple("Config", ["mode", "export_interval", "is_export"])

 async def shutdown(loop, signal=None):
-    """Cleanup tasks tied to the service's shutdown."""
+    """Run cleanup tasks tied to the service's shutdown."""
     if signal:
         logging.info("Received exit signal %s", signal.name)
@@ -34,40 +37,34 @@ async def shutdown(loop, signal=None):
 def load_cfg(dotenv_path=".env"):
+    """Parse configuration from environment variables located in `dotenv_path` or from defaults."""
     load_dotenv(dotenv_path)
-    cfg = {}
     print(f"[+] Environment variables loaded from '{dotenv_path}'\n---")
-    cfg["MODE"] = os.getenv("MODE")
-    cfg["EXPORT_INTERVAL"] = os.getenv("EXPORT_INTERVAL")
-    if cfg["MODE"] is None:
-        cfg["MODE"] = DEFAULT_MODE
-    if cfg["EXPORT_INTERVAL"] is None:
-        cfg["EXPORT_INTERVAL"] = DEFAULT_EXPORT_INTERVAL
-    else:
-        cfg["EXPORT_INTERVAL"] = int(cfg["EXPORT_INTERVAL"])
+    mode = os.getenv("MODE", DEFAULT_MODE).lower()
+    export_interval = int(os.getenv("EXPORT_INTERVAL", DEFAULT_EXPORT_INTERVAL))
+    is_export = os.getenv("IS_EXPORT", DEFAULT_IS_EXPORT).lower() in ("true", "1", "t")
+    cfg = Config(mode, export_interval, is_export)
     return cfg

 def main():
     cfg = load_cfg()
-    mode = cfg["MODE"]
-    if mode.lower() == "production":
+    if cfg.mode == "production":
         log_level = logging.INFO
     else:
         log_level = logging.DEBUG

     logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=log_level)
     logging.info("Logger initialized")
-    logging.info("MODE: %s", cfg["MODE"])
-    logging.info("EXPORT_INTERVAL: %d (seconds)", cfg["EXPORT_INTERVAL"])
+    logging.info("MODE: %s", cfg.mode)
+    logging.info("EXPORT_INTERVAL: %d (seconds)", cfg.export_interval)
+    logging.info("IS_EXPORT: %r", cfg.is_export)

-    # FIFO queue for crosst-thread communications
+    # FIFO queue for cross-thread communications
     q = aioprocessing.AioQueue()
     handler = Handler()
     shutdown_event = threading.Event()
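The new parsing leans on os.getenv() defaults, which is also why the fallback values in src/const.py are strings: the default has to survive the same .lower()/int() coercion as a real environment variable. A quick illustration with hypothetical values (not taken from the repo):

import os

DEFAULT_MODE = "production"          # string defaults, mirroring src/const.py
DEFAULT_EXPORT_INTERVAL = "10800"
DEFAULT_IS_EXPORT = "False"

os.environ["IS_EXPORT"] = "1"        # as if set in .env

mode = os.getenv("MODE", DEFAULT_MODE).lower()                                  # "production" (default path)
export_interval = int(os.getenv("EXPORT_INTERVAL", DEFAULT_EXPORT_INTERVAL))    # 10800
is_export = os.getenv("IS_EXPORT", DEFAULT_IS_EXPORT).lower() in ("true", "1", "t")  # True, since "1" matches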
@@ -77,12 +74,14 @@ def main():
     ws_thread = WebSocketThread(q, shutdown_event)
     qp_thread = QueueProcessor(q, shutdown_event, handler)
     export_thread = threading.Thread(
         target=periodic_export,
         args=(
             export_loop,
             handler,
-            cfg["EXPORT_INTERVAL"],
+            cfg.export_interval,
+            cfg.is_export,
             shutdown_event,
         ),
     )
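Because periodic_export() now takes five positional parameters, the args tuple must match that exact order. One hedged alternative, not what this commit does, is to pass kwargs (reusing the parameter names from the new signature) so the call site stays readable if the signature grows again:

export_thread = threading.Thread(
    target=periodic_export,
    kwargs={
        "loop": export_loop,
        "handler": handler,
        "interval": cfg.export_interval,
        "is_export": cfg.is_export,
        "shutdown_event": shutdown_event,
    },
)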
@@ -116,8 +115,12 @@ def main():
     finally:
         export_loop.stop()
         export_loop.close()
+        logging.info("Export loop shut down")

         shutdown_loop.stop()
         shutdown_loop.close()
+        logging.info("Shutdown loop shut down")

     logging.info("Shutdown sequence completed successfully!")

src/const.py

@@ -1,12 +1,24 @@
 import json

-DEFAULT_MODE = "production"
+# Blockchain.com endpoint and the subscription message which initializes the "transaction stream"
 WS_ADDR = "wss://ws.blockchain.info/coins"
 # Optionally `confirmed_transaction` can be used (bursts of data instead of a steady stream, which is worse for the overall performance)
 SUB_MSG = json.dumps({"coin": "eth", "command": "subscribe", "entity": "pending_transaction"})

-WS_RECONNECT_PAUSE = 2  # Seconds
-WS_INTERMSG_TIMEOUT = 1  # Seconds
+# Pause before reconnecting after the WebSocket connection is accidentally dropped by either party
+WS_RECONNECT_PAUSE = 2
+# Timeout for asynchronous WebSocket reading (seconds)
+WS_INTERMSG_TIMEOUT = 1
+# Timeout for asynchronous queue operations (`coro_get` and `coro_put`, seconds)
+QUEUE_OP_TIMEOUT = 1

+# Paths inside the Docker container where data is stored/exported (should match with the mounted volume in `deploy.sh`)
+DEFAULT_DB_PATH = "./data/chainmapper.sqlite3"
 DEFAULT_EXPORT_PATH = "./data/export.json"

-DEFAULT_EXPORT_INTERVAL = 10800  # 3 hours
+# Defaults to environment variables (must be strings for this reason, interval in seconds)
+DEFAULT_MODE = "production"
+DEFAULT_EXPORT_INTERVAL = "10800"
+DEFAULT_IS_EXPORT = "False"
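QUEUE_OP_TIMEOUT is added here but not used anywhere in this diff; presumably the queue consumer in src/mempool wraps the AioQueue coroutine helpers with it. A hedged sketch of that kind of usage (function name and placement are assumptions, not part of this commit):

import asyncio

from src.const import QUEUE_OP_TIMEOUT

async def get_next_tx(q):
    """Read one item from an aioprocessing.AioQueue without blocking shutdown checks forever."""
    try:
        return await asyncio.wait_for(q.coro_get(), timeout=QUEUE_OP_TIMEOUT)
    except asyncio.TimeoutError:
        return None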

src/db.py

@@ -4,13 +4,13 @@ import logging
 import threading
 import asyncio

-from src.const import DEFAULT_EXPORT_PATH
+from src.const import DEFAULT_DB_PATH, DEFAULT_EXPORT_PATH

 class Handler:
     """Handle all SQLite connections required to create, update, and export the stored addresses."""

-    def __init__(self, database="chainmapper.sqlite3"):
+    def __init__(self, database=DEFAULT_DB_PATH):
         self.database = database
         # Notably `connect` automatically creates the database if it doesn't already exist
         self.con = sqlite3.connect(self.database, check_same_thread=False)
@@ -78,7 +78,9 @@ class Handler:
         logging.info("Data exported to '%s'", filepath)

-def periodic_export(loop, handler, interval, shutdown_event):
+def periodic_export(loop, handler, interval, is_export, shutdown_event):
+    """Create a task that exports the internal database based on `interval` (seconds) until `shutdown_event` is set"""

     async def task(handler, interval, shutdown_event):
         logging.info("Scheduled export task initialized")
@@ -96,5 +98,6 @@ def periodic_export(loop, handler, interval, shutdown_event):
         logging.info("Periodic export thread quitting")

-    asyncio.set_event_loop(loop)
-    loop.run_until_complete(task(handler, interval, shutdown_event))
+    if is_export:
+        asyncio.set_event_loop(loop)
+        loop.run_until_complete(task(handler, interval, shutdown_event))
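The net effect of the guard: main() can create, start, and join the export thread unconditionally, and when IS_EXPORT is false the thread target simply returns without ever touching the event loop or the handler. A hypothetical quick check (not part of the repo):

import asyncio
import threading

from src.db import periodic_export

loop = asyncio.new_event_loop()
shutdown_event = threading.Event()

# With is_export=False the `if is_export:` guard skips set_event_loop() and
# run_until_complete() entirely, so this returns immediately and never uses the handler.
periodic_export(loop, handler=None, interval=1, is_export=False, shutdown_event=shutdown_event)
loop.close()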