package org.apache.beam.fn.harness.control;

import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/fn/harness/control/FinalizeBundleHandler.class */
public class FinalizeBundleHandler {
    private final ConcurrentMap<String, Collection<CallbackRegistration>> bundleFinalizationCallbacks = new ConcurrentHashMap();
    private final PriorityQueue<TimestampedValue<String>> cleanUpQueue = new PriorityQueue<>(11, Comparator.comparing((v0) -> {
        return v0.getTimestamp();
    }));
    private final Future<Void> cleanUpResult;

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/fn/harness/control/FinalizeBundleHandler$CallbackRegistration.class */
    public static abstract class CallbackRegistration {
        public static CallbackRegistration create(Instant instant, DoFn.BundleFinalizer.Callback callback) {
            return new AutoValue_FinalizeBundleHandler_CallbackRegistration(instant, callback);
        }

        public abstract Instant getExpiryTime();

        public abstract DoFn.BundleFinalizer.Callback getCallback();
    }

    public FinalizeBundleHandler(ExecutorService executorService) {
        this.cleanUpResult = executorService.submit(() -> {
            while (true) {
                synchronized (this.cleanUpQueue) {
                    TimestampedValue<String> peek = this.cleanUpQueue.peek();
                    while (peek == null) {
                        this.cleanUpQueue.wait();
                        peek = this.cleanUpQueue.peek();
                    }
                    for (Instant now = Instant.now(); peek.getTimestamp().isAfter(now); now = Instant.now()) {
                        this.cleanUpQueue.wait(new Duration(now, peek.getTimestamp()).getMillis());
                        peek = this.cleanUpQueue.peek();
                    }
                    this.bundleFinalizationCallbacks.remove(this.cleanUpQueue.poll().getValue());
                }
            }
        });
    }

    public void registerCallbacks(String str, Collection<CallbackRegistration> collection) {
        if (collection.isEmpty()) {
            return;
        }
        Collection<CallbackRegistration> putIfAbsent = this.bundleFinalizationCallbacks.putIfAbsent(str, collection);
        Preconditions.checkState(putIfAbsent == null, "Expected to not have any past callbacks for bundle %s but found %s.", str, putIfAbsent);
        long j = Long.MIN_VALUE;
        Iterator<CallbackRegistration> it = collection.iterator();
        while (it.hasNext()) {
            j = Math.max(j, it.next().getExpiryTime().getMillis());
        }
        synchronized (this.cleanUpQueue) {
            this.cleanUpQueue.offer(TimestampedValue.of(str, new Instant(j)));
            this.cleanUpQueue.notify();
        }
    }

    public BeamFnApi.InstructionResponse.Builder finalizeBundle(BeamFnApi.InstructionRequest instructionRequest) throws Exception {
        String instructionId = instructionRequest.getFinalizeBundle().getInstructionId();
        Collection<CallbackRegistration> remove = this.bundleFinalizationCallbacks.remove(instructionId);
        if (remove == null) {
            return BeamFnApi.InstructionResponse.newBuilder().setFinalizeBundle(BeamFnApi.FinalizeBundleResponse.getDefaultInstance());
        }
        ArrayList arrayList = new ArrayList();
        Iterator<CallbackRegistration> it = remove.iterator();
        while (it.hasNext()) {
            try {
                it.next().getCallback().onBundleSuccess();
            } catch (Exception e) {
                arrayList.add(e);
            }
        }
        if (arrayList.isEmpty()) {
            return BeamFnApi.InstructionResponse.newBuilder().setFinalizeBundle(BeamFnApi.FinalizeBundleResponse.getDefaultInstance());
        }
        Exception exc = new Exception(String.format("Failed to handle bundle finalization for bundle %s.", instructionId));
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            exc.addSuppressed((Exception) it2.next());
        }
        throw exc;
    }
}
