
feat: Implement ProcessorChain for composite metric reporting#2569

Open
drewrelmas wants to merge 15 commits into open-telemetry:main from drewrelmas:drewrelmas/processor_chain_2

Conversation

@drewrelmas
Contributor

@drewrelmas drewrelmas commented Apr 7, 2026

Change Summary

Implements ProcessorChain — a composite node that runs multiple sub-processors sequentially within a single task, eliminating inter-processor channels. The chain reports a single compute.success.duration (or compute.failed.duration) metric covering the total compute cost of a record batch passing through all sub-processors, while each sub-processor still reports its own individual duration.

Motivation

When a single logical operation involves multiple internal processors for performance optimization (e.g., attributes -> condense -> recordset KQL), operators need a single duration metric representing the total cost of that logical operation per batch. Without this, the only option is statistical aggregation at the metrics layer, which produces incorrect min/max values (min(A) + min(B) != min(A+B)).

The ProcessorChain approach gives per-batch composite timing with a correct MMSC (min/max/sum/count) distribution.
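To make the aggregation pitfall concrete, here is a small standalone Rust sketch (the durations are illustrative numbers, not measurements from this PR) showing that summing per-processor minima understates the true composite minimum:

```rust
// Illustrative per-batch durations (ms) for two sub-processors A and B
// over three batches. Aggregating each stream separately and then summing
// the aggregates understates the true composite minimum.
fn main() {
    let a = [3.0_f64, 1.0, 5.0]; // per-batch durations through A
    let b = [2.0_f64, 6.0, 1.0]; // per-batch durations through B

    // Statistical aggregation at the metrics layer: min(A) + min(B).
    let min_a = a.iter().copied().fold(f64::INFINITY, f64::min);
    let min_b = b.iter().copied().fold(f64::INFINITY, f64::min);
    assert_eq!(min_a + min_b, 2.0); // claims a 2 ms best case...

    // Per-batch composite timing, as ProcessorChain reports: min(A + B).
    let min_ab = a
        .iter()
        .zip(&b)
        .map(|(x, y)| x + y)
        .fold(f64::INFINITY, f64::min);
    assert_eq!(min_ab, 5.0); // ...but no batch ever took less than 5 ms
}
```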

Config syntax

version: otel_dataflow/v1
engine: {}
groups:
  default:
    pipelines:
      main:
        policies:
          telemetry:
            runtime_metrics: normal
          channel_capacity:
            control:
              node: 100
              pipeline: 100
            pdata: 128

        nodes:
          receiver:
            type: receiver:traffic_generator
            config:
              traffic_config:
                max_batch_size: 1
                signals_per_second: 1
                log_weight: 100
              registry_path: https://github.com/open-telemetry/semantic-conventions.git[model]
          insert_A:
            type: processor:attribute
            config:
              actions:
                - action: insert
                  key: pre_chain
                  value: A
          chain:
            type: processor_chain:composite
            config:
              processors:
                - name: insert_B
                  type: processor:attribute
                  config:
                    actions:
                      - action: insert
                        key: chain_step_1
                        value: B
                - name: insert_C
                  type: processor:attribute
                  config:
                    actions:
                      - action: insert
                        key: chain_step_2
                        value: C
          insert_D:
            type: processor:attribute
            config:
              actions:
                - action: insert
                  key: post_chain
                  value: D
          debug:
            type: processor:debug
            config:
              verbosity: detailed
              mode: signal
          noop:
            type: exporter:noop
            config:

        connections:
          - from: receiver
            to: insert_A
          - from: insert_A
            to: chain
          - from: chain
            to: insert_D
          - from: insert_D
            to: debug
          - from: debug
            to: noop

Telemetry output

With runtime_metrics: normal, this pipeline produces compute.success.duration for all 5 of the following:

| node.id | node.type | node.urn |
| --- | --- | --- |
| insert_A | processor | urn:otel:processor:attribute |
| chain | processor_chain | urn:otel:processor_chain:composite |
| chain/insert_B | processor | urn:otel:processor:attribute |
| chain/insert_C | processor | urn:otel:processor:attribute |
| insert_D | processor | urn:otel:processor:attribute |

The composite duration is always >= the sum of sub-processor durations (it includes inter-stage overhead).
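A minimal standalone sketch (not the actual ProcessorChain API) of why the composite can only exceed the stage sum: the inner timed intervals are disjoint sub-intervals of the outer one, and the difference is exactly the inter-stage overhead the chain metric includes.

```rust
use std::time::{Duration, Instant};

// Time each stage and the whole chain for one batch. Stages here are
// plain function pointers standing in for real sub-processors.
fn main() {
    let stages: Vec<fn(u64) -> u64> = vec![|x| x + 1, |x| x * 2];
    let mut batch = 41_u64;
    let mut stage_sum = Duration::ZERO;

    let chain_start = Instant::now();
    for stage in &stages {
        let t = Instant::now();
        batch = stage(batch);
        stage_sum += t.elapsed();
    }
    let composite = chain_start.elapsed();

    // The outer interval contains every inner interval plus loop overhead.
    assert!(composite >= stage_sum);
    assert_eq!(batch, 84); // (41 + 1) * 2
}
```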

Querying metrics locally shows the following result:

curl -s "http://127.0.0.1:8080/api/v1/telemetry/metrics?format=json" \
  | jq '[.metric_sets[]
      | select(.name == "processor.compute")
      | {node_id: .attributes["node.id"].String,
         node_type: .attributes["node.type"].String,
         node_urn: .attributes["node.urn"].String,
         success: (.metrics[]
           | select(.name == "compute.success.duration")
           | .value
           | {avg_ms: (if .count > 0 then .sum / .count / 1e6 else 0 end), count})}]'

[
  {
    "node_id": "insert_A",
    "node_type": "processor",
    "node_urn": "urn:otel:processor:attribute",
    "success": {
      "avg_ms": 1.2843131327433628,
      "count": 226
    }
  },
  {
    "node_id": "insert_D",
    "node_type": "processor",
    "node_urn": "urn:otel:processor:attribute",
    "success": {
      "avg_ms": 0.36642488938053097,
      "count": 226
    }
  },
  {
    "node_id": "chain",
    "node_type": "processor_chain",
    "node_urn": "urn:otel:processor_chain:composite",
    "success": {
      "avg_ms": 0.9586159513274336,
      "count": 226
    }
  },
  {
    "node_id": "chain/insert_B",
    "node_type": "processor",
    "node_urn": "urn:otel:processor:attribute",
    "success": {
      "avg_ms": 0.4922827168141593,
      "count": 226
    }
  },
  {
    "node_id": "chain/insert_C",
    "node_type": "processor",
    "node_urn": "urn:otel:processor:attribute",
    "success": {
      "avg_ms": 0.37540364159292033,
      "count": 226
    }
  }
]

Design decisions

  • Vec-backed buffers: Each intermediate sub-processor (all except the last) gets a buffer EffectHandler wired to a shared Rc<RefCell<Vec<PData>>> via a VecSender. When the sub-processor calls send_message(), the output is pushed directly into the Vec — no channel send/recv, waker management, or async polling overhead. These buffer handlers are created once at construction and reused for every batch.
  • Single-item fast path: For chains of length 1, the chain delegates directly to the sole sub-processor with zero staging overhead. For length ≥ 2, an optimistic fast path threads a single PData value through each intermediate stage without any Vec operations. Only when a stage produces 0 or 2+ outputs does it fall back to staging vecs (stage_a/stage_b) which swap roles via std::mem::swap and retain heap capacity across calls.
  • Last sub-processor uses real EffectHandler: The last sub-processor in the chain doesn't need a buffer — it sends directly through the real downstream channel, like any normal processor would.
  • Sub-processor entity registration: Each sub-processor gets a distinct telemetry entity with its own node.id (e.g., chain/insert_B) and node.urn via with_node_telemetry_handle scoping during factory creation. This ensures sub-processor metrics are clearly separated from the chain's composite metrics in telemetry output.
  • CollectTelemetry forwarding: The real MetricsReporter from the CollectTelemetry control message is forwarded to sub-processors so their metric snapshots reach the telemetry registry (not an orphaned channel).
  • NodeKind::ProcessorChain: Reuses the existing (previously unused) config variant. Maps to NodeType::Processor in the engine so it participates in normal processor wiring.
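The Vec-backed buffer idea can be sketched roughly as follows; VecSender, PData, and send_message here are illustrative stand-ins, not the real otap-dataflow types:

```rust
use std::cell::RefCell;
use std::rc::Rc;

type PData = String; // stand-in for the real pdata type

// An intermediate stage "sends" into a shared Vec instead of an async
// channel: no send/recv, no waker management, no polling.
#[derive(Clone)]
struct VecSender {
    buf: Rc<RefCell<Vec<PData>>>,
}

impl VecSender {
    fn send_message(&self, pdata: PData) {
        // Just a push into the staging Vec owned by the chain.
        self.buf.borrow_mut().push(pdata);
    }
}

fn main() {
    let staging = Rc::new(RefCell::new(Vec::new()));
    let sender = VecSender { buf: Rc::clone(&staging) };

    // A toy sub-processor that emits one output per input.
    let process = |input: PData, out: &VecSender| out.send_message(input + "!");

    process("batch".to_string(), &sender);
    assert_eq!(staging.borrow()[0], "batch!");
}
```

The chain can hand the same staging Vec to a stage as its output and drain it as the next stage's input, which is what lets the buffer handlers be created once and reused for every batch.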

Performance

~300µs simulated work per processor (1,000 batches, single-threaded LocalSet):

| Chain len | Chained (ms) | Separate (ms) | Overhead |
| --- | --- | --- | --- |
| 1 | 301.6 | 301.4 | +0.1% |
| 2 | 603.4 | 604.0 | -0.1% |
| 3 | 904.9 | 904.9 | 0.0% |

With realistic per-processor compute (~300µs), the chain overhead is within measurement noise, effectively zero.

~100ns simulated work per processor (1,000 batches, single-threaded LocalSet):

| Chain len | Chained (µs) | Separate (µs) | Overhead |
| --- | --- | --- | --- |
| 1 | 305 | 320 | -5% |
| 2 | 713 | 661 | +8% |
| 3 | 1,033 | 990 | +4% |

At trivially low work (100ns per processor), the chain matches or beats separate tasks for len=1 thanks to a single-processor fast path. For len>=2, the remaining ~5-8% gap is the cost of per-stage Vec/RefCell bookkeeping, which is significant only at these artificially low work levels.

The chain's value is not raw throughput — it's the ability to produce a correct composite duration metric (min/max/sum/count) across all sub-processors, which is impossible with separate processors.

Future work

What issue does this PR close?

How are these changes tested?

Unit tests, benchmarks, and running fake config locally

Are there any user-facing changes?

Yes, users can now configure processor_chain elements in their configuration.

@github-actions github-actions bot added the rust Pull requests that update Rust code label Apr 7, 2026
@codecov

codecov bot commented Apr 7, 2026

Codecov Report

❌ Patch coverage is 92.86872% with 44 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.44%. Comparing base (9b4b8dc) to head (692f119).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2569      +/-   ##
==========================================
+ Coverage   88.42%   88.44%   +0.01%     
==========================================
  Files         631      632       +1     
  Lines      232112   232715     +603     
==========================================
+ Hits       205243   205816     +573     
- Misses      26345    26375      +30     
  Partials      524      524              
| Component | Coverage Δ |
| --- | --- |
| otap-dataflow | 90.25% <92.86%> (+0.01%) ⬆️ |
| query_abstraction | 80.61% <ø> (ø) |
| query_engine | 90.74% <ø> (ø) |
| otel-arrow-go | 52.45% <ø> (ø) |
| quiver | 92.27% <ø> (ø) |

@drewrelmas drewrelmas changed the title Implement ProcessorChain for composite metric reporting feat: Implement ProcessorChain for composite metric reporting Apr 7, 2026
Contributor

@jmacd jmacd left a comment


Looks good

@drewrelmas drewrelmas marked this pull request as ready for review April 7, 2026 21:19
@drewrelmas drewrelmas requested a review from a team as a code owner April 7, 2026 21:19
@drewrelmas
Contributor Author

drewrelmas commented Apr 7, 2026

Marking as draft again: upon further review of the benchmarks, I don't think this is a strictly valid comparison at this iteration. Theoretically the chain benchmark should be equivalent to or faster than the regular path.

@drewrelmas drewrelmas marked this pull request as draft April 7, 2026 22:08
@drewrelmas drewrelmas marked this pull request as ready for review April 8, 2026 18:48
@gouslu
Contributor

gouslu commented Apr 8, 2026

Putting here what we discussed offline:

I was wondering if ProcessorChain in this PR can be a ChainProcessor instead :D

1. ChainProcessor (user-facing composition)

This is what the PR implements today, but reframed: it's just another processor that allows you to compose multiple processors inside it. It implements the Processor trait, it's registered via a normal ProcessorFactory, and ideally requires minimal or no engine changes. The engine doesn't need to know it's special — it's a processor of processors and can have more logic in it.

2. ProcessorChain (engine-level optimization)

NOTE: This section is exploratory to share an idea, rather than a specific implementation.

This is a separate concept. A processor chain at the engine level would be about optimization, not about adding functionality. The idea would be to separate the pure data processing from message handling in processors, something like:

// Full processor — owns its message loop
trait Processor {
    async fn process(&mut self, channel, effect_handler);
}

// Pure data transform — no message loop, no channels
trait PDataProcessor {
    fn process_pdata(&mut self, pdata: PData) -> Vec<PData>;
}

A processor that implements PDataProcessor advertises that its data processing is pure and stateless with respect to the message loop. The engine can then automatically fuse linear sequences of PDataProcessors into a single task — no user configuration needed, no new YAML syntax. The engine handles the message loop once at the outer level and just calls a.process_pdata() → b.process_pdata() → c.process_pdata() internally.
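The fusion idea above could be sketched roughly like this (names, types, and trait shapes are illustrative, not the real otap-dataflow traits):

```rust
type PData = i64; // stand-in for the real pdata type

// Pure data transform: no message loop, no channels.
trait PDataProcessor {
    fn process_pdata(&mut self, pdata: PData) -> Vec<PData>;
}

struct AddOne;
impl PDataProcessor for AddOne {
    fn process_pdata(&mut self, p: PData) -> Vec<PData> {
        vec![p + 1]
    }
}

struct Double;
impl PDataProcessor for Double {
    fn process_pdata(&mut self, p: PData) -> Vec<PData> {
        vec![p * 2]
    }
}

// The fused "task": feed every output of one stage into the next,
// with no channels between stages.
fn run_fused(stages: &mut [Box<dyn PDataProcessor>], input: PData) -> Vec<PData> {
    let mut current = vec![input];
    for stage in stages.iter_mut() {
        current = current
            .into_iter()
            .flat_map(|p| stage.process_pdata(p))
            .collect();
    }
    current
}

fn main() {
    let mut stages: Vec<Box<dyn PDataProcessor>> = vec![Box::new(AddOne), Box::new(Double)];
    assert_eq!(run_fused(&mut stages, 20), vec![42]); // (20 + 1) * 2
}
```

In the real engine the outer level would also own the control-message loop and telemetry, but the data path would reduce to this inner call chain.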

This keeps the two concerns separate:

  • ChainProcessor: user wants to group processors explicitly for composite metrics or organizational reasons — it's just a processor, no engine changes needed.
  • ProcessorChain: engine detects fusable processors and optimizes them transparently — this is an engine concern, not a user concern.

I think there is a place for both, but they shouldn't be conflated. The current PR mixes user-facing composition with engine-level plumbing. Ideally the ChainProcessor should work without NodeKind::ProcessorChain, create_processor_chain(), or into_local_processor() in the engine.

@lquerel
Contributor

lquerel commented Apr 8, 2026

Two very brief comments on this PR:

  • The benefit of chaining processors cannot be measured only in terms of CPU usage. The main benefit is in memory consumption. By eliminating channels, we reduce the amount of pdata accumulated in transit through those channels, and therefore reduce memory usage across the entire pipeline.
  • Originally, the idea behind the NodeKind::ProcessorChain variant was to allow the engine to automatically infer processor chains in order to transparently eliminate channel overhead, especially memory overhead. What I like about this PR is that, as a first step, we can make this concept explicit and then add support for this optimization in a future PR.
    • The difference I see between the explicit version and the implicit version is the transparency of this optimization at the level of the generated entities and signals. In the current PR, the sub-processors, and therefore the corresponding entities, are scoped by the chain, which seems like the logical behavior a user would expect.
    • For the future implicit version, ideally we should preserve transparency, meaning that the processor entities would remain the original ones, with however one additional entity corresponding to the chain, as in the explicit version.


Labels

rust Pull requests that update Rust code


Development

Successfully merging this pull request may close these issues.

  • Initial implementation of processor_chain
  • Use NodeUserConfig for ProcessorChain processor configs

4 participants