/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.platform.importer.mqueues.computation.mqueue;

import java.time.Duration;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationManager;
import org.nuxeo.ecm.platform.importer.mqueues.computation.ComputationMetadataMapping;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Record;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Settings;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Topology;
import org.nuxeo.ecm.platform.importer.mqueues.computation.Watermark;
import org.nuxeo.ecm.platform.importer.mqueues.computation.mqueue.MQComputationPool;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQPartition;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka.KafkaUtils;

/**
 * {@link ComputationManager} implementation that runs each computation of a {@link Topology}
 * in its own {@link MQComputationPool}, all pools sharing a single {@link MQManager}.
 *
 * <p>On construction, every stream of the topology is created if it does not exist yet, then
 * one pool per computation metadata entry is built. Pools are only started by {@link #start()}.
 */
public class MQComputationManager
implements ComputationManager {
    private static final Log log = LogFactory.getLog(MQComputationManager.class);
    private final MQManager<Record> manager;
    private final Topology topology;
    private final Settings settings;
    private final List<MQComputationPool> pools;

    /**
     * Creates the streams declared by the topology and builds one pool per computation.
     *
     * @param manager  the queue manager used to create streams and handed to every pool
     * @param topology the computations and streams to run
     * @param settings the source of per-stream partition counts and per-computation concurrency
     */
    public MQComputationManager(MQManager<Record> manager, Topology topology, Settings settings) {
        this.manager = manager;
        this.topology = topology;
        this.settings = settings;
        // Streams are created before the pools are built.
        this.initStreams();
        this.pools = this.initPools();
        Objects.requireNonNull(this.pools);
    }

    /** Starts every computation pool. */
    @Override
    public void start() {
        log.debug("Starting ...");
        this.pools.forEach(MQComputationPool::start);
    }

    /**
     * Waits, pool after pool, for the assignments of each pool.
     *
     * @param timeout the timeout passed to each pool (applied per pool, not globally)
     * @return {@code false} as soon as one pool reports {@code false}, {@code true} otherwise
     * @throws InterruptedException if the wait on a pool is interrupted
     */
    @Override
    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        for (MQComputationPool pool : this.pools) {
            if (!pool.waitForAssignments(timeout)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Stops all pools using a parallel stream, giving each pool the same timeout.
     *
     * @param timeout the timeout passed to each pool
     * @return {@code true} when every pool reported a successful stop
     */
    @Override
    public boolean stop(Duration timeout) {
        log.debug("Stopping ...");
        // The filter predicate performs the stop; count() only tallies the pools that failed.
        long failures = this.pools.parallelStream().filter(pool -> !pool.stop(timeout)).count();
        log.debug(String.format("Stopped %d failure", failures));
        return failures == 0L;
    }

    /**
     * Stops all pools with a one second timeout.
     *
     * @return {@code true} when every pool reported a successful stop
     */
    @Override
    public boolean stop() {
        return this.stop(Duration.ofSeconds(1L));
    }

    /**
     * Drains and stops the pools one after the other, in pool list order.
     *
     * @param timeout the timeout passed to each pool
     * @return {@code true} when every pool reported a successful drain and stop
     */
    @Override
    public boolean drainAndStop(Duration timeout) {
        log.debug("Drain and stop");
        long failures = 0L;
        // Every pool is drained even if a previous one failed.
        for (MQComputationPool pool : this.pools) {
            if (!pool.drainAndStop(timeout)) {
                failures++;
            }
        }
        log.debug(String.format("Drained and stopped %d failure", failures));
        return failures == 0L;
    }

    /** Shuts down all pools using a parallel stream. */
    @Override
    public void shutdown() {
        log.debug("Shutdown ...");
        this.pools.parallelStream().forEach(MQComputationPool::shutdown);
        log.debug("Shutdown done");
    }

    /**
     * Computes the global low watermark.
     *
     * <p>For each root of the topology, the tree watermark is the minimum pool watermark among
     * the computation names returned by {@code getDescendantComputationNames(root)} (0 when
     * that set is empty). The result is the minimum of the tree watermarks that are greater
     * than 1, or 0 when there is none.
     *
     * @return the global low watermark, or 0 when no tree has a watermark greater than 1
     */
    @Override
    public long getLowWatermark() {
        HashMap<String, Long> watermarks = this.collectPoolWatermarks();
        Set<String> roots = this.topology.getRoots();
        HashMap<String, Long> watermarkTrees = new HashMap<String, Long>(roots.size());
        for (String root : roots) {
            long treeWatermark = this.topology.getDescendantComputationNames(root).stream()
                    .mapToLong(watermarks::get)
                    .min()
                    .orElse(0L);
            watermarkTrees.put(root, treeWatermark);
        }
        // NOTE(review): trees with a watermark of 0 or 1 are ignored here; confirm that
        // "> 1" (rather than "> 0") is the intended threshold.
        long ret = watermarkTrees.values().stream()
                .filter(wm -> wm > 1L)
                .mapToLong(Long::longValue)
                .min()
                .orElse(0L);
        if (log.isTraceEnabled()) {
            log.trace("lowWatermark: " + ret);
            watermarkTrees.forEach((k, v) -> log.trace("tree " + k + ": " + v));
        }
        return ret;
    }

    /**
     * Computes the low watermark of a single computation: the minimum between its own pool
     * watermark and the pool watermarks of the names returned by
     * {@code getAncestorComputationNames(computationName)}.
     *
     * <p>NOTE(review): when there is no ancestor the ancestor minimum defaults to 0, which
     * caps the result at 0 for non-negative watermarks; confirm this is intended.
     *
     * @param computationName the name of the computation, must not be {@code null}
     * @return the low watermark for this computation
     * @throws NullPointerException if {@code computationName} is {@code null} or does not
     *         match the computation name of any pool
     */
    @Override
    public long getLowWatermark(String computationName) {
        Objects.requireNonNull(computationName, "computationName");
        HashMap<String, Long> watermarks = this.collectPoolWatermarks();
        long ancestorsMin = this.topology.getAncestorComputationNames(computationName).stream()
                .mapToLong(watermarks::get)
                .min()
                .orElse(0L);
        Long own = Objects.requireNonNull(watermarks.get(computationName),
                "No pool for computation: " + computationName);
        return Math.min(ancestorsMin, own);
    }

    /**
     * Tells whether processing is done for a timestamp, by delegating to the {@link Watermark}
     * built from the global low watermark.
     *
     * @param timestamp the timestamp to check
     * @return the result of {@code Watermark.ofValue(getLowWatermark()).isDone(timestamp)}
     */
    @Override
    public boolean isDone(long timestamp) {
        return Watermark.ofValue(this.getLowWatermark()).isDone(timestamp);
    }

    /** Snapshots the current low watermark of every pool, keyed by computation name. */
    private HashMap<String, Long> collectPoolWatermarks() {
        HashMap<String, Long> watermarks = new HashMap<String, Long>(this.pools.size());
        this.pools.forEach(pool -> watermarks.put(pool.getComputationName(), pool.getLowWatermark()));
        return watermarks;
    }

    /** Builds one pool per computation metadata entry of the topology. */
    private List<MQComputationPool> initPools() {
        log.debug("Initializing pools");
        return this.topology.metadataList().stream().map(meta -> new MQComputationPool(this.topology.getSupplier(meta.name()), (ComputationMetadataMapping)meta, this.getDefaultAssignments((ComputationMetadataMapping)meta), this.manager)).collect(Collectors.toList());
    }

    /**
     * Computes the default partition assignments of a computation: its input stream
     * partitions distributed round-robin over its configured number of threads.
     */
    private List<List<MQPartition>> getDefaultAssignments(ComputationMetadataMapping meta) {
        int threads = this.settings.getConcurrency(meta.name());
        HashMap<String, Integer> streams = new HashMap<String, Integer>();
        meta.inputStreams().forEach(streamName -> streams.put((String)streamName, this.settings.getPartitions((String)streamName)));
        return KafkaUtils.roundRobinAssignments(threads, streams);
    }

    /** Creates every stream of the topology that does not exist yet, with its configured partition count. */
    private void initStreams() {
        log.debug("Initializing streams");
        this.topology.streamsSet().forEach(streamName -> this.manager.createIfNotExists((String)streamName, this.settings.getPartitions((String)streamName)));
    }
}

