package org.apache.paimon.flink.sink;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
import org.apache.paimon.utils.SerializableFunction;

/* loaded from: input_file:org/apache/paimon/flink/sink/DynamicBucketSink.class */
public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T, Integer>> {
    private static final long serialVersionUID = 1;

    public DynamicBucketSink(FileStoreTable fileStoreTable, @Nullable Map<String, String> map) {
        super(fileStoreTable, map);
    }

    protected abstract ChannelComputer<T> assignerChannelComputer(Integer num);

    protected abstract ChannelComputer<Tuple2<T, Integer>> channelComputer2();

    protected abstract SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction();

    /* JADX INFO: Access modifiers changed from: protected */
    public HashBucketAssignerOperator<T> createHashBucketAssignerOperator(String str, Table table, Integer num, SerializableFunction<TableSchema, PartitionKeyExtractor<T>> serializableFunction, boolean z) {
        return new HashBucketAssignerOperator<>(str, table, num, serializableFunction, z);
    }

    public DataStreamSink<?> build(DataStream<T> dataStream, @Nullable Integer num) {
        String createCommitUser = CoreOptions.createCommitUser(this.table.coreOptions().toConfiguration());
        Integer dynamicBucketAssignerParallelism = this.table.coreOptions().dynamicBucketAssignerParallelism();
        if (dynamicBucketAssignerParallelism == null) {
            dynamicBucketAssignerParallelism = num;
        }
        Integer dynamicBucketInitialBuckets = this.table.coreOptions().dynamicBucketInitialBuckets();
        DataStream partition = FlinkStreamPartitioner.partition(dataStream, assignerChannelComputer(dynamicBucketInitialBuckets), dynamicBucketAssignerParallelism);
        return sinkFrom(FlinkStreamPartitioner.partition(partition.transform("dynamic-bucket-assigner", new TupleTypeInfo(new TypeInformation[]{partition.getType(), BasicTypeInfo.INT_TYPE_INFO}), createHashBucketAssignerOperator(createCommitUser, this.table, dynamicBucketInitialBuckets, extractorFunction(), false)).setParallelism(partition.getParallelism()), channelComputer2(), num), createCommitUser);
    }
}
