/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.stream.computation.log;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Computation;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.computation.log.ComputationRunner;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;

public class ComputationPool {
    private static final Log log = LogFactory.getLog(ComputationPool.class);
    protected final ComputationMetadataMapping metadata;
    protected final int threads;
    protected final LogManager manager;
    protected final Supplier<Computation> supplier;
    protected final List<List<LogPartition>> defaultAssignments;
    protected final List<ComputationRunner> runners;
    protected ExecutorService threadPool;

    public ComputationPool(Supplier<Computation> supplier, ComputationMetadataMapping metadata, List<List<LogPartition>> defaultAssignments, LogManager manager) {
        this.supplier = supplier;
        this.manager = manager;
        this.metadata = metadata;
        this.threads = defaultAssignments.size();
        this.defaultAssignments = defaultAssignments;
        this.runners = new ArrayList<ComputationRunner>(this.threads);
    }

    public String getComputationName() {
        return this.metadata.name();
    }

    public void start() {
        log.info((Object)(this.metadata.name() + ": Starting pool"));
        this.threadPool = Executors.newFixedThreadPool(this.threads, new NamedThreadFactory(this.metadata.name() + "Pool"));
        this.defaultAssignments.forEach(assignments -> {
            ComputationRunner runner = new ComputationRunner(this.supplier, this.metadata, (List<LogPartition>)assignments, this.manager);
            this.threadPool.submit(runner);
            this.runners.add(runner);
        });
        this.threadPool.shutdown();
        log.debug((Object)(this.metadata.name() + ": Pool started, threads: " + this.threads));
    }

    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        log.info((Object)(this.metadata.name() + ": Wait for partitions assignments"));
        if (this.threadPool == null || this.threadPool.isTerminated()) {
            return true;
        }
        for (ComputationRunner runner : this.runners) {
            if (runner.waitForAssignments(timeout)) continue;
            return false;
        }
        return true;
    }

    public boolean drainAndStop(Duration timeout) {
        if (this.threadPool == null || this.threadPool.isTerminated()) {
            return true;
        }
        log.info((Object)(this.metadata.name() + ": Draining"));
        this.runners.forEach(ComputationRunner::drain);
        boolean ret = this.awaitPoolTermination(timeout);
        this.stop(Duration.ofSeconds(1L));
        return ret;
    }

    public boolean stop(Duration timeout) {
        if (this.threadPool == null || this.threadPool.isTerminated()) {
            return true;
        }
        log.info((Object)(this.metadata.name() + ": Stopping"));
        this.runners.forEach(ComputationRunner::stop);
        boolean ret = this.awaitPoolTermination(timeout);
        this.shutdown();
        return ret;
    }

    public void shutdown() {
        if (this.threadPool != null && !this.threadPool.isTerminated()) {
            log.info((Object)(this.metadata.name() + ": Shutting down"));
            this.threadPool.shutdownNow();
            try {
                this.threadPool.awaitTermination(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn((Object)(this.metadata.name() + ": Interrupted in shutdown"));
            }
        }
        this.runners.clear();
        this.threadPool = null;
    }

    protected boolean awaitPoolTermination(Duration timeout) {
        try {
            if (!this.threadPool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                log.warn((Object)(this.metadata.name() + ": Timeout on wait for pool termination"));
                return false;
            }
        }
        catch (InterruptedException e) {
            log.warn((Object)(this.metadata.name() + ": Interrupted while waiting for pool termination"));
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    public long getLowWatermark() {
        Set watermarks = this.runners.stream().map(ComputationRunner::getLowWatermark).filter(wm -> wm.getValue() > 1L).collect(Collectors.toSet());
        long ret = watermarks.stream().filter(wm -> !wm.isCompleted()).mapToLong(Watermark::getValue).min().orElse(0L);
        boolean pending = true;
        if (ret == 0L) {
            pending = false;
            ret = watermarks.stream().filter(Watermark::isCompleted).mapToLong(Watermark::getValue).max().orElse(0L);
        }
        if (log.isTraceEnabled() && ret > 0L) {
            log.trace((Object)(this.metadata.name() + ": low: " + ret + " " + (pending ? "Pending" : "Completed")));
        }
        return ret;
    }

    protected static class NamedThreadFactory
    implements ThreadFactory {
        protected final AtomicInteger count = new AtomicInteger(0);
        protected final String prefix;

        public NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, String.format("%s-%02d", this.prefix, this.count.getAndIncrement()));
            t.setUncaughtExceptionHandler((t1, e) -> log.error((Object)("Uncaught exception: " + e.getMessage()), e));
            return t;
        }
    }
}

