/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.core.mqueues.computation.mqueue;

import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.core.mqueues.computation.Computation;
import org.nuxeo.lib.core.mqueues.computation.ComputationMetadataMapping;
import org.nuxeo.lib.core.mqueues.computation.Record;
import org.nuxeo.lib.core.mqueues.computation.Watermark;
import org.nuxeo.lib.core.mqueues.computation.internals.ComputationContextImpl;
import org.nuxeo.lib.core.mqueues.computation.internals.WatermarkMonotonicInterval;
import org.nuxeo.lib.core.mqueues.mqueues.MQAppender;
import org.nuxeo.lib.core.mqueues.mqueues.MQManager;
import org.nuxeo.lib.core.mqueues.mqueues.MQPartition;
import org.nuxeo.lib.core.mqueues.mqueues.MQRebalanceException;
import org.nuxeo.lib.core.mqueues.mqueues.MQRebalanceListener;
import org.nuxeo.lib.core.mqueues.mqueues.MQRecord;
import org.nuxeo.lib.core.mqueues.mqueues.MQTailer;

/**
 * Runs a single {@link Computation} instance: reads records through an {@link MQTailer}, hands them to the
 * computation, fires the timers that are due and performs a checkpoint (flush output records, commit offsets)
 * whenever the computation context requires one.
 * <p>
 * The runner is also registered as {@link MQRebalanceListener} when the {@link MQManager} supports subscription,
 * in which case the computation context is rebuilt each time partitions are assigned.
 */
public class MQComputationRunner implements Runnable, MQRebalanceListener {
    private static final Log log = LogFactory.getLog(MQComputationRunner.class);

    /** Idle time in milliseconds after which a draining runner leaves its processing loop. */
    protected static final long STARVING_TIMEOUT_MS = 1000L;

    /** Upper bound for the timeout of a single tailer read. */
    public static final Duration READ_TIMEOUT = Duration.ofMillis(25L);

    /** Context handed to the computation; replaced by a fresh instance on partition assignment. */
    protected ComputationContextImpl context;

    protected final MQManager mqManager;

    protected final ComputationMetadataMapping metadata;

    /** Input tailer, {@code null} when the computation declares no input stream. */
    protected final MQTailer<Record> tailer;

    protected final Supplier<Computation> supplier;

    /** Set by {@link #stop()} (or a poison pill record) and checked at each loop iteration. */
    protected volatile boolean stop = false;

    /** Set by {@link #drain()} and checked at each loop iteration. */
    protected volatile boolean drain = false;

    /** Released once the runner has its partitions (immediately when there is no subscription). */
    protected CountDownLatch assignmentLatch = new CountDownLatch(1);

    /** Computation instance, created from the supplier at the beginning of {@link #run()}. */
    protected Computation computation;

    /** Number of loop iterations. */
    protected long counter = 0L;

    /** Number of records read from the tailer. */
    protected long inRecords = 0L;

    /** Value of {@link #inRecords} at the last successful checkpoint. */
    protected long inCheckpointRecords = 0L;

    /** Number of records appended to output streams. */
    protected long outRecords = 0L;

    protected final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval();

    /** Wall-clock time (ms) of the last successful read or of the last partition assignment. */
    protected long lastReadTime = System.currentTimeMillis();

    /** Wall-clock time (ms) of the last round in which at least one timer fired, 0 if none yet. */
    protected long lastTimerExecution = 0L;

    /** Original name of the running thread, used as prefix by {@link #setThreadName(String)}. */
    protected String threadName;

    // Not referenced in this class; kept because it is part of the protected surface.
    protected boolean needContextInitialize = false;

    /**
     * Creates the runner and its tailer.
     * <ul>
     * <li>no input stream: no tailer, the assignment latch is released immediately;</li>
     * <li>manager supports subscribe: the runner subscribes and waits for {@link #onPartitionsAssigned};</li>
     * <li>otherwise: a tailer is created on {@code defaultAssignment} and the latch is released immediately.</li>
     * </ul>
     *
     * @param supplier factory for the computation instance, invoked in {@link #run()}
     * @param metadata computation metadata (name, input and output streams)
     * @param defaultAssignment partitions to tail when the manager does not support subscription
     * @param mqManager manager used to create the tailer and the output appenders
     */
    public MQComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping metadata, List<MQPartition> defaultAssignment, MQManager mqManager) {
        this.supplier = supplier;
        this.metadata = metadata;
        this.mqManager = mqManager;
        this.context = new ComputationContextImpl(metadata);
        if (metadata.inputStreams().isEmpty()) {
            this.tailer = null;
            this.assignmentLatch.countDown();
        } else if (mqManager.supportSubscribe()) {
            this.tailer = mqManager.subscribe(metadata.name(), metadata.inputStreams(), this);
        } else {
            this.tailer = mqManager.createTailer(metadata.name(), defaultAssignment);
            this.assignmentLatch.countDown();
        }
    }

    /** Asks the processing loop to exit at its next iteration. */
    public void stop() {
        log.debug(this.metadata.name() + ": Receives Stop signal");
        this.stop = true;
    }

    /** Asks the processing loop to exit once it has been idle for more than {@link #STARVING_TIMEOUT_MS}. */
    public void drain() {
        log.debug(this.metadata.name() + ": Receives Drain signal");
        this.drain = true;
    }

    /**
     * Waits until the assignment latch is released.
     *
     * @param timeout maximum time to wait
     * @return {@code true} if the latch was released in time, {@code false} on timeout
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        if (this.assignmentLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
            return true;
        }
        log.warn(this.metadata.name() + ": Timeout waiting for assignment");
        return false;
    }

    /**
     * Creates and initializes the computation, runs the processing loop, then destroys the computation and closes
     * the tailer.
     * <p>
     * An {@link InterruptedException} ends the loop quietly and the interrupt flag is restored on exit. Any other
     * exception is rethrown after being logged, unless the thread is flagged as interrupted, in which case it is
     * only logged.
     */
    @Override
    public void run() {
        this.threadName = Thread.currentThread().getName();
        boolean interrupted = false;
        boolean initialized = false;
        try {
            this.computation = this.supplier.get();
            log.debug(this.metadata.name() + ": Init");
            this.computation.init(this.context);
            initialized = true;
        } finally {
            if (!initialized) {
                // The tailer was opened by the constructor and is only closed from run():
                // release it when start-up fails, the failure itself still propagates.
                this.closeTailer();
            }
        }
        log.debug(this.metadata.name() + ": Start");
        try {
            this.processLoop();
        } catch (InterruptedException e) {
            if (log.isTraceEnabled()) {
                log.debug(this.metadata.name() + ": Interrupted", e);
            } else {
                log.debug(this.metadata.name() + ": Interrupted");
            }
            // The flag is restored in the finally block, after the cleanup has run.
            interrupted = true;
        } catch (Exception e) {
            if (Thread.currentThread().isInterrupted()) {
                log.info(this.metadata.name() + ": Interrupted", e);
            } else {
                log.error(this.metadata.name() + ": Exception in processLoop: " + e.getMessage(), e);
                throw e;
            }
        } finally {
            try {
                try {
                    this.computation.destroy();
                } finally {
                    // Close the tailer even when destroy() fails.
                    this.closeTailer();
                }
                log.debug(this.metadata.name() + ": Exited");
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Closes the tailer if there is one and it is still open; a failure to close is logged and ignored. */
    protected void closeTailer() {
        if (this.tailer == null || this.tailer.closed()) {
            return;
        }
        try {
            this.tailer.close();
        } catch (Exception e) {
            log.debug(this.metadata.name() + ": Exception while closing tailer", e);
        }
    }

    /**
     * Alternates timer processing and record processing until {@link #continueLoop()} returns {@code false}.
     *
     * @throws InterruptedException propagated from the timer, record or checkpoint processing
     */
    protected void processLoop() throws InterruptedException {
        while (this.continueLoop()) {
            this.processTimer();
            this.processRecord();
            ++this.counter;
        }
    }

    /**
     * Decides whether the processing loop goes on.
     *
     * @return {@code false} when stopped, when the thread is interrupted, or when draining and nothing happened
     *         for more than {@link #STARVING_TIMEOUT_MS} (last timer round for a computation without input, last
     *         read otherwise)
     */
    protected boolean continueLoop() {
        if (this.stop || Thread.currentThread().isInterrupted()) {
            return false;
        }
        if (!this.drain) {
            return true;
        }
        long now = System.currentTimeMillis();
        if (this.metadata.inputStreams().isEmpty()) {
            // Source computation: only gives up once at least one timer round has been executed.
            if (this.lastTimerExecution > 0L && now - this.lastTimerExecution > STARVING_TIMEOUT_MS) {
                log.info(this.metadata.name() + ": End of source drain, last timer " + STARVING_TIMEOUT_MS + " ms ago");
                return false;
            }
        } else if (now - this.lastReadTime > STARVING_TIMEOUT_MS) {
            log.info(this.metadata.name() + ": End of drain no more input after " + (now - this.lastReadTime) + " ms, " + this.inRecords + " records read, " + this.counter + " reads attempt");
            return false;
        }
        return true;
    }

    /**
     * Fires every timer of the context whose value is lower than or equal to the current time, in ascending value
     * order, removing each timer from the context before invoking the computation. When at least one timer
     * fired, the source low watermark is collected and a checkpoint is performed if the context requires one.
     *
     * @throws InterruptedException propagated from the checkpoint
     */
    protected void processTimer() throws InterruptedException {
        Map<String, Long> timers = this.context.getTimers();
        if (timers.isEmpty()) {
            return;
        }
        long now = System.currentTimeMillis();
        // Work on a sorted copy so that timers can be removed from (or added to) the context while firing.
        Map<String, Long> dueTimers = timers.entrySet()
                .stream()
                .filter(entry -> entry.getValue() <= now)
                .sorted(Map.Entry.comparingByValue())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
        if (dueTimers.isEmpty()) {
            return;
        }
        dueTimers.forEach((key, value) -> {
            this.context.removeTimer(key);
            this.computation.processTimer(this.context, key, value);
        });
        this.checkSourceLowWatermark();
        this.lastTimerExecution = now;
        this.setThreadName("timer");
        this.checkpointIfNecessary();
    }

    /**
     * Reads at most one record from the tailer and passes it to the computation, then handles the record flags,
     * the source low watermark and a possible checkpoint. Does nothing when there is no tailer or no record is
     * available within the read timeout.
     *
     * @throws InterruptedException propagated from the tailer read or from the checkpoint
     */
    protected void processRecord() throws InterruptedException {
        if (this.tailer == null) {
            return;
        }
        Duration timeoutRead = this.getTimeoutDuration();
        MQRecord<Record> mqRecord = null;
        try {
            mqRecord = this.tailer.read(timeoutRead);
        } catch (MQRebalanceException e) {
            // Deliberately not fatal: the read is treated as having returned no record and the loop retries.
            // NOTE(review): presumably the rebalance callbacks have already reset the runner state - confirm.
            log.debug(this.metadata.name() + ": Rebalance during read, no record processed", e);
        }
        if (mqRecord == null) {
            return;
        }
        Record record = mqRecord.message();
        this.lastReadTime = System.currentTimeMillis();
        ++this.inRecords;
        this.lowWatermark.mark(record.watermark);
        String from = this.metadata.reverseMap(mqRecord.partition().name());
        this.computation.processRecord(this.context, from, record);
        this.checkRecordFlags(record);
        this.checkSourceLowWatermark();
        this.setThreadName("record");
        this.checkpointIfNecessary();
    }

    /**
     * Computes the timeout of the next read: the time elapsed since the last read, capped by
     * {@link #READ_TIMEOUT} and never negative.
     *
     * @return a duration between zero and {@link #READ_TIMEOUT}
     */
    protected Duration getTimeoutDuration() {
        long elapsed = System.currentTimeMillis() - this.lastReadTime;
        // currentTimeMillis() is wall-clock time: a backwards clock adjustment would make elapsed negative.
        long timeoutMs = Math.max(0L, Math.min(READ_TIMEOUT.toMillis(), elapsed));
        return Duration.ofMillis(timeoutMs);
    }

    /** Transfers a positive source low watermark from the context to {@link #lowWatermark} and resets it to 0. */
    protected void checkSourceLowWatermark() {
        long watermark = this.context.getSourceLowWatermark();
        if (watermark <= 0L) {
            return;
        }
        this.lowWatermark.mark(Watermark.ofValue(watermark));
        this.context.setSourceLowWatermark(0L);
    }

    /**
     * Reacts to the flags of a record: a poison pill asks for a checkpoint and stops the runner, a commit flag
     * only asks for a checkpoint.
     *
     * @param record the record that has just been processed
     */
    protected void checkRecordFlags(Record record) {
        if (record.flags.contains(Record.Flag.POISON_PILL)) {
            log.info(this.metadata.name() + ": Receive POISON PILL");
            this.context.askForCheckpoint();
            this.stop = true;
        } else if (record.flags.contains(Record.Flag.COMMIT)) {
            this.context.askForCheckpoint();
        }
    }

    /**
     * Performs a checkpoint when the context requires one. A checkpoint that does not complete is reported with
     * an error log and the original exception keeps propagating.
     *
     * @throws InterruptedException propagated from {@link #checkpoint()}
     */
    protected void checkpointIfNecessary() throws InterruptedException {
        if (!this.context.requireCheckpoint()) {
            return;
        }
        boolean completed = false;
        try {
            this.checkpoint();
            completed = true;
            this.inCheckpointRecords = this.inRecords;
        } finally {
            if (!completed) {
                log.error(this.metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates.");
            }
        }
    }

    /**
     * Checkpoint sequence: send pending output records, save timers and state (no-ops in this class), commit the
     * tailer offsets, checkpoint the low watermark and clear the checkpoint flag of the context.
     *
     * @throws InterruptedException declared for overriding implementations; nothing visible here throws it
     */
    protected void checkpoint() throws InterruptedException {
        this.sendRecords();
        this.saveTimers();
        this.saveState();
        this.saveOffsets();
        this.lowWatermark.checkpoint();
        this.context.removeCheckpointFlag();
        log.debug(this.metadata.name() + ": checkpoint");
        this.setThreadName("checkpoint");
    }

    /** Extension point called during a checkpoint; does nothing in this class. */
    protected void saveTimers() {
    }

    /** Extension point called during a checkpoint; does nothing in this class. */
    protected void saveState() {
    }

    /** Commits the tailer position, if there is a tailer. */
    protected void saveOffsets() {
        if (this.tailer != null) {
            this.tailer.commit();
        }
    }

    /**
     * Appends the records accumulated in the context to each output stream, then clears them from the context.
     * A record whose watermark is 0 receives the current low watermark value before being appended.
     */
    protected void sendRecords() {
        for (String ostream : this.metadata.outputStreams()) {
            MQAppender<Record> appender = this.mqManager.getAppender(ostream);
            for (Record record : this.context.getRecords(ostream)) {
                if (record.watermark == 0L) {
                    record.watermark = this.lowWatermark.getLow().getValue();
                }
                appender.append(record.key, record);
                ++this.outRecords;
            }
            this.context.getRecords(ostream).clear();
        }
    }

    /**
     * @return the low bound of the watermark interval tracked by this runner
     */
    public Watermark getLowWatermark() {
        return this.lowWatermark.getLow();
    }

    /**
     * Renames the current thread to the original thread name followed by the runner counters, which makes the
     * progress visible in thread dumps.
     *
     * @param message optional suffix describing the last action, ignored when {@code null}
     */
    protected void setThreadName(String message) {
        String name = this.threadName + ",in:" + this.inRecords + ",inCheckpoint:" + this.inCheckpointRecords + ",out:" + this.outRecords + ",lastRead:" + this.lastReadTime + ",lastTimer:" + this.lastTimerExecution + ",wm:" + this.lowWatermark.getLow().getValue() + ",loop:" + this.counter;
        if (message != null) {
            name = name + "," + message;
        }
        Thread.currentThread().setName(name);
    }

    /**
     * Rebalance callback: only tags the thread name, no state is changed here.
     *
     * @param partitions the partitions being revoked (unused)
     */
    @Override
    public void onPartitionsRevoked(Collection<MQPartition> partitions) {
        this.setThreadName("rebalance revoked");
    }

    /**
     * Rebalance callback: replaces the context with a fresh one, re-initializes the computation on it, resets the
     * read and timer clocks and releases the assignment latch.
     *
     * @param partitions the partitions being assigned (unused)
     */
    @Override
    public void onPartitionsAssigned(Collection<MQPartition> partitions) {
        this.lastReadTime = System.currentTimeMillis();
        this.setThreadName("rebalance assigned");
        this.context = new ComputationContextImpl(this.metadata);
        // The subscription is created in the constructor while the computation is only created in run():
        // when the callback comes first, run() initializes the computation on this new context itself.
        if (this.computation != null) {
            log.debug(this.metadata.name() + ": Init");
            this.computation.init(this.context);
        }
        this.lastReadTime = System.currentTimeMillis();
        this.lastTimerExecution = 0L;
        this.assignmentLatch.countDown();
    }
}

