package org.apache.nifi.stateless.session;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.function.Consumer;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.stateless.engine.DataflowAbortedException;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.flow.FailurePortEncounteredException;
import org.apache.nifi.stateless.flow.StandardStatelessFlow;
import org.apache.nifi.stateless.queue.DrainableFlowFileQueue;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/stateless/session/StatelessProcessSession.class */
public class StatelessProcessSession extends StandardProcessSession {
    // Class-wide SLF4J logger.
    private static final Logger logger = LoggerFactory.getLogger(StatelessProcessSession.class);
    // Component this session works on behalf of; its outgoing connections determine which
    // components get triggered or queued after a commit.
    private final Connectable connectable;
    // Passed on unchanged to the session factory that is built for every follow-on component.
    private final RepositoryContextFactory repositoryContextFactory;
    // Creates the ProcessContext for each follow-on component and is also passed on to its session factory.
    private final ProcessContextFactory processContextFactory;
    // Passed on unchanged to the session factory that is built for every follow-on component.
    private final ProvenanceEventRepository provenanceEventRepository;
    // Consulted for cancellation, terminal/failure port lookups and completion acknowledgment;
    // also notified when processing is aborted.
    private final ExecutionProgress executionProgress;
    // Receives commit callbacks, progress counts and the components queued for later triggering
    // when commits are not synchronous.
    private final AsynchronousCommitTracker tracker;
    // Not final: commit(Checkpoint, boolean) switches this to true when it is called with
    // its boolean argument set to false, and it is never reset to false afterwards.
    private boolean requireSynchronousCommits;

    /**
     * Creates a session for the given component.
     * <p>
     * The superclass is initialized with a repository context created by {@code repositoryContextFactory}
     * for {@code connectable} and {@code provenanceEventRepository}, a termination check bound to
     * {@code executionProgress.isCanceled()}, and a {@link NopPerformanceTracker}.
     *
     * @param connectable the component this session belongs to
     * @param repositoryContextFactory creates the repository context for this session; also retained for follow-on sessions
     * @param provenanceEventRepository provenance repository used when creating the repository context; also retained for follow-on sessions
     * @param processContextFactory creates process contexts for follow-on components
     * @param executionProgress progress/cancellation state of the dataflow execution; must not be {@code null}
     * @param requireSynchronousCommits whether asynchronous commit requests must be carried out synchronously
     * @param tracker tracker that receives asynchronous commit callbacks, progress and queued components
     * @throws NullPointerException if {@code executionProgress} is {@code null}
     */
    public StatelessProcessSession(final Connectable connectable, final RepositoryContextFactory repositoryContextFactory,
                                   final ProvenanceEventRepository provenanceEventRepository, final ProcessContextFactory processContextFactory,
                                   final ExecutionProgress executionProgress, final boolean requireSynchronousCommits,
                                   final AsynchronousCommitTracker tracker) {
        // The bound method reference null-checks executionProgress, matching the original bytecode.
        super(repositoryContextFactory.createRepositoryContext(connectable, provenanceEventRepository), executionProgress::isCanceled, new NopPerformanceTracker());

        this.connectable = connectable;
        this.repositoryContextFactory = repositoryContextFactory;
        this.provenanceEventRepository = provenanceEventRepository;
        this.processContextFactory = processContextFactory;
        this.executionProgress = executionProgress;
        this.requireSynchronousCommits = requireSynchronousCommits;
        this.tracker = tracker;
    }

    /**
     * Commits this session without any callbacks.
     * <p>
     * Uses the superclass's asynchronous commit unless synchronous commits are required,
     * in which case the superclass's blocking commit is used instead.
     */
    public void commitAsync() {
        if (!this.requireSynchronousCommits) {
            super.commitAsync();
            return;
        }

        // Synchronous commits were requested, so the asynchronous request is carried out as a plain commit.
        super.commit();
    }

    /**
     * Commits this session with a success callback only.
     * Delegates to the two-argument overload, passing {@code null} as the failure callback.
     *
     * @param runnable callback handed to the two-argument overload as the success callback
     */
    public void commitAsync(Runnable runnable) {
        final Consumer<Throwable> noFailureCallback = null;
        commitAsync(runnable, noFailureCallback);
    }

    /**
     * Commits this session and reports the outcome through the given callbacks.
     * <p>
     * When synchronous commits are not required, the session is committed asynchronously and both
     * callbacks are registered with the commit tracker for this session's component. Otherwise the
     * commit is performed immediately and the matching callback is invoked before this method returns.
     *
     * @param runnable invoked after a successful synchronous commit; may be {@code null}, in which case
     *                 nothing is invoked. Exceptions it throws are logged and not propagated.
     * @param consumer receives the failure when the synchronous commit fails; may be {@code null}, in
     *                 which case the failure is rethrown to the caller instead of being swallowed
     */
    public void commitAsync(Runnable runnable, Consumer<Throwable> consumer) {
        if (!this.requireSynchronousCommits) {
            super.commitAsync();
            this.tracker.addCallback(this.connectable, runnable, consumer, this);
            return;
        }

        try {
            super.commit();

            if (runnable != null) {
                try {
                    runnable.run();
                } catch (final Exception e) {
                    // The commit itself succeeded; a failing success callback must not undo that.
                    logger.error("Committed Process Session {} for {} but failed to trigger success callback", this, this.connectable, e);
                }
            }
        } catch (final Throwable th) {
            logger.error("Failed to commit Process Session {} for {}", this, this.connectable, th);

            if (consumer == null) {
                // No failure callback was supplied (e.g. the single-argument overload): surface the
                // real failure rather than masking it with a NullPointerException.
                throw th;
            }
            consumer.accept(th);
        }
    }

    /**
     * Commits the given checkpoint and then hands the produced data on to the downstream components.
     * <p>
     * Steps, in order: abort if the execution was canceled; remember whether synchronous commits are
     * required; report progress to the tracker if the checkpoint touched any FlowFiles; delegate to the
     * superclass commit; finally either queue the follow-on components (asynchronous mode) or trigger
     * them immediately and wait for acknowledgment (synchronous mode).
     *
     * @param checkpoint the checkpoint to commit
     * @param z passed through to the superclass commit; when {@code false}, this session switches to
     *          synchronous commits for this and every later commit
     */
    protected void commit(StandardProcessSession.Checkpoint checkpoint, boolean z) {
        assertProgressNotCanceled();

        // Once a commit arrives with z == false the session stays in synchronous mode.
        if (!z) {
            this.requireSynchronousCommits = true;
        }

        final boolean flowFilesTouched = checkpoint.getFlowFilesIn() + checkpoint.getFlowFilesOut() + checkpoint.getFlowFilesRemoved() > 0;
        if (flowFilesTouched) {
            this.tracker.recordProgress(checkpoint.getFlowFilesOut() + checkpoint.getFlowFilesRemoved(), checkpoint.getBytesOut() + checkpoint.getBytesRemoved());
        }

        super.commit(checkpoint, z);

        if (this.requireSynchronousCommits) {
            final long followOnStart = System.nanoTime();
            triggerFollowOnComponents();
            final long followOnNanos = System.nanoTime() - followOnStart;

            // The elapsed time is registered as a negative duration for this component.
            // NOTE(review): presumably this subtracts downstream time from this component's
            // processing-time statistics - confirm against the event repository semantics.
            registerProcessEvent(this.connectable, -followOnNanos);
            awaitAcknowledgment();
        } else {
            queueFollowOnComponents();
        }
    }

    /**
     * Synchronously triggers the destination of every outgoing connection of this session's component
     * until that connection's queue is empty.
     * <p>
     * Nothing is done when this session's component is itself a terminal port. A connection whose
     * destination is a terminal port is left untouched, with its data still queued.
     *
     * @throws FailurePortEncounteredException if data is queued for a destination that is, or leads to, a failure port
     */
    private void triggerFollowOnComponents() {
        final boolean ownComponentIsTerminal = this.executionProgress.isTerminalPort(this.connectable);
        if (ownComponentIsTerminal) {
            return;
        }

        for (final Connection outbound : this.connectable.getConnections()) {
            // Keep triggering the destination until it has consumed everything queued on this connection.
            while (!outbound.getFlowFileQueue().isEmpty()) {
                final Connectable target = outbound.getDestination();

                if (isFailurePortGuaranteed(target)) {
                    throw new FailurePortEncounteredException("FlowFile was transferred to Port " + target.getName() + ", which is marked as a Failure Port", target.getName());
                }

                if (StandardStatelessFlow.isTerminalPort(target)) {
                    // Terminal ports are not triggered; move on to the next connection.
                    break;
                }

                triggerNext(target);
            }
        }
    }

    /**
     * Registers with the tracker every non-terminal destination that has data waiting on one of this
     * component's outgoing connections, so that it can be triggered later.
     * <p>
     * Nothing is done when this session's component is itself a terminal port.
     *
     * @throws FailurePortEncounteredException if data is queued for a destination that is, or leads to, a failure port
     */
    private void queueFollowOnComponents() {
        final boolean ownComponentIsTerminal = this.executionProgress.isTerminalPort(this.connectable);
        if (ownComponentIsTerminal) {
            return;
        }

        for (final Connection outbound : this.connectable.getConnections()) {
            if (outbound.getFlowFileQueue().isEmpty()) {
                // Nothing was sent down this connection.
                continue;
            }

            final Connectable target = outbound.getDestination();
            if (isFailurePortGuaranteed(target)) {
                throw new FailurePortEncounteredException("FlowFile was transferred to Port " + target.getName() + ", which is marked as a Failure Port", target.getName());
            }

            if (StandardStatelessFlow.isTerminalPort(target)) {
                // Terminal ports are never queued for triggering.
                continue;
            }

            this.tracker.addConnectable(target);
        }
    }

    /**
     * Determines whether the given component is a failure port, or is an output port / funnel from
     * which a failure port can be reached through further output ports and funnels.
     * <p>
     * Only output ports and funnels are followed; any other component type yields {@code false}.
     * The search recurses through the destinations of the component's outgoing connections.
     *
     * @param connectable the component to inspect
     * @return {@code true} if the component's name is registered as a failure port or any of its
     *         outgoing connections leads to such a component; {@code false} otherwise
     */
    private boolean isFailurePortGuaranteed(Connectable connectable) {
        final ConnectableType type = connectable.getConnectableType();
        final boolean portOrFunnel = type == ConnectableType.OUTPUT_PORT || type == ConnectableType.FUNNEL;
        if (!portOrFunnel) {
            return false;
        }

        if (this.executionProgress.isFailurePort(connectable.getName())) {
            return true;
        }

        for (final Connection outbound : connectable.getConnections()) {
            if (isFailurePortGuaranteed(outbound.getDestination())) {
                return true;
            }
        }

        return false;
    }

    /**
     * Triggers the given component once, using a fresh process context and a fresh session factory
     * that carries over this session's collaborators and synchronous-commit setting.
     * <p>
     * On success the elapsed time is registered as a process event for the component. On any failure
     * the processing is aborted and the failure is rethrown unchanged.
     *
     * @param connectable the component to trigger
     * @throws DataflowAbortedException if the execution has already been canceled
     */
    private void triggerNext(Connectable connectable) {
        assertProgressNotCanceled();

        final ProcessContext context = this.processContextFactory.createProcessContext(connectable);

        // The follow-on component gets its own tracker, rooted at the same group as this session's tracker.
        final AsynchronousCommitTracker followOnTracker = new AsynchronousCommitTracker(this.tracker.getRootGroup());
        final StatelessProcessSessionFactory sessionFactory = new StatelessProcessSessionFactory(
            connectable,
            this.repositoryContextFactory,
            this.provenanceEventRepository,
            this.processContextFactory,
            this.executionProgress,
            this.requireSynchronousCommits,
            followOnTracker);

        logger.debug("Triggering {}", connectable);

        final long triggerStart = System.nanoTime();
        try {
            connectable.onTrigger(context, sessionFactory);
            final long elapsedNanos = System.nanoTime() - triggerStart;
            registerProcessEvent(connectable, elapsedNanos);
        } catch (Throwable failure) {
            abortProcessing(failure);
            throw failure;
        }
    }

    /**
     * Verifies that the dataflow execution has not been canceled.
     * If it has, processing is aborted (as a cancellation, not a failure) before the exception is thrown.
     *
     * @throws DataflowAbortedException if the execution has been canceled
     */
    private void assertProgressNotCanceled() {
        final boolean canceled = this.executionProgress.isCanceled();
        if (!canceled) {
            return;
        }

        logger.info("Completed processing for {} but execution has been canceled. Will not commit session.", this.connectable);
        abortProcessing(null);
        throw new DataflowAbortedException();
    }

    /**
     * Waits for the outcome of the dataflow execution to be acknowledged, unless data is still queued
     * for processing, in which case the method returns immediately.
     * <p>
     * If the completion action is a cancellation, or the wait is interrupted, processing is aborted and
     * a {@link DataflowAbortedException} is thrown. On interruption the thread's interrupt status is
     * restored before the exception is thrown, so that callers can still observe the interrupt.
     *
     * @throws DataflowAbortedException if the completion was canceled or the wait was interrupted
     */
    private void awaitAcknowledgment() {
        if (this.executionProgress.isDataQueued()) {
            logger.debug("Completed processing for {} but data is queued for processing so will allow Process Session to complete without waiting for acknowledgment", this.connectable);
            return;
        }

        logger.debug("Completed processing for {}; no data is queued for processing so will await acknowledgment of completion", this.connectable);
        try {
            if (this.executionProgress.awaitCompletionAction() == ExecutionProgress.CompletionAction.CANCEL) {
                logger.info("Dataflow completed but action was canceled instead of being acknowledged. Will roll back session.");
                abortProcessing(null);
                throw new DataflowAbortedException();
            }
        } catch (final InterruptedException e) {
            logger.warn("Interrupted while waiting for dataflow completion to be acknowledged. Will roll back session.");
            try {
                abortProcessing(e);
            } finally {
                // Restore the flag only after the rollback ran, so the cleanup itself is not
                // disturbed by a pending interrupt, yet the interrupt is never silently lost.
                Thread.currentThread().interrupt();
            }
            throw new DataflowAbortedException();
        }
    }

    /**
     * Aborts processing: notifies the execution progress, rolls this session back and finally purges
     * the FlowFiles of every connection under the root group.
     * <p>
     * The purge runs even when the rollback throws.
     *
     * @param th the failure that caused the abort, or {@code null} when the abort is a cancellation
     */
    private void abortProcessing(Throwable th) {
        final boolean failed = th != null;
        if (failed) {
            this.executionProgress.notifyExecutionFailed(th);
        } else {
            this.executionProgress.notifyExecutionCanceled();
        }

        try {
            rollback(false, true);
        } finally {
            // Always clear the queues, regardless of how the rollback went.
            purgeFlowFiles();
        }
    }

    /**
     * Drains the queue of every connection found under the root group and decrements the content
     * repository's claimant count for the content claim of each drained FlowFile.
     * <p>
     * Every queue is cast to {@link DrainableFlowFileQueue}; a queue of another type results in a
     * {@link ClassCastException}.
     */
    private void purgeFlowFiles() {
        for (final Connection connection : getRootGroup().findAllConnections()) {
            final DrainableFlowFileQueue queue = (DrainableFlowFileQueue) connection.getFlowFileQueue();

            // Pre-size the buffer to the number of FlowFiles currently reported by the queue.
            final ArrayList<FlowFileRecord> drained = new ArrayList<>(queue.size().getObjectCount());
            queue.drainTo(drained);

            for (final FlowFileRecord flowFile : drained) {
                getRepositoryContext().getContentRepository().decrementClaimantCount(flowFile.getContentClaim());
            }
        }
    }

    /**
     * Finds the root process group, starting from the group that contains this session's component.
     *
     * @return the top-most ancestor of the component's process group
     */
    private ProcessGroup getRootGroup() {
        final ProcessGroup owningGroup = this.connectable.getProcessGroup();
        return getRootGroup(owningGroup);
    }

    /**
     * Walks up the parent chain of the given group until a group without a parent is reached.
     *
     * @param processGroup the group to start from; must not be {@code null}
     * @return the given group if it has no parent, otherwise its top-most ancestor
     */
    private ProcessGroup getRootGroup(ProcessGroup processGroup) {
        ProcessGroup current = processGroup;
        ProcessGroup parent = current.getParent();
        while (parent != null) {
            current = parent;
            parent = current.getParent();
        }
        return current;
    }

    /**
     * Records one invocation with the given processing time for the given component in the FlowFile
     * event repository.
     * <p>
     * A failure to update the repository is logged and otherwise ignored.
     *
     * @param connectable the component the event is recorded for (by its identifier)
     * @param j the processing time in nanoseconds to record; callers in this class also pass negative values
     */
    private void registerProcessEvent(Connectable connectable, long j) {
        final StandardFlowFileEvent event = new StandardFlowFileEvent();
        event.setProcessingNanos(j);
        event.setInvocations(1);

        try {
            getRepositoryContext().getFlowFileEventRepository().updateRepository(event, connectable.getIdentifier());
        } catch (IOException ioe) {
            logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate.", connectable.getRunnableComponent(), ioe);
        }
    }

    /**
     * Returns a short description of this session that includes its session id.
     *
     * @return a string of the form {@code StatelessProcessSession[id=<session id>]}
     */
    @Override
    public String toString() {
        final StringBuilder description = new StringBuilder("StatelessProcessSession[id=");
        description.append(getSessionId());
        description.append("]");
        return description.toString();
    }
}
