fix Kafka flaky test #105
Conversation
After four CI runs (see: https://github.com/apache/incubator-pekko-projection/actions/runs/7831923269) this test has passed consistently, so we should be able to assume that this flakiness has been fixed.
@raboof Could you please do a code review? Thanks.
TBH I'm not convinced this change fixes the root cause of the instability: because of the assertion records.count(_.partition() == tp0.partition()) shouldBe 5, AFAICS it should be guaranteed that tp0TestCount would always be 5, so this change should not have an effect other than timing differences - or did I miss something?
The producer produces 20 elements in total, 10 per partition. The sink then consumes 10 elements across the two partitions, and there is no assertion that it consumes exactly 5 from each. If consumption is not even - say partition 0 contributes 6 records in that first step - the subsequent assertions fail. This PR does not force the sink to consume evenly from all partitions; if the Kafka consumer does not consume from the partitions in a balanced way, I don't think that is the responsibility of this test, or a new test should be added to assert it.
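To illustrate, here is a minimal, self-contained sketch of the test shape described above (not the actual KafkaSourceProviderImplSpec code; the record counts mirror the description, but the interleaving is hypothetical):

```scala
object UnevenConsumptionSketch extends App {
  final case class Record(partition: Int, value: Int)

  // 20 records are produced: 10 into partition 0 and 10 into partition 1.
  val produced: Seq[Record] =
    (0 until 10).flatMap(i => Seq(Record(0, i), Record(1, i)))

  // One possible consumption order: the consumer happens to favour partition 0
  // at first, so the first batch of 10 is 6 records from p0 and 4 from p1.
  val consumedOrder: Seq[Record] = {
    val p0 = produced.filter(_.partition == 0)
    val p1 = produced.filter(_.partition == 1)
    p0.take(6) ++ p1.take(4) ++ p0.drop(6) ++ p1.drop(4)
  }

  val firstBatch = consumedOrder.take(10)
  val fromP0 = firstBatch.count(_.partition == 0)
  val fromP1 = firstBatch.count(_.partition == 1)

  // Prints "partition 0: 6, partition 1: 4" - an assertion that each partition
  // contributes exactly 5 to the first batch would fail here even though
  // nothing is wrong with the data.
  println(s"partition 0: $fromP0, partition 1: $fromP1")
}
```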
Isn't that asserted by https://github.com/apache/incubator-pekko-projection/pull/105/files#diff-fdbaabc373547fac2368cc881beaa945f8d0225553d3e964dbe09eaf792592adR108-R112 ?
That makes sense to me.
@@ -109,6 +110,12 @@ class KafkaSourceProviderImplSpec extends ScalaTestWithActorTestKit with LogCapt
  records.count(_.partition() == tp1.partition()) shouldBe 5
}

// because the source pushes to handle(probe) before sinkProbe requests a pull, the probe may have cached one record
val eagerMessage = probe.receiveMessage()
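For context, a hedged sketch of the pattern this added line follows, assuming a Pekko typed TestProbe as used in the spec; the class name, message type, and message value below are illustrative only, not the spec's actual code:

```scala
import org.apache.pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.wordspec.AnyWordSpecLike

class EagerMessageSketch extends ScalaTestWithActorTestKit with AnyWordSpecLike {
  "a handler probe" should {
    "drain the record pushed before the first pull" in {
      val probe = createTestProbe[String]()

      // Stand-in for the source pushing one element into the handler before
      // the sink probe requests anything; in the real spec this happens inside
      // the stream under test, not via a direct tell.
      probe.ref ! "eagerly-pushed-record"

      // Drain that single record so later count-based assertions start from a
      // known-empty probe queue.
      val eagerMessage = probe.receiveMessage()
      eagerMessage shouldBe "eagerly-pushed-record"
    }
  }
}
```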
It's kind of like akka/akka-projection#462; I won't say this is the best solution.
@He-Pin Is there any way to make the source and sink reach a consensus on the number of elements?
Would you like to take a look, @raboof? This may not be the best solution, but it does reduce the flakiness.
OK, not sure I understand why it would, but if it does that would be awesome :)
This is trying to solve #103. I very much doubt that the KafkaConsumer will not consume evenly from the two partitions.