package org.apache.hudi.hadoop.realtime;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.utils.HiveAvroSerializer;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.class */
/**
 * Record reader that combines the rows produced by a base-file (parquet) reader with the
 * records held by a {@link HoodieMergedLogRecordScanner} over the split's delta log files.
 *
 * <p>Iteration happens in two phases (see {@link #next(NullWritable, ArrayWritable)}):
 * <ol>
 *   <li>every row of the wrapped parquet reader is returned, overwritten in place with the
 *       log-side version when a log record with the same record key exists;</li>
 *   <li>once the parquet reader is exhausted, the log records whose keys were never seen in
 *       the base file are returned.</li>
 * </ol>
 *
 * <p>The fields {@code split}, {@code jobConf}, {@code supportPayload}, {@code usesCustomPayload},
 * {@code payloadProps}, {@code serializer} and {@code schemaEvolutionContext} used below are not
 * declared in this class; presumably they are inherited from {@link AbstractRealtimeRecordReader}.
 */
public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader implements RecordReader<NullWritable, ArrayWritable> {
    // Bound to this class (not the parent) so log output is attributed to the right reader.
    private static final Logger LOG = LoggerFactory.getLogger(RealtimeCompactedRecordReader.class);

    /** Record-key column position used when the split carries no virtual-key info. */
    private static final int DEFAULT_RECORD_KEY_INDEX = 2;

    /** Log-scanner buffer size (1 MiB) used when the job conf does not set one. */
    private static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024;

    /** Reader over the base file; also the source of key/value holders, position and progress. */
    protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
    /** Log records keyed by record key, as returned by the merged log scanner. */
    private final Map<String, HoodieRecord> deltaRecordMap;
    /** Keys of log records that have not (yet) been matched against a base-file row. */
    private final Set<String> deltaRecordKeys;
    private final HoodieMergedLogRecordScanner mergedLogRecordScanner;
    /** Position of the record key inside each base-file row. */
    private final int recordKeyIndex;
    /** Lazily created iterator over {@link #deltaRecordKeys}, used after the base file is exhausted. */
    private Iterator<String> deltaItr;

    /**
     * Creates the reader and eagerly scans the split's delta log files.
     *
     * @param realtimeSplit split describing the base file and its delta logs
     * @param jobConf       job configuration, also used to configure the log scanner
     * @param recordReader  reader over the base file of the split
     * @throws IOException if the superclass or the log scanner fails to initialise
     */
    public RealtimeCompactedRecordReader(RealtimeSplit realtimeSplit, JobConf jobConf, RecordReader<NullWritable, ArrayWritable> recordReader) throws IOException {
        super(realtimeSplit, jobConf);
        this.parquetReader = recordReader;
        this.mergedLogRecordScanner = getMergedLogRecordScanner();
        this.deltaRecordMap = this.mergedLogRecordScanner.getRecords();
        // Private copy of the key set: keys are removed from it as base rows are matched,
        // and the scanner's own map must not be modified by that bookkeeping.
        this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
        this.recordKeyIndex = realtimeSplit.getVirtualKeyInfo()
            .map(info -> info.getRecordKeyFieldIndex())
            .orElse(DEFAULT_RECORD_KEY_INDEX);
    }

    /**
     * Builds the merged log record scanner for this split from the split metadata and the
     * memory / spill / scan settings found in the job configuration.
     *
     * @return a scanner over the split's delta log files
     * @throws IOException declared for failures while creating the scanner
     */
    private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOException {
        return HoodieMergedLogRecordScanner.newBuilder()
            .withStorage(HoodieStorageUtils.getStorage(this.split.getPath().toString(), HadoopFSUtils.getStorageConf(this.jobConf)))
            .withBasePath(this.split.getBasePath())
            .withLogFilePaths(this.split.getDeltaLogPaths())
            .withReaderSchema(getLogScannerReaderSchema())
            .withLatestInstantTime(this.split.getMaxCommitTime())
            .withMaxMemorySizeInBytes(Long.valueOf(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(this.jobConf)))
            .withReverseReader(false)
            .withBufferSize(this.jobConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
            .withSpillableMapBasePath(this.jobConf.get(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), FileIOUtils.getDefaultSpillableMapBasePath()))
            .withDiskMapType(this.jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
            .withBitCaskDiskMapCompressionEnabled(this.jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
            .withOptimizedLogBlocksScan(this.jobConf.getBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(), Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue())))
            .withInternalSchema(this.schemaEvolutionContext.internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
            .build();
    }

    /**
     * Converts a log record into an Avro indexed record, using the writer schema when a custom
     * payload is in use and the reader schema otherwise.
     *
     * @param hoodieRecord log record to convert
     * @return the converted record, or an empty option when the conversion yields nothing
     * @throws IOException if the conversion fails
     */
    private Option<HoodieAvroIndexedRecord> buildGenericRecordwithCustomPayload(HoodieRecord hoodieRecord) throws IOException {
        return this.usesCustomPayload ? hoodieRecord.toIndexedRecord(getWriterSchema(), this.payloadProps) : hoodieRecord.toIndexedRecord(getReaderSchema(), this.payloadProps);
    }

    /**
     * Advances to the next record and writes it into {@code arrayWritable}.
     *
     * <p>Base-file rows come first. A row whose key has a log record is replaced by the merged /
     * log-side value; if that yields no record the row is skipped. After the base file is
     * exhausted, log records whose keys were never matched are returned.
     *
     * @param nullWritable  key holder, passed through to the parquet reader
     * @param arrayWritable value holder that receives the row
     * @return {@code true} if a record was produced, {@code false} when both sources are exhausted
     * @throws IOException if reading or merging fails
     */
    public boolean next(NullWritable nullWritable, ArrayWritable arrayWritable) throws IOException {
        while (this.parquetReader.next(nullWritable, arrayWritable)) {
            if (this.deltaRecordMap.isEmpty()) {
                // No log records at all: the base row is returned untouched.
                return true;
            }
            String recordKey = arrayWritable.get()[this.recordKeyIndex].toString();
            if (!this.deltaRecordMap.containsKey(recordKey)) {
                return true;
            }
            // The key is handled here, so it must not be emitted again in the log-only phase.
            this.deltaRecordKeys.remove(recordKey);
            HoodieRecord logRecord = this.deltaRecordMap.get(recordKey);
            Option<HoodieAvroIndexedRecord> merged = this.supportPayload
                ? mergeRecord(logRecord, arrayWritable)
                : buildGenericRecordwithCustomPayload(logRecord);
            if (merged.isPresent()) {
                setUpWritable(merged, arrayWritable, recordKey);
                return true;
            }
            // Nothing to emit for this key: skip the base row and continue with the next one.
        }
        if (this.deltaItr == null) {
            // Created only now, after every removal above has happened.
            this.deltaItr = this.deltaRecordKeys.iterator();
        }
        while (this.deltaItr.hasNext()) {
            String recordKey = this.deltaItr.next();
            Option<HoodieAvroIndexedRecord> logOnly = buildGenericRecordwithCustomPayload(this.deltaRecordMap.get(recordKey));
            if (logOnly.isPresent()) {
                setUpWritable(logOnly, arrayWritable, recordKey);
                return true;
            }
        }
        return false;
    }

    /**
     * Copies the values of the given record into {@code arrayWritable}, overwriting the base row.
     *
     * @param option        record to copy; must be present
     * @param arrayWritable destination row, modified in place
     * @param str           record key, used only for debug logging
     * @throws RuntimeException if the copy fails; the message contains both rows
     */
    private void setUpWritable(Option<HoodieAvroIndexedRecord> option, ArrayWritable arrayWritable, String str) {
        GenericRecord genericRecord = (GenericRecord) option.get().getData();
        if (this.usesCustomPayload) {
            // With a custom payload the record was built against the writer schema
            // (see buildGenericRecordwithCustomPayload), so project it onto the reader schema.
            genericRecord = HoodieAvroUtils.rewriteRecord(genericRecord, getReaderSchema());
        }
        ArrayWritable logWritable = HoodieRealtimeRecordReaderUtils.avroToArrayWritable(genericRecord, getHiveSchema(), isSupportTimestamp());
        Writable[] logValues = logWritable.get();
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("key %s, base values: %s, log values: %s", str, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable), HoodieRealtimeRecordReaderUtils.arrayWritableToString(logWritable)));
        }
        Writable[] baseValues = arrayWritable.get();
        try {
            // Copy only as many columns as both rows have.
            System.arraycopy(logValues, 0, baseValues, 0, Math.min(baseValues.length, logValues.length));
            arrayWritable.set(baseValues);
        } catch (RuntimeException e) {
            LOG.error("Got exception when doing array copy", e);
            LOG.error("Base record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable));
            LOG.error("Log record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(logWritable));
            throw new RuntimeException("Base-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable) + " ,Log-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(logWritable) + " ,Error :" + e.getMessage(), e);
        }
    }

    /**
     * Merges a base-file row with the log record of the same key via {@link HoodieAvroRecordMerger}.
     *
     * @param hoodieRecord  log record for the key
     * @param arrayWritable base-file row for the key
     * @return the merged record, or an empty option when the merger returns none
     * @throws IOException if the merge fails
     */
    private Option<HoodieAvroIndexedRecord> mergeRecord(HoodieRecord<?> hoodieRecord, ArrayWritable arrayWritable) throws IOException {
        GenericRecord baseRecord = HiveAvroSerializer.rewriteRecordIgnoreResultCheck(convertArrayWritableToHoodieRecord(arrayWritable), getLogScannerReaderSchema());
        return HoodieAvroRecordMerger.INSTANCE
            .merge(new HoodieAvroIndexedRecord(baseRecord), baseRecord.getSchema(), hoodieRecord, getLogScannerReaderSchema(), this.payloadProps)
            .map(pair -> (HoodieAvroIndexedRecord) pair.getLeft());
    }

    /**
     * Serializes a base-file row into an Avro record using the Hive schema.
     *
     * @param arrayWritable base-file row
     * @return the row as an Avro generic record
     */
    private GenericRecord convertArrayWritableToHoodieRecord(ArrayWritable arrayWritable) {
        return this.serializer.serialize(arrayWritable, getHiveSchema());
    }

    /**
     * Implements {@code RecordReader.createKey()} by delegating to the parquet reader.
     *
     * @return a key holder created by the parquet reader
     */
    public NullWritable createKey() {
        return this.parquetReader.createKey();
    }

    /**
     * Implements {@code RecordReader.createValue()} by delegating to the parquet reader.
     *
     * @return a value holder created by the parquet reader
     */
    public ArrayWritable createValue() {
        return this.parquetReader.createValue();
    }

    /**
     * @return the position reported by the parquet reader
     * @throws IOException if the parquet reader fails to report its position
     */
    public long getPos() throws IOException {
        return this.parquetReader.getPos();
    }

    /**
     * Closes the parquet reader and the log scanner. The scanner is closed even when closing
     * the parquet reader throws, so its resources are not leaked.
     *
     * @throws IOException if closing either resource fails
     */
    public void close() throws IOException {
        try {
            this.parquetReader.close();
        } finally {
            this.mergedLogRecordScanner.close();
        }
    }

    /**
     * @return the progress reported by the parquet reader; the log-only phase is not reflected
     * @throws IOException if the parquet reader fails to report its progress
     */
    public float getProgress() throws IOException {
        return this.parquetReader.getProgress();
    }
}
