KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions#15766
Conversation
chia7712
left a comment
There was a problem hiding this comment.
@frankvicky thanks for this contribution. some comments left. PTAL
| result.containsKey(missingGroup) && | ||
| result.get(missingGroup).getMessage().contains(Errors.GROUP_ID_NOT_FOUND.message()), | ||
| "The consumer group deletion did not work as expected"); | ||
| class AbstractConsumerGroupExecutor { |
There was a problem hiding this comment.
those classes can be static class.
| assert configured : "Must call configure before use"; | ||
| try { | ||
| subscribe(); | ||
| while (true) { |
There was a problem hiding this comment.
Could we add a flag to break the loop when calling shutdown?
| @ClusterTest | ||
| public void testDeleteWithTopicOption() { | ||
| try (Admin admin = cluster.createAdminClient()) { | ||
| admin.createTopics(buildSingletonTestTopic()); |
There was a problem hiding this comment.
please call get to make sure the request is completed.
| public void testDeleteWithUnrecognizedNewConsumerOption(String quorum) { | ||
| String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP}; | ||
| assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); | ||
| class ConsumerGroupExecutor extends AbstractConsumerGroupExecutor { |
There was a problem hiding this comment.
AbstractConsumerGroupExecutor is extended by ConsumerGroupExecutor only, so we can merge them together.
| executor.submit(consumerThread); | ||
| } | ||
|
|
||
| void shutdown() { |
There was a problem hiding this comment.
We can change this to close and make AbstractConsumerGroupExecutor extend Closeable. That allow us to use try-with-resources
There was a problem hiding this comment.
In this usage scenario, many test cases involve manually invoking the shutdown method and subsequently checking the status of resource closure. In this case, do I still need to implement the AutoCloseable interface?
The first option is to keep it as it is.
The second option is to implement AutoCloseable, but still manually call the close method.
The third option is to implement AutoCloseable, but remove the tests for resource release.
| createOffsetsTopic(listenerName(), new Properties()); | ||
| String missingGroup = "missing.group"; | ||
| @ExtendWith(value = ClusterTestExtensions.class) | ||
| @ClusterTestDefaults(clusterType = Type.ALL, brokers = 3, serverProperties = { |
There was a problem hiding this comment.
We reduce the number of partitions/replicas so do we need 3 brokers?
| Admin admin = cluster.createAdminClient(); | ||
| ConsumerGroupExecutor consumerGroupExecutor = buildConsumerGroupExecutor(GROUP) | ||
| ) { | ||
| createAndAwaitTestTopic(admin); |
There was a problem hiding this comment.
Kafka will auto-create the topic when the topic is nonexistent, so you don't need to call createAndAwaitTestTopic
chia7712
left a comment
There was a problem hiding this comment.
@frankvicky thanks for updated PR. I notice that there are many similar test cases. Please try to merge them to simplify this test
| assertTrue(output.contains("Group '" + GROUP + "' could not be deleted due to:") && output.contains(Errors.NON_EMPTY_GROUP.message()), | ||
| "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Output was: (" + output + ")"); | ||
| @ClusterTest | ||
| public void testDeleteCmdNonEmptyGroup() throws Exception { |
There was a problem hiding this comment.
Could we merge testDeleteCmdNonEmptyGroup into testDeleteNonEmptyGroup? For example:
try (ConsumerGroupExecutor consumerGroupExecutor = buildConsumerGroupExecutor(GROUP)) {
String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(
() -> service.collectGroupMembers(GROUP, false).getValue().get().size() == 1,
"The group did not initialize as expected."
);
Map<String, Throwable> result = new HashMap<>();
String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups()));
assertTrue(output.contains("Group '" + GROUP + "' could not be deleted due to:") && output.contains(Errors.NON_EMPTY_GROUP.message()),
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Output was: (" + output + ")");
assertNotNull(result.get(GROUP),
"Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")");
assertTrue(result.size() == 1 && result.containsKey(GROUP) && result.get(GROUP).getCause() instanceof GroupNotEmptyException,
"The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Result was:(" + result + ")");
}| public void testDeleteEmptyGroup(String quorum) throws Exception { | ||
| createOffsetsTopic(listenerName(), new Properties()); | ||
| @ClusterTest | ||
| public void testDeleteEmptyGroup() throws Exception { |
There was a problem hiding this comment.
ditto that testDeleteEmptyGroup and testDeleteCmdEmptyGroup can get merged
There was a problem hiding this comment.
try (ConsumerGroupExecutor consumerGroupExecutor = buildConsumerGroupExecutor(GROUP)) {
String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(
() -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
"The group did not initialize as expected."
);
consumerGroupExecutor.close();
TestUtils.waitForCondition(
() -> Objects.equals(service.collectGroupState(GROUP).state, "Empty"),
"The group did not become empty as expected."
);
Map<String, Throwable> result = new HashMap<>();
String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups()));
assertTrue(output.contains("Deletion of requested consumer groups ('" + GROUP + "') was successful."),
"The consumer group could not be deleted as expected");
assertTrue(result.size() == 1 && result.containsKey(GROUP) && result.get(GROUP) == null,
"The consumer group could not be deleted as expected");
}| "The group did not initialize as expected." | ||
| ); | ||
| TestUtils.waitForCondition( | ||
| () -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"), |
There was a problem hiding this comment.
There is a ConsumerGroupState enum with all the group states. Would be sensible to use it instead of the literal "Stable" just to avoid breaks/duplicated work in the future if states get updated (ditto for all other references to literal state names)
| void configure(Properties props) { | ||
| super.configure(props); | ||
| props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); | ||
| if (groupProtocol.toUpperCase(Locale.ROOT).equals(GroupProtocol.CONSUMER.toString())) { |
There was a problem hiding this comment.
This could be simplified to if (groupProtocol == GroupProtocol.CONSUMER) if we work with the GroupProtocol object instead of a string throughout the test. We truly only need the string when passing it into the consumer prop GROUP_PROTOCOL_CONFIG above.
| return new ConsumerGroupExecutor(cluster.bootstrapServers(), | ||
| 1, | ||
| null != group ? group : GROUP, | ||
| GroupProtocol.CLASSIC.name, |
There was a problem hiding this comment.
The consumer group command supports deleting CONSUMER groups, not only CLASSIC ones, so shouldn't we parametrize this test to make sure checks deleting both group types?
Not sure if I'm missing something or that's being tracked in a different jira maybe @dajac? (I can't find it)
There was a problem hiding this comment.
agree to add test for CONSUMER. The origin test case does not include that, and we can complete that in this PR.
@frankvicky It seems to me the following changes can be added.
- add
@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true") - use
cluster.isKRaftTest()to avoid usingGroupProtocol.CONSUMERin zk mode
There was a problem hiding this comment.
we will also need @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "consumer,classic") (along with group.coordinator.new.enable=true) to run with the new protocol
There was a problem hiding this comment.
we will also need @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "consumer,classic") (along with group.coordinator.new.enable=true )
It seems all we need is to set one of them. see
There was a problem hiding this comment.
You're totally right, I was wrongfully thinking it was also needed (and always using it like that), mainly because the list of supported protocols defaults to classic only, but agree, not needed.
chia7712
left a comment
There was a problem hiding this comment.
@frankvicky thanks for updated PR. a couple of comments left.
| } | ||
|
|
||
| cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--group", missingGroup}; | ||
| static abstract class AbstractConsumerRunnable implements Runnable { |
There was a problem hiding this comment.
As AbstractConsumerRunnable has only one sub-class, we can merge them into to one class
| static abstract class AbstractConsumerRunnable implements Runnable { | ||
| final String broker; | ||
| final String groupId; | ||
| final Optional<Properties> customPropsOpt; |
There was a problem hiding this comment.
if customPropsOpt is always, please remove it.
| final String topic; | ||
| final String groupProtocol; | ||
| final String strategy; | ||
| final Optional<String> remoteAssignor; |
|
@frankvicky @m1a2st It seems your PR (#15766 and #15779) need a consumer running in background. Hence, we can consider moving |
|
@chia7712 's comment makes sense to me, and heads-up, similar classes are already defined in ConsumerGroupCommandTest, so probably sensible to take a look at all and consider consolidating |
It sounds good. 😀 |
| public ConsumerGroupExecutor( | ||
| String broker, | ||
| int numConsumers, | ||
| String groupId, | ||
| String groupProtocol, | ||
| String topic, | ||
| String strategy, | ||
| Optional<String> remoteAssignor, | ||
| Optional<Properties> customPropsOpt, | ||
| boolean syncCommit |
There was a problem hiding this comment.
We have 2 well defined group types, classic and consumer, with different creation params, and having them all together seems a bit confusing (ex. having assignment strategy along with remote assignor)...not very intuitive when creating a new instance (that I expect several other test files will do as they get migrated). So wonder if it would maybe be clearer to split them into static builders that would take only the params they need? like:
public static ConsumerGroupExecutor forConsumerGroup(
String broker,
int numConsumers,
String groupId,
String topic,
Optional<String> remoteAssignor,
Optional<Properties> customPropsOpt,
boolean syncCommit) {
}
public static ConsumerGroupExecutor forClassicGroup(
String broker,
int numConsumers,
String groupId,
String topic,
String strategy,
Optional<Properties> customPropsOpt,
boolean syncCommit) {
}
There was a problem hiding this comment.
Indeed, too many parameters can be very confusing for users. I will make the changes according to your suggestion. Thank you! 😀
| String groupId, | ||
| String groupProtocol, | ||
| String topic, | ||
| String strategy, |
There was a problem hiding this comment.
maybe rename this to assignmentStrategy for clarity on what it is
There was a problem hiding this comment.
Thank you, this make sense. 😄
| String strategy, | ||
| Optional<String> remoteAssignor, | ||
| Optional<Properties> customPropsOpt, | ||
| boolean syncCommit |
There was a problem hiding this comment.
The syncCommit param seems to always be false. If that's the case what's the purpose of it? (is it with the intention of using this same common ConsumerGroupExecutor class from the ConsumerGroupCommandTest too?)
There was a problem hiding this comment.
Hi @lianetm
Yes, your assumption is correct. I'm pretty new to being a Committer myself, so I aimed to complete this migration with minimal alterations.
There was a problem hiding this comment.
Makes sense, let's just make sure we have a follow-up Jira to update the ConsumerGroupCommandTest then, so we don't leave the duplicated ConsumerGroupExecutor we currently have.
chia7712
left a comment
There was a problem hiding this comment.
@frankvicky thanks for updated PR. a couple of style-related comments. PTAL
| import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG; | ||
| import static org.apache.kafka.common.GroupType.CONSUMER; | ||
|
|
||
| public class ConsumerRunnable implements Runnable { |
There was a problem hiding this comment.
This class is used by ConsumerGroupExecutor so we can move it to be a static class within ConsumerGroupExecutor
| } | ||
| } | ||
|
|
||
| void subscribe() { |
There was a problem hiding this comment.
We don't need this method as it is used by run only
|
|
||
| @Override | ||
| public void run() { | ||
| assert configured : "Must call configure before use"; |
There was a problem hiding this comment.
We expect configure must be called so it would be nice to move it to the constructor. It means we should create the consumer in constructor.
chia7712
left a comment
There was a problem hiding this comment.
@frankvicky thanks for updated PR. a couple of comments left. PTAL
| import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; | ||
| import static org.apache.kafka.common.GroupType.CONSUMER; | ||
|
|
||
| public class ConsumerGroupExecutor implements AutoCloseable { |
There was a problem hiding this comment.
Remove the public to make it be package-private
| }); | ||
| } | ||
|
|
||
| public static ConsumerGroupExecutor buildConsumerGroup(String brokerAddress, |
| ); | ||
| } | ||
|
|
||
| public static ConsumerGroupExecutor buildClassicGroup(String brokerAddress, |
| } | ||
| } | ||
|
|
||
| static class ConsumerRunnable implements Runnable { |
There was a problem hiding this comment.
add private since it is not exposed to callers
| private boolean configured = false; | ||
| private volatile boolean isShutdown = false; | ||
|
|
||
| public ConsumerRunnable(String brokerAddress, |
| } | ||
|
|
||
| static class ConsumerRunnable implements Runnable { | ||
| private final String brokerAddress; |
There was a problem hiding this comment.
those inner variables are not used after construction, so we can remove them
| String topic, | ||
| String assignmentStrategy, | ||
| Optional<String> remoteAssignor, | ||
| Optional<Properties> customConfigs, |
There was a problem hiding this comment.
Map<String, Object> is more suitable in this case. The empty map can be equal to Optional.empty but it gets more graceful.
| this.assignmentStrategy = assignmentStrategy; | ||
| this.remoteAssignor = remoteAssignor; | ||
|
|
||
| this.configure(); |
There was a problem hiding this comment.
We don't need this method, since it is used only once. For example:
Map<String, Object> configs = new HashMap<>();
configs.put(BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
configs.put(GROUP_ID_CONFIG, groupId);
configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
if (Objects.equals(groupProtocol, CONSUMER.toString())) {
remoteAssignor.ifPresent(assignor -> configs.put(GROUP_REMOTE_ASSIGNOR_CONFIG, assignor));
} else {
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy);
}
configs.putAll(customConfigs);
consumer = new KafkaConsumer<>(configs);
chia7712
left a comment
There was a problem hiding this comment.
@frankvicky thanks for updated PR. two major comments left. PTAL
| Optional<String> remoteAssignor, | ||
| Optional<Properties> customConfigs, | ||
| boolean syncCommit) { | ||
| static ConsumerGroupExecutor buildConsumerGroup(String brokerAddress, |
There was a problem hiding this comment.
Could you move those static methods up?
| configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy); | ||
| } | ||
| configs.putAll(customConfigs); | ||
| consumer = new KafkaConsumer<>(configs); |
There was a problem hiding this comment.
Could you make sure all consumers get closed even though one of consumer gets error? We encountered resource leaks before and it makes our CI unstable. For example:
private static AutoCloseable run(
String brokerAddress,
int numberOfConsumers,
String groupId,
String groupProtocol,
String topic,
String assignmentStrategy,
Optional<String> remoteAssignor,
Map<String, Object> customConfigs,
boolean syncCommit
) {
Queue<Consumer<byte[], byte[]>> consumers = consumers(IntStream.range(0, numberOfConsumers).mapToObj(ignored -> {
Map<String, Object> configs = new HashMap<>();
configs.put(BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
configs.put(GROUP_ID_CONFIG, groupId);
configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
if (Objects.equals(groupProtocol, CONSUMER.toString())) {
remoteAssignor.ifPresent(assignor -> configs.put(GROUP_REMOTE_ASSIGNOR_CONFIG, assignor));
} else {
configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy);
}
configs.putAll(customConfigs);
return configs;
}).collect(Collectors.toList()));
AtomicBoolean closed = new AtomicBoolean(false);
ExecutorService service = Executors.newFixedThreadPool(consumers.size());
final AutoCloseable closeable = () -> {
closed.set(true);
consumers.forEach(c -> Utils.closeQuietly(c, "close consumer"));
service.shutdownNow();
service.awaitTermination(1, TimeUnit.MINUTES);
};
try {
while (!consumers.isEmpty()) {
Consumer<byte[], byte[]> consumer = consumers.poll();
service.submit(() -> {
try {
consumer.subscribe(singleton(topic));
while (closed.get()) {
consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
if (syncCommit)
consumer.commitSync();
}
} catch (WakeupException | InterruptException e) {
// OK
} finally {
consumer.close();
}
});
}
return closeable;
} catch (Throwable e) {
Utils.closeQuietly(closeable, "release consumer");
throw e;
}
}
private static Queue<Consumer<byte[], byte[]>> consumers(List<Map<String, Object>> allConfigs) {
Queue<Consumer<byte[], byte[]>> consumers = new LinkedList<>();
try {
allConfigs.forEach(configs ->
consumers.add(new KafkaConsumer<>(configs, new ByteArrayDeserializer(), new ByteArrayDeserializer())));
return consumers;
} catch (Throwable e) {
consumers.forEach(c -> Utils.closeQuietly(c, "close consumer"));
throw e;
}
}
chia7712
left a comment
There was a problem hiding this comment.
@frankvicky thanks for updated PR. there are two major comments. PTAL
| executor.execute(() -> { | ||
| try { | ||
| consumer.subscribe(singleton(topic)); | ||
| while (closed.get()) { |
| @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true") | ||
| }) | ||
| public class DeleteConsumerGroupsTest { | ||
| private static final String TOPIC = "foo"; |
There was a problem hiding this comment.
As we run a loop in each test case, it would be nice to create different topic/group for them. please take a look at #15802
|
I file https://issues.apache.org/jira/browse/KAFKA-16639 to trace the potential bug about AsyncConsumer. Please wait for that issue. If the issue is too hard to resolve, we can remove the test case about new consumer from this PR |
| if (syncCommit) | ||
| consumer.commitSync(); | ||
| } | ||
| } catch (WakeupException | InterruptException e) { |
There was a problem hiding this comment.
Could you clear the interrupt status to avoid impacting the consumer#close? for example:
} catch (WakeupException e) {
// OK
} catch (InterruptException e) {
// OK with reset interrupt
Thread.interrupted();
} finally {
consumer.close();
}| ) | ||
| .collect(Collectors.toList()); | ||
|
|
||
| Queue<KafkaConsumer<String, String>> kafkaConsumers = buildConsumersSafely(allConfigs); |
There was a problem hiding this comment.
Maybe we can get it more simple.
private static AutoCloseable buildConsumers(
String brokerAddress,
int numberOfConsumers,
String groupId,
String groupProtocol,
String topic,
String assignmentStrategy,
Optional<String> remoteAssignor,
Map<String, Object> customConfigs,
boolean syncCommit
) {
List<Map<String, Object>> allConfigs = IntStream.range(0, numberOfConsumers)
.mapToObj(ignored ->
composeConfigs(
brokerAddress,
groupId,
groupProtocol,
assignmentStrategy,
remoteAssignor,
customConfigs
)
)
.collect(Collectors.toList());
List<KafkaConsumer<byte[], byte[]>> kafkaConsumers = Collections.synchronizedList(new ArrayList<>());
ExecutorService executor = Executors.newFixedThreadPool(allConfigs.size());
AtomicBoolean closed = new AtomicBoolean(false);
final AutoCloseable closeable = () -> releaseConsumers(closed, kafkaConsumers, executor);
try {
for (Map<String, Object> configs : allConfigs) {
executor.execute(() -> {
try(KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(configs)) {
kafkaConsumers.add(consumer);
consumer.subscribe(singleton(topic));
while (!closed.get()) {
consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
if (syncCommit)
consumer.commitSync();
}
} catch (WakeupException e) {
// OK
}
});
}
return closeable;
} catch (Throwable e) {
Utils.closeQuietly(closeable, "Release Consumer");
throw e;
}
}
private static void releaseConsumers(AtomicBoolean closed, List<KafkaConsumer<byte[], byte[]>> kafkaConsumers, ExecutorService executor) throws InterruptedException {
closed.set(true);
kafkaConsumers.forEach(KafkaConsumer::wakeup);
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}…sumerGroupService to try-with resource block to leverage the autocloseable.
…Instance, and refactor DeleteConsumerGroupsTest.
…GroupState and refactor related test.
|
@frankvicky Could you change |
chia7712
left a comment
There was a problem hiding this comment.
@frankvicky thanks for updated PR. two comments left.
|
|
||
| System.out.printf(format, "GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"); | ||
| System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers); | ||
| System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.state.name(), state.numMembers); |
There was a problem hiding this comment.
Sorry that we should use toString() instead of name()
chia7712
left a comment
There was a problem hiding this comment.
@frankvicky thanks for updated PR. Some comments are left. PTAL
|
|
||
| cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId, "--group", missingGroupId}; | ||
|
|
||
| ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs); |
|
|
||
| cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId, "--group", missingGroupId}; | ||
|
|
||
| ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs); |
| @ClusterTemplate("generator") | ||
| public void testDeleteWithUnrecognizedNewConsumerOption() { | ||
| String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", getDummyGroupId()}; | ||
| assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); |
There was a problem hiding this comment.
We expect the option parse get failed, so it would be more suitable to test ConsumerGroupCommandOptions.fromArgs directly
| @ClusterTemplate("generator") | ||
| public void testDeleteWithTopicOption() { | ||
| String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", getDummyGroupId(), "--topic"}; | ||
| assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); |
| } | ||
|
|
||
| ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs); | ||
| @ClusterTemplate("generator") |
There was a problem hiding this comment.
It tests only the arguments, so we don't need to create a embedded server.
| String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); | ||
| assertTrue(output.contains("Group '" + GROUP + "' could not be deleted due to:") && output.contains(Errors.NON_EMPTY_GROUP.message()), | ||
| "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Output was: (" + output + ")"); | ||
| @ClusterTemplate("generator") |
| String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); | ||
| assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), | ||
| "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); | ||
| public DeleteConsumerGroupsTest(ClusterInstance cluster) { |
There was a problem hiding this comment.
As some test cases don't require ClusterInstance, please inject ClusterInstance to method-level instead of class-level. For example:
@ClusterTemplate("generator")
public void testDeleteCmdNonExistingGroup(ClusterInstance cluster)
chia7712
left a comment
There was a problem hiding this comment.
@frankvicky Sorry that I have left more comments. PTAL
| public void testClusterTemplate() { | ||
| Assertions.assertEquals(ClusterInstance.ClusterType.ZK, clusterInstance.clusterType(), | ||
| "generate1 provided a Zk cluster, so we should see that here"); | ||
| "generate1 provided a Zk cluster, so we should see that here"); |
| "generate1 provided a Zk cluster, so we should see that here"); | ||
| Assertions.assertEquals("Generated Test", clusterInstance.config().name().orElse(""), | ||
| "generate1 named this cluster config, so we should see that here"); | ||
| "generate1 named this cluster config, so we should see that here"); |
| }) | ||
| @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = { | ||
| @ClusterConfigProperty(key = "foo", value = "bar"), | ||
| @ClusterConfigProperty(key = "spam", value = "eggs") |
| @ClusterTest(clusterType = Type.KRAFT, disksPerBroker = 2), | ||
| @ClusterTest(clusterType = Type.CO_KRAFT), | ||
| @ClusterTest(clusterType = Type.CO_KRAFT, disksPerBroker = 2) | ||
| @ClusterTest(clusterType = Type.ZK), |
| }) | ||
| public void testNotSupportedNewGroupProtocols(ClusterInstance clusterInstance) { | ||
| Assertions.assertTrue(clusterInstance.supportedGroupProtocols().contains(CLASSIC)); | ||
| Assertions.assertEquals(clusterInstance.supportedGroupProtocols().size(), 1); |
There was a problem hiding this comment.
the first argument is "expected" value, so it should be Assertions.assertEquals(1, clusterInstance.supportedGroupProtocols().size());
|
|
||
| import static java.util.Collections.singleton; | ||
|
|
||
| class ConsumerGroupExecutor { |
There was a problem hiding this comment.
Other tests (for example: #15872) need both this and generator. How about renaming this to ConsumerGroupCommandTestUtils and move the implementation of generator to it?
|
@frankvicky Could you please rebase code to trigger QA again? |
…apache#15766) Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions.