package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializationUtils;

/* loaded from: input_file:org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.class */
public class MultiTablesStoreCompactOperator extends PrepareCommitOperator<RowData, MultiTableCommittable> {
    private static final long serialVersionUID = 1;
    private StoreSinkWrite.Provider storeSinkWriteProvider;
    private final CheckpointConfig checkpointConfig;
    private final boolean isStreaming;
    private final boolean ignorePreviousFiles;
    private final String initialCommitUser;
    private transient StoreSinkWriteState state;
    private transient DataFileMetaSerializer dataFileMetaSerializer;
    private final Catalog.Loader catalogLoader;
    protected Catalog catalog;
    protected Map<Identifier, FileStoreTable> tables;
    protected Map<Identifier, StoreSinkWrite> writes;
    protected String commitUser;

    public MultiTablesStoreCompactOperator(Catalog.Loader loader, String str, CheckpointConfig checkpointConfig, boolean z, boolean z2, Options options) {
        super(options);
        this.catalogLoader = loader;
        this.initialCommitUser = str;
        this.checkpointConfig = checkpointConfig;
        this.isStreaming = z;
        this.ignorePreviousFiles = z2;
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.catalog = this.catalogLoader.load();
        this.commitUser = (String) StateUtils.getSingleValueFromState(stateInitializationContext, "commit_user_state", String.class, this.initialCommitUser);
        this.state = new StoreSinkWriteState(stateInitializationContext, (str, binaryRow, i) -> {
            return ChannelComputer.select(binaryRow, i, getRuntimeContext().getNumberOfParallelSubtasks()) == getRuntimeContext().getIndexOfThisSubtask();
        });
        this.tables = new HashMap();
        this.writes = new HashMap();
    }

    public void open() throws Exception {
        super.open();
        this.dataFileMetaSerializer = new DataFileMetaSerializer();
    }

    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        long j = rowData.getLong(0);
        BinaryRow deserializeBinaryRow = SerializationUtils.deserializeBinaryRow(rowData.getBinary(1));
        int i = rowData.getInt(2);
        List<DataFileMeta> deserializeList = this.dataFileMetaSerializer.deserializeList(rowData.getBinary(3));
        Identifier create = Identifier.create(rowData.getString(4).toString(), rowData.getString(5).toString());
        FileStoreTable table = getTable(create);
        Preconditions.checkArgument(!table.coreOptions().writeOnly(), CoreOptions.WRITE_ONLY.key() + " should not be true for MultiTablesStoreCompactOperator.");
        this.storeSinkWriteProvider = createWriteProvider(table, this.checkpointConfig, this.isStreaming, this.ignorePreviousFiles);
        StoreSinkWrite computeIfAbsent = this.writes.computeIfAbsent(create, identifier -> {
            return this.storeSinkWriteProvider.provide(table, this.commitUser, this.state, getContainingTask().getEnvironment().getIOManager(), this.memoryPool, getMetricGroup());
        });
        if (computeIfAbsent.streamingMode()) {
            computeIfAbsent.notifyNewFiles(j, deserializeBinaryRow, i, deserializeList);
            computeIfAbsent.compact(deserializeBinaryRow, i, false);
        } else {
            Preconditions.checkArgument(deserializeList.isEmpty(), "Batch compact job does not concern what files are compacted. They only need to know what buckets are compacted.");
            computeIfAbsent.compact(deserializeBinaryRow, i, true);
        }
    }

    @Override // org.apache.paimon.flink.sink.PrepareCommitOperator
    protected List<MultiTableCommittable> prepareCommit(boolean z, long j) throws IOException {
        LinkedList linkedList = new LinkedList();
        for (Map.Entry<Identifier, StoreSinkWrite> entry : this.writes.entrySet()) {
            Identifier key = entry.getKey();
            linkedList.addAll((Collection) entry.getValue().prepareCommit(z, j).stream().map(committable -> {
                return MultiTableCommittable.fromCommittable(key, committable);
            }).collect(Collectors.toList()));
        }
        return linkedList;
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        Iterator<StoreSinkWrite> it = this.writes.values().iterator();
        while (it.hasNext()) {
            it.next().snapshotState();
        }
        this.state.snapshotState();
    }

    @Override // org.apache.paimon.flink.sink.PrepareCommitOperator
    public void close() throws Exception {
        super.close();
        Iterator<StoreSinkWrite> it = this.writes.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        if (this.catalog != null) {
            this.catalog.close();
            this.catalog = null;
        }
    }

    private FileStoreTable getTable(Identifier identifier) throws InterruptedException {
        FileStoreTable fileStoreTable = this.tables.get(identifier);
        if (fileStoreTable == null) {
            while (true) {
                try {
                    FileStoreTable fileStoreTable2 = (FileStoreTable) this.catalog.getTable(identifier);
                    HashMap<String, String> hashMap = new HashMap<String, String>() { // from class: org.apache.paimon.flink.sink.MultiTablesStoreCompactOperator.1
                        {
                            put(CoreOptions.WRITE_ONLY.key(), "false");
                        }
                    };
                    if (this.isStreaming) {
                        hashMap.put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647");
                        hashMap.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
                        hashMap.put(CoreOptions.LOOKUP_WAIT.key(), "false");
                    }
                    fileStoreTable = fileStoreTable2.copy((Map<String, String>) hashMap);
                    this.tables.put(identifier, fileStoreTable);
                    break;
                } catch (Catalog.TableNotExistException e) {
                    Thread.sleep(500L);
                }
            }
        }
        return fileStoreTable;
    }

    private StoreSinkWrite.Provider createWriteProvider(FileStoreTable fileStoreTable, CheckpointConfig checkpointConfig, boolean z, boolean z2) {
        boolean prepareCommitWaitCompaction;
        Options configuration = fileStoreTable.coreOptions().toConfiguration();
        CoreOptions.ChangelogProducer changelogProducer = fileStoreTable.coreOptions().changelogProducer();
        CoreOptions coreOptions = fileStoreTable.coreOptions();
        if (coreOptions.writeOnly()) {
            prepareCommitWaitCompaction = false;
        } else {
            prepareCommitWaitCompaction = coreOptions.prepareCommitWaitCompaction();
            int i = -1;
            if (configuration.contains(CoreOptions.FULL_COMPACTION_DELTA_COMMITS)) {
                i = ((Integer) configuration.get(CoreOptions.FULL_COMPACTION_DELTA_COMMITS)).intValue();
            } else if (configuration.contains(FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
                i = (int) (((Duration) configuration.get(FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)).toMillis() / checkpointConfig.getCheckpointInterval());
            }
            if (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION || i >= 0) {
                int max = Math.max(i, 1);
                return (fileStoreTable2, str, storeSinkWriteState, iOManager, memorySegmentPool, metricGroup) -> {
                    return new GlobalFullCompactionSinkWrite(fileStoreTable2, str, storeSinkWriteState, iOManager, z2, prepareCommitWaitCompaction, max, z, memorySegmentPool, metricGroup);
                };
            }
        }
        if (!coreOptions.needLookup() || coreOptions.prepareCommitWaitCompaction()) {
            boolean z3 = prepareCommitWaitCompaction;
            return (fileStoreTable3, str2, storeSinkWriteState2, iOManager2, memorySegmentPool2, metricGroup2) -> {
                return new StoreSinkWriteImpl(fileStoreTable3, str2, storeSinkWriteState2, iOManager2, z2, z3, z, memorySegmentPool2, metricGroup2);
            };
        }
        boolean z4 = prepareCommitWaitCompaction;
        return (fileStoreTable4, str3, storeSinkWriteState3, iOManager3, memorySegmentPool3, metricGroup3) -> {
            return new AsyncLookupSinkWrite(fileStoreTable4, str3, storeSinkWriteState3, iOManager3, z2, z4, z, memorySegmentPool3, metricGroup3);
        };
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1301836416:
                if (implMethodName.equals("lambda$createWriteProvider$282a48e2$1")) {
                    z = false;
                    break;
                }
                break;
            case 1560460027:
                if (implMethodName.equals("lambda$createWriteProvider$b5c90d59$1")) {
                    z = true;
                    break;
                }
                break;
            case 1560460028:
                if (implMethodName.equals("lambda$createWriteProvider$b5c90d59$2")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/StoreSinkWrite$Provider") && serializedLambda.getFunctionalInterfaceMethodName().equals("provide") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemorySegmentPool;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator") && serializedLambda.getImplMethodSignature().equals("(ZZIZLorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemorySegmentPool;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;")) {
                    boolean booleanValue = ((Boolean) serializedLambda.getCapturedArg(0)).booleanValue();
                    boolean booleanValue2 = ((Boolean) serializedLambda.getCapturedArg(1)).booleanValue();
                    int intValue = ((Integer) serializedLambda.getCapturedArg(2)).intValue();
                    boolean booleanValue3 = ((Boolean) serializedLambda.getCapturedArg(3)).booleanValue();
                    return (fileStoreTable2, str, storeSinkWriteState, iOManager, memorySegmentPool, metricGroup) -> {
                        return new GlobalFullCompactionSinkWrite(fileStoreTable2, str, storeSinkWriteState, iOManager, booleanValue, booleanValue2, intValue, booleanValue3, memorySegmentPool, metricGroup);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/StoreSinkWrite$Provider") && serializedLambda.getFunctionalInterfaceMethodName().equals("provide") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemorySegmentPool;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator") && serializedLambda.getImplMethodSignature().equals("(ZZZLorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemorySegmentPool;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;")) {
                    boolean booleanValue4 = ((Boolean) serializedLambda.getCapturedArg(0)).booleanValue();
                    boolean booleanValue5 = ((Boolean) serializedLambda.getCapturedArg(1)).booleanValue();
                    boolean booleanValue6 = ((Boolean) serializedLambda.getCapturedArg(2)).booleanValue();
                    return (fileStoreTable4, str3, storeSinkWriteState3, iOManager3, memorySegmentPool3, metricGroup3) -> {
                        return new AsyncLookupSinkWrite(fileStoreTable4, str3, storeSinkWriteState3, iOManager3, booleanValue4, booleanValue5, booleanValue6, memorySegmentPool3, metricGroup3);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/StoreSinkWrite$Provider") && serializedLambda.getFunctionalInterfaceMethodName().equals("provide") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemorySegmentPool;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator") && serializedLambda.getImplMethodSignature().equals("(ZZZLorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemorySegmentPool;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;")) {
                    boolean booleanValue7 = ((Boolean) serializedLambda.getCapturedArg(0)).booleanValue();
                    boolean booleanValue8 = ((Boolean) serializedLambda.getCapturedArg(1)).booleanValue();
                    boolean booleanValue9 = ((Boolean) serializedLambda.getCapturedArg(2)).booleanValue();
                    return (fileStoreTable3, str2, storeSinkWriteState2, iOManager2, memorySegmentPool2, metricGroup2) -> {
                        return new StoreSinkWriteImpl(fileStoreTable3, str2, storeSinkWriteState2, iOManager2, booleanValue7, booleanValue8, booleanValue9, memorySegmentPool2, metricGroup2);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
