package org.apache.nifi.processors.slack;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.slack.api.bolt.App;
import com.slack.api.bolt.AppConfig;
import com.slack.api.methods.MethodsClient;
import com.slack.api.methods.SlackApiException;
import com.slack.api.methods.request.conversations.ConversationsHistoryRequest;
import com.slack.api.methods.request.conversations.ConversationsInfoRequest;
import com.slack.api.methods.request.conversations.ConversationsListRequest;
import com.slack.api.methods.request.conversations.ConversationsRepliesRequest;
import com.slack.api.methods.request.users.UsersInfoRequest;
import com.slack.api.methods.response.conversations.ConversationsHistoryResponse;
import com.slack.api.methods.response.conversations.ConversationsInfoResponse;
import com.slack.api.methods.response.conversations.ConversationsListResponse;
import com.slack.api.methods.response.conversations.ConversationsRepliesResponse;
import com.slack.api.methods.response.users.UsersInfoResponse;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.slack.consume.ConsumeChannel;
import org.apache.nifi.processors.slack.consume.ConsumeSlackClient;
import org.apache.nifi.processors.slack.consume.UsernameLookup;
import org.apache.nifi.processors.slack.util.RateLimit;
import org.apache.nifi.processors.slack.util.SlackResponseUtil;
import org.apache.nifi.util.StringUtils;

@CapabilityDescription("Retrieves messages from one or more configured Slack channels. The messages are written out in JSON format. See Usage / Additional Details for more information about how to configure this Processor and enable it to retrieve messages from Slack.")
@WritesAttributes({@WritesAttribute(attribute = "slack.channel.id", description = "The ID of the Slack Channel from which the messages were retrieved"), @WritesAttribute(attribute = "slack.message.count", description = "The number of slack messages that are included in the FlowFile"), @WritesAttribute(attribute = "mime.type", description = "Set to application/json, as the output will always be in JSON format")})
@DefaultSettings(yieldDuration = "3 sec")
@PrimaryNodeOnly
@Stateful(scopes = {Scope.CLUSTER}, description = "Maintains a mapping of Slack Channel IDs to the timestamp of the last message that was retrieved for that channel. This allows the processor to only retrieve messages that have been posted since the last time the processor was run. This state is stored in the cluster so that if the Primary Node changes, the new node will pick up where the previous node left off.")
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@SeeAlso({ListenSlack.class})
@Tags({"slack", "conversation", "conversation.history", "social media", "team", "text", "unstructured"})
/* loaded from: input_file:org/apache/nifi/processors/slack/ConsumeSlack.class */
public class ConsumeSlack extends AbstractProcessor implements VerifiableProcessor {
    static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder().name("Access Token").description("OAuth Access Token used for authenticating/authorizing the Slack request sent by NiFi. This may be either a User Token or a Bot Token. It must be granted the channels:history, groups:history, im:history, or mpim:history scope, depending on the type of conversation being used.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(true).sensitive(true).build();
    static final PropertyDescriptor CHANNEL_IDS = new PropertyDescriptor.Builder().name("Channels").description("A comma-separated list of Slack Channels to Retrieve Messages From. Each element in the list may be either a Channel ID, such as C0L9VCD47, or (for public channels only) the name of a channel, prefixed with a # sign, such as #general. If any channel name is provided instead,instead of an ID, the Access Token provided must be granted the channels:read scope in order to resolve the Channel ID. See the Processor's Additional Details for information on how to find a Channel ID.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor REPLY_MONITOR_WINDOW = new PropertyDescriptor.Builder().name("Reply Monitor Window").description("After consuming all messages in a given channel, this Processor will periodically poll all \"threaded messages\", aka Replies, whose timestamp is between now and this amount of time in the past in order to check for any new replies. Setting this value to a larger value may result in additional resource use and may result in Rate Limiting. However, if a user replies to an old thread that was started outside of this window, the reply may not be captured.").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("7 days").build();
    static final PropertyDescriptor REPLY_MONITOR_FREQUENCY = new PropertyDescriptor.Builder().name("Reply Monitor Frequency").description("After consuming all messages in a given channel, this Processor will periodically poll all \"threaded messages\", aka Replies, whose timestamp falls between now and the amount of time specified by the <Reply Monitor Window> property. This property determines how frequently those messages are polled. Setting the value to a shorter duration may result in replies to messages being captured more quickly, providing a lower latency. However, it will also result in additional resource use and could trigger Rate Limiting to occur.").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("5 mins").build();
    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").description("The maximum number of messages to retrieve in a single request to Slack. The entire response will be parsed into memory, so it is important that this be kept in mind when setting this value.").required(true).addValidator(StandardValidators.createLongValidator(0, 1000, false)).defaultValue("100").build();
    static final PropertyDescriptor RESOLVE_USERNAMES = new PropertyDescriptor.Builder().name("Resolve Usernames").description("Specifies whether or not User IDs should be resolved to usernames. By default, Slack Messages provide the ID of the user that sends a message, such as U0123456789, but not the username, such as NiFiUser. The username may be resolved, but it may require additional calls to the Slack API and requires that the Token used be granted the users:read scope. If set to true, usernames will be resolved with a best-effort policy: if a username cannot be obtained, it will be skipped over. Also, note that when a username is obtained, the Message's <username> field is populated, and the <text> field is updated such that any mention will be output such as \"Hi @user\" instead of \"Hi <@U1234567>\".").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    static final PropertyDescriptor INCLUDE_MESSAGE_BLOCKS = new PropertyDescriptor.Builder().name("Include Message Blocks").description("Specifies whether or not the output JSON should include the value of the 'blocks' field for each Slack Message. This field includes information such as individual parts of a message that are formatted using rich text. This may be useful, for instance, for parsing. However, it often accounts for a significant portion of the data and as such may be set to null when it is not useful to you.").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    static final PropertyDescriptor INCLUDE_NULL_FIELDS = new PropertyDescriptor.Builder().name("Include Null Fields").description("Specifies whether or not fields that have null values should be included in the output JSON. If true, any field in a Slack Message that has a null value will be included in the JSON with a value of null. If false, the key omitted from the output JSON entirely. Omitting null values results in smaller messages that are generally more efficient to process, but including the values may provide a better understanding of the format, especially for schema inference.").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Slack messages that are successfully received will be routed to this relationship").build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(CHANNEL_IDS, ACCESS_TOKEN, REPLY_MONITOR_WINDOW, REPLY_MONITOR_FREQUENCY, BATCH_SIZE, RESOLVE_USERNAMES, INCLUDE_MESSAGE_BLOCKS, INCLUDE_NULL_FIELDS);
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
    private RateLimit rateLimit;
    private final Queue<ConsumeChannel> channels = new LinkedBlockingQueue();
    private volatile App slackApp;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/slack/ConsumeSlack$DelegatingSlackClient.class */
    public static class DelegatingSlackClient implements ConsumeSlackClient {
        private final MethodsClient delegate;

        public DelegatingSlackClient(MethodsClient methodsClient) {
            this.delegate = methodsClient;
        }

        @Override // org.apache.nifi.processors.slack.consume.ConsumeSlackClient
        public ConversationsHistoryResponse fetchConversationsHistory(ConversationsHistoryRequest conversationsHistoryRequest) throws SlackApiException, IOException {
            return this.delegate.conversationsHistory(conversationsHistoryRequest);
        }

        @Override // org.apache.nifi.processors.slack.consume.ConsumeSlackClient
        public ConversationsRepliesResponse fetchConversationsReplies(ConversationsRepliesRequest conversationsRepliesRequest) throws SlackApiException, IOException {
            return this.delegate.conversationsReplies(conversationsRepliesRequest);
        }

        @Override // org.apache.nifi.processors.slack.consume.ConsumeSlackClient
        public UsersInfoResponse fetchUsername(String str) throws SlackApiException, IOException {
            return this.delegate.usersInfo(UsersInfoRequest.builder().user(str).build());
        }

        @Override // org.apache.nifi.processors.slack.consume.ConsumeSlackClient
        public Map<String, String> fetchChannelIds() throws SlackApiException, IOException {
            HashMap hashMap = new HashMap();
            String str = null;
            do {
                ConversationsListResponse conversationsList = this.delegate.conversationsList(ConversationsListRequest.builder().cursor(str).limit(1000).build());
                if (!conversationsList.isOk()) {
                    throw new RuntimeException("Failed to determine Channel IDs: " + SlackResponseUtil.getErrorMessage(conversationsList.getError(), conversationsList.getNeeded(), conversationsList.getProvided(), conversationsList.getWarning()));
                }
                conversationsList.getChannels().forEach(conversation -> {
                    hashMap.put(conversation.getName(), conversation.getId());
                });
                str = conversationsList.getResponseMetadata().getNextCursor();
            } while (!StringUtils.isEmpty(str));
            return hashMap;
        }

        @Override // org.apache.nifi.processors.slack.consume.ConsumeSlackClient
        public String fetchChannelName(String str) throws SlackApiException, IOException {
            ConversationsInfoResponse conversationsInfo = this.delegate.conversationsInfo(ConversationsInfoRequest.builder().channel(str).build());
            if (conversationsInfo.isOk()) {
                return conversationsInfo.getChannel().getName();
            }
            throw new RuntimeException(String.format("Failed to determine Channel name from ID [%s]: %s", str, SlackResponseUtil.getErrorMessage(conversationsInfo.getError(), conversationsInfo.getNeeded(), conversationsInfo.getProvided(), conversationsInfo.getWarning())));
        }
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public boolean isStateful(ProcessContext processContext) {
        return true;
    }

    @OnScheduled
    public void setup(ProcessContext processContext) throws IOException, SlackApiException {
        this.rateLimit = new RateLimit(getLogger());
        this.slackApp = createSlackApp(processContext);
        this.channels.addAll(createChannels(processContext, this.slackApp));
    }

    @OnStopped
    public void shutdown() {
        this.channels.clear();
        if (this.slackApp != null) {
            this.slackApp.stop();
            this.slackApp = null;
        }
        this.rateLimit = null;
    }

    public RateLimit getRateLimit() {
        return this.rateLimit;
    }

    private App createSlackApp(ProcessContext processContext) {
        return new App(AppConfig.builder().singleTeamBotToken(processContext.getProperty(ACCESS_TOKEN).getValue()).build());
    }

    private List<ConsumeChannel> createChannels(ProcessContext processContext, App app) throws SlackApiException, IOException {
        String orElse;
        ObjectMapper objectMapper = new ObjectMapper();
        if (processContext.getProperty(INCLUDE_NULL_FIELDS).asBoolean().booleanValue()) {
            objectMapper.setSerializationInclusion(JsonInclude.Include.ALWAYS);
        } else {
            objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        }
        ConsumeSlackClient initializeClient = initializeClient(app);
        ArrayList<String> arrayList = new ArrayList();
        Stream filter = Arrays.stream(processContext.getProperty(CHANNEL_IDS).getValue().split(",")).map((v0) -> {
            return v0.trim();
        }).filter(str -> {
            return !str.isEmpty();
        });
        Objects.requireNonNull(arrayList);
        filter.forEach((v1) -> {
            r1.add(v1);
        });
        HashMap hashMap = new HashMap();
        if (channelIdsProvidedOnly(arrayList)) {
            for (String str2 : arrayList) {
                String fetchChannelName = initializeClient.fetchChannelName(str2);
                getLogger().info("Resolved Channel ID {} to name {}", new Object[]{str2, fetchChannelName});
                hashMap.put(str2, fetchChannelName);
            }
        } else {
            Map<String, String> fetchChannelIds = initializeClient.fetchChannelIds();
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                String replace = ((String) it.next()).replace("#", "");
                String str3 = fetchChannelIds.get(replace);
                if (str3 != null) {
                    orElse = replace;
                    getLogger().info("Resolved Channel {} to ID {}", new Object[]{orElse, str3});
                } else {
                    str3 = replace;
                    orElse = fetchChannelIds.keySet().stream().filter(str4 -> {
                        return replace.equals(fetchChannelIds.get(str4));
                    }).findFirst().orElse("");
                    getLogger().info("Resolved Channel ID {} to name {}", new Object[]{str3, orElse});
                }
                hashMap.put(str3, orElse);
            }
        }
        UsernameLookup usernameLookup = new UsernameLookup(initializeClient, getLogger());
        ArrayList arrayList2 = new ArrayList();
        for (Map.Entry entry : hashMap.entrySet()) {
            arrayList2.add(new ConsumeChannel.Builder().channelId((String) entry.getKey()).channelName((String) entry.getValue()).batchSize(processContext.getProperty(BATCH_SIZE).asInteger().intValue()).client(initializeClient).includeMessageBlocks(processContext.getProperty(INCLUDE_MESSAGE_BLOCKS).asBoolean().booleanValue()).logger(getLogger()).replyMonitorFrequency(processContext.getProperty(REPLY_MONITOR_FREQUENCY).asTimePeriod(TimeUnit.MILLISECONDS).longValue(), TimeUnit.MILLISECONDS).replyMonitorWindow(processContext.getProperty(REPLY_MONITOR_WINDOW).asTimePeriod(TimeUnit.MILLISECONDS).longValue(), TimeUnit.MILLISECONDS).resolveUsernames(processContext.getProperty(RESOLVE_USERNAMES).asBoolean().booleanValue()).successRelationship(REL_SUCCESS).usernameLookup(usernameLookup).objectMapper(objectMapper).build());
        }
        return arrayList2;
    }

    protected ConsumeSlackClient initializeClient(App app) {
        app.start();
        return new DelegatingSlackClient(app.client());
    }

    private ConsumeChannel getChannel() {
        ArrayList arrayList = new ArrayList();
        while (!this.channels.isEmpty()) {
            try {
                ConsumeChannel poll = this.channels.poll();
                if (poll == null) {
                    return null;
                }
                if (!poll.isYielded()) {
                    this.channels.addAll(arrayList);
                    return poll;
                }
                arrayList.add(poll);
            } finally {
                this.channels.addAll(arrayList);
            }
        }
        this.channels.addAll(arrayList);
        return null;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        if (this.rateLimit.isLimitReached()) {
            getLogger().debug("Will not consume from Slack because rate limit has been reached");
            processContext.yield();
            return;
        }
        ConsumeChannel channel = getChannel();
        try {
            if (channel == null) {
                getLogger().debug("All Slack Channels are currently yielded; will yield context and return");
                processContext.yield();
                return;
            }
            try {
                channel.consume(processContext, processSession);
                this.channels.offer(channel);
            } catch (Exception e) {
                processSession.rollback();
                yieldOnException(e, channel.getChannelId(), processContext);
                this.channels.offer(channel);
            }
        } catch (Throwable th) {
            this.channels.offer(channel);
            throw th;
        }
    }

    private static boolean channelIdsProvidedOnly(List<String> list) {
        return list.stream().noneMatch(str -> {
            return str.contains("#");
        });
    }

    private void yieldOnException(Throwable th, String str, ProcessContext processContext) {
        if (SlackResponseUtil.isRateLimited(th)) {
            getLogger().warn("Slack indicated that the Rate Limit has been exceeded when attempting to retrieve messages for channel {}", new Object[]{str});
        } else {
            getLogger().error("Failed to retrieve messages for channel {}", new Object[]{str, th});
        }
        this.rateLimit.retryAfter(Duration.ofSeconds(SlackResponseUtil.getRetryAfterSeconds(th)));
        processContext.yield();
    }

    public List<ConfigVerificationResult> verify(ProcessContext processContext, ComponentLog componentLog, Map<String, String> map) {
        ArrayList arrayList = new ArrayList();
        try {
            List<ConsumeChannel> createChannels = createChannels(processContext, createSlackApp(processContext));
            arrayList.add(new ConfigVerificationResult.Builder().verificationStepName("Determine Channel IDs").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).build());
            Iterator<ConsumeChannel> it = createChannels.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next().verify());
            }
            return arrayList;
        } catch (Exception e) {
            arrayList.add(new ConfigVerificationResult.Builder().verificationStepName("Determine Channel IDs").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(e.toString()).build());
            arrayList.add(new ConfigVerificationResult.Builder().verificationStepName("Check authorizations").outcome(ConfigVerificationResult.Outcome.SKIPPED).explanation("Skipped because appropriate Channel IDs could not be determined").build());
            return arrayList;
        }
    }
}
