package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Map;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.utils.ManagedMemoryUtils;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;

/* loaded from: input_file:org/apache/paimon/flink/sink/CombinedTableCompactorSink.class */
public class CombinedTableCompactorSink implements Serializable {
    private static final long serialVersionUID = 1;
    private static final String WRITER_NAME = "Writer";
    private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
    private final Catalog.Loader catalogLoader;
    private final boolean ignorePreviousFiles = false;
    private final Options options;

    public CombinedTableCompactorSink(Catalog.Loader loader, Options options) {
        this.catalogLoader = loader;
        this.options = options;
    }

    public DataStreamSink<?> sinkFrom(DataStream<RowData> dataStream, DataStream<MultiTableUnawareAppendCompactionTask> dataStream2) {
        return sinkFrom(dataStream, dataStream2, CoreOptions.createCommitUser(this.options));
    }

    public DataStreamSink<?> sinkFrom(DataStream<RowData> dataStream, DataStream<MultiTableUnawareAppendCompactionTask> dataStream2, String str) {
        return doCommit(doWrite(dataStream, dataStream2, str), str);
    }

    public DataStream<MultiTableCommittable> doWrite(DataStream<RowData> dataStream, DataStream<MultiTableUnawareAppendCompactionTask> dataStream2, String str) {
        StreamExecutionEnvironment executionEnvironment = dataStream.getExecutionEnvironment();
        boolean z = executionEnvironment.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        SingleOutputStreamOperator parallelism = dataStream.transform(String.format("%s-%s", "Multi-Bucket-Table", WRITER_NAME), new MultiTableCommittableTypeInfo(), combinedMultiComacptionWriteOperator(executionEnvironment.getCheckpointConfig(), z, str)).setParallelism(dataStream.getParallelism());
        DataStream parallelism2 = dataStream2.transform(String.format("%s-%s", "Unaware-Bucket-Table", WRITER_NAME), new MultiTableCommittableTypeInfo(), new AppendOnlyMultiTableCompactionWorkerOperator(this.catalogLoader, str, this.options)).setParallelism(dataStream2.getParallelism());
        if (!z) {
            FlinkSink.assertBatchAdaptiveParallelism(executionEnvironment, parallelism.getParallelism());
            FlinkSink.assertBatchAdaptiveParallelism(executionEnvironment, parallelism2.getParallelism());
        }
        if (((Boolean) this.options.get(FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY)).booleanValue()) {
            ManagedMemoryUtils.declareManagedMemory(parallelism, (MemorySize) this.options.get(FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY));
            ManagedMemoryUtils.declareManagedMemory(parallelism2, (MemorySize) this.options.get(FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY));
        }
        return parallelism.union(new DataStream[]{parallelism2});
    }

    protected DataStreamSink<?> doCommit(DataStream<MultiTableCommittable> dataStream, String str) {
        StreamExecutionEnvironment executionEnvironment = dataStream.getExecutionEnvironment();
        ReadableConfig configuration = executionEnvironment.getConfiguration();
        CheckpointConfig checkpointConfig = executionEnvironment.getCheckpointConfig();
        boolean z = configuration.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
        boolean z2 = z && checkpointConfig.isCheckpointingEnabled();
        if (z2) {
            FlinkSink.assertStreamingConfiguration(executionEnvironment);
        }
        return FlinkStreamPartitioner.partition(dataStream, new MultiTableCommittableChannelComputer(), Integer.valueOf(dataStream.getParallelism())).transform(GLOBAL_COMMITTER_NAME, new MultiTableCommittableTypeInfo(), new CommitterOperator(z2, false, ((Boolean) this.options.get(FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING)).booleanValue(), str, createCommitterFactory(z), createCommittableStateManager(), (Long) this.options.get(FlinkConnectorOptions.END_INPUT_WATERMARK))).setParallelism(dataStream.getParallelism()).addSink(new DiscardingSink()).name("end").setParallelism(1);
    }

    protected OneInputStreamOperator<RowData, MultiTableCommittable> combinedMultiComacptionWriteOperator(CheckpointConfig checkpointConfig, boolean z, String str) {
        return new MultiTablesStoreCompactOperator(this.catalogLoader, str, checkpointConfig, z, this.ignorePreviousFiles, this.options);
    }

    protected Committer.Factory<MultiTableCommittable, WrappedManifestCommittable> createCommitterFactory(boolean z) {
        Map<String, String> map = this.options.toMap();
        map.put(CoreOptions.WRITE_ONLY.key(), "false");
        if (z) {
            map.put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647");
            map.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
            map.put(CoreOptions.LOOKUP_WAIT.key(), "false");
        }
        return context -> {
            return new StoreMultiCommitter(this.catalogLoader, context, true, map);
        };
    }

    protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() {
        return new RestoreAndFailCommittableStateManager(WrappedManifestCommittableSerializer::new);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 590035051:
                if (implMethodName.equals("lambda$createCommitterFactory$100032d9$1")) {
                    z = false;
                    break;
                }
                break;
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/Committer$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/paimon/flink/sink/Committer$Context;)Lorg/apache/paimon/flink/sink/Committer;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/CombinedTableCompactorSink") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map;Lorg/apache/paimon/flink/sink/Committer$Context;)Lorg/apache/paimon/flink/sink/Committer;")) {
                    CombinedTableCompactorSink combinedTableCompactorSink = (CombinedTableCompactorSink) serializedLambda.getCapturedArg(0);
                    Map map = (Map) serializedLambda.getCapturedArg(1);
                    return context -> {
                        return new StoreMultiCommitter(this.catalogLoader, context, true, map);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/utils/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return WrappedManifestCommittableSerializer::new;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
