/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka;

import java.io.Externalizable;
import java.util.Collection;
import java.util.Properties;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQAppender;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRebalanceListener;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.AbstractMQManager;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka.KafkaMQAppender;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka.KafkaMQTailer;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka.KafkaUtils;

public class KafkaMQManager<M extends Externalizable>
extends AbstractMQManager<M> {
    public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable";
    public static final String DEFAULT_REPLICATION_FACTOR_PROP = "default.replication.factor";
    private final KafkaUtils kUtils;
    private final Properties producerProperties;
    private final Properties consumerProperties;
    private final String prefix;
    private final Integer defaultReplicationFactor;
    private boolean disableSubscribe = false;

    public KafkaMQManager(String zkServers, Properties producerProperties, Properties consumerProperties) {
        this(zkServers, null, producerProperties, consumerProperties);
    }

    public KafkaMQManager(String zkServers, String topicPrefix, Properties producerProperties, Properties consumerProperties) {
        this.prefix = topicPrefix != null ? topicPrefix : "";
        this.kUtils = new KafkaUtils(zkServers);
        this.disableSubscribe = Boolean.valueOf(consumerProperties.getProperty(DISABLE_SUBSCRIBE_PROP, "false"));
        this.defaultReplicationFactor = Integer.valueOf(producerProperties.getProperty(DEFAULT_REPLICATION_FACTOR_PROP, "1"));
        this.producerProperties = this.normalizeProducerProperties(producerProperties);
        this.consumerProperties = KafkaMQManager.normalizeConsumerProperties(consumerProperties);
    }

    protected String getTopicName(String name) {
        return this.prefix + name;
    }

    @Override
    public void create(String name, int size) {
        this.kUtils.createTopic(this.getTopicName(name), size, this.defaultReplicationFactor);
    }

    @Override
    public boolean exists(String name) {
        return this.kUtils.topicExists(this.getTopicName(name));
    }

    @Override
    public MQAppender<M> createAppender(String name) {
        return KafkaMQAppender.open(this.getTopicName(name), name, this.producerProperties, this.consumerProperties);
    }

    @Override
    protected MQTailer<M> acquireTailer(Collection<MQPartition> partitions, String group) {
        partitions.forEach(this::checkValidPartition);
        return KafkaMQTailer.createAndAssign(this.prefix, partitions, group, (Properties)this.consumerProperties.clone());
    }

    private void checkValidPartition(MQPartition partition) {
        int partitions = this.kUtils.getNumberOfPartitions(this.getTopicName(partition.name()));
        if (partition.partition() >= partitions) {
            throw new IllegalArgumentException("Partition out of bound " + partition + " max: " + partitions);
        }
    }

    public Properties getProducerProperties() {
        return this.producerProperties;
    }

    public Properties getConsumerProperties() {
        return this.consumerProperties;
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (this.kUtils != null) {
            this.kUtils.close();
        }
    }

    @Override
    public boolean supportSubscribe() {
        return !this.disableSubscribe;
    }

    @Override
    protected MQTailer<M> doSubscribe(String group, Collection<String> names, MQRebalanceListener listener) {
        return KafkaMQTailer.createAndSubscribe(this.prefix, names, group, (Properties)this.consumerProperties.clone(), listener);
    }

    protected static Properties normalizeConsumerProperties(Properties consumerProperties) {
        Properties ret = consumerProperties != null ? (Properties)consumerProperties.clone() : new Properties();
        ret.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        ret.put("value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer");
        ret.put("enable.auto.commit", (Object)false);
        ret.put("auto.offset.reset", "earliest");
        ret.remove(DISABLE_SUBSCRIBE_PROP);
        return ret;
    }

    protected Properties normalizeProducerProperties(Properties producerProperties) {
        Properties ret = producerProperties != null ? (Properties)producerProperties.clone() : new Properties();
        ret.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        ret.put("value.serializer", "org.apache.kafka.common.serialization.BytesSerializer");
        ret.remove(DEFAULT_REPLICATION_FACTOR_PROP);
        return ret;
    }
}

