package org.apache.paimon.flink.sink.cdc;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.types.DataField;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.class */
public class CdcMultiTableParsingProcessFunction<T> extends ProcessFunction<T, Void> {
    private final EventParser.Factory<T> parserFactory;
    private transient EventParser<T> parser;
    private transient Map<String, OutputTag<List<DataField>>> updatedDataFieldsOutputTags;
    private transient Map<String, OutputTag<CdcRecord>> recordOutputTags;

    public CdcMultiTableParsingProcessFunction(EventParser.Factory<T> factory) {
        this.parserFactory = factory;
    }

    public void open(OpenContext openContext) throws Exception {
        open(new Configuration());
    }

    public void open(Configuration configuration) throws Exception {
        this.parser = this.parserFactory.create();
        this.updatedDataFieldsOutputTags = new HashMap();
        this.recordOutputTags = new HashMap();
    }

    public void processElement(T t, ProcessFunction<T, Void>.Context context, Collector<Void> collector) throws Exception {
        this.parser.setRawEvent(t);
        String parseTableName = this.parser.parseTableName();
        List<DataField> parseSchemaChange = this.parser.parseSchemaChange();
        if (!parseSchemaChange.isEmpty()) {
            context.output(getUpdatedDataFieldsOutputTag(parseTableName), parseSchemaChange);
        }
        this.parser.parseRecords().forEach(cdcRecord -> {
            context.output(getRecordOutputTag(parseTableName), cdcRecord);
        });
    }

    private OutputTag<List<DataField>> getUpdatedDataFieldsOutputTag(String str) {
        return this.updatedDataFieldsOutputTags.computeIfAbsent(str, CdcMultiTableParsingProcessFunction::createUpdatedDataFieldsOutputTag);
    }

    public static OutputTag<List<DataField>> createUpdatedDataFieldsOutputTag(String str) {
        return new OutputTag<>("new-data-field-list-" + str, new ListTypeInfo(DataField.class));
    }

    private OutputTag<CdcRecord> getRecordOutputTag(String str) {
        return this.recordOutputTags.computeIfAbsent(str, CdcMultiTableParsingProcessFunction::createRecordOutputTag);
    }

    public static OutputTag<CdcRecord> createRecordOutputTag(String str) {
        return new OutputTag<>("record-" + str, TypeInformation.of(CdcRecord.class));
    }
}
