/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.platform.mqueues.importer.automation;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import net.jodah.failsafe.RetryPolicy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.automation.OperationContext;
import org.nuxeo.ecm.automation.core.annotations.Context;
import org.nuxeo.ecm.automation.core.annotations.Operation;
import org.nuxeo.ecm.automation.core.annotations.OperationMethod;
import org.nuxeo.ecm.automation.core.annotations.Param;
import org.nuxeo.ecm.platform.mqueues.MQService;
import org.nuxeo.ecm.platform.mqueues.importer.automation.RandomBlobProducers;
import org.nuxeo.ecm.platform.mqueues.importer.consumer.BlobInfoWriter;
import org.nuxeo.ecm.platform.mqueues.importer.consumer.BlobMessageConsumerFactory;
import org.nuxeo.ecm.platform.mqueues.importer.consumer.MQBlobInfoWriter;
import org.nuxeo.ecm.platform.mqueues.importer.message.BlobInfoMessage;
import org.nuxeo.lib.core.mqueues.mqueues.MQAppender;
import org.nuxeo.lib.core.mqueues.mqueues.MQManager;
import org.nuxeo.lib.core.mqueues.pattern.consumer.BatchPolicy;
import org.nuxeo.lib.core.mqueues.pattern.consumer.ConsumerFactory;
import org.nuxeo.lib.core.mqueues.pattern.consumer.ConsumerPolicy;
import org.nuxeo.lib.core.mqueues.pattern.consumer.ConsumerPool;
import org.nuxeo.runtime.api.Framework;

@Operation(id="MQImporter.runBlobConsumers", category="Services", label="Import blobs", since="9.1", description="Import mqueues blob into the binarystore.")
public class BlobConsumers {
    private static final Log log = LogFactory.getLog(BlobConsumers.class);
    public static final String ID = "MQImporter.runBlobConsumers";
    public static final String DEFAULT_MQ_BLOB_INFO_NAME = "mq-blob-info";
    public static final String DEFAULT_MQ_CONFIG = "import";
    @Context
    protected OperationContext ctx;
    @Param(name="nbThreads", required=false)
    protected Integer nbThreads;
    @Param(name="blobProviderName", required=false)
    protected String blobProviderName = "default";
    @Param(name="batchSize", required=false)
    protected Integer batchSize = 10;
    @Param(name="batchThresholdS", required=false)
    protected Integer batchThresholdS = 20;
    @Param(name="retryMax", required=false)
    protected Integer retryMax = 3;
    @Param(name="retryDelayS", required=false)
    protected Integer retryDelayS = 2;
    @Param(name="mqName", required=false)
    protected String mqName;
    @Param(name="mqBlobInfo", required=false)
    protected String mqBlobInfoName;
    @Param(name="mqConfig", required=false)
    protected String mqConfig;

    @OperationMethod
    public void run() {
        RandomBlobProducers.checkAccess(this.ctx);
        ConsumerPolicy consumerPolicy = ConsumerPolicy.builder().name(ID).batchPolicy(BatchPolicy.builder().capacity(this.batchSize.intValue()).timeThreshold(Duration.ofSeconds(this.batchThresholdS.intValue())).build()).retryPolicy(new RetryPolicy().withMaxRetries(this.retryMax.intValue()).withDelay((long)this.retryDelayS.intValue(), TimeUnit.SECONDS)).maxThreads(this.getNbThreads()).build();
        MQService service = (MQService)Framework.getService(MQService.class);
        MQManager manager = service.getManager(this.getMQConfig());
        try (BlobInfoWriter blobInfoWriter = this.getBlobInfoWriter(manager);){
            ConsumerPool consumers = new ConsumerPool(this.getMQName(), manager, (ConsumerFactory)new BlobMessageConsumerFactory(this.blobProviderName, blobInfoWriter), consumerPolicy);
            consumers.start().get();
        }
        catch (Exception e) {
            log.error((Object)e.getMessage(), (Throwable)e);
        }
    }

    protected BlobInfoWriter getBlobInfoWriter(MQManager managerBlobInfo) {
        this.initBlobInfoMQ(managerBlobInfo);
        return new MQBlobInfoWriter((MQAppender<BlobInfoMessage>)managerBlobInfo.getAppender(this.getMQBlobInfoName()));
    }

    protected void initBlobInfoMQ(MQManager manager) {
        manager.createIfNotExists(this.getMQBlobInfoName(), 1);
    }

    protected short getNbThreads() {
        if (this.nbThreads != null) {
            return this.nbThreads.shortValue();
        }
        return 0;
    }

    protected String getMQName() {
        if (this.mqName != null) {
            return this.mqName;
        }
        return "mq-blob";
    }

    protected String getMQBlobInfoName() {
        if (this.mqBlobInfoName != null) {
            return this.mqBlobInfoName;
        }
        return DEFAULT_MQ_BLOB_INFO_NAME;
    }

    protected String getMQConfig() {
        if (this.mqConfig != null) {
            return this.mqConfig;
        }
        return DEFAULT_MQ_CONFIG;
    }
}

