Extend Kona’s derivation pipeline by wrapping the top-level AttributesQueue stage with custom logic for monitoring, validation, or transformation.
Core Concepts
The derivation pipeline uses a stage-based architecture where each stage wraps the previous one:
L1Traversal → L1Retrieval → FrameQueue → ChannelProvider →
ChannelReader → BatchStream → BatchProvider → AttributesQueue
Key Traits
Custom stages that wrap the AttributesQueue must implement:
NextAttributes - Provides payload attributes for block building
OriginProvider - Provides current L1 origin
SignalReceiver - Handles pipeline resets
OriginAdvancer - Advances L1 origin
Example: Monitoring Stage
Wrap the AttributesQueue to add metrics tracking:
use kona_derive::{
NextAttributes, OriginProvider, SignalReceiver, OriginAdvancer,
PipelineResult, Signal, OpAttributesWithParent
};
use kona_protocol::{BlockInfo, L2BlockInfo};
use async_trait::async_trait;
use std::time::Instant;
#[derive(Debug)]
pub struct LoggingStage<S> {
inner: S,
attributes_count: u64,
last_origin: Option<BlockInfo>,
}
impl<S> LoggingStage<S> {
pub fn new(inner: S) -> Self {
Self {
inner,
attributes_count: 0,
last_origin: None,
}
}
}
#[async_trait]
impl<S> NextAttributes for LoggingStage<S>
where
S: NextAttributes + Send + Sync,
{
async fn next_attributes(
&mut self,
parent: L2BlockInfo
) -> PipelineResult<OpAttributesWithParent> {
let start = Instant::now();
// Delegate to inner stage
let attributes = self.inner.next_attributes(parent).await?;
// Track metrics
self.attributes_count += 1;
let duration = start.elapsed();
info!(
target: "pipeline::logging",
count = self.attributes_count,
duration_ms = duration.as_millis(),
parent_hash = ?parent.block_info.hash,
"Generated attributes"
);
Ok(attributes)
}
}
impl<S> OriginProvider for LoggingStage<S>
where
S: OriginProvider,
{
fn origin(&self) -> Option<BlockInfo> {
self.inner.origin()
}
}
#[async_trait]
impl<S> SignalReceiver for LoggingStage<S>
where
S: SignalReceiver + Send + Sync,
{
async fn signal(&mut self, signal: Signal) -> PipelineResult<()> {
info!(target: "pipeline::logging", ?signal, "Received signal");
// Track origin changes on reset
if let Signal::Reset(reset) = &signal {
self.last_origin = Some(reset.l1_origin);
self.attributes_count = 0; // Reset counter
}
self.inner.signal(signal).await
}
}
#[async_trait]
impl<S> OriginAdvancer for LoggingStage<S>
where
S: OriginAdvancer + Send + Sync,
{
async fn advance_origin(&mut self) -> PipelineResult<()> {
let prev_origin = self.inner.origin();
self.inner.advance_origin().await?;
let new_origin = self.inner.origin();
if prev_origin != new_origin {
info!(
target: "pipeline::logging",
prev = ?prev_origin,
new = ?new_origin,
"Advanced origin"
);
}
Ok(())
}
}
Custom stages wrap the AttributesQueue (top-level stage). For deeper pipeline modifications, you’d need to rebuild the entire pipeline.
Integration
use kona_derive::{PipelineBuilder, DerivationPipeline};
use kona_node::{StatefulAttributesBuilder};
use alloc::sync::Arc;
// Build standard pipeline
let pipeline = PipelineBuilder::new()
.rollup_config(rollup_config.clone())
.origin(origin)
.chain_provider(chain_provider)
.l2_chain_provider(l2_chain_provider.clone())
.dap_source(dap_source)
.builder(attributes_builder)
.build_polled();
// Wrap with monitoring
let monitoring_stage = LoggingStage::new(pipeline.attributes);
// Create new pipeline
let custom_pipeline = DerivationPipeline::new(
monitoring_stage,
rollup_config,
l2_chain_provider,
);
Testing
#[cfg(test)]
mod tests {
use super::*;
use kona_derive::test_utils::TestNextAttributes;
#[tokio::test]
async fn test_logging_stage() {
let mock_inner = TestNextAttributes::new();
let mut stage = LoggingStage::new(mock_inner);
// Test attributes generation
let parent = L2BlockInfo::default();
let result = stage.next_attributes(parent).await;
assert!(result.is_ok());
assert_eq!(stage.attributes_count, 1);
// Test signal handling
let signal = Signal::Reset(Default::default());
stage.signal(signal).await.unwrap();
assert_eq!(stage.attributes_count, 0);
}
}