package org.apache.hudi.client.clustering.run.strategy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyConcatenatingIterator;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
import org.apache.hudi.util.ExecutorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.class */
/**
 * Clustering execution strategy for tables bucketed with consistent hashing, executed as a single Spark job.
 *
 * <p>Each clustering group is handled in one of two ways, chosen by its number of output groups:
 * <ul>
 *   <li>exactly one output group: a bucket <em>merge</em>. Records of every input operation are written into the
 *       single child node tagged {@code REPLACE}.</li>
 *   <li>more than one output group: a bucket <em>split</em>. Records of the single input operation are re-hashed
 *       with a {@link ConsistentBucketIdentifier} built from the child nodes and written to the matching bucket.</li>
 * </ul>
 */
public class SingleSparkJobConsistentHashingExecutionStrategy<T> extends SingleSparkJobExecutionStrategy<T> {
    private static final Logger LOG = LoggerFactory.getLogger(SingleSparkJobConsistentHashingExecutionStrategy.class);

    /** Fields used to hash record keys into buckets (the table's bucket index hash field config). */
    private final String indexKeyFields;

    /** Writer schema with Hudi metadata fields added; passed to the write handles for every record. */
    private final Schema readerSchema;

    /**
     * Create-handle factory whose generated file ids always use the fixed suffix index {@code 0}.
     */
    public static class FixedIdSuffixCreateHandleFactory extends CreateHandleFactory {
        FixedIdSuffixCreateHandleFactory() {
        }

        /**
         * Builds the file id for a new handle.
         *
         * @param fileIdPrefix bucket file id prefix
         * @return {@code FSUtils.createNewFileId(fileIdPrefix, 0)}, identical for every call with the same prefix
         */
        @Override
        public String getNextFileId(String fileIdPrefix) {
            return FSUtils.createNewFileId(fileIdPrefix, 0);
        }
    }

    /**
     * Consumer that routes each record to a write handle keyed by the file id prefix computed for it.
     *
     * <p>Handles are created lazily on the first record for a prefix. When {@code recordsSorted} is true, every open
     * handle is closed before a new one is opened, so at most one handle is open at a time. All handles are closed in
     * {@link #finish()}, and the collected write statuses are returned.
     *
     * @param <T> record payload type
     */
    public static class InsertHandler<T> implements HoodieConsumer<HoodieRecord<T>, List<WriteStatus>> {
        private final HoodieWriteConfig config;
        private final String instantTime;
        private final HoodieTable hoodieTable;
        private final TaskContextSupplier taskContextSupplier;
        private final WriteHandleFactory writeHandleFactory;
        private final boolean recordsSorted;
        private final Function<HoodieRecord, String> fileIdPrefixExtractor;
        private final Schema schema;
        private final List<WriteStatus> statuses = new ArrayList<>();
        private final Map<String, HoodieWriteHandle> writeHandles = new HashMap<>();

        /**
         * @param config                write config passed to handle creation and to {@code write}
         * @param instantTime           instant the records are written under
         * @param hoodieTable           target table
         * @param taskContextSupplier   task context passed to handle creation
         * @param writeHandleFactory    factory creating a handle per file id prefix
         * @param recordsSorted         if true, open handles are closed whenever a new prefix is seen
         * @param fileIdPrefixExtractor maps a record to the file id prefix it must be written to
         * @param schema                schema passed to {@code HoodieWriteHandle#write}
         */
        public InsertHandler(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
                             TaskContextSupplier taskContextSupplier, WriteHandleFactory writeHandleFactory,
                             boolean recordsSorted, Function<HoodieRecord, String> fileIdPrefixExtractor,
                             Schema schema) {
            this.config = config;
            this.instantTime = instantTime;
            this.hoodieTable = hoodieTable;
            this.taskContextSupplier = taskContextSupplier;
            this.writeHandleFactory = writeHandleFactory;
            this.recordsSorted = recordsSorted;
            this.fileIdPrefixExtractor = fileIdPrefixExtractor;
            this.schema = schema;
        }

        /**
         * Writes one record through the handle for its file id prefix, creating the handle if needed.
         */
        @Override
        public void consume(HoodieRecord<T> record) throws Exception {
            String fileIdPrefix = fileIdPrefixExtractor.apply(record);
            HoodieWriteHandle handle = writeHandles.get(fileIdPrefix);
            if (handle == null) {
                if (recordsSorted) {
                    // Sorted input: a new prefix means earlier prefixes will not reappear, so release their handles.
                    closeOpenHandles();
                }
                handle = writeHandleFactory.create(config, instantTime, hoodieTable, record.getPartitionPath(),
                    fileIdPrefix, taskContextSupplier);
                writeHandles.put(fileIdPrefix, handle);
            }
            handle.write(record, schema, config.getProps());
        }

        /**
         * Closes every open handle and returns all write statuses collected so far.
         */
        @Override
        public List<WriteStatus> finish() {
            closeOpenHandles();
            return statuses;
        }

        /** Closes all open handles, collecting their write statuses, and forgets them. */
        private void closeOpenHandles() {
            for (HoodieWriteHandle handle : writeHandles.values()) {
                statuses.addAll(handle.close());
            }
            writeHandles.clear();
        }
    }

    public SingleSparkJobConsistentHashingExecutionStrategy(HoodieTable hoodieTable, HoodieEngineContext hoodieEngineContext,
                                                            HoodieWriteConfig hoodieWriteConfig) {
        super(hoodieTable, hoodieEngineContext, hoodieWriteConfig);
        this.indexKeyFields = hoodieTable.getConfig().getBucketIndexHashField();
        this.readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(hoodieWriteConfig.getSchema()));
    }

    /**
     * Dispatches a clustering group to a bucket merge (one output group) or a bucket split (several output groups).
     *
     * @throws IllegalArgumentException if the group declares fewer than one output group
     */
    @Override
    protected List<WriteStatus> performClusteringForGroup(ClusteringGroupInfo clusteringGroupInfo, Map<String, String> strategyParams,
                                                          boolean preserveHoodieMetadata, SerializableSchema schema,
                                                          TaskContextSupplier taskContextSupplier, String instantTime) {
        ValidationUtils.checkArgument(clusteringGroupInfo.getNumOutputGroups() >= 1, "Number of output groups should be at least 1");
        if (clusteringGroupInfo.getNumOutputGroups() == 1) {
            return performBucketMergeForGroup(clusteringGroupInfo, strategyParams, preserveHoodieMetadata, schema, taskContextSupplier, instantTime);
        }
        return performBucketSplitForGroup(clusteringGroupInfo, strategyParams, preserveHoodieMetadata, schema, taskContextSupplier, instantTime);
    }

    /** Returns the group's extra metadata, failing if it is absent. */
    private static Map<String, String> requireExtraMetadata(ClusteringGroupInfo clusteringGroupInfo) {
        Option<Map<String, String>> extraMetadata = clusteringGroupInfo.getExtraMetadata();
        ValidationUtils.checkArgument(extraMetadata.isPresent(), "Extra metadata should be present for consistent hashing operations");
        return extraMetadata.get();
    }

    /** Returns the partition recorded in the extra metadata, failing if it is null or empty. */
    private static String requirePartition(Map<String, String> extraMetadata) {
        String partition = extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
        ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(partition), "Partition should not be null or empty");
        return partition;
    }

    /**
     * Parses the child hashing nodes serialized as JSON in the group's extra metadata.
     *
     * @throws IllegalArgumentException  if extra metadata or the child-node entry is missing or empty
     * @throws HoodieClusteringException if the JSON cannot be parsed
     */
    private List<ConsistentHashingNode> decodeConsistentHashingNodes(ClusteringGroupInfo clusteringGroupInfo) {
        String childNodesJson = requireExtraMetadata(clusteringGroupInfo).get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY);
        ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(childNodesJson), "Child nodes should not be null or empty for consistent hashing operations");
        try {
            return ConsistentHashingNode.fromJsonString(childNodesJson);
        } catch (Exception e) {
            throw new HoodieClusteringException("Failed to parse child nodes from metadata", e);
        }
    }

    /**
     * Merges all input operations of the group into the single child node tagged {@code REPLACE}.
     * Input iterators are opened lazily, one operation at a time.
     */
    private List<WriteStatus> performBucketMergeForGroup(ClusteringGroupInfo clusteringGroupInfo, Map<String, String> strategyParams,
                                                         boolean preserveHoodieMetadata, SerializableSchema schema,
                                                         TaskContextSupplier taskContextSupplier, String instantTime) {
        long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), this.writeConfig);
        LOG.info("MaxMemoryPerCompaction run as part of clustering => {}", maxMemoryPerCompaction);
        requirePartition(requireExtraMetadata(clusteringGroupInfo));

        Option<ConsistentHashingNode> newBucket = Option.fromJavaOptional(decodeConsistentHashingNodes(clusteringGroupInfo).stream()
            .filter(node -> node.getTag() == ConsistentHashingNode.NodeTag.REPLACE)
            .findFirst());
        ValidationUtils.checkArgument(newBucket.isPresent(), "New bucket should be present for merge operation");
        String mergedFileIdPrefix = newBucket.get().getFileIdPrefix();

        List<ClusteringOperation> operations = clusteringGroupInfo.getOperations();
        List<Supplier<ClosableIterator<HoodieRecord<T>>>> recordIteratorSuppliers = new ArrayList<>(operations.size());
        for (ClusteringOperation operation : operations) {
            recordIteratorSuppliers.add(() -> getRecordIterator(operation, instantTime, maxMemoryPerCompaction));
        }

        // Every record goes to the same prefix, so "sorted" mode never closes the single handle prematurely.
        InsertHandler<T> insertHandler = new InsertHandler<T>(this.writeConfig, instantTime, getHoodieTable(), taskContextSupplier,
            new FixedIdSuffixCreateHandleFactory(), true, record -> mergedFileIdPrefix, this.readerSchema);
        return executeInsert(new LazyConcatenatingIterator<>(recordIteratorSuppliers), insertHandler);
    }

    /**
     * Splits the group's single input operation across its child nodes by re-hashing each record key.
     *
     * @throws IllegalArgumentException if the group does not have exactly one operation, or if extra metadata,
     *                                  partition, child nodes or sequence number are missing
     */
    private List<WriteStatus> performBucketSplitForGroup(ClusteringGroupInfo clusteringGroupInfo, Map<String, String> strategyParams,
                                                         boolean preserveHoodieMetadata, SerializableSchema schema,
                                                         TaskContextSupplier taskContextSupplier, String instantTime) {
        ValidationUtils.checkArgument(clusteringGroupInfo.getOperations().size() == 1, "Split operation should have only one operation");
        Map<String, String> extraMetadata = requireExtraMetadata(clusteringGroupInfo);
        String partition = requirePartition(extraMetadata);
        List<ConsistentHashingNode> childNodes = decodeConsistentHashingNodes(clusteringGroupInfo);

        // Validate up front: Integer.parseInt(null) would otherwise fail with an opaque "null" message.
        String sequenceNumber = extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_SEQUENCE_NUMBER_KEY);
        ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(sequenceNumber), "Sequence number should not be null or empty");

        // Metadata for the post-split layout: the recorded sequence number + 1, with the decoded child nodes attached.
        HoodieConsistentHashingMetadata splitMetadata = new HoodieConsistentHashingMetadata(
            (short) 0, partition, instantTime, 0, Integer.parseInt(sequenceNumber) + 1, Collections.emptyList());
        splitMetadata.setChildrenNodes(childNodes);
        ConsistentBucketIdentifier bucketIdentifier = new ConsistentBucketIdentifier(splitMetadata);

        long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), this.writeConfig);
        ClosableIterator<HoodieRecord<T>> records = getRecordIterator(clusteringGroupInfo.getOperations().get(0), instantTime, maxMemoryPerCompaction);
        // Input is not grouped by target bucket, so keep every bucket's handle open until finish().
        InsertHandler<T> insertHandler = new InsertHandler<T>(this.writeConfig, instantTime, getHoodieTable(), taskContextSupplier,
            new FixedIdSuffixCreateHandleFactory(), false,
            record -> bucketIdentifier.getBucket(record.getRecordKey(), this.indexKeyFields).getFileIdPrefix(),
            this.readerSchema);
        return executeInsert(records, insertHandler);
    }

    /**
     * Drains {@code records} into {@code insertHandler} through an executor from {@link ExecutorFactory}.
     * Afterwards it always releases the executor and closes the input iterator, whether or not execution succeeded.
     *
     * @return the write statuses produced by {@link InsertHandler#finish()}
     */
    private List<WriteStatus> executeInsert(ClosableIterator<HoodieRecord<T>> records, InsertHandler<T> insertHandler) {
        HoodieExecutor<List<WriteStatus>> executor = null;
        try {
            executor = ExecutorFactory.create(this.writeConfig, records, insertHandler, Function.identity(),
                getHoodieTable().getPreExecuteRunnable());
            return executor.execute();
        } finally {
            try {
                if (executor != null) {
                    // NOTE(review): depending on the configured executor type this may own background threads;
                    // terminate it before closing the input so nothing is still reading from `records`.
                    executor.shutdownNow();
                    executor.awaitTermination();
                }
            } finally {
                records.close();
            }
        }
    }

    /**
     * Opens a record iterator for one clustering operation.
     *
     * <p>It merges log files when the operation has delta files. Otherwise it reads the base (or bootstrap) file
     * only, which must be present.
     *
     * @throws IllegalArgumentException if there are no delta files and no base/bootstrap file reader
     */
    private ClosableIterator<HoodieRecord<T>> getRecordIterator(ClusteringOperation clusteringOperation, String instantTime, long maxMemoryPerCompaction) {
        Option<HoodieFileReader> baseFileReader = getBaseOrBootstrapFileReader(clusteringOperation);
        Option<BaseKeyGenerator> keyGenerator = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(this.writeConfig);
        if (!clusteringOperation.getDeltaFilePaths().isEmpty()) {
            return getRecordIteratorWithLogFiles(clusteringOperation, instantTime, maxMemoryPerCompaction, keyGenerator, baseFileReader);
        }
        ValidationUtils.checkArgument(baseFileReader.isPresent(), "Base or bootstrap file reader should be present for operation");
        return getRecordIteratorWithBaseFileOnly(keyGenerator, baseFileReader.get());
    }
}
