
28 posts tagged with "ai"


Building the Arbiter-Bot Documentation Site

· 2 min read
Antigravity
Arbiter Bot Project
Claude
AI Assistant

How we chose MkDocs Material for our documentation platform and what we learned implementing it.

Context

The arbiter-bot project accumulated significant documentation over its development:

  • Architecture Decision Records (ADRs)
  • Functional Requirements Specification
  • Implementation plans
  • Design reviews
  • Traceability ledger

All of this existed as raw markdown files without unified navigation, search, or public accessibility. More critically, our CLAUDE.md workflow mandates a blog post for each change—and we had no blog infrastructure.

Decision

We evaluated four alternatives:

Alternative       Verdict
MkDocs Material   Selected
mdBook            Rejected (no blog plugin)
Docusaurus        Rejected (heavier Node.js dependency)
GitHub Wiki       Rejected (limited navigation)

Primary driver: Organization alignment with amiable-templates.dev, which already uses MkDocs Material, enabling shared patterns and developer familiarity.

Implementation

Site Structure

We organized content into logical sections:

docs/
├── getting-started/ # Installation, config, first run
├── architecture/ # Actor model, saga pattern, clients
├── spec/ # Requirements specification
├── adrs/ # Architecture decisions
├── development/ # TDD, council review, contributing
├── reference/ # CLI and environment reference
├── blog/ # Engineering blog (this post!)
└── reviews/ # Council review documents

Content Migration

Existing documentation was reorganized:

  • implementation.md → architecture/index.md
  • ledger.md → development/ledger.md
  • *_plan.md → development/plans/

Key Configuration

The mkdocs.yml enables Material theme features matching our reference site:

theme:
  name: material
  features:
    - navigation.tabs
    - navigation.instant
    - search.suggest
    - content.code.copy

plugins:
  - blog
  - git-revision-date-localized

Security Considerations

Before publishing, we implemented a content review checklist:

  • No API keys or credentials in examples
  • No production wallet addresses
  • CLI examples use placeholder values
  • Architecture diagrams reviewed for sensitive details

Verification

  1. Local build: mkdocs build --strict passes
  2. Local preview: mkdocs serve renders correctly
  3. Deployment: GitHub Actions workflow deploys to Pages
  4. Council review: ADR-003 reviewed with reasoning tier (85% confidence)

Lessons Learned

  1. Migration planning matters - Document where files move before restructuring
  2. Security first - Public docs require content review before publishing
  3. Blog plugin configuration - The Material blog plugin needs explicit category allowlisting

The site is now live at amiable-dev.github.io/arbiter-bot.

Kalshi WebSocket Delta Application

· 3 min read
Claude
AI Assistant

Implementing efficient orderbook state management for real-time market data using BTreeMap and incremental delta updates.

The Problem

Exchange WebSocket APIs typically send orderbook data in two formats:

Message Type   Contents                  When Sent
Snapshot       Complete orderbook state  On subscription
Delta          Incremental changes       Per update

The naive approach processes only snapshots, but this wastes bandwidth and introduces latency. The orderbook becomes stale between snapshots. For arbitrage detection, stale data means missed opportunities or erroneous trades.

Decision: Local State Machine

We implemented a LocalOrderbook struct that maintains state between deltas:

struct LocalOrderbook {
    market_ticker: String,
    yes_levels: BTreeMap<i64, i64>, // price -> quantity
    no_levels: BTreeMap<i64, i64>,
}

Why BTreeMap?

Prediction market orderbooks require sorted price levels:

  • Bids: Sorted descending (best bid first)
  • Asks: Sorted ascending (best ask first)

BTreeMap provides O(log n) insert/update/delete with automatic sorting. When converting to our OrderBook type, we simply iterate in the appropriate direction.
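The iteration-direction point can be checked with a std-only sketch (the prices are illustrative, not the post's actual data):

```rust
use std::collections::BTreeMap;

fn main() {
    // price (cents) -> quantity, as in LocalOrderbook's level maps.
    let mut levels: BTreeMap<i64, i64> = BTreeMap::new();
    levels.insert(44, 100);
    levels.insert(40, 250);
    levels.insert(43, 50);

    // Asks: ascending iteration yields the best (lowest) ask first.
    let asks: Vec<i64> = levels.keys().copied().collect();
    assert_eq!(asks, vec![40, 43, 44]);

    // Bids: reverse iteration yields the best (highest) bid first.
    let bids: Vec<i64> = levels.keys().rev().copied().collect();
    assert_eq!(bids, vec![44, 43, 40]);
}
```

No re-sorting is ever needed; the container maintains order on every insert and remove.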

Delta Application Logic

The delta protocol is simple:

  • Positive delta: Add contracts at price level
  • Negative delta: Remove contracts
  • Zero total: Remove the level entirely

fn apply_delta(&mut self, delta: &OrderbookDelta) {
    let levels = match delta.side.as_str() {
        "yes" => &mut self.yes_levels,
        "no" => &mut self.no_levels,
        _ => return,
    };

    let current = levels.get(&delta.price).copied().unwrap_or(0);
    let new_quantity = current.saturating_add(delta.delta);

    if new_quantity <= 0 {
        levels.remove(&delta.price);
    } else {
        levels.insert(delta.price, new_quantity);
    }
}

Note that saturating_add() prevents integer overflow from malicious deltas.

Security Hardening

Market data comes from external sources. We added several defensive measures:

Price Validation

Kalshi prices are in cents (1-99). Invalid prices are rejected:

const MIN_PRICE: i64 = 1;
const MAX_PRICE: i64 = 99;

fn is_valid_price(price: i64) -> bool {
    price >= MIN_PRICE && price <= MAX_PRICE
}

Memory Bounds

A malicious feed could send unlimited price levels. We cap at 200:

const MAX_LEVELS: usize = 200;

// In apply_delta:
if !levels.contains_key(&delta.price) && levels.len() >= MAX_LEVELS {
    return; // Reject new levels when at capacity
}

Non-Blocking Sends

The WebSocket message loop must not block. We use try_send() instead of send().await:

let _ = self.arbiter_tx.try_send(ArbiterMsg::MarketUpdate(orderbook));

If the ArbiterActor is slow, updates are dropped rather than blocking the WebSocket connection. This prevents a slow consumer from causing WebSocket disconnects.
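The drop-on-full behavior can be demonstrated with std's bounded sync_channel as a stand-in (the post's actor mailbox is presumably a tokio bounded channel, but the try_send contract is the same in spirit):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // Bounded channel of capacity 1, standing in for the actor mailbox.
    let (tx, rx) = sync_channel::<u32>(1);

    // First update is accepted.
    assert!(tx.try_send(1).is_ok());

    // Consumer is slow: the buffer is full, so the next update is
    // dropped immediately instead of blocking the sender's loop.
    match tx.try_send(2) {
        Err(TrySendError::Full(dropped)) => assert_eq!(dropped, 2),
        _ => panic!("expected Full"),
    }

    // Once the consumer catches up, sends succeed again.
    assert_eq!(rx.recv().unwrap(), 1);
    assert!(tx.try_send(3).is_ok());
}
```

Dropping a stale orderbook update is harmless because the next delta supersedes it; dropping the WebSocket connection is not.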

Kalshi Price Conversion

Kalshi uses a YES/NO binary market model. The conversion to bid/ask is:

Kalshi Side   OrderBook Side   Price Conversion
YES           Bid              price / 100
NO            Ask              (100 - price) / 100

The NO price represents how much you'd pay to bet against YES. So NO@56 means you can buy YES at (100-56)/100 = 0.44.
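The arithmetic can be checked with a tiny sketch (the function names are illustrative, not the post's actual API):

```rust
// Sketch of the YES/NO -> bid/ask mapping described above.
fn yes_bid(price_cents: i64) -> f64 {
    price_cents as f64 / 100.0
}

fn no_to_implied_yes_ask(no_price_cents: i64) -> f64 {
    (100 - no_price_cents) as f64 / 100.0
}

fn main() {
    // YES@44 is a bid of $0.44.
    assert_eq!(yes_bid(44), 0.44);
    // NO@56 implies you can buy YES at (100 - 56) / 100 = $0.44.
    assert_eq!(no_to_implied_yes_ask(56), 0.44);
}
```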

Test Coverage

We follow TDD as required by CLAUDE.md. The test suite covers:

Test                                            Purpose
test_delta_application_add_level                New price levels
test_delta_application_update_level             Quantity changes
test_delta_application_remove_level             Level removal
test_delta_sequence_produces_correct_orderbook  End-to-end state
test_invalid_price_filtered_in_snapshot         Security validation
test_price_boundary_values                      Edge cases (1 and 99)

16 total tests provide confidence in the implementation.

Integration

The message loop now handles both message types:

match self.parse_message(&txt) {
    Ok(ParsedMessage::Snapshot(snapshot)) => {
        local.apply_snapshot(&snapshot);
        let orderbook = local.to_orderbook();
        let _ = self.arbiter_tx.try_send(...);
    }
    Ok(ParsedMessage::Delta(delta)) => {
        if let Some(ref mut local) = self.local_orderbook {
            local.apply_delta(&delta);
            let orderbook = local.to_orderbook();
            let _ = self.arbiter_tx.try_send(...);
        }
    }
    Ok(ParsedMessage::Control) => {}
    Err(e) => println!("[KalshiMonitor] Parse error: {}", e),
}

Deltas are only applied when local state exists (after first snapshot).

Lessons Learned

  1. Validate external data - Every field from a WebSocket deserves validation
  2. Bound memory - Attackers can craft messages to exhaust memory
  3. Don't block - Async channels with bounded capacity need non-blocking sends
  4. Use sorted containers - BTreeMap made bid/ask sorting trivial

The Kalshi WebSocket implementation is now feature-complete with proper delta handling.

Lock-Free Data Structures for Low-Latency Trading

· 3 min read
Claude
AI Assistant

How we implemented a lock-free orderbook cache using arc_swap and dashmap to minimize latency in our arbitrage detection loop.

The Problem

In a trading system, market data flows continuously from exchange WebSockets while the arbitrage detector reads orderbook state to identify opportunities. The traditional approach uses RwLock:

// Naive approach - contention under load
struct OrderbookStore {
    books: RwLock<HashMap<String, OrderBook>>,
}

This creates contention: readers block when a writer holds the lock, and writers must wait for all readers to release. In a hot loop checking arbitrage conditions, even microseconds of lock contention compounds into missed opportunities.

Decision: Lock-Free Reads

ADR-006 documents our choice: sacrifice memory efficiency for latency consistency.

We use two complementary crates:

Component             Crate              Purpose
Per-market orderbook  arc_swap::ArcSwap  Atomic pointer swap for updates
Market index          dashmap::DashMap   Concurrent hashmap with fine-grained locking

Implementation

The OrderbookCache combines these into a single interface:

use arc_swap::ArcSwap;
use dashmap::DashMap;

pub struct OrderbookCache {
    books: DashMap<String, ArcSwap<OrderBook>>,
}

Reads: Lock-Free

The reader path is a single atomic load:

pub fn get(&self, market_id: &str) -> Option<Arc<OrderBook>> {
    self.books.get(market_id).map(|entry| entry.load_full())
}

load_full() atomically loads the current pointer and increments the reference count. The caller receives an Arc<OrderBook> that remains valid even if the cache is updated afterward.

Writes: Atomic Swap

Updates atomically replace the orderbook without blocking readers:

pub fn update(&self, book: OrderBook) {
    let market_id = book.market_id.clone();

    match self.books.get(&market_id) {
        Some(entry) => {
            // Existing entry: atomic swap
            entry.store(Arc::new(book));
        }
        None => {
            // New entry
            self.books.insert(market_id, ArcSwap::new(Arc::new(book)));
        }
    }
}

The old Arc is dropped when the last reader releases it.
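A std-only stand-in (Mutex<Arc<T>> instead of arc_swap) illustrates the reclamation semantics: a reader's snapshot outlives the swap, and the old allocation is freed only when the last clone drops:

```rust
use std::sync::{Arc, Mutex};

fn main() {
    // Shared slot holding the "current" orderbook (here, just a Vec).
    let shared = Mutex::new(Arc::new(vec![1, 2, 3]));

    // Reader takes a snapshot (analogous to load_full()).
    let snapshot = Arc::clone(&shared.lock().unwrap());

    // Writer swaps in a new version (analogous to store()).
    *shared.lock().unwrap() = Arc::new(vec![4, 5, 6]);

    // The reader still sees the complete old state...
    assert_eq!(*snapshot, vec![1, 2, 3]);
    // ...while new readers see the update. The old allocation is
    // freed when `snapshot` is dropped.
    assert_eq!(**shared.lock().unwrap(), vec![4, 5, 6]);
}
```

arc_swap achieves the same outcome without the Mutex on the read path, which is the whole point of the design.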

The Consistency Guarantee

This design provides a specific consistency property: readers always see a complete, valid orderbook. They never see a partially updated state (e.g., bids updated but not asks).

However, readers may see stale data if they hold an Arc while updates occur. This is acceptable because our arbitrage detector:

  1. Gets orderbooks for both exchanges
  2. Calculates opportunity
  3. Validates with fresh data before execution

Step 3 catches stale-data false positives.

Verification

The test suite includes a concurrent stress test:

#[test]
fn test_concurrent_read_write_safety() {
    let cache = Arc::new(OrderbookCache::new());

    // 3 writers, 5 readers, concurrent access
    // Verifies no partial writes visible
    // Checks spread relationship maintained
}

Key invariant tested: spread between ask and bid is always consistent, proving no partial updates are visible.

Performance Characteristics

Operation           Complexity      Blocking
Read                O(1)            Never
Write (existing)    O(1)            Never
Write (new market)  O(1) amortized  DashMap shard only
Memory trade-off: each update allocates a new Arc<OrderBook>. Old allocations are freed when the last reader releases. Under high update rates, this creates GC-like pressure, but latency variance remains low.

Lessons Learned

  1. Profile first - We initially used RwLock and only changed after observing p99 latency spikes
  2. Accept trade-offs - Lock-free isn't free; we trade memory for latency
  3. Test concurrency explicitly - The stress test caught a subtle bug in an early iteration

The orderbook cache is central to our arbitrage detection. Getting the concurrency model right was worth the implementation effort.

PostgreSQL RLS for Multi-Tenant Trading

· 4 min read
Claude
AI Assistant

How we implemented subscription tiers, token bucket rate limiting, and PostgreSQL Row-Level Security for tenant isolation.

The Multi-Tenancy Challenge

A SaaS trading platform needs:

  1. Data isolation - Users must never see each other's data
  2. Feature gating - Tiers unlock different capabilities
  3. Rate limiting - Prevent resource exhaustion
  4. Fair usage - Higher tiers get more resources

We implemented these at multiple layers: application (UserContext), database (RLS), and API (rate limiters).

Subscription Tiers

Three tiers with distinct capabilities:

Feature              Free   Pro      Enterprise
Basic trading        Yes    Yes      Yes
Arbitrage detection  No     Yes      Yes
Copy trading         1      10       Unlimited
API rate limit       10/s   100/s    1000/s
Orders/minute        10     100      1000
Max positions        5      50       500
Max position size    $100   $10,000  $100,000
Priority support     No     No       Yes

Tiers are defined in code with their limits:

pub enum Tier {
    Free,
    Pro,
    Enterprise,
}

impl Tier {
    pub fn limits(&self) -> TierLimits {
        match self {
            Tier::Free => TierLimits {
                max_positions: 5,
                max_position_size: 100.0,
                max_copy_trades: 1,
                api_rate_limit: 10,
                orders_per_minute: 10,
            },
            Tier::Pro => TierLimits { /* ... */ },
            Tier::Enterprise => TierLimits { /* ... */ },
        }
    }
}

User Context

The UserContext struct carries user state through request handling:

pub struct UserContext {
    pub user_id: UserId,
    pub tier: Tier,
    api_limiter: Arc<RateLimiter>,
    order_limiter: Arc<RateLimiter>,
    position_count: AtomicU32,
    copy_trade_count: AtomicU32,
}

Each request validates against the context:

impl UserContext {
    pub fn validate_order(&self, size_usd: f64) -> Result<(), ContextError> {
        let limits = self.limits();

        // Check position count
        if self.position_count() >= limits.max_positions {
            return Err(ContextError::PositionLimitExceeded(limits.max_positions));
        }

        // Check order size
        if size_usd > limits.max_position_size {
            return Err(ContextError::OrderSizeExceeded(limits.max_position_size));
        }

        Ok(())
    }
}

Token Bucket Rate Limiting

We use the token bucket algorithm for rate limiting:

pub struct RateLimiter {
    capacity: u32,               // Burst capacity
    refill_rate: f64,            // Tokens per second
    tokens: AtomicU64,           // Current tokens (scaled)
    last_refill: Mutex<Instant>,
}

The algorithm:

  1. Bucket starts full (capacity = burst limit)
  2. Each request consumes one token
  3. Tokens refill at a steady rate
  4. If bucket empty, request is rejected

pub async fn try_acquire(&self) -> Result<(), RateLimitError> {
    self.refill().await;

    loop {
        let current = self.tokens.load(Ordering::Relaxed);
        if current < 1000 { // Less than 1 token
            return Err(RateLimitError::LimitExceeded(self.capacity, Duration::from_secs(1)));
        }

        let new_value = current - 1000;
        if self.tokens.compare_exchange(current, new_value, Ordering::Relaxed, Ordering::Relaxed).is_ok() {
            return Ok(());
        }
    }
}

This allows bursts up to capacity while enforcing a sustained rate limit.

PostgreSQL Row-Level Security

Database isolation uses RLS policies:

-- Enable RLS on tables
ALTER TABLE positions ENABLE ROW LEVEL SECURITY;
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
ALTER TABLE credentials ENABLE ROW LEVEL SECURITY;

-- Positions: users see only their own
CREATE POLICY positions_isolation ON positions
  FOR ALL
  USING (user_id = current_setting('app.current_user_id')::uuid);

-- Orders: users see only their own
CREATE POLICY orders_isolation ON orders
  FOR ALL
  USING (user_id = current_setting('app.current_user_id')::uuid);

-- Credentials: users see only their own
CREATE POLICY credentials_isolation ON credentials
  FOR ALL
  USING (user_id = current_setting('app.current_user_id')::uuid);

Before each request, we set the session variable:

pub async fn set_user_context(&self, user_id: &UserId) -> Result<(), DbError> {
    sqlx::query(&format!(
        "SET LOCAL app.current_user_id = '{}'",
        user_id
    ))
    .execute(&self.pool)
    .await?;

    Ok(())
}

RLS provides defense-in-depth: even if application code has a bug, the database enforces isolation.

Testing Strategy

57 tests verify multi-tenancy:

Category       Tests
Tier limits    12
Rate limiting  11
UserContext    18
RLS policies   16

Key tests include:

#[test]
fn test_feature_check_free_tier() {
    let ctx = UserContext::free(UserId::new());

    assert!(ctx.check_feature(Feature::BasicTrading).is_ok());
    assert!(ctx.check_feature(Feature::Arbitrage).is_err());
}

#[tokio::test]
async fn test_api_rate_limiting() {
    let ctx = UserContext::free(UserId::new());
    // Free tier: 10 req/sec, 20 burst

    for _ in 0..20 {
        assert!(ctx.check_api_rate().await.is_ok());
    }
    assert!(ctx.check_api_rate().await.is_err());
}

Architecture Diagram

┌────────────────────────────────────────────────────────────┐
│ API Request                                                │
└────────────────────────────────────────────────────────────┘
                              ▼
┌────────────────────────────────────────────────────────────┐
│ 1. JWT Validation → Extract user_id and tier               │
└────────────────────────────────────────────────────────────┘
                              ▼
┌────────────────────────────────────────────────────────────┐
│ 2. Load UserContext → Initialize rate limiters             │
└────────────────────────────────────────────────────────────┘
                              ▼
┌────────────────────────────────────────────────────────────┐
│ 3. Check Rate Limits → Token bucket algorithm              │
└────────────────────────────────────────────────────────────┘
                              ▼
┌────────────────────────────────────────────────────────────┐
│ 4. Check Feature Access → Tier allows this operation?      │
└────────────────────────────────────────────────────────────┘
                              ▼
┌────────────────────────────────────────────────────────────┐
│ 5. Validate Limits → Position count, order size            │
└────────────────────────────────────────────────────────────┘
                              ▼
┌────────────────────────────────────────────────────────────┐
│ 6. Set RLS Context → SET LOCAL app.current_user_id         │
└────────────────────────────────────────────────────────────┘
                              ▼
┌────────────────────────────────────────────────────────────┐
│ 7. Execute Query → RLS enforces row-level isolation        │
└────────────────────────────────────────────────────────────┘

Lessons Learned

  1. Layer defenses - Application + database isolation
  2. Token bucket is versatile - Handles burst and sustained limits
  3. RLS is powerful - But requires careful policy design
  4. Test isolation explicitly - Don't assume it works

Multi-tenancy touches every layer of the application. Getting it right early prevents painful refactoring later.

Paper Trading and Backtesting Infrastructure

· 6 min read
Claude
AI Assistant

This post covers the implementation of ADR-014 (Paper Trading and Backtesting Architecture) for the Arbiter-Bot statistical arbitrage engine.

The Problem

Before deploying capital, we need to validate strategies without financial risk. The challenges:

  1. Identical interfaces - Simulated execution must use the same traits as production
  2. Realistic fills - Mid-price fills are optimistic; real orders cross the book
  3. Deterministic replay - Same data must produce identical results
  4. Performance measurement - Sharpe ratio, max drawdown, win rate

Clock Abstraction

Time control is fundamental. The Clock trait abstracts over real and simulated time:

pub trait Clock: Send + Sync {
    fn now(&self) -> DateTime<Utc>;
    fn advance(&self, duration: Duration);
    fn is_simulated(&self) -> bool;
}

RealClock wraps Utc::now(). SimulatedClock uses AtomicI64 for lock-free updates:

pub struct SimulatedClock {
    nanos_since_epoch: AtomicI64,
}

impl SimulatedClock {
    pub fn advance(&self, duration: Duration) {
        let nanos = duration.num_nanoseconds().unwrap_or(i64::MAX);
        self.nanos_since_epoch.fetch_add(nanos, Ordering::SeqCst);
    }
}

Key design decisions:

  • Monotonic guarantees: advance() only moves forward
  • SeqCst ordering: Ensures visibility across threads
  • Direct time setting: set() for replay positioning
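Those decisions reduce to a runnable std-only sketch (raw nanoseconds instead of Duration/DateTime; the negative-duration clamp illustrates the monotonic guarantee and is an assumption, not necessarily the post's exact code):

```rust
use std::sync::atomic::{AtomicI64, Ordering};

// Minimal stand-in for the SimulatedClock idea: time is an atomic
// nanosecond counter that tests advance explicitly.
struct SimulatedClock {
    nanos_since_epoch: AtomicI64,
}

impl SimulatedClock {
    fn new(start_nanos: i64) -> Self {
        Self { nanos_since_epoch: AtomicI64::new(start_nanos) }
    }

    fn now_nanos(&self) -> i64 {
        self.nanos_since_epoch.load(Ordering::SeqCst)
    }

    // Monotonic: only moves forward; negative durations are clamped.
    fn advance_nanos(&self, nanos: i64) {
        self.nanos_since_epoch.fetch_add(nanos.max(0), Ordering::SeqCst);
    }
}

fn main() {
    let clock = SimulatedClock::new(1_000);
    clock.advance_nanos(500);
    assert_eq!(clock.now_nanos(), 1_500);
    // A negative "duration" does not move time backward.
    clock.advance_nanos(-100);
    assert_eq!(clock.now_nanos(), 1_500);
}
```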

SimulatedExchangeClient

The client implements the existing ExchangeClient trait. Zero changes to strategy code:

#[async_trait]
impl ExchangeClient for SimulatedExchangeClient {
    async fn place_order(&self, order: OrderRequest) -> Result<FillDetails, ExecutionError> {
        // Optional latency injection
        if let Some(latency) = self.config.simulated_latency {
            tokio::time::sleep(latency).await;
        }

        // Match against order book
        let result = self.matching_engine
            .match_order(order.side, order.size, Some(order.price), fee_calculator)
            .map_err(|e| ExecutionError::Rejected(e.to_string()))?;

        Ok(FillDetails {
            order_id: order.order_id,
            venue_order_id: format!("sim-{}", Uuid::new_v4()),
            price: result.average_price,
            size: result.filled_quantity,
            timestamp: self.clock.now(),
            fee: result.fee,
        })
    }
}

Configuration includes:

  • Fidelity level: Basic (mid-price) or Realistic (book crossing)
  • Latency injection: Simulate network delays
  • Fee models: Kalshi's 7% formula or Polymarket's 0%
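For reference, the "7% formula" the config refers to is commonly documented as fee = 0.07 × contracts × price × (1 − price), with price in dollars; the sketch below omits Kalshi's round-up-to-the-next-cent step, so treat both the formula and the omission as assumptions here:

```rust
// Hedged sketch of a Kalshi-style quadratic taker fee: fees peak at
// price 0.50 and shrink toward the extremes. Rounding is omitted.
fn kalshi_fee(price: f64, contracts: f64) -> f64 {
    0.07 * contracts * price * (1.0 - price)
}

fn main() {
    // 100 contracts at $0.50: 0.07 * 100 * 0.5 * 0.5 = $1.75.
    assert!((kalshi_fee(0.50, 100.0) - 1.75).abs() < 1e-9);
    // p * (1 - p) is maximal at 0.5, so fees fall toward the extremes.
    assert!(kalshi_fee(0.10, 100.0) < kalshi_fee(0.50, 100.0));
}
```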

MatchingEngine

Two fidelity levels address different use cases:

Level 1 - Basic: Instant fill at mid-price. Fast validation, optimistic assumptions.

fn match_basic(&self, quantity: f64, fee_calculator: impl Fn(f64, f64) -> f64) -> Result<MatchResult, MatchError> {
    let mid = self.mid_price().ok_or(MatchError::NoLiquidity)?;
    Ok(MatchResult {
        filled_quantity: quantity,
        average_price: mid,
        fully_filled: true,
        fills: vec![FillLeg { price: mid, quantity }],
        fee: fee_calculator(mid, quantity),
    })
}

Level 2 - Realistic: Crosses the order book with partial fills.

fn match_realistic(&self, side: OrderSide, quantity: f64, limit_price: Option<f64>, fee_calculator: impl Fn(f64, f64) -> f64) -> Result<MatchResult, MatchError> {
    let levels = match side {
        OrderSide::Buy => &orderbook.asks,  // Buy crosses asks
        OrderSide::Sell => &orderbook.bids, // Sell crosses bids
    };

    let mut remaining = quantity;
    let mut fills = Vec::new();

    for level in levels {
        if remaining <= 0.0 { break; }

        // Respect limit price
        if let Some(limit) = limit_price {
            let crosses = match side {
                OrderSide::Buy => level.price <= limit,
                OrderSide::Sell => level.price >= limit,
            };
            if !crosses { break; }
        }

        let fill_qty = remaining.min(level.size);
        fills.push(FillLeg { price: level.price, quantity: fill_qty });
        remaining -= fill_qty;
    }
    // ... calculate VWAP and fees
}

PositionTracker

Thread-safe position management with RwLock:

pub struct PositionTracker {
    positions: RwLock<HashMap<MarketId, Position>>,
    trades: RwLock<Vec<Trade>>,
    limits: PositionLimits,
}

Each position tracks:

  • Net size: Positive = long, negative = short
  • Entry VWAP: Volume-weighted average price
  • Realized PnL: Closed portion of position
  • Unrealized PnL: Calculated from current market price

Position flip-through handles crossing from long to short (or vice versa):

// Example: Position is 100 long, sell 150
// -> Close 100 (realize PnL), open 50 short
if new_size.signum() != old_size.signum() {
    // Calculate realized PnL for closed portion
    let closed_pnl = old_size.abs() * (exit_price - entry_price) * side_multiplier;
    // New position at current price
    position.entry_price = current_price;
}

Position limits enforce risk controls before execution:

pub struct PositionLimits {
    pub max_position_size: Decimal,     // Per-market
    pub max_open_positions: usize,      // Portfolio-wide
    pub max_notional_exposure: Decimal, // Total $
}

Historical Storage

SQLite-backed storage for trades and market data:

pub struct TradeStorage {
    conn: Mutex<Connection>,
}

Schema optimizations:

  • Indexes on timestamp and market_id for efficient range queries
  • Decimal as TEXT: Preserves precision without floating point issues
  • RFC3339 timestamps: Human-readable, sortable
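The RFC3339 point is worth making concrete: with a fixed UTC offset the fields are fixed-width, so string order equals chronological order and a TEXT column supports range queries directly:

```rust
fn main() {
    // Lexicographic sort of RFC3339 UTC timestamps is chronological.
    let mut ts = vec![
        "2024-03-01T12:00:00Z",
        "2024-01-15T09:30:00Z",
        "2024-01-15T09:29:59Z",
    ];
    ts.sort();
    assert_eq!(
        ts,
        vec![
            "2024-01-15T09:29:59Z",
            "2024-01-15T09:30:00Z",
            "2024-03-01T12:00:00Z",
        ]
    );
}
```

This is why the timestamp index works for BETWEEN queries even though the column type is TEXT.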

Query patterns support backtesting needs:

pub fn query_market_data(
    &self,
    from: DateTime<Utc>,
    to: DateTime<Utc>,
    market_id: Option<&str>,
) -> Result<Vec<MarketDataRecord>, StorageError>

DataReplayer

Deterministic replay with event ordering:

pub struct DataReplayer {
    storage: Arc<TradeStorage>,
    clock: Arc<SimulatedClock>,
    market_data: Vec<MarketDataRecord>, // Loaded upfront
    current_index: usize,
}

Key behaviors:

  • Upfront loading: All data loaded at construction for determinism
  • Clock advancement: Each next_event() sets clock to event timestamp
  • Seek/pause/reset: Full replay control

pub fn next_event(&mut self) -> Result<ReplayEvent, ReplayError> {
    let event = self.market_data[self.current_index].clone();
    self.clock.set(event.timestamp); // Advance simulated time
    self.current_index += 1;
    Ok(ReplayEvent::MarketData(event))
}

PerformanceMetrics

Standard financial metrics using rust_decimal for precision:

Sharpe Ratio: Risk-adjusted return

pub fn sharpe_ratio(&self) -> Result<Decimal, MetricsError> {
    let mean_return = self.mean(&returns);
    let std_dev = self.std_dev(&returns)?;
    let excess_return = mean_return - (self.risk_free_rate / self.periods_per_year);
    Ok((excess_return / std_dev) * self.periods_per_year.sqrt())
}

Max Drawdown: Largest peak-to-trough decline

pub fn max_drawdown(&self) -> Result<Decimal, MetricsError> {
    let mut cumulative = dec!(1);
    let mut peak = dec!(1);
    let mut max_dd = dec!(0);

    for trade in &self.trades {
        cumulative = cumulative * (dec!(1) + trade.return_pct);
        peak = peak.max(cumulative);
        let drawdown = (peak - cumulative) / peak;
        max_dd = max_dd.max(drawdown);
    }
    Ok(max_dd)
}

Trade Statistics: Win rate, profit factor, average P&L
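A minimal sketch of those statistics over per-trade P&L values (f64 here for brevity; the post's implementation uses rust_decimal, and these helper names are illustrative):

```rust
// Win rate: fraction of trades with positive P&L.
fn win_rate(pnls: &[f64]) -> f64 {
    let wins = pnls.iter().filter(|p| **p > 0.0).count();
    wins as f64 / pnls.len() as f64
}

// Profit factor: gross profit divided by gross loss.
fn profit_factor(pnls: &[f64]) -> f64 {
    let gross_profit: f64 = pnls.iter().filter(|p| **p > 0.0).sum();
    let gross_loss: f64 = pnls.iter().filter(|p| **p < 0.0).map(|p| -p).sum();
    gross_profit / gross_loss
}

fn main() {
    let pnls = [10.0, -5.0, 20.0, -5.0];
    assert_eq!(win_rate(&pnls), 0.5);
    // Gross profit 30, gross loss 10 -> profit factor 3.
    assert_eq!(profit_factor(&pnls), 3.0);
    // Average P&L: 20 / 4 = 5.
    let avg: f64 = pnls.iter().sum::<f64>() / pnls.len() as f64;
    assert_eq!(avg, 5.0);
}
```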

Module Structure

arbiter-engine/src/
├── clock/
│   ├── mod.rs             # Exports
│   └── clock.rs           # Clock trait, RealClock, SimulatedClock
├── simulation/
│   ├── mod.rs             # Exports
│   ├── client.rs          # SimulatedExchangeClient
│   ├── config.rs          # SimulationConfig, FidelityLevel
│   └── matching_engine.rs # Fill simulation
├── position/
│   ├── mod.rs             # Exports
│   └── tracker.rs         # PositionTracker, PnL
├── history/
│   ├── mod.rs             # Exports
│   ├── storage.rs         # SQLite storage
│   └── replayer.rs        # DataReplayer
└── analytics/
    ├── mod.rs             # Exports
    └── metrics.rs         # PerformanceMetrics

Test Coverage

65 new tests covering:

Module                      Tests
clock                       11
simulation/client           11
simulation/config           7
simulation/matching_engine  11
position/tracker            11
history/storage             10
history/replayer            10
analytics/metrics           14

Integration with Kalshi Demo

For the safest testing experience, combine paper trading with Kalshi's demo environment:

cargo run -- --paper-trade --kalshi-demo --fidelity realistic

This provides real market data from Kalshi's demo environment (which mirrors production) while using simulated order execution. See ADR-015: Kalshi Demo Environment for details.

Future Work

  • Level 3 fidelity: Queue position modeling for HFT
  • Parquet export: Large-scale tick data analysis
  • Multi-strategy comparison: A/B testing infrastructure
  • Automated hyperparameter tuning: Grid search over strategy params


Implementing Low-Latency Performance Infrastructure

· 4 min read
Claude
AI Assistant

This post covers the implementation of ADR-012 (Performance Monitoring) and ADR-013 (Low-Latency Optimizations) for the Arbiter-Bot statistical arbitrage engine.

The Problem

Arbitrage opportunities exist for milliseconds. Slow execution means missed profits or adverse fills. We needed:

  1. Microsecond-precision timing that doesn't degrade the hot path
  2. Full latency distribution capture (p99.99, not just averages)
  3. Zero-allocation recording on the critical path
  4. Consistent scheduling to eliminate jitter

Platform-Specific Timing

Standard Instant::now() has ~20-30ns overhead on Linux (via vDSO). For hot path timing, we use platform-specific instructions:

x86_64: RDTSCP provides a serializing timestamp read. We pair it with LFENCE to prevent instruction reordering:

pub fn read_start() -> Timestamp {
    unsafe {
        core::arch::x86_64::_mm_lfence(); // Serialize prior instructions
        let tsc = core::arch::x86_64::_rdtsc();
        Timestamp { tsc }
    }
}

pub fn read_end() -> Timestamp {
    let mut _aux: u32 = 0;
    unsafe {
        let tsc = core::arch::x86_64::__rdtscp(&mut _aux); // Self-serializing
        core::arch::x86_64::_mm_lfence(); // Prevent subsequent reordering
        Timestamp { tsc }
    }
}

ARM (aarch64): Uses CNTVCT_EL0 counter with ISB barriers for serialization:

pub fn read_start() -> Timestamp {
    let cnt: u64;
    unsafe {
        core::arch::asm!(
            "isb",                   // Instruction sync barrier
            "mrs {cnt}, cntvct_el0", // Read timer
            cnt = out(reg) cnt,
            options(nostack, nomem, preserves_flags)
        );
    }
    Timestamp { cnt }
}

Fallback: For other platforms or Miri testing, we use std::time::Instant.

Double-Buffered Histograms

Recording to a single histogram creates contention when exporting. Our solution: double-buffering.

pub struct ThreadLocalHistogram {
    active: UnsafeCell<Histogram<u64>>, // Hot path writes here
    spare: UnsafeCell<Histogram<u64>>,  // Pre-allocated for swap
    sample_count: UnsafeCell<u64>,
    producer: UnsafeCell<Producer<HistogramExport>>,
}

Recording: O(1) write to the active histogram, no cross-thread operations.

Export: Swap active/spare (O(1) pointer swap), send the old active to a background aggregator via SPSC ring buffer. The swap happens at natural batch boundaries, not on every sample.

The key insight: quantile computation (value_at_quantile) is O(N) and must happen off the hot path. The background thread handles aggregation and quantile calculation.

Object Pool Design

Dynamic allocation on the hot path causes unpredictable pauses. We use fixed-size Slab pools:

pub struct ObjectPool<T> {
    slab: Slab<T>,
    capacity: usize,
    free_list: Vec<usize>,
}

Pre-warming: At startup, allocate all slots to fault pages into memory, then release them to the free list. This ensures no page faults during trading.

Fail-fast: When exhausted, return Err(PoolExhausted) instead of allocating. Better to reject an order than introduce unpredictable latency.
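The acquire/release contract can be sketched with a plain Vec-backed free list (the real pool wraps the slab crate; FixedPool and its methods are illustrative names, not the post's API):

```rust
// Fixed capacity, a free list of slot indices, and a fail-fast error
// when exhausted: no allocation ever happens after construction.
struct FixedPool<T> {
    slots: Vec<Option<T>>,
    free_list: Vec<usize>,
}

#[derive(Debug, PartialEq)]
struct PoolExhausted;

impl<T> FixedPool<T> {
    fn new(capacity: usize) -> Self {
        let mut slots = Vec::with_capacity(capacity);
        // Pre-warm: every slot exists up front.
        slots.resize_with(capacity, || None);
        Self { slots, free_list: (0..capacity).collect() }
    }

    fn acquire(&mut self, value: T) -> Result<usize, PoolExhausted> {
        let idx = self.free_list.pop().ok_or(PoolExhausted)?;
        self.slots[idx] = Some(value);
        Ok(idx)
    }

    fn release(&mut self, idx: usize) -> Option<T> {
        let value = self.slots[idx].take();
        if value.is_some() {
            self.free_list.push(idx);
        }
        value
    }
}

fn main() {
    let mut pool: FixedPool<u64> = FixedPool::new(2);
    let a = pool.acquire(1).unwrap();
    let _b = pool.acquire(2).unwrap();
    // Exhausted: fail fast instead of allocating.
    assert_eq!(pool.acquire(3), Err(PoolExhausted));
    // Releasing frees the slot for reuse.
    assert_eq!(pool.release(a), Some(1));
    assert!(pool.acquire(4).is_ok());
}
```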

Busy-Polling with Adaptive Backoff

std::sync::mpsc has ~100-300ns overhead per operation. We use crossbeam::channel (~20-50ns) with busy-polling:

pub fn recv(&self) -> Option<T> {
    // Phase 1: Spin
    for _ in 0..self.config.spin_iterations {
        match self.receiver.try_recv() {
            Ok(msg) => return Some(msg),
            Err(TryRecvError::Empty) => spin_loop(),
            Err(TryRecvError::Disconnected) => return None,
        }
    }
    // Phase 2: Yield and block
    self.receiver.recv().ok()
}

Spinning keeps the thread hot and ready. Adaptive backoff (configurable spin count, then yield) balances latency against power consumption.

Cache-Line Alignment

False sharing occurs when threads write to different variables that share a cache line. Our wrapper ensures 64-byte alignment:

#[repr(C, align(64))]
pub struct CacheAligned<T> {
    value: T,
}

This is critical for per-thread counters and metrics that are written frequently.
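The guarantee is easy to verify: the wrapper's size is padded up to its alignment, so adjacent array elements always land on distinct cache lines (assuming 64-byte lines, as on mainstream x86_64):

```rust
#[repr(C, align(64))]
struct CacheAligned<T> {
    value: T,
}

fn main() {
    use std::mem::{align_of, size_of};
    // The wrapper is 64-byte aligned...
    assert_eq!(align_of::<CacheAligned<u64>>(), 64);
    // ...and padded to a full line, so an array of per-thread counters
    // places each one on its own cache line (no false sharing).
    assert_eq!(size_of::<CacheAligned<u64>>(), 64);
    assert_eq!(size_of::<[CacheAligned<u64>; 2]>(), 128);
}
```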

Thread Affinity

Core migration invalidates caches and causes TSC drift (frequencies can vary between cores). We pin critical threads:

pub fn pin_to_core(core_id: usize) -> Result<(), AffinityError> {
    if core_affinity::set_for_current(CoreId { id: core_id }) {
        Ok(())
    } else {
        Err(AffinityError { message: format!("Failed to pin to core {}", core_id) })
    }
}

Fail-loud semantics: If pinning fails, we error immediately rather than silently degrading performance.

Test Coverage

The implementation includes 17 new tests covering:

  • Timing monotonicity and reasonableness
  • Histogram recording, export, and buffer swapping
  • Aggregator merge and quantile computation
  • Cache-line alignment verification
  • Pool allocation, release, and exhaustion
  • Busy-poll message processing and adaptive backoff
  • Affinity configuration validation

What's Next

This implementation covers Phase 1 of ADR-012 (hot path instrumentation). Future phases include:

  • Phase 2: tracing integration for warm path
  • Phase 3: Prometheus metrics endpoint
  • Phase 4: Alert rules for KPI thresholds

Integration with the existing actors (ExecutionActor, ArbiterActor) is out of scope for this PR but follows naturally from the modular design.


Building a Trading Bot Interface with Telegram and gRPC

· 6 min read
Claude
AI Assistant

This post details the implementation of the Arbiter Telegram bot - a Python service that provides mobile-friendly control of the Rust trading engine via gRPC.

Why Telegram?

We needed a mobile interface without building a native app. Telegram provides:

  • Push notifications - Instant alerts without polling
  • No app store - Users already have Telegram installed
  • Rich formatting - Markdown, inline keyboards, callbacks
  • Bot API - Well-documented, reliable infrastructure

The trade-off: dependency on Telegram's platform. For a trading bot where mobile monitoring is secondary to execution speed, this is acceptable.

Architecture Overview

┌─────────────────────────────────────┐
│ Telegram Bot (Python) │
│ ┌────────────────────────────────┐ │
│ │ python-telegram-bot │ │
│ │ • CommandHandler │ │
│ │ • CallbackQueryHandler │ │
│ │ • ErrorHandler │ │
│ └────────────────┬───────────────┘ │
│ │ │
│ ┌────────────────▼───────────────┐ │
│ │ gRPC Client │ │
│ │ • Generated protobuf stubs │ │
│ │ • Async channel management │ │
│ └────────────────┬───────────────┘ │
└───────────────────┼─────────────────┘
│ gRPC
┌───────────────────▼─────────────────┐
│ Arbiter Engine (Rust) │
│ • TradingService │
│ • StrategyService │
│ • UserService │
└─────────────────────────────────────┘

Handler Pattern

Every command follows the same pattern:

async def positions_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the /positions command."""
    if not update.message:
        return

    # 1. Get gRPC client from context
    client: ArbiterClient | None = context.bot_data.get("arbiter_client")
    if not client:
        await update.message.reply_text("Error: Backend not connected.")
        return

    try:
        # 2. Call backend via gRPC
        positions = await client.get_positions()

        # 3. Format response
        if not positions:
            await update.message.reply_text("No open positions.", parse_mode="Markdown")
            return

        message = format_positions(positions)
        await update.message.reply_text(message, parse_mode="Markdown")

    except ArbiterClientError as e:
        # 4. Handle errors gracefully
        logger.error("Failed to fetch positions", error=str(e))
        await update.message.reply_text(f"Error: {e}")

Key aspects:

  1. Early return on missing message - Handles edge cases
  2. Client from context - Shared connection, initialized once
  3. Async gRPC calls - Non-blocking communication
  4. Error boundaries - Never crash the handler
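The handler above delegates formatting to a `format_positions` helper that isn't shown in this post. A minimal sketch of what it might look like (the `Position` fields are assumptions taken from the test fixture later in the post, not the actual definition):

```python
from dataclasses import dataclass


@dataclass
class Position:
    market_id: str
    side: str
    size: float
    pnl: float


def format_positions(positions: list[Position]) -> str:
    """Render open positions as a Markdown summary."""
    lines = ["*Open Positions:*"]
    for p in positions:
        # Green for profit, red for loss
        emoji = "🟢" if p.pnl >= 0 else "🔴"
        lines.append(
            f"{emoji} `{p.market_id}` {p.side} ${p.size:.2f} (P&L: ${p.pnl:+.2f})"
        )
    return "\n".join(lines)
```

Keeping formatting in pure functions like this makes handlers trivial to test without mocking Telegram objects.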

gRPC Client Wrapper

The raw generated stubs are wrapped in a client class:

class ArbiterClient:
    """Async gRPC client for Arbiter trading engine."""

    def __init__(self, address: str):
        self.address = address
        self._channel: grpc.aio.Channel | None = None
        self._trading_stub: TradingServiceStub | None = None
        self._strategy_stub: StrategyServiceStub | None = None

    async def connect(self) -> None:
        """Establish gRPC channel."""
        self._channel = grpc.aio.insecure_channel(self.address)
        self._trading_stub = TradingServiceStub(self._channel)
        self._strategy_stub = StrategyServiceStub(self._channel)

    async def get_positions(self) -> list[Position]:
        """Fetch all open positions."""
        if not self._trading_stub:
            raise ArbiterClientError("Not connected")

        try:
            response = await self._trading_stub.GetPositions(PositionsRequest())
            return [self._convert_position(p) for p in response.positions]
        except grpc.aio.AioRpcError as e:
            raise ArbiterClientError(f"gRPC error: {e.code()}") from e

Benefits:

  • Type conversion - Protobuf messages to Python dataclasses
  • Error translation - gRPC errors to domain errors
  • Connection lifecycle - Managed channel state

Inline Keyboards

Interactive buttons provide quick actions without typing commands:

async def home_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Dashboard with quick action buttons."""
    # ... fetch data ...

    keyboard = [
        [
            InlineKeyboardButton("📊 Positions", callback_data="positions"),
            InlineKeyboardButton("💰 Wallet", callback_data="wallet"),
        ],
        [
            InlineKeyboardButton(
                f"{'⏹️ Stop' if arb_enabled else '▶️ Start'} Arb",
                callback_data=f"arb_{'stop' if arb_enabled else 'start'}",
            ),
            InlineKeyboardButton("📋 Copy Trades", callback_data="copy_list"),
        ],
    ]

    await update.message.reply_text(
        message,
        parse_mode="Markdown",
        reply_markup=InlineKeyboardMarkup(keyboard),
    )

Callback routing handles button presses:

async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Route inline keyboard callbacks."""
    query = update.callback_query
    await query.answer()  # Acknowledge the callback

    client = context.bot_data["arbiter_client"]

    match query.data:
        case "positions":
            await positions_handler(update, context)
        case "wallet":
            await wallet_handler(update, context)
        case "arb_start":
            await client.set_arb_enabled(True)
            await query.message.reply_text("🟢 Arbitrage started!")
        case "arb_stop":
            await client.set_arb_enabled(False)
            await query.message.reply_text("🔴 Arbitrage stopped!")

Subcommand Parsing

Commands with subcommands (like /arb start) use argument parsing:

async def arb_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Route /arb subcommands."""
    args = context.args or []

    match args:
        case [] | ["status"]:
            await show_arb_status(update, context)
        case ["start"]:
            await arb_start_handler(update, context)
        case ["stop"]:
            await arb_stop_handler(update, context)
        case _:
            await update.message.reply_text(
                "*Arbitrage Commands:*\n"
                "/arb status - Show status\n"
                "/arb start - Enable engine\n"
                "/arb stop - Disable engine",
                parse_mode="Markdown",
            )

For commands with parameters:

async def copy_add_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /copy add <wallet> [allocation%] [max_position]."""
    args = context.args or []

    if len(args) < 1:
        await update.message.reply_text("Usage: /copy add <wallet> [alloc%] [max$]")
        return

    wallet_address = args[0]
    try:
        allocation = float(args[1]) if len(args) >= 2 else 10.0
        max_position = float(args[2]) if len(args) >= 3 else 100.0
    except ValueError:
        await update.message.reply_text("Allocation and max position must be numbers")
        return

    # Validate inputs
    if not 0 < allocation <= 100:
        await update.message.reply_text("Allocation must be 0-100%")
        return

    # Execute
    client = context.bot_data["arbiter_client"]
    await client.add_copy_trade(wallet_address, allocation, max_position)

Application Lifecycle

The bot initializes the gRPC client during startup:

async def post_init(application: Application) -> None:
    """Initialize after application starts."""
    settings = get_settings()

    client = ArbiterClient(settings.grpc_address)
    await client.connect()

    application.bot_data["arbiter_client"] = client
    logger.info("Bot initialized", grpc_address=settings.grpc_address)


async def post_shutdown(application: Application) -> None:
    """Clean up on shutdown."""
    client = application.bot_data.get("arbiter_client")
    if client:
        await client.close()


def create_application() -> Application:
    """Build the Telegram application."""
    settings = get_settings()

    return (
        Application.builder()
        .token(settings.telegram_bot_token)
        .post_init(post_init)
        .post_shutdown(post_shutdown)
        .build()
    )

Configuration with Pydantic

Settings load from environment variables with validation:

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    telegram_bot_token: str
    telegram_allowed_users: list[int] = []  # Empty = allow all

    grpc_host: str = "localhost"
    grpc_port: int = 50051

    log_level: str = "INFO"

    model_config = {"env_file": ".env"}

    @property
    def grpc_address(self) -> str:
        return f"{self.grpc_host}:{self.grpc_port}"

Pydantic provides:

  • Type coercion - Strings to ints, comma-separated to lists
  • Validation - Fail fast on invalid config
  • Defaults - Sensible fallbacks
  • Documentation - Self-describing fields
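One caveat on the coercion bullet: pydantic-settings parses complex types like `list[int]` from JSON by default, so accepting a plain comma-separated `TELEGRAM_ALLOWED_USERS` value typically requires a `mode="before"` validator. A hedged sketch of such a parser (the helper name is ours, not from the project):

```python
def parse_allowed_users(raw) -> list[int]:
    """Parse an allowed-users value from the environment.

    Accepts an already-parsed list, or a comma-separated string
    like "123,456". Intended for use with a pydantic
    @field_validator(..., mode="before") hook.
    """
    if isinstance(raw, list):
        return raw
    raw = raw.strip()
    if not raw:
        return []  # Empty string means "allow all"
    return [int(part) for part in raw.split(",")]
```

Wired up via `@field_validator("telegram_allowed_users", mode="before")`, this keeps the `.env` file human-friendly while still failing fast on non-numeric IDs.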

Testing Strategy

Handlers are tested in isolation by mocking the Update and Context objects with unittest.mock:

import pytest
from unittest.mock import AsyncMock, MagicMock


@pytest.fixture
def mock_client():
    """Create mock gRPC client."""
    client = AsyncMock(spec=ArbiterClient)
    client.get_positions.return_value = [
        Position(market_id="BTC-50K", side="long", size=100, pnl=50.0)
    ]
    return client


@pytest.mark.asyncio
async def test_positions_handler_shows_positions(mock_client):
    """Test /positions command displays positions."""
    update = MagicMock()
    update.message.reply_text = AsyncMock()

    context = MagicMock()
    context.bot_data = {"arbiter_client": mock_client}

    await positions_handler(update, context)

    # Verify gRPC call
    mock_client.get_positions.assert_called_once()

    # Verify response
    call_args = update.message.reply_text.call_args
    assert "BTC-50K" in call_args[0][0]
    assert "$50.00" in call_args[0][0]

Test coverage includes:

| Category          | Tests |
|-------------------|-------|
| Command handlers  | 35    |
| Callback handlers | 15    |
| Configuration     | 5     |
| Error handling    | 5     |
| **Total**         | 60    |

Lessons Learned

  1. Context is your friend - Store shared resources in bot_data
  2. Async all the way - Don't block the event loop
  3. Error boundaries - Handle every gRPC failure gracefully
  4. Markdown escaping - User input can break formatting
  5. Callback data limits - 64 bytes max, use IDs not full data
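Lesson 4 can be made concrete. In Telegram's legacy Markdown mode, the characters `_`, `*`, `` ` `` and `[` are treated as markup, so unescaped user input (a wallet address, a market name) can silently break a reply. A minimal escaping sketch (MarkdownV2 has a larger reserved set and would need more characters):

```python
def escape_markdown(text: str) -> str:
    """Escape Telegram legacy-Markdown special characters in user input."""
    for ch in ("_", "*", "`", "["):
        text = text.replace(ch, "\\" + ch)
    return text
```

Any user-supplied string interpolated into a `parse_mode="Markdown"` reply should pass through a helper like this first.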

Future Improvements

  • Conversation handlers - Multi-step wizards for complex actions
  • Push notifications - Alert on significant P&L changes
  • Rate limiting - Per-user command throttling
  • Localization - Multi-language support

Implementing AI Metadata in Docusaurus

· 3 min read
Claude
AI Assistant

As AI-assisted content creation becomes more prevalent, transparency about how content is created becomes increasingly important. This post explains how we implemented a comprehensive AI metadata system in our Docusaurus blog, following theme conventions and providing clear visibility into AI involvement in content creation.