package com.groupon.messagebus.client;

import com.groupon.messagebus.api.Consumer;
import com.groupon.messagebus.api.ConsumerAckType;
import com.groupon.messagebus.api.ConsumerConfig;
import com.groupon.messagebus.api.DestinationType;
import com.groupon.messagebus.api.HostParams;
import com.groupon.messagebus.api.Message;
import com.groupon.messagebus.api.exceptions.AckFailedException;
import com.groupon.messagebus.api.exceptions.BrokerConnectionFailedException;
import com.groupon.messagebus.api.exceptions.InvalidConfigException;
import com.groupon.messagebus.api.exceptions.InvalidDestinationException;
import com.groupon.messagebus.api.exceptions.InvalidStatusException;
import com.groupon.messagebus.api.exceptions.KeepAliveFailedException;
import com.groupon.messagebus.api.exceptions.NackFailedException;
import com.groupon.messagebus.api.exceptions.ReceiveTimeoutException;
import com.groupon.messagebus.util.DynamicServerListGetter;
import com.groupon.stomp.MarshallingSupport;
import com.groupon.stomp.Stomp;
import com.groupon.stomp.StompFrame;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.Timer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/groupon/messagebus/client/ConsumerImpl.class */
public class ConsumerImpl implements Consumer {
    /** Configuration handed to {@link #start(ConsumerConfig)}; {@code null} until start is called. */
    private ConsumerConfig config;

    /** Prefix that {@code validateConfigs} requires on topic destination names. */
    private static final String TOPIC_NAME_PREFIX = "jms.topic.";

    /** Prefix that {@code validateConfigs} requires on queue destination names. */
    private static final String QUEUE_NAME_PREFIX = "jms.queue.";

    /** Timeout passed by the no-timeout {@code ackSafe} overloads (presumably milliseconds - TODO confirm). */
    private static final long ACKSAFE_TIMEOUT = 1000;

    /** Pool created in {@code start}; runs the blocking receive submitted by {@code receive(long)}. */
    ExecutorService executor;

    private Logger log = Logger.getLogger(ConsumerImpl.class);

    /** Timer on which {@code start} schedules the periodic {@code RefreshServerListTimerTask}. */
    private Timer refreshServerListTimer = new Timer();

    /** Every registered fetcher, in the order {@code receiveImpl} polls them. */
    private List<StompServerFetcher> serverList = Collections.synchronizedList(new ArrayList<StompServerFetcher>());

    /** Registered fetchers grouped by the broker host they talk to. */
    private Map<HostParams, List<StompServerFetcher>> currentServers = new HashMap<HostParams, List<StompServerFetcher>>();

    /** Fetcher that produced the most recently received message; target of the no-arg ack/nack calls. */
    private StompServerFetcher lastSentServer = null;

    /** Index into {@link #serverList} of the fetcher that last produced a message, or -1. */
    private int lastContactedServerIdx = -1;

    /** Lifecycle state; starts as {@code INITIALIZED}. */
    private Consumer.Status status = Consumer.Status.INITIALIZED;

    /**
     * Compiler-generated lookup table for switching over {@link Consumer.Status}
     * (originally {@code ConsumerImpl$2}; renamed by the decompiler because that is
     * not a valid source-level class name).
     *
     * <p>The table is indexed by {@code Status.ordinal()} and holds the case label
     * assigned to each constant: INITIALIZED = 1, RUNNING = 2, STOPPED = 3. Slots for
     * constants that fail to resolve keep the default value 0.
     */
    /* loaded from: input_file:com/groupon/messagebus/client/ConsumerImpl$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$com$groupon$messagebus$api$Consumer$Status;

        static {
            int[] labels = new int[Consumer.Status.values().length];
            // Each assignment is guarded separately so one unresolved constant
            // does not prevent the remaining labels from being filled in.
            try {
                labels[Consumer.Status.INITIALIZED.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                labels[Consumer.Status.RUNNING.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                labels[Consumer.Status.STOPPED.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
            }
            $SwitchMap$com$groupon$messagebus$api$Consumer$Status = labels;
        }
    }

    /**
     * Reports the lifecycle state currently recorded for this consumer.
     *
     * @return the value of the internal status field
     */
    @Override // com.groupon.messagebus.api.Consumer
    public Consumer.Status getStatus() {
        final Consumer.Status current = this.status;
        return current;
    }

    /**
     * Validates the configuration, resolves the broker host list and opens one
     * connection per host.
     *
     * <p>When the configuration asks for a dynamic server list but carries no fetch
     * URL, one is derived from the first configured host. If a fetch URL is present
     * the host list is downloaded and a periodic refresh task is scheduled;
     * otherwise the statically configured hosts are used. A broker that cannot be
     * reached is logged and skipped; the consumer still transitions to
     * {@code RUNNING}.
     *
     * @param consumerConfig configuration to run with
     * @return {@code true} once the consumer is running; {@code false} when the
     *         dynamic host list could not be fetched
     * @throws InvalidConfigException if the configuration fails validation, the
     *         dynamic fetch URL is malformed, or no static host list is configured
     * @throws InvalidStatusException if the consumer is not in the
     *         {@code INITIALIZED} state
     */
    @Override // com.groupon.messagebus.api.Consumer
    public boolean start(ConsumerConfig consumerConfig) throws InvalidConfigException, InvalidStatusException {
        this.log.debug("consumer " + toString() + " starting.");
        if (this.status != Consumer.Status.INITIALIZED) {
            throw new InvalidStatusException("Consumer cannot be started. Status=" + this.status);
        }

        this.config = consumerConfig;
        validateConfigs(this.config);

        if (this.config.useDynamicServerList()
                && null == this.config.getDynamicServerListFetchURL()
                && null != this.config.getHostParams()
                && this.config.getHostParams().size() > 0) {
            HostParams seedHost = this.config.getHostParams().iterator().next();
            try {
                // NOTE(review): port 8081 is hard-coded here; presumably the broker's HTTP port - confirm.
                this.config.setDynamicServerListFetchURL(DynamicServerListGetter.buildDynamicServersURL(seedHost.getHost(), 8081));
            } catch (URISyntaxException e) {
                this.log.error("Error creating dynamic server url from " + seedHost, e);
            }
        }

        Set<HostParams> hosts;
        if (null != this.config.getDynamicServerListFetchURL()) {
            try {
                hosts = fetchHostList();
                this.refreshServerListTimer.schedule(new RefreshServerListTimerTask(this), this.config.getConnectionLifetime(), this.config.getConnectionLifetime());
            } catch (MalformedURLException e) {
                String message = "Invalid dynamic server list fetcher url " + this.config.getDynamicServerListFetchURL();
                // The exception type only receives the message, so keep the cause in the log.
                this.log.error(message, e);
                throw new InvalidConfigException(message);
            } catch (IOException e) {
                this.log.error("IOException when getting broker list. Aborting consumer start.", e);
                return false;
            } catch (Exception e) {
                this.log.error("Something was wrong when getting broker list. Aborting consumer start.", e);
                return false;
            }
        } else {
            hosts = this.config.getHostParams();
            if (hosts == null) {
                // Previously surfaced as a NullPointerException a few lines below.
                throw new InvalidConfigException("No broker hosts configured and no dynamic server list URL available");
            }
        }

        // A fixed thread pool needs at least one thread even when the host list is empty.
        this.executor = Executors.newFixedThreadPool(Math.max(1, hosts.size()));
        for (HostParams hostParams : hosts) {
            try {
                startAndRegisterConnection(hostParams);
            } catch (BrokerConnectionFailedException e) {
                // Pass the exception to the logger so the real stack trace is printed.
                this.log.error("Not able to connect to broker " + hostParams.toString(), e);
            }
        }
        this.status = Consumer.Status.RUNNING;
        return true;
    }

    /**
     * Shuts the consumer down: cancels the server-list refresh timer, closes every
     * broker connection, stops the executor and clears the connection registries.
     *
     * <p>Calling this on an already stopped consumer only logs a message.
     *
     * @throws InvalidStatusException if the consumer is neither running nor stopped
     */
    @Override // com.groupon.messagebus.api.Consumer
    public void stop() throws InvalidStatusException {
        switch (this.status) {
            case RUNNING: {
                // Cancel first so no further refresh run is started while the
                // connections are being torn down.
                this.refreshServerListTimer.cancel();

                // Copy the synchronized list (the copy is taken under its lock) and
                // close from the copy, so a refresh that is still in flight cannot
                // invalidate the iteration.
                List<StompServerFetcher> toClose = new ArrayList<StompServerFetcher>(this.serverList);
                for (StompServerFetcher fetcher : toClose) {
                    fetcher.close();
                }
                if (this.executor != null) {
                    this.executor.shutdownNow();
                }
                this.serverList.clear();
                this.currentServers.clear();
                this.log.debug("Consumer " + toString() + " stopped successfully");
                this.status = Consumer.Status.STOPPED;
                return;
            }
            case STOPPED:
                this.log.info("Consumer is already stopped, nothing to do.");
                return;
            default:
                throw new InvalidStatusException("Consumer cannot be stopped. Status=" + this.status);
        }
    }

    /**
     * Acknowledges the most recently received message on the fetcher that
     * delivered it.
     *
     * <p>The running-state check comes first because the configuration is only
     * assigned in {@code start}; checking the ack type first would throw a
     * {@code NullPointerException} on a consumer that was never started.
     *
     * @return {@code true} if the ack was sent; {@code false} if the consumer is not
     *         running, uses auto-ack, has no pending message, or the ack failed
     */
    @Override // com.groupon.messagebus.api.Consumer
    public boolean ack() {
        if (this.status != Consumer.Status.RUNNING) {
            this.log.warn("This consumer is not running, can not ack.");
            return false;
        }
        if (this.config.getAckType() == ConsumerAckType.AUTO_CLIENT_ACK) {
            this.log.warn("This consumer has auto ack type. No need to explicitly ack.");
            return false;
        }
        if (this.lastSentServer == null) {
            return false;
        }
        try {
            this.lastSentServer.ack();
            // Cleared only on success so a failed ack can be retried.
            this.lastSentServer = null;
            return true;
        } catch (AckFailedException e) {
            this.log.error("Ack failed to server " + this.lastSentServer, e);
            return false;
        }
    }

    /**
     * Acknowledges a message identified by the ack id that {@code receiveImpl}
     * attached to it.
     *
     * <p>The decoded ack id must contain exactly four separator-delimited tokens:
     * host, port, message id and connection id. If no fetcher is registered for that
     * host and port, one is created and registered so the ack can be sent.
     *
     * @param str encoded ack id taken from the received message
     * @return {@code true} if the ack was sent; {@code false} if the consumer is not
     *         running, uses auto-ack, the ack id is null or malformed, or the ack
     *         failed
     */
    @Override // com.groupon.messagebus.api.Consumer
    public boolean ack(String str) {
        // Status first: config is still null on a consumer that was never started.
        if (this.status != Consumer.Status.RUNNING) {
            this.log.warn("This consumer is not running, can not ack.");
            return false;
        }
        if (this.config.getAckType() == ConsumerAckType.AUTO_CLIENT_ACK) {
            this.log.warn("This consumer has auto ack type. No need to explicitly ack.");
            return false;
        }
        if (str == null) {
            this.log.error("Ack failed to ackId " + str);
            return false;
        }
        try {
            StringTokenizer tokens = new StringTokenizer(Utils.decode(str), Stomp.Headers.SEPERATOR);
            if (tokens.countTokens() != 4) {
                throw new AckFailedException(new Exception("Wrong ack id format."));
            }
            String host = tokens.nextToken();
            int port = Integer.parseInt(tokens.nextToken());
            String messageId = tokens.nextToken();
            String connectionId = tokens.nextToken();

            HostParams hostParams = new HostParams(host, port);
            List<StompServerFetcher> fetchers = this.currentServers.get(hostParams);
            if (fetchers == null || fetchers.size() == 0) {
                // NOTE(review): this fetcher is registered without an explicit connect or
                // reader thread; presumably StompServerFetcher connects on demand - confirm.
                fetchers = new ArrayList<StompServerFetcher>();
                StompServerFetcher created = new StompServerFetcher(host, port, this.config);
                fetchers.add(created);
                this.currentServers.put(hostParams, fetchers);
                this.serverList.add(created);
            }
            try {
                fetchers.get(0).ack(messageId, connectionId);
                this.lastSentServer = null;
                return true;
            } catch (AckFailedException e) {
                this.log.error("Ack failed to messageId " + messageId, e);
                return false;
            }
        } catch (AckFailedException e) {
            this.log.error("Ack failed to ackId " + str, e);
            return false;
        } catch (NumberFormatException e) {
            // A non-numeric port token is just another malformed ack id.
            this.log.error("Ack failed to ackId " + str, e);
            return false;
        }
    }

    /**
     * Polls every registered fetcher once and returns without waiting.
     *
     * @return the next available message, or {@code null} if none is ready or the
     *         consumer is not running
     */
    @Override // com.groupon.messagebus.api.Consumer
    public Message receiveImmediate() {
        final boolean waitForMessage = false;
        return receiveImpl(waitForMessage);
    }

    /**
     * Waits until a message is available from any registered fetcher.
     *
     * @return the next message, or {@code null} if the consumer is not running or
     *         the wait was interrupted
     */
    @Override // com.groupon.messagebus.api.Consumer
    public Message receive() {
        final boolean waitForMessage = true;
        return receiveImpl(waitForMessage);
    }

    /**
     * Waits up to {@code j} milliseconds for a message.
     *
     * <p>The blocking receive runs on the consumer's executor so the wait can be
     * bounded. The submitted task is always cancelled afterwards, which interrupts
     * the receive loop if it is still waiting.
     *
     * @param j maximum time to wait, in milliseconds
     * @return the received message, or {@code null} if the consumer is not running
     * @throws ReceiveTimeoutException if no message arrived in time, the receive
     *         task failed, or the calling thread was interrupted while waiting
     */
    @Override // com.groupon.messagebus.api.Consumer
    public Message receive(long j) throws ReceiveTimeoutException {
        if (this.status != Consumer.Status.RUNNING) {
            this.log.warn("This consumer is not running, can not receive. Status=" + this.status);
            return null;
        }
        Future<Message> pending = this.executor.submit(new Callable<Message>() {
            @Override
            public Message call() {
                return ConsumerImpl.this.receiveImpl(true);
            }
        });
        try {
            return pending.get(j, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            this.log.error(e.getMessage(), e);
            throw new ReceiveTimeoutException(e);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still see the interruption.
            Thread.currentThread().interrupt();
            this.log.error(e.getMessage(), e);
            throw new ReceiveTimeoutException(e);
        } catch (TimeoutException e) {
            this.log.debug("Receive request timed out", e);
            throw new ReceiveTimeoutException(e);
        } finally {
            // No-op when the task already completed; otherwise stops the polling loop.
            pending.cancel(true);
        }
    }

    /**
     * Negatively acknowledges the most recently received message on the fetcher
     * that delivered it.
     *
     * @return {@code true} if the nack was sent; {@code false} if the consumer is
     *         not running, has no pending message, or the nack failed
     */
    @Override // com.groupon.messagebus.api.Consumer
    public boolean nack() {
        final boolean running = this.status == Consumer.Status.RUNNING;
        if (!running) {
            this.log.warn("This consumer is not running, can not nack.");
            return false;
        }
        if (null == this.lastSentServer) {
            return false;
        }

        boolean nacked;
        try {
            this.lastSentServer.nack();
            // Forget the fetcher only once the nack went through.
            this.lastSentServer = null;
            nacked = true;
        } catch (NackFailedException failure) {
            this.log.error("Nack failed to server " + this.lastSentServer, failure);
            nacked = false;
        }
        return nacked;
    }

    /**
     * Negatively acknowledges a message identified by the ack id that
     * {@code receiveImpl} attached to it.
     *
     * <p>The decoded id must contain exactly four separator-delimited tokens; only
     * the first three (host, port, message id) are used here. If no fetcher is
     * registered for that host and port, one is created and registered.
     *
     * @param str encoded ack id taken from the received message
     * @return {@code true} if the nack was sent; {@code false} if the consumer is
     *         not running, the id is null or malformed, or the nack failed
     */
    @Override // com.groupon.messagebus.api.Consumer
    public boolean nack(String str) {
        if (this.status != Consumer.Status.RUNNING) {
            this.log.warn("This consumer is not running, can not nack.");
            return false;
        }
        if (str == null) {
            this.log.error("nack failed to nackId " + str);
            return false;
        }
        try {
            StringTokenizer tokens = new StringTokenizer(Utils.decode(str), Stomp.Headers.SEPERATOR);
            if (tokens.countTokens() != 4) {
                throw new NackFailedException(new Exception("Wrong nack id format."));
            }
            String host = tokens.nextToken();
            int port = Integer.parseInt(tokens.nextToken());
            String messageId = tokens.nextToken();

            HostParams hostParams = new HostParams(host, port);
            List<StompServerFetcher> fetchers = this.currentServers.get(hostParams);
            if (fetchers == null || fetchers.size() == 0) {
                // NOTE(review): this fetcher is registered without an explicit connect or
                // reader thread; presumably StompServerFetcher connects on demand - confirm.
                fetchers = new ArrayList<StompServerFetcher>();
                StompServerFetcher created = new StompServerFetcher(host, port, this.config);
                fetchers.add(created);
                this.currentServers.put(hostParams, fetchers);
                this.serverList.add(created);
            }
            try {
                fetchers.get(0).nack(messageId);
                this.lastSentServer = null;
                return true;
            } catch (NackFailedException e) {
                this.log.error("Nack failed to messageId " + messageId, e);
                return false;
            }
        } catch (NackFailedException e) {
            this.log.error("nack failed to nackId " + str, e);
            return false;
        } catch (NumberFormatException e) {
            // A non-numeric port token is just another malformed nack id.
            this.log.error("nack failed to nackId " + str, e);
            return false;
        }
    }

    /**
     * Sends a keep-alive through the first fetcher registered for each broker host.
     *
     * <p>A {@code KeepAliveFailedException} for one host is logged and the loop
     * moves on to the next host; any other exception ends the loop.
     *
     * @return {@code true} only if the consumer is running and every keep-alive
     *         succeeded
     */
    @Override // com.groupon.messagebus.api.Consumer
    public boolean keepAlive() {
        if (this.status != Consumer.Status.RUNNING) {
            this.log.warn("This consumer is not running, can not send keep-alive.");
            return false;
        }

        boolean allSucceeded = true;
        try {
            for (Map.Entry<HostParams, List<StompServerFetcher>> registration : this.currentServers.entrySet()) {
                List<StompServerFetcher> fetchers = registration.getValue();
                try {
                    fetchers.get(0).keepAlive();
                } catch (KeepAliveFailedException failure) {
                    this.log.error("Keepalive failed for broker " + fetchers.get(0).getHost(), failure);
                    allSucceeded = false;
                }
            }
        } catch (Exception unexpected) {
            this.log.warn("KeepAlive failed", unexpected);
            allSucceeded = false;
        }
        return allSucceeded;
    }

    /**
     * Acknowledges the message with the given ack id using the default
     * {@code ACKSAFE_TIMEOUT}.
     *
     * @param str encoded ack id taken from the received message
     * @return the result of {@link #ackSafe(String, long)}
     */
    @Override // com.groupon.messagebus.api.Consumer
    public boolean ackSafe(String str) {
        final long timeout = ACKSAFE_TIMEOUT;
        return ackSafe(str, timeout);
    }

    /**
     * Acknowledges a message identified by its ack id through the fetcher's
     * {@code ackSafe} call, passing the given timeout along.
     *
     * <p>The decoded ack id must contain exactly four separator-delimited tokens:
     * host, port, message id and connection id. If no fetcher is registered for that
     * host and port, one is created and registered so the ack can be sent.
     *
     * @param str encoded ack id taken from the received message
     * @param j   timeout forwarded to the fetcher (presumably milliseconds - TODO confirm)
     * @return {@code true} if the ack was sent; {@code false} if the consumer is not
     *         running, uses auto-ack, the ack id is null or malformed, or the ack
     *         failed
     */
    @Override // com.groupon.messagebus.api.Consumer
    public boolean ackSafe(String str, long j) {
        // Status first: config is still null on a consumer that was never started.
        if (this.status != Consumer.Status.RUNNING) {
            this.log.warn("This consumer is not running, can not ack.");
            return false;
        }
        if (this.config.getAckType() == ConsumerAckType.AUTO_CLIENT_ACK) {
            this.log.warn("This consumer has auto ack type. No need to explicitly ack.");
            return false;
        }
        if (str == null) {
            this.log.error("Ack failed to ackId " + str);
            return false;
        }
        try {
            StringTokenizer tokens = new StringTokenizer(Utils.decode(str), Stomp.Headers.SEPERATOR);
            if (tokens.countTokens() != 4) {
                throw new AckFailedException(new Exception("Wrong ack id format."));
            }
            String host = tokens.nextToken();
            int port = Integer.parseInt(tokens.nextToken());
            String messageId = tokens.nextToken();
            String connectionId = tokens.nextToken();

            HostParams hostParams = new HostParams(host, port);
            List<StompServerFetcher> fetchers = this.currentServers.get(hostParams);
            if (fetchers == null || fetchers.size() == 0) {
                // NOTE(review): this fetcher is registered without an explicit connect or
                // reader thread; presumably StompServerFetcher connects on demand - confirm.
                fetchers = new ArrayList<StompServerFetcher>();
                StompServerFetcher created = new StompServerFetcher(host, port, this.config);
                fetchers.add(created);
                this.currentServers.put(hostParams, fetchers);
                this.serverList.add(created);
            }
            try {
                fetchers.get(0).ackSafe(messageId, connectionId, j);
                this.lastSentServer = null;
                return true;
            } catch (AckFailedException e) {
                this.log.error("Ack failed to messageId " + messageId, e);
                return false;
            }
        } catch (AckFailedException e) {
            this.log.error("Ack failed to ackId " + str, e);
            return false;
        } catch (NumberFormatException e) {
            // A non-numeric port token is just another malformed ack id.
            this.log.error("Ack failed to ackId " + str, e);
            return false;
        }
    }

    /**
     * Acknowledges the most recently received message using the default
     * {@code ACKSAFE_TIMEOUT}.
     *
     * @return the result of {@link #ackSafe(long)}
     */
    @Override // com.groupon.messagebus.api.Consumer
    public boolean ackSafe() {
        final long timeout = ACKSAFE_TIMEOUT;
        return ackSafe(timeout);
    }

    /**
     * Acknowledges the most recently received message through the delivering
     * fetcher's {@code ackSafe} call, passing the given timeout along.
     *
     * <p>The running-state check comes first because the configuration is only
     * assigned in {@code start}; checking the ack type first would throw a
     * {@code NullPointerException} on a consumer that was never started.
     *
     * @param j timeout forwarded to the fetcher (presumably milliseconds - TODO confirm)
     * @return {@code true} if the ack was sent; {@code false} if the consumer is not
     *         running, uses auto-ack, has no pending message, or the ack failed
     */
    @Override // com.groupon.messagebus.api.Consumer
    public boolean ackSafe(long j) {
        if (this.status != Consumer.Status.RUNNING) {
            this.log.warn("This consumer is not running, can not ack.");
            return false;
        }
        if (this.config.getAckType() == ConsumerAckType.AUTO_CLIENT_ACK) {
            this.log.warn("This consumer has auto ack type. No need to explicitly ack.");
            return false;
        }
        if (this.lastSentServer == null) {
            return false;
        }
        try {
            this.lastSentServer.ackSafe(j);
            // Cleared only on success so a failed ack can be retried.
            this.lastSentServer = null;
            return true;
        } catch (AckFailedException e) {
            this.log.error("Ack failed to server " + this.lastSentServer, e);
            return false;
        } catch (Exception e) {
            this.log.error("Ack failed", e);
            return false;
        }
    }

    /**
     * Polls the registered fetchers round-robin, starting after the one that last
     * produced a message, and returns the first message found.
     *
     * <p>For a delivered message the ack id is built from the fetcher's host and
     * port plus the frame's {@code message-id} and {@code connection-id} headers,
     * and the frame headers are copied onto the message as its properties.
     *
     * @param z {@code true} to keep polling (sleeping the configured receive
     *          interval between rounds) until a message arrives; {@code false} to
     *          poll each fetcher once
     * @return the received message; {@code null} if the consumer is not running, no
     *         message was available in non-blocking mode, the wait was interrupted,
     *         or the frame body could not be turned into a message
     */
    /* JADX INFO: Access modifiers changed from: private */
    public Message receiveImpl(boolean z) {
        if (this.status != Consumer.Status.RUNNING) {
            this.log.warn("This consumer is not running, can not receive. Status=" + this.status);
            return null;
        }
        while (true) {
            // Work on a snapshot: the refresh task can add or remove fetchers at any
            // time, and re-reading the live size mid-round could divide by zero or
            // index past the end.
            StompServerFetcher[] servers = this.serverList.toArray(new StompServerFetcher[0]);
            for (int i = 0; i < servers.length; i++) {
                int idx = ((this.lastContactedServerIdx + 1) + i) % servers.length;
                StompServerFetcher fetcher = servers[idx];
                StompFrame frame = fetcher.receiveLast();
                if (frame == null) {
                    continue;
                }
                this.lastContactedServerIdx = idx;
                this.lastSentServer = fetcher;
                Message message = Utils.getMessageFromBytes(frame.getBody().getBytes());
                this.log.debug("Received message: " + message);
                if (message != null) {
                    // Logged inside the null check; it used to dereference a possibly null message.
                    this.log.debug("received message-id: " + message.getMessageId());
                    StringBuilder ackId = new StringBuilder(fetcher.getHost() + Stomp.Headers.SEPERATOR);
                    ackId.append("" + fetcher.getPort() + Stomp.Headers.SEPERATOR);
                    ackId.append(frame.getHeaders().get("message-id") + Stomp.Headers.SEPERATOR);
                    ackId.append(frame.getHeaders().containsKey("connection-id") ? frame.getHeaders().get("connection-id") : Utils.NULL_STRING);
                    message.setAckId(Utils.encode(ackId.toString()));
                    message.setMessageProperties(frame.getHeaders());
                }
                return message;
            }
            if (!z) {
                return null;
            }
            try {
                Thread.sleep(this.config.getReceiveSleepInterval());
            } catch (InterruptedException e) {
                // Restore the flag so the caller can observe the interruption.
                Thread.currentThread().interrupt();
                this.log.debug("receiveImpl blocking timed out.");
                return null;
            }
        }
    }

    /**
     * Opens a connection to the given broker, starts its fetcher on a daemon thread
     * named after the host, and records the fetcher in both registries.
     *
     * @param hostParams broker host and port to connect to
     * @throws InvalidDestinationException     propagated from the fetcher
     * @throws BrokerConnectionFailedException propagated from the fetcher
     */
    private void startAndRegisterConnection(HostParams hostParams) throws InvalidDestinationException, BrokerConnectionFailedException {
        final String threadName = hostParams.getHost();
        final StompServerFetcher fetcher = getStompServerFetcher(hostParams, this.config);

        // Connect first; if this throws, nothing has been registered yet.
        fetcher.refreshConnection();
        this.log.debug("opening connection for " + hostParams.getHost() + " Port=" + hostParams.getPort());
        this.serverList.add(fetcher);

        Thread worker = new Thread(fetcher, threadName);
        worker.setDaemon(true);
        worker.start();

        boolean alreadyKnown = this.currentServers.containsKey(hostParams);
        if (!alreadyKnown) {
            this.currentServers.put(hostParams, new ArrayList());
        }
        List<StompServerFetcher> fetchersForHost = this.currentServers.get(hostParams);
        fetchersForHost.add(fetcher);
    }

    /**
     * Creates a fetcher for the given broker.
     *
     * @param hostParams     broker host and port
     * @param consumerConfig configuration handed to the fetcher
     * @return a newly constructed {@code StompServerFetcher}
     */
    public StompServerFetcher getStompServerFetcher(HostParams hostParams, ConsumerConfig consumerConfig) {
        StompServerFetcher fetcher = new StompServerFetcher(hostParams.getHost(), hostParams.getPort(), consumerConfig);
        return fetcher;
    }

    /**
     * Closes every fetcher registered for the given broker, removes them from both
     * registries and resets the round-robin position.
     *
     * @param hostParams broker whose connections should be dropped
     */
    private void stopHostAndUnregisterConnection(HostParams hostParams) {
        List<StompServerFetcher> fetchersForHost = this.currentServers.get(hostParams);
        if (fetchersForHost != null) {
            Iterator<StompServerFetcher> cursor = fetchersForHost.iterator();
            while (cursor.hasNext()) {
                StompServerFetcher fetcher = cursor.next();
                this.serverList.remove(fetcher);
                fetcher.close();
                this.log.debug("Connection with the broker " + fetcher.toString() + " closed successfully");
            }
        }
        this.currentServers.remove(hostParams);
        // Indices into serverList have shifted, so restart the rotation.
        this.lastContactedServerIdx = -1;
    }

    /**
     * Checks that the destination settings of the given configuration are
     * consistent.
     *
     * <p>All checks read from the {@code consumerConfig} argument. Earlier code
     * mixed the argument with the {@code config} field, which only worked because
     * the caller assigned the field beforehand.
     *
     * @param consumerConfig configuration to validate
     * @throws InvalidConfigException if the destination name is missing, does not
     *         carry the prefix required for its destination type, or a topic is
     *         configured without a subscription id
     */
    private void validateConfigs(ConsumerConfig consumerConfig) throws InvalidConfigException {
        String destinationName = consumerConfig.getDestinationName();
        if (destinationName == null) {
            throw new InvalidConfigException("Destination name can not be null");
        }
        if (consumerConfig.getDestinationType() == DestinationType.QUEUE && !destinationName.startsWith(QUEUE_NAME_PREFIX)) {
            String message = "Invalid destination/queue name: " + destinationName + ". Queue name must start with " + QUEUE_NAME_PREFIX;
            this.log.error(message);
            throw new InvalidConfigException(message);
        }
        if (consumerConfig.getDestinationType() == DestinationType.TOPIC) {
            if (!destinationName.startsWith(TOPIC_NAME_PREFIX)) {
                String message = "Invalid destination/topic name: " + destinationName + ". Topic name must start with " + TOPIC_NAME_PREFIX;
                this.log.error(message);
                throw new InvalidConfigException(message);
            }
            if (consumerConfig.getSubscriptionId() == null) {
                this.log.error("When consuming topics, subscription-id must be specified");
                throw new InvalidConfigException("When consuming for topics, subscription id must be specified");
            }
        }
    }

    /**
     * Re-fetches the broker host list and reconciles the open connections with it:
     * hosts that disappeared are stopped and unregistered, hosts that are new get a
     * connection.
     *
     * <p>Failures are logged and never propagated. A host that cannot be reached is
     * skipped; a failure to fetch the list aborts the whole refresh and leaves the
     * current connections untouched.
     */
    public void refreshServers() {
        try {
            Set<HostParams> latestHosts = fetchHostList();
            StringBuilder summary = new StringBuilder("New server list includes");

            // Collect first, then stop: stopping a host removes it from currentServers,
            // which must not happen while its key set is being iterated.
            Set<HostParams> removedHosts = new HashSet<HostParams>();
            for (HostParams known : this.currentServers.keySet()) {
                if (!latestHosts.contains(known)) {
                    removedHosts.add(known);
                }
            }
            for (HostParams removed : removedHosts) {
                this.log.debug("Stopping " + removed);
                stopHostAndUnregisterConnection(removed);
            }

            for (HostParams host : latestHosts) {
                if (null == this.currentServers.get(host)) {
                    this.log.debug("Starting new broker connection with " + host);
                    try {
                        startAndRegisterConnection(host);
                    } catch (BrokerConnectionFailedException e) {
                        // Pass the exception to the logger so the real stack trace is printed.
                        this.log.error("Failed to connect to " + host.toString() + ".", e);
                    }
                }
                summary.append(", " + host);
            }
            this.log.debug("Dynamically refreshing Server List");
            this.log.debug(summary.toString());
        } catch (MalformedURLException e) {
            this.log.error("Incorrect host list url is provided. Abort refresh servers.", e);
        } catch (IOException e) {
            this.log.warn("IOException when trying to get server list. Aborting refresh servers.", e);
        } catch (Exception e) {
            this.log.warn("Exception when trying to get server list. Aborting refresh servers.", e);
        }
    }

    /**
     * Downloads the broker host list from the dynamic server list URL held in the
     * configuration.
     *
     * @return the hosts reported by {@code DynamicServerListGetter}
     * @throws MalformedURLException propagated from the getter
     * @throws IOException           propagated from the getter
     */
    public Set<HostParams> fetchHostList() throws MalformedURLException, IOException {
        Set<HostParams> hosts = DynamicServerListGetter.fetchHostList(this.config.getDynamicServerListFetchURL());
        return hosts;
    }
}
