package org.apache.paimon.flink.shuffle;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.XORShiftRandom;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.DataGetters;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.types.InternalRowToSizeVisitor;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SerializableSupplier;

/* loaded from: input_file:org/apache/paimon/flink/shuffle/RangeShuffle.class */
public class RangeShuffle {

    /* loaded from: input_file:org/apache/paimon/flink/shuffle/RangeShuffle$AssignRangeIndexOperator.class */
    /**
     * Two-input operator that prefixes every record of the second input with a range index.
     *
     * <p>The first input supplies a list of boundary keys; {@link #nextSelection()} reads only
     * that input until the list has arrived. Adjacent boundaries that compare equal are collapsed
     * into a single entry that remembers all of their positions, and a record whose key matches
     * such an entry receives one of those positions chosen at random.
     *
     * @param <T> type of the key the ranges are built on
     */
    private static class AssignRangeIndexOperator<T>
            extends TableStreamOperator<Tuple2<Integer, Tuple2<T, RowData>>>
            implements TwoInputStreamOperator<
                            List<T>, Tuple2<T, RowData>, Tuple2<Integer, Tuple2<T, RowData>>>,
                    InputSelectable {
        private static final long serialVersionUID = 1L;

        private final SerializableSupplier<Comparator<T>> keyComparatorSupplier;

        /** Distinct boundary keys, each paired with the positions it occupied in the list. */
        private transient List<Pair<T, RandomList>> keyIndex;

        private transient Collector<Tuple2<Integer, Tuple2<T, RowData>>> collector;
        private transient Comparator<T> keyComparator;

        /**
         * @param serializableSupplier creates the comparator used to order keys; it is invoked
         *     once in {@link #open()}
         */
        public AssignRangeIndexOperator(SerializableSupplier<Comparator<T>> serializableSupplier) {
            this.keyComparatorSupplier = serializableSupplier;
        }

        /** Materializes the key comparator and wraps the operator output in a collector. */
        public void open() throws Exception {
            super.open();
            keyComparator = keyComparatorSupplier.get();
            collector = new StreamRecordCollector<>(output);
        }

        /**
         * Builds the boundary index from the list carried by the first input.
         *
         * <p>Only adjacent equal keys are merged, so the list is expected to already be ordered
         * by the key comparator (the lookup in {@code binarySearch} relies on that as well).
         */
        public void processElement1(StreamRecord<List<T>> streamRecord) {
            keyIndex = new ArrayList<>();
            T previous = null;
            int position = 0;
            for (T boundary : streamRecord.getValue()) {
                boolean startsNewGroup =
                        previous == null || keyComparator.compare(previous, boundary) != 0;
                if (startsNewGroup) {
                    keyIndex.add(Pair.of(boundary, new RandomList()));
                    previous = boundary;
                }
                // Every position is recorded on the most recent group, new or existing.
                keyIndex.get(keyIndex.size() - 1).getRight().add(position);
                position++;
            }
        }

        /**
         * Emits the record together with its range index; the index is {@code 0} when no
         * boundaries were received.
         *
         * @throws RuntimeException if the boundary list has not been received yet
         */
        public void processElement2(StreamRecord<Tuple2<T, RowData>> streamRecord) {
            if (keyIndex == null) {
                throw new RuntimeException("There should be one data from the first input.");
            }
            Tuple2<T, RowData> keyedRow = streamRecord.getValue();
            int rangeIndex = keyIndex.isEmpty() ? 0 : binarySearch(keyedRow.f0);
            collector.collect(new Tuple2<>(rangeIndex, keyedRow));
        }

        /** Reads only the first input until the boundary index exists, then both inputs. */
        public InputSelection nextSelection() {
            if (keyIndex == null) {
                return InputSelection.FIRST;
            }
            return InputSelection.ALL;
        }

        /**
         * Locates the range index for {@code t} by binary search over the distinct boundaries.
         *
         * @return a position recorded for the matching boundary, or for the first boundary
         *     greater than {@code t}; when {@code t} is greater than every boundary, a position
         *     recorded for the last boundary plus one
         */
        private int binarySearch(T t) {
            final int last = keyIndex.size() - 1;
            int lo = 0;
            int hi = last;
            while (lo <= hi) {
                int mid = (lo + hi) >>> 1;
                Pair<T, RandomList> candidate = keyIndex.get(mid);
                int order = keyComparator.compare(t, candidate.getLeft());
                if (order == 0) {
                    return candidate.getRight().get();
                }
                if (order > 0) {
                    lo = mid + 1;
                } else {
                    hi = mid - 1;
                }
            }
            // No exact match: lo now points at the first boundary greater than the key.
            if (lo > last) {
                // NOTE(review): the "+ 1" is applied to a randomly picked position, so when the
                // last boundary is duplicated the result is not always the highest range index.
                // Confirm whether that spreading is intentional.
                return keyIndex.get(last).getRight().get() + 1;
            }
            return keyIndex.get(lo).getRight().get();
        }

        /** Maps a range index onto one of the downstream partitions. */
        public static class RangePartitioner implements Partitioner<Integer> {
            private static final long serialVersionUID = 1L;

            private final int totalRangeNum;

            /** @param i total number of ranges that indexes are drawn from */
            public RangePartitioner(int i) {
                this.totalRangeNum = i;
            }

            /**
             * Divides the range index by the number of ranges per partition and caps the result
             * at the last partition.
             *
             * @param num range index of the record
             * @param i number of downstream partitions; must not exceed the total range count
             * @return target partition in {@code [0, i - 1]}
             */
            public int partition(Integer num, int i) {
                Preconditions.checkArgument(
                        i <= totalRangeNum,
                        "Num of subPartitions should <= totalRangeNum: " + totalRangeNum);
                int rangeIndex = num;
                int rangesPerPartition = totalRangeNum / i;
                int target = rangeIndex / rangesPerPartition;
                return Math.min(i - 1, target);
            }
        }

        /** Extracts the range index (field {@code f0}) as the partitioning key. */
        public static class Tuple2KeySelector<T>
                implements KeySelector<Tuple2<Integer, Tuple2<T, RowData>>, Integer> {
            private static final long serialVersionUID = 1L;

            public Integer getKey(Tuple2<Integer, Tuple2<T, RowData>> tuple2) throws Exception {
                return tuple2.f0;
            }
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/shuffle/RangeShuffle$GlobalSampleOperator.class */
    /**
     * Merges the weighted samples of all upstream tasks and, once the input ends, emits one list
     * of range boundaries.
     *
     * <p>Incoming records carry the sampling weight, the key and an integer size. The weights
     * are reused as-is, so the retained set is the globally highest-weighted {@code numSample}
     * entries.
     *
     * @param <T> type of the sampled key
     */
    private static class GlobalSampleOperator<T> extends TableStreamOperator<List<T>>
            implements OneInputStreamOperator<Tuple3<Double, T, Integer>, List<T>>,
                    BoundedOneInput {
        private static final long serialVersionUID = 1L;

        private final int numSample;
        private final int rangesNum;
        private final SerializableSupplier<Comparator<T>> comparatorSupplier;

        private transient Comparator<T> keyComparator;
        private transient Collector<List<T>> collector;
        private transient Sampler<Tuple2<T, Integer>> sampler;

        /**
         * @param i maximum number of samples to retain
         * @param serializableSupplier creates the comparator used to sort the sampled keys
         * @param i2 number of ranges the boundaries should delimit
         */
        public GlobalSampleOperator(
                int i, SerializableSupplier<Comparator<T>> serializableSupplier, int i2) {
            this.numSample = i;
            this.comparatorSupplier = serializableSupplier;
            this.rangesNum = i2;
        }

        /** Creates the comparator, a sampler seeded with {@code 0}, and the output collector. */
        public void open() throws Exception {
            super.open();
            keyComparator = comparatorSupplier.get();
            sampler = new Sampler<>(numSample, 0L);
            collector = new StreamRecordCollector<>(output);
        }

        /** Offers the (key, size) pair to the sampler under the weight it arrived with. */
        public void processElement(StreamRecord<Tuple3<Double, T, Integer>> streamRecord)
                throws Exception {
            Tuple3<Double, T, Integer> weighted = streamRecord.getValue();
            sampler.collect(weighted.f0, new Tuple2<>(weighted.f1, weighted.f2));
        }

        /**
         * Sorts the retained samples by key and emits the boundaries derived from them; emits an
         * empty list when nothing was sampled.
         */
        public void endInput() {
            List<Tuple2<T, Integer>> sampledData = new ArrayList<>();
            sampler.sample().forEachRemaining(entry -> sampledData.add(entry.f1));
            sampledData.sort((left, right) -> keyComparator.compare(left.f0, right.f0));

            List<T> boundaries;
            if (sampledData.isEmpty()) {
                boundaries = new ArrayList<>();
            } else {
                boundaries = Arrays.asList(RangeShuffle.allocateRangeBaseSize(sampledData, rangesNum));
            }
            collector.collect(boundaries);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/shuffle/RangeShuffle$KeyAndSizeExtractor.class */
    /**
     * Maps a (key, row) pair to a (key, weight) pair used for sampling.
     *
     * <p>When {@code isSortBySize} is {@code false} every row weighs {@code 1}. Otherwise the
     * weight is the sum of the per-field sizes reported by the calculators that
     * {@link InternalRowToSizeVisitor} produces for the field types of {@code rowType}.
     *
     * @param <T> type of the key
     */
    public static class KeyAndSizeExtractor<T>
            extends RichMapFunction<Tuple2<T, RowData>, Tuple2<T, Integer>> {
        private final RowType rowType;
        private final boolean isSortBySize;

        /** One size calculator per field of {@code rowType}, in field order. */
        private transient List<BiFunction<DataGetters, Integer, Integer>> fieldSizeCalculator;

        /**
         * @param rowType row type whose field types determine the size calculators
         * @param z whether rows are weighted by their computed size rather than by count
         */
        public KeyAndSizeExtractor(RowType rowType, boolean z) {
            this.rowType = rowType;
            this.isSortBySize = z;
        }

        // NOTE(review): both open(...) variants are kept and neither carries @Override,
        // presumably so the class works against Flink versions that expose only one of them.
        // Confirm before annotating.
        public void open(OpenContext openContext) throws Exception {
            open(new Configuration());
        }

        /** Builds one size calculator per field type of the row. */
        public void open(Configuration configuration) throws Exception {
            InternalRowToSizeVisitor sizeVisitor = new InternalRowToSizeVisitor();
            this.fieldSizeCalculator =
                    rowType.getFieldTypes().stream()
                            .map(fieldType -> fieldType.accept(sizeVisitor))
                            .collect(Collectors.toList());
        }

        /**
         * @param tuple2 key and row
         * @return the key paired with {@code 1}, or with the summed field sizes of the row when
         *     sorting by size
         */
        public Tuple2<T, Integer> map(Tuple2<T, RowData> tuple2) throws Exception {
            if (!isSortBySize) {
                return new Tuple2<>(tuple2.f0, 1);
            }
            // Wrap the row once; the wrapper is the same for every field of this row.
            FlinkRowWrapper row = new FlinkRowWrapper(tuple2.f1);
            int totalSize = 0;
            for (int field = 0; field < fieldSizeCalculator.size(); field++) {
                totalSize += fieldSizeCalculator.get(field).apply(row, field);
            }
            return new Tuple2<>(tuple2.f0, totalSize);
        }
    }

    /**
     * Samples the (key, size) records of one task and, once the input ends, emits every retained
     * sample together with the random weight it was drawn with.
     *
     * @param <T> type of the sampled key
     */
    @Internal
    public static class LocalSampleOperator<T> extends TableStreamOperator<Tuple3<Double, T, Integer>>
            implements OneInputStreamOperator<Tuple2<T, Integer>, Tuple3<Double, T, Integer>>,
                    BoundedOneInput {
        private static final long serialVersionUID = 1L;

        private final int numSample;

        private transient Collector<Tuple3<Double, T, Integer>> collector;
        private transient Sampler<Tuple2<T, Integer>> sampler;

        /** @param i maximum number of samples this task retains */
        public LocalSampleOperator(int i) {
            this.numSample = i;
        }

        /** Creates the output collector and a sampler seeded from {@link System#nanoTime()}. */
        public void open() throws Exception {
            super.open();
            collector = new StreamRecordCollector<>(output);
            sampler = new Sampler<>(numSample, System.nanoTime());
        }

        /** Offers the record to the sampler, which assigns it a random weight. */
        public void processElement(StreamRecord<Tuple2<T, Integer>> streamRecord) throws Exception {
            Tuple2<T, Integer> keyAndSize = streamRecord.getValue();
            sampler.collect(keyAndSize);
        }

        /** Emits each retained sample as (weight, key, size). */
        public void endInput() {
            sampler.sample()
                    .forEachRemaining(
                            entry ->
                                    collector.collect(
                                            new Tuple3<>(entry.f0, entry.f1.f0, entry.f1.f1)));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/flink/shuffle/RangeShuffle$RandomList.class */
    /** Append-only collection of ints from which {@link #get()} returns a random member. */
    public static class RandomList {
        private static final Random RANDOM = new Random();

        private final List<Integer> list = new ArrayList<>();

        private RandomList() {}

        /** Appends {@code i} to the candidates. */
        public void add(int i) {
            list.add(i);
        }

        /**
         * Returns one of the added values, chosen uniformly by position.
         *
         * @throws IllegalArgumentException if nothing has been added yet, because
         *     {@link Random#nextInt(int)} rejects a bound of zero
         */
        public int get() {
            int pick = RANDOM.nextInt(list.size());
            return list.get(pick);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/shuffle/RangeShuffle$RemoveRangeIndexOperator.class */
    /**
     * Strips the range index from each incoming record and forwards the wrapped (key, row) pair.
     *
     * @param <T> type of the key
     */
    private static class RemoveRangeIndexOperator<T> extends TableStreamOperator<Tuple2<T, RowData>>
            implements OneInputStreamOperator<
                    Tuple2<Integer, Tuple2<T, RowData>>, Tuple2<T, RowData>> {
        private static final long serialVersionUID = 1L;

        private transient Collector<Tuple2<T, RowData>> collector;

        private RemoveRangeIndexOperator() {}

        /** Wraps the operator output in a collector. */
        public void open() throws Exception {
            super.open();
            collector = new StreamRecordCollector<>(output);
        }

        /** Emits field {@code f1} of the incoming tuple, dropping the range index in {@code f0}. */
        public void processElement(StreamRecord<Tuple2<Integer, Tuple2<T, RowData>>> streamRecord)
                throws Exception {
            Tuple2<Integer, Tuple2<T, RowData>> indexed = streamRecord.getValue();
            collector.collect(indexed.f1);
        }
    }

    /* loaded from: input_file:org/apache/paimon/flink/shuffle/RangeShuffle$Sampler.class */
    /**
     * Keeps at most {@code numSamples} elements, namely those offered with the largest weights.
     *
     * <p>Elements live in a min-heap ordered by weight. Once the heap is full, a new element
     * replaces the current minimum only if its weight is strictly greater.
     *
     * @param <T> type of the sampled element
     */
    private static class Sampler<T> {
        private final int numSamples;
        private final Random random;
        private final PriorityQueue<Tuple2<Double, T>> queue;

        /**
         * @param i maximum number of elements to retain; {@code 0} retains nothing
         * @param j seed for the generator that draws weights in {@link #collect(Object)}
         * @throws IllegalArgumentException if {@code i} is negative
         */
        Sampler(int i, long j) {
            Preconditions.checkArgument(i >= 0, "numSamples should be non-negative.");
            this.numSamples = i;
            this.random = new XORShiftRandom(j);
            // PriorityQueue rejects an initial capacity below 1, but zero samples is a legal
            // request here, so the capacity hint is clamped.
            this.queue =
                    new PriorityQueue<>(
                            Math.max(1, i), Comparator.comparingDouble(entry -> entry.f0));
        }

        /** Offers {@code t} with a freshly drawn random weight. */
        void collect(T t) {
            collect(random.nextDouble(), t);
        }

        /**
         * Offers {@code t} with the given weight.
         *
         * @param d weight of the element; larger weights win
         * @param t element to sample
         */
        void collect(double d, T t) {
            // The queue size is used instead of a running element counter: it cannot overflow,
            // so the "still filling" branch is never re-entered after the queue is full.
            if (queue.size() < numSamples) {
                queue.add(new Tuple2<>(d, t));
                return;
            }
            Tuple2<Double, T> smallest = queue.peek();
            // smallest is null only when numSamples == 0, in which case nothing is retained.
            if (smallest != null && d > smallest.f0) {
                queue.remove();
                queue.add(new Tuple2<>(d, t));
            }
        }

        /**
         * Returns an iterator over the retained (weight, element) pairs. It iterates the
         * priority queue directly, so the pairs come back in no particular order.
         */
        Iterator<Tuple2<Double, T>> sample() {
            return queue.iterator();
        }
    }

    /**
     * Builds the transformation chain that range-partitions {@code dataStream} by key.
     *
     * <p>The chain is: extract (key, weight) pairs, sample them per task, merge the samples in a
     * single task into range boundaries, broadcast the boundaries next to the forwarded input to
     * assign a range index to every record, partition by that index, and finally strip the index
     * again. All three exchanges use {@link StreamExchangeMode#BATCH}.
     *
     * @param dataStream input of (key, row) pairs
     * @param serializableSupplier creates the comparator that orders keys
     * @param typeInformation type information of the key
     * @param i number of samples each local sampling task retains
     * @param i2 number of samples the global sampling task retains
     * @param i3 number of ranges to split the key space into
     * @param i4 parallelism of the final, index-removing transformation
     * @param rowType row type used to compute row sizes
     * @param z whether samples are weighted by row size instead of row count
     * @return a stream with the same element type as the input, partitioned by range
     */
    public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(DataStream<Tuple2<T, RowData>> dataStream, SerializableSupplier<Comparator<T>> serializableSupplier, TypeInformation<T> typeInformation, int i, int i2, int i3, int i4, RowType rowType, boolean z) {
        Transformation<Tuple2<T, RowData>> input = dataStream.getTransformation();

        OneInputTransformation<Tuple2<T, RowData>, Tuple2<T, Integer>> keyAndSize =
                new OneInputTransformation<>(
                        input,
                        "ABSTRACT KEY AND SIZE",
                        new StreamMap<>(new KeyAndSizeExtractor<>(rowType, z)),
                        new TupleTypeInfo<>(typeInformation, BasicTypeInfo.INT_TYPE_INFO),
                        input.getParallelism());

        // The intermediates below are created in the same order as the nested expression they
        // replace, so transformations are still constructed in the original sequence.
        OneInputTransformation<Tuple2<T, Integer>, Tuple3<Double, T, Integer>> localSample =
                new OneInputTransformation<>(
                        keyAndSize,
                        "LOCAL SAMPLE",
                        new LocalSampleOperator<>(i),
                        new TupleTypeInfo<>(
                                BasicTypeInfo.DOUBLE_TYPE_INFO,
                                typeInformation,
                                BasicTypeInfo.INT_TYPE_INFO),
                        keyAndSize.getParallelism());

        // Parallelism 1: all local samples are merged in a single task.
        OneInputTransformation<Tuple3<Double, T, Integer>, List<T>> globalSample =
                new OneInputTransformation<>(
                        localSample,
                        "GLOBAL SAMPLE",
                        new GlobalSampleOperator<>(i2, serializableSupplier, i3),
                        new ListTypeInfo<>(typeInformation),
                        1);

        PartitionTransformation<List<T>> broadcastBoundaries =
                new PartitionTransformation<>(
                        globalSample, new BroadcastPartitioner<>(), StreamExchangeMode.BATCH);
        PartitionTransformation<Tuple2<T, RowData>> forwardedInput =
                new PartitionTransformation<>(
                        input, new ForwardPartitioner<>(), StreamExchangeMode.BATCH);

        TwoInputTransformation<List<T>, Tuple2<T, RowData>, Tuple2<Integer, Tuple2<T, RowData>>>
                assignRangeIndex =
                        new TwoInputTransformation<>(
                                broadcastBoundaries,
                                forwardedInput,
                                "ASSIGN RANGE INDEX",
                                new AssignRangeIndexOperator<>(serializableSupplier),
                                new TupleTypeInfo<>(
                                        BasicTypeInfo.INT_TYPE_INFO, input.getOutputType()),
                                input.getParallelism());

        PartitionTransformation<Tuple2<Integer, Tuple2<T, RowData>>> partitionedByRange =
                new PartitionTransformation<>(
                        assignRangeIndex,
                        new CustomPartitionerWrapper<>(
                                new AssignRangeIndexOperator.RangePartitioner(i3),
                                new AssignRangeIndexOperator.Tuple2KeySelector<>()),
                        StreamExchangeMode.BATCH);

        OneInputTransformation<Tuple2<Integer, Tuple2<T, RowData>>, Tuple2<T, RowData>>
                removeRangeIndex =
                        new OneInputTransformation<>(
                                partitionedByRange,
                                "REMOVE RANGE INDEX",
                                new RemoveRangeIndexOperator<>(),
                                input.getOutputType(),
                                i4);

        return new DataStream<>(dataStream.getExecutionEnvironment(), removeRangeIndex);
    }

    /**
     * Picks {@code i - 1} boundary keys from weighted samples so that the {@code i} resulting
     * ranges carry roughly equal total weight.
     *
     * <p>Samples are consumed in list order. For each boundary, samples are taken until their
     * accumulated weight reaches the current step (remaining weight divided by remaining
     * ranges); the key of the last sample taken becomes the boundary. Boundaries left unset
     * because the samples ran out are filled with the key of the last sample.
     *
     * @param list samples as (key, weight) pairs; the caller is expected to pass them sorted by
     *     key
     * @param i number of ranges; must be at least {@code 1}
     * @return an array of {@code i - 1} boundary keys
     * @throws IndexOutOfBoundsException if {@code list} is empty while {@code i} is greater
     *     than {@code 1}
     * @throws NegativeArraySizeException if {@code i} is less than {@code 1}
     */
    @VisibleForTesting
    static <T> T[] allocateRangeBaseSize(List<Tuple2<T, Integer>> list, int i) {
        int sampleCount = list.size();
        int boundaryCount = i - 1;
        @SuppressWarnings("unchecked") // only ever holds T values taken from the samples
        T[] boundaries = (T[]) new Object[boundaryCount];

        if (!list.isEmpty()) {
            long remainingWeight = list.stream().mapToLong(sample -> sample.f1.longValue()).sum();
            // Floating-point division: an integer quotient would truncate the step and shift
            // every boundary whenever the weight is not a multiple of the range count.
            double step = remainingWeight / (double) i;

            // long, so that summing many large int weights cannot wrap around.
            long currentWeight = 0L;
            int cursor = 0;
            for (int boundary = 0; boundary < boundaryCount; boundary++) {
                while (currentWeight < step && cursor < sampleCount) {
                    Tuple2<T, Integer> sample = list.get(cursor);
                    cursor++;
                    boundaries[boundary] = sample.f0;
                    int weight = sample.f1;
                    currentWeight += weight;
                    remainingWeight -= weight;
                }
                currentWeight = 0L;
                // Re-balance the step over the ranges that are still to be filled.
                step = remainingWeight / (double) (i - boundary - 1);
            }
        }

        for (int boundary = 0; boundary < boundaryCount; boundary++) {
            if (boundaries[boundary] == null) {
                boundaries[boundary] = list.get(sampleCount - 1).f0;
            }
        }
        return boundaries;
    }
}
