Storage Patterns and Conventions¶
This document describes how agentd services persist data to SQLite using SeaORM. Read this before writing any storage code.
Overview¶
agentd uses SeaORM 1.1 with the sqlx-sqlite + runtime-tokio-rustls
feature set. Every service that persists state follows the same pattern:
- Entity - a
DeriveEntityModelstruct that maps directly to a table row. - Migration - a
MigrationTraitimplementation that creates / alters tables. - Storage struct - a
Clone-able wrapper aroundDatabaseConnectionthat exposes typed CRUD methods to the rest of the crate.
The workspace dependencies are declared once in the root Cargo.toml:
sea-orm = { version = "1.1", features = ["sqlx-sqlite", "runtime-tokio-rustls", "macros"] }
sea-orm-migration = { version = "1.1" }
Module Layout¶
crates/<service>/src/
entity/
mod.rs # re-exports one sub-module per table
<table_name>.rs # one file per table
migration/
mod.rs # MigratorTrait impl - registers migrations in order
m<YYYYMMDD>_<seq>_<name>.rs # one file per migration
storage.rs # public Storage struct + CRUD methods
lib.rs # declares `pub mod entity; pub(crate) mod migration;`
# also exposes apply_migrations_for_path() / migration_status_for_path()
Naming conventions¶
| Thing | Convention | Example |
|---|---|---|
| Entity file | <table_name>.rs |
notification.rs |
| Migration file | m<YYYYMMDD>_<seq>_<description>.rs |
m20250305_000001_create_notifications_table.rs |
| Iden enum | Table name in PascalCase | Notifications, Agents |
| Index name | idx_<table>_<column> |
idx_agents_status |
| Unique index | uq_<table>_<columns> |
uq_dispatch_workflow_source |
Entity Files¶
An entity file contains exactly three items:
Model- the column definitions (DeriveEntityModel).Relation- foreign-key relations (DeriveRelation).impl ActiveModelBehavior for ActiveModel {}- required boilerplate.
// crates/notify/src/entity/notification.rs
// See docs/storage.md for patterns.
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "notifications")]
pub struct Model {
/// UUID stored as TEXT - primary key.
#[sea_orm(primary_key, auto_increment = false)]
pub id: String,
pub source_type: String,
pub source_data: String, // JSON: serialized NotificationSource
pub lifetime_type: String, // "Ephemeral" | "Persistent"
pub lifetime_expires_at: Option<String>, // RFC3339, None for Persistent
pub priority: String, // "Low" | "Normal" | "High" | "Urgent"
pub status: String, // "Pending" | "Viewed" | …
pub title: String,
pub message: String,
pub requires_response: i32, // boolean: 0 or 1
pub response: Option<String>,
pub created_at: String, // RFC3339
pub updated_at: String, // RFC3339
}
// No foreign-key relations for this table.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
When a table has relations (e.g., workflows → dispatch_log):
// crates/orchestrator/src/entity/workflow.rs
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::dispatch::Entity")]
Dispatch,
}
impl Related<super::dispatch::Entity> for Entity {
fn to() -> RelationDef {
Relation::Dispatch.def()
}
}
Migration Files¶
Each migration file implements MigrationTrait with an up() and down().
Every create_table call uses .if_not_exists() for idempotency.
// crates/notify/src/migration/m20250305_000001_create_notifications_table.rs
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Notifications::Table)
.if_not_exists()
.col(ColumnDef::new(Notifications::Id).string().not_null().primary_key())
.col(ColumnDef::new(Notifications::Status).string().not_null())
.col(ColumnDef::new(Notifications::CreatedAt).string().not_null())
// … more columns …
.to_owned(),
)
.await?;
// Add an index for common filter queries
manager
.create_index(
Index::create()
.name("idx_status")
.table(Notifications::Table)
.col(Notifications::Status)
.if_not_exists()
.to_owned(),
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager.drop_table(Table::drop().table(Notifications::Table).to_owned()).await
}
}
/// Column Iden enum - must list every column used in the migration.
#[derive(DeriveIden)]
enum Notifications {
Table,
Id,
Status,
CreatedAt,
// …
}
The migration is registered in migration/mod.rs:
// crates/notify/src/migration/mod.rs
pub use sea_orm_migration::prelude::*;
mod m20250305_000001_create_notifications_table;
pub struct Migrator;
#[async_trait::async_trait]
impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![
Box::new(m20250305_000001_create_notifications_table::Migration),
// New migrations appended here, in chronological order
]
}
}
Storage Structs¶
Every service exposes a FooStorage struct that:
- Holds a
DatabaseConnection(which isClone + Send + Sync). - Runs migrations in the constructor (
with_path). - Provides typed CRUD methods that convert between entity
Modeland domain types.
#[derive(Clone)]
pub struct NotificationStorage {
db: DatabaseConnection,
}
impl NotificationStorage {
/// Platform-specific path: ~/.local/share/agentd-notify/notify.db (Linux)
/// ~/Library/Application Support/agentd-notify/notify.db (macOS)
pub fn get_db_path() -> Result<PathBuf> {
agentd_common::storage::get_db_path("agentd-notify", "notify.db")
}
/// Creates storage at the default path, running migrations.
pub async fn new() -> Result<Self> {
let db_path = Self::get_db_path()?;
Self::with_path(&db_path).await
}
/// Creates storage at an explicit path (used in tests).
pub async fn with_path(db_path: &Path) -> Result<Self> {
let db = agentd_common::storage::create_connection(db_path).await?;
Migrator::up(&db, None).await?;
Ok(Self { db })
}
}
When two storage structs share the same database (e.g., AgentStorage and
SchedulerStorage in the orchestrator), pass the DatabaseConnection rather
than opening a second file:
// In orchestrator main.rs / service setup:
let agent_storage = AgentStorage::new().await?;
let scheduler_storage = SchedulerStorage::new(agent_storage.db().clone());
The db() accessor is:
Common Operations¶
All examples use the notify crate's notifications table.
Insert with ActiveModel¶
use sea_orm::{EntityTrait, Set};
use crate::entity::notification as notif_entity;
pub async fn add(&self, notification: &Notification) -> Result<Uuid> {
let model = notif_entity::ActiveModel {
id: Set(notification.id.to_string()),
status: Set(format!("{:?}", notification.status)), // enum → String
requires_response: Set(if notification.requires_response { 1 } else { 0 }),
source_data: Set(serde_json::to_string(¬ification.source)?), // JSON
created_at: Set(notification.created_at.to_rfc3339()),
updated_at: Set(notification.updated_at.to_rfc3339()),
// … all other columns …
};
notif_entity::Entity::insert(model).exec(&self.db).await?;
Ok(notification.id)
}
Every field must be wrapped in Set(…). Use NotSet only for columns that
have database-level defaults and you want to omit entirely.
Find by ID¶
use sea_orm::EntityTrait;
pub async fn get(&self, id: &Uuid) -> Result<Option<Notification>> {
let model = notif_entity::Entity::find_by_id(id.to_string())
.one(&self.db)
.await?;
match model {
Some(m) => Ok(Some(model_to_notification(m)?)),
None => Ok(None),
}
}
Update Specific Fields¶
Use update_many + col_expr to update only the columns that changed.
Always check rows_affected to detect missing rows.
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use sea_orm::sea_query::Expr;
pub async fn update(&self, notification: &Notification) -> Result<()> {
let result = notif_entity::Entity::update_many()
.col_expr(
notif_entity::Column::Status,
Expr::value(format!("{:?}", notification.status)),
)
.col_expr(
notif_entity::Column::Response,
Expr::value(notification.response.clone()),
)
.col_expr(
notif_entity::Column::UpdatedAt,
Expr::value(notification.updated_at.to_rfc3339()),
)
.filter(notif_entity::Column::Id.eq(notification.id.to_string()))
.exec(&self.db)
.await?;
if result.rows_affected == 0 {
anyhow::bail!("Notification not found");
}
Ok(())
}
Delete¶
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
pub async fn delete(&self, id: &Uuid) -> Result<()> {
let result = notif_entity::Entity::delete_many()
.filter(notif_entity::Column::Id.eq(id.to_string()))
.exec(&self.db)
.await?;
if result.rows_affected == 0 {
anyhow::bail!("Notification not found");
}
Ok(())
}
Filtered List¶
use sea_orm::{ColumnTrait, EntityTrait, Order, QueryFilter, QueryOrder};
pub async fn list(
&self,
status_filter: Option<NotificationStatus>,
) -> Result<Vec<Notification>> {
let mut query = notif_entity::Entity::find()
.order_by(notif_entity::Column::CreatedAt, Order::Desc);
if let Some(status) = status_filter {
query = query.filter(
notif_entity::Column::Status.eq(format!("{:?}", status))
);
}
let models = query.all(&self.db).await?;
models.into_iter().map(model_to_notification).collect()
}
For OR conditions across multiple values, use Condition::any():
use sea_orm::Condition;
let models = notif_entity::Entity::find()
.filter(
Condition::any()
.add(notif_entity::Column::Status.eq("Pending"))
.add(notif_entity::Column::Status.eq("Viewed")),
)
.all(&self.db)
.await?;
Paginated List with Filters¶
use sea_orm::{Condition, ColumnTrait, EntityTrait, Order, PaginatorTrait,
QueryFilter, QueryOrder, QuerySelect};
pub async fn list_paginated(
&self,
status_filter: Option<NotificationStatus>,
limit: usize,
offset: usize,
) -> Result<(Vec<Notification>, usize)> {
let condition = match &status_filter {
Some(s) => Condition::all()
.add(notif_entity::Column::Status.eq(format!("{:?}", s))),
None => Condition::all(),
};
// Count total matching rows first
let total = notif_entity::Entity::find()
.filter(condition.clone())
.count(&self.db)
.await? as usize;
// Fetch the page
let models = notif_entity::Entity::find()
.filter(condition)
.order_by(notif_entity::Column::CreatedAt, Order::Desc)
.limit(limit as u64)
.offset(offset as u64)
.all(&self.db)
.await?;
let items = models.into_iter().map(model_to_notification).collect::<Result<Vec<_>>>()?;
Ok((items, total))
}
agentd-common Integration¶
The agentd-common crate (crates/common) exposes shared storage utilities
in agentd_common::storage:
| Function | Purpose |
|---|---|
get_db_path(project, filename) |
Returns the XDG/platform-specific path for the database file, creating parent directories. |
create_connection(path) |
Opens a sqlite://…?mode=rwc SeaORM DatabaseConnection. |
create_test_connection() |
Returns (DatabaseConnection, TempDir) for use in unit tests. |
apply_migrations::<M>(path) |
Opens a connection and runs M::up(&db, None). |
migration_status::<M>(path) |
Returns Vec<(migration_name, is_applied)> for all registered migrations. |
Each service library also exposes two public wrappers in its lib.rs that
cargo xtask migrate and cargo xtask migrate-status call:
// crates/notify/src/lib.rs
pub async fn apply_migrations_for_path(db_path: &std::path::Path) -> anyhow::Result<()> {
agentd_common::storage::apply_migrations::<migration::Migrator>(db_path).await
}
pub async fn migration_status_for_path(
db_path: &std::path::Path,
) -> anyhow::Result<Vec<(String, bool)>> {
agentd_common::storage::migration_status::<migration::Migrator>(db_path).await
}
Adding a New Table¶
Follow these steps to add a tags table to the notify crate as an example.
1. Write the entity¶
// crates/notify/src/entity/tag.rs
// See docs/storage.md for patterns.
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "tags")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: String,
pub notification_id: String,
pub label: String,
pub created_at: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
2. Re-export from entity/mod.rs¶
3. Write the migration¶
// crates/notify/src/migration/m20250401_000001_create_tags_table.rs
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Tags::Table)
.if_not_exists()
.col(ColumnDef::new(Tags::Id).string().not_null().primary_key())
.col(ColumnDef::new(Tags::NotificationId).string().not_null())
.col(ColumnDef::new(Tags::Label).string().not_null())
.col(ColumnDef::new(Tags::CreatedAt).string().not_null())
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.name("idx_tags_notification_id")
.table(Tags::Table)
.col(Tags::NotificationId)
.if_not_exists()
.to_owned(),
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager.drop_table(Table::drop().table(Tags::Table).to_owned()).await
}
}
#[derive(DeriveIden)]
enum Tags {
Table,
Id,
NotificationId,
Label,
CreatedAt,
}
4. Register the migration¶
// crates/notify/src/migration/mod.rs
mod m20250305_000001_create_notifications_table;
mod m20250401_000001_create_tags_table; // ← add this
impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![
Box::new(m20250305_000001_create_notifications_table::Migration),
Box::new(m20250401_000001_create_tags_table::Migration), // ← append
]
}
}
Important: migrations are applied in the order returned by migrations().
Always append new migrations at the end - never reorder existing entries.
5. Add storage methods and wire up in service code¶
Add methods to NotificationStorage (or a new TagStorage if the table
warrants its own struct) and update your API handlers.
Modifying an Existing Table¶
Schema changes require a new migration file. Never edit an existing migration that has already been applied to any installation.
Example: add a tags column to notifications¶
// crates/notify/src/migration/m20250501_000001_add_tags_to_notifications.rs
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Notifications::Table)
.add_column(
ColumnDef::new(Notifications::Tags)
.string()
.not_null()
.default("[]"), // default for existing rows
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Notifications::Table)
.drop_column(Notifications::Tags)
.to_owned(),
)
.await
}
}
#[derive(DeriveIden)]
enum Notifications {
Table,
Tags,
}
Then:
1. Add pub tags: String to entity/notification.rs.
2. Register the migration in migration/mod.rs.
3. Update model_to_notification and add / update in storage.rs.
Type Mapping Conventions¶
SQLite has a flexible type system. agentd uses the following conventions to keep the mapping layer consistent.
UUIDs¶
Store as TEXT (not BLOB). The uuid crate's to_string() / parse_str()
methods handle conversion:
// Entity field
pub id: String,
// Writing
id: Set(my_uuid.to_string()),
// Reading
let id = Uuid::parse_str(&model.id)?;
Booleans¶
SQLite has no native boolean type. Store as INTEGER (0 / 1), map in
the domain conversion layer:
// Entity field
pub requires_response: i32,
// Writing
requires_response: Set(if value { 1 } else { 0 }),
// Reading
let requires_response = model.requires_response != 0;
In migrations, use .integer().not_null().default(0) for false-defaulting
booleans, or .default(1) for true-defaulting ones.
Timestamps¶
Store as TEXT in RFC3339 format. Use chrono:
// Entity field
pub created_at: String,
// Writing
created_at: Set(Utc::now().to_rfc3339()),
// Reading
let created_at = DateTime::parse_from_rfc3339(&model.created_at)?.with_timezone(&Utc);
Enums¶
Map Rust enums to their Debug representation string. The parse() method
on the domain type handles reading them back (implement FromStr):
// Writing
status: Set(format!("{:?}", agent.status)), // "Pending", "Running", …
// Reading
let status: AgentStatus = model.status.parse()?;
For simple string-like enums without extra data, {:?} and FromStr keep the
mapping readable. For enums with associated data (e.g., NotificationSource),
use JSON serialization (see below).
JSON Columns¶
Complex types (nested structs, maps, enums with payloads) are serialized to
JSON and stored as TEXT. Use serde_json:
// Entity field
pub source_data: String, // JSON: serialized NotificationSource
pub env: String, // JSON: HashMap<String, String>
// Writing
source_data: Set(serde_json::to_string(¬ification.source)?),
env: Set(serde_json::to_string(&agent.config.env).unwrap_or_else(|_| "{}".to_string())),
// Reading
let source: NotificationSource = serde_json::from_str(&model.source_data)?;
let env: HashMap<String, String> = serde_json::from_str(&model.env).unwrap_or_default();
In migrations, add a sensible default for JSON columns:
.col(ColumnDef::new(Agents::Env).string().not_null().default("{}"))
.col(ColumnDef::new(Agents::ToolPolicy).string().not_null().default("{\"mode\":\"allow_all\"}"))
Testing Storage Code¶
Use agentd_common::storage::create_test_connection() to get a temporary
in-memory-ish database for each test. Keep the TempDir alive for the
duration of the test.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// Helper: create isolated storage for each test
async fn create_test_storage() -> (NotificationStorage, TempDir) {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test.db");
let storage = NotificationStorage::with_path(&db_path).await.unwrap();
(storage, temp_dir)
}
#[tokio::test]
async fn test_add_and_get() {
let (storage, _tmp) = create_test_storage().await;
let notification = Notification::new(/* … */);
let id = notification.id;
storage.add(¬ification).await.unwrap();
let retrieved = storage.get(&id).await.unwrap().unwrap();
assert_eq!(retrieved.id, id);
}
#[tokio::test]
async fn test_update() {
let (storage, _tmp) = create_test_storage().await;
let mut n = make_test_notification();
storage.add(&n).await.unwrap();
n.status = NotificationStatus::Viewed;
n.updated_at = Utc::now();
storage.update(&n).await.unwrap();
let retrieved = storage.get(&n.id).await.unwrap().unwrap();
assert_eq!(retrieved.status, NotificationStatus::Viewed);
}
#[tokio::test]
async fn test_delete_missing_returns_error() {
let (storage, _tmp) = create_test_storage().await;
let missing_id = Uuid::new_v4();
assert!(storage.delete(&missing_id).await.is_err());
}
}
Key points:
- Each test gets its own
TempDir+ database - tests never interfere with each other. _tmpkeeps theTempDiralive; the database file is deleted when it is dropped.with_path()runs migrations automatically, so tests always see the current schema.- Test helper functions (
create_test_storage,make_test_notification) reduce boilerplate and make intent clear.
Common Pitfalls¶
Forgot if_not_exists() on create_table¶
Without .if_not_exists(), running migrations twice (e.g., in tests that share
a database) will error with "table already exists". Always include it.
Modified an existing migration¶
Once a migration has been applied, its seaql_migrations row records its
checksum. Editing the file changes the checksum and causes SeaORM to refuse
to run or may leave the database in an inconsistent state. Create a new
migration file instead.
rows_affected == 0 not checked¶
update_many and delete_many succeed even when no rows match the filter.
Check result.rows_affected to detect "not found" conditions and return an
appropriate error.
Large result sets without pagination¶
Fetching all rows with .all(&self.db) on a table that may grow unboundedly
is a memory hazard. Prefer list_paginated variants for API endpoints.
UUID round-trip through parse_str¶
Uuid::parse_str is infallible only when the string is a valid UUID. If a
non-UUID string ends up in the id column (e.g., from a bug or manual
database edit), it will return an error. Handle this with ? so the error
propagates rather than causing a panic.
JSON deserialization failures¶
serde_json::from_str on a column that holds malformed JSON will fail. Use
.unwrap_or_default() only when an empty/default value is safe (e.g.,
HashMap<String, String> for environment variables). For required fields,
propagate the error with ?.
Sharing a connection vs. opening a second file¶
Opening two DatabaseConnection instances to the same SQLite file can cause
write conflicts. When two storage structs belong to the same service, share
the connection:
let agent_storage = AgentStorage::new().await?;
let scheduler_storage = SchedulerStorage::new(agent_storage.db().clone());
xtask Commands¶
Three cargo xtask sub-commands help manage databases during development:
# Apply all pending migrations for every service
cargo xtask migrate
# Apply migrations for a single service
cargo xtask migrate --service notify
cargo xtask migrate --service orchestrator
# Show migration status (applied / pending) for every service
cargo xtask migrate-status
# Regenerate entity files from the live database schema
# (requires sea-orm-cli: cargo install sea-orm-cli)
cargo xtask generate-entities
cargo xtask generate-entities --service notify
migrate and migrate-status work even when the database file does not yet
exist - they create it on the fly. generate-entities requires the database
to exist (start the service once first, or run migrate).