/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.core.mqueues.mqueues.chronicle;

import java.io.Externalizable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.core.mqueues.mqueues.MQAppender;
import org.nuxeo.lib.core.mqueues.mqueues.MQLag;
import org.nuxeo.lib.core.mqueues.mqueues.MQPartition;
import org.nuxeo.lib.core.mqueues.mqueues.MQRebalanceListener;
import org.nuxeo.lib.core.mqueues.mqueues.MQTailer;
import org.nuxeo.lib.core.mqueues.mqueues.chronicle.ChronicleCompoundMQTailer;
import org.nuxeo.lib.core.mqueues.mqueues.chronicle.ChronicleMQAppender;
import org.nuxeo.lib.core.mqueues.mqueues.chronicle.ChronicleMQOffsetTracker;
import org.nuxeo.lib.core.mqueues.mqueues.chronicle.ChronicleMQTailer;
import org.nuxeo.lib.core.mqueues.mqueues.internals.AbstractMQManager;

public class ChronicleMQManager
extends AbstractMQManager {
    private static final Log log = LogFactory.getLog(ChronicleMQManager.class);
    public static final String DEFAULT_RETENTION_DURATION = "4d";
    protected final Path basePath;
    protected final String retentionDuration;

    public ChronicleMQManager(Path basePath) {
        this.basePath = basePath;
        this.retentionDuration = DEFAULT_RETENTION_DURATION;
    }

    public ChronicleMQManager(Path basePath, String retentionDuration) {
        this.basePath = basePath;
        this.retentionDuration = retentionDuration == null ? DEFAULT_RETENTION_DURATION : retentionDuration;
    }

    public String getBasePath() {
        return this.basePath.toAbsolutePath().toString();
    }

    @Override
    public boolean exists(String name) {
        File path = new File(this.basePath.toFile(), name);
        return path.isDirectory() && path.list().length > 0;
    }

    @Override
    public void create(String name, int size) {
        ChronicleMQAppender cq = ChronicleMQAppender.create(new File(this.basePath.toFile(), name), size, this.retentionDuration);
        try {
            cq.close();
        }
        catch (Exception e) {
            throw new RuntimeException("Can not create and close " + name, e);
        }
    }

    @Override
    public boolean delete(String name) {
        File path = new File(this.basePath.toFile(), name);
        if (path.isDirectory()) {
            ChronicleMQManager.deleteQueueBasePath(path);
            return true;
        }
        return false;
    }

    protected MQLag getLagForPartition(String name, int partition, String group) {
        long pos = 0L;
        File path = new File(this.basePath.toFile(), name);
        try (ChronicleMQOffsetTracker offsetTracker = new ChronicleMQOffsetTracker(path.toString(), partition, group);){
            pos = offsetTracker.readLastCommittedOffset();
        }
        ChronicleMQAppender appender = (ChronicleMQAppender)this.getAppender(name);
        if (pos == 0L) {
            pos = appender.firstOffset(partition);
        }
        long end = appender.endOffset(partition);
        long lag = appender.countMessages(partition, pos, end);
        long firstOffset = appender.firstOffset(partition);
        long endMessages = appender.countMessages(partition, firstOffset, end);
        return new MQLag(pos, end, lag, endMessages);
    }

    @Override
    public List<MQLag> getLagPerPartition(String name, String group) {
        int size = this.getAppender(name).size();
        ArrayList<MQLag> ret = new ArrayList<MQLag>(size);
        for (int i = 0; i < size; ++i) {
            ret.add(this.getLagForPartition(name, i, group));
        }
        return ret;
    }

    public String toString() {
        return "ChronicleMQManager{basePath=" + this.basePath + ", retentionDuration='" + this.retentionDuration + '\'' + '}';
    }

    @Override
    public List<String> listAll() {
        if (!this.basePath.toFile().exists() || !this.basePath.toFile().isDirectory()) {
            throw new IllegalArgumentException("Invalid base path: " + this.basePath);
        }
        return Arrays.asList(this.basePath.toFile().list((dir, name) -> new File(dir, name).isDirectory()));
    }

    @Override
    public List<String> listConsumerGroups(String name) {
        File mqRoot = new File(this.basePath.toFile(), name);
        if (!this.exists(name)) {
            throw new IllegalArgumentException("Unknown MQueue: " + name);
        }
        return Arrays.stream(mqRoot.list((dir, rep) -> new File(dir, rep).isDirectory() && ChronicleMQOffsetTracker.isOffsetTracker(rep))).map(ChronicleMQOffsetTracker::getGroupFromDirectory).collect(Collectors.toList());
    }

    @Override
    public <M extends Externalizable> MQAppender<M> createAppender(String name) {
        return ChronicleMQAppender.open(new File(this.basePath.toFile(), name), this.retentionDuration);
    }

    @Override
    protected <M extends Externalizable> MQTailer<M> acquireTailer(Collection<MQPartition> partitions, String group) {
        ArrayList pTailers = new ArrayList(partitions.size());
        partitions.forEach(partition -> pTailers.add((ChronicleMQTailer)((ChronicleMQAppender)this.getAppender(partition.name())).createTailer((MQPartition)partition, group)));
        if (pTailers.size() == 1) {
            return (MQTailer)pTailers.iterator().next();
        }
        return new ChronicleCompoundMQTailer(pTailers, group);
    }

    @Override
    protected <M extends Externalizable> MQTailer<M> doSubscribe(String group, Collection<String> names, MQRebalanceListener listener) {
        throw new UnsupportedOperationException("subscribe is not supported by Chronicle implementation");
    }

    protected static void deleteQueueBasePath(File basePath) {
        try {
            log.info((Object)("Removing Chronicle Queues directory: " + basePath));
            try (Stream<Path> paths = Files.list(basePath.toPath());){
                int count = (int)paths.filter(path -> Files.isRegularFile(path, new LinkOption[0]) && !path.toString().endsWith(".cq4")).count();
                if (count > 0) {
                    String msg = "ChronicleMQueue basePath: " + basePath + " contains unknown files, please choose another basePath";
                    log.error((Object)msg);
                    throw new IllegalArgumentException(msg);
                }
            }
            FileUtils.deleteDirectory((File)basePath);
        }
        catch (IOException e) {
            String msg = "Can not remove Chronicle Queues directory: " + basePath + " " + e.getMessage();
            log.error((Object)msg, (Throwable)e);
            throw new IllegalArgumentException(msg, e);
        }
    }
}

