package io.camunda.zeebe.broker.transport.commandapi;

import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.transport.AsyncApiRequestHandler;
import io.camunda.zeebe.broker.transport.ErrorResponseWriter;
import io.camunda.zeebe.broker.transport.backpressure.BackpressureMetrics;
import io.camunda.zeebe.broker.transport.backpressure.RequestLimiter;
import io.camunda.zeebe.logstreams.log.LogAppendEntry;
import io.camunda.zeebe.logstreams.log.LogStreamWriter;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.protocol.record.ExecuteCommandRequestDecoder;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.util.Either;
import io.micrometer.core.instrument.MeterRegistry;
import org.agrona.collections.Int2ObjectHashMap;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/broker/transport/commandapi/CommandApiRequestHandler.class */
final class CommandApiRequestHandler extends AsyncApiRequestHandler<CommandApiRequestReader, CommandApiResponseWriter> {
    private static final Logger LOG = Loggers.TRANSPORT_LOGGER;
    private final Int2ObjectHashMap<LogStreamWriter> leadingStreams;
    private final Int2ObjectHashMap<RequestLimiter<Intent>> partitionLimiters;
    private final BackpressureMetrics metrics;
    private boolean isDiskSpaceAvailable;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CommandApiRequestHandler(MeterRegistry meterRegistry) {
        super(CommandApiRequestReader::new, CommandApiResponseWriter::new);
        this.leadingStreams = new Int2ObjectHashMap<>();
        this.partitionLimiters = new Int2ObjectHashMap<>();
        this.isDiskSpaceAvailable = true;
        this.metrics = new BackpressureMetrics(meterRegistry);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.camunda.zeebe.broker.transport.AsyncApiRequestHandler
    public ActorFuture<Either<ErrorResponseWriter, CommandApiResponseWriter>> handleAsync(int i, long j, CommandApiRequestReader commandApiRequestReader, CommandApiResponseWriter commandApiResponseWriter, ErrorResponseWriter errorResponseWriter) {
        return CompletableActorFuture.completed(handle(i, j, commandApiRequestReader, commandApiResponseWriter, errorResponseWriter));
    }

    private Either<ErrorResponseWriter, CommandApiResponseWriter> handle(int i, long j, CommandApiRequestReader commandApiRequestReader, CommandApiResponseWriter commandApiResponseWriter, ErrorResponseWriter errorResponseWriter) {
        return handleExecuteCommandRequest(i, j, commandApiRequestReader, commandApiResponseWriter, errorResponseWriter);
    }

    private Either<ErrorResponseWriter, CommandApiResponseWriter> handleExecuteCommandRequest(int i, long j, CommandApiRequestReader commandApiRequestReader, CommandApiResponseWriter commandApiResponseWriter, ErrorResponseWriter errorResponseWriter) {
        if (!this.isDiskSpaceAvailable) {
            return Either.left(errorResponseWriter.outOfDiskSpace(i));
        }
        ExecuteCommandRequestDecoder messageDecoder = commandApiRequestReader.getMessageDecoder();
        LogStreamWriter logStreamWriter = (LogStreamWriter) this.leadingStreams.get(i);
        RequestLimiter requestLimiter = (RequestLimiter) this.partitionLimiters.get(i);
        ValueType valueType = messageDecoder.valueType();
        Intent fromProtocolValue = Intent.fromProtocolValue(valueType, messageDecoder.intent());
        UnifiedRecordValue value = commandApiRequestReader.value();
        RecordMetadata metadata = commandApiRequestReader.metadata();
        metadata.requestId(j);
        metadata.requestStreamId(i);
        metadata.recordType(RecordType.COMMAND);
        metadata.intent(fromProtocolValue);
        metadata.valueType(valueType);
        if (logStreamWriter == null) {
            errorResponseWriter.partitionLeaderMismatch(i);
            return Either.left(errorResponseWriter);
        }
        if (value == null) {
            errorResponseWriter.unsupportedMessage(valueType.name(), CommandApiRequestReader.RECORDS_BY_TYPE.keySet().toArray());
            return Either.left(errorResponseWriter);
        }
        this.metrics.receivedRequest(i);
        if (!requestLimiter.tryAcquire(i, j, fromProtocolValue)) {
            this.metrics.dropped(i);
            LOG.trace("Partition-{} receiving too many requests. Current limit {} inflight {}, dropping request {} from gateway", new Object[]{Integer.valueOf(i), Integer.valueOf(requestLimiter.getLimit()), Integer.valueOf(requestLimiter.getInflightCount()), Long.valueOf(j)});
            errorResponseWriter.resourceExhausted();
            return Either.left(errorResponseWriter);
        }
        try {
            return writeCommand(messageDecoder.key(), metadata, value, logStreamWriter, errorResponseWriter, i).map(bool -> {
                return commandApiResponseWriter;
            }).mapLeft(errorResponseWriter2 -> {
                requestLimiter.onIgnore(i, j);
                return errorResponseWriter;
            });
        } catch (Exception e) {
            requestLimiter.onIgnore(i, j);
            String formatted = "Failed to write client request to partition '%d', %s".formatted(Integer.valueOf(i), e);
            LOG.error(formatted);
            return Either.left(errorResponseWriter.internalError(formatted, new Object[0]));
        }
    }

    private Either<ErrorResponseWriter, Boolean> writeCommand(long j, RecordMetadata recordMetadata, UnifiedRecordValue unifiedRecordValue, LogStreamWriter logStreamWriter, ErrorResponseWriter errorResponseWriter, int i) {
        LogAppendEntry of = j != ExecuteCommandRequestDecoder.keyNullValue() ? LogAppendEntry.of(j, recordMetadata, unifiedRecordValue) : LogAppendEntry.of(recordMetadata, unifiedRecordValue);
        return logStreamWriter.canWriteEvents(1, of.getLength()) ? logStreamWriter.tryWrite(of).map(l -> {
            return true;
        }).mapLeft(writeFailure -> {
            return errorResponseWriter.mapWriteError(i, writeFailure);
        }) : Either.left(errorResponseWriter.errorCode(ErrorCode.MALFORMED_REQUEST).errorMessage("Request size is above configured maxMessageSize."));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addPartition(int i, LogStreamWriter logStreamWriter, RequestLimiter<Intent> requestLimiter) {
        this.actor.submit(() -> {
            this.leadingStreams.put(i, logStreamWriter);
            this.partitionLimiters.put(i, requestLimiter);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removePartition(int i) {
        this.actor.submit(() -> {
            this.leadingStreams.remove(i);
            this.partitionLimiters.remove(i);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onDiskSpaceNotAvailable() {
        this.actor.submit(() -> {
            this.isDiskSpaceAvailable = false;
            LOG.debug("Broker is out of disk space. All client requests will be rejected");
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onDiskSpaceAvailable() {
        this.actor.submit(() -> {
            this.isDiskSpaceAvailable = true;
        });
    }
}
