package org.apache.nifi.processors.slack.consume;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.slack.api.methods.SlackApiException;
import com.slack.api.methods.request.conversations.ConversationsHistoryRequest;
import com.slack.api.methods.request.conversations.ConversationsRepliesRequest;
import com.slack.api.methods.response.conversations.ConversationsHistoryResponse;
import com.slack.api.methods.response.conversations.ConversationsRepliesResponse;
import com.slack.api.model.Message;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.slack.util.SlackResponseUtil;

/* loaded from: input_file:org/apache/nifi/processors/slack/consume/ConsumeChannel.class */
public class ConsumeChannel {
    private static final String CONVERSATION_HISTORY_URL = "https://slack.com/api/conversations.history";
    private static final String CHECK_FOR_REPLIES = "check for replies";
    private static final String BACKWARD = "backward";
    private static final String FORWARD = "forward";
    private static final Pattern MENTION_PATTERN = Pattern.compile("<@(U.*?)>");
    private static final long YIELD_MILLIS = 3000;
    private final ConsumeSlackClient client;
    private final String channelId;
    private final String channelName;
    private final int batchSize;
    private final long replyMonitorFrequencyMillis;
    private final long replyMonitorWindowMillis;
    private final boolean resolveUsernames;
    private final boolean includeMessageBlocks;
    private final UsernameLookup usernameLookup;
    private final Relationship successRelationship;
    private final ComponentLog logger;
    private final ObjectMapper objectMapper;
    private final StateKeys stateKeys;
    private volatile long yieldExpiration;
    private volatile long lastReplyMonitorPollEnd = System.currentTimeMillis();
    private final AtomicLong nextRequestTime = new AtomicLong(0);

    /* loaded from: input_file:org/apache/nifi/processors/slack/consume/ConsumeChannel$Builder.class */
    public static class Builder {
        private ConsumeSlackClient client;
        private String channelId;
        private String channelName;
        private boolean includeMessageBlocks;
        private boolean resolveUsernames;
        private ComponentLog logger;
        private Relationship successRelationship;
        private UsernameLookup usernameLookup;
        private ObjectMapper objectMapper;
        private int batchSize = 50;
        private long replyMonitorFrequencyMillis = TimeUnit.SECONDS.toMillis(60);
        private long replyMonitorWindowMillis = TimeUnit.DAYS.toMillis(7);

        public Builder channelId(String str) {
            this.channelId = str;
            return this;
        }

        public Builder channelName(String str) {
            this.channelName = str;
            return this;
        }

        public Builder client(ConsumeSlackClient consumeSlackClient) {
            this.client = consumeSlackClient;
            return this;
        }

        public Builder batchSize(int i) {
            this.batchSize = i;
            return this;
        }

        public Builder logger(ComponentLog componentLog) {
            this.logger = componentLog;
            return this;
        }

        public Builder replyMonitorFrequency(long j, TimeUnit timeUnit) {
            this.replyMonitorFrequencyMillis = timeUnit.toMillis(j);
            return this;
        }

        public Builder replyMonitorWindow(long j, TimeUnit timeUnit) {
            this.replyMonitorWindowMillis = timeUnit.toMillis(j);
            return this;
        }

        public Builder includeMessageBlocks(boolean z) {
            this.includeMessageBlocks = z;
            return this;
        }

        public Builder resolveUsernames(boolean z) {
            this.resolveUsernames = z;
            return this;
        }

        public Builder successRelationship(Relationship relationship) {
            this.successRelationship = relationship;
            return this;
        }

        public Builder usernameLookup(UsernameLookup usernameLookup) {
            this.usernameLookup = usernameLookup;
            return this;
        }

        public Builder objectMapper(ObjectMapper objectMapper) {
            this.objectMapper = objectMapper;
            return this;
        }

        public ConsumeChannel build() {
            return new ConsumeChannel(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/slack/consume/ConsumeChannel$ConsumptionResults.class */
    public interface ConsumptionResults {
        SlackTimestamp getEarliestTimestamp();

        SlackTimestamp getLatestTimestamp();

        String getRepliesCursor();

        boolean isFailure();

        boolean isContinuePolling();

        boolean isMore();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/slack/consume/ConsumeChannel$StandardConsumptionResults.class */
    public static class StandardConsumptionResults implements ConsumptionResults {
        private final SlackTimestamp earliestTimestamp;
        private final SlackTimestamp latestTimestamp;
        private final boolean failure;
        private final boolean continuePolling;
        private final String repliesCursor;
        private final boolean isMore;

        public StandardConsumptionResults(SlackTimestamp slackTimestamp, SlackTimestamp slackTimestamp2, String str, boolean z, boolean z2, boolean z3) {
            this.earliestTimestamp = slackTimestamp;
            this.latestTimestamp = slackTimestamp2;
            this.repliesCursor = str;
            this.failure = z;
            this.continuePolling = z2;
            this.isMore = z3;
        }

        @Override // org.apache.nifi.processors.slack.consume.ConsumeChannel.ConsumptionResults
        public SlackTimestamp getEarliestTimestamp() {
            return this.earliestTimestamp;
        }

        @Override // org.apache.nifi.processors.slack.consume.ConsumeChannel.ConsumptionResults
        public SlackTimestamp getLatestTimestamp() {
            return this.latestTimestamp;
        }

        @Override // org.apache.nifi.processors.slack.consume.ConsumeChannel.ConsumptionResults
        public String getRepliesCursor() {
            return this.repliesCursor;
        }

        @Override // org.apache.nifi.processors.slack.consume.ConsumeChannel.ConsumptionResults
        public boolean isFailure() {
            return this.failure;
        }

        @Override // org.apache.nifi.processors.slack.consume.ConsumeChannel.ConsumptionResults
        public boolean isContinuePolling() {
            return this.continuePolling;
        }

        @Override // org.apache.nifi.processors.slack.consume.ConsumeChannel.ConsumptionResults
        public boolean isMore() {
            return this.isMore;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/slack/consume/ConsumeChannel$StateKeys.class */
    public static class StateKeys {
        public final String ACTION;
        public final String LATEST_TS;
        public final String EARLIEST_TS;
        public final String DIRECTION;
        public final String LATEST_REPLIES_CURSOR;
        public final String HISTORICAL_MESSAGES_REPLIES_CURSOR;
        public final String HISTORICAL_REPLIES_EARLIEST_THREAD_TS;
        public final String REPLY_MIN_TS;
        public final String REPLY_MAX_TS;

        public StateKeys(String str) {
            this.ACTION = str + ".action";
            this.LATEST_TS = str + ".latest";
            this.EARLIEST_TS = str + ".earliest";
            this.DIRECTION = str + ".direction";
            this.LATEST_REPLIES_CURSOR = str + ".latest.replies.cursor";
            this.HISTORICAL_MESSAGES_REPLIES_CURSOR = str + ".historical.replies.cursor";
            this.HISTORICAL_REPLIES_EARLIEST_THREAD_TS = str + ".historical.replies.ts";
            this.REPLY_MIN_TS = str + ".historical.reply.min.ts";
            this.REPLY_MAX_TS = str + ".historical.reply.max.ts";
        }
    }

    private ConsumeChannel(Builder builder) {
        this.client = builder.client;
        this.channelId = builder.channelId;
        this.channelName = builder.channelName;
        this.batchSize = builder.batchSize;
        this.replyMonitorFrequencyMillis = builder.replyMonitorFrequencyMillis;
        this.replyMonitorWindowMillis = builder.replyMonitorWindowMillis;
        this.logger = builder.logger;
        this.resolveUsernames = builder.resolveUsernames;
        this.includeMessageBlocks = builder.includeMessageBlocks;
        this.successRelationship = builder.successRelationship;
        this.usernameLookup = builder.usernameLookup;
        this.objectMapper = builder.objectMapper;
        this.stateKeys = new StateKeys(this.channelId);
    }

    public String getChannelId() {
        return this.channelId;
    }

    public ConfigVerificationResult verify() {
        try {
            ConversationsHistoryResponse fetchConversationsHistory = this.client.fetchConversationsHistory(ConversationsHistoryRequest.builder().channel(this.channelId).limit(1).build());
            if (!fetchConversationsHistory.isOk()) {
                return new ConfigVerificationResult.Builder().verificationStepName("Check authorization for Channel " + this.channelId).outcome(ConfigVerificationResult.Outcome.FAILED).explanation("Failed to obtain a message due to: " + SlackResponseUtil.getErrorMessage(fetchConversationsHistory.getError(), fetchConversationsHistory.getNeeded(), fetchConversationsHistory.getProvided(), fetchConversationsHistory.getWarning())).build();
            }
            Message message = (Message) fetchConversationsHistory.getMessages().get(0);
            enrichMessage(message);
            String username = message.getUsername();
            if (this.resolveUsernames && username == null) {
                return new ConfigVerificationResult.Builder().verificationStepName("Check authorization for Channel " + this.channelId).outcome(ConfigVerificationResult.Outcome.FAILED).explanation("Successfully retrieved a message but failed to resolve the username").build();
            }
            return new ConfigVerificationResult.Builder().verificationStepName("Check authorization for Channel " + this.channelId).outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(fetchConversationsHistory.getMessages().isEmpty() ? "Successfully requested messages for channel but got no messages" : "Successfully retrieved a message from " + (username == null ? message.getUser() : username)).build();
        } catch (Exception e) {
            return new ConfigVerificationResult.Builder().verificationStepName("Check authorization for Channel " + this.channelId).outcome(ConfigVerificationResult.Outcome.FAILED).explanation("Failed to obtain a message due to: " + String.valueOf(e)).build();
        }
    }

    public void consume(ProcessContext processContext, ProcessSession processSession) throws IOException, SlackApiException {
        long j = this.nextRequestTime.get();
        if (j > 0 && System.currentTimeMillis() < j) {
            processContext.yield();
            return;
        }
        try {
            StateMap state = processSession.getState(Scope.CLUSTER);
            if (isCheckForReplies(state)) {
                consumeReplies(processContext, processSession, state);
            } else {
                consumeLatestMessages(processContext, processSession, state);
            }
        } catch (IOException e) {
            this.logger.error("Failed to determine current offset for channel {}; will not retrieve any messages until this is resolved", new Object[]{this.channelId, e});
            processContext.yield();
        }
    }

    private boolean isCheckForReplies(StateMap stateMap) {
        if (CHECK_FOR_REPLIES.equals(stateMap.get(this.stateKeys.ACTION))) {
            return true;
        }
        return System.currentTimeMillis() > this.lastReplyMonitorPollEnd + this.replyMonitorFrequencyMillis;
    }

    private void consumeReplies(ProcessContext processContext, ProcessSession processSession, StateMap stateMap) throws IOException, SlackApiException {
        ConsumptionResults consumeMessages;
        if (!FORWARD.equals(stateMap.get(this.stateKeys.DIRECTION))) {
            onCompletedRepliesScan(processSession, new HashMap(stateMap.toMap()), null);
            return;
        }
        String str = stateMap.get(this.stateKeys.LATEST_TS);
        if (str == null) {
            onCompletedRepliesScan(processSession, new HashMap(stateMap.toMap()), null);
            return;
        }
        HashMap hashMap = new HashMap(stateMap.toMap());
        if (!CHECK_FOR_REPLIES.equals(stateMap.get(this.stateKeys.ACTION))) {
            hashMap.put(this.stateKeys.ACTION, CHECK_FOR_REPLIES);
            processSession.setState(hashMap, Scope.CLUSTER);
        }
        String str2 = stateMap.get(this.stateKeys.REPLY_MIN_TS);
        if (str2 == null) {
            str2 = str;
        }
        String str3 = stateMap.get(this.stateKeys.REPLY_MAX_TS);
        SlackTimestamp slackTimestamp = new SlackTimestamp(str2);
        SlackTimestamp slackTimestamp2 = str3 == null ? new SlackTimestamp() : new SlackTimestamp(str3);
        SlackTimestamp slackTimestamp3 = new SlackTimestamp(str);
        String rawValue = new SlackTimestamp(System.currentTimeMillis() - this.replyMonitorWindowMillis).getRawValue();
        String str4 = stateMap.get(this.stateKeys.HISTORICAL_REPLIES_EARLIEST_THREAD_TS);
        if (str4 == null) {
            str4 = new SlackTimestamp(System.currentTimeMillis()).getRawValue();
        }
        String str5 = stateMap.get(this.stateKeys.HISTORICAL_MESSAGES_REPLIES_CURSOR);
        do {
            consumeMessages = consumeMessages(processContext, processSession, ConversationsHistoryRequest.builder().channel(this.channelId).limit(500).latest(str4).oldest(rawValue).inclusive(true).build(), message -> {
                return false;
            }, str5, slackTimestamp, message2 -> {
                SlackTimestamp slackTimestamp4 = new SlackTimestamp(message2.getTs());
                return (slackTimestamp4.afterOrEqualTo(slackTimestamp2) || slackTimestamp4.beforeOrEqualTo(slackTimestamp) || new SlackTimestamp(message2.getThreadTs()).after(slackTimestamp3)) ? false : true;
            });
            if (!consumeMessages.isMore() && !consumeMessages.isFailure()) {
                onCompletedRepliesScan(processSession, hashMap, slackTimestamp2);
                return;
            }
            SlackTimestamp earliestTimestamp = consumeMessages.getEarliestTimestamp();
            str4 = earliestTimestamp == null ? null : earliestTimestamp.getRawValue();
            str5 = consumeMessages.getRepliesCursor();
            if (str4 == null) {
                return;
            }
            hashMap.put(this.stateKeys.HISTORICAL_REPLIES_EARLIEST_THREAD_TS, str4);
            if (str5 != null) {
                hashMap.put(this.stateKeys.HISTORICAL_MESSAGES_REPLIES_CURSOR, str5);
            }
            processSession.setState(hashMap, Scope.CLUSTER);
            processSession.commitAsync();
        } while (consumeMessages.isContinuePolling());
    }

    private void onCompletedRepliesScan(ProcessSession processSession, Map<String, String> map, SlackTimestamp slackTimestamp) throws IOException {
        map.remove(this.stateKeys.ACTION);
        map.remove(this.stateKeys.HISTORICAL_REPLIES_EARLIEST_THREAD_TS);
        map.remove(this.stateKeys.HISTORICAL_MESSAGES_REPLIES_CURSOR);
        map.remove(this.stateKeys.REPLY_MAX_TS);
        if (slackTimestamp != null) {
            map.put(this.stateKeys.REPLY_MIN_TS, slackTimestamp.getRawValue());
        }
        processSession.setState(map, Scope.CLUSTER);
        this.lastReplyMonitorPollEnd = System.currentTimeMillis();
    }

    private void consumeLatestMessages(ProcessContext processContext, ProcessSession processSession, StateMap stateMap) throws IOException, SlackApiException {
        ConsumptionResults consumeMessages;
        SlackTimestamp earliestTimestamp;
        String str;
        String str2 = stateMap.get(this.stateKeys.LATEST_REPLIES_CURSOR);
        String str3 = stateMap.get(this.stateKeys.DIRECTION);
        if (str3 == null) {
            str3 = BACKWARD;
        }
        String str4 = stateMap.get(BACKWARD.equals(str3) ? this.stateKeys.EARLIEST_TS : this.stateKeys.LATEST_TS);
        boolean z = str2 != null;
        String str5 = str2;
        HashMap hashMap = new HashMap(stateMap.toMap());
        do {
            ConversationsHistoryRequest build = ConversationsHistoryRequest.builder().channel(this.channelId).limit(Integer.valueOf(this.batchSize)).inclusive(z).build();
            if (str3.equals(FORWARD)) {
                build.setOldest(str4);
                build.setLatest((String) null);
            } else {
                build.setOldest((String) null);
                build.setLatest(str4);
            }
            String str6 = str4;
            consumeMessages = consumeMessages(processContext, processSession, build, message -> {
                return !Objects.equals(message.getTs(), str6);
            }, str5, null, message2 -> {
                return true;
            });
            if (str3.equals(FORWARD)) {
                earliestTimestamp = consumeMessages.getLatestTimestamp();
                str = this.stateKeys.LATEST_TS;
            } else {
                earliestTimestamp = consumeMessages.getEarliestTimestamp();
                str = this.stateKeys.EARLIEST_TS;
            }
            if (earliestTimestamp == null) {
                return;
            }
            str4 = earliestTimestamp.getRawValue();
            str5 = consumeMessages.getRepliesCursor();
            z = str5 != null;
            hashMap.put(str, str4);
            if (hashMap.get(this.stateKeys.LATEST_TS) == null) {
                SlackTimestamp latestTimestamp = consumeMessages.getLatestTimestamp();
                hashMap.put(this.stateKeys.LATEST_TS, latestTimestamp == null ? null : latestTimestamp.getRawValue());
            }
            if (str5 != null) {
                hashMap.put(this.stateKeys.LATEST_REPLIES_CURSOR, str5);
            }
            if (!consumeMessages.isMore() && !consumeMessages.isFailure()) {
                hashMap.put(this.stateKeys.DIRECTION, FORWARD);
                hashMap.remove(this.stateKeys.EARLIEST_TS);
                this.logger.info("Successfully completed initial load of messages for channel {}", new Object[]{this.channelId});
            }
            processSession.setState(hashMap, Scope.CLUSTER);
            processSession.commitAsync();
        } while (consumeMessages.isContinuePolling());
    }

    /* JADX WARN: Removed duplicated region for block: B:72:0x0227 A[Catch: Throwable -> 0x0257, TryCatch #3 {Throwable -> 0x0257, blocks: (B:12:0x00b5, B:14:0x00c0, B:15:0x00ce, B:17:0x00d8, B:19:0x00f7, B:22:0x0109, B:24:0x0128, B:28:0x013b, B:31:0x0153, B:34:0x015e, B:35:0x0187, B:36:0x0190, B:38:0x019a, B:41:0x01b9, B:43:0x01c5, B:48:0x01dc, B:57:0x01e8, B:66:0x01fe, B:68:0x0208, B:122:0x016f, B:129:0x014c, B:70:0x021d, B:72:0x0227, B:137:0x0236, B:135:0x0249, B:140:0x0240), top: B:11:0x00b5, inners: #0, #1 }] */
    /* JADX WARN: Removed duplicated region for block: B:75:0x024f  */
    /* JADX WARN: Removed duplicated region for block: B:78:0x027a  */
    /* JADX WARN: Removed duplicated region for block: B:81:0x0283  */
    /* JADX WARN: Removed duplicated region for block: B:93:0x02b7  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private org.apache.nifi.processors.slack.consume.ConsumeChannel.ConsumptionResults consumeMessages(org.apache.nifi.processor.ProcessContext r10, org.apache.nifi.processor.ProcessSession r11, com.slack.api.methods.request.conversations.ConversationsHistoryRequest r12, java.util.function.Predicate<com.slack.api.model.Message> r13, java.lang.String r14, org.apache.nifi.processors.slack.consume.SlackTimestamp r15, java.util.function.Predicate<com.slack.api.model.Message> r16) throws java.io.IOException, com.slack.api.methods.SlackApiException {
        /*
            Method dump skipped, instructions count: 899
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.nifi.processors.slack.consume.ConsumeChannel.consumeMessages(org.apache.nifi.processor.ProcessContext, org.apache.nifi.processor.ProcessSession, com.slack.api.methods.request.conversations.ConversationsHistoryRequest, java.util.function.Predicate, java.lang.String, org.apache.nifi.processors.slack.consume.SlackTimestamp, java.util.function.Predicate):org.apache.nifi.processors.slack.consume.ConsumeChannel$ConsumptionResults");
    }

    private boolean enrichMessage(Message message) {
        message.setChannel(this.channelId);
        if (!this.includeMessageBlocks) {
            message.setBlocks((List) null);
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        if (this.resolveUsernames) {
            if (message.getUsername() == null && message.getUser() != null) {
                String username = this.usernameLookup.getUsername(message.getUser());
                if (username == null) {
                    atomicBoolean.set(true);
                }
                message.setUsername(username);
            }
            String text = message.getText();
            if (text != null) {
                message.setText(MENTION_PATTERN.matcher(text).replaceAll(matchResult -> {
                    String username2 = this.usernameLookup.getUsername(matchResult.group(1));
                    if (username2 == null) {
                        atomicBoolean.set(true);
                        matchResult.group(0);
                    }
                    return "<@" + username2 + ">";
                }));
            }
        }
        return !atomicBoolean.get();
    }

    private void yieldOnException(PartialThreadException partialThreadException, String str, Message message, ProcessContext processContext) {
        if (SlackResponseUtil.isRateLimited(partialThreadException.getCause())) {
            this.logger.warn("Slack indicated that the Rate Limit has been exceeded when attempting to retrieve messages for channel {}; will continue in {} seconds", new Object[]{str, Integer.valueOf(SlackResponseUtil.getRetryAfterSeconds(partialThreadException))});
        } else {
            this.logger.error("Encountered unexpected response from Slack when retrieving replies to message with thread timestamp {} due to: {}", new Object[]{message.getThreadTs(), partialThreadException.getMessage(), partialThreadException});
        }
        long currentTimeMillis = System.currentTimeMillis() + (SlackResponseUtil.getRetryAfterSeconds(partialThreadException) * 1000);
        this.nextRequestTime.getAndUpdate(j -> {
            return Math.max(j, currentTimeMillis);
        });
        processContext.yield();
    }

    private List<Message> fetchReplies(Message message, String str, SlackTimestamp slackTimestamp) throws SlackApiException, IOException, PartialThreadException {
        String latestReply;
        ArrayList arrayList = new ArrayList();
        if (slackTimestamp != null && (latestReply = message.getLatestReply()) != null && new SlackTimestamp(latestReply).before(slackTimestamp)) {
            return Collections.emptyList();
        }
        String str2 = str;
        while (true) {
            String str3 = str2;
            try {
                ConversationsRepliesResponse fetchConversationsReplies = this.client.fetchConversationsReplies(ConversationsRepliesRequest.builder().channel(this.channelId).ts(message.getThreadTs()).includeAllMetadata(true).limit(1000).oldest(slackTimestamp == null ? null : slackTimestamp.getRawValue()).cursor(str3).build());
                if (!fetchConversationsReplies.isOk()) {
                    throw new PartialThreadException(arrayList, str3, SlackResponseUtil.getErrorMessage(fetchConversationsReplies.getError(), fetchConversationsReplies.getNeeded(), fetchConversationsReplies.getProvided(), fetchConversationsReplies.getWarning()));
                }
                arrayList.addAll(fetchConversationsReplies.getMessages());
                if (!fetchConversationsReplies.isHasMore()) {
                    return arrayList;
                }
                str2 = fetchConversationsReplies.getResponseMetadata().getNextCursor();
            } catch (Exception e) {
                if (arrayList.isEmpty()) {
                    throw e;
                }
                throw new PartialThreadException(arrayList, str3, e);
            }
        }
    }

    public void yield() {
        this.yieldExpiration = System.currentTimeMillis() + YIELD_MILLIS;
    }

    public boolean isYielded() {
        if (this.yieldExpiration == 0) {
            return false;
        }
        if (System.currentTimeMillis() < this.yieldExpiration) {
            return true;
        }
        this.yieldExpiration = 0L;
        return false;
    }
}
