/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.stream.log.chronicle;

import java.io.Externalizable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.chronicle.ChronicleCompoundLogTailer;
import org.nuxeo.lib.stream.log.chronicle.ChronicleLogAppender;
import org.nuxeo.lib.stream.log.chronicle.ChronicleLogOffsetTracker;
import org.nuxeo.lib.stream.log.chronicle.ChronicleLogTailer;
import org.nuxeo.lib.stream.log.chronicle.ChronicleRetentionDuration;
import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;

/**
 * Log manager that stores each Log as a directory located under a common base path.
 * <p>
 * A Log named {@code name} lives in {@code basePath/name}. Appenders, tailers and offset trackers are delegated to
 * the Chronicle specific classes of this package.
 */
public class ChronicleLogManager extends AbstractLogManager {
    private static final Log log = LogFactory.getLog(ChronicleLogManager.class);

    /** Directory under which every Log directory is resolved. */
    protected final Path basePath;

    /** Retention settings handed over to the appenders created or opened by this manager. */
    protected final ChronicleRetentionDuration retention;

    /**
     * Creates a manager rooted at {@code basePath}, passing a {@code null} retention duration.
     *
     * @param basePath the directory that contains (or will contain) the Log directories
     */
    public ChronicleLogManager(Path basePath) {
        this(basePath, null);
    }

    /**
     * Creates a manager rooted at {@code basePath} with the given retention duration.
     *
     * @param basePath the directory that contains (or will contain) the Log directories
     * @param retentionDuration the retention expression, passed as is to {@link ChronicleRetentionDuration}; may be
     *            {@code null} (this is what the single argument constructor passes). NOTE(review): the accepted
     *            syntax is defined by {@code ChronicleRetentionDuration}, not visible from here.
     */
    public ChronicleLogManager(Path basePath, String retentionDuration) {
        this.basePath = basePath;
        this.retention = new ChronicleRetentionDuration(retentionDuration);
    }

    /**
     * Recursively deletes a Chronicle Queues directory, after checking that it does not hold foreign files.
     * <p>
     * The check only looks at the direct children of {@code basePath}: any regular file whose name does not end with
     * {@code .cq4} aborts the deletion.
     *
     * @param basePath the directory to delete
     * @throws IllegalArgumentException if the directory contains an unknown regular file, or if listing or deleting
     *             the directory fails with an {@link IOException}
     */
    protected static void deleteQueueBasePath(Path basePath) {
        try {
            log.info("Removing Chronicle Queues directory: " + basePath);
            boolean hasUnknownFiles;
            // The directory stream holds an OS handle, it must be closed before the directory is deleted.
            try (Stream<Path> paths = Files.list(basePath)) {
                hasUnknownFiles = paths.anyMatch(
                        path -> Files.isRegularFile(path) && !path.toString().endsWith(".cq4"));
            }
            if (hasUnknownFiles) {
                String msg = "ChronicleLog basePath: " + basePath
                        + " contains unknown files, please choose another basePath";
                log.error(msg);
                throw new IllegalArgumentException(msg);
            }
            FileUtils.deleteDirectory(basePath.toFile());
        } catch (IOException e) {
            String msg = "Cannot remove Chronicle Queues directory: " + basePath + " " + e.getMessage();
            log.error(msg, e);
            throw new IllegalArgumentException(msg, e);
        }
    }

    /**
     * Returns the absolute form of the base path as a string.
     *
     * @return the absolute base path
     */
    public String getBasePath() {
        return basePath.toAbsolutePath().toString();
    }

    /**
     * Tells whether the directory of the Log {@code name} can be listed and contains at least one entry.
     *
     * @param name the Log name, resolved against the base path
     * @return {@code true} if the Log directory is not empty, {@code false} if it is empty or cannot be listed
     */
    @Override
    public boolean exists(String name) {
        // try-with-resources: Files.list keeps the directory open until the stream is closed.
        try (Stream<Path> entries = Files.list(basePath.resolve(name))) {
            return entries.findAny().isPresent();
        } catch (IOException e) {
            // A missing or unreadable directory is reported as a Log that does not exist.
            return false;
        }
    }

    /**
     * Creates the Log {@code name} through {@link ChronicleLogAppender} and immediately closes the appender that was
     * used for the creation.
     *
     * @param name the Log name, resolved against the base path
     * @param size the size passed to the appender. NOTE(review): presumably the number of partitions, confirm
     *            against {@code ChronicleLogAppender.create}.
     */
    @Override
    public void create(String name, int size) {
        ChronicleLogAppender.create(basePath.resolve(name).toFile(), size, retention).close();
    }

    /**
     * Deletes the directory of the Log {@code name} if it exists.
     *
     * @param name the Log name, resolved against the base path
     * @return {@code true} if a directory was found and deleted, {@code false} if there is no such directory
     * @throws IllegalArgumentException if the directory contains unknown files or cannot be removed, see
     *             {@link #deleteQueueBasePath(Path)}
     */
    @Override
    public boolean delete(String name) {
        Path path = basePath.resolve(name);
        if (!Files.isDirectory(path)) {
            return false;
        }
        deleteQueueBasePath(path);
        return true;
    }

    /**
     * Computes the lag of a consumer group on a single partition.
     * <p>
     * The position is the last committed offset of the group, or {@code 0} when no offset tracker exists for the
     * group. A position of {@code 0} is replaced by the first offset of the partition before counting.
     *
     * @param name the Log name
     * @param partition the partition index
     * @param group the consumer group
     * @return a {@link LogLag} built from the position, the end offset, the number of messages between the position
     *         and the end, and the number of messages between the first offset and the end
     */
    protected LogLag getLagForPartition(String name, int partition, String group) {
        long pos;
        Path path = basePath.resolve(name);
        // Raw type kept: the generic signature of ChronicleLogAppender is not visible from this file.
        ChronicleLogAppender appender = (ChronicleLogAppender) getAppender(name);
        if (!ChronicleLogOffsetTracker.exists(path, group)) {
            pos = 0L;
        } else {
            try (ChronicleLogOffsetTracker offsetTracker = new ChronicleLogOffsetTracker(path.toString(), partition,
                    group)) {
                pos = offsetTracker.readLastCommittedOffset();
            }
        }
        long end = appender.endOffset(partition);
        if (pos == 0L) {
            // Nothing committed yet: measure the lag from the beginning of the partition.
            pos = appender.firstOffset(partition);
        }
        long lag = appender.countMessages(partition, pos, end);
        long firstOffset = appender.firstOffset(partition);
        long endMessages = appender.countMessages(partition, firstOffset, end);
        return new LogLag(pos, end, lag, endMessages);
    }

    /**
     * Computes the lag of a consumer group on every partition of a Log.
     *
     * @param name the Log name
     * @param group the consumer group
     * @return one {@link LogLag} per partition, ordered by partition index
     */
    @Override
    public List<LogLag> getLagPerPartition(String name, String group) {
        int size = getAppender(name).size();
        List<LogLag> ret = new ArrayList<>(size);
        for (int i = 0; i < size; ++i) {
            ret.add(getLagForPartition(name, i, group));
        }
        return ret;
    }

    @Override
    public String toString() {
        return "ChronicleLogManager{basePath=" + basePath + ", retention='" + retention + '\'' + '}';
    }

    /**
     * Lists the names of the directories found directly under the base path.
     *
     * @return the directory names, one per Log
     * @throws IllegalArgumentException if the base path cannot be listed
     */
    @Override
    public List<String> listAll() {
        // try-with-resources: Files.list keeps the directory open until the stream is closed.
        try (Stream<Path> entries = Files.list(basePath)) {
            return entries.filter(entry -> Files.isDirectory(entry))
                          .map(Path::getFileName)
                          .map(Path::toString)
                          .collect(Collectors.toList());
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid base path: " + basePath, e);
        }
    }

    /**
     * Lists the consumer groups of a Log, derived from the sub directories of the Log directory that are recognized
     * as offset trackers by {@link ChronicleLogOffsetTracker}.
     *
     * @param name the Log name
     * @return the consumer group names
     * @throws IllegalArgumentException if the Log directory does not exist or cannot be listed
     */
    @Override
    public List<String> listConsumerGroups(String name) {
        Path logRoot = basePath.resolve(name);
        if (!Files.exists(logRoot)) {
            throw new IllegalArgumentException("Unknown Log: " + name);
        }
        // try-with-resources: Files.list keeps the directory open until the stream is closed.
        try (Stream<Path> entries = Files.list(logRoot)) {
            return entries.filter(entry -> Files.isDirectory(entry))
                          .map(Path::getFileName)
                          .map(Path::toString)
                          .filter(ChronicleLogOffsetTracker::isOffsetTracker)
                          .map(ChronicleLogOffsetTracker::getGroupFromDirectory)
                          .collect(Collectors.toList());
        } catch (IOException e) {
            throw new IllegalArgumentException("Cannot access Log: " + name, e);
        }
    }

    /**
     * Opens an appender on the directory of the Log {@code name}, using the retention of this manager.
     *
     * @param name the Log name, resolved against the base path
     * @return the opened appender
     */
    @Override
    public <M extends Externalizable> CloseableLogAppender<M> createAppender(String name) {
        return ChronicleLogAppender.open(basePath.resolve(name).toFile(), retention);
    }

    /**
     * Creates a tailer for the given partitions on behalf of a consumer group.
     * <p>
     * One Chronicle tailer is created per partition from the appender of the partition's Log. A single partition
     * yields that tailer directly, several partitions are wrapped into a {@link ChronicleCompoundLogTailer}.
     *
     * @param partitions the partitions to read from
     * @param group the consumer group
     * @return a tailer over all the requested partitions
     */
    @Override
    @SuppressWarnings({ "rawtypes", "unchecked" })
    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions, String group) {
        // Raw types are kept on purpose: the generic signatures of the Chronicle appender and tailer classes are not
        // visible from this file, so parameterizing them here could not be checked.
        List pTailers = new ArrayList(partitions.size());
        for (LogPartition partition : partitions) {
            ChronicleLogAppender appender = (ChronicleLogAppender) getAppender(partition.name());
            pTailers.add((ChronicleLogTailer) appender.createTailer(partition, group));
        }
        if (pTailers.size() == 1) {
            return (LogTailer) pTailers.get(0);
        }
        return new ChronicleCompoundLogTailer(pTailers, group);
    }

    /**
     * Always fails: this implementation does not support subscription.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected <M extends Externalizable> LogTailer<M> doSubscribe(String group, Collection<String> names, RebalanceListener listener) {
        throw new UnsupportedOperationException("subscribe is not supported by Chronicle implementation");
    }
}

