2024-06-30 21:04:22 +02:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
2024-07-07 14:17:15 +02:00
|
|
|
import os
|
2024-07-01 19:36:50 +02:00
|
|
|
import asyncio
|
|
|
|
import threading
|
|
|
|
import logging
|
2024-07-07 16:16:14 +02:00
|
|
|
import signal
|
2024-07-04 20:12:43 +02:00
|
|
|
import aioprocessing
|
2024-07-07 14:17:15 +02:00
|
|
|
from dotenv import load_dotenv
|
2024-06-30 21:04:22 +02:00
|
|
|
|
2024-07-12 22:16:07 +02:00
|
|
|
from src.const import DEFAULT_EXPORT_INTERVAL, DEFAULT_MODE
|
2024-07-01 19:36:50 +02:00
|
|
|
from src.mempool import WebSocketThread, QueueProcessor
|
2024-07-04 20:12:43 +02:00
|
|
|
from src.db import Handler, periodic_export
|
2024-06-30 21:04:22 +02:00
|
|
|
|
|
|
|
|
2024-07-04 20:12:43 +02:00
|
|
|
async def shutdown(loop, signal=None, *, delay=2):
    """Cancel every other task on the running event loop, then stop ``loop``.

    Must be awaited from inside a running event loop: the tasks to cancel are
    looked up with ``asyncio.all_tasks()``, which inspects the running loop.

    Args:
        loop: Event loop on which ``stop()`` is called once the outstanding
            tasks have been cancelled and awaited.
        signal: Optional signal that triggered the shutdown. It is only used
            for logging and may be a ``signal.Signals`` member or a plain
            signal number. Note that this parameter shadows the ``signal``
            module inside the function.
        delay: Seconds to sleep before the tasks are cancelled. Defaults to
            the previously hard-coded two seconds.
    """
    if signal:
        # Signal handlers receive plain ints, which have no ``name``
        # attribute; fall back to the raw value instead of raising.
        logging.info("Received exit signal %s", getattr(signal, "name", signal))

    logging.info("Napping for a bit before shutting down...")
    await asyncio.sleep(delay)

    # Everything except the task that is executing this coroutine.
    current = asyncio.current_task()
    tasks = [t for t in asyncio.all_tasks() if t is not current]

    for t in tasks:
        t.cancel()

    logging.info("Cancelling %d outstanding tasks", len(tasks))
    # return_exceptions=True keeps the CancelledError of each task from
    # propagating out of gather().
    await asyncio.gather(*tasks, return_exceptions=True)

    logging.info("Flushing metrics")
    loop.stop()
|
|
|
|
|
|
|
|
|
2024-07-07 14:17:15 +02:00
|
|
|
def load_cfg(dotenv_path=".env"):
    """Load the runtime configuration from a dotenv file and the environment.

    Args:
        dotenv_path: Path handed to ``load_dotenv``.

    Returns:
        A dict with two keys: ``"MODE"`` (the ``MODE`` environment variable,
        or ``DEFAULT_MODE`` when it is unset or empty) and
        ``"EXPORT_INTERVAL"`` (the ``EXPORT_INTERVAL`` environment variable
        converted to ``int``, or ``DEFAULT_EXPORT_INTERVAL`` when it is unset
        or blank).

    Raises:
        ValueError: If ``EXPORT_INTERVAL`` is set to something that is not an
            integer.
    """
    load_dotenv(dotenv_path)
    cfg = {}

    print(f"[+] Environment variables loaded from '{dotenv_path}'\n---")

    # A ``KEY=`` line in a dotenv file produces an empty string rather than
    # None, so empty values are treated the same as missing ones.
    cfg["MODE"] = os.getenv("MODE") or DEFAULT_MODE

    raw_interval = os.getenv("EXPORT_INTERVAL")
    if raw_interval is None or not raw_interval.strip():
        cfg["EXPORT_INTERVAL"] = DEFAULT_EXPORT_INTERVAL
    else:
        try:
            cfg["EXPORT_INTERVAL"] = int(raw_interval)
        except ValueError as err:
            # Same exception type as before, but naming the offending setting.
            raise ValueError(
                f"EXPORT_INTERVAL must be an integer, got {raw_interval!r}"
            ) from err

    return cfg
|
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Configure logging, start the worker threads and wait for them to end.

    Steps, in order:
      1. Load the configuration via ``load_cfg`` and set up logging (INFO when
         MODE is "production", case-insensitively; DEBUG otherwise).
      2. Create the shared queue, the ``Handler`` and the shutdown event.
      3. Start a thread running ``periodic_export`` plus the
         ``WebSocketThread`` and ``QueueProcessor`` threads.
      4. Install SIGINT/SIGTERM handlers that trigger the shutdown procedure.
      5. Block until the WebSocket and queue-processor threads finish, then
         stop the export thread and close both event loops.
    """
    cfg = load_cfg()
    mode = cfg["MODE"]

    log_level = logging.INFO if mode.lower() == "production" else logging.DEBUG

    logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=log_level)
    logging.info("Logger initialized")
    logging.info("MODE: %s", cfg["MODE"])
    logging.info("EXPORT_INTERVAL: %d (seconds)", cfg["EXPORT_INTERVAL"])

    # FIFO queue for cross-thread communications
    q = aioprocessing.AioQueue()
    handler = Handler()
    shutdown_event = threading.Event()
    # Set once the shutdown procedure has started. Kept separate from
    # shutdown_event, which is shared with the worker threads.
    exit_started = threading.Event()

    shutdown_loop = asyncio.new_event_loop()
    export_loop = asyncio.new_event_loop()

    export_thread = threading.Thread(
        target=periodic_export,
        args=(
            export_loop,
            handler,
            cfg["EXPORT_INTERVAL"],
            shutdown_event,
        ),
    )
    export_thread.start()

    ws_thread = WebSocketThread(q, shutdown_event)
    qp_thread = QueueProcessor(q, shutdown_event, handler)
    ws_thread.start()
    qp_thread.start()

    def handle_exit():
        """Signal all threads to stop, run ``shutdown`` and join the threads."""
        # A second signal (e.g. Ctrl-C pressed twice) may arrive while the
        # first shutdown is still napping inside run_until_complete();
        # re-entering it would raise "This event loop is already running".
        if exit_started.is_set():
            logging.debug("Shutdown already in progress, ignoring repeated request")
            return
        exit_started.set()

        logging.info("Shutdown procedure initialized")
        shutdown_event.set()
        shutdown_loop.run_until_complete(shutdown(shutdown_loop))
        ws_thread.join()
        qp_thread.join()
        export_thread.join()

    def handle_signal(signum, _frame):
        """Signal handler: log the received signal number and shut down."""
        logging.info("Received signal '%s', shutting down...", signum)
        handle_exit()

    # SIGINT and SIGTERM signal handler (mainly for Docker)
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    try:
        ws_thread.join()
        qp_thread.join()
    except KeyboardInterrupt:
        # With the SIGINT handler installed above, Ctrl-C is normally routed
        # to handle_signal; this branch is kept as a fallback.
        logging.info("Keyboard interrupt received, shutting down threads.")
        handle_exit()
    finally:
        # When the workers ended by themselves nobody has told the export
        # thread to stop yet. Signal it and wait for it, so its loop is not
        # stopped/closed from this thread while the export thread still uses it.
        shutdown_event.set()
        export_thread.join()
        try:
            export_loop.stop()
            export_loop.close()
        finally:
            # Close the shutdown loop even if closing the export loop failed.
            shutdown_loop.stop()
            shutdown_loop.close()
|
2024-06-30 21:04:22 +02:00
|
|
|
|
|
|
|
|
|
|
|
# Start the service only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|