package org.talend.sdk.component.runtime.beam;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.json.bind.Jsonb;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.sdk.component.api.processor.OutputEmitter;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.record.Schema;
import org.talend.sdk.component.api.service.record.RecordBuilderFactory;
import org.talend.sdk.component.runtime.beam.spi.record.RecordCollectors;
import org.talend.sdk.component.runtime.output.InputFactory;
import org.talend.sdk.component.runtime.output.OutputFactory;
import org.talend.sdk.component.runtime.output.Processor;
import org.talend.sdk.component.runtime.output.ProcessorImpl;
import org.talend.sdk.component.runtime.record.RecordConverters;
import org.talend.sdk.component.runtime.record.Schemas;
import org.talend.sdk.component.runtime.serialization.ContainerFinder;
import org.talend.sdk.component.runtime.serialization.LightContainer;

/* loaded from: input_file:org/talend/sdk/component/runtime/beam/BaseProcessorFn.class */
abstract class BaseProcessorFn<O> extends DoFn<Record, O> {
    /** Class-wide SLF4J logger; used by the constructor to report an unparsable batch-size setting. */
    private static final Logger log = LoggerFactory.getLogger(BaseProcessorFn.class);
    /** Processor whose lifecycle and per-element callbacks this function drives. */
    protected Processor processor;
    /** Element count at which a group is closed; values {@code <= 0} (default {@code -1}) disable count-based closing. */
    protected int maxBatchSize = -1;
    /** Number of elements handled since the current group was opened; reset to 0 when a group is closed. */
    protected int currentCount;
    /** Record builder factory, looked up lazily by {@code ensureInit()}. */
    protected volatile RecordBuilderFactory recordFactory;
    /** JSON-B instance, looked up lazily by {@code ensureInit()}; a {@code null} value means "not initialised yet". */
    protected volatile Jsonb jsonb;

    /* loaded from: input_file:org/talend/sdk/component/runtime/beam/BaseProcessorFn$BeamInputFactory.class */
    protected static final class BeamInputFactory implements InputFactory {
        private final Map<String, Iterator<Record>> objects;

        BeamInputFactory(DoFn<Record, ?>.ProcessContext processContext) {
            Record record = (Record) processContext.element();
            this.objects = (Map) record.getSchema().getAllEntries().filter(entry -> {
                return !entry.getName().startsWith("__talend_internal");
            }).collect(Collectors.toMap((v0) -> {
                return v0.getName();
            }, entry2 -> {
                return record.getArray(Record.class, entry2.getName()).iterator();
            }));
        }

        public Object read(String str) {
            Iterator<Record> orDefault = this.objects.getOrDefault(Schema.sanitizeConnectionName(str), Collections.emptyIterator());
            if (orDefault.hasNext()) {
                return orDefault.next();
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/talend/sdk/component/runtime/beam/BaseProcessorFn$BeamMultiOutputFactory.class */
    /**
     * Output factory that wraps each emitted value in its own record and forwards
     * all of those records, one by one, when {@link #postProcessing()} is called.
     *
     * <p>Each wrapper record holds a single ARRAY entry named after the connection
     * passed to {@link #create(String)}.
     */
    public static final class BeamMultiOutputFactory extends BeamOutputFactory {
        /** Wrapper records waiting to be forwarded by {@link #postProcessing()}. */
        private final Collection<Record> outputs;

        /**
         * @param consumer             sink receiving every buffered record on {@link #postProcessing()}
         * @param recordBuilderFactory factory used to build the wrapper records and their entries
         * @param jsonb                JSON-B instance handed to the emitters for value conversion
         */
        public BeamMultiOutputFactory(Consumer<Record> consumer, RecordBuilderFactory recordBuilderFactory, Jsonb jsonb) {
            super(consumer, recordBuilderFactory, jsonb);
            this.outputs = new ArrayList<>();
        }

        /**
         * Creates an emitter that buffers one wrapper record per emitted value.
         *
         * @param str name given to the ARRAY entry of every wrapper record
         * @return an emitter appending to this factory's buffer
         */
        @Override // org.talend.sdk.component.runtime.beam.BaseProcessorFn.BeamOutputFactory
        public OutputEmitter create(String str) {
            return obj -> {
                // A fresh collection per emission keeps each wrapper record limited to one value.
                final Collection<Record> emitted = new ArrayList<>();
                new BeamOutputEmitter(emitted, this.factory, this.jsonb) { // from class: org.talend.sdk.component.runtime.beam.BaseProcessorFn.BeamMultiOutputFactory.1
                    @Override // org.talend.sdk.component.runtime.beam.BaseProcessorFn.BeamOutputEmitter
                    public void emit(Object value) {
                        super.emit(value);
                        // super.emit ignores null values, so the collection may still be empty here;
                        // in that case an empty array with the EMPTY_RECORD element schema is buffered.
                        final Record first = emitted.isEmpty() ? null : emitted.iterator().next();
                        final Schema elementSchema = first == null ? Schemas.EMPTY_RECORD : first.getSchema();
                        final Schema.Entry arrayEntry = BeamMultiOutputFactory.this.factory.newEntryBuilder()
                                .withName(str)
                                .withType(Schema.Type.ARRAY)
                                .withElementSchema(elementSchema)
                                .build();
                        BeamMultiOutputFactory.this.outputs.add(
                                BeamMultiOutputFactory.this.factory.newRecordBuilder()
                                        .withArray(arrayEntry, emitted)
                                        .build());
                    }
                }.emit(obj);
            };
        }

        /**
         * Forwards every buffered record to the sink, in insertion order.
         * Does nothing when no record was buffered.
         */
        @Override // org.talend.sdk.component.runtime.beam.BaseProcessorFn.BeamOutputFactory
        public void postProcessing() {
            if (this.outputs.isEmpty()) {
                return;
            }
            // The decompiled body referenced an undefined variable here; the intent is consumer::accept.
            final Consumer<Record> sink = Objects.requireNonNull(this.emit);
            this.outputs.forEach(sink);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/talend/sdk/component/runtime/beam/BaseProcessorFn$BeamOutputEmitter.class */
    /**
     * {@link OutputEmitter} that converts each emitted value to a {@link Record}
     * and appends it to the collection supplied at construction time.
     */
    public static class BeamOutputEmitter implements OutputEmitter {
        private final Collection<Record> builder;
        private final RecordBuilderFactory recordBuilderFactory;
        private final Jsonb jsonb;
        private final RecordConverters converters = new RecordConverters();
        private final RecordConverters.MappingMetaRegistry registry = new RecordConverters.MappingMetaRegistry();

        /**
         * @param collection           target collection receiving the converted records
         * @param recordBuilderFactory factory supplied to the converter
         * @param jsonb                JSON-B instance supplied to the converter
         */
        public BeamOutputEmitter(Collection<Record> collection, RecordBuilderFactory recordBuilderFactory, Jsonb jsonb) {
            this.builder = collection;
            this.recordBuilderFactory = recordBuilderFactory;
            this.jsonb = jsonb;
        }

        /**
         * Converts and stores the given value; {@code null} values are ignored.
         *
         * @param obj value to emit, may be {@code null}
         */
        public void emit(Object obj) {
            if (obj != null) {
                this.builder.add(toRecord(obj));
            }
        }

        /** Runs the value through the record converter and casts the result to {@link Record}. */
        private Record toRecord(Object obj) {
            final Object converted = this.converters.toRecord(
                    this.registry, obj, () -> this.jsonb, () -> this.recordBuilderFactory);
            return Record.class.cast(converted);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/talend/sdk/component/runtime/beam/BaseProcessorFn$BeamOutputFactory.class */
    /**
     * Base {@link OutputFactory} holding the sink, the record builder factory and the
     * JSON-B instance shared by the concrete factories of this class.
     */
    public static abstract class BeamOutputFactory implements OutputFactory {
        /** Sink that subclasses feed from {@link #postProcessing()}. */
        protected final Consumer<Record> emit;
        protected final RecordBuilderFactory factory;
        protected final Jsonb jsonb;
        // NOTE(review): only written by create(String) below; nothing visible in this class reads it back.
        private final Map<String, Collection<Record>> outputs = new HashMap<>();

        /**
         * @param consumer             sink for the records produced by this factory
         * @param recordBuilderFactory factory used to build records and schema entries
         * @param jsonb                JSON-B instance handed to the emitters
         */
        public BeamOutputFactory(Consumer<Record> consumer, RecordBuilderFactory recordBuilderFactory, Jsonb jsonb) {
            this.emit = consumer;
            this.factory = recordBuilderFactory;
            this.jsonb = jsonb;
        }

        /**
         * Creates an emitter collecting into the bucket of the given connection,
         * creating that bucket on first use.
         *
         * @param str connection name; sanitized before being used as bucket key
         * @return an emitter appending to the connection's bucket
         */
        public OutputEmitter create(String str) {
            final String connection = Schema.sanitizeConnectionName(str);
            final Collection<Record> bucket = this.outputs.computeIfAbsent(connection, key -> new ArrayList<>());
            return new BeamOutputEmitter(bucket, this.factory, this.jsonb);
        }

        /** Pushes whatever the emitters collected to the sink. */
        public abstract void postProcessing();
    }

    /* loaded from: input_file:org/talend/sdk/component/runtime/beam/BaseProcessorFn$BeamSingleOutputFactory.class */
    /**
     * Output factory that gathers everything emitted per connection and forwards it
     * as one single record holding one ARRAY entry per connection.
     */
    protected static final class BeamSingleOutputFactory extends BeamOutputFactory {
        /** Emitted records grouped by sanitized connection name. */
        private final Map<String, Collection<Record>> outputs;

        /**
         * @param consumer             sink receiving the aggregated record
         * @param recordBuilderFactory factory used to build the aggregated record and its entries
         * @param jsonb                JSON-B instance handed to the emitters
         */
        protected BeamSingleOutputFactory(Consumer<Record> consumer, RecordBuilderFactory recordBuilderFactory, Jsonb jsonb) {
            super(consumer, recordBuilderFactory, jsonb);
            this.outputs = new HashMap<>();
        }

        /**
         * Creates an emitter appending to the bucket of the given connection.
         *
         * @param str connection name; sanitized before being used as bucket key
         * @return an emitter bound to the connection's bucket
         */
        @Override // org.talend.sdk.component.runtime.beam.BaseProcessorFn.BeamOutputFactory
        public OutputEmitter create(String str) {
            final String connection = Schema.sanitizeConnectionName(str);
            final Collection<Record> bucket = this.outputs.computeIfAbsent(connection, key -> new ArrayList<>());
            return new BeamOutputEmitter(bucket, this.factory, this.jsonb);
        }

        /**
         * Builds one record with an ARRAY entry per connection and hands it to the sink.
         * Does nothing when no emitter was ever created.
         */
        @Override // org.talend.sdk.component.runtime.beam.BaseProcessorFn.BeamOutputFactory
        public void postProcessing() {
            if (this.outputs.isEmpty()) {
                return;
            }
            final RecordBuilderFactory builders = Objects.requireNonNull(this.factory);
            final Record.Builder aggregate = builders.newRecordBuilder();
            for (final Map.Entry<String, Collection<Record>> output : this.outputs.entrySet()) {
                final Collection<Record> records = output.getValue();
                // The first record, when there is one, provides the element schema of the array.
                final Record first = records.isEmpty() ? null : records.iterator().next();
                final Schema elementSchema = first == null ? Schemas.EMPTY_RECORD : first.getSchema();
                aggregate.withArray(
                        this.factory.newEntryBuilder()
                                .withName(output.getKey())
                                .withType(Schema.Type.ARRAY)
                                .withElementSchema(elementSchema)
                                .build(),
                        records);
            }
            this.emit.accept(aggregate.build());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wraps the given processor and, when it is a {@link ProcessorImpl}, picks up the
     * batch size from its internal configuration.
     *
     * <p>The first entry whose key ends with {@code $maxBatchSize} and whose value is
     * non-blank is parsed as an integer; an unparsable value is logged and leaves
     * {@code maxBatchSize} untouched.
     *
     * @param processor processor to delegate to
     */
    public BaseProcessorFn(Processor processor) {
        this.processor = processor;
        if (!(processor instanceof ProcessorImpl)) {
            return;
        }
        ((ProcessorImpl) processor).getInternalConfiguration()
                .entrySet()
                .stream()
                .filter(setting -> setting.getKey().endsWith("$maxBatchSize")
                        && setting.getValue() != null
                        && !setting.getValue().trim().isEmpty())
                .findFirst()
                .ifPresent(setting -> {
                    try {
                        this.maxBatchSize = Integer.parseInt(setting.getValue().trim());
                    } catch (NumberFormatException e) {
                        log.warn("Invalid configuration: " + setting);
                    }
                });
    }

    /**
     * Builds the sink used while processing one element; the returned consumer is
     * handed to the output factories created in {@code processElement}.
     *
     * @param processContext Beam context of the element being processed
     * @return consumer receiving the records produced for that element
     */
    protected abstract Consumer<Record> toEmitter(DoFn<Record, O>.ProcessContext processContext);

    /**
     * Provides the output factory passed to {@code afterGroup} when a bundle
     * finishes with a group still open.
     *
     * @param finishBundleContext Beam context of the finishing bundle
     * @return factory whose {@code postProcessing()} is invoked right after {@code afterGroup}
     */
    protected abstract BeamOutputFactory getFinishBundleOutputFactory(DoFn<Record, O>.FinishBundleContext finishBundleContext);

    /**
     * Beam setup hook: starts the wrapped processor.
     *
     * @throws Exception declared for the Beam lifecycle contract
     */
    @DoFn.Setup
    public void setup() throws Exception {
        processor.start();
    }

    /**
     * Feeds one element to the processor and closes the current group once
     * {@code maxBatchSize} elements have been handled.
     *
     * <p>The first element of a group triggers {@code beforeGroup}. Outputs emitted
     * during {@code onNext} are forwarded as one aggregated record. When the group is
     * full, {@code afterGroup} is invoked with a dedicated multi-output factory whose
     * records are forwarded individually.
     *
     * @param processContext Beam context carrying the element and the output sink
     */
    @DoFn.ProcessElement
    public void processElement(DoFn<Record, O>.ProcessContext processContext) {
        ensureInit();
        if (this.currentCount == 0) {
            this.processor.beforeGroup();
        }
        final BeamSingleOutputFactory elementOutputs =
                new BeamSingleOutputFactory(toEmitter(processContext), this.recordFactory, this.jsonb);
        this.processor.onNext(new BeamInputFactory(processContext), elementOutputs);
        elementOutputs.postProcessing();
        this.currentCount++;
        // Count-based group closing is disabled for non-positive sizes.
        if (this.maxBatchSize <= 0 || this.currentCount < this.maxBatchSize) {
            return;
        }
        this.currentCount = 0;
        final BeamMultiOutputFactory groupOutputs =
                new BeamMultiOutputFactory(toEmitter(processContext), this.recordFactory, this.jsonb);
        // afterGroup must write to the factory that is post-processed next; passing the
        // already flushed per-element factory would drop everything emitted here.
        this.processor.afterGroup(groupOutputs);
        groupOutputs.postProcessing();
    }

    /**
     * Closes the group that is still open, if any, when the bundle finishes.
     *
     * @param finishBundleContext Beam context of the finishing bundle
     */
    @DoFn.FinishBundle
    public void finishBundle(DoFn<Record, O>.FinishBundleContext finishBundleContext) {
        if (currentCount <= 0) {
            // No element since the last group boundary: nothing to close.
            return;
        }
        ensureInit();
        currentCount = 0;
        final BeamOutputFactory groupOutputs = getFinishBundleOutputFactory(finishBundleContext);
        processor.afterGroup(groupOutputs);
        groupOutputs.postProcessing();
    }

    /** Beam teardown hook: stops the wrapped processor. */
    @DoFn.Teardown
    public void tearDown() {
        processor.stop();
    }

    /**
     * Looks up the record builder factory and the JSON-B instance from the container
     * of the processor's plugin, once, the first time they are needed.
     *
     * <p>{@code jsonb} is assigned last because a non-null {@code jsonb} is what marks
     * this instance as initialised.
     */
    private void ensureInit() {
        if (jsonb != null) {
            return;
        }
        synchronized (this) {
            if (jsonb != null) {
                return;
            }
            final LightContainer container = ContainerFinder.Instance.get().find(processor.plugin());
            recordFactory = (RecordBuilderFactory) container.findService(RecordBuilderFactory.class);
            jsonb = (Jsonb) container.findService(Jsonb.class);
        }
    }

    /**
     * Creates an instance without a processor; {@code processor} stays {@code null}
     * and {@code maxBatchSize} keeps its default of {@code -1}.
     */
    public BaseProcessorFn() {
        // Intentionally empty: all fields keep their declared defaults.
    }

    /**
     * Overrides the number of elements after which a group is closed.
     *
     * @param i new batch size; values {@code <= 0} disable count-based group closing
     */
    public void setMaxBatchSize(int i) {
        maxBatchSize = i;
    }
}
