Migrate Kafka ITs to embedded tests #18870
Conversation
abhishekrb19 left a comment
Thanks for doing this, @kfaraz! Some of the slow transaction tests have been quite flaky and hard to debug, so this migration should help.
I had some questions and left some suggestions.
public void test_supervisorRecovers_afterChangeInTopicPartitions(boolean useTransactions)
{
  totalRecords = publish1kRecords(topic, useTransactions);

  kafkaServer.increasePartitionsInTopic(topic, 4);
  totalRecords += publish1kRecords(topic, useTransactions);
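For context on the useTransactions flag exercised above, here is a minimal sketch of transactional vs. non-transactional publishing with the plain Kafka client. This is not the test's publish1kRecords helper; the broker address, class name, and record contents are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PublishSketch
{
  // Publishes 1000 records inside a single Kafka transaction; they become visible
  // to read_committed consumers only after commitTransaction().
  static void publishTransactionally(String topic)
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("transactional.id", "sketch-txn-1");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.initTransactions();
      producer.beginTransaction();
      for (int i = 0; i < 1000; i++) {
        producer.send(new ProducerRecord<>(topic, "key-" + i, "value-" + i));
      }
      producer.commitTransaction();
    }
  }

  // Publishes the same records without a transaction; they are visible as soon as
  // the broker acknowledges them.
  static void publishWithoutTransaction(String topic)
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (int i = 0; i < 1000; i++) {
        producer.send(new ProducerRecord<>(topic, "key-" + i, "value-" + i));
      }
      producer.flush();
    }
  }
}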
This test failed locally for me with transactions disabled: org.apache.druid.java.util.common.ISE: Timed out waiting for event after [60,000]ms
Based on these logs, it seems to be stuck in some loop:
2026-01-02T20:18:32,119 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:32,225 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [1]ms.
2026-01-02T20:18:32,331 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [1]ms.
2026-01-02T20:18:32,387 WARN [KafkaSupervisor-supe_datasource_nhjeoeib] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Configured task count[2] for supervisor[supe_datasource_nhjeoeib] is greater than the number of partitions[1].
2026-01-02T20:18:32,388 INFO [KafkaSupervisor-supe_datasource_nhjeoeib] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Early stop requested for supervisor[supe_datasource_nhjeoeib], signalling tasks to complete.
2026-01-02T20:18:32,438 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:32,509 WARN [KafkaSupervisor-supe_datasource_nfgacpoa] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Configured task count[2] for supervisor[supe_datasource_nfgacpoa] is greater than the number of partitions[1].
2026-01-02T20:18:32,510 INFO [KafkaSupervisor-supe_datasource_nfgacpoa] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Early stop requested for supervisor[supe_datasource_nfgacpoa], signalling tasks to complete.
2026-01-02T20:18:32,542 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [1]ms.
2026-01-02T20:18:32,648 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [1]ms.
2026-01-02T20:18:32,754 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [1]ms.
2026-01-02T20:18:32,861 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:32,962 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:33,035 WARN [KafkaSupervisor-supe_datasource_lnedibpp] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Configured task count[2] for supervisor[supe_datasource_lnedibpp] is greater than the number of partitions[1].
2026-01-02T20:18:33,035 INFO [KafkaSupervisor-supe_datasource_lnedibpp] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Early stop requested for supervisor[supe_datasource_lnedibpp], signalling tasks to complete.
2026-01-02T20:18:33,066 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:33,169 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:33,275 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:33,378 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [1]ms.
2026-01-02T20:18:33,413 WARN [KafkaSupervisor-supe_datasource_abegdknn] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Configured task count[2] for supervisor[supe_datasource_abegdknn] is greater than the number of partitions[1].
2026-01-02T20:18:33,414 INFO [KafkaSupervisor-supe_datasource_abegdknn] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Early stop requested for supervisor[supe_datasource_abegdknn], signalling tasks to complete.
2026-01-02T20:18:33,481 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [0]ms.
2026-01-02T20:18:33,577 WARN [KafkaSupervisor-supe_datasource_dpcjklpc] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Configured task count[2] for supervisor[supe_datasource_dpcjklpc] is greater than the number of partitions[1].
2026-01-02T20:18:33,578 INFO [KafkaSupervisor-supe_datasource_dpcjklpc] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Early stop requested for supervisor[supe_datasource_dpcjklpc], signalling tasks to complete.
2026-01-02T20:18:33,584 INFO [org.apache.druid.metadata.SqlSegmentsMetadataManager-Exec--0] org.apache.druid.metadata.SqlSegmentsMetadataManager - Polled and found [57] segments in the database in [1]ms.
org.apache.druid.java.util.common.ISE: Timed out waiting for event after [60,000]ms
Oh, I see, let me check.
@abhishekrb19 , I tried to debug this test. The logs that you have shared occur even in success scenarios.
Could you try running with an increased timeout and see if it works then?
Yeah, I tried running this test in a loop 10 times and I'm not able to reproduce the failure with the existing timeout. Iirc I noticed the failure the first time I ran it and maybe it was a one-off. We can keep an eye on the CI builds.
indexer.latchableEmitter().waitForEventAggregate(
    event -> event.hasMetricName("ingest/events/processed")
                  .hasDimension(DruidMetrics.DATASOURCE, dataSource),
    agg -> agg.hasSumAtLeast(expectedRowCount)
nit: I see this matcher doesn't support an exact count. Should we consider adding a stricter hasSum matcher rather than hasSumAtLeast?
Thanks for the suggestion! How about something like agg.hasSumMatching(matcher) which can be used for a variety of cases? We do something like that for hasValueMatching.
@abhishekrb19 , I tried doing this, but then I realized that the latchable emitter APIs are cumulative, i.e. we wait until a condition is satisfied. So, even if we were to wait for an exact sum, it wouldn't guarantee that no new events have arrived that would have increased the sum further.
So, I guess the later check to explicitly compute the totalEventsProcessed would be needed after all.
What do you think?
Ah yeah, that makes sense. This seems good for now.
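A minimal sketch of the pattern settled on above: wait for at least the expected count via the latchable emitter, then assert the exact total separately. The computeTotalEventsProcessed helper is hypothetical and stands in for however the test tallies the final count.

indexer.latchableEmitter().waitForEventAggregate(
    event -> event.hasMetricName("ingest/events/processed")
                  .hasDimension(DruidMetrics.DATASOURCE, dataSource),
    agg -> agg.hasSumAtLeast(expectedRowCount)
);

// The cumulative wait above only guarantees a lower bound, so the exact count
// is verified with a separate (hypothetical) tally afterwards.
long totalEventsProcessed = computeTotalEventsProcessed(dataSource);
Assertions.assertEquals(expectedRowCount, totalEventsProcessed);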
overlord.latchableEmitter().waitForEvent(
    event -> event.hasMetricName("ingest/notices/time")
                  .hasDimension(DruidMetrics.SUPERVISOR_ID, supervisorSpec.getId())
                  .hasDimension("noticeType", "handoff_task_group_notice")
);

totalRecords += publish1kRecords(topic, useTransactions);
Should this test actually verify that the old task(s) have completed successfully on early handoff and that new ones have been spun up?
Yeah, I think I meant to do that initially but then just lazied out 😛 . Let me see if I can add that verification too.
Updated.
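For reference, a hedged sketch of what such a verification could look like, reusing the latchable-emitter pattern from the snippets above. The task/run/time metric name, the taskStatus dimension, and the getRunningTaskIds helper are assumptions for illustration, not the PR's actual code.

// Wait for the old task(s) to report successful completion after the early handoff.
overlord.latchableEmitter().waitForEvent(
    event -> event.hasMetricName("task/run/time")
                  .hasDimension(DruidMetrics.DATASOURCE, dataSource)
                  .hasDimension("taskStatus", "SUCCESS")
);

// Hypothetical helper: the supervisor should have spawned replacement tasks.
Assertions.assertFalse(getRunningTaskIds(supervisorSpec.getId()).isEmpty());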
@JsonProperty("partitions") int partitions,
@JsonProperty("replicas") int replicas,
@JsonProperty("durationSeconds") long durationSeconds,
I suppose these fields will always be present, but I had thought that Jackson serde required boxed types and didn't work well with primitives. I couldn't find examples of @JsonProperty + primitive types in our code base either...
I don't think it is a Jackson requirement, per se. We tend to use boxed types to allow them to be nullable.
Some example classes that use primitives are OffHeapLoadingCache, DataNodeService, and a few others here and there.
If you feel it would be better to keep these as boxed types, I can update them.
Ah I see, thanks for the clarification! It makes sense to leave them as primitives since these cannot be nullable.
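For reference, a minimal illustration of the trade-off (class and field names are made up, not from the PR): Jackson binds primitives via @JsonProperty just fine, with an absent property deserializing to the primitive's default, while a boxed constructor argument lets the class detect absence and apply its own default.

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class TopicConfigSketch
{
  private final int partitions;   // primitive: an absent "partitions" property deserializes as 0
  private final int replicas;     // defaulted below from a nullable (boxed) constructor argument

  @JsonCreator
  public TopicConfigSketch(
      @JsonProperty("partitions") int partitions,
      @JsonProperty("replicas") Integer replicas
  )
  {
    this.partitions = partitions;
    this.replicas = replicas == null ? 1 : replicas;
  }

  @JsonProperty
  public int getPartitions()
  {
    return partitions;
  }

  @JsonProperty
  public int getReplicas()
  {
    return replicas;
  }
}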
try {
  if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
-   return Response.ok().build();
+   return Response.ok(Map.of("success", true)).build();
Curious why this is needed. Also, “success” and Response.ok() feel a bit misleading here since this is an asynchronous handoff execution and the API will return immediately. Would an Accepted response status be more appropriate for this?
It'll probably be good to clarify this in the docs too based on what we do here.
Yeah, that makes sense. Let me update this.
I had to make this change since the current framework (the method EmbeddedServiceClient.onLeaderOverlord()) was trying to deserialize the response as a JSON value.
The empty/null entity was causing the deserialization to fail.
Updated.
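A sketch of the direction discussed above, returning 202 Accepted with a small JSON body since the handoff runs asynchronously. This is illustrative JAX-RS code in the shape of the diff above, not the actual Druid resource method; error handling is elided.

try {
  if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
    // 202 signals that the request was accepted but the handoff completes asynchronously.
    return Response.accepted(Map.of("success", true)).build();
  }
  // Error handling for unknown supervisors/task groups is omitted in this sketch.
  return Response.status(Response.Status.BAD_REQUEST).build();
}
catch (Exception e) {
  return Response.serverError().build();
}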
Thanks! On a slightly different note, re:
I’d personally prefer for the embedded test framework to run on non-default ports (separate from the quickstart config/default ones), so I can run both without having to stop one to run the other for the same reason. The easy thing to do is to override the quickstart configs locally, but I was wondering if other devs would also want this capability, in which case it might make sense to change the ports in the embedded test framework.
Yeah, that sounds reasonable. There was also a discussion a while back to use random ports for embedded servers so that multiple tests could be run in parallel. I will look into it.
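For context, a common way to obtain random free ports (not necessarily what the embedded framework will end up doing) is to bind a socket to port 0 and let the OS pick an unused ephemeral port:

import java.io.IOException;
import java.net.ServerSocket;

public final class FreePortFinder
{
  private FreePortFinder() {}

  // Binds to port 0 so the OS assigns an unused ephemeral port, then returns it.
  public static int findFreePort() throws IOException
  {
    try (ServerSocket socket = new ServerSocket(0)) {
      socket.setReuseAddress(true);
      return socket.getLocalPort();
    }
  }
}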
Thanks a lot for the prompt reviews, @abhishekrb19!
Description
Kafka ITs come in two flavors: serialized and parallelized.
Serialized tests affect the cluster state and thus must not be run concurrently with any other test.
Parallelized tests are pretty much sandboxed since they work on a fresh datasource/supervisor/topic, as applicable.
In the context of embedded tests, all tests are currently run serially, since only one cluster can be running at a time to avoid port conflicts. Parallelizing them does not seem necessary at the moment, since the tests are fairly quick.
Changes
- KafkaResource.produceRecordsWithoutTransaction()
- KafkaFaultToleranceTest, to replace the following:
  - ITKafkaIndexingServiceTransactionalParallelizedTest
  - ITKafkaIndexingServiceNonTransactionalParallelizedTest
  - ITKafkaIndexingServiceTransactionalSerializedTest
  - ITKafkaIndexingServiceNonTransactionalSerializedTest
- FaultyClusterTest, to replace ITFaultyClusterTest. Getting this test to work required the following changes:
  - TaskLockbox.cleanupPendingSegments() in FaultyTaskLockbox
  - FaultyTaskActionClientFactory on Indexers
  - KafkaSupervisorReportPayload
- KafkaMultiSupervisorTest
- EmbeddedKafkaSupervisorTest
- kafka-index
- kafka-index-slow
- kafka-transactional-index
- kafka-transactional-index-slow

This PR has: