package org.apache.nifi.processors.box;

import com.box.sdk.BoxAPIConnection;
import com.box.sdk.BoxEvent;
import com.box.sdk.EventListener;
import com.box.sdk.EventStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.box.controllerservices.BoxClientService;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@CapabilityDescription("Consumes all events from Box. This processor can be used to capture events such as uploads, modifications, deletions, etc.\nThe content of the events is sent to the 'success' relationship as a JSON array. Events can be dropped in case of NiFi restart\nor if the queue capacity is exceeded. The last known position of the Box stream is stored in the processor state and is used to\nresume the stream from the last known position when the processor is restarted.\n")
@PrimaryNodeOnly
@Stateful(description = "The last known position of the Box stream is stored in the processor state and is used to\nresume the stream from the last known position when the processor is restarted.\n", scopes = {Scope.CLUSTER})
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"box", "storage"})
@SeeAlso({FetchBoxFile.class, PutBoxFile.class, ListBoxFile.class})
/* loaded from: input_file:org/apache/nifi/processors/box/ConsumeBoxEvents.class */
public class ConsumeBoxEvents extends AbstractProcessor implements VerifiableProcessor {
    private static final String POSITION_KEY = "position";
    public static final PropertyDescriptor QUEUE_CAPACITY = new PropertyDescriptor.Builder().name("Queue Capacity").description("The maximum size of the internal queue used to buffer events being transferred from the underlying stream to the processor.\nSetting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total\nmemory used by the processor during these surges.\n").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("10000").required(true).build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(BoxClientService.BOX_CLIENT_SERVICE, QUEUE_CAPACITY);
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Events received successfully will be sent out this relationship.").build();
    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
    private volatile BoxAPIConnection boxAPIConnection;
    private volatile EventStream eventStream;
    protected volatile LinkedBlockingQueue<BoxEvent> events;
    private volatile AtomicLong position = new AtomicLong(0);

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public final Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext processContext) {
        this.boxAPIConnection = processContext.getProperty(BoxClientService.BOX_CLIENT_SERVICE).asControllerService(BoxClientService.class).getBoxApiConnection();
        try {
            String str = processContext.getStateManager().getState(Scope.CLUSTER).get(POSITION_KEY);
            if (str == null) {
                this.eventStream = new EventStream(this.boxAPIConnection);
            } else {
                this.eventStream = new EventStream(this.boxAPIConnection, Long.parseLong(str));
            }
            int intValue = processContext.getProperty(QUEUE_CAPACITY).asInteger().intValue();
            if (this.events == null) {
                this.events = new LinkedBlockingQueue<>(intValue);
            } else {
                LinkedBlockingQueue<BoxEvent> linkedBlockingQueue = new LinkedBlockingQueue<>(intValue);
                linkedBlockingQueue.addAll(this.events);
                this.events = linkedBlockingQueue;
            }
            this.eventStream.addListener(new EventListener() { // from class: org.apache.nifi.processors.box.ConsumeBoxEvents.1
                public void onEvent(BoxEvent boxEvent) {
                    try {
                        ConsumeBoxEvents.this.events.put(boxEvent);
                    } catch (InterruptedException e) {
                        throw new RuntimeException("Interrupted while trying to put the event into the queue", e);
                    }
                }

                public void onNextPosition(long j) {
                    try {
                        processContext.getStateManager().setState(Map.of(ConsumeBoxEvents.POSITION_KEY, String.valueOf(j)), Scope.CLUSTER);
                        ConsumeBoxEvents.this.position.set(j);
                    } catch (IOException e) {
                        ConsumeBoxEvents.this.getLogger().warn("Failed to save position {} in processor state", new Object[]{Long.valueOf(j), e});
                    }
                }

                public boolean onException(Throwable th) {
                    ConsumeBoxEvents.this.getLogger().warn("An error has been received from the stream. Last tracked position {}", new Object[]{Long.valueOf(ConsumeBoxEvents.this.position.get()), th});
                    return true;
                }
            });
            this.eventStream.start();
        } catch (Exception e) {
            throw new ProcessException("Could not retrieve last event position", e);
        }
    }

    @OnStopped
    public void stopped() {
        if (this.eventStream == null || !this.eventStream.isStarted()) {
            return;
        }
        this.eventStream.stop();
    }

    public List<ConfigVerificationResult> verify(ProcessContext processContext, ComponentLog componentLog, Map<String, String> map) {
        ArrayList arrayList = new ArrayList();
        this.boxAPIConnection = processContext.getProperty(BoxClientService.BOX_CLIENT_SERVICE).asControllerService(BoxClientService.class).getBoxApiConnection();
        try {
            this.boxAPIConnection.refresh();
            arrayList.add(new ConfigVerificationResult.Builder().verificationStepName("Box API Connection").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation("Successfully validated Box connection").build());
        } catch (Exception e) {
            getLogger().warn("Failed to verify configuration", e);
            arrayList.add(new ConfigVerificationResult.Builder().verificationStepName("Box API Connection").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("Failed to validate Box connection: %s", e.getMessage())).build());
        }
        return arrayList;
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        if (this.events.isEmpty()) {
            processContext.yield();
            return;
        }
        FlowFile create = processSession.create();
        ArrayList arrayList = new ArrayList();
        int drainTo = this.events.drainTo(arrayList);
        try {
            OutputStream write = processSession.write(create);
            try {
                BoxEventJsonArrayWriter create2 = BoxEventJsonArrayWriter.create(write);
                try {
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        create2.write((BoxEvent) it.next());
                    }
                    if (create2 != null) {
                        create2.close();
                    }
                    if (write != null) {
                        write.close();
                    }
                    processSession.putAttribute(create, "record.count", String.valueOf(drainTo));
                    processSession.putAttribute(create, CoreAttributes.MIME_TYPE.key(), "application/json");
                    processSession.transfer(create, REL_SUCCESS);
                } catch (Throwable th) {
                    if (create2 != null) {
                        try {
                            create2.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Exception e) {
            getLogger().error("Failed to write events to FlowFile; will re-queue events and try again", e);
            LinkedBlockingQueue<BoxEvent> linkedBlockingQueue = this.events;
            Objects.requireNonNull(linkedBlockingQueue);
            arrayList.forEach((v1) -> {
                r1.offer(v1);
            });
            processSession.remove(create);
            processContext.yield();
        }
    }
}
