Compare commits
10 Commits
340f199b3a
...
a292a48877
Author | SHA1 | Date | |
---|---|---|---|
|
a292a48877 | ||
|
e96b9739b8 | ||
|
ec38a2c35b | ||
|
6dea9fad14 | ||
|
9475280902 | ||
|
757910642e | ||
|
50efda3324 | ||
|
15eed24abb | ||
|
3c6a54aeee | ||
|
1547cd2c6e |
24
README.md
24
README.md
@ -1,15 +1,23 @@
|
|||||||
# Ethereum network sender address mapper
|
# Ethereum network sender address mapper
|
||||||
|
|
||||||
Script that, once deployed in a Docker container, monitors a live feed of the Ethereum network via a WebSocket connection, stores the sender addresses with transaction counts, and creates statistics of the most active addresses.
|
This script, designed for deployment in a Docker container, monitors the Ethereum network's mempool via a WebSocket connection. It tracks sender addresses and their transaction counts, generating a list of the most active addresses. This data can be used to black-/whitelist addresses in other research projects, ensuring targeting is done based on up-to-date information.
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
A list of the possible environment variables and their purpose:
|
||||||
|
|
||||||
|
- `MODE`: Either `development` or `production`, the logging level is set based on this
|
||||||
|
- `EXPORT_INTERVAL`: The interval of how often the SQLite database is exported as a JSON file, does nothing if `IS_EXPORT` is not true
|
||||||
|
- Notably set as a string (similar to all environment variables)
|
||||||
|
- `IS_EXPORT`: Boolean that indicates whether the aforementioned export task is enabled or not
|
||||||
|
- Possible values that are interpreted as `True` (case insensitive): `true`, `1`, and `t`
|
||||||
|
|
||||||
## Development
|
## Development
|
||||||
|
|
||||||
Most critically `MODE=development` should be specified, as it sets the logging level from `INFO` to `DEBUG`. Low `EXPORT_INTERVAL` should be used for testing the export functionality (obviously).
|
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
mkvirtualenv chainmapper # OR 'workon chainmapper'
|
mkvirtualenv chainmapper # OR 'workon chainmapper'
|
||||||
pip3 install -r requirements.txt
|
pip3 install -r requirements.txt
|
||||||
touch .env && echo -e "MODE=\"development\"\nEXPORT_INTERVAL=\"60\"" > .env # 60 seconds export period for testing
|
touch .env # Optional, see the previous section
|
||||||
```
|
```
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
@ -18,5 +26,13 @@ The included `deploy.sh` shellscript should be used for any kind of (development
|
|||||||
|
|
||||||
```shell
|
```shell
|
||||||
chmod +x ./scripts/deploy.sh
|
chmod +x ./scripts/deploy.sh
|
||||||
|
# Add `-y` flag to automatically overwrite existing containers with the same name
|
||||||
./scripts/deploy.sh
|
./scripts/deploy.sh
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Use the following command if you wish to proxy the WebSocket connection:
|
||||||
|
|
||||||
|
```shell
|
||||||
|
# Proxy format: <protocol>://<ip>:<port>
|
||||||
|
./scripts/deploy.sh -p <proxy>
|
||||||
|
```
|
||||||
|
41
main.py
41
main.py
@ -1,17 +1,19 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
import asyncio
|
import asyncio
|
||||||
import threading
|
import threading
|
||||||
import logging
|
import logging
|
||||||
import signal
|
import signal
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
|
import sqlite3
|
||||||
import aioprocessing
|
import aioprocessing
|
||||||
|
import websockets
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
import requests
|
||||||
|
|
||||||
from src.const import DEFAULT_MODE, DEFAULT_EXPORT_INTERVAL, DEFAULT_IS_EXPORT
|
from src import const, mempool, db
|
||||||
from src.mempool import WebSocketThread, QueueProcessor
|
|
||||||
from src.db import Handler, periodic_export
|
|
||||||
|
|
||||||
Config = namedtuple("Config", ["mode", "export_interval", "is_export"])
|
Config = namedtuple("Config", ["mode", "export_interval", "is_export"])
|
||||||
|
|
||||||
@ -41,15 +43,19 @@ def load_cfg(dotenv_path=".env"):
|
|||||||
load_dotenv(dotenv_path)
|
load_dotenv(dotenv_path)
|
||||||
print(f"[+] Environment variables loaded from '{dotenv_path}'\n---")
|
print(f"[+] Environment variables loaded from '{dotenv_path}'\n---")
|
||||||
|
|
||||||
mode = os.getenv("MODE", DEFAULT_MODE).lower()
|
mode = os.getenv("MODE", const.DEFAULT_MODE).lower()
|
||||||
export_interval = int(os.getenv("EXPORT_INTERVAL", DEFAULT_EXPORT_INTERVAL))
|
export_interval = int(os.getenv("EXPORT_INTERVAL", const.DEFAULT_EXPORT_INTERVAL))
|
||||||
is_export = os.getenv("IS_EXPORT", DEFAULT_IS_EXPORT).lower() in ("true", "1", "t")
|
is_export = os.getenv("IS_EXPORT", const.DEFAULT_IS_EXPORT).lower() in ("true", "1", "t")
|
||||||
|
|
||||||
cfg = Config(mode, export_interval, is_export)
|
cfg = Config(mode, export_interval, is_export)
|
||||||
|
|
||||||
return cfg
|
return cfg
|
||||||
|
|
||||||
|
|
||||||
|
def get_ip(addr=const.IP_TEST_ADDR):
|
||||||
|
return requests.get(addr, timeout=10).text.strip()
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
cfg = load_cfg()
|
cfg = load_cfg()
|
||||||
|
|
||||||
@ -60,23 +66,31 @@ def main():
|
|||||||
|
|
||||||
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=log_level)
|
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=log_level)
|
||||||
logging.info("Logger initialized")
|
logging.info("Logger initialized")
|
||||||
|
logging.info("Currently running version %s", const.VERSION)
|
||||||
logging.info("MODE: %s", cfg.mode)
|
logging.info("MODE: %s", cfg.mode)
|
||||||
logging.info("EXPORT_INTERVAL: %d (seconds)", cfg.export_interval)
|
logging.info("EXPORT_INTERVAL: %d (seconds)", cfg.export_interval)
|
||||||
logging.info("IS_EXPORT: %r", cfg.is_export)
|
logging.info("IS_EXPORT: %r", cfg.is_export)
|
||||||
|
logging.info("IP: %s", get_ip())
|
||||||
|
|
||||||
|
# Information for debugging issues caused by potential version differences
|
||||||
|
logging.info("Python version: %s", sys.version)
|
||||||
|
logging.info("aioprocessing version: %s", aioprocessing.__version__)
|
||||||
|
logging.info("websockets version: %s", websockets.__version__)
|
||||||
|
logging.info("sqlite3 version: %s", sqlite3.version)
|
||||||
|
|
||||||
# FIFO queue for cross-thread communications
|
# FIFO queue for cross-thread communications
|
||||||
q = aioprocessing.AioQueue()
|
q = aioprocessing.AioQueue()
|
||||||
handler = Handler()
|
handler = db.Handler()
|
||||||
shutdown_event = threading.Event()
|
shutdown_event = threading.Event()
|
||||||
|
|
||||||
shutdown_loop = asyncio.new_event_loop()
|
shutdown_loop = asyncio.new_event_loop()
|
||||||
export_loop = asyncio.new_event_loop()
|
export_loop = asyncio.new_event_loop()
|
||||||
|
|
||||||
ws_thread = WebSocketThread(q, shutdown_event)
|
ws_thread = mempool.WebSocketThread(q, shutdown_event)
|
||||||
qp_thread = QueueProcessor(q, shutdown_event, handler)
|
qp_thread = mempool.QueueProcessor(q, shutdown_event, handler)
|
||||||
|
|
||||||
export_thread = threading.Thread(
|
export_thread = threading.Thread(
|
||||||
target=periodic_export,
|
target=db.periodic_export,
|
||||||
args=(
|
args=(
|
||||||
export_loop,
|
export_loop,
|
||||||
handler,
|
handler,
|
||||||
@ -92,10 +106,13 @@ def main():
|
|||||||
|
|
||||||
def handle_exit():
|
def handle_exit():
|
||||||
logging.info("Shutdown procedure initialized")
|
logging.info("Shutdown procedure initialized")
|
||||||
|
|
||||||
shutdown_event.set()
|
shutdown_event.set()
|
||||||
shutdown_loop.run_until_complete(shutdown(shutdown_loop))
|
shutdown_loop.run_until_complete(shutdown(shutdown_loop))
|
||||||
ws_thread.join()
|
|
||||||
|
# NOTE: It's vital to close the queue processor first so that it doesn't halt the shutdown
|
||||||
qp_thread.join()
|
qp_thread.join()
|
||||||
|
ws_thread.join()
|
||||||
export_thread.join()
|
export_thread.join()
|
||||||
|
|
||||||
def handle_signal(signal, _frame):
|
def handle_signal(signal, _frame):
|
||||||
@ -107,8 +124,8 @@ def main():
|
|||||||
signal.signal(signal.SIGTERM, handle_signal)
|
signal.signal(signal.SIGTERM, handle_signal)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
ws_thread.join()
|
|
||||||
qp_thread.join()
|
qp_thread.join()
|
||||||
|
ws_thread.join()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
logging.info("Keyboard interrupt received, shutting down threads")
|
logging.info("Keyboard interrupt received, shutting down threads")
|
||||||
handle_exit()
|
handle_exit()
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
aioprocessing==2.0.1
|
aioprocessing==2.0.1
|
||||||
python-dotenv==1.0.1
|
python-dotenv==1.0.1
|
||||||
|
Requests==2.32.3
|
||||||
websockets==12.0
|
websockets==12.0
|
||||||
|
@ -1,23 +1,61 @@
|
|||||||
#!/usr/bin/env bash
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
echo "[+] Starting the deployment script"
|
AUTOREMOVE=false
|
||||||
|
VOLUME_PATH="./data" # Local path to the volume's mount point
|
||||||
|
IS_PROXIED=false
|
||||||
|
PROXY=""
|
||||||
|
|
||||||
|
while getopts ":hyp:" opt
|
||||||
|
do
|
||||||
|
case "$opt" in
|
||||||
|
h)
|
||||||
|
echo "Usage: $0 [-y]"
|
||||||
|
exit 0
|
||||||
|
;;
|
||||||
|
y)
|
||||||
|
echo -e "[+] Automatically removing all containers with the same tag (if any)\n"
|
||||||
|
AUTOREMOVE=true
|
||||||
|
;;
|
||||||
|
p)
|
||||||
|
IS_PROXIED=true
|
||||||
|
PROXY=${OPTARG}
|
||||||
|
echo -e "[+] Proxying enabled: $PROXY"
|
||||||
|
;;
|
||||||
|
*)
|
||||||
|
exit 1
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
done
|
||||||
|
|
||||||
! command -v docker &> /dev/null && echo "[!] Docker could not be found, exiting..." && exit 1
|
! command -v docker &> /dev/null && echo "[!] Docker could not be found, exiting..." && exit 1
|
||||||
|
|
||||||
# Building with '--no-cache' ensures a fresh build will always be used
|
# Building with '--no-cache' ensures a fresh build will always be used
|
||||||
echo -e "\n[+] Building the Docker image without caching..."
|
echo -e "[+] Building the Docker image without caching...\n"
|
||||||
docker build --no-cache -t chainmapper .
|
docker build --no-cache -t chainmapper .
|
||||||
|
|
||||||
[ ! -d "./data" ] && mkdir data && echo -e "\n[+] Created the default volume directory 'data'"
|
[ ! -d "./data" ] && mkdir data && echo -e "\n[+] Created the default volume directory 'data'"
|
||||||
|
|
||||||
OLD_ID=$(docker ps -a -q -f name="chainmapper-prod")
|
OLD_ID=$(docker ps -a -q -f name="chainmapper")
|
||||||
|
|
||||||
if [ "$OLD_ID" ]
|
if [ "$OLD_ID" ] && [ "$AUTOREMOVE" = true ]
|
||||||
|
then
|
||||||
|
echo -e "\n[+] Removing existing container with the same tag ($OLD_ID)"
|
||||||
|
docker stop "$OLD_ID" &> /dev/null
|
||||||
|
docker rm "$OLD_ID" &> /dev/null
|
||||||
|
elif [ "$OLD_ID" ]
|
||||||
then
|
then
|
||||||
read -p "[?] Existing container found with id '$OLD_ID', do you want to remove it? " -n 1 -r
|
read -p "[?] Existing container found with id '$OLD_ID', do you want to remove it? " -n 1 -r
|
||||||
[[ "$REPLY" =~ ^[Yy]$ ]] || (echo "[!] Exiting..." && exit 0)
|
[[ "$REPLY" =~ ^[Yy]$ ]] || (echo "[!] Exiting..." && exit 0)
|
||||||
|
docker stop "$OLD_ID" &> /dev/null
|
||||||
docker rm "$OLD_ID" &> /dev/null
|
docker rm "$OLD_ID" &> /dev/null
|
||||||
fi
|
fi
|
||||||
|
|
||||||
echo -e "\n[+] Deploying the container with 'docker run' ('data' as the volume)..."
|
echo -e "\n[+] Deploying the container with 'docker run' ('data' as the volume)..."
|
||||||
docker run -it --restart unless-stopped -v ./data:/app/data --name chainmapper-prod -d chainmapper
|
|
||||||
|
if [ "$IS_PROXIED" = true ]
|
||||||
|
then
|
||||||
|
# Override the default entrypoint to run the connections through the given proxy
|
||||||
|
docker run -it --restart unless-stopped -v $VOLUME_PATH:/app/data --name chainmapper --entrypoint /bin/bash -d chainmapper -c "HTTPS_PROXY=$PROXY python main.py"
|
||||||
|
else
|
||||||
|
docker run -it --restart unless-stopped -v $VOLUME_PATH:/app/data --name chainmapper -d chainmapper
|
||||||
|
fi
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
# Included into the built image via its Dockerfile
|
# Included into the built image via its Dockerfile
|
||||||
|
|
||||||
DB_FILE="/app/chainmapper.sqlite3"
|
DB_FILE="/app/data/chainmapper.sqlite3"
|
||||||
|
|
||||||
if [ -s $DB_FILE ]; then
|
if [ -s $DB_FILE ]; then
|
||||||
exit 0
|
exit 0
|
||||||
|
10
src/const.py
10
src/const.py
@ -1,5 +1,7 @@
|
|||||||
import json
|
import json
|
||||||
|
|
||||||
|
VERSION = "v1.1.0"
|
||||||
|
|
||||||
# Blockchain.com endpoint and the subscription message which initializes the "transaction stream"
|
# Blockchain.com endpoint and the subscription message which initializes the "transaction stream"
|
||||||
WS_ADDR = "wss://ws.blockchain.info/coins"
|
WS_ADDR = "wss://ws.blockchain.info/coins"
|
||||||
# Optionally `confirmed_transaction` can be used (bursts of data instead of a steady stream, which is worse for the overall performance)
|
# Optionally `confirmed_transaction` can be used (bursts of data instead of a steady stream, which is worse for the overall performance)
|
||||||
@ -8,12 +10,6 @@ SUB_MSG = json.dumps({"coin": "eth", "command": "subscribe", "entity": "pending_
|
|||||||
# Pause before reconnecting after the WebSocket connection is accidentally dropped by either party
|
# Pause before reconnecting after the WebSocket connection is accidentally dropped by either party
|
||||||
WS_RECONNECT_PAUSE = 2
|
WS_RECONNECT_PAUSE = 2
|
||||||
|
|
||||||
# Timeout for asynchronous WebSocket reading (seconds)
|
|
||||||
WS_INTERMSG_TIMEOUT = 1
|
|
||||||
|
|
||||||
# Timeout for asynchronous queue operations (`coro_get` and `coro_put`, seconds)
|
|
||||||
QUEUE_OP_TIMEOUT = 1
|
|
||||||
|
|
||||||
# Paths inside the Docker container where data is stored/exported (should match with the mounted volume in `deploy.sh`)
|
# Paths inside the Docker container where data is stored/exported (should match with the mounted volume in `deploy.sh`)
|
||||||
DEFAULT_DB_PATH = "./data/chainmapper.sqlite3"
|
DEFAULT_DB_PATH = "./data/chainmapper.sqlite3"
|
||||||
DEFAULT_EXPORT_PATH = "./data/export.json"
|
DEFAULT_EXPORT_PATH = "./data/export.json"
|
||||||
@ -22,3 +18,5 @@ DEFAULT_EXPORT_PATH = "./data/export.json"
|
|||||||
DEFAULT_MODE = "production"
|
DEFAULT_MODE = "production"
|
||||||
DEFAULT_EXPORT_INTERVAL = "10800"
|
DEFAULT_EXPORT_INTERVAL = "10800"
|
||||||
DEFAULT_IS_EXPORT = "False"
|
DEFAULT_IS_EXPORT = "False"
|
||||||
|
|
||||||
|
IP_TEST_ADDR = "https://ipv4.icanhazip.com"
|
||||||
|
@ -4,13 +4,13 @@ import logging
|
|||||||
import threading
|
import threading
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
from src.const import DEFAULT_DB_PATH, DEFAULT_EXPORT_PATH
|
from src import const
|
||||||
|
|
||||||
|
|
||||||
class Handler:
|
class Handler:
|
||||||
"""Handle all SQLite connections required to create, update, and export the stored addresses."""
|
"""Handle all SQLite connections required to create, update, and export the stored addresses."""
|
||||||
|
|
||||||
def __init__(self, database=DEFAULT_DB_PATH):
|
def __init__(self, database=const.DEFAULT_DB_PATH):
|
||||||
self.database = database
|
self.database = database
|
||||||
# Notably `connect` automatically creates the database if it doesn't already exist
|
# Notably `connect` automatically creates the database if it doesn't already exist
|
||||||
self.con = sqlite3.connect(self.database, check_same_thread=False)
|
self.con = sqlite3.connect(self.database, check_same_thread=False)
|
||||||
@ -52,7 +52,7 @@ class Handler:
|
|||||||
)
|
)
|
||||||
self.con.commit()
|
self.con.commit()
|
||||||
|
|
||||||
async def export(self, filepath=DEFAULT_EXPORT_PATH):
|
async def export(self, filepath=const.DEFAULT_EXPORT_PATH):
|
||||||
"""Export the addresses from the SQLite database in descending order based on the transaction counts."""
|
"""Export the addresses from the SQLite database in descending order based on the transaction counts."""
|
||||||
with self.lock:
|
with self.lock:
|
||||||
logging.debug("Reentrant lock acquired")
|
logging.debug("Reentrant lock acquired")
|
||||||
|
@ -4,13 +4,13 @@ import threading
|
|||||||
import logging
|
import logging
|
||||||
import websockets
|
import websockets
|
||||||
|
|
||||||
from src.const import WS_ADDR, SUB_MSG, WS_RECONNECT_PAUSE, WS_INTERMSG_TIMEOUT, QUEUE_OP_TIMEOUT
|
from src import const
|
||||||
|
|
||||||
|
|
||||||
class WebSocketThread(threading.Thread):
|
class WebSocketThread(threading.Thread):
|
||||||
"""Handle connection, subscription, and message parsing for the Blockchain.com WebSocket."""
|
"""Handle connection, subscription, and message parsing for the Blockchain.com WebSocket."""
|
||||||
|
|
||||||
def __init__(self, q, shutdown_event, sub_msg=SUB_MSG):
|
def __init__(self, q, shutdown_event, sub_msg=const.SUB_MSG):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.name = "WebSocketThread"
|
self.name = "WebSocketThread"
|
||||||
self.q = q
|
self.q = q
|
||||||
@ -19,9 +19,7 @@ class WebSocketThread(threading.Thread):
|
|||||||
self.tx_count = 0
|
self.tx_count = 0
|
||||||
|
|
||||||
async def connect(self):
|
async def connect(self):
|
||||||
async with websockets.connect(WS_ADDR) as ws:
|
async with websockets.connect(const.WS_ADDR) as ws:
|
||||||
logging.info("Inter message timeout set to %d seconds", WS_INTERMSG_TIMEOUT)
|
|
||||||
|
|
||||||
logging.info("WebSocket connection established successfully")
|
logging.info("WebSocket connection established successfully")
|
||||||
await ws.send(self.sub_msg)
|
await ws.send(self.sub_msg)
|
||||||
logging.info("Subscription message sent")
|
logging.info("Subscription message sent")
|
||||||
@ -32,24 +30,22 @@ class WebSocketThread(threading.Thread):
|
|||||||
|
|
||||||
while not self.shutdown_event.is_set():
|
while not self.shutdown_event.is_set():
|
||||||
try:
|
try:
|
||||||
# Timeout is necessary to make sure the state of the shutdown event is checked often enough
|
msg = await ws.recv()
|
||||||
msg = await asyncio.wait_for(ws.recv(), timeout=WS_INTERMSG_TIMEOUT)
|
|
||||||
data = self.handle_msg(msg)
|
data = self.handle_msg(msg)
|
||||||
|
|
||||||
if data is None:
|
if data is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# This shouldn't really be an issue, but it's safer to set a timeout here too...
|
await self.q.coro_put(data)
|
||||||
await asyncio.wait_for(self.q.coro_put(data), timeout=QUEUE_OP_TIMEOUT)
|
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
logging.debug("WebSocket receiver timed out before fetching a new message, reattempting")
|
logging.debug("WebSocket receiver timed out before fetching a new message, reattempting")
|
||||||
continue
|
continue
|
||||||
except websockets.exceptions.ConnectionClosed:
|
except websockets.exceptions.ConnectionClosed:
|
||||||
logging.info(
|
logging.info(
|
||||||
"WebSocket connection closed unexpectedly, sleeping for %d seconds before rebooting the connection",
|
"WebSocket connection closed unexpectedly, sleeping for %d seconds before rebooting the connection",
|
||||||
WS_RECONNECT_PAUSE,
|
const.WS_RECONNECT_PAUSE,
|
||||||
)
|
)
|
||||||
await asyncio.sleep(WS_RECONNECT_PAUSE)
|
await asyncio.sleep(const.WS_RECONNECT_PAUSE)
|
||||||
break
|
break
|
||||||
# pylint: disable=broad-exception-caught
|
# pylint: disable=broad-exception-caught
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -104,12 +100,10 @@ class QueueProcessor(threading.Thread):
|
|||||||
self.handler = handler
|
self.handler = handler
|
||||||
|
|
||||||
async def process_queue(self):
|
async def process_queue(self):
|
||||||
logging.info("Queue operations timeout set to %d seconds", QUEUE_OP_TIMEOUT)
|
|
||||||
|
|
||||||
while not self.shutdown_event.is_set():
|
while not self.shutdown_event.is_set():
|
||||||
try:
|
try:
|
||||||
# Timeout is necessary to make sure the state of the shutdown event is checked often enough
|
# Might prevent a proper shutdown procedure if the queue feeder is closed before the processor
|
||||||
tx_sender = await asyncio.wait_for(self.q.coro_get(), timeout=QUEUE_OP_TIMEOUT)
|
tx_sender = await self.q.coro_get()
|
||||||
await self.handler.store(tx_sender)
|
await self.handler.store(tx_sender)
|
||||||
# pylint: disable=broad-exception-caught
|
# pylint: disable=broad-exception-caught
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
|
Loading…
Reference in New Issue
Block a user