/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.core.mqueues.mqueues.kafka;

import java.io.ByteArrayInputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.core.mqueues.mqueues.MQOffset;
import org.nuxeo.lib.core.mqueues.mqueues.MQPartition;
import org.nuxeo.lib.core.mqueues.mqueues.MQRebalanceException;
import org.nuxeo.lib.core.mqueues.mqueues.MQRebalanceListener;
import org.nuxeo.lib.core.mqueues.mqueues.MQRecord;
import org.nuxeo.lib.core.mqueues.mqueues.MQTailer;
import org.nuxeo.lib.core.mqueues.mqueues.internals.MQOffsetImpl;

public class KafkaMQTailer<M extends Externalizable>
implements MQTailer<M>,
ConsumerRebalanceListener {
    private static final Log log = LogFactory.getLog(KafkaMQTailer.class);
    protected final String group;
    protected final String prefix;
    protected KafkaConsumer<String, Bytes> consumer;
    protected String id;
    protected Collection<TopicPartition> topicPartitions;
    protected Collection<MQPartition> partitions;
    protected final Map<TopicPartition, Long> lastOffsets = new HashMap<TopicPartition, Long>();
    protected final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<TopicPartition, Long>();
    protected final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList<ConsumerRecord<String, Bytes>>();
    protected boolean closed = false;
    protected Collection<String> names;
    protected MQRebalanceListener listener;
    protected boolean isRebalanced = false;

    protected KafkaMQTailer(String prefix, String group, Properties consumerProps) {
        Objects.requireNonNull(group);
        this.prefix = prefix;
        this.group = group;
        consumerProps.put("group.id", group);
        this.consumer = new KafkaConsumer(consumerProps);
    }

    public static <M extends Externalizable> KafkaMQTailer<M> createAndAssign(String prefix, Collection<MQPartition> partitions, String group, Properties consumerProps) {
        KafkaMQTailer<M> ret = new KafkaMQTailer<M>(prefix, group, consumerProps);
        ret.id = KafkaMQTailer.buildId(ret.group, partitions);
        ret.partitions = partitions;
        ret.topicPartitions = partitions.stream().map(partition -> new TopicPartition(prefix + partition.name(), partition.partition())).collect(Collectors.toList());
        ret.consumer.assign(ret.topicPartitions);
        log.debug((Object)String.format("Created tailer with assignments: %s using prefix: %s", ret.id, prefix));
        return ret;
    }

    public static <M extends Externalizable> KafkaMQTailer<M> createAndSubscribe(String prefix, Collection<String> names, String group, Properties consumerProps, MQRebalanceListener listener) {
        KafkaMQTailer<M> ret = new KafkaMQTailer<M>(prefix, group, consumerProps);
        ret.id = KafkaMQTailer.buildSubscribeId(ret.group, names);
        ret.names = names;
        Collection topics = names.stream().map(name -> prefix + name).collect(Collectors.toList());
        ret.listener = listener;
        ret.consumer.subscribe(topics, ret);
        ret.partitions = Collections.emptyList();
        ret.topicPartitions = Collections.emptyList();
        log.debug((Object)String.format("Created tailer with subscription: %s using prefix: %s", ret.id, prefix));
        return ret;
    }

    protected static String buildId(String group, Collection<MQPartition> partitions) {
        return group + ":" + partitions.stream().map(MQPartition::toString).collect(Collectors.joining("|"));
    }

    protected static String buildSubscribeId(String group, Collection<String> names) {
        return group + ":" + names.stream().collect(Collectors.joining("|"));
    }

    @Override
    public MQRecord<M> read(Duration timeout) throws InterruptedException {
        if (this.closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        if (this.records.isEmpty()) {
            int items = this.poll(timeout);
            if (this.isRebalanced) {
                this.isRebalanced = false;
                log.debug((Object)"Rebalance happens during poll, raising exception");
                throw new MQRebalanceException("Partitions has been rebalanced");
            }
            if (items == 0) {
                if (log.isTraceEnabled()) {
                    log.trace((Object)("No data " + this.id + " after " + timeout.toMillis() + " ms"));
                }
                return null;
            }
        }
        ConsumerRecord<String, Bytes> record = this.records.poll();
        this.lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        M value = this.messageOf((Bytes)record.value());
        MQPartition partition = MQPartition.of(this.getNameForTopic(record.topic()), record.partition());
        MQOffsetImpl offset = new MQOffsetImpl(partition, record.offset());
        if (log.isDebugEnabled()) {
            log.debug((Object)String.format("Read from %s/%s, key: %s, value: %s", offset, this.group, record.key(), value));
        }
        return new MQRecord<M>(partition, value, offset);
    }

    protected String getNameForTopic(String topic) {
        return topic.replaceFirst(this.prefix, "");
    }

    protected M messageOf(Bytes value) {
        Externalizable externalizable;
        ByteArrayInputStream bis = new ByteArrayInputStream(value.get());
        ObjectInputStream in = null;
        try {
            in = new ObjectInputStream(bis);
            externalizable = (Externalizable)in.readObject();
        }
        catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
        finally {
            try {
                if (in != null) {
                    in.close();
                }
            }
            catch (IOException iOException) {}
        }
        return (M)externalizable;
    }

    protected int poll(Duration timeout) throws InterruptedException {
        this.records.clear();
        try {
            for (ConsumerRecord record : this.consumer.poll(timeout.toMillis())) {
                if (log.isDebugEnabled() && this.records.isEmpty()) {
                    log.debug((Object)("Poll first record: " + this.getNameForTopic(record.topic()) + ":" + record.partition() + ":+" + record.offset()));
                }
                this.records.add((ConsumerRecord<String, Bytes>)record);
            }
        }
        catch (InterruptException e) {
            Thread.currentThread().interrupt();
            throw new InterruptedException(e.getMessage());
        }
        catch (WakeupException e) {
            log.debug((Object)"Receiving wakeup from another thread to close the tailer");
            try {
                this.close();
            }
            catch (Exception e1) {
                log.warn((Object)("Error while closing the tailer " + this));
            }
            throw new IllegalStateException("poll interrupted because tailer has been closed");
        }
        if (log.isDebugEnabled()) {
            String msg = "Polling " + this.id + " returns " + this.records.size() + " records";
            if (this.records.size() > 0) {
                log.debug((Object)msg);
            } else {
                log.trace((Object)msg);
            }
        }
        return this.records.size();
    }

    @Override
    public void toEnd() {
        log.debug((Object)("toEnd: " + this.id));
        this.lastOffsets.clear();
        this.records.clear();
        this.consumer.seekToEnd(Collections.emptyList());
    }

    @Override
    public void toStart() {
        log.debug((Object)("toStart: " + this.id));
        this.lastOffsets.clear();
        this.records.clear();
        this.consumer.seekToBeginning(Collections.emptyList());
    }

    @Override
    public void toLastCommitted() {
        log.debug((Object)("toLastCommitted tailer: " + this.id));
        String msg = this.consumer.assignment().stream().map(tp -> String.format("%s-%02d:+%d", this.getNameForTopic(tp.topic()), tp.partition(), this.toLastCommitted((TopicPartition)tp))).collect(Collectors.joining("|"));
        if (msg.length() > 0) {
            log.info((Object)("toLastCommitted offsets: " + this.group + ":" + msg));
        }
        this.lastCommittedOffsets.clear();
        this.lastOffsets.clear();
        this.records.clear();
    }

    protected long toLastCommitted(TopicPartition topicPartition) {
        OffsetAndMetadata offsetMeta;
        Long offset = this.lastCommittedOffsets.get(topicPartition);
        if (offset == null && (offsetMeta = this.consumer.committed(topicPartition)) != null) {
            offset = offsetMeta.offset();
        }
        if (offset != null) {
            this.consumer.seek(topicPartition, offset.longValue());
        } else {
            this.consumer.seekToBeginning(Collections.singletonList(topicPartition));
            offset = this.consumer.position(topicPartition);
        }
        log.debug((Object)String.format(" toLastCommitted: %s-%02d:+%d", this.getNameForTopic(topicPartition.topic()), topicPartition.partition(), offset));
        return offset;
    }

    @Override
    public void seek(MQOffset offset) {
        log.debug((Object)("Seek to: " + offset + " from tailer: " + this.id));
        TopicPartition topicPartition = new TopicPartition(this.prefix + offset.partition().name(), offset.partition().partition());
        this.consumer.seek(topicPartition, offset.offset());
        this.lastOffsets.remove(topicPartition);
        List<ConsumerRecord> toRemove = this.records.stream().filter(rec -> rec.partition() == topicPartition.partition()).collect(Collectors.toList());
        toRemove.forEach(this.records::remove);
    }

    @Override
    public void reset() {
        log.info((Object)("Reset committed offsets for all assigned partitions: " + this.topicPartitions + " tailer: " + this.id));
        Map beginningOffsets = this.consumer.beginningOffsets(this.topicPartitions);
        HashMap offsetToCommit = new HashMap();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset.longValue())));
        this.consumer.commitSync(offsetToCommit);
        this.lastCommittedOffsets.clear();
        this.toLastCommitted();
    }

    @Override
    public void reset(MQPartition partition) {
        log.info((Object)("Reset committed offset for partition: " + partition + " tailer: " + this.id));
        TopicPartition topicPartition = new TopicPartition(this.prefix + partition.name(), partition.partition());
        Map beginningOffsets = this.consumer.beginningOffsets(Collections.singleton(topicPartition));
        HashMap offsetToCommit = new HashMap();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset.longValue())));
        this.consumer.commitSync(offsetToCommit);
        this.lastCommittedOffsets.remove(topicPartition);
        this.seek(new MQOffsetImpl(partition, (Long)beginningOffsets.get(0)));
    }

    @Override
    public void commit() {
        HashMap<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        this.lastOffsets.forEach((tp, offset) -> offsetToCommit.put((TopicPartition)tp, new OffsetAndMetadata(offset + 1L)));
        this.lastOffsets.clear();
        if (offsetToCommit.isEmpty()) {
            return;
        }
        this.consumer.commitSync(offsetToCommit);
        offsetToCommit.forEach((topicPartition, offset) -> this.lastCommittedOffsets.put((TopicPartition)topicPartition, offset.offset()));
        if (log.isDebugEnabled()) {
            String msg = offsetToCommit.entrySet().stream().map(entry -> String.format("%s-%02d:+%d", this.getNameForTopic(((TopicPartition)entry.getKey()).topic()), ((TopicPartition)entry.getKey()).partition(), ((OffsetAndMetadata)entry.getValue()).offset())).collect(Collectors.joining("|"));
            log.debug((Object)("Committed offsets  " + this.group + ":" + msg));
        }
    }

    @Override
    public MQOffset commit(MQPartition partition) {
        TopicPartition topicPartition = new TopicPartition(this.prefix + partition.name(), partition.partition());
        Long offset = this.lastOffsets.get(topicPartition);
        if (offset == null) {
            log.debug((Object)("unchanged partition, nothing to commit: " + partition));
            return null;
        }
        offset = offset + 1L;
        this.consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset.longValue())));
        MQOffsetImpl ret = new MQOffsetImpl(partition, offset);
        if (log.isDebugEnabled()) {
            log.info((Object)("Committed: " + offset + "/" + this.group));
        }
        return ret;
    }

    @Override
    public Collection<MQPartition> assignments() {
        return this.partitions;
    }

    @Override
    public String group() {
        return this.group;
    }

    @Override
    public boolean closed() {
        return this.closed;
    }

    @Override
    public void close() {
        if (this.consumer != null) {
            log.info((Object)("Closing tailer: " + this.id));
            try {
                this.consumer.close();
            }
            catch (ConcurrentModificationException e) {
                log.info((Object)"Closing tailer from another thread, send wakeup");
                this.consumer.wakeup();
                return;
            }
            catch (IllegalStateException | InterruptException e) {
                log.warn((Object)"Discard error while closing consumer: ", e);
            }
            catch (Throwable t) {
                log.error((Object)"interrupted", t);
            }
            this.consumer = null;
        }
        this.closed = true;
    }

    public String toString() {
        return "KafkaMQTailer{prefix='" + this.prefix + '\'' + ", id=" + this.id + ", closed=" + this.closed + '}';
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Collection revoked = partitions.stream().map(tp -> MQPartition.of(this.getNameForTopic(tp.topic()), tp.partition())).collect(Collectors.toList());
        log.info((Object)String.format("Rebalance revoked: %s", revoked));
        this.id = this.id + "-revoked";
        if (this.listener != null) {
            this.listener.onPartitionsRevoked(revoked);
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) {
        this.partitions = newPartitions.stream().map(tp -> MQPartition.of(this.getNameForTopic(tp.topic()), tp.partition())).collect(Collectors.toList());
        this.topicPartitions = newPartitions;
        this.id = KafkaMQTailer.buildId(this.group, this.partitions);
        this.lastCommittedOffsets.clear();
        this.lastOffsets.clear();
        this.records.clear();
        this.isRebalanced = true;
        log.info((Object)String.format("Rebalance assigned: %s", this.partitions));
        if (this.listener != null) {
            this.listener.onPartitionsAssigned(this.partitions);
        }
    }
}

