package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.CompletableFuture;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeParameter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/fn/harness/state/OrderedListUserState.class */
public class OrderedListUserState<T> {
    private final BeamFnStateClient beamFnStateClient;
    private final BeamFnApi.StateRequest requestTemplate;
    private final TimestampedValueCoder<T> timestampedValueCoder;
    private NavigableMap<Instant, Collection<T>> pendingAdds = Maps.newTreeMap();
    private TreeRangeSet<Instant> pendingRemoves = TreeRangeSet.create();
    private boolean isCleared = false;
    private boolean isClosed = false;

    /* loaded from: input_file:org/apache/beam/fn/harness/state/OrderedListUserState$TimestampedValueCoder.class */
    public static class TimestampedValueCoder<T> extends StructuredCoder<TimestampedValue<T>> {
        private final Coder<T> valueCoder;
        private final KvCoder<Long, T> internalKvCoder;

        public static <T> TimestampedValueCoder<T> of(Coder<T> coder) {
            return new TimestampedValueCoder<>(coder);
        }

        public Object structuralValue(TimestampedValue<T> timestampedValue) {
            return TimestampedValue.of(this.valueCoder.structuralValue(timestampedValue.getValue()), timestampedValue.getTimestamp());
        }

        TimestampedValueCoder(Coder<T> coder) {
            this.valueCoder = (Coder) Preconditions.checkNotNull(coder);
            this.internalKvCoder = KvCoder.of(VarLongCoder.of(), LengthPrefixCoder.of(coder));
        }

        public void encode(TimestampedValue<T> timestampedValue, OutputStream outputStream) throws IOException {
            this.internalKvCoder.encode(KV.of(Long.valueOf(timestampedValue.getTimestamp().getMillis()), timestampedValue.getValue()), outputStream);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public TimestampedValue<T> m40decode(InputStream inputStream) throws IOException {
            KV decode = this.internalKvCoder.decode(inputStream);
            return TimestampedValue.of(decode.getValue(), Instant.ofEpochMilli(((Long) decode.getKey()).longValue()));
        }

        public void verifyDeterministic() throws Coder.NonDeterministicException {
            verifyDeterministic(this, "TimestampedValueCoder requires a deterministic valueCoder", new Coder[]{this.valueCoder});
        }

        public List<? extends Coder<?>> getCoderArguments() {
            return Arrays.asList(this.valueCoder);
        }

        public Coder<T> getValueCoder() {
            return this.valueCoder;
        }

        public TypeDescriptor<TimestampedValue<T>> getEncodedTypeDescriptor() {
            return new TypeDescriptor<TimestampedValue<T>>() { // from class: org.apache.beam.fn.harness.state.OrderedListUserState.TimestampedValueCoder.2
            }.where(new TypeParameter<T>() { // from class: org.apache.beam.fn.harness.state.OrderedListUserState.TimestampedValueCoder.1
            }, this.valueCoder.getEncodedTypeDescriptor());
        }

        public List<? extends Coder<?>> getComponents() {
            return Collections.singletonList(this.valueCoder);
        }
    }

    public OrderedListUserState(Cache<?, ?> cache, BeamFnStateClient beamFnStateClient, String str, BeamFnApi.StateKey stateKey, Coder<T> coder) {
        Preconditions.checkArgument(stateKey.hasOrderedListUserState(), "Expected OrderedListUserState StateKey but received %s.", stateKey);
        this.beamFnStateClient = beamFnStateClient;
        this.timestampedValueCoder = TimestampedValueCoder.of(coder);
        this.requestTemplate = BeamFnApi.StateRequest.newBuilder().setInstructionId(str).setStateKey(stateKey).build();
    }

    /**
     * Buffers a value locally under its timestamp; nothing is sent until {@link #asyncClose()}.
     *
     * <p>Values sharing a timestamp are kept in insertion order in one list.
     *
     * @param timestampedValue the element to append
     * @throws IllegalStateException if this state has already been closed
     */
    public void add(TimestampedValue<T> timestampedValue) {
        Preconditions.checkState(!this.isClosed, "OrderedList user state is no longer usable because it is closed for %s", this.requestTemplate.getStateKey());
        // computeIfAbsent allocates the bucket only when the timestamp is new and avoids a second lookup.
        this.pendingAdds
                .computeIfAbsent(timestampedValue.getTimestamp(), unused -> new ArrayList<>())
                .add(timestampedValue.getValue());
    }

    /**
     * Returns the elements whose timestamps fall in {@code [instant, instant2)}.
     *
     * <p>The result combines locally buffered additions with values fetched through the state
     * client, minus any fetched value whose timestamp is covered by a pending removal. If the
     * state has been cleared, only the buffered additions are returned and no request is built.
     *
     * <p>The returned iterable is built from snapshots taken at call time: the key and the current
     * size of each buffered bucket, and a copy of the pending removals.
     *
     * @param instant inclusive lower bound of the timestamp range
     * @param instant2 exclusive upper bound of the timestamp range
     * @return the matching elements, merged by timestamp
     * @throws IllegalStateException if this state has already been closed
     */
    public Iterable<TimestampedValue<T>> readRange(Instant instant, Instant instant2) {
        Preconditions.checkState(!this.isClosed, "OrderedList user state is no longer usable because it is closed for %s", this.requestTemplate.getStateKey());

        List<Iterable<TimestampedValue<T>>> pendingInRange = new ArrayList<>();
        for (Map.Entry<Instant, Collection<T>> entry : this.pendingAdds.subMap(instant, instant2).entrySet()) {
            // Capture key and bucket now: a Map.Entry must not be consulted after the backing map
            // has been structurally modified, and the lambda below runs lazily.
            Instant timestamp = entry.getKey();
            Collection<T> bucket = entry.getValue();
            Iterable<TimestampedValue<T>> stamped = Iterables.transform(bucket, value -> TimestampedValue.of(value, timestamp));
            // Limit to the bucket's current size so values added to it later are not picked up.
            pendingInRange.add(PrefetchableIterables.limit(stamped, bucket.size()));
        }
        Iterable<TimestampedValue<T>> pendingValues = Iterables.concat(pendingInRange);

        if (this.isCleared) {
            // Everything persisted is scheduled for deletion; only local additions are visible.
            return pendingValues;
        }

        BeamFnApi.StateRequest.Builder request = this.requestTemplate.toBuilder();
        request.getStateKeyBuilder().getOrderedListUserStateBuilder().getRangeBuilder().setStart(instant.getMillis()).setEnd(instant2.getMillis());
        Iterable<TimestampedValue<T>> persistedValues = StateFetchingIterators.readAllAndDecodeStartingFrom(Caches.noop(), this.beamFnStateClient, request.build(), this.timestampedValueCoder);

        // Copy the removals so that later clearRange() calls do not affect this iterable.
        TreeRangeSet<Instant> removesSnapshot = TreeRangeSet.create(this.pendingRemoves);
        Iterable<TimestampedValue<T>> survivingPersisted = Iterables.filter(persistedValues, element -> !removesSnapshot.contains(element.getTimestamp()));

        // NOTE(review): mergeSorted presumes both inputs are already ordered by timestamp; the
        // buffered side is (sorted map), the fetched side is assumed to be — confirm.
        return Iterables.mergeSorted(ImmutableList.of(survivingPersisted, pendingValues), Comparator.comparing(TimestampedValue::getTimestamp));
    }

    /**
     * Returns the list contents by delegating to {@link #readRange(Instant, Instant)} with the
     * bounds {@code Long.MIN_VALUE} (inclusive) and {@code Long.MAX_VALUE} (exclusive) in epoch
     * milliseconds.
     *
     * @return the elements in that range
     * @throws IllegalStateException if this state has already been closed
     */
    public Iterable<TimestampedValue<T>> read() {
        Preconditions.checkState(!this.isClosed, "OrderedList user state is no longer usable because it is closed for %s", this.requestTemplate.getStateKey());
        Instant lowestTimestamp = Instant.ofEpochMilli(Long.MIN_VALUE);
        Instant highestTimestamp = Instant.ofEpochMilli(Long.MAX_VALUE);
        return readRange(lowestTimestamp, highestTimestamp);
    }

    /**
     * Removes every element whose timestamp lies in {@code [instant, instant2)}.
     *
     * <p>Buffered additions in the range are dropped immediately. Unless the whole state has
     * already been cleared, the range is also recorded as a pending removal, which
     * {@link #asyncClose()} sends as a clear request.
     *
     * @param instant inclusive lower bound of the timestamp range
     * @param instant2 exclusive upper bound of the timestamp range
     * @throws IllegalStateException if this state has already been closed
     */
    public void clearRange(Instant instant, Instant instant2) {
        Preconditions.checkState(!this.isClosed, "OrderedList user state is no longer usable because it is closed for %s", this.requestTemplate.getStateKey());

        NavigableMap<Instant, Collection<T>> bufferedInRange = this.pendingAdds.subMap(instant, true, instant2, false);
        bufferedInRange.clear();

        if (!this.isCleared) {
            // After clear() the pending removals already span the full range; nothing to add.
            this.pendingRemoves.add(Range.closedOpen(instant, instant2));
        }
    }

    /**
     * Clears the state: marks it as cleared, drops all buffered additions, and replaces the
     * pending removals with a single range from {@code Long.MIN_VALUE} (inclusive) to
     * {@code Long.MAX_VALUE} (exclusive) in epoch milliseconds.
     *
     * <p>A new range set is assigned rather than mutating the existing one.
     *
     * @throws IllegalStateException if this state has already been closed
     */
    public void clear() {
        Preconditions.checkState(!this.isClosed, "OrderedList user state is no longer usable because it is closed for %s", this.requestTemplate.getStateKey());

        Instant lowest = Instant.ofEpochMilli(Long.MIN_VALUE);
        Instant highest = Instant.ofEpochMilli(Long.MAX_VALUE);
        TreeRangeSet<Instant> fullRange = TreeRangeSet.create();
        fullRange.add(Range.closedOpen(lowest, highest));

        this.isCleared = true;
        this.pendingRemoves = fullRange;
        this.pendingAdds.clear();
    }

    /**
     * Marks the state as closed and flushes the buffered changes through the state client.
     *
     * <p>Pending removals are sent first, one clear request per range, followed by a single
     * append request carrying all buffered additions encoded back to back. Despite the name,
     * each request is awaited before the next one is issued.
     *
     * @throws IllegalStateException if a state response carries a non-empty error
     * @throws RuntimeException if encoding a buffered element fails with an {@link IOException}
     * @throws Exception if waiting for a state response fails
     */
    public void asyncClose() throws Exception {
        this.isClosed = true;

        if (!this.pendingRemoves.isEmpty()) {
            for (Range<Instant> range : this.pendingRemoves.asRanges()) {
                BeamFnApi.StateRequest.Builder clearRequest = this.requestTemplate.toBuilder();
                clearRequest.setClear(BeamFnApi.StateClearRequest.newBuilder().build());
                clearRequest.getStateKeyBuilder().getOrderedListUserStateBuilder().getRangeBuilder().setStart(range.lowerEndpoint().getMillis()).setEnd(range.upperEndpoint().getMillis());
                sendAndCheck(clearRequest);
            }
            this.pendingRemoves.clear();
        }

        if (this.pendingAdds.isEmpty()) {
            return;
        }

        ByteStringOutputStream encoded = new ByteStringOutputStream();
        for (Map.Entry<Instant, Collection<T>> entry : this.pendingAdds.entrySet()) {
            for (T value : entry.getValue()) {
                try {
                    this.timestampedValueCoder.encode(TimestampedValue.of(value, entry.getKey()), encoded);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        BeamFnApi.StateRequest.Builder appendRequest = this.requestTemplate.toBuilder();
        appendRequest.getAppendBuilder().setData(encoded.toByteString());
        sendAndCheck(appendRequest);
        this.pendingAdds.clear();
    }

    /** Sends one request, waits for its response, and fails if the response reports an error. */
    private void sendAndCheck(BeamFnApi.StateRequest.Builder request) throws Exception {
        BeamFnApi.StateResponse response = this.beamFnStateClient.handle(request).get();
        String error = response.getError();
        if (!error.isEmpty()) {
            throw new IllegalStateException(error);
        }
    }
}
