Custom Indexer and Walrus
This topic examines how to use a custom indexer with Walrus. For a more in-depth look at creating a custom indexer, see Build Your First Custom Indexer.
Walrus is a decentralized storage and data availability protocol designed specifically for large binary files, or blobs. It is a content-addressable storage protocol, meaning data is identified and retrieved using a unique identifier called a blob. Blobs are derived from the content itself rather than from a file path or location. Consequently, if different users upload the same content, Walrus reuses the existing blob rather than creating a new one.
For uniqueness, each blob uploaded to Walrus also creates a corresponding Blob NFT object on Sui with a unique ID. Furthermore, the associated Blob object can optionally have a Metadata dynamic field.
Metadata dynamic fields are a key-value extension that allows an on-chain objectβs data to be augmented at runtime. If set, this dynamic field acts as a mapping of key-value attribute pairs.
You can use the custom indexer framework to extend the existing functionality of Walrus.
The Walrus Foundation operates and controls Walrus. For the most accurate and up-to-date information on the Walrus protocol, consult the official Walrus Docs.
Blog platform using Walrusβ
The system derives the ID of a dynamic field from its type and parent object's ID. Each Metadata dynamic field ID is also unique. You can leverage these unique characteristics and the Metadata attribute pairs to build a blog platform that enables users to:
- Upload blog posts with titles.
- View their own posts and metrics.
- Delete posts they created.
- Edit post titles.
- Browse posts by other publishers.
Assume a blog platform service already exists to handle uploads to Walrus. When the service creates a blob and its associated NFT object on Walrus, it also attaches a Metadata dynamic field containing key-value pairs for publisher (the Sui Address that uploaded the blob), view_count, and title. The service prevents users from modifying the publisher and view_count pairs, but allows the publisher to update the title value.
When a user views a post, the service retrieves the relevant blog post Metadata from the indexed database. It then uses the Owner field to fetch the blob from the full node. The liveness of the Blob object on Sui is used to represent whether a blog post is available. If the Blob object is wrapped or deleted, the blog post is not accessible through the service, even if the underlying content on Walrus still exists.
Data modelingβ
One option for data modeling is to use a single table that maps publisher addresses to Metadata dynamic fields. With this approach, the table is keyed on dynamic_field_id because it both identifies your dApp data and uniquely represents the content of each uploaded blob.
For example, the up.sql file to create this table might looks like the following:
-- This table maps a blob to its associated Sui Blob object and the latest dynamic field metadata
-- for traceability. The `view_count` is indexed to serve reads on the app.
CREATE TABLE IF NOT EXISTS blog_post (
    -- ID of the Metadata dynamic field.
    dynamic_field_id            BYTEA         NOT NULL,
    -- Current version of the Metadata dynamic field.
    df_version                  BIGINT        NOT NULL,
    -- Address that published the Walrus Blob.
    publisher                   BYTEA         NOT NULL,
    -- ID of the Blob object on Sui, used during reads to fetch the actual blob content. If this
    -- object has been wrapped or deleted, it will not be present on the live object set, which
    -- means the corresponding content on Walrus is also not accessible.
    blob_obj_id                 BYTEA         NOT NULL,
    view_count                  BIGINT        NOT NULL,
    title                       TEXT          NOT NULL,
    PRIMARY KEY (dynamic_field_id)
);
-- Index to support ordering and filtering by title
CREATE INDEX IF NOT EXISTS blog_post_by_title ON blog_post
(publisher, title);
Readsβ
To load blog posts from a particular publisher, pass the publisher and LIMIT values to the following query pattern:
SELECT *
FROM blog_post
WHERE publisher = $1
ORDER BY title
LIMIT $2;
Custom indexer implementationβ
This example uses a sequential pipeline, ensuring each checkpoint is committed once in strict order and as a single atomic operation. The sequential pipeline architecture is not required for this project, but it is a more straightforward option than implementing the concurrent architecture. You can always scale up to the concurrent pipeline if and when your project requires it.
This implementation tracks the latest object state at checkpoint boundary. When the Metadata dynamic field is created, mutated, wrapped or deleted, or unwrapped, it appears among the transaction output under the object changes. You can see an example transaction on Testnet that creates the field. These dynamic fields have type 0x2::dynamic_field::Field<vector<u8>, 0xabc...123::metadata::Metadata>.
| Object change to Metadatadynamic field | Included in input objects | Included in live output objects | How to index | 
|---|---|---|---|
| Creation (or unwrap) | β | β | Insert row | 
| Mutation | β | β | Update row | 
| Deletion (or wrap) | β | β | Delete row | 
Processorβ
All pipelines implement the same Processor trait, which defines the logic to transform a checkpoint from the ingestion task into an intermediate or final form to commit to the store. Data flows into and out of the processor, potentially out of order.
process functionβ
The process function computes the checkpoint_input_objects and latest_live_output_objects sets to capture the state of objects entering and exiting a checkpoint. A Metadata dynamic field that appears in checkpoint_input_objects but not in latest_live_output_objects means it has been either wrapped or deleted. In those cases, you need to record only the dynamic field ID for the commit function to handle later deletion. For creation, mutation, and unwrap operations, the objects always appear in at least the latest_live_output_objects set.
/// This pipeline operates on a checkpoint granularity to produce a set of values reflecting the
/// state of relevant Metadata dynamic fields at checkpoint boundary.
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
    let checkpoint_input_objects = checkpoint_input_objects(checkpoint)?;
    let latest_live_output_objects = checkpoint_output_objects(checkpoint)?;
    // Collect values to be passed to committer. This map is keyed on the dynamic field id.
    let mut values: BTreeMap<ObjectID, Self::Value> = BTreeMap::new();
    // Process relevant Metadata dynamic fields that were wrapped or deleted in this checkpoint.
    for (object_id, object) in &checkpoint_input_objects {
        // If an object appears in both maps, it is still alive at the end of the checkpoint.
        if latest_live_output_objects.contains_key(object_id) {
            continue;
        }
        // Check the checkpoint input state of the Metadata dynamic field to see if it's
        // relevant to our indexing.
        let Some((_, _)) = extract_content_from_metadata(&self.metadata_type, object)? else {
            continue;
        };
        // Since the table is keyed on the dynamic field id, this is all the information we need
        // to delete the correct entry in the commit fn.
        values.insert(*object_id, ProcessedWalrusMetadata::Delete(*object_id));
    }
    for (object_id, object) in &latest_live_output_objects {
        let Some((blog_post_metadata, blob_obj_id)) =
            extract_content_from_metadata(&self.metadata_type, object)?
        else {
            continue;
        };
        values.insert(
            *object_id,
            ProcessedWalrusMetadata::Upsert {
                df_version: object.version().into(),
                dynamic_field_id: *object_id,
                blog_post_metadata,
                blob_obj_id,
            },
        );
    }
    Ok(values.into_values().collect())
}
Committerβ
The second and final part of the sequential pipeline is the Committer. Because data flows from the processor into the committer out of order, it is the committer's responsibility to batch and write the transformed data to the store in order on checkpoint boundaries.
batchβ
The batch function defines how to batch transformed data from other processed checkpoints. This function maintains a mapping of dynamic_field_id to the processed Walrus Metadata. The batch function guarantees that the next checkpoint to batch is the next contiguous checkpoint, which means it's safe for you to overwrite the existing entry.
fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
    for value in values {
        match value {
            ProcessedWalrusMetadata::Upsert {
                dynamic_field_id, ..
            } => {
                batch.insert(dynamic_field_id, value);
            }
            ProcessedWalrusMetadata::Delete(dynamic_field_id) => {
                batch.insert(dynamic_field_id, value);
            }
        }
    }
}
commitβ
The commit function conducts final transformations to the processed data before writing to the store. In this case, the logic partitions the processed data into to_delete and to_upsert.
async fn commit<'a>(batch: &Self::Batch, conn: &mut postgres::Connection<'a>) -> Result<usize> {
    // Partition the batch into items to delete and items to upsert
    let (to_delete, to_upsert): (Vec<_>, Vec<_>) = batch
        .values()
        .partition(|item| matches!(item, ProcessedWalrusMetadata::Delete(_)));
    let to_upsert: Vec<StoredBlogPost> = to_upsert
        .into_iter()
        .map(|item| item.to_stored())
        .collect::<Result<Vec<_>>>()?;
    let to_delete: Vec<ObjectID> = to_delete
        .into_iter()
        .map(|item| Ok(item.dynamic_field_id()))
        .collect::<Result<Vec<_>>>()?;
    let mut total_affected = 0;
    if !to_delete.is_empty() {
        let deleted_count = diesel::delete(blog_post::table)
            .filter(blog_post::dynamic_field_id.eq_any(to_delete.iter().map(|id| id.to_vec())))
            .execute(conn)
            .await?;
        total_affected += deleted_count;
    }
    if !to_upsert.is_empty() {
        let upserted_count = diesel::insert_into(blog_post::table)
            .values(&to_upsert)
            .on_conflict(blog_post::dynamic_field_id)
            .do_update()
            .set((
                blog_post::df_version.eq(excluded(blog_post::df_version)),
                blog_post::title.eq(excluded(blog_post::title)),
                blog_post::view_count.eq(excluded(blog_post::view_count)),
                blog_post::blob_obj_id.eq(excluded(blog_post::blob_obj_id)),
            ))
            .filter(blog_post::df_version.lt(excluded(blog_post::df_version)))
            .execute(conn)
            .await?;
        total_affected += upserted_count;
    }
    Ok(total_affected)
}
Putting it all togetherβ
The main function for the service
#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();
    // The `IndexerClusterBuilder` offers a convenient way to quickly set up an `IndexerCluster`,
    // which consists of the base indexer, metrics service, and a cancellation token.
    let mut indexer = IndexerClusterBuilder::new()
        .with_database_url(args.database_url)
        .with_args(args.cluster_args)
        .with_migrations(&MIGRATIONS)
        .build()
        .await?;
    let blog_post_pipeline = BlogPostPipeline::new(METADATA_DYNAMIC_FIELD_TYPE).unwrap();
    // Other pipelines can be easily added with `.sequential_pipeline()` or
    // `.concurrent_pipeline()`.
    indexer
        .sequential_pipeline(blog_post_pipeline, SequentialConfig::default())
        .await?;
    let _ = indexer.run().await?.await;
    Ok(())
}
To provide users with a list of posts written by a publisher, your service first queries the
database on publisher, yielding a result like the following. The service then uses the blob_obj_id to fetch the
Blob NFT contents. From there, you can retrieve the actual Walrus content.
                          dynamic_field_id                          | df_version |                             publisher                              |                            blob_obj_id                             | view_count |      title
--------------------------------------------------------------------+------------+--------------------------------------------------------------------+--------------------------------------------------------------------+------------+------------------
 \x40b5ae12e780ae815d7b0956281291253c02f227657fe2b7a8ccf003a5f597f7 |  608253371 | \xfe9c7a465f63388e5b95c8fd2db857fad4356fc873f96900f4d8b6e7fc1e760e | \xcfb3d474c9a510fde93262d4b7de66cad62a2005a54f31a63e96f3033f465ed3 |         10 | Blog Post Module
Additional considerationsβ
All Walrus blobs carry an associated lifetime, so you must track expiration changes. Whenever the Metadata dynamic field changes, the parent Sui Blob object should also appear in the output changes. You can read the blobβs lifetime directly from the Blob object contents. However, lifetime changes usually occur on the Blob object itself. Because updates to the parent object donβt affect the child dynamic field, unless you directly modify the child, these lifetime changes remain hidden in the current indexing setup. You can address this in several ways:
- Watch all Blobobject changes.
- Watch all BlobCertifiedevents.
- Construct PTBs that make calls to manage blob lifetime and ping the Metadatadynamic field in the same transaction.
If you don't want to perform additional work on the write side, then you are limited to the first two options. This requires two pipelines, one to do the work in the previous section of indexing metadata, and another to index BlobCertified events (or Blob object changes.)
Related linksβ
Walrus is a decentralized storage and data availability protocol designed specifically for large binary files, or blobs.
Establishing a custom indexer helps improve latency, allows pruning the data of your Sui full node, and provides efficient assemblage of checkpoint data.
Dynamic fields and dynamic object fields on Sui are added and removed dynamically, affect gas only when accessed, and store heterogeneous values.
Directory in the Sui repo containing the files for this example.