package org.apache.nifi.processors.box;

import com.box.sdk.BoxAPIConnection;
import com.box.sdk.BoxEvent;
import com.box.sdk.EnterpriseEventsStreamRequest;
import com.box.sdk.EventLog;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.box.controllerservices.BoxClientService;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@CapabilityDescription("Consumes Enterprise Events from Box admin_logs_streaming Stream Type.\nThe content of the events is sent to the 'success' relationship as a JSON array.\nThe last known position of the Box stream is stored in the processor state and is used to\nresume the stream from the last known position when the processor is restarted.\n")
@PrimaryNodeOnly
@Stateful(description = "The last known position of the Box Event stream is stored in the processor state and is used to\nresume the stream from the last known position when the processor is restarted.\n", scopes = {Scope.CLUSTER})
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"box", "storage"})
@SeeAlso({ConsumeBoxEvents.class, FetchBoxFile.class, ListBoxFile.class})
/* loaded from: input_file:org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.class */
public class ConsumeBoxEnterpriseEvents extends AbstractProcessor {
    private static final String POSITION_KEY = "position";
    private static final String EARLIEST_POSITION = "0";
    private static final String LATEST_POSITION = "now";
    private static final int LIMIT = 500;
    static final PropertyDescriptor EVENT_TYPES = new PropertyDescriptor.Builder().name("Event Types").description("A comma separated list of Enterprise Events to consume. If not set, all Events are consumed.See Additional Details for more information.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor START_EVENT_POSITION = new PropertyDescriptor.Builder().name("Start Event Position").description("What position to consume the Events from.").required(true).allowableValues(StartEventPosition.class).defaultValue(StartEventPosition.EARLIEST).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    static final PropertyDescriptor START_OFFSET = new PropertyDescriptor.Builder().name("Start Offset").description("The offset to start consuming the Events from.").required(true).dependsOn(START_EVENT_POSITION, StartEventPosition.OFFSET, new DescribedValue[0]).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(BoxClientService.BOX_CLIENT_SERVICE, EVENT_TYPES, START_EVENT_POSITION, START_OFFSET);
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Events received successfully will be sent out this relationship.").build();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
    private volatile BoxAPIConnection boxAPIConnection;
    private volatile String[] eventTypes;
    private volatile String streamPosition;

    /* loaded from: input_file:org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents$StartEventPosition.class */
    public enum StartEventPosition implements DescribedValue {
        EARLIEST("earliest", "Start consuming events from the earliest available Event."),
        LATEST("latest", "Start consuming events from the latest Event."),
        OFFSET("offset", "Start consuming events from the specified offset.");

        private final String value;
        private final String description;

        StartEventPosition(String str, String str2) {
            this.value = str;
            this.description = str2;
        }

        public String getValue() {
            return this.value;
        }

        public String getDisplayName() {
            return this.value;
        }

        public String getDescription() {
            return this.description;
        }
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @OnScheduled
    public void onEnabled(ProcessContext processContext) {
        this.boxAPIConnection = processContext.getProperty(BoxClientService.BOX_CLIENT_SERVICE).asControllerService(BoxClientService.class).getBoxApiConnection();
        this.eventTypes = processContext.getProperty(EVENT_TYPES).isSet() ? processContext.getProperty(EVENT_TYPES).getValue().split(",") : new String[0];
        this.streamPosition = calculateStreamPosition(processContext);
    }

    private String calculateStreamPosition(ProcessContext processContext) {
        return readStreamPosition(processContext).orElseGet(() -> {
            return initializeStartEventPosition(processContext);
        });
    }

    private Optional<String> readStreamPosition(ProcessContext processContext) {
        try {
            return Optional.ofNullable(processContext.getStateManager().getState(Scope.CLUSTER).get(POSITION_KEY));
        } catch (IOException e) {
            throw new ProcessException("Could not retrieve saved event position", e);
        }
    }

    private void writeStreamPosition(String str, ProcessSession processSession) {
        try {
            processSession.setState(Map.of(POSITION_KEY, str), Scope.CLUSTER);
        } catch (IOException e) {
            throw new ProcessException("Could not save event position", e);
        }
    }

    private String initializeStartEventPosition(ProcessContext processContext) {
        switch ((StartEventPosition) processContext.getProperty(START_EVENT_POSITION).asAllowableValue(StartEventPosition.class)) {
            case EARLIEST:
                return EARLIEST_POSITION;
            case LATEST:
                return retrieveLatestStreamPosition();
            case OFFSET:
                return processContext.getProperty(START_OFFSET).getValue();
            default:
                throw new MatchException((String) null, (Throwable) null);
        }
    }

    private String retrieveLatestStreamPosition() {
        return getEventLog(LATEST_POSITION).getNextStreamPosition();
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        while (true) {
            EventLog eventLog = getEventLog(this.streamPosition);
            this.streamPosition = eventLog.getNextStreamPosition();
            writeStreamPosition(this.streamPosition, processSession);
            if (eventLog.getSize() == 0) {
                return;
            } else {
                writeLogAsRecords(eventLog, processSession);
            }
        }
    }

    EventLog getEventLog(String str) {
        return EventLog.getEnterpriseEventsStream(this.boxAPIConnection, new EnterpriseEventsStreamRequest().limit(LIMIT).position(str).typeNames(this.eventTypes));
    }

    private void writeLogAsRecords(EventLog eventLog, ProcessSession processSession) {
        FlowFile create = processSession.create();
        try {
            OutputStream write = processSession.write(create);
            try {
                BoxEventJsonArrayWriter create2 = BoxEventJsonArrayWriter.create(write);
                try {
                    Iterator it = eventLog.iterator();
                    while (it.hasNext()) {
                        create2.write((BoxEvent) it.next());
                    }
                    if (create2 != null) {
                        create2.close();
                    }
                    if (write != null) {
                        write.close();
                    }
                    processSession.putAttribute(create, "record.count", String.valueOf(eventLog.getSize()));
                    processSession.putAttribute(create, CoreAttributes.MIME_TYPE.key(), "application/json");
                    processSession.transfer(create, REL_SUCCESS);
                } catch (Throwable th) {
                    if (create2 != null) {
                        try {
                            create2.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (IOException e) {
            throw new ProcessException("Failed to write Box Event into a FlowFile", e);
        }
    }
}
