/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.core.mqueues.pattern.producer.internals;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.core.mqueues.mqueues.MQAppender;
import org.nuxeo.lib.core.mqueues.pattern.Message;
import org.nuxeo.lib.core.mqueues.pattern.producer.ProducerFactory;
import org.nuxeo.lib.core.mqueues.pattern.producer.ProducerIterator;
import org.nuxeo.lib.core.mqueues.pattern.producer.ProducerStatus;

/**
 * A {@link Callable} that drains one {@link ProducerIterator} into an {@link MQAppender}.
 *
 * <p>When called, the runner asks the {@link ProducerFactory} for the producer matching its
 * {@code producerId}, appends every message the producer yields to the queue, and returns a
 * {@link ProducerStatus} summarising the run. Two metrics are maintained in the shared
 * {@code org.nuxeo.runtime.metrics.MetricsService} registry: a per-producer timer and a counter
 * that is incremented while {@link #call()} is executing.
 *
 * <p>While running, the current thread is renamed after each message (see
 * {@link #setThreadName(Message)}); the name the thread had on entry is restored before
 * {@link #call()} returns.
 *
 * @param <M> the concrete message type produced and appended
 */
public class ProducerRunner<M extends Message> implements Callable<ProducerStatus> {
    private static final Log log = LogFactory.getLog(ProducerRunner.class);

    /** Identifier handed to the factory and reported in the resulting status. */
    protected final int producerId;

    /** Destination queue for every produced message. */
    protected final MQAppender<M> mq;

    /** Creates the producer iterator for {@link #producerId}. */
    protected final ProducerFactory<M> factory;

    /** Name of the executing thread as captured on entry to {@link #call()}. */
    protected String threadName;

    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate("org.nuxeo.runtime.metrics.MetricsService");

    /** Times each {@code next()} + thread-rename step; its count is reported in the status. */
    protected final Timer producerTimer;

    /** Incremented on entry to {@link #call()} and decremented on exit. */
    protected final Counter producersCount;

    /**
     * Builds a runner and (re)registers its metrics.
     *
     * @param factory    source of the {@link ProducerIterator} for this runner
     * @param mQueue     queue the produced messages are appended to
     * @param producerId identifier of this producer, also used in the timer's metric name
     */
    public ProducerRunner(ProducerFactory<M> factory, MQAppender<M> mQueue, int producerId) {
        this.factory = factory;
        this.producerId = producerId;
        this.mq = mQueue;
        // NOTE(review): newTimer/newCounter are overridable and are invoked from the constructor;
        // kept as-is because subclasses may rely on it.
        this.producerTimer = this.newTimer(
                MetricRegistry.name("nuxeo", "importer", "queue", "producer", String.valueOf(producerId)));
        this.producersCount = this.newCounter(
                MetricRegistry.name("nuxeo", "importer", "queue", "producers"));
        log.debug("ProducerIterator thread created: " + producerId);
    }

    /**
     * Removes whatever metric is registered under {@code name}, then obtains a counter for it.
     *
     * @param name metric name
     * @return the counter registered under {@code name}
     */
    protected Counter newCounter(String name) {
        this.registry.remove(name);
        return this.registry.counter(name);
    }

    /**
     * Removes whatever metric is registered under {@code name}, then obtains a timer for it.
     *
     * @param name metric name
     * @return the timer registered under {@code name}
     */
    protected Timer newTimer(String name) {
        this.registry.remove(name);
        return this.registry.timer(name);
    }

    /**
     * Runs the producer to exhaustion and reports the outcome.
     *
     * @return a status built from the producer id, the timer's count, the start time and the end
     *         time (both from {@link System#currentTimeMillis()}), and {@code false} as the last
     *         argument
     * @throws Exception anything raised while creating, iterating or closing the producer, or
     *                   while appending to the queue
     */
    @Override
    public ProducerStatus call() throws Exception {
        this.threadName = Thread.currentThread().getName();
        long start = System.currentTimeMillis();
        this.producersCount.inc();
        try (ProducerIterator<M> producer = this.factory.createProducer(this.producerId)) {
            this.producerLoop(producer);
        } finally {
            this.producersCount.dec();
            // setThreadName() rewrites the worker's name for every message. Put the original
            // name back so a reused (pooled) thread does not accumulate suffixes run after run.
            Thread.currentThread().setName(this.threadName);
        }
        return new ProducerStatus(this.producerId, this.producerTimer.getCount(), start, System.currentTimeMillis(), false);
    }

    /**
     * Appends every message yielded by {@code producer} to the queue.
     *
     * <p>Only fetching the message and renaming the thread happen inside the timer context; the
     * append itself is outside it.
     *
     * @param producer iterator to drain
     */
    protected void producerLoop(ProducerIterator<M> producer) {
        while (producer.hasNext()) {
            // Typed as M (not raw Message) so it matches setThreadName(M) and the appender.
            M message;
            try (Timer.Context ignored = this.producerTimer.time()) {
                message = producer.next();
                this.setThreadName(message);
            }
            this.mq.append(producer.getPartition(message, this.mq.size()), message);
        }
    }

    /**
     * Renames the current thread to {@code <threadName>-<timer count>-<message id>}, using the
     * literal {@code null} in place of the id when {@code message} is {@code null}.
     *
     * @param message the message just produced, possibly {@code null}
     */
    protected void setThreadName(M message) {
        String name = this.threadName + "-" + this.producerTimer.getCount();
        name = message != null ? name + "-" + message.getId() : name + "-null";
        Thread.currentThread().setName(name);
    }
}

