package org.apache.nifi.processors.slack;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.slack.api.app_backend.events.payload.EventsApiPayload;
import com.slack.api.app_backend.slash_commands.payload.SlashCommandPayload;
import com.slack.api.bolt.App;
import com.slack.api.bolt.AppConfig;
import com.slack.api.bolt.context.builtin.EventContext;
import com.slack.api.bolt.context.builtin.SlashCommandContext;
import com.slack.api.bolt.request.builtin.SlashCommandRequest;
import com.slack.api.bolt.response.Response;
import com.slack.api.bolt.socket_mode.SocketModeApp;
import com.slack.api.model.User;
import com.slack.api.model.event.AppMentionEvent;
import com.slack.api.model.event.Event;
import com.slack.api.model.event.FileChangeEvent;
import com.slack.api.model.event.FileCreatedEvent;
import com.slack.api.model.event.FileDeletedEvent;
import com.slack.api.model.event.FilePublicEvent;
import com.slack.api.model.event.FileSharedEvent;
import com.slack.api.model.event.FileUnsharedEvent;
import com.slack.api.model.event.MemberJoinedChannelEvent;
import com.slack.api.model.event.MessageChangedEvent;
import com.slack.api.model.event.MessageChannelJoinEvent;
import com.slack.api.model.event.MessageDeletedEvent;
import com.slack.api.model.event.MessageEvent;
import com.slack.api.model.event.MessageFileShareEvent;
import com.slack.api.model.event.ReactionAddedEvent;
import com.slack.api.model.event.ReactionRemovedEvent;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.slack.consume.UserDetailsLookup;

/**
 * Processor that receives Slack events or slash commands over a Socket Mode connection and
 * writes each one out as a JSON FlowFile on the {@code success} relationship.
 *
 * <p>Slack handler callbacks do not create FlowFiles themselves. Each callback places the
 * received object on an in-memory queue together with a {@link CountDownLatch} and then waits
 * up to five seconds for that latch. {@link #onTrigger} drains the queue, writes the FlowFile
 * and releases the latch once the session commit has completed. Only then does the callback
 * acknowledge the notification; if the latch is not released in time the callback answers with
 * a 503 error response instead.</p>
 */
@CapabilityDescription("Retrieves real-time messages or Slack commands from one or more Slack conversations. The messages are written out in JSON format. Note that this Processor should be used to obtain real-time messages and commands from Slack and does not provide a mechanism for obtaining historical messages. The ConsumeSlack Processor should be used for an initial load of messages from a channel. See Usage / Additional Details for more information about how to configure this Processor and enable it to retrieve messages and commands from Slack.")
@DefaultSettings(yieldDuration = "250 millis")
@WritesAttributes({
    @WritesAttribute(attribute = "mime.type", description = "Set to application/json, as the output will always be in JSON format"),
    @WritesAttribute(attribute = "slack.event.type", description = "Set to the type of Slack event that occurred")
})
@PrimaryNodeOnly
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@SeeAlso({ConsumeSlack.class})
@Tags({"slack", "real-time", "event", "message", "command", "listen", "receive", "social media", "team", "text", "unstructured"})
public class ListenSlack extends AbstractProcessor {
    /** Shared JSON mapper with the Java time module registered. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());

    static final AllowableValue RECEIVE_MESSAGE_EVENTS = new AllowableValue("Receive Message Events", "Receive Message Events", "The Processor is to receive Slack Message Events");
    static final AllowableValue RECEIVE_MENTION_EVENTS = new AllowableValue("Receive App Mention Events", "Receive App Mention Events", "The Processor is to receive only slack messages that mention the bot user (App Mention Events)");
    static final AllowableValue RECEIVE_COMMANDS = new AllowableValue("Receive Commands", "Receive Commands", "The Processor is to receive Commands from Slack that are specific to your application. The Processor will not receive Message Events.");
    static final AllowableValue RECEIVE_JOINED_CHANNEL_EVENTS = new AllowableValue("Receive Joined Channel Events", "Receive Joined Channel Events", "The Processor is to receive only events when a member is joining a channel. The Processor will not receive Message Events.");

    static final PropertyDescriptor APP_TOKEN = new PropertyDescriptor.Builder()
            .name("App Token")
            .description("The Application Token that is registered to your Slack application")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .required(true)
            .sensitive(true)
            .build();

    static final PropertyDescriptor BOT_TOKEN = new PropertyDescriptor.Builder()
            .name("Bot Token")
            .description("The Bot Token that is registered to your Slack application")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .required(true)
            .sensitive(true)
            .build();

    static final PropertyDescriptor EVENT_TYPE = new PropertyDescriptor.Builder()
            .name("Event Type to Receive")
            .description("Specifies the type of Event that the Processor should respond to")
            .required(true)
            .defaultValue(RECEIVE_MENTION_EVENTS.getValue())
            .allowableValues(new DescribedValue[]{RECEIVE_MENTION_EVENTS, RECEIVE_MESSAGE_EVENTS, RECEIVE_COMMANDS, RECEIVE_JOINED_CHANNEL_EVENTS})
            .build();

    static final PropertyDescriptor RESOLVE_USER_DETAILS = new PropertyDescriptor.Builder()
            .name("Resolve User Details")
            .description("Specifies whether the Processor should lookup details about the Slack User who sent the received message. If true, the output JSON will contain an additional field named 'userDetails'. The 'user' field will still contain the ID of the user. In order to enable this capability, the Bot Token must be granted the 'users:read' and optionally the 'users.profile:read' Bot Token Scope. If the rate limit is exceeded when retrieving this information, the received message will be rejected and must be re-delivered.")
            .required(true)
            .defaultValue("false")
            .allowableValues(new String[]{"true", "false"})
            .dependsOn(EVENT_TYPE, new AllowableValue[]{RECEIVE_MESSAGE_EVENTS, RECEIVE_MENTION_EVENTS, RECEIVE_JOINED_CHANNEL_EVENTS})
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All FlowFiles that are created will be sent to this Relationship.")
            .build();

    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(APP_TOKEN, BOT_TOKEN, EVENT_TYPE, RESOLVE_USER_DETAILS);
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);

    /** Hand-off point between the Slack handler callbacks (producers) and {@link #onTrigger} (consumer). */
    private final TransferQueue<EventWrapper> eventTransferQueue = new LinkedTransferQueue<>();
    private volatile SocketModeApp socketModeApp;
    private volatile UserDetailsLookup userDetailsLookup;

    /**
     * Pairs a received Slack object with the latch that its handler callback is waiting on.
     */
    public static class EventWrapper {
        private final Object event;
        private final CountDownLatch countDownLatch;

        /**
         * @param obj the event or command payload received from Slack
         * @param countDownLatch latch to release once the payload has been committed
         */
        public EventWrapper(Object obj, CountDownLatch countDownLatch) {
            this.event = obj;
            this.countDownLatch = countDownLatch;
        }

        /** @return the wrapped event or command payload */
        public Object getEvent() {
            return this.event;
        }

        /** @return the latch the originating handler callback is blocked on */
        public CountDownLatch getCountDownLatch() {
            return this.countDownLatch;
        }
    }

    /** @return the fixed list of properties this processor supports */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /** @return the single {@code success} relationship */
    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Builds the Slack app, registers the handlers that match the configured event type, creates
     * the user-details lookup and starts the Socket Mode connection asynchronously.
     *
     * @param processContext supplies the App Token, Bot Token and Event Type property values
     * @throws Exception if the Slack app or the Socket Mode client cannot be created or started
     */
    @OnScheduled
    public void establishWebsocketEndpoint(ProcessContext processContext) throws Exception {
        final String appToken = processContext.getProperty(APP_TOKEN).getValue();
        final String botToken = processContext.getProperty(BOT_TOKEN).getValue();
        final App app = new App(AppConfig.builder().singleTeamBotToken(botToken).build());

        // Read the event type once; it selects exactly one set of handlers below.
        final String eventType = processContext.getProperty(EVENT_TYPE).getValue();
        if (RECEIVE_MESSAGE_EVENTS.getValue().equals(eventType)) {
            app.event(MessageEvent.class, this::handleEvent);
            app.event(MessageChangedEvent.class, this::handleEvent);
            app.event(MessageDeletedEvent.class, this::handleEvent);
            app.event(MessageFileShareEvent.class, this::handleEvent);
            app.event(FileSharedEvent.class, this::handleEvent);
            app.event(FileChangeEvent.class, this::handleEvent);
            app.event(FileCreatedEvent.class, this::handleEvent);
            app.event(FileDeletedEvent.class, this::handleEvent);
            app.event(FilePublicEvent.class, this::handleEvent);
            app.event(FileUnsharedEvent.class, this::handleEvent);
            app.event(ReactionAddedEvent.class, this::handleEvent);
            app.event(ReactionRemovedEvent.class, this::handleEvent);
        } else if (RECEIVE_MENTION_EVENTS.getValue().equals(eventType)) {
            app.event(AppMentionEvent.class, this::handleEvent);
            // Plain message events are acknowledged but never turned into FlowFiles in this mode.
            app.event(MessageEvent.class, (payload, context) -> context.ack());
        } else if (RECEIVE_JOINED_CHANNEL_EVENTS.getValue().equals(eventType)) {
            app.event(MemberJoinedChannelEvent.class, this::handleEvent);
            app.event(MessageChannelJoinEvent.class, this::handleEvent);
            // Plain message events are acknowledged but never turned into FlowFiles in this mode.
            app.event(MessageEvent.class, (payload, context) -> context.ack());
        } else {
            // Remaining option is "Receive Commands": accept every slash command name.
            app.command(Pattern.compile(".*"), this::handleCommand);
        }

        this.userDetailsLookup = new UserDetailsLookup(
                userId -> app.client().usersInfo(request -> request.user(userId)),
                getLogger());

        this.socketModeApp = new SocketModeApp(appToken, app);
        this.socketModeApp.startAsync();
    }

    /**
     * Handler for Events API payloads: queues the contained event and acknowledges it once it
     * has been committed.
     */
    private Response handleEvent(EventsApiPayload<?> eventsApiPayload, EventContext eventContext) {
        final Event event = eventsApiPayload.getEvent();
        return handleNotification(event, eventContext::ack);
    }

    /**
     * Handler for slash commands: queues the command payload and acknowledges it once it has
     * been committed.
     */
    private Response handleCommand(SlashCommandRequest slashCommandRequest, SlashCommandContext slashCommandContext) {
        final SlashCommandPayload payload = slashCommandRequest.getPayload();
        return handleNotification(payload, slashCommandContext::ack);
    }

    /**
     * Queues a notification for {@link #onTrigger} and blocks until it has been committed or
     * five seconds have elapsed.
     *
     * @param obj the event or command payload to queue
     * @param supplier produces the acknowledgement response; invoked only after the latch is released
     * @return the acknowledgement response, or a 503 error response if the wait timed out
     * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is
     *         restored before the exception is thrown
     */
    private Response handleNotification(Object obj, Supplier<Response> supplier) {
        final CountDownLatch committed = new CountDownLatch(1);
        this.eventTransferQueue.add(new EventWrapper(obj, committed));

        final boolean acknowledged;
        try {
            acknowledged = committed.await(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for event to be processed", e);
        }
        return acknowledged ? supplier.get() : Response.error(503);
    }

    /**
     * Stops and closes the Socket Mode connection, if one was established.
     *
     * @throws Exception if stopping or closing the Socket Mode app fails
     */
    @OnStopped
    public void onStopped() throws Exception {
        final SocketModeApp app = this.socketModeApp;
        // Clear the reference so a later stop never operates on an already closed instance.
        this.socketModeApp = null;
        if (app == null) {
            // Nothing to release: scheduling failed before the connection was created.
            return;
        }
        try {
            app.stop();
        } finally {
            // Always release the connection, even when stop() fails.
            app.close();
        }
    }

    /**
     * Takes at most one queued notification, serializes it as JSON into a new FlowFile, routes
     * it to {@code success} and releases the waiting handler once the session commit completes.
     * Yields when the queue is empty. If writing the JSON fails with an {@link IOException} the
     * FlowFile is removed and the latch is left untouched, so the handler's wait times out.
     *
     * @param processContext supplies the Resolve User Details property value
     * @param processSession session used to create, write and transfer the FlowFile
     * @throws ProcessException declared by the processor contract; not thrown directly here
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final EventWrapper wrapper;
        try {
            wrapper = this.eventTransferQueue.poll(1L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        if (wrapper == null) {
            processContext.yield();
            return;
        }

        final Object event = wrapper.getEvent();
        final String eventType = event.getClass().getSimpleName();

        FlowFile flowFile = processSession.create();
        // try-with-resources closes the generator first and then the stream, on every path.
        try (OutputStream out = processSession.write(flowFile);
             JsonGenerator generator = OBJECT_MAPPER.createGenerator(out)) {
            if (processContext.getProperty(RESOLVE_USER_DETAILS).asBoolean()) {
                generator.writeTree(toJsonWithUserDetails(event));
            } else {
                generator.writeObject(event);
            }
        } catch (IOException e) {
            getLogger().error("Failed to write out Slack message", e);
            processSession.remove(flowFile);
            return;
        }

        flowFile = processSession.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
        flowFile = processSession.putAttribute(flowFile, "slack.event.type", eventType);
        processSession.getProvenanceReporter().receive(flowFile, this.socketModeApp.getClient().getWssUri().toString());
        processSession.transfer(flowFile, REL_SUCCESS);

        // Release the handler only after the commit has succeeded, so Slack is acknowledged
        // strictly after the FlowFile is durable in the flow.
        processSession.commitAsync(() -> wrapper.getCountDownLatch().countDown());
    }

    /**
     * Converts the event to a JSON tree and, when it is a JSON object with a non-null
     * {@code user} field, adds a {@code userDetails} field holding the looked-up user.
     * The tree is produced by serializing to a string and parsing it back.
     *
     * @param event the event to convert
     * @return the JSON tree, with {@code userDetails} added when a user could be resolved
     * @throws IOException if serializing or parsing the JSON fails
     */
    private JsonNode toJsonWithUserDetails(Object event) throws IOException {
        final JsonNode eventNode = OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsString(event));
        if (eventNode instanceof ObjectNode && eventNode.hasNonNull("user")) {
            final User userDetails = this.userDetailsLookup.getUserDetails(eventNode.get("user").asText());
            if (userDetails != null) {
                final JsonNode detailsNode = OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsString(userDetails));
                ((ObjectNode) eventNode).set("userDetails", detailsNode);
            }
        }
        return eventNode;
    }
}
