/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools.streams;

import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import org.apache.kafka.clients.admin.AbstractOptions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsOptions;
import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsSpec;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.tools.streams.StreamsGroupCommandOptions;

public class StreamsGroupCommand {
    static final String MISSING_COLUMN_VALUE = "-";

    public static void main(String[] args) {
        StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
        try {
            opts.checkArgs();
            long numberOfActions = Stream.of(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.deleteOffsetsOpt).filter(arg_0 -> ((OptionSet)opts.options).has(arg_0)).count();
            if (numberOfActions != 1L) {
                CommandLineUtils.printUsageAndExit((OptionParser)opts.parser, (String)"Command must include exactly one action: --list, --describe, --delete, or --delete-offsets.");
            }
            StreamsGroupCommand.run(opts);
        }
        catch (OptionException e) {
            CommandLineUtils.printUsageAndExit((OptionParser)opts.parser, (String)e.getMessage());
        }
    }

    public static void run(StreamsGroupCommandOptions opts) {
        block12: {
            try (StreamsGroupService streamsGroupService = new StreamsGroupService(opts, Map.of());){
                if (opts.options.has(opts.listOpt)) {
                    streamsGroupService.listGroups();
                    break block12;
                }
                if (opts.options.has(opts.describeOpt)) {
                    streamsGroupService.describeGroups();
                    break block12;
                }
                if (opts.options.has(opts.deleteOpt)) {
                    streamsGroupService.deleteGroups();
                    break block12;
                }
                if (opts.options.has(opts.deleteOffsetsOpt)) {
                    streamsGroupService.deleteOffsets();
                    break block12;
                }
                throw new IllegalArgumentException("Unknown action!");
            }
            catch (IllegalArgumentException e) {
                CommandLineUtils.printUsageAndExit((OptionParser)opts.parser, (String)e.getMessage());
            }
            catch (Throwable e) {
                StreamsGroupCommand.printError("Executing streams group command failed due to " + e.getMessage(), Optional.of(e));
            }
        }
    }

    static Set<GroupState> groupStatesFromString(String input) {
        Set<GroupState> parsedStates = Arrays.stream(input.split(",")).map(s -> GroupState.parse((String)s.trim())).collect(Collectors.toSet());
        Set validStates = GroupState.groupStatesForType((GroupType)GroupType.STREAMS);
        if (!validStates.containsAll(parsedStates)) {
            throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " + validStates.stream().map(GroupState::toString).collect(Collectors.joining(", ")));
        }
        return parsedStates;
    }

    public static void printError(String msg, Optional<Throwable> e) {
        System.out.println("\nError: " + msg);
        e.ifPresent(Throwable::printStackTrace);
    }

    static class StreamsGroupService
    implements AutoCloseable {
        final StreamsGroupCommandOptions opts;
        private final Admin adminClient;

        public StreamsGroupService(StreamsGroupCommandOptions opts, Map<String, String> configOverrides) {
            this.opts = opts;
            try {
                this.adminClient = this.createAdminClient(configOverrides);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public StreamsGroupService(StreamsGroupCommandOptions opts, Admin adminClient) {
            this.opts = opts;
            this.adminClient = adminClient;
        }

        public void listGroups() throws ExecutionException, InterruptedException {
            if (this.opts.options.has(this.opts.stateOpt)) {
                String stateValue = (String)this.opts.options.valueOf(this.opts.stateOpt);
                Set<GroupState> states = stateValue == null || stateValue.isEmpty() ? Set.of() : StreamsGroupCommand.groupStatesFromString(stateValue);
                List<GroupListing> listings = this.listStreamsGroupsInStates(states);
                this.printGroupInfo(listings);
            } else {
                this.listStreamsGroups().forEach(System.out::println);
            }
        }

        List<String> listStreamsGroups() {
            try {
                ListGroupsResult result = this.adminClient.listGroups((ListGroupsOptions)ListGroupsOptions.forStreamsGroups().timeoutMs(Integer.valueOf(((Long)this.opts.options.valueOf(this.opts.timeoutMsOpt)).intValue())));
                Collection listings = (Collection)result.all().get();
                return listings.stream().map(GroupListing::groupId).collect(Collectors.toList());
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        List<GroupListing> listStreamsGroupsInStates(Set<GroupState> states) throws ExecutionException, InterruptedException {
            ListGroupsResult result = this.adminClient.listGroups(((ListGroupsOptions)ListGroupsOptions.forStreamsGroups().timeoutMs(Integer.valueOf(((Long)this.opts.options.valueOf(this.opts.timeoutMsOpt)).intValue()))).inGroupStates(states));
            return new ArrayList<GroupListing>((Collection)result.all().get());
        }

        private void printGroupInfo(List<GroupListing> groups) {
            int maxGroupLen = 15;
            for (GroupListing group : groups) {
                maxGroupLen = Math.max(maxGroupLen, group.groupId().length());
            }
            System.out.printf("%" + -maxGroupLen + "s %s\n", "GROUP", "STATE");
            for (GroupListing group : groups) {
                String groupId = group.groupId();
                String state = group.groupState().orElse(GroupState.UNKNOWN).toString();
                System.out.printf("%" + -maxGroupLen + "s %s\n", groupId, state);
            }
        }

        public void describeGroups() throws ExecutionException, InterruptedException {
            List<String> groups = this.listStreamsGroups();
            if (!groups.isEmpty()) {
                StreamsGroupDescription description = this.getDescribeGroup(groups.get(0));
                if (description == null) {
                    return;
                }
                boolean verbose = this.opts.options.has(this.opts.verboseOpt);
                if (this.opts.options.has(this.opts.membersOpt)) {
                    this.printMembers(description, verbose);
                } else if (this.opts.options.has(this.opts.stateOpt)) {
                    this.printStates(description, verbose);
                } else {
                    this.printOffsets(description, verbose);
                }
            }
        }

        Map<String, Throwable> deleteGroups() {
            ArrayList<String> groupIds = this.opts.options.has(this.opts.allGroupsOpt) ? new ArrayList<String>(this.listStreamsGroups()) : new ArrayList(this.opts.options.valuesOf(this.opts.groupOpt));
            Map<String, Throwable> failed = this.preAdminCallChecks(groupIds);
            groupIds.removeAll(failed.keySet());
            HashMap success = new HashMap();
            Map<Object, Object> internalTopics = new HashMap();
            HashMap<String, Throwable> internalTopicsDeletionFailures = new HashMap<String, Throwable>();
            if (!groupIds.isEmpty()) {
                internalTopics = this.retrieveInternalTopics(groupIds);
                Map groupsToDelete = this.adminClient.deleteStreamsGroups(groupIds, this.withTimeoutMs(new DeleteStreamsGroupsOptions())).deletedGroups();
                groupsToDelete.forEach((g, f) -> {
                    try {
                        f.get();
                        success.put(g, null);
                    }
                    catch (InterruptedException ie) {
                        failed.put((String)g, ie);
                    }
                    catch (ExecutionException e) {
                        failed.put((String)g, e.getCause());
                    }
                });
                if (!success.isEmpty()) {
                    for (String groupId : success.keySet()) {
                        List internalTopicsToDelete = (List)internalTopics.get(groupId);
                        if (internalTopicsToDelete == null || internalTopicsToDelete.isEmpty()) continue;
                        DeleteTopicsResult deleteTopicsResult = null;
                        try {
                            deleteTopicsResult = this.adminClient.deleteTopics((Collection)internalTopicsToDelete);
                            deleteTopicsResult.all().get();
                        }
                        catch (InterruptedException | ExecutionException e) {
                            if (deleteTopicsResult != null) {
                                deleteTopicsResult.topicNameValues().forEach((topic, future) -> {
                                    try {
                                        future.get();
                                    }
                                    catch (Exception topicException) {
                                        System.out.println("Failed to delete internal topic: " + topic);
                                    }
                                });
                            }
                            internalTopicsDeletionFailures.put(groupId, e.getCause());
                        }
                    }
                }
            }
            if (failed.isEmpty()) {
                System.out.println("Deletion of requested streams groups ('" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("', '")) + "') was successful.");
            } else {
                StreamsGroupCommand.printError("Deletion of some streams groups failed:", Optional.empty());
                failed.forEach((group, error) -> System.out.println("* Group '" + group + "' could not be deleted due to: " + String.valueOf(error)));
                if (!success.isEmpty()) {
                    System.out.println("\nThese streams groups were deleted successfully: '" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("', '")) + "'.");
                }
            }
            if (!internalTopics.keySet().isEmpty()) {
                this.printInternalTopicErrors(internalTopicsDeletionFailures, success.keySet(), internalTopics.keySet());
            }
            failed.putAll(success);
            failed.putAll(internalTopicsDeletionFailures);
            return failed;
        }

        private Map<String, Throwable> preAdminCallChecks(List<String> groupIds) {
            List<GroupListing> streamsGroupIds = this.listDetailedStreamsGroups();
            LinkedHashSet<String> groupIdSet = new LinkedHashSet<String>(groupIds);
            HashMap<String, Throwable> failed = new HashMap<String, Throwable>();
            for (String groupId : groupIdSet) {
                Optional<GroupListing> listing = streamsGroupIds.stream().filter(item -> item.groupId().equals(groupId)).findAny();
                if (listing.isEmpty()) {
                    failed.put(groupId, new IllegalArgumentException("Group '" + groupId + "' does not exist or is not a streams group."));
                    continue;
                }
                Optional groupState = listing.get().groupState();
                groupState.ifPresent(state -> {
                    if (state == GroupState.DEAD) {
                        failed.put(groupId, new IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
                    } else if (state != GroupState.EMPTY) {
                        failed.put(groupId, (Throwable)new GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
                    }
                });
            }
            return failed;
        }

        Map<String, List<String>> retrieveInternalTopics(List<String> groupIds) {
            HashMap<String, List<String>> groupToInternalTopics = new HashMap<String, List<String>>();
            try {
                Map descriptionMap = (Map)this.adminClient.describeStreamsGroups(groupIds).all().get();
                for (StreamsGroupDescription description : descriptionMap.values()) {
                    List sourceTopics = description.subtopologies().stream().flatMap(subtopology -> subtopology.sourceTopics().stream()).toList();
                    List internalTopics = description.subtopologies().stream().flatMap(subtopology -> Stream.concat(subtopology.repartitionSourceTopics().keySet().stream(), subtopology.stateChangelogTopics().keySet().stream())).filter(topic -> !sourceTopics.contains(topic)).collect(Collectors.toList());
                    internalTopics.removeIf(topic -> {
                        if (!this.isInferredInternalTopic((String)topic, description.groupId())) {
                            StreamsGroupCommand.printError("The internal topic '" + topic + "' is not inferred as internal and thus will not be deleted with the group '" + description.groupId() + "'.", Optional.empty());
                            return true;
                        }
                        return false;
                    });
                    if (internalTopics.isEmpty()) continue;
                    groupToInternalTopics.put(description.groupId(), internalTopics);
                }
            }
            catch (InterruptedException | ExecutionException e) {
                if (e.getCause() instanceof UnsupportedVersionException) {
                    StreamsGroupCommand.printError("Retrieving internal topics is not supported by the broker version. Use 'kafka-topics.sh' to list and delete the group's internal topics.", Optional.of(e.getCause()));
                }
                StreamsGroupCommand.printError("Retrieving internal topics failed due to " + e.getMessage(), Optional.of(e));
            }
            return groupToInternalTopics;
        }

        private boolean isInferredInternalTopic(String topicName, String applicationId) {
            return topicName.startsWith(applicationId + StreamsGroupCommand.MISSING_COLUMN_VALUE) && StreamsGroupService.matchesInternalTopicFormat(topicName);
        }

        public static boolean matchesInternalTopicFormat(String topicName) {
            return topicName.endsWith("-changelog") || topicName.endsWith("-repartition") || topicName.endsWith("-subscription-registration-topic") || topicName.endsWith("-subscription-response-topic") || topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic") || topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
        }

        private void printInternalTopicErrors(Map<String, Throwable> internalTopicsDeletionFailures, Set<String> deletedGroupIds, Set<String> groupIdsWithInternalTopics) {
            if (!deletedGroupIds.isEmpty()) {
                if (internalTopicsDeletionFailures.isEmpty()) {
                    List successfulGroups = deletedGroupIds.stream().filter(groupIdsWithInternalTopics::contains).collect(Collectors.toList());
                    System.out.println("Deletion of associated internal topics of the streams groups ('" + String.join((CharSequence)"', '", successfulGroups) + "') was successful.");
                } else {
                    System.out.println("Deletion of some associated internal topics failed:");
                    internalTopicsDeletionFailures.forEach((group, error) -> System.out.println("* Internal topics of the streams group '" + group + "' could not be deleted due to: " + String.valueOf(error)));
                }
            }
        }

        List<GroupListing> listDetailedStreamsGroups() {
            try {
                ListGroupsResult result = this.adminClient.listGroups(((ListGroupsOptions)new ListGroupsOptions().timeoutMs(Integer.valueOf(((Long)this.opts.options.valueOf(this.opts.timeoutMsOpt)).intValue()))).withTypes(Set.of(GroupType.STREAMS)));
                Collection listings = (Collection)result.all().get();
                return listings.stream().toList();
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        private Map.Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets(String groupId, List<String> topics) {
            HashMap<TopicPartition, Throwable> partitionLevelResult = new HashMap<TopicPartition, Throwable>();
            HashSet<String> topicWithPartitions = new HashSet<String>();
            HashSet<String> topicWithoutPartitions = new HashSet<String>();
            for (String topic : topics) {
                if (topic.contains(":")) {
                    topicWithPartitions.add(topic);
                    continue;
                }
                topicWithoutPartitions.add(topic);
            }
            List specifiedPartitions = topicWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).toList();
            DescribeTopicsResult describeTopicsResult = this.adminClient.describeTopics(topicWithoutPartitions, this.withTimeoutMs(new DescribeTopicsOptions()));
            Iterator unspecifiedPartitions = describeTopicsResult.topicNameValues().entrySet().stream().flatMap(e -> {
                String topic = (String)e.getKey();
                try {
                    return ((TopicDescription)((KafkaFuture)e.getValue()).get()).partitions().stream().map(partition -> new TopicPartition(topic, partition.partition()));
                }
                catch (InterruptedException | ExecutionException err) {
                    partitionLevelResult.put(new TopicPartition(topic, -1), err);
                    return Stream.empty();
                }
            }).iterator();
            HashSet<TopicPartition> partitions = new HashSet<TopicPartition>(specifiedPartitions);
            unspecifiedPartitions.forEachRemaining(partitions::add);
            return this.deleteOffsets(groupId, partitions, partitionLevelResult);
        }

        private Map.Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets(String groupId, Set<TopicPartition> partitions, Map<TopicPartition, Throwable> partitionLevelResult) {
            DeleteStreamsGroupOffsetsResult deleteResult = this.adminClient.deleteStreamsGroupOffsets(groupId, partitions, this.withTimeoutMs(new DeleteStreamsGroupOffsetsOptions()));
            Errors topLevelException = Errors.NONE;
            try {
                deleteResult.all().get();
            }
            catch (InterruptedException | ExecutionException e) {
                topLevelException = Errors.forException((Throwable)e.getCause());
            }
            partitions.forEach(partition -> {
                try {
                    deleteResult.partitionResult(partition).get();
                    partitionLevelResult.put((TopicPartition)partition, (Throwable)null);
                }
                catch (InterruptedException | ExecutionException e) {
                    partitionLevelResult.put((TopicPartition)partition, e);
                }
            });
            return new AbstractMap.SimpleImmutableEntry<Errors, Map<TopicPartition, Throwable>>(topLevelException, partitionLevelResult);
        }

        Map.Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets() {
            Map.Entry<Errors, Map<TopicPartition, Throwable>> res;
            String groupId = (String)this.opts.options.valueOf(this.opts.groupOpt);
            if (this.opts.options.has(this.opts.allInputTopicsOpt)) {
                Set<TopicPartition> partitions = this.getCommittedOffsets(groupId).keySet();
                res = this.deleteOffsets(groupId, partitions, new HashMap<TopicPartition, Throwable>());
            } else if (this.opts.options.has(this.opts.inputTopicOpt)) {
                List topics = this.opts.options.valuesOf(this.opts.inputTopicOpt);
                res = this.deleteOffsets(groupId, topics);
            } else {
                CommandLineUtils.printUsageAndExit((OptionParser)this.opts.parser, (String)("Option " + String.valueOf(this.opts.deleteOffsetsOpt) + " requires either" + String.valueOf(this.opts.allInputTopicsOpt) + " or " + String.valueOf(this.opts.inputTopicOpt) + " to be specified."));
                return null;
            }
            Errors topLevelResult = res.getKey();
            Map<TopicPartition, Throwable> partitionLevelResult = res.getValue();
            switch (topLevelResult) {
                case NONE: {
                    System.out.println("Request succeeded for deleting offsets from group " + groupId + ".");
                    break;
                }
                case INVALID_GROUP_ID: 
                case GROUP_ID_NOT_FOUND: 
                case GROUP_AUTHORIZATION_FAILED: 
                case NON_EMPTY_GROUP: {
                    StreamsGroupCommand.printError(topLevelResult.message(), Optional.empty());
                    break;
                }
                case GROUP_SUBSCRIBED_TO_TOPIC: 
                case TOPIC_AUTHORIZATION_FAILED: 
                case UNKNOWN_TOPIC_OR_PARTITION: {
                    StreamsGroupCommand.printError("Encountered some partition-level error, see the follow-up details.", Optional.empty());
                    break;
                }
                default: {
                    StreamsGroupCommand.printError("Encountered some unknown error: " + String.valueOf(topLevelResult), Optional.empty());
                }
            }
            int maxTopicLen = 15;
            for (TopicPartition tp : partitionLevelResult.keySet()) {
                maxTopicLen = Math.max(maxTopicLen, tp.topic().length());
            }
            String format = "%n%" + -maxTopicLen + "s %-10s %-15s";
            System.out.printf(format, "TOPIC", "PARTITION", "STATUS");
            partitionLevelResult.entrySet().stream().sorted(Comparator.comparing(e -> ((TopicPartition)e.getKey()).topic() + ((TopicPartition)e.getKey()).partition())).forEach(e -> {
                TopicPartition tp = (TopicPartition)e.getKey();
                Throwable error = (Throwable)e.getValue();
                System.out.printf(format, tp.topic(), tp.partition() >= 0 ? Integer.valueOf(tp.partition()) : StreamsGroupCommand.MISSING_COLUMN_VALUE, error != null ? "Error: " + error.getMessage() : "Successful");
            });
            System.out.println();
            return res;
        }

        StreamsGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException {
            DescribeStreamsGroupsResult result = this.adminClient.describeStreamsGroups(List.of(group));
            Map descriptionMap = (Map)result.all().get();
            return (StreamsGroupDescription)descriptionMap.get(group);
        }

        private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
            int t = ((Long)this.opts.options.valueOf(this.opts.timeoutMsOpt)).intValue();
            return (T)options.timeoutMs(Integer.valueOf(t));
        }

        private void printMembers(StreamsGroupDescription description, boolean verbose) {
            block5: {
                int groupLen = Math.max(15, description.groupId().length());
                int maxMemberIdLen = 15;
                int maxHostLen = 15;
                int maxClientIdLen = 15;
                Collection members = description.members();
                if (!StreamsGroupService.isGroupStateValid(description.groupState(), description.members().size())) break block5;
                StreamsGroupService.maybePrintEmptyGroupState(description.groupId(), description.groupState());
                for (Object member : members) {
                    maxMemberIdLen = Math.max(maxMemberIdLen, member.memberId().length());
                    maxHostLen = Math.max(maxHostLen, member.processId().length());
                    maxClientIdLen = Math.max(maxClientIdLen, member.clientId().length());
                }
                if (!verbose) {
                    String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
                    System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
                    for (StreamsGroupMemberDescription member : members) {
                        System.out.printf(fmt, description.groupId(), member.memberId(), member.processId(), member.clientId(), this.getTasksForPrinting(member.assignment(), Optional.empty()));
                    }
                } else {
                    int targetAssignmentEpochLen = 25;
                    int topologyEpochLen = 15;
                    int memberProtocolLen = 15;
                    int memberEpochLen = 15;
                    String fmt = "%" + -groupLen + "s %-25s %-15s%" + -maxMemberIdLen + "s %-15s %-15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
                    System.out.printf(fmt, "GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
                    for (StreamsGroupMemberDescription member : members) {
                        System.out.printf(fmt, description.groupId(), description.targetAssignmentEpoch(), description.topologyEpoch(), member.memberId(), member.isClassic() ? "classic" : "streams", member.memberEpoch(), member.processId(), member.clientId(), this.getTasksForPrinting(member.assignment(), Optional.of(member.targetAssignment())));
                    }
                }
            }
        }

        GroupState collectGroupState(String groupId) throws Exception {
            return this.getDescribeGroup(groupId).groupState();
        }

        Collection<StreamsGroupMemberDescription> collectGroupMembers(String groupId) throws Exception {
            return this.getDescribeGroup(groupId).members();
        }

        private String prepareTaskType(List<StreamsGroupMemberAssignment.TaskIds> tasks, String taskType) {
            if (tasks.isEmpty()) {
                return "";
            }
            StringBuilder builder = new StringBuilder(taskType).append(": ");
            for (StreamsGroupMemberAssignment.TaskIds taskIds : tasks) {
                builder.append(taskIds.subtopologyId()).append(":[");
                builder.append(taskIds.partitions().stream().map(String::valueOf).collect(Collectors.joining(",")));
                builder.append("]; ");
            }
            return builder.toString();
        }

        private String getTasksForPrinting(StreamsGroupMemberAssignment assignment, Optional<StreamsGroupMemberAssignment> targetAssignment) {
            StringBuilder builder = new StringBuilder();
            builder.append(this.prepareTaskType(assignment.activeTasks(), "ACTIVE")).append(this.prepareTaskType(assignment.standbyTasks(), "STANDBY")).append(this.prepareTaskType(assignment.warmupTasks(), "WARMUP"));
            targetAssignment.ifPresent(target -> builder.append(this.prepareTaskType(target.activeTasks(), "TARGET-ACTIVE")).append(this.prepareTaskType(target.standbyTasks(), "TARGET-STANDBY")).append(this.prepareTaskType(target.warmupTasks(), "TARGET-WARMUP")));
            return builder.toString();
        }

        private void printStates(StreamsGroupDescription description, boolean verbose) {
            StreamsGroupService.maybePrintEmptyGroupState(description.groupId(), description.groupState());
            int groupLen = Math.max(15, description.groupId().length());
            String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")";
            int coordinatorLen = Math.max(25, coordinator.length());
            int stateLen = 25;
            if (!verbose) {
                String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-25s %s\n";
                System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "#MEMBERS");
                System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.members().size());
            } else {
                int groupEpochLen = 15;
                int targetAssignmentEpochLen = 25;
                String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-25s %-15s %-25s %s\n";
                System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS");
                System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.groupEpoch(), description.targetAssignmentEpoch(), description.members().size());
            }
        }

        private void printOffsets(StreamsGroupDescription description, boolean verbose) throws ExecutionException, InterruptedException {
            block5: {
                Map<TopicPartition, OffsetsInfo> offsets = this.getOffsets(description);
                if (!StreamsGroupService.isGroupStateValid(description.groupState(), description.members().size())) break block5;
                StreamsGroupService.maybePrintEmptyGroupState(description.groupId(), description.groupState());
                int groupLen = Math.max(15, description.groupId().length());
                int maxTopicLen = 15;
                for (TopicPartition topicPartition : offsets.keySet()) {
                    maxTopicLen = Math.max(maxTopicLen, topicPartition.topic().length());
                }
                int maxPartitionLen = 10;
                if (!verbose) {
                    fmt = "%" + -groupLen + "s %" + -maxTopicLen + "s %-10s %s\n";
                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "OFFSET-LAG");
                    for (Map.Entry<TopicPartition, OffsetsInfo> offset : offsets.entrySet()) {
                        System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue().lag);
                    }
                } else {
                    fmt = "%" + -groupLen + "s %" + -maxTopicLen + "s %-10s %-15s %-15s %-15s %-15s%n";
                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LEADER-EPOCH", "LOG-END-OFFSET", "OFFSET-LAG");
                    for (Map.Entry<TopicPartition, OffsetsInfo> offset : offsets.entrySet()) {
                        System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue().currentOffset.map(Object::toString).orElse(StreamsGroupCommand.MISSING_COLUMN_VALUE), offset.getValue().leaderEpoch.map(Object::toString).orElse(StreamsGroupCommand.MISSING_COLUMN_VALUE), offset.getValue().logEndOffset, offset.getValue().lag);
                    }
                }
            }
        }

        Map<TopicPartition, OffsetsInfo> getOffsets(StreamsGroupDescription description) throws ExecutionException, InterruptedException {
            Collection members = description.members();
            HashSet<TopicPartition> allTp = new HashSet<TopicPartition>();
            for (StreamsGroupMemberDescription memberDescription : members) {
                allTp.addAll(StreamsGroupService.getTopicPartitions(memberDescription.assignment().activeTasks(), description));
            }
            HashMap<TopicPartition, OffsetSpec> earliest = new HashMap<TopicPartition, OffsetSpec>();
            HashMap<TopicPartition, OffsetSpec> latest = new HashMap<TopicPartition, OffsetSpec>();
            for (TopicPartition tp : allTp) {
                earliest.put(tp, OffsetSpec.earliest());
                latest.put(tp, OffsetSpec.latest());
            }
            Map earliestResult = (Map)this.adminClient.listOffsets(earliest).all().get();
            Map latestResult = (Map)this.adminClient.listOffsets(latest).all().get();
            Map<TopicPartition, OffsetAndMetadata> committedOffsets = this.getCommittedOffsets(description.groupId());
            HashMap<TopicPartition, OffsetsInfo> output = new HashMap<TopicPartition, OffsetsInfo>();
            for (Map.Entry tp : earliestResult.entrySet()) {
                Optional<Long> currentOffset = committedOffsets.containsKey(tp.getKey()) ? Optional.of(committedOffsets.get(tp.getKey()).offset()) : Optional.empty();
                Optional leaderEpoch = committedOffsets.containsKey(tp.getKey()) ? committedOffsets.get(tp.getKey()).leaderEpoch() : Optional.empty();
                long lag = currentOffset.map(current -> ((ListOffsetsResult.ListOffsetsResultInfo)latestResult.get(tp.getKey())).offset() - current).orElseGet(() -> ((ListOffsetsResult.ListOffsetsResultInfo)latestResult.get(tp.getKey())).offset() - ((ListOffsetsResult.ListOffsetsResultInfo)earliestResult.get(tp.getKey())).offset());
                output.put((TopicPartition)tp.getKey(), new OffsetsInfo(currentOffset, leaderEpoch, ((ListOffsetsResult.ListOffsetsResultInfo)latestResult.get(tp.getKey())).offset(), lag));
            }
            return output;
        }

        private Stream<TopicPartition> parseTopicsWithPartitions(String topicArg) {
            ToIntFunction<String> partitionNum = partition -> {
                try {
                    return Integer.parseInt(partition);
                }
                catch (NumberFormatException e) {
                    throw new IllegalArgumentException("Invalid partition '" + partition + "' specified in topic arg '" + topicArg + "'");
                }
            };
            String[] arr = topicArg.split(":");
            if (arr.length != 2) {
                throw new IllegalArgumentException("Invalid topic arg '" + topicArg + "', expected topic name and partitions");
            }
            String topic = arr[0];
            String partitions = arr[1];
            return Arrays.stream(partitions.split(",")).map(partition -> new TopicPartition(topic, partitionNum.applyAsInt((String)partition)));
        }

        Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) {
            try {
                Set sourceTopics = ((StreamsGroupDescription)((Map)this.adminClient.describeStreamsGroups(List.of(groupId)).all().get()).get(groupId)).subtopologies().stream().flatMap(subtopology -> subtopology.sourceTopics().stream()).collect(Collectors.toSet());
                Map allTopicPartitions = (Map)this.adminClient.listStreamsGroupOffsets(Map.of(groupId, new ListStreamsGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
                allTopicPartitions.keySet().removeIf(tp -> !sourceTopics.contains(tp.topic()));
                return allTopicPartitions;
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        private static void maybePrintEmptyGroupState(String group, GroupState state) {
            if (state == GroupState.DEAD) {
                StreamsGroupCommand.printError("Streams group '" + group + "' does not exist.", Optional.empty());
            } else if (state == GroupState.EMPTY) {
                StreamsGroupCommand.printError("Streams group '" + group + "' has no active members.", Optional.empty());
            }
        }

        static boolean isGroupStateValid(GroupState state, int numRows) {
            return !state.equals((Object)GroupState.DEAD) && numRows > 0;
        }

        private static Set<TopicPartition> getTopicPartitions(List<StreamsGroupMemberAssignment.TaskIds> taskIds, StreamsGroupDescription description) {
            HashMap allSourceTopics = new HashMap();
            for (StreamsGroupSubtopologyDescription subtopologyDescription : description.subtopologies()) {
                ArrayList topics = new ArrayList(subtopologyDescription.sourceTopics());
                topics.addAll(subtopologyDescription.repartitionSourceTopics().keySet());
                allSourceTopics.put(subtopologyDescription.subtopologyId(), topics);
            }
            HashSet<TopicPartition> topicPartitions = new HashSet<TopicPartition>();
            for (StreamsGroupMemberAssignment.TaskIds task : taskIds) {
                List sourceTopics = (List)allSourceTopics.get(task.subtopologyId());
                if (sourceTopics == null) {
                    throw new IllegalArgumentException("Subtopology " + task.subtopologyId() + " not found in group description!");
                }
                for (String topic : sourceTopics) {
                    for (Integer partition : task.partitions()) {
                        topicPartitions.add(new TopicPartition(topic, partition.intValue()));
                    }
                }
            }
            return topicPartitions;
        }

        @Override
        public void close() {
            this.adminClient.close();
        }

        protected Admin createAdminClient(Map<String, String> configOverrides) throws IOException {
            Properties props = this.opts.options.has(this.opts.commandConfigOpt) ? Utils.loadProps((String)((String)this.opts.options.valueOf(this.opts.commandConfigOpt))) : new Properties();
            props.put("bootstrap.servers", this.opts.options.valueOf(this.opts.bootstrapServerOpt));
            props.putAll(configOverrides);
            return Admin.create((Properties)props);
        }
    }

    public record OffsetsInfo(Optional<Long> currentOffset, Optional<Integer> leaderEpoch, Long logEndOffset, Long lag) {
    }
}

