package org.apache.nifi.processors.opentelemetry.io;

import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import com.google.protobuf.Message;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.resource.v1.Resource;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.zip.GZIPInputStream;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.opentelemetry.encoding.JsonServiceRequestReader;
import org.apache.nifi.processors.opentelemetry.encoding.ProtobufServiceRequestReader;
import org.apache.nifi.processors.opentelemetry.encoding.ServiceRequestReader;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceRequestDescription;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceResponse;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceResponseStatus;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryContentEncoding;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryContentType;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryRequestType;

/* loaded from: input_file:org/apache/nifi/processors/opentelemetry/io/StandardRequestContentListener.class */
/**
 * Request Content Listener that decodes OpenTelemetry export requests and offers the
 * resulting resource-level messages to a queue.
 *
 * <p>Requests with Content-Type gRPC, Protobuf or JSON are handled; any other Content-Type
 * produces an {@link ServiceResponseStatus#INVALID} response. Every queued Resource message is
 * enriched with {@code client.socket.address} and {@code client.socket.port} attributes taken
 * from the remote address of the request.</p>
 */
public class StandardRequestContentListener implements RequestContentListener {
    private static final int ZERO_MESSAGES = 0;

    /** Value of the leading gRPC frame byte that selects GZIP decoding in this listener */
    private static final byte COMPRESSED = 1;

    private static final String CLIENT_SOCKET_ADDRESS = "client.socket.address";

    private static final String CLIENT_SOCKET_PORT = "client.socket.port";

    private final ServiceRequestReader protobufReader = new ProtobufServiceRequestReader();

    private final ServiceRequestReader jsonReader = new JsonServiceRequestReader();

    private final ComponentLog log;

    private final BlockingQueue<Message> messages;

    /**
     * Create a listener that logs to the given Component Log and offers messages to the given queue.
     *
     * @param componentLog Component Log used for debug and warning messages, required
     * @param blockingQueue Queue receiving decoded resource messages, required
     * @throws NullPointerException when either argument is null
     */
    public StandardRequestContentListener(final ComponentLog componentLog, final BlockingQueue<Message> blockingQueue) {
        this.log = Objects.requireNonNull(componentLog, "Log required");
        this.messages = Objects.requireNonNull(blockingQueue, "Messages required");
    }

    /**
     * Decode the request buffer according to the described Content-Type and encoding, then queue
     * the resource messages it contains.
     *
     * @param byteBuffer Request content positioned at the start of the payload, required
     * @param serviceRequestDescription Description of the request, required
     * @return Service Response; {@link ServiceResponseStatus#INVALID} for unsupported Content-Types
     *     and for any failure while decoding or reading the request
     * @throws NullPointerException when either argument is null
     */
    @Override
    public ServiceResponse onRequest(final ByteBuffer byteBuffer, final ServiceRequestDescription serviceRequestDescription) {
        Objects.requireNonNull(byteBuffer, "Buffer required");
        Objects.requireNonNull(serviceRequestDescription, "Description required");

        final InetSocketAddress remoteAddress = serviceRequestDescription.getRemoteAddress();
        final TelemetryContentType contentType = serviceRequestDescription.getContentType();

        final boolean grpcRequest = TelemetryContentType.APPLICATION_GRPC == contentType;
        final boolean supported = grpcRequest
                || TelemetryContentType.APPLICATION_PROTOBUF == contentType
                || TelemetryContentType.APPLICATION_JSON == contentType;
        if (!supported) {
            return new ServiceResponse(ServiceResponseStatus.INVALID, ZERO_MESSAGES);
        }

        try {
            final TelemetryContentEncoding contentEncoding;
            if (grpcRequest) {
                // The gRPC payload starts with a one-byte compression flag followed by a four-byte
                // message size. Both must be consumed unconditionally so the buffer is positioned
                // at the message itself, regardless of whether debug logging is enabled.
                final byte compression = byteBuffer.get();
                final int messageSize = byteBuffer.getInt();
                this.log.debug("Client Address [{}] Content-Type [{}] Message Size [{}] Compression [{}]",
                        new Object[]{remoteAddress, contentType, messageSize, compression});
                contentEncoding = COMPRESSED == compression ? TelemetryContentEncoding.GZIP : TelemetryContentEncoding.NONE;
            } else {
                contentEncoding = serviceRequestDescription.getContentEncoding();
            }

            // Closing the stream releases the Inflater held by GZIPInputStream without waiting for garbage collection
            try (InputStream decodedStream = getDecodedStream(byteBuffer, contentEncoding)) {
                return onSupportedRequest(decodedStream, serviceRequestDescription);
            }
        } catch (final Exception e) {
            this.log.warn("Client Address [{}] Content-Type [{}] processing failed", new Object[]{remoteAddress, contentType, e});
            return new ServiceResponse(ServiceResponseStatus.INVALID, ZERO_MESSAGES);
        }
    }

    /**
     * Read messages from the decoded stream using the reader matching the Content-Type and queue them.
     *
     * @param inputStream Decoded request stream
     * @param serviceRequestDescription Description of the request
     * @return Service Response describing how many messages were accepted
     * @throws IOException when the number of available bytes cannot be determined
     */
    private ServiceResponse onSupportedRequest(final InputStream inputStream, final ServiceRequestDescription serviceRequestDescription) throws IOException {
        final ServiceRequestReader serviceRequestReader =
                TelemetryContentType.APPLICATION_JSON == serviceRequestDescription.getContentType() ? this.jsonReader : this.protobufReader;
        final TelemetryRequestType requestType = serviceRequestDescription.getRequestType();

        final List<Message> requestMessages;
        if (inputStream.available() == 0) {
            // An empty body is checked before the request type and is reported as success with zero messages
            requestMessages = Collections.emptyList();
        } else if (TelemetryRequestType.LOGS == requestType) {
            requestMessages = readMessages(inputStream, serviceRequestDescription, ExportLogsServiceRequest.class, serviceRequestReader);
        } else if (TelemetryRequestType.METRICS == requestType) {
            requestMessages = readMessages(inputStream, serviceRequestDescription, ExportMetricsServiceRequest.class, serviceRequestReader);
        } else if (TelemetryRequestType.TRACES == requestType) {
            requestMessages = readMessages(inputStream, serviceRequestDescription, ExportTraceServiceRequest.class, serviceRequestReader);
        } else {
            // Unsupported request type with a non-empty body
            return new ServiceResponse(ServiceResponseStatus.INVALID, ZERO_MESSAGES);
        }

        return onMessages(requestMessages);
    }

    /**
     * Parse an export request and return its resource-level elements with client socket attributes added.
     *
     * @param inputStream Decoded request stream
     * @param serviceRequestDescription Description of the request supplying the remote address
     * @param cls Export request class to be read
     * @param serviceRequestReader Reader used to parse the stream
     * @param <T> Export request type
     * @return Resource Logs, Resource Metrics or Resource Spans found in the request
     * @throws IllegalArgumentException when the parsed request is not a Logs, Metrics or Trace export request
     */
    private <T extends Message> List<Message> readMessages(
            final InputStream inputStream,
            final ServiceRequestDescription serviceRequestDescription,
            final Class<T> cls,
            final ServiceRequestReader serviceRequestReader
    ) {
        final List<KeyValue> clientSocketAttributes = getClientSocketAttributes(serviceRequestDescription);
        final Message request = serviceRequestReader.read(inputStream, cls);

        final List<Message> resourceMessages = new ArrayList<>();
        if (request instanceof ExportLogsServiceRequest) {
            for (final ResourceLogs resourceLogs : ((ExportLogsServiceRequest) request).getResourceLogsList()) {
                final Resource resource = withAttributes(resourceLogs.getResource(), clientSocketAttributes);
                resourceMessages.add(resourceLogs.toBuilder().setResource(resource).build());
            }
        } else if (request instanceof ExportMetricsServiceRequest) {
            for (final ResourceMetrics resourceMetrics : ((ExportMetricsServiceRequest) request).getResourceMetricsList()) {
                final Resource resource = withAttributes(resourceMetrics.getResource(), clientSocketAttributes);
                resourceMessages.add(resourceMetrics.toBuilder().setResource(resource).build());
            }
        } else if (request instanceof ExportTraceServiceRequest) {
            for (final ResourceSpans resourceSpans : ((ExportTraceServiceRequest) request).getResourceSpansList()) {
                final Resource resource = withAttributes(resourceSpans.getResource(), clientSocketAttributes);
                resourceMessages.add(resourceSpans.toBuilder().setResource(resource).build());
            }
        } else {
            throw new IllegalArgumentException(String.format("Request Type [%s] not supported", cls.getName()));
        }
        return resourceMessages;
    }

    /** Return a copy of the Resource with the given attributes appended to its existing attributes. */
    private Resource withAttributes(final Resource resource, final List<KeyValue> attributes) {
        return resource.toBuilder().addAllAttributes(attributes).build();
    }

    /**
     * Build the client socket address and port attributes from the remote address of the request.
     *
     * @param serviceRequestDescription Description of the request supplying the remote address
     * @return Attributes keyed {@code client.socket.address} (string) and {@code client.socket.port} (integer)
     */
    private List<KeyValue> getClientSocketAttributes(final ServiceRequestDescription serviceRequestDescription) {
        final InetSocketAddress remoteAddress = serviceRequestDescription.getRemoteAddress();
        final String hostAddress = remoteAddress.getAddress().getHostAddress();
        final int port = remoteAddress.getPort();

        final KeyValue clientSocketAddress = KeyValue.newBuilder()
                .setKey(CLIENT_SOCKET_ADDRESS)
                .setValue(AnyValue.newBuilder().setStringValue(hostAddress))
                .build();
        final KeyValue clientSocketPort = KeyValue.newBuilder()
                .setKey(CLIENT_SOCKET_PORT)
                .setValue(AnyValue.newBuilder().setIntValue(port))
                .build();
        return Arrays.asList(clientSocketAddress, clientSocketPort);
    }

    /**
     * Offer each message to the queue without blocking and summarize the outcome.
     *
     * @param list Messages to be queued; null is reported as invalid
     * @return SUCCESS when the list is empty or every message was accepted, UNAVAILABLE when none
     *     was accepted, otherwise PARTIAL_SUCCESS carrying the number of rejected messages
     */
    private ServiceResponse onMessages(final List<? extends Message> list) {
        if (list == null) {
            return new ServiceResponse(ServiceResponseStatus.INVALID, ZERO_MESSAGES);
        }
        if (list.isEmpty()) {
            return new ServiceResponse(ServiceResponseStatus.SUCCESS, ZERO_MESSAGES);
        }

        int accepted = 0;
        for (final Message message : list) {
            if (this.messages.offer(message)) {
                accepted++;
            }
        }

        final int rejected = list.size() - accepted;
        if (rejected == 0) {
            return new ServiceResponse(ServiceResponseStatus.SUCCESS, ZERO_MESSAGES);
        }
        if (accepted == 0) {
            return new ServiceResponse(ServiceResponseStatus.UNAVAILABLE, ZERO_MESSAGES);
        }
        return new ServiceResponse(ServiceResponseStatus.PARTIAL_SUCCESS, rejected);
    }

    /**
     * Wrap the buffer in an input stream, adding GZIP decompression when the encoding requires it.
     *
     * @param byteBuffer Buffer positioned at the start of the encoded content
     * @param telemetryContentEncoding Content encoding of the buffer
     * @return Stream of decoded bytes; callers are responsible for closing it
     * @throws IOException when the GZIP header cannot be read
     */
    private InputStream getDecodedStream(final ByteBuffer byteBuffer, final TelemetryContentEncoding telemetryContentEncoding) throws IOException {
        final InputStream bufferStream = new ByteBufferBackedInputStream(byteBuffer);
        if (TelemetryContentEncoding.GZIP == telemetryContentEncoding) {
            return new GZIPInputStream(bufferStream);
        }
        return bufferStream;
    }
}
