/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.core.bulk;

import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracing;
import java.io.Externalizable;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.naming.NamingException;
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.repository.RepositoryManager;
import org.nuxeo.ecm.core.api.scroll.ScrollRequest;
import org.nuxeo.ecm.core.api.scroll.ScrollService;
import org.nuxeo.ecm.core.bulk.BulkActionValidation;
import org.nuxeo.ecm.core.bulk.BulkAdminService;
import org.nuxeo.ecm.core.bulk.BulkCodecs;
import org.nuxeo.ecm.core.bulk.BulkService;
import org.nuxeo.ecm.core.bulk.message.BulkBucket;
import org.nuxeo.ecm.core.bulk.message.BulkCommand;
import org.nuxeo.ecm.core.bulk.message.BulkStatus;
import org.nuxeo.ecm.core.scroll.DocumentScrollRequest;
import org.nuxeo.ecm.core.scroll.GenericScrollRequest;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.kv.KeyValueService;
import org.nuxeo.runtime.kv.KeyValueStore;
import org.nuxeo.runtime.kv.KeyValueStoreProvider;
import org.nuxeo.runtime.stream.StreamService;
import org.nuxeo.runtime.transaction.TransactionHelper;

public class BulkServiceImpl
implements BulkService,
Synchronization {
    private static final Logger log = LogManager.getLogger(BulkServiceImpl.class);

    @Deprecated
    public static final String BULK_LOG_MANAGER_NAME = "bulk";

    /** Name of the key/value store that holds commands, statuses and exclusive markers. */
    public static final String BULK_KV_STORE_NAME = "bulk";

    /** URN of the stream to which submitted commands are appended. */
    public static final String COMMAND_STREAM = "bulk/command";

    /** {@link #COMMAND_STREAM} as a stream {@link Name}. */
    public static final Name COMMAND_STREAM_NAME = Name.ofUrn(COMMAND_STREAM);

    /** URN of the stream to which status deltas are appended. */
    public static final String STATUS_STREAM = "bulk/status";

    /** {@link #STATUS_STREAM} as a stream {@link Name}. */
    public static final Name STATUS_STREAM_NAME = Name.ofUrn(STATUS_STREAM);

    /** URN of the "done" stream; this class only declares it. */
    public static final String DONE_STREAM = "bulk/done";

    /** {@link #DONE_STREAM} as a stream {@link Name}. */
    public static final Name DONE_STREAM_NAME = Name.ofUrn(DONE_STREAM);

    /** Key prefix of encoded commands in the key/value store. */
    public static final String COMMAND_PREFIX = "command:";

    @Deprecated(since="11.4")
    public static final String RECORD_CODEC = "avro";

    /** Key prefix of encoded statuses in the key/value store. */
    public static final String STATUS_PREFIX = "status:";

    /** Key prefix of the per action/repository exclusivity marker in the key/value store. */
    public static final String EXCLUSIVE_PREFIX = "exclusive:";

    public static final String PRODUCE_IMMEDIATE_OPTION = "produceImmediate";

    /** TTL (1 hour) of the status and command of a command completed without error. */
    public static final long COMPLETED_TTL_SECONDS = 3600L;

    /** TTL (12 hours) of the status of an aborted command. */
    public static final long ABORTED_TTL_SECONDS = 43200L;

    /** TTL (24 hours) of the status and command of a command completed with error. */
    public static final long COMPLETED_IN_ERROR_TTL_SECONDS = 86400L;

    /** TTL (24 hours) of an exclusivity marker. */
    protected static final long EXCLUSIVE_TTL_SECONDS = 86400L;

    /** Counter used to build distinct record keys for buckets pushed by external scrollers. */
    protected final AtomicLong externalScrollerCounter = new AtomicLong();

    /** Short-lived (60 seconds) cache of commands looked up while appending external buckets. */
    protected final Map<String, BulkCommand> externalCommands = new PassiveExpiringMap<>(60L, TimeUnit.SECONDS);

    /** Whether this service is already registered as a synchronization for the current thread. */
    protected static final ThreadLocal<Boolean> isEnlisted = ThreadLocal.withInitial(() -> Boolean.FALSE);

    /** Commands of the current thread that are waiting for the transaction outcome. */
    protected static final ThreadLocal<List<BulkCommand>> transactionCommands = ThreadLocal.withInitial(ArrayList::new);

    /**
     * Validates the command, completes it with the defaults configured for its action, stores its
     * initial {@code SCHEDULED} status and its encoded form in the key/value store and finally
     * appends it to the command stream.
     *
     * @param command the command to submit; it is modified in place when defaults are applied
     * @return the command id
     * @throws IllegalArgumentException if the action, the repository or the scroller is unknown
     * @throws IllegalStateException if the command is exclusive and another command for the same
     *             action and repository is still running
     */
    @Override
    public String submit(BulkCommand command) {
        log.debug("Run action with command={}", command);
        BulkAdminService admin = Framework.getService(BulkAdminService.class);
        if (!admin.getActions().contains(command.getAction())) {
            throw new IllegalArgumentException("Unknown action for command: " + command);
        }
        // the validation is optional, it only runs when the action declares one
        BulkActionValidation validation = admin.getActionValidation(command.getAction());
        if (validation != null) {
            validation.validate(command);
        }

        RepositoryManager repositories = Framework.getService(RepositoryManager.class);
        if (repositories != null) {
            if (StringUtils.isEmpty(command.getRepository())) {
                command.setRepository(repositories.getDefaultRepositoryName());
            } else if (repositories.getRepository(command.getRepository()) == null) {
                throw new IllegalArgumentException("Unknown repository: " + command);
            }
        }

        // a zero size means "not set": fall back on the action configuration
        if (command.getBucketSize() == 0) {
            command.setBucketSize(admin.getBucketSize(command.getAction()));
        }
        if (command.getBatchSize() == 0) {
            command.setBatchSize(admin.getBatchSize(command.getAction()));
        }
        if (Duration.ZERO.equals(command.getBatchTransactionTimeout())) {
            command.setBatchTransactionTimeout(admin.getBatchTransactionTimeout(command.getAction()));
        }
        if (command.getQueryLimit() == null) {
            command.setQueryLimit(admin.getQueryLimit(command.getAction()));
        }
        if (command.getScroller() == null && !command.useExternalScroller()) {
            String defaultScroller = admin.getDefaultScroller(command.getAction());
            if (StringUtils.isNotBlank(defaultScroller)) {
                command.setScroller(defaultScroller);
            }
        }
        checkIfScrollerExists(command);

        // the command setting, when present, wins over the action configuration
        Boolean exclusiveFlag = command.getExclusive();
        boolean exclusive;
        if (exclusiveFlag != null) {
            exclusive = exclusiveFlag;
        } else {
            exclusive = admin.isExclusive(command.getAction());
        }
        if (exclusive) {
            setExclusive(command);
        }

        // store the initial status and the command in the key/value store
        BulkStatus scheduled = new BulkStatus(command.getId());
        scheduled.setState(BulkStatus.State.SCHEDULED);
        scheduled.setAction(command.getAction());
        scheduled.setUsername(command.getUsername());
        scheduled.setSubmitTime(Instant.now());
        setStatus(scheduled);
        byte[] encodedCommand = setCommand(command);

        // sequential commands share the action name as shard key, others use their own id
        Boolean sequentialFlag = command.getSequentialScroll();
        boolean sequential;
        if (sequentialFlag != null) {
            sequential = sequentialFlag;
        } else {
            sequential = admin.isSequentialScroll(command.getAction());
        }
        String shardKey = sequential ? command.getAction() : command.getId();

        log.debug("Submit action with command: {}", command);
        Map<String, AttributeValue> attributes = new HashMap<>();
        attributes.put("commandId", AttributeValue.stringAttributeValue(command.getId()));
        attributes.put("action", AttributeValue.stringAttributeValue(command.getAction()));
        attributes.put("nxql", AttributeValue.stringAttributeValue(command.getQuery()));
        Span currentSpan = Tracing.getTracer().getCurrentSpan();
        currentSpan.addAnnotation("BulkService#submit", attributes);
        return submit(shardKey, command.getId(), encodedCommand);
    }

    /**
     * Checks that the scroller referenced by the command is known to the {@link ScrollService}.
     * Commands using an external scroller are not checked.
     *
     * @param command the command whose scroller is checked
     * @throws IllegalArgumentException if the generic or document scroller does not exist
     */
    protected void checkIfScrollerExists(BulkCommand command) {
        ScrollService scrolls = Framework.getService(ScrollService.class);
        if (command.useExternalScroller()) {
            return;
        }
        if (command.useGenericScroller()) {
            ScrollRequest genericRequest = GenericScrollRequest.builder(command.getScroller(), command.getQuery())
                    .build();
            if (!scrolls.exists(genericRequest)) {
                throw new IllegalArgumentException("Unknown Generic Scroller for command: " + command);
            }
            return;
        }
        ScrollRequest documentRequest = DocumentScrollRequest.builder(command.getQuery())
                .name(command.getScroller())
                .build();
        if (!scrolls.exists(documentRequest)) {
            throw new IllegalArgumentException("Unknown Document Scroller for command: " + command);
        }
    }

    /**
     * Appends an encoded command as a record to the command stream.
     *
     * @param shardKey the key passed to the appender along with the record
     * @param key the record key, returned as is
     * @param bytes the record payload
     * @return {@code key}
     */
    protected String submit(String shardKey, String key, byte[] bytes) {
        StreamService streams = Framework.getService(StreamService.class);
        LogAppender<Record> appender = streams.getLogManager().getAppender(COMMAND_STREAM_NAME);
        Record commandRecord = Record.of(key, bytes);
        log.debug("Append shardKey: {}, record: {}", shardKey, commandRecord);
        appender.append(shardKey, commandRecord);
        return key;
    }

    /**
     * Reads the status of a command from the key/value store.
     *
     * @param commandId the command id
     * @return the decoded status, or the result of {@code BulkStatus.unknownOf(commandId)} when
     *         nothing is stored for this id
     */
    public BulkStatus getStatus(String commandId) {
        byte[] encoded = getKvStore().get(STATUS_PREFIX + commandId);
        if (encoded != null) {
            return BulkCodecs.getStatusCodec().decode(encoded);
        }
        log.debug("Request status of unknown command: {}", commandId);
        return BulkStatus.unknownOf(commandId);
    }

    /**
     * Registers the command as the only one allowed to run for its action and repository.
     * <p>
     * The marker is stored under {@code exclusive:<action>:<repository>}. A marker that points to
     * a command in state {@code UNKNOWN}, {@code COMPLETED} or {@code ABORTED} is taken over.
     *
     * @param command the command to register
     * @throws IllegalStateException if another command holds the marker and is not finished, or
     *             if the marker was changed concurrently
     */
    protected void setExclusive(BulkCommand command) {
        String alreadyRunning = "Bulk Action: %s is exclusive, command: %s is already running";
        KeyValueStore kv = getKvStore();
        String markerKey = EXCLUSIVE_PREFIX + command.getAction() + ":" + command.getRepository();
        String holder = kv.getString(markerKey);
        if (holder != null) {
            BulkStatus.State holderState = getStatus(holder).getState();
            boolean finished = holderState == BulkStatus.State.UNKNOWN
                    || holderState == BulkStatus.State.COMPLETED
                    || holderState == BulkStatus.State.ABORTED;
            if (!finished) {
                throw new IllegalStateException(String.format(alreadyRunning, command.getAction(), holder));
            }
        }
        if (kv.compareAndSet(markerKey, holder, command.getId(), EXCLUSIVE_TTL_SECONDS)) {
            return;
        }
        // someone else changed the marker in between, report the current holder
        holder = kv.getString(markerKey);
        throw new IllegalStateException(String.format(alreadyRunning, command.getAction(), holder));
    }

    /**
     * Encodes the status and stores it in the key/value store.
     * <ul>
     * <li>{@code ABORTED}: the status gets the aborted TTL and the stored command is set to null</li>
     * <li>{@code COMPLETED}: the status and the stored command get the same TTL, which is longer
     * when the status has an error</li>
     * <li>any other state: the status is stored without TTL</li>
     * </ul>
     *
     * @param status the status to store
     * @return the encoded status
     */
    public byte[] setStatus(BulkStatus status) {
        KeyValueStore kv = getKvStore();
        byte[] encoded = BulkCodecs.getStatusCodec().encode(status);
        switch (status.getState()) {
            case ABORTED: {
                kv.put(STATUS_PREFIX + status.getId(), encoded, ABORTED_TTL_SECONDS);
                // the command is not needed anymore
                kv.put(COMMAND_PREFIX + status.getId(), (String) null);
                break;
            }
            case COMPLETED: {
                long ttl;
                if (status.hasError()) {
                    ttl = COMPLETED_IN_ERROR_TTL_SECONDS;
                } else {
                    ttl = COMPLETED_TTL_SECONDS;
                }
                kv.put(STATUS_PREFIX + status.getId(), encoded, ttl);
                kv.setTTL(COMMAND_PREFIX + status.getId(), ttl);
                break;
            }
            default: {
                kv.put(STATUS_PREFIX + status.getId(), encoded);
                break;
            }
        }
        return encoded;
    }

    /**
     * Reads a command from the key/value store.
     *
     * @param commandId the command id
     * @return the decoded command, or {@code null} when nothing is stored for this id
     */
    @Override
    public BulkCommand getCommand(String commandId) {
        byte[] encodedCommand = getKvStore().get(COMMAND_PREFIX + commandId);
        return encodedCommand == null ? null : BulkCodecs.getCommandCodec().decode(encodedCommand);
    }

    /**
     * Aborts a command: its stored status is switched to {@code ABORTED} and an {@code ABORTED}
     * delta is appended to the status stream. Commands that are completed, already aborted or
     * unknown are left untouched.
     *
     * @param commandId the command id
     * @return the status of the command, in {@code ABORTED} state when the abort was performed
     */
    public BulkStatus abort(String commandId) {
        BulkStatus status = getStatus(commandId);
        switch (status.getState()) {
            case COMPLETED:
                log.debug("Cannot abort a completed command: {}", commandId);
                return status;
            case ABORTED:
                log.debug("Command: {} already aborted", commandId);
                return status;
            case UNKNOWN:
                log.debug("Unknown command: {}", commandId);
                return status;
            default:
                break;
        }
        log.warn("Aborting command: {}, status: {}", commandId, status);
        status.setState(BulkStatus.State.ABORTED);
        setStatus(status);

        // notify through the status stream
        BulkStatus abortedDelta = BulkStatus.deltaOf(commandId);
        abortedDelta.setCompletedTime(Instant.now());
        abortedDelta.setState(BulkStatus.State.ABORTED);
        byte[] encodedDelta = BulkCodecs.getStatusCodec().encode(abortedDelta);
        Record deltaRecord = Record.of(commandId, encodedDelta);
        Framework.getService(StreamService.class).getStreamManager().append(STATUS_STREAM, deltaRecord);
        return status;
    }

    /**
     * Returns the result carried by the status of a command.
     *
     * @param commandId the command id
     * @return the result of the status returned by {@link #getStatus(String)}
     */
    public Map<String, Serializable> getResult(String commandId) {
        BulkStatus status = getStatus(commandId);
        return status.getResult();
    }

    /**
     * Encodes the command and stores it in the key/value store under its id.
     *
     * @param command the command to store
     * @return the encoded command
     */
    public byte[] setCommand(BulkCommand command) {
        byte[] encoded = BulkCodecs.getCommandCodec().encode(command);
        getKvStore().put(COMMAND_PREFIX + command.getId(), encoded);
        return encoded;
    }

    /**
     * Waits for a command to reach a final state, polling its status every 100 ms.
     * <p>
     * The timeout is measured with the monotonic {@link System#nanoTime()} clock, so it is not
     * affected by wall-clock adjustments. Durations too large to be expressed in nanoseconds are
     * treated as an unbounded wait. The status is always checked at least once.
     *
     * @param commandId the command id
     * @param duration the maximum time to wait
     * @return {@code true} if the command is completed or aborted, {@code false} if it is unknown
     *         or if the timeout elapsed
     * @throws InterruptedException if the thread is interrupted while sleeping between two polls
     */
    @Override
    public boolean await(String commandId, Duration duration) throws InterruptedException {
        long timeoutNanos;
        try {
            timeoutNanos = duration.toNanos();
        } catch (ArithmeticException e) {
            // out of long range: saturate instead of failing
            timeoutNanos = duration.isNegative() ? 0L : Long.MAX_VALUE;
        }
        long start = System.nanoTime();
        do {
            BulkStatus status = getStatus(commandId);
            switch (status.getState()) {
                case ABORTED:
                case COMPLETED:
                    return true;
                case UNKNOWN:
                    log.error("Unknown status for command: {}", commandId);
                    return false;
                default:
                    break;
            }
            Thread.sleep(100L);
            // compare elapsed time, not absolute values: nanoTime may wrap around
        } while (System.nanoTime() - start < timeoutNanos);
        // suppliers are only evaluated when debug is enabled
        log.debug("await timeout on: {} after: {} ms", () -> getStatus(commandId), duration::toMillis);
        return false;
    }

    /**
     * Returns the key/value store named {@link #BULK_KV_STORE_NAME}.
     *
     * @return the store obtained from the {@link KeyValueService}
     */
    public KeyValueStore getKvStore() {
        KeyValueService kvService = Framework.getService(KeyValueService.class);
        return kvService.getKeyValueStore(BULK_KV_STORE_NAME);
    }

    /**
     * Waits until every command that has a status in the key/value store when this method is
     * called is completed, aborted or unknown, polling every 200 ms.
     * <p>
     * The timeout covers the whole call, not each command. It is measured with
     * {@link System#nanoTime()} and compared as an elapsed time, which stays correct if the
     * counter wraps around. Durations too large to be expressed in nanoseconds are treated as an
     * unbounded wait.
     *
     * @param duration the maximum time to wait
     * @return {@code true} if all commands reached a final state, {@code false} if the timeout
     *         elapsed first
     * @throws InterruptedException if the thread is interrupted while sleeping between two polls
     */
    @Override
    public boolean await(Duration duration) throws InterruptedException {
        KeyValueStoreProvider kv = (KeyValueStoreProvider) getKvStore();
        Set<String> commandIds = kv.keyStream(STATUS_PREFIX)
                .map(k -> k.replaceFirst(STATUS_PREFIX, ""))
                .collect(Collectors.toSet());
        log.debug("Wait for command ids: {}", commandIds);
        long timeoutNanos;
        try {
            timeoutNanos = duration.toNanos();
        } catch (ArithmeticException e) {
            // out of long range: saturate instead of failing
            timeoutNanos = duration.isNegative() ? 0L : Long.MAX_VALUE;
        }
        long start = System.nanoTime();
        for (String commandId : commandIds) {
            log.debug("Wait for command id: {}", commandId);
            while (true) {
                BulkStatus status = getStatus(commandId);
                log.debug("Status of command: {} = {}", commandId, status);
                BulkStatus.State state = status.getState();
                log.debug("State of command: {} = {}", commandId, state);
                if (state == BulkStatus.State.COMPLETED || state == BulkStatus.State.ABORTED
                        || state == BulkStatus.State.UNKNOWN) {
                    // final state, move on to the next command
                    break;
                }
                if (System.nanoTime() - start > timeoutNanos) {
                    log.warn("await timeout on BulkService, at least one uncompleted command: {}", status);
                    return false;
                }
                Thread.sleep(200L);
            }
        }
        return true;
    }

    /**
     * Lists the statuses stored in the key/value store that belong to a user.
     * <p>
     * A key whose value is gone by the time it is fetched is skipped instead of being handed to
     * the codec.
     *
     * @param username the user name to match; must not be {@code null}
     * @return the statuses whose username equals {@code username}
     */
    @Override
    public List<BulkStatus> getStatuses(String username) {
        KeyValueStoreProvider kv = (KeyValueStoreProvider) getKvStore();
        return kv.keyStream(STATUS_PREFIX)
                .map(key -> kv.get(key))
                // the value may have been removed between the key listing and the fetch
                .filter(encoded -> encoded != null)
                .map(encoded -> BulkCodecs.getStatusCodec().decode(encoded))
                .filter(status -> username.equals(status.getUsername()))
                .collect(Collectors.toList());
    }

    /**
     * Appends a bucket produced by an external scroller to the input stream of the action of its
     * command.
     * <p>
     * The command is looked up through the {@code externalCommands} cache. The record key is the
     * command id followed by the next value of {@code externalScrollerCounter}.
     *
     * @param bucket the bucket to append
     * @throws IllegalArgumentException if no command is stored for the bucket's command id
     */
    @Override
    public void appendExternalBucket(BulkBucket bucket) {
        String commandId = bucket.getCommandId();
        BulkCommand command = externalCommands.computeIfAbsent(commandId, this::getCommand);
        if (command == null) {
            // getCommand returns null when the key/value store has no such command
            throw new IllegalArgumentException("Unknown command for bucket: " + commandId);
        }
        String stream = Framework.getService(BulkAdminService.class).getInputStream(command.getAction());
        String key = commandId + ":" + externalScrollerCounter.incrementAndGet();
        Record record = Record.of(key, BulkCodecs.getBucketCodec().encode(bucket));
        log.debug("Append key: {}, record: {}", key, record);
        Framework.getService(StreamService.class).getStreamManager().append(stream, record);
    }

    /**
     * Signals the end of an external scroll by appending to the status stream a {@code RUNNING}
     * delta that carries the scroll end time and the total count.
     *
     * @param commandId the command id
     * @param count the value set as total on the delta
     */
    @Override
    public void completeExternalScroll(String commandId, long count) {
        BulkStatus scrollDone = BulkStatus.deltaOf(commandId);
        scrollDone.setState(BulkStatus.State.RUNNING);
        scrollDone.setScrollEndTime(Instant.now());
        scrollDone.setTotal(count);
        byte[] encodedDelta = BulkCodecs.getStatusCodec().encode(scrollDone);
        Record deltaRecord = Record.of(commandId, encodedDelta);
        log.debug("Complete external scroll with key: {}, count: {}, record: {}", commandId, count, deltaRecord);
        StreamService streams = Framework.getService(StreamService.class);
        streams.getStreamManager().append(STATUS_STREAM, deltaRecord);
    }

    /**
     * Queues the command so that it is submitted once the current transaction is committed.
     * <p>
     * On the first call for the current thread this service registers itself as a transaction
     * synchronization. Without an active transaction the command is submitted right away. When
     * the registration fails with a {@link RollbackException} the command is dropped.
     *
     * @param command the command to submit
     * @return the command id
     */
    @Override
    public String submitTransactional(BulkCommand command) {
        boolean alreadyEnlisted = Boolean.TRUE.equals(isEnlisted.get());
        if (!alreadyEnlisted) {
            log.debug("Enlisting into transaction");
            boolean registered;
            try {
                registered = registerSynchronization(this);
            } catch (RollbackException e) {
                log.info("Transaction is marked for rollback, bulk command will not be executed", e);
                return command.getId();
            }
            if (!registered) {
                log.debug("No active transaction, submit command immediately");
                return submit(command);
            }
            isEnlisted.set(Boolean.TRUE);
        }
        // submitted by afterCompletion if the transaction commits
        List<BulkCommand> pending = transactionCommands.get();
        pending.add(command);
        return command.getId();
    }

    /**
     * Registers a synchronization on the current transaction.
     *
     * @param sync the synchronization to register
     * @return {@code true} if it was registered, {@code false} if there is no current transaction
     * @throws RollbackException if the registration is refused with this exception
     * @throws NuxeoException if there is no transaction manager or if the registration fails for
     *             another reason
     */
    protected boolean registerSynchronization(Synchronization sync) throws RollbackException {
        try {
            TransactionManager manager = TransactionHelper.lookupTransactionManager();
            if (manager == null) {
                throw new NuxeoException("Unable to register synchronization: no transaction manager");
            }
            if (manager.getTransaction() == null) {
                return false;
            }
            manager.getTransaction().registerSynchronization(sync);
            return true;
        } catch (IllegalStateException | NamingException | SystemException e) {
            // RollbackException is not caught here, it propagates unchanged to the caller
            throw new NuxeoException("Unable to register synchronization", e);
        }
    }

    /**
     * {@link Synchronization} callback; nothing to do here besides logging, the queued commands
     * are handled in {@link #afterCompletion(int)}.
     */
    @Override
    public void beforeCompletion() {
        log.debug("Before completion");
    }

    /**
     * {@link Synchronization} callback: submits the commands queued by the current thread if the
     * transaction was committed and skips them otherwise. In both cases the thread-local state
     * is reset afterwards, even if a submission fails.
     *
     * @param status the transaction status, one of the {@code javax.transaction.Status} constants
     */
    @Override
    public void afterCompletion(int status) {
        try {
            if (status == javax.transaction.Status.STATUS_COMMITTED) {
                log.debug("Submitting bulk commands after commit");
                transactionCommands.get().forEach(this::submit);
            } else {
                log.info("Skip bulk commands, transaction status is not committed: {}", status);
            }
        } finally {
            isEnlisted.set(false);
            transactionCommands.get().clear();
        }
    }
}

