feat: proper periodic json export & cfg parsing

This commit is contained in:
17ms 2024-07-07 15:17:15 +03:00
parent e262f40290
commit b0f5ab1b83
Signed by untrusted user who does not match committer: ae
GPG Key ID: 995EFD5C1B532B3E
5 changed files with 67 additions and 22 deletions

1
.gitignore vendored
View File

@ -166,6 +166,7 @@ cython_debug/
# Development database # Development database
chainmapper.sqlite3 chainmapper.sqlite3
export.json
# TODOs/tasks # TODOs/tasks
TASKS.md TASKS.md

62
main.py
View File

@ -1,14 +1,14 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os
import asyncio import asyncio
import threading import threading
import logging import logging
import aioprocessing import aioprocessing
from dotenv import dotenv_values from dotenv import load_dotenv
from src.mempool import WebSocketThread, QueueProcessor from src.mempool import WebSocketThread, QueueProcessor
from src.db import Handler, periodic_export from src.db import Handler, periodic_export
from src.const import EXPORT_INTERVAL
async def shutdown(loop, signal=None): async def shutdown(loop, signal=None):
@ -31,30 +31,61 @@ async def shutdown(loop, signal=None):
loop.stop() loop.stop()
def main(env_path=".env"): def load_cfg(dotenv_path=".env"):
cfg = dotenv_values(env_path) load_dotenv(dotenv_path)
cfg = {}
print(f"[+] Environment variables loaded from '{dotenv_path}'\n---")
cfg["MODE"] = os.getenv("MODE")
cfg["EXPORT_INTERVAL"] = os.getenv("EXPORT_INTERVAL")
if cfg["MODE"] is None:
cfg["MODE"] = "production"
if cfg["EXPORT_INTERVAL"] is None:
cfg["EXPORT_INTERVAL"] = 24 * 60 * 60 # 24 hours in seconds
else:
cfg["EXPORT_INTERVAL"] = int(cfg["EXPORT_INTERVAL"])
return cfg
def main():
cfg = load_cfg()
mode = cfg["MODE"] mode = cfg["MODE"]
if mode is None or mode.lower() == "production": if mode.lower() == "production":
log_level = logging.INFO log_level = logging.INFO
else: else:
log_level = logging.DEBUG log_level = logging.DEBUG
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=log_level) logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=log_level)
logging.info("Logger initialized")
logging.info("MODE: %s", cfg["MODE"])
logging.info("EXPORT_INTERVAL: %d (seconds)", cfg["EXPORT_INTERVAL"])
# FIFO queue for crosst-thread communications # FIFO queue for crosst-thread communications
q = aioprocessing.AioQueue() q = aioprocessing.AioQueue()
handler = Handler() handler = Handler()
loop = asyncio.new_event_loop()
# TODO: handle scheduling of the export task
# loop.create_task(periodic_export(handler, EXPORT_INTERVAL))
# export_task_fut = asyncio.run_coroutine_threadsafe(periodic_export, loop)
shutdown_event = threading.Event() shutdown_event = threading.Event()
shutdown_loop = asyncio.new_event_loop()
export_loop = asyncio.new_event_loop()
export_thread = threading.Thread(
target=periodic_export,
args=(
export_loop,
handler,
cfg["EXPORT_INTERVAL"],
shutdown_event,
),
)
export_thread.start()
ws_thread = WebSocketThread(q, shutdown_event) ws_thread = WebSocketThread(q, shutdown_event)
qp_thread = QueueProcessor(q, shutdown_event, handler) qp_thread = QueueProcessor(q, shutdown_event, handler)
ws_thread.start() ws_thread.start()
qp_thread.start() qp_thread.start()
@ -64,12 +95,15 @@ def main(env_path=".env"):
except KeyboardInterrupt: except KeyboardInterrupt:
logging.info("Keyboard interrupt received, shutting down threads.") logging.info("Keyboard interrupt received, shutting down threads.")
shutdown_event.set() shutdown_event.set()
loop.run_until_complete(shutdown(loop)) shutdown_loop.run_until_complete(shutdown(shutdown_loop))
ws_thread.join() ws_thread.join()
qp_thread.join() qp_thread.join()
export_thread.join()
finally: finally:
loop.stop() export_loop.stop()
loop.close() export_loop.close()
shutdown_loop.stop()
shutdown_loop.close()
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,2 +1,3 @@
aioprocessing==2.0.1 aioprocessing==2.0.1
python-dotenv==1.0.1
websockets==12.0 websockets==12.0

View File

@ -2,6 +2,3 @@ import json
WS_ADDR = "wss://ws.blockchain.info/coins" WS_ADDR = "wss://ws.blockchain.info/coins"
SUB_MSG = json.dumps({"coin": "eth", "command": "subscribe", "entity": "confirmed_transaction"}) SUB_MSG = json.dumps({"coin": "eth", "command": "subscribe", "entity": "confirmed_transaction"})
# EXPORT_INTERVAL = 24 * 60 * 60 # 24 hours in seconds
EXPORT_INTERVAL = 30

View File

@ -70,9 +70,21 @@ class Handler:
logging.info("Data exported to '%s'", filename) logging.info("Data exported to '%s'", filename)
async def periodic_export(handler, interval): def periodic_export(loop, handler, interval, shutdown_event):
logging.info("Scheduled export task created") async def task(handler, interval, shutdown_event):
logging.info("Scheduled export task initialized")
while True: # Checks the shutdown_event every 5 seconds
await asyncio.sleep(interval) check_interval = 5
await handler.export() elapsed = 0
while not shutdown_event.is_set():
await asyncio.sleep(check_interval)
elapsed += check_interval
if elapsed >= interval:
await handler.export()
elapsed = 0
asyncio.set_event_loop(loop)
loop.run_until_complete(task(handler, interval, shutdown_event))