fix(closes #1): inter message timeouts and improved debug logging
This commit is contained in:
parent
ba2140b11f
commit
43c38656dd
9
main.py
9
main.py
@ -75,6 +75,8 @@ def main():
|
|||||||
shutdown_loop = asyncio.new_event_loop()
|
shutdown_loop = asyncio.new_event_loop()
|
||||||
export_loop = asyncio.new_event_loop()
|
export_loop = asyncio.new_event_loop()
|
||||||
|
|
||||||
|
ws_thread = WebSocketThread(q, shutdown_event)
|
||||||
|
qp_thread = QueueProcessor(q, shutdown_event, handler)
|
||||||
export_thread = threading.Thread(
|
export_thread = threading.Thread(
|
||||||
target=periodic_export,
|
target=periodic_export,
|
||||||
args=(
|
args=(
|
||||||
@ -84,12 +86,10 @@ def main():
|
|||||||
shutdown_event,
|
shutdown_event,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
export_thread.start()
|
|
||||||
|
|
||||||
ws_thread = WebSocketThread(q, shutdown_event)
|
|
||||||
qp_thread = QueueProcessor(q, shutdown_event, handler)
|
|
||||||
ws_thread.start()
|
ws_thread.start()
|
||||||
qp_thread.start()
|
qp_thread.start()
|
||||||
|
export_thread.start()
|
||||||
|
|
||||||
def handle_exit():
|
def handle_exit():
|
||||||
logging.info("Shutdown procedure initialized")
|
logging.info("Shutdown procedure initialized")
|
||||||
@ -111,13 +111,14 @@ def main():
|
|||||||
ws_thread.join()
|
ws_thread.join()
|
||||||
qp_thread.join()
|
qp_thread.join()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
logging.info("Keyboard interrupt received, shutting down threads.")
|
logging.info("Keyboard interrupt received, shutting down threads")
|
||||||
handle_exit()
|
handle_exit()
|
||||||
finally:
|
finally:
|
||||||
export_loop.stop()
|
export_loop.stop()
|
||||||
export_loop.close()
|
export_loop.close()
|
||||||
shutdown_loop.stop()
|
shutdown_loop.stop()
|
||||||
shutdown_loop.close()
|
shutdown_loop.close()
|
||||||
|
logging.info("Shutdown sequence completed successfully!")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
12
src/db.py
12
src/db.py
@ -1,6 +1,7 @@
|
|||||||
import sqlite3
|
import sqlite3
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import threading
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
from src.const import DEFAULT_EXPORT_PATH
|
from src.const import DEFAULT_EXPORT_PATH
|
||||||
@ -14,6 +15,7 @@ class Handler:
|
|||||||
# Notably `connect` automatically creates the database if it doesn't already exist
|
# Notably `connect` automatically creates the database if it doesn't already exist
|
||||||
self.con = sqlite3.connect(self.database, check_same_thread=False)
|
self.con = sqlite3.connect(self.database, check_same_thread=False)
|
||||||
self.cursor = self.con.cursor()
|
self.cursor = self.con.cursor()
|
||||||
|
self.lock = threading.RLock()
|
||||||
|
|
||||||
# Initialize the table if necessary
|
# Initialize the table if necessary
|
||||||
self.cursor.execute(
|
self.cursor.execute(
|
||||||
@ -31,7 +33,9 @@ class Handler:
|
|||||||
|
|
||||||
async def store(self, address):
|
async def store(self, address):
|
||||||
"""Store a new address into the SQLite database, or increments the counter by one if the given address already exists in the database."""
|
"""Store a new address into the SQLite database, or increments the counter by one if the given address already exists in the database."""
|
||||||
await asyncio.to_thread(self._store, address)
|
with self.lock:
|
||||||
|
logging.debug("Reentrant lock acquired")
|
||||||
|
await asyncio.to_thread(self._store, address)
|
||||||
|
|
||||||
def _store(self, address):
|
def _store(self, address):
|
||||||
self.cursor.execute(
|
self.cursor.execute(
|
||||||
@ -50,7 +54,9 @@ class Handler:
|
|||||||
|
|
||||||
async def export(self, filepath=DEFAULT_EXPORT_PATH):
|
async def export(self, filepath=DEFAULT_EXPORT_PATH):
|
||||||
"""Export the addresses from the SQLite database in descending order based on the transaction counts."""
|
"""Export the addresses from the SQLite database in descending order based on the transaction counts."""
|
||||||
await asyncio.to_thread(self._export, filepath)
|
with self.lock:
|
||||||
|
logging.debug("Reentrant lock acquired")
|
||||||
|
await asyncio.to_thread(self._export, filepath)
|
||||||
|
|
||||||
def _export(self, filepath):
|
def _export(self, filepath):
|
||||||
self.cursor.execute(
|
self.cursor.execute(
|
||||||
@ -88,5 +94,7 @@ def periodic_export(loop, handler, interval, shutdown_event):
|
|||||||
await handler.export()
|
await handler.export()
|
||||||
elapsed = 0
|
elapsed = 0
|
||||||
|
|
||||||
|
logging.info("Periodic export thread quitting")
|
||||||
|
|
||||||
asyncio.set_event_loop(loop)
|
asyncio.set_event_loop(loop)
|
||||||
loop.run_until_complete(task(handler, interval, shutdown_event))
|
loop.run_until_complete(task(handler, interval, shutdown_event))
|
||||||
|
@ -4,7 +4,7 @@ import threading
|
|||||||
import logging
|
import logging
|
||||||
import websockets
|
import websockets
|
||||||
|
|
||||||
from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE
|
from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE, WS_INTERMSG_TIMEOUT
|
||||||
|
|
||||||
|
|
||||||
class WebSocketThread(threading.Thread):
|
class WebSocketThread(threading.Thread):
|
||||||
@ -20,6 +20,8 @@ class WebSocketThread(threading.Thread):
|
|||||||
|
|
||||||
async def connect(self):
|
async def connect(self):
|
||||||
async with websockets.connect(WS_ADDR) as ws:
|
async with websockets.connect(WS_ADDR) as ws:
|
||||||
|
logging.info("Inter message timeout set to %d seconds", WS_INTERMSG_TIMEOUT)
|
||||||
|
|
||||||
logging.info("WebSocket connection established successfully")
|
logging.info("WebSocket connection established successfully")
|
||||||
await ws.send(self.sub_msg)
|
await ws.send(self.sub_msg)
|
||||||
logging.info("Subscription message sent")
|
logging.info("Subscription message sent")
|
||||||
@ -30,13 +32,17 @@ class WebSocketThread(threading.Thread):
|
|||||||
|
|
||||||
while not self.shutdown_event.is_set():
|
while not self.shutdown_event.is_set():
|
||||||
try:
|
try:
|
||||||
msg = await ws.recv()
|
# Timeout is necessary to make sure the state of the shutdown event is checked often enough
|
||||||
|
msg = await asyncio.wait_for(ws.recv(), timeout=WS_INTERMSG_TIMEOUT)
|
||||||
data = self.handle_msg(msg)
|
data = self.handle_msg(msg)
|
||||||
|
|
||||||
if data is None:
|
if data is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
await self.q.coro_put(data)
|
await self.q.coro_put(data)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
logging.debug("WebSocket receiver timed out before fetching a new message, reattempting")
|
||||||
|
continue
|
||||||
except websockets.exceptions.ConnectionClosed:
|
except websockets.exceptions.ConnectionClosed:
|
||||||
logging.info(
|
logging.info(
|
||||||
"WebSocket connection closed unexpectedly, sleeping for %d seconds before rebooting the connection",
|
"WebSocket connection closed unexpectedly, sleeping for %d seconds before rebooting the connection",
|
||||||
@ -50,6 +56,8 @@ class WebSocketThread(threading.Thread):
|
|||||||
self.shutdown_event.set()
|
self.shutdown_event.set()
|
||||||
break
|
break
|
||||||
|
|
||||||
|
logging.info("WebSocket thread quitting")
|
||||||
|
|
||||||
def handle_msg(self, msg):
|
def handle_msg(self, msg):
|
||||||
msg_json = json.loads(msg)
|
msg_json = json.loads(msg)
|
||||||
|
|
||||||
@ -83,6 +91,8 @@ class WebSocketThread(threading.Thread):
|
|||||||
finally:
|
finally:
|
||||||
loop.close()
|
loop.close()
|
||||||
|
|
||||||
|
logging.info("WebSocket thread quitting without attempting to reconnect")
|
||||||
|
|
||||||
|
|
||||||
class QueueProcessor(threading.Thread):
|
class QueueProcessor(threading.Thread):
|
||||||
"""Handle processing of items from the cross-thread queue where the WebSocket thread feeds data into."""
|
"""Handle processing of items from the cross-thread queue where the WebSocket thread feeds data into."""
|
||||||
@ -97,14 +107,20 @@ class QueueProcessor(threading.Thread):
|
|||||||
async def process_queue(self):
|
async def process_queue(self):
|
||||||
while not self.shutdown_event.is_set():
|
while not self.shutdown_event.is_set():
|
||||||
try:
|
try:
|
||||||
tx_sender = await self.q.coro_get() # Waits here until new msg is available
|
# Timeout is necessary to make sure the state of the shutdown event is checked often enough
|
||||||
|
tx_sender = await asyncio.wait_for(self.q.coro_get(), timeout=WS_INTERMSG_TIMEOUT)
|
||||||
await self.handler.store(tx_sender)
|
await self.handler.store(tx_sender)
|
||||||
# pylint: disable=broad-exception-caught
|
# pylint: disable=broad-exception-caught
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
logging.debug("Queue processor timed out before fetching a new message, reattempting")
|
||||||
|
continue
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error("QueueProcessor thread crashed: %s", str(e))
|
logging.error("QueueProcessor thread crashed: %s", str(e))
|
||||||
self.shutdown_event.set()
|
self.shutdown_event.set()
|
||||||
break
|
break
|
||||||
|
|
||||||
|
logging.info("Queue processor thread quitting")
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Start the queue processing thread that'll run until it receives a shutdown message or crashes."""
|
"""Start the queue processing thread that'll run until it receives a shutdown message or crashes."""
|
||||||
loop = asyncio.new_event_loop()
|
loop = asyncio.new_event_loop()
|
||||||
|
Loading…
Reference in New Issue
Block a user