azure monitor exporter gzip batcher and performance improvements. #1490
base: main
Conversation
Codecov Report ✅ All modified and coverable lines are covered by tests.

Additional details and impacted files:

```
@@            Coverage Diff            @@
##              main    #1490      +/- ##
==========================================
- Coverage    80.57%   67.98%   -12.59%
==========================================
  Files          343      277       -66
  Lines        83914    72578    -11336
==========================================
- Hits         67616    49345    -18271
- Misses       15764    22699     +6935
  Partials       534      534
```
```rust
#[allow(clippy::print_stdout)]
fn create_credential(auth_config: &AuthConfig) -> Result<Arc<dyn TokenCredential>, String> {
    match auth_config.method {
        AuthMethod::ManagedIdentity => {
```
Auth now supports managed identity and dev creds but not Azure Arc MSI. I understand this will be added later, but please add a TODO (and also create an issue) tracking Azure Arc MSI support so the exporter can run on edge/Arc devices.
```rust
if self.total_uncompressed_size == 0 {
    self.buf
        .write_all(b"[")
        .expect("write to memory buffer failed");
```
With `expect`, any unexpected gzip/memory error crashes the exporter instead of surfacing an error.
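A minimal sketch of the alternative, assuming the buffer is a `flate2` `GzEncoder` over a `Vec<u8>` (the helper names here are illustrative, not from the PR): map the `io::Result` into the exporter's `String` error type instead of unwrapping.

```rust
use std::io::Write;

use flate2::{write::GzEncoder, Compression};

fn new_buf() -> GzEncoder<Vec<u8>> {
    GzEncoder::new(Vec::new(), Compression::default())
}

// Surface the write error instead of panicking, so a failed batch can be
// dropped or retried without taking down the exporter.
fn open_batch(buf: &mut GzEncoder<Vec<u8>>) -> Result<(), String> {
    buf.write_all(b"[")
        .map_err(|e| format!("failed to write batch prefix: {e}"))
}
```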
```rust
let json_bytes =
    serde_json::to_vec(&body).map_err(|e| format!("Failed to serialize to JSON: {e}"))?;
// Clone static headers and add the auth header
let mut headers = self.static_headers.clone();
```
These are static headers, so I believe we should be able to avoid cloning them in the hot path.
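One hedged way to do that with reqwest (assuming the static headers are fixed for the life of the exporter): install them once as the client's default headers, so the per-request path only attaches the auth header. `default_headers` and `bearer_auth` are existing reqwest APIs; the header values are illustrative.

```rust
use reqwest::header::{HeaderMap, HeaderValue, CONTENT_ENCODING, CONTENT_TYPE};

// Built once at construction; every request from this client carries these
// headers without the caller cloning a HeaderMap per export.
fn build_client() -> reqwest::Result<reqwest::Client> {
    let mut headers = HeaderMap::new();
    headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
    headers.insert(CONTENT_ENCODING, HeaderValue::from_static("gzip"));
    reqwest::Client::builder().default_headers(headers).build()
}

// Hot path: only the per-request auth header is added.
async fn post_batch(
    client: &reqwest::Client,
    url: &str,
    body: Vec<u8>,
    token: &str,
) -> reqwest::Result<reqwest::Response> {
    client.post(url).bearer_auth(token).body(body).send().await
}
```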
```rust
    self.current_uncompressed_size += 1;
} else {
    self.buf
        .write_all(b",")
```
We append `,` here, but later, when the size check triggers a flush, the batch is emitted as `[... ,]` without the attempted element. The trailing comma makes the JSON invalid and will most probably fail at ingestion.
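A sketch of one way to restructure the append so a flush can never strand a dangling comma (names are illustrative, not from the PR): decide whether the element fits before writing the separator.

```rust
use std::io::Write;

// Returns Ok(false) when the element does not fit, without having written
// anything, so the caller can close the current batch with "]" (valid JSON)
// and retry the element on a fresh buffer. Oversized single elements would
// need separate handling.
fn append_element<W: Write>(
    buf: &mut W,
    written: &mut usize,
    max_size: usize,
    element_json: &[u8],
) -> std::io::Result<bool> {
    // "[" opens the batch for the first element; "," separates later ones.
    let sep: &[u8] = if *written == 0 { b"[" } else { b"," };
    if *written + sep.len() + element_json.len() > max_size {
        return Ok(false);
    }
    buf.write_all(sep)?;
    buf.write_all(element_json)?;
    *written += sep.len() + element_json.len();
    Ok(true)
}
```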
(force-pushed from d0d5368 to 413048d)
jmacd left a comment:
Since this is still in draft form, please take these comments as conversation starters! 😀
```rust
}

// TODO: Remove print_stdout after logging is set up
#[allow(clippy::print_stdout)]
```
I will stake out a position, and ask @cijothomas for an opinion too, about plain-old-logging APIs with message formatting like `println!("string {variable}")`, where `variable` is borrowed only if the statement executes.
As I've worked now with Arrow and DataFusion, here's what I find:

- Not much logging in Arrow itself (Arrow Flight, the RPC framework, uses `tracing_log`)
- DataFusion is heavily instrumented with the `log` package with `env_logger` (I know I can set `RUST_LOG=debug` and learn about its internals)

I like the `log` API surface.

Now I'm asking @lquerel for an opinion too. How would you like to see a logging SDK adopted? I think `env_logger` is great in development settings.
`tracing` macros are what we use in OTel-Rust. They're zero-cost when not enabled, and we can set up subscribers (listeners that route logs to destinations like OTel, stdout, a file, etc.) per thread.
(`tracing` has a `log` bridge too, so the subscriber we use for `tracing` will be able to receive `log` events as well.)
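For illustration, a minimal sketch of that setup using the standard `tracing` and `tracing-subscriber` crates (the event fields are made up):

```rust
fn main() {
    // Route events to stdout; filtering could also come from RUST_LOG via
    // tracing_subscriber::EnvFilter.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    let batch_size = 128;
    // Compiles to a cheap enabled-check; fields are not formatted when the
    // DEBUG level is disabled.
    tracing::debug!(batch_size, "flushing azure monitor batch");
}
```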
https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry/src/global/internal_logging.rs
OTel-Rust wrote wrapper macros so we can easily switch from `tracing` to something else if needed.
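The idea, roughly (a generic illustration, not the actual otel-rust macros): a thin wrapper that forwards to `tracing` today, so swapping the backend later means editing only the macro, not the call sites.

```rust
// Illustrative wrapper only; see internal_logging.rs for the real macros.
macro_rules! pipeline_debug {
    ($($arg:tt)*) => {
        tracing::debug!($($arg)*)
    };
}

fn example() {
    pipeline_debug!(attempts = 3, "token refresh retried");
}
```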
```rust
// TODO - Remove print statements
#[allow(clippy::print_stdout)]
pub struct Auth {
    credential: Arc<dyn TokenCredential>,
    scope: String,
    // Thread-safe shared token cache
    cached_token: Arc<RwLock<Option<AccessToken>>>,
}
```
Suggested change:

```diff
-// TODO - Remove print statements
+// TODO - Consolidate with crates/otap/src/{cloud_auth,object_store}/azure.rs
 #[allow(clippy::print_stdout)]
 pub struct Auth {
     credential: Arc<dyn TokenCredential>,
     scope: String,
     // Thread-safe shared token cache
     cached_token: Arc<RwLock<Option<AccessToken>>>,
 }
```
I recognize this is using things already committed in crates/otap/src/experimental/azure_monitor_exporter/config.rs.
Have a look at rust/otap-dataflow/crates/otap/src/cloud_auth/azure.rs and rust/otap-dataflow/crates/otap/src/object_store/azure.rs, added subsequently in #1517. The struct here looks similar to the object_store struct, and there are two similar Auth configs now. It will be nice to see less code and more re-use as we move forward, not to block this PR.
(@lalitb please review. I believe our position should be that Azure auth code/config belongs in an extension, the extension used by parquet_exporter for object_store and by Azure Monitor here.)
@jmacd That's correct. Instead of each exporter (Azure Monitor, Parquet/object_store, etc.) implementing its own auth config and credential creation logic, they should all reference this shared extension - and any common requirements should be met by extending this module rather than adding parallel implementations.
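To make the intended shape concrete, a hedged sketch of what the shared extension's surface might look like (the types below are placeholders standing in for azure_core's `TokenCredential`/`AccessToken`, and the module location is hypothetical):

```rust
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

// Placeholder standing in for azure_core's AccessToken.
#[derive(Clone)]
pub struct AccessToken {
    pub token: String,
    pub expires_at: Instant,
}

// Placeholder standing in for the TokenCredential trait object the PR stores.
pub trait TokenCredential: Send + Sync {
    fn get_token(&self, scope: &str) -> Result<AccessToken, String>;
}

pub struct Auth {
    credential: Arc<dyn TokenCredential>,
    scope: String,
    // Thread-safe shared token cache, as in the PR.
    cached_token: Arc<RwLock<Option<AccessToken>>>,
}

impl Auth {
    // Serve from cache while valid; refresh a little early to avoid races
    // right at the expiry boundary.
    pub fn token(&self) -> Result<String, String> {
        if let Some(tok) = self.cached_token.read().unwrap().as_ref() {
            if tok.expires_at > Instant::now() + Duration::from_secs(120) {
                return Ok(tok.token.clone());
            }
        }
        let fresh = self.credential.get_token(&self.scope)?;
        *self.cached_token.write().unwrap() = Some(fresh.clone());
        Ok(fresh.token)
    }
}
```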
```rust
    failed_batch_count: f64,
    failed_msg_count: f64,
    average_client_latency_secs: f64,
}
```
Have a look at a few of the components in the crates/otap/src folder, such as retry_processor.rs, and how there is an existing metrics API covering the counters here.
As for histogram measurements, I would prefer to hold back. Again, asking @lquerel for opinions: recording histogram instruments could mean a thread-local histogram data structure or message-passing of latency measurements (or both), at which point it may as well be a span message aggregated in the OTel SDK or one of our own pipelines for a latency metric.
We shouldn't reinvent this stuff, see https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs.
I see this as a question of routing histogram measurements (per core) to the central instrumentation collector.
@gouslu I understand that some of this is for your own performance studies. Latency and counters aside, can we remove processing_started_at, last_message_received_at, idle_duration, and average_client_latency_secs?
As for the counters, we aim to standardize pipeline metrics, the topic in #487 and a model RFC in the collector RFC https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/rfcs/component-universal-telemetry.md. The counters here are fine until we have a more-standard solution.
```rust
async fn handle_pdata(
    &mut self,
    effect_handler: &EffectHandler<OtapPdata>,
    request: ExportLogsServiceRequest,
```
It will be a relatively large performance improvement when we avoid constructing ExportLogsServiceRequest objects and use the view instead. @lalitb, please help with a pointer and/or an example?
+1. @gouslu, you can find the example and the benchmark here: OtapLogsView
```rust
self.handle_shutdown(&effect_handler).await?;
return Ok(TerminalState::new(
    deadline,
    std::iter::empty::<otap_df_telemetry::metrics::MetricSetSnapshot>(),
```
As I mentioned in another comment, we should follow the other crates/otap component examples of metrics integration with crates/telemetry: then you'll have a proper MetricSet at this point.
```rust
}

pub struct InFlightExports {
    futures: FuturesUnordered<BoxFuture<'static, CompletedExport>>,
```
👍
```
─────────────── AzureMonitorExporter ──────────────────────────
perf    │ th/s={:.2} avg_lat={:.2}ms
success │ rows={:.0} batches={:.0} msgs={:.0}
fail    │ rows={:.0} batches={:.0} msgs={:.0}
time    │ elapsed={:.1}s active={:.1}s idle={:.1}s
state   | batch_to_msg={} msg_to_batch={} msg_to_data={}
───────────────────────────────────────────────────────────────\n",
```
FYI @cijothomas, I've recommended that this component use the crates/telemetry framework, which would mean we could compute performance measurements using the continuous benchmarks. OTOH, we would need a server that accepts gzip-compressed JSON for the testing. Either way, I've recommended using `Counter<_>` and the existing metrics support for now (for counters); I've asked questions here about logging and histogram measurements.
All of my real questions have to do with instrumentation and the potential for re-use of the Azure auth structs. The code looks good!
(force-pushed from 9f5cae4 to 29fb7bc)
```rust
pub fn new(config: &Config) -> Result<Self, String> {
    let http_client = Client::builder()
        .timeout(Duration::from_secs(30))
        .http2_prior_knowledge() // Use HTTP/2 directly (faster, no upgrade negotiation)
```
A note on HTTP/2: Since Azure Monitor uses HTTPS, ALPN negotiates HTTP/2 as part of the SSL handshake - so there's no extra round trip. Explicitly adding this configuration provides no benefit.
I'd also recommend benchmarking with both HTTP/1.1 and HTTP/2. With HTTP/2, the Geneva backend restricts the client from creating new connections, meaning all requests are multiplexed over a single connection. When payload sizes are large, this single connection can become a bottleneck and lead to timeouts. I ran into this exact issue with the Geneva exporter and had to switch to HTTP/1.1 to resolve it.
Not a blocker for this PR, but it would be good to run some benchmarks.
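For that benchmark, a minimal sketch using existing reqwest builder options (endpoint, payload sizes, and concurrency are left to the harness):

```rust
use std::time::Duration;

// Build one HTTP/1.1-only client and one HTTP/2 client and compare
// throughput under large payloads.
fn build_bench_clients() -> reqwest::Result<(reqwest::Client, reqwest::Client)> {
    let http1 = reqwest::Client::builder()
        .http1_only() // multiple connections, no multiplexing bottleneck
        .timeout(Duration::from_secs(30))
        .build()?;
    let http2 = reqwest::Client::builder()
        // Over HTTPS, ALPN negotiates h2 during the TLS handshake anyway;
        // prior-knowledge only matters for cleartext connections.
        .http2_prior_knowledge()
        .timeout(Duration::from_secs(30))
        .build()?;
    Ok((http1, http2))
}
```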