/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.platform.importer.mqueues.automation;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import net.jodah.failsafe.RetryPolicy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.automation.OperationContext;
import org.nuxeo.ecm.automation.core.annotations.Context;
import org.nuxeo.ecm.automation.core.annotations.Operation;
import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
import org.nuxeo.ecm.automation.core.annotations.Param;
import org.nuxeo.ecm.platform.importer.mqueues.automation.RandomBlobProducers;
import org.nuxeo.ecm.platform.importer.mqueues.chronicle.ChronicleConfig;
import org.nuxeo.ecm.platform.importer.mqueues.kafka.KafkaConfigService;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQManager;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.chronicle.ChronicleMQManager;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.kafka.KafkaMQManager;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.BatchPolicy;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.ConsumerPolicy;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.DocumentConsumerPolicy;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.DocumentConsumerPool;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.consumer.DocumentMessageConsumerFactory;
import org.nuxeo.ecm.platform.importer.mqueues.pattern.message.DocumentMessage;
import org.nuxeo.runtime.api.Framework;

/**
 * Automation operation that runs a pool of document consumers: it reads
 * {@link DocumentMessage}s from a mqueue and hands them to a
 * {@link DocumentMessageConsumerFactory} bound to the target repository and
 * root folder.
 *
 * <p>The mqueue backend is Chronicle unless a {@code kafkaConfig} name is
 * supplied, in which case a Kafka backed manager is built from the
 * {@link KafkaConfigService}.
 */
@Operation(id="MQImporter.runDocumentConsumers", category="Services", label="Imports document", since="9.1", description="Import mqueues document into repository.")
public class DocumentConsumers {
    private static final Log log = LogFactory.getLog(DocumentConsumers.class);

    /** Operation identifier, also used as the consumer policy name. */
    public static final String ID = "MQImporter.runDocumentConsumers";

    /** Name of the mqueue used when {@code mqName} is not provided. */
    private static final String DEFAULT_MQ_NAME = "mq-doc";

    @Context
    protected OperationContext ctx;

    /** Optional; forwarded to the policy as {@code maxThreads}, 0 when unset. */
    @Param(name="nbThreads", required=false)
    protected Integer nbThreads;

    /** Required; passed to the consumer factory together with the repository name. */
    @Param(name="rootFolder")
    protected String rootFolder;

    /** Optional; defaults to the repository of the operation context's core session. */
    @Param(name="repositoryName", required=false)
    protected String repositoryName;

    /** Optional; used as the batch policy capacity. */
    @Param(name="batchSize", required=false)
    protected Integer batchSize = 10;

    /** Optional; used as the batch policy time threshold, in seconds. */
    @Param(name="batchThresholdS", required=false)
    protected Integer batchThresholdS = 20;

    /** Optional; used as the retry policy maximum number of retries. */
    @Param(name="retryMax", required=false)
    protected Integer retryMax = 3;

    /** Optional; used as the retry policy delay, in seconds. */
    @Param(name="retryDelayS", required=false)
    protected Integer retryDelayS = 2;

    /** Optional; name of the mqueue to consume, see {@link #getMQName()}. */
    @Param(name="mqName", required=false)
    protected String mqName;

    /** Optional; name of a Kafka configuration. When null or empty, Chronicle is used. */
    @Param(name="kafkaConfig", required=false)
    protected String kafkaConfig;

    @Param(name="blockIndexing", required=false)
    protected Boolean blockIndexing = false;

    @Param(name="blockAsyncListeners", required=false)
    protected Boolean blockAsyncListeners = false;

    @Param(name="blockPostCommitListeners", required=false)
    protected Boolean blockPostCommitListeners = false;

    // The automation parameter is named "blockDefaultSyncListeners" although the field is not.
    @Param(name="blockDefaultSyncListeners", required=false)
    protected Boolean blockSyncListeners = false;

    @Param(name="useBulkMode", required=false)
    protected Boolean useBulkMode = false;

    /**
     * Builds the consumer policy, starts the consumer pool and waits for the
     * result of {@code start()}.
     *
     * <p>Failures raised while creating the manager, running the consumers or
     * closing the resources are logged and not propagated to the caller. If the
     * wait is interrupted, the interrupt status of the current thread is
     * restored so that callers can still observe it.
     */
    @OperationMethod
    public void run() {
        // NOTE(review): presumably rejects callers that are not allowed to run importers
        // - confirm against RandomBlobProducers.checkAccess.
        RandomBlobProducers.checkAccess(ctx);
        repositoryName = getRepositoryName();
        ConsumerPolicy consumerPolicy = buildConsumerPolicy();
        log.warn(String.format("Import documents from mqueue: %s into: %s/%s, with policy: %s",
                getMQName(), repositoryName, rootFolder, consumerPolicy));
        try (MQManager<DocumentMessage> manager = getManager();
             DocumentConsumerPool<DocumentMessage> consumers = new DocumentConsumerPool<DocumentMessage>(
                     getMQName(), manager, new DocumentMessageConsumerFactory(repositoryName, rootFolder),
                     consumerPolicy)) {
            consumers.start().get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Catching the exception clears the interrupt flag; put it back.
                Thread.currentThread().interrupt();
            }
            log.error(e.getMessage(), e);
        }
    }

    /** Assembles the consumer policy from the operation parameters. */
    private ConsumerPolicy buildConsumerPolicy() {
        return DocumentConsumerPolicy.builder()
                .blockIndexing(blockIndexing)
                .blockAsyncListeners(blockAsyncListeners)
                .blockPostCommitListeners(blockPostCommitListeners)
                .blockDefaultSyncListener(blockSyncListeners)
                .useBulkMode(useBulkMode)
                .name(ID)
                .batchPolicy(BatchPolicy.builder()
                        .capacity(batchSize)
                        .timeThreshold(Duration.ofSeconds(batchThresholdS))
                        .build())
                .retryPolicy(new RetryPolicy()
                        .withMaxRetries(retryMax)
                        .withDelay(retryDelayS, TimeUnit.SECONDS))
                .maxThreads(getNbThreads())
                .salted()
                .build();
    }

    /**
     * Returns the requested number of threads as a {@code short}.
     *
     * @return 0 when {@code nbThreads} is unset, otherwise its value clamped to
     *         the {@code short} range instead of silently wrapping around
     */
    private short getNbThreads() {
        if (nbThreads == null) {
            return 0;
        }
        int requested = nbThreads;
        return (short) Math.max(Short.MIN_VALUE, Math.min(Short.MAX_VALUE, requested));
    }

    /**
     * @return the {@code repositoryName} parameter when it is non-empty,
     *         otherwise the repository name of the context's core session
     */
    protected String getRepositoryName() {
        if (repositoryName == null || repositoryName.isEmpty()) {
            return ctx.getCoreSession().getRepositoryName();
        }
        return repositoryName;
    }

    /**
     * @return the {@code mqName} parameter when it is non-empty, otherwise
     *         {@code "mq-doc"}; an empty name is treated as unset, like the
     *         other optional string parameters
     */
    protected String getMQName() {
        if (mqName == null || mqName.isEmpty()) {
            return DEFAULT_MQ_NAME;
        }
        return mqName;
    }

    /**
     * Creates the mqueue manager for this run. The caller is expected to close it.
     *
     * @return a Chronicle manager when {@code kafkaConfig} is null or empty,
     *         otherwise a Kafka manager configured from the {@link KafkaConfigService}
     * @throws IllegalStateException if a Kafka configuration is requested but
     *         the {@link KafkaConfigService} cannot be looked up
     */
    protected MQManager<DocumentMessage> getManager() {
        if (kafkaConfig == null || kafkaConfig.isEmpty()) {
            return new ChronicleMQManager(ChronicleConfig.getBasePath("import"), ChronicleConfig.getRetentionDuration());
        }
        KafkaConfigService service = (KafkaConfigService) Framework.getService(KafkaConfigService.class);
        if (service == null) {
            // Fail with an explicit message rather than a bare NullPointerException.
            throw new IllegalStateException(
                    "KafkaConfigService is not available, cannot use kafkaConfig: " + kafkaConfig);
        }
        return new KafkaMQManager(service.getZkServers(kafkaConfig), service.getTopicPrefix(kafkaConfig),
                service.getProducerProperties(kafkaConfig), service.getConsumerProperties(kafkaConfig));
    }
}

