/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.core.bulk.introspection;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.nuxeo.lib.stream.log.Name;

public class StreamIntrospectionConverter {
    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    protected static final long ACTIVE_THRESHOLD_SECONDS = 300L;
    protected static final String EMPTY_JSON_ARRAY = "[]";
    protected final String json;
    protected final JsonNode root;

    public StreamIntrospectionConverter(String json) {
        if (StringUtils.isBlank((CharSequence)json)) {
            throw new IllegalArgumentException("Cannot convert blank JSON");
        }
        this.json = json;
        try {
            this.root = OBJECT_MAPPER.readTree(json);
        }
        catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Invalid JSON: " + json, e);
        }
    }

    public String getStreams() {
        if (this.root.has("streams")) {
            return this.root.get("streams").toString();
        }
        return EMPTY_JSON_ARRAY;
    }

    public String getConsumers(String stream) {
        if (StringUtils.isBlank((CharSequence)stream)) {
            return EMPTY_JSON_ARRAY;
        }
        String match = "stream:" + stream;
        JsonNode node = this.root.get("processors");
        HashSet<String> consumers = new HashSet<String>();
        if (node != null && node.isArray()) {
            for (JsonNode item : node) {
                JsonNode topologies = item.get("topology");
                for (JsonNode topo : topologies) {
                    String target;
                    String source = topo.get(0).asText();
                    if (!match.equals(source) || !(target = topo.get(1).asText()).startsWith("computation:")) continue;
                    consumers.add(target.substring(12));
                }
            }
        }
        return consumers.stream().map(consumer -> "{\"stream\":\"" + stream + "\",\"consumer\":\"" + consumer + "\"}").collect(Collectors.joining(",", "[", "]"));
    }

    public String getPuml() {
        StringBuilder ret = new StringBuilder();
        ret.append("@startuml\n");
        Map<String, String> streamMetrics = this.parseMetrics();
        ret.append(this.getPumlHeader("Stream Introspection at " + streamMetrics.get("date")));
        JsonNode node = this.root.get("streams");
        if (node != null && node.isArray()) {
            for (JsonNode item : node) {
                this.dumpStream(ret, item, streamMetrics);
            }
        }
        if ((node = this.root.get("processors")) != null && node.isArray()) {
            for (JsonNode item : node) {
                JsonNode topologies;
                String host = item.at("/metadata/nodeId").asText();
                String created = Instant.ofEpochSecond(item.at("/metadata/created").asLong()).toString();
                JsonNode computations = item.get("computations");
                if (computations.isArray()) {
                    for (JsonNode computation : computations) {
                        this.dumpComputation(host, ret, computation, streamMetrics, created);
                    }
                }
                if (!(topologies = item.get("topology")).isArray()) continue;
                for (JsonNode topo : topologies) {
                    String comment = "";
                    String source = topo.get(0).asText();
                    String target = topo.get(1).asText();
                    if (target.startsWith("computation:")) {
                        String stream = source.replace("stream:", "");
                        String computation = target.replace("computation:", "");
                        String lag = streamMetrics.get(stream + ":" + computation + ":lag");
                        String latency = streamMetrics.get(stream + ":" + computation + ":latency");
                        String pos = streamMetrics.get(stream + ":" + computation + ":pos");
                        String end = this.getStreamEnd(streamMetrics, stream);
                        if (lag != null && !"0".equals(lag)) {
                            comment = String.format(": %s/%s lag: %s, latency: %ss", pos, end, lag, latency);
                        }
                    }
                    ret.append(String.format("%s==>%s%s%n", this.getPumlIdentifierForHost(host, source), this.getPumlIdentifierForHost(host, target), comment));
                }
            }
        }
        ret.append("@enduml\n");
        return ret.toString();
    }

    protected Map<String, String> parseMetrics() {
        HashMap<String, String> streamMetrics = new HashMap<String, String>();
        JsonNode node = this.root.get("metrics");
        long timestamp = 0L;
        if (node != null && node.isArray()) {
            for (JsonNode host : node) {
                JsonNode hostMetrics;
                String nodeId = host.get("nodeId").asText();
                long metricTimestamp = host.get("timestamp").asLong();
                if (metricTimestamp > timestamp) {
                    timestamp = metricTimestamp;
                }
                if (!(hostMetrics = host.get("metrics")).isArray()) continue;
                for (JsonNode metric : hostMetrics) {
                    int value;
                    Object computationName;
                    if (metric.has("stream")) {
                        String key = metric.get("k").asText();
                        String streamName = Name.urnOfId((String)metric.get("stream").asText());
                        String computationName2 = Name.urnOfId((String)metric.get("group").asText());
                        if ("nuxeo.streams.global.stream.group.end".equals(key)) {
                            streamMetrics.put(streamName + ":end", metric.get("v").asText());
                            continue;
                        }
                        if ("nuxeo.streams.global.stream.group.lag".equals(key)) {
                            streamMetrics.put(streamName + ":" + computationName2 + ":lag", metric.get("v").asText());
                            continue;
                        }
                        if ("nuxeo.streams.global.stream.group.latency".equals(key)) {
                            streamMetrics.put(streamName + ":" + computationName2 + ":latency", this.getNiceDouble(metric.get("v").asDouble() / 1000.0));
                            continue;
                        }
                        if (!"nuxeo.streams.global.stream.group.pos".equals(key)) continue;
                        streamMetrics.put(streamName + ":" + computationName2 + ":pos", metric.get("v").asText());
                        continue;
                    }
                    if (metric.get("k").asText().endsWith("processRecord")) {
                        int count = metric.get("count").asInt();
                        if (count == 0) continue;
                        computationName = Name.urnOfId((String)metric.get("computation").asText());
                        streamMetrics.put((String)computationName + ":" + nodeId + ":count", metric.get("count").asText());
                        streamMetrics.put((String)computationName + ":" + nodeId + ":sum", this.getNiceDouble3(metric.get("sum").asDouble() / 1.0E9));
                        streamMetrics.put((String)computationName + ":" + nodeId + ":p50", this.getNiceDouble3(metric.get("p50").asDouble()));
                        streamMetrics.put((String)computationName + ":" + nodeId + ":mean", this.getNiceDouble3(metric.get("mean").asDouble()));
                        streamMetrics.put((String)computationName + ":" + nodeId + ":p99", this.getNiceDouble3(metric.get("p99").asDouble()));
                        streamMetrics.put((String)computationName + ":" + nodeId + ":rate1m", this.getNiceDouble(metric.get("rate1m").asDouble()));
                        streamMetrics.put((String)computationName + ":" + nodeId + ":rate5m", this.getNiceDouble(metric.get("rate5m").asDouble()));
                        continue;
                    }
                    if (metric.get("k").asText().endsWith("processTimer")) {
                        int count = metric.get("count").asInt();
                        if (count == 0) continue;
                        computationName = Name.urnOfId((String)metric.get("computation").asText());
                        streamMetrics.put((String)computationName + ":" + nodeId + ":timer:count", metric.get("count").asText());
                        streamMetrics.put((String)computationName + ":" + nodeId + ":timer:sum", this.getNiceDouble3(metric.get("sum").asDouble() / 1.0E9));
                        streamMetrics.put((String)computationName + ":" + nodeId + ":timer:p50", this.getNiceDouble3(metric.get("p50").asDouble()));
                        streamMetrics.put((String)computationName + ":" + nodeId + ":timer:mean", this.getNiceDouble3(metric.get("mean").asDouble()));
                        streamMetrics.put((String)computationName + ":" + nodeId + ":timer:p99", this.getNiceDouble3(metric.get("p99").asDouble()));
                        streamMetrics.put((String)computationName + ":" + nodeId + ":timer:rate1m", this.getNiceDouble(metric.get("rate1m").asDouble()));
                        streamMetrics.put((String)computationName + ":" + nodeId + ":timer:rate5m", this.getNiceDouble(metric.get("rate5m").asDouble()));
                        continue;
                    }
                    if (metric.get("k").asText().endsWith("computation.failure")) {
                        int failure = metric.get("v").asInt();
                        if (failure <= 0) continue;
                        computationName = Name.urnOfId((String)metric.get("computation").asText()) + ":" + nodeId;
                        streamMetrics.put((String)computationName + ":failure", metric.get("v").asText());
                        continue;
                    }
                    if (metric.get("k").asText().endsWith("stream.failure")) {
                        int value2 = metric.get("v").asInt();
                        if (value2 <= 0) continue;
                        streamMetrics.put(nodeId + ":failure", metric.get("v").asText());
                        continue;
                    }
                    if (!metric.get("k").asText().endsWith("computation.skippedRecord") || (value = metric.get("v").asInt()) <= 0) continue;
                    computationName = Name.urnOfId((String)metric.get("computation").asText()) + ":" + nodeId;
                    streamMetrics.put((String)computationName + ":skipped", metric.get("v").asText());
                }
            }
        }
        streamMetrics.put("timestamp", String.valueOf(timestamp));
        streamMetrics.put("date", Instant.ofEpochSecond(timestamp).toString());
        return streamMetrics;
    }

    protected String getNiceDouble(Double number) {
        return String.format("%.2f", number);
    }

    protected String getNiceDouble3(Double number) {
        return String.format("%.3f", number);
    }

    protected String getPumlHeader(String title) {
        return "title " + title + "\n\nskinparam defaultFontName Courier\nskinparam handwritten false\nskinparam queueBackgroundColor LightYellow\nskinparam nodeBackgroundColor Azure\nskinparam componentBackgroundColor Azure\nskinparam nodebackgroundColor<<failure>> Yellow\nskinparam componentbackgroundColor<<failure>> Yellow\nskinparam component {\n  BorderColor black\n  ArrowColor #CC6655\n}\n";
    }

    protected String getPumlIdentifierForHost(String host, String id) {
        if (id.startsWith("computation:")) {
            return this.getPumlIdentifier(id + ":" + host);
        }
        return this.getPumlIdentifier(id);
    }

    protected void dumpStream(StringBuilder ret, JsonNode item, Map<String, String> metrics) {
        String name = item.get("name").asText();
        String partitions = item.get("partitions").asText();
        String codec = item.get("codec").asText();
        ret.append(String.format("queue %s [%s%n----%npartitions: %s%ncodec: %s%n-----%nrecords: %s]%n", this.getPumlIdentifier("stream:" + name), name, partitions, codec, this.getStreamEnd(metrics, name)));
    }

    protected String getStreamEnd(Map<String, String> metrics, String name) {
        String ret = metrics.get(name + ":end");
        return ret == null ? "0" : ret;
    }

    protected void dumpComputation(String host, StringBuilder ret, JsonNode item, Map<String, String> metrics, String created) {
        String name = item.get("name").asText();
        String threads = item.get("threads").asText();
        String continueOnFailure = item.get("continueOnFailure").asText();
        String failure = "";
        if (metrics.containsKey(name + ":" + host + ":failure")) {
            failure = " <<failure>>";
        }
        ret.append(String.format("component %s %s[%s%n----%ncreated: %s%nthreads: %s%ncontinue on failure: %s%n%s%s]%n", this.getPumlIdentifier("computation:" + name + ":" + host), failure, name + " on " + host, created, threads, continueOnFailure, this.getBatchInfo(item), this.getComputationMetrics(host, name, item, metrics)));
    }

    protected String getComputationMetrics(String host, String name, JsonNode item, Map<String, String> metrics) {
        Object ret = "";
        String baseKey = name + ":" + host;
        if (!metrics.containsKey(baseKey + ":count")) {
            return ret;
        }
        ret = (String)ret + "\n----\n";
        if (metrics.containsKey(baseKey + ":failure")) {
            ret = (String)ret + "FAILURE: " + metrics.get(baseKey + ":failure") + "\n";
        }
        ret = (String)ret + "record count: " + metrics.get(baseKey + ":count") + ", total: " + metrics.get(baseKey + ":sum") + "s\n";
        if (metrics.containsKey(baseKey + ":skipped")) {
            ret = (String)ret + "record skipped: " + metrics.get(baseKey + ":skipped") + "\n";
        }
        ret = (String)ret + "mean: " + metrics.get(baseKey + ":mean") + "s, p50: " + metrics.get(baseKey + ":p50") + "s, p99: " + metrics.get(baseKey + ":p99") + "s\n";
        ret = (String)ret + "rate 1min: " + metrics.get(baseKey + ":rate1m") + "op/s, 5min: " + metrics.get(baseKey + ":rate5m") + "op/s";
        if (!metrics.containsKey(baseKey + ":timer:count")) {
            return ret;
        }
        ret = (String)ret + "\n----\n";
        baseKey = baseKey + ":timer";
        ret = (String)ret + "timer count: " + metrics.get(baseKey + ":count") + ", total: " + metrics.get(baseKey + ":sum") + "s\n";
        ret = (String)ret + "mean: " + metrics.get(baseKey + ":mean") + "s, p50: " + metrics.get(baseKey + ":p50") + "s, p99: " + metrics.get(baseKey + ":p99") + "s\n";
        ret = (String)ret + "rate 5min: " + metrics.get(baseKey + ":rate5m") + "op/s";
        return ret;
    }

    protected String getBatchInfo(JsonNode item) {
        Object ret = "";
        int batchCapacity = item.get("batchCapacity").asInt();
        if (batchCapacity > 1) {
            int batchThresholdMs = item.get("batchCapacity").asInt();
            ret = (String)ret + "batch " + item.get("batchCapacity").asText() + " " + batchThresholdMs + "ms\n";
        } else {
            ret = (String)ret + "no batch\n";
        }
        int retry = item.get("maxRetries").asInt();
        ret = retry > 1 ? (String)ret + "max retry: " + item.get("maxRetries").asText() + ", delay: " + item.get("retryDelayMs").asText() + "ms" : (String)ret + "no retry";
        return ret;
    }

    public String getActivity() {
        return this.getActivity(System.currentTimeMillis() / 1000L);
    }

    public String getActivity(long atTimestamp) {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode ret = mapper.getNodeFactory().objectNode();
        JsonNode nodes = this.getClusterNodes(atTimestamp);
        int workerCount = 0;
        for (JsonNode node : nodes) {
            if (!"worker".equals(node.at("/type").asText())) continue;
            ++workerCount;
        }
        JsonNode computations = this.getActiveComputations(atTimestamp);
        JsonNode scale = this.getScaleMetrics(workerCount, (ArrayNode)computations);
        ret.set("scale", scale);
        ret.set("nodes", nodes);
        ret.set("computations", computations);
        return ret.toString();
    }

    protected JsonNode getScaleMetrics(int workerCount, ArrayNode computations) {
        int current = workerCount > 0 ? workerCount : -1;
        int bestNodes = workerCount > 0 ? 1 : -1;
        for (JsonNode computation : computations) {
            int bNodes;
            int nodes = computation.at("/current/nodes").asInt();
            if (nodes > current) {
                current = nodes;
            }
            if ((bNodes = computation.at("/best/nodes").asInt()) <= bestNodes) continue;
            bestNodes = bNodes;
        }
        ObjectNode ret = new ObjectMapper().getNodeFactory().objectNode();
        ret.put("currentNodes", current);
        ret.put("bestNodes", bestNodes);
        ret.put("metric", bestNodes - current);
        return ret;
    }

    protected JsonNode getActiveComputations(long atTimestamp) {
        ObjectNode comp;
        ObjectMapper mapper = new ObjectMapper();
        ArrayNode ret = mapper.getNodeFactory().arrayNode();
        HashMap<JsonNode, ObjectNode> computations = new HashMap<JsonNode, ObjectNode>();
        JsonNode metrics = this.root.get("metrics");
        if (metrics == null || !metrics.isArray() || metrics.isEmpty()) {
            return ret;
        }
        JsonNode processors = this.root.get("processors");
        if (processors == null || !processors.isArray() || processors.isEmpty()) {
            return ret;
        }
        HashMap<String, Integer> partitions = new HashMap<String, Integer>();
        for (JsonNode stream : this.root.get("streams")) {
            partitions.put(Name.ofUrn((String)stream.get("name").asText()).getId(), stream.get("partitions").asInt());
        }
        HashMap<String, Integer> threads = new HashMap<String, Integer>();
        for (JsonNode item : processors) {
            ArrayNode comps = (ArrayNode)item.get("computations");
            for (Object comp2 : comps) {
                String name = Name.ofUrn((String)comp2.get("name").asText()).getId();
                threads.put(name, comp2.get("threads").asInt());
            }
        }
        for (JsonNode node : metrics) {
            long ts = node.get("timestamp").asLong();
            if (atTimestamp - ts > 300L) continue;
            for (JsonNode metric : node.at("/metrics")) {
                if (!"nuxeo.streams.global.stream.group.lag".equals(metric.get("k").asText()) || metric.get("v").asInt() <= 0) continue;
                comp = (ObjectNode)computations.get(metric.get("group"));
                if (comp == null) {
                    comp = mapper.getNodeFactory().objectNode();
                    comp.set("computation", metric.get("group"));
                    comp.set("streams", (JsonNode)mapper.getNodeFactory().objectNode());
                }
                ObjectNode streams = (ObjectNode)comp.get("streams");
                ObjectNode stream = mapper.getNodeFactory().objectNode();
                stream.set("stream", metric.get("stream"));
                stream.put("partitions", (Integer)partitions.get(metric.get("stream").asText()));
                stream.set("lag", metric.get("v"));
                streams.set(metric.get("stream").asText(), (JsonNode)stream);
                comp.set("nodes", (JsonNode)mapper.getNodeFactory().arrayNode());
                computations.put(metric.get("group"), comp);
            }
        }
        for (JsonNode node : metrics) {
            long ts = node.get("timestamp").asLong();
            if (atTimestamp - ts > 300L) continue;
            for (JsonNode metric : node.at("/metrics")) {
                ObjectNode stream;
                if ("nuxeo.streams.global.stream.group.latency".equals(metric.get("k").asText()) && metric.get("v").asInt() > 0) {
                    comp = (ObjectNode)computations.get(metric.get("group"));
                    if (comp == null || (stream = (ObjectNode)comp.get("streams").get(metric.get("stream").asText())) == null) continue;
                    stream.set("latency", metric.get("v"));
                    continue;
                }
                if (!"nuxeo.streams.global.stream.group.end".equals(metric.get("k").asText()) || (comp = (ObjectNode)computations.get(metric.get("group"))) == null || (stream = (ObjectNode)comp.get("streams").get(metric.get("stream").asText())) == null) continue;
                stream.set("end", metric.get("v"));
            }
        }
        for (JsonNode node : metrics) {
            JsonNode ts = node.get("timestamp");
            if (atTimestamp - ts.asLong() > 300L) continue;
            JsonNode nodeId = node.get("nodeId");
            for (JsonNode metric : node.at("/metrics")) {
                if (!"nuxeo.streams.computation.processRecord".equals(metric.get("k").asText()) || metric.get("count").asInt() <= 0 || (comp = (ObjectNode)computations.get(metric.get("computation"))) == null) continue;
                ObjectNode compInstance = mapper.getNodeFactory().objectNode();
                compInstance.set("nodeId", nodeId);
                compInstance.put("threads", threads.getOrDefault(metric.get("computation").asText(), 1));
                compInstance.set("timestamp", ts);
                compInstance.set("count", metric.get("count"));
                compInstance.set("sum", metric.get("sum"));
                compInstance.set("rate1m", metric.get("rate1m"));
                compInstance.set("rate5m", metric.get("rate5m"));
                compInstance.set("min", metric.get("min"));
                compInstance.set("p50", metric.get("p50"));
                compInstance.set("mean", metric.get("mean"));
                compInstance.set("p95", metric.get("p95"));
                compInstance.set("max", metric.get("max"));
                compInstance.set("stddev", metric.get("stddev"));
                ((ArrayNode)comp.get("nodes")).add((JsonNode)compInstance);
            }
        }
        for (ObjectNode comp3 : computations.values()) {
            int count = 0;
            int threadsCount = 0;
            float rate1m = 0.0f;
            int threadsPerNode = 0;
            for (JsonNode node : comp3.get("nodes")) {
                ++count;
                threadsPerNode = node.get("threads").asInt();
                threadsCount += threadsPerNode;
                rate1m += (float)node.get("rate1m").asDouble();
            }
            int lag = 0;
            int part = 0;
            Iterator iter = comp3.get("streams").elements();
            while (iter.hasNext()) {
                JsonNode stream = (JsonNode)iter.next();
                if (stream.get("lag").asInt() <= lag) continue;
                lag = stream.get("lag").asInt();
                part = (Integer)partitions.get(stream.get("stream").asText());
            }
            if (count == 0 || lag == 0) continue;
            int eta = (int)((float)lag / rate1m);
            ObjectNode current = mapper.getNodeFactory().objectNode();
            current.put("nodes", count);
            current.put("threads", threadsCount);
            current.put("rate1m", rate1m);
            current.put("eta", eta);
            ObjectNode best = mapper.getNodeFactory().objectNode();
            int bestNodes = (int)Math.ceil((double)part / (double)threadsPerNode);
            int bestThreads = part;
            int bestEta = (int)((float)lag / (rate1m * (float)bestThreads / (float)threadsCount));
            best.put("nodes", bestNodes);
            best.put("threads", bestThreads);
            best.put("rate1m", rate1m * (float)bestThreads / (float)threadsCount);
            best.put("eta", bestEta);
            comp3.set("current", (JsonNode)current);
            comp3.set("best", (JsonNode)best);
        }
        computations.values().forEach(arg_0 -> ((ArrayNode)ret).add(arg_0));
        return ret;
    }

    protected JsonNode getClusterNodes(long atTimestamp) {
        HashMap<String, ObjectNode> nodes = new HashMap<String, ObjectNode>();
        ObjectMapper mapper = new ObjectMapper();
        ArrayNode ret = mapper.getNodeFactory().arrayNode();
        JsonNode processors = this.root.get("processors");
        if (processors != null && processors.isArray()) {
            for (JsonNode item : processors) {
                ObjectNode node2 = (ObjectNode)item.at("/metadata").deepCopy();
                node2.remove("processorName");
                node2.put("created", Instant.ofEpochSecond(node2.at("/created").asLong()).toString());
                nodes.put(item.at("/metadata/nodeId").asText(), node2);
            }
        }
        JsonNode metrics = this.root.get("metrics");
        HashSet<String> activeNodes = new HashSet<String>();
        if (metrics != null && metrics.isArray()) {
            for (JsonNode item : metrics) {
                ObjectNode node3 = (ObjectNode)nodes.get(item.at("/nodeId").asText());
                if (node3 == null) continue;
                long timestamp = item.at("/timestamp").asLong();
                if (atTimestamp - timestamp <= 300L) {
                    activeNodes.add(item.at("/nodeId").asText());
                }
                node3.put("alive", Instant.ofEpochSecond(timestamp).toString());
                JsonNode hostMetrics = item.get("metrics");
                if (!hostMetrics.isArray()) continue;
                String nodeType = "front";
                for (JsonNode it : hostMetrics) {
                    if (!"nuxeo.streams.computation.running".equals(it.get("k").asText()) || !"work-common".equals(it.get("computation").asText())) continue;
                    nodeType = "worker";
                    break;
                }
                node3.put("type", nodeType);
            }
        }
        nodes.forEach((nodeId, node) -> {
            if (activeNodes.contains(nodeId)) {
                ret.add((JsonNode)node);
            }
        });
        return ret;
    }

    protected String getPumlIdentifier(String name) {
        return name.replaceAll("[^a-zA-Z0-9]", ".");
    }
}

