package org.apache.hudi.sink.append;

import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/append/AppendWriteFunction.class */
public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
    private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunction.class);
    private static final long serialVersionUID = 1;
    private transient BulkInsertWriterHelper writerHelper;
    private final RowType rowType;

    public AppendWriteFunction(Configuration configuration, RowType rowType) {
        super(configuration);
        this.rowType = rowType;
    }

    @Override // org.apache.hudi.sink.common.AbstractStreamWriteFunction
    public void snapshotState() {
        flushData(false);
    }

    public void processElement(I i, ProcessFunction<I, Object>.Context context, Collector<Object> collector) throws Exception {
        if (this.writerHelper == null) {
            initWriterHelper();
        }
        this.writerHelper.write((RowData) i);
    }

    @Override // org.apache.hudi.sink.common.AbstractWriteFunction
    public void endInput() {
        flushData(true);
        this.writeStatuses.clear();
    }

    @VisibleForTesting
    public BulkInsertWriterHelper getWriterHelper() {
        return this.writerHelper;
    }

    private void initWriterHelper() {
        this.currentInstant = instantToWrite(true);
        if (this.currentInstant == null) {
            throw new HoodieException("No inflight instant when flushing data!");
        }
        this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(), this.currentInstant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(), this.rowType);
    }

    private void flushData(boolean z) {
        List<WriteStatus> emptyList;
        String instantToWrite;
        if (this.writerHelper != null) {
            emptyList = this.writerHelper.getWriteStatuses(this.taskID);
            instantToWrite = this.writerHelper.getInstantTime();
        } else {
            emptyList = Collections.emptyList();
            instantToWrite = instantToWrite(false);
            LOG.info("No data to write in subtask [{}] for instant [{}]", Integer.valueOf(this.taskID), instantToWrite);
        }
        this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.builder().taskID(this.taskID).instantTime(instantToWrite).writeStatus(emptyList).lastBatch(true).endInput(z).build());
        this.writerHelper = null;
        this.writeStatuses.addAll(emptyList);
        this.confirming = true;
    }
}
