/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.core.work;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.naming.NamingException;
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.work.WorkComputation;
import org.nuxeo.ecm.core.work.WorkManagerImpl;
import org.nuxeo.ecm.core.work.WorkStateHelper;
import org.nuxeo.ecm.core.work.api.Work;
import org.nuxeo.ecm.core.work.api.WorkManager;
import org.nuxeo.ecm.core.work.api.WorkQueueDescriptor;
import org.nuxeo.ecm.core.work.api.WorkQueueMetrics;
import org.nuxeo.ecm.core.work.api.WorkSchedulePath;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.RecordFilter;
import org.nuxeo.lib.stream.computation.RecordFilterChain;
import org.nuxeo.lib.stream.computation.Settings;
import org.nuxeo.lib.stream.computation.StreamManager;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.lib.stream.computation.internals.RecordFilterChainImpl;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.codec.CodecService;
import org.nuxeo.runtime.metrics.NuxeoMetricSet;
import org.nuxeo.runtime.model.ComponentContext;
import org.nuxeo.runtime.model.ComponentManager;
import org.nuxeo.runtime.model.Descriptor;
import org.nuxeo.runtime.services.config.ConfigurationService;
import org.nuxeo.runtime.stream.StreamService;
import org.nuxeo.runtime.transaction.TransactionHelper;

/**
 * {@link WorkManagerImpl} variant that schedules works by appending them as {@link Record}s through a
 * {@link StreamManager}, and processes them with one {@link WorkComputation} per work queue descriptor.
 * <p>
 * Operations that need direct access to individual queued works ({@link #find}, {@link #listWork},
 * {@link #listWorkIds}, {@link #shutdownQueue}) are not supported by this implementation and return
 * neutral values.
 */
public class StreamWorkManager extends WorkManagerImpl {

    protected static final Log log = LogFactory.getLog(StreamWorkManager.class);

    /** Framework property holding the name of the Log configuration used for works. */
    public static final String WORK_LOG_CONFIG_PROP = "nuxeo.stream.work.log.config";

    public static final String DEFAULT_WORK_LOG_CONFIG = "work";

    /** Framework property holding the name of the codec used to encode work records. */
    public static final String WORK_CODEC_PROP = "nuxeo.stream.work.log.codec";

    public static final String DEFAULT_WORK_CODEC = "legacy";

    /** Framework property holding the multiplier applied to a queue's max threads to size its partitions. */
    public static final String WORK_OVER_PROVISIONING_PROP = "nuxeo.stream.work.over.provisioning.factor";

    public static final String DEFAULT_WORK_OVER_PROVISIONING = "3";

    /** Default concurrency handed to the computation {@link Settings}. */
    public static final int DEFAULT_CONCURRENCY = 4;

    /** Configuration property read into {@link #stateTTL}. */
    public static final String STATETTL_KEY = "nuxeo.stream.work.state.ttl.seconds";

    /** Configuration property read into {@link #storeState}. */
    public static final String STORESTATE_KEY = "nuxeo.stream.work.storestate.enabled";

    public static final String STATETTL_DEFAULT_VALUE = "3600";

    /** Configuration property naming an optional {@link RecordFilter} implementation class. */
    public static final String COMPUTATION_FILTER_CLASS_KEY = "nuxeo.stream.work.computation.filter.class";

    public static final String COMPUTATION_FILTER_STORE_KEY = "nuxeo.stream.work.computation.filter.storeName";

    public static final String COMPUTATION_FILTER_STORE_TTL_KEY = "nuxeo.stream.work.computation.filter.storeTTL";

    public static final String COMPUTATION_FILTER_THRESHOLD_SIZE_KEY = "nuxeo.stream.work.computation.filter.thresholdSize";

    public static final String COMPUTATION_FILTER_PREFIX_KEY = "nuxeo.stream.work.computation.filter.storeKeyPrefix";

    // Numeric values of the javax.transaction.Status constants tested by the after-commit logic.
    // They are named here because the bare literals are unreadable.
    private static final int TX_STATUS_ACTIVE = 0;

    private static final int TX_STATUS_MARKED_ROLLBACK = 1;

    private static final int TX_STATUS_COMMITTED = 3;

    private static final int TX_STATUS_ROLLEDBACK = 4;

    /** Topology holding the computations of queues whose processing is enabled. */
    protected Topology topology;

    /** Topology holding the computations of queues whose processing is disabled. */
    protected Topology topologyDisabled;

    protected Settings settings;

    protected StreamProcessor streamProcessor;

    protected LogManager logManager;

    protected StreamManager streamManager;

    /** Whether work states are recorded through {@link WorkStateHelper}. */
    protected boolean storeState;

    /** TTL handed to {@link WorkStateHelper} when recording a state or an offset. */
    protected long stateTTL;

    /**
     * Returns the factor by which the number of partitions exceeds the number of threads of a queue.
     *
     * @return the configured factor when the Log manager supports subscribe, {@code 1} otherwise
     */
    protected int getOverProvisioningFactor() {
        if (!getLogManager().supportSubscribe()) {
            return 1;
        }
        String factor = Framework.getProperty(WORK_OVER_PROVISIONING_PROP, DEFAULT_WORK_OVER_PROVISIONING);
        return Integer.parseInt(factor);
    }

    /** Returns the configured codec name, {@value #DEFAULT_WORK_CODEC} when unset. */
    protected String getCodecName() {
        return Framework.getProperty(WORK_CODEC_PROP, DEFAULT_WORK_CODEC);
    }

    /** Looks up the {@link Record} codec named by {@link #getCodecName()} in the {@link CodecService}. */
    protected Codec<Record> getCodec() {
        CodecService codecService = Framework.getService(CodecService.class);
        return codecService.getCodec(getCodecName(), Record.class);
    }

    /**
     * Schedules a work by appending its serialized form to the stream of its queue.
     * <p>
     * Nothing is appended when queuing is disabled for the queue, when the request is a
     * {@code CANCEL_SCHEDULED} (which only flags the stored state, if any), or when the scheduling has been
     * deferred or dropped by {@link #scheduleAfterCommit(Work, WorkManager.Scheduling)}.
     *
     * @param work the work to schedule
     * @param scheduling the scheduling policy
     * @param afterCommit whether to try to defer the scheduling to the commit of the current transaction
     */
    @Override
    public void schedule(Work work, WorkManager.Scheduling scheduling, boolean afterCommit) {
        String queueId = getCategoryQueueId(work.getCategory());
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Scheduling: workId: %s, category: %s, queue: %s, scheduling: %s, afterCommit: %s, work: %s",
                    work.getId(), work.getCategory(), queueId, scheduling, afterCommit, work));
        }
        if (!isQueuingEnabled(queueId)) {
            log.info("Queue disabled, scheduling canceled: " + queueId);
            return;
        }
        if (scheduling == WorkManager.Scheduling.CANCEL_SCHEDULED) {
            cancelScheduled(work);
            return;
        }
        if (afterCommit && scheduleAfterCommit(work, scheduling)) {
            return;
        }
        WorkSchedulePath.newInstance(work);
        LogAppender appender = logManager.getAppender(queueId);
        if (appender == null) {
            log.error(String.format("Not scheduled work, unknown category: %s, mapped to %s", work.getCategory(),
                    queueId));
            return;
        }
        String key = work.getPartitionKey();
        LogOffset offset = streamManager.append(queueId, Record.of(key, WorkComputation.serialize(work)));
        if (work.isCoalescing()) {
            WorkStateHelper.setLastOffset(work.getId(), offset.offset(), stateTTL);
        }
        if (work.isGroupJoin()) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Submit Work: %s to GroupJoin: %s, offset: %s", work.getId(),
                        work.getPartitionKey(), offset));
            }
            WorkStateHelper.addGroupJoinWork(work.getPartitionKey());
        }
        if (storeState) {
            WorkStateHelper.setState(work.getId(), Work.State.SCHEDULED, stateTTL);
        }
    }

    /** Flags a work as canceled in the state store, or warns when states are not stored. */
    private void cancelScheduled(Work work) {
        if (!storeState) {
            log.warn(String.format("Canceling a work is only supported if '%s' is true. Skipping work: %s",
                    STORESTATE_KEY, work));
            return;
        }
        // Only works that already have a recorded state are flagged.
        if (WorkStateHelper.getState(work.getId()) != null) {
            WorkStateHelper.setCanceled(work.getId());
        }
    }

    @Override
    public int getApplicationStartedOrder() {
        return -502;
    }

    /** Starts the component and reads the state-store settings from the {@link ConfigurationService}. */
    @Override
    public void start(ComponentContext context) {
        super.start(context);
        ConfigurationService configuration = Framework.getService(ConfigurationService.class);
        storeState = configuration.isBooleanPropertyTrue(STORESTATE_KEY);
        stateTTL = Long.parseLong(configuration.getProperty(STATETTL_KEY, STATETTL_DEFAULT_VALUE));
    }

    /**
     * Builds the record filter chain from the configured filter class.
     *
     * @return a chain holding one initialized filter, or {@code null} when no filter class is configured
     * @throws IllegalArgumentException if the class is not a {@link RecordFilter} or cannot be instantiated
     */
    protected RecordFilterChain getRecordFilter() {
        String filterClass = getRecordFilterClass();
        if (filterClass == null) {
            return null;
        }
        RecordFilterChainImpl chain = new RecordFilterChainImpl();
        try {
            Class<?> klass = Class.forName(filterClass);
            if (!RecordFilter.class.isAssignableFrom(klass)) {
                throw new IllegalArgumentException("Invalid class for RecordFilter: " + filterClass);
            }
            RecordFilter recordFilter = (RecordFilter) klass.getDeclaredConstructor().newInstance();
            recordFilter.init(getRecordFilterOptions());
            chain.addFilter(recordFilter);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Invalid class for RecordFilter: " + filterClass, e);
        }
        return chain;
    }

    /**
     * Collects the filter options that are present in the configuration.
     *
     * @return a mutable map with a subset of the keys {@code storeName}, {@code prefix},
     *         {@code thresholdSize} and {@code storeTTL}
     */
    protected Map<String, String> getRecordFilterOptions() {
        Map<String, String> options = new HashMap<>();
        ConfigurationService configuration = Framework.getService(ConfigurationService.class);
        configuration.getString(COMPUTATION_FILTER_STORE_KEY).ifPresent(value -> options.put("storeName", value));
        configuration.getString(COMPUTATION_FILTER_PREFIX_KEY).ifPresent(value -> options.put("prefix", value));
        configuration.getInteger(COMPUTATION_FILTER_THRESHOLD_SIZE_KEY)
                     .ifPresent(value -> options.put("thresholdSize", value.toString()));
        configuration.getString(COMPUTATION_FILTER_STORE_TTL_KEY).ifPresent(value -> options.put("storeTTL", value));
        return options;
    }

    /** Returns the configured filter class name, or {@code null} when none is configured. */
    protected String getRecordFilterClass() {
        ConfigurationService configuration = Framework.getService(ConfigurationService.class);
        return configuration.getString(COMPUTATION_FILTER_CLASS_KEY).orElse(null);
    }

    /**
     * Initializes the topologies, the Log and Stream managers and the stream processor, once.
     * <p>
     * The component registered as {@code org.nuxeo.ecm.core.work.service} is flagged inactive before the
     * initialization takes place.
     */
    @Override
    public void init() {
        if (started) {
            return;
        }
        WorkManagerImpl wmi = (WorkManagerImpl) Framework.getRuntime()
                                                         .getComponent("org.nuxeo.ecm.core.work.service");
        wmi.active = false;
        log.debug("Initializing");
        synchronized (this) {
            // Re-check under the lock: another thread may have completed the initialization meanwhile.
            if (started) {
                return;
            }
            getDescriptors("queues").forEach(d -> categoryToQueueId.put(d.getId(), d.getId()));
            index();
            initTopology();
            logManager = getLogManager();
            streamManager = getStreamManager();
            streamManager.register("StreamWorkManagerDisable", topologyDisabled, settings);
            streamProcessor = streamManager.registerAndCreateProcessor("StreamWorkManager", topology, settings);
            started = true;
            new ComponentListener().install();
            log.info("Initialized");
        }
    }

    /** Fetches from the {@link StreamService} the Log manager of the configured Log configuration. */
    protected LogManager getLogManager() {
        String config = getLogConfig();
        log.info("Init StreamWorkManager with Log configuration: " + config);
        StreamService service = Framework.getService(StreamService.class);
        return service.getLogManager(config);
    }

    /** Fetches from the {@link StreamService} the Stream manager of the configured Log configuration. */
    protected StreamManager getStreamManager() {
        StreamService service = Framework.getService(StreamService.class);
        return service.getStreamManager(getLogConfig());
    }

    /** Returns the configured Log configuration name, {@value #DEFAULT_WORK_LOG_CONFIG} when unset. */
    protected String getLogConfig() {
        return Framework.getProperty(WORK_LOG_CONFIG_PROP, DEFAULT_WORK_LOG_CONFIG);
    }

    /**
     * @return {@code true} only when a descriptor exists for the queue and enables processing
     */
    @Override
    public boolean isProcessingEnabled(String queueId) {
        WorkQueueDescriptor descriptor = getWorkQueueDescriptor(queueId);
        return descriptor != null && descriptor.isProcessingEnabled();
    }

    /**
     * Builds {@link #topology}, {@link #topologyDisabled} and {@link #settings} from the queue descriptors.
     * <p>
     * Every descriptor contributes one {@link WorkComputation} reading the stream named after the queue; it
     * goes to the enabled or to the disabled topology depending on the descriptor's processing flag.
     */
    protected void initTopology() {
        List<WorkQueueDescriptor> descriptors = getDescriptors("queues");
        Topology.Builder enabledBuilder = Topology.builder();
        Topology.Builder disabledBuilder = Topology.builder();
        for (WorkQueueDescriptor descriptor : descriptors) {
            Topology.Builder target = descriptor.isProcessingEnabled() ? enabledBuilder : disabledBuilder;
            target.addComputation(() -> new WorkComputation(descriptor.getId()),
                    Collections.singletonList("i1:" + descriptor.getId()));
        }
        topology = enabledBuilder.build();
        topologyDisabled = disabledBuilder.build();

        ComputationPolicy policy = new ComputationPolicyBuilder().continueOnFailure(true).build();
        RecordFilterChain filter = getRecordFilter();
        settings = new Settings(DEFAULT_CONCURRENCY, getPartitions(DEFAULT_CONCURRENCY), getCodec(), policy, filter);
        for (WorkQueueDescriptor descriptor : descriptors) {
            settings.setConcurrency(descriptor.getId(), descriptor.getMaxThreads());
            settings.setPartitions(descriptor.getId(), getPartitions(descriptor.getMaxThreads()));
        }
    }

    /**
     * Computes the number of partitions for a queue.
     *
     * @param maxThreads the maximum number of threads of the queue
     * @return {@code 1} for a single-threaded queue, otherwise {@code maxThreads} multiplied by the
     *         over-provisioning factor
     */
    protected int getPartitions(int maxThreads) {
        return maxThreads == 1 ? 1 : getOverProvisioningFactor() * maxThreads;
    }

    /**
     * Registers the metrics of a queue whose processing is enabled.
     *
     * @throws IllegalArgumentException if the descriptor is the {@code *} wildcard
     */
    @Override
    void activateQueue(WorkQueueDescriptor config) {
        if ("*".equals(config.id)) {
            throw new IllegalArgumentException("cannot activate all queues");
        }
        log.info("Activated queue " + config.id + " " + config.toString());
        if (config.isProcessingEnabled()) {
            activateQueueMetrics(config.id);
        }
    }

    /**
     * Removes the metrics of a queue whose processing is enabled; the queue itself is left untouched.
     *
     * @throws IllegalArgumentException if the descriptor is the {@code *} wildcard
     */
    @Override
    void deactivateQueue(WorkQueueDescriptor config) {
        if ("*".equals(config.id)) {
            throw new IllegalArgumentException("cannot deactivate all queues");
        }
        if (config.isProcessingEnabled()) {
            deactivateQueueMetrics(config.id);
        }
        log.info("Deactivated work queue not supported: " + config.id);
    }

    /** Registers the scheduled, running, completed and canceled gauges of a queue. */
    @Override
    protected void activateQueueMetrics(String queueId) {
        NuxeoMetricSet queueMetrics = new NuxeoMetricSet("nuxeo", new String[] { "works", "total", queueId });
        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).scheduled, "scheduled", new String[0]);
        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).running, "running", new String[0]);
        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).completed, "completed", new String[0]);
        queueMetrics.putGauge(() -> getMetricsWithNuxeoClassLoader(queueId).canceled, "canceled", new String[0]);
        registry.registerAll(queueMetrics);
    }

    /** Removes every registered metric whose name starts with the metric name of the queue. */
    @Override
    protected void deactivateQueueMetrics(String queueId) {
        String queueMetricsName = MetricRegistry.name("nuxeo", "works", "total", queueId);
        registry.removeMatching((name, metric) -> name.startsWith(queueMetricsName));
    }

    /**
     * Not supported by this implementation.
     *
     * @return always {@code false}
     */
    @Override
    public boolean shutdownQueue(String queueId, long timeout, TimeUnit unit) {
        log.warn("Shutdown a queue is not supported with computation implementation");
        return false;
    }

    /**
     * Stops the stream processor, waiting for the longer of the given timeout and the
     * {@code nuxeo.work.shutdown.delay.ms} configuration property.
     *
     * @return whether the processor stopped within that delay
     */
    @Override
    public boolean shutdown(long timeout, TimeUnit timeUnit) {
        log.info("Shutdown WorkManager in " + timeUnit.toMillis(timeout) + " ms");
        shutdownInProgress = true;
        try {
            ConfigurationService configuration = Framework.getService(ConfigurationService.class);
            long shutdownDelay = Long.parseLong(configuration.getProperty("nuxeo.work.shutdown.delay.ms", "0"));
            long waitMs = Math.max(timeUnit.toMillis(timeout), shutdownDelay);
            boolean stopped = streamProcessor.stop(Duration.ofMillis(waitMs));
            if (!stopped) {
                log.error("Not able to stop worker pool within the timeout.");
            }
            return stopped;
        } finally {
            shutdownInProgress = false;
        }
    }

    /**
     * @return the scheduled or running count taken from {@link #getMetrics(String)}, {@code 0} for any
     *         other state
     */
    @Override
    public int getQueueSize(String queueId, Work.State state) {
        switch (state) {
        case SCHEDULED:
            return getMetrics(queueId).getScheduled().intValue();
        case RUNNING:
            return getMetrics(queueId).getRunning().intValue();
        default:
            return 0;
        }
    }

    /**
     * Same as {@link #getMetrics(String)} but evaluated with the class loader of {@link Framework} set as
     * the thread context class loader; the previous context class loader is always restored.
     */
    protected WorkQueueMetrics getMetricsWithNuxeoClassLoader(String queueId) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        try {
            current.setContextClassLoader(Framework.class.getClassLoader());
            return getMetrics(queueId);
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    /**
     * Derives the metrics of a queue from the lag of its Log.
     * <p>
     * The lag is reported as the scheduled count and the lower lag position as the completed count; the
     * running count is the lag capped by the number of partitions of the queue; canceled is always 0.
     */
    @Override
    public WorkQueueMetrics getMetrics(String queueId) {
        LogLag lag = logManager.getLag(queueId, queueId);
        long pending = lag.lag();
        long running = pending > 0L ? Math.min(pending, (long) settings.getPartitions(queueId)) : 0L;
        return new WorkQueueMetrics(queueId, pending, running, lag.lower(), 0);
    }

    /**
     * Waits for a queue, or for every queue when {@code queueId} is {@code null}, to have no lag.
     * <p>
     * When waiting for every queue the duration is applied to each queue in turn.
     *
     * @return {@code false} as soon as one queue does not complete in time, {@code true} otherwise
     */
    @Override
    public boolean awaitCompletion(String queueId, long duration, TimeUnit unit) throws InterruptedException {
        if (queueId != null) {
            return awaitCompletionOnQueue(queueId, duration, unit);
        }
        for (Descriptor item : getDescriptors("queues")) {
            if (!awaitCompletionOnQueue(item.getId(), duration, unit)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Polls the metrics of a queue every 100 ms until its scheduled count is 0 or the duration elapses.
     *
     * @param duration the maximum wait, capped to one day
     * @return {@code true} when the queue has no lag or the manager is not started, {@code false} on timeout
     */
    protected boolean awaitCompletionOnQueue(String queueId, long duration, TimeUnit unit)
            throws InterruptedException {
        if (!isStarted()) {
            return true;
        }
        log.debug("awaitCompletion " + queueId + " starting");
        long durationMs = Math.min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1L));
        long deadline = System.currentTimeMillis() + durationMs;
        while (System.currentTimeMillis() < deadline) {
            Thread.sleep(100L);
            boolean completed = getMetrics(queueId).getScheduled().intValue() == 0;
            // The guard keeps the extra metrics lookup off the polling path when debug is disabled.
            if (log.isDebugEnabled()) {
                log.debug("awaitCompletion for " + queueId + (completed ? " completed " : " not completed ")
                        + getMetrics(queueId));
            }
            if (completed) {
                return true;
            }
        }
        log.warn(String.format("%s timeout after: %.2fs, %s", queueId, (double) durationMs / 1000.0,
                getMetrics(queueId)));
        return false;
    }

    /**
     * Polls the low watermark every 100 ms and reports completion once two consecutive readings are equal.
     *
     * @param queueId the queue to watch, or {@code null} for the watermark of the whole processor
     * @param duration the maximum wait, capped to one day
     * @return {@code true} when the watermark stopped moving or the manager is not started, {@code false}
     *         on timeout
     * @deprecated NOTE(review): presumably superseded by {@link #awaitCompletion(String, long, TimeUnit)} -
     *             confirm
     */
    @Deprecated
    public boolean awaitCompletionWithWaterMark(String queueId, long duration, TimeUnit unit)
            throws InterruptedException {
        if (!isStarted()) {
            return true;
        }
        String target = queueId == null ? "all" : queueId;
        long durationMs = Math.min(unit.toMillis(duration), TimeUnit.DAYS.toMillis(1L));
        long deadline = System.currentTimeMillis() + durationMs;
        long previous = getLowWaterMark(queueId);
        while (System.currentTimeMillis() < deadline) {
            Thread.sleep(100L);
            long current = getLowWaterMark(queueId);
            if (current == previous) {
                log.debug("awaitCompletion for " + target + " completed " + current);
                return true;
            }
            if (log.isDebugEnabled()) {
                log.debug("awaitCompletion low wm  for " + target + ":" + current + " diff: "
                        + (current - previous));
            }
            previous = current;
        }
        log.warn(String.format("%s timeout after: %.2fs", queueId, (double) durationMs / 1000.0));
        return false;
    }

    /** Returns the low watermark of a queue, or of the whole processor when {@code queueId} is null. */
    protected long getLowWaterMark(String queueId) {
        return queueId == null ? streamProcessor.getLowWatermark() : streamProcessor.getLowWatermark(queueId);
    }

    /**
     * @return the stored state of the work, or {@code null} when states are not stored
     */
    @Override
    public Work.State getWorkState(String workId) {
        return storeState ? WorkStateHelper.getState(workId) : null;
    }

    /**
     * Not supported by this implementation.
     *
     * @return always {@code null}
     */
    @Override
    public Work find(String s, Work.State state) {
        return null;
    }

    /**
     * Not supported by this implementation.
     *
     * @return always an empty list
     */
    @Override
    public List<Work> listWork(String s, Work.State state) {
        return Collections.emptyList();
    }

    /**
     * Not supported by this implementation.
     *
     * @return always an empty list
     */
    @Override
    public List<String> listWorkIds(String s, Work.State state) {
        return Collections.emptyList();
    }

    /**
     * Tries to tie the scheduling of a work to the outcome of the current transaction.
     *
     * @return {@code true} when the caller must not schedule the work itself: either a synchronization has
     *         been registered on an active transaction, or the transaction is marked rollback-only and the
     *         work is dropped; {@code false} when the caller must schedule the work immediately
     */
    @Override
    protected boolean scheduleAfterCommit(Work work, WorkManager.Scheduling scheduling) {
        TransactionManager transactionManager;
        try {
            transactionManager = TransactionHelper.lookupTransactionManager();
        } catch (NamingException e) {
            // A failed lookup is handled like a missing transaction manager, reported right below.
            transactionManager = null;
        }
        if (transactionManager == null) {
            log.warn("Not scheduled work after commit because of missing transaction manager: " + work.getId());
            return false;
        }
        try {
            Transaction transaction = transactionManager.getTransaction();
            if (transaction == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Not scheduled work after commit because of missing transaction: " + work.getId());
                }
                return false;
            }
            int status = transaction.getStatus();
            switch (status) {
            case TX_STATUS_ACTIVE:
                if (log.isDebugEnabled()) {
                    log.debug("Scheduled after commit: " + work.getId());
                }
                transaction.registerSynchronization(new WorkScheduling(work, scheduling));
                return true;
            case TX_STATUS_COMMITTED:
                if (log.isDebugEnabled()) {
                    log.debug("Scheduled immediately: " + work.getId());
                }
                return false;
            case TX_STATUS_MARKED_ROLLBACK:
                if (log.isDebugEnabled()) {
                    log.debug("Cancelling schedule because transaction marked rollback-only: " + work.getId());
                }
                return true;
            default:
                if (log.isDebugEnabled()) {
                    log.debug("Not scheduling work after commit because transaction is in status " + status
                            + ": " + work.getId());
                }
                return false;
            }
        } catch (RollbackException | SystemException e) {
            log.error("Cannot schedule after commit", e);
            return false;
        }
    }

    @Override
    public boolean supportsProcessingDisabling() {
        return true;
    }

    /**
     * Transaction synchronization that schedules a work once its transaction is committed.
     */
    public class WorkScheduling implements Synchronization {

        public final Work work;

        public final WorkManager.Scheduling scheduling;

        public WorkScheduling(Work work, WorkManager.Scheduling scheduling) {
            this.work = work;
            this.scheduling = scheduling;
        }

        @Override
        public void beforeCompletion() {
            // nothing to do before completion
        }

        /**
         * Schedules the work on commit and ignores a rollback.
         *
         * @throws IllegalArgumentException for any status other than committed or rolled back
         */
        @Override
        public void afterCompletion(int status) {
            if (status == TX_STATUS_COMMITTED) {
                StreamWorkManager.this.schedule(work, scheduling, false);
                return;
            }
            if (status != TX_STATUS_ROLLEDBACK) {
                throw new IllegalArgumentException("Unsupported transaction status " + status);
            }
        }
    }

    /**
     * Component manager listener that starts the stream processor after the components are started and
     * shuts the work manager down before they are stopped.
     */
    class ComponentListener implements ComponentManager.Listener {

        @Override
        public void beforeStop(ComponentManager mgr, boolean isStandby) {
            if (!StreamWorkManager.this.shutdown(10L, TimeUnit.SECONDS)) {
                log.error("Some processors are still active");
            }
        }

        @Override
        public void afterStart(ComponentManager mgr, boolean isResume) {
            if (StreamWorkManager.this.isProcessingDisabled()) {
                log.warn("WorkManager processing has been disabled on this node");
                return;
            }
            StreamWorkManager.this.streamProcessor.start();
            for (Descriptor descriptor : StreamWorkManager.this.getDescriptors("queues")) {
                StreamWorkManager.this.activateQueueMetrics(descriptor.getId());
            }
        }

        @Override
        public void afterStop(ComponentManager mgr, boolean isStandby) {
            Framework.getRuntime().getComponentManager().removeListener(this);
            for (Descriptor descriptor : StreamWorkManager.this.getDescriptors("queues")) {
                StreamWorkManager.this.deactivateQueueMetrics(descriptor.getId());
            }
        }
    }
}

