package org.apache.beam.examples;

import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/* loaded from: input_file:org/apache/beam/examples/KafkaStreaming.class */
public class KafkaStreaming {
    private static final String TOPIC_NAME = "my-topic";
    private static final int ALLOWED_LATENESS_TIME = 1;
    private static final int TIME_OUTPUT_AFTER_FIRST_ELEMENT = 10;
    private static final int WINDOW_TIME = 30;
    private static final int MESSAGES_COUNT = 100;
    private static final String[] NAMES = {"Alice", "Bob", "Charlie", "David"};
    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("HH:mm:ss");

    /* renamed from: org.apache.beam.examples.KafkaStreaming$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/examples/KafkaStreaming$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$beam$sdk$transforms$windowing$PaneInfo$Timing = new int[PaneInfo.Timing.values().length];

        static {
            try {
                $SwitchMap$org$apache$beam$sdk$transforms$windowing$PaneInfo$Timing[PaneInfo.Timing.EARLY.ordinal()] = KafkaStreaming.ALLOWED_LATENESS_TIME;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$transforms$windowing$PaneInfo$Timing[PaneInfo.Timing.ON_TIME.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$transforms$windowing$PaneInfo$Timing[PaneInfo.Timing.LATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/examples/KafkaStreaming$IntermittentlyFailingIntegerDeserializer.class */
    public static class IntermittentlyFailingIntegerDeserializer implements Deserializer<Integer> {
        public static final IntegerDeserializer INTEGER_DESERIALIZER = new IntegerDeserializer();
        public int deserializeCount = 0;

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Integer m14deserialize(String str, byte[] bArr) {
            this.deserializeCount += KafkaStreaming.ALLOWED_LATENESS_TIME;
            if (this.deserializeCount % KafkaStreaming.TIME_OUTPUT_AFTER_FIRST_ELEMENT == 0) {
                throw new SerializationException("Expected Serialization Exception");
            }
            return INTEGER_DESERIALIZER.deserialize(str, bArr);
        }
    }

    /* loaded from: input_file:org/apache/beam/examples/KafkaStreaming$KafkaConsumer.class */
    public static class KafkaConsumer {
        private final KafkaStreamingOptions options;

        public KafkaConsumer(KafkaStreamingOptions kafkaStreamingOptions) {
            this.options = kafkaStreamingOptions;
        }

        public void run() {
            Pipeline create = Pipeline.create(this.options);
            Window into = Window.into(FixedWindows.of(Duration.standardSeconds(30L)));
            AfterProcessingTime plusDelayOf = AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10L));
            HashMap hashMap = new HashMap();
            hashMap.put("auto.offset.reset", "latest");
            ErrorHandler.BadRecordErrorHandler registerBadRecordErrorHandler = create.registerBadRecordErrorHandler(new LogErrors());
            try {
                PCollection apply = create.apply(KafkaIO.read().withBootstrapServers(this.options.getKafkaHost()).withTopic(KafkaStreaming.TOPIC_NAME).withKeyDeserializer(StringDeserializer.class).withValueDeserializer(IntermittentlyFailingIntegerDeserializer.class).withConsumerConfigUpdates(hashMap).withBadRecordErrorHandler(registerBadRecordErrorHandler).withoutMetadata());
                if (registerBadRecordErrorHandler != null) {
                    registerBadRecordErrorHandler.close();
                }
                apply.apply(into.triggering(Repeatedly.forever(plusDelayOf)).withAllowedLateness(Duration.standardSeconds(1L)).accumulatingFiredPanes()).apply(Sum.integersPerKey()).apply(Combine.globally(new WindowCombineFn()).withoutDefaults()).apply(ParDo.of(new LogResults()));
                create.run().waitUntilFinish();
                System.out.println("Pipeline finished");
            } catch (Throwable th) {
                if (registerBadRecordErrorHandler != null) {
                    try {
                        registerBadRecordErrorHandler.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/examples/KafkaStreaming$KafkaProducer.class */
    public static class KafkaProducer extends TimerTask {
        private final KafkaStreamingOptions options;

        /* loaded from: input_file:org/apache/beam/examples/KafkaStreaming$KafkaProducer$RandomUserScoreGeneratorFn.class */
        static class RandomUserScoreGeneratorFn extends DoFn<Object, KV<String, Integer>> {
            private static final int MAX_SCORE = 100;

            RandomUserScoreGeneratorFn() {
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<Object, KV<String, Integer>>.ProcessContext processContext) {
                processContext.output(generate());
            }

            public KV<String, Integer> generate() {
                Random random = new Random();
                return KV.of(KafkaStreaming.NAMES[random.nextInt(KafkaStreaming.NAMES.length)], Integer.valueOf(random.nextInt(MAX_SCORE) + KafkaStreaming.ALLOWED_LATENESS_TIME));
            }
        }

        public KafkaProducer(KafkaStreamingOptions kafkaStreamingOptions) {
            this.options = kafkaStreamingOptions;
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            Pipeline create = Pipeline.create(this.options);
            create.apply(GenerateSequence.from(0L).withRate(100L, Duration.standardSeconds(30L)).withTimestampFn(l -> {
                return new Instant(System.currentTimeMillis());
            })).apply(ParDo.of(new RandomUserScoreGeneratorFn())).apply(KafkaIO.write().withBootstrapServers(this.options.getKafkaHost()).withTopic(KafkaStreaming.TOPIC_NAME).withKeySerializer(StringSerializer.class).withValueSerializer(IntegerSerializer.class).withProducerConfigUpdates(new HashMap()));
            create.run().waitUntilFinish();
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -532492533:
                    if (implMethodName.equals("lambda$run$5a007597$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/examples/KafkaStreaming$KafkaProducer") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Lorg/joda/time/Instant;")) {
                        return l -> {
                            return new Instant(System.currentTimeMillis());
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* loaded from: input_file:org/apache/beam/examples/KafkaStreaming$KafkaStreamingOptions.class */
    public interface KafkaStreamingOptions extends PipelineOptions {
        @Default.String("kafka_server:9092")
        @Description("Kafka server host")
        String getKafkaHost();

        void setKafkaHost(String str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/examples/KafkaStreaming$LogErrors.class */
    public static class LogErrors extends PTransform<PCollection<BadRecord>, PCollection<BadRecord>> {

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/examples/KafkaStreaming$LogErrors$LogErrorFn.class */
        public static class LogErrorFn extends DoFn<BadRecord, BadRecord> {
            LogErrorFn() {
            }

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element BadRecord badRecord, DoFn.OutputReceiver<BadRecord> outputReceiver) {
                System.out.println(badRecord);
                outputReceiver.output(badRecord);
            }
        }

        LogErrors() {
        }

        public PCollection<BadRecord> expand(PCollection<BadRecord> pCollection) {
            return pCollection.apply("Log Errors", ParDo.of(new LogErrorFn()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/examples/KafkaStreaming$LogResults.class */
    public static class LogResults extends DoFn<Map<String, Integer>, Map<String, Integer>> {
        LogResults() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Map<String, Integer>, Map<String, Integer>>.ProcessContext processContext, IntervalWindow intervalWindow) throws Exception {
            Map map = (Map) processContext.element();
            if (map == null) {
                processContext.output((Map) processContext.element());
                return;
            }
            String instant = intervalWindow.start().toString(KafkaStreaming.dateTimeFormatter);
            String instant2 = intervalWindow.end().toString(KafkaStreaming.dateTimeFormatter);
            PaneInfo.Timing timing = processContext.pane().getTiming();
            switch (AnonymousClass1.$SwitchMap$org$apache$beam$sdk$transforms$windowing$PaneInfo$Timing[timing.ordinal()]) {
                case KafkaStreaming.ALLOWED_LATENESS_TIME /* 1 */:
                    System.out.println("Live score (running sum) for the current round:");
                    break;
                case 2:
                    System.out.println("Final score for the current round:");
                    break;
                case 3:
                    System.out.printf("Late score for the round from %s to %s:%n", instant, instant2);
                    break;
                default:
                    throw new RuntimeException("Unknown timing value");
            }
            for (Map.Entry entry : map.entrySet()) {
                System.out.printf("%10s: %-10s%n", entry.getKey(), entry.getValue());
            }
            if (timing == PaneInfo.Timing.ON_TIME) {
                System.out.printf("======= End of round from %s to %s =======%n%n", instant, instant2);
            } else {
                System.out.println();
            }
            processContext.output((Map) processContext.element());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/examples/KafkaStreaming$WindowCombineFn.class */
    public static class WindowCombineFn extends Combine.CombineFn<KV<String, Integer>, Map<String, Integer>, Map<String, Integer>> {
        static final /* synthetic */ boolean $assertionsDisabled;

        WindowCombineFn() {
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public Map<String, Integer> m17createAccumulator() {
            return new HashMap();
        }

        public Map<String, Integer> addInput(Map<String, Integer> map, KV<String, Integer> kv) {
            if (!$assertionsDisabled && kv == null) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && map == null) {
                throw new AssertionError();
            }
            map.put((String) kv.getKey(), (Integer) kv.getValue());
            return map;
        }

        public Map<String, Integer> mergeAccumulators(Iterable<Map<String, Integer>> iterable) {
            HashMap hashMap = new HashMap();
            Iterator<Map<String, Integer>> it = iterable.iterator();
            while (it.hasNext()) {
                for (Map.Entry<String, Integer> entry : it.next().entrySet()) {
                    if (hashMap.containsKey(entry.getKey())) {
                        hashMap.put(entry.getKey(), Integer.valueOf(((Integer) hashMap.get(entry.getKey())).intValue() + entry.getValue().intValue()));
                    } else {
                        hashMap.put(entry.getKey(), entry.getValue());
                    }
                }
            }
            return hashMap;
        }

        public Map<String, Integer> extractOutput(Map<String, Integer> map) {
            return map;
        }

        /* renamed from: mergeAccumulators, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m16mergeAccumulators(Iterable iterable) {
            return mergeAccumulators((Iterable<Map<String, Integer>>) iterable);
        }

        static {
            $assertionsDisabled = !KafkaStreaming.class.desiredAssertionStatus();
        }
    }

    public static void main(String[] strArr) {
        Duration standardSeconds = Duration.standardSeconds(30L);
        Instant instant = new Instant((Instant.now().getMillis() + standardSeconds.getMillis()) - (Instant.now().plus(standardSeconds).getMillis() % standardSeconds.getMillis()));
        KafkaStreamingOptions kafkaStreamingOptions = (KafkaStreamingOptions) PipelineOptionsFactory.fromArgs(strArr).withValidation().as(KafkaStreamingOptions.class);
        new Timer().schedule(new KafkaProducer(kafkaStreamingOptions), instant.toDate());
        new KafkaConsumer(kafkaStreamingOptions).run();
    }
}
