/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.platform.mqueues.workmanager;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import java.io.Externalizable;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.naming.NamingException;
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.work.WorkManagerImpl;
import org.nuxeo.ecm.core.work.WorkQueueRegistry;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
import org.nuxeo.ecm.platform.mqueues.MQService;
import org.nuxeo.ecm.platform.mqueues.workmanager.ComputationWork;
import org.nuxeo.lib.core.mqueues.computation.Record;
import org.nuxeo.lib.core.mqueues.computation.Settings;
import org.nuxeo.lib.core.mqueues.computation.Topology;
import org.nuxeo.lib.core.mqueues.computation.Watermark;
import org.nuxeo.lib.core.mqueues.computation.mqueue.MQComputationManager;
import org.nuxeo.lib.core.mqueues.mqueues.MQAppender;
import org.nuxeo.lib.core.mqueues.mqueues.MQLag;
import org.nuxeo.lib.core.mqueues.mqueues.MQManager;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.metrics.NuxeoMetricSet;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentManager;
import org.nuxeo.runtime.transaction.TransactionHelper;

public class WorkManagerComputation
extends WorkManagerImpl {
    protected static final Log log = LogFactory.getLog(WorkManagerComputation.class);
    public static final String WORKMANAGER_CONFIG_PROP = "nuxeo.mqueue.work.config";
    public static final String DEFAULT_WORKMANAGER_CONFIG = "work";
    public static final String WORKMANAGER_OVERPROVISIONING_PROP = "nuxeo.mqueue.work.over.provisioning";
    public static final String DEFAULT_WORKMANAGER_OVERPROVISIONING = "3";
    public static final int DEFAULT_CONCURRENCY = 4;
    protected Topology topology;
    protected Settings settings;
    protected MQComputationManager manager;
    protected MQManager mqManager;
    protected final Set<String> streamIds = new HashSet<String>();

    protected int getOverProvisioningFactor() {
        return Integer.valueOf(Framework.getProperty((String)WORKMANAGER_OVERPROVISIONING_PROP, (String)DEFAULT_WORKMANAGER_OVERPROVISIONING));
    }

    public void schedule(Work work, WorkManager.Scheduling scheduling, boolean afterCommit) {
        String queueId = this.getStreamForCategory(work.getCategory());
        if (log.isDebugEnabled()) {
            log.debug((Object)String.format("Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s", work.getId(), work.getCategory(), queueId, scheduling, afterCommit, work));
        }
        if (!this.isQueuingEnabled(queueId)) {
            log.info((Object)("Queue disabled, scheduling canceled: " + queueId));
            return;
        }
        if (afterCommit && this.scheduleAfterCommit(work, scheduling)) {
            return;
        }
        WorkSchedulePath.newInstance((Work)work);
        String key = work.getId();
        MQAppender appender = this.mqManager.getAppender(this.getStreamForCategory(work.getCategory()));
        if (appender == null) {
            log.error((Object)String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(), this.getStreamForCategory(work.getCategory())));
            return;
        }
        appender.append(key, (Externalizable)new Record(key, ComputationWork.serialize(work), Watermark.ofTimestamp((long)System.currentTimeMillis()).getValue(), null));
    }

    public String getStreamForCategory(String category) {
        if (category != null && this.streamIds.contains(category)) {
            return category;
        }
        return "default";
    }

    public int getApplicationStartedOrder() {
        return -502;
    }

    public void start(ComponentContext context) {
        this.init();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void init() {
        if (this.started) {
            return;
        }
        log.debug((Object)"Initializing");
        WorkManagerComputation workManagerComputation = this;
        synchronized (workManagerComputation) {
            if (this.started) {
                return;
            }
            this.supplantWorkManagerImpl();
            this.initTopology();
            this.mqManager = this.getMQManager();
            this.manager = new MQComputationManager(this.mqManager);
            this.manager.init(this.topology, this.settings);
            this.started = true;
            Framework.getRuntime().getComponentManager().addListener((ComponentManager.Listener)new ComponentManager.LifeCycleHandler(){

                public void beforeStop(ComponentManager mgr, boolean isStandby) {
                    try {
                        if (!WorkManagerComputation.this.shutdown(10L, TimeUnit.SECONDS)) {
                            log.error((Object)"Some processors are still active");
                        }
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new NuxeoException("Interrupted while stopping work manager thread pools", (Throwable)e);
                    }
                }

                public void afterStart(ComponentManager mgr, boolean isResume) {
                    WorkManagerComputation.this.manager.start();
                    for (String id : WorkManagerComputation.this.workQueueConfig.getQueueIds()) {
                        WorkManagerComputation.this.activateQueueMetrics(id);
                    }
                }

                public void afterStop(ComponentManager mgr, boolean isStandby) {
                    Framework.getRuntime().getComponentManager().removeListener((ComponentManager.Listener)this);
                    for (String id : WorkManagerComputation.this.workQueueConfig.getQueueIds()) {
                        WorkManagerComputation.this.deactivateQueueMetrics(id);
                    }
                }
            });
            log.info((Object)"Initialized");
        }
    }

    protected MQManager getMQManager() {
        String config = this.getMQConfig();
        log.info((Object)("Init WorkManagerComputation with MQueue configuration: " + config));
        MQService service = (MQService)Framework.getService(MQService.class);
        return service.getManager(this.getMQConfig());
    }

    protected String getMQConfig() {
        return Framework.getProperty((String)WORKMANAGER_CONFIG_PROP, (String)DEFAULT_WORKMANAGER_CONFIG);
    }

    protected void activateQueueMetrics(String queueId) {
        NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", new String[]{"works", "total", queueId});
        queueMetrics.putGauge(() -> this.getMetricsWithNuxeoClassLoader((String)queueId).scheduled, "scheduled", new String[0]);
        queueMetrics.putGauge(() -> this.getMetricsWithNuxeoClassLoader((String)queueId).running, "running", new String[0]);
        queueMetrics.putGauge(() -> this.getMetricsWithNuxeoClassLoader((String)queueId).completed, "completed", new String[0]);
        queueMetrics.putGauge(() -> this.getMetricsWithNuxeoClassLoader((String)queueId).canceled, "canceled", new String[0]);
        this.registry.registerAll((MetricSet)queueMetrics);
    }

    protected void deactivateQueueMetrics(String queueId) {
        String queueMetricsName = MetricRegistry.name((String)"nuxeo", (String[])new String[]{"works", "total", queueId});
        this.registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName));
    }

    protected void supplantWorkManagerImpl() {
        WorkQueueRegistry wqr;
        Field protectedField;
        WorkManagerImpl wmi = (WorkManagerImpl)Framework.getRuntime().getComponent("org.nuxeo.ecm.core.work.service");
        Class<WorkManagerImpl> clazz = WorkManagerImpl.class;
        try {
            protectedField = clazz.getDeclaredField("workQueueConfig");
        }
        catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
        protectedField.setAccessible(true);
        try {
            wqr = (WorkQueueRegistry)protectedField.get(wmi);
            log.debug((Object)"Remove contributions from WorkManagerImpl");
            protectedField.set(wmi, new WorkQueueRegistry());
        }
        catch (IllegalAccessException e) {
            throw new RuntimeException(e);
        }
        wqr.getQueueIds().forEach(id -> this.workQueueConfig.addContribution((Object)wqr.get(id)));
        this.streamIds.addAll(this.workQueueConfig.getQueueIds());
        this.workQueueConfig.getQueueIds().forEach(id -> log.info((Object)("Registering : " + id)));
    }

    protected void initTopology() {
        Topology.Builder builder = Topology.builder();
        this.workQueueConfig.getQueueIds().forEach(item -> builder.addComputation(() -> new ComputationWork((String)item), Collections.singletonList("i1:" + item)));
        this.topology = builder.build();
        this.settings = new Settings(4, this.getPartitions(4));
        this.workQueueConfig.getQueueIds().forEach(item -> this.settings.setConcurrency(item, this.workQueueConfig.get(item).getMaxThreads()));
        this.workQueueConfig.getQueueIds().forEach(item -> this.settings.setPartitions(item, this.getPartitions(this.workQueueConfig.get(item).getMaxThreads())));
    }

    protected int getPartitions(int maxThreads) {
        if (maxThreads == 1) {
            return 1;
        }
        return this.getOverProvisioningFactor() * maxThreads;
    }

    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) throws InterruptedException {
        log.warn((Object)"Shutdown a queue is not supported with computation implementation");
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean shutdown(long timeout, TimeUnit timeUnit) throws InterruptedException {
        log.info((Object)("Shutdown WorkManager in " + timeUnit.toMillis(timeout) + " ms"));
        this.shutdownInProgress = true;
        try {
            boolean ret = this.manager.stop(Duration.ofMillis(timeUnit.toMillis(timeout)));
            if (!ret) {
                log.error((Object)"Not able to stop worker pool within the timeout.");
            }
            boolean bl = ret;
            return bl;
        }
        finally {
            this.shutdownInProgress = false;
        }
    }

    public int getQueueSize(String queueId, Work.State state) {
        switch (state) {
            case SCHEDULED: {
                return this.getMetrics(queueId).getScheduled().intValue();
            }
            case RUNNING: {
                return this.getMetrics(queueId).getRunning().intValue();
            }
        }
        return 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String queueId) {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(Framework.class.getClassLoader());
            WorkQueueMetrics workQueueMetrics = this.getMetrics(queueId);
            return workQueueMetrics;
        }
        finally {
            Thread.currentThread().setContextClassLoader(classLoader);
        }
    }

    public WorkQueueMetrics getMetrics(String queueId) {
        MQLag lag = this.mqManager.getLag(queueId, queueId);
        long running = 0L;
        if (lag.lag() > 0L) {
            running = Math.min(lag.lag(), (long)this.settings.getPartitions(queueId));
        }
        return new WorkQueueMetrics(queueId, (Number)lag.lag(), (Number)running, (Number)lag.lower(), (Number)0);
    }

    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
        if (queueId != null) {
            return this.awaitCompletionOnQueue(queueId, duration, unit);
        }
        for (String item : this.workQueueConfig.getQueueIds()) {
            if (this.awaitCompletionOnQueue(item, duration, unit)) continue;
            return false;
        }
        return true;
    }

    protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit) throws InterruptedException {
        if (!this.isStarted()) {
            return true;
        }
        log.debug((Object)("awaitCompletion " + queueId + " starting"));
        long durationMs = Math.min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1L));
        long deadline = System.currentTimeMillis() + durationMs;
        while (System.currentTimeMillis() < deadline) {
            Thread.sleep(100L);
            int lag = this.getMetrics(queueId).getScheduled().intValue();
            if (lag == 0) {
                if (log.isDebugEnabled()) {
                    log.warn((Object)("awaitCompletion for " + queueId + " completed " + this.getMetrics(queueId)));
                }
                return true;
            }
            if (log.isDebugEnabled()) continue;
            log.debug((Object)("awaitCompletion for " + queueId + " not completed " + this.getMetrics(queueId)));
        }
        log.warn((Object)String.format("%s timeout after: %.2fs, %s", queueId, (double)durationMs / 1000.0, this.getMetrics(queueId)));
        return false;
    }

    public boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit) throws InterruptedException {
        if (!this.isStarted()) {
            return true;
        }
        long durationMs = Math.min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1L));
        long deadline = System.currentTimeMillis() + durationMs;
        long lowWatermark = this.getLowWaterMark(queueId);
        while (System.currentTimeMillis() < deadline) {
            Thread.sleep(100L);
            long wm = this.getLowWaterMark(queueId);
            if (wm == lowWatermark) {
                log.debug((Object)("awaitCompletion for " + (queueId == null ? "all" : queueId) + " completed " + wm));
                return true;
            }
            if (log.isDebugEnabled()) {
                log.debug((Object)("awaitCompletion low wm  for " + (queueId == null ? "all" : queueId) + ":" + wm + " diff: " + (wm - lowWatermark)));
            }
            lowWatermark = wm;
        }
        log.warn((Object)String.format("%s timeout after: %.2fs", queueId, (double)durationMs / 1000.0));
        return false;
    }

    protected long getLowWaterMark(String queueId) {
        if (queueId != null) {
            return this.manager.getLowWatermark(queueId);
        }
        return this.manager.getLowWatermark();
    }

    public Work.State getWorkState(String s) {
        return null;
    }

    public Work find(String s, Work.State state) {
        return null;
    }

    public List<Work> listWork(String s, Work.State state) {
        return Collections.emptyList();
    }

    public List<String> listWorkIds(String s, Work.State state) {
        return Collections.emptyList();
    }

    protected boolean scheduleAfterCommit(Work work, WorkManager.Scheduling scheduling) {
        TransactionManager transactionManager;
        try {
            transactionManager = TransactionHelper.lookupTransactionManager();
        }
        catch (NamingException e) {
            transactionManager = null;
        }
        if (transactionManager == null) {
            log.warn((Object)("Not scheduled work after commit because of missing transaction manager: " + work.getId()));
            return false;
        }
        try {
            Transaction transaction = transactionManager.getTransaction();
            if (transaction == null) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Not scheduled work after commit because of missing transaction: " + work.getId()));
                }
                return false;
            }
            int status = transaction.getStatus();
            if (status == 0) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Scheduled after commit: " + work.getId()));
                }
                transaction.registerSynchronization((Synchronization)new WorkScheduling(work, scheduling));
                return true;
            }
            if (status == 3) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Scheduled immediately: " + work.getId()));
                }
                return false;
            }
            if (status == 1) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Cancelling schedule because transaction marked rollback-only: " + work.getId()));
                }
                return true;
            }
            if (log.isDebugEnabled()) {
                log.debug((Object)("Not scheduling work after commit because transaction is in status " + status + ": " + work.getId()));
            }
            return false;
        }
        catch (RollbackException | SystemException e) {
            log.error((Object)"Cannot schedule after commit", e);
            return false;
        }
    }

    public class WorkScheduling
    implements Synchronization {
        public final Work work;
        public final WorkManager.Scheduling scheduling;

        public WorkScheduling(Work work, WorkManager.Scheduling scheduling) {
            this.work = work;
            this.scheduling = scheduling;
        }

        public void beforeCompletion() {
        }

        public void afterCompletion(int status) {
            if (status == 3) {
                WorkManagerComputation.this.schedule(this.work, this.scheduling, false);
            } else if (status != 4) {
                throw new IllegalArgumentException("Unsupported transaction status " + status);
            }
        }
    }
}

