package org.apache.hudi.execution;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.HoodieLazyInsertIterable;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import scala.Tuple2;

/* loaded from: input_file:org/apache/hudi/execution/TestBoundedInMemoryQueue.class */
/**
 * Tests for {@link BoundedInMemoryQueue}: in-order reads from a single producer, per-producer
 * ordering with many concurrent producers, blocking of the producer once the memory budget is
 * used up, and propagation of failures between the producing and the consuming side.
 *
 * <p>{@link HoodieRecord} and {@link HoodieAvroRecord} are used as raw types on purpose, because
 * the test data generator hands out {@code List<HoodieRecord>}.
 */
@SuppressWarnings("rawtypes")
public class TestBoundedInMemoryQueue extends HoodieSparkClientTestHarness {
    /** Memory budget, in bytes, used by the tests that do not care about the exact limit. */
    private static final long DEFAULT_MEMORY_LIMIT_BYTES = 1024L;

    /** Pause between two checks of the queue's rate limiter while waiting for it to fill up. */
    private static final long QUEUE_FULL_POLL_INTERVAL_MS = 10L;

    private final String instantTime = InProcessTimeGenerator.createNewInstantTime();
    private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withExecutorType(ExecutorType.BOUNDED_IN_MEMORY.name())
            .withWriteBufferLimitBytes(1024)
            .build(false);

    @BeforeEach
    public void setUp() throws Exception {
        initTestDataGenerator();
        initExecutorServiceWithFixedThreadPool(2);
    }

    @AfterEach
    public void tearDown() throws Exception {
        cleanupResources();
    }

    /**
     * A single producer feeds the queue; the consumer must see every record, in the order it was
     * generated, together with an insert value equal to the one computed from the source record.
     */
    @Timeout(60)
    @Test
    public void testRecordReading() throws Exception {
        final int numRecords = 128;
        final List<HoodieRecord> records = this.dataGen.generateInserts(this.instantTime, numRecords);
        final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>> queue =
                newQueue(DEFAULT_MEMORY_LIMIT_BYTES);

        final Future<Boolean> producerFuture = this.executorService.submit(() -> {
            new IteratorBasedQueueProducer<>(records.iterator()).produce(queue);
            queue.seal();
            return true;
        });

        final Iterator<HoodieRecord> expectedRecords = records.iterator();
        int recordsRead = 0;
        while (queue.iterator().hasNext()) {
            final HoodieAvroRecord expected = (HoodieAvroRecord) expectedRecords.next();
            final Option<?> expectedInsertValue =
                    expected.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
            final HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> actual = queue.iterator().next();
            // Records must come out in the order they went in.
            Assertions.assertEquals(expected, actual.getResult());
            // getData() on a raw HoodieRecord is typed Object, hence the cast to reach the payload.
            Assertions.assertEquals(expectedInsertValue,
                    ((HoodieAvroRecord) actual.getResult()).getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA));
            recordsRead++;
        }
        Assertions.assertFalse(queue.iterator().hasNext() || expectedRecords.hasNext());
        Assertions.assertEquals(numRecords, recordsRead);
        // Surfaces any exception raised on the producer thread.
        producerFuture.get();
    }

    /**
     * Many producers (alternating iterator-based and function-based) feed one queue. Records of
     * different producers may interleave, but the records of each individual producer must be
     * read in their original order and none may be lost.
     */
    @Timeout(60)
    @Test
    public void testCompositeProducerRecordReading() throws Exception {
        final int numProducers = 40;
        final int numRecordsPerProducer = 1000;
        final List<List<HoodieRecord>> batches = new ArrayList<>();
        final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>> queue =
                newQueue(DEFAULT_MEMORY_LIMIT_BYTES);

        // record key -> (producer index, position of the record inside that producer's batch)
        final Map<String, Tuple2<Integer, Integer>> keyToProducerAndPosition = new HashMap<>();
        for (int producerIndex = 0; producerIndex < numProducers; producerIndex++) {
            final List<HoodieRecord> batch = this.dataGen.generateInserts(this.instantTime, numRecordsPerProducer);
            int position = 0;
            for (HoodieRecord rec : batch) {
                Assertions.assertFalse(keyToProducerAndPosition.containsKey(rec.getRecordKey()));
                keyToProducerAndPosition.put(rec.getRecordKey(), new Tuple2<>(producerIndex, position));
                position++;
            }
            batches.add(batch);
        }

        final List<HoodieProducer<HoodieRecord>> producers = new ArrayList<>();
        for (int batchIndex = 0; batchIndex < batches.size(); batchIndex++) {
            final List<HoodieRecord> batch = batches.get(batchIndex);
            if (batchIndex % 2 == 0) {
                producers.add(new IteratorBasedQueueProducer<>(batch.iterator()));
            } else {
                producers.add(new FunctionBasedQueueProducer<>(buffer -> {
                    for (HoodieRecord rec : batch) {
                        try {
                            buffer.insertRecord(rec);
                        } catch (Exception e) {
                            throw new HoodieException(e);
                        }
                    }
                    return true;
                }));
            }
        }

        final List<Future<Boolean>> producerFutures = producers.stream().map(producer -> {
            return this.executorService.submit(() -> {
                producer.produce(queue);
                return true;
            });
        }).collect(Collectors.toList());

        // The queue is sealed only after every producer has finished.
        final Future<Boolean> sealFuture = this.executorService.submit(() -> {
            try {
                for (Future<Boolean> producerFuture : producerFutures) {
                    producerFuture.get();
                }
                queue.seal();
                return true;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        // Position of the last record read per producer; -1 means nothing read yet.
        final Map<Integer, Integer> lastReadPosition = IntStream.range(0, numProducers).boxed()
                .collect(Collectors.toMap(Function.identity(), index -> -1));
        final Map<Integer, Integer> recordsReadPerProducer = IntStream.range(0, numProducers).boxed()
                .collect(Collectors.toMap(Function.identity(), index -> 0));

        while (queue.iterator().hasNext()) {
            final HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> payload = queue.iterator().next();
            final Tuple2<Integer, Integer> producerAndPosition =
                    keyToProducerAndPosition.get(payload.getResult().getRecordKey());
            final Integer producerIndex = producerAndPosition._1();
            final int expectedPosition = lastReadPosition.get(producerIndex) + 1;
            recordsReadPerProducer.merge(producerIndex, 1, Integer::sum);
            lastReadPosition.put(producerIndex, expectedPosition);
            // Within one producer the positions must be consecutive.
            Assertions.assertEquals(expectedPosition, producerAndPosition._2().intValue());
        }

        for (int producerIndex = 0; producerIndex < numProducers; producerIndex++) {
            Assertions.assertEquals(numRecordsPerProducer, recordsReadPerProducer.get(producerIndex));
        }
        sealFuture.get();
    }

    /**
     * With a memory budget of five estimated records, the producer must block once five records
     * are buffered, and must refill the queue up to the same limit after two records are read.
     */
    @Timeout(60)
    @Test
    public void testMemoryLimitForBuffering() throws Exception {
        final int numRecords = 128;
        final int recordLimit = 5;
        final List<HoodieRecord> records = this.dataGen.generateInserts(this.instantTime, numRecords);
        final long estimatedRecordSize = new DefaultSizeEstimator<Object>().sizeEstimate(transform(records.get(0)));
        final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>> queue =
                newQueue(recordLimit * estimatedRecordSize);

        // The queue is never sealed here; the test only inspects the blocked-producer state.
        this.executorService.submit(() -> {
            new IteratorBasedQueueProducer<>(records.iterator()).produce(queue);
            return true;
        });

        awaitQueueFull(queue);
        Assertions.assertEquals(0, queue.rateLimiter.availablePermits());
        Assertions.assertEquals(recordLimit, queue.currentRateLimit);
        Assertions.assertEquals(recordLimit, queue.size());
        Assertions.assertEquals(recordLimit - 1, queue.samplingRecordCounter.get());

        // Consume two records to make room for the producer.
        Assertions.assertEquals(records.get(0), queue.iterator().next().getResult());
        Assertions.assertEquals(records.get(1), queue.iterator().next().getResult());

        awaitQueueFull(queue);
        Assertions.assertEquals(0, queue.rateLimiter.availablePermits());
        Assertions.assertEquals(recordLimit, queue.currentRateLimit);
        Assertions.assertEquals(recordLimit, queue.size());
        Assertions.assertEquals(recordLimit + 1, queue.samplingRecordCounter.get());
    }

    /**
     * Failure propagation in both directions: marking the queue as failed must fail a blocked
     * producer, and a producer that fails while reading its input must fail the consumer.
     */
    @Timeout(60)
    @Test
    public void testException() throws Exception {
        final int numRecords = 256;
        final List<HoodieRecord> records = this.dataGen.generateInserts(this.instantTime, numRecords);
        final HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> payload = transform(records.get(0));
        final Object sizedSample = new Tuple2<>(payload.getResult(),
                payload.getResult().toIndexedRecord(HoodieTestDataGenerator.AVRO_SCHEMA, new Properties()));
        final long memoryLimitBytes = 4 * new DefaultSizeEstimator<Object>().sizeEstimate(sizedSample);

        // Case 1: the queue is failed while the producer is blocked on a full queue.
        final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>> queue =
                newQueue(memoryLimitBytes);
        final Future<Boolean> producerFuture = this.executorService.submit(() -> {
            new IteratorBasedQueueProducer<>(records.iterator()).produce(queue);
            return true;
        });
        awaitQueueFull(queue);

        final Exception injectedFailure = new Exception("Failing it :)");
        queue.markAsFailed(injectedFailure);
        final Throwable producerSideError =
                Assertions.assertThrows(ExecutionException.class, producerFuture::get, "exception is expected");
        Assertions.assertEquals(HoodieException.class, producerSideError.getCause().getClass());
        Assertions.assertEquals(injectedFailure, producerSideError.getCause().getCause());

        // Case 2: the producer's input iterator throws; the consumer must observe that failure.
        final RuntimeException readFailure = new RuntimeException("failing record reading");
        @SuppressWarnings("unchecked")
        final Iterator<HoodieRecord> failingIterator = Mockito.mock(Iterator.class);
        Mockito.when(failingIterator.hasNext()).thenReturn(true);
        Mockito.when(failingIterator.next()).thenThrow(readFailure);

        final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>> failingQueue =
                newQueue(memoryLimitBytes);
        final Future<Boolean> failingProducerFuture = this.executorService.submit(() -> {
            try {
                new IteratorBasedQueueProducer<>(failingIterator).produce(failingQueue);
                return true;
            } catch (Exception e) {
                failingQueue.markAsFailed(e);
                throw e;
            }
        });

        final Throwable consumerSideError = Assertions.assertThrows(Exception.class, () -> {
            failingQueue.iterator().hasNext();
        }, "exception is expected");
        Assertions.assertEquals(readFailure, consumerSideError.getCause());

        final Throwable failingProducerError =
                Assertions.assertThrows(ExecutionException.class, failingProducerFuture::get, "exception is expected");
        Assertions.assertEquals(readFailure, failingProducerError.getCause());
    }

    /**
     * Returns the record transformer built from the test Avro schema and this test's write config.
     * It is handed out as a raw {@link Function} so callers do not depend on its exact type arguments.
     */
    private Function newTransformer() {
        return HoodieLazyInsertIterable.getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, this.writeConfig);
    }

    /**
     * Creates a queue with the given memory budget that applies {@link #newTransformer()} to its input.
     *
     * @param memoryLimitBytes memory limit passed to the queue constructor
     */
    @SuppressWarnings("unchecked")
    private BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>> newQueue(
            long memoryLimitBytes) {
        return new BoundedInMemoryQueue(memoryLimitBytes, newTransformer());
    }

    /**
     * Applies {@link #newTransformer()} to a single record.
     *
     * @param hoodieRecord record to transform
     * @return the transformer's result for that record
     */
    @SuppressWarnings("unchecked")
    private HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> transform(HoodieRecord hoodieRecord) {
        return (HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>) newTransformer().apply(hoodieRecord);
    }

    /**
     * Polls until {@link #isQueueFull(Semaphore)} holds for the queue's rate limiter. There is no
     * bound of its own; the {@code @Timeout} on the calling test limits the wait.
     */
    private void awaitQueueFull(BoundedInMemoryQueue<?, ?> queue) throws InterruptedException {
        while (!isQueueFull(queue.rateLimiter)) {
            Thread.sleep(QUEUE_FULL_POLL_INTERVAL_MS);
        }
    }

    /**
     * Tells whether the semaphore has no permits left while at least one thread is queued to
     * acquire one.
     *
     * @param semaphore the rate limiter to inspect
     * @return {@code true} if no permit is available and some thread is waiting for one
     */
    private boolean isQueueFull(Semaphore semaphore) {
        return semaphore.availablePermits() == 0 && semaphore.hasQueuedThreads();
    }
}
