Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions adxexporter/summaryrule.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

adxmonv1 "github.com/Azure/adx-mon/api/v1"
crdownership "github.com/Azure/adx-mon/pkg/crd/summaryrule"
"github.com/Azure/adx-mon/pkg/crd/summaryrule/backfill"
"github.com/Azure/adx-mon/pkg/kustoutil"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/azure-kusto-go/kusto"
Expand Down Expand Up @@ -114,6 +115,9 @@ func (r *SummaryRuleReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Track/advance outstanding async operations (including backlog windows)
r.trackAsyncOperations(ctx, &rule)

// Process user-requested historical backfill (low priority, bounded concurrency).
backfill.Process(ctx, &rule, r.Clock, r.backfillSubmitFunc(), r.backfillGetStatusFunc())

// Persist any changes made to async operation conditions or timestamps
if err := r.Status().Update(ctx, &rule); err != nil {
// Not fatal; next reconcile will retry
Expand Down Expand Up @@ -516,6 +520,37 @@ func (r *SummaryRuleReconciler) updateLastExecutionTimeIfForward(rule *adxmonv1.
}
}

// backfillSubmitFunc returns a backfill.SubmitFunc backed by the exporter's submitRule.
//
// The backfill package passes window bounds as RFC3339Nano strings while submitRule
// takes time.Time values, so the returned closure parses both bounds (start first,
// then end). If either bound is malformed, nothing is submitted and a wrapped parse
// error naming the offending value is returned.
func (r *SummaryRuleReconciler) backfillSubmitFunc() backfill.SubmitFunc {
	var submit backfill.SubmitFunc = func(ctx context.Context, rule adxmonv1.SummaryRule, startTime, endTime string) (string, error) {
		windowStart, parseErr := time.Parse(time.RFC3339Nano, startTime)
		if parseErr != nil {
			return "", fmt.Errorf("backfill: invalid start time %q: %w", startTime, parseErr)
		}

		windowEnd, parseErr := time.Parse(time.RFC3339Nano, endTime)
		if parseErr != nil {
			return "", fmt.Errorf("backfill: invalid end time %q: %w", endTime, parseErr)
		}

		// Both bounds are valid; hand off to the regular submission path.
		return r.submitRule(ctx, rule, windowStart, windowEnd)
	}
	return submit
}

// backfillGetStatusFunc returns a backfill.GetOperationStatusFunc backed by the
// exporter's getOperation.
//
// The returned closure yields (state, shouldRetry, err):
//   - a lookup error is passed through unchanged with an empty state;
//   - a nil operation status is reported as "InProgress" and not retryable;
//   - otherwise the operation's State is returned, with shouldRetry set when the
//     operation's ShouldRetry value is non-zero.
func (r *SummaryRuleReconciler) backfillGetStatusFunc() backfill.GetOperationStatusFunc {
	return func(ctx context.Context, database, operationID string) (string, bool, error) {
		op, lookupErr := r.getOperation(ctx, database, operationID)
		switch {
		case lookupErr != nil:
			return "", false, lookupErr
		case op == nil:
			// No status record was returned; treat the operation as still running.
			return "InProgress", false, nil
		}

		retryable := op.ShouldRetry != 0
		return op.State, retryable, nil
	}
}

// emitHealthLog produces a single structured log line summarizing whether the rule is considered
// "working". A rule is working when: owned (or already adopted) by exporter, database managed,
// criteria matched, no stale async operations, no detected interval gap beyond one interval, and
Expand Down
3 changes: 3 additions & 0 deletions api/v1/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const (
ConditionCompleted = "Completed"
// ConditionFailed indicates terminal failure (will not retry automatically without spec change or manual intervention).
ConditionFailed = "Failed"
// ConditionBackfill reflects the current phase of a user-requested historical backfill.
// Status=True when completed, Unknown when running/pending, False when failed.
ConditionBackfill = "Backfill"
)

// Standardized reasons for shared conditions (subset reused from verifier conditions file to avoid duplication).
Expand Down
71 changes: 71 additions & 0 deletions api/v1/summaryrule_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,31 @@ type SummaryRuleSpec struct {
//
// An invalid expression (parse or type check error) will be treated as an error and prevent the rule from executing.
CriteriaExpression string `json:"criteriaExpression,omitempty"`

// Backfill optionally specifies a historical time range to process. The controller will
// generate interval-sized windows from StartTime to EndTime and submit them as low-priority
// async operations, independent of the normal scheduling cursor.
// +optional
Backfill *BackfillSpec `json:"backfill,omitempty"`
}

// BackfillSpec defines a user-requested historical backfill for a SummaryRule.
//
// The range [StartTime, EndTime) is processed as interval-sized windows that are
// submitted as async operations independently of the rule's normal scheduling cursor.
// Progress for the request is reported in SummaryRuleStatus.Backfill.
type BackfillSpec struct {
// RequestID is a user-chosen identifier for this backfill request. The same RequestID
// resumes an in-progress backfill; a new RequestID starts a fresh backfill.
// +kubebuilder:validation:MinLength=1
RequestID string `json:"requestId"`
// StartTime is the inclusive start of the historical range to backfill.
StartTime metav1.Time `json:"startTime"`
// EndTime is the exclusive end of the historical range to backfill.
EndTime metav1.Time `json:"endTime"`
// MaxInFlight limits the number of concurrent async operations for backfill windows.
// Defaults to 1 if unset or zero, keeping backfill as a low-priority background task.
// Values greater than 20 are rejected to keep historical processing throttled.
// NOTE(review): the markers below enforce only the 0..20 range; the default of 1 is
// presumably applied by the backfill processor rather than by the API server - confirm.
// +kubebuilder:validation:Minimum=0
// +kubebuilder:validation:Maximum=20
// +optional
MaxInFlight int `json:"maxInFlight,omitempty"`
}

// +kubebuilder:object:root=true
Expand All @@ -123,6 +148,52 @@ type SummaryRuleStatus struct {
// use shared conditions defined in conditions.go: ConditionCriteria, ConditionCompleted, ConditionFailed.
// +optional
Conditions []metav1.Condition `json:"conditions,omitempty"`

// Backfill tracks the progress of a user-requested historical backfill.
// +optional
Backfill *BackfillStatus `json:"backfill,omitempty"`
}

// BackfillPhase describes the current lifecycle phase of a backfill request.
// The value is stored in BackfillStatus.Phase and serialized as a plain string.
type BackfillPhase string

// Lifecycle phases of a backfill request. Completed and Failed are the terminal phases.
const (
// BackfillPhasePending is a non-terminal phase.
// NOTE(review): presumably the request has been accepted but no window has been
// submitted yet - confirm against the backfill package.
BackfillPhasePending BackfillPhase = "Pending"
// BackfillPhaseRunning is a non-terminal phase in which windows are being processed.
BackfillPhaseRunning BackfillPhase = "Running"
// BackfillPhaseCompleted is the terminal phase for a backfill that finished successfully.
BackfillPhaseCompleted BackfillPhase = "Completed"
// BackfillPhaseFailed is the terminal phase for a backfill that stopped on failure.
BackfillPhaseFailed BackfillPhase = "Failed"
)

// BackfillStatus tracks the observed progress of a backfill request.
// It is the status-side counterpart of BackfillSpec and is matched to it by RequestID.
type BackfillStatus struct {
// RequestID mirrors the spec.backfill.requestId that this status corresponds to.
RequestID string `json:"requestId"`
// Phase is the current lifecycle phase of the backfill.
Phase BackfillPhase `json:"phase"`
// ObservedGeneration is the spec generation when this backfill was started.
// If the spec generation changes, the backfill is failed.
ObservedGeneration int64 `json:"observedGeneration"`
// NextWindowStart is the start time of the next window to be submitted.
// It is the backfill's own progress cursor, kept separately from the rule's
// normal scheduling state.
NextWindowStart metav1.Time `json:"nextWindowStart"`
// SubmittedWindows is the total number of successful submissions (including retries).
// Because retries are counted, this may exceed the number of distinct windows.
SubmittedWindows int `json:"submittedWindows"`
// CompletedWindows is the number of windows that completed successfully.
CompletedWindows int `json:"completedWindows"`
// RetriedWindows counts how many times a window operation failed and was
// re-queued for retry. This is an observability counter; failed windows are
// never permanently skipped.
RetriedWindows int `json:"retriedWindows"`
// ActiveOperations are the currently in-flight async operations for this backfill.
// Omitted from the serialized status when empty.
ActiveOperations []BackfillOperation `json:"activeOperations,omitempty"`
// Message provides human-readable detail about the current phase.
Message string `json:"message,omitempty"`
}

// BackfillOperation tracks a single in-flight backfill async operation.
// Entries of this type are held in BackfillStatus.ActiveOperations.
type BackfillOperation struct {
// OperationID identifies the async operation; omitted from the serialized form when empty.
// NOTE(review): presumably the ID returned by the submit function - confirm.
OperationID string `json:"operationId,omitempty"`
// StartTime is the start of the window covered by this operation, stored as a string.
// NOTE(review): expected to be RFC3339Nano, matching what the submit adapters parse - confirm.
StartTime string `json:"startTime"`
// EndTime is the end of the window covered by this operation, stored as a string.
// NOTE(review): expected to be RFC3339Nano; whether it is inclusive or exclusive is
// not established here - confirm against the backfill package.
EndTime string `json:"endTime"`
}

func (s *SummaryRule) GetCondition() *metav1.Condition {
Expand Down
63 changes: 63 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docs/crds.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ This page summarizes all Custom Resource Definitions (CRDs) managed by adx-mon,
| Ingestor | Ingests telemetry from collectors, manages WAL, uploads to ADX | image, replicas, endpoint, exposeExternally, adxClusterSelector | [Operator Design](designs/operator.md#ingestor-crd) |
| Collector | Collects metrics/logs/traces, forwards to Ingestor | image, ingestorEndpoint | [Operator Design](designs/operator.md#collector-crd) |
| Alerter | Runs alert rules, sends notifications | image, notificationEndpoint, adxClusterSelector | [Operator Design](designs/operator.md#alerter-crd) |
| SummaryRule | Automates periodic KQL aggregations with async operation tracking, time window management, cluster label substitutions, and criteria-based execution | database, name, body, table, interval, criteria | [Summary Rules](designs/summary-rules.md#crd) |
| SummaryRule | Automates periodic KQL aggregations with async operation tracking, time window management, cluster label substitutions, criteria-based execution, and historical backfill | database, name, body, table, interval, criteria, backfill | [Summary Rules](designs/summary-rules.md#crd) |
| MetricsExporter | Executes KQL queries and exports results as metrics to OTLP endpoints | database, body, interval, transform, criteria | [Kusto-to-Metrics](designs/kusto-to-metrics.md) |
| Function | Defines KQL functions/views for ADX | name, body, database, table, isView, parameters | [Schema ETL](designs/schema-etl.md#crd) |
| ManagementCommand | Declarative cluster management commands | command, args, target, schedule | [Management Commands](designs/management-commands.md#crd) |
Expand Down
81 changes: 81 additions & 0 deletions docs/designs/summary-rules.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,85 @@ spec:
This is useful when the remote cluster has different retention policies, when data is queried frequently, or when data collected in another system is useful for reference. For example, it might
be useful to import data from a remote cluster that has a global view of all telemetry data but you only need a subset of that data.

## Historical Backfill

SummaryRules support explicit historical backfill via an optional `backfill` field in the spec. This lets you process
a specific time range that was either never summarized or needs to be re-computed.

### Usage

Add a `backfill` block to an existing SummaryRule:

```yaml
apiVersion: adx-mon.azure.com/v1
kind: SummaryRule
metadata:
name: hourly-avg
spec:
database: Metrics
table: MetricHourlyAvg
interval: 1h
body: |
RawMetric
| where Timestamp between (_startTime .. _endTime)
| summarize avg(Value) by bin(Timestamp, 1h)
backfill:
requestId: jan-2026 # User-chosen identifier; same ID = resume
startTime: "2026-01-01T00:00:00Z" # Inclusive start
endTime: "2026-02-01T00:00:00Z" # Exclusive end
maxInFlight: 1 # Max concurrent async ops (default: 1, max: 20)
```

### Key Semantics

- **`requestId`**: Required. The same `requestId` resumes an in-progress backfill. A new `requestId` starts fresh.
- **Separate cursor**: Backfill uses its own progress cursor (`status.backfill.nextWindowStart`) and does not
modify `LastSuccessfulExecution` used by normal scheduling.
- **No skipped intervals**: Retryable async failures are automatically re-queued for retry. Non-retryable
failures stop the backfill and mark it `Failed` rather than silently skipping a window.
- **No overlapping intervals**: Windows are generated sequentially from `startTime`, advancing by exactly one
`interval` each time. A deduplication guard prevents double-submission.
- **Whole-interval ranges only**: `endTime - startTime` must cover one or more whole `interval` windows.
Partial trailing windows are rejected up front instead of being silently dropped.
- **Generation pinning**: If the SummaryRule spec is edited mid-backfill (changing body, interval, etc.), the
backfill is failed and a new `requestId` must be submitted.
- **Low priority**: Backfill is designed as a background task. With `maxInFlight: 1` (default), only one
window is in-flight at a time. `maxInFlight` is capped at 20 to keep historical processing throttled.
A complete backfill may take days for large ranges — this is by design.
- **Append-only**: Backfill uses the same `.set-or-append async` as normal execution. Re-running the same
time range appends duplicate rows; use ADX extent management to deduplicate if needed.

### Status

Progress is tracked in `status.backfill`:

```yaml
status:
backfill:
requestId: jan-2026
phase: Running # Pending | Running | Completed | Failed
observedGeneration: 3
nextWindowStart: "2026-01-15T00:00:00Z"
submittedWindows: 336
completedWindows: 335
retriedWindows: 2
activeOperations:
- operationId: "abc-123"
startTime: "2026-01-15T00:00:00Z"
endTime: "2026-01-15T00:59:59.9999999Z"
```

The current phase is also mirrored into `status.conditions` as `type: Backfill`:

- `True` when the backfill is complete
- `False` when the backfill fails
- `Unknown` while pending or running

### Completing or Cancelling

- **Completion**: The backfill transitions to `Completed` automatically when all windows are processed.
- **Cancel**: Remove the `backfill` block from the spec. The status will be cleared on the next reconcile
(unless it was already in a terminal state, which is preserved for observability).
- **Restart**: Change the `requestId` to a new value with the desired time range.

> **See also:** [CRD Reference](../crds.md) for a summary of all CRDs and links to advanced usage.
27 changes: 27 additions & 0 deletions ingestor/adx/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/Azure/adx-mon/ingestor/storage"
"github.com/Azure/adx-mon/pkg/celutil"
crdownership "github.com/Azure/adx-mon/pkg/crd/summaryrule"
"github.com/Azure/adx-mon/pkg/crd/summaryrule/backfill"
"github.com/Azure/adx-mon/pkg/kustoutil"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/azure-kusto-go/kusto"
Expand Down Expand Up @@ -437,6 +438,9 @@ func (t *SummaryRuleTask) Run(ctx context.Context) error {
// Process any outstanding async operations for this rule
t.trackAsyncOperations(timeoutCtx, &rule)

// Process user-requested historical backfill (low priority, bounded concurrency).
backfill.Process(timeoutCtx, &rule, t.Clock, t.backfillSubmitFunc(), t.backfillGetStatusFunc())

t.logSummaryRule(&rule, false, err)
// Update the rule's primary status condition
if err := t.updateSummaryRuleStatus(timeoutCtx, &rule, err); err != nil {
Expand Down Expand Up @@ -821,3 +825,26 @@ func (t *AuditDiskSpaceTask) Run(ctx context.Context) error {
}
return nil
}

// backfillSubmitFunc returns a backfill.SubmitFunc backed by the ingestor's SubmitRule.
//
// SubmitRule already accepts the window bounds as strings, so the returned closure
// forwards its arguments unchanged and returns SubmitRule's result as-is.
func (t *SummaryRuleTask) backfillSubmitFunc() backfill.SubmitFunc {
	var submit backfill.SubmitFunc = func(ctx context.Context, rule v1.SummaryRule, startTime, endTime string) (string, error) {
		return t.SubmitRule(ctx, rule, startTime, endTime)
	}
	return submit
}

// backfillGetStatusFunc returns a backfill.GetOperationStatusFunc backed by the
// ingestor's getOperation.
//
// The returned closure yields (state, shouldRetry, err):
//   - a lookup error is passed through unchanged with an empty state;
//   - a nil operation status is reported as "InProgress" and not retryable;
//   - otherwise the operation's State is returned, with shouldRetry set when the
//     operation's ShouldRetry value is non-zero.
func (t *SummaryRuleTask) backfillGetStatusFunc() backfill.GetOperationStatusFunc {
	// The database argument is ignored: the ingestor's getOperation is keyed by
	// operation ID alone.
	return func(ctx context.Context, _, operationID string) (string, bool, error) {
		op, lookupErr := t.getOperation(ctx, operationID)
		switch {
		case lookupErr != nil:
			return "", false, lookupErr
		case op == nil:
			// No status record was returned; treat the operation as still running.
			return "InProgress", false, nil
		}

		retryable := op.ShouldRetry != 0
		return op.State, retryable, nil
	}
}
Loading
Loading