diff --git a/main.py b/main.py index ecf6133..82ec6da 100755 --- a/main.py +++ b/main.py @@ -11,10 +11,9 @@ import sqlite3 import aioprocessing import websockets from dotenv import load_dotenv +import requests -from src.const import DEFAULT_MODE, DEFAULT_EXPORT_INTERVAL, DEFAULT_IS_EXPORT, VERSION -from src.mempool import WebSocketThread, QueueProcessor -from src.db import Handler, periodic_export +from src import const, mempool, db Config = namedtuple("Config", ["mode", "export_interval", "is_export"]) @@ -44,9 +43,9 @@ def load_cfg(dotenv_path=".env"): load_dotenv(dotenv_path) print(f"[+] Environment variables loaded from '{dotenv_path}'\n---") - mode = os.getenv("MODE", DEFAULT_MODE).lower() - export_interval = int(os.getenv("EXPORT_INTERVAL", DEFAULT_EXPORT_INTERVAL)) - is_export = os.getenv("IS_EXPORT", DEFAULT_IS_EXPORT).lower() in ("true", "1", "t") + mode = os.getenv("MODE", const.DEFAULT_MODE).lower() + export_interval = int(os.getenv("EXPORT_INTERVAL", const.DEFAULT_EXPORT_INTERVAL)) + is_export = os.getenv("IS_EXPORT", const.DEFAULT_IS_EXPORT).lower() in ("true", "1", "t") cfg = Config(mode, export_interval, is_export) @@ -63,7 +62,7 @@ def main(): logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=log_level) logging.info("Logger initialized") - logging.info("Currently running version %s", VERSION) + logging.info("Currently running version %s", const.VERSION) logging.info("MODE: %s", cfg.mode) logging.info("EXPORT_INTERVAL: %d (seconds)", cfg.export_interval) logging.info("IS_EXPORT: %r", cfg.is_export) @@ -76,17 +75,17 @@ def main(): # FIFO queue for cross-thread communications q = aioprocessing.AioQueue() - handler = Handler() + handler = db.Handler() shutdown_event = threading.Event() shutdown_loop = asyncio.new_event_loop() export_loop = asyncio.new_event_loop() - ws_thread = WebSocketThread(q, shutdown_event) - qp_thread = QueueProcessor(q, shutdown_event, handler) + ws_thread = mempool.WebSocketThread(q, shutdown_event) + qp_thread = mempool.QueueProcessor(q, shutdown_event, handler) export_thread = threading.Thread( - target=periodic_export, + target=db.periodic_export, args=( export_loop, handler, diff --git a/src/const.py b/src/const.py index 7d06148..8b854bc 100644 --- a/src/const.py +++ b/src/const.py @@ -18,3 +18,5 @@ DEFAULT_EXPORT_PATH = "./data/export.json" DEFAULT_MODE = "production" DEFAULT_EXPORT_INTERVAL = "10800" DEFAULT_IS_EXPORT = "False" + +IP_TEST_ADDR = "https://ipv4.icanhazip.com" diff --git a/src/db.py b/src/db.py index 0b36f56..b21969b 100644 --- a/src/db.py +++ b/src/db.py @@ -4,13 +4,13 @@ import logging import threading import asyncio -from src.const import DEFAULT_DB_PATH, DEFAULT_EXPORT_PATH +from src import const class Handler: """Handle all SQLite connections required to create, update, and export the stored addresses.""" - def __init__(self, database=DEFAULT_DB_PATH): + def __init__(self, database=const.DEFAULT_DB_PATH): self.database = database # Notably `connect` automatically creates the database if it doesn't already exist self.con = sqlite3.connect(self.database, check_same_thread=False) @@ -52,7 +52,7 @@ class Handler: ) self.con.commit() - async def export(self, filepath=DEFAULT_EXPORT_PATH): + async def export(self, filepath=const.DEFAULT_EXPORT_PATH): """Export the addresses from the SQLite database in descending order based on the transaction counts.""" with self.lock: logging.debug("Reentrant lock acquired") diff --git a/src/mempool.py b/src/mempool.py index 2ea2b3a..e1258a1 100644 --- a/src/mempool.py +++ b/src/mempool.py @@ -4,13 +4,13 @@ import threading import logging import websockets -from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE +from src import const class WebSocketThread(threading.Thread): """Handle connection, subscription, and message parsing for the Blockchain.com WebSocket.""" - def __init__(self, q, shutdown_event, sub_msg=SUB_MSG): + def __init__(self, q, shutdown_event, sub_msg=const.SUB_MSG): super().__init__() self.name = "WebSocketThread" self.q = q @@ -19,7 +19,7 @@ class WebSocketThread(threading.Thread): self.tx_count = 0 async def connect(self): - async with websockets.connect(WS_ADDR) as ws: + async with websockets.connect(const.WS_ADDR) as ws: logging.info("WebSocket connection established successfully") await ws.send(self.sub_msg) logging.info("Subscription message sent") @@ -43,9 +43,9 @@ class WebSocketThread(threading.Thread): except websockets.exceptions.ConnectionClosed: logging.info( "WebSocket connection closed unexpectedly, sleeping for %d seconds before rebooting the connection", - WS_RECONNECT_PAUSE, + const.WS_RECONNECT_PAUSE, ) - await asyncio.sleep(WS_RECONNECT_PAUSE) + await asyncio.sleep(const.WS_RECONNECT_PAUSE) break # pylint: disable=broad-exception-caught except Exception as e: