diff --git a/.gitignore b/.gitignore index 18c6659..78ba40e 100644 --- a/.gitignore +++ b/.gitignore @@ -166,6 +166,7 @@ cython_debug/ # Development database chainmapper.sqlite3 +export.json # TODOs/tasks TASKS.md diff --git a/main.py b/main.py index 65bafae..064aa51 100755 --- a/main.py +++ b/main.py @@ -1,14 +1,14 @@ #!/usr/bin/env python3 +import os import asyncio import threading import logging import aioprocessing -from dotenv import dotenv_values +from dotenv import load_dotenv from src.mempool import WebSocketThread, QueueProcessor from src.db import Handler, periodic_export -from src.const import EXPORT_INTERVAL async def shutdown(loop, signal=None): @@ -31,30 +31,61 @@ async def shutdown(loop, signal=None): loop.stop() -def main(env_path=".env"): - cfg = dotenv_values(env_path) +def load_cfg(dotenv_path=".env"): + load_dotenv(dotenv_path) + cfg = {} + + print(f"[+] Environment variables loaded from '{dotenv_path}'\n---") + + cfg["MODE"] = os.getenv("MODE") + cfg["EXPORT_INTERVAL"] = os.getenv("EXPORT_INTERVAL") + + if cfg["MODE"] is None: + cfg["MODE"] = "production" + + if cfg["EXPORT_INTERVAL"] is None: + cfg["EXPORT_INTERVAL"] = 24 * 60 * 60 # 24 hours in seconds + else: + cfg["EXPORT_INTERVAL"] = int(cfg["EXPORT_INTERVAL"]) + + return cfg + + +def main(): + cfg = load_cfg() mode = cfg["MODE"] - if mode is None or mode.lower() == "production": + if mode.lower() == "production": log_level = logging.INFO else: log_level = logging.DEBUG logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=log_level) + logging.info("Logger initialized") + logging.info("MODE: %s", cfg["MODE"]) + logging.info("EXPORT_INTERVAL: %d (seconds)", cfg["EXPORT_INTERVAL"]) # FIFO queue for crosst-thread communications q = aioprocessing.AioQueue() handler = Handler() - - loop = asyncio.new_event_loop() - # TODO: handle scheduling of the export task - # loop.create_task(periodic_export(handler, EXPORT_INTERVAL)) - # export_task_fut = asyncio.run_coroutine_threadsafe(periodic_export, loop) shutdown_event = threading.Event() + shutdown_loop = asyncio.new_event_loop() + export_loop = asyncio.new_event_loop() + + export_thread = threading.Thread( + target=periodic_export, + args=( + export_loop, + handler, + cfg["EXPORT_INTERVAL"], + shutdown_event, + ), + ) + export_thread.start() + ws_thread = WebSocketThread(q, shutdown_event) qp_thread = QueueProcessor(q, shutdown_event, handler) - ws_thread.start() qp_thread.start() @@ -64,12 +95,15 @@ def main(env_path=".env"): except KeyboardInterrupt: logging.info("Keyboard interrupt received, shutting down threads.") shutdown_event.set() - loop.run_until_complete(shutdown(loop)) + shutdown_loop.run_until_complete(shutdown(shutdown_loop)) ws_thread.join() qp_thread.join() + export_thread.join() finally: - loop.stop() - loop.close() + export_loop.stop() + export_loop.close() + shutdown_loop.stop() + shutdown_loop.close() if __name__ == "__main__": diff --git a/requirements.txt b/requirements.txt index cc2010a..64689b9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ aioprocessing==2.0.1 +python-dotenv==1.0.1 websockets==12.0 diff --git a/src/const.py b/src/const.py index 2115fc7..f02e346 100644 --- a/src/const.py +++ b/src/const.py @@ -2,6 +2,3 @@ import json WS_ADDR = "wss://ws.blockchain.info/coins" SUB_MSG = json.dumps({"coin": "eth", "command": "subscribe", "entity": "confirmed_transaction"}) - -# EXPORT_INTERVAL = 24 * 60 * 60 # 24 hours in seconds -EXPORT_INTERVAL = 30 diff --git a/src/db.py b/src/db.py index b784023..a1c19cb 100644 --- a/src/db.py +++ b/src/db.py @@ -70,9 +70,21 @@ class Handler: logging.info("Data exported to '%s'", filename) -async def periodic_export(handler, interval): - logging.info("Scheduled export task created") +def periodic_export(loop, handler, interval, shutdown_event): + async def task(handler, interval, shutdown_event): + logging.info("Scheduled export task initialized") - while True: - await asyncio.sleep(interval) - await handler.export() + # Checks the shutdown_event every 5 seconds + check_interval = 5 + elapsed = 0 + + while not shutdown_event.is_set(): + await asyncio.sleep(check_interval) + elapsed += check_interval + + if elapsed >= interval: + await handler.export() + elapsed = 0 + + asyncio.set_event_loop(loop) + loop.run_until_complete(task(handler, interval, shutdown_event))