Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/24686_aggregate_interval_ms.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `aggregate` transform now correctly sets `interval_ms` on incremental counter metrics, allowing the Datadog metrics sink to encode them as rate metrics instead of count metrics.

authors: thomasqueirozb
104 changes: 94 additions & 10 deletions src/transforms/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::{HashMap, hash_map::Entry},
num::NonZeroU32,
pin::Pin,
time::Duration,
};
Expand Down Expand Up @@ -109,6 +110,7 @@ type MetricEntry = (MetricData, EventMetadata);
#[derive(Debug)]
pub struct Aggregate {
interval: Duration,
interval_ms: Option<NonZeroU32>,
map: HashMap<MetricSeries, MetricEntry>,
prev_map: HashMap<MetricSeries, MetricEntry>,
multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
Expand All @@ -117,8 +119,10 @@ pub struct Aggregate {

impl Aggregate {
pub fn new(config: &AggregateConfig) -> crate::Result<Self> {
let interval_ms = NonZeroU32::new(config.interval_ms as u32);
Ok(Self {
interval: Duration::from_millis(config.interval_ms),
interval_ms,
map: Default::default(),
prev_map: Default::default(),
multi_map: Default::default(),
Expand Down Expand Up @@ -251,6 +255,14 @@ impl Aggregate {
let map = std::mem::take(&mut self.map);
for (series, entry) in map.clone().into_iter() {
let mut metric = Metric::from_parts(series, entry.0, entry.1);

// Set interval_ms for incremental counters
if matches!(metric.data().kind, MetricKind::Incremental)
&& matches!(metric.data().value(), MetricValue::Counter { .. })
{
metric.data_mut().time.interval_ms = self.interval_ms;
}

if matches!(self.mode, AggregationMode::Diff)
&& let Some(prev_entry) = self.prev_map.get(metric.series())
&& metric.data().kind == prev_entry.0.kind
Expand Down Expand Up @@ -288,7 +300,13 @@ impl Aggregate {
let final_mean = final_sum.clone();
match self.mode {
AggregationMode::Mean => {
let metric = Metric::from_parts(series, final_mean, final_metadata);
let mut metric = Metric::from_parts(series, final_mean, final_metadata);
// Set interval_ms for incremental counters
if matches!(metric.data().kind, MetricKind::Incremental)
&& matches!(metric.data().value(), MetricValue::Counter { .. })
{
metric.data_mut().time.interval_ms = self.interval_ms;
}
output.push(Event::Metric(metric));
}
AggregationMode::Stdev => {
Expand All @@ -308,7 +326,13 @@ impl Aggregate {
if let MetricValue::Gauge { value } = final_stdev.value_mut() {
*value = variance.sqrt()
}
let metric = Metric::from_parts(series, final_stdev, final_metadata);
let mut metric = Metric::from_parts(series, final_stdev, final_metadata);
// Set interval_ms for incremental counters
if matches!(metric.data().kind, MetricKind::Incremental)
&& matches!(metric.data().value(), MetricValue::Counter { .. })
{
metric.data_mut().time.interval_ms = self.interval_ms;
}
output.push(Event::Metric(metric));
}
_ => (),
Expand Down Expand Up @@ -395,6 +419,17 @@ mod tests {
event
}

// Helper function to compare metrics ignoring the interval_ms field
fn assert_metric_eq_ignore_interval(expected: &Event, actual: &Event) {
let expected_metric = expected.as_metric();
let actual_metric = actual.as_metric();

assert_eq!(expected_metric.series(), actual_metric.series());
assert_eq!(expected_metric.data().kind, actual_metric.data().kind);
assert_eq!(expected_metric.data().value(), actual_metric.data().value());
// Note: We don't check interval_ms here since the aggregate transform sets it
}

#[test]
fn incremental_auto() {
let mut agg = Aggregate::new(&AggregateConfig {
Expand Down Expand Up @@ -425,7 +460,7 @@ mod tests {
// We should flush 1 item counter_a_1
agg.flush_into(&mut out);
assert_eq!(1, out.len());
assert_eq!(&counter_a_1, &out[0]);
assert_metric_eq_ignore_interval(&counter_a_1, &out[0]);

// A subsequent flush doesn't send out anything
out.clear();
Expand All @@ -443,7 +478,7 @@ mod tests {
out.clear();
agg.flush_into(&mut out);
assert_eq!(1, out.len());
assert_eq!(&counter_a_summed, &out[0]);
assert_metric_eq_ignore_interval(&counter_a_summed, &out[0]);

let counter_b_1 = make_metric(
"counter_b",
Expand All @@ -459,8 +494,8 @@ mod tests {
// B/c we don't know the order they'll come back
for event in out {
match event.as_metric().series().name.name.as_str() {
"counter_a" => assert_eq!(counter_a_1, event),
"counter_b" => assert_eq!(counter_b_1, event),
"counter_a" => assert_metric_eq_ignore_interval(&counter_a_1, &event),
"counter_b" => assert_metric_eq_ignore_interval(&counter_b_1, &event),
_ => panic!("Unexpected metric name in aggregate output"),
}
}
Expand Down Expand Up @@ -943,7 +978,7 @@ mod tests {
// We should flush 1 item counter
agg.flush_into(&mut out);
assert_eq!(1, out.len());
assert_eq!(&summed, &out[0]);
assert_metric_eq_ignore_interval(&summed, &out[0]);
}

#[test]
Expand Down Expand Up @@ -998,7 +1033,56 @@ mod tests {
// We should flush 1 item incremental
agg.flush_into(&mut out);
assert_eq!(1, out.len());
assert_eq!(&summed, &out[0]);
assert_metric_eq_ignore_interval(&summed, &out[0]);
}

#[test]
fn aggregate_sets_interval_ms_on_counters() {
let mut agg = Aggregate::new(&AggregateConfig {
interval_ms: 30_000,
mode: AggregationMode::Auto,
})
.unwrap();

let counter = make_metric(
"requests",
MetricKind::Incremental,
MetricValue::Counter { value: 100.0 },
);

agg.record(counter);
let mut out = vec![];
agg.flush_into(&mut out);

assert_eq!(1, out.len());
let output_metric = out[0].as_metric();
assert_eq!(
output_metric.data().time.interval_ms,
NonZeroU32::new(30_000)
);
}

#[test]
fn aggregate_does_not_set_interval_ms_on_gauges() {
let mut agg = Aggregate::new(&AggregateConfig {
interval_ms: 30_000,
mode: AggregationMode::Auto,
})
.unwrap();

let gauge = make_metric(
"cpu",
MetricKind::Absolute,
MetricValue::Gauge { value: 50.0 },
);

agg.record(gauge);
let mut out = vec![];
agg.flush_into(&mut out);

assert_eq!(1, out.len());
let output_metric = out[0].as_metric();
assert_eq!(output_metric.data().time.interval_ms, None);
}

#[tokio::test]
Expand Down Expand Up @@ -1054,7 +1138,7 @@ interval_ms = 999999
while let Some(event) = out_stream.next().await {
count += 1;
match event.as_metric().series().name.name.as_str() {
"counter_a" => assert_eq!(counter_a_summed, event),
"counter_a" => assert_metric_eq_ignore_interval(&counter_a_summed, &event),
"gauge_a" => assert_eq!(gauge_a_2, event),
_ => panic!("Unexpected metric name in aggregate output"),
};
Expand Down Expand Up @@ -1120,7 +1204,7 @@ interval_ms = 999999
match out.next().await {
Some(event) => {
match event.as_metric().series().name.name.as_str() {
"counter_a" => assert_eq!(counter_a_summed, event),
"counter_a" => assert_metric_eq_ignore_interval(&counter_a_summed, &event),
"gauge_a" => assert_eq!(gauge_a_2, event),
_ => panic!("Unexpected metric name in aggregate output"),
};
Expand Down
Loading