package org.apache.beam.runners.dataflow;

import java.util.Collections;
import java.util.Iterator;
import org.apache.beam.runners.dataflow.internal.DataflowGroupByKey;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Redistribute;
import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.IdentityWindowFn;
import org.apache.beam.sdk.util.construction.PTransformReplacements;
import org.apache.beam.sdk.util.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.class */
/**
 * Override factory that swaps a {@link Redistribute.RedistributeByKey} for
 * {@link DataflowRedistributeByKey}, an expansion built on {@link DataflowGroupByKey}.
 *
 * @param <K> key type of the redistributed elements
 * @param <V> value type of the redistributed elements
 */
class RedistributeByKeyOverrideFactory<K, V> extends SingleInputOutputOverrideFactory<PCollection<KV<K, V>>, PCollection<KV<K, V>>, Redistribute.RedistributeByKey<K, V>> {

    /**
     * Replacement expansion for {@link Redistribute.RedistributeByKey}.
     *
     * <p>The input is re-windowed with an {@link IdentityWindowFn}, each value is wrapped in a
     * {@link ValueInSingleWindow}, the elements are grouped by key with {@link DataflowGroupByKey},
     * the grouped iterables are flattened back into individual elements, and finally the recorded
     * timestamp, window and pane are re-applied by {@link RestoreMetadata}.
     */
    private static class DataflowRedistributeByKey<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
        /** The transform being replaced; consulted only for its allow-duplicates setting. */
        private final Redistribute.RedistributeByKey<K, V> originalTransform;

        private DataflowRedistributeByKey(Redistribute.RedistributeByKey<K, V> redistributeByKey) {
            this.originalTransform = redistributeByKey;
        }

        /**
         * Expands the redistribute into window / reify / group / ungroup / restore steps.
         *
         * @param pCollection the keyed input collection
         * @return a collection with the same element type carrying the input's original windowing
         *     strategy
         */
        @Override
        public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> pCollection) {
            WindowingStrategy<?, ?> originalStrategy = pCollection.getWindowingStrategy();

            // Temporary windowing used only around the grouping step: identity windows (reusing the
            // input's window coder), a ReshuffleTrigger, discarded panes, the earliest timestamp
            // combiner and the maximum allowed lateness.
            Window<KV<K, V>> rewindow =
                    Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
                            .triggering(new ReshuffleTrigger<>())
                            .discardingFiredPanes()
                            .withTimestampCombiner(TimestampCombiner.EARLIEST)
                            .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));

            PCollection<KV<K, ValueInSingleWindow<V>>> reified =
                    pCollection
                            .apply("SetIdentityWindow", rewindow)
                            .apply("ReifyOriginalMetadata", Reify.windowsInValue());

            // Which flavour of DataflowGroupByKey is used depends on the replaced transform's setting.
            PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped;
            if (this.originalTransform.getAllowDuplicates()) {
                grouped = reified.apply(DataflowGroupByKey.createWithAllowDuplicates());
            } else {
                grouped = reified.apply(DataflowGroupByKey.create());
            }

            return grouped
                    .apply("ExpandIterable", ParDo.of(new DoFn<KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K, ValueInSingleWindow<V>>>() {
                        /** Emits one {@code (key, value)} pair per value in the grouped iterable. */
                        @DoFn.ProcessElement
                        public void processElement(@DoFn.Element KV<K, Iterable<ValueInSingleWindow<V>>> kv, DoFn.OutputReceiver<KV<K, ValueInSingleWindow<V>>> outputReceiver) {
                            K key = kv.getKey();
                            for (ValueInSingleWindow<V> value : kv.getValue()) {
                                outputReceiver.output(KV.of(key, value));
                            }
                        }
                    }))
                    .apply("RestoreMetadata", new RestoreMetadata<>())
                    // Put the input's own windowing strategy back on the output in place of the
                    // temporary one installed by "SetIdentityWindow".
                    .setWindowingStrategyInternal(originalStrategy);
        }
    }

    /**
     * Unwraps {@link ValueInSingleWindow} values, re-emitting each element with the timestamp,
     * window and pane recorded in the wrapper.
     *
     * <p>The constructor is private, so instances are only created inside the enclosing class.
     */
    public static class RestoreMetadata<K, V> extends PTransform<PCollection<KV<K, ValueInSingleWindow<V>>>, PCollection<KV<K, V>>> {
        private RestoreMetadata() {
        }

        /**
         * Applies a single {@link ParDo} that strips the {@link ValueInSingleWindow} wrapper.
         *
         * @param pCollection keyed elements whose values carry their reified metadata
         * @return the unwrapped keyed elements
         */
        @Override
        public PCollection<KV<K, V>> expand(PCollection<KV<K, ValueInSingleWindow<V>>> pCollection) {
            return pCollection.apply(ParDo.of(new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
                /**
                 * Returns the largest representable skew. NOTE(review): presumably needed because
                 * the recorded timestamps emitted below can differ from the incoming element's
                 * timestamp - confirm against the DoFn contract.
                 */
                @Override
                public Duration getAllowedTimestampSkew() {
                    return Duration.millis(Long.MAX_VALUE);
                }

                /** Outputs the unwrapped value with its recorded timestamp, window and pane. */
                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<K, ValueInSingleWindow<V>> kv, DoFn.OutputReceiver<KV<K, V>> outputReceiver) {
                    ValueInSingleWindow<V> reified = kv.getValue();
                    outputReceiver.outputWindowedValue(
                            KV.of(kv.getKey(), reified.getValue()),
                            reified.getTimestamp(),
                            Collections.singleton(reified.getWindow()),
                            reified.getPane());
                }
            }));
        }
    }

    /**
     * Builds the replacement for an applied {@link Redistribute.RedistributeByKey}.
     *
     * @param appliedPTransform the application of the transform being overridden
     * @return a replacement pairing the application's singleton main input with a
     *     {@link DataflowRedistributeByKey} wrapping the original transform
     */
    @Override
    public PTransformOverrideFactory.PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, V>>> getReplacementTransform(AppliedPTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>, Redistribute.RedistributeByKey<K, V>> appliedPTransform) {
        return PTransformOverrideFactory.PTransformReplacement.of(
                PTransformReplacements.getSingletonMainInput(appliedPTransform),
                new DataflowRedistributeByKey<>(appliedPTransform.getTransform()));
    }
}
