package io.quarkus.redis.runtime.datasource;

import io.quarkus.redis.datasource.ReactiveRedisCommands;
import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.quarkus.redis.datasource.stream.ClaimedMessages;
import io.quarkus.redis.datasource.stream.PendingMessage;
import io.quarkus.redis.datasource.stream.ReactiveStreamCommands;
import io.quarkus.redis.datasource.stream.StreamMessage;
import io.quarkus.redis.datasource.stream.StreamRange;
import io.quarkus.redis.datasource.stream.XAddArgs;
import io.quarkus.redis.datasource.stream.XClaimArgs;
import io.quarkus.redis.datasource.stream.XGroupCreateArgs;
import io.quarkus.redis.datasource.stream.XGroupSetIdArgs;
import io.quarkus.redis.datasource.stream.XPendingArgs;
import io.quarkus.redis.datasource.stream.XPendingSummary;
import io.quarkus.redis.datasource.stream.XReadArgs;
import io.quarkus.redis.datasource.stream.XReadGroupArgs;
import io.quarkus.redis.datasource.stream.XTrimArgs;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.redis.client.Response;
import io.vertx.redis.client.ResponseType;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/* loaded from: input_file:io/quarkus/redis/runtime/datasource/ReactiveStreamCommandsImpl.class */
public class ReactiveStreamCommandsImpl<K, F, V> extends AbstractStreamCommands<K, F, V> implements ReactiveStreamCommands<K, F, V>, ReactiveRedisCommands {
    private final ReactiveRedisDataSource reactive;
    private final Type typeOfValue;
    private final Type typeOfField;
    private final Type typeOfKey;

    public ReactiveStreamCommandsImpl(ReactiveRedisDataSourceImpl reactiveRedisDataSourceImpl, Type type, Type type2, Type type3) {
        super(reactiveRedisDataSourceImpl, type, type2, type3);
        this.typeOfKey = type;
        this.typeOfField = type2;
        this.typeOfValue = type3;
        this.reactive = reactiveRedisDataSourceImpl;
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisCommands
    public ReactiveRedisDataSource getDataSource() {
        return this.reactive;
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<Integer> xack(K k, String str, String... strArr) {
        return super._xack(k, str, strArr).map((v0) -> {
            return v0.toInteger();
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<String> xadd(K k, Map<F, V> map) {
        return super._xadd(k, map).map(ReactiveStreamCommandsImpl::getIdOrNull);
    }

    protected static String getIdOrNull(Response response) {
        if (response == null) {
            return null;
        }
        return response.toString();
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<String> xadd(K k, XAddArgs xAddArgs, Map<F, V> map) {
        return super._xadd(k, xAddArgs, map).map(ReactiveStreamCommandsImpl::getIdOrNull);
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<ClaimedMessages<K, F, V>> xautoclaim(K k, String str, String str2, Duration duration, String str3, int i) {
        return super._xautoclaim(k, str, str2, duration, str3, i).map(response -> {
            return decodeAsClaimedMessages(k, response);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ClaimedMessages<K, F, V> decodeAsClaimedMessages(K k, Response response) {
        return response == null ? new ClaimedMessages<>(null, List.of()) : new ClaimedMessages<>(response.get(0).toString(), decodeListOfMessages(k, response.get(1)));
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected List<StreamMessage<K, F, V>> decodeMessageListPrefixedByKey(Response response) {
        if (response == null) {
            return List.of();
        }
        Object decode = this.marshaller.decode(this.typeOfKey, response.get(0));
        Response response2 = response.get(1);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < response2.size(); i++) {
            arrayList.add(decodeMessageWithStreamId(decode, response2.get(i)));
        }
        return arrayList;
    }

    private StreamMessage<K, F, V> decodeMessageWithStreamId(K k, Response response) {
        if (response == null) {
            return null;
        }
        return response.type() == ResponseType.BULK ? new StreamMessage<>(k, response.toString(), Map.of()) : new StreamMessage<>(k, response.get(0).toString(), decodeMessagePayload(response.get(1)));
    }

    /* JADX WARN: Multi-variable type inference failed */
    Map<F, V> decodeMessagePayload(Response response) {
        HashMap hashMap = new HashMap();
        Object obj = null;
        Iterator it = response.iterator();
        while (it.hasNext()) {
            Response response2 = (Response) it.next();
            if (obj == null) {
                obj = this.marshaller.decode(this.typeOfField, response2);
            } else {
                hashMap.put(obj, this.marshaller.decode(this.typeOfValue, response2));
                obj = null;
            }
        }
        return hashMap;
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<ClaimedMessages<K, F, V>> xautoclaim(K k, String str, String str2, Duration duration, String str3) {
        return super._xautoclaim(k, str, str2, duration, str3).map(response -> {
            return decodeAsClaimedMessages(k, response);
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<ClaimedMessages<K, F, V>> xautoclaim(K k, String str, String str2, Duration duration, String str3, int i, boolean z) {
        return super._xautoclaim(k, str, str2, duration, str3, i, z).map(response -> {
            return decodeAsClaimedMessages(k, response);
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<StreamMessage<K, F, V>>> xclaim(K k, String str, String str2, Duration duration, String... strArr) {
        return super._xclaim(k, str, str2, duration, strArr).map(response -> {
            return decodeListOfMessages(k, response);
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<StreamMessage<K, F, V>>> xclaim(K k, String str, String str2, Duration duration, XClaimArgs xClaimArgs, String... strArr) {
        return super._xclaim(k, str, str2, duration, xClaimArgs, strArr).map(response -> {
            return decodeListOfMessages(k, response);
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<Integer> xdel(K k, String... strArr) {
        return super._xdel(k, strArr).map((v0) -> {
            return v0.toInteger();
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<Void> xgroupCreate(K k, String str, String str2) {
        return super._xgroupCreate(k, str, str2).replaceWithVoid();
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<Void> xgroupCreate(K k, String str, String str2, XGroupCreateArgs xGroupCreateArgs) {
        return super._xgroupCreate(k, str, str2, xGroupCreateArgs).replaceWithVoid();
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<Boolean> xgroupCreateConsumer(K k, String str, String str2) {
        return super._xgroupCreateConsumer(k, str, str2).map((v0) -> {
            return v0.toBoolean();
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<Long> xgroupDelConsumer(K k, String str, String str2) {
        return super._xgroupDelConsumer(k, str, str2).map((v0) -> {
            return v0.toLong();
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<Boolean> xgroupDestroy(K k, String str) {
        return super._xgroupDestroy(k, str).map((v0) -> {
            return v0.toBoolean();
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<Void> xgroupSetId(K k, String str, String str2) {
        return super._xgroupSetId(k, str, str2).replaceWithVoid();
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<Void> xgroupSetId(K k, String str, String str2, XGroupSetIdArgs xGroupSetIdArgs) {
        return super._xgroupSetId(k, str, str2, xGroupSetIdArgs).replaceWithVoid();
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<Long> xlen(K k) {
        return super._xlen(k).map((v0) -> {
            return v0.toLong();
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<StreamMessage<K, F, V>>> xrange(K k, StreamRange streamRange, int i) {
        return super._xrange(k, streamRange, i).map(response -> {
            return decodeListOfMessages(k, response);
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<StreamMessage<K, F, V>>> xrange(K k, StreamRange streamRange) {
        return super._xrange(k, streamRange).map(response -> {
            return decodeListOfMessages(k, response);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<StreamMessage<K, F, V>> decodeListOfMessages(K k, Response response) {
        if (response == null) {
            return List.of();
        }
        ArrayList arrayList = new ArrayList();
        Iterator it = response.iterator();
        while (it.hasNext()) {
            arrayList.add(decodeMessageWithStreamId(k, (Response) it.next()));
        }
        return arrayList;
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<StreamMessage<K, F, V>>> xread(K k, String str) {
        return xread(Map.of(k, str));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<StreamMessage<K, F, V>> decodeAsListOfMessagesFromXRead(Response response) {
        if (response == null) {
            return List.of();
        }
        ArrayList arrayList = new ArrayList();
        Iterator it = response.iterator();
        while (it.hasNext()) {
            arrayList.addAll(decodeMessageListPrefixedByKey((Response) it.next()));
        }
        return arrayList;
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<StreamMessage<K, F, V>>> xread(Map<K, String> map) {
        return super._xread(map).map(this::decodeAsListOfMessagesFromXRead);
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<StreamMessage<K, F, V>>> xread(K k, String str, XReadArgs xReadArgs) {
        return xread(Map.of(k, str), xReadArgs);
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<StreamMessage<K, F, V>>> xread(Map<K, String> map, XReadArgs xReadArgs) {
        return super._xread(map, xReadArgs).map(this::decodeAsListOfMessagesFromXRead);
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<StreamMessage<K, F, V>>> xreadgroup(String str, String str2, K k, String str3) {
        return xreadgroup(str, str2, Map.of(k, str3));
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<StreamMessage<K, F, V>>> xreadgroup(String str, String str2, Map<K, String> map) {
        return super._xreadgroup(str, str2, map).map(this::decodeAsListOfMessagesFromXRead);
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<StreamMessage<K, F, V>>> xreadgroup(String str, String str2, K k, String str3, XReadGroupArgs xReadGroupArgs) {
        return xreadgroup(str, str2, Map.of(k, str3), xReadGroupArgs);
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<StreamMessage<K, F, V>>> xreadgroup(String str, String str2, Map<K, String> map, XReadGroupArgs xReadGroupArgs) {
        return super._xreadgroup(str, str2, map, xReadGroupArgs).map(this::decodeAsListOfMessagesFromXRead);
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<StreamMessage<K, F, V>>> xrevrange(K k, StreamRange streamRange, int i) {
        return super._xrevrange(k, streamRange, i).map(response -> {
            return decodeListOfMessages(k, response);
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<StreamMessage<K, F, V>>> xrevrange(K k, StreamRange streamRange) {
        return super._xrevrange(k, streamRange).map(response -> {
            return decodeListOfMessages(k, response);
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<Long> xtrim(K k, String str) {
        return super._xtrim(k, new XTrimArgs().minid(str)).map((v0) -> {
            return v0.toLong();
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<Long> xtrim(K k, XTrimArgs xTrimArgs) {
        return super._xtrim(k, xTrimArgs).map((v0) -> {
            return v0.toLong();
        });
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<XPendingSummary> xpending(K k, String str) {
        return super._xpending(k, str).map(this::decodeAsXPendingSummary);
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<PendingMessage>> xpending(K k, String str, StreamRange streamRange, int i) {
        return xpending(k, str, streamRange, i, null);
    }

    @Override // io.quarkus.redis.datasource.stream.ReactiveStreamCommands
    public Uni<List<PendingMessage>> xpending(K k, String str, StreamRange streamRange, int i, XPendingArgs xPendingArgs) {
        return super._xpending(k, str, streamRange, i, xPendingArgs).map(this::decodeListOfPendingMessages);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<PendingMessage> decodeListOfPendingMessages(Response response) {
        if (response == null) {
            return List.of();
        }
        ArrayList arrayList = new ArrayList();
        Iterator it = response.iterator();
        while (it.hasNext()) {
            Response response2 = (Response) it.next();
            arrayList.add(new PendingMessage(response2.get(0).toString(), response2.get(1).toString(), Duration.ofMillis(response2.get(2).toLong().longValue()), response2.get(3).toInteger().intValue()));
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public XPendingSummary decodeAsXPendingSummary(Response response) {
        if (response == null) {
            return null;
        }
        Long l = response.get(0).toLong();
        String response2 = response.get(1) != null ? response.get(1).toString() : null;
        String response3 = response.get(2) != null ? response.get(2).toString() : null;
        HashMap hashMap = new HashMap();
        if (response.get(3) != null) {
            Iterator it = response.get(3).iterator();
            while (it.hasNext()) {
                Response response4 = (Response) it.next();
                hashMap.put(response4.get(0).toString(), response4.get(1).toLong());
            }
        }
        return new XPendingSummary(l.longValue(), response2, response3, hashMap);
    }
}
