/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.cluster.Broker;
import kafka.cluster.EndPoint;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.requests.MetadataResponse;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import scala.collection.Seq;

public class KafkaUtils
implements AutoCloseable {
    private static final Log log = LogFactory.getLog(KafkaUtils.class);
    private final ZkClient zkClient;
    private final ZkUtils zkUtils;
    public static final String DEFAULT_ZK_SERVER = "localhost:2181";
    public static final int ZK_TIMEOUT_MS = 6000;
    public static final int ZK_CONNECTION_TIMEOUT_MS = 10000;

    public KafkaUtils() {
        this(DEFAULT_ZK_SERVER);
    }

    public KafkaUtils(String zkServers) {
        log.debug((Object)("Init zkServers: " + zkServers));
        this.zkClient = KafkaUtils.createZkClient(zkServers);
        this.zkUtils = KafkaUtils.createZkUtils(zkServers, this.zkClient);
    }

    public static boolean kafkaDetected() {
        return KafkaUtils.kafkaDetected(DEFAULT_ZK_SERVER);
    }

    public static boolean kafkaDetected(String zkServers) {
        try {
            ZkClient tmp = new ZkClient(zkServers, 1000, 1000, (ZkSerializer)ZKStringSerializer$.MODULE$);
            tmp.close();
        }
        catch (ZkTimeoutException e) {
            return false;
        }
        return true;
    }

    private static ZkUtils createZkUtils(String zkServers, ZkClient zkClient) {
        return new ZkUtils(zkClient, new ZkConnection(zkServers), false);
    }

    private static ZkClient createZkClient(String zkServers) {
        return new ZkClient(zkServers, 6000, 10000, (ZkSerializer)ZKStringSerializer$.MODULE$);
    }

    public void createTopicWithoutReplication(String topic, int partitions) {
        this.createTopic(topic, partitions, 1);
    }

    public void createTopic(String topic, int partitions, int replicationFactor) {
        log.info((Object)("Creating topic: " + topic + ", partitions: " + partitions + ", replications: " + replicationFactor));
        if (AdminUtils.topicExists((ZkUtils)this.zkUtils, (String)topic)) {
            String msg = "Can not create Topic already exists: " + topic;
            log.error((Object)msg);
            throw new IllegalArgumentException(msg);
        }
        AdminUtils.createTopic((ZkUtils)this.zkUtils, (String)topic, (int)partitions, (int)replicationFactor, (Properties)new Properties(), (RackAwareMode)RackAwareMode.Disabled$.MODULE$);
        try {
            this.waitForTopicCreation(topic, Duration.ofSeconds(5L));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        AdminUtils.deleteAllConsumerGroupInfoForTopicInZK((ZkUtils)this.zkUtils, (String)topic);
    }

    private boolean waitForTopicCreation(String topic, Duration timeout) throws InterruptedException {
        long timeoutMs = timeout.toMillis();
        long deadline = System.currentTimeMillis() + timeoutMs;
        boolean ret = false;
        while (!ret && System.currentTimeMillis() < deadline) {
            ret = this.allPartitionsAssigned(topic);
            Thread.sleep(100L);
        }
        if (!ret) {
            log.error((Object)("Topic: " + topic + " has some uninitialized partitions."));
        }
        return ret;
    }

    private boolean allPartitionsAssigned(String topic) {
        if (!AdminUtils.topicExists((ZkUtils)this.zkUtils, (String)topic)) {
            log.debug((Object)("Topic " + topic + " does not exists yet"));
            return false;
        }
        MetadataResponse.TopicMetadata meta = AdminUtils.fetchTopicMetadataFromZk((String)topic, (ZkUtils)this.zkUtils);
        if (meta.partitionMetadata().isEmpty()) {
            log.debug((Object)("Topic " + topic + " has no partition yet"));
            return false;
        }
        long errors = meta.partitionMetadata().stream().filter(p -> p.error().code() > 0).count();
        if (errors != 0L) {
            log.debug((Object)("Topic " + topic + " have some uninitialized partitions"));
        }
        return errors == 0L;
    }

    public boolean topicExists(String topic) {
        return AdminUtils.topicExists((ZkUtils)this.zkUtils, (String)topic);
    }

    public void markTopicForDeletion(String topic) {
        log.debug((Object)("mark topic for deletion: " + topic));
        AdminUtils.deleteTopic((ZkUtils)this.zkUtils, (String)topic);
    }

    public int getNumberOfPartitions(String topic) {
        MetadataResponse.TopicMetadata metadata = AdminUtils.fetchTopicMetadataFromZk((String)topic, (ZkUtils)this.zkUtils);
        return metadata.partitionMetadata().size();
    }

    public void resetConsumerStates(String topic) {
        log.debug((Object)"Resetting consumer states");
        AdminUtils.deleteAllConsumerGroupInfoForTopicInZK((ZkUtils)this.zkUtils, (String)topic);
    }

    public Set<String> getBrokerEndPoints() {
        HashSet<String> ret = new HashSet<String>();
        Seq brokers = this.zkUtils.getAllBrokersInCluster();
        for (Broker broker : brokers) {
            if (broker == null) continue;
            Seq endPoints = broker.endPoints();
            for (EndPoint endPoint : endPoints) {
                ret.add(endPoint.connectionString());
            }
        }
        return ret;
    }

    public String getDefaultBootstrapServers() {
        return this.getBrokerEndPoints().stream().collect(Collectors.joining(","));
    }

    @Override
    public void close() throws Exception {
        if (this.zkUtils != null) {
            this.zkUtils.close();
        }
        if (this.zkClient != null) {
            this.zkClient.close();
        }
        log.debug((Object)"Closed.");
    }

    public static List<List<MQPartition>> rangeAssignments(int threads, Map<String, Integer> streams) {
        RangeAssignor assignor = new RangeAssignor();
        return KafkaUtils.assignments((PartitionAssignor)assignor, threads, streams);
    }

    public static List<List<MQPartition>> roundRobinAssignments(int threads, Map<String, Integer> streams) {
        RoundRobinAssignor assignor = new RoundRobinAssignor();
        return KafkaUtils.assignments((PartitionAssignor)assignor, threads, streams);
    }

    protected static List<List<MQPartition>> assignments(PartitionAssignor assignor, int threads, Map<String, Integer> streams) {
        ArrayList parts = new ArrayList();
        streams.forEach((streamName, size) -> parts.addAll(KafkaUtils.getPartsFor(streamName, size)));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        List streamNames = streams.keySet().stream().sorted().collect(Collectors.toList());
        for (int i = 0; i < threads; ++i) {
            subscriptions.put(String.valueOf(i), new PartitionAssignor.Subscription(streamNames));
        }
        Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts, Collections.emptySet(), Collections.emptySet());
        Map assignments = assignor.assign(cluster, subscriptions);
        ArrayList<List<MQPartition>> ret = new ArrayList<List<MQPartition>>(threads);
        for (int i = 0; i < threads; ++i) {
            ret.add(((PartitionAssignor.Assignment)assignments.get(String.valueOf(i))).partitions().stream().map(part -> new MQPartition(part.topic(), part.partition())).collect(Collectors.toList()));
        }
        return ret;
    }

    protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) {
        ArrayList<PartitionInfo> ret = new ArrayList<PartitionInfo>();
        for (int i = 0; i < partitions; ++i) {
            ret.add(new PartitionInfo(topic, i, null, null, null));
        }
        return ret;
    }
}

