package org.apache.paimon.flink.sink.cdc;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableChannelComputer;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
import org.apache.paimon.flink.sink.StoreMultiCommitter;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.class */
public class FlinkCdcMultiTableSink implements Serializable {
    private static final long serialVersionUID = 1;
    private static final String WRITER_NAME = "CDC MultiplexWriter";
    private static final String GLOBAL_COMMITTER_NAME = "Multiplex Global Committer";
    private final boolean isOverwrite = false;
    private final Catalog.Loader catalogLoader;
    private final double commitCpuCores;

    @Nullable
    private final MemorySize commitHeapMemory;
    private final boolean commitChaining;
    private final String commitUser;

    public FlinkCdcMultiTableSink(Catalog.Loader loader, double d, @Nullable MemorySize memorySize, boolean z, String str) {
        this.catalogLoader = loader;
        this.commitCpuCores = d;
        this.commitHeapMemory = memorySize;
        this.commitChaining = z;
        this.commitUser = str;
    }

    private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
        return (fileStoreTable, str, storeSinkWriteState, iOManager, memoryPoolFactory, metricGroup) -> {
            return new StoreSinkWriteImpl(fileStoreTable, str, storeSinkWriteState, iOManager, false, fileStoreTable.coreOptions().prepareCommitWaitCompaction(), true, memoryPoolFactory, metricGroup);
        };
    }

    public DataStreamSink<?> sinkFrom(DataStream<CdcMultiplexRecord> dataStream) {
        return sinkFrom(dataStream, this.commitUser, createWriteProvider());
    }

    public DataStreamSink<?> sinkFrom(DataStream<CdcMultiplexRecord> dataStream, String str, StoreSinkWrite.WithWriteBufferProvider withWriteBufferProvider) {
        FlinkSink.assertStreamingConfiguration(dataStream.getExecutionEnvironment());
        MultiTableCommittableTypeInfo multiTableCommittableTypeInfo = new MultiTableCommittableTypeInfo();
        SingleOutputStreamOperator parallelism = FlinkStreamPartitioner.partition(dataStream.transform(WRITER_NAME, multiTableCommittableTypeInfo, createWriteOperator(withWriteBufferProvider, str)).setParallelism(dataStream.getParallelism()), new MultiTableCommittableChannelComputer(), Integer.valueOf(dataStream.getParallelism())).transform(GLOBAL_COMMITTER_NAME, multiTableCommittableTypeInfo, new CommitterOperator(true, false, this.commitChaining, str, createCommitterFactory(), createCommittableStateManager())).setParallelism(dataStream.getParallelism());
        FlinkSink.configureGlobalCommitter(parallelism, this.commitCpuCores, this.commitHeapMemory);
        return parallelism.addSink(new DiscardingSink()).name("end").setParallelism(1);
    }

    protected OneInputStreamOperator<CdcMultiplexRecord, MultiTableCommittable> createWriteOperator(StoreSinkWrite.WithWriteBufferProvider withWriteBufferProvider, String str) {
        return new CdcRecordStoreMultiWriteOperator(this.catalogLoader, withWriteBufferProvider, str, new Options());
    }

    protected Committer.Factory<MultiTableCommittable, WrappedManifestCommittable> createCommitterFactory() {
        return context -> {
            return new StoreMultiCommitter(this.catalogLoader, context);
        };
    }

    protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() {
        return new RestoreAndFailCommittableStateManager(WrappedManifestCommittableSerializer::new);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1975954229:
                if (implMethodName.equals("lambda$createCommitterFactory$200aba27$1")) {
                    z = true;
                    break;
                }
                break;
            case -465277900:
                if (implMethodName.equals("lambda$createWriteProvider$72334113$1")) {
                    z = false;
                    break;
                }
                break;
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/StoreSinkWrite$WithWriteBufferProvider") && serializedLambda.getFunctionalInterfaceMethodName().equals("provide") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemoryPoolFactory;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemoryPoolFactory;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;")) {
                    FlinkCdcMultiTableSink flinkCdcMultiTableSink = (FlinkCdcMultiTableSink) serializedLambda.getCapturedArg(0);
                    return (fileStoreTable, str, storeSinkWriteState, iOManager, memoryPoolFactory, metricGroup) -> {
                        return new StoreSinkWriteImpl(fileStoreTable, str, storeSinkWriteState, iOManager, false, fileStoreTable.coreOptions().prepareCommitWaitCompaction(), true, memoryPoolFactory, metricGroup);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/Committer$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/paimon/flink/sink/Committer$Context;)Lorg/apache/paimon/flink/sink/Committer;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/flink/sink/Committer$Context;)Lorg/apache/paimon/flink/sink/Committer;")) {
                    FlinkCdcMultiTableSink flinkCdcMultiTableSink2 = (FlinkCdcMultiTableSink) serializedLambda.getCapturedArg(0);
                    return context -> {
                        return new StoreMultiCommitter(this.catalogLoader, context);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/utils/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return WrappedManifestCommittableSerializer::new;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
