package io.camunda.connector.kafka.outbound;

import io.camunda.connector.api.annotation.OutboundConnector;
import io.camunda.connector.api.error.ConnectorException;
import io.camunda.connector.api.outbound.OutboundConnectorContext;
import io.camunda.connector.api.outbound.OutboundConnectorFunction;
import io.camunda.connector.generator.java.annotation.ElementTemplate;
import io.camunda.connector.kafka.model.KafkaPropertiesUtil;
import io.camunda.connector.kafka.outbound.model.KafkaConnectorRequest;
import io.camunda.connector.kafka.outbound.model.KafkaConnectorResponse;
import io.camunda.connector.kafka.outbound.model.ProducerRecordFactory;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;

@OutboundConnector(name = "Kafka Producer", inputVariables = {"authentication", "topic", "message", "schemaStrategy", "additionalProperties", "headers"}, type = "io.camunda:connector-kafka:1")
@ElementTemplate(id = "io.camunda.connectors.KAFKA.v1", name = "Kafka Outbound Connector", description = "Produce Kafka message", inputDataClass = KafkaConnectorRequest.class, version = 5, propertyGroups = {@ElementTemplate.PropertyGroup(id = "authentication", label = "Authentication"), @ElementTemplate.PropertyGroup(id = "kafka", label = "Kafka"), @ElementTemplate.PropertyGroup(id = "message", label = "Message")}, documentationRef = "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/kafka/?kafka=outbound", icon = "icon.svg")
/* loaded from: input_file:io/camunda/connector/kafka/outbound/KafkaConnectorFunction.class */
public class KafkaConnectorFunction implements OutboundConnectorFunction {
    private final Function<Properties, Producer<String, Object>> producerCreatorFunction;
    private final ProducerRecordFactory producerRecordFactory;

    public KafkaConnectorFunction() {
        this(KafkaProducer::new);
    }

    public KafkaConnectorFunction(Function<Properties, Producer<String, Object>> function) {
        this.producerRecordFactory = new ProducerRecordFactory();
        this.producerCreatorFunction = function;
    }

    public Object execute(OutboundConnectorContext outboundConnectorContext) {
        return executeConnector((KafkaConnectorRequest) outboundConnectorContext.bindVariables(KafkaConnectorRequest.class));
    }

    private KafkaConnectorResponse executeConnector(KafkaConnectorRequest kafkaConnectorRequest) {
        try {
            Producer<String, Object> apply = this.producerCreatorFunction.apply(KafkaPropertiesUtil.assembleKafkaClientProperties(kafkaConnectorRequest));
            try {
                KafkaConnectorResponse constructKafkaConnectorResponse = constructKafkaConnectorResponse((RecordMetadata) apply.send(this.producerRecordFactory.createProducerRecord(kafkaConnectorRequest)).get(45L, TimeUnit.SECONDS));
                if (apply != null) {
                    apply.close();
                }
                return constructKafkaConnectorResponse;
            } finally {
            }
        } catch (Exception e) {
            throw new ConnectorException("FAIL", "Error during Kafka Producer execution; error message: [" + e.getMessage() + "]", e);
        }
    }

    private KafkaConnectorResponse constructKafkaConnectorResponse(RecordMetadata recordMetadata) {
        return new KafkaConnectorResponse(recordMetadata.topic(), recordMetadata.timestamp(), recordMetadata.offset(), recordMetadata.partition());
    }
}
