feat: initial skeleton with basically finished mempool module

This commit is contained in:
17ms 2024-06-30 22:04:22 +03:00
parent 52aaf92ad9
commit 6a65f80472
Signed by untrusted user who does not match committer: ae
GPG Key ID: 995EFD5C1B532B3E
7 changed files with 146 additions and 2 deletions

3
.gitignore vendored
View File

@ -160,3 +160,6 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# MacOS shit
.DS_Store

View File

@ -1,2 +1,9 @@
# chainmapper
Dockerized script to monitor the Ethereum network for most active sender addresses
# Ethereum network sender address mapper
Script that, once deployed in a Docker container, monitors a live feed of the Ethereum network via a WebSocket connection, stores the sender addresses, and creates statistics of the most active addresses.
## Usage
```shell
$ echo "TODO"
```

15
main.py Normal file
View File

@ -0,0 +1,15 @@
#!/usr/bin/env python3
from dotenv import dotenv_values
def main(cfg):
pass
def dotconfig(path=".env"):
return dotenv_values(path)
if __name__ == "__main__":
main(dotconfig())

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
python-dotenv==1.0.1
websocket_client==1.8.0

1
src/const.py Normal file
View File

@ -0,0 +1 @@
WS_ADDR = "wss://ws.blockchain.info/coins"

42
src/db.py Normal file
View File

@ -0,0 +1,42 @@
import sqlite3
class Handler:
def __init__(self, database="chainmapper.sql"):
self.database = database
# Notably `connect` automatically creates the database if it doesn't already exist
self.con = sqlite3.connect(self.database)
self.cursor = self.con.cursor()
# Initialize the table if necessary
self.cursor.execute(
"""
CREATE TABLE IF NOT EXISTS AddressMapping (
id INTEGER PRIMARY KEY AUTOINCREMENT,
address TEXT NOT NULL UNIQUE,
total_tx_count INTEGER NOT NULL DEFAULT 1,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
)
self.con.commit()
def store(self, address):
self.cursor.execute(
"""
INSERT INTO AddressTracking (address)
VALUES
(?) ON CONFLICT(address) DO
UPDATE
SET
total_tx_count = total_tx_count + 1,
last_updated = CURRENT_TIMESTAMP;
""",
address,
)
self.con.commit()
def get_ordered(self):
# TODO: return addresses in descending order (based on `total_tx_count`)
pass

74
src/mempool.py Normal file
View File

@ -0,0 +1,74 @@
import asyncio
import json
import threading
import websocket
from const import WS_ADDR
# FIFO queue for cross-thread communications
tx_queue = asyncio.Queue()
tx_count = 0
async def process_queue():
"""Handles emptying the transaction queue and calling the database module with the received data."""
while True:
# TODO: handle graceful shutdown
tx_sender = tx_queue.get()
# TODO: send `tx_sender` to the db module
tx_count += 1
tx_queue.task_done()
def on_message(_, msg, loop):
msg_json = json.loads(msg)
try:
tx_sender = msg_json["transaction"]["from"]
except KeyError as e:
# TODO: log the seen KeyError `e` & handle what happens next (i.e. proper error handling)?
return
future = asyncio.run_coroutine_threadsafe(tx_queue.put(tx_sender), loop)
future.result() # Won't timeout
def on_error(_, err):
# TODO: error handling
exit(1)
def on_close(_, status_code, msg):
# TODO: log `status_code` & `msg`
pass
def on_open(ws):
# TODO: log "Connection opened"
# Subscribed entity could also be `pending_transactions` to receive the transactions directly
# from the mempool.
ws.send(json.dumps({"coin": "eth", "command": "subscribe", "entity": "confirmed_transaction"}))
# TODO: log "Subscription message sent"
async def start_monitor():
"""Connects to the WebSocket feed of mined Ethereum transactions"""
queue_processor = asyncio.create_task(process_queue())
loop = asyncio.get_event_loop()
ws = websocket.WebSocketApp(
WS_ADDR,
on_open=on_open,
on_message=lambda ws, msg: on_message(ws, msg, loop),
on_error=on_error,
on_close=on_close,
)
# Run the WebSocket client in a separate thread
# TODO: replace `run_forever` with something that can be signaled to shutdown gracefully
ws_thread = threading.Thread(target=ws.run_forever)
ws_thread.start()
# Wait for the processor to finish cleaning up the queue before shutting down
await queue_processor()