package org.apache.flink.table.gateway.service.result;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.collections.iterators.IteratorChain;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.utils.print.RowDataToStringConverter;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/table/gateway/service/result/ResultFetcherTest.class */
class ResultFetcherTest {
    private static ResolvedSchema schema;
    private static List<RowData> data;

    @RegisterExtension
    private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = new TestExecutorExtension<>(() -> {
        return Executors.newCachedThreadPool(new ExecutorThreadFactory("Result Fetcher Test Pool", IgnoreExceptionHandler.INSTANCE));
    });

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/gateway/service/result/ResultFetcherTest$TestIterator.class */
    public static class TestIterator implements Iterator<RowData> {
        private final Supplier<RowData> dataSupplier;
        private boolean hasMoreData = true;

        public static TestIterator createErrorIterator(String str) {
            return new TestIterator(() -> {
                throw new SqlExecutionException(str);
            });
        }

        public TestIterator(Supplier<RowData> supplier) {
            this.dataSupplier = supplier;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.hasMoreData;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public RowData next() {
            this.hasMoreData = false;
            return this.dataSupplier.get();
        }
    }

    ResultFetcherTest() {
    }

    @BeforeAll
    static void setUp() {
        schema = ResolvedSchema.of(new Column[]{Column.physical("boolean", DataTypes.BOOLEAN()), Column.physical("int", DataTypes.INT()), Column.physical("bigint", DataTypes.BIGINT()), Column.physical("varchar", DataTypes.STRING()), Column.physical("decimal(10, 5)", DataTypes.DECIMAL(10, 5)), Column.physical("timestamp", DataTypes.TIMESTAMP(6).bridgedTo(Timestamp.class)), Column.physical("binary", DataTypes.BYTES())});
        data = Arrays.asList(GenericRowData.ofKind(RowKind.INSERT, new Object[]{null, 1, 2L, "abc", BigDecimal.valueOf(1.23d), Timestamp.valueOf("2020-03-01 18:39:14"), new byte[]{50, 51, 52, -123, 54, 93, 115, 126}}), GenericRowData.ofKind(RowKind.UPDATE_BEFORE, new Object[]{false, null, 0L, "", BigDecimal.valueOf(1L), Timestamp.valueOf("2020-03-01 18:39:14.1"), new byte[]{100, -98, 32, 121, -125}}), GenericRowData.ofKind(RowKind.UPDATE_AFTER, new Object[]{true, Integer.MAX_VALUE, null, "abcdefg", BigDecimal.valueOf(12345L), Timestamp.valueOf("2020-03-01 18:39:14.12"), new byte[]{-110, -23, 1, 2}}), GenericRowData.ofKind(RowKind.DELETE, new Object[]{false, Integer.MIN_VALUE, Long.MAX_VALUE, null, BigDecimal.valueOf(12345.06789d), Timestamp.valueOf("2020-03-01 18:39:14.123"), new byte[]{50, 51, 52, -123, 54, 93, 115, 126}}), GenericRowData.ofKind(RowKind.INSERT, new Object[]{true, 100, Long.MIN_VALUE, "abcdefg111", null, Timestamp.valueOf("2020-03-01 18:39:14.123456"), new byte[]{110, 23, -1, -2}}), GenericRowData.ofKind(RowKind.DELETE, new Object[]{null, -1, -1L, "abcdefghijklmnopqrstuvwxyz", BigDecimal.valueOf(-12345.06789d), null, null}), GenericRowData.ofKind(RowKind.INSERT, new Object[]{null, -1, -1L, "这是一段中文", BigDecimal.valueOf(-12345.06789d), Timestamp.valueOf("2020-03-04 18:39:14"), new byte[]{-3, -2, -1, 0, 1, 2, 3}}), GenericRowData.ofKind(RowKind.DELETE, new Object[]{null, -1, -1L, "これは日本語をテストするための文です", BigDecimal.valueOf(-12345.06789d), Timestamp.valueOf("2020-03-04 18:39:14"), new byte[]{-3, -2, -1, 0, 1, 2, 3}}));
    }

    @Test
    void testFetchResultsMultipleTimesWithLimitedBufferSize() {
        int size = data.size() / 2;
        ResultFetcher buildResultFetcher = buildResultFetcher(Collections.singletonList(data.iterator()), size);
        int size2 = data.size();
        runFetchMultipleTimes(size, size2, l -> {
            return buildResultFetcher.fetchResults(l.longValue(), size2);
        });
    }

    @Test
    void testFetchResultsMultipleTimesWithLimitedFetchSize() {
        int size = data.size();
        ResultFetcher buildResultFetcher = buildResultFetcher(Collections.singletonList(data.iterator()), size);
        int size2 = data.size() / 2;
        runFetchMultipleTimes(size, size2, l -> {
            return buildResultFetcher.fetchResults(l.longValue(), size2);
        });
    }

    @Test
    void testFetchResultsInWithLimitedBufferSizeInOrientation() {
        int size = data.size() / 2;
        ResultFetcher buildResultFetcher = buildResultFetcher(Collections.singletonList(data.iterator()), size);
        int size2 = data.size();
        runFetchMultipleTimes(size, size2, l -> {
            return buildResultFetcher.fetchResults(FetchOrientation.FETCH_NEXT, size2);
        });
    }

    @Test
    void testFetchResultsMultipleTimesWithLimitedFetchSizeInOrientation() {
        int size = data.size();
        ResultFetcher buildResultFetcher = buildResultFetcher(Collections.singletonList(data.iterator()), size);
        int size2 = data.size() / 2;
        runFetchMultipleTimes(size, size2, l -> {
            return buildResultFetcher.fetchResults(FetchOrientation.FETCH_NEXT, size2);
        });
    }

    @Test
    void testFetchResultInParallel() throws Exception {
        ResultFetcher buildResultFetcher = buildResultFetcher(Collections.singletonList(data.iterator()), data.size() / 2);
        CommonTestUtils.waitUtil(() -> {
            return Boolean.valueOf(buildResultFetcher.getResultStore().getBufferedRecordSize() > 0);
        }, Duration.ofSeconds(10L), "Failed to wait the buffer has data.");
        checkFetchResultInParallel(buildResultFetcher);
    }

    @Test
    void testFetchResultInOrientationInParallel() throws Exception {
        List<Iterator<RowData>> list = (List) data.stream().map(rowData -> {
            return new TestIterator(() -> {
                try {
                    Thread.sleep(1L);
                    return rowData;
                } catch (Exception e) {
                    throw new SqlExecutionException("Failed to return the row.", e);
                }
            });
        }).collect(Collectors.toList());
        CountDownLatch countDownLatch = new CountDownLatch(100);
        ResultFetcher buildResultFetcher = buildResultFetcher(list, 1);
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        AtomicReference atomicReference = new AtomicReference(true);
        for (int i = 0; i < 100; i++) {
            EXECUTOR_EXTENSION.getExecutor().submit(() -> {
                ResultSet fetchResults = buildResultFetcher.fetchResults(FetchOrientation.FETCH_NEXT, 1);
                if (fetchResults.getResultType().equals(ResultSet.ResultType.PAYLOAD) && fetchResults.getData().isEmpty()) {
                    atomicReference.set(false);
                }
                concurrentHashMap.compute(Long.valueOf(Thread.currentThread().getId()), (l, list2) -> {
                    if (list2 == null) {
                        return fetchResults.getData();
                    }
                    list2.addAll(fetchResults.getData());
                    return list2;
                });
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        Assertions.assertEquals(true, atomicReference.get());
        Assertions.assertEquals(new HashSet(data), concurrentHashMap.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toSet()));
    }

    @Test
    void testFetchResultFromDummyStoreInParallel() throws Exception {
        checkFetchResultInParallel(ResultFetcher.fromResults(OperationHandle.create(), schema, data));
    }

    @Test
    void testFetchResultAfterClose() throws Exception {
        ResultFetcher buildResultFetcher = buildResultFetcher(Collections.singletonList(data.iterator()), data.size() + 1);
        List emptyList = Collections.emptyList();
        long j = 0;
        while (emptyList.size() < 1) {
            ResultSet fetchResults = buildResultFetcher.fetchResults(j, 1);
            j = ((Long) Preconditions.checkNotNull(fetchResults.getNextToken())).longValue();
            emptyList = fetchResults.getData();
        }
        Assertions.assertEquals(data.subList(0, 1), emptyList);
        buildResultFetcher.close();
        long j2 = j;
        AtomicReference atomicReference = new AtomicReference(false);
        EXECUTOR_EXTENSION.getExecutor().submit(() -> {
            long j3 = j2;
            while (true) {
                ResultSet fetchResults2 = buildResultFetcher.fetchResults(j3, Integer.MAX_VALUE);
                if (fetchResults2.getResultType() == ResultSet.ResultType.EOS) {
                    atomicReference.set(true);
                    return;
                }
                j3 = ((Long) Preconditions.checkNotNull(fetchResults2.getNextToken())).longValue();
            }
        });
        atomicReference.getClass();
        CommonTestUtils.waitUtil(atomicReference::get, Duration.ofSeconds(10L), "Should get EOS when fetch results from the closed fetcher.");
    }

    @Test
    void testFetchResultWithToken() {
        ResultFetcher buildResultFetcher = buildResultFetcher(Collections.singletonList(data.iterator()), data.size());
        Long l = 0L;
        ArrayList arrayList = new ArrayList();
        ResultSet resultSet = null;
        while (l != null) {
            if (resultSet != null) {
                Assertions.assertEquals(resultSet, buildResultFetcher.fetchResults(l.longValue() - 1, data.size()));
            }
            ResultSet fetchResults = buildResultFetcher.fetchResults(l.longValue(), data.size());
            Assertions.assertEquals(fetchResults, buildResultFetcher.fetchResults(l.longValue(), data.size()));
            if (fetchResults.getResultType() == ResultSet.ResultType.EOS) {
                break;
            }
            resultSet = fetchResults;
            arrayList.addAll((Collection) Preconditions.checkNotNull(fetchResults.getData()));
            l = fetchResults.getNextToken();
        }
        Assertions.assertEquals(data, arrayList);
    }

    @Test
    void testFetchFailedResult() {
        ResultFetcher buildResultFetcher = buildResultFetcher(Arrays.asList(TestIterator.createErrorIterator("Artificial Exception"), data.iterator()), data.size());
        AssertionsForClassTypes.assertThatThrownBy(() -> {
            long j = 0L;
            while (true) {
                Long l = j;
                if (l == null) {
                    return;
                } else {
                    j = buildResultFetcher.fetchResults(l.longValue(), Integer.MAX_VALUE).getNextToken();
                }
            }
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches("Artificial Exception")});
    }

    @Test
    void testFetchIllegalToken() {
        ResultFetcher buildResultFetcher = buildResultFetcher(Collections.singletonList(data.iterator()), data.size());
        AssertionsForClassTypes.assertThatThrownBy(() -> {
            buildResultFetcher.fetchResults(2L, Integer.MAX_VALUE);
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches("Expecting token to be 0, but found 2")});
    }

    @Test
    void testFetchBeforeWithDifferentSize() throws Exception {
        ResultFetcher buildResultFetcher = buildResultFetcher(Collections.singletonList(data.iterator()), data.size() / 2);
        CommonTestUtils.waitUtil(() -> {
            return Boolean.valueOf(buildResultFetcher.getResultStore().getBufferedRecordSize() > 1);
        }, Duration.ofSeconds(10L), "Failed to make cached records num larger than 1.");
        ResultSet fetchResults = buildResultFetcher.fetchResults(0L, Integer.MAX_VALUE);
        int size = fetchResults.getData().size();
        AssertionsForClassTypes.assertThatThrownBy(() -> {
            buildResultFetcher.fetchResults(0L, size - 1);
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(String.format("As the same token is provided, fetch size must be not less than the previous returned buffer size. Previous returned result size is %s, current max_fetch_size to be %s.", Integer.valueOf(fetchResults.getData().size()), Integer.valueOf(size - 1)))});
    }

    private ResultFetcher buildResultFetcher(List<Iterator<RowData>> list, int i) {
        return new ResultFetcher(OperationHandle.create(), schema, CloseableIterator.adapterForIterator(new IteratorChain(list)), (RowDataToStringConverter) null, false, (JobID) null, ResultKind.SUCCESS_WITH_CONTENT, i);
    }

    private void runFetchMultipleTimes(int i, int i2, Function<Long, ResultSet> function) {
        ResultSet apply;
        ArrayList arrayList = new ArrayList();
        Long l = 0L;
        do {
            apply = function.apply(l);
            Assertions.assertTrue(((List) Preconditions.checkNotNull(apply.getData())).size() <= Math.min(i, i2));
            l = apply.getNextToken();
            arrayList.addAll(apply.getData());
        } while (apply.getResultType() != ResultSet.ResultType.EOS);
        Assertions.assertEquals(ResultSet.ResultType.EOS, ((ResultSet) Preconditions.checkNotNull(apply)).getResultType());
        Assertions.assertEquals(data, arrayList);
    }

    private void checkFetchResultInParallel(ResultFetcher resultFetcher) throws Exception {
        AtomicReference atomicReference = new AtomicReference(true);
        CountDownLatch countDownLatch = new CountDownLatch(100);
        List data2 = resultFetcher.fetchResults(0L, Integer.MAX_VALUE).getData();
        for (int i = 0; i < 100; i++) {
            EXECUTOR_EXTENSION.getExecutor().submit(() -> {
                if (!data2.equals(resultFetcher.fetchResults(0L, Integer.MAX_VALUE).getData())) {
                    atomicReference.set(false);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        Assertions.assertEquals(true, atomicReference.get());
    }
}
