/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.stream.tools.command;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;
import org.nuxeo.lib.stream.tools.command.Command;
import org.nuxeo.lib.stream.tools.command.LatencyTrackerComputation;
import org.nuxeo.lib.stream.tools.command.PositionCommand;

/**
 * Command named {@code restore} that resets committed consumer positions from previously recorded latencies.
 * <p>
 * The command works in three steps:
 * <ol>
 * <li>read the latency records of the input log (default {@code _consumer_latencies}) and keep, for every
 * group/log/partition of the requested logs, the last latency whose timestamp is not after the requested date;</li>
 * <li>scan each matching log partition for the record whose watermark timestamp (and key, when the latency carries
 * one) matches the latency lower bound;</li>
 * <li>unless {@code --dry-run} is given, seek the consumer group to the offset following that record and commit.</li>
 * </ol>
 */
public class RestoreCommand extends Command {
    private static final Log log = LogFactory.getLog(RestoreCommand.class);

    /** Name under which this command is exposed. */
    protected static final String NAME = "restore";

    /** Consumer group used by this tool for its own read-only tailers. */
    protected static final String GROUP = "tools";

    /** Set from the {@code --verbose} flag. */
    protected boolean verbose = false;

    /** Name of the log holding the latency records, from {@code --log-input}. */
    protected String input;

    /** Names of the logs whose consumer positions must be restored, from {@code --log-name}. */
    protected List<String> logNames;

    /** Upper bound on latency record timestamps, from {@code --to-date}; a value {@code <= 0} disables the filter. */
    protected long date;

    /** When {@code true} the offsets are searched and logged but no position is committed. */
    protected boolean dryRun;

    /** Codec name given with {@code --codec}, passed to {@code getRecordCodec}; may be {@code null}. */
    protected String codec;

    @Override
    public String name() {
        return NAME;
    }

    /**
     * Registers the command line options understood by this command.
     *
     * @param options the option set to complete
     */
    @Override
    public void updateOptions(Options options) {
        options.addOption(Option.builder("l")
                                .longOpt("log-name")
                                .desc("Restore consumers positions for this LOG, must be a computation Record, can be a comma separated list of log names or ALL")
                                .required()
                                .hasArg()
                                .argName("LOG_NAME")
                                .build());
        options.addOption(Option.builder("i")
                                .longOpt("log-input")
                                .desc("Log name of the input default to _consumer_latencies")
                                .hasArg()
                                .argName("LOG_INPUT")
                                .build());
        options.addOption(Option.builder()
                                .longOpt("to-date")
                                .desc("Sets the committed positions as they where at a specific date. The date is specified in ISO-8601 format, eg. " + Instant.now())
                                .hasArg()
                                .argName("DATE")
                                .build());
        options.addOption(Option.builder()
                                .longOpt("codec")
                                .desc("Codec used to read record, can be: java, avro, avroBinary, avroJson")
                                .hasArg()
                                .argName("CODEC")
                                .build());
        options.addOption(Option.builder().longOpt("verbose").build());
        options.addOption(Option.builder().longOpt("dry-run").desc("Do not change any position").build());
    }

    /**
     * Reads the command line into the instance fields and runs the restore.
     *
     * @param manager the log manager giving access to the logs
     * @param cmd the parsed command line
     * @return the result of {@link #restorePosition(LogManager)}
     * @throws InterruptedException if a tailer read is interrupted
     * @throws IllegalArgumentException if a requested log name does not exist
     */
    @Override
    public boolean run(LogManager manager, CommandLine cmd) throws InterruptedException {
        logNames = getLogNames(manager, cmd.getOptionValue("log-name"));
        input = cmd.getOptionValue("log-input", "_consumer_latencies");
        // NOTE(review): handling of a missing --to-date is delegated to PositionCommand; a result <= 0 means "no limit"
        date = PositionCommand.getTimestampFromDate(cmd.getOptionValue("to-date"));
        verbose = cmd.hasOption("verbose");
        dryRun = cmd.hasOption("dry-run");
        codec = cmd.getOptionValue("codec");
        return restorePosition(manager);
    }

    /**
     * Reads the latencies, searches the matching offsets and, unless in dry run mode, commits them.
     *
     * @param manager the log manager giving access to the logs
     * @return always {@code true}
     * @throws InterruptedException if a tailer read is interrupted
     */
    protected boolean restorePosition(LogManager manager) throws InterruptedException {
        Map<LogPartitionGroup, Latency> latencies = readLatencies(manager);
        // The search is also done in dry run mode so that the offsets that would be applied are logged.
        Map<LogPartitionGroup, LogOffset> offsets = searchOffsets(manager, latencies);
        if (dryRun) {
            log.info("# Dry run mode returning without doing any changes");
            return true;
        }
        updatePositions(manager, offsets);
        return true;
    }

    /**
     * Commits every offset of the given map, see {@link #updatePosition(LogManager, LogPartitionGroup, LogOffset)}.
     *
     * @param manager the log manager used to create the tailers
     * @param offsets the offset to commit for each group/log/partition; {@code null} values are skipped
     */
    protected void updatePositions(LogManager manager, Map<LogPartitionGroup, LogOffset> offsets) {
        log.info("# Update positions");
        offsets.forEach((key, offset) -> updatePosition(manager, key, offset));
    }

    /**
     * Moves the committed position of a consumer group on a log partition to the given offset.
     *
     * @param manager the log manager used to create the tailer
     * @param key the group and log partition to update
     * @param offset the new position, nothing is done when {@code null}
     */
    protected void updatePosition(LogManager manager, LogPartitionGroup key, LogOffset offset) {
        if (offset == null) {
            // no matching record was found for this key
            return;
        }
        log.info(key + " new position: " + offset);
        // The tailer is opened with the consumer's own group so that the commit applies to that group.
        try (LogTailer<Record> tailer = manager.createTailer(key.group, key.getLogPartition(),
                getRecordCodec(codec))) {
            tailer.seek(offset);
            tailer.commit();
        }
    }

    /**
     * Resolves the offset to restore for every latency.
     *
     * @param manager the log manager used to create the tailers
     * @param latencies the latency retained for each group/log/partition
     * @return a map with the same keys as {@code latencies}; a value is {@code null} when no record matched
     * @throws InterruptedException if a tailer read is interrupted
     */
    protected Map<LogPartitionGroup, LogOffset> searchOffsets(LogManager manager,
            Map<LogPartitionGroup, Latency> latencies) throws InterruptedException {
        Map<LogPartitionGroup, LogOffset> ret = new HashMap<>(latencies.size());
        log.info("# Searching records matching the latencies lower timestamp and key");
        for (Map.Entry<LogPartitionGroup, Latency> entry : latencies.entrySet()) {
            ret.put(entry.getKey(), findOffset(manager, entry.getKey(), entry.getValue()));
        }
        return ret;
    }

    /**
     * Scans a log partition for the first record matching the given latency.
     * <p>
     * A record matches when its watermark timestamp equals {@code latency.lower()} and, if the latency has a key, when
     * the record key equals it.
     *
     * @param manager the log manager used to create the tailer
     * @param key the group and log partition to scan
     * @param latency the latency giving the target timestamp and optional key
     * @return the offset following the matching record, or {@code null} if the read returned no more record before a
     *         match was found
     * @throws InterruptedException if a tailer read is interrupted
     */
    protected LogOffset findOffset(LogManager manager, LogPartitionGroup key, Latency latency)
            throws InterruptedException {
        long targetWatermark = latency.lower();
        String targetKey = latency.key();
        // Read with the tool group: scanning must not touch the position of the group being restored.
        try (LogTailer<Record> tailer = manager.createTailer(GROUP, key.getLogPartition(), getRecordCodec(codec))) {
            for (LogRecord<Record> rec = tailer.read(PositionCommand.FIRST_READ_TIMEOUT); rec != null; rec = tailer.read(PositionCommand.READ_TIMEOUT)) {
                if (targetKey != null && !targetKey.equals(rec.message().getKey())) {
                    continue;
                }
                if (targetWatermark != Watermark.ofValue(rec.message().getWatermark()).getTimestamp()) {
                    continue;
                }
                log.info(String.format("%s: offset: %s wm: %d key: %s", key, rec.offset(),
                        rec.message().getWatermark(), rec.message().getKey()));
                // The matching record is considered processed, the position to restore is right after it.
                return rec.offset().nextOffset();
            }
        }
        log.error("No offset found for: " + key + ", matching: " + latency.asJson());
        return null;
    }

    /**
     * Reads the input log and retains one latency per group/log/partition.
     * <p>
     * Records are taken into account only when their watermark timestamp is not after {@link #date} (no limit when
     * {@code date <= 0}), when they concern one of {@link #logNames} and when the decoded latency has a positive lower
     * bound. A later record for the same key replaces the one previously retained.
     *
     * @param manager the log manager used to create the tailer
     * @return the retained latency for each group/log/partition
     * @throws InterruptedException if a tailer read is interrupted
     */
    protected Map<LogPartitionGroup, Latency> readLatencies(LogManager manager) throws InterruptedException {
        Map<LogPartitionGroup, Latency> latencies = new HashMap<>();
        log.info("# Reading latencies log: " + input + ", searching for the higher timestamp <= " + date);
        try (LogTailer<Record> tailer = manager.createTailer(GROUP, input, getRecordCodec(codec))) {
            for (LogRecord<Record> rec = tailer.read(PositionCommand.FIRST_READ_TIMEOUT); rec != null; rec = tailer.read(PositionCommand.READ_TIMEOUT)) {
                long timestamp = Watermark.ofValue(rec.message().getWatermark()).getTimestamp();
                if (date > 0L && timestamp > date) {
                    // recorded after the requested date
                    continue;
                }
                LogPartitionGroup partitionGroup = LatencyTrackerComputation.decodeKey(rec.message().getKey());
                if (!logNames.contains(partitionGroup.name)) {
                    continue;
                }
                Latency recorded = decodeLatency(rec.message().getData());
                if (recorded != null && recorded.lower() > 0L) {
                    latencies.put(partitionGroup, recorded);
                }
            }
        }
        log.info("# Latencies found (group:log:partition -> lat)");
        latencies.forEach((key, latency) -> log.info(String.format("%s: %s", key, latency.asJson())));
        return latencies;
    }

    /**
     * Decodes a latency from the UTF-8 JSON payload of a latency record.
     *
     * @param data the record payload
     * @return the result of {@code Latency.fromJson} on the decoded text
     */
    protected Latency decodeLatency(byte[] data) {
        return Latency.fromJson(new String(data, StandardCharsets.UTF_8));
    }

    /**
     * Resolves the {@code --log-name} value into a list of log names.
     * <p>
     * The value {@code all} (case insensitive) selects every log known by the manager whose name does not start with
     * an underscore. Otherwise the value is a comma separated list; surrounding whitespace of each entry is ignored so
     * that {@code "a, b"} is accepted.
     *
     * @param manager the log manager used to list and check the logs
     * @param names the raw option value
     * @return the log names to process
     * @throws IllegalArgumentException if a listed log name does not exist
     */
    protected List<String> getLogNames(LogManager manager, String names) {
        if ("all".equalsIgnoreCase(names)) {
            return manager.listAll().stream().filter(name -> !name.startsWith("_")).collect(Collectors.toList());
        }
        // String.split does not strip whitespace around the separator, trim explicitly.
        List<String> ret = Arrays.stream(names.split(",")).map(String::trim).collect(Collectors.toList());
        for (String name : ret) {
            if (!manager.exists(name)) {
                throw new IllegalArgumentException("Unknown log name: " + name);
            }
        }
        return ret;
    }
}

