Database Architecture¶
ArBot uses SQLite as its primary database: a lightweight, embedded engine that requires no separate server setup and performs well for arbitrage trading workloads. This document covers the complete database architecture, schema design, and optimization strategies.
Database Overview¶
SQLite Benefits for ArBot¶
Embedded Database Advantages:

- Zero configuration required
- No separate database server
- Single file storage
- ACID compliance
- Cross-platform compatibility
- High performance for read-heavy workloads

Perfect for Arbitrage Trading:

- Fast price data ingestion
- Efficient historical analysis
- Real-time query performance
- Minimal maintenance overhead
- Backup simplicity
Database Files Structure¶
arbot/
├── data/
│   ├── arbot.db          # Main trading database
│   ├── historical.db     # Historical price data
│   ├── backtest.db       # Backtesting results
│   └── config.db         # Configuration storage
└── backups/
    ├── arbot_YYYYMMDD.db # Daily backups
    └── weekly/           # Weekly backup archives
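The sketch below shows one way these paths could be wired up in code. It is illustrative only: the DATA_DIR/BACKUP_DIR constants and the ensure_dirs helper are assumptions, not part of ArBot's actual modules.

from pathlib import Path

# Illustrative path layout; names mirror the tree above, adjust to your deployment
DATA_DIR = Path("arbot/data")
BACKUP_DIR = Path("arbot/backups")

MAIN_DB = DATA_DIR / "arbot.db"
HISTORICAL_DB = DATA_DIR / "historical.db"
BACKTEST_DB = DATA_DIR / "backtest.db"
CONFIG_DB = DATA_DIR / "config.db"

def ensure_dirs() -> None:
    """Create the data and backup directories if they do not exist yet."""
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    (BACKUP_DIR / "weekly").mkdir(parents=True, exist_ok=True)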
Core Schema Design¶
Price Data Tables¶
Tickers Table (Real-time Price Data):
CREATE TABLE tickers (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    symbol TEXT NOT NULL,
    exchange TEXT NOT NULL,
    bid REAL NOT NULL,
    ask REAL NOT NULL,
    bid_size REAL,
    ask_size REAL,
    timestamp REAL NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(symbol, exchange, timestamp)
);
-- Optimized indexes for fast lookups
CREATE INDEX idx_tickers_symbol_exchange ON tickers(symbol, exchange);
CREATE INDEX idx_tickers_timestamp ON tickers(timestamp);
CREATE INDEX idx_tickers_symbol_timestamp ON tickers(symbol, timestamp);
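The insertion code later in this document accesses ticker objects by attribute (ticker_data.symbol, ticker_data.bid, and so on). A dataclass of the following shape is assumed for those examples; ArBot's actual ticker model may differ.

import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class Ticker:
    """Illustrative in-memory counterpart of a row in the tickers table."""
    symbol: str
    exchange: str
    bid: float
    ask: float
    bid_size: Optional[float] = None
    ask_size: Optional[float] = None
    timestamp: float = 0.0

    @property
    def mid_price(self) -> float:
        return (self.bid + self.ask) / 2

    @property
    def spread(self) -> float:
        return self.ask - self.bid

# Example instance
tick = Ticker("BTCUSDT", "binance", bid=64000.5, ask=64001.0, timestamp=time.time())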
OHLCV Data (Candlestick Data):
CREATE TABLE ohlcv (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    symbol TEXT NOT NULL,
    exchange TEXT NOT NULL,
    timeframe TEXT NOT NULL,
    open_time INTEGER NOT NULL,
    close_time INTEGER NOT NULL,
    open_price REAL NOT NULL,
    high_price REAL NOT NULL,
    low_price REAL NOT NULL,
    close_price REAL NOT NULL,
    volume REAL NOT NULL,
    quote_volume REAL,
    trades_count INTEGER,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(symbol, exchange, timeframe, open_time)
);
CREATE INDEX idx_ohlcv_symbol_timeframe ON ohlcv(symbol, timeframe);
CREATE INDEX idx_ohlcv_open_time ON ohlcv(open_time);
Trading Tables¶
Arbitrage Signals:
CREATE TABLE arbitrage_signals (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    symbol TEXT NOT NULL,
    buy_exchange TEXT NOT NULL,
    sell_exchange TEXT NOT NULL,
    buy_price REAL NOT NULL,
    sell_price REAL NOT NULL,
    profit REAL NOT NULL,
    profit_percent REAL NOT NULL,
    buy_size REAL NOT NULL,
    sell_size REAL NOT NULL,
    confidence REAL NOT NULL,
    trend_direction TEXT,
    executed BOOLEAN DEFAULT FALSE,
    timestamp REAL NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_signals_symbol ON arbitrage_signals(symbol);
CREATE INDEX idx_signals_timestamp ON arbitrage_signals(timestamp);
CREATE INDEX idx_signals_profit ON arbitrage_signals(profit_percent);
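The SignalManager example further below reads signal attributes that mirror these columns (signal.buy_exchange, signal.profit_percent, and so on). A dataclass like the following is assumed for those examples; it is a sketch, not ArBot's actual signal type.

from dataclasses import dataclass
from typing import Optional

@dataclass
class ArbitrageSignal:
    """Illustrative signal object; field names follow the arbitrage_signals columns."""
    symbol: str
    buy_exchange: str
    sell_exchange: str
    buy_price: float
    sell_price: float
    profit: float
    profit_percent: float
    buy_size: float
    sell_size: float
    confidence: float
    timestamp: float
    trend_direction: Optional[str] = None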
Trades Table:
CREATE TABLE trades (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    signal_id INTEGER,
    symbol TEXT NOT NULL,
    buy_exchange TEXT NOT NULL,
    sell_exchange TEXT NOT NULL,
    buy_order_id TEXT,
    sell_order_id TEXT,
    planned_buy_price REAL NOT NULL,
    planned_sell_price REAL NOT NULL,
    actual_buy_price REAL,
    actual_sell_price REAL,
    quantity REAL NOT NULL,
    planned_profit REAL NOT NULL,
    actual_profit REAL,
    fees_paid REAL DEFAULT 0,
    slippage REAL DEFAULT 0,
    status TEXT NOT NULL DEFAULT 'pending',
    error_message TEXT,
    started_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    completed_at DATETIME,
    FOREIGN KEY (signal_id) REFERENCES arbitrage_signals(id)
);
CREATE INDEX idx_trades_symbol ON trades(symbol);
CREATE INDEX idx_trades_status ON trades(status);
CREATE INDEX idx_trades_started_at ON trades(started_at);
Balances Table:
CREATE TABLE balances (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    exchange TEXT NOT NULL,
    asset TEXT NOT NULL,
    free_balance REAL NOT NULL DEFAULT 0,
    locked_balance REAL NOT NULL DEFAULT 0,
    total_balance REAL GENERATED ALWAYS AS (free_balance + locked_balance),
    timestamp REAL NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(exchange, asset, timestamp)
);
CREATE INDEX idx_balances_exchange_asset ON balances(exchange, asset);
CREATE INDEX idx_balances_timestamp ON balances(timestamp);
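Because balances are stored as timestamped snapshots, the "current" balance per exchange/asset is the row with the latest timestamp. A minimal sketch of that query is shown below; the function name is illustrative and an aiosqlite connection (conn) is assumed.

LATEST_BALANCES_SQL = """
    SELECT b.exchange, b.asset, b.free_balance, b.locked_balance, b.total_balance
    FROM balances AS b
    JOIN (
        SELECT exchange, asset, MAX(timestamp) AS max_ts
        FROM balances
        GROUP BY exchange, asset
    ) AS latest
      ON latest.exchange = b.exchange
     AND latest.asset = b.asset
     AND latest.max_ts = b.timestamp
"""

async def get_latest_balances(conn):
    # Returns one row per (exchange, asset) holding the most recent snapshot
    cursor = await conn.execute(LATEST_BALANCES_SQL)
    return await cursor.fetchall()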
Configuration Tables¶
Settings Table:
CREATE TABLE settings (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    category TEXT NOT NULL,
    key TEXT NOT NULL,
    value TEXT NOT NULL,
    data_type TEXT NOT NULL DEFAULT 'string',
    description TEXT,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(category, key)
);
CREATE INDEX idx_settings_category ON settings(category);
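Since values are stored as TEXT with a separate data_type column, readers typically convert on the way out. The helpers below are a hedged sketch of that pattern, upserting on the UNIQUE(category, key) constraint; the converter table and function names are assumptions, not ArBot's actual settings API.

import json

_CONVERTERS = {
    "string": str,
    "int": int,
    "float": float,
    "bool": lambda v: v.lower() in ("1", "true", "yes"),
    "json": json.loads,
}

async def get_setting(conn, category: str, key: str, default=None):
    cursor = await conn.execute(
        "SELECT value, data_type FROM settings WHERE category = ? AND key = ?",
        (category, key),
    )
    row = await cursor.fetchone()
    if row is None:
        return default
    value, data_type = row
    return _CONVERTERS.get(data_type, str)(value)

async def set_setting(conn, category: str, key: str, value, data_type: str = "string"):
    await conn.execute(
        """INSERT INTO settings (category, key, value, data_type)
           VALUES (?, ?, ?, ?)
           ON CONFLICT(category, key) DO UPDATE SET
               value = excluded.value,
               data_type = excluded.data_type,
               updated_at = CURRENT_TIMESTAMP""",
        (category, key, str(value), data_type),
    )
    await conn.commit()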
Exchange Configuration:
CREATE TABLE exchange_config (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    exchange TEXT NOT NULL UNIQUE,
    enabled BOOLEAN DEFAULT TRUE,
    arbitrage_enabled BOOLEAN DEFAULT TRUE,
    region TEXT,
    premium_baseline REAL DEFAULT 0,
    maker_fee REAL DEFAULT 0.001,
    taker_fee REAL DEFAULT 0.001,
    min_trade_amount REAL DEFAULT 10,
    max_trade_amount REAL DEFAULT 10000,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
Analytics Tables¶
Performance Metrics:
CREATE TABLE performance_metrics (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    date DATE NOT NULL,
    total_trades INTEGER DEFAULT 0,
    successful_trades INTEGER DEFAULT 0,
    total_profit REAL DEFAULT 0,
    total_fees REAL DEFAULT 0,
    avg_profit_percent REAL DEFAULT 0,
    max_drawdown REAL DEFAULT 0,
    sharpe_ratio REAL,
    win_rate REAL DEFAULT 0,
    avg_trade_duration REAL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(date)
);
CREATE INDEX idx_metrics_date ON performance_metrics(date);
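Daily metrics can be rolled up from the trades table with an upsert on the UNIQUE(date) constraint, so re-running the rollup simply refreshes the row. The sketch below illustrates this; the percentage and win-rate formulas are assumptions rather than ArBot's exact definitions.

UPSERT_DAILY_METRICS = """
    INSERT INTO performance_metrics
        (date, total_trades, successful_trades, total_profit, total_fees,
         avg_profit_percent, win_rate)
    SELECT
        DATE(started_at),
        COUNT(*),
        SUM(CASE WHEN actual_profit > 0 THEN 1 ELSE 0 END),
        SUM(actual_profit),
        SUM(fees_paid),
        AVG(actual_profit / (planned_buy_price * quantity) * 100),  -- assumed definition
        AVG(CASE WHEN actual_profit > 0 THEN 1.0 ELSE 0.0 END)
    FROM trades
    WHERE status = 'completed' AND DATE(started_at) = DATE('now')
    GROUP BY DATE(started_at)
    ON CONFLICT(date) DO UPDATE SET
        total_trades = excluded.total_trades,
        successful_trades = excluded.successful_trades,
        total_profit = excluded.total_profit,
        total_fees = excluded.total_fees,
        avg_profit_percent = excluded.avg_profit_percent,
        win_rate = excluded.win_rate
"""

async def rollup_daily_metrics(conn):
    # Refresh today's performance_metrics row from completed trades
    await conn.execute(UPSERT_DAILY_METRICS)
    await conn.commit()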
Symbol Statistics:
CREATE TABLE symbol_stats (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    symbol TEXT NOT NULL,
    date DATE NOT NULL,
    signals_generated INTEGER DEFAULT 0,
    signals_executed INTEGER DEFAULT 0,
    avg_spread REAL DEFAULT 0,
    max_spread REAL DEFAULT 0,
    total_volume REAL DEFAULT 0,
    profitability_score REAL DEFAULT 0,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(symbol, date)
);
CREATE INDEX idx_symbol_stats_symbol ON symbol_stats(symbol);
CREATE INDEX idx_symbol_stats_date ON symbol_stats(date);
Database Operations¶
Price Data Management¶
Real-time Data Insertion:
class PriceDataManager:
    def __init__(self, db_path):
        self.db_path = db_path
        self.connection_pool = self._create_connection_pool()

    async def insert_ticker(self, ticker_data):
        query = """
            INSERT OR REPLACE INTO tickers
            (symbol, exchange, bid, ask, bid_size, ask_size, timestamp)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """
        async with self.connection_pool.acquire() as conn:
            await conn.execute(query, (
                ticker_data.symbol,
                ticker_data.exchange,
                ticker_data.bid,
                ticker_data.ask,
                ticker_data.bid_size,
                ticker_data.ask_size,
                ticker_data.timestamp
            ))
            await conn.commit()

    async def bulk_insert_tickers(self, ticker_list):
        query = """
            INSERT OR REPLACE INTO tickers
            (symbol, exchange, bid, ask, bid_size, ask_size, timestamp)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """
        data = [
            (t.symbol, t.exchange, t.bid, t.ask, t.bid_size, t.ask_size, t.timestamp)
            for t in ticker_list
        ]
        async with self.connection_pool.acquire() as conn:
            await conn.executemany(query, data)
            await conn.commit()
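A minimal usage sketch follows. It assumes the Ticker dataclass from the schema section and that _create_connection_pool returns an initialized pool such as the DatabasePool described under Connection Management; the wiring is hypothetical.

import asyncio
import time

async def demo_bulk_insert():
    # Hypothetical wiring; ArBot's real startup code may differ
    manager = PriceDataManager("data/arbot.db")
    now = time.time()
    batch = [
        Ticker("BTCUSDT", "binance", bid=64000.5, ask=64001.0,
               bid_size=1.2, ask_size=0.8, timestamp=now),
        Ticker("BTCUSDT", "bybit", bid=64003.0, ask=64003.5,
               bid_size=0.6, ask_size=1.1, timestamp=now),
    ]
    await manager.bulk_insert_tickers(batch)

asyncio.run(demo_bulk_insert())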
Historical Data Queries:
import math

class HistoricalDataQuery:
    def __init__(self, db_manager):
        self.db = db_manager

    async def get_price_history(self, symbol, exchange, start_time, end_time):
        query = """
            SELECT timestamp, bid, ask, bid_size, ask_size
            FROM tickers
            WHERE symbol = ? AND exchange = ?
              AND timestamp BETWEEN ? AND ?
            ORDER BY timestamp ASC
        """
        async with self.db.connection_pool.acquire() as conn:
            cursor = await conn.execute(query, (symbol, exchange, start_time, end_time))
            rows = await cursor.fetchall()
            return [
                {
                    'timestamp': row[0],
                    'bid': row[1],
                    'ask': row[2],
                    'bid_size': row[3],
                    'ask_size': row[4]
                }
                for row in rows
            ]

    async def get_spread_analysis(self, symbol, limit=100):
        # Stock SQLite has no STDEV(); compute the variance in SQL and take the
        # square root in Python instead.
        query = """
            WITH spread_data AS (
                SELECT
                    symbol,
                    timestamp,
                    ask - bid as spread,
                    (ask - bid) / ((ask + bid) / 2) * 100 as spread_percent
                FROM tickers
                WHERE symbol = ?
                ORDER BY timestamp DESC
                LIMIT ?
            )
            SELECT
                AVG(spread) as avg_spread,
                MIN(spread) as min_spread,
                MAX(spread) as max_spread,
                AVG(spread_percent) as avg_spread_percent,
                AVG(spread_percent * spread_percent)
                    - AVG(spread_percent) * AVG(spread_percent) as spread_variance
            FROM spread_data
        """
        async with self.db.connection_pool.acquire() as conn:
            cursor = await conn.execute(query, (symbol, limit))
            result = await cursor.fetchone()
            return {
                'avg_spread': result[0],
                'min_spread': result[1],
                'max_spread': result[2],
                'avg_spread_percent': result[3],
                'spread_volatility': math.sqrt(max(result[4], 0)) if result[4] is not None else None
            }
Trading Data Management¶
Signal Tracking:
import time

class SignalManager:
    def __init__(self, db_manager):
        self.db = db_manager

    async def save_signal(self, signal):
        query = """
            INSERT INTO arbitrage_signals
            (symbol, buy_exchange, sell_exchange, buy_price, sell_price,
             profit, profit_percent, buy_size, sell_size, confidence,
             trend_direction, timestamp)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        async with self.db.connection_pool.acquire() as conn:
            cursor = await conn.execute(query, (
                signal.symbol, signal.buy_exchange, signal.sell_exchange,
                signal.buy_price, signal.sell_price, signal.profit,
                signal.profit_percent, signal.buy_size, signal.sell_size,
                signal.confidence, signal.trend_direction, signal.timestamp
            ))
            await conn.commit()
            return cursor.lastrowid

    async def mark_signal_executed(self, signal_id):
        query = "UPDATE arbitrage_signals SET executed = TRUE WHERE id = ?"
        async with self.db.connection_pool.acquire() as conn:
            await conn.execute(query, (signal_id,))
            await conn.commit()

    async def get_signal_statistics(self, period_hours=24):
        cutoff_time = time.time() - (period_hours * 3600)
        query = """
            SELECT
                COUNT(*) as total_signals,
                COUNT(CASE WHEN executed = TRUE THEN 1 END) as executed_signals,
                AVG(profit_percent) as avg_profit_percent,
                MAX(profit_percent) as max_profit_percent,
                AVG(confidence) as avg_confidence
            FROM arbitrage_signals
            WHERE timestamp > ?
        """
        async with self.db.connection_pool.acquire() as conn:
            cursor = await conn.execute(query, (cutoff_time,))
            result = await cursor.fetchone()
            return {
                'total_signals': result[0],
                'executed_signals': result[1],
                'execution_rate': result[1] / result[0] if result[0] > 0 else 0,
                'avg_profit_percent': result[2],
                'max_profit_percent': result[3],
                'avg_confidence': result[4]
            }
Trade Execution Tracking:
class TradeManager:
    def __init__(self, db_manager):
        self.db = db_manager

    async def create_trade(self, trade_params):
        query = """
            INSERT INTO trades
            (signal_id, symbol, buy_exchange, sell_exchange,
             planned_buy_price, planned_sell_price, quantity, planned_profit, status)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'pending')
        """
        async with self.db.connection_pool.acquire() as conn:
            cursor = await conn.execute(query, (
                trade_params.signal_id, trade_params.symbol,
                trade_params.buy_exchange, trade_params.sell_exchange,
                trade_params.planned_buy_price, trade_params.planned_sell_price,
                trade_params.quantity, trade_params.planned_profit
            ))
            await conn.commit()
            return cursor.lastrowid

    async def update_trade_execution(self, trade_id, execution_data):
        query = """
            UPDATE trades SET
                buy_order_id = ?, sell_order_id = ?,
                actual_buy_price = ?, actual_sell_price = ?,
                actual_profit = ?, fees_paid = ?, slippage = ?,
                status = ?, completed_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """
        async with self.db.connection_pool.acquire() as conn:
            await conn.execute(query, (
                execution_data.buy_order_id, execution_data.sell_order_id,
                execution_data.actual_buy_price, execution_data.actual_sell_price,
                execution_data.actual_profit, execution_data.fees_paid,
                execution_data.slippage, execution_data.status, trade_id
            ))
            await conn.commit()
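For orientation, here is a hedged end-to-end sketch of the signal-to-trade lifecycle using the managers above. The SimpleNamespace payloads, order IDs, and the demo function itself stand in for ArBot's real dataclasses and execution flow.

from types import SimpleNamespace

async def demo_trade_lifecycle(db_manager, signal):
    signals = SignalManager(db_manager)
    trades = TradeManager(db_manager)

    signal_id = await signals.save_signal(signal)
    trade_id = await trades.create_trade(SimpleNamespace(
        signal_id=signal_id, symbol=signal.symbol,
        buy_exchange=signal.buy_exchange, sell_exchange=signal.sell_exchange,
        planned_buy_price=signal.buy_price, planned_sell_price=signal.sell_price,
        quantity=signal.buy_size, planned_profit=signal.profit,
    ))

    # ... place orders on both exchanges, then record the outcome ...
    await trades.update_trade_execution(trade_id, SimpleNamespace(
        buy_order_id="B-123", sell_order_id="S-456",          # illustrative IDs
        actual_buy_price=signal.buy_price, actual_sell_price=signal.sell_price,
        actual_profit=signal.profit, fees_paid=0.0, slippage=0.0,
        status="completed",
    ))
    await signals.mark_signal_executed(signal_id)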
Performance Optimization¶
Indexing Strategy¶
Composite Indexes for Common Queries:
-- Fast symbol + exchange + time range queries
CREATE INDEX idx_tickers_composite ON tickers(symbol, exchange, timestamp);
-- Arbitrage signal analysis
CREATE INDEX idx_signals_composite ON arbitrage_signals(symbol, profit_percent, timestamp);
-- Trade performance analysis
CREATE INDEX idx_trades_composite ON trades(symbol, status, started_at);
-- Balance queries
CREATE INDEX idx_balances_composite ON balances(exchange, asset, timestamp);
Partial Indexes for Filtered Queries:
-- Only index successful trades
CREATE INDEX idx_trades_successful ON trades(symbol, actual_profit, completed_at)
    WHERE status = 'completed' AND actual_profit > 0;

-- Only index executed signals
CREATE INDEX idx_signals_executed ON arbitrage_signals(symbol, profit_percent, timestamp)
    WHERE executed = TRUE;
Query Optimization¶
Efficient Aggregation Queries:
-- Daily performance summary with window functions
WITH daily_trades AS (
    SELECT
        DATE(started_at) as trade_date,
        COUNT(*) as trade_count,
        SUM(actual_profit) as daily_profit,
        AVG(actual_profit) as avg_profit,
        SUM(fees_paid) as total_fees
    FROM trades
    WHERE status = 'completed'
      AND started_at >= date('now', '-30 days')
    GROUP BY DATE(started_at)
)
SELECT
    trade_date,
    trade_count,
    daily_profit,
    avg_profit,
    total_fees,
    SUM(daily_profit) OVER (ORDER BY trade_date) as cumulative_profit
FROM daily_trades
ORDER BY trade_date;
Optimized Spread Analysis:
-- Recent spread statistics with subquery optimization.
-- Note: PERCENTILE_CONT and STDEV are not built into stock SQLite; they require a
-- loadable extension, or compute the median/standard deviation in application code.
SELECT
    symbol,
    exchange,
    AVG(spread) as avg_spread,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY spread) as median_spread,
    STDEV(spread) as spread_volatility
FROM (
    SELECT
        symbol,
        exchange,
        ask - bid as spread
    FROM tickers
    WHERE timestamp > ?           -- Last 24 hours
      AND bid > 0 AND ask > bid   -- Valid prices only
) spread_data
GROUP BY symbol, exchange
HAVING COUNT(*) >= 100            -- Minimum data points
ORDER BY avg_spread DESC;
Connection Management¶
Connection Pooling:
import aiosqlite
import asyncio
from contextlib import asynccontextmanager
class DatabasePool:
    def __init__(self, db_path, max_connections=10):
        self.db_path = db_path
        self.max_connections = max_connections
        self.pool = asyncio.Queue(maxsize=max_connections)
        self.created_connections = 0

    async def initialize(self):
        # Pre-create connections
        for _ in range(min(5, self.max_connections)):
            conn = await aiosqlite.connect(self.db_path)
            await conn.execute("PRAGMA journal_mode=WAL")  # Write-Ahead Logging
            await conn.execute("PRAGMA synchronous=NORMAL")
            await conn.execute("PRAGMA cache_size=10000")
            await conn.execute("PRAGMA temp_store=memory")
            await self.pool.put(conn)
            self.created_connections += 1

    @asynccontextmanager
    async def acquire(self):
        try:
            # Try to get an existing connection
            conn = await asyncio.wait_for(self.pool.get(), timeout=1.0)
        except asyncio.TimeoutError:
            if self.created_connections < self.max_connections:
                # Create a new connection while under the limit
                conn = await aiosqlite.connect(self.db_path)
                await conn.execute("PRAGMA journal_mode=WAL")
                await conn.execute("PRAGMA synchronous=NORMAL")
                self.created_connections += 1
            else:
                # Wait for an available connection
                conn = await self.pool.get()
        try:
            yield conn
        finally:
            await self.pool.put(conn)

    async def close_all(self):
        while not self.pool.empty():
            conn = await self.pool.get()
            await conn.close()
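A minimal usage sketch of the pool, assuming the main database path from the files-structure section:

async def main():
    pool = DatabasePool("data/arbot.db", max_connections=10)
    await pool.initialize()
    try:
        async with pool.acquire() as conn:
            cursor = await conn.execute("SELECT COUNT(*) FROM tickers")
            print("ticker rows:", (await cursor.fetchone())[0])
    finally:
        await pool.close_all()

asyncio.run(main())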
Data Management¶
Data Retention Policies¶
Automated Cleanup:
from datetime import datetime, timedelta

class DataRetentionManager:
    def __init__(self, db_manager):
        self.db = db_manager

    async def cleanup_old_data(self):
        retention_policies = {
            'tickers': 7,                 # Keep 7 days of raw ticker data
            'arbitrage_signals': 30,      # Keep 30 days of signals
            'trades': 365,                # Keep 1 year of trade history
            'performance_metrics': 1095   # Keep 3 years of metrics
        }
        for table, days in retention_policies.items():
            cutoff_date = datetime.now() - timedelta(days=days)
            if table == 'tickers':
                await self._cleanup_tickers(cutoff_date)
            elif table == 'arbitrage_signals':
                await self._cleanup_signals(cutoff_date)
            # Add other cleanup methods...

    async def _cleanup_tickers(self, cutoff_date):
        cutoff_timestamp = cutoff_date.timestamp()
        # Archive to historical aggregates before deletion
        await self._archive_ticker_data(cutoff_timestamp)
        # Delete old data
        query = "DELETE FROM tickers WHERE timestamp < ?"
        async with self.db.connection_pool.acquire() as conn:
            await conn.execute(query, (cutoff_timestamp,))
            await conn.commit()

    async def _archive_ticker_data(self, cutoff_timestamp):
        # Roll raw ticks up into 1-hour OHLCV bars before they are deleted.
        # Window functions are evaluated in a CTE first, then grouped per hour bucket.
        query = """
            WITH hourly AS (
                SELECT
                    symbol,
                    exchange,
                    CAST(timestamp / 3600 AS INTEGER) * 3600 AS bucket,
                    (bid + ask) / 2 AS mid,
                    FIRST_VALUE(bid) OVER (
                        PARTITION BY symbol, exchange, CAST(timestamp / 3600 AS INTEGER)
                        ORDER BY timestamp) AS open_price,
                    LAST_VALUE(ask) OVER (
                        PARTITION BY symbol, exchange, CAST(timestamp / 3600 AS INTEGER)
                        ORDER BY timestamp
                        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS close_price
                FROM tickers
                WHERE timestamp < ?
            )
            INSERT OR REPLACE INTO ohlcv
                (symbol, exchange, timeframe, open_time, close_time,
                 open_price, high_price, low_price, close_price, volume)
            SELECT
                symbol,
                exchange,
                '1h' AS timeframe,
                bucket AS open_time,
                bucket + 3599 AS close_time,
                MIN(open_price) AS open_price,
                MAX(mid) AS high_price,
                MIN(mid) AS low_price,
                MIN(close_price) AS close_price,
                COUNT(*) AS volume
            FROM hourly
            GROUP BY symbol, exchange, bucket
        """
        async with self.db.connection_pool.acquire() as conn:
            await conn.execute(query, (cutoff_timestamp,))
            await conn.commit()
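Retention cleanup is typically run on a schedule. The loop below is an illustrative sketch; the 24-hour interval and task wiring are assumptions, not ArBot's actual scheduler.

async def retention_loop(db_manager):
    retention = DataRetentionManager(db_manager)
    while True:
        try:
            await retention.cleanup_old_data()
        except Exception as exc:
            print(f"Retention cleanup failed: {exc}")
        await asyncio.sleep(24 * 3600)   # run once a day

# Typically started alongside the trading tasks:
# asyncio.create_task(retention_loop(db_manager))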
Backup and Recovery¶
Automated Backup System:
import gzip
import shutil
from datetime import datetime
from pathlib import Path

import aiosqlite

class BackupManager:
    def __init__(self, db_path, backup_dir):
        self.db_path = Path(db_path)
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(exist_ok=True)

    async def create_backup(self, compress=True):
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f"arbot_{timestamp}.db"
        backup_path = self.backup_dir / backup_name

        # Use the SQLite backup API for a consistent snapshot
        async with aiosqlite.connect(self.db_path) as source:
            async with aiosqlite.connect(backup_path) as backup:
                await source.backup(backup)

        if compress:
            compressed_path = backup_path.with_suffix('.db.gz')
            with open(backup_path, 'rb') as f_in:
                with gzip.open(compressed_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            backup_path.unlink()  # Remove uncompressed file
            return compressed_path
        return backup_path

    async def restore_backup(self, backup_path):
        backup_path = Path(backup_path)
        is_temp = False

        if backup_path.suffix == '.gz':
            # Decompress to a temporary .db file first
            temp_path = backup_path.with_suffix('')
            with gzip.open(backup_path, 'rb') as f_in:
                with open(temp_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            backup_path = temp_path
            is_temp = True

        # Back up the current database before overwriting it
        current_backup = await self.create_backup()
        try:
            # Replace the current database
            shutil.copy2(backup_path, self.db_path)
            print(f"Database restored from {backup_path}")
            print(f"Previous database backed up to {current_backup}")
        except Exception as e:
            print(f"Restore failed: {e}")
            # The pre-restore backup in current_backup is still available
        finally:
            if is_temp:
                backup_path.unlink()  # Clean up the decompressed temp file
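A quick usage sketch; the paths mirror the files-structure section and are illustrative.

async def backup_now():
    manager = BackupManager("data/arbot.db", "backups")
    path = await manager.create_backup(compress=True)
    print(f"Backup written to {path}")

asyncio.run(backup_now())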
Monitoring and Maintenance¶
Database Health Monitoring¶
Performance Metrics:
class DatabaseMonitor:
    def __init__(self, db_manager):
        self.db = db_manager

    async def get_database_stats(self):
        stats = {}
        async with self.db.connection_pool.acquire() as conn:
            # Database file size
            cursor = await conn.execute("PRAGMA page_count")
            page_count = (await cursor.fetchone())[0]
            cursor = await conn.execute("PRAGMA page_size")
            page_size = (await cursor.fetchone())[0]
            stats['file_size_mb'] = (page_count * page_size) / (1024 * 1024)

            # Table row counts
            tables = ['tickers', 'arbitrage_signals', 'trades', 'balances']
            for table in tables:
                cursor = await conn.execute(f"SELECT COUNT(*) FROM {table}")
                stats[f'{table}_count'] = (await cursor.fetchone())[0]

            # Defined indexes
            cursor = await conn.execute("""
                SELECT name, sql FROM sqlite_master
                WHERE type = 'index' AND sql IS NOT NULL
            """)
            stats['indexes'] = await cursor.fetchall()

            # Configured page cache size
            cursor = await conn.execute("PRAGMA cache_size")
            stats['cache_size'] = (await cursor.fetchone())[0]
        return stats

    async def analyze_query_performance(self, query, params=None):
        async with self.db.connection_pool.acquire() as conn:
            # Guard against accidental writes while analyzing
            await conn.execute("PRAGMA query_only = ON")
            # Explain the query plan without executing the query itself
            explain_query = f"EXPLAIN QUERY PLAN {query}"
            cursor = await conn.execute(explain_query, params or [])
            plan = await cursor.fetchall()
            await conn.execute("PRAGMA query_only = OFF")
            return plan
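For example, the monitor can confirm that a hot ticker lookup hits an index rather than scanning the table. This helper is a sketch; the function name and the expected index name are assumptions based on the composite indexes defined above.

async def check_ticker_query_plan(db_manager):
    monitor = DatabaseMonitor(db_manager)
    plan = await monitor.analyze_query_performance(
        "SELECT bid, ask FROM tickers "
        "WHERE symbol = ? AND exchange = ? ORDER BY timestamp DESC LIMIT 1",
        ["BTCUSDT", "binance"],
    )
    for row in plan:
        print(row)  # Expect a line mentioning idx_tickers_composite or similar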
Maintenance Tasks¶
Regular Maintenance:
class MaintenanceManager:
    def __init__(self, db_manager):
        self.db = db_manager

    async def run_maintenance(self):
        async with self.db.connection_pool.acquire() as conn:
            # Analyze tables for query optimization
            await conn.execute("ANALYZE")
            # Update statistics
            tables = ['tickers', 'arbitrage_signals', 'trades']
            for table in tables:
                await conn.execute(f"ANALYZE {table}")
            # Vacuum database (reclaim space)
            await conn.execute("VACUUM")
            # Reindex if needed
            await conn.execute("REINDEX")
            await conn.commit()

    async def check_integrity(self):
        async with self.db.connection_pool.acquire() as conn:
            cursor = await conn.execute("PRAGMA integrity_check")
            result = await cursor.fetchone()
            if result[0] == 'ok':
                return True, "Database integrity check passed"
            else:
                return False, f"Database integrity issues: {result[0]}"
Database Optimization
Enable WAL mode (Write-Ahead Logging) for better concurrency, and set the cache size according to the memory available on your host.
Backup
Always back up your database before maintenance operations such as VACUUM or major schema changes. Use the automated backup system for regular protection.
Performance Monitoring
Monitor database file size growth and query performance regularly. Set up alerts for unusual patterns that might indicate issues with data retention or query efficiency.
Created: July 12, 2025