Add new mse metrics #17419
Pull request overview
This PR enhances metrics collection for Multi-Stage Execution (MSE) by adding granular tracking of stages and opchains on both broker and server sides. The changes support better observability of query execution by tracking CPU time, memory allocation, and emitted documents.
- Changes the visibility of the `copyStatMaps()` method from `protected` to `public` across all operator implementations
- Adds new broker-side metrics for tracking stage and opchain lifecycle (started/completed)
- Adds new server-side metrics for tracking opchain execution, CPU time, memory allocation, and emitted rows
- Introduces a `getUnsafe()` method to `StatMap` for flexible stat retrieval without complex type casting
Reviewed changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| MultiStageOperator.java | Changes copyStatMaps() from protected abstract to public abstract |
| OpChainSchedulerService.java | Adds metrics collection for opchain lifecycle and resource usage on server |
| ServerMeter.java | Defines new MSE-related meter types and adds getGlobalMeter() helper |
| BrokerMeter.java | Defines new broker-side stage/opchain meter types and adds getGlobalMeter() helper |
| StatMap.java | Adds getUnsafe() method for string-based key lookup with default values |
| MultiStageBrokerRequestHandler.java | Instruments query execution with stage/opchain counting and meter marking |
| Multiple operator files | Updates copyStatMaps() implementations from protected to public |
Comments suppressed due to low confidence (1)
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:1
- Corrected spelling of 'competed' to 'completed'.
pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
| /// @param keyName The name of the key. | ||
| /// @param defaultValue The default value to return if the key is not found. | ||
| /// @throws ClassCastException if the value cannot be cast to the same static type as the default value. | ||
| public <E> E getUnsafe(String keyName, E defaultValue) |
Copilot
AI
Dec 23, 2025
The method name getUnsafe is ambiguous. It's unclear whether 'unsafe' refers to the lack of type safety, potential for exceptions, or unchecked access. Consider getByName or getByKeyName to better convey that it performs string-based lookup.
| public <E> E getUnsafe(String keyName, E defaultValue) | |
| public <E> E getByKeyName(String keyName, E defaultValue) |
pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
...y-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #17419 +/- ##
=============================================
- Coverage 55.65% 33.93% -21.73%
- Complexity 702 771 +69
=============================================
Files 2463 3153 +690
Lines 139016 187963 +48947
Branches 22167 28763 +6596
=============================================
- Hits 77376 63789 -13587
- Misses 55119 118895 +63776
+ Partials 6521 5279 -1242
Pull request overview
Copilot reviewed 23 out of 23 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java:1
- Corrected spelling of 'competedOpchains' to 'completedOpchains'.
| * Returns the value associated with the key name. | ||
| * | ||
| * In general, it is better to use the type-specific getters with the enum key directly, but sometimes it is | ||
| * impossible or requires complex to read code (like complex unsafe casts). |
Copilot
AI
Dec 23, 2025
The phrase 'requires complex to read code' should be 'requires complex-to-read code' or 'requires code that is complex to read'.
| * impossible or requires complex to read code (like complex unsafe casts). | |
| * impossible or requires complex-to-read code (like complex unsafe casts). |
| int stageCount = dispatchableSubPlan.getQueryStageMap().size(); | ||
| int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream() |
Copilot
AI
Dec 23, 2025
The dispatchableSubPlan.getQueryStageMap() is called twice. Consider storing the result in a variable to avoid redundant calls.
| int stageCount = dispatchableSubPlan.getQueryStageMap().size(); | |
| int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream() | |
| Map<Integer, DispatchablePlanFragment> queryStageMap = dispatchableSubPlan.getQueryStageMap(); | |
| int stageCount = queryStageMap.size(); | |
| int opChainCount = queryStageMap.values().stream() |
| public <E> E getUnsafe(String keyName, E defaultValue) | ||
| throws ClassCastException { | ||
| K key = getKey(keyName); | ||
| if (key == null) { | ||
| return defaultValue; | ||
| } | ||
| return (E) getAny(key); |
Copilot
AI
Dec 23, 2025
The getUnsafe() method should document the behavior when the key exists but has a null value, as getAny() can return null. Currently, it's unclear whether this would return null or defaultValue.
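To make the question concrete, here is a minimal, self-contained sketch of the lookup pattern in the snippet above. `StatLookupSketch`, its backing `HashMap`, and the `put` and `getUnsafeOrDefault` helpers are hypothetical stand-ins, not Pinot's `StatMap`; only the branch structure of `getUnsafe` is taken from the diff. Under these assumptions, a key that is known but holds `null` yields `null` rather than `defaultValue`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a stat container keyed by name; NOT Pinot's StatMap.
final class StatLookupSketch {
  private final Map<String, Object> _values = new HashMap<>();

  void put(String keyName, Object value) {
    _values.put(keyName, value);
  }

  // Mirrors the shape of the snippet under review: unknown key -> default,
  // known key -> unchecked cast of whatever is stored (possibly null).
  @SuppressWarnings("unchecked")
  <E> E getUnsafe(String keyName, E defaultValue) {
    if (!_values.containsKey(keyName)) {
      return defaultValue;
    }
    return (E) _values.get(keyName);
  }

  // One possible alternative: treat a stored null like a missing key.
  <E> E getUnsafeOrDefault(String keyName, E defaultValue) {
    E value = getUnsafe(keyName, defaultValue);
    return value != null ? value : defaultValue;
  }
}
```

Whichever behavior is intended, stating it in the Javadoc would let callers know whether they still need a null check after supplying a default.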
| public static final BrokerMeter MSE_STAGES_STARTED = create("MSE_STAGE_STARTED", "stages", false); | ||
| public static final BrokerMeter MSE_STAGES_COMPLETED = create("MSE_STAGE_COMPLETED", "stages", false); | ||
| public static final BrokerMeter MSE_OPCHAINS_STARTED = create("MSE_OPCHAIN_STARTED", "opchains", false); | ||
| public static final BrokerMeter MSE_OPCHAINS_COMPLETED = create("MSE_OPCHAIN_COMPLETED", "opchains", false); |
| public static final BrokerMeter MSE_STAGES_STARTED = create("MSE_STAGE_STARTED", "stages", false); | |
| public static final BrokerMeter MSE_STAGES_COMPLETED = create("MSE_STAGE_COMPLETED", "stages", false); | |
| public static final BrokerMeter MSE_OPCHAINS_STARTED = create("MSE_OPCHAIN_STARTED", "opchains", false); | |
| public static final BrokerMeter MSE_OPCHAINS_COMPLETED = create("MSE_OPCHAIN_COMPLETED", "opchains", false); | |
| public static final BrokerMeter MSE_STAGES_STARTED = create("MSE_STAGES_STARTED", "stages", false); | |
| public static final BrokerMeter MSE_STAGES_COMPLETED = create("MSE_STAGES_COMPLETED", "stages", false); | |
| public static final BrokerMeter MSE_OPCHAINS_STARTED = create("MSE_OPCHAINS_STARTED", "opchains", false); | |
| public static final BrokerMeter MSE_OPCHAINS_COMPLETED = create("MSE_OPCHAINS_COMPLETED", "opchains", false); |
nit: for consistency
| public static final BrokerMeter MSE_STAGES_STARTED = create("MSE_STAGE_STARTED", "stages", false); | ||
| public static final BrokerMeter MSE_STAGES_COMPLETED = create("MSE_STAGE_COMPLETED", "stages", false); | ||
| public static final BrokerMeter MSE_OPCHAINS_STARTED = create("MSE_OPCHAIN_STARTED", "opchains", false); | ||
| public static final BrokerMeter MSE_OPCHAINS_COMPLETED = create("MSE_OPCHAIN_COMPLETED", "opchains", false); |
Shouldn't the global parameter be true for all of these?
| int stageCount = dispatchableSubPlan.getQueryStageMap().size(); | ||
| int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream() | ||
| .mapToInt(stage -> stage.getServerInstances().size()) |
Shouldn't we use the worker info instead of server info since we could potentially have multiple workers per server?
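The difference between the two counts can be illustrated with a small sketch. The nested map below (stage id to server name to worker ids) is a hypothetical model chosen for illustration and does not reflect Pinot's `DispatchablePlanFragment` API; the snippet under review is also cut off after `mapToInt`, so the final `sum()` is an assumption.

```java
import java.util.List;
import java.util.Map;

// Hypothetical plan model: stage id -> server name -> worker ids on that server.
// These are NOT Pinot's classes.
final class OpChainCountSketch {
  // Counts one opchain per server, in the spirit of the snippet under review.
  static int countByServer(Map<Integer, Map<String, List<Integer>>> stages) {
    return stages.values().stream()
        .mapToInt(Map::size)
        .sum();
  }

  // Counts one opchain per worker, which is larger when a server hosts several workers.
  static int countByWorker(Map<Integer, Map<String, List<Integer>>> stages) {
    return stages.values().stream()
        .mapToInt(servers -> servers.values().stream().mapToInt(List::size).sum())
        .sum();
  }
}
```

With one stage on a single server and another stage spread over two servers, one of which runs two workers, the per-server count is 3 while the per-worker count is 4.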
| protected final PinotMeter _stagesStartedMeter = BrokerMeter.MSE_STAGES_STARTED.getGlobalMeter(); | ||
| protected final PinotMeter _stagesFinishedMeter = BrokerMeter.MSE_STAGES_COMPLETED.getGlobalMeter(); | ||
| protected final PinotMeter _opchainsStartedMeter = BrokerMeter.MSE_OPCHAINS_STARTED.getGlobalMeter(); | ||
| protected final PinotMeter _opchainsCompletedMeter = BrokerMeter.MSE_OPCHAINS_COMPLETED.getGlobalMeter(); |
The BaseBrokerRequestHandler class already has an instance of BrokerMetrics. We should use that here rather than the global singleton, for better testability, IMO.
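A rough sketch of the testability argument follows. All names here (`Metrics`, `RecordingMetrics`, `Handler`, `onQueryDispatched`, and the meter name strings) are hypothetical and do not mirror Pinot's `BrokerMetrics` API; the point is only that a handler which receives its metrics object can be verified with an in-memory implementation, with no static state to reset between tests.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical names throughout; this is NOT Pinot's BrokerMetrics API.
final class InjectedMetricsSketch {
  // Minimal metrics abstraction handed to the handler by its owner.
  interface Metrics {
    void addMeteredValue(String meterName, long count);
  }

  // In-memory implementation that a unit test can inspect directly.
  static final class RecordingMetrics implements Metrics {
    private final Map<String, AtomicLong> _counts = new ConcurrentHashMap<>();

    @Override
    public void addMeteredValue(String meterName, long count) {
      _counts.computeIfAbsent(meterName, name -> new AtomicLong()).addAndGet(count);
    }

    long count(String meterName) {
      AtomicLong value = _counts.get(meterName);
      return value == null ? 0L : value.get();
    }
  }

  // The handler marks meters through the injected instance, never a static singleton.
  static final class Handler {
    private final Metrics _metrics;

    Handler(Metrics metrics) {
      _metrics = metrics;
    }

    void onQueryDispatched(int stageCount, int opChainCount) {
      _metrics.addMeteredValue("mseStagesStarted", stageCount);
      _metrics.addMeteredValue("mseOpchainsStarted", opChainCount);
    }
  }
}
```

A test can construct `Handler` with a fresh `RecordingMetrics`, dispatch a query, and assert on `count(...)` without touching any process-wide registry.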
This PR adds new metrics for MSE:
On broker:
- `pinot.broker.mseOpchainStarted` and `pinot.broker.mseOpchainCompleted` (meters): Total opchains started/finished
- `pinot.broker.mseStagesStarted` and `pinot.broker.mseStagesCompleted` (meters): Total stages started/finished
On server:
- `pinot.server.mseOpchainsStarted` and `pinot.server.mseOpchainsCompleted` (meters): Total opchains started/finished
- `pinot.server.mseCpuExecutionTimeMs` (meter): CPU time for each task
- `pinot.server.mseMemoryAllocatedBytes` (meter): Memory allocation for each task
- `pinot.server.mseEmittedRows` (meter): Emitted docs
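The description does not show how the per-task CPU time and allocation figures are sampled, so the following is only one plausible approach using the JDK's `ThreadMXBean`. `TaskUsageSketch`, `Usage`, and `measure` are hypothetical names, and allocation tracking relies on the HotSpot-specific `com.sun.management.ThreadMXBean`, which is assumed to be available; on other JVMs the sketch reports zero bytes.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Illustrative only; the PR text does not show how Pinot samples these values.
final class TaskUsageSketch {
  // Deltas observed while a task ran on the current thread.
  static final class Usage {
    final long _cpuTimeMs;
    final long _allocatedBytes;

    Usage(long cpuTimeMs, long allocatedBytes) {
      _cpuTimeMs = cpuTimeMs;
      _allocatedBytes = allocatedBytes;
    }
  }

  static Usage measure(Runnable task) {
    ThreadMXBean threads = ManagementFactory.getThreadMXBean();
    boolean cpuSupported = threads.isCurrentThreadCpuTimeSupported();
    // Allocation tracking is a HotSpot extension, so it is probed via instanceof.
    com.sun.management.ThreadMXBean hotspot = threads instanceof com.sun.management.ThreadMXBean
        ? (com.sun.management.ThreadMXBean) threads : null;
    long threadId = Thread.currentThread().getId();

    long cpuStart = cpuSupported ? threads.getCurrentThreadCpuTime() : 0L;
    long bytesStart = hotspot != null ? hotspot.getThreadAllocatedBytes(threadId) : 0L;
    task.run();
    long cpuNanos = cpuSupported ? threads.getCurrentThreadCpuTime() - cpuStart : 0L;
    long bytes = hotspot != null ? hotspot.getThreadAllocatedBytes(threadId) - bytesStart : 0L;

    // Convert nanoseconds to milliseconds to match a "...TimeMs" style meter.
    return new Usage(cpuNanos / 1_000_000L, bytes);
  }
}
```

In a scheduler, the returned deltas could then be added to meters such as the CPU time and allocated bytes meters listed above once each task finishes.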