package org.apache.paimon.flink.compact;

import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.TemporalAmount;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
import org.apache.paimon.flink.source.BucketUnawareCompactSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.class */
public class UnawareBucketCompactionTopoBuilder {
    private final transient StreamExecutionEnvironment env;
    private final String tableIdentifier;
    private final FileStoreTable table;

    @Nullable
    private Predicate partitionPredicate;
    private boolean isContinuous = false;

    @Nullable
    private Duration partitionIdleTime = null;

    public UnawareBucketCompactionTopoBuilder(StreamExecutionEnvironment streamExecutionEnvironment, String str, FileStoreTable fileStoreTable) {
        this.env = streamExecutionEnvironment;
        this.tableIdentifier = str;
        this.table = fileStoreTable;
    }

    public void withContinuousMode(boolean z) {
        this.isContinuous = z;
    }

    public void withPartitionPredicate(Predicate predicate) {
        this.partitionPredicate = predicate;
    }

    public void withPartitionIdleTime(@Nullable Duration duration) {
        this.partitionIdleTime = duration;
    }

    /* JADX WARN: Type inference failed for: r0v10, types: [java.time.ZonedDateTime] */
    public void build() {
        DataStreamSource<UnawareAppendCompactionTask> buildSource = buildSource();
        if (this.isContinuous) {
            Preconditions.checkArgument(this.partitionIdleTime == null, "Streaming mode does not support partitionIdleTime");
        } else if (this.partitionIdleTime != null) {
            Map<BinaryRow, Long> partitionInfo = getPartitionInfo(this.table);
            long epochMilli = LocalDateTime.now().minus((TemporalAmount) this.partitionIdleTime).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
            buildSource = new DataStreamSource<>(buildSource.filter(unawareAppendCompactionTask -> {
                return ((Long) partitionInfo.get(unawareAppendCompactionTask.partition())).longValue() <= epochMilli;
            }));
        }
        sinkFromSource(buildSource);
    }

    private Map<BinaryRow, Long> getPartitionInfo(FileStoreTable fileStoreTable) {
        return (Map) fileStoreTable.newSnapshotReader().partitionEntries().stream().collect(Collectors.toMap((v0) -> {
            return v0.partition();
        }, (v0) -> {
            return v0.lastFileCreationTime();
        }));
    }

    private DataStreamSource<UnawareAppendCompactionTask> buildSource() {
        return BucketUnawareCompactSource.buildSource(this.env, new BucketUnawareCompactSource(this.table, this.isContinuous, this.table.coreOptions().continuousDiscoveryInterval().toMillis(), this.partitionPredicate), this.tableIdentifier);
    }

    private void sinkFromSource(DataStreamSource<UnawareAppendCompactionTask> dataStreamSource) {
        UnawareBucketCompactionSink.sink(this.table, rebalanceInput(dataStreamSource));
    }

    private DataStream<UnawareAppendCompactionTask> rebalanceInput(DataStreamSource<UnawareAppendCompactionTask> dataStreamSource) {
        Integer num = (Integer) Options.fromMap(this.table.options()).get(FlinkConnectorOptions.UNAWARE_BUCKET_COMPACTION_PARALLELISM);
        PartitionTransformation partitionTransformation = new PartitionTransformation(dataStreamSource.getTransformation(), new RebalancePartitioner());
        if (num != null) {
            partitionTransformation.setParallelism(num.intValue());
        } else {
            partitionTransformation.setParallelism(this.env.getParallelism());
        }
        return new DataStream<>(this.env, partitionTransformation);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -953957182:
                if (implMethodName.equals("lambda$build$e6b8f16f$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("filter") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map;JLorg/apache/paimon/append/UnawareAppendCompactionTask;)Z")) {
                    Map map = (Map) serializedLambda.getCapturedArg(0);
                    long longValue = ((Long) serializedLambda.getCapturedArg(1)).longValue();
                    return unawareAppendCompactionTask -> {
                        return ((Long) map.get(unawareAppendCompactionTask.partition())).longValue() <= longValue;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
