package io.camunda.zeebe.stream.impl;

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDbTransaction;
import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.log.WriteContext;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.ErrorIntent;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.scheduler.retry.AbortableRetryStrategy;
import io.camunda.zeebe.scheduler.retry.RecoverableRetryStrategy;
import io.camunda.zeebe.scheduler.retry.RetryStrategy;
import io.camunda.zeebe.stream.api.CommandResponseWriter;
import io.camunda.zeebe.stream.api.EmptyProcessingResult;
import io.camunda.zeebe.stream.api.EventFilter;
import io.camunda.zeebe.stream.api.ProcessingResponse;
import io.camunda.zeebe.stream.api.ProcessingResult;
import io.camunda.zeebe.stream.api.RecordProcessor;
import io.camunda.zeebe.stream.api.StreamClock;
import io.camunda.zeebe.stream.api.records.ExceededBatchRecordSizeException;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.stream.api.scheduling.ScheduledCommandCache;
import io.camunda.zeebe.stream.api.state.MutableLastProcessedPositionState;
import io.camunda.zeebe.stream.impl.metrics.ProcessingMetrics;
import io.camunda.zeebe.stream.impl.records.RecordBatchEntry;
import io.camunda.zeebe.stream.impl.records.RecordValues;
import io.camunda.zeebe.stream.impl.records.TypedRecordImpl;
import io.camunda.zeebe.stream.impl.records.UnwrittenRecord;
import io.camunda.zeebe.util.CloseableSilently;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.exception.RecoverableException;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BooleanSupplier;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/stream/impl/ProcessingStateMachine.class */
public final class ProcessingStateMachine {
    public static final String WARN_MESSAGE_BATCH_PROCESSING_RETRY = "Expected to process commands in a batch, but exceeded the resulting batch size after processing {} commands (maxCommandsInBatch: {}).";
    private static final String ERROR_MESSAGE_WRITE_RECORD_ABORTED = "Expected to write one or more follow-up records for record '{} {}' without errors, but exception was thrown.";
    private static final String ERROR_MESSAGE_ROLLBACK_ABORTED = "Expected to roll back the current transaction for record '{} {}' successfully, but exception was thrown.";
    private static final String ERROR_MESSAGE_EXECUTE_SIDE_EFFECT_ABORTED = "Expected to execute side effects for record '{} {}' successfully, but exception was thrown.";
    private static final String ERROR_MESSAGE_UPDATE_STATE_FAILED = "Expected to successfully update state for record '{} {}', but caught an exception. Retry.";
    private static final String ERROR_MESSAGE_PROCESSING_FAILED_RETRY_PROCESSING = "Expected to process record '{} {}' successfully on stream processor, but caught recoverable exception. Retry processing.";
    private static final String ERROR_MESSAGE_PROCESSING_FAILED_UNRECOVERABLE = "Expected to process record '{} {}' successfully on stream processor, but caught unrecoverable exception.";
    private static final String NOTIFY_PROCESSED_LISTENER_ERROR_MESSAGE = "Expected to invoke processed listener for record {} successfully, but exception was thrown.";
    private static final String NOTIFY_SKIPPED_LISTENER_ERROR_MESSAGE = "Expected to invoke skipped listener for record '{} {}' successfully, but exception was thrown.";
    private static final String ERROR_MESSAGE_HANDLING_PROCESSING_ERROR_FAILED = "Expected to process command '{} {}' successfully on stream processor, but caught unexpected exception. Failed to handle the exception gracefully.";
    private final EventFilter processingFilter;
    private final MutableLastProcessedPositionState lastProcessedPositionState;
    private final ActorControl actor;
    private final LogStreamReader logStreamReader;
    private final TransactionContext transactionContext;
    private final RetryStrategy writeRetryStrategy;
    private final RetryStrategy sideEffectsRetryStrategy;
    private final RetryStrategy updateStateRetryStrategy;
    private final BooleanSupplier shouldProcessNext;
    private final BooleanSupplier abortCondition;
    private final RecordValues recordValues;
    private final TypedRecordImpl typedCommand;
    private final StreamProcessorListener streamProcessorListener;
    private LoggedEvent currentRecord;
    private ZeebeDbTransaction zeebeDbTransaction;
    private int onErrorRetries;
    private CloseableSilently processingTimer;
    private final StreamProcessorContext context;
    private final List<RecordProcessor> recordProcessors;
    private ProcessingResult currentProcessingResult;
    private List<LogAppendEntry> pendingWrites;
    private Collection<ProcessingResponse> pendingResponses;
    private RecordProcessor currentProcessor;
    private final LogStreamWriter logStreamWriter;
    private boolean inProcessing;
    private final int maxCommandsInBatch;
    private int processedCommandsCount;
    private final ProcessingMetrics processingMetrics;
    private final ScheduledCommandCache scheduledCommandCache;
    private final StreamClock.ControllableStreamClock clock;
    private static final Logger LOG = io.camunda.zeebe.logstreams.impl.Loggers.PROCESSOR_LOGGER;
    private static final Duration PROCESSING_RETRY_DELAY = Duration.ofMillis(250);
    private final EventFilter isEventOrRejection = new MetadataEventFilter(recordMetadata -> {
        RecordType recordType = recordMetadata.getRecordType();
        return recordType == RecordType.EVENT || recordType == RecordType.COMMAND_REJECTION;
    });
    private final RecordMetadata metadata = new RecordMetadata();
    private long writtenPosition = -1;
    private long lastSuccessfulProcessedRecordPosition = -1;
    private long lastWrittenPosition = -1;
    private boolean reachedEnd = true;
    private volatile ErrorHandlingPhase errorHandlingPhase = ErrorHandlingPhase.NO_ERROR;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/camunda/zeebe/stream/impl/ProcessingStateMachine$BatchProcessingStepResult.class */
    public static final class BatchProcessingStepResult extends Record {
        private final List<TypedRecord<?>> toProcess;
        private final List<LogAppendEntry> toWrite;

        private BatchProcessingStepResult(List<TypedRecord<?>> list, List<LogAppendEntry> list2) {
            this.toProcess = list;
            this.toWrite = list2;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, BatchProcessingStepResult.class), BatchProcessingStepResult.class, "toProcess;toWrite", "FIELD:Lio/camunda/zeebe/stream/impl/ProcessingStateMachine$BatchProcessingStepResult;->toProcess:Ljava/util/List;", "FIELD:Lio/camunda/zeebe/stream/impl/ProcessingStateMachine$BatchProcessingStepResult;->toWrite:Ljava/util/List;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, BatchProcessingStepResult.class), BatchProcessingStepResult.class, "toProcess;toWrite", "FIELD:Lio/camunda/zeebe/stream/impl/ProcessingStateMachine$BatchProcessingStepResult;->toProcess:Ljava/util/List;", "FIELD:Lio/camunda/zeebe/stream/impl/ProcessingStateMachine$BatchProcessingStepResult;->toWrite:Ljava/util/List;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, BatchProcessingStepResult.class, Object.class), BatchProcessingStepResult.class, "toProcess;toWrite", "FIELD:Lio/camunda/zeebe/stream/impl/ProcessingStateMachine$BatchProcessingStepResult;->toProcess:Ljava/util/List;", "FIELD:Lio/camunda/zeebe/stream/impl/ProcessingStateMachine$BatchProcessingStepResult;->toWrite:Ljava/util/List;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public List<TypedRecord<?>> toProcess() {
            return this.toProcess;
        }

        public List<LogAppendEntry> toWrite() {
            return this.toWrite;
        }
    }

    /* loaded from: input_file:io/camunda/zeebe/stream/impl/ProcessingStateMachine$ErrorHandlingPhase.class */
    public enum ErrorHandlingPhase {
        NO_ERROR,
        USER_COMMAND_PROCESSING_FAILED,
        PROCESSING_FAILED,
        PROCESSING_ERROR_FAILED,
        USER_COMMAND_PROCESSING_ERROR_FAILED,
        USER_COMMAND_REJECT_FAILED,
        USER_COMMAND_REJECT_SIMPLE_REJECT_FAILED,
        ENDLESS_ERROR_LOOP
    }

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:io/camunda/zeebe/stream/impl/ProcessingStateMachine$NextProcessingStep.class */
    public interface NextProcessingStep {
        void run() throws Exception;
    }

    public ProcessingStateMachine(StreamProcessorContext streamProcessorContext, BooleanSupplier booleanSupplier, List<RecordProcessor> list, ScheduledCommandCache scheduledCommandCache) {
        this.context = streamProcessorContext;
        this.recordProcessors = list;
        this.scheduledCommandCache = scheduledCommandCache;
        this.actor = streamProcessorContext.getActor();
        this.recordValues = streamProcessorContext.getRecordValues();
        this.logStreamReader = streamProcessorContext.getLogStreamReader();
        this.logStreamWriter = streamProcessorContext.getLogStreamWriter();
        this.transactionContext = streamProcessorContext.getTransactionContext();
        this.abortCondition = streamProcessorContext.getAbortCondition();
        this.lastProcessedPositionState = streamProcessorContext.getLastProcessedPositionState();
        this.maxCommandsInBatch = streamProcessorContext.getMaxCommandsInBatch();
        this.writeRetryStrategy = new AbortableRetryStrategy(this.actor);
        this.sideEffectsRetryStrategy = new AbortableRetryStrategy(this.actor);
        this.updateStateRetryStrategy = new RecoverableRetryStrategy(this.actor);
        this.shouldProcessNext = booleanSupplier;
        this.typedCommand = new TypedRecordImpl(streamProcessorContext.getLogStream().getPartitionId());
        this.streamProcessorListener = streamProcessorContext.getStreamProcessorListener();
        this.processingMetrics = new ProcessingMetrics(streamProcessorContext.getMeterRegistry());
        this.processingFilter = new MetadataEventFilter(recordMetadata -> {
            return recordMetadata.getRecordType() == RecordType.COMMAND;
        }).and(loggedEvent -> {
            return !loggedEvent.shouldSkipProcessing();
        }).and(streamProcessorContext.processingFilter());
        this.clock = streamProcessorContext.getClock();
    }

    private void skipRecord() {
        notifySkippedListener(this.currentRecord);
        markProcessingCompleted();
        this.actor.submit(this::tryToReadNextRecord);
        this.processingMetrics.eventSkipped();
    }

    void markProcessingCompleted() {
        this.inProcessing = false;
        if (this.onErrorRetries > 0) {
            this.onErrorRetries = 0;
            updateErrorHandlingPhase(ErrorHandlingPhase.NO_ERROR);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void tryToReadNextRecord() {
        boolean hasNext = this.logStreamReader.hasNext();
        if (this.currentRecord != null) {
            LoggedEvent loggedEvent = this.currentRecord;
            this.reachedEnd = this.isEventOrRejection.applies(loggedEvent) && !hasNext && this.lastWrittenPosition <= loggedEvent.getPosition();
        }
        if (this.shouldProcessNext.getAsBoolean() && hasNext && !this.inProcessing) {
            this.currentRecord = (LoggedEvent) this.logStreamReader.next();
            if (this.processingFilter.applies(this.currentRecord)) {
                processCommand(this.currentRecord);
            } else {
                skipRecord();
            }
        }
    }

    public boolean hasReachedEnd() {
        return this.reachedEnd;
    }

    private void processCommand(LoggedEvent loggedEvent) {
        this.inProcessing = true;
        this.currentProcessingResult = EmptyProcessingResult.INSTANCE;
        this.metadata.reset();
        loggedEvent.readMetadata(this.metadata);
        try {
            this.processingMetrics.processingLatency(loggedEvent.getTimestamp(), this.clock.millis());
            this.processingTimer = this.processingMetrics.startProcessingDurationTimer(this.metadata.getValueType(), this.metadata.getIntent());
            this.typedCommand.wrap(loggedEvent, this.metadata, this.recordValues.readRecordValue(loggedEvent, this.metadata.getValueType()));
            this.zeebeDbTransaction = this.transactionContext.getCurrentTransaction();
            CloseableSilently startBatchProcessingDurationTimer = this.processingMetrics.startBatchProcessingDurationTimer();
            try {
                this.zeebeDbTransaction.run(() -> {
                    batchProcessing(this.typedCommand);
                });
                this.processingMetrics.observeCommandCount(this.processedCommandsCount);
                if (startBatchProcessingDurationTimer != null) {
                    startBatchProcessingDurationTimer.close();
                }
                finalizeCommandProcessing();
                writeRecords();
            } catch (Throwable th) {
                if (startBatchProcessingDurationTimer != null) {
                    try {
                        startBatchProcessingDurationTimer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (UnrecoverableException e) {
            LOG.error(ERROR_MESSAGE_PROCESSING_FAILED_UNRECOVERABLE, loggedEvent, this.metadata);
            throw e;
        } catch (RecoverableException e2) {
            LOG.error(ERROR_MESSAGE_PROCESSING_FAILED_RETRY_PROCESSING, new Object[]{loggedEvent, this.metadata, e2});
            this.actor.schedule(PROCESSING_RETRY_DELAY, () -> {
                processCommand(this.currentRecord);
            });
        } catch (ExceededBatchRecordSizeException e3) {
            if (this.processedCommandsCount <= 0) {
                onError(e3, () -> {
                    errorHandlingInTransaction(e3);
                    writeRecords();
                });
                return;
            }
            LOG.warn(WARN_MESSAGE_BATCH_PROCESSING_RETRY, new Object[]{Integer.valueOf(this.processedCommandsCount), Integer.valueOf(this.maxCommandsInBatch), e3});
            this.processingMetrics.countRetry();
            onError(e3, () -> {
                processCommand(loggedEvent);
            });
        } catch (Exception e4) {
            onError(e4, () -> {
                errorHandlingInTransaction(e4);
                writeRecords();
            });
        }
    }

    private void finalizeCommandProcessing() {
        this.lastProcessedPositionState.markAsProcessed(this.typedCommand.getPosition());
        this.processedCommandsCount = 0;
    }

    private void batchProcessing(TypedRecord<?> typedRecord) {
        LogStreamWriter logStreamWriter = this.logStreamWriter;
        Objects.requireNonNull(logStreamWriter);
        BufferedProcessingResultBuilder bufferedProcessingResultBuilder = new BufferedProcessingResultBuilder((v1, v2) -> {
            return r2.canWriteEvents(v1, v2);
        }, typedRecord.getOperationReference());
        int i = 0;
        int i2 = this.processedCommandsCount > 0 ? this.processedCommandsCount : this.maxCommandsInBatch;
        this.processedCommandsCount = 0;
        this.pendingWrites = new ArrayList();
        this.pendingResponses = Collections.newSetFromMap(new IdentityHashMap(2));
        ArrayDeque arrayDeque = new ArrayDeque();
        arrayDeque.addLast(typedRecord);
        while (!arrayDeque.isEmpty() && this.processedCommandsCount < i2) {
            TypedRecord typedRecord2 = (TypedRecord) arrayDeque.removeFirst();
            this.currentProcessor = this.recordProcessors.stream().filter(recordProcessor -> {
                return recordProcessor.accepts(typedRecord2.getValueType());
            }).findFirst().orElseThrow(() -> {
                return NoSuchProcessorException.forRecord(typedRecord2);
            });
            this.currentProcessingResult = this.currentProcessor.process(typedRecord2, bufferedProcessingResultBuilder);
            BatchProcessingStepResult collectBatchProcessingStepResult = collectBatchProcessingStepResult(this.currentProcessingResult, i, arrayDeque.size() + this.processedCommandsCount + 1, i2);
            arrayDeque.addAll(collectBatchProcessingStepResult.toProcess());
            this.pendingWrites.addAll(collectBatchProcessingStepResult.toWrite());
            Optional<ProcessingResponse> processingResponse = this.currentProcessingResult.getProcessingResponse();
            Collection<ProcessingResponse> collection = this.pendingResponses;
            Objects.requireNonNull(collection);
            processingResponse.ifPresent((v1) -> {
                r1.add(v1);
            });
            i = this.currentProcessingResult.getRecordBatch().entries().size();
            this.processedCommandsCount++;
            this.processingMetrics.commandsProcessed();
        }
    }

    private BatchProcessingStepResult collectBatchProcessingStepResult(ProcessingResult processingResult, int i, int i2, int i3) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        processingResult.getRecordBatch().entries().stream().skip(i).forEachOrdered(logAppendEntry -> {
            LogAppendEntry logAppendEntry = logAppendEntry;
            int size = i2 + arrayList.size();
            if (logAppendEntry.recordMetadata().getRecordType() == RecordType.COMMAND && size < i3) {
                arrayList.add(new UnwrittenRecord(logAppendEntry.key(), this.context.getPartitionId(), logAppendEntry.recordValue(), logAppendEntry.recordMetadata()));
                logAppendEntry = LogAppendEntry.ofProcessed(logAppendEntry);
            }
            arrayList2.add(logAppendEntry);
        });
        return new BatchProcessingStepResult(arrayList, arrayList2);
    }

    private void onError(Throwable th, NextProcessingStep nextProcessingStep) {
        this.onErrorRetries++;
        switchErrorPhase();
        this.actor.runOnCompletion(this.updateStateRetryStrategy.runWithRetry(() -> {
            this.zeebeDbTransaction.rollback();
            return true;
        }, this.abortCondition), (bool, th2) -> {
            if (th2 != null) {
                LOG.error(ERROR_MESSAGE_ROLLBACK_ABORTED, new Object[]{this.currentRecord, this.metadata, th2});
            }
            try {
                if (tryExitOutOfErrorLoop(th)) {
                    return;
                }
                nextProcessingStep.run();
            } catch (Exception e) {
                onError(e, nextProcessingStep);
            }
        });
    }

    private boolean tryExitOutOfErrorLoop(Throwable th) {
        try {
            if (this.errorHandlingPhase == ErrorHandlingPhase.USER_COMMAND_PROCESSING_ERROR_FAILED) {
                LOG.debug(ERROR_MESSAGE_HANDLING_PROCESSING_ERROR_FAILED, new Object[]{this.currentRecord, this.metadata, th});
                tryRejectingIfUserCommand(th.getMessage());
                return true;
            }
            if (this.errorHandlingPhase != ErrorHandlingPhase.USER_COMMAND_REJECT_FAILED) {
                return false;
            }
            LOG.warn(ERROR_MESSAGE_HANDLING_PROCESSING_ERROR_FAILED, new Object[]{this.currentRecord, this.metadata, th});
            tryRejectingIfUserCommand(String.format("Expected to process command, but caught an exception. Check broker logs (partition %s) for details.", Integer.valueOf(this.context.getPartitionId())));
            return true;
        } catch (Exception e) {
            LOG.error("Expected to write rejection for command '{} {}', but failed with unexpected error.", new Object[]{this.currentRecord, this.metadata, e});
            this.pendingResponses.clear();
            this.pendingWrites.clear();
            return false;
        }
    }

    private void startErrorLoop(boolean z) {
        if (this.errorHandlingPhase == ErrorHandlingPhase.NO_ERROR) {
            updateErrorHandlingPhase(z ? ErrorHandlingPhase.USER_COMMAND_PROCESSING_FAILED : ErrorHandlingPhase.PROCESSING_FAILED);
        }
    }

    private void switchErrorPhase() {
        ErrorHandlingPhase errorHandlingPhase;
        switch (this.errorHandlingPhase) {
            case NO_ERROR:
                errorHandlingPhase = ErrorHandlingPhase.NO_ERROR;
                break;
            case USER_COMMAND_PROCESSING_FAILED:
                errorHandlingPhase = ErrorHandlingPhase.USER_COMMAND_PROCESSING_ERROR_FAILED;
                break;
            case PROCESSING_FAILED:
                errorHandlingPhase = ErrorHandlingPhase.PROCESSING_ERROR_FAILED;
                break;
            case PROCESSING_ERROR_FAILED:
            case USER_COMMAND_REJECT_SIMPLE_REJECT_FAILED:
                LOG.error("Failed to process command '{} {}' retries. Entering endless error loop.", this.currentRecord, this.metadata);
                errorHandlingPhase = ErrorHandlingPhase.ENDLESS_ERROR_LOOP;
                break;
            case USER_COMMAND_PROCESSING_ERROR_FAILED:
                errorHandlingPhase = ErrorHandlingPhase.USER_COMMAND_REJECT_FAILED;
                break;
            case USER_COMMAND_REJECT_FAILED:
                errorHandlingPhase = ErrorHandlingPhase.USER_COMMAND_REJECT_SIMPLE_REJECT_FAILED;
                break;
            case ENDLESS_ERROR_LOOP:
                errorHandlingPhase = ErrorHandlingPhase.ENDLESS_ERROR_LOOP;
                break;
            default:
                throw new MatchException((String) null, (Throwable) null);
        }
        updateErrorHandlingPhase(errorHandlingPhase);
    }

    private void tryRejectingIfUserCommand(String str) {
        String str2 = str != null ? str : "";
        LogStreamWriter logStreamWriter = this.logStreamWriter;
        Objects.requireNonNull(logStreamWriter);
        BufferedProcessingResultBuilder bufferedProcessingResultBuilder = new BufferedProcessingResultBuilder((v1, v2) -> {
            return r2.canWriteEvents(v1, v2);
        }, this.typedCommand.getOperationReference());
        ErrorRecord errorRecord = new ErrorRecord();
        errorRecord.initErrorRecord(new CommandRejectionException(str2), this.currentRecord.getPosition());
        bufferedProcessingResultBuilder.appendRecord(this.currentRecord.getKey(), errorRecord, new RecordMetadata().recordType(RecordType.EVENT).valueType(ValueType.ERROR).intent(ErrorIntent.CREATED).recordVersion(1).rejectionType(RejectionType.NULL_VAL).rejectionReason("").operationReference(this.typedCommand.getOperationReference()));
        bufferedProcessingResultBuilder.withResponse(RecordType.COMMAND_REJECTION, this.typedCommand.getKey(), this.typedCommand.getIntent(), errorRecord, ValueType.ERROR, RejectionType.PROCESSING_ERROR, str2, this.typedCommand.getRequestId(), this.typedCommand.getRequestStreamId());
        this.currentProcessingResult = bufferedProcessingResultBuilder.build();
        this.pendingWrites = this.currentProcessingResult.getRecordBatch().entries();
        this.pendingResponses = this.currentProcessingResult.getProcessingResponse().stream().toList();
        finalizeCommandProcessing();
        writeRecords();
    }

    private void errorHandlingInTransaction(Throwable th) throws Exception {
        startErrorLoop(this.typedCommand.hasRequestMetadata());
        this.zeebeDbTransaction = this.transactionContext.getCurrentTransaction();
        this.zeebeDbTransaction.run(() -> {
            LogStreamWriter logStreamWriter = this.logStreamWriter;
            Objects.requireNonNull(logStreamWriter);
            this.currentProcessingResult = this.currentProcessor.onProcessingError(th, this.typedCommand, new BufferedProcessingResultBuilder((v1, v2) -> {
                return r2.canWriteEvents(v1, v2);
            }, this.typedCommand.getOperationReference()));
            this.pendingWrites = this.currentProcessingResult.getRecordBatch().entries();
            this.pendingResponses = this.currentProcessingResult.getProcessingResponse().stream().toList();
            finalizeCommandProcessing();
        });
    }

    private ActorFuture<Boolean> writeWithRetryAsync() {
        CompletableActorFuture completed;
        long position = this.typedCommand.getPosition();
        if (this.currentProcessingResult.isEmpty()) {
            notifySkippedListener(this.currentRecord);
            this.processingMetrics.eventSkipped();
            completed = CompletableActorFuture.completed(true);
        } else {
            completed = this.pendingWrites.isEmpty() ? CompletableActorFuture.completed(true) : this.writeRetryStrategy.runWithRetry(() -> {
                Either tryWrite = this.logStreamWriter.tryWrite(WriteContext.processingResult(), this.pendingWrites, position);
                if (!tryWrite.isRight()) {
                    return false;
                }
                this.writtenPosition = ((Long) tryWrite.get()).longValue();
                return true;
            }, this.abortCondition);
        }
        return completed;
    }

    private void writeRecords() {
        this.actor.runOnCompletion(writeWithRetryAsync(), (bool, th) -> {
            if (th != null) {
                LOG.error(ERROR_MESSAGE_WRITE_RECORD_ABORTED, new Object[]{this.currentRecord, this.metadata, th});
                onError(th, () -> {
                    errorHandlingInTransaction(th);
                    writeRecords();
                });
            } else {
                this.processingMetrics.recordsWritten(this.writtenPosition - this.lastWrittenPosition);
                updateState();
            }
        });
    }

    private void updateState() {
        this.actor.runOnCompletion(this.updateStateRetryStrategy.runWithRetry(() -> {
            this.zeebeDbTransaction.commit();
            this.lastSuccessfulProcessedRecordPosition = this.currentRecord.getPosition();
            this.processingMetrics.setLastProcessedPosition(this.lastSuccessfulProcessedRecordPosition);
            this.lastWrittenPosition = this.writtenPosition;
            return true;
        }, this.abortCondition), (bool, th) -> {
            if (th != null) {
                LOG.error(ERROR_MESSAGE_UPDATE_STATE_FAILED, new Object[]{this.currentRecord, this.metadata, th});
                onError(th, () -> {
                    errorHandlingInTransaction(th);
                    updateState();
                });
            } else {
                this.scheduledCommandCache.remove(this.metadata.getIntent(), this.currentRecord.getKey());
                executeSideEffects();
            }
        });
    }

    private void executeSideEffects() {
        this.actor.runOnCompletion(this.sideEffectsRetryStrategy.runWithRetry(() -> {
            for (ProcessingResponse processingResponse : this.pendingResponses) {
                CommandResponseWriter commandResponseWriter = this.context.getCommandResponseWriter();
                RecordBatchEntry responseValue = processingResponse.responseValue();
                RecordMetadata recordMetadata = responseValue.recordMetadata();
                commandResponseWriter.intent(recordMetadata.getIntent()).key(responseValue.key()).recordType(recordMetadata.getRecordType()).rejectionReason(BufferUtil.wrapString(recordMetadata.getRejectionReason())).rejectionType(recordMetadata.getRejectionType()).partitionId(this.context.getPartitionId()).valueType(recordMetadata.getValueType()).valueWriter(responseValue.recordValue()).tryWriteResponse(processingResponse.requestStreamId(), processingResponse.requestId());
            }
            return executePostCommitTasks();
        }, this.abortCondition), (bool, th) -> {
            if (th != null) {
                LOG.error(ERROR_MESSAGE_EXECUTE_SIDE_EFFECT_ABORTED, new Object[]{this.currentRecord, this.metadata, th});
            }
            notifyProcessedListener(this.typedCommand);
            this.processingTimer.close();
            markProcessingCompleted();
            this.actor.submit(this::tryToReadNextRecord);
        });
    }

    private boolean executePostCommitTasks() {
        CloseableSilently startBatchProcessingPostCommitTasksTimer = this.processingMetrics.startBatchProcessingPostCommitTasksTimer();
        try {
            boolean executePostCommitTasks = this.currentProcessingResult.executePostCommitTasks();
            if (startBatchProcessingPostCommitTasksTimer != null) {
                startBatchProcessingPostCommitTasksTimer.close();
            }
            return executePostCommitTasks;
        } catch (Throwable th) {
            if (startBatchProcessingPostCommitTasksTimer != null) {
                try {
                    startBatchProcessingPostCommitTasksTimer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void notifyProcessedListener(TypedRecord typedRecord) {
        try {
            this.streamProcessorListener.onProcessed(typedRecord);
        } catch (Exception e) {
            LOG.error(NOTIFY_PROCESSED_LISTENER_ERROR_MESSAGE, typedRecord, e);
        }
    }

    private void notifySkippedListener(LoggedEvent loggedEvent) {
        try {
            this.streamProcessorListener.onSkipped(loggedEvent);
        } catch (Exception e) {
            LOG.error(NOTIFY_SKIPPED_LISTENER_ERROR_MESSAGE, new Object[]{loggedEvent, this.metadata, e});
        }
    }

    public long getLastSuccessfulProcessedRecordPosition() {
        return this.lastSuccessfulProcessedRecordPosition;
    }

    public long getLastWrittenPosition() {
        return this.lastWrittenPosition;
    }

    public boolean isMakingProgress() {
        return this.errorHandlingPhase != ErrorHandlingPhase.ENDLESS_ERROR_LOOP;
    }

    public void startProcessing(LastProcessingPositions lastProcessingPositions) {
        long lastProcessedPosition = lastProcessingPositions.getLastProcessedPosition();
        this.logStreamReader.seekToNextEvent(lastProcessedPosition);
        if (this.lastSuccessfulProcessedRecordPosition == -1) {
            this.lastSuccessfulProcessedRecordPosition = lastProcessedPosition;
        }
        if (this.lastWrittenPosition == -1) {
            this.lastWrittenPosition = lastProcessingPositions.getLastWrittenPosition();
        }
        this.actor.submit(this::tryToReadNextRecord);
    }

    private void updateErrorHandlingPhase(ErrorHandlingPhase errorHandlingPhase) {
        this.errorHandlingPhase = errorHandlingPhase;
        this.processingMetrics.errorHandlingPhase(errorHandlingPhase);
    }
}
