package org.apache.nifi.processors.gcp.pubsub.consume;

import com.google.pubsub.v1.ReceivedMessage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub;
import org.apache.nifi.processors.gcp.pubsub.PubSubAttributes;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

/* loaded from: input_file:org/apache/nifi/processors/gcp/pubsub/consume/AbstractPubSubMessageConverter.class */
public abstract class AbstractPubSubMessageConverter implements PubSubMessageConverter {
    protected final RecordReaderFactory readerFactory;
    protected final RecordSetWriterFactory writerFactory;
    protected final ComponentLog logger;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/gcp/pubsub/consume/AbstractPubSubMessageConverter$RecordGroup.class */
    public static final class RecordGroup extends Record {
        private final FlowFile flowFile;
        private final RecordSetWriter writer;

        private RecordGroup(FlowFile flowFile, RecordSetWriter recordSetWriter) {
            this.flowFile = flowFile;
            this.writer = recordSetWriter;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, RecordGroup.class), RecordGroup.class, "flowFile;writer", "FIELD:Lorg/apache/nifi/processors/gcp/pubsub/consume/AbstractPubSubMessageConverter$RecordGroup;->flowFile:Lorg/apache/nifi/flowfile/FlowFile;", "FIELD:Lorg/apache/nifi/processors/gcp/pubsub/consume/AbstractPubSubMessageConverter$RecordGroup;->writer:Lorg/apache/nifi/serialization/RecordSetWriter;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, RecordGroup.class), RecordGroup.class, "flowFile;writer", "FIELD:Lorg/apache/nifi/processors/gcp/pubsub/consume/AbstractPubSubMessageConverter$RecordGroup;->flowFile:Lorg/apache/nifi/flowfile/FlowFile;", "FIELD:Lorg/apache/nifi/processors/gcp/pubsub/consume/AbstractPubSubMessageConverter$RecordGroup;->writer:Lorg/apache/nifi/serialization/RecordSetWriter;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, RecordGroup.class, Object.class), RecordGroup.class, "flowFile;writer", "FIELD:Lorg/apache/nifi/processors/gcp/pubsub/consume/AbstractPubSubMessageConverter$RecordGroup;->flowFile:Lorg/apache/nifi/flowfile/FlowFile;", "FIELD:Lorg/apache/nifi/processors/gcp/pubsub/consume/AbstractPubSubMessageConverter$RecordGroup;->writer:Lorg/apache/nifi/serialization/RecordSetWriter;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public FlowFile flowFile() {
            return this.flowFile;
        }

        public RecordSetWriter writer() {
            return this.writer;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractPubSubMessageConverter(RecordReaderFactory recordReaderFactory, RecordSetWriterFactory recordSetWriterFactory, ComponentLog componentLog) {
        this.readerFactory = recordReaderFactory;
        this.writerFactory = recordSetWriterFactory;
        this.logger = componentLog;
    }

    @Override // org.apache.nifi.processors.gcp.pubsub.consume.PubSubMessageConverter
    public void toFlowFiles(ProcessSession processSession, List<ReceivedMessage> list, List<String> list2, String str) {
        try {
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            for (ReceivedMessage receivedMessage : list) {
                if (receivedMessage.hasMessage()) {
                    byte[] byteArray = receivedMessage.getMessage().getData().toByteArray();
                    try {
                        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArray);
                        try {
                            RecordReader createRecordReader = this.readerFactory.createRecordReader(hashMap2, byteArrayInputStream, byteArray.length, this.logger);
                            while (true) {
                                try {
                                    Record nextRecord = createRecordReader.nextRecord();
                                    if (nextRecord == null) {
                                        break;
                                    }
                                    RecordSchema schema = this.writerFactory.getSchema(hashMap2, getRecordSchema(nextRecord.getSchema()));
                                    hashMap.computeIfAbsent(schema, recordSchema -> {
                                        FlowFile create = processSession.create();
                                        try {
                                            RecordSetWriter createWriter = this.writerFactory.createWriter(this.logger, schema, processSession.write(create), hashMap2);
                                            createWriter.beginRecordSet();
                                            return new RecordGroup(create, createWriter);
                                        } catch (Exception e) {
                                            processSession.remove(create);
                                            throw new ProcessException("Failed to create RecordSetWriter", e);
                                        }
                                    }).writer().write(getRecord(nextRecord, receivedMessage));
                                } finally {
                                }
                            }
                            if (createRecordReader != null) {
                                createRecordReader.close();
                            }
                            byteArrayInputStream.close();
                        } catch (Throwable th) {
                            try {
                                byteArrayInputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                            throw th;
                        }
                    } catch (MalformedRecordException | IOException | SchemaNotFoundException e) {
                        this.logger.error("Failed to parse the record. Transfer to a 'parse.failure' relationship", e);
                        handleParseFailure(processSession, receivedMessage, byteArray, list2, str);
                    }
                }
                list2.add(receivedMessage.getAckId());
            }
            finishRecordGroups(processSession, hashMap, str);
        } catch (Exception e2) {
            throw new ProcessException("FlowFile Record conversion failed", e2);
        }
    }

    protected abstract Record getRecord(Record record, ReceivedMessage receivedMessage);

    protected abstract RecordSchema getRecordSchema(RecordSchema recordSchema);

    private void handleParseFailure(ProcessSession processSession, ReceivedMessage receivedMessage, byte[] bArr, List<String> list, String str) {
        processSession.transfer(processSession.write(processSession.putAttribute(processSession.putAttribute(processSession.putAttribute(processSession.putAttribute(processSession.putAttribute(processSession.putAttribute(processSession.putAllAttributes(processSession.create(), receivedMessage.getMessage().getAttributesMap()), PubSubAttributes.ACK_ID_ATTRIBUTE, receivedMessage.getAckId()), PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE, String.valueOf(receivedMessage.getSerializedSize())), PubSubAttributes.MESSAGE_ID_ATTRIBUTE, receivedMessage.getMessage().getMessageId()), PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(receivedMessage.getMessage().getAttributesCount())), PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(receivedMessage.getMessage().getPublishTime().getSeconds())), PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE, str), outputStream -> {
            outputStream.write(bArr);
        }), ConsumeGCPubSub.REL_PARSE_FAILURE);
        processSession.adjustCounter("Records Received from " + str, 1L, false);
        list.add(receivedMessage.getAckId());
    }

    private void finishRecordGroups(ProcessSession processSession, Map<RecordSchema, RecordGroup> map, String str) {
        for (RecordGroup recordGroup : map.values()) {
            try {
                RecordSetWriter writer = recordGroup.writer();
                try {
                    WriteResult finishRecordSet = writer.finishRecordSet();
                    HashMap hashMap = new HashMap(finishRecordSet.getAttributes());
                    hashMap.put("record.count", String.valueOf(finishRecordSet.getRecordCount()));
                    hashMap.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                    hashMap.put(PubSubAttributes.SUBSCRIPTION_NAME_ATTRIBUTE, str);
                    int recordCount = finishRecordSet.getRecordCount();
                    if (writer != null) {
                        writer.close();
                    }
                    FlowFile putAllAttributes = processSession.putAllAttributes(recordGroup.flowFile(), hashMap);
                    processSession.getProvenanceReporter().receive(putAllAttributes, str);
                    processSession.transfer(putAllAttributes, ConsumeGCPubSub.REL_SUCCESS);
                    processSession.adjustCounter("Records Received from " + str, recordCount, false);
                } finally {
                }
            } catch (IOException e) {
                throw new ProcessException("Failed to finish writing records", e);
            }
        }
    }
}
