Skip to main content
Extend Kona’s derivation pipeline by wrapping the top-level AttributesQueue stage with custom logic for monitoring, validation, or transformation.

Core Concepts

The derivation pipeline uses a stage-based architecture where each stage wraps the previous one:
L1Traversal → L1Retrieval → FrameQueue → ChannelProvider → 
ChannelReader → BatchStream → BatchProvider → AttributesQueue

Key Traits

Custom stages that wrap the AttributesQueue must implement:
  • NextAttributes - Provides payload attributes for block building
  • OriginProvider - Provides current L1 origin
  • SignalReceiver - Handles pipeline resets
  • OriginAdvancer - Advances L1 origin

Example: Monitoring Stage

Wrap the AttributesQueue to add metrics tracking:
use kona_derive::{
    NextAttributes, OriginProvider, SignalReceiver, OriginAdvancer,
    PipelineResult, Signal, OpAttributesWithParent
};
use kona_protocol::{BlockInfo, L2BlockInfo};
use async_trait::async_trait;
use std::time::Instant;

#[derive(Debug)]
pub struct LoggingStage<S> {
    inner: S,
    attributes_count: u64,
    last_origin: Option<BlockInfo>,
}

impl<S> LoggingStage<S> {
    pub fn new(inner: S) -> Self {
        Self {
            inner,
            attributes_count: 0,
            last_origin: None,
        }
    }
}

#[async_trait]
impl<S> NextAttributes for LoggingStage<S>
where
    S: NextAttributes + Send + Sync,
{
    async fn next_attributes(
        &mut self, 
        parent: L2BlockInfo
    ) -> PipelineResult<OpAttributesWithParent> {
        let start = Instant::now();
        
        // Delegate to inner stage
        let attributes = self.inner.next_attributes(parent).await?;
        
        // Track metrics
        self.attributes_count += 1;
        let duration = start.elapsed();
        
        info!(
            target: "pipeline::logging",
            count = self.attributes_count,
            duration_ms = duration.as_millis(),
            parent_hash = ?parent.block_info.hash,
            "Generated attributes"
        );
        
        Ok(attributes)
    }
}

impl<S> OriginProvider for LoggingStage<S>
where
    S: OriginProvider,
{
    fn origin(&self) -> Option<BlockInfo> {
        self.inner.origin()
    }
}

#[async_trait]
impl<S> SignalReceiver for LoggingStage<S>
where
    S: SignalReceiver + Send + Sync,
{
    async fn signal(&mut self, signal: Signal) -> PipelineResult<()> {
        info!(target: "pipeline::logging", ?signal, "Received signal");
        
        // Track origin changes on reset
        if let Signal::Reset(reset) = &signal {
            self.last_origin = Some(reset.l1_origin);
            self.attributes_count = 0; // Reset counter
        }
        
        self.inner.signal(signal).await
    }
}

#[async_trait]
impl<S> OriginAdvancer for LoggingStage<S>
where
    S: OriginAdvancer + Send + Sync,
{
    async fn advance_origin(&mut self) -> PipelineResult<()> {
        let prev_origin = self.inner.origin();
        self.inner.advance_origin().await?;
        let new_origin = self.inner.origin();
        
        if prev_origin != new_origin {
            info!(
                target: "pipeline::logging",
                prev = ?prev_origin,
                new = ?new_origin,
                "Advanced origin"
            );
        }
        
        Ok(())
    }
}
Custom stages wrap the AttributesQueue (top-level stage). For deeper pipeline modifications, you’d need to rebuild the entire pipeline.

Integration

use kona_derive::{PipelineBuilder, DerivationPipeline};
use kona_node::{StatefulAttributesBuilder};
use alloc::sync::Arc;

// Build standard pipeline
let pipeline = PipelineBuilder::new()
    .rollup_config(rollup_config.clone())
    .origin(origin)
    .chain_provider(chain_provider)
    .l2_chain_provider(l2_chain_provider.clone())
    .dap_source(dap_source)
    .builder(attributes_builder)
    .build_polled();

// Wrap with monitoring
let monitoring_stage = LoggingStage::new(pipeline.attributes);

// Create new pipeline
let custom_pipeline = DerivationPipeline::new(
    monitoring_stage,
    rollup_config,
    l2_chain_provider,
);

Testing

#[cfg(test)]
mod tests {
    use super::*;
    use kona_derive::test_utils::TestNextAttributes;
    
    #[tokio::test]
    async fn test_logging_stage() {
        let mock_inner = TestNextAttributes::new();
        let mut stage = LoggingStage::new(mock_inner);
        
        // Test attributes generation
        let parent = L2BlockInfo::default();
        let result = stage.next_attributes(parent).await;
        assert!(result.is_ok());
        assert_eq!(stage.attributes_count, 1);
        
        // Test signal handling
        let signal = Signal::Reset(Default::default());
        stage.signal(signal).await.unwrap();
        assert_eq!(stage.attributes_count, 0);
    }
}