Compare commits


10 Commits

8 changed files with 110 additions and 46 deletions

README.md

@@ -1,15 +1,23 @@
 # Ethereum network sender address mapper
-Script that, once deployed in a Docker container, monitors a live feed of the Ethereum network via a WebSocket connection, stores the sender addresses with transaction counts, and creates statistics of the most active addresses.
+This script, designed for deployment in a Docker container, monitors the Ethereum network's mempool via a WebSocket connection. It tracks sender addresses and their transaction counts, generating a list of the most active addresses. This data can be used to blacklist or whitelist addresses in other research projects, ensuring targeting is based on up-to-date information.
+## Configuration
+A list of the possible environment variables and their purpose:
+- `MODE`: Either `development` or `production`; the logging level is set based on this
+- `EXPORT_INTERVAL`: How often (in seconds) the SQLite database is exported as a JSON file; does nothing if `IS_EXPORT` is not true
+    - Notably set as a string (as are all environment variables)
+- `IS_EXPORT`: Boolean that indicates whether the aforementioned export task is enabled
+    - Possible values interpreted as `True` (case insensitive): `true`, `1`, and `t`
 ## Development
+Most critically, `MODE=development` should be specified, as it changes the logging level from `INFO` to `DEBUG`. A low `EXPORT_INTERVAL` should be used when testing the export functionality.
 ```shell
 mkvirtualenv chainmapper # OR 'workon chainmapper'
 pip3 install -r requirements.txt
-touch .env && echo -e "MODE=\"development\"\nEXPORT_INTERVAL=\"60\"" > .env # 60 seconds export period for testing
+touch .env # Optional, see the previous section
 ```
 ## Usage
@@ -18,5 +26,13 @@ The included `deploy.sh` shellscript should be used for any kind of (development
 ```shell
 chmod +x ./scripts/deploy.sh
+# Add `-y` flag to automatically overwrite existing containers with the same name
 ./scripts/deploy.sh
 ```
+Use the following command if you wish to proxy the WebSocket connection:
+```shell
+# Proxy format: <protocol>://<ip>:<port>
+./scripts/deploy.sh -p <proxy>
+```
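
For reference, a minimal `.env` for local testing could look like the following; the values are only illustrative, see the Configuration section above for the accepted values:

```shell
# Example .env for development/testing; note that every value is set as a string
cat > .env << 'EOF'
MODE="development"
EXPORT_INTERVAL="60"
IS_EXPORT="true"
EOF
```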

main.py

@@ -1,17 +1,19 @@
 #!/usr/bin/env python3
 import os
+import sys
 import asyncio
 import threading
 import logging
 import signal
 from collections import namedtuple
+import sqlite3
 import aioprocessing
+import websockets
 from dotenv import load_dotenv
+import requests
-from src.const import DEFAULT_MODE, DEFAULT_EXPORT_INTERVAL, DEFAULT_IS_EXPORT
-from src.mempool import WebSocketThread, QueueProcessor
-from src.db import Handler, periodic_export
+from src import const, mempool, db
 Config = namedtuple("Config", ["mode", "export_interval", "is_export"])
@@ -41,15 +43,19 @@ def load_cfg(dotenv_path=".env"):
     load_dotenv(dotenv_path)
     print(f"[+] Environment variables loaded from '{dotenv_path}'\n---")
-    mode = os.getenv("MODE", DEFAULT_MODE).lower()
-    export_interval = int(os.getenv("EXPORT_INTERVAL", DEFAULT_EXPORT_INTERVAL))
-    is_export = os.getenv("IS_EXPORT", DEFAULT_IS_EXPORT).lower() in ("true", "1", "t")
+    mode = os.getenv("MODE", const.DEFAULT_MODE).lower()
+    export_interval = int(os.getenv("EXPORT_INTERVAL", const.DEFAULT_EXPORT_INTERVAL))
+    is_export = os.getenv("IS_EXPORT", const.DEFAULT_IS_EXPORT).lower() in ("true", "1", "t")
     cfg = Config(mode, export_interval, is_export)
     return cfg
+def get_ip(addr=const.IP_TEST_ADDR):
+    return requests.get(addr, timeout=10).text.strip()
 def main():
     cfg = load_cfg()
@@ -60,23 +66,31 @@ def main():
     logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=log_level)
     logging.info("Logger initialized")
+    logging.info("Currently running version %s", const.VERSION)
     logging.info("MODE: %s", cfg.mode)
     logging.info("EXPORT_INTERVAL: %d (seconds)", cfg.export_interval)
     logging.info("IS_EXPORT: %r", cfg.is_export)
+    logging.info("IP: %s", get_ip())
+    # Information for debugging issues caused by potential version differences
+    logging.info("Python version: %s", sys.version)
+    logging.info("aioprocessing version: %s", aioprocessing.__version__)
+    logging.info("websockets version: %s", websockets.__version__)
+    logging.info("sqlite3 version: %s", sqlite3.version)
     # FIFO queue for cross-thread communications
     q = aioprocessing.AioQueue()
-    handler = Handler()
+    handler = db.Handler()
     shutdown_event = threading.Event()
     shutdown_loop = asyncio.new_event_loop()
     export_loop = asyncio.new_event_loop()
-    ws_thread = WebSocketThread(q, shutdown_event)
-    qp_thread = QueueProcessor(q, shutdown_event, handler)
+    ws_thread = mempool.WebSocketThread(q, shutdown_event)
+    qp_thread = mempool.QueueProcessor(q, shutdown_event, handler)
     export_thread = threading.Thread(
-        target=periodic_export,
+        target=db.periodic_export,
         args=(
             export_loop,
             handler,
@@ -92,10 +106,13 @@ def main():
     def handle_exit():
         logging.info("Shutdown procedure initialized")
         shutdown_event.set()
         shutdown_loop.run_until_complete(shutdown(shutdown_loop))
-        ws_thread.join()
+        # NOTE: It's vital to close the queue processor first so that it doesn't halt the shutdown
         qp_thread.join()
+        ws_thread.join()
         export_thread.join()
     def handle_signal(signal, _frame):
@@ -107,8 +124,8 @@ def main():
     signal.signal(signal.SIGTERM, handle_signal)
     try:
-        ws_thread.join()
         qp_thread.join()
+        ws_thread.join()
     except KeyboardInterrupt:
         logging.info("Keyboard interrupt received, shutting down threads")
         handle_exit()

requirements.txt

@@ -1,3 +1,4 @@
 aioprocessing==2.0.1
 python-dotenv==1.0.1
+Requests==2.32.3
 websockets==12.0

scripts/deploy.sh

@@ -1,23 +1,61 @@
 #!/usr/bin/env bash
-echo "[+] Starting the deployment script"
+AUTOREMOVE=false
+VOLUME_PATH="./data" # Local path to the volume's mount point
+IS_PROXIED=false
+PROXY=""
+while getopts ":hyp:" opt
+do
+    case "$opt" in
+        h)
+            echo "Usage: $0 [-y] [-p <protocol>://<ip>:<port>]"
+            exit 0
+            ;;
+        y)
+            echo -e "[+] Automatically removing all containers with the same tag (if any)\n"
+            AUTOREMOVE=true
+            ;;
+        p)
+            IS_PROXIED=true
+            PROXY=${OPTARG}
+            echo -e "[+] Proxying enabled: $PROXY"
+            ;;
+        *)
+            exit 1
+            ;;
+    esac
+done
 ! command -v docker &> /dev/null && echo "[!] Docker could not be found, exiting..." && exit 1
 # Building with '--no-cache' ensures a fresh build will always be used
-echo -e "\n[+] Building the Docker image without caching..."
+echo -e "[+] Building the Docker image without caching...\n"
 docker build --no-cache -t chainmapper .
 [ ! -d "./data" ] && mkdir data && echo -e "\n[+] Created the default volume directory 'data'"
-OLD_ID=$(docker ps -a -q -f name="chainmapper-prod")
+OLD_ID=$(docker ps -a -q -f name="chainmapper")
-if [ "$OLD_ID" ]
+if [ "$OLD_ID" ] && [ "$AUTOREMOVE" = true ]
+then
+    echo -e "\n[+] Removing existing container with the same tag ($OLD_ID)"
+    docker stop "$OLD_ID" &> /dev/null
+    docker rm "$OLD_ID" &> /dev/null
+elif [ "$OLD_ID" ]
 then
     read -p "[?] Existing container found with id '$OLD_ID', do you want to remove it? " -n 1 -r
     [[ "$REPLY" =~ ^[Yy]$ ]] || (echo "[!] Exiting..." && exit 0)
+    docker stop "$OLD_ID" &> /dev/null
     docker rm "$OLD_ID" &> /dev/null
 fi
 echo -e "\n[+] Deploying the container with 'docker run' ('data' as the volume)..."
-docker run -it --restart unless-stopped -v ./data:/app/data --name chainmapper-prod -d chainmapper
+if [ "$IS_PROXIED" = true ]
+then
+    # Override the default entrypoint to run the connections through the given proxy
+    docker run -it --restart unless-stopped -v $VOLUME_PATH:/app/data --name chainmapper --entrypoint /bin/bash -d chainmapper -c "HTTPS_PROXY=$PROXY python main.py"
+else
+    docker run -it --restart unless-stopped -v $VOLUME_PATH:/app/data --name chainmapper -d chainmapper
+fi

View File

@@ -2,7 +2,7 @@
 # Included into the built image via its Dockerfile
-DB_FILE="/app/chainmapper.sqlite3"
+DB_FILE="/app/data/chainmapper.sqlite3"
 if [ -s $DB_FILE ]; then
     exit 0
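
Since this script is wired in as the image's health check, the status it reports can be read from the host once the container is running; a small example (the container name matches the one used in `deploy.sh`):

```shell
# Check the health status reported by the container's HEALTHCHECK
docker inspect --format '{{.State.Health.Status}}' chainmapper
```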

src/const.py

@@ -1,5 +1,7 @@
 import json
+VERSION = "v1.1.0"
 # Blockchain.com endpoint and the subscription message which initializes the "transaction stream"
 WS_ADDR = "wss://ws.blockchain.info/coins"
 # Optionally `confirmed_transaction` can be used (bursts of data instead of a steady stream, which is worse for the overall performance)
@@ -8,12 +10,6 @@ SUB_MSG = json.dumps({"coin": "eth", "command": "subscribe", "entity": "pending_
 # Pause before reconnecting after the WebSocket connection is accidentally dropped by either party
 WS_RECONNECT_PAUSE = 2
-# Timeout for asynchronous WebSocket reading (seconds)
-WS_INTERMSG_TIMEOUT = 1
-# Timeout for asynchronous queue operations (`coro_get` and `coro_put`, seconds)
-QUEUE_OP_TIMEOUT = 1
 # Paths inside the Docker container where data is stored/exported (should match with the mounted volume in `deploy.sh`)
 DEFAULT_DB_PATH = "./data/chainmapper.sqlite3"
 DEFAULT_EXPORT_PATH = "./data/export.json"
@@ -22,3 +18,5 @@ DEFAULT_EXPORT_PATH = "./data/export.json"
 DEFAULT_MODE = "production"
 DEFAULT_EXPORT_INTERVAL = "10800"
 DEFAULT_IS_EXPORT = "False"
+IP_TEST_ADDR = "https://ipv4.icanhazip.com"

src/db.py

@@ -4,13 +4,13 @@ import logging
 import threading
 import asyncio
-from src.const import DEFAULT_DB_PATH, DEFAULT_EXPORT_PATH
+from src import const
 class Handler:
     """Handle all SQLite connections required to create, update, and export the stored addresses."""
-    def __init__(self, database=DEFAULT_DB_PATH):
+    def __init__(self, database=const.DEFAULT_DB_PATH):
         self.database = database
         # Notably `connect` automatically creates the database if it doesn't already exist
         self.con = sqlite3.connect(self.database, check_same_thread=False)
@@ -52,7 +52,7 @@ class Handler:
         )
         self.con.commit()
-    async def export(self, filepath=DEFAULT_EXPORT_PATH):
+    async def export(self, filepath=const.DEFAULT_EXPORT_PATH):
         """Export the addresses from the SQLite database in descending order based on the transaction counts."""
         with self.lock:
             logging.debug("Reentrant lock acquired")

src/mempool.py

@@ -4,13 +4,13 @@ import threading
 import logging
 import websockets
-from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE, WS_INTERMSG_TIMEOUT, QUEUE_OP_TIMEOUT
+from src import const
 class WebSocketThread(threading.Thread):
     """Handle connection, subscription, and message parsing for the Blockchain.com WebSocket."""
-    def __init__(self, q, shutdown_event, sub_msg=SUB_MSG):
+    def __init__(self, q, shutdown_event, sub_msg=const.SUB_MSG):
         super().__init__()
         self.name = "WebSocketThread"
         self.q = q
@@ -19,9 +19,7 @@ class WebSocketThread(threading.Thread):
         self.tx_count = 0
     async def connect(self):
-        async with websockets.connect(WS_ADDR) as ws:
-            logging.info("Inter message timeout set to %d seconds", WS_INTERMSG_TIMEOUT)
+        async with websockets.connect(const.WS_ADDR) as ws:
             logging.info("WebSocket connection established successfully")
             await ws.send(self.sub_msg)
             logging.info("Subscription message sent")
@@ -32,24 +30,22 @@ class WebSocketThread(threading.Thread):
             while not self.shutdown_event.is_set():
                 try:
-                    # Timeout is necessary to make sure the state of the shutdown event is checked often enough
-                    msg = await asyncio.wait_for(ws.recv(), timeout=WS_INTERMSG_TIMEOUT)
+                    msg = await ws.recv()
                     data = self.handle_msg(msg)
                     if data is None:
                         continue
-                    # This shouldn't really be an issue, but it's safer to set a timeout here too...
-                    await asyncio.wait_for(self.q.coro_put(data), timeout=QUEUE_OP_TIMEOUT)
+                    await self.q.coro_put(data)
                 except asyncio.TimeoutError:
                     logging.debug("WebSocket receiver timed out before fetching a new message, reattempting")
                     continue
                 except websockets.exceptions.ConnectionClosed:
                     logging.info(
                         "WebSocket connection closed unexpectedly, sleeping for %d seconds before rebooting the connection",
-                        WS_RECONNECT_PAUSE,
+                        const.WS_RECONNECT_PAUSE,
                     )
-                    await asyncio.sleep(WS_RECONNECT_PAUSE)
+                    await asyncio.sleep(const.WS_RECONNECT_PAUSE)
                     break
                 # pylint: disable=broad-exception-caught
                 except Exception as e:
@@ -104,12 +100,10 @@ class QueueProcessor(threading.Thread):
         self.handler = handler
     async def process_queue(self):
-        logging.info("Queue operations timeout set to %d seconds", QUEUE_OP_TIMEOUT)
         while not self.shutdown_event.is_set():
             try:
-                # Timeout is necessary to make sure the state of the shutdown event is checked often enough
-                tx_sender = await asyncio.wait_for(self.q.coro_get(), timeout=QUEUE_OP_TIMEOUT)
+                # Might prevent a proper shutdown procedure if the queue feeder is closed before the processor
+                tx_sender = await self.q.coro_get()
                 await self.handler.store(tx_sender)
             # pylint: disable=broad-exception-caught
             except asyncio.TimeoutError: