/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle;

import java.io.Externalizable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.openhft.chronicle.queue.ExcerptTailer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQOffset;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQRecord;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQTailer;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle.ChronicleMQOffsetTracker;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.MQOffsetImpl;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.internals.MQPartitionGroup;

public class ChronicleMQTailer<M extends Externalizable>
implements MQTailer<M> {
    private static final Log log = LogFactory.getLog(ChronicleMQTailer.class);
    protected static final long POLL_INTERVAL_MS = 100L;
    private final String basePath;
    private final ExcerptTailer cqTailer;
    private final ChronicleMQOffsetTracker offsetTracker;
    private final MQPartitionGroup id;
    private final MQPartition partition;
    private boolean closed = false;
    private static final Set<MQPartitionGroup> tailersId = Collections.newSetFromMap(new ConcurrentHashMap());

    public ChronicleMQTailer(String basePath, ExcerptTailer cqTailer, MQPartition partition, String group) {
        Objects.requireNonNull(group);
        this.basePath = basePath;
        this.cqTailer = cqTailer;
        this.partition = partition;
        this.id = new MQPartitionGroup(group, partition.name(), partition.partition());
        this.registerTailer();
        this.offsetTracker = new ChronicleMQOffsetTracker(basePath, partition.partition(), group);
        this.toLastCommitted();
    }

    private void registerTailer() {
        if (!tailersId.add(this.id)) {
            throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + this.id);
        }
    }

    private void unregisterTailer() {
        tailersId.remove(this.id);
    }

    @Override
    public MQRecord<M> read(Duration timeout) throws InterruptedException {
        MQRecord<M> ret = this.read();
        if (ret != null) {
            return ret;
        }
        long timeoutMs = timeout.toMillis();
        long deadline = System.currentTimeMillis() + timeoutMs;
        long delay = Math.min(100L, timeoutMs);
        while (ret == null && System.currentTimeMillis() < deadline) {
            Thread.sleep(delay);
            ret = this.read();
        }
        return ret;
    }

    protected MQRecord<M> read() {
        if (this.closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        ArrayList value = new ArrayList(1);
        if (!this.cqTailer.readDocument(w -> value.add((Externalizable)w.read("msg").object()))) {
            return null;
        }
        return new MQRecord<Externalizable>(this.partition, (Externalizable)value.get(0), new MQOffsetImpl(this.partition, this.cqTailer.index()));
    }

    @Override
    public MQOffset commit(MQPartition partition) {
        if (!this.partition.equals(partition)) {
            throw new IllegalArgumentException("Can not commit this partition: " + partition + " from " + this.id);
        }
        long offset = this.cqTailer.index();
        this.offsetTracker.commit(offset);
        if (log.isTraceEnabled()) {
            log.trace((Object)String.format("Commit %s:+%d", this.id, offset));
        }
        return new MQOffsetImpl(partition, offset);
    }

    @Override
    public void commit() {
        this.commit(this.partition);
    }

    @Override
    public void toEnd() {
        log.debug((Object)String.format("toEnd: %s", this.id));
        this.cqTailer.toEnd();
    }

    @Override
    public void toStart() {
        log.debug((Object)String.format("toStart: %s", this.id));
        this.cqTailer.toStart();
    }

    @Override
    public void toLastCommitted() {
        long offset = this.offsetTracker.getLastCommittedOffset();
        if (offset > 0L) {
            log.debug((Object)String.format("toLastCommitted: %s, found: %d", this.id, offset));
            this.cqTailer.moveToIndex(offset);
        } else {
            log.debug((Object)String.format("toLastCommitted: %s not found, run from beginning", this.id));
            this.cqTailer.toStart();
        }
    }

    public void seek(MQPartition partition, MQOffset offset) {
        this.cqTailer.moveToIndex(offset.offset());
    }

    @Override
    public Collection<MQPartition> assignments() {
        return Collections.singletonList(new MQPartition(this.id.name, this.id.partition));
    }

    @Override
    public String group() {
        return this.id.group;
    }

    @Override
    public void close() throws Exception {
        this.offsetTracker.close();
        this.unregisterTailer();
        this.closed = true;
    }

    @Override
    public boolean closed() {
        return this.closed;
    }

    public String toString() {
        return "ChronicleMQTailer{basePath='" + this.basePath + '\'' + ", id=" + this.id + ", closed=" + this.closed + '}';
    }
}

