package org.apache.flink.connector.base.source.reader;

import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
import org.apache.flink.connector.base.source.reader.mocks.PassThroughRecordEmitter;
import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.class */
public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBaseTest.class);

    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/SourceReaderBaseTest$BlockingShutdownSplitFetcherManager.class */
    /**
     * A {@link SingleThreadFetcherManager} whose {@link #maybeShutdownFinishedFetchers()} does not
     * return until the parent implementation reports that the fetchers have been shut down.
     *
     * <p>Right before it starts waiting, it completes the future exposed by {@link
     * #getInShutdownSplitFetcherFuture()}, so a test can hook an action in at the moment the
     * shutdown check begins.
     *
     * @param <E> element type handed over by the split reader
     * @param <SplitT> split type handled by the fetchers
     */
    private static class BlockingShutdownSplitFetcherManager<E, SplitT extends SourceSplit> extends SingleThreadFetcherManager<E, SplitT> {
        /** Completed (with {@code null}) as soon as the blocking shutdown check is entered. */
        private final CompletableFuture<Void> inShutdownSplitFetcherFuture;

        public BlockingShutdownSplitFetcherManager(FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> futureCompletingBlockingQueue, Supplier<SplitReader<E, SplitT>> supplier, Configuration configuration) {
            super(futureCompletingBlockingQueue, supplier, configuration);
            this.inShutdownSplitFetcherFuture = new CompletableFuture<>();
        }

        /**
         * Blocks until the parent's shutdown check succeeds.
         *
         * @return always {@code true}
         */
        public boolean maybeShutdownFinishedFetchers() {
            shutdownAllSplitFetcher();
            return true;
        }

        /** Returns the future that is completed when the blocking shutdown check starts. */
        public CompletableFuture<Void> getInShutdownSplitFetcherFuture() {
            return this.inShutdownSplitFetcherFuture;
        }

        /**
         * Signals the start of the shutdown check and then polls the parent implementation until it
         * reports success.
         *
         * <p>The wait is deliberately uninterruptible: an interrupt is remembered while waiting
         * continues, and the thread's interrupt status is restored before returning, so the caller
         * can still observe it. Re-interrupting inside the loop would make every subsequent
         * {@code sleep} fail immediately and turn the loop into a busy spin.
         */
        private void shutdownAllSplitFetcher() {
            this.inShutdownSplitFetcherFuture.complete(null);
            boolean interrupted = false;
            try {
                while (!super.maybeShutdownFinishedFetchers()) {
                    try {
                        // Short pause to avoid a tight polling loop.
                        Thread.sleep(1L);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/SourceReaderBaseTest$OnEventWatermarkGenerator.class */
    /**
     * Watermark generator that emits one watermark per event, using the event's own integer value
     * as the watermark timestamp. It emits nothing on the periodic callback.
     */
    public static class OnEventWatermarkGenerator implements WatermarkGenerator<Integer> {
        private OnEventWatermarkGenerator() {
        }

        /**
         * Emits a watermark whose timestamp equals the value of {@code num}.
         *
         * @param num the event; its value becomes the watermark timestamp
         * @param j second callback argument; ignored here
         * @param watermarkOutput sink that receives the watermark
         */
        public void onEvent(Integer num, long j, WatermarkOutput watermarkOutput) {
            final long watermarkTimestamp = num.intValue();
            final Watermark watermark = new Watermark(watermarkTimestamp);
            watermarkOutput.emitWatermark(watermark);
        }

        /** Intentionally a no-op: watermarks are produced only from {@code onEvent}. */
        public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/SourceReaderBaseTest$WatermarkCollectingDataOutput.class */
    /**
     * Data output that counts the records it receives and remembers the timestamp of every
     * watermark, in arrival order. Watermark statuses and latency markers are ignored.
     */
    public static class WatermarkCollectingDataOutput implements PushingAsyncDataInput.DataOutput<Integer> {
        /** Number of records passed to {@link #emitRecord}. */
        int numRecords = 0;

        /** Timestamps of all watermarks passed to {@link #emitWatermark}, in arrival order. */
        final List<Long> watermarks = new ArrayList<>();

        private WatermarkCollectingDataOutput() {
        }

        /** Counts the record; its content is not inspected. */
        public void emitRecord(StreamRecord<Integer> streamRecord) {
            this.numRecords += 1;
        }

        /** Appends the watermark's timestamp to {@link #watermarks}. */
        public void emitWatermark(org.apache.flink.streaming.api.watermark.Watermark watermark) throws Exception {
            final long timestamp = watermark.getTimestamp();
            this.watermarks.add(timestamp);
        }

        /** Ignored. */
        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
        }

        /** Ignored. */
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
        }
    }

    /**
     * Verifies that an exception thrown by the split reader's {@code fetch()} surfaces from the
     * source reader as a {@link RuntimeException} with the message
     * "One or more fetchers have encountered exception".
     *
     * <p>The reader is managed by try-with-resources so that it is closed when the expected
     * exception propagates, and a failure during {@code close()} is attached as a suppressed
     * exception instead of replacing the primary one.
     */
    @Test
    void testExceptionInSplitReader() {
        Assertions.assertThatThrownBy(() -> {
            FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = new FutureCompletingBlockingQueue<>();
            // Split reader whose fetch always fails; every other callback is a no-op.
            Supplier<SplitReader<int[], MockSourceSplit>> failingSplitReaderFactory = () -> new SplitReader<int[], MockSourceSplit>() {
                public RecordsWithSplitIds<int[]> fetch() {
                    throw new RuntimeException("Testing Exception");
                }

                public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChange) {
                }

                public void wakeUp() {
                }

                public void close() {
                }
            };
            SourceReaderContext readerContext = new TestingReaderContext();
            try (MockSourceReader reader = new MockSourceReader(elementsQueue, failingSplitReaderFactory, getConfig(), readerContext)) {
                // NOTE(review): the explicit `this` argument looks like the decompiler's rendering of an
                // inner-class constructor call - confirm against SourceReaderTestBase.
                SourceReaderTestBase.ValidatingSourceOutput output = new SourceReaderTestBase.ValidatingSourceOutput(this);
                reader.addSplits(Collections.singletonList(m17getSplit(0, 10, Boundedness.CONTINUOUS_UNBOUNDED)));
                reader.notifyNoMoreSplits();
                // Not a real infinite loop: polling is expected to throw once the fetcher failure is reported.
                while (true) {
                    Assertions.assertThat(reader.pollNext(output)).isNotEqualTo(InputStatus.END_OF_INPUT);
                    // Short pause to avoid a tight polling loop.
                    Thread.sleep(1L);
                }
            }
        }).isInstanceOf(RuntimeException.class).hasMessage("One or more fetchers have encountered exception");
    }

    /**
     * After a single poll of a batch that was created with two values, the batch must not have
     * been recycled yet.
     */
    @Test
    void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception {
        final String splitId = "test-split";
        final TestingRecordsWithSplitIds<String> records = new TestingRecordsWithSplitIds<>(splitId, "value1", "value2");
        final SourceReader<String, ?> reader = createReaderAndAwaitAvailable(splitId, records);

        reader.pollNext(new TestingReaderOutput<>());

        Assertions.assertThat(records.isRecycled()).isFalse();
    }

    /**
     * After three polls of a batch that was created with two values, the batch must have been
     * recycled. Presumably two polls drain the values and the third one triggers recycling.
     */
    @Test
    void testRecordsWithSplitsRecycledWhenEmpty() throws Exception {
        final String splitId = "test-split";
        final TestingRecordsWithSplitIds<String> records = new TestingRecordsWithSplitIds<>(splitId, "value1", "value2");
        final SourceReader<String, ?> reader = createReaderAndAwaitAvailable(splitId, records);

        // Each poll gets its own fresh output, exactly three polls in total.
        for (int poll = 0; poll < 3; poll++) {
            reader.pollNext(new TestingReaderOutput<>());
        }

        Assertions.assertThat(records.isRecycled()).isTrue();
    }

    /**
     * Reads two bounded splits of different length (10 and 12 records, fetched 2 records per split
     * per fetch, without a separate "finished" record) and checks that the reader eventually
     * reports {@link InputStatus#END_OF_INPUT}.
     *
     * <p>The reader is closed via try-with-resources so the test does not leave it open, whether it
     * passes or fails.
     */
    @Test
    void testMultipleSplitsWithDifferentFinishingMoments() throws Exception {
        FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = new FutureCompletingBlockingQueue<>();
        MockSplitReader splitReader = MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(2).setSeparatedFinishedRecord(false).setBlockingFetch(false).build();
        Supplier<SplitReader<int[], MockSourceSplit>> splitReaderFactory = () -> splitReader;
        SourceReaderContext readerContext = new TestingReaderContext();
        try (MockSourceReader reader = new MockSourceReader(elementsQueue, splitReaderFactory, getConfig(), readerContext)) {
            reader.start();
            reader.addSplits(Arrays.asList(m17getSplit(0, 10, Boundedness.BOUNDED), m17getSplit(1, 12, Boundedness.BOUNDED)));
            reader.notifyNoMoreSplits();

            InputStatus status;
            do {
                status = reader.pollNext(new TestingReaderOutput<>());
                if (status == InputStatus.NOTHING_AVAILABLE) {
                    // Block until the reader signals new data instead of spinning.
                    reader.isAvailable().get();
                }
            } while (status != InputStatus.END_OF_INPUT);
        }
    }

    /**
     * Reads two bounded splits of 10 records each with a split reader configured via
     * {@code setSeparatedFinishedRecord(true)} and checks that the reader eventually reports
     * {@link InputStatus#END_OF_INPUT}.
     *
     * <p>The reader is closed via try-with-resources so the test does not leave it open, whether it
     * passes or fails.
     */
    @Test
    void testMultipleSplitsWithSeparatedFinishedRecord() throws Exception {
        FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = new FutureCompletingBlockingQueue<>();
        MockSplitReader splitReader = MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(2).setSeparatedFinishedRecord(true).setBlockingFetch(false).build();
        Supplier<SplitReader<int[], MockSourceSplit>> splitReaderFactory = () -> splitReader;
        SourceReaderContext readerContext = new TestingReaderContext();
        try (MockSourceReader reader = new MockSourceReader(elementsQueue, splitReaderFactory, getConfig(), readerContext)) {
            reader.start();
            reader.addSplits(Arrays.asList(m17getSplit(0, 10, Boundedness.BOUNDED), m17getSplit(1, 10, Boundedness.BOUNDED)));
            reader.notifyNoMoreSplits();

            InputStatus status;
            do {
                status = reader.pollNext(new TestingReaderOutput<>());
                if (status == InputStatus.NOTHING_AVAILABLE) {
                    // Block until the reader signals new data instead of spinning.
                    reader.isAvailable().get();
                }
            } while (status != InputStatus.END_OF_INPUT);
        }
    }

    /**
     * Checks that {@code pollNext} returns {@link InputStatus#MORE_AVAILABLE} when a record is
     * added to the split at the moment the fetcher manager enters its (blocking) shutdown check.
     *
     * <p>The record is injected through the future exposed by
     * {@link BlockingShutdownSplitFetcherManager}, which is completed when that check starts.
     */
    @Test
    void testPollNextReturnMoreAvailableWhenAllSplitFetcherCloseWithLeftoverElementInQueue() throws Exception {
        FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = new FutureCompletingBlockingQueue<>();
        MockSplitReader splitReader = MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(1).setBlockingFetch(true).build();
        Supplier<SplitReader<int[], MockSourceSplit>> splitReaderFactory = () -> splitReader;
        BlockingShutdownSplitFetcherManager<int[], MockSourceSplit> fetcherManager = new BlockingShutdownSplitFetcherManager<>(elementsQueue, splitReaderFactory, getConfig());
        SingleThreadFetcherManager<int[], MockSourceSplit> managerForReader = fetcherManager;
        SourceReaderContext readerContext = new TestingReaderContext();
        MockSourceReader reader = new MockSourceReader(elementsQueue, managerForReader, getConfig(), readerContext);

        // The split starts without records; one is added only once the shutdown check begins.
        MockSourceSplit split = new MockSourceSplit(0, 0, 1);
        reader.addSplits(Collections.singletonList(split));
        reader.notifyNoMoreSplits();
        fetcherManager.getInShutdownSplitFetcherFuture().thenRun(() -> split.addRecord(1));

        InputStatus status = reader.pollNext(new TestingReaderOutput<>());
        Assertions.assertThat(status).isEqualTo(InputStatus.MORE_AVAILABLE);
    }

    /**
     * Feeds two splits (records 100/200/300 and 150/250/350) through a source operator that emits
     * one watermark per record, and checks which watermarks reach the output.
     *
     * <p>The assertions require that no watermark is emitted after the first three records and
     * that exactly 150, 250, 300 have been emitted after all six. This is consistent with the
     * output watermark being the minimum over both splits; that reading is inferred from the
     * expected values, not from code visible in this file.
     *
     * @param z when {@code true}, {@code emitNext} is called once before the splits are added
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest(name = "Emit record before split addition: {0}")
    void testPerSplitWatermark(boolean z) throws Exception {
        MockSplitReader splitReader = MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(3).setBlockingFetch(true).build();
        FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = new FutureCompletingBlockingQueue<>();
        Supplier<SplitReader<int[], MockSourceSplit>> splitReaderFactory = () -> splitReader;
        SourceReaderContext readerContext = new TestingReaderContext();
        MockSourceReader reader = new MockSourceReader(elementsQueue, splitReaderFactory, new Configuration(), readerContext);
        SourceOperator<Integer, MockSourceSplit> operator = TestingSourceOperator.createTestOperator(reader, WatermarkStrategy.forGenerator(context -> new OnEventWatermarkGenerator()), true);

        MockSourceSplit firstSplit = new MockSourceSplit(0, 0, 3);
        firstSplit.addRecord(100);
        firstSplit.addRecord(200);
        firstSplit.addRecord(300);
        MockSourceSplit secondSplit = new MockSourceSplit(1, 0, 3);
        secondSplit.addRecord(150);
        secondSplit.addRecord(250);
        secondSplit.addRecord(350);

        WatermarkCollectingDataOutput output = new WatermarkCollectingDataOutput();
        if (z) {
            operator.emitNext(output);
        }
        operator.handleOperatorEvent(new AddSplitEvent<>(Arrays.asList(firstSplit, secondSplit), new MockSourceSplitSerializer()));

        // First batch of three records: no watermark is expected yet.
        awaitEmittedRecordCount(operator, output, 3);
        Assertions.assertThat(output.watermarks).isEmpty();

        // All six records: exactly three watermarks are expected.
        awaitEmittedRecordCount(operator, output, 6);
        Assertions.assertThat(output.watermarks).hasSize(3).containsExactly(150L, 250L, 300L);
    }

    /**
     * Keeps calling {@code emitNext} on the operator until the output has counted exactly
     * {@code expectedRecords} records, or fails after 10 seconds.
     *
     * <p>The timeout message names only the expected count: it has to be built before waiting
     * starts, so any "received so far" number in it would be stale.
     */
    private static void awaitEmittedRecordCount(SourceOperator<Integer, MockSourceSplit> operator, WatermarkCollectingDataOutput output, int expectedRecords) throws Exception {
        CommonTestUtils.waitUtil(() -> {
            try {
                operator.emitNext(output);
                return output.numRecords == expectedRecords;
            } catch (Exception e) {
                LOG.warn("Exception caught at emitting records", e);
                return false;
            }
        }, Duration.ofSeconds(10L), String.format("Did not observe exactly %d emitted records within timeout", expectedRecords));
    }

    /**
     * Reads two bounded splits (records 0..9 and 10..19) with a record evaluator that reports
     * end-of-stream on the values 7 and 15, and checks that exactly the records before those
     * values (0..6 and 10..14) are emitted, in any order.
     *
     * <p>The reader is closed via try-with-resources so the test does not leave it open, whether it
     * passes or fails.
     */
    @Test
    void testMultipleSplitsAndFinishedByRecordEvaluator() throws Exception {
        // Values on which the evaluator ends the stream, one inside each split's record range.
        int endOfStreamRecordInSplit0 = 7;
        int endOfStreamRecordInSplit1 = 15;

        FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = new FutureCompletingBlockingQueue<>();
        MockSplitReader splitReader = MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(2).setSeparatedFinishedRecord(false).setBlockingFetch(false).build();
        Supplier<SplitReader<int[], MockSourceSplit>> splitReaderFactory = () -> splitReader;
        SingleThreadFetcherManager<int[], MockSourceSplit> fetcherManager = new SingleThreadFetcherManager<>(elementsQueue, splitReaderFactory, getConfig());

        try (MockSourceReader reader = new MockSourceReader(elementsQueue, fetcherManager, getConfig(), new TestingReaderContext(), value -> value == endOfStreamRecordInSplit0 || value == endOfStreamRecordInSplit1)) {
            reader.start();
            reader.addSplits(Arrays.asList(m17getSplit(0, 10, Boundedness.BOUNDED), m17getSplit(1, 10, Boundedness.BOUNDED)));
            reader.notifyNoMoreSplits();

            TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
            InputStatus status;
            do {
                status = reader.pollNext(output);
                if (status == InputStatus.NOTHING_AVAILABLE) {
                    // Block until the reader signals new data instead of spinning.
                    reader.isAvailable().get();
                }
            } while (status != InputStatus.END_OF_INPUT);

            // Split 0 starts at 0 and split 1 at 10; each contributes the records before its end-of-stream value.
            List<Integer> expectedRecords = IntStream.concat(IntStream.range(0, endOfStreamRecordInSplit0), IntStream.range(10, endOfStreamRecordInSplit1)).boxed().collect(Collectors.toList());
            Assertions.assertThat(output.getEmittedRecords()).containsExactlyInAnyOrder(expectedRecords.toArray(new Integer[0]));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: createReader, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a {@link MockSourceReader} backed by a blocking {@link MockSplitReader} that returns
     * 2 records per split per fetch, configured with {@link #getConfig()}.
     *
     * @return a new reader with its own element queue and a fresh testing context
     */
    public MockSourceReader m18createReader() {
        final MockSplitReader.Builder splitReaderBuilder = MockSplitReader.newBuilder();
        splitReaderBuilder.setNumRecordsPerSplitPerFetch(2);
        splitReaderBuilder.setBlockingFetch(true);
        final MockSplitReader splitReader = splitReaderBuilder.build();

        final FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = new FutureCompletingBlockingQueue<>();
        final Supplier<SplitReader<int[], MockSourceSplit>> splitReaderFactory = () -> splitReader;
        final SourceReaderContext readerContext = new TestingReaderContext();
        return new MockSourceReader(elementsQueue, splitReaderFactory, getConfig(), readerContext);
    }

    /**
     * Creates {@code i} splits with ids {@code 0 .. i-1}, each built by
     * {@link #m17getSplit(int, int, Boundedness)}.
     *
     * @param i number of splits to create; a non-positive value yields an empty list
     * @param i2 number of records per split
     * @param boundedness boundedness passed on to every split
     * @return a new mutable list of the created splits, ordered by split id
     */
    protected List<MockSourceSplit> getSplits(int i, int i2, Boundedness boundedness) {
        return IntStream.range(0, i)
                .mapToObj(splitId -> m17getSplit(splitId, i2, boundedness))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: getSplit, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a split with id {@code i} and fills it with {@code i2} consecutive records starting
     * at {@code i * 10}.
     *
     * @param i split id; also determines the first record value ({@code i * 10})
     * @param i2 number of records to add; for a bounded split it is also passed as the third
     *     constructor argument
     * @param boundedness {@link Boundedness#BOUNDED} selects the three-argument split constructor,
     *     anything else the id-only constructor
     * @return the populated split
     */
    public MockSourceSplit m17getSplit(int i, int i2, Boundedness boundedness) {
        final MockSourceSplit split;
        if (boundedness == Boundedness.BOUNDED) {
            split = new MockSourceSplit(i, 0, i2);
        } else {
            split = new MockSourceSplit(i);
        }

        final int firstRecord = i * 10;
        for (int offset = 0; offset < i2; offset++) {
            split.addRecord(firstRecord + offset);
        }
        return split;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the value of the split's {@code index()}, as a {@code long}.
     *
     * @param mockSourceSplit the split to inspect
     * @return the split's current index
     */
    public long getNextRecordIndex(MockSourceSplit mockSourceSplit) {
        final long nextRecordIndex = mockSourceSplit.index();
        return nextRecordIndex;
    }

    /**
     * Builds the reader configuration used by these tests: an element queue capacity of 1 and a
     * source reader close timeout of 30000 (presumably milliseconds - confirm against
     * {@code SourceReaderOptions}).
     *
     * @return a new, independent configuration instance
     */
    private Configuration getConfig() {
        final Configuration readerConfig = new Configuration();
        readerConfig.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
        readerConfig.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L);
        return readerConfig;
    }

    /**
     * Creates a minimal single-thread source reader whose split reader is built from the given
     * records, starts it, assigns one split with the given id, and blocks until the reader reports
     * availability.
     *
     * @param str id of the single split that is assigned to the reader
     * @param recordsWithSplitIds records handed to the {@link TestingSplitReader}
     * @param <E> record type
     * @return the started reader, after its availability future has completed
     * @throws Exception if waiting for availability fails
     */
    private static <E> SourceReader<E, ?> createReaderAndAwaitAvailable(String str, RecordsWithSplitIds<E> recordsWithSplitIds) throws Exception {
        final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue = new FutureCompletingBlockingQueue<>();

        // Anonymous reader: every hook is either a no-op or passes the split through unchanged.
        final SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, TestingSourceSplit> reader =
                new SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, TestingSourceSplit>(
                        elementsQueue,
                        () -> new TestingSplitReader<>(recordsWithSplitIds),
                        new PassThroughRecordEmitter<>(),
                        new Configuration(),
                        new TestingReaderContext()) {
                    public void notifyCheckpointComplete(long j) {
                    }

                    protected void onSplitFinished(Map<String, TestingSourceSplit> map) {
                    }

                    public TestingSourceSplit initializedState(TestingSourceSplit testingSourceSplit) {
                        return testingSourceSplit;
                    }

                    public TestingSourceSplit toSplitType(String str2, TestingSourceSplit testingSourceSplit) {
                        return testingSourceSplit;
                    }
                };

        reader.start();
        final TestingSourceSplit split = new TestingSourceSplit(str);
        reader.addSplits(Collections.singletonList(split));
        // Wait until the reader signals that data is available.
        reader.isAvailable().get();
        return reader;
    }

    /**
     * Lambda deserialization hook, as reconstructed by the decompiler.
     *
     * <p>Looks up the implementation method name stored in the {@link SerializedLambda}, verifies
     * that the remaining recorded metadata (method kind, functional interface, signatures,
     * implementing class) matches one of the two lambdas handled here, and returns a new instance
     * of that lambda. Any other input is rejected.
     *
     * <p>NOTE(review): this method is marked synthetic, i.e. it appears to be generated by the
     * compiler rather than hand-written. The decompiled body is not valid Java as it stands
     * ({@code boolean z = -1}, a {@code switch} over a boolean, lambdas returned as {@code Object}
     * without a target type). It should presumably be removed when the class is rebuilt from
     * source - confirm before touching it.
     *
     * @param serializedLambda the serialized form of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if the serialized form matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Decompiler rendering of a selector variable: "-1" stands for "no match", "false"/"true" for cases 0/1.
        boolean z = -1;
        // First stage: select the candidate by hash code, then confirm with equals().
        switch (implMethodName.hashCode()) {
            case -1801351427:
                if (implMethodName.equals("lambda$testPerSplitWatermark$c7c57de6$1")) {
                    z = true;
                    break;
                }
                break;
            case -1522521464:
                if (implMethodName.equals("lambda$testMultipleSplitsAndFinishedByRecordEvaluator$f60bf540$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: validate the recorded metadata of the selected candidate and rebuild the lambda.
        switch (z) {
            case false:
                // Record evaluator lambda of testMultipleSplitsAndFinishedByRecordEvaluator; it captured two ints.
                // Method kind 6 corresponds to REF_invokeStatic in java.lang.invoke.MethodHandleInfo.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/base/source/reader/RecordEvaluator") && serializedLambda.getFunctionalInterfaceMethodName().equals("isEndOfStream") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/flink/connector/base/source/reader/SourceReaderBaseTest") && serializedLambda.getImplMethodSignature().equals("(IILjava/lang/Integer;)Z")) {
                    int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    int intValue2 = ((Integer) serializedLambda.getCapturedArg(1)).intValue();
                    return num -> {
                        return num.intValue() == intValue || num.intValue() == intValue2;
                    };
                }
                break;
            case true:
                // Watermark generator supplier lambda of testPerSplitWatermark; it captures nothing.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("createWatermarkGenerator") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/base/source/reader/SourceReaderBaseTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier$Context;)Lorg/apache/flink/api/common/eventtime/WatermarkGenerator;")) {
                    return context -> {
                        return new OnEventWatermarkGenerator();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
