package org.apache.camel.component.aws2.kinesis;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

/* loaded from: input_file:org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer.class */
public class KclKinesis2Consumer extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(KclKinesis2Consumer.class);

    /** Camel processor that receives one exchange per consumed Kinesis record. */
    private final Processor processor;

    /** Executor that runs the consuming task; assigned in {@code doStart} and cleared in {@code doStop}. */
    private ExecutorService executor;

    /**
     * KCL scheduler created by the consuming task.
     * <p>
     * The reference is assigned on the executor thread that runs the consuming task and read by
     * {@code doStop} on whichever thread stops the consumer, so it is {@code volatile} to make the
     * assignment visible to the stopping thread.
     */
    private volatile Scheduler schedulerKcl;

    /* loaded from: input_file:org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer$CamelKinesisRecordProcessor.class */
    class CamelKinesisRecordProcessor implements ShardRecordProcessor {
        private static final Logger LOG = LoggerFactory.getLogger(CamelKinesisRecordProcessor.class);
        private String shardId;
        private Kinesis2Endpoint endpoint;

        public CamelKinesisRecordProcessor(Kinesis2Endpoint kinesis2Endpoint) {
            this.endpoint = kinesis2Endpoint;
        }

        public void initialize(InitializationInput initializationInput) {
            this.shardId = initializationInput.shardId();
            LOG.debug("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
        }

        public void processRecords(ProcessRecordsInput processRecordsInput) {
            try {
                LOG.debug("Processing {} record(s)", Integer.valueOf(processRecordsInput.records().size()));
                processRecordsInput.records().forEach(kinesisClientRecord -> {
                    try {
                        KclKinesis2Consumer.this.processor.process(createExchange(kinesisClientRecord, this.shardId));
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            } catch (Throwable th) {
                LOG.error("Caught throwable while processing records. Aborting.");
            }
        }

        public void leaseLost(LeaseLostInput leaseLostInput) {
            LOG.debug("Lost lease, so terminating.");
        }

        public void shardEnded(ShardEndedInput shardEndedInput) {
            try {
                LOG.debug("Reached shard end checkpointing.");
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                LOG.error("Exception while checkpointing at shard end. Giving up.", e);
            }
        }

        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            try {
                LOG.debug("Scheduler is shutting down, checkpointing.");
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                LOG.error("Exception while checkpointing at requested shutdown. Giving up.", e);
            }
        }

        protected Exchange createExchange(KinesisClientRecord kinesisClientRecord, String str) {
            Exchange createExchange = this.endpoint.createExchange();
            createExchange.getMessage().setBody(kinesisClientRecord.data());
            createExchange.getMessage().setHeader(Kinesis2Constants.APPROX_ARRIVAL_TIME, kinesisClientRecord.approximateArrivalTimestamp());
            createExchange.getMessage().setHeader(Kinesis2Constants.PARTITION_KEY, kinesisClientRecord.partitionKey());
            createExchange.getMessage().setHeader(Kinesis2Constants.SEQUENCE_NUMBER, kinesisClientRecord.sequenceNumber());
            createExchange.getMessage().setHeader(Kinesis2Constants.SHARD_ID, str);
            if (kinesisClientRecord.approximateArrivalTimestamp() != null) {
                createExchange.getMessage().setHeader(Kinesis2Constants.MESSAGE_TIMESTAMP, Long.valueOf(kinesisClientRecord.approximateArrivalTimestamp().getEpochSecond() * 1000));
            }
            return createExchange;
        }
    }

    /* loaded from: input_file:org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer$CamelKinesisRecordProcessorFactory.class */
    private class CamelKinesisRecordProcessorFactory implements ShardRecordProcessorFactory {
        private CamelKinesisRecordProcessorFactory() {
        }

        public ShardRecordProcessor shardRecordProcessor() {
            return new CamelKinesisRecordProcessor(KclKinesis2Consumer.this.m10getEndpoint());
        }
    }

    /* loaded from: input_file:org/apache/camel/component/aws2/kinesis/KclKinesis2Consumer$KclKinesisConsumingTask.class */
    class KclKinesisConsumingTask implements Runnable {
        private final KinesisAsyncClient kinesisAsyncClient;
        private final DynamoDbAsyncClient dynamoDbAsyncClient;
        private final CloudWatchAsyncClient cloudWatchAsyncClient;
        private final String streamName;
        private final String applicationName;
        private final boolean disableMetricsExport;

        KclKinesisConsumingTask(String str, String str2, KinesisAsyncClient kinesisAsyncClient, DynamoDbAsyncClient dynamoDbAsyncClient, CloudWatchAsyncClient cloudWatchAsyncClient, boolean z) {
            this.cloudWatchAsyncClient = cloudWatchAsyncClient;
            this.dynamoDbAsyncClient = dynamoDbAsyncClient;
            this.kinesisAsyncClient = kinesisAsyncClient;
            this.streamName = str;
            this.applicationName = str2 != null ? str2 : str;
            this.disableMetricsExport = z;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                ConfigsBuilder configsBuilder = new ConfigsBuilder(this.streamName, this.applicationName, this.kinesisAsyncClient, this.dynamoDbAsyncClient, this.cloudWatchAsyncClient, "KclKinesisConsumingTask-" + UUID.randomUUID().toString(), new CamelKinesisRecordProcessorFactory());
                Scheduler scheduler = new Scheduler(configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), this.disableMetricsExport ? configsBuilder.metricsConfig().metricsLevel(MetricsLevel.NONE).metricsFactory(new NullMetricsFactory()) : configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(KclKinesis2Consumer.this.m10getEndpoint().getConfiguration().getStreamName(), this.kinesisAsyncClient)));
                KclKinesis2Consumer.this.schedulerKcl = scheduler;
                new Thread((Runnable) scheduler).start();
            } catch (Exception e) {
                KclKinesis2Consumer.this.getExceptionHandler().handleException("Error during processing", e);
            }
        }
    }

    /**
     * Creates a consumer for the given endpoint.
     *
     * @param kinesis2Endpoint endpoint passed to the {@code DefaultConsumer} super constructor
     * @param processor        processor passed to the super constructor and also kept in a local field
     */
    public KclKinesis2Consumer(Kinesis2Endpoint kinesis2Endpoint, Processor processor) {
        super(kinesis2Endpoint, processor);
        // Kept locally as well: CamelKinesisRecordProcessor reads this field directly.
        this.processor = processor;
    }

    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    /**
     * Returns the endpoint of this consumer narrowed to {@link Kinesis2Endpoint}.
     * <p>
     * The superclass accessor is declared with the general endpoint type, so an explicit cast is
     * required; the constructor only accepts a {@link Kinesis2Endpoint}.
     *
     * @return the endpoint this consumer was created with
     */
    public Kinesis2Endpoint m10getEndpoint() {
        return (Kinesis2Endpoint) super.getEndpoint();
    }

    /**
     * Starts the consumer: resolves the Kinesis, DynamoDB and CloudWatch async clients, creates
     * the executor and submits the task that builds and starts the KCL scheduler.
     * <p>
     * DynamoDB and CloudWatch clients configured on the endpoint are used as-is; when one is
     * missing it is built from the endpoint's credential and region settings.
     *
     * @throws Exception if the superclass start or client creation fails
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        LOG.debug("Starting KCL Consumer");
        Kinesis2Configuration configuration = m10getEndpoint().getConfiguration();
        KinesisAsyncClient kinesisAsyncClient = configuration.getAmazonKinesisAsyncClient();

        DynamoDbAsyncClient dynamoDbAsyncClient = configuration.getDynamoDbAsyncClient();
        if (ObjectHelper.isEmpty(dynamoDbAsyncClient)) {
            dynamoDbAsyncClient = createDynamoDbAsyncClient(configuration);
        }

        CloudWatchAsyncClient cloudWatchAsyncClient = configuration.getCloudWatchAsyncClient();
        if (ObjectHelper.isEmpty(cloudWatchAsyncClient)) {
            cloudWatchAsyncClient = createCloudWatchAsyncClient(configuration);
        }

        this.executor = m10getEndpoint().createExecutor();
        this.executor.submit(new KclKinesisConsumingTask(configuration.getStreamName(), configuration.getApplicationName(), kinesisAsyncClient, dynamoDbAsyncClient, cloudWatchAsyncClient, configuration.isKclDisableCloudwatchMetricsExport()));
    }

    /** Builds a DynamoDB async client from the credential and region settings of the configuration. */
    private DynamoDbAsyncClient createDynamoDbAsyncClient(Kinesis2Configuration configuration) {
        DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder();
        boolean hasStaticKeys = ObjectHelper.isNotEmpty(configuration.getAccessKey()) && ObjectHelper.isNotEmpty(configuration.getSecretKey());
        // The session-token case must be tested before the plain access/secret key case,
        // otherwise it can never be selected and the token is silently ignored.
        if (hasStaticKeys && ObjectHelper.isNotEmpty(configuration.getSessionToken())) {
            builder = builder.credentialsProvider(StaticCredentialsProvider.create(AwsSessionCredentials.create(configuration.getAccessKey(), configuration.getSecretKey(), configuration.getSessionToken())));
        } else if (hasStaticKeys) {
            builder = builder.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(configuration.getAccessKey(), configuration.getSecretKey())));
        } else if (ObjectHelper.isNotEmpty(configuration.getProfileCredentialsName())) {
            builder = builder.credentialsProvider(ProfileCredentialsProvider.create(configuration.getProfileCredentialsName()));
        }
        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
            builder = builder.region(Region.of(configuration.getRegion()));
        }
        return builder.build();
    }

    /** Builds a CloudWatch async client from the credential and region settings of the configuration. */
    private CloudWatchAsyncClient createCloudWatchAsyncClient(Kinesis2Configuration configuration) {
        CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder();
        boolean hasStaticKeys = ObjectHelper.isNotEmpty(configuration.getAccessKey()) && ObjectHelper.isNotEmpty(configuration.getSecretKey());
        // Same precedence as for the DynamoDB client: session credentials first, then plain
        // access/secret key, then a named profile.
        if (hasStaticKeys && ObjectHelper.isNotEmpty(configuration.getSessionToken())) {
            builder = builder.credentialsProvider(StaticCredentialsProvider.create(AwsSessionCredentials.create(configuration.getAccessKey(), configuration.getSecretKey(), configuration.getSessionToken())));
        } else if (hasStaticKeys) {
            builder = builder.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(configuration.getAccessKey(), configuration.getSecretKey())));
        } else if (ObjectHelper.isNotEmpty(configuration.getProfileCredentialsName())) {
            builder = builder.credentialsProvider(ProfileCredentialsProvider.create(configuration.getProfileCredentialsName()));
        }
        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
            builder = builder.region(Region.of(configuration.getRegion()));
        }
        return builder.build();
    }

    /**
     * Stops the consumer: shuts down the executor, asks the KCL scheduler (if one was created)
     * for a graceful shutdown and waits up to 20 seconds for it, then stops the superclass.
     * <p>
     * Problems during the scheduler shutdown are logged and do not prevent the consumer from
     * stopping. If the waiting thread is interrupted, its interrupt status is restored.
     *
     * @throws Exception if executor shutdown or the superclass stop fails
     */
    @Override
    protected void doStop() throws Exception {
        LOG.debug("Stopping KCL Consumer");
        if (this.executor != null) {
            if (m10getEndpoint() == null || m10getEndpoint().getCamelContext() == null) {
                this.executor.shutdownNow();
            } else {
                m10getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(this.executor);
            }
        }
        // Read the field once so the null check and the shutdown call use the same reference.
        Scheduler scheduler = this.schedulerKcl;
        if (scheduler != null) {
            CompletableFuture<?> shutdownFuture = scheduler.startGracefulShutdown();
            LOG.info("Waiting up to 20 seconds for scheduler shutdown to complete.");
            try {
                shutdownFuture.get(20L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                LOG.debug("Interrupted while waiting for graceful shutdown. Continuing.");
            } catch (ExecutionException e) {
                LOG.warn("Exception while executing graceful shutdown.", e);
            } catch (TimeoutException e) {
                LOG.warn("Timeout while waiting for shutdown.  Scheduler may not have exited.");
            }
            LOG.debug("Completed, shutting down now.");
            // Drop the reference so a later stop does not try to shut the same scheduler down again.
            this.schedulerKcl = null;
        }
        this.executor = null;
        super.doStop();
    }
}
