/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.ecm.platform.importer.base;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import org.javasimon.SimonManager;
import org.javasimon.Stopwatch;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.CoreSession;
import org.nuxeo.ecm.core.api.DocumentModel;
import org.nuxeo.ecm.core.api.DocumentRef;
import org.nuxeo.ecm.core.api.PathRef;
import org.nuxeo.ecm.core.api.repository.Repository;
import org.nuxeo.ecm.core.api.repository.RepositoryManager;
import org.nuxeo.ecm.platform.importer.base.GenericThreadedImportTask;
import org.nuxeo.ecm.platform.importer.base.ImporterRunner;
import org.nuxeo.ecm.platform.importer.base.ImporterRunnerConfiguration;
import org.nuxeo.ecm.platform.importer.factories.DefaultDocumentModelFactory;
import org.nuxeo.ecm.platform.importer.factories.ImporterDocumentModelFactory;
import org.nuxeo.ecm.platform.importer.filter.ImporterFilter;
import org.nuxeo.ecm.platform.importer.filter.ImportingDocumentFilter;
import org.nuxeo.ecm.platform.importer.listener.ImporterListener;
import org.nuxeo.ecm.platform.importer.listener.JobHistoryListener;
import org.nuxeo.ecm.platform.importer.log.ImporterLogger;
import org.nuxeo.ecm.platform.importer.log.PerfLogger;
import org.nuxeo.ecm.platform.importer.source.SourceNode;
import org.nuxeo.ecm.platform.importer.threading.DefaultMultiThreadingPolicy;
import org.nuxeo.ecm.platform.importer.threading.ImporterThreadingPolicy;
import org.nuxeo.runtime.api.Framework;

public class GenericMultiThreadedImporter
implements ImporterRunner {
    protected static ThreadPoolExecutor importTP;
    protected static Map<String, Long> nbCreatedDocsByThreads;
    protected ImporterThreadingPolicy threadPolicy;
    protected ImporterDocumentModelFactory factory;
    protected SourceNode importSource;
    protected DocumentModel targetContainer;
    protected Integer batchSize = 50;
    protected Integer nbThreads = 5;
    protected ImporterLogger log;
    protected CoreSession session;
    protected String importWritePath;
    protected Boolean skipRootContainerCreation = false;
    protected String jobName;
    protected boolean enablePerfLogging = true;
    protected List<ImporterFilter> filters = new ArrayList<ImporterFilter>();
    protected List<ImporterListener> listeners = new ArrayList<ImporterListener>();
    protected List<ImportingDocumentFilter> importingDocumentFilters = new ArrayList<ImportingDocumentFilter>();
    protected GenericThreadedImportTask rootImportTask;

    public static ThreadPoolExecutor getExecutor() {
        return importTP;
    }

    public static synchronized void addCreatedDoc(String taskId, long nbDocs) {
        String tid = Thread.currentThread().getName();
        nbCreatedDocsByThreads.put(tid + "-" + taskId, nbDocs);
    }

    public static synchronized long getCreatedDocsCounter() {
        long counter = 0L;
        for (String tid : nbCreatedDocsByThreads.keySet()) {
            Long tCounter = nbCreatedDocsByThreads.get(tid);
            if (tCounter == null) continue;
            counter += tCounter.longValue();
        }
        return counter;
    }

    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, ImporterLogger log) throws Exception {
        this.importSource = sourceNode;
        this.importWritePath = importWritePath;
        this.log = log;
        if (batchSize != null) {
            this.batchSize = batchSize;
        }
        if (nbThreads != null) {
            this.nbThreads = nbThreads;
        }
        if (skipRootContainerCreation != null) {
            this.skipRootContainerCreation = skipRootContainerCreation;
        }
    }

    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize, Integer nbThreads, ImporterLogger log) throws Exception {
        this(sourceNode, importWritePath, false, batchSize, nbThreads, log);
    }

    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Boolean skipRootContainerCreation, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) throws Exception {
        this(sourceNode, importWritePath, skipRootContainerCreation, batchSize, nbThreads, log);
        this.jobName = jobName;
        if (jobName != null) {
            this.listeners.add(new JobHistoryListener(jobName));
        }
    }

    public GenericMultiThreadedImporter(SourceNode sourceNode, String importWritePath, Integer batchSize, Integer nbThreads, String jobName, ImporterLogger log) throws Exception {
        this(sourceNode, importWritePath, false, batchSize, nbThreads, jobName, log);
    }

    public GenericMultiThreadedImporter(ImporterRunnerConfiguration configuration) throws Exception {
        this(configuration.sourceNode, configuration.importWritePath, configuration.skipRootContainerCreation, configuration.batchSize, configuration.nbThreads, configuration.jobName, configuration.log);
    }

    public void addFilter(ImporterFilter filter) {
        this.log.debug(String.format("Filter with %s, was added on the importer with the hash code %s. The source node name is %s", filter.toString(), this.hashCode(), this.importSource.getName()));
        this.filters.add(filter);
    }

    public void addListeners(ImporterListener ... listeners) {
        this.addListeners(Arrays.asList(listeners));
    }

    public void addListeners(Collection<ImporterListener> listeners) {
        this.listeners.addAll(listeners);
    }

    public void addImportingDocumentFilters(ImportingDocumentFilter ... importingDocumentFilters) {
        this.addImportingDocumentFilters(Arrays.asList(importingDocumentFilters));
    }

    public void addImportingDocumentFilters(Collection<ImportingDocumentFilter> importingDocumentFilters) {
        this.importingDocumentFilters.addAll(importingDocumentFilters);
    }

    protected CoreSession getCoreSession() throws Exception {
        if (this.session == null) {
            RepositoryManager rm = (RepositoryManager)Framework.getService(RepositoryManager.class);
            Repository repo = rm.getDefaultRepository();
            this.session = repo.open();
        }
        return this.session;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        LoginContext lc = null;
        Exception finalException = null;
        try {
            lc = Framework.login();
            for (ImporterFilter filter : this.filters) {
                this.log.debug(String.format("Running filter with %s, on the importer with the hash code %s. The source node name is %s", filter.toString(), this.hashCode(), this.importSource.getName()));
                filter.handleBeforeImport();
            }
            if (this.filters.size() == 0) {
                this.log.debug(String.format("No filters are registered on the importer with hash code %s, while importing the source node with name ", this.hashCode(), this.importSource.getName()));
            }
            this.doRun();
        }
        catch (Exception e) {
            this.log.error("Task exec failed", e);
            finalException = e;
        }
        finally {
            for (ImporterFilter filter : this.filters) {
                filter.handleAfterImport(finalException);
            }
            if (this.session != null) {
                CoreInstance.getInstance().close(this.session);
                this.session = null;
            }
            if (lc != null) {
                try {
                    lc.logout();
                }
                catch (LoginException e) {
                    this.log.error("Error during logout", e);
                }
            }
        }
    }

    public void setRootImportTask(GenericThreadedImportTask rootImportTask) {
        this.rootImportTask = rootImportTask;
    }

    protected GenericThreadedImportTask initRootTask(SourceNode importSource, DocumentModel targetContainer, boolean skipRootContainerCreation, ImporterLogger log, Integer batchSize, String jobName) throws Exception {
        if (this.rootImportTask == null) {
            this.setRootImportTask(new GenericThreadedImportTask(null, importSource, targetContainer, skipRootContainerCreation, log, batchSize, this.getFactory(), this.getThreadPolicy(), jobName));
        } else {
            this.rootImportTask.setInputSource(importSource);
            this.rootImportTask.setTargetFolder(targetContainer);
            this.rootImportTask.setSkipContainerCreation(skipRootContainerCreation);
            this.rootImportTask.setRsLogger(log);
            this.rootImportTask.setFactory(this.getFactory());
            this.rootImportTask.setThreadPolicy(this.getThreadPolicy());
            this.rootImportTask.setJobName(jobName);
            this.rootImportTask.setBatchSize(batchSize);
        }
        this.rootImportTask.addListeners(this.listeners);
        this.rootImportTask.addImportingDocumentFilters(this.importingDocumentFilters);
        return this.rootImportTask;
    }

    protected void doRun() throws Exception {
        this.targetContainer = this.getTargetContainer();
        nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();
        importTP = new ThreadPoolExecutor(this.nbThreads, this.nbThreads, 500L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100));
        this.initRootTask(this.importSource, this.targetContainer, this.skipRootContainerCreation, this.log, this.batchSize, this.jobName);
        this.rootImportTask.setRootTask();
        long t0 = System.currentTimeMillis();
        this.notifyBeforeImport();
        importTP.execute(this.rootImportTask);
        Thread.sleep(200L);
        int activeTasks = importTP.getActiveCount();
        int oldActiveTasks = 0;
        long lastLogProgressTime = System.currentTimeMillis();
        long lastCreatedDocCounter = 0L;
        String[] headers = new String[]{"nbDocs", "average", "imediate"};
        PerfLogger perfLogger = new PerfLogger(headers);
        while (activeTasks > 0) {
            double averageSpeed;
            long ti;
            Thread.sleep(500L);
            activeTasks = importTP.getActiveCount();
            boolean logProgress = false;
            if (oldActiveTasks != activeTasks) {
                oldActiveTasks = activeTasks;
                this.log.debug("currently " + activeTasks + " active import Threads");
                logProgress = true;
            }
            if ((ti = System.currentTimeMillis()) - lastLogProgressTime > 5000L) {
                logProgress = true;
            }
            if (!logProgress) continue;
            long inbCreatedDocs = GenericMultiThreadedImporter.getCreatedDocsCounter();
            long deltaT = ti - lastLogProgressTime;
            double imediateSpeed = averageSpeed = (double)(1000.0f * ((float)inbCreatedDocs / (float)(ti - t0)));
            if (deltaT > 0L) {
                imediateSpeed = 1000.0f * ((float)(inbCreatedDocs - lastCreatedDocCounter) / (float)deltaT);
            }
            this.log.info(inbCreatedDocs + " docs created");
            this.log.info("average speed = " + averageSpeed + " docs/s");
            this.log.info("immediate speed = " + imediateSpeed + " docs/s");
            if (this.enablePerfLogging) {
                Double[] perfData = new Double[]{new Double(inbCreatedDocs), averageSpeed, imediateSpeed};
                perfLogger.log(perfData);
            }
            lastLogProgressTime = ti;
            lastCreatedDocCounter = inbCreatedDocs;
        }
        this.log.info("All Threads terminated");
        perfLogger.release();
        this.notifyAfterImport();
        long t1 = System.currentTimeMillis();
        long nbCreatedDocs = GenericMultiThreadedImporter.getCreatedDocsCounter();
        this.log.info(nbCreatedDocs + " docs created");
        this.log.info(1000.0f * ((float)nbCreatedDocs / (float)(t1 - t0)) + " docs/s");
        for (String k : nbCreatedDocsByThreads.keySet()) {
            this.log.info(k + " --> " + nbCreatedDocsByThreads.get(k));
        }
        for (String name : SimonManager.simonNames()) {
            Stopwatch stopwatch;
            if (name == null || name.isEmpty() || !name.startsWith("org.nuxeo.ecm.platform.importer") || (stopwatch = SimonManager.getStopwatch((String)name)).getCounter() <= 0L) continue;
            this.log.info(stopwatch.toString());
        }
    }

    protected DocumentModel getTargetContainer() throws Exception {
        if (this.targetContainer == null) {
            this.targetContainer = this.createTargetContainer();
        }
        return this.targetContainer;
    }

    protected DocumentModel createTargetContainer() throws Exception {
        try {
            return this.getCoreSession().getDocument((DocumentRef)new PathRef(this.importWritePath));
        }
        catch (Exception e) {
            this.log.error(e.getMessage());
            throw new Exception(e);
        }
    }

    public ImporterThreadingPolicy getThreadPolicy() {
        if (this.threadPolicy == null) {
            this.threadPolicy = new DefaultMultiThreadingPolicy();
        }
        return this.threadPolicy;
    }

    public void setThreadPolicy(ImporterThreadingPolicy threadPolicy) {
        this.threadPolicy = threadPolicy;
    }

    public ImporterDocumentModelFactory getFactory() {
        if (this.factory == null) {
            this.factory = new DefaultDocumentModelFactory();
        }
        return this.factory;
    }

    public void setFactory(ImporterDocumentModelFactory factory) {
        this.factory = factory;
    }

    public void setEnablePerfLogging(boolean enablePerfLogging) {
        this.enablePerfLogging = enablePerfLogging;
    }

    @Override
    public void stopImportProcrocess() {
        if (importTP != null && !importTP.isTerminated() && !importTP.isTerminating()) {
            importTP.shutdownNow();
        }
    }

    protected void notifyBeforeImport() throws Exception {
        for (ImporterListener listener : this.listeners) {
            listener.beforeImport();
        }
    }

    protected void notifyAfterImport() throws Exception {
        for (ImporterListener listener : this.listeners) {
            listener.afterImport();
        }
    }

    static {
        nbCreatedDocsByThreads = new ConcurrentHashMap<String, Long>();
    }
}

