/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.core.bulk.action.computation;

import com.google.common.collect.Lists;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.security.auth.login.LoginException;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModelList;
import org.nuxeo.ecm.core.api.DocumentNotFoundException;
import org.nuxeo.ecm.core.api.DocumentRef;
import org.nuxeo.ecm.core.api.IdRef;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.impl.DocumentModelListImpl;
import org.nuxeo.ecm.core.api.model.PropertyConversionException;
import org.nuxeo.ecm.core.bulk.BulkCodecs;
import org.nuxeo.ecm.core.bulk.BulkService;
import org.nuxeo.ecm.core.bulk.message.BulkBucket;
import org.nuxeo.ecm.core.bulk.message.BulkCommand;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.ComputationMetadata;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.api.login.NuxeoLoginContext;
import org.nuxeo.runtime.transaction.TransactionHelper;

public abstract class AbstractBulkComputation
extends AbstractComputation {
    private static final Logger log = LogManager.getLogger(AbstractBulkComputation.class);
    protected static final String SELECT_DOCUMENTS_IN = "SELECT * FROM Document, Relation WHERE ecm:uuid IN ('%s')";
    protected Map<String, BulkCommand> commands = new PassiveExpiringMap(60L, TimeUnit.SECONDS);
    protected BulkCommand command;
    protected BulkStatus delta;

    public AbstractBulkComputation(String name) {
        this(name, 1);
    }

    public AbstractBulkComputation(String name, int nbOutputStreams) {
        super(name, 1, nbOutputStreams);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void processRecord(ComputationContext context, String inputStreamName, Record record) {
        BulkBucket bucket = (BulkBucket)BulkCodecs.getBucketCodec().decode(record.getData());
        this.command = this.getCommand(bucket.getCommandId());
        if (this.command != null) {
            this.delta = BulkStatus.deltaOf(this.command.getId());
            this.delta.setProcessingStartTime(Instant.now());
            this.delta.setProcessed(bucket.getIds().size());
            this.startBucket(record.getKey());
            try {
                for (List batch : Lists.partition(bucket.getIds(), (int)this.command.getBatchSize())) {
                    this.processBatchOfDocuments(batch);
                }
            }
            finally {
                this.delta.setProcessingEndTime(Instant.now());
            }
            this.endBucket(context, this.delta);
        } else if (this.isAbortedCommand(bucket.getCommandId())) {
            log.debug("Skipping aborted command: {}", (Object)bucket.getCommandId());
        } else {
            log.warn("Skipping unknown command: {}, offset: {}.", (Object)bucket.getCommandId(), (Object)context.getLastOffset());
        }
        context.askForCheckpoint();
    }

    protected boolean isAbortedCommand(String commandId) {
        BulkService bulkService = (BulkService)Framework.getService(BulkService.class);
        BulkStatus status = (BulkStatus)bulkService.getStatus((Serializable)((Object)commandId));
        return BulkStatus.State.ABORTED.equals((Object)status.getState());
    }

    protected BulkCommand getCommand(String commandId) {
        this.commands.size();
        return this.commands.computeIfAbsent(commandId, id -> ((BulkService)Framework.getService(BulkService.class)).getCommand((String)id));
    }

    public BulkCommand getCurrentCommand() {
        return this.command;
    }

    protected void processBatchOfDocuments(List<String> batch) {
        if (batch == null || batch.isEmpty()) {
            return;
        }
        int timeout = (int)Objects.requireNonNullElse(this.getBatchTransactionTimeout(), Duration.ZERO).toSeconds();
        Supplier[] supplierArray = new Supplier[3];
        supplierArray[0] = () -> ((ComputationMetadata)this.metadata).name();
        supplierArray[1] = () -> timeout;
        supplierArray[2] = () -> batch;
        log.debug("The computation: {} is about to start a transaction with timeout: {}s to process the batch: {}", supplierArray);
        TransactionHelper.runInTransaction((int)timeout, () -> {
            try {
                String username = this.command.getUsername();
                String repository = this.command.getRepository();
                try (NuxeoLoginContext ignored = this.loginSystemOrUser(username);){
                    CoreSession session = repository == null ? null : CoreInstance.getCoreSession((String)repository);
                    this.compute(session, batch, this.command.getParams());
                }
            }
            catch (LoginException e) {
                throw new NuxeoException((Throwable)e);
            }
        });
    }

    protected Duration getBatchTransactionTimeout() {
        return this.command.getBatchTransactionTimeout();
    }

    protected NuxeoLoginContext loginSystemOrUser(String username) throws LoginException {
        return "system".equals(username) ? Framework.loginSystem() : Framework.loginUser((String)username);
    }

    public void startBucket(String bucketKey) {
    }

    public void endBucket(ComputationContext context, BulkStatus delta) {
        AbstractBulkComputation.updateStatus(context, delta);
    }

    public void processFailure(ComputationContext context, Throwable failure) {
        int status = 500;
        if (failure instanceof NuxeoException) {
            status = ((NuxeoException)failure).getStatusCode();
        }
        log.error(String.format("Action: %s fails on record: %s after retries.", this.metadata.name(), context.getLastOffset()), failure);
        this.delta.inError(this.metadata.name() + " fails on " + context.getLastOffset() + ": " + failure.getMessage(), status);
        this.endBucket(context, this.delta);
    }

    public static void updateStatus(ComputationContext context, BulkStatus delta) {
        context.produceRecord("o1", delta.getId(), BulkCodecs.getStatusCodec().encode((Object)delta));
    }

    protected abstract void compute(CoreSession var1, List<String> var2, Map<String, Serializable> var3);

    public DocumentModelList loadDocuments(CoreSession session, List<String> documentIds) {
        if (documentIds == null || documentIds.isEmpty()) {
            return new DocumentModelListImpl(0);
        }
        try {
            DocumentModelList ret = session.query(String.format(SELECT_DOCUMENTS_IN, String.join((CharSequence)"', '", documentIds)));
            if (log.isDebugEnabled() && ret.size() < documentIds.size()) {
                ArrayList<String> notFound = new ArrayList<String>(documentIds);
                ret.forEach(doc -> notFound.remove(doc.getId()));
                log.debug("Some documents are not accessible: " + notFound);
            }
            return ret;
        }
        catch (DocumentNotFoundException | PropertyConversionException e) {
            log.warn("Fail to loadDocuments on bulk command: {}, because of: {}, retrying without batching", (Object)this.command.getId(), (Object)e.getMessage());
            return this.loadDocumentsOneByOne(session, documentIds);
        }
    }

    public DocumentModelList loadDocumentsOneByOne(CoreSession session, List<String> documentIds) {
        DocumentModelListImpl ret = new DocumentModelListImpl(documentIds.size());
        for (String documentId : documentIds) {
            try {
                ret.add((Object)session.getDocument((DocumentRef)new IdRef(documentId)));
            }
            catch (DocumentNotFoundException | PropertyConversionException e) {
                String message = "Skipping corrupted doc: " + documentId + ", because of: " + e.getMessage();
                log.error(message);
                log.debug(message, e);
            }
        }
        return ret;
    }
}

