Skip to content

KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions#15766

Merged
chia7712 merged 32 commits into
apache:trunkfrom
frankvicky:KAFKA-16593
May 7, 2024
Merged

KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions#15766
chia7712 merged 32 commits into
apache:trunkfrom
frankvicky:KAFKA-16593

Conversation

@frankvicky
Copy link
Copy Markdown
Contributor

Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions.

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for this contribution. some comments left. PTAL

result.containsKey(missingGroup) &&
result.get(missingGroup).getMessage().contains(Errors.GROUP_ID_NOT_FOUND.message()),
"The consumer group deletion did not work as expected");
class AbstractConsumerGroupExecutor {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

those classes can be static class.

assert configured : "Must call configure before use";
try {
subscribe();
while (true) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a flag to break the loop when calling shutdown?

@ClusterTest
public void testDeleteWithTopicOption() {
try (Admin admin = cluster.createAdminClient()) {
admin.createTopics(buildSingletonTestTopic());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please call get to make sure the request is completed.

public void testDeleteWithUnrecognizedNewConsumerOption(String quorum) {
String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
class ConsumerGroupExecutor extends AbstractConsumerGroupExecutor {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AbstractConsumerGroupExecutor is extended by ConsumerGroupExecutor only, so we can merge them together.

executor.submit(consumerThread);
}

void shutdown() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can change this to close and make AbstractConsumerGroupExecutor extend Closeable. That allow us to use try-with-resources

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this usage scenario, many test cases involve manually invoking the shutdown method and subsequently checking the status of resource closure. In this case, do I still need to implement the AutoCloseable interface?

The first option is to keep it as it is.
The second option is to implement AutoCloseable, but still manually call the close method.
The third option is to implement AutoCloseable, but remove the tests for resource release.

createOffsetsTopic(listenerName(), new Properties());
String missingGroup = "missing.group";
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3, serverProperties = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We reduce the number of partitions/replicas so do we need 3 brokers?

Admin admin = cluster.createAdminClient();
ConsumerGroupExecutor consumerGroupExecutor = buildConsumerGroupExecutor(GROUP)
) {
createAndAwaitTestTopic(admin);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kafka will auto-create the topic when the topic is nonexistent, so you don't need to call createAndAwaitTestTopic

@frankvicky frankvicky changed the title [DRAFT] KAFKA-16593: wip [DRAFT] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions Apr 22, 2024
@frankvicky frankvicky marked this pull request as ready for review April 22, 2024 14:40
@frankvicky frankvicky changed the title [DRAFT] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions Apr 22, 2024
Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for updated PR. I notice that there are many similar test cases. Please try to merge them to simplify this test

assertTrue(output.contains("Group '" + GROUP + "' could not be deleted due to:") && output.contains(Errors.NON_EMPTY_GROUP.message()),
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Output was: (" + output + ")");
@ClusterTest
public void testDeleteCmdNonEmptyGroup() throws Exception {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we merge testDeleteCmdNonEmptyGroup into testDeleteNonEmptyGroup? For example:

        try (ConsumerGroupExecutor consumerGroupExecutor = buildConsumerGroupExecutor(GROUP)) {
            String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP};
            ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
            TestUtils.waitForCondition(
                    () -> service.collectGroupMembers(GROUP, false).getValue().get().size() == 1,
                    "The group did not initialize as expected."
            );

            Map<String, Throwable> result = new HashMap<>();

            String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups()));
            assertTrue(output.contains("Group '" + GROUP + "' could not be deleted due to:") && output.contains(Errors.NON_EMPTY_GROUP.message()),
                    "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Output was: (" + output + ")");
            assertNotNull(result.get(GROUP),
                    "Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")");
            assertTrue(result.size() == 1 && result.containsKey(GROUP) && result.get(GROUP).getCause() instanceof GroupNotEmptyException,
                    "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Result was:(" + result + ")");

        }

public void testDeleteEmptyGroup(String quorum) throws Exception {
createOffsetsTopic(listenerName(), new Properties());
@ClusterTest
public void testDeleteEmptyGroup() throws Exception {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto that testDeleteEmptyGroup and testDeleteCmdEmptyGroup can get merged

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        try (ConsumerGroupExecutor consumerGroupExecutor = buildConsumerGroupExecutor(GROUP)) {
            String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP};
            ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);

            TestUtils.waitForCondition(
                    () -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
                    "The group did not initialize as expected."
            );

            consumerGroupExecutor.close();

            TestUtils.waitForCondition(
                    () -> Objects.equals(service.collectGroupState(GROUP).state, "Empty"),
                    "The group did not become empty as expected."
            );

            Map<String, Throwable> result = new HashMap<>();

            String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups()));
            assertTrue(output.contains("Deletion of requested consumer groups ('" + GROUP + "') was successful."),
                    "The consumer group could not be deleted as expected");
            assertTrue(result.size() == 1 && result.containsKey(GROUP) && result.get(GROUP) == null,
                    "The consumer group could not be deleted as expected");
        }

"The group did not initialize as expected."
);
TestUtils.waitForCondition(
() -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a ConsumerGroupState enum with all the group states. Would be sensible to use it instead of the literal "Stable" just to avoid breaks/duplicated work in the future if states get updated (ditto for all other references to literal state names)

void configure(Properties props) {
super.configure(props);
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
if (groupProtocol.toUpperCase(Locale.ROOT).equals(GroupProtocol.CONSUMER.toString())) {
Copy link
Copy Markdown
Member

@lianetm lianetm Apr 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be simplified to if (groupProtocol == GroupProtocol.CONSUMER) if we work with the GroupProtocol object instead of a string throughout the test. We truly only need the string when passing it into the consumer prop GROUP_PROTOCOL_CONFIG above.

return new ConsumerGroupExecutor(cluster.bootstrapServers(),
1,
null != group ? group : GROUP,
GroupProtocol.CLASSIC.name,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumer group command supports deleting CONSUMER groups, not only CLASSIC ones, so shouldn't we parametrize this test to make sure checks deleting both group types?

Not sure if I'm missing something or that's being tracked in a different jira maybe @dajac? (I can't find it)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree to add test for CONSUMER. The origin test case does not include that, and we can complete that in this PR.

@frankvicky It seems to me the following changes can be added.

  1. add @ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true")
  2. use cluster.isKRaftTest() to avoid using GroupProtocol.CONSUMER in zk mode

Copy link
Copy Markdown
Member

@lianetm lianetm Apr 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will also need @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "consumer,classic") (along with group.coordinator.new.enable=true) to run with the new protocol

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will also need @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "consumer,classic") (along with group.coordinator.new.enable=true )

It seems all we need is to set one of them. see

val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) ||

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're totally right, I was wrongfully thinking it was also needed (and always using it like that), mainly because the list of supported protocols defaults to classic only, but agree, not needed.

@frankvicky
Copy link
Copy Markdown
Contributor Author

Hi @lianetm, @chia7712
Thanks for the suggestions, I have addressed the comments. 😀

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for updated PR. a couple of comments left.

}

cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--group", missingGroup};
static abstract class AbstractConsumerRunnable implements Runnable {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As AbstractConsumerRunnable has only one sub-class, we can merge them into to one class

static abstract class AbstractConsumerRunnable implements Runnable {
final String broker;
final String groupId;
final Optional<Properties> customPropsOpt;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if customPropsOpt is always, please remove it.

final String topic;
final String groupProtocol;
final String strategy;
final Optional<String> remoteAssignor;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@chia7712
Copy link
Copy Markdown
Member

@frankvicky @m1a2st It seems your PR (#15766 and #15779) need a consumer running in background. Hence, we can consider moving AbstractConsumerGroupExecutor/ConsumerGroupExecutor to a individual java file. WDYT?

@lianetm
Copy link
Copy Markdown
Member

lianetm commented Apr 23, 2024

@chia7712 's comment makes sense to me, and heads-up, similar classes are already defined in ConsumerGroupCommandTest, so probably sensible to take a look at all and consider consolidating

@frankvicky
Copy link
Copy Markdown
Contributor Author

@frankvicky @m1a2st It seems your PR (#15766 and #15779) need a consumer running in background. Hence, we can consider moving AbstractConsumerGroupExecutor/ConsumerGroupExecutor to a individual java file. WDYT?

It sounds good. 😀

Comment on lines +34 to +56
public ConsumerGroupExecutor(
String broker,
int numConsumers,
String groupId,
String groupProtocol,
String topic,
String strategy,
Optional<String> remoteAssignor,
Optional<Properties> customPropsOpt,
boolean syncCommit
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have 2 well defined group types, classic and consumer, with different creation params, and having them all together seems a bit confusing (ex. having assignment strategy along with remote assignor)...not very intuitive when creating a new instance (that I expect several other test files will do as they get migrated). So wonder if it would maybe be clearer to split them into static builders that would take only the params they need? like:

public static ConsumerGroupExecutor forConsumerGroup(
                                                         String broker,
                                                         int numConsumers,
                                                         String groupId,
                                                         String topic,
                                                         Optional<String> remoteAssignor,
                                                         Optional<Properties> customPropsOpt,
                                                         boolean syncCommit) {

    }

    public static ConsumerGroupExecutor forClassicGroup(
                                                        String broker,
                                                        int numConsumers,
                                                        String groupId,
                                                        String topic,
                                                        String strategy,
                                                        Optional<Properties> customPropsOpt,
                                                        boolean syncCommit) {

    }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, too many parameters can be very confusing for users. I will make the changes according to your suggestion. Thank you! 😀

String groupId,
String groupProtocol,
String topic,
String strategy,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe rename this to assignmentStrategy for clarity on what it is

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, this make sense. 😄

String strategy,
Optional<String> remoteAssignor,
Optional<Properties> customPropsOpt,
boolean syncCommit
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The syncCommit param seems to always be false. If that's the case what's the purpose of it? (is it with the intention of using this same common ConsumerGroupExecutor class from the ConsumerGroupCommandTest too?)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @lianetm
Yes, your assumption is correct. I'm pretty new to being a Committer myself, so I aimed to complete this migration with minimal alterations.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, let's just make sure we have a follow-up Jira to update the ConsumerGroupCommandTest then, so we don't leave the duplicated ConsumerGroupExecutor we currently have.

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for updated PR. a couple of style-related comments. PTAL

import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG;
import static org.apache.kafka.common.GroupType.CONSUMER;

public class ConsumerRunnable implements Runnable {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is used by ConsumerGroupExecutor so we can move it to be a static class within ConsumerGroupExecutor

}
}

void subscribe() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this method as it is used by run only


@Override
public void run() {
assert configured : "Must call configure before use";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We expect configure must be called so it would be nice to move it to the constructor. It means we should create the consumer in constructor.

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for updated PR. a couple of comments left. PTAL

import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.common.GroupType.CONSUMER;

public class ConsumerGroupExecutor implements AutoCloseable {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the public to make it be package-private

});
}

public static ConsumerGroupExecutor buildConsumerGroup(String brokerAddress,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: remove public

);
}

public static ConsumerGroupExecutor buildClassicGroup(String brokerAddress,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: remove public

}
}

static class ConsumerRunnable implements Runnable {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add private since it is not exposed to callers

private boolean configured = false;
private volatile boolean isShutdown = false;

public ConsumerRunnable(String brokerAddress,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private

}

static class ConsumerRunnable implements Runnable {
private final String brokerAddress;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

those inner variables are not used after construction, so we can remove them

String topic,
String assignmentStrategy,
Optional<String> remoteAssignor,
Optional<Properties> customConfigs,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map<String, Object> is more suitable in this case. The empty map can be equal to Optional.empty but it gets more graceful.

this.assignmentStrategy = assignmentStrategy;
this.remoteAssignor = remoteAssignor;

this.configure();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this method, since it is used only once. For example:

            Map<String, Object> configs = new HashMap<>();
            configs.put(BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
            configs.put(GROUP_ID_CONFIG, groupId);
            configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
            if (Objects.equals(groupProtocol, CONSUMER.toString())) {
                remoteAssignor.ifPresent(assignor -> configs.put(GROUP_REMOTE_ASSIGNOR_CONFIG, assignor));
            } else {
                configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy);
            }
            configs.putAll(customConfigs);
            consumer = new KafkaConsumer<>(configs);

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for updated PR. two major comments left. PTAL

Optional<String> remoteAssignor,
Optional<Properties> customConfigs,
boolean syncCommit) {
static ConsumerGroupExecutor buildConsumerGroup(String brokerAddress,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you move those static methods up?

configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy);
}
configs.putAll(customConfigs);
consumer = new KafkaConsumer<>(configs);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you make sure all consumers get closed even though one of consumer gets error? We encountered resource leaks before and it makes our CI unstable. For example:

    private static AutoCloseable run(
            String brokerAddress,
            int numberOfConsumers,
            String groupId,
            String groupProtocol,
            String topic,
            String assignmentStrategy,
            Optional<String> remoteAssignor,
            Map<String, Object> customConfigs,
            boolean syncCommit
    ) {
        Queue<Consumer<byte[], byte[]>> consumers = consumers(IntStream.range(0, numberOfConsumers).mapToObj(ignored -> {
            Map<String, Object> configs = new HashMap<>();
            configs.put(BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
            configs.put(GROUP_ID_CONFIG, groupId);
            configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
            if (Objects.equals(groupProtocol, CONSUMER.toString())) {
                remoteAssignor.ifPresent(assignor -> configs.put(GROUP_REMOTE_ASSIGNOR_CONFIG, assignor));
            } else {
                configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy);
            }
            configs.putAll(customConfigs);
            return configs;
        }).collect(Collectors.toList()));

        AtomicBoolean closed = new AtomicBoolean(false);
        ExecutorService service = Executors.newFixedThreadPool(consumers.size());
        final AutoCloseable closeable = () -> {
            closed.set(true);
            consumers.forEach(c -> Utils.closeQuietly(c, "close consumer"));
            service.shutdownNow();
            service.awaitTermination(1, TimeUnit.MINUTES);
        };
        try {
            while (!consumers.isEmpty()) {
                Consumer<byte[], byte[]> consumer = consumers.poll();
                service.submit(() -> {
                    try {
                        consumer.subscribe(singleton(topic));
                        while (closed.get()) {
                            consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                            if (syncCommit)
                                consumer.commitSync();
                        }
                    } catch (WakeupException | InterruptException e) {
                        // OK
                    } finally {
                        consumer.close();
                    }
                });
            }
            return closeable;
        } catch (Throwable e) {
            Utils.closeQuietly(closeable, "release consumer");
            throw e;
        }
    }

    private static Queue<Consumer<byte[], byte[]>> consumers(List<Map<String, Object>> allConfigs) {

        Queue<Consumer<byte[], byte[]>> consumers = new LinkedList<>();
        try {
            allConfigs.forEach(configs ->
                    consumers.add(new KafkaConsumer<>(configs, new ByteArrayDeserializer(), new ByteArrayDeserializer())));
            return consumers;
        } catch (Throwable e) {
            consumers.forEach(c -> Utils.closeQuietly(c, "close consumer"));
            throw e;
        }
    }

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for updated PR. there are two major comments. PTAL

executor.execute(() -> {
try {
consumer.subscribe(singleton(topic));
while (closed.get()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while (!closed.get())

@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true")
})
public class DeleteConsumerGroupsTest {
private static final String TOPIC = "foo";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we run a loop in each test case, it would be nice to create different topic/group for them. please take a look at #15802

@chia7712
Copy link
Copy Markdown
Member

I file https://issues.apache.org/jira/browse/KAFKA-16639 to trace the potential bug about AsyncConsumer. Please wait for that issue. If the issue is too hard to resolve, we can remove the test case about new consumer from this PR

if (syncCommit)
consumer.commitSync();
}
} catch (WakeupException | InterruptException e) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clear the interrupt status to avoid impacting the consumer#close? for example:

                    } catch (WakeupException e) {
                        // OK
                    } catch (InterruptException e) {
                        // OK with reset interrupt
                        Thread.interrupted();
                    } finally {
                        consumer.close();
                    }

)
.collect(Collectors.toList());

Queue<KafkaConsumer<String, String>> kafkaConsumers = buildConsumersSafely(allConfigs);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can get it more simple.

    private static AutoCloseable buildConsumers(
            String brokerAddress,
            int numberOfConsumers,
            String groupId,
            String groupProtocol,
            String topic,
            String assignmentStrategy,
            Optional<String> remoteAssignor,
            Map<String, Object> customConfigs,
            boolean syncCommit
    ) {
        List<Map<String, Object>> allConfigs = IntStream.range(0, numberOfConsumers)
                .mapToObj(ignored ->
                        composeConfigs(
                                brokerAddress,
                                groupId,
                                groupProtocol,
                                assignmentStrategy,
                                remoteAssignor,
                                customConfigs
                        )
                )
                .collect(Collectors.toList());

        List<KafkaConsumer<byte[], byte[]>> kafkaConsumers = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newFixedThreadPool(allConfigs.size());
        AtomicBoolean closed = new AtomicBoolean(false);
        final AutoCloseable closeable = () -> releaseConsumers(closed, kafkaConsumers, executor);
        
        try {
            for (Map<String, Object> configs : allConfigs) {
                executor.execute(() -> {
                    try(KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(configs)) {
                        kafkaConsumers.add(consumer);
                        consumer.subscribe(singleton(topic));
                        while (!closed.get()) {
                            consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                            if (syncCommit)
                                consumer.commitSync();
                        }
                    } catch (WakeupException e) {
                        // OK
                    }
                });
            }
            return closeable;
        } catch (Throwable e) {
            Utils.closeQuietly(closeable, "Release Consumer");
            throw e;
        }
    }

    private static void releaseConsumers(AtomicBoolean closed, List<KafkaConsumer<byte[], byte[]>> kafkaConsumers, ExecutorService executor) throws InterruptedException {
        closed.set(true);
        kafkaConsumers.forEach(KafkaConsumer::wakeup);
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }

@chia7712
Copy link
Copy Markdown
Member

chia7712 commented May 5, 2024

System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers);

@frankvicky Could you change state.state to state.state.name() for same output. Also, please fix the build error

Comment thread core/src/test/java/kafka/test/ClusterInstance.java
Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for updated PR. two comments left.

Comment thread core/src/test/java/kafka/test/ClusterInstance.java

System.out.printf(format, "GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS");
System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers);
System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.state.name(), state.numMembers);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry that we should use toString() instead of name()

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky thanks for updated PR. Some comments are left. PTAL


cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId, "--group", missingGroupId};

ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please close this service2


cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId, "--group", missingGroupId};

ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@ClusterTemplate("generator")
public void testDeleteWithUnrecognizedNewConsumerOption() {
String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", getDummyGroupId()};
assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We expect the option parse get failed, so it would be more suitable to test ConsumerGroupCommandOptions.fromArgs directly

@ClusterTemplate("generator")
public void testDeleteWithTopicOption() {
String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", getDummyGroupId(), "--topic"};
assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}

ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs);
@ClusterTemplate("generator")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It tests only the arguments, so we don't need to create a embedded server.

String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
assertTrue(output.contains("Group '" + GROUP + "' could not be deleted due to:") && output.contains(Errors.NON_EMPTY_GROUP.message()),
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Output was: (" + output + ")");
@ClusterTemplate("generator")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto. replace it by @Test

String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
public DeleteConsumerGroupsTest(ClusterInstance cluster) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As some test cases don't require ClusterInstance, please inject ClusterInstance to method-level instead of class-level. For example:

    @ClusterTemplate("generator")
    public void testDeleteCmdNonExistingGroup(ClusterInstance cluster)

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky Sorry that I have left more comments. PTAL

Comment thread core/src/test/java/kafka/test/ClusterTestExtensionsTest.java Outdated
public void testClusterTemplate() {
Assertions.assertEquals(ClusterInstance.ClusterType.ZK, clusterInstance.clusterType(),
"generate1 provided a Zk cluster, so we should see that here");
"generate1 provided a Zk cluster, so we should see that here");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

"generate1 provided a Zk cluster, so we should see that here");
Assertions.assertEquals("Generated Test", clusterInstance.config().name().orElse(""),
"generate1 named this cluster config, so we should see that here");
"generate1 named this cluster config, so we should see that here");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

})
@ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "bar"),
@ClusterConfigProperty(key = "spam", value = "eggs")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@ClusterTest(clusterType = Type.KRAFT, disksPerBroker = 2),
@ClusterTest(clusterType = Type.CO_KRAFT),
@ClusterTest(clusterType = Type.CO_KRAFT, disksPerBroker = 2)
@ClusterTest(clusterType = Type.ZK),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

})
public void testNotSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
Assertions.assertTrue(clusterInstance.supportedGroupProtocols().contains(CLASSIC));
Assertions.assertEquals(clusterInstance.supportedGroupProtocols().size(), 1);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the first argument is "expected" value, so it should be Assertions.assertEquals(1, clusterInstance.supportedGroupProtocols().size());


import static java.util.Collections.singleton;

class ConsumerGroupExecutor {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other tests (for example: #15872) need both this and generator. How about renaming this to ConsumerGroupCommandTestUtils and move the implementation of generator to it?

@chia7712
Copy link
Copy Markdown
Member

chia7712 commented May 7, 2024

@frankvicky Could you please rebase code to trigger QA again?

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@chia7712 chia7712 merged commit 0de3b7c into apache:trunk May 7, 2024
gongxuanzhang pushed a commit to gongxuanzhang/kafka that referenced this pull request Jun 12, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants