package org.kaleidofoundry.messaging;

import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.kaleidofoundry.core.context.EmptyContextParameterException;
import org.kaleidofoundry.core.context.RuntimeContext;
import org.kaleidofoundry.core.i18n.I18nMessages;
import org.kaleidofoundry.core.i18n.I18nMessagesFactory;
import org.kaleidofoundry.core.i18n.InternalBundleHelper;
import org.kaleidofoundry.core.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kaleidofoundry/messaging/AbstractConsumer.class */
public abstract class AbstractConsumer implements Consumer {
    protected final Logger STATISTICS_LOGGER;
    protected final RuntimeContext<Consumer> context;
    protected final Transport transport;
    protected ExecutorService pool;
    protected static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
    static final String DEBUG_SEPARATOR = StringHelper.replicate("-", 120);
    private final AtomicInteger ProcessedMessagesOK = new AtomicInteger(0);
    private final AtomicInteger ProcessedMessagesKO = new AtomicInteger(0);
    private final AtomicInteger ProcessedMessagesSKIPPED = new AtomicInteger(0);
    private final AtomicLong AverageResponseTime = new AtomicLong(0);
    protected final I18nMessages MESSAGING_BUNDLE = I18nMessagesFactory.provides(MessagingConstants.I18N_RESOURCE, InternalBundleHelper.CoreMessageBundle);
    private final List<MessageHandler> messageHandlers = Collections.synchronizedList(new LinkedList());

    /* loaded from: input_file:org/kaleidofoundry/messaging/AbstractConsumer$ConsumerWorker.class */
    public abstract class ConsumerWorker extends Thread {
        public ConsumerWorker(int i, String str) {
            super(str);
        }

        public abstract void init() throws TransportException;

        public abstract void receive(MessageWrapper messageWrapper);

        public abstract void acknowledge(MessageWrapper messageWrapper) throws MessagingException;

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            initRun();
            while (!AbstractConsumer.this.pool.isShutdown()) {
                long j = 0;
                boolean z = true;
                MessageWrapper messageWrapper = new MessageWrapper();
                try {
                    try {
                        receive(messageWrapper);
                        j = System.currentTimeMillis();
                        z = onceReceived(messageWrapper);
                        long currentTimeMillis = System.currentTimeMillis() - j;
                        long longValue = AbstractConsumer.this.context.getLong(ClientContextBuilder.CONSUMER_RESPONSE_DURATION, 0L).longValue() - currentTimeMillis;
                        if (!z && longValue > 0) {
                            try {
                                sleep(longValue);
                            } catch (InterruptedException unused) {
                            }
                        }
                        int intValue = AbstractConsumer.this.context.getInteger(ClientContextBuilder.CONSUMER_PRINT_PROCESSED_MESSAGES_MODULO, -1).intValue();
                        if (intValue > 0) {
                            int i = AbstractConsumer.this.ProcessedMessagesOK.get();
                            int i2 = AbstractConsumer.this.ProcessedMessagesKO.get();
                            int i3 = AbstractConsumer.this.ProcessedMessagesSKIPPED.get();
                            if (((i + i2) + i3) % intValue == 0) {
                                long j2 = (AbstractConsumer.this.AverageResponseTime.get() + currentTimeMillis) / 2;
                                if (((i + i2) + i3) % intValue == 0) {
                                    AbstractConsumer.this.STATISTICS_LOGGER.info("consumer statistics : name={} ; msg OK={} ; msg KO={} ; msg SKIPPED={} ; msg response time={}ms", new Object[]{getName(), Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3), Long.valueOf(j2)});
                                }
                            }
                        }
                    } catch (Throwable unused2) {
                        long currentTimeMillis2 = System.currentTimeMillis() - j;
                        long longValue2 = AbstractConsumer.this.context.getLong(ClientContextBuilder.CONSUMER_RESPONSE_DURATION, 0L).longValue() - currentTimeMillis2;
                        if (0 == 0 && longValue2 > 0) {
                            try {
                                sleep(longValue2);
                            } catch (InterruptedException unused3) {
                            }
                        }
                        int intValue2 = AbstractConsumer.this.context.getInteger(ClientContextBuilder.CONSUMER_PRINT_PROCESSED_MESSAGES_MODULO, -1).intValue();
                        if (intValue2 > 0) {
                            int i4 = AbstractConsumer.this.ProcessedMessagesOK.get();
                            int i5 = AbstractConsumer.this.ProcessedMessagesKO.get();
                            int i6 = AbstractConsumer.this.ProcessedMessagesSKIPPED.get();
                            if (((i4 + i5) + i6) % intValue2 == 0) {
                                long j3 = (AbstractConsumer.this.AverageResponseTime.get() + currentTimeMillis2) / 2;
                                if (((i4 + i5) + i6) % intValue2 == 0) {
                                    AbstractConsumer.this.STATISTICS_LOGGER.info("consumer statistics : name={} ; msg OK={} ; msg KO={} ; msg SKIPPED={} ; msg response time={}ms", new Object[]{getName(), Integer.valueOf(i4), Integer.valueOf(i5), Integer.valueOf(i6), Long.valueOf(j3)});
                                }
                            }
                        }
                    }
                } catch (Throwable th) {
                    long currentTimeMillis3 = System.currentTimeMillis() - j;
                    long longValue3 = AbstractConsumer.this.context.getLong(ClientContextBuilder.CONSUMER_RESPONSE_DURATION, 0L).longValue() - currentTimeMillis3;
                    if (!z && longValue3 > 0) {
                        try {
                            sleep(longValue3);
                        } catch (InterruptedException unused4) {
                        }
                    }
                    int intValue3 = AbstractConsumer.this.context.getInteger(ClientContextBuilder.CONSUMER_PRINT_PROCESSED_MESSAGES_MODULO, -1).intValue();
                    if (intValue3 > 0) {
                        int i7 = AbstractConsumer.this.ProcessedMessagesOK.get();
                        int i8 = AbstractConsumer.this.ProcessedMessagesKO.get();
                        int i9 = AbstractConsumer.this.ProcessedMessagesSKIPPED.get();
                        if (((i7 + i8) + i9) % intValue3 == 0) {
                            long j4 = (AbstractConsumer.this.AverageResponseTime.get() + currentTimeMillis3) / 2;
                            if (((i7 + i8) + i9) % intValue3 == 0) {
                                AbstractConsumer.this.STATISTICS_LOGGER.info("consumer statistics : name={} ; msg OK={} ; msg KO={} ; msg SKIPPED={} ; msg response time={}ms", new Object[]{getName(), Integer.valueOf(i7), Integer.valueOf(i8), Integer.valueOf(i9), Long.valueOf(j4)});
                            }
                        }
                    }
                    throw th;
                }
            }
        }

        @Override // java.lang.Thread
        public void destroy() {
            AbstractConsumer.LOGGER.info("{} destroyed", getName());
        }

        protected boolean isDebug() {
            return AbstractConsumer.this.context.getBoolean(ClientContextBuilder.DEBUG_PROPERTY, false).booleanValue() || AbstractConsumer.LOGGER.isDebugEnabled();
        }

        protected final void initRun() {
            try {
                try {
                    init();
                } catch (TransportException e) {
                    throw new IllegalStateException("transport initialization error", e);
                }
            } finally {
                AbstractConsumer.LOGGER.info("{} listening", getName());
            }
        }

        protected final boolean onceReceived(MessageWrapper messageWrapper) throws MessagingException {
            boolean z = true;
            MessageHandler messageHandler = null;
            if (!messageWrapper.hasError() && messageWrapper.getMessage() != null) {
                if (isDebug()) {
                    AbstractConsumer.LOGGER.info("<<< receiving message with providerId={} , correlationId={} , parameters={}", new String[]{messageWrapper.getMessage().getProviderId(), messageWrapper.getMessage().getCorrelationId(), String.valueOf(messageWrapper.getMessage().getParameters())});
                    AbstractConsumer.LOGGER.info("{}", messageWrapper.getMessage().toString());
                }
                boolean z2 = false;
                Iterator it = AbstractConsumer.this.messageHandlers.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    MessageHandler messageHandler2 = (MessageHandler) it.next();
                    try {
                    } catch (Throwable th) {
                        messageHandler = messageHandler2;
                        messageWrapper.setError(th);
                    }
                    if (!messageHandler2.onReceive(messageWrapper.getMessage())) {
                        z2 = true;
                        break;
                    }
                }
                if (z2) {
                    AbstractConsumer.this.ProcessedMessagesSKIPPED.incrementAndGet();
                } else {
                    AbstractConsumer.this.ProcessedMessagesOK.incrementAndGet();
                    acknowledge(messageWrapper);
                }
            }
            if (messageWrapper.hasError()) {
                if (messageWrapper.getError() instanceof MessageTimeoutException) {
                    z = false;
                    AbstractConsumer.LOGGER.info(messageWrapper.getError().getMessage());
                } else {
                    z = false;
                    AbstractConsumer.this.ProcessedMessagesKO.incrementAndGet();
                    if (messageHandler != null) {
                        AbstractConsumer.LOGGER.error("an error occurred on the chain handler processing of this consumer", messageWrapper.getError());
                        messageHandler.onError(messageWrapper.getMessage(), messageWrapper.getError());
                    } else {
                        AbstractConsumer.LOGGER.error("an error occurred on this consumer", messageWrapper.getError());
                    }
                }
            }
            return z;
        }

        protected final void printResponseTime(long j) {
            AbstractConsumer.LOGGER.info("processing message in {} ms", String.valueOf(System.currentTimeMillis() - j));
        }
    }

    /* loaded from: input_file:org/kaleidofoundry/messaging/AbstractConsumer$MessageWrapper.class */
    public class MessageWrapper {
        private Message message;
        private Object providerObject;
        private Throwable error;
        private boolean ackMessage = true;

        public MessageWrapper() {
        }

        public Message getMessage() {
            return this.message;
        }

        public void setMessage(Message message) {
            this.message = message;
        }

        public Throwable getError() {
            return this.error;
        }

        public void setError(Throwable th) {
            this.error = th;
        }

        public boolean isAckMessage() {
            return this.ackMessage;
        }

        public void setAckMessage(boolean z) {
            this.ackMessage = z;
        }

        public Object getProviderObject() {
            return this.providerObject;
        }

        public void setProviderObject(Object obj) {
            this.providerObject = obj;
        }

        public boolean hasError() {
            return this.error != null;
        }
    }

    public AbstractConsumer(RuntimeContext<Consumer> runtimeContext) {
        this.context = runtimeContext;
        String string = this.context.getString(ClientContextBuilder.TRANSPORT_REF);
        if (StringHelper.isEmpty(string)) {
            throw new EmptyContextParameterException(ClientContextBuilder.TRANSPORT_REF, runtimeContext);
        }
        this.transport = TransportFactory.provides(string, runtimeContext);
        this.transport.getConsumers().put(getName(), this);
        this.STATISTICS_LOGGER = LoggerFactory.getLogger(String.valueOf(Consumer.class.getName()) + "." + getName());
    }

    @Override // org.kaleidofoundry.messaging.Consumer, org.kaleidofoundry.messaging.Client
    public String getName() {
        return this.context.getName();
    }

    @Override // org.kaleidofoundry.messaging.Client
    public Transport getTransport() throws TransportException {
        return this.transport;
    }

    @Override // org.kaleidofoundry.messaging.Consumer
    public synchronized void start() throws TransportException {
        this.ProcessedMessagesOK.set(0);
        this.ProcessedMessagesKO.set(0);
        this.ProcessedMessagesSKIPPED.set(0);
        stop();
        int intValue = this.context.getInteger(ClientContextBuilder.THREAD_POOL_COUNT_PROPERTY, 1).intValue();
        LOGGER.info("Creating consumer [{}] with a thread pool size of {}", this.context.getName(), Integer.valueOf(intValue));
        this.pool = Executors.newFixedThreadPool(intValue);
        String string = this.context.getString(ClientContextBuilder.CONSUMER_THREAD_PREFIX_PROPERTY, this.context.getName());
        for (int i = 0; i < intValue; i++) {
            this.pool.submit(newWorker(String.valueOf(string) + "[" + StringHelper.leftPad(String.valueOf(i + 1), 3, '0') + "]", i + 1));
        }
    }

    @Override // org.kaleidofoundry.messaging.Consumer
    public synchronized void stop() throws TransportException {
        if (this.pool != null) {
            int intValue = this.context.getInteger(ClientContextBuilder.CONSUMER_THREAD_POOL_WAIT_ON_SHUTDOWN, 5).intValue();
            LOGGER.info("Shutdown consumer [{}] thread pool", this.context.getName());
            this.pool.shutdown();
            try {
                if (!this.pool.awaitTermination(intValue, TimeUnit.SECONDS)) {
                    this.pool.shutdownNow();
                    if (!this.pool.awaitTermination(intValue, TimeUnit.SECONDS)) {
                        LOGGER.error("Error trying shutdown consumer [{}] thread pool", this.context.getName());
                    }
                }
            } catch (InterruptedException unused) {
                this.pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        this.transport.getConsumers().remove(getName());
    }

    @Override // org.kaleidofoundry.messaging.Consumer
    public Consumer addMessageHandler(MessageHandler messageHandler) {
        if (!this.messageHandlers.contains(messageHandler)) {
            this.messageHandlers.add(messageHandler);
        }
        return this;
    }

    protected abstract ConsumerWorker newWorker(String str, int i) throws TransportException;

    @Override // org.kaleidofoundry.messaging.Client
    public UsageStatistics getStatistics() {
        return new UsageStatistics(this.ProcessedMessagesOK.get(), this.ProcessedMessagesKO.get(), this.ProcessedMessagesSKIPPED.get());
    }
}
