package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream task that sits at the head of an iteration.
 *
 * <p>On {@link #init()} the task hands a single-slot feedback queue to the
 * {@link BlockingQueueBroker} under a broker ID derived from the job ID, the iteration ID and the
 * subtask index. Every record subsequently taken from that queue is forwarded to all of the
 * task's record-writer outputs. The queue registration is removed again in
 * {@link #cleanUpInternal()}.
 *
 * @param <OUT> type of the records travelling through the feedback queue
 */
@Internal
public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);

    /** Outputs every feedback record is forwarded to; assigned in {@link #init()}. */
    private RecordWriterOutput<OUT>[] streamOutputs;

    /** Feedback queue with a capacity of one record, published through the broker. */
    private final BlockingQueue<StreamRecord<OUT>> dataChannel;

    /** Key under which {@link #dataChannel} is registered with the broker. */
    private final String brokerID;

    /** Maximum time in milliseconds to wait for a feedback record; only used if positive. */
    private final long iterationWaitTime;

    /** {@code true} if {@link #iterationWaitTime} is positive, i.e. polling uses a timeout. */
    private final boolean shouldWait;

    /**
     * Creates the iteration head and derives its broker ID and wait time from the task
     * configuration.
     *
     * @param environment execution environment passed on to the superclass
     * @throws FlinkRuntimeException if the task configuration carries no iteration ID
     * @throws Exception if the superclass constructor fails
     */
    public StreamIterationHead(Environment environment) throws Exception {
        super(environment);
        final String iterationId = getConfiguration().getIterationId();
        if (iterationId == null || iterationId.isEmpty()) {
            throw new FlinkRuntimeException("Missing iteration ID in the task configuration");
        }
        // Diamond instead of a raw type keeps the queue's element type checked by the compiler.
        this.dataChannel = new ArrayBlockingQueue<>(1);
        this.brokerID =
                createBrokerIdString(
                        getEnvironment().getJobID(),
                        iterationId,
                        getEnvironment().getTaskInfo().getIndexOfThisSubtask());
        this.iterationWaitTime = getConfiguration().getIterationWaitTime();
        this.shouldWait = this.iterationWaitTime > 0;
    }

    /**
     * Fetches the next feedback record and forwards it to every output.
     *
     * <p>With a positive wait time the queue is polled with that timeout (milliseconds);
     * otherwise the call blocks on {@code take()} until a record is available. If the timed poll
     * returns nothing, the default action and the mailbox processor are suspended.
     *
     * @param controller controller used to suspend the default action when no record arrived
     * @throws Exception if waiting on the queue is interrupted or forwarding a record fails
     */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        final StreamRecord<OUT> nextRecord =
                this.shouldWait
                        ? this.dataChannel.poll(this.iterationWaitTime, TimeUnit.MILLISECONDS)
                        : this.dataChannel.take();

        if (nextRecord == null) {
            // Only reachable through the timed poll: nothing arrived within the wait time.
            controller.suspendDefaultAction();
            this.mailboxProcessor.suspend();
            return;
        }

        for (RecordWriterOutput<OUT> output : this.streamOutputs) {
            // No raw cast needed: the record already has the outputs' element type.
            output.collect(nextRecord);
        }
    }

    /**
     * Publishes the feedback queue through the broker, captures the task's outputs and, when
     * timestamps are serialized, emits one watermark on every output.
     */
    @Override // org.apache.flink.streaming.runtime.tasks.OneInputStreamTask, org.apache.flink.streaming.runtime.tasks.StreamTask
    public void init() {
        BlockingQueueBroker.INSTANCE.handIn(this.brokerID, this.dataChannel);
        LOG.info("Iteration head {} added feedback queue under {}", getName(), this.brokerID);

        // Explicit cast: generic arrays cannot be checked at runtime, so the element type is
        // asserted here once instead of relying on an implicit conversion.
        @SuppressWarnings("unchecked")
        final RecordWriterOutput<OUT>[] outputs = (RecordWriterOutput<OUT>[]) getStreamOutputs();
        this.streamOutputs = outputs;

        if (isSerializingTimestamps()) {
            for (RecordWriterOutput<OUT> output : this.streamOutputs) {
                // NOTE(review): CommittableMessage.EOI is a sink-side constant; presumably the
                // decompiler folded a plain long literal into it. Confirm its value and consider
                // replacing it with a watermark-specific constant.
                output.emitWatermark(new Watermark(CommittableMessage.EOI));
            }
        }
    }

    /** Removes the feedback queue registration from the broker and logs the removal. */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cleanUpInternal() {
        BlockingQueueBroker.INSTANCE.remove(this.brokerID);
        LOG.info("Iteration head {} removed feedback queue under {}", getName(), this.brokerID);
    }

    /**
     * Builds the broker key for a feedback queue.
     *
     * @param jobID job the iteration belongs to
     * @param str iteration ID
     * @param i subtask index
     * @return the three parts joined with {@code "-"}, in the order job ID, iteration ID, index
     */
    public static String createBrokerIdString(JobID jobID, String str, int i) {
        return jobID + "-" + str + "-" + i;
    }
}
