Welcome to Cosmictron

Cosmictron is a distributed database runtime that executes WebAssembly modules as stored procedures with real-time subscriptions. Build real-time applications without managing separate servers, databases, and pub/sub systems.

Public Alpha

Cosmictron is in public alpha. The core engine is stable and load-tested, but APIs may change before 1.0.

Why Cosmictron?

Traditional backends require you to wire together an application server, a database, an ORM, and a real-time messaging layer. Each boundary introduces latency, complexity, and failure modes.

Cosmictron replaces all of it with a single binary. Your business logic (written in Rust or TypeScript) runs inside the database as WASM modules. When data changes, subscribed clients receive deltas automatically via WebSocket.

Explore the Docs

Quick Start

Get Cosmictron running and deploy your first module in under 5 minutes.

Prerequisites

  • Rust 1.75+ — Install via rustup
  • Bun 1.0+ — For the TypeScript SDK (bun.sh)
  • wasm32 targetrustup target add wasm32-unknown-unknown

1. Clone and Build

Terminal bash
git clone https://github.com/cosmictron/cosmictron.git
cd cosmictron
cargo build --release

2. Start the Server

Terminal bash
cargo run --release -p cosmictron-server

The server starts with:

  • WebSocket Gateway at ws://localhost:8080
  • Metrics Endpoint at http://localhost:9090/metrics

3. Create a Hello World Module

Create a new Rust project for your first Cosmictron module. This simple counter demonstrates the core concepts:

Terminal bash
# Create a new Rust library
mkdir hello-cosmictron
cd hello-cosmictron
cargo init --lib

# Add Cosmictron SDK dependency
cat >> Cargo.toml << 'EOF'
[dependencies]
cosmictron-sdk = { path = "../cosmictron/crates/cosmictron-sdk" }
EOF

Now write your module code with derive macros (no manual serialization needed!):

src/lib.rs rust
use cosmictron_sdk::prelude::*;
use cosmictron_sdk::datastore;
use cosmictron_sdk::query::find_first;

// Derive macros automatically handle BSATN serialization
// Saves ~50 lines of boilerplate code!
#[derive(BsatnSerialize, BsatnDeserialize)]
#[table(name = "counter")]
pub struct Counter {
    #[primary_key]
    pub id: u32,
    pub value: i64,
    pub updated_at: u64,
}

// Initialize the counter when the module loads
#[reducer(init)]
pub fn initialize() {
    // Check if counter exists using Query API
    if find_first("counter", |_c: &Counter| true).is_none() {
        // Create initial counter
        let counter = Counter {
            id: 1,
            value: 0,
            updated_at: now_ms(),
        };
        datastore::insert(&counter).ok();
        info("Counter initialized to 0");
    }
}

// Increment the counter
#[reducer]
pub fn increment() {
    // Find the counter using Query API
    if let Some(mut counter) = find_first("counter", |c: &Counter| c.id == 1) {
        counter.value += 1;
        counter.updated_at = now_ms();
        datastore::update(&counter).ok();
        info(&format!("Counter incremented to {}", counter.value));
    }
}

// Decrement the counter
#[reducer]
pub fn decrement() {
    if let Some(mut counter) = find_first("counter", |c: &Counter| c.id == 1) {
        counter.value -= 1;
        counter.updated_at = now_ms();
        datastore::update(&counter).ok();
        info(&format!("Counter decremented to {}", counter.value));
    }
}

// Export the module
describe_module!(
    tables = [Counter],
    reducers = [initialize, increment, decrement]
);
What's happening here?
  • #[derive(BsatnSerialize, BsatnDeserialize)] — Automatically handles serialization. No boilerplate!
  • #[table(name = "counter")] — Registers this struct as a database table
  • find_first() — Query API for type-safe database lookups
  • datastore::insert() / datastore::update() — Host functions for data persistence
  • now_ms() — Get current timestamp in milliseconds
  • info!() — Log messages visible in server logs

4. Compile to WebAssembly

Terminal bash
cargo build --target wasm32-unknown-unknown --release

5. Deploy

Terminal bash
cosmictron deploy target/wasm32-unknown-unknown/release/counter.wasm

6. Connect from TypeScript

app.ts typescript
import { CosmictronClient } from '@cosmictron/client';

const client = new CosmictronClient('ws://localhost:8080');
await client.connect();

// Call a reducer
await client.callReducer('counter', 'increment', {});

// Subscribe to changes
const sub = await client.subscribe(
  'SELECT * FROM counter',
  (delta) => console.log('Updated:', delta)
);
Tip

Run cargo test --workspace to verify everything is set up correctly. All 132 tests should pass.

Installation

Detailed installation guide for all platforms and package managers.

From Source (Recommended)

Terminal bash
# Clone the repository
git clone https://github.com/cosmictron/cosmictron.git
cd cosmictron

# Build in release mode
cargo build --release

# Install the CLI globally
cargo install --path crates/cosmictron-cli

With Cargo

Terminal bash
cargo install cosmictron

TypeScript SDK

Terminal bash
# Using Bun (recommended)
bun add @cosmictron/client

# Using npm
npm install @cosmictron/client

WASM Target

To compile modules to WebAssembly, you need the wasm32-unknown-unknown target:

Terminal bash
rustup target add wasm32-unknown-unknown

Verify Installation

Terminal bash
# Check server builds
cargo build -p cosmictron-server

# Run all tests
cargo test --workspace

# Check TypeScript SDK
cd clients/typescript && bun test

Tutorial: Build a Chat App

Build a fully-featured real-time chat application with channels, users, and live message streaming. Learn the modern Cosmictron SDK with derive macros and the Query API.

What you'll learn

Derive macros for automatic serialization, the Query API for type-safe database operations, lifecycle hooks for presence tracking, and real-time subscriptions from TypeScript.

Step 1: Define the Schema

Our chat app needs four tables: Users, Channels, ChannelMembers, and Messages. With derive macros, we get automatic BSATN serialization for free.

chat/src/lib.rs rust
use cosmictron_sdk::prelude::*;
use cosmictron_sdk::datastore;
use cosmictron_sdk::query::{find_first, exists};

// Derive macros automatically implement BSATN serialization
// No more manual serialize/deserialize functions!
#[derive(BsatnSerialize, BsatnDeserialize)]
#[table(name = "users")]
pub struct User {
    #[primary_key]
    #[auto_inc]  // ID assigned automatically on insert
    pub id: u64,
    #[unique]
    pub username: String,
    pub display_name: String,
    pub identity: Identity,  // Store caller's identity
    pub created_at: u64,
    pub last_seen: u64,
}

#[derive(BsatnSerialize, BsatnDeserialize)]
#[table(name = "channels")]
pub struct Channel {
    #[primary_key]
    #[auto_inc]
    pub id: u64,
    #[unique]
    pub name: String,
    pub description: String,
    pub created_by: u64,
    pub is_private: bool,
}

#[derive(BsatnSerialize, BsatnDeserialize)]
#[table(name = "channel_members")]
pub struct ChannelMember {
    #[primary_key]
    #[auto_inc]
    pub id: u64,
    #[index]
    pub channel_id: u64,
    #[index]
    pub user_id: u64,
    pub joined_at: u64,
}

#[derive(BsatnSerialize, BsatnDeserialize)]
#[table(name = "messages", event)]
pub struct Message {
    #[primary_key]
    #[auto_inc]
    pub id: u64,
    #[index]
    pub channel_id: u64,
    pub user_id: u64,
    pub content: String,
    pub timestamp: u64,
    pub edited: bool,
}

Step 2: Helper Functions with Query API

The Query API provides type-safe database operations. No more manual table scanning and byte parsing!

chat/src/lib.rs (continued) rust
// Find user by their identity (32-byte public key)
fn find_user_by_identity(identity: &Identity) -> Option<User> {
    find_first("users", |u: &User| u.identity == *identity)
}

// Find channel by name
fn find_channel_by_name(name: &str) -> Option<Channel> {
    find_first("channels", |c: &Channel| c.name == name)
}

// Check if user is a member of a channel
fn is_channel_member(channel_id: u64, user_id: u64) -> bool {
    exists("channel_members", |m: &ChannelMember| {
        m.channel_id == channel_id && m.user_id == user_id
    })
}

Step 3: Write Reducers

Reducers are the functions that modify your data. Each call is an atomic transaction. Use the SDK helper functions and proper error handling.

chat/src/lib.rs (continued) rust
#[reducer]
pub fn register_user(username: String, display_name: String) -> Result<u64, String> {
    // Validate input
    if username.is_empty() || username.len() > 32 {
        return Err("Username must be 1-32 characters".into());
    }
    
    // Check if username already exists using Query API
    if find_channel_by_name(&username).is_some() {
        return Err("Username already taken".into());
    }
    
    // Check if user already registered with this identity
    let identity = caller_identity();
    if let Some(user) = find_user_by_identity(&identity) {
        return Ok(user.id);
    }

    // Create new user - ID will be auto-assigned
    let now = now_ms();
    let user = User {
        id: 0,  // auto_inc will assign this
        username: username.clone(),
        display_name,
        identity,
        created_at: now,
        last_seen: now,
    };
    
    let id = datastore::insert(&user)
        .map_err(|_| "Failed to create user")?;
    
    info(&format!("User '{}' registered with id {}", username, id));
    Ok(id)
}

#[reducer]
pub fn create_channel(name: String, description: String) -> Result<u64, String> {
    // Validate
    if name.is_empty() || name.len() > 48 {
        return Err("Channel name must be 1-48 characters".into());
    }
    
    // Check for duplicate name
    if find_channel_by_name(&name).is_some() {
        return Err("Channel name already exists".into());
    }
    
    // Get current user
    let identity = caller_identity();
    let user = find_user_by_identity(&identity)
        .ok_or("User not registered")?;

    // Create channel
    let channel = Channel {
        id: 0,
        name: name.clone(),
        description,
        created_by: user.id,
        is_private: false,
    };
    
    let channel_id = datastore::insert(&channel)
        .map_err(|_| "Failed to create channel")?;
    
    // Add creator as first member
    let member = ChannelMember {
        id: 0,
        channel_id,
        user_id: user.id,
        joined_at: now_ms(),
    };
    datastore::insert(&member).ok();
    
    info(&format!("Channel '{}' created", name));
    Ok(channel_id)
}

#[reducer]
pub fn send_message(channel_id: u64, content: String) -> Result<u64, String> {
    // Validate
    if content.trim().is_empty() {
        return Err("Message cannot be empty".into());
    }
    
    // Get current user
    let identity = caller_identity();
    let user = find_user_by_identity(&identity)
        .ok_or("User not registered")?;
    
    // Verify channel membership
    if !is_channel_member(channel_id, user.id) {
        return Err("Not a member of this channel".into());
    }

    // Insert message
    let message = Message {
        id: 0,
        channel_id,
        user_id: user.id,
        content: content.clone(),
        timestamp: now_ms(),
        edited: false,
    };
    
    let msg_id = datastore::insert(&message)
        .map_err(|_| "Failed to send message")?;
    
    Ok(msg_id)
}

Step 4: Add Lifecycle Hooks

Track user presence with connect/disconnect hooks. These fire automatically when clients connect or disconnect.

chat/src/lib.rs (continued) rust
// Called when module is first loaded - perfect for seeding data
#[reducer(init)]
pub fn initialize() {
    info("Chat module initialized");
    
    // Create default "general" channel if it doesn't exist
    if find_channel_by_name("general").is_none() {
        let channel = Channel {
            id: 1,  // Fixed ID for default channel
            name: "general".into(),
            description: "General discussion".into(),
            created_by: 0,
            is_private: false,
        };
        datastore::insert(&channel).ok();
        info("Created default 'general' channel");
    }
}

#[reducer(client_connected)]
pub fn on_client_connected() {
    let identity = caller_identity();
    info(&format!("Client {} connected", 
        identity_abbr(&identity)));
    
    // Update last_seen for the connected user
    if let Some(mut user) = find_user_by_identity(&identity) {
        user.last_seen = now_ms();
        datastore::update(&user).ok();
    }
}

#[reducer(client_disconnected)]
pub fn on_client_disconnected() {
    let identity = caller_identity();
    info(&format!("Client {} disconnected",
        identity_abbr(&identity)));
}

Step 4: Subscribe from the Client

chat-client.ts typescript
import { CosmictronClient } from '@cosmictron/client';

const client = new CosmictronClient('ws://localhost:8080');
await client.connect();

// Subscribe to messages in #general
await client.subscribe(
  'SELECT * FROM messages WHERE channel_id = 1',
  (update) => {
    for (const msg of update.inserts) {
      renderMessage(msg);
    }
  }
);

// Send a message
await client.callReducer('chat', 'send_message', {
  channel_id: 1,
  content: 'Hello, Cosmictron!',
});

Step 5: Export the Module

Finally, use the describe_module! macro to export your tables and reducers for the host to discover.

chat/src/lib.rs (end) rust
// Export tables and reducers for the Cosmictron runtime
describe_module!(
    tables = [User, Channel, ChannelMember, Message],
    reducers = [
        initialize,
        register_user,
        create_channel,
        send_message,
        on_client_connected,
        on_client_disconnected
    ]
);

Step 6: Deploy and Test

Terminal bash
# Compile to WASM
cargo build -p chat --target wasm32-unknown-unknown --release

# Deploy
cosmictron deploy target/wasm32-unknown-unknown/release/chat.wasm

# Test with CLI
cosmictron call chat register_user '{"username": "alice", "display_name": "Alice"}'
cosmictron call chat create_channel '{"name": "random", "description": "Random chat"}'
cosmictron call chat send_message '{"channel_id": 1, "content": "Hello, Cosmictron!"}'
cosmictron query "SELECT * FROM messages"
Next Steps

Try adding message editing, channel membership management, and user typing indicators using additional reducers and subscriptions.

Tutorial: Build a Todo App

Learn CRUD operations, priority management, tagging, and real-time sync by building a complete todo application with the modern Cosmictron SDK.

Schema Design

Our todo app uses three tables: Todos, Tags, and TodoTags (for many-to-many relationships). Using derive macros eliminates serialization boilerplate.

todo/src/lib.rs rust
use cosmictron_sdk::prelude::*;
use cosmictron_sdk::datastore;
use cosmictron_sdk::query::{find_first, find_all, exists};

#[derive(BsatnSerialize, BsatnDeserialize)]
#[table(name = "todos")]
pub struct Todo {
    #[primary_key]
    #[auto_inc]
    pub id: u64,
    pub title: String,
    pub description: String,
    pub completed: bool,
    pub priority: u32,      // 1=low, 2=medium, 3=high
    pub created_at: u64,
    pub due_date: Option<u64>,
    #[index]
    pub owner_id: Identity,  // Store identity directly
}

#[derive(BsatnSerialize, BsatnDeserialize)]
#[table(name = "tags")]
pub struct Tag {
    #[primary_key]
    #[auto_inc]
    pub id: u64,
    #[unique]
    pub name: String,
    pub color: String,
}

#[derive(BsatnSerialize, BsatnDeserialize)]
#[table(name = "todo_tags")]
pub struct TodoTag {
    #[primary_key]
    #[auto_inc]
    pub id: u64,
    #[index]
    pub todo_id: u64,
    #[index]
    pub tag_id: u64,
}

Helper Functions

Use the Query API for type-safe database operations:

todo/src/lib.rs (continued) rust
// Find a todo by ID, verifying ownership
fn get_todo(todo_id: u64, owner: &Identity) -> Option<Todo> {
    find_first("todos", |t: &Todo| t.id == todo_id && t.owner_id == *owner)
}

// Get all todos for a user
fn get_user_todos(owner: &Identity) -> Vec<Todo> {
    find_all("todos", |t: &Todo| t.owner_id == *owner)
}

// Check if tag exists by name
fn find_tag_by_name(name: &str) -> Option<Tag> {
    find_first("tags", |t: &Tag| t.name == name)
}

CRUD Reducers

todo/src/lib.rs (continued) rust
#[reducer]
pub fn create_todo(
    title: String, 
    description: String, 
    priority: u32
) -> Result<u64, String> {
    // Validate
    if title.is_empty() {
        return Err("Title is required".into());
    }
    
    let owner = caller_identity();
    
    let todo = Todo {
        id: 0,  // auto_inc assigns this
        title,
        description,
        completed: false,
        priority: priority.clamp(1, 3),
        created_at: now_ms(),
        due_date: None,
        owner_id: owner,
    };
    
    let id = datastore::insert(&todo)
        .map_err(|_| "Failed to create todo")?;
    
    info(&format!("Todo {} created", id));
    Ok(id)
}

#[reducer]
pub fn complete_todo(todo_id: u64) -> Result<<(), String> {
    let owner = caller_identity();
    
    let mut todo = get_todo(todo_id, &owner)
        .ok_or("Todo not found or not owned by you")?;
    
    todo.completed = true;
    datastore::update(&todo)
        .map_err(|_| "Failed to update todo")?;
    
    Ok(())
}

#[reducer]
pub fn delete_todo(todo_id: u64) -> Result<<(), String> {
    let owner = caller_identity();
    
    // Verify ownership before deleting
    let todo = get_todo(todo_id, &owner)
        .ok_or("Todo not found or not owned by you")?;
    
    // Delete the todo
    datastore::delete_by_pk::<Todo>(&todo_id)
        .map_err(|_| "Failed to delete todo")?;
    
    Ok(())
}

#[reducer]
pub fn add_tag_to_todo(
    todo_id: u64, 
    tag_name: String
) -> Result<<(), String> {
    let owner = caller_identity();
    
    // Verify todo ownership
    let _todo = get_todo(todo_id, &owner)
        .ok_or("Todo not found or not owned by you")?;
    
    // Find or create tag
    let tag_id = match find_tag_by_name(&tag_name) {
        Some(tag) => tag.id,
        None => {
            // Create new tag with random color
            let colors = ["red", "blue", "green", "yellow"];
            let tag = Tag {
                id: 0,
                name: tag_name,
                color: colors[(now_ms() % 4) as usize].into(),
            };
            datastore::insert(&tag)
                .map_err(|_| "Failed to create tag")?
        }
    };
    
    // Link tag to todo
    let link = TodoTag {
        id: 0,
        todo_id,
        tag_id,
    };
    datastore::insert(&link)
        .map_err(|_| "Failed to add tag")?;
    
    Ok(())
}

Real-Time Client

todo-client.ts typescript
const client = new CosmictronClient('ws://localhost:8080');
await client.connect();

// Subscribe to incomplete todos (real-time updates)
await client.subscribe(
  'SELECT * FROM todos WHERE completed = false',
  (update) => {
    update.inserts.forEach(todo => addTodoToUI(todo));
    update.deletes.forEach(todo => removeTodoFromUI(todo));
  }
);

// Create a todo
await client.callReducer('todo', 'create_todo', {
  title: 'Learn Cosmictron',
  description: 'Complete the tutorial',
  priority: 3,
});

Your First Module

A complete step-by-step guide to building, testing, and deploying your first Cosmictron module from scratch.

What You'll Build

A simple counter module that tracks per-user counts with increment/decrement operations. This teaches you the essential patterns you'll use in every Cosmictron project.

Step 1: Create the Project

Terminal bash
# Create a new Rust project
cargo new --lib my-counter
cd my-counter

# Add Cosmictron SDK dependency
cargo add cosmictron-sdk

Step 2: Define the Table

Create a table to store per-user counters. Each user gets their own counter tracked by their identity.

src/lib.rs rust
use cosmictron_sdk::prelude::*;

#[derive(BsatnSerialize, BsatnDeserialize)]
#[table(name = "counters")]
pub struct Counter {
    #[primary_key]
    pub owner: Identity,
    pub value: i64,
}
Using Identity as Primary Key

Since each user should only have one counter, we use their Identity as the primary key. This ensures uniqueness and makes lookups efficient.

Step 3: Write the Reducers

Reducers are functions that mutate state. They run inside a transaction, so if they fail, all changes are rolled back.

src/lib.rs rust
/// Initialize a counter for the calling user
#[reducer]
pub fn init() {
    let owner = caller_identity();
    
    // Check if counter already exists
    if find_by_primary_key::(&owner).is_none() {
        let counter = Counter { owner, value: 0 };
        datastore::insert(&counter).expect("insert failed");
        info("Counter initialized");
    }
}

/// Increment the caller's counter
#[reducer]
pub fn increment(amount: i64) {
    let owner = caller_identity();
    
    if let Some(mut counter) = find_by_primary_key::(&owner) {
        counter.value += amount;
        datastore::update(&counter).expect("update failed");
        info(&format!("Counter incremented to {}", counter.value));
    } else {
        panic!("Counter not initialized. Call init() first.");
    }
}

/// Decrement the caller's counter
#[reducer]
pub fn decrement(amount: i64) {
    increment(-amount)  // Reuse increment with negative value
}

Step 4: Add the Export Macro

Every module must end with the describe_module! macro, which exports the schema and reducers.

src/lib.rs rust
describe_module!(
    tables: [Counter],
    reducers: [init, increment, decrement],
);

Step 5: Build and Deploy

Terminal bash
# Build for WASM target
cargo build --target wasm32-unknown-unknown --release

# Deploy to Cosmictron server
cosmictron deploy target/wasm32-unknown-unknown/release/my_counter.wasm --name counter

Step 6: Test with the Client

test.js javascript
import { CosmictronClient } from '@cosmictron/client';

const client = new CosmictronClient('ws://localhost:8080');
await client.connect();

// Subscribe to see your counter
await client.subscribe(
  'SELECT * FROM counters',
  (update) => console.log('Counter update:', update)
);

// Initialize your counter
await client.callReducer('counter', 'init', {});

// Increment it
await client.callReducer('counter', 'increment', { amount: 5 });

// Decrement it
await client.callReducer('counter', 'decrement', { amount: 2 });

What You Learned

  • Tables — Define schema using #[table] with #[derive(BsatnSerialize, BsatnDeserialize)]
  • Reducers — State-changing functions marked with #[reducer]
  • Identity — Each client has a unique Identity accessible via caller_identity()
  • Data Access — Use datastore::insert(), datastore::update(), and find_by_primary_key()
  • Module Export — Use describe_module! to expose tables and reducers
Next Steps

Now try the Chat App tutorial for a more complex example with multiple tables and relationships.

Tables

Tables are defined using the #[table] proc macro on Rust structs. Each struct field becomes a column.

Defining a Table

Example rust
#[table(name = "users", public)]
pub struct User {
    #[primary_key]
    pub id: u64,

    #[unique]
    pub username: String,

    #[index]
    pub created_at: u64,
}

Table Attributes

AttributeRequiredDescription
name = "..."YesSQL table name
publicNoTable is publicly readable via subscriptions
eventNoAppend-only event log (no updates/deletes)

Field Attributes

AttributeDescription
#[primary_key]Primary key (required, one per table)
#[unique]Unique constraint
#[index]Creates a B-tree index for fast lookups
#[auto_inc]Auto-incrementing integer

Supported Types

Fields can use: u8, u16, u32, u64, i8, i16, i32, i64, f32, f64, bool, String, and Option<T> for any of these.

Important

Every table must have exactly one #[primary_key] field. Tables without a primary key will fail to compile.

Reducers

Reducers are functions that modify the database. Every reducer call is wrapped in an atomic transaction.

Defining a Reducer

Example rust
#[reducer]
pub fn create_user(username: String, email: String) {
    // Validate input
    if username.is_empty() {
        panic!("Username is required");
    }

    // Insert into database
    insert(User { id: 0, username, email });
}

Lifecycle Reducers

LifecycleWhen it runs
#[reducer(init)]Once when the module is first deployed
#[reducer(client_connected)]When a client connects via WebSocket
#[reducer(client_disconnected)]When a client disconnects

Transaction Semantics

  • Every reducer call is automatically wrapped in a transaction
  • If the function completes normally, the transaction commits
  • If the function calls panic!(), the transaction rolls back
  • All table inserts, updates, and deletes within a single reducer are atomic

Host Functions

Inside a reducer, you have access to these host functions:

FunctionDescription
identity()Get the 32-byte identity of the calling client
timestamp()Get current Unix timestamp in milliseconds
insert(row)Insert a row into a table
delete(table, pk)Delete a row by primary key
scan(table)Full table scan iterator
index_scan(idx, key)Index point query
log(level, msg)Write a log message

Derive Macros

Use derive macros to automatically implement BSATN serialization for your table structs. This eliminates 100-200 lines of boilerplate per module.

Example rust
#[derive(BsatnSerialize, BsatnDeserialize)]
#[table(name = "products")]
pub struct Product {
    #[primary_key] #[auto_inc]
    pub id: u64,
    pub name: String,
    pub price: u64,
}

// The derive macros automatically implement:
// - BsatnSerialize::serialize() -> Vec
// - BsatnDeserialize::deserialize(bytes) -> Option

Query API

The Query API provides type-safe database operations without manual table scanning and byte parsing.

FunctionDescriptionExample
find_by_pk() Find row by primary key find_by_pk::<Product>("products", 123)
find_first() Find first matching row find_first("users", |u| u.name == "Alice")
find_all() Find all matching rows find_all("products", |p| p.price > 100)
exists() Check if any row matches exists("users", |u| u.email == email)
count() Count matching rows count("orders", |o| o.status == "pending")
scan_all() Get all rows in table scan_all::<Product>("products")

Query API Example

Example rust
use cosmictron_sdk::query::{find_first, exists, find_all};

#[reducer]
pub fn add_to_cart(product_id: u64) {
    // Check if product exists
    let product = find_first("products", |p: &Product| p.id == product_id)
        .expect("Product not found");
    
    // Check if already in cart
    if exists("cart_items", |c: &CartItem| {
        c.product_id == product_id && c.user_id == caller_identity()
    }) {
        panic!("Already in cart");
    }
    
    // Get all items for inventory check
    let items = find_all("cart_items", |c: &CartItem| {
        c.user_id == caller_identity()
    });
}

Lifecycle Reducers

Hook into module and client lifecycle events with special reducer attributes.

AttributeWhen CalledUse Case
#[reducer(init)] When module is first deployed Seed data, create default records
#[reducer(client_connected)] When a client WebSocket connects Update presence, track online users
#[reducer(client_disconnected)] When a client WebSocket disconnects Cleanup, mark offline

Lifecycle Example

Example rust
#[reducer(init)]
pub fn initialize() {
    // Create default channel when module loads
    let channel = Channel {
        id: 1,
        name: "general".to_string(),
        description: "Welcome!".to_string(),
    };
    datastore::insert(&channel);
    info("Created default channel");
}

#[reducer(client_connected)]
pub fn on_connect() {
    let identity = caller_identity();
    info(&format!("User {} connected", 
        identity_abbr(&identity)));
    
    // Update last_seen timestamp
    if let Some(mut user) = find_first("users", |u| u.identity == identity) {
        user.last_seen = now_ms();
        datastore::update(&user);
    }
}

Host Functions SDK

Access runtime capabilities through the host function SDK.

Identity & Time

FunctionReturnsDescription
caller_identity()[u8; 32]32-byte Ed25519 public key of caller
identity_to_hex(&identity)StringConvert identity to hex string
identity_abbr(&identity)StringFirst 8 chars of hex identity
now_ms()u64Current timestamp in milliseconds
get_timestamp()u64Alias for now_ms()

Datastore Operations

FunctionDescription
datastore::insert(&row)Insert a row into its table
datastore::update(&row)Update an existing row
datastore::delete_by_pk::<T>(&pk)Delete row by primary key
datastore::RowIter::new(table_id)Create iterator for table scan

Logging

FunctionLevel
info(msg)Info
warn(msg)Warning
error(msg)Error
debug(msg)Debug
log(level, msg)Any level

Subscriptions

Subscribe to any SQL query and receive incremental deltas in real-time as the underlying data changes.

How Subscriptions Work

  1. Client sends a SQL query via WebSocket
  2. Server evaluates the query and returns the initial result set
  3. Server registers a DBSP circuit for incremental view maintenance
  4. When any transaction modifies matching rows, the server computes the delta
  5. Only the changed rows (inserts + deletes) are sent to the client

Subscribing from TypeScript

Example typescript
// Subscribe to all messages in a channel
await client.subscribe(
  'SELECT * FROM messages WHERE channel_id = 1',
  (update) => {
    // `inserts` = new or updated rows
    update.inserts.forEach(row => {
      console.log('New message:', row.content);
    });

    // `deletes` = removed rows
    update.deletes.forEach(row => {
      console.log('Deleted message:', row.id);
    });
  }
);

// Unsubscribe when done
await sub.unsubscribe();

Best Practices

  • Narrow your queries — Subscribe to WHERE channel_id = 1, not SELECT * FROM messages
  • Use indexed columns in WHERE — Ensures efficient delta computation
  • Unsubscribe when done — Each subscription uses server resources
  • Use LIMIT — Paginate large result sets

Error Handling

Learn how to handle errors gracefully in reducers, validate input, and return meaningful error messages to clients.

Transaction Safety

Reducers run inside ACID transactions. If a reducer panics or returns an error, all changes are rolled back automatically. This ensures data consistency.

Returning Errors from Reducers

The recommended pattern is to return Result<T, String> from reducers. This allows you to return success values or error messages.

Example rust
#[reducer]
pub fn create_user(username: String) -> Result<u64, String> {
    // Validate input
    if username.len() < 3 {
        return Err("Username must be at least 3 characters".to_string());
    }
    
    if username.len() > 20 {
        return Err("Username must be at most 20 characters".to_string());
    }
    
    // Check if username already exists
    if find_user_by_username(&username).is_some() {
        return Err("Username already taken".to_string());
    }
    
    // Create the user
    let user = User {
        id: 0,
        username,
        created_at: now_ms(),
    };
    
    match datastore::insert(&user) {
        Ok(id) => {
            info(&format!("User {} created", id));
            Ok(id)
        }
        Err(e) => Err(format!("Database error: {:?}", e)),
    }
}

Handling Datastore Errors

Datastore operations can fail for various reasons. Always handle these errors appropriately:

Error Handling Patterns rust
// Pattern 1: Propagate errors with ?
let id = datastore::insert(&item)?;

// Pattern 2: Map errors to custom messages
let id = datastore::insert(&item)
    .map_err(|e| format!("Failed to create item: {:?}", e))?;

// Pattern 3: Handle specific cases
match datastore::update(&item) {
    Ok(_) => info("Updated successfully"),
    Err(DatastoreError::NotFound) => return Err("Item not found".into()),
    Err(e) => return Err(format!("Update failed: {:?}", e)),
}

Validation Patterns

Input Sanitization

Validation rust
fn validate_message(text: &str) -> Result<String, String> {
    let trimmed = text.trim();
    
    if trimmed.is_empty() {
        return Err("Message cannot be empty".to_string());
    }
    
    if trimmed.len() > 2000 {
        return Err("Message too long (max 2000 chars)".to_string());
    }
    
    // Check for invalid characters
    if trimmed.chars().any(|c| c.is_control() && c != '\n') {
        return Err("Message contains invalid characters".to_string());
    }
    
    Ok(trimmed.to_string())
}

Authorization Checks

Authorization rust
#[reducer]
pub fn delete_post(post_id: u64) -> Result<(), String> {
    let caller = caller_identity();
    
    // Fetch the post
    let post = find_by_primary_key::<Post>(&post_id)
        .ok_or("Post not found")?;
    
    // Check ownership
    if post.author_id != caller {
        return Err("You can only delete your own posts".to_string());
    }
    
    // Check if post is locked
    if post.locked {
        return Err("Cannot delete a locked post".to_string());
    }
    
    datastore::delete(&post)
        .map_err(|e| format!("Delete failed: {:?}", e))?;
    
    Ok(())
}

Panics vs Results

Prefer returning Result over panicking. Panics abort the transaction but don't give the client useful information.

ScenarioUseExample
Invalid user inputResult::ErrValidation failures
Not foundResult::ErrResource doesn't exist
Permission deniedResult::ErrUnauthorized action
Invariant violationpanic!Programming error (should never happen)
Out of memorypanic!Unrecoverable system error

Error Messages Best Practices

  • Be specific — "User not found" not "Error"
  • Be actionable — "Username must be 3-20 characters" not "Invalid username"
  • Don't leak internals — Don't expose database details to clients
  • Use logging — Log full details with info!, return safe messages

Testing

Write tests for your Cosmictron modules using the built-in test harness. Test reducers, queries, and business logic.

Test Environment

Tests run against an in-memory Cosmictron instance with a fresh database for each test. No external server required.

Writing Unit Tests

Tests use the #[test] attribute and the cosmictron_test helper:

src/lib.rs rust
#[cfg(test)]
mod tests {
    use super::*;
    use cosmictron_sdk::test::*;

    #[test]
    fn test_create_user() {
        // Create a test context with a mock identity
        let ctx = TestContext::new()
            .with_identity(Identity::from_bytes(&[1; 32]));
        
        // Call the reducer
        let result = ctx.call(|| create_user("alice".to_string()));
        
        // Assert success
        assert!(result.is_ok(), "Should create user: {:?}", result);
        let user_id = result.unwrap();
        assert!(user_id > 0, "Should return valid ID");
        
        // Verify user was created
        let user = find_by_primary_key::<User>(&user_id);
        assert!(user.is_some());
        assert_eq!(user.unwrap().username, "alice");
    }

    #[test]
    fn test_duplicate_username_fails() {
        let ctx = TestContext::new()
            .with_identity(Identity::from_bytes(&[1; 32]));
        
        // Create first user
        let _ = ctx.call(|| create_user("bob".to_string())).unwrap();
        
        // Try to create second user with same name (different identity)
        let ctx2 = TestContext::new()
            .with_identity(Identity::from_bytes(&[2; 32]));
        let result = ctx2.call(|| create_user("bob".to_string()));
        
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("already taken"));
    }
}

Testing with Multiple Users

Simulate interactions between multiple users by switching identities:

Multi-User Test rust
#[test]
fn test_chat_between_users() {
    let alice_id = Identity::from_bytes(&[1; 32]);
    let bob_id = Identity::from_bytes(&[2; 32]);
    
    // Alice creates a channel
    let ctx_alice = TestContext::new().with_identity(alice_id);
    let channel_id = ctx_alice
        .call(|| create_channel("general".to_string()))
        .unwrap();
    
    // Bob joins the channel
    let ctx_bob = TestContext::new().with_identity(bob_id);
    ctx_bob.call(|| join_channel(channel_id)).unwrap();
    
    // Bob sends a message
    ctx_bob
        .call(|| send_message(channel_id, "Hello Alice!".to_string()))
        .unwrap();
    
    // Verify message exists
    let messages = find_all::<Message>("messages");
    assert_eq!(messages.len(), 1);
    assert_eq!(messages[0].text, "Hello Alice!");
}

Testing Error Cases

Error Testing rust
#[test]
fn test_unauthorized_action_fails() {
    let ctx = TestContext::new();
    
    // Try to perform action without proper setup
    let result = ctx.call(|| delete_post(999));
    
    assert!(result.is_err());
    assert!(result.unwrap_err().contains("not found"));
}

#[test]
fn test_validation_rejects_invalid_input() {
    let ctx = TestContext::new();
    
    // Test empty username
    let result = ctx.call(|| create_user("".to_string()));
    assert!(result.is_err());
    
    // Test username too long
    let result = ctx.call(|| create_user("a".repeat(100)));
    assert!(result.is_err());
}

Query Testing

Test your Query API functions directly:

Query Tests rust
#[test]
fn test_find_user_by_username() {
    let ctx = TestContext::new();
    
    // Create test users
    ctx.call(|| create_user("alice".to_string())).unwrap();
    ctx.call(|| create_user("bob".to_string())).unwrap();
    
    // Test query
    let user = find_user_by_username("alice");
    assert!(user.is_some());
    assert_eq!(user.unwrap().username, "alice");
    
    // Test non-existent user
    assert!(find_user_by_username("charlie").is_none());
}

Running Tests

Terminal bash
# Run all tests
cargo test

# Run with output
cargo test -- --nocapture

# Run specific test
cargo test test_create_user

# Run tests with logging
RUST_LOG=debug cargo test

Test Best Practices

  • One concept per test — Test one behavior at a time
  • Use descriptive namestest_user_cannot_delete_others_posts
  • Test error cases — Ensure invalid actions fail appropriately
  • Clean state — Each test gets a fresh database automatically
  • Assert on error messages — Verify clients get helpful errors

API Reference

Complete reference for the Rust SDK, host functions, and WebSocket protocol.

WebSocket Protocol

All messages use binary framing:

Frame Format binary
[tag: u8][length: u32 little-endian][payload: bytes]

Client Messages

TagNameDescription
1AuthenticateRequest identity and token
2CallReducerExecute a reducer function
3SubscribeSubscribe to a SQL query
4UnsubscribeStop receiving updates
5QueryOne-time SQL query
6DeployModuleDeploy a WASM module
7ListModulesList deployed modules

Server Messages

TagNameDescription
128AuthSuccessJWT token + identity
129ReducerCallResultcommitted / rolled_back / interrupted
130QueryResultSQL query result rows
131TransactionUpdateSubscription delta (inserts/deletes)
255ErrorError message

Host Functions

Datastore Operations

FunctionSignatureDescription
datastore.insert(table_id, row_ptr, row_len) -> u64Insert a row, returns row ID
datastore.delete_by_pk(table_id, pk_ptr, pk_len) -> i32Delete by primary key
datastore.scan(table_id) -> u64Create full scan iterator
datastore.index_scan_point(index_id, key_ptr, key_len) -> u64Equality lookup on index
datastore.row_count(table_id) -> u64Get table row count

Context Functions

FunctionReturnsDescription
cosm.identity[u8; 32]Caller's 32-byte identity
cosm.timestampu64Unix timestamp (ms)
console.log()Write log message (level 0-4)

CLI Commands

The cosmictron CLI tool provides commands for deploying modules, querying data, and managing your server.

Deploy

Terminal bash
cosmictron deploy path/to/module.wasm

Authentication

Terminal bash
# Generate a new identity and token
cosmictron auth --generate

# Set the token
export COSMICTRON_TOKEN="<your-token>"

Call Reducers

Terminal bash
# Call with arguments
cosmictron call counter increment '{"amount": 5}'

# Call without arguments
cosmictron call my-module init '{}'

Query Data

Terminal bash
# Table format (default)
cosmictron query "SELECT * FROM users"

# JSON format
cosmictron query "SELECT * FROM users" --format json

# CSV format
cosmictron query "SELECT * FROM orders" --format csv > orders.csv

Subscribe

Terminal bash
# Subscribe indefinitely
cosmictron subscribe "SELECT * FROM messages WHERE channel = 'general'"

# Subscribe for 30 seconds
cosmictron subscribe "SELECT * FROM events" --duration 30

List Modules

Terminal bash
# Simple list
cosmictron modules

# Detailed information
cosmictron modules --detailed

TypeScript SDK

The @cosmictron/client package provides a type-safe WebSocket client with auto-reconnection and subscription management.

Installation

Terminal bash
bun add @cosmictron/client
# or: npm install @cosmictron/client

Client API

Usage typescript
import { CosmictronClient } from '@cosmictron/client';

// Create client
const client = new CosmictronClient('ws://localhost:8080', {
  reconnect: true,
  reconnectInterval: 1000,
  timeout: 5000,
});

// Connect
await client.connect();

// Get identity
const identity = client.getIdentity();
const token = client.getToken();

// Call a reducer
const result = await client.callReducer('module', 'reducer', args);

// One-time query
const rows = await client.query('SELECT * FROM users');

// Subscribe to live updates
const sub = await client.subscribe(sql, callback);
await sub.unsubscribe();

// Disconnect
client.disconnect();

Methods

MethodReturnsDescription
connect()Promise<void>Open WebSocket connection
callReducer(module, reducer, args)Promise<ReducerResult>Call a reducer
query(sql)Promise<QueryResult>Execute one-time SQL query
subscribe(sql, callback)Promise<Subscription>Subscribe to live query
getIdentity()IdentityGet client identity
getToken()stringGet auth token
getState()stringConnection state
disconnect()voidClose connection

React Hooks

For React applications, Cosmictron provides hooks that handle connection state, subscriptions, and reducer calls automatically.

Installation

The React hooks are included in @cosmictron/client. Install with: bun add @cosmictron/client

useCosmictronClient

Provides the Cosmictron client instance via React Context. Wrap your app with CosmictronProvider:

App.tsx tsx
import { CosmictronProvider } from '@cosmictron/client/react';

function App() {
  return (
    <CosmictronProvider 
      url="ws://localhost:8080"
      reconnect={true}
      autoConnect={true}
    >
      <ChatApp />
    </CosmictronProvider>
  );
}

useSubscription

Subscribe to live query updates. Automatically manages subscription lifecycle and re-renders when data changes.

MessageList.tsx tsx
import { useSubscription } from '@cosmictron/client/react';

interface Message {
  id: number;
  channel_id: number;
  sender: string;
  text: string;
  sent_at: number;
}

function MessageList({ channelId }: { channelId: number }) {
  const { data: messages, error, isLoading } = useSubscription<Message[]>(
    `SELECT * FROM messages WHERE channel_id = ${channelId} ORDER BY sent_at`
  );

  if (isLoading) return <div>Loading...</div>;
  if (error) return <div>Error: {error.message}</div>;

  return (
    <div className="messages">
      {messages?.map(msg => (
        <div key={msg.id} className="message">
          <span className="sender">{msg.sender}</span>
          <p>{msg.text}</p>
        </div>
      ))}
    </div>
  );
}
Automatic Cleanup

Subscriptions are automatically unsubscribed when the component unmounts. No manual cleanup needed.

useReducer

Call reducers with loading states and error handling built-in.

SendMessage.tsx tsx
import { useReducer } from '@cosmictron/client/react';

function SendMessage({ channelId }: { channelId: number }) {
  const [text, setText] = useState('');
  
  // Create a typed reducer hook
  const { call, isLoading, error } = useReducer<{ text: string }>(
    'chat',
    'send_message'
  );

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    if (!text.trim()) return;

    const result = await call({ channel_id: channelId, text });
    
    if (result.success) {
      setText('');
    } else {
      console.error('Failed to send:', result.error);
    }
  };

  return (
    <form onSubmit={handleSubmit}>
      <input
        value={text}
        onChange={e => setText(e.target.value)}
        placeholder="Type a message..."
        disabled={isLoading}
      />
      <button type="submit" disabled={isLoading}>
        {isLoading ? 'Sending...' : 'Send'}
      </button>
      {error && <span className="error">{error}</span>}
    </form>
  );
}

useQuery

Execute one-time queries with caching and automatic revalidation.

UserProfile.tsx tsx
import { useQuery } from '@cosmictron/client/react';

function UserProfile({ userId }: { userId: number }) {
  // One-time query with 30 second cache
  const { data: user, isLoading, refetch } = useQuery(
    ['user', userId],
    `SELECT * FROM users WHERE id = ${userId}`,
    { staleTime: 30000 }  // 30 second cache
  );

  if (isLoading) return <div>Loading...</div>;

  return (
    <div>
      <h1>{user?.username}</h1>
      <p>Joined: {user?.created_at}</p>
      <button onClick={refetch}>Refresh</button>
    </div>
  );
}

useConnectionState

Monitor WebSocket connection status for UI indicators.

ConnectionStatus.tsx tsx
import { useConnectionState } from '@cosmictron/client/react';

function ConnectionStatus() {
  const { state, isConnected, reconnect } = useConnectionState();

  if (isConnected) {
    return <span className="status connected">🟢 Connected</span>;
  }

  return (
    <div className="status disconnected">
      <span>🔴 {state}</span>
      <button onClick={reconnect}>Reconnect</button>
    </div>
  );
}

Complete Example: Chat App

Putting it all together:

ChatApp.tsx tsx
import { 
  CosmictronProvider, 
  useSubscription, 
  useReducer,
  useConnectionState 
} from '@cosmictron/client/react';

function ChatPage() {
  const [channelId, setChannelId] = useState(1);

  return (
    <div className="chat-app">
      <ConnectionStatus />
      <ChannelList onSelect={setChannelId} />
      <MessageList channelId={channelId} />
      <SendMessage channelId={channelId} />
    </div>
  );
}

// App entry point
export default function App() {
  return (
    <CosmictronProvider url="ws://localhost:8080">
      <ChatPage />
    </CosmictronProvider>
  );
}

Hook API Reference

HookReturnsDescription
useSubscription<T>(sql){ data, error, isLoading }Subscribe to live query updates
useReducer<T>(module, reducer){ call, isLoading, error }Call reducers with loading state
useQuery<T>(key, sql, opts){ data, isLoading, refetch }One-time query with caching
useConnectionState(){ state, isConnected, reconnect }Monitor connection status
useCosmictronClient()CosmictronClientDirect access to client instance

Best Practices

Guidelines for building robust, performant, and secure Cosmictron applications.

Module Design

  • Keep modules focused — One domain per module (auth, chat, orders)
  • Plan your schema first — List entities, relationships, and common queries before writing code
  • Use descriptive namescreate_user_account over do_thing

Schema Design

  • Index strategically — Index fields used in WHERE, JOIN, and ORDER BY clauses
  • Don't over-index — Indexes speed reads but slow writes
  • Use event tables — Mark append-only tables with event for audit logs and message streams
  • Denormalize hot paths — Duplicate data for frequently-queried relationships

Reducer Implementation

  • Validate input early — Check lengths, formats, and ranges before touching the datastore
  • Keep reducers small — Single responsibility per reducer
  • Use panic!() for errors — It rolls back the transaction automatically
  • Log important actions — Use structured log messages for debugging

Security

  • Always verify ownership — Check identity() before mutations
  • Sanitize user input — Enforce length limits and strip dangerous characters
  • Rate limit in reducers — Implement per-user rate limiting for expensive operations

Performance

  • Use indexes — Avoid full table scans on large tables
  • Paginate results — Use LIMIT/OFFSET or cursor-based pagination
  • Batch operations — Use a single reducer for bulk inserts
  • Narrow subscriptions — Subscribe to specific channels, not all messages
  • Monitor WASM fuel — Break large operations into batches if hitting fuel limits

Deployment

  • Build optimized WASM — Use opt-level = "z" and lto = true in release profile
  • Version modules — Deploy counter-v2 alongside counter-v1
  • Test in staging first — Unit tests, integration tests, then staging, then production
  • Monitor after deploy — Watch error rates, latency, and rollback metrics

Schema Migrations

Migrate your module schema between versions without data loss. Cosmictron provides declarative migrations and migration hooks.

Migration Safety

Always backup your data before running migrations in production. Test migrations thoroughly in staging environments first.

How Migrations Work

When you deploy a new module version with a different schema, Cosmictron automatically detects changes and runs migrations:

  1. Compare old and new table schemas
  2. Generate migration steps (add column, rename, etc.)
  3. Execute migrations in a transaction
  4. If migration fails, deployment is aborted

Automatic Migrations

Cosmictron handles these changes automatically:

  • Add column — New columns get default values (null or specified default)
  • Remove column — Data is discarded; irreversible
  • Rename column — Detected by #[migrate] attribute
  • Add index — Built in background, doesn't block
  • Remove index — Instant operation

Custom Migrations with #[migrate]

For complex migrations, use the #[migrate] attribute to specify custom logic:

lib.rs - Migration Example rust
#[derive(BsatnSerialize, BsatnDeserialize)]
#[table(name = "users")]
pub struct User {
    #[primary_key]
    #[auto_inc]
    pub id: u64,
    pub username: String,
    
    // New column with default value
    #[default = "UTC"]
    pub timezone: String,
    
    // Renamed from 'created_at' to 'joined_at'
    #[migrate(from = "created_at")]
    pub joined_at: Timestamp,
}

// Custom migration hook
#[migrate(to = "v2")]
pub fn migrate_v1_to_v2() {
    // Migrate data from old schema to new
    for user in find_all::User("users") {
        // Transform data as needed
        let updated = UserV2 {
            id: user.id,
            username: user.username.to_lowercase(),
            timezone: "UTC".to_string(),
            joined_at: user.created_at,
        };
        datastore::update(&updated).unwrap();
    }
    
    info("Migration v1→v2 complete");
}

Migration Rollback

If a migration fails, Cosmictron automatically rolls back:

  • Schema changes are reverted
  • Data modifications are undone
  • Previous module version remains active

Best Practices

  • Migrate incrementally — One change per deployment
  • Add before remove — Add new columns first, remove old ones later
  • Use default values — New columns should have sensible defaults
  • Test with production data — Copy prod data to staging for testing
  • Keep migrations idempotent — Running twice shouldn't break

Performance

Optimize your Cosmictron modules for throughput, latency, and scalability.

Understanding DBSP Performance

Cosmictron uses DBSP (Database Stream Processing) for incremental subscriptions. This means:

  • Inserts/Updates — O(1) per subscriber for simple queries
  • Joins — Only joined rows are recomputed
  • Aggregations — COUNT/SUM update in O(1), not O(N)
  • Subscriptions — Deltas computed once, broadcast to all subscribers

Indexing Strategies

When to Index

Indexing rust
// Index columns used in WHERE clauses
#[index]
pub user_id: Identity,

// Composite index for multi-column queries
#[index(columns = [user_id, created_at])]
pub workspace_id: u64,

// Unique index
#[index(unique)]
pub email: String,
Query PatternIndex Type
WHERE user_id = ?Single column
WHERE user_id = ? AND created_at > ?Composite (user_id, created_at)
WHERE email = ? (unique)Unique index
ORDER BY created_at DESCIndex on created_at
WHERE content LIKE '%foo%'Full-text (special handling)

Subscription Optimization

Narrow Your Queries

Good vs Bad typescript
// BAD: Subscribes to ALL messages
await client.subscribe('SELECT * FROM messages', callback);

// GOOD: Only messages in this channel
await client.subscribe(
  `SELECT * FROM messages WHERE channel_id = ${channelId}`,
  callback
);

// BEST: Limited + paginated
await client.subscribe(
  `SELECT * FROM messages 
   WHERE channel_id = ${channelId} 
   ORDER BY sent_at DESC 
   LIMIT 100`,
  callback
);

Reducer Performance

Batch Operations

Batching rust
// BAD: N individual inserts
#[reducer]
pub fn import_items_slow(items: Vec<Item>) {
    for item in items {
        datastore::insert(&item).unwrap();  // N operations
    }
}

// GOOD: Single batch operation
#[reducer]
pub fn import_items_fast(items: Vec<Item>) {
    datastore::insert_batch(&items).unwrap();  // 1 operation
}

Memory Management

  • Event tables — Mark append-only tables to enable log compaction
  • TTL — Set time-to-live on ephemeral data
  • Pagination — Don't load entire tables into memory
  • Projection — Select only needed columns

Benchmarking

Benchmark bash
# Run load tests
cargo test --release -p cosmictron-test --test load_test

# Profile your module
wasm-prof my_module.wasm --reducer create_user --count 1000

Production Deployment

Best practices for running Cosmictron in production environments.

Deployment Checklist

Pre-Deployment

  • ☐ Run full test suite (cargo test --workspace)
  • ☐ Test migrations with production-sized data
  • ☐ Benchmark expected load (2x your traffic estimate)
  • ☐ Enable structured logging
  • ☐ Configure monitoring (Prometheus metrics)
  • ☐ Set up alerting (latency, error rate, connections)
  • ☐ Test graceful shutdown
  • ☐ Verify backup strategy

Server Configuration

production.env bash
# Network
COSMICTRON_SERVER=0.0.0.0:8080
COSMICTRON_MAX_CONNECTIONS=10000

# Persistence
COSMICTRON_DATA_DIR=/var/lib/cosmictron
COSMICTRON_SNAPSHOT_INTERVAL=3600

# Observability
COSMICTRON_LOG_LEVEL=info
COSMICTRON_LOG_FORMAT=json
COSMICTRON_METRICS_PORT=9090

# Security
COSMICTRON_RATE_LIMIT_ENABLED=true
COSMICTRON_TOKEN_ROTATION_INTERVAL=3600

# Resources
COSMICTRON_SHUTDOWN_TIMEOUT=60
COSMICTRON_WASM_FUEL_LIMIT=10000000

Monitoring & Alerting

Key Metrics

MetricAlert ThresholdAction
Reducer latency (p99)> 100msOptimize hot reducers
Websocket connections> 80% of maxScale horizontally
Subscription backlog> 1000Check client health
Error rate> 1%Investigate failures
Memory usage> 80%Review memory leaks

Prometheus Rules

alerts.yml yaml
groups:
  - name: cosmictron
    rules:
      - alert: HighReducerLatency
        expr: histogram_quantile(0.99, 
          rate(cosmictron_reducer_duration_us_bucket[5m])) > 100000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High reducer latency detected"
          
      - alert: TooManyConnections
        expr: cosmictron_websocket_connections_active > 8000
        for: 1m
        labels:
          severity: critical

Backup Strategy

Snapshot Backups

Cosmictron creates point-in-time snapshots automatically:

backup.sh bash
#!/bin/bash
BACKUP_DIR=/backup/cosmictron
TIMESTAMP=$(date +%Y%m%d_%H%M%S)

# Trigger snapshot
curl -X POST http://localhost:8080/admin/snapshot

# Copy to backup location
cp -r $COSMICTRON_DATA_DIR/snapshots/latest $BACKUP_DIR/$TIMESTAMP

# Upload to S3
aws s3 sync $BACKUP_DIR/$TIMESTAMP s3://my-backup-bucket/cosmictron/$TIMESTAMP

High Availability

Single Node with Hot Standby

For production, run a primary with a hot standby:

  1. Primary handles all writes and reads
  2. Standby replicates WAL in real-time
  3. Automatic failover on primary failure
  4. Clients reconnect to standby

Disaster Recovery

ScenarioRTORPORecovery
Process crash5s0Auto-restart with replay
Node failure30s< 1sFailover to standby
Data corruption10mHourlyRestore from snapshot
Region outage1hHourlyRestore to new region

Security Hardening

Network Security

  • Run behind a reverse proxy (nginx, Envoy)
  • Use TLS 1.3 for all connections
  • Restrict access to admin endpoints
  • Enable rate limiting

Module Security

  • Sign WASM modules before deployment
  • Review all reducer authorization checks
  • Validate all user input
  • Use RLS policies for defense in depth
Runbook

Create a runbook with common operational procedures: failover, backup restore, module rollback, and emergency shutdown.

Configuration

Configure the Cosmictron server via environment variables.

Environment Variables

VariableDefaultDescription
COSMICTRON_SERVER0.0.0.0:8080Server bind address
COSMICTRON_MAX_CONNECTIONS10000Max concurrent connections
COSMICTRON_METRICS_PORT9090Prometheus metrics port
COSMICTRON_SHUTDOWN_TIMEOUT30Graceful shutdown timeout (seconds)
COSMICTRON_LOG_LEVELinfoLog level (error, warn, info, debug, trace)

Resource Limits

ResourceLimit
Max connections10,000 (configurable)
Per-client rate limit100 msg/sec, burst 200
Connection timeout5 minutes
Reducer timeout60 seconds
WASM fuel limit1,000,000 instructions
Subscription broadcast capacity1,000 updates

Prometheus Metrics

Available at http://localhost:9090/metrics:

MetricTypeDescription
cosmictron_reducer_calls_totalCounterTotal reducer invocations
cosmictron_reducer_duration_usHistogramReducer execution time
cosmictron_websocket_connections_activeGaugeActive connections
cosmictron_subscriptions_activeGaugeActive subscriptions
cosmictron_transactions_committed_totalCounterCommitted transactions
cosmictron_rate_limit_exceeded_totalCounterRate limit hits

Graceful Shutdown

Terminal bash
kill -TERM <pid>

The server will:

  1. Stop accepting new connections
  2. Wait for in-flight requests (30s timeout)
  3. Close active connections
  4. Exit cleanly