package io.camunda.zeebe.broker.exporter.stream;

import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.broker.exporter.context.ExporterContext;
import io.camunda.zeebe.broker.exporter.repo.ExporterDescriptor;
import io.camunda.zeebe.exporter.api.Exporter;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.exporter.api.context.Controller;
import io.camunda.zeebe.exporter.api.context.ScheduledTask;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.ScheduledTimer;
import io.camunda.zeebe.stream.api.records.TypedRecord;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.jar.ThreadContextUtil;
import io.micrometer.core.instrument.MeterRegistry;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import org.agrona.MutableDirectBuffer;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/camunda/zeebe/broker/exporter/stream/ExporterContainer.class */
public final class ExporterContainer implements Controller {
    private static final Logger LOG = Loggers.EXPORTER_LOGGER;
    private static final String SKIP_POSITION_UPDATE_ERROR_MESSAGE = "Failed to update exporter position when skipping filtered record, can be skipped, but may indicate an issue if it occurs often";
    private final ExporterContext context;
    private final Exporter exporter;
    private long position;
    private boolean exporterIsSoftPaused = false;
    private long lastUnacknowledgedPosition;
    private long lastAcknowledgedPosition;
    private byte[] lastExportedMetadata;
    private ExportersState exportersState;
    private ExporterMetrics metrics;
    private ActorControl actor;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ExporterContainer(ExporterDescriptor exporterDescriptor, int i, MeterRegistry meterRegistry) {
        this.context = new ExporterContext(Loggers.getExporterLogger(exporterDescriptor.getId()), exporterDescriptor.getConfiguration(), i, meterRegistry);
        this.exporter = exporterDescriptor.newInstance();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initContainer(ActorControl actorControl, ExporterMetrics exporterMetrics, ExportersState exportersState) {
        this.actor = actorControl;
        this.metrics = exporterMetrics;
        this.exportersState = exportersState;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initPosition() {
        this.position = this.exportersState.getPosition(getId());
        this.lastUnacknowledgedPosition = this.position;
        if (this.position == -1) {
            this.exportersState.setPosition(getId(), -1L);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void openExporter() {
        LOG.debug("Open exporter with id '{}'", getId());
        ThreadContextUtil.runWithClassLoader(() -> {
            this.exporter.open(this);
        }, this.exporter.getClass().getClassLoader());
    }

    public ExporterContext getContext() {
        return this.context;
    }

    public Exporter getExporter() {
        return this.exporter;
    }

    public long getPosition() {
        return this.position;
    }

    long getLastUnacknowledgedPosition() {
        return this.lastUnacknowledgedPosition;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void updatePositionOnSkipIfUpToDate(long j) {
        if (this.position < this.lastUnacknowledgedPosition || this.position >= j) {
            return;
        }
        try {
            updateExporterState(j);
        } catch (Exception e) {
            LOG.warn(SKIP_POSITION_UPDATE_ERROR_MESSAGE, e);
        }
    }

    private void updateExporterState(long j) {
        updateExporterState(j, null);
    }

    private void updateExporterState(long j, byte[] bArr) {
        if (this.position < j) {
            this.lastAcknowledgedPosition = j;
            this.lastExportedMetadata = bArr;
            if (this.exporterIsSoftPaused) {
                return;
            }
            MutableDirectBuffer mutableDirectBuffer = null;
            if (bArr != null) {
                mutableDirectBuffer = BufferUtil.wrapArray(bArr);
            }
            this.exportersState.setExporterState(getId(), j, mutableDirectBuffer);
            this.metrics.setLastUpdatedExportedPosition(getId(), j);
            this.position = j;
        }
    }

    public void updateLastExportedRecordPosition(long j) {
        this.actor.run(() -> {
            updateExporterState(j);
        });
    }

    public void updateLastExportedRecordPosition(long j, byte[] bArr) {
        this.actor.run(() -> {
            updateExporterState(j, bArr);
        });
    }

    public long getLastExportedRecordPosition() {
        return getPosition();
    }

    public ScheduledTask scheduleCancellableTask(Duration duration, Runnable runnable) {
        ScheduledTimer schedule = this.actor.schedule(duration, runnable);
        Objects.requireNonNull(schedule);
        return schedule::cancel;
    }

    public Optional<byte[]> readMetadata() {
        return Optional.ofNullable(this.exportersState.getExporterMetadata(getId())).filter(directBuffer -> {
            return directBuffer.capacity() > 0;
        }).map(BufferUtil::bufferAsArray);
    }

    public String getId() {
        return this.context.getConfiguration().getId();
    }

    private boolean acceptRecord(RecordMetadata recordMetadata) {
        Context.RecordFilter filter = this.context.getFilter();
        return filter.acceptType(recordMetadata.getRecordType()) && filter.acceptValue(recordMetadata.getValueType());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void configureExporter() throws Exception {
        LOG.debug("Configure exporter with id '{}'", getId());
        ThreadContextUtil.runCheckedWithClassLoader(() -> {
            this.exporter.configure(this.context);
        }, this.exporter.getClass().getClassLoader());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean exportRecord(RecordMetadata recordMetadata, TypedRecord typedRecord) {
        try {
            if (this.position >= typedRecord.getPosition()) {
                return true;
            }
            if (acceptRecord(recordMetadata)) {
                export(typedRecord);
                return true;
            }
            updatePositionOnSkipIfUpToDate(typedRecord.getPosition());
            return true;
        } catch (Exception e) {
            this.context.getLogger().warn("Error on exporting record with key {}", Long.valueOf(typedRecord.getKey()), e);
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void softPauseExporter() {
        this.exporterIsSoftPaused = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void undoSoftPauseExporter() {
        this.exporterIsSoftPaused = false;
        updateExporterState(this.lastAcknowledgedPosition, this.lastExportedMetadata);
    }

    private void export(Record<?> record) {
        ThreadContextUtil.runWithClassLoader(() -> {
            this.exporter.export(record);
        }, this.exporter.getClass().getClassLoader());
        this.lastUnacknowledgedPosition = record.getPosition();
    }

    public void close() {
        try {
            Exporter exporter = this.exporter;
            Objects.requireNonNull(exporter);
            ThreadContextUtil.runCheckedWithClassLoader(exporter::close, this.exporter.getClass().getClassLoader());
        } catch (Exception e) {
            this.context.getLogger().error("Error on close", e);
        }
        try {
            this.context.close();
        } catch (Exception e2) {
            this.context.getLogger().error("Error on context.close", e2);
        }
    }
}
