package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link InternalTimeServiceManager} implementation that keeps one
 * {@link InternalTimerServiceImpl} per registered name and fans watermark advancement,
 * snapshotting and restoring out to all of them.
 *
 * <p>Timer services are held in a plain {@link HashMap}; this class performs no synchronization
 * of its own.
 *
 * @param <K> type of the keys the timers are scoped to
 */
@Internal
public class InternalTimeServiceManagerImpl<K> implements InternalTimeServiceManager<K> {
    protected static final Logger LOG = LoggerFactory.getLogger(InternalTimeServiceManagerImpl.class);

    /** Common prefix of the names under which timer priority queues are created. */
    @VisibleForTesting
    static final String TIMER_STATE_PREFIX = "_timer_state";

    /** Prefix prepended to a timer service name to name its processing-time timer queue. */
    @VisibleForTesting
    static final String PROCESSING_TIMER_PREFIX = "_timer_state/processing_";

    /** Prefix prepended to a timer service name to name its event-time timer queue. */
    @VisibleForTesting
    static final String EVENT_TIMER_PREFIX = "_timer_state/event_";

    private final TaskIOMetricGroup taskIOMetricGroup;
    private final KeyGroupRange localKeyGroupRange;
    private final KeyContext keyContext;
    private final PriorityQueueSetFactory priorityQueueSetFactory;
    private final ProcessingTimeService processingTimeService;
    private final StreamTaskCancellationContext cancellationContext;

    /** All timer services created so far, keyed by the name they were registered under. */
    private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices = new HashMap<>();

    /**
     * Instances are obtained through {@link #create}.
     *
     * <p>The key-group range, queue factory, key context and processing-time service must be
     * non-null; the metric group and cancellation context are stored as given.
     */
    private InternalTimeServiceManagerImpl(
            TaskIOMetricGroup taskIOMetricGroup,
            KeyGroupRange localKeyGroupRange,
            KeyContext keyContext,
            PriorityQueueSetFactory priorityQueueSetFactory,
            ProcessingTimeService processingTimeService,
            StreamTaskCancellationContext cancellationContext) {
        this.taskIOMetricGroup = taskIOMetricGroup;
        this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange);
        this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
        this.keyContext = Preconditions.checkNotNull(keyContext);
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.cancellationContext = cancellationContext;
    }

    /**
     * Creates a manager for the key-group range of the given backend and restores timer state
     * from every supplied raw keyed state stream.
     *
     * <p>The backend itself is used as the factory for the timer priority queues.
     *
     * @param taskIOMetricGroup metric group handed to each timer service
     * @param keyedStateBackend backend providing the local key-group range and the timer queues
     * @param userClassloader class loader used while reading the restored state
     * @param keyContext key context handed to each timer service
     * @param processingTimeService processing-time service handed to each timer service
     * @param rawKeyedStates one stream provider per key group to restore; may be empty
     * @param cancellationContext cancellation context handed to each timer service
     * @return the manager, with all provided key groups restored
     * @throws IllegalArgumentException if a provided key group is outside the local range
     * @throws Exception if reading a state stream fails
     */
    public static <K> InternalTimeServiceManagerImpl<K> create(
            TaskIOMetricGroup taskIOMetricGroup,
            CheckpointableKeyedStateBackend<K> keyedStateBackend,
            ClassLoader userClassloader,
            KeyContext keyContext,
            ProcessingTimeService processingTimeService,
            Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates,
            StreamTaskCancellationContext cancellationContext)
            throws Exception {
        final KeyGroupRange keyGroupRange = keyedStateBackend.getKeyGroupRange();
        final InternalTimeServiceManagerImpl<K> manager =
                new InternalTimeServiceManagerImpl<>(
                        taskIOMetricGroup,
                        keyGroupRange,
                        keyContext,
                        keyedStateBackend,
                        processingTimeService,
                        cancellationContext);

        for (KeyGroupStatePartitionStreamProvider streamProvider : rawKeyedStates) {
            final int keyGroupId = streamProvider.getKeyGroupId();
            Preconditions.checkArgument(
                    keyGroupRange.contains(keyGroupId),
                    "Key Group " + keyGroupId + " does not belong to the local range.");
            manager.restoreStateForKeyGroup(streamProvider.getStream(), keyGroupId, userClassloader);
        }
        return manager;
    }

    /**
     * Returns the timer service registered under {@code name}, creating it on first use, and
     * (re)starts it with the given serializers and trigger target.
     *
     * @throws NullPointerException if {@code keySerializer} is null
     */
    @Override // org.apache.flink.streaming.api.operators.InternalTimeServiceManager
    public <N> InternalTimerService<N> getInternalTimerService(
            String name,
            TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            Triggerable<K, N> triggerable) {
        Preconditions.checkNotNull(keySerializer, "Timers can only be used on keyed operators.");
        final TimerSerializer<K, N> timerSerializer =
                new TimerSerializer<>(keySerializer, namespaceSerializer);
        final InternalTimerServiceImpl<K, N> timerService =
                registerOrGetTimerService(name, timerSerializer);
        timerService.startTimerService(
                timerSerializer.getKeySerializer(),
                timerSerializer.getNamespaceSerializer(),
                triggerable);
        return timerService;
    }

    /**
     * Looks up the timer service registered under {@code name}; if none exists, creates one
     * backed by freshly created processing-time and event-time queues and registers it.
     *
     * <p>NOTE(review): the decompiler reports this method was originally package-private; it is
     * kept {@code public} here so existing callers of this artifact keep compiling.
     */
    // The map stores services with a wildcard namespace type; the caller-supplied serializer
    // is trusted to match the namespace type the service was first registered with.
    @SuppressWarnings("unchecked")
    public <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(
            String name, TimerSerializer<K, N> timerSerializer) {
        InternalTimerServiceImpl<K, N> timerService =
                (InternalTimerServiceImpl<K, N>) timerServices.get(name);
        if (timerService == null) {
            timerService =
                    new InternalTimerServiceImpl<>(
                            taskIOMetricGroup,
                            localKeyGroupRange,
                            keyContext,
                            processingTimeService,
                            createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
                            createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),
                            cancellationContext);
            timerServices.put(name, timerService);
        }
        return timerService;
    }

    /**
     * Async counterpart of {@link #getInternalTimerService}: returns the async timer service
     * registered under {@code name}, creating it on first use, and (re)starts it.
     *
     * @throws NullPointerException if {@code keySerializer} is null
     */
    @Override // org.apache.flink.streaming.api.operators.InternalTimeServiceManager
    public <N> InternalTimerService<N> getAsyncInternalTimerService(
            String name,
            TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            Triggerable<K, N> triggerable,
            AsyncExecutionController<K> asyncExecutionController) {
        Preconditions.checkNotNull(keySerializer, "Timers can only be used on keyed operators.");
        final TimerSerializer<K, N> timerSerializer =
                new TimerSerializer<>(keySerializer, namespaceSerializer);
        final InternalTimerServiceAsyncImpl<K, N> timerService =
                registerOrGetAsyncTimerService(name, timerSerializer, asyncExecutionController);
        timerService.startTimerService(
                timerSerializer.getKeySerializer(),
                timerSerializer.getNamespaceSerializer(),
                triggerable);
        return timerService;
    }

    /**
     * Looks up the async timer service registered under {@code name}; if none exists, creates
     * one bound to {@code asyncExecutionController} and registers it.
     *
     * <p>Sync and async services share one map, so a name already taken by a non-async service
     * makes the downcast below fail with a {@link ClassCastException}.
     */
    // Same trust in the caller-supplied namespace type as in registerOrGetTimerService.
    @SuppressWarnings("unchecked")
    <N> InternalTimerServiceAsyncImpl<K, N> registerOrGetAsyncTimerService(
            String name,
            TimerSerializer<K, N> timerSerializer,
            AsyncExecutionController<K> asyncExecutionController) {
        InternalTimerServiceAsyncImpl<K, N> timerService =
                (InternalTimerServiceAsyncImpl<K, N>) timerServices.get(name);
        if (timerService == null) {
            timerService =
                    new InternalTimerServiceAsyncImpl<>(
                            taskIOMetricGroup,
                            localKeyGroupRange,
                            keyContext,
                            processingTimeService,
                            createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
                            createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),
                            cancellationContext,
                            asyncExecutionController);
            timerServices.put(name, timerService);
        }
        return timerService;
    }

    /**
     * Returns a read-only view of the registered timer services, keyed by name.
     *
     * <p>NOTE(review): the decompiler reports this method was originally package-private; it is
     * kept {@code public} here so existing callers of this artifact keep compiling.
     */
    public Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() {
        return Collections.unmodifiableMap(timerServices);
    }

    /** Asks the queue factory for a timer queue with the given name and element serializer. */
    private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
            String name, TimerSerializer<K, N> timerSerializer) {
        return priorityQueueSetFactory.create(name, timerSerializer);
    }

    /** Advances every registered timer service to the timestamp of {@code watermark}. */
    @Override // org.apache.flink.streaming.api.operators.InternalTimeServiceManager
    public void advanceWatermark(Watermark watermark) throws Exception {
        for (InternalTimerServiceImpl<K, ?> timerService : timerServices.values()) {
            timerService.advanceWatermark(watermark.getTimestamp());
        }
    }

    /**
     * Tries to advance every registered timer service to the timestamp of {@code watermark},
     * stopping at the first service that reports it could not fully advance.
     *
     * @return {@code true} if all services advanced, {@code false} as soon as one did not
     */
    @Override // org.apache.flink.streaming.api.operators.InternalTimeServiceManager
    public boolean tryAdvanceWatermark(
            Watermark watermark, InternalTimeServiceManager.ShouldStopAdvancingFn shouldStopFn)
            throws Exception {
        for (InternalTimerServiceImpl<K, ?> timerService : timerServices.values()) {
            if (!timerService.tryAdvanceWatermark(watermark.getTimestamp(), shouldStopFn)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Writes the timer state of every key group listed by {@code out} to that stream, then
     * closes the stream.
     *
     * <p>A failure while closing is only logged; a failure while writing is rethrown wrapped in
     * an {@link Exception} naming the operator.
     *
     * @param out raw keyed state stream to write to; always closed by this method
     * @param operatorName operator name used in the error and log messages
     * @throws Exception if writing any key group fails
     */
    @Override // org.apache.flink.streaming.api.operators.InternalTimeServiceManager
    public void snapshotToRawKeyedState(KeyedStateCheckpointOutputStream out, String operatorName)
            throws Exception {
        try {
            for (int keyGroupId : out.getKeyGroupList()) {
                out.startNewKeyGroup(keyGroupId);
                snapshotStateForKeyGroup(new DataOutputViewStreamWrapper(out), keyGroupId);
            }
        } catch (Exception writeFailure) {
            throw new Exception(
                    "Could not write timer service of " + operatorName + " to checkpoint state stream.",
                    writeFailure);
        } finally {
            // Closing is best-effort: a close failure must not mask a write failure nor fail
            // an otherwise successful snapshot.
            try {
                out.close();
            } catch (Exception closeFailure) {
                LOG.warn(
                        "Could not close raw keyed operator state stream for {}. This might have prevented deleting some state data.",
                        operatorName,
                        closeFailure);
            }
        }
    }

    /** Serializes the timer services' state for one key group via the serialization proxy. */
    private void snapshotStateForKeyGroup(DataOutputView out, int keyGroupId) throws IOException {
        new InternalTimerServiceSerializationProxy(this, keyGroupId).write(out);
    }

    /** Reads the timer services' state for one key group via the serialization proxy. */
    private void restoreStateForKeyGroup(InputStream in, int keyGroupId, ClassLoader userClassloader)
            throws IOException {
        new InternalTimerServiceSerializationProxy(this, userClassloader, keyGroupId).read(in);
    }

    /** Returns the sum of processing-time timers over all registered timer services. */
    @VisibleForTesting
    public int numProcessingTimeTimers() {
        int total = 0;
        for (InternalTimerServiceImpl<K, ?> timerService : timerServices.values()) {
            total += timerService.numProcessingTimeTimers();
        }
        return total;
    }

    /** Returns the sum of event-time timers over all registered timer services. */
    @VisibleForTesting
    public int numEventTimeTimers() {
        int total = 0;
        for (InternalTimerServiceImpl<K, ?> timerService : timerServices.values()) {
            total += timerService.numEventTimeTimers();
        }
        return total;
    }
}
