diff --git a/src/collector.rs b/src/collector.rs index dc5d179..e2ec44f 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -7,7 +7,7 @@ use metrics_util::{ }; use quanta::Clock; use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, sync::{atomic::Ordering, Arc}, time::Duration, }; @@ -31,22 +31,74 @@ struct Inner { #[derive(Debug, serde::Deserialize, serde::Serialize)] struct Counter { - labels: Vec<(String, String)>, + labels: BTreeMap, value: u64, } +impl std::fmt::Display for Counter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let labels = self + .labels + .iter() + .map(|(k, v)| format!("{}: {}", k, v)) + .collect::>() + .join(", "); + + write!(f, "{} - {}", labels, self.value) + } +} + #[derive(Debug, serde::Deserialize, serde::Serialize)] struct Gauge { - labels: Vec<(String, String)>, + labels: BTreeMap, value: f64, } +impl std::fmt::Display for Gauge { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let labels = self + .labels + .iter() + .map(|(k, v)| format!("{}: {}", k, v)) + .collect::>() + .join(", "); + + write!(f, "{} - {}", labels, self.value) + } +} + #[derive(Debug, serde::Deserialize, serde::Serialize)] struct Histogram { - labels: Vec<(String, String)>, + labels: BTreeMap, value: Vec<(f64, Option)>, } +impl std::fmt::Display for Histogram { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let labels = self + .labels + .iter() + .map(|(k, v)| format!("{}: {}", k, v)) + .collect::>() + .join(", "); + + let value = self + .value + .iter() + .map(|(k, v)| { + if let Some(v) = v { + format!("{}: {:.6}", k, v) + } else { + format!("{}: None,", k) + } + }) + .collect::>() + .join(", "); + + write!(f, "{} - {}", labels, value) + } +} + #[derive(Debug, serde::Deserialize, serde::Serialize)] pub(crate) struct Snapshot { counters: HashMap>, @@ -54,6 +106,112 @@ pub(crate) struct Snapshot { histograms: HashMap>, } +const PAIRS: [((&str, &str), &str); 2] = [ + ( + ( + "background-jobs.worker.started", + "background-jobs.worker.finished", + ), + "background-jobs.worker.running", + ), + ( + ( + "background-jobs.job.started", + "background-jobs.job.finished", + ), + "background-jobs.job.running", + ), +]; + +#[derive(Default)] +struct MergeCounter { + start: Option, + finish: Option, +} + +impl MergeCounter { + fn merge(self) -> Option { + match (self.start, self.finish) { + (Some(start), Some(end)) => Some(Counter { + labels: start.labels, + value: start.value.saturating_sub(end.value), + }), + (Some(only), None) | (None, Some(only)) => Some(Counter { + labels: only.labels, + value: 0, + }), + (None, None) => None, + } + } +} + +impl Snapshot { + pub(crate) fn present(self) { + if !self.counters.is_empty() { + println!("Counters"); + let mut merging = HashMap::new(); + for (key, counters) in self.counters { + if let Some(((start, _), name)) = PAIRS + .iter() + .find(|((start, finish), _)| *start == key || *finish == key) + { + let entry = merging.entry(name).or_insert_with(HashMap::new); + + for counter in counters { + let mut merge_counter = entry + .entry(counter.labels.clone()) + .or_insert_with(MergeCounter::default); + if key == *start { + merge_counter.start = Some(counter); + } else { + merge_counter.finish = Some(counter); + } + } + + continue; + } + + println!("\t{}", key); + for counter in counters { + println!("\t\t{}", counter); + } + } + + for (key, counters) in merging { + println!("\t{}", key); + + for (_, counter) in counters { + if let Some(counter) = counter.merge() { + println!("\t\t{}", counter); + } + } + } + } + + if !self.gauges.is_empty() { + println!("Gauges"); + for (key, gauges) in self.gauges { + println!("\t{}", key); + + for gauge in gauges { + println!("\t\t{}", gauge); + } + } + } + + if !self.histograms.is_empty() { + println!("Histograms"); + for (key, histograms) in self.histograms { + println!("\t{}", key); + + for histogram in histograms { + println!("\t\t{}", histogram); + } + } + } + } +} + fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) { let labels = key .labels() @@ -76,10 +234,10 @@ impl Inner { let (name, labels) = key_to_parts(&key); let value = counter.get_inner().load(Ordering::Acquire); - counters - .entry(name) - .or_insert_with(Vec::new) - .push(Counter { labels, value }); + counters.entry(name).or_insert_with(Vec::new).push(Counter { + labels: labels.into_iter().collect(), + value, + }); } counters @@ -96,10 +254,10 @@ impl Inner { let (name, labels) = key_to_parts(&key); let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire)); - gauges - .entry(name) - .or_insert_with(Vec::new) - .push(Gauge { labels, value }) + gauges.entry(name).or_insert_with(Vec::new).push(Gauge { + labels: labels.into_iter().collect(), + value, + }) } gauges @@ -153,7 +311,7 @@ impl Inner { .value() .iter() .map(|(labels, summary)| Histogram { - labels: labels.clone(), + labels: labels.iter().cloned().collect(), value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0] .into_iter() .map(|q| (q, summary.quantile(q))) diff --git a/src/main.rs b/src/main.rs index 884e572..adeff75 100644 --- a/src/main.rs +++ b/src/main.rs @@ -144,7 +144,7 @@ async fn main() -> Result<(), anyhow::Error> { if args.stats() { let stats = admin::client::stats(&client, &config).await?; - println!("{:#?}", stats); + stats.present(); } return Ok(());