package org.apache.nifi.processors.aws.kinesis.stream.record;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.aws.kinesis.stream.ConsumeKinesisStream;
import org.apache.nifi.processors.aws.kinesis.stream.pause.RecordProcessorBlocker;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/* loaded from: input_file:org/apache/nifi/processors/aws/kinesis/stream/record/AbstractKinesisRecordProcessor.class */
/**
 * Base {@link ShardRecordProcessor} that converts batches of Kinesis records into FlowFiles.
 *
 * <p>For every non-empty batch a new {@link ProcessSession} is created, each record is handed to
 * {@link #processRecord} (retried up to the configured number of attempts), the resulting FlowFiles
 * are transferred to {@link ConsumeKinesisStream#REL_SUCCESS} and the session is committed
 * asynchronously. Once the commit succeeds, a checkpoint is written if the configured checkpoint
 * interval has elapsed. Checkpoints are also attempted when the shard ends or a shutdown is requested.
 *
 * <p>NOTE(review): every {@link InterruptedException} caught in this class is logged at debug level
 * and the thread's interrupt flag is not restored. That appears to be deliberate (processing resumes
 * after an interrupt) - confirm before changing it, since restoring the flag could make subsequent
 * session or checkpoint I/O fail.
 */
public abstract class AbstractKinesisRecordProcessor implements ShardRecordProcessor {
    public static final String AWS_KINESIS_SHARD_ID = "aws.kinesis.shard.id";
    public static final String AWS_KINESIS_SEQUENCE_NUMBER = "aws.kinesis.sequence.number";
    public static final String AWS_KINESIS_PARTITION_KEY = "aws.kinesis.partition.key";
    public static final String AWS_KINESIS_APPROXIMATE_ARRIVAL_TIMESTAMP = "aws.kinesis.approximate.arrival.timestamp";
    public static final String KINESIS_RECORD_SCHEMA_KEY = "kinesis.name";
    static final Base64.Encoder BASE_64_ENCODER = Base64.getEncoder();
    private final ProcessSessionFactory sessionFactory;
    private final ComponentLog log;
    private final String streamName;
    private final String transitUriPrefix;
    private final long checkpointIntervalMillis;
    private final long retryWaitMillis;
    private final int numRetries;
    private final DateTimeFormatter dateTimeFormatter;
    private final RecordProcessorBlocker recordProcessorBlocker;
    private String kinesisShardId;
    private long nextCheckpointTimeInMillis;
    // Set before a batch is processed and cleared from the asynchronous commit callback, while
    // shutdownRequested() polls it in a sleep loop; volatile so the polling thread sees the update.
    private volatile boolean processingRecords = false;

    /**
     * Creates a record processor.
     *
     * @param processSessionFactory factory used to create one session per non-empty batch
     * @param componentLog logger used for all messages emitted by this processor
     * @param str name of the Kinesis stream (used in log messages, see {@link #getStreamName()})
     * @param str2 value substituted into {@code http://%s.amazonaws.com} to build the provenance
     *             transit URI prefix when {@code str3} is blank
     * @param str3 provenance transit URI prefix; when blank the prefix is derived from {@code str2}
     * @param j interval, in milliseconds, between periodic checkpoints
     * @param j2 time, in milliseconds, to sleep between retries
     * @param i maximum number of attempts for processing a record and for checkpointing; also the
     *          maximum number of wait iterations performed on shutdown
     * @param dateTimeFormatter formatter applied to timestamps written to logs and FlowFile attributes
     * @param recordProcessorBlocker blocker awaited at the start of every {@link #processRecords} call
     */
    public AbstractKinesisRecordProcessor(ProcessSessionFactory processSessionFactory, ComponentLog componentLog, String str, String str2, String str3, long j, long j2, int i, DateTimeFormatter dateTimeFormatter, RecordProcessorBlocker recordProcessorBlocker) {
        this.sessionFactory = processSessionFactory;
        this.log = componentLog;
        this.streamName = str;
        this.checkpointIntervalMillis = j;
        this.retryWaitMillis = j2;
        this.numRetries = i;
        this.dateTimeFormatter = dateTimeFormatter;
        this.recordProcessorBlocker = recordProcessorBlocker;
        this.transitUriPrefix = StringUtils.isBlank(str3) ? String.format("http://%s.amazonaws.com", str2) : str3;
    }

    /**
     * Records the shard id and schedules the first periodic checkpoint one checkpoint interval from now.
     * A warning is logged when the input reports a pending (previously uncheckpointed) sequence number.
     *
     * @param initializationInput shard id and sequence numbers supplied on initialization
     */
    @Override
    public void initialize(InitializationInput initializationInput) {
        if (initializationInput.pendingCheckpointSequenceNumber() != null) {
            this.log.warn("Initializing record processor for stream: {} / shard {}; from sequence number: {}; indicates previously uncheckpointed sequence number: {}", new Object[]{this.streamName, initializationInput.shardId(), initializationInput.extendedSequenceNumber(), initializationInput.pendingCheckpointSequenceNumber()});
        } else {
            this.log.debug("Initializing record processor for stream: {} / shard: {}; from sequence number: {}", new Object[]{this.streamName, initializationInput.shardId(), initializationInput.extendedSequenceNumber()});
        }
        this.kinesisShardId = initializationInput.shardId();
        this.nextCheckpointTimeInMillis = System.currentTimeMillis() + this.checkpointIntervalMillis;
    }

    /**
     * Processes one batch of records: waits on the blocker, converts the records to FlowFiles,
     * transfers them to success and commits the session asynchronously. An empty batch is a no-op.
     *
     * <p>Any exception is logged and the session (if one was created) is rolled back; nothing is
     * rethrown to the caller.
     *
     * @param processRecordsInput the batch of records together with its checkpointer
     */
    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        try {
            this.recordProcessorBlocker.await();
        } catch (InterruptedException e) {
            // Processing deliberately continues after an interrupt; see the class-level review note.
            getLogger().debug("Interrupted while waiting for recordProcessorBlocker to unblock, resuming record processing", e);
        }
        logBatchDetails(processRecordsInput);

        ProcessSession processSession = null;
        try {
            final List<KinesisClientRecord> records = processRecordsInput.records();
            if (records.isEmpty()) {
                return;
            }
            final List<FlowFile> flowFiles = new ArrayList<>(records.size());
            final StopWatch stopWatch = new StopWatch(true);
            processSession = this.sessionFactory.createSession();
            startProcessingRecords();
            final int recordsTransformed = processRecordsWithRetries(records, flowFiles, processSession, stopWatch);
            transferTo(ConsumeKinesisStream.REL_SUCCESS, processSession, records.size(), recordsTransformed, flowFiles);
            processSession.commitAsync(() -> {
                this.processingRecords = false;
                // Checkpoint only after a successful commit so that uncommitted records are not skipped.
                checkpointOnceEveryCheckpointInterval(processRecordsInput.checkpointer());
            });
        } catch (Exception e2) {
            this.log.error("Unable to fully process received Kinesis record(s) due to {}", new Object[]{e2.getLocalizedMessage(), e2});
            // The commit-success callback will not clear the flag for this batch, so clear it here;
            // otherwise shutdownRequested() would wait out its full retry window for nothing.
            this.processingRecords = false;
            if (processSession != null) {
                processSession.rollback();
            }
        }
    }

    /** Logs size, shard and cache timing of a batch when debug logging is enabled. */
    private void logBatchDetails(ProcessRecordsInput processRecordsInput) {
        if (!this.log.isDebugEnabled()) {
            return;
        }
        this.log.debug("Processing {} records from {}; cache entry: {}; cache exit: {}; millis behind latest: {}", new Object[]{
                Integer.valueOf(processRecordsInput.records().size()),
                this.kinesisShardId,
                formatTimestamp(processRecordsInput.cacheEntryTime()),
                formatTimestamp(processRecordsInput.cacheExitTime()),
                processRecordsInput.millisBehindLatest()});
    }

    /** Formats an instant in the system default time zone, or returns {@code null} for a {@code null} instant. */
    private String formatTimestamp(Instant instant) {
        return instant == null ? null : this.dateTimeFormatter.format(instant.atZone(ZoneId.systemDefault()));
    }

    /** Marks this processor as currently processing a batch of records. */
    public void startProcessingRecords() {
        this.processingRecords = true;
    }

    /**
     * Processes every record of the batch, attempting each one up to {@code numRetries} times.
     * Records that still fail are logged and skipped.
     *
     * @return the number of records that were processed successfully
     */
    private int processRecordsWithRetries(List<KinesisClientRecord> list, List<FlowFile> list2, ProcessSession processSession, StopWatch stopWatch) {
        final int lastIndex = list.size() - 1;
        int successCount = 0;
        for (int index = 0; index <= lastIndex; index++) {
            final KinesisClientRecord kinesisRecord = list.get(index);
            boolean processed = false;
            for (int attempt = 0; !processed && attempt < this.numRetries; attempt++) {
                processed = attemptProcessRecord(list2, kinesisRecord, index == lastIndex, processSession, stopWatch);
            }
            if (processed) {
                successCount++;
            } else {
                this.log.error("Couldn't process Kinesis record {}, skipping.", new Object[]{kinesisRecord});
            }
        }
        return successCount;
    }

    /**
     * Makes a single attempt at processing one record. On failure the exception is logged and the
     * thread sleeps for the retry wait time before returning.
     *
     * @return {@code true} when {@link #processRecord} completed without throwing
     */
    private boolean attemptProcessRecord(List<FlowFile> list, KinesisClientRecord kinesisClientRecord, boolean z, ProcessSession processSession, StopWatch stopWatch) {
        try {
            processRecord(list, kinesisClientRecord, z, processSession, stopWatch);
            return true;
        } catch (Exception e) {
            this.log.error("Caught Exception while processing Kinesis record {}", new Object[]{kinesisClientRecord, e});
            try {
                Thread.sleep(this.retryWaitMillis);
            } catch (InterruptedException e2) {
                this.log.debug("Interrupted sleep during record processing back-off", e2);
            }
            return false;
        }
    }

    /**
     * Processes a single Kinesis record using the given session.
     *
     * @param list collection of FlowFiles produced for the current batch; it is what gets transferred
     *             to the success relationship after the batch has been processed
     * @param kinesisClientRecord the record to process
     * @param z {@code true} when this is the last record of the current batch
     * @param processSession session created for the current batch
     * @param stopWatch stop watch started when processing of the current batch began
     */
    abstract void processRecord(List<FlowFile> list, KinesisClientRecord kinesisClientRecord, boolean z, ProcessSession processSession, StopWatch stopWatch);

    /**
     * Reports a provenance RECEIVE event for the FlowFile. The transit URI is
     * {@code <prefix>/<shardId>/<str>#<str2>} when both {@code str} and {@code str2} are non-blank,
     * otherwise {@code <prefix>/<shardId>}.
     *
     * @param processSession session whose provenance reporter is used
     * @param flowFile the received FlowFile
     * @param str first optional URI component (appended after the shard id)
     * @param str2 second optional URI component (appended after {@code #})
     * @param stopWatch stop watch whose elapsed milliseconds are reported as the transmission time
     */
    public void reportProvenance(ProcessSession processSession, FlowFile flowFile, String str, String str2, StopWatch stopWatch) {
        final String transitUri;
        if (StringUtils.isNotBlank(str) && StringUtils.isNotBlank(str2)) {
            transitUri = String.format("%s/%s/%s#%s", this.transitUriPrefix, this.kinesisShardId, str, str2);
        } else {
            transitUri = String.format("%s/%s", this.transitUriPrefix, this.kinesisShardId);
        }
        processSession.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    }

    /**
     * Builds the standard Kinesis attributes for a FlowFile.
     *
     * @param str value stored under {@link #AWS_KINESIS_SEQUENCE_NUMBER}
     * @param str2 value stored under {@link #AWS_KINESIS_PARTITION_KEY}
     * @param instant value formatted (system default time zone) and stored under
     *                {@link #AWS_KINESIS_APPROXIMATE_ARRIVAL_TIMESTAMP}; the attribute is omitted when {@code null}
     * @return a new mutable map containing the attributes, including the current shard id
     */
    public Map<String, String> getDefaultAttributes(String str, String str2, Instant instant) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(AWS_KINESIS_SHARD_ID, this.kinesisShardId);
        attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, str);
        attributes.put(AWS_KINESIS_PARTITION_KEY, str2);
        if (instant != null) {
            attributes.put(AWS_KINESIS_APPROXIMATE_ARRIVAL_TIMESTAMP, formatTimestamp(instant));
        }
        return attributes;
    }

    /**
     * Adjusts the "Records Processed" counter and, when there are FlowFiles to route, the
     * "Records Transformed" counter, then transfers the FlowFiles to the relationship.
     *
     * @param relationship destination relationship
     * @param processSession session owning the FlowFiles
     * @param i number of records received in the batch
     * @param i2 number of records processed successfully
     * @param list FlowFiles to transfer; when empty only the "Records Processed" counter is adjusted
     */
    public void transferTo(Relationship relationship, ProcessSession processSession, int i, int i2, List<FlowFile> list) {
        processSession.adjustCounter("Records Processed", i, false);
        if (list.isEmpty()) {
            return;
        }
        processSession.adjustCounter("Records Transformed", i2, false);
        processSession.transfer(list, relationship);
    }

    /** Checkpoints when the next scheduled checkpoint time has passed, then schedules the following one. */
    private void checkpointOnceEveryCheckpointInterval(RecordProcessorCheckpointer recordProcessorCheckpointer) {
        if (System.currentTimeMillis() > this.nextCheckpointTimeInMillis) {
            checkpointWithRetries(recordProcessorCheckpointer);
            this.nextCheckpointTimeInMillis = System.currentTimeMillis() + this.checkpointIntervalMillis;
        }
    }

    /**
     * Logs that the lease was lost; no checkpoint is attempted.
     *
     * @param leaseLostInput unused
     */
    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        this.log.debug("Lease lost");
    }

    /**
     * Checkpoints (with retries) when the shard has ended.
     *
     * @param shardEndedInput provides the checkpointer to use
     */
    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        this.log.debug("Shutting down Record Processor for shard: {} with reason: Shutdown Requested".isEmpty() ? "" : "Shutting down Record Processor for shard: {} with reason: Shard Ended", new Object[]{this.kinesisShardId});
        checkpointWithRetries(shardEndedInput.checkpointer());
    }

    /**
     * Waits (up to {@code numRetries} sleeps of the retry wait time) for an in-flight batch to finish,
     * then attempts a checkpoint regardless of whether the batch completed.
     *
     * @param shutdownRequestedInput provides the checkpointer to use
     */
    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        this.log.debug("Shutting down Record Processor for shard: {} with reason: Shutdown Requested", new Object[]{this.kinesisShardId});
        for (int attempt = 0; this.processingRecords && attempt < this.numRetries; attempt++) {
            this.log.debug("Record Processor for shard {} still processing records, waiting before shutdown", new Object[]{this.kinesisShardId});
            try {
                Thread.sleep(this.retryWaitMillis);
            } catch (InterruptedException e) {
                this.log.debug("Interrupted sleep while waiting for record processing to complete before shutdown (TERMINATE)", e);
            }
        }
        if (this.processingRecords) {
            this.log.warn("Record Processor for shard {} still running, but maximum wait time elapsed, checkpoint will be attempted", new Object[]{this.kinesisShardId});
        }
        checkpointWithRetries(shutdownRequestedInput.checkpointer());
    }

    /**
     * Attempts to checkpoint up to {@code numRetries} times, stopping at the first success.
     * A {@link ShutdownException} or {@link InvalidStateException} aborts the retries and is logged,
     * not rethrown.
     */
    private void checkpointWithRetries(RecordProcessorCheckpointer recordProcessorCheckpointer) {
        this.log.debug("Checkpointing shard {}", new Object[]{this.kinesisShardId});
        try {
            // The loop must live inside the try: attemptCheckpoint is what throws the checked exceptions.
            for (int attempt = 0; attempt < this.numRetries; attempt++) {
                if (attemptCheckpoint(recordProcessorCheckpointer, attempt)) {
                    break;
                }
            }
        } catch (ShutdownException e) {
            this.log.info("Caught shutdown exception, skipping checkpoint.", e);
        } catch (InvalidStateException e2) {
            this.log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e2);
        }
    }

    /**
     * Makes a single checkpoint attempt. A {@link ThrottlingException} is treated as transient:
     * it is logged and, unless this was the final attempt, followed by a back-off sleep.
     *
     * @param recordProcessorCheckpointer checkpointer to invoke
     * @param i zero-based index of this attempt
     * @return {@code true} when the checkpoint succeeded
     * @throws ShutdownException propagated from the checkpointer
     * @throws InvalidStateException propagated from the checkpointer
     */
    private boolean attemptCheckpoint(RecordProcessorCheckpointer recordProcessorCheckpointer, int i) throws ShutdownException, InvalidStateException {
        try {
            recordProcessorCheckpointer.checkpoint();
            return true;
        } catch (ThrottlingException e) {
            final boolean finalAttempt = i >= this.numRetries - 1;
            if (finalAttempt) {
                this.log.error("Checkpoint failed after {} attempts.", new Object[]{Integer.valueOf(i + 1), e});
            } else {
                this.log.warn("Transient issue when checkpointing - attempt {} of {}", new Object[]{Integer.valueOf(i + 1), Integer.valueOf(this.numRetries), e});
                try {
                    Thread.sleep(this.retryWaitMillis);
                } catch (InterruptedException e2) {
                    this.log.debug("Interrupted sleep during checkpoint back-off", e2);
                }
            }
            return false;
        }
    }

    /** @return the logger supplied at construction */
    public ComponentLog getLogger() {
        return this.log;
    }

    /** @return the stream name supplied at construction */
    public String getStreamName() {
        return this.streamName;
    }

    /** @return the shard id recorded by {@link #initialize}, or {@code null} before initialization */
    public String getKinesisShardId() {
        return this.kinesisShardId;
    }

    /** Overrides the shard id normally set by {@link #initialize}. */
    void setKinesisShardId(String str) {
        this.kinesisShardId = str;
    }

    /** @return epoch milliseconds after which the next periodic checkpoint becomes due */
    long getNextCheckpointTimeInMillis() {
        return this.nextCheckpointTimeInMillis;
    }

    /** Overrides the time (epoch milliseconds) after which the next periodic checkpoint becomes due. */
    void setNextCheckpointTimeInMillis(long j) {
        this.nextCheckpointTimeInMillis = j;
    }

    /** Overrides the flag indicating that a batch is currently being processed. */
    void setProcessingRecords(boolean z) {
        this.processingRecords = z;
    }
}
