Architecture Overview¶
ArBot is designed with a modular, asynchronous architecture that ensures scalability, maintainability, and high performance for real-time arbitrage trading.
System Architecture¶
graph TB
subgraph "User Interface Layer"
GUI[GUI Interface]
CLI[CLI Interface]
API[REST API]
end
subgraph "Application Layer"
MAIN[Main Application]
STRAT[Strategy Engine]
TRADER[Trading Engine]
SIM[Simulator]
BT[Backtester]
end
subgraph "Exchange Layer"
BINA[Binance Adapter]
BYBIT[Bybit Adapter]
OKX[OKX Adapter]
BITGET[Bitget Adapter]
end
subgraph "Data Layer"
DB[(SQLite Database)]
CACHE[In-Memory Cache]
LOG[Logging System]
end
subgraph "External Services"
BINANCE[Binance API]
BYBITAPI[Bybit API]
OKXAPI[OKX API]
BITGETAPI[Bitget API]
end
GUI --> MAIN
CLI --> MAIN
API --> MAIN
MAIN --> STRAT
MAIN --> TRADER
MAIN --> SIM
MAIN --> BT
STRAT --> BINA
STRAT --> BYBIT
STRAT --> OKX
STRAT --> BITGET
TRADER --> BINA
TRADER --> BYBIT
TRADER --> OKX
TRADER --> BITGET
BINA --> BINANCE
BYBIT --> BYBITAPI
OKX --> OKXAPI
BITGET --> BITGETAPI
STRAT --> DB
TRADER --> DB
SIM --> DB
BT --> DB
STRAT --> CACHE
MAIN --> LOG
Core Components¶
1. Main Application (main.py
)¶
Purpose: Orchestrates all system components and manages application lifecycle.
Responsibilities: - Configuration loading and validation - Component initialization and dependency injection - Application state management - Graceful shutdown handling
Key Classes:
class ArBot:
def __init__(self, config: Config)
async def initialize(self)
async def start(self)
async def stop(self)
def _validate_config(self)
2. Strategy Engine (strategy.py
)¶
Purpose: Core arbitrage detection and signal generation.
Architecture:
class ArbitrageStrategy:
# Data management
exchange_data: Dict[str, Dict[str, ExchangeData]]
recent_signals: deque
# Signal processing
async def _on_ticker_update(self, ticker: Ticker)
async def _check_arbitrage_opportunities(self, symbol: str)
def _calculate_arbitrage(self, buy_exchange, sell_exchange)
# Lifecycle management
async def initialize(self, exchanges)
async def start(self)
async def stop(self)
Data Flow: 1. Receive ticker updates from exchanges 2. Store in structured data format 3. Calculate arbitrage opportunities 4. Apply filters (trend, risk, etc.) 5. Generate trading signals 6. Notify subscribers
3. Exchange Adapters (exchanges/
)¶
Purpose: Standardized interface to different cryptocurrency exchanges.
Base Interface:
class BaseExchange(ABC):
@abstractmethod
async def connect_ws(self)
@abstractmethod
async def get_orderbook(self, symbol: str)
@abstractmethod
async def place_order(self, symbol: str, side: OrderSide,
amount: float, price: float)
@abstractmethod
async def cancel_order(self, order_id: str)
@abstractmethod
async def get_balance(self)
@abstractmethod
async def get_trading_fees(self, symbol: str)
Implementation Pattern:
class BinanceExchange(BaseExchange):
def __init__(self, config: ExchangeConfig):
self.config = config
self.ws_client = None
self.rest_client = None
self.callbacks = []
async def connect_ws(self):
# WebSocket connection logic
pass
def on_ticker(self, callback):
self.callbacks.append(callback)
4. GUI Interface (gui.py
)¶
Purpose: User interface for monitoring and controlling the trading bot.
Architecture Components: - Main Dashboard: Real-time price monitoring - Settings Panels: Configuration management - Data Export: Historical data extraction - Control Interface: Start/stop trading operations
Key Features:
class TradingBotGUI:
# UI Components
def create_widgets(self)
def create_price_monitoring_table(self)
def create_settings_panel(self)
# Data Management
def update_price_display(self)
def handle_arbitrage_signal(self, signal)
# Event Handling
def start_trading(self)
def stop_trading(self)
def export_data(self)
5. Database Layer (database.py
)¶
Purpose: Persistent storage for all trading data and analytics.
Schema Design:
-- Core tables
CREATE TABLE tickers (
id INTEGER PRIMARY KEY,
exchange TEXT,
symbol TEXT,
bid REAL,
ask REAL,
timestamp REAL
);
CREATE TABLE arbitrage_opportunities (
id INTEGER PRIMARY KEY,
symbol TEXT,
buy_exchange TEXT,
sell_exchange TEXT,
profit_percent REAL,
timestamp REAL
);
CREATE TABLE trades (
id INTEGER PRIMARY KEY,
symbol TEXT,
side TEXT,
amount REAL,
price REAL,
exchange TEXT,
timestamp REAL,
status TEXT
);
Data Access Layer:
class Database:
async def insert_ticker(self, ticker: TickerRecord)
async def insert_arbitrage_opportunity(self, opportunity)
async def insert_trade(self, trade: TradeRecord)
async def get_historical_data(self, symbol, start_date, end_date)
async def cleanup_old_data(self, max_age_days)
Design Patterns¶
1. Observer Pattern¶
Implementation: Signal/callback system for real-time data distribution.
# Strategy notifies multiple subscribers
class ArbitrageStrategy:
def add_signal_callback(self, callback):
self.signal_callbacks.append(callback)
async def _handle_arbitrage_opportunity(self, signal):
for callback in self.signal_callbacks:
await callback(signal)
# GUI subscribes to strategy signals
gui.strategy.add_signal_callback(gui.handle_arbitrage_signal)
2. Adapter Pattern¶
Implementation: Standardized exchange interfaces.
# Each exchange adapter implements the same interface
exchanges = {
'binance': BinanceExchange(config.exchanges['binance']),
'bybit': BybitExchange(config.exchanges['bybit']),
'okx': OKXExchange(config.exchanges['okx'])
}
# Strategy works with any exchange through common interface
for exchange_name, exchange in exchanges.items():
await exchange.connect_ws()
exchange.on_ticker(self._on_ticker_update)
3. Strategy Pattern¶
Implementation: Configurable trading strategies and risk management.
class TradingStrategy(ABC):
@abstractmethod
def should_execute_trade(self, signal: ArbitrageSignal) -> bool:
pass
class TrendFilterStrategy(TradingStrategy):
def should_execute_trade(self, signal: ArbitrageSignal) -> bool:
return self.check_trend_conditions(signal)
class VolumeFilterStrategy(TradingStrategy):
def should_execute_trade(self, signal: ArbitrageSignal) -> bool:
return self.check_volume_conditions(signal)
4. Factory Pattern¶
Implementation: Dynamic exchange creation based on configuration.
class ExchangeFactory:
@staticmethod
def create_exchange(name: str, config: ExchangeConfig) -> BaseExchange:
exchanges = {
'binance': BinanceExchange,
'bybit': BybitExchange,
'okx': OKXExchange,
'bitget': BitgetExchange
}
if name not in exchanges:
raise ValueError(f"Unsupported exchange: {name}")
return exchanges[name](config)
Asynchronous Architecture¶
Event Loop Management¶
Single Event Loop: All asynchronous operations run on the main event loop.
async def main():
# Initialize components
config = Config()
database = Database(config.database.db_path)
strategy = ArbitrageStrategy(config, database)
# Start all async components
await asyncio.gather(
strategy.start(),
exchange_manager.start(),
gui.start_async_updates()
)
Concurrent Data Processing¶
WebSocket Handling: Multiple exchange connections managed concurrently.
async def connect_all_exchanges(self):
tasks = []
for exchange_name, exchange in self.exchanges.items():
if exchange.config.enabled:
tasks.append(exchange.connect_ws())
await asyncio.gather(*tasks, return_exceptions=True)
Non-Blocking Operations¶
Database Operations: Asynchronous database access prevents blocking.
# Non-blocking database operations
async def store_ticker_data(self, tickers: List[Ticker]):
async with self.db_pool.acquire() as connection:
await connection.executemany(
"INSERT INTO tickers VALUES (?, ?, ?, ?, ?)",
[(t.exchange, t.symbol, t.bid, t.ask, t.timestamp)
for t in tickers]
)
Error Handling and Resilience¶
Graceful Degradation¶
Exchange Failures: System continues operating with available exchanges.
async def handle_exchange_error(self, exchange_name: str, error: Exception):
logger.error(f"Exchange {exchange_name} error: {error}")
# Disable problematic exchange temporarily
self.exchanges[exchange_name].enabled = False
# Continue with remaining exchanges
if len(self.get_active_exchanges()) > 1:
logger.info("Continuing with remaining exchanges")
else:
logger.error("Insufficient exchanges, stopping trading")
await self.stop()
Automatic Recovery¶
Reconnection Logic: Automatic reconnection with exponential backoff.
async def reconnect_with_backoff(self, exchange: BaseExchange):
max_retries = 5
base_delay = 1.0
for attempt in range(max_retries):
try:
await exchange.connect_ws()
logger.info(f"Reconnected to {exchange.name}")
break
except Exception as e:
delay = base_delay * (2 ** attempt)
logger.warning(f"Reconnection attempt {attempt + 1} failed, "
f"retrying in {delay}s")
await asyncio.sleep(delay)
Data Integrity¶
Transaction Management: Ensure data consistency during failures.
async def execute_arbitrage_trade(self, signal: ArbitrageSignal):
async with self.database.transaction():
try:
# Record trade attempt
trade_id = await self.database.insert_trade_attempt(signal)
# Execute orders
buy_order = await self.place_buy_order(signal)
sell_order = await self.place_sell_order(signal)
# Update trade record
await self.database.update_trade_success(trade_id,
buy_order, sell_order)
except Exception as e:
# Rollback on any failure
await self.database.rollback()
logger.error(f"Trade execution failed: {e}")
raise
Performance Optimizations¶
Memory Management¶
Efficient Data Structures: Use appropriate collections for performance.
# Use deque for sliding windows
self.recent_prices = deque(maxlen=100)
# Use dict for fast lookups
self.symbol_cache = {}
# Use sets for membership testing
self.active_symbols = set()
CPU Optimization¶
Vectorized Operations: Use numpy for numerical computations.
import numpy as np
# Efficient moving average calculation
def calculate_moving_average(self, prices: deque) -> float:
if len(prices) < self.window_size:
return None
# Convert to numpy array for efficient computation
price_array = np.array(prices)
return np.mean(price_array)
Network Optimization¶
Connection Pooling: Reuse HTTP connections for REST API calls.
class ExchangeClient:
def __init__(self):
self.session = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(limit=100),
timeout=aiohttp.ClientTimeout(total=30)
)
Security Considerations¶
API Key Management¶
Environment Variables: Store sensitive data outside code.
class ExchangeConfig:
def __init__(self):
self.api_key = os.getenv(f"{self.name.upper()}_API_KEY")
self.api_secret = os.getenv(f"{self.name.upper()}_API_SECRET")
if not self.api_key or not self.api_secret:
raise ValueError(f"Missing API credentials for {self.name}")
Input Validation¶
Data Sanitization: Validate all external inputs.
def validate_trading_signal(self, signal: ArbitrageSignal) -> bool:
checks = [
signal.profit_percent > 0,
signal.profit_percent < self.config.max_profit_threshold,
signal.buy_price > 0,
signal.sell_price > signal.buy_price,
signal.timestamp > time.time() - self.config.max_signal_age
]
return all(checks)
Rate Limiting¶
API Rate Limiting: Respect exchange rate limits.
from asyncio import Semaphore
class RateLimiter:
def __init__(self, max_requests: int, time_window: float):
self.semaphore = Semaphore(max_requests)
self.time_window = time_window
async def acquire(self):
await self.semaphore.acquire()
# Release after time window
asyncio.create_task(self._release_after_delay())
async def _release_after_delay(self):
await asyncio.sleep(self.time_window)
self.semaphore.release()
Architecture Benefits
- Modularity: Easy to add new exchanges or strategies
- Scalability: Asynchronous design handles multiple data streams
- Maintainability: Clear separation of concerns
- Testability: Interface-based design enables comprehensive testing
- Resilience: Multiple layers of error handling and recovery
Created: July 12, 2025