package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.OuterStreamJoinStoreFactory;
import org.apache.kafka.streams.kstream.internals.StreamJoinedStoreFactory;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamImplJoin.class */
class KStreamImplJoin {
    private final InternalStreamsBuilder builder;
    private final boolean leftOuter;
    private final boolean rightOuter;

    /**
     * Mutable holder for four {@code long} time values: an emit interval, a
     * running maximum ({@link #streamTime}), a running minimum
     * ({@link #minTime}) and the next emit deadline ({@link #nextTimeToEmit}).
     *
     * <p>No synchronization is performed inside this class.
     *
     * <p>Decompiler note: the access modifier of this class was widened from
     * package-private.
     */
    public static class TimeTracker {
        /** Amount added to {@link #nextTimeToEmit} on every advance; starts at 1000. */
        private long emitIntervalMs = 1000L;
        /** Largest value handed to {@link #advanceStreamTime(long)} so far; starts at -1. */
        long streamTime = -1L;
        /** Smallest value handed to {@link #updatedMinTime(long)} so far; starts at {@code Long.MAX_VALUE}. */
        long minTime = Long.MAX_VALUE;
        /** Next emit deadline; starts at the default value 0. */
        long nextTimeToEmit;

        /**
         * Replaces the interval used by {@link #advanceNextTimeToEmit()}.
         *
         * @param j the new emit interval
         */
        public void setEmitInterval(long j) {
            emitIntervalMs = j;
        }

        /**
         * Raises {@link #streamTime} to {@code j} when {@code j} is larger;
         * otherwise leaves it unchanged.
         *
         * @param j candidate stream time
         */
        public void advanceStreamTime(long j) {
            if (j > streamTime) {
                streamTime = j;
            }
        }

        /**
         * Lowers {@link #minTime} to {@code j} when {@code j} is smaller;
         * otherwise leaves it unchanged.
         *
         * @param j candidate minimum time
         */
        public void updatedMinTime(long j) {
            if (j < minTime) {
                minTime = j;
            }
        }

        /** Moves {@link #nextTimeToEmit} forward by the current emit interval. */
        public void advanceNextTimeToEmit() {
            nextTimeToEmit = nextTimeToEmit + emitIntervalMs;
        }
    }

    /**
     * Hands out one {@link TimeTracker} per {@link TaskId}, creating it on
     * first request and returning the same instance on later requests for the
     * same task until {@link #remove(TaskId)} is called.
     *
     * <p>Backed by a {@link ConcurrentHashMap}.
     *
     * <p>Decompiler note: the access modifier of this class was widened from
     * package-private.
     */
    public static class TimeTrackerSupplier {
        // Diamond instead of the raw type so the initializer is type-checked
        // against Map<TaskId, TimeTracker> without an unchecked conversion.
        private final Map<TaskId, TimeTracker> tracker = new ConcurrentHashMap<>();

        TimeTrackerSupplier() {
        }

        /**
         * Returns the tracker registered for {@code taskId}, creating and
         * registering a fresh one if none exists yet.
         *
         * @param taskId the task whose tracker is requested
         * @return the tracker associated with {@code taskId}
         */
        public TimeTracker get(TaskId taskId) {
            return tracker.computeIfAbsent(taskId, ignored -> new TimeTracker());
        }

        /**
         * Drops the tracker registered for {@code taskId}, if any.
         *
         * @param taskId the task whose tracker should be discarded
         */
        public void remove(TaskId taskId) {
            tracker.remove(taskId);
        }
    }

    /**
     * Creates a join helper bound to the given builder.
     *
     * <p>Decompiler note: the access modifier of this constructor was widened
     * from package-private.
     *
     * @param internalStreamsBuilder builder stored in {@code builder}
     * @param z                      stored as {@code leftOuter}
     * @param z2                     stored as {@code rightOuter}
     */
    public KStreamImplJoin(InternalStreamsBuilder internalStreamsBuilder, boolean z, boolean z2) {
        leftOuter = z;
        rightOuter = z2;
        builder = internalStreamsBuilder;
    }

    /**
     * Registers the graph nodes for a windowed join of {@code kStream} with
     * {@code kStream2} on this helper's builder and returns the resulting stream.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>derive processor names from the {@code StreamJoined} name and the builder;</li>
     *   <li>pick or create the window store factory for each side (a user-supplied
     *       supplier is validated with {@code assertWindowSettings});</li>
     *   <li>add one {@code WindowedStreamProcessorNode} under each input's graph node;</li>
     *   <li>create an outer-join store factory when {@code leftOuter} is set;</li>
     *   <li>assemble and register the {@code StreamStreamJoinNode} under both inputs.</li>
     * </ol>
     *
     * <p>Decompiler note: multi-variable type inference failed for this method, so
     * several project types are used raw here; they are intentionally left as-is.
     *
     * @param kStream            first input; cast to {@code AbstractStream} and {@code KStreamImpl}
     * @param kStream2           second input; cast to {@code AbstractStream} and {@code KStreamImpl}
     * @param valueJoinerWithKey joiner passed to the join processors (reversed for the other side)
     * @param joinWindows        window definition used for the stores and the join processors
     * @param streamJoined       user configuration wrapped in a {@code StreamJoinedInternal}
     * @return a new {@code KStreamImpl} named after the merge processor
     * @throws StreamsException if both suppliers share a name or a supplier's
     *                          settings do not match {@code joinWindows}
     */
    public <K, V1, V2, VOut> KStream<K, VOut> join(KStream<K, V1> kStream, KStream<K, V2> kStream2, ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> valueJoinerWithKey, JoinWindows joinWindows, StreamJoined<K, V1, V2> streamJoined) {
        StreamJoinedInternal joinedInternal = new StreamJoinedInternal(streamJoined, this.builder);
        NamedInternal renamed = new NamedInternal(joinedInternal.name());
        String joinThisSuffix = this.rightOuter ? "-outer-this-join" : "-this-join";
        String joinOtherSuffix = this.leftOuter ? "-outer-other-join" : "-other-join";

        // NOTE(review): the name lookups below go through the builder and presumably
        // consume generated-name indices, so their relative order is kept as-is.
        String thisWindowedName = renamed.suffixWithOrElseGet("-this-windowed", this.builder, "KSTREAM-WINDOWED-");
        String otherWindowedName = renamed.suffixWithOrElseGet("-other-windowed", this.builder, "KSTREAM-WINDOWED-");
        String joinThisGeneratedName = this.builder.newProcessorName(this.rightOuter ? "KSTREAM-OUTERTHIS-" : "KSTREAM-JOINTHIS-");
        String joinOtherGeneratedName = this.builder.newProcessorName(this.leftOuter ? "KSTREAM-OUTEROTHER-" : "KSTREAM-JOINOTHER-");
        String joinThisName = renamed.suffixWithOrElseGet(joinThisSuffix, joinThisGeneratedName);
        String joinOtherName = renamed.suffixWithOrElseGet(joinOtherSuffix, joinOtherGeneratedName);
        String joinMergeName = renamed.suffixWithOrElseGet("-merge", this.builder, "KSTREAM-MERGE-");

        GraphNode thisGraphNode = ((AbstractStream) kStream).graphNode;
        GraphNode otherGraphNode = ((AbstractStream) kStream2).graphNode;

        String userStoreName = joinedInternal.storeName();
        WindowBytesStoreSupplier thisStoreSupplier = joinedInternal.thisStoreSupplier();
        WindowBytesStoreSupplier otherStoreSupplier = joinedInternal.otherStoreSupplier();
        assertUniqueStoreNames(thisStoreSupplier, otherStoreSupplier);

        // Window store for "this" side: a validated user supplier wins, otherwise a
        // factory named after the user's base store name or the generated processor name.
        StoreFactory thisWindowStore;
        if (thisStoreSupplier != null) {
            assertWindowSettings(thisStoreSupplier, joinWindows);
            thisWindowStore = joinWindowStoreBuilderFromSupplier(thisStoreSupplier, joinedInternal.keySerde(), joinedInternal.valueSerde());
        } else {
            String thisStoreName = userStoreName == null ? joinThisGeneratedName : userStoreName + joinThisSuffix;
            thisWindowStore = new StreamJoinedStoreFactory(thisStoreName, joinWindows, joinedInternal, StreamJoinedStoreFactory.Type.THIS);
        }

        // Same decision for the "other" side.
        StoreFactory otherWindowStore;
        if (otherStoreSupplier != null) {
            assertWindowSettings(otherStoreSupplier, joinWindows);
            otherWindowStore = joinWindowStoreBuilderFromSupplier(otherStoreSupplier, joinedInternal.keySerde(), joinedInternal.otherValueSerde());
        } else {
            String otherStoreName = userStoreName == null ? joinOtherGeneratedName : userStoreName + joinOtherSuffix;
            otherWindowStore = new StreamJoinedStoreFactory(otherStoreName, joinWindows, joinedInternal, StreamJoinedStoreFactory.Type.OTHER);
        }

        ProcessorParameters<K, V1, ?, ?> thisWindowedParams = new ProcessorParameters<>(new KStreamJoinWindow(thisWindowStore.name()), thisWindowedName);
        this.builder.addGraphNode(thisGraphNode, new WindowedStreamProcessorNode(thisWindowStore.name(), thisWindowedParams));

        ProcessorParameters<K, V2, ?, ?> otherWindowedParams = new ProcessorParameters<>(new KStreamJoinWindow(otherWindowStore.name()), otherWindowedName);
        this.builder.addGraphNode(otherGraphNode, new WindowedStreamProcessorNode(otherWindowStore.name(), otherWindowedParams));

        // The outer-join store only exists when leftOuter is set; its type follows rightOuter.
        Optional<StoreFactory> outerJoinStore;
        if (this.leftOuter) {
            OuterStreamJoinStoreFactory.Type outerType = this.rightOuter ? OuterStreamJoinStoreFactory.Type.RIGHT : OuterStreamJoinStoreFactory.Type.LEFT;
            outerJoinStore = Optional.of(new OuterStreamJoinStoreFactory(joinThisGeneratedName, joinedInternal, joinWindows, outerType));
        } else {
            outerJoinStore = Optional.empty();
        }

        // One supplier instance is handed to both join processors.
        TimeTrackerSupplier sharedTimeTrackers = new TimeTrackerSupplier();
        JoinWindowsInternal internalWindows = new JoinWindowsInternal(joinWindows);

        KStreamKStreamJoin joinThis = new KStreamKStreamJoin(
                true,
                otherWindowStore.name(),
                internalWindows,
                valueJoinerWithKey,
                this.leftOuter,
                outerJoinStore.map(StoreFactory::name),
                sharedTimeTrackers);
        KStreamKStreamJoin joinOther = new KStreamKStreamJoin(
                false,
                thisWindowStore.name(),
                internalWindows,
                AbstractStream.reverseJoinerWithKey(valueJoinerWithKey),
                this.rightOuter,
                outerJoinStore.map(StoreFactory::name),
                sharedTimeTrackers);
        KStreamKStreamSelfJoin selfJoin = new KStreamKStreamSelfJoin(
                thisWindowStore.name(),
                internalWindows,
                valueJoinerWithKey,
                joinWindows.size() + joinWindows.gracePeriodMs());
        PassThrough joinMerge = new PassThrough();

        StreamStreamJoinNode.StreamStreamJoinNodeBuilder joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
        joinBuilder
                .withJoinMergeProcessorParameters(new ProcessorParameters(joinMerge, joinMergeName))
                .withJoinThisProcessorParameters(new ProcessorParameters<>(joinThis, joinThisName))
                .withJoinOtherProcessorParameters(new ProcessorParameters<>(joinOther, joinOtherName))
                .withThisWindowStoreBuilder(thisWindowStore)
                .withOtherWindowStoreBuilder(otherWindowStore)
                .withThisWindowedStreamProcessorParameters(thisWindowedParams)
                .withOtherWindowedStreamProcessorParameters(otherWindowedParams)
                .withOuterJoinWindowStoreBuilder(outerJoinStore)
                .withValueJoiner(valueJoinerWithKey)
                .withNodeName(joinMergeName)
                .withSelfJoinProcessorParameters(new ProcessorParameters<>(selfJoin, joinMergeName));
        if (internalWindows.spuriousResultFixEnabled()) {
            joinBuilder.withSpuriousResultFixEnabled();
        }

        StreamStreamJoinNode joinGraphNode = joinBuilder.build();
        if (this.leftOuter || this.rightOuter) {
            joinGraphNode.addLabel(GraphNode.Label.NULL_KEY_RELAXED_JOIN);
        }
        this.builder.addGraphNode(Arrays.asList(thisGraphNode, otherGraphNode), joinGraphNode);

        // Union of the source nodes of both inputs.
        HashSet allSourceNodes = new HashSet(((KStreamImpl) kStream).subTopologySourceNodes);
        allSourceNodes.addAll(((KStreamImpl) kStream2).subTopologySourceNodes);

        // The third argument is passed as null (presumably the value serde of the
        // joined result, which is not known here — confirm against KStreamImpl).
        return new KStreamImpl(joinMergeName, joinedInternal.keySerde(), null, allSourceNodes, false, joinGraphNode, this.builder);
    }

    /**
     * Verifies that a user-provided window store supplier is compatible with
     * the join windows.
     *
     * <p>The supplier must retain duplicates, its retention period must equal
     * {@code joinWindows.size() + joinWindows.gracePeriodMs()}, and its window
     * size must equal {@code joinWindows.size()}.
     *
     * @param windowBytesStoreSupplier the supplier to validate
     * @param joinWindows              the join windows it must agree with
     * @throws StreamsException if any of the three conditions is violated
     */
    private void assertWindowSettings(WindowBytesStoreSupplier windowBytesStoreSupplier, JoinWindows joinWindows) {
        final boolean keepsDuplicates = windowBytesStoreSupplier.retainDuplicates();
        if (!keepsDuplicates) {
            throw new StreamsException("The StoreSupplier must set retainDuplicates=true, found retainDuplicates=false");
        }

        // Retention is checked first; the window size is only consulted when retention matches.
        final boolean retentionMatches =
                windowBytesStoreSupplier.retentionPeriod() == joinWindows.size() + joinWindows.gracePeriodMs();
        final boolean allMatch = retentionMatches && windowBytesStoreSupplier.windowSize() == joinWindows.size();
        if (allMatch) {
            return;
        }
        throw new StreamsException(String.format("Window settings mismatch. WindowBytesStoreSupplier settings %s must match JoinWindows settings %s for the window size and retention period", windowBytesStoreSupplier, joinWindows));
    }

    /**
     * Rejects a pair of user-provided store suppliers that report the same name.
     * Nothing is checked when either supplier is {@code null}.
     *
     * @param windowBytesStoreSupplier  first supplier, may be {@code null}
     * @param windowBytesStoreSupplier2 second supplier, may be {@code null}
     * @throws StreamsException if both suppliers are present and their names are equal
     */
    private void assertUniqueStoreNames(WindowBytesStoreSupplier windowBytesStoreSupplier, WindowBytesStoreSupplier windowBytesStoreSupplier2) {
        if (windowBytesStoreSupplier == null || windowBytesStoreSupplier2 == null) {
            return;
        }
        final boolean sameName = windowBytesStoreSupplier.name().equals(windowBytesStoreSupplier2.name());
        if (sameName) {
            throw new StreamsException("Both StoreSuppliers have the same name.  StoreSuppliers must provide unique names");
        }
    }

    /**
     * Builds a window store builder from the given supplier and serdes via
     * {@code Stores.windowStoreBuilder} and wraps it in a {@code StoreBuilderWrapper}.
     *
     * @param windowBytesStoreSupplier supplier passed to {@code Stores.windowStoreBuilder}
     * @param serde                    serde passed as the second argument (key serde at the call sites in this class)
     * @param serde2                   serde passed as the third argument (value serde at the call sites in this class)
     * @return the wrapping {@code StoreFactory}
     */
    private static <K, V> StoreFactory joinWindowStoreBuilderFromSupplier(WindowBytesStoreSupplier windowBytesStoreSupplier, Serde<K> serde, Serde<V> serde2) {
        return new StoreBuilderWrapper(
                Stores.<K, V>windowStoreBuilder(
                        windowBytesStoreSupplier,
                        serde,
                        serde2));
    }
}
