package org.apache.hudi.utils;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.HoodieTestCommitGenerator;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.TimelineFactory;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.HoodieWriteHelper;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hudi/utils/HoodieWriterClientTestHarness.class */
public abstract class HoodieWriterClientTestHarness extends HoodieCommonTestHarness {
    // Port handed to FileSystemViewStorageConfig#withRemoteServerPort by the config builders below;
    // initialised to the default value of REMOTE_PORT_NUM.
    protected static int timelineServicePort = ((Integer) FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue()).intValue();
    // Failure marker strings; their consumers are not visible in this part of the file.
    protected static final String CLUSTERING_FAILURE = "CLUSTERING FAILURE";
    protected static final String CLEANING_FAILURE = "CLEANING FAILURE";
    // Test table handle; presumably initialised by subclasses or a later part of this class — confirm.
    protected HoodieTestTable testTable;

    @FunctionalInterface
    /* loaded from: input_file:org/apache/hudi/utils/HoodieWriterClientTestHarness$Function2.class */
    public interface Function2<R, T1, T2> {
        R apply(T1 t1, T2 t2) throws IOException;
    }

    @FunctionalInterface
    /* loaded from: input_file:org/apache/hudi/utils/HoodieWriterClientTestHarness$Function3.class */
    public interface Function3<R, T1, T2, T3> {
        R apply(T1 t1, T2 t2, T3 t3) throws IOException;
    }

    /**
     * Supplies the write client for the given write config. Implemented by subclasses.
     *
     * @param hoodieWriteConfig the write config the client should use
     * @return a write client (presumably engine-specific — confirm against the subclasses)
     */
    protected abstract BaseHoodieWriteClient getHoodieWriteClient(HoodieWriteConfig hoodieWriteConfig);

    /**
     * Adjusts the builder for the "do not populate meta fields" case: installs key-generator
     * properties and switches the index type to {@code SIMPLE}. Does nothing when {@code z} is true.
     *
     * @param builder the write-config builder to mutate
     * @param z       when true the builder is left untouched (presumably "populate meta fields" — confirm)
     * @param z2      when true the metadata-table properties are used, otherwise the key-gen properties
     */
    protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder builder, boolean z, boolean z2) {
        if (!z) {
            final Properties keyGenProps;
            if (z2) {
                keyGenProps = getPropertiesForMetadataTable();
            } else {
                keyGenProps = getPropertiesForKeyGen();
            }
            builder
                .withProperties(keyGenProps)
                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build());
        }
    }

    /**
     * Convenience overload that never uses the metadata-table properties.
     *
     * @param builder the write-config builder to mutate
     * @param z       when true the builder is left untouched
     */
    protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder builder, boolean z) {
        final boolean useMetadataTableProps = false;
        addConfigsForPopulateMetaFields(builder, z, useMetadataTableProps);
    }

    /**
     * Returns the key-generator properties with the populate-meta-fields entry set to {@code "false"}.
     *
     * @return a fresh {@link Properties} instance
     */
    public static Properties getPropertiesForKeyGen() {
        final boolean populateMetaFields = false;
        return getPropertiesForKeyGen(populateMetaFields);
    }

    /**
     * Builds key-generator properties: record key field {@code _row_key}, partition field
     * {@code partition_path}, registered under both the key-generator option keys and the
     * table-config keys.
     *
     * @param z value stored (as a string) under the populate-meta-fields key
     * @return a fresh {@link Properties} instance
     */
    public static Properties getPropertiesForKeyGen(boolean z) {
        final String recordKeyField = "_row_key";
        final String partitionField = "partition_path";

        Properties keyGenProps = new Properties();
        keyGenProps.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(z));
        // The same field names go under the writer-side and the table-side keys.
        keyGenProps.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), recordKeyField);
        keyGenProps.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionField);
        keyGenProps.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), recordKeyField);
        keyGenProps.put(HoodieTableConfig.PARTITION_FIELDS.key(), partitionField);
        return keyGenProps;
    }

    /**
     * Builds the properties used when configuring for the metadata table: meta fields are not
     * populated and the record key field is {@code key}. No partition field is set.
     *
     * @return a fresh {@link Properties} instance
     */
    protected Properties getPropertiesForMetadataTable() {
        final String recordKeyField = "key";

        Properties metadataTableProps = new Properties();
        metadataTableProps.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
        metadataTableProps.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), recordKeyField);
        metadataTableProps.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), recordKeyField);
        return metadataTableProps;
    }

    /**
     * Builds the default write config.
     *
     * @return the config produced by {@link #getConfigBuilder()}
     */
    public HoodieWriteConfig getConfig() {
        HoodieWriteConfig.Builder defaultBuilder = getConfigBuilder();
        return defaultBuilder.build();
    }

    /**
     * Builds the default write config with the given index type.
     *
     * @param indexType index type to configure
     * @return the config produced by {@link #getConfigBuilder(HoodieIndex.IndexType)}
     */
    public HoodieWriteConfig getConfig(HoodieIndex.IndexType indexType) {
        HoodieWriteConfig.Builder indexedBuilder = getConfigBuilder(indexType);
        return indexedBuilder.build();
    }

    /**
     * Avro schema (JSON) of the trip record used by the {@code getConfigBuilder} overloads that do
     * not take an explicit schema. Declared once so those overloads cannot drift apart.
     * NOTE(review): presumably mirrors a schema constant in HoodieTestDataGenerator — confirm and
     * reference that constant directly if so.
     */
    private static final String DEFAULT_TRIP_RECORD_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

    /**
     * Returns a builder preconfigured with the default trip schema.
     * Delegates to {@link #getConfigBuilder(String)}.
     *
     * @return the write-config builder
     */
    protected HoodieWriteConfig.Builder getConfigBuilder() {
        return getConfigBuilder(DEFAULT_TRIP_RECORD_SCHEMA);
    }

    /**
     * Returns a builder with the default trip schema, a {@code BLOOM} index and the given
     * failed-writes cleaning policy.
     *
     * @param hoodieFailedWritesCleaningPolicy cleaning policy for failed writes
     * @return the write-config builder
     */
    public HoodieWriteConfig.Builder getConfigBuilder(HoodieFailedWritesCleaningPolicy hoodieFailedWritesCleaningPolicy) {
        return getConfigBuilder(DEFAULT_TRIP_RECORD_SCHEMA, HoodieIndex.IndexType.BLOOM, hoodieFailedWritesCleaningPolicy);
    }

    /**
     * Returns a builder with the default trip schema, the given index type and the {@code EAGER}
     * failed-writes cleaning policy.
     *
     * @param indexType index type to configure
     * @return the write-config builder
     */
    public HoodieWriteConfig.Builder getConfigBuilder(HoodieIndex.IndexType indexType) {
        return getConfigBuilder(DEFAULT_TRIP_RECORD_SCHEMA, indexType, HoodieFailedWritesCleaningPolicy.EAGER);
    }

    /**
     * Returns a builder for the given schema using a {@code BLOOM} index and the {@code EAGER}
     * failed-writes cleaning policy.
     *
     * @param str Avro schema as JSON; may be empty, in which case no schema is set on the builder
     * @return the write-config builder
     */
    public HoodieWriteConfig.Builder getConfigBuilder(String str) {
        final HoodieIndex.IndexType defaultIndexType = HoodieIndex.IndexType.BLOOM;
        final HoodieFailedWritesCleaningPolicy defaultPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
        return getConfigBuilder(str, defaultIndexType, defaultPolicy);
    }

    /**
     * Returns a builder for the given schema and index type using the {@code EAGER} failed-writes
     * cleaning policy.
     *
     * @param str       Avro schema as JSON; may be empty, in which case no schema is set on the builder
     * @param indexType index type to configure
     * @return the write-config builder
     */
    public HoodieWriteConfig.Builder getConfigBuilder(String str, HoodieIndex.IndexType indexType) {
        final HoodieFailedWritesCleaningPolicy defaultPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
        return getConfigBuilder(str, indexType, defaultPolicy);
    }

    /**
     * Builds the baseline write-config builder that all other {@code getConfigBuilder} overloads
     * funnel into: table {@code raw_trips} at {@code basePath}, parallelism 2 throughout,
     * consistency check enabled, 1 MiB file-size limits, embedded timeline server enabled and the
     * remote file-system view bound to {@code timelineServicePort}.
     *
     * @param str                              Avro schema as JSON; only applied when non-empty
     * @param indexType                        index type to configure
     * @param hoodieFailedWritesCleaningPolicy cleaning policy for failed writes
     * @return the write-config builder
     */
    public HoodieWriteConfig.Builder getConfigBuilder(String str, HoodieIndex.IndexType indexType, HoodieFailedWritesCleaningPolicy hoodieFailedWritesCleaningPolicy) {
        // 1 MiB, used both as the small-file threshold and as every max-file-size limit.
        final long oneMebibyte = 1024L * 1024L;

        ConsistencyGuardConfig consistencyGuard = ConsistencyGuardConfig.newBuilder()
            .withConsistencyCheckEnabled(true)
            .build();
        HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
            .withFailedWritesCleaningPolicy(hoodieFailedWritesCleaningPolicy)
            .build();
        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
            .compactionSmallFileSize(oneMebibyte)
            .build();
        HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
            .hfileMaxFileSize(oneMebibyte)
            .parquetMaxFileSize(oneMebibyte)
            .orcMaxFileSize(oneMebibyte)
            .build();
        HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
            .withIndexType(indexType)
            .build();
        FileSystemViewStorageConfig viewConfig = FileSystemViewStorageConfig.newBuilder()
            .withEnableBackupForRemoteFileSystemView(false)
            .withRemoteServerPort(Integer.valueOf(timelineServicePort))
            .build();

        HoodieWriteConfig.Builder configBuilder = HoodieWriteConfig.newBuilder()
            .withPath(this.basePath)
            .withParallelism(2, 2)
            .withBulkInsertParallelism(2)
            .withFinalizeWriteParallelism(2)
            .withDeleteParallelism(2)
            .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION.intValue())
            .withWriteStatusClass(MetadataMergeWriteStatus.class)
            .withConsistencyGuardConfig(consistencyGuard)
            .withCleanConfig(cleanConfig)
            .withCompactionConfig(compactionConfig)
            .withStorageConfig(storageConfig)
            .forTable("raw_trips")
            .withIndexConfig(indexConfig)
            .withEmbeddedTimelineServerEnabled(true)
            .withFileSystemViewConfig(viewConfig);

        // An empty schema string means "leave the builder's schema unset".
        if (StringUtils.nonEmpty(str)) {
            configBuilder.withSchema(str);
        }
        return configBuilder;
    }

    /**
     * Writes a batch of records through the given client and verifies the outcome. Implemented by
     * subclasses; the meaning of the string and boolean parameters is defined there and is not
     * visible in this part of the file.
     *
     * @param baseHoodieWriteClient the client used for the write
     * @param list                  the records to write
     * @param str                   presumably the commit/instant time — confirm against implementations
     * @param z                     implementation-defined flag
     * @param z2                    implementation-defined flag
     * @return the write statuses of the batch
     * @throws IOException if the write or the verification fails with an I/O error
     */
    protected abstract List<WriteStatus> writeAndVerifyBatch(BaseHoodieWriteClient baseHoodieWriteClient, List<HoodieRecord> list, String str, boolean z, boolean z2) throws IOException;

    /**
     * Shorter overload of {@code castInsertFirstBatch}: forwards every argument unchanged and
     * passes {@code true} for the additional boolean parameter of the full overload.
     *
     * @return whatever the full overload returns
     * @throws Exception propagated from the full overload
     */
    protected Object castInsertFirstBatch(HoodieWriteConfig hoodieWriteConfig, BaseHoodieWriteClient baseHoodieWriteClient, String str, String str2, int i, Function3<Object, BaseHoodieWriteClient, Object, String> function3, boolean z, boolean z2, int i2, InstantGenerator instantGenerator) throws Exception {
        // Value supplied for the extra flag that only the full overload exposes.
        final boolean extraFlagDefault = true;
        return castInsertFirstBatch(
            hoodieWriteConfig, baseHoodieWriteClient, str, str2, i, function3,
            z, z2, i2, extraFlagDefault, instantGenerator);
    }

    /**
     * Full overload of {@code castInsertFirstBatch}. This base implementation ignores all of its
     * arguments and returns {@code null}; presumably subclasses override it with the real insert
     * logic — confirm.
     *
     * @return always {@code null} in this base implementation
     * @throws Exception never thrown here; declared for overriding implementations
     */
    protected Object castInsertFirstBatch(HoodieWriteConfig hoodieWriteConfig, BaseHoodieWriteClient baseHoodieWriteClient, String str, String str2, int i, Function3<Object, BaseHoodieWriteClient, Object, String> function3, boolean z, boolean z2, int i2, boolean z3, InstantGenerator instantGenerator) throws Exception {
        return null;
    }

    /**
     * Shorter overload of {@code castWriteBatch}: forwards every argument unchanged and passes
     * {@code true} for the additional boolean parameter of the full overload.
     *
     * @return whatever the full overload returns
     * @throws Exception propagated from the full overload
     */
    protected Object castWriteBatch(BaseHoodieWriteClient baseHoodieWriteClient, String str, String str2, Option<List<String>> option, String str3, int i, Function2<List<HoodieRecord>, String, Integer> function2, Function3<Object, BaseHoodieWriteClient, Object, String> function3, boolean z, int i2, int i3, int i4, boolean z2, InstantGenerator instantGenerator) throws Exception {
        // Value supplied for the extra flag that only the full overload exposes.
        final boolean extraFlagDefault = true;
        return castWriteBatch(
            baseHoodieWriteClient, str, str2, option, str3, i, function2, function3,
            z, i2, i3, i4, z2, extraFlagDefault, instantGenerator);
    }

    /**
     * Full overload of {@code castWriteBatch}. This base implementation ignores all of its
     * arguments and returns {@code null}; presumably subclasses override it with the real write
     * logic — confirm.
     *
     * @return always {@code null} in this base implementation
     * @throws Exception never thrown here; declared for overriding implementations
     */
    protected Object castWriteBatch(BaseHoodieWriteClient baseHoodieWriteClient, String str, String str2, Option<List<String>> option, String str3, int i, Function2<List<HoodieRecord>, String, Integer> function2, Function3<Object, BaseHoodieWriteClient, Object, String> function3, boolean z, int i2, int i3, int i4, boolean z2, boolean z3, InstantGenerator instantGenerator) throws Exception {
        return null;
    }

    /**
     * Hook for performing an update batch. This base implementation ignores all of its arguments
     * and returns {@code null}; presumably subclasses override it with the real update logic —
     * confirm.
     *
     * @return always {@code null} in this base implementation
     * @throws Exception never thrown here; declared for overriding implementations
     */
    protected Object castUpdateBatch(HoodieWriteConfig hoodieWriteConfig, BaseHoodieWriteClient baseHoodieWriteClient, String str, String str2, Option<List<String>> option, String str3, int i, Function3<Object, BaseHoodieWriteClient, Object, String> function3, boolean z, boolean z2, int i2, int i3, int i4, boolean z3, InstantGenerator instantGenerator) throws Exception {
        return null;
    }

    /**
     * Shorter overload of {@code castDeleteBatch}: forwards every argument unchanged and passes
     * {@code true} for the additional boolean parameter of the full overload.
     *
     * @return whatever the full overload returns
     * @throws Exception propagated from the full overload
     */
    protected Object castDeleteBatch(HoodieWriteConfig hoodieWriteConfig, BaseHoodieWriteClient baseHoodieWriteClient, String str, String str2, String str3, int i, boolean z, boolean z2, int i2, int i3, TimelineFactory timelineFactory, InstantGenerator instantGenerator) throws Exception {
        // Value supplied for the extra flag that only the full overload exposes.
        final boolean extraFlagDefault = true;
        return castDeleteBatch(
            hoodieWriteConfig, baseHoodieWriteClient, str, str2, str3, i,
            z, z2, i2, i3, extraFlagDefault, timelineFactory, instantGenerator);
    }

    /**
     * Full overload of {@code castDeleteBatch}. This base implementation ignores all of its
     * arguments and returns {@code null}; presumably subclasses override it with the real delete
     * logic — confirm.
     *
     * @return always {@code null} in this base implementation
     * @throws Exception never thrown here; declared for overriding implementations
     */
    protected Object castDeleteBatch(HoodieWriteConfig hoodieWriteConfig, BaseHoodieWriteClient baseHoodieWriteClient, String str, String str2, String str3, int i, boolean z, boolean z2, int i2, int i3, boolean z3, TimelineFactory timelineFactory, InstantGenerator instantGenerator) throws Exception {
        return null;
    }

    /**
     * Hook for wrapping a record-generation function. This base implementation ignores its
     * arguments and returns {@code null}; presumably subclasses override it — confirm.
     *
     * @param z                 implementation-defined flag
     * @param hoodieWriteConfig write config available to the wrapper
     * @param function2         the record-generation function to wrap
     * @return always {@code null} in this base implementation
     */
    protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean z, HoodieWriteConfig hoodieWriteConfig, Function2<List<HoodieRecord>, String, Integer> function2) {
        return null;
    }

    /**
     * Hook for asserting that the dataset still holds the expected number of records. This base
     * implementation performs no assertion and returns an empty array; presumably subclasses
     * override it — confirm.
     *
     * @param i presumably the expected record count — confirm against implementations
     * @return an empty array in this base implementation
     */
    protected String[] assertTheEntireDatasetHasAllRecordsStill(int i) {
        return new String[0];
    }

    /**
     * Hook for the merge-handle test. This base implementation is a no-op; presumably subclasses
     * override it — confirm.
     *
     * @param hoodieWriteConfig write config for the test
     * @throws IOException never thrown here; declared for overriding implementations
     */
    protected void testMergeHandle(HoodieWriteConfig hoodieWriteConfig) throws IOException {
    }

    /**
     * Builds a write config with auto-commit off, the given marker-based-rollback setting and a
     * consistency guard with consistency checks enabled.
     * <ul>
     *   <li>{@code z2 == false}: the guard uses 1 ms initial and max check intervals.</li>
     *   <li>{@code z2 == true}: the optimistic guard is enabled with a 1 ms sleep time, and the
     *       key-gen properties are applied.</li>
     * </ul>
     * NOTE(review): the properties chosen via {@code z3} only reach the config in the
     * {@code z2 == true} branch; confirm that this asymmetry is intentional.
     *
     * @param z  whether rollback uses markers
     * @param z2 whether the optimistic consistency guard is enabled
     * @param z3 when false, the key-gen properties replace the empty default properties
     * @return the write config
     */
    protected HoodieWriteConfig getRollbackMarkersAndConsistencyGuardWriteConfig(boolean z, boolean z2, boolean z3) {
        final Properties extraProps;
        if (z3) {
            extraProps = new Properties();
        } else {
            extraProps = getPropertiesForKeyGen();
        }

        HoodieWriteConfig.Builder configBuilder = getConfigBuilder()
            .withRollbackUsingMarkers(z)
            .withAutoCommit(false);

        if (z2) {
            ConsistencyGuardConfig optimisticGuard = ConsistencyGuardConfig.newBuilder()
                .withConsistencyCheckEnabled(true)
                .withEnableOptimisticConsistencyGuard(z2)
                .withOptimisticConsistencyGuardSleepTimeMs(1L)
                .build();
            return configBuilder
                .withConsistencyGuardConfig(optimisticGuard)
                .withProperties(extraProps)
                .build();
        }

        ConsistencyGuardConfig strictGuard = ConsistencyGuardConfig.newBuilder()
            .withConsistencyCheckEnabled(true)
            .withMaxConsistencyCheckIntervalMs(1)
            .withInitialConsistencyCheckIntervalMs(1)
            .withEnableOptimisticConsistencyGuard(z2)
            .build();
        return configBuilder.withConsistencyGuardConfig(strictGuard).build();
    }

    /**
     * Builds a write config with auto-commit off and a consistency guard with consistency checks
     * enabled. Without the optimistic guard the check intervals are 1 ms; with it the optimistic
     * sleep time is 1 ms.
     *
     * @param z whether the optimistic consistency guard is enabled
     * @return the write config
     */
    protected HoodieWriteConfig getConsistencyCheckWriteConfig(boolean z) {
        HoodieWriteConfig.Builder configBuilder = getConfigBuilder().withAutoCommit(false);

        final ConsistencyGuardConfig guardConfig;
        if (z) {
            guardConfig = ConsistencyGuardConfig.newBuilder()
                .withConsistencyCheckEnabled(true)
                .withEnableOptimisticConsistencyGuard(z)
                .withOptimisticConsistencyGuardSleepTimeMs(1L)
                .build();
        } else {
            guardConfig = ConsistencyGuardConfig.newBuilder()
                .withConsistencyCheckEnabled(true)
                .withMaxConsistencyCheckIntervalMs(1)
                .withInitialConsistencyCheckIntervalMs(1)
                .withEnableOptimisticConsistencyGuard(z)
                .build();
        }
        return configBuilder.withConsistencyGuardConfig(guardConfig).build();
    }

    /**
     * Builds a write config for concurrent writers: optimistic concurrency control with the
     * in-process lock provider, embedded timeline server disabled, auto-clean and auto-commit
     * off, and a 3000 ms heartbeat interval.
     *
     * @param hoodieFailedWritesCleaningPolicy cleaning policy for failed writes
     * @param z                                when false, the key-gen properties (built with the
     *                                         same flag) are merged into the lock properties
     * @return the write config
     */
    protected HoodieWriteConfig getParallelWritingWriteConfig(HoodieFailedWritesCleaningPolicy hoodieFailedWritesCleaningPolicy, boolean z) {
        Properties writerProps = new Properties();
        writerProps.setProperty("hoodie.write.lock.wait_time_ms", "3000");
        writerProps.setProperty("hoodie.write.lock.client.wait_time_ms_between_retry", "3000");
        writerProps.setProperty("hoodie.write.lock.client.num_retries", "20");
        if (!z) {
            writerProps.putAll(getPropertiesForKeyGen(z));
        }

        HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
            .withFailedWritesCleaningPolicy(hoodieFailedWritesCleaningPolicy)
            .withAutoClean(false)
            .build();
        FileSystemViewStorageConfig viewConfig = FileSystemViewStorageConfig.newBuilder()
            .withRemoteServerPort(Integer.valueOf(timelineServicePort))
            .build();
        HoodieLockConfig lockConfig = HoodieLockConfig.newBuilder()
            .withLockProvider(InProcessLockProvider.class)
            .build();

        return getConfigBuilder()
            .withEmbeddedTimelineServerEnabled(false)
            .withCleanConfig(cleanConfig)
            .withHeartbeatIntervalInMs(3000)
            .withFileSystemViewConfig(viewConfig)
            .withAutoCommit(false)
            .withLockConfig(lockConfig)
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withProperties(writerProps)
            .build();
    }

    /**
     * Reads the Avro records of every file referenced by the write statuses into {@code list3}
     * and asserts that the resulting size of {@code list3} equals the number of distinct record
     * keys in {@code list}.
     *
     * @param list  the records that were expected to be written
     * @param list2 write statuses whose stat paths (relative to {@code basePath}) are read back
     * @param list3 output list the read records are appended to; existing entries count too
     * @return the record keys of {@code list}
     */
    @NotNull
    protected Set<String> verifyRecordKeys(List<HoodieRecord> list, List<WriteStatus> list2, List<GenericRecord> list3) {
        for (WriteStatus writeStatus : list2) {
            // A fresh meta client is created for every file, exactly as before.
            FileFormatUtils fileFormatUtils = getFileUtilsInstance(createMetaClient());
            StoragePath writtenFile = new StoragePath(this.basePath, writeStatus.getStat().getPath());
            list3.addAll(fileFormatUtils.readAvroRecords(this.storage, writtenFile));
        }

        Set<String> expectedKeys = Transformations.recordsToRecordKeySet(list);
        Assertions.assertEquals(list3.size(), expectedKeys.size());
        return expectedKeys;
    }

    /**
     * Verifies that the records written by the given write statuses kept their metadata:
     * every commit time found in the written records is one of {@code set}, every written record
     * key belongs to {@code list}, and every written record's file-name metadata resolves to a
     * file id reported by {@code list2}.
     *
     * @param set   the commit times that are allowed to appear in the written records
     * @param list  the records that were expected to be written
     * @param list2 the write statuses describing the written files
     */
    protected void verifyRecordsWrittenWithPreservedMetadata(Set<String> set, List<HoodieRecord> list, List<WriteStatus> list2) {
        // Typed list: the records are read back as GenericRecord and iterated as such below.
        List<GenericRecord> writtenRecords = new ArrayList<>();
        Set<String> expectedKeys = verifyRecordKeys(list, list2, writtenRecords);

        // Only the distinct commit times matter, so collect them straight into a set.
        Set<String> writtenCommitTimes = writtenRecords.stream()
            .map(rec -> rec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString())
            .collect(Collectors.toSet());
        Assertions.assertTrue(set.containsAll(writtenCommitTimes),
            "Unexpected commit times in written records: " + writtenCommitTimes);

        Set<String> writtenFileIds = list2.stream()
            .map(WriteStatus::getFileId)
            .collect(Collectors.toSet());
        for (GenericRecord rec : writtenRecords) {
            String recordKey = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
            Assertions.assertTrue(expectedKeys.contains(recordKey),
                "Unexpected record key: " + recordKey);

            String fileName = rec.get(HoodieRecord.FILENAME_METADATA_FIELD).toString();
            Assertions.assertTrue(writtenFileIds.contains(FSUtils.getFileId(fileName)),
                "Record " + recordKey + " points to a file outside the write statuses: " + fileName);
        }
    }

    /**
     * Picks one base file written by the latest completed commit and returns its partition path
     * together with its full path ({@code basePath + "/" + relativePath}).
     *
     * <p>The write stat is resolved once, so the partition and the file path always describe the
     * same file. If the commit wrote no file with the table's base-file extension, the partition
     * is {@code null} and the path is {@code basePath + "/null"}, which matches the historical
     * behaviour.
     *
     * @param hoodieTableMetaClient meta client of the table to inspect
     * @return pair of (partition path, full base file path)
     * @throws IOException if the commit metadata cannot be read
     */
    protected Pair<String, String> getPartitionAndBaseFilePathsFromLatestCommitMetadata(HoodieTableMetaClient hoodieTableMetaClient) throws IOException {
        String baseFileExtension = hoodieTableMetaClient.getTableConfig().getBaseFileFormat().getFileExtension();
        HoodieInstant latestCommit = (HoodieInstant) hoodieTableMetaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
        HoodieCommitMetadata commitMetadata = hoodieTableMetaClient.getActiveTimeline().readCommitMetadata(latestCommit);

        // Resolve the matching write stat a single time; two independent findAny() calls are not
        // guaranteed to select the same element.
        HoodieWriteStat baseFileStat = commitMetadata.getPartitionToWriteStats().values().stream()
            .flatMap(List::stream)
            .filter(writeStat -> writeStat.getPath().endsWith(baseFileExtension))
            .findAny()
            .orElse(null);

        String partitionPath = baseFileStat == null ? null : baseFileStat.getPartitionPath();
        String relativeFilePath = baseFileStat == null ? null : baseFileStat.getPath();
        return Pair.of(partitionPath, hoodieTableMetaClient.getBasePath() + "/" + relativeFilePath);
    }

    /**
     * Resolves the {@link FileFormatUtils} for the table's base file format, using the I/O
     * factory obtained from the table's storage.
     *
     * @param hoodieTableMetaClient meta client of the table
     * @return the file-format utilities for the table's base file format
     */
    public static FileFormatUtils getFileUtilsInstance(HoodieTableMetaClient hoodieTableMetaClient) {
        HoodieIOFactory ioFactory = HoodieIOFactory.getIOFactory(hoodieTableMetaClient.getStorage());
        HoodieFileFormat baseFileFormat = hoodieTableMetaClient.getTableConfig().getBaseFileFormat();
        return ioFactory.getFileFormatUtils(baseFileFormat);
    }

    /**
     * Small-insert write config whose small-file threshold is the data generator's estimated
     * size of a 150-record file.
     *
     * @param i  insert split size
     * @param z  when true the data generator's null schema is used instead of the trip schema
     * @param z2 forwarded as the merge-allow-duplicate-on-inserts setting
     * @return the write config
     */
    protected HoodieWriteConfig getSmallInsertWriteConfig(int i, boolean z, boolean z2) {
        final long smallFileThreshold = this.dataGen.getEstimatedFileSizeInBytes(150);
        return getSmallInsertWriteConfig(i, z, smallFileThreshold, z2);
    }

    /**
     * Small-insert write config that selects the schema from a flag: the data generator's null
     * schema when {@code z} is true, otherwise the trip record schema.
     *
     * @param i  insert split size
     * @param z  schema selector, see above
     * @param j  compaction small-file size
     * @param z2 forwarded as the merge-allow-duplicate-on-inserts setting
     * @return the write config
     */
    protected HoodieWriteConfig getSmallInsertWriteConfig(int i, boolean z, long j, boolean z2) {
        final String schemaJson;
        if (z) {
            schemaJson = HoodieTestDataGenerator.NULL_SCHEMA;
        } else {
            schemaJson = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        }
        return getSmallInsertWriteConfig(i, schemaJson, j, z2);
    }

    /**
     * Small-insert write config with merge-allow-duplicate-on-inserts turned off.
     *
     * @param i   insert split size
     * @param str Avro schema as JSON
     * @param j   compaction small-file size
     * @return the write config
     */
    protected HoodieWriteConfig getSmallInsertWriteConfig(int i, String str, long j) {
        final boolean mergeAllowDuplicateOnInserts = false;
        return getSmallInsertWriteConfig(i, str, j, mergeAllowDuplicateOnInserts);
    }

    /**
     * Small-insert write config that keeps the index of the default builder and adds no extra
     * properties.
     *
     * @param i   insert split size
     * @param str Avro schema as JSON
     * @param j   compaction small-file size
     * @param z   merge-allow-duplicate-on-inserts setting
     * @return the write config
     */
    protected HoodieWriteConfig getSmallInsertWriteConfig(int i, String str, long j, boolean z) {
        // true => the full overload does not switch the index type to SIMPLE.
        final boolean keepDefaultIndex = true;
        Properties noExtraProps = new Properties();
        return getSmallInsertWriteConfig(i, str, j, z, keepDefaultIndex, noExtraProps);
    }

    /**
     * Small-insert write config with extra properties and merge-allow-duplicate-on-inserts turned
     * off. Note that {@code z} is forwarded as the index-selection flag of the full overload, not
     * as the duplicate-inserts flag.
     *
     * @param i          insert split size
     * @param str        Avro schema as JSON
     * @param j          compaction small-file size
     * @param z          when false the full overload switches the index type to {@code SIMPLE}
     * @param properties extra properties applied to the builder
     * @return the write config
     */
    protected HoodieWriteConfig getSmallInsertWriteConfig(int i, String str, long j, boolean z, Properties properties) {
        final boolean mergeAllowDuplicateOnInserts = false;
        return getSmallInsertWriteConfig(i, str, j, mergeAllowDuplicateOnInserts, z, properties);
    }

    /**
     * Full small-insert write config: starts from {@link #getConfigBuilder(String)}, optionally
     * switches to a {@code SIMPLE} index, then sets the compaction small-file size and insert
     * split size, the {@code LAZY} failed-writes cleaning policy, HFile/Parquet max file sizes
     * equal to the data generator's estimated size of a 200-record file, and the extra properties.
     *
     * @param i          insert split size
     * @param str        Avro schema as JSON
     * @param j          compaction small-file size
     * @param z          merge-allow-duplicate-on-inserts setting
     * @param z2         when false the index type is switched to {@code SIMPLE}
     * @param properties extra properties applied last via {@code withProps}
     * @return the write config
     */
    protected HoodieWriteConfig getSmallInsertWriteConfig(int i, String str, long j, boolean z, boolean z2, Properties properties) {
        HoodieWriteConfig.Builder smallInsertBuilder = getConfigBuilder(str);
        if (!z2) {
            HoodieIndexConfig simpleIndex = HoodieIndexConfig.newBuilder()
                .withIndexType(HoodieIndex.IndexType.SIMPLE)
                .build();
            smallInsertBuilder.withIndexConfig(simpleIndex);
        }

        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
            .compactionSmallFileSize(j)
            .insertSplitSize(i)
            .build();
        HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
            .build();
        HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
            .hfileMaxFileSize(this.dataGen.getEstimatedFileSizeInBytes(200))
            .parquetMaxFileSize(this.dataGen.getEstimatedFileSizeInBytes(200))
            .build();

        return smallInsertBuilder
            .withCompactionConfig(compactionConfig)
            .withCleanConfig(cleanConfig)
            .withStorageConfig(storageConfig)
            .withMergeAllowDuplicateOnInserts(z)
            .withProps(properties)
            .build();
    }

    /**
     * Small-insert write config variant that always starts from a {@code BLOOM} index with the
     * {@code EAGER} policy, then sets the compaction sizes, the {@code LAZY} failed-writes
     * cleaning policy and HFile/Parquet max file sizes equal to the data generator's estimated
     * size of a 200-record file.
     *
     * @param i   insert split size
     * @param str Avro schema as JSON
     * @param j   compaction small-file size
     * @param z   merge-allow-duplicate-on-inserts setting
     * @return the write config
     */
    protected HoodieWriteConfig getSmallInsertWriteConfigForMDT(int i, String str, long j, boolean z) {
        HoodieWriteConfig.Builder mdtBuilder =
            getConfigBuilder(str, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER);

        HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
            .compactionSmallFileSize(j)
            .insertSplitSize(i)
            .build();
        // Replaces the EAGER clean config installed by the base builder.
        HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
            .build();
        HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
            .hfileMaxFileSize(this.dataGen.getEstimatedFileSizeInBytes(200))
            .parquetMaxFileSize(this.dataGen.getEstimatedFileSizeInBytes(200))
            .build();

        return mdtBuilder
            .withCompactionConfig(compactionConfig)
            .withCleanConfig(cleanConfig)
            .withStorageConfig(storageConfig)
            .withMergeAllowDuplicateOnInserts(z)
            .build();
    }

    /**
     * Builds a write config with the {@code LAZY} failed-writes cleaning policy, input combining
     * enabled for both flags, the given operation-metadata-field setting and, when {@code z} is
     * false, the key-gen properties plus a {@code SIMPLE} index.
     *
     * @param z  forwarded to {@code addConfigsForPopulateMetaFields}; true leaves the builder as is
     * @param z2 whether the operation metadata field is allowed
     * @return the write config
     */
    protected HoodieWriteConfig getWriteConfigWithPopulateMetaFieldsAndAllowOperationMetaField(boolean z, boolean z2) {
        HoodieWriteConfig.Builder lazyCleanBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY);
        HoodieWriteConfig.Builder configuredBuilder = lazyCleanBuilder
            .withAllowOperationMetadataField(z2)
            .combineInput(true, true);
        addConfigsForPopulateMetaFields(configuredBuilder, z);
        return configuredBuilder.build();
    }

    /**
     * Deduplicates the given records through {@code HoodieWriteHelper} against a mocked index
     * and asserts the outcome: the partition count must equal {@code i2}, the result must hold
     * one record for a global index and two otherwise, and no partition may contain duplicates.
     *
     * @param hoodieWriteConfig supplies the schema and properties for deduplication
     * @param hoodieData        the records to deduplicate
     * @param z                 value the mocked index reports from {@code isGlobal()}
     * @param i                 added to the input's partition count to form the requested parallelism
     * @param i2                expected number of partitions of the deduplicated data
     * @return the deduplicated records
     */
    protected List<HoodieRecord<RawTripTestPayload>> dedupForCopyOnWriteStorage(HoodieWriteConfig hoodieWriteConfig, HoodieData<HoodieRecord> hoodieData, boolean z, int i, int i2) {
        HoodieIndex mockedIndex = Mockito.mock(HoodieIndex.class);
        Mockito.when(mockedIndex.isGlobal()).thenReturn(z);

        HoodieData dedupedData = HoodieWriteHelper.newInstance().deduplicateRecords(
            hoodieData,
            mockedIndex,
            hoodieData.getNumPartitions() + i,
            hoodieWriteConfig.getSchema(),
            hoodieWriteConfig.getProps(),
            HoodiePreCombineAvroRecordMerger.INSTANCE);
        Assertions.assertEquals(i2, dedupedData.getNumPartitions());

        List<HoodieRecord<RawTripTestPayload>> dedupedRecords = dedupedData.collectAsList();
        final int expectedRecordCount;
        if (z) {
            expectedRecordCount = 1;
        } else {
            expectedRecordCount = 2;
        }
        Assertions.assertEquals(expectedRecordCount, dedupedRecords.size());
        org.apache.hudi.testutils.Assertions.assertNoDupesWithinPartition(dedupedRecords);
        return dedupedRecords;
    }

    /**
     * Creates a meta client for the table at this harness's {@code basePath}.
     *
     * @return a new meta client
     */
    protected HoodieTableMetaClient createMetaClient() {
        HoodieTableMetaClient client = HoodieTestUtils.createMetaClient(this.storageConf, this.basePath);
        return client;
    }

    /**
     * Creates a meta client for the table at the given base path.
     *
     * @param str base path of the table
     * @return a new meta client
     */
    protected HoodieTableMetaClient createMetaClient(String str) {
        HoodieTableMetaClient client = HoodieTestUtils.createMetaClient(this.storageConf, str);
        return client;
    }

    /**
     * Collects the distinct file group ids (partition path + file id) of the given write statuses.
     *
     * @param list the write statuses
     * @return the set of file group ids
     */
    protected Set<HoodieFileGroupId> getFileGroupIdsFromWriteStatus(List<WriteStatus> list) {
        Set<HoodieFileGroupId> fileGroupIds = new HashSet<>();
        for (WriteStatus status : list) {
            fileGroupIds.add(new HoodieFileGroupId(status.getPartitionPath(), status.getFileId()));
        }
        return fileGroupIds;
    }

    /**
     * Reloads the meta client and asserts that the files in partition {@code str} whose names
     * contain the requested time of the first completed replace commit are exactly (and in the
     * same order) the paths recorded in that commit's replace metadata.
     *
     * @param str partition path, relative to {@code basePath}
     * @throws IOException if the metadata or the directory listing cannot be read
     */
    protected void verifyClusteredFilesWithReplaceCommitMetadata(String str) throws IOException {
        this.metaClient = HoodieTableMetaClient.reload(createMetaClient());
        HoodieInstant replaceInstant = (HoodieInstant) this.metaClient.getActiveTimeline().getCompletedReplaceTimeline().firstInstant().get();
        HoodieReplaceCommitMetadata replaceMetadata = this.metaClient.getActiveTimeline().readReplaceCommitMetadata(replaceInstant);

        // Paths recorded by the replace commit, across all partitions.
        List<String> pathsFromMetadata = new ArrayList<>();
        for (List<HoodieWriteStat> partitionStats : replaceMetadata.getPartitionToWriteStats().values()) {
            for (HoodieWriteStat writeStat : partitionStats) {
                pathsFromMetadata.add(writeStat.getPath());
            }
        }

        // Files on storage in the given partition that carry the replace instant's time in their name.
        List<String> pathsOnStorage = this.storage.listDirectEntries(new StoragePath(this.basePath, str)).stream()
            .filter(entry -> entry.getPath().getName().contains(replaceInstant.requestedTime()))
            .map(entry -> str + "/" + entry.getPath().getName())
            .collect(Collectors.toList());

        Assertions.assertEquals(pathsOnStorage, pathsFromMetadata);
    }

    /**
     * Fetches all pending clustering plans and asserts how many there are: exactly one when
     * {@code z} is true, none otherwise.
     *
     * @param z                     whether a pending clustering plan is expected
     * @param hoodieTableMetaClient meta client of the table to inspect
     * @return the pending clustering plans
     */
    protected static List<Pair<HoodieInstant, HoodieClusteringPlan>> getAndAssertPendingClusteringPlans(boolean z, HoodieTableMetaClient hoodieTableMetaClient) {
        List<Pair<HoodieInstant, HoodieClusteringPlan>> pendingPlans =
            (List) ClusteringUtils.getAllPendingClusteringPlans(hoodieTableMetaClient).collect(Collectors.toList());
        final int expectedPlanCount = z ? 1 : 0;
        Assertions.assertEquals(expectedPlanCount, pendingPlans.size());
        return pendingPlans;
    }

    /**
     * Asserts that the commit/replace timeline holds exactly nine instants: the requested,
     * inflight and completed states of commits "001", "004" and "006", in that order, and that
     * none of the completed instants is flagged as legacy.
     *
     * @param hoodieTableMetaClient meta client of the table whose timeline is checked
     */
    protected void checkTimelineForUpsertsInternal(HoodieTableMetaClient hoodieTableMetaClient) {
        List<HoodieInstant> instants = HoodieTestUtils.TIMELINE_FACTORY
            .createActiveTimeline(hoodieTableMetaClient, false)
            .getCommitAndReplaceTimeline()
            .getInstants();
        Assertions.assertEquals(9, instants.size());

        assertCommitInstantAt(instants, 0, HoodieInstant.State.REQUESTED, "001");
        assertCommitInstantAt(instants, 1, HoodieInstant.State.INFLIGHT, "001");
        assertCommitInstantAt(instants, 2, HoodieInstant.State.COMPLETED, "001");
        Assertions.assertFalse(instants.get(2).isLegacy());

        assertCommitInstantAt(instants, 3, HoodieInstant.State.REQUESTED, "004");
        assertCommitInstantAt(instants, 4, HoodieInstant.State.INFLIGHT, "004");
        assertCommitInstantAt(instants, 5, HoodieInstant.State.COMPLETED, "004");
        Assertions.assertFalse(instants.get(5).isLegacy());

        // The legacy flag of the last completed instant is checked before its identity.
        Assertions.assertFalse(instants.get(8).isLegacy());
        assertCommitInstantAt(instants, 6, HoodieInstant.State.REQUESTED, "006");
        assertCommitInstantAt(instants, 7, HoodieInstant.State.INFLIGHT, "006");
        assertCommitInstantAt(instants, 8, HoodieInstant.State.COMPLETED, "006");
    }

    /** Asserts that the instant at {@code index} equals a "commit" instant with the given state and time. */
    private static void assertCommitInstantAt(List<HoodieInstant> instants, int index, HoodieInstant.State state, String instantTime) {
        Assertions.assertEquals(HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(state, "commit", instantTime), instants.get(index));
    }

    /**
     * Verifies the records reported by {@code verifyRecordKeys} against the expected keys.
     *
     * <p>When meta fields are populated, every record must carry the expected commit time and a
     * record key contained in the expected key set. Otherwise the key is derived through
     * {@code keyGenerator}, and (unless {@code z} is set) the commit-time meta field must be null.
     *
     * @param str expected commit time of the written records
     * @param z when meta fields are not populated, skips the null check on the commit-time field
     * @param list records that were expected to be written
     * @param list2 write statuses returned by the write
     * @param hoodieWriteConfig write config used for the write
     * @param keyGenerator key generator used when meta fields are not populated
     */
    protected void verifyRecordsWritten(String str, boolean z, List<HoodieRecord> list, List<WriteStatus> list2, HoodieWriteConfig hoodieWriteConfig, KeyGenerator keyGenerator) {
        // Typed list: a raw list cannot be iterated as GenericRecord.
        // NOTE(review): presumably filled by verifyRecordKeys with the records read back - confirm.
        List<GenericRecord> writtenRecords = new ArrayList<>();
        Set<String> expectedKeys = verifyRecordKeys(list, list2, writtenRecords);

        if (hoodieWriteConfig.populateMetaFields()) {
            for (GenericRecord rec : writtenRecords) {
                String recordKey = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
                Assertions.assertEquals(str, rec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString());
                Assertions.assertTrue(expectedKeys.contains(recordKey));
            }
            return;
        }

        for (GenericRecord rec : writtenRecords) {
            String recordKey = keyGenerator.getKey(rec).getRecordKey();
            if (!z) {
                Assertions.assertNull(rec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
            }
            Assertions.assertTrue(expectedKeys.contains(recordKey));
        }
    }

    /**
     * Convenience overload of the five-argument {@code writeAndVerifyBatch} that passes
     * {@code false} for its last flag.
     *
     * @param baseHoodieWriteClient client used for the write
     * @param list records to write
     * @param str instant time of the write
     * @param z forwarded unchanged to the five-argument overload
     * @return the write statuses produced by the write
     * @throws IOException if the delegate fails with an I/O error
     */
    protected List<WriteStatus> writeAndVerifyBatch(BaseHoodieWriteClient baseHoodieWriteClient, List<HoodieRecord> list, String str, boolean z) throws IOException {
        List<WriteStatus> statuses = writeAndVerifyBatch(baseHoodieWriteClient, list, str, z, false);
        return statuses;
    }

    /**
     * Writes two batches of 200 inserts into a single partition and asserts that the two batches
     * landed in disjoint file groups.
     *
     * @param baseHoodieWriteClient client used when {@code z2} is false; closed up front when {@code z2} is true
     * @param baseHoodieWriteClient2 client used when {@code z2} is true
     * @param z forwarded to {@code writeAndVerifyBatch}
     * @param str the only partition path the data generator is configured with
     * @param z2 selects the second client and is forwarded to {@code writeAndVerifyBatch}
     * @return ((all inserted records, the two instant times), union of the touched file groups)
     * @throws IOException if a write fails with an I/O error
     */
    protected Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> insertTwoBatches(BaseHoodieWriteClient baseHoodieWriteClient, BaseHoodieWriteClient baseHoodieWriteClient2, boolean z, String str, boolean z2) throws IOException {
        BaseHoodieWriteClient activeClient = baseHoodieWriteClient;
        if (z2) {
            if (baseHoodieWriteClient != null) {
                baseHoodieWriteClient.close();
            }
            activeClient = baseHoodieWriteClient2;
        }

        this.dataGen = new HoodieTestDataGenerator(new String[]{str});

        // First batch.
        String firstInstant = activeClient.createNewInstantTime();
        List<HoodieRecord> firstBatch = this.dataGen.generateInserts(firstInstant, 200);
        List<WriteStatus> firstStatuses = writeAndVerifyBatch(activeClient, firstBatch, firstInstant, z, z2);
        Set<HoodieFileGroupId> firstGroups = getFileGroupIdsFromWriteStatus(firstStatuses);

        // Second batch.
        String secondInstant = activeClient.createNewInstantTime();
        List<HoodieRecord> secondBatch = this.dataGen.generateInserts(secondInstant, 200);
        List<WriteStatus> secondStatuses = writeAndVerifyBatch(activeClient, secondBatch, secondInstant, z, z2);
        activeClient.close();
        Set<HoodieFileGroupId> secondGroups = getFileGroupIdsFromWriteStatus(secondStatuses);

        Set<HoodieFileGroupId> allGroups = new HashSet<>(firstGroups);
        allGroups.addAll(secondGroups);

        // The batches must not share any file group.
        Set<HoodieFileGroupId> sharedGroups = new HashSet<>(firstGroups);
        sharedGroups.retainAll(secondGroups);
        Assertions.assertEquals(0, sharedGroups.size());

        List<HoodieRecord> allRecords = Stream.concat(firstBatch.stream(), secondBatch.stream()).collect(Collectors.toList());
        List<String> instantTimes = Arrays.asList(firstInstant, secondInstant);
        return Pair.of(Pair.of(allRecords, instantTimes), allGroups);
    }

    /**
     * Rolls back the given instant and asserts the commit file and marker folder state around it.
     *
     * @param z when true, the commit is expected to exist (with its marker folder already gone)
     *          before the rollback; when false, only the post-rollback state is checked
     * @param str instant time to roll back
     * @param hoodieTableMetaClient meta client used to resolve the marker folder
     * @param baseHoodieWriteClient client that performs the rollback
     * @throws IOException if a storage existence check fails
     */
    protected void rollbackAndAssert(boolean z, String str, HoodieTableMetaClient hoodieTableMetaClient, BaseHoodieWriteClient baseHoodieWriteClient) throws IOException {
        if (z) {
            // The commit went through: verify it first, then roll it back.
            Assertions.assertTrue(this.testTable.commitExists(str), "With optimistic CG, first commit should succeed. commit file should be present");
            Assertions.assertFalse(hoodieTableMetaClient.getStorage().exists(new StoragePath(hoodieTableMetaClient.getMarkerFolderPath(str))));
            baseHoodieWriteClient.rollback(str);
            Assertions.assertFalse(this.testTable.commitExists(str), "After explicit rollback, commit file should not be present");
        } else {
            baseHoodieWriteClient.rollback(str);
            Assertions.assertFalse(this.testTable.commitExists(str), "After explicit rollback, commit file should not be present");
            Assertions.assertFalse(hoodieTableMetaClient.getStorage().exists(new StoragePath(hoodieTableMetaClient.getMarkerFolderPath(str))));
        }
    }

    /**
     * Bulk-inserts 200 generated records at the given instant, creates one extra MERGE marker next
     * to an existing marker, and then attempts the commit.
     *
     * <p>The extra marker is created for a base file name built from a random UUID file id. Its
     * directory is taken from the first marker found: read via {@code MarkerUtils} for
     * timeline-server-based markers, otherwise found by globbing for ".marker" entries.
     * NOTE(review): presumably no data file backs this marker, which is what makes the
     * consistency check fail - confirm.
     *
     * @param hoodieEngineContext engine context used when reading timeline-server-based markers
     * @param hoodieTableMetaClient meta client used to resolve the marker folder
     * @param str instant time of the write
     * @param z when true the commit is expected to succeed; when false it must throw a
     *          {@code HoodieCommitException} whose cause is a {@code HoodieIOException}
     * @param function2 creates the table handed to {@code WriteMarkersFactory}
     * @param function applied to the generated inserts before {@code bulkInsert}, and to the
     *                 write statuses before {@code commit}
     * @param function3 converts the {@code bulkInsert} result into the list of write statuses
     * @return the path of the extra marker and the write statuses of the bulk insert
     * @throws Exception if any step fails unexpectedly
     */
    protected Pair<StoragePath, List<WriteStatus>> testConsistencyCheck(HoodieEngineContext hoodieEngineContext, HoodieTableMetaClient hoodieTableMetaClient, String str, boolean z, Function2<HoodieTable, HoodieTableMetaClient, HoodieWriteConfig> function2, Function function, Function function3) throws Exception {
        HoodieWriteConfig consistencyCheckWriteConfig = getConsistencyCheckWriteConfig(z);
        BaseHoodieWriteClient hoodieWriteClient = getHoodieWriteClient(consistencyCheckWriteConfig);
        hoodieWriteClient.startCommitWithTime(str);
        List list = (List) function3.apply(hoodieWriteClient.bulkInsert(function.apply(this.dataGen.generateInserts(str, 200)), str));
        String markerFolderPath = hoodieTableMetaClient.getMarkerFolderPath(str);
        // Create the extra marker in the parent directory of the first existing marker.
        Option create = WriteMarkersFactory.get(consistencyCheckWriteConfig.getMarkersType(), function2.apply(hoodieTableMetaClient, consistencyCheckWriteConfig), str).create(consistencyCheckWriteConfig.getMarkersType() == MarkerType.TIMELINE_SERVER_BASED ? new StoragePath(markerFolderPath, (String) MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(markerFolderPath, this.storage, hoodieEngineContext, 1).values().stream().flatMap((v0) -> {
            return v0.stream();
        }).findFirst().get()).getParent().toString() : (String) ((List) this.storage.globEntries(new StoragePath(String.format("%s/*/*/*/*", markerFolderPath)), storagePath -> {
            return storagePath.toString().contains(".marker");
        }).stream().limit(1L).map(storagePathInfo -> {
            return storagePathInfo.getPath().getParent().toString();
        }).collect(Collectors.toList())).get(0), FSUtils.makeBaseFileName(str, HoodieTestCommitGenerator.BASE_FILE_WRITE_TOKEN, UUID.randomUUID().toString(), ((HoodieFileFormat) HoodieTableConfig.BASE_FILE_FORMAT.defaultValue()).getFileExtension()), IOType.MERGE);
        if (z) {
            hoodieWriteClient.commit(str, function.apply(list));
        } else {
            // The commit must fail, and the failure must be rooted in an I/O error.
            Assertions.assertTrue(((Exception) Assertions.assertThrows(HoodieCommitException.class, () -> {
                hoodieWriteClient.commit(str, function.apply(list));
            }, "Commit should fail due to consistency check")).getCause() instanceof HoodieIOException);
        }
        return Pair.of(create.get(), list);
    }

    /**
     * Runs {@code testConsistencyCheck} for instant "000", deletes the extra marker it created,
     * and asserts that the commit exists and its marker folder is gone. Without the optimistic
     * guard the commit is retried explicitly first and must succeed.
     *
     * @param hoodieEngineContext engine context forwarded to {@code testConsistencyCheck}
     * @param z whether the optimistic consistency guard is enabled
     * @param function2 forwarded to {@code testConsistencyCheck}
     * @param function forwarded to {@code testConsistencyCheck}; also applied to the write statuses on retry
     * @param function3 forwarded to {@code testConsistencyCheck}
     * @throws Exception if any step fails unexpectedly
     */
    protected void testConsistencyCheckDuringFinalize(HoodieEngineContext hoodieEngineContext, boolean z, Function2<HoodieTable, HoodieTableMetaClient, HoodieWriteConfig> function2, Function function, Function function3) throws Exception {
        final String instantTime = "000";
        HoodieTableMetaClient tableMetaClient = createMetaClient();

        ConsistencyGuardConfig guardConfig = ConsistencyGuardConfig.newBuilder()
            .withEnableOptimisticConsistencyGuard(z)
            .build();
        BaseHoodieWriteClient writeClient = getHoodieWriteClient(
            getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(guardConfig).build());
        writeClient.setOperationType(WriteOperationType.UPSERT);

        Pair<StoragePath, List<WriteStatus>> checkResult =
            testConsistencyCheck(hoodieEngineContext, tableMetaClient, instantTime, z, function2, function, function3);
        // Remove the extra marker created by the consistency check.
        tableMetaClient.getStorage().deleteFile(checkResult.getKey());

        if (!z) {
            Assertions.assertTrue(writeClient.commit(instantTime, function.apply(checkResult.getRight())), "Commit should succeed");
        }
        Assertions.assertTrue(this.testTable.commitExists(instantTime), "After explicit commit, commit file should be created");
        Assertions.assertFalse(tableMetaClient.getStorage().exists(new StoragePath(tableMetaClient.getMarkerFolderPath(instantTime))));
    }

    /**
     * Runs the consistency check for instant "00000000000010" and then rolls that instant back,
     * asserting the resulting timeline and marker state.
     *
     * @param hoodieEngineContext engine context forwarded to {@code testConsistencyCheck}
     * @param z first flag passed to {@code getRollbackMarkersAndConsistencyGuardWriteConfig}
     * @param z2 second config flag; also forwarded to {@code testConsistencyCheck} and {@code rollbackAndAssert}
     * @param z3 third flag passed to {@code getRollbackMarkersAndConsistencyGuardWriteConfig}
     * @param function2 forwarded to {@code testConsistencyCheck}
     * @param function forwarded to {@code testConsistencyCheck}
     * @param function3 forwarded to {@code testConsistencyCheck}
     * @throws Exception if any step fails unexpectedly
     */
    protected void testRollbackAfterConsistencyCheckFailureUsingFileList(HoodieEngineContext hoodieEngineContext, boolean z, boolean z2, boolean z3, Function2<HoodieTable, HoodieTableMetaClient, HoodieWriteConfig> function2, Function function, Function function3) throws Exception {
        final String instantTime = "00000000000010";
        HoodieTableMetaClient tableMetaClient = createMetaClient();
        HoodieWriteConfig rollbackConfig = getRollbackMarkersAndConsistencyGuardWriteConfig(z, z2, z3);
        BaseHoodieWriteClient rollbackClient = getHoodieWriteClient(rollbackConfig);

        testConsistencyCheck(hoodieEngineContext, tableMetaClient, instantTime, z2, function2, function, function3);
        rollbackAndAssert(z2, instantTime, tableMetaClient, rollbackClient);
    }

    /**
     * Generates three Avro records that share one random record key: an INSERT in partition
     * "2018-01-01", an INSERT in partition "2018-02-01", and an UPDATE_AFTER for the same key in
     * "2018-02-01".
     *
     * @param str instant time passed to the data generator
     * @return the three records, in the order described above
     * @throws IOException if a random payload cannot be generated
     */
    protected List<HoodieRecord> generate3AvroRecords(String str) throws IOException {
        String sharedRecordKey = UUID.randomUUID().toString();
        HoodieKey januaryKey = new HoodieKey(sharedRecordKey, "2018-01-01");
        HoodieRecord januaryInsert = new HoodieAvroRecord(januaryKey, this.dataGen.generateRandomValue(januaryKey, str), HoodieOperation.INSERT);

        HoodieKey februaryKey = new HoodieKey(sharedRecordKey, "2018-02-01");
        HoodieRecord februaryInsert = new HoodieAvroRecord(februaryKey, this.dataGen.generateRandomValue(februaryKey, str), HoodieOperation.INSERT);
        HoodieRecord februaryUpdate = new HoodieAvroRecord(februaryKey, this.dataGen.generateRandomValue(februaryKey, str), HoodieOperation.UPDATE_AFTER);

        return Arrays.asList(januaryInsert, februaryInsert, februaryUpdate);
    }

    /**
     * Schedules and runs clustering with the given clustering and pre-commit validator settings,
     * then verifies the records reported in the clustering write statuses.
     *
     * @param hoodieClusteringConfig clustering configuration to apply
     * @param z when true, empty properties are used instead of the key generator properties
     * @param z2 second argument passed to {@code cluster}
     * @param str pre-commit validator class names; null is treated as empty
     * @param str2 equality SQL queries for the pre-commit validator
     * @param str3 single-result SQL queries for the pre-commit validator
     * @param pair previously written records and their commit times
     * @param function converts the raw clustering result into typed write metadata
     * @param function2 builds a key generator from the write config
     * @return the converted clustering write metadata
     */
    private HoodieWriteMetadata<List<WriteStatus>> performClustering(HoodieClusteringConfig hoodieClusteringConfig, boolean z, boolean z2, String str, String str2, String str3, Pair<List<HoodieRecord>, List<String>> pair, Function<HoodieWriteMetadata, HoodieWriteMetadata<List<WriteStatus>>> function, Function<HoodieWriteConfig, KeyGenerator> function2) {
        HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder()
            .withPreCommitValidator(StringUtils.nullToEmpty(str))
            .withPrecommitValidatorEqualitySqlQueries(str2)
            .withPrecommitValidatorSingleResultSqlQueries(str3)
            .build();
        HoodieWriteConfig clusteringWriteConfig = getConfigBuilder()
            .withAutoCommit(false)
            .withPreCommitValidatorConfig(validatorConfig)
            .withProps(z ? new Properties() : getPropertiesForKeyGen())
            .withClusteringConfig(hoodieClusteringConfig)
            .build();

        BaseHoodieWriteClient clusteringClient = getHoodieWriteClient(clusteringWriteConfig);
        String clusteringInstant = clusteringClient.scheduleClustering(Option.empty()).get().toString();
        HoodieWriteMetadata<List<WriteStatus>> clusteringResult = function.apply(clusteringClient.cluster(clusteringInstant, z2));

        List<HoodieRecord> writtenRecords = pair.getLeft();
        List<WriteStatus> clusteringStatuses = clusteringResult.getWriteStatuses();
        if (clusteringWriteConfig.populateMetaFields()) {
            Set<String> originalCommitTimes = new HashSet<>(pair.getRight());
            verifyRecordsWrittenWithPreservedMetadata(originalCommitTimes, writtenRecords, clusteringStatuses);
        } else {
            verifyRecordsWritten(clusteringInstant, z, writtenRecords, clusteringStatuses, clusteringWriteConfig, function2.apply(clusteringWriteConfig));
        }
        return clusteringResult;
    }

    /**
     * Runs clustering over previously inserted data and optionally verifies the outcome.
     *
     * @param hoodieClusteringConfig clustering configuration to apply
     * @param z forwarded to {@code performClustering} and {@code verifyRecordsWritten}
     * @param z2 passed to {@code cluster}; when true, the written records are also verified
     *           against the latest completed replace instant
     * @param z3 when true, the file groups in the clustering write stats must equal the expected set
     * @param str pre-commit validator class names
     * @param str2 equality SQL queries for the pre-commit validator
     * @param str3 single-result SQL queries for the pre-commit validator
     * @param pair ((inserted records, commit times), expected file groups)
     * @param function converts the raw clustering result into typed write metadata
     * @param function2 builds a key generator from the write config
     */
    protected void testClustering(HoodieClusteringConfig hoodieClusteringConfig, boolean z, boolean z2, boolean z3, String str, String str2, String str3, Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> pair, Function<HoodieWriteMetadata, HoodieWriteMetadata<List<WriteStatus>>> function, Function<HoodieWriteConfig, KeyGenerator> function2) {
        HoodieWriteConfig verificationConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
            .withAutoCommit(false)
            .withClusteringConfig(hoodieClusteringConfig)
            .withProps(getPropertiesForKeyGen())
            .build();
        HoodieWriteMetadata<List<WriteStatus>> clusteringResult =
            performClustering(hoodieClusteringConfig, z, z2, str, str2, str3, pair.getLeft(), function, function2);

        if (z3) {
            Set<HoodieFileGroupId> expectedGroups = pair.getRight();
            Set<HoodieFileGroupId> clusteredGroups = clusteringResult.getWriteStats().get().stream()
                .map(stat -> new HoodieFileGroupId(stat.getPartitionPath(), stat.getFileId()))
                .collect(Collectors.toSet());
            Assertions.assertEquals(expectedGroups, clusteredGroups);
        }

        if (z2) {
            // The most recent completed replace instant supplies the expected commit time.
            String replaceInstantTime = createMetaClient().reloadActiveTimeline()
                .getCompletedReplaceTimeline()
                .getReverseOrderedInstants()
                .findFirst()
                .get()
                .requestedTime();
            verifyRecordsWritten(replaceInstantTime, z, pair.getLeft().getLeft(), clusteringResult.getWriteStatuses(), verificationConfig, function2.apply(verificationConfig));
        }
    }

    /**
     * Inserts two batches into the default test partition "2015/03/16".
     *
     * @param z forwarded to the three-argument overload
     * @param function creates the alternative write client from a write config
     * @return ((inserted records, commit times), touched file groups)
     * @throws IOException if a write fails with an I/O error
     */
    protected Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean z, Function function) throws IOException {
        final String partitionPath = "2015/03/16";
        return testInsertTwoBatches(z, partitionPath, function);
    }

    /**
     * Inserts two batches into the given partition using the key generator properties for
     * {@code z}, without switching to the alternative write client.
     *
     * @param z forwarded to the five-argument overload; also selects the key generator properties
     * @param str partition path to write into
     * @param function creates the alternative write client from a write config
     * @return ((inserted records, commit times), touched file groups)
     * @throws IOException if a write fails with an I/O error
     */
    protected Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean z, String str, Function function) throws IOException {
        Properties keyGenProps = getPropertiesForKeyGen(z);
        return testInsertTwoBatches(z, str, keyGenProps, false, function);
    }

    /**
     * Builds a small-insert write config for the trip schema and inserts two batches with it.
     *
     * @param z forwarded to the config and to {@code insertTwoBatches}; when true,
     *          {@code properties} is used, otherwise the default key generator properties
     * @param str partition path to write into
     * @param properties properties used when {@code z} is true
     * @param z2 whether {@code insertTwoBatches} should use the client created by {@code function}
     * @param function creates the alternative write client from the write config
     * @return ((inserted records, commit times), touched file groups)
     * @throws IOException if a write fails with an I/O error
     */
    protected Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean z, String str, Properties properties, boolean z2, Function function) throws IOException {
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig writeConfig = getSmallInsertWriteConfig(2000, tripSchema, 10L, false, z, z ? properties : getPropertiesForKeyGen());

        // The default client is created before the alternative one.
        BaseHoodieWriteClient defaultClient = getHoodieWriteClient(writeConfig);
        BaseHoodieWriteClient alternativeClient = (BaseHoodieWriteClient) function.apply(writeConfig);
        return insertTwoBatches(defaultClient, alternativeClient, z, str, z2);
    }

    /**
     * Bulk-inserts 200 records at instant "000", commits explicitly, and asserts that every full
     * file path resolved from the reloaded commit metadata is also present in the full paths
     * resolved from the commit metadata read through the commit timeline.
     *
     * <p>The write client is managed with try-with-resources, so it is closed on both success
     * and failure and any exception thrown by {@code close()} is attached as suppressed.
     *
     * @param function transforms the generated records into the input expected by {@code bulkInsert}
     * @throws Exception if any step fails unexpectedly
     */
    protected void testCommitWritesRelativePaths(Function function) throws Exception {
        HoodieWriteConfig.Builder configBuilder = getConfigBuilder().withAutoCommit(false);
        addConfigsForPopulateMetaFields(configBuilder, true);

        try (BaseHoodieWriteClient writeClient = getHoodieWriteClient(configBuilder.build())) {
            HoodieTableMetaClient tableMetaClient = createMetaClient();
            writeClient.startCommitWithTime("000");
            Assertions.assertTrue(writeClient.commit("000", writeClient.bulkInsert(function.apply(this.dataGen.generateInserts("000", 200)), "000")), "Commit should succeed");
            Assertions.assertTrue(this.testTable.commitExists("000"), "After explicit commit, commit file should be created");

            HoodieCommitMetadata commitMetadata = tableMetaClient.getCommitTimeline().filterCompletedInstants().readCommitMetadata(
                HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, tableMetaClient.getCommitActionType(), "000"));
            StoragePath tableBasePath = tableMetaClient.getBasePath();
            Collection<String> expectedFullPaths = commitMetadata.getFileIdAndFullPaths(tableBasePath).values();

            // Re-read the metadata from a reloaded timeline and compare path by path.
            Collection<String> reloadedFullPaths = tableMetaClient.reloadActiveTimeline()
                .readCommitMetadata(HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, "commit", "000"))
                .getFileIdAndFullPaths(tableBasePath)
                .values();
            for (String fullPath : reloadedFullPaths) {
                Assertions.assertTrue(expectedFullPaths.contains(fullPath));
            }
        }
    }

    /**
     * Verifies the insert/update counters recorded in commit metadata: a bulk insert of 200
     * records at "000" must report 200 inserts, and an upsert of updates for those records at
     * "001" must report 0 inserts and 200 update writes.
     *
     * @param z whether meta fields are populated
     * @param function transforms generated records into the input expected by the write client
     * @throws Exception if any step fails unexpectedly
     */
    protected void testMetadataStatsOnCommit(boolean z, Function function) throws Exception {
        HoodieWriteConfig.Builder configBuilder = getConfigBuilder().withAutoCommit(false);
        addConfigsForPopulateMetaFields(configBuilder, z);
        BaseHoodieWriteClient writeClient = getHoodieWriteClient(configBuilder.build());

        // First commit: plain inserts.
        writeClient.startCommitWithTime("000");
        List<HoodieRecord> insertedRecords = this.dataGen.generateInserts("000", 200);
        Assertions.assertTrue(writeClient.commit("000", writeClient.bulkInsert(function.apply(insertedRecords), "000")), "Commit should succeed");
        Assertions.assertTrue(this.testTable.commitExists("000"), "After explicit commit, commit file should be created");

        int insertsInFirstCommit = 0;
        for (List<HoodieWriteStat> partitionStats : createMetaClient().reloadActiveTimeline()
                .readCommitMetadata(HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, "commit", "000"))
                .getPartitionToWriteStats().values()) {
            for (HoodieWriteStat stat : partitionStats) {
                // Compound assignment narrows the long counter to int on every step.
                insertsInFirstCommit += stat.getNumInserts();
            }
        }
        Assertions.assertEquals(200, insertsInFirstCommit);

        // Second commit: updates of the same records.
        writeClient.startCommitWithTime("001");
        Assertions.assertTrue(writeClient.commit("001", writeClient.upsert(function.apply(this.dataGen.generateUpdates("001", insertedRecords)), "001")), "Commit should succeed");
        Assertions.assertTrue(this.testTable.commitExists("001"), "After explicit commit, commit file should be created");

        this.metaClient = createMetaClient();
        int insertsInSecondCommit = 0;
        int updatesInSecondCommit = 0;
        for (List<HoodieWriteStat> partitionStats : this.metaClient.reloadActiveTimeline()
                .readCommitMetadata(HoodieTestUtils.INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, "commit", "001"))
                .getPartitionToWriteStats().values()) {
            for (HoodieWriteStat stat : partitionStats) {
                insertsInSecondCommit += stat.getNumInserts();
                updatesInSecondCommit += stat.getNumUpdateWrites();
            }
        }
        Assertions.assertEquals(0, insertsInSecondCommit);
        Assertions.assertEquals(200, updatesInSecondCommit);
    }

    /**
     * Inserts two batches with inline clustering enabled through a client created by
     * {@code function}, and checks whether the write failed as expected.
     *
     * <p>The "no failure expected" assertion runs outside the {@code try} block. Because the
     * handler also catches {@link Error}, an assertion failure raised inside the block would be
     * caught and re-reported as a misleading message mismatch.
     *
     * @param z value of "hoodie.fail.writes.on.inline.table.service.exception"
     * @param z2 whether the write is expected to fail
     * @param function creates the write client used for the inserts
     * @param str expected message of the thrown exception or error when a failure occurs
     * @throws IOException if the inserts fail with an I/O error
     */
    protected void testFailWritesOnInlineTableServiceThrowable(boolean z, boolean z2, Function function, String str) throws IOException {
        Throwable failure = null;
        try {
            Properties properties = new Properties();
            properties.setProperty("hoodie.fail.writes.on.inline.table.service.exception", String.valueOf(z));
            properties.setProperty("hoodie.auto.commit", "false");
            properties.setProperty("hoodie.clustering.inline.max.commits", "1");
            properties.setProperty("hoodie.clustering.inline", "true");
            properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
            testInsertTwoBatches(true, "2015/03/16", properties, true, function);
        } catch (HoodieException | Error e) {
            failure = e;
        }

        if (failure == null) {
            // Nothing was thrown, so no failure may have been expected.
            Assertions.assertFalse(z2);
            return;
        }
        Assertions.assertEquals(str, failure.getMessage());
        Assertions.assertTrue(z2);
    }

    /**
     * Issues a delete at instant "001" for the keys of 20 records that were generated but never
     * written, against partition "2016/09/26".
     *
     * @param z whether meta fields are populated; when true, empty properties are used
     * @param function transforms the selected keys into the input expected by {@code delete}
     * @param function2 applied to the result of {@code delete}
     */
    protected void testDeletesWithoutInserts(boolean z, Function function, Function function2) {
        final String tripSchema = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },{\"name\": \"trip_type\", \"type\": {\"type\": \"enum\", \"name\": \"TripType\", \"symbols\": [\"UNKNOWN\", \"UBERX\", \"BLACK\"], \"default\": \"UNKNOWN\"}},{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},{\"name\": \"distance_in_meters\", \"type\": \"int\"},{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},{\"name\": \"weight\", \"type\": \"float\"},{\"name\": \"nation\", \"type\": \"bytes\"},{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},{\"name\":\"current_ts\",\"type\": {\"type\": \"long\"}},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},{\"name\": \"tip_history\", \"default\": [], \"type\": {\"type\": \"array\", \"default\": [], \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": [{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
        HoodieWriteConfig writeConfig = getSmallInsertWriteConfig(100, tripSchema, this.dataGen.getEstimatedFileSizeInBytes(150), z, z ? new Properties() : getPropertiesForKeyGen());

        this.dataGen = new HoodieTestDataGenerator(new String[]{"2016/09/26"});
        BaseHoodieWriteClient writeClient = getHoodieWriteClient(writeConfig);
        writeClient.startCommitWithTime("001");

        // The records are only generated, never inserted; their keys are what gets deleted.
        List<HoodieRecord> unwrittenRecords = this.dataGen.generateInserts("001", 20);
        Object keysToDelete = function.apply(Transformations.randomSelectAsHoodieKeys(unwrittenRecords, 20));
        function2.apply(writeClient.delete(keysToDelete, "001"));
    }

    /**
     * Inserts two batches, writes one more commit with the given clustering config, and asserts
     * that a pending clustering plan exists exactly when inline scheduling is enabled in that config.
     *
     * @param function creates the alternative write client for the initial two batches
     * @param hoodieClusteringConfig clustering configuration under test
     * @param function2 transforms records and statuses into the input expected by the write client
     * @param function3 converts the insert result into a list of write statuses
     * @throws IOException if a write fails with an I/O error
     */
    protected void testInlineScheduleClustering(Function function, HoodieClusteringConfig hoodieClusteringConfig, Function function2, Function function3) throws IOException {
        testInsertTwoBatches(true, function);

        HoodieWriteConfig clusteringWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
            .withAutoCommit(false)
            .withClusteringConfig(hoodieClusteringConfig)
            .withProps(getPropertiesForKeyGen())
            .build();
        this.dataGen = new HoodieTestDataGenerator(new String[]{"2015/03/16"});
        generateInsertsAndCommit(clusteringWriteConfig, function2, function3);

        boolean expectPendingPlan = hoodieClusteringConfig.getBoolean(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING).booleanValue();
        getAndAssertPendingClusteringPlans(expectPendingPlan, createMetaClient());
    }

    /**
     * Inserts two batches into "2015/03/16", writes one more commit with the given clustering
     * config (embedded timeline server disabled), and validates the clustered files against the
     * replace commit metadata.
     *
     * @param function creates the alternative write client for the initial two batches
     * @param hoodieClusteringConfig clustering configuration under test
     * @param function2 transforms records and statuses into the input expected by the write client
     * @param function3 converts the insert result into a list of write statuses
     * @throws IOException if a write or the validation fails with an I/O error
     */
    protected void testAndValidateClusteringOutputFiles(Function function, HoodieClusteringConfig hoodieClusteringConfig, Function function2, Function function3) throws IOException {
        final String partitionPath = "2015/03/16";
        testInsertTwoBatches(true, partitionPath, function);

        HoodieWriteConfig clusteringWriteConfig = getConfigBuilder()
            .withEmbeddedTimelineServerEnabled(false)
            .withAutoCommit(false)
            .withClusteringConfig(hoodieClusteringConfig)
            .build();
        generateInsertsAndCommit(clusteringWriteConfig, function2, function3);

        verifyClusteredFilesWithReplaceCommitMetadata(partitionPath);
    }

    /**
     * Generates 200 inserts at a fresh instant time, writes them, asserts there are no write
     * errors, and commits.
     *
     * <p>The write client is managed with try-with-resources, so it is closed on both success
     * and failure and any exception thrown by {@code close()} is attached as suppressed.
     *
     * @param hoodieWriteConfig config used to create the write client
     * @param function transforms records (and later the write statuses) into the input expected by the client
     * @param function2 converts the insert result into a list of write statuses
     */
    private void generateInsertsAndCommit(HoodieWriteConfig hoodieWriteConfig, Function function, Function function2) {
        try (BaseHoodieWriteClient writeClient = getHoodieWriteClient(hoodieWriteConfig)) {
            String instantTime = writeClient.createNewInstantTime();
            List generatedInserts = this.dataGen.generateInserts(instantTime, 200);
            writeClient.startCommitWithTime(instantTime);
            List writeStatuses = (List) function2.apply(writeClient.insert(function.apply(generatedInserts), instantTime));
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
            writeClient.commit(instantTime, function.apply(writeStatuses));
        }
    }

    /**
     * Exercises record de-duplication on three records sharing one key, then writes them through
     * {@code function3} and asserts two write statuses without errors or in-partition duplicates.
     *
     * <p>The write client is managed with try-with-resources, so it is closed on both success
     * and failure and any exception thrown by {@code close()} is attached as suppressed.
     *
     * @param function3 performs the write given (client, records, instant time)
     * @param z first flag passed to {@code getWriteConfigWithPopulateMetaFieldsAndAllowOperationMetaField}
     * @param z2 second flag passed to that config; also selects the de-dup parallelism argument
     *           (100 vs 2) and whether the operation or the partition path of the surviving record is compared
     * @throws Exception if any step fails unexpectedly
     */
    protected void testDeduplication(Function3<List<WriteStatus>, BaseHoodieWriteClient, List<HoodieRecord>, String> function3, boolean z, boolean z2) throws Exception {
        List<HoodieRecord> inputRecords = generate3AvroRecords("001");
        HoodieListData inputData = HoodieListData.eager(inputRecords);
        HoodieWriteConfig writeConfig = getWriteConfigWithPopulateMetaFieldsAndAllowOperationMetaField(z, z2);

        List<HoodieRecord<RawTripTestPayload>> dedupedRecords = dedupForCopyOnWriteStorage(writeConfig, inputData, true, z2 ? 100 : 2, inputData.getNumPartitions());
        if (z2) {
            Assertions.assertEquals(dedupedRecords.get(0).getOperation(), inputRecords.get(2).getOperation());
        } else {
            Assertions.assertEquals(dedupedRecords.get(0).getPartitionPath(), inputRecords.get(2).getPartitionPath());
            // Second de-dup pass with the boolean flag switched off; its result is not inspected here.
            dedupForCopyOnWriteStorage(writeConfig, inputData, false, 2, inputData.getNumPartitions());
        }

        try (BaseHoodieWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
            writeClient.startCommitWithTime("001");
            List<WriteStatus> writeStatuses = function3.apply(writeClient, inputRecords, "001");
            org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
            Assertions.assertEquals(2, writeStatuses.size());
            org.apache.hudi.testutils.Assertions.assertNoDuplicatesInPartition(writeStatuses.stream()
                .map(WriteStatus::getWrittenRecordDelegates)
                .flatMap(Collection::stream)
                .collect(Collectors.toList()));
        }
    }

    /**
     * Verifies that with auto-commit disabled the first batch is not committed automatically,
     * and that an explicit commit then succeeds and creates the commit file.
     *
     * <p>The write client is managed with try-with-resources, so it is closed on both success
     * and failure and any exception thrown by {@code close()} is attached as suppressed.
     *
     * @param function3 write function forwarded to {@code castInsertFirstBatch}
     * @param z flag forwarded to {@code castInsertFirstBatch}
     * @param z2 whether meta fields are populated
     * @param instantGenerator instant generator forwarded to {@code castInsertFirstBatch}
     * @throws Exception if any step fails unexpectedly
     */
    protected void testAutoCommit(Function3<Object, BaseHoodieWriteClient, Object, String> function3, boolean z, boolean z2, InstantGenerator instantGenerator) throws Exception {
        HoodieWriteConfig.Builder configBuilder = getConfigBuilder().withAutoCommit(false);
        addConfigsForPopulateMetaFields(configBuilder, z2);

        try (BaseHoodieWriteClient writeClient = getHoodieWriteClient(configBuilder.build())) {
            Object firstBatchResult = castInsertFirstBatch(configBuilder.build(), writeClient, "001", "000", 200, function3, z, false, 200, instantGenerator);
            Assertions.assertFalse(this.testTable.commitExists("001"), "If Autocommit is false, then commit should not be made automatically");
            Assertions.assertTrue(writeClient.commit("001", firstBatchResult), "Commit should succeed");
            Assertions.assertTrue(this.testTable.commitExists("001"), "After explicit commit, commit file should be created");
        }
    }

    /**
     * Upserts a single batch that contains 200 generated inserts together with delete
     * records derived from inserts 40..89 of that same batch, and lets
     * {@code castWriteBatch} verify the outcome (expected counts passed: 150 / 150).
     *
     * @param instantGenerator instant generator passed through to {@code castWriteBatch}
     * @throws Exception if the write batch fails
     */
    protected void testDeletesForInsertsInSameBatch(InstantGenerator instantGenerator) throws Exception {
        HoodieWriteConfig.Builder lazyCleaningBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY);
        addConfigsForPopulateMetaFields(lazyCleaningBuilder, true);
        BaseHoodieWriteClient client = getHoodieWriteClient(lazyCleaningBuilder.build());

        List<HoodieRecord> firstBatch = new ArrayList<>();
        // Generator that accumulates the inserts plus deletes for a slice of those inserts.
        Function2<List<HoodieRecord>, String, Integer> insertsWithDeletes = (commitTime, ignoredCount) -> {
            List<HoodieRecord> inserts = this.dataGen.generateInserts(commitTime, 200);
            List<HoodieRecord> toDelete = inserts.subList(40, 90);
            firstBatch.addAll(inserts);
            firstBatch.addAll(this.dataGen.generateDeletesFromExistingRecords(toDelete));
            return firstBatch;
        };

        castWriteBatch(client, "001", "000", Option.empty(), "000", -1, insertsWithDeletes,
            (writer, records, commitTime) -> writer.upsert(records, commitTime),
            true, 150, 150, 1, false, true, instantGenerator);
    }

    /**
     * Runs an insert batch followed by a second {@code insert} batch built from
     * {@code generateUniqueUpdates}, with "merge allow duplicate on inserts" enabled and
     * the table initialised with timeline layout {@code VERSION_0}.
     *
     * @param z flag passed to {@code addConfigsForPopulateMetaFields} and on to both batch helpers
     * @param z2 flag passed to {@code castInsertFirstBatch} and {@code generateWrapRecordsFn}
     * @param instantGenerator instant generator passed through to the batch helpers
     * @throws Exception if table initialisation or either write batch fails
     */
    protected void testHoodieConcatHandle(boolean z, boolean z2, InstantGenerator instantGenerator) throws Exception {
        this.metaClient = createMetaClient(this.basePath);

        HoodieWriteConfig.Builder builder = getConfigBuilder();
        addConfigsForPopulateMetaFields(builder, z);
        HoodieWriteConfig writeConfig = builder
            .withMergeAllowDuplicateOnInserts(true)
            .withTimelineLayoutVersion(TimelineLayoutVersion.VERSION_0.intValue())
            .build();

        HoodieTableMetaClient.newTableBuilder()
            .fromMetaClient(this.metaClient)
            .setTimelineLayoutVersion(TimelineLayoutVersion.VERSION_0)
            .initTable(this.metaClient.getStorageConf().newInstance(), this.metaClient.getBasePath());

        BaseHoodieWriteClient client = getHoodieWriteClient(writeConfig);

        // First batch: 200 inserts at instant "001".
        castInsertFirstBatch(writeConfig, client, "001", "000", 200,
            (writer, records, commitTime) -> writer.insert(records, commitTime),
            z2, true, 200, z, instantGenerator);

        // Second batch: 100 records from generateUniqueUpdates, written through insert().
        castWriteBatch(client, "004", "001", Option.of(Arrays.asList("002")), "000", 100,
            generateWrapRecordsFn(z2, writeConfig, this.dataGen::generateUniqueUpdates),
            (writer, records, commitTime) -> writer.insert(records, commitTime),
            true, 100, 300, 2, false, z, instantGenerator);
    }

    /**
     * Inserts 50 records, then writes 100 records produced by {@code generateUpdates}
     * through {@code insert} with "merge allow duplicate on inserts" enabled; the second
     * batch is checked against an expected total of 150 records.
     *
     * @param z flag passed to {@code castInsertFirstBatch} and {@code generateWrapRecordsFn}
     * @param instantGenerator instant generator passed through to the batch helpers
     * @throws Exception if either write batch fails
     */
    protected void testHoodieConcatHandleOnDupInserts(boolean z, InstantGenerator instantGenerator) throws Exception {
        HoodieWriteConfig writeConfig = getConfigBuilder().withMergeAllowDuplicateOnInserts(true).build();
        BaseHoodieWriteClient client = getHoodieWriteClient(writeConfig);

        castInsertFirstBatch(writeConfig, client, "001", "000", 50,
            (writer, records, commitTime) -> writer.insert(records, commitTime),
            z, true, 50, writeConfig.populateMetaFields(), instantGenerator);

        castWriteBatch(client, "004", "001", Option.of(Arrays.asList("002", "003")), "000", 100,
            generateWrapRecordsFn(z, writeConfig, this.dataGen::generateUpdates),
            (writer, records, commitTime) -> writer.insert(records, commitTime),
            true, 100, 150, 2, false, writeConfig.populateMetaFields(), instantGenerator);
    }

    /**
     * End-to-end upsert scenario across a table-version upgrade: writes insert, update and
     * delete batches against a table initialised at version 6, upgrades to
     * {@code HoodieTableVersion.EIGHT}, then exercises savepoint / restore / delete-savepoint
     * and a further delete batch before running the timeline and merge-handle checks.
     *
     * @param function3 write function used for the update batch
     * @param z flag passed to {@code addConfigsForPopulateMetaFields}, {@code setPopulateMetaFields}
     *          and on to the insert and update batch helpers
     * @param z2 flag passed through to the insert, update and delete batch helpers
     * @param supportsUpgradeDowngrade helper handed to {@code UpgradeDowngrade}
     * @throws Exception if any step of the scenario fails
     */
    protected void testUpsertsInternal(Function3<Object, BaseHoodieWriteClient, Object, String> function3, boolean z, boolean z2, SupportsUpgradeDowngrade supportsUpgradeDowngrade) throws Exception {
        this.metaClient = createMetaClient();

        // --- Phase 1: table and writer pinned to version 6, metadata table disabled. ---
        HoodieWriteConfig.Builder versionSixBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
            .withRollbackUsingMarkers(true)
            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build());
        addConfigsForPopulateMetaFields(versionSixBuilder, z);
        HoodieTableMetaClient.newTableBuilder()
            .fromMetaClient(this.metaClient)
            .setTableVersion(6)
            .setPopulateMetaFields(z)
            .initTable(this.metaClient.getStorageConf().newInstance(), this.metaClient.getBasePath());
        this.metaClient = HoodieTestUtils.createMetaClient(this.storageConf, new StoragePath(this.basePath), HoodieTableVersion.SIX);
        HoodieWriteConfig versionSixConfig = versionSixBuilder.withWriteTableVersion(6).build();
        BaseHoodieWriteClient versionSixClient = getHoodieWriteClient(versionSixConfig);

        castInsertFirstBatch(versionSixConfig, versionSixClient, "001", "000", 200,
            (writer, records, commitTime) -> writer.insert(records, commitTime),
            z2, true, 200, z, this.metaClient.getInstantGenerator());
        castUpdateBatch(versionSixConfig, versionSixClient, "004", "001", Option.of(Arrays.asList("002")), "000", 100,
            function3, z2, true, 100, 200, 2, z, this.metaClient.getInstantGenerator());
        castDeleteBatch(versionSixConfig, versionSixClient, "005", "004", "000", 50, z2, true, 0, 150,
            versionSixConfig.populateMetaFields(),
            this.metaClient.getTimelineLayout().getTimelineFactory(),
            this.metaClient.getInstantGenerator());

        // --- Phase 2: upgrade to version EIGHT, reusing the version-6 properties. ---
        HoodieWriteConfig versionEightConfig = getConfigBuilder()
            .withProps(versionSixConfig.getProps())
            .withWriteTableVersion(HoodieTableVersion.EIGHT.versionCode())
            .build();
        new UpgradeDowngrade(this.metaClient, versionEightConfig, getHoodieWriteClient(versionEightConfig).getEngineContext(), supportsUpgradeDowngrade)
            .run(HoodieTableVersion.EIGHT, (String) null);
        BaseHoodieWriteClient versionEightClient = getHoodieWriteClient(versionEightConfig);

        // Savepoint at "004", restore to it, and confirm no rollback instant is on the timeline.
        versionEightClient.savepoint("004", "user1", "comment1");
        versionEightClient.restoreToInstant("004", versionSixConfig.isMetadataTableEnabled());
        this.metaClient = HoodieTestUtils.createMetaClient(this.storageConf, new StoragePath(this.basePath), HoodieTableVersion.EIGHT);
        Assertions.assertFalse(this.metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent());

        versionEightClient.deleteSavepoint("004");
        Assertions.assertFalse(this.metaClient.reloadActiveTimeline().getSavePointTimeline().containsInstant("004"));
        assertTheEntireDatasetHasAllRecordsStill(200);

        // --- Phase 3: delete again on the upgraded table, then run the final checks. ---
        castDeleteBatch(versionEightConfig, versionEightClient, "006", "005", "000", 50, z2, true, 0, 150,
            this.metaClient.getTimelineLayout().getTimelineFactory(),
            this.metaClient.getInstantGenerator());
        checkTimelineForUpsertsInternal(this.metaClient);
        testMergeHandle(versionSixConfig);
    }

    /**
     * Upserts a first batch made of 200 generated inserts plus 100 generated deletes, then
     * upserts a second batch whose record generator is obtained from {@code function3},
     * which is given the instant time "004", {@code i} and the records of the first batch.
     *
     * @param function3 factory returning the record generator for the second batch
     * @param i record count passed to {@code function3} and to the second {@code castWriteBatch}
     * @param i2 first expected-count argument of the second {@code castWriteBatch}
     * @param i3 second expected-count argument of the second {@code castWriteBatch}
     * @throws Exception if either write batch fails
     */
    protected void testDeletes(Function3<Function2<List<HoodieRecord>, String, Integer>, String, Integer, List<HoodieRecord>> function3, int i, int i2, int i3) throws Exception {
        HoodieWriteConfig.Builder lazyCleaningBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY);
        addConfigsForPopulateMetaFields(lazyCleaningBuilder, true);
        BaseHoodieWriteClient client = getHoodieWriteClient(lazyCleaningBuilder.build());

        List<HoodieRecord> firstBatch = new ArrayList<>();
        // Generator for the first batch; it also fills firstBatch for use by function3 below.
        Function2<List<HoodieRecord>, String, Integer> insertsAndDeletes = (commitTime, ignoredCount) -> {
            List<HoodieRecord> inserts = this.dataGen.generateInserts(commitTime, 200);
            List<HoodieRecord> deletes = this.dataGen.generateDeletes(commitTime, 100);
            firstBatch.addAll(inserts);
            firstBatch.addAll(deletes);
            return firstBatch;
        };
        castWriteBatch(client, "001", "000", Option.empty(), "000", -1, insertsAndDeletes,
            (writer, records, commitTime) -> writer.upsert(records, commitTime),
            true, 200, 200, 1, false, true, HoodieTestUtils.INSTANT_GENERATOR);

        castWriteBatch(client, "004", "001", Option.empty(), "000", i,
            function3.apply("004", Integer.valueOf(i), firstBatch),
            (writer, records, commitTime) -> writer.upsert(records, commitTime),
            true, i2, i3, 2, false, true, HoodieTestUtils.INSTANT_GENERATOR);
    }

    /**
     * Writes a series of bulk-insert batches through several short-lived clients configured
     * with the {@code NEVER} failed-writes cleaning policy, waits for the heartbeat of
     * instant "300" to expire, runs {@code clean()}, and asserts the resulting rollback,
     * clean and completed-commit counts on the timeline.
     *
     * <p>NOTE(review): the trailing boolean of each {@code castWriteBatch} call presumably
     * decides whether the batch is committed (the "2 inflight / 2 completed" assertions are
     * consistent with that) — confirm against {@code castWriteBatch}.
     *
     * @param z flag passed to {@code getParallelWritingWriteConfig}
     * @throws Exception if any write, the clean, or an assertion fails
     */
    protected void testRollbackFailedCommits(boolean z) throws Exception {
        HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.NEVER;
        HoodieTestUtils.init(this.storageConf, this.basePath);

        // Batches "100" and "200" share one client.
        BaseHoodieWriteClient firstClient = getHoodieWriteClient(getParallelWritingWriteConfig(cleaningPolicy, z));
        castWriteBatch(firstClient, "100", "100", Option.of(Arrays.asList("100")), "100", 100, this.dataGen::generateInserts,
            (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
            false, 100, 300, 0, true, HoodieTestUtils.INSTANT_GENERATOR);
        castWriteBatch(firstClient, "200", "100", Option.of(Arrays.asList("200")), "100", 100, this.dataGen::generateInserts,
            (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
            false, 100, 300, 0, false, HoodieTestUtils.INSTANT_GENERATOR);
        firstClient.close();

        // Batch "300" on a fresh client.
        BaseHoodieWriteClient secondClient = getHoodieWriteClient(getParallelWritingWriteConfig(cleaningPolicy, z));
        castWriteBatch(secondClient, "300", "200", Option.of(Arrays.asList("300")), "300", 100, this.dataGen::generateInserts,
            (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
            false, 100, 300, 0, false, HoodieTestUtils.INSTANT_GENERATOR);
        secondClient.close();

        // Batch "400" uses a brand-new data generator.
        this.dataGen = new HoodieTestDataGenerator();
        BaseHoodieWriteClient thirdClient = getHoodieWriteClient(getParallelWritingWriteConfig(cleaningPolicy, z));
        castWriteBatch(thirdClient, "400", "300", Option.of(Arrays.asList("400")), "400", 100, this.dataGen::generateInserts,
            (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
            false, 100, 300, 0, true, HoodieTestUtils.INSTANT_GENERATOR);

        HoodieTableMetaClient tableMetaClient = createMetaClient();
        Assertions.assertEquals(0, tableMetaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(new String[]{"rollback"})).countInstants());
        Assertions.assertEquals(2, tableMetaClient.getActiveTimeline().filterInflights().countInstants());
        Assertions.assertEquals(2, tableMetaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants());

        // Poll until the heartbeat of "300" is reported expired; every poll is followed by a 2s sleep.
        boolean heartbeatExpired;
        do {
            heartbeatExpired = thirdClient.getHeartbeatClient().isHeartbeatExpired("300");
            Thread.sleep(2000L);
        } while (!heartbeatExpired);
        thirdClient.close();

        // Batch "500", then an explicit clean.
        BaseHoodieWriteClient fourthClient = getHoodieWriteClient(getParallelWritingWriteConfig(cleaningPolicy, z));
        castWriteBatch(fourthClient, "500", "400", Option.of(Arrays.asList("500")), "500", 100, this.dataGen::generateInserts,
            (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
            false, 100, 300, 0, true, HoodieTestUtils.INSTANT_GENERATOR);
        fourthClient.clean();
        fourthClient.close();

        HoodieActiveTimeline reloadedTimeline = tableMetaClient.getActiveTimeline().reload();
        // The lazy and never policies are checked against the same expectations.
        if (cleaningPolicy.isLazy() || cleaningPolicy.isNever()) {
            Assertions.assertEquals(2, reloadedTimeline.getTimelineOfActions(CollectionUtils.createSet(new String[]{"rollback"})).countInstants());
            Assertions.assertEquals(0, reloadedTimeline.getTimelineOfActions(CollectionUtils.createSet(new String[]{"clean"})).countInstants());
            Assertions.assertEquals(3, reloadedTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
        }
    }

    /**
     * Alternates the failed-writes cleaning policy between {@code EAGER} and {@code LAZY}
     * across a sequence of bulk-insert batches and checks the number of rollback instants
     * and completed commits on the timeline after each phase.
     *
     * @param z flag passed to {@code getParallelWritingWriteConfig}
     * @throws Exception if any write, the clean, or an assertion fails
     */
    protected void testRollbackFailedCommitsToggleCleaningPolicy(boolean z) throws Exception {
        HoodieTestUtils.init(this.storageConf, this.basePath);
        this.metaClient = createMetaClient();

        // Phase 1 (EAGER): batches "100" and "200" on one client.
        BaseHoodieWriteClient eagerClient = getHoodieWriteClient(getParallelWritingWriteConfig(HoodieFailedWritesCleaningPolicy.EAGER, z));
        castWriteBatch(eagerClient, "100", "100", Option.of(Arrays.asList("100")), "100", 100, this.dataGen::generateInserts,
            (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
            false, 100, 300, 0, true, HoodieTestUtils.INSTANT_GENERATOR);
        castWriteBatch(eagerClient, "200", "100", Option.of(Arrays.asList("200")), "200", 100, this.dataGen::generateInserts,
            (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
            false, 100, 300, 0, false, HoodieTestUtils.INSTANT_GENERATOR);
        eagerClient.close();

        // Phase 2 (LAZY): batches "300" and "400", each on its own client.
        HoodieFailedWritesCleaningPolicy lazyPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
        BaseHoodieWriteClient lazyClientA = getHoodieWriteClient(getParallelWritingWriteConfig(lazyPolicy, z));
        castWriteBatch(lazyClientA, "300", "200", Option.of(Arrays.asList("300")), "300", 100, this.dataGen::generateInserts,
            (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
            false, 100, 300, 0, false, HoodieTestUtils.INSTANT_GENERATOR);
        lazyClientA.close();

        BaseHoodieWriteClient lazyClientB = getHoodieWriteClient(getParallelWritingWriteConfig(lazyPolicy, z));
        castWriteBatch(lazyClientB, "400", "300", Option.of(Arrays.asList("400")), "400", 100, this.dataGen::generateInserts,
            (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
            false, 100, 300, 0, false, HoodieTestUtils.INSTANT_GENERATOR);
        lazyClientB.close();

        // NOTE(review): lazyClientB is still used below after close(); kept exactly as in the
        // existing flow — confirm that the heartbeat client and clean() are meant to work on a closed client.
        boolean heartbeatExpired;
        do {
            heartbeatExpired = lazyClientB.getHeartbeatClient().isHeartbeatExpired("400");
            Thread.sleep(2000L);
        } while (!heartbeatExpired);
        lazyClientB.clean();
        Assertions.assertEquals(3, this.metaClient.getActiveTimeline().reload().getTimelineOfActions(CollectionUtils.createSet(new String[]{"rollback"})).countInstants());

        // Phase 3 (LAZY): batches "500" and "600".
        BaseHoodieWriteClient lazyClientC = getHoodieWriteClient(getParallelWritingWriteConfig(lazyPolicy, z));
        castWriteBatch(lazyClientC, "500", "400", Option.of(Arrays.asList("300")), "300", 100, this.dataGen::generateInserts,
            (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
            false, 100, 300, 0, false, HoodieTestUtils.INSTANT_GENERATOR);
        lazyClientC.close();

        BaseHoodieWriteClient lazyClientD = getHoodieWriteClient(getParallelWritingWriteConfig(lazyPolicy, z));
        castWriteBatch(lazyClientD, "600", "500", Option.of(Arrays.asList("400")), "400", 100, this.dataGen::generateInserts,
            (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
            false, 100, 300, 0, false, HoodieTestUtils.INSTANT_GENERATOR);
        lazyClientD.close();

        // Phase 4 (back to EAGER): starting a commit is the only action taken.
        BaseHoodieWriteClient finalEagerClient = getHoodieWriteClient(getParallelWritingWriteConfig(HoodieFailedWritesCleaningPolicy.EAGER, z));
        finalEagerClient.startCommit();
        finalEagerClient.close();

        HoodieActiveTimeline reloadedTimeline = this.metaClient.getActiveTimeline().reload();
        Assertions.assertEquals(3, reloadedTimeline.getTimelineOfActions(CollectionUtils.createSet(new String[]{"rollback"})).countInstants());
        Assertions.assertEquals(1, reloadedTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
    }

    /**
     * Writes several bulk-insert batches under the {@code LAZY} failed-writes cleaning
     * policy, then runs a further bulk insert and a {@code clean()} concurrently on a
     * two-thread pool and asserts the resulting rollback, clean and completed-commit counts.
     *
     * <p>The thread pool is shut down in a {@code finally} block so that a failed write,
     * a failed future or a failed assertion does not leave pool threads behind.
     *
     * <p>NOTE(review): the trailing boolean of each {@code castWriteBatch} call presumably
     * decides whether the batch is committed — confirm against {@code castWriteBatch}.
     *
     * @param z flag passed to {@code getParallelWritingWriteConfig}
     * @throws Exception if any write, the clean, a submitted task, or an assertion fails
     */
    protected void testParallelInsertAndCleanPreviousFailedCommits(boolean z) throws Exception {
        HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            HoodieTestUtils.init(this.storageConf, this.basePath);

            // Batches "100" and "200" share one client.
            BaseHoodieWriteClient firstClient = getHoodieWriteClient(getParallelWritingWriteConfig(cleaningPolicy, z));
            castWriteBatch(firstClient, "100", "100", Option.of(Arrays.asList("100")), "100", 100, this.dataGen::generateInserts,
                (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
                false, 100, 100, 0, true, HoodieTestUtils.INSTANT_GENERATOR);
            castWriteBatch(firstClient, "200", "100", Option.of(Arrays.asList("200")), "200", 100, this.dataGen::generateInserts,
                (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
                false, 100, 100, 0, false, HoodieTestUtils.INSTANT_GENERATOR);
            firstClient.close();

            // Batch "300" on a fresh client.
            BaseHoodieWriteClient secondClient = getHoodieWriteClient(getParallelWritingWriteConfig(cleaningPolicy, z));
            castWriteBatch(secondClient, "300", "200", Option.of(Arrays.asList("300")), "300", 100, this.dataGen::generateInserts,
                (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
                false, 100, 100, 0, false, HoodieTestUtils.INSTANT_GENERATOR);
            secondClient.close();

            // Batch "400" runs on the pool with a brand-new data generator; wait for it to finish.
            this.dataGen = new HoodieTestDataGenerator();
            executor.submit(() -> {
                BaseHoodieWriteClient pooledClient = getHoodieWriteClient(getParallelWritingWriteConfig(cleaningPolicy, z));
                return castWriteBatch(pooledClient, "400", "300", Option.of(Arrays.asList("400")), "300", 100, this.dataGen::generateInserts,
                    (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
                    false, 100, 100, 0, true, HoodieTestUtils.INSTANT_GENERATOR);
            }).get();

            HoodieTableMetaClient tableMetaClient = createMetaClient();
            Assertions.assertEquals(0, tableMetaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(new String[]{"rollback"})).countInstants());
            Assertions.assertEquals(2, tableMetaClient.getActiveTimeline().filterInflights().countInstants());
            Assertions.assertEquals(2, tableMetaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants());

            // Poll until the heartbeat of "300" is reported expired; every poll is followed by a 2s sleep.
            BaseHoodieWriteClient heartbeatProbe = getHoodieWriteClient(getParallelWritingWriteConfig(cleaningPolicy, z));
            boolean heartbeatExpired;
            do {
                heartbeatExpired = heartbeatProbe.getHeartbeatClient().isHeartbeatExpired("300");
                Thread.sleep(2000L);
            } while (!heartbeatExpired);

            // Run batch "500" and a clean concurrently, then wait for both.
            Future<?> parallelWrite = executor.submit(() -> {
                BaseHoodieWriteClient pooledClient = getHoodieWriteClient(getParallelWritingWriteConfig(cleaningPolicy, z));
                return castWriteBatch(pooledClient, "500", "400", Option.of(Arrays.asList("500")), "500", 100, this.dataGen::generateInserts,
                    (writer, records, commitTime) -> writer.bulkInsert(records, commitTime),
                    false, 100, 100, 0, true, HoodieTestUtils.INSTANT_GENERATOR);
            });
            Future<?> parallelClean = executor.submit(() -> {
                return getHoodieWriteClient(getParallelWritingWriteConfig(cleaningPolicy, z)).clean();
            });
            parallelWrite.get();
            parallelClean.get();
            heartbeatProbe.close();

            HoodieActiveTimeline reloadedTimeline = tableMetaClient.getActiveTimeline().reload();
            Assertions.assertEquals(2, reloadedTimeline.getTimelineOfActions(CollectionUtils.createSet(new String[]{"rollback"})).countInstants());
            Assertions.assertEquals(0, reloadedTimeline.getTimelineOfActions(CollectionUtils.createSet(new String[]{"clean"})).countInstants());
            Assertions.assertEquals(3, reloadedTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
        } finally {
            // Always release the pool, including when a write, future or assertion above fails.
            executor.shutdown();
        }
    }

    /**
     * Lambda-deserialization hook, marked {@code synthetic} in the signature below.
     *
     * <p>It matches the incoming {@link SerializedLambda} against the single known
     * implementation method {@code lambda$testConsistencyCheck$960c68bf$1}; when the
     * method kind, functional interface, interface method and signatures all match, it
     * returns a path filter that accepts paths whose string form contains ".marker".
     * Anything else is rejected with {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): this looks like decompiler output of a compiler-generated method rather
     * than hand-written code. As written it is not valid Java ({@code boolean z = -1},
     * {@code switch} on a boolean, a lambda returned as {@code Object}); presumably it should be
     * removed from source and left to the compiler to generate — confirm before touching it.
     *
     * @param serializedLambda serialized form of the lambda being restored
     * @return the restored lambda for the recognised implementation method
     * @throws IllegalArgumentException if the serialized lambda does not match the known method
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; -1 means "no known implementation method matched".
        boolean z = -1;
        // First stage: dispatch on the name's hash code, then confirm with equals().
        switch (implMethodName.hashCode()) {
            case -868689368:
                if (implMethodName.equals("lambda$testConsistencyCheck$960c68bf$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: verify the full shape of the serialized lambda before recreating it.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/hudi/storage/StoragePathFilter") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/hudi/storage/StoragePath;)Z") && serializedLambda.getImplClass().equals("org/apache/hudi/utils/HoodieWriterClientTestHarness") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/hudi/storage/StoragePath;)Z")) {
                    return storagePath -> {
                        return storagePath.toString().contains(".marker");
                    };
                }
                break;
        }
        // No match: refuse to deserialize an unknown lambda.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
