package cn.sliew.carp.framework.queue.kekio.redis;

import cn.sliew.carp.framework.queue.kekio.AbstractQueue;
import cn.sliew.carp.framework.queue.kekio.MessageHandler;
import cn.sliew.carp.framework.queue.kekio.Queue;
import cn.sliew.carp.framework.queue.kekio.QueueExecutor;
import cn.sliew.carp.framework.queue.kekio.message.Message;
import cn.sliew.carp.framework.queue.kekio.metrics.EventPublisher;
import cn.sliew.carp.framework.queue.kekio.metrics.MonitorableQueue;
import cn.sliew.milky.common.function.CheckedConsumer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.hash.Hashing;
import io.micrometer.core.instrument.MeterRegistry;
import java.beans.ConstructorProperties;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.commands.JedisCommands;

/* loaded from: input_file:cn/sliew/carp/framework/queue/kekio/redis/AbstractRedisQueue.class */
public abstract class AbstractRedisQueue<CLIENT extends JedisCommands> extends AbstractQueue implements MonitorableQueue {

    // Class-wide SLF4J logger.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractRedisQueue.class);
    // Lock TTL handed to the constructor; the constructor substitutes 10 when it receives null.
    // NOTE(review): presumably supplied as the lockTtlSeconds argument of the lock script - confirm in subclasses.
    protected final Integer lockTtlSeconds;
    // The ObjectMapper passed to the constructor; used here to serialize messages for logging and for hashV1.
    protected final ObjectMapper mapper;
    // Copy of mapper with ORDER_MAP_ENTRIES_BY_KEYS enabled (see constructor); used only by hashV2.
    private final ObjectMapper hashObjectMapper;
    /**
     * Lua fragment that moves one fingerprint from the queue set to the unacked set.
     *
     * <p>The fragment reads the names {@code messagesKey}, {@code fingerprint},
     * {@code unackDefaultScore}, {@code unackBaseScore}, {@code queueKey} and {@code unackKey}
     * without defining them, so it only works when embedded in a script that declares them
     * first. The same text appears at the end of {@link #READ_MESSAGE_WITH_LOCK_SRC}.
     *
     * <p>Steps performed by the fragment:
     * <ol>
     *   <li>HGET the message stored under {@code fingerprint} in {@code messagesKey}.</li>
     *   <li>Start from {@code unackDefaultScore}; when the message is a string whose decoded
     *       JSON has a numeric {@code ackTimeoutMs} and {@code unackBaseScore} is not nil,
     *       use {@code unackBaseScore + ackTimeoutMs} instead.</li>
     *   <li>Reformat the score through {@code java_scientific} (string.format "%.12E"
     *       followed by a gsub; presumably to drop the '+' of the exponent - TODO confirm).</li>
     *   <li>ZREM the fingerprint from {@code queueKey} and ZADD it to {@code unackKey}
     *       with the computed score.</li>
     * </ol>
     */
    protected static final String READ_MESSAGE_SRC = "local java_scientific = function(x)\n  return string.format(\"%.12E\", x):gsub(\"\\+\", \"\")\nend\n\n-- get the message, move the fingerprint to the unacked queue and return\nlocal message = redis.call(\"HGET\", messagesKey, fingerprint)\n\n-- check for an ack timeout override on the message\nlocal unackScore = unackDefaultScore\nif type(message) == \"string\" and message ~= nil then\n  local ackTimeoutOverride = tonumber(cjson.decode(message)[\"ackTimeoutMs\"])\n  if ackTimeoutOverride ~= nil and unackBaseScore ~= nil then\n    unackScore = unackBaseScore + ackTimeoutOverride\n  end\nend\n\nunackScore = java_scientific(unackScore)\n\nredis.call(\"ZREM\", queueKey, fingerprint)\nredis.call(\"ZADD\", unackKey, unackScore, fingerprint)\n";
    protected static final String READ_MESSAGE_WITH_LOCK_SRC = "local queueKey = KEYS[1]\nlocal unackKey = KEYS[2]\nlocal lockKey = KEYS[3]\nlocal messagesKey = KEYS[4]\nlocal maxScore = ARGV[1]\nlocal peekFingerprintCount = ARGV[2]\nlocal lockTtlSeconds = ARGV[3]\nlocal unackDefaultScore = ARGV[4]\nlocal unackBaseScore = ARGV[5]\n\nlocal not_empty = function(x)\n  return (type(x) == \"table\") and (not x.err) and (#x ~= 0)\nend\n\nlocal acquire_lock = function(fingerprints, locksKey, lockTtlSeconds)\n  if not_empty(fingerprints) then\n    local i=1\n    while (i <= #fingerprints) do\n      redis.call(\"ECHO\", \"attempting lock on \" .. fingerprints[i])\n      if redis.call(\"SET\", locksKey .. \":\" .. fingerprints[i], \"\\uD83D\\uDD12\", \"EX\", lockTtlSeconds, \"NX\") then\n        redis.call(\"ECHO\", \"acquired lock on \" .. fingerprints[i])\n        return fingerprints[i], fingerprints[i+1]\n      end\n      i=i+2\n    end\n  end\n  return nil, nil\nend\n\n-- acquire a lock on a fingerprint\nlocal fingerprints = redis.call(\"ZRANGEBYSCORE\", queueKey, 0.0, maxScore, \"WITHSCORES\", \"LIMIT\", 0, peekFingerprintCount)\nlocal fingerprint, fingerprintScore = acquire_lock(fingerprints, lockKey, lockTtlSeconds)\n\n-- no lock could be acquired\nif fingerprint == nil then\n  if #fingerprints == 0 then\n    return \"NoReadyMessages\"\n  end\n  return \"AcquireLockFailed\"\nend\n\nlocal java_scientific = function(x)\n  return string.format(\"%.12E\", x):gsub(\"\\+\", \"\")\nend\n\n-- get the message, move the fingerprint to the unacked queue and return\nlocal message = redis.call(\"HGET\", messagesKey, fingerprint)\n\n-- check for an ack timeout override on the message\nlocal unackScore = unackDefaultScore\nif type(message) == \"string\" and message ~= nil then\n  local ackTimeoutOverride = tonumber(cjson.decode(message)[\"ackTimeoutMs\"])\n  if ackTimeoutOverride ~= nil and unackBaseScore ~= nil then\n    unackScore = unackBaseScore + ackTimeoutOverride\n  
end\nend\n\nunackScore = java_scientific(unackScore)\n\nredis.call(\"ZREM\", queueKey, fingerprint)\nredis.call(\"ZADD\", unackKey, unackScore, fingerprint)\nreturn {fingerprint, fingerprintScore, message}\n";

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:cn/sliew/carp/framework/queue/kekio/redis/AbstractRedisQueue$Fingerprint.class */
    /**
     * Value holder pairing the preferred hash of a message ({@code latest}) with the set of
     * all hashes the message may be known under ({@code all}).
     *
     * <p>The set is stored by reference, not copied. Equality and hash code are derived from
     * both properties through their getters.
     */
    public static class Fingerprint {
        private final String latest;
        private final Set<String> all;

        /**
         * @param str the preferred hash, exposed through {@link #getLatest()}
         * @param set every known hash, exposed through {@link #getAll()}
         */
        @Generated
        @ConstructorProperties({"latest", "all"})
        public Fingerprint(String str, Set<String> set) {
            this.latest = str;
            this.all = set;
        }

        /** @return the preferred hash given at construction, possibly {@code null} */
        @Generated
        public String getLatest() {
            return latest;
        }

        /** @return the set of hashes given at construction, possibly {@code null} */
        @Generated
        public Set<String> getAll() {
            return all;
        }

        /**
         * Two fingerprints are equal when each accepts the other through {@link #canEqual}
         * and both {@code latest} and {@code all} are equal (null-safe).
         */
        @Generated
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof Fingerprint)) {
                return false;
            }
            Fingerprint that = (Fingerprint) obj;
            return that.canEqual(this)
                    && Objects.equals(getLatest(), that.getLatest())
                    && Objects.equals(getAll(), that.getAll());
        }

        /** Hook letting subclasses veto equality with instances of other types. */
        @Generated
        protected boolean canEqual(Object obj) {
            return obj instanceof Fingerprint;
        }

        /** Combines both properties with multiplier 59, using 43 for a {@code null} property. */
        @Generated
        @Override
        public int hashCode() {
            final int prime = 59;
            final int nullHash = 43;
            String latestValue = getLatest();
            int result = prime + (latestValue != null ? latestValue.hashCode() : nullHash);
            Set<String> allValues = getAll();
            result = result * prime + (allValues != null ? allValues.hashCode() : nullHash);
            return result;
        }
    }

    /**
     * @return the Redis key for the queue.
     *         NOTE(review): presumably the sorted set passed as KEYS[1] of the read script - confirm in subclasses.
     */
    protected abstract String getQueueKey();

    /**
     * @return the Redis key for unacknowledged messages.
     *         NOTE(review): presumably passed as KEYS[2] of the read script - confirm in subclasses.
     */
    protected abstract String getUnackedKey();

    /**
     * @return the Redis key under which message bodies are kept.
     *         NOTE(review): presumably the hash passed as KEYS[4] of the read script - confirm in subclasses.
     */
    protected abstract String getMessagesKey();

    /**
     * @return the Redis key (prefix) used for locks.
     *         NOTE(review): presumably passed as KEYS[3] of the read script - confirm in subclasses.
     */
    protected abstract String getLocksKey();

    /**
     * @return the Redis key for delivery attempts.
     *         NOTE(review): not referenced in this class; semantics defined by subclasses.
     */
    protected abstract String getAttemptsKey();

    /**
     * Caches the Lua script on the Redis side.
     * NOTE(review): presumably loads READ_MESSAGE_WITH_LOCK_SRC and stores its SHA via
     * {@link #setReadMessageWithLockScriptSha(String)} - confirm against implementations.
     */
    public abstract void cacheScript();

    /** @return the SHA recorded for the read-message-with-lock script (see {@link #cacheScript()}) */
    protected abstract String getReadMessageWithLockScriptSha();

    /** @param str the SHA to record for the read-message-with-lock script */
    protected abstract void setReadMessageWithLockScriptSha(String str);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs {@code function} against a Redis client and returns its result.
     * How the client is obtained and released is up to the implementation.
     *
     * @param function the work to perform with the client
     * @param <T> the result type
     * @return whatever {@code function} returned
     */
    public abstract <T> T withJedis(Function<CLIENT, T> function);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs a side-effect-only action against a Redis client by adapting it to the
     * value-returning {@link #withJedis(Function)} overload and discarding the result.
     *
     * @param consumer the action to perform with the client
     */
    public void withJedis(Consumer<CLIENT> consumer) {
        Function<CLIENT, Object> adapter = client -> {
            consumer.accept(client);
            return null;
        };
        withJedis(adapter);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractRedisQueue(ObjectMapper objectMapper, String str, QueueExecutor queueExecutor, Collection<MessageHandler> collection, List<Queue.DeadMessageCallback> list, EventPublisher eventPublisher, MeterRegistry meterRegistry, Boolean bool, Duration duration, Duration duration2, Boolean bool2, TemporalAmount temporalAmount, Integer num) {
        super(str, queueExecutor, collection, list, eventPublisher, meterRegistry, bool, duration, duration2, bool2, temporalAmount);
        this.lockTtlSeconds = Integer.valueOf(Objects.nonNull(num) ? num.intValue() : 10);
        this.mapper = objectMapper;
        this.hashObjectMapper = objectMapper.copy();
        this.hashObjectMapper.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
    }

    /**
     * Exposes the {@code ackTimeout} value inherited from the superclass, as required by
     * {@link Queue}.
     *
     * @return the configured acknowledgement timeout
     */
    @Override
    public TemporalAmount getAckTimeout() {
        return ackTimeout;
    }

    /**
     * Exposes the inherited {@code deadMessageHandlers} list, as required by {@link Queue}.
     * The list is returned by reference, not copied.
     *
     * @return the dead-message callbacks this queue was configured with
     */
    @Override
    public List<Queue.DeadMessageCallback> getDeadMessageHandlers() {
        return deadMessageHandlers;
    }

    /**
     * Exposes the inherited {@code canPollMany} flag, as required by {@link Queue}.
     *
     * @return the flag exactly as stored, which may be {@code null} if it was configured so
     */
    @Override
    public Boolean canPollMany() {
        return canPollMany;
    }

    /**
     * Exposes the inherited {@code publisher}, as required by {@link MonitorableQueue}.
     *
     * @return the event publisher this queue was configured with
     */
    @Override
    public EventPublisher getPublisher() {
        return publisher;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hands a dead message to every registered dead-message callback, in list order.
     *
     * <p>When no callbacks are registered, the message is serialized to JSON and logged at
     * error level instead.
     *
     * @param message the message to hand over
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if the message
     *                          cannot be serialized for the log entry
     */
    public void handleDeadMessage(Message message) {
        if (CollectionUtils.isEmpty(this.deadMessageHandlers)) {
            final String json;
            try {
                json = SerDerUtil.serializeAsJsonString(this.mapper, message);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
            log.error("Handle dead message error, empty deadMessageHandlers. message: {}", json);
            return;
        }
        this.deadMessageHandlers.forEach(callback -> callback.accept(this, message));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Computes the score for "now" by delegating to {@link #score(TemporalAmount)} with a
     * zero offset.
     *
     * @return the current time in epoch milliseconds, as a double
     */
    public double score() {
        final TemporalAmount noOffset = Duration.ZERO;
        return score(noOffset);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Computes the score for the instant that lies {@code temporalAmount} after now.
     *
     * @param temporalAmount offset added to the current instant; {@code null} means no offset
     * @return the resulting instant in epoch milliseconds, widened to double
     */
    public double score(TemporalAmount temporalAmount) {
        TemporalAmount offset = temporalAmount;
        if (offset == null) {
            offset = Duration.ZERO;
        }
        long epochMillis = Instant.now().plus(offset).toEpochMilli();
        return epochMillis;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs {@code checkedConsumer} against a Jedis {@link Transaction} and returns a list of results.
     * NOTE(review): presumably wraps the commands in MULTI/EXEC and returns the EXEC replies,
     * and how a thrown {@code E} is handled is implementation-defined - confirm in subclasses.
     *
     * @param checkedConsumer the commands to issue on the transaction; may throw {@code E}
     * @param <E> the checked exception type the consumer may throw
     * @return the results produced by the implementation
     */
    public abstract <E extends Throwable> List<Object> multi(CheckedConsumer<Transaction, E> checkedConsumer);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads an integer hash field, treating a missing field as 0.
     *
     * @param str  the hash key
     * @param str2 the field within the hash
     * @return the parsed field value, or 0 when the field is absent
     */
    public int hgetInt(String str, String str2) {
        final int valueWhenAbsent = 0;
        return hgetInt(str, str2, valueWhenAbsent);
    }

    /**
     * Reads an integer hash field via HGET, falling back to a caller-supplied default.
     *
     * @param str  the hash key
     * @param str2 the field within the hash
     * @param i    value returned when HGET yields {@code null}
     * @return the parsed field value, or {@code i} when the field is absent
     * @throws NumberFormatException if the stored value is not a valid integer
     */
    protected int hgetInt(String str, String str2, int i) {
        Integer value = withJedis(client -> {
            String raw = client.hget(str, str2);
            if (raw == null) {
                return i;
            }
            return Integer.parseInt(raw);
        });
        return value;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tests sorted-set membership: a member is present exactly when ZRANK returns a rank.
     *
     * @param jedisCommands the client to query
     * @param str           the sorted-set key
     * @param str2          the member to look for
     * @return {@code true} when ZRANK yields a non-null rank
     */
    public boolean zismember(JedisCommands jedisCommands, String str, String str2) {
        return Objects.nonNull(jedisCommands.zrank(str, str2));
    }

    /**
     * Checks whether at least one of the given members is in the sorted set, stopping at
     * the first hit. Each candidate costs one {@link #zismember} call.
     *
     * @param jedisCommands the client to query
     * @param str           the sorted-set key
     * @param set           the candidate members
     * @return {@code true} as soon as one candidate is found, otherwise {@code false}
     */
    protected boolean anyZismember(JedisCommands jedisCommands, String str, Set<String> set) {
        for (String candidate : set) {
            if (zismember(jedisCommands, str, candidate)) {
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Finds the first hash of {@code fingerprint.getAll()} (in the set's iteration order)
     * that is a member of the given sorted set.
     *
     * @param str         the sorted-set key
     * @param fingerprint the fingerprint whose hashes are probed
     * @return the first hash present in the sorted set, or {@code null} when none is
     */
    public String firstFingerprint(String str, Fingerprint fingerprint) {
        Function<CLIENT, String> lookup = client -> fingerprint.getAll().stream()
                .filter(candidate -> zismember(client, str, candidate))
                .findFirst()
                .orElse(null);
        return withJedis(lookup);
    }

    /**
     * Legacy fingerprint: murmur3_128 over the message's JSON form, encoded with the
     * platform default charset.
     *
     * <p>The charset is deliberately left as {@code Charset.defaultCharset()}: switching it
     * would change the hashes produced for non-ASCII content on platforms whose default is
     * not UTF-8.
     *
     * @param message the message to hash
     * @return the hash rendered by {@code HashCode.toString()}
     * @throws RuntimeException wrapping the {@link JsonProcessingException} on serialization failure
     * @deprecated retained alongside {@link #hashV2(Message)}; see {@link #fingerprint(Message)}
     */
    @Deprecated
    protected String hashV1(Message message) {
        final String json;
        try {
            json = SerDerUtil.serializeAsJsonString(this.mapper, message);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        return Hashing.murmur3_128().hashString(json, Charset.defaultCharset()).toString();
    }

    /**
     * Current fingerprint: murmur3_128 (UTF-8) over {@code "v2:"} followed by the message's
     * JSON form with the top-level {@code "attributes"} entry removed and map entries
     * ordered by key.
     *
     * @param message the message to hash
     * @return the hash rendered by {@code HashCode.toString()}
     * @throws RuntimeException wrapping the {@link JsonProcessingException} on serialization failure
     */
    protected String hashV2(Message message) {
        // Convert to a map first so "attributes" can be dropped before hashing.
        HashMap<?, ?> fields = this.hashObjectMapper.convertValue(message, HashMap.class);
        fields.remove("attributes");
        final String payload;
        try {
            payload = "v2:" + SerDerUtil.serializeAsJsonString(this.hashObjectMapper, fields);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        return Hashing.murmur3_128().hashString(payload, StandardCharsets.UTF_8).toString();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the fingerprint of a message: the V2 hash is the preferred one, and the set of
     * all hashes holds both the V2 and the legacy V1 hash.
     *
     * @param message the message to fingerprint
     * @return a fingerprint whose {@code latest} is the V2 hash and whose {@code all} is a
     *         mutable set containing the V2 and V1 hashes
     */
    public Fingerprint fingerprint(Message message) {
        String current = hashV2(message);
        String legacy = hashV1(message);
        Set<String> known = new HashSet<>();
        known.add(current);
        known.add(legacy);
        return new Fingerprint(current, known);
    }
}
