KAFKA-16598 Mirgrate ResetConsumerGroupOffsetTest to new test infra#15779
Conversation
|
@chia7712 Could you please take a look at this question? Thank you. |
|
Hey @m1a2st , the way to get the servers' addresses is |
|
@m1a2st Please add following server property: @ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
})The default replica of offset topic is 3, and the new infra create single broker. Hence, creating offset topic get failed, and hence you can't find a coordinator as the offset partition can't get be ready. Also, please notice @lianetm comments that bootstrap servers need to be updated manually if you kill any broker. |
|
Rely on #15766 |
|
@chia7712 , please take a look for this PR, thank you |
|
@m1a2st Could you please use |
| ConsumerGroupExecutor executor = addConsumerGroupExecutor(numConsumers, topic, group, GroupProtocol.CLASSIC.name); | ||
| awaitConsumerProgress(topic, group, totalMessages); | ||
| executor.shutdown(); | ||
| private Consumer<String, String> createNoAutoCommitConsumer(ClusterInstance cluster, String group) { |
There was a problem hiding this comment.
Could you test the new consumer also?
https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/ClusterInstance.java#L157
ClusterInstance can return the supported protocol, and you can leverage it to create specify consumer.
|
@m1a2st Could you please make sure "all" available consumer groups are included in this test? For example: |
| produceMessages(cluster, topic, 100); | ||
| for (int i = 1; i <= 3; i++) { | ||
| String group = GROUP_PREFIX + i; | ||
| try (AutoCloseable consumerGroupCloseable = |
There was a problem hiding this comment.
@chia7712, There is a weird problem, I use the try with auto closeable resource, I should not use onsumerGroupCloseable.close() this method to close resource. But if I don't use close() method, when I testing Kraft and co-Kraft mode with multi groupProtocol, it will be fail. Please take a look, thank you
There was a problem hiding this comment.
Hi @m1a2st, I remove consumerGroupCloseable.close() and both KRAFT and CO_KRAFT can pass. Not sure whether you can use String group = GROUP_PREFIX + groupProtocol.name() + i; and try it again, because you use same group names for both group protocols.
There was a problem hiding this comment.
@FrankYang0529, Thanks for your comment, and I change to use random group id and then tests will pass.
| .collect(Collectors.toSet()); | ||
| long count, | ||
| GroupProtocol groupProtocol) throws Exception { | ||
| try (Consumer<String, String> consumer = createNoAutoCommitConsumer(cluster, group, groupProtocol)) { |
There was a problem hiding this comment.
It seems we can rewrite this by Admin. for example:
try (Admin admin = Admin.create(Collections.singletonMap(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) {
TestUtils.waitForCondition(() -> admin.listConsumerGroupOffsets(group).all().get().get(group).entrySet()
.stream().filter(e -> e.getKey().topic().equals(topic))
.mapToLong(e -> e.getValue().offset()).sum() == count, "Expected that consumer group has consumed all messages from topic/partition. " +
"Expected offset: " + count +
". Actual offset: " +
admin.listConsumerGroupOffsets(group).all().get().get(group).entrySet()
.stream().filter(e -> e.getKey().topic().equals(topic)));
}| TopicPartition tp0 = new TopicPartition(topic, 0); | ||
| TopicPartition tp1 = new TopicPartition(topic, 1); | ||
| createTopic(topic, 2, 1, new Properties(), listenerName(), new Properties()); | ||
| private Map<TopicPartition, Long> committedOffsets(ClusterInstance cluster, |
| .collect(Collectors.toList()); | ||
| kafka.utils.TestUtils.produceMessages(servers(), seq(records), 1); | ||
| private static List<String> generateIds(String name) { | ||
| return IntStream.rangeClosed(1, 3) |
There was a problem hiding this comment.
Could we reduce the number? There are 5 (cluster) * 3 (groups) * 3 (topics) * 2 (protocols) = 90 ...
|
@chia7712, Thanks for your review, I have been changed according to your conversations. |
@m1a2st Those tests verify all groups so the loop of try (Admin admin = cluster.createAdminClient()) {
admin.deleteConsumerGroups(groups).all().get();
} |
|
@chia7712 , Thanks for your review, these test passed. |
|
@m1a2st could you please rebase code to trigger QA again? |
|
@chia7712, Thanks for your conversations, I already rebase this code. |
| String group, | ||
| long count) throws Exception { | ||
| try (Admin admin = Admin.create(singletonMap(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { | ||
| TestUtils.waitForCondition(() -> admin.listConsumerGroupOffsets(group) |
There was a problem hiding this comment.
Could you rewrite it by committedOffsets?
|
@chia7712, Please review, Thanks for your comments. |
| private static final String TOPIC_PREFIX = "foo-"; | ||
| private static final String GROUP_PREFIX = "test.group-"; | ||
|
|
||
| private static void generator(ClusterGenerator clusterGenerator) { |
There was a problem hiding this comment.
Please rebase code to fix this
|
It seems there are something is failed. Let's wait for https://issues.apache.org/jira/browse/KAFKA-16828 |
|
|
||
| Map<String, Map<TopicPartition, OffsetAndMetadata>> exportedOffsets = service.resetOffsets(); | ||
| bw.write(service.exportOffsetsToCsv(exportedOffsets)); | ||
| bw.close(); |
There was a problem hiding this comment.
BufferedWriter is in try-resource so do we need to call it again? Maybe we can add a little helper to output string content to specific file
| String topic, | ||
| String group, | ||
| long count) throws Exception { | ||
| TestUtils.waitForCondition(() -> committedOffsets(cluster, topic, group) |
There was a problem hiding this comment.
Could we avoid creating Admin repeatedly?
| private Map<String, Object> composeConsumerConfigs(ClusterInstance cluster, | ||
| String group, | ||
| GroupProtocol groupProtocol) { | ||
| HashMap<String, Object> configs = new HashMap<>(); |
There was a problem hiding this comment.
This configs could make the test slower. In those test cases they expect to wait "committed consumer offsets", and hence we have to wait (balance time: 3s + internal commit time: 5s) at least ...
Maybe we can reduce the value of both group.initial.rebalance.delay.ms and auto.commit.interval.ms to "1000"? WDYT?
…apache#15779) Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
…apache#15779) Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
…apache#15779) Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
…apache#15779) Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Currently, I'm working on the ResetConsumerGroupOffsetTest. I'm a bit puzzled about the parameter
bootstrapServers(listenerName())in the old code. It seems to return localhost:+random port number. My understanding is that in the Scala part of the code, it first creates a broker and then returns the IP and port number of that broker. However, I'm not sure which part of the new testing framework corresponds to this parameter. Currently, I'm usingClusterInstance, but in the subsequent segmentconsumerGroupCommand.collectGroupState(group).coordinator.host(), it times out.