package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorTester;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

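/**
 * Tests for {@link StreamAppenderatorDriver}: adding rows, publishing segments and registering handoff,
 * using an in-test segment allocator and an in-test handoff notifier factory.
 *
 * <p>Decompiled from org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.class.
 */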
public class StreamAppenderatorDriverTest extends EasyMockSupport {
    /** Data source name of every segment these tests allocate or assert on. */
    private static final String DATA_SOURCE = "foo";

    /** Version stamped on the segment ids produced by {@link TestSegmentAllocator}. */
    private static final String VERSION = "abc123";

    /** Version given to the extra segments created by {@link #makeUpgradingPublisher()}. */
    private static final String UPGRADED_VERSION = "xyz456";

    /** In-memory row limit for the appenderator under test ({@link #setUp()} configures the same value). */
    private static final int MAX_ROWS_IN_MEMORY = 100;

    /** Row count above which {@link #testMaxRowsPerSegment()} moves a segment out of its sequence. */
    private static final int MAX_ROWS_PER_SEGMENT = 3;

    // Per-test fixtures; all of them are (re)created in setUp().
    private SegmentAllocator allocator;
    private StreamAppenderatorTester streamAppenderatorTester;
    private TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
    private StreamAppenderatorDriver driver;
    private DataSegmentKiller dataSegmentKiller;

    /** Supplies the base persist directory handed to the appenderator tester. */
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();

    /** Upper bound for waiting on a publish future. */
    private static final long PUBLISH_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);

    /** Upper bound for waiting on a handoff future. */
    private static final long HANDOFF_CONDITION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1);

    /** Three input rows: one timestamped "2000" and two timestamped "2000T01". */
    private static final List<InputRow> ROWS = Arrays.asList(
        new MapBasedInputRow(
            DateTimes.of("2000"),
            ImmutableList.of("dim1"),
            ImmutableMap.<String, Object>of("dim1", "foo", "met1", "1")
        ),
        new MapBasedInputRow(
            DateTimes.of("2000T01"),
            ImmutableList.of("dim1"),
            ImmutableMap.<String, Object>of("dim1", "foo", "met1", 2.0d)
        ),
        new MapBasedInputRow(
            DateTimes.of("2000T01"),
            ImmutableList.of("dim2"),
            ImmutableMap.<String, Object>of("dim2", "bar", "met1", 2.0d)
        )
    );

    /**
     * Supplier of {@link Committer}s whose metadata can be replaced between calls.
     *
     * <p>Each supplied committer captures the metadata value that was current when it was created; later
     * calls to {@link #setMetadata(Object)} do not affect committers that were already handed out.
     *
     * @param <T> type of the commit metadata
     */
    static class TestCommitterSupplier<T> implements Supplier<Committer> {
        private final AtomicReference<T> metadata = new AtomicReference<>();

        /**
         * Replaces the metadata that subsequently supplied committers will report.
         *
         * @param t new metadata value
         */
        public void setMetadata(T t) {
            this.metadata.set(t);
        }

        /**
         * Implements {@code Supplier.get()}. The decompiler had renamed this method to {@code m84get()},
         * which left the supplier contract unimplemented, so the real name is restored here.
         *
         * @return a committer reporting the metadata current at the time of this call; its
         *         {@code run()} does nothing
         */
        public Committer get() {
            final T snapshot = this.metadata.get();
            return new Committer() {
                public Object getMetadata() {
                    return snapshot;
                }

                public void run() {
                    // Nothing to commit in tests.
                }
            };
        }

        /**
         * Decompiler-generated alias of {@link #get()}, kept so that existing call sites keep compiling.
         *
         * @return same as {@link #get()}
         */
        public Committer m84get() {
            return get();
        }
    }

    /**
     * Segment allocator for tests: every call hands out the next partition number of the granularity
     * bucket that contains the row's timestamp, always using {@link StreamAppenderatorDriverTest#VERSION}.
     */
    static class TestSegmentAllocator implements SegmentAllocator {
        private final String dataSource;
        private final Granularity granularity;

        /** Next partition number per bucket, keyed by bucket start in epoch millis; guarded by itself. */
        private final Map<Long, AtomicInteger> counters = new HashMap<>();

        /**
         * @param str         data source name put into every allocated segment id
         * @param granularity granularity used to bucket row timestamps
         */
        public TestSegmentAllocator(String str, Granularity granularity) {
            this.dataSource = str;
            this.granularity = granularity;
        }

        /**
         * Allocates a new segment id for the bucket containing {@code inputRow}'s timestamp.
         * The remaining arguments are ignored.
         *
         * @return a segment id with a {@link NumberedShardSpec} carrying the bucket's next partition number
         */
        public SegmentIdWithShardSpec allocate(InputRow inputRow, String str, String str2, boolean z) {
            synchronized (this.counters) {
                final DateTime bucketStart = this.granularity.bucketStart(inputRow.getTimestamp());
                final AtomicInteger nextPartition =
                    this.counters.computeIfAbsent(bucketStart.getMillis(), unusedKey -> new AtomicInteger());
                return new SegmentIdWithShardSpec(
                    this.dataSource,
                    this.granularity.bucket(bucketStart),
                    VERSION,
                    new NumberedShardSpec(nextPartition.getAndIncrement(), 0)
                );
            }
        }
    }

    /**
     * Handoff notifier factory for tests. Its notifiers either run the handoff callback right away
     * (optionally after a fixed delay) and remember the descriptor, or, when handoff is disabled,
     * accept the registration without ever running the callback.
     */
    static class TestSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory {
        private long handoffDelay;
        private boolean handoffEnabled = true;

        /** Descriptors whose callbacks were dispatched; guarded by itself. */
        private final Set<SegmentDescriptor> handedOffSegmentDescriptors = new HashSet<>();

        /** Makes subsequently registered callbacks never fire. */
        public void disableHandoff() {
            this.handoffEnabled = false;
        }

        /**
         * @param j milliseconds to sleep inside each registration before dispatching the callback;
         *          values {@code <= 0} mean no delay
         */
        public void setHandoffDelay(long j) {
            this.handoffDelay = j;
        }

        /**
         * @return an immutable snapshot of the descriptors handed off so far
         */
        public Set<SegmentDescriptor> getHandedOffSegmentDescriptors() {
            synchronized (this.handedOffSegmentDescriptors) {
                return ImmutableSet.copyOf(this.handedOffSegmentDescriptors);
            }
        }

        /**
         * Creates a notifier backed by this factory's settings. Both arguments are ignored.
         */
        public SegmentHandoffNotifier createSegmentHandoffNotifier(String str, String str2) {
            return new SegmentHandoffNotifier() {
                public boolean registerSegmentHandoffCallback(
                    SegmentDescriptor segmentDescriptor,
                    Executor executor,
                    Runnable runnable
                ) {
                    if (!TestSegmentHandoffNotifierFactory.this.handoffEnabled) {
                        return true;
                    }
                    final long delayMillis = TestSegmentHandoffNotifierFactory.this.handoffDelay;
                    if (delayMillis > 0) {
                        try {
                            Thread.sleep(delayMillis);
                        } catch (InterruptedException e) {
                            // Keep the interrupt visible to whoever owns this thread.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                    }
                    // Record the descriptor BEFORE dispatching the callback: the callback may release a
                    // thread that immediately reads getHandedOffSegmentDescriptors(), and that thread
                    // must already see this descriptor.
                    synchronized (TestSegmentHandoffNotifierFactory.this.handedOffSegmentDescriptors) {
                        TestSegmentHandoffNotifierFactory.this.handedOffSegmentDescriptors.add(segmentDescriptor);
                    }
                    executor.execute(runnable);
                    return true;
                }

                public void start() {
                    // Nothing to start.
                }

                public void close() {
                    // Nothing to release.
                }
            };
        }
    }

    /**
     * Builds a fresh appenderator tester, allocator, handoff notifier factory and strict
     * {@link DataSegmentKiller} mock, wires them into a new driver, and puts the mock into replay state
     * with no expectations recorded.
     */
    @Before
    public void setUp() throws Exception {
        streamAppenderatorTester = new StreamAppenderatorTester.Builder()
            .maxRowsInMemory(MAX_ROWS_IN_MEMORY)
            .basePersistDirectory(temporaryFolder.newFolder())
            .build();
        allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
        segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
        dataSegmentKiller = createStrictMock(DataSegmentKiller.class);
        driver = new StreamAppenderatorDriver(
            streamAppenderatorTester.getAppenderator(),
            allocator,
            segmentHandoffNotifierFactory,
            new TestPublishedSegmentRetriever(streamAppenderatorTester.getPushedSegments()),
            dataSegmentKiller,
            OBJECT_MAPPER,
            new SegmentGenerationMetrics()
        );
        EasyMock.replay(dataSegmentKiller);
    }

    /**
     * Verifies the {@link DataSegmentKiller} mock, then clears and closes the driver.
     *
     * <p>The driver is cleaned up in {@code finally} blocks so that a failed mock verification (or a
     * failing {@code clear()}) does not leave the driver open.
     */
    @After
    public void tearDown() throws Exception {
        try {
            EasyMock.verify(this.dataSegmentKiller);
        } finally {
            try {
                this.driver.clear();
            } finally {
                this.driver.close();
            }
        }
    }

    /**
     * Adds every row of {@link #ROWS} to a single sequence, publishes it, waits until the driver no longer
     * tracks the sequence, registers handoff, and checks the resulting segment ids and commit metadata.
     */
    @Test(timeout = 60000)
    public void testSimple() throws Exception {
        final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
        Assert.assertNull(driver.startJob(null));

        // Commit metadata is the 1-based number of the row that was added last.
        int rowNumber = 0;
        for (InputRow row : ROWS) {
            rowNumber++;
            committerSupplier.setMetadata(rowNumber);
            Assert.assertTrue(driver.add(row, "dummy", committerSupplier, false, true).isOk());
        }

        final SegmentsAndCommitMetadata published = driver
            .publish(makeOkPublisher(), committerSupplier.m84get(), ImmutableList.of("dummy"))
            .get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        // Poll until the driver has dropped the published sequence; bounded by the test timeout.
        while (driver.getSegments().containsKey("dummy")) {
            Thread.sleep(100L);
        }

        final SegmentsAndCommitMetadata handedOff = driver
            .registerHandoff(published)
            .get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        final Set<SegmentIdWithShardSpec> expectedIds = ImmutableSet.of(
            new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0)),
            new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(0, 0))
        );
        Assert.assertEquals(expectedIds, asIdentifiers(handedOff.getSegments()));
        Assert.assertEquals(Integer.valueOf(ROWS.size()), handedOff.getCommitMetadata());
    }

    /**
     * Adds {@code numSegments * MAX_ROWS_PER_SEGMENT} rows with the same timestamp, moving a segment out of
     * the sequence as soon as its row count exceeds {@link #MAX_ROWS_PER_SEGMENT}, then publishes, hands
     * off, and checks the number of segments and the final commit metadata.
     *
     * <p>The test carries a timeout, like the other tests that poll the driver, so the polling loop
     * below cannot hang the build if the sequence is never dropped.
     */
    @Test(timeout = 60000)
    public void testMaxRowsPerSegment() throws Exception {
        final int numSegments = 3;
        final int numRows = numSegments * MAX_ROWS_PER_SEGMENT;
        final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
        Assert.assertNull(this.driver.startJob(null));

        for (int i = 0; i < numRows; i++) {
            // Commit metadata is the 1-based number of the row that was added last.
            committerSupplier.setMetadata(i + 1);
            final InputRow row = new MapBasedInputRow(
                DateTimes.of("2000T01"),
                ImmutableList.of("dim2"),
                ImmutableMap.<String, Object>of("dim2", StringUtils.format("bar-%d", i), "met1", 2.0d)
            );
            final AppenderatorDriverAddResult addResult = this.driver.add(row, "dummy", committerSupplier, false, true);
            Assert.assertTrue(addResult.isOk());
            if (addResult.getNumRowsInSegment() > MAX_ROWS_PER_SEGMENT) {
                this.driver.moveSegmentOut("dummy", ImmutableList.of(addResult.getSegmentIdentifier()));
            }
        }

        final SegmentsAndCommitMetadata published = this.driver
            .publish(makeOkPublisher(), committerSupplier.m84get(), ImmutableList.of("dummy"))
            .get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        // Poll until the driver has dropped the published sequence; bounded by the test timeout.
        while (this.driver.getSegments().containsKey("dummy")) {
            Thread.sleep(100L);
        }

        final SegmentsAndCommitMetadata handedOff = this.driver
            .registerHandoff(published)
            .get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        Assert.assertEquals(numSegments, handedOff.getSegments().size());
        Assert.assertEquals(Integer.valueOf(numRows), handedOff.getCommitMetadata());
    }

    /**
     * With handoff disabled on the notifier factory, waiting on the handoff future must end in a
     * {@link TimeoutException}.
     */
    @Test(timeout = 60000, expected = TimeoutException.class)
    public void testHandoffTimeout() throws Exception {
        final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
        segmentHandoffNotifierFactory.disableHandoff();
        Assert.assertNull(driver.startJob(null));

        int rowNumber = 0;
        for (InputRow row : ROWS) {
            rowNumber++;
            committerSupplier.setMetadata(rowNumber);
            Assert.assertTrue(driver.add(row, "dummy", committerSupplier, false, true).isOk());
        }

        final SegmentsAndCommitMetadata published = driver
            .publish(makeOkPublisher(), committerSupplier.m84get(), ImmutableList.of("dummy"))
            .get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        // Poll until the driver has dropped the published sequence; bounded by the test timeout.
        while (driver.getSegments().containsKey("dummy")) {
            Thread.sleep(100L);
        }

        // The callback never fires, so this wait is expected to time out.
        driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Publishes through {@link #makeUpgradingPublisher()} and checks that the result reports as many
     * upgraded segments as original ones, and that the descriptors of both groups together are exactly
     * what the notifier factory recorded as handed off.
     */
    @Test
    public void testHandoffUpgradedSegments() throws IOException, InterruptedException, TimeoutException, ExecutionException {
        final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
        Assert.assertNull(driver.startJob(null));

        int rowNumber = 0;
        for (InputRow row : ROWS) {
            rowNumber++;
            committerSupplier.setMetadata(rowNumber);
            Assert.assertTrue(driver.add(row, "dummy", committerSupplier, false, true).isOk());
        }
        driver.persist(committerSupplier.m84get());

        final SegmentsAndCommitMetadata handedOff = driver
            .publishAndRegisterHandoff(makeUpgradingPublisher(), committerSupplier.m84get(), ImmutableList.of("dummy"))
            .get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        Assert.assertNotNull(handedOff.getUpgradedSegments());
        Assert.assertEquals(handedOff.getSegments().size(), handedOff.getUpgradedSegments().size());

        final Set<SegmentDescriptor> expectedDescriptors = new HashSet<>();
        for (DataSegment segment : handedOff.getSegments()) {
            expectedDescriptors.add(segment.toDescriptor());
        }
        for (DataSegment upgraded : handedOff.getUpgradedSegments()) {
            expectedDescriptors.add(upgraded.toDescriptor());
        }
        Assert.assertEquals(expectedDescriptors, segmentHandoffNotifierFactory.getHandedOffSegmentDescriptors());
    }

    /**
     * Publishes and hands off after every single row, checking the segment id and commit metadata each
     * time, then publishes once more with nothing new and expects an empty segment set.
     */
    @Test
    public void testPublishPerRow() throws IOException, InterruptedException, TimeoutException, ExecutionException {
        final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
        Assert.assertNull(driver.startJob(null));

        // First row: the only one timestamped "2000".
        committerSupplier.setMetadata(1);
        Assert.assertTrue(driver.add(ROWS.get(0), "dummy", committerSupplier, false, true).isOk());
        final SegmentsAndCommitMetadata firstResult = driver
            .publishAndRegisterHandoff(makeOkPublisher(), committerSupplier.m84get(), ImmutableList.of("dummy"))
            .get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        final Set<SegmentIdWithShardSpec> firstExpected = ImmutableSet.of(
            new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0))
        );
        Assert.assertEquals(firstExpected, asIdentifiers(firstResult.getSegments()));
        Assert.assertEquals(1, firstResult.getCommitMetadata());

        // Remaining rows are all timestamped "2000T01"; each gets the next partition of that hour.
        for (int rowIndex = 1; rowIndex < ROWS.size(); rowIndex++) {
            final int rowNumber = rowIndex + 1;
            committerSupplier.setMetadata(rowNumber);
            Assert.assertTrue(driver.add(ROWS.get(rowIndex), "dummy", committerSupplier, false, true).isOk());
            final SegmentsAndCommitMetadata rowResult = driver
                .publishAndRegisterHandoff(makeOkPublisher(), committerSupplier.m84get(), ImmutableList.of("dummy"))
                .get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            final Set<SegmentIdWithShardSpec> rowExpected = ImmutableSet.of(
                new SegmentIdWithShardSpec(
                    DATA_SOURCE,
                    Intervals.of("2000T01/PT1H"),
                    VERSION,
                    new NumberedShardSpec(rowIndex - 1, 0)
                )
            );
            Assert.assertEquals(rowExpected, asIdentifiers(rowResult.getSegments()));
            Assert.assertEquals(Integer.valueOf(rowNumber), rowResult.getCommitMetadata());
        }

        // Nothing was added since the last publish: no segments, metadata still that of the last row.
        driver.persist(committerSupplier.m84get());
        final SegmentsAndCommitMetadata emptyResult = driver
            .publishAndRegisterHandoff(makeOkPublisher(), committerSupplier.m84get(), ImmutableList.of("dummy"))
            .get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ImmutableSet.of(), asIdentifiers(emptyResult.getSegments()));
        Assert.assertEquals(Integer.valueOf(ROWS.size()), emptyResult.getCommitMetadata());
    }

    /**
     * Puts the first row into "sequence_0" and the remaining rows into "sequence_1", publishes and hands
     * off each sequence separately, and checks that each result holds only its own sequence's segment
     * while both report the metadata of the last added row.
     */
    @Test
    public void testIncrementalHandoff() throws Exception {
        final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
        Assert.assertNull(driver.startJob(null));

        committerSupplier.setMetadata(1);
        Assert.assertTrue(driver.add(ROWS.get(0), "sequence_0", committerSupplier, false, true).isOk());
        for (int rowIndex = 1; rowIndex < ROWS.size(); rowIndex++) {
            committerSupplier.setMetadata(rowIndex + 1);
            Assert.assertTrue(driver.add(ROWS.get(rowIndex), "sequence_1", committerSupplier, false, true).isOk());
        }

        // Kick off both publishes before waiting on either of them.
        final ListenableFuture<SegmentsAndCommitMetadata> sequence0Future =
            driver.publishAndRegisterHandoff(makeOkPublisher(), committerSupplier.m84get(), ImmutableList.of("sequence_0"));
        final ListenableFuture<SegmentsAndCommitMetadata> sequence1Future =
            driver.publishAndRegisterHandoff(makeOkPublisher(), committerSupplier.m84get(), ImmutableList.of("sequence_1"));

        final SegmentsAndCommitMetadata sequence0Result =
            sequence0Future.get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        final SegmentsAndCommitMetadata sequence1Result =
            sequence1Future.get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

        final Set<SegmentIdWithShardSpec> sequence0Expected = ImmutableSet.of(
            new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0))
        );
        Assert.assertEquals(sequence0Expected, asIdentifiers(sequence0Result.getSegments()));

        final Set<SegmentIdWithShardSpec> sequence1Expected = ImmutableSet.of(
            new SegmentIdWithShardSpec(DATA_SOURCE, Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(0, 0))
        );
        Assert.assertEquals(sequence1Expected, asIdentifiers(sequence1Result.getSegments()));

        final Integer lastRowNumber = ROWS.size();
        Assert.assertEquals(lastRowNumber, sequence0Result.getCommitMetadata());
        Assert.assertEquals(lastRowNumber, sequence1Result.getCommitMetadata());
    }

    /**
     * Converts segments into an immutable set of their {@link SegmentIdWithShardSpec} identifiers.
     *
     * @param iterable segments to convert
     * @return one identifier per distinct segment id, in iteration order
     */
    private Set<SegmentIdWithShardSpec> asIdentifiers(Iterable<DataSegment> iterable) {
        final ImmutableSet.Builder<SegmentIdWithShardSpec> identifiers = ImmutableSet.builder();
        for (DataSegment segment : iterable) {
            identifiers.add(SegmentIdWithShardSpec.fromDataSegment(segment));
        }
        return identifiers.build();
    }

    /**
     * Creates a publisher that always succeeds and reports an empty set of segments, regardless of
     * what it is asked to publish.
     *
     * <p>Note: the decompiler widened this method from package-private to public.
     *
     * @return an always-successful publisher
     */
    public static TransactionalSegmentPublisher makeOkPublisher() {
        return makePublisher(ignoredSegments -> SegmentPublishResult.ok(Set.of()));
    }

    /**
     * Creates a publisher that succeeds and reports, in addition to every segment it was given, one
     * extra segment per input segment. Each extra segment spans {@code Intervals.ETERNITY}, carries
     * {@link #UPGRADED_VERSION} and a running partition number, and copies load spec, dimensions,
     * metrics, binary version and size from the input segment.
     *
     * @return a publisher that simulates upgraded copies of the published segments
     */
    private TransactionalSegmentPublisher makeUpgradingPublisher() {
        return makePublisher(segments -> {
            final Set<DataSegment> published = new HashSet<>(segments);
            int partitionNum = 0;
            for (DataSegment segment : segments) {
                published.add(
                    new DataSegment(
                        SegmentId.of(DATA_SOURCE, Intervals.ETERNITY, UPGRADED_VERSION, partitionNum),
                        segment.getLoadSpec(),
                        segment.getDimensions(),
                        segment.getMetrics(),
                        new NumberedShardSpec(partitionNum, 0),
                        (CompactionState) null,
                        segment.getBinaryVersion(),
                        segment.getSize()
                    )
                );
                partitionNum++;
            }
            return SegmentPublishResult.ok(published);
        });
    }

    /**
     * Creates a publisher that never succeeds.
     *
     * <p>Note: the decompiler widened this method from package-private to public.
     *
     * @param z when {@code true} the publisher throws a {@link RuntimeException} with message "test";
     *          when {@code false} it returns a failed {@link SegmentPublishResult} with that message
     * @return an always-failing publisher
     */
    public static TransactionalSegmentPublisher makeFailingPublisher(boolean z) {
        return makePublisher(ignoredSegments -> {
            final RuntimeException failure = new RuntimeException("test");
            if (!z) {
                return SegmentPublishResult.fail(failure.getMessage());
            }
            throw failure;
        });
    }

    /**
     * Wraps a function in a {@link TransactionalSegmentPublisher}. Only the second argument of
     * {@code publishAnnotatedSegments} is forwarded to the function; the others are ignored.
     *
     * @param function computes the publish result from the forwarded segment set
     * @return a publisher delegating to {@code function}
     */
    private static TransactionalSegmentPublisher makePublisher(final Function<Set<DataSegment>, SegmentPublishResult> function) {
        return new TransactionalSegmentPublisher() {
            public SegmentPublishResult publishAnnotatedSegments(
                @Nullable Set<DataSegment> unusedSegments,
                Set<DataSegment> segments,
                @Nullable Object unusedMetadata,
                @Nullable SegmentSchemaMapping unusedSchemaMapping
            ) {
                return function.apply(segments);
            }
        };
    }
}
