diff --git a/.gitignore b/.gitignore index 82f9275..523dc31 100644 --- a/.gitignore +++ b/.gitignore @@ -160,3 +160,6 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +# MacOS shit +.DS_Store diff --git a/README.md b/README.md index f74a1dc..e72082c 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,9 @@ -# chainmapper -Dockerized script to monitor the Ethereum network for most active sender addresses +# Ethereum network sender address mapper + +Script that, once deployed in a Docker container, monitors a live feed of the Ethereum network via a WebSocket connection, stores the sender addresses, and creates statistics of the most active addresses. + +## Usage + +```shell +$ echo "TODO" +``` diff --git a/main.py b/main.py new file mode 100644 index 0000000..69e6138 --- /dev/null +++ b/main.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 + +from dotenv import dotenv_values + + +def main(cfg): + pass + + +def dotconfig(path=".env"): + return dotenv_values(path) + + +if __name__ == "__main__": + main(dotconfig()) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..9f78592 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +python-dotenv==1.0.1 +websocket_client==1.8.0 diff --git a/src/const.py b/src/const.py new file mode 100644 index 0000000..f1b42c7 --- /dev/null +++ b/src/const.py @@ -0,0 +1 @@ +WS_ADDR = "wss://ws.blockchain.info/coins" diff --git a/src/db.py b/src/db.py new file mode 100644 index 0000000..ba91067 --- /dev/null +++ b/src/db.py @@ -0,0 +1,42 @@ +import sqlite3 + + +class Handler: + def __init__(self, database="chainmapper.sql"): + self.database = database + # Notably `connect` automatically creates the database if it doesn't already exist + self.con = sqlite3.connect(self.database) + self.cursor = self.con.cursor() + + # Initialize the table if necessary + self.cursor.execute( + """ + CREATE TABLE IF NOT EXISTS AddressMapping ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + address TEXT NOT NULL UNIQUE, + total_tx_count INTEGER NOT NULL DEFAULT 1, + last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + """ + ) + + self.con.commit() + + def store(self, address): + self.cursor.execute( + """ + INSERT INTO AddressTracking (address) + VALUES + (?) ON CONFLICT(address) DO + UPDATE + SET + total_tx_count = total_tx_count + 1, + last_updated = CURRENT_TIMESTAMP; + """, + address, + ) + self.con.commit() + + def get_ordered(self): + # TODO: return addresses in descending order (based on `total_tx_count`) + pass diff --git a/src/mempool.py b/src/mempool.py new file mode 100644 index 0000000..cadaeb4 --- /dev/null +++ b/src/mempool.py @@ -0,0 +1,74 @@ +import asyncio +import json +import threading +import websocket + +from const import WS_ADDR + +# FIFO queue for cross-thread communications +tx_queue = asyncio.Queue() +tx_count = 0 + + +async def process_queue(): + """Handles emptying the transaction queue and calling the database module with the received data.""" + while True: + # TODO: handle graceful shutdown + tx_sender = tx_queue.get() + # TODO: send `tx_sender` to the db module + tx_count += 1 + tx_queue.task_done() + + +def on_message(_, msg, loop): + msg_json = json.loads(msg) + + try: + tx_sender = msg_json["transaction"]["from"] + except KeyError as e: + # TODO: log the seen KeyError `e` & handle what happens next (i.e. proper error handling)? + return + + future = asyncio.run_coroutine_threadsafe(tx_queue.put(tx_sender), loop) + future.result() # Won't timeout + + +def on_error(_, err): + # TODO: error handling + exit(1) + + +def on_close(_, status_code, msg): + # TODO: log `status_code` & `msg` + pass + + +def on_open(ws): + # TODO: log "Connection opened" + + # Subscribed entity could also be `pending_transactions` to receive the transactions directly + # from the mempool. + ws.send(json.dumps({"coin": "eth", "command": "subscribe", "entity": "confirmed_transaction"})) + + # TODO: log "Subscription message sent" + + +async def start_monitor(): + """Connects to the WebSocket feed of mined Ethereum transactions""" + queue_processor = asyncio.create_task(process_queue()) + loop = asyncio.get_event_loop() + ws = websocket.WebSocketApp( + WS_ADDR, + on_open=on_open, + on_message=lambda ws, msg: on_message(ws, msg, loop), + on_error=on_error, + on_close=on_close, + ) + + # Run the WebSocket client in a separate thread + # TODO: replace `run_forever` with something that can be signaled to shutdown gracefully + ws_thread = threading.Thread(target=ws.run_forever) + ws_thread.start() + + # Wait for the processor to finish cleaning up the queue before shutting down + await queue_processor()