package com.erudika.para.server.queue;

import com.erudika.para.core.ParaObject;
import com.erudika.para.core.Sysprop;
import com.erudika.para.core.Webhook;
import com.erudika.para.core.annotations.Locked;
import com.erudika.para.core.utils.Pager;
import com.erudika.para.core.utils.Para;
import com.erudika.para.core.utils.ParaObjectUtils;
import com.erudika.para.core.utils.Utils;
import com.erudika.para.server.utils.HealthUtils;
import com.erudika.para.server.utils.filters.CORSFilter;
import com.fasterxml.jackson.databind.ObjectReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/erudika/para/server/queue/River.class */
public abstract class River implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(River.class);
    private static final CloseableHttpClient HTTP = HttpClientBuilder.create().setConnectionReuseStrategy((httpRequest, httpResponse, httpContext) -> {
        return false;
    }).setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(10, TimeUnit.SECONDS).build()).build();
    private static ConcurrentHashMap<String, Integer> pendingIds;

    abstract List<String> pullMessages();

    @Override // java.lang.Runnable
    public void run() {
        LinkedList linkedList = new LinkedList();
        LinkedList linkedList2 = new LinkedList();
        LinkedList linkedList3 = new LinkedList();
        ObjectReader jsonReader = ParaObjectUtils.getJsonReader(Map.class);
        int i = 0;
        while (!Thread.interrupted()) {
            try {
                logger.debug("Waiting {}s for messages...", Integer.valueOf(Para.getConfig().queuePollingIntervalSec()));
                int i2 = 0;
                List<String> emptyList = Collections.emptyList();
                if (HealthUtils.getInstance().isHealthy()) {
                    try {
                        emptyList = pullMessages();
                        logger.debug("Pulled {} messages from queue.", Integer.valueOf(emptyList.size()));
                        for (String str : emptyList) {
                            logger.debug("Message from queue: {}", str);
                            if (StringUtils.contains(str, "appid") && StringUtils.contains(str, "type")) {
                                i2 += parseAndCategorizeMessage((Map) jsonReader.readValue(str), linkedList, linkedList2, linkedList3);
                            }
                        }
                    } catch (Exception e) {
                        logger.error("Batch processing operation failed:", e);
                    }
                }
                if (!linkedList.isEmpty() || !linkedList2.isEmpty() || !linkedList3.isEmpty() || i2 > 0) {
                    logger.debug("River summary: {} created, {} updated, {} deleted, {} webhooks delivered.", new Object[]{Integer.valueOf(linkedList.size()), Integer.valueOf(linkedList2.size()), Integer.valueOf(linkedList3.size()), Integer.valueOf(i2)});
                    persistChanges(linkedList, linkedList2, linkedList3);
                    i = 0;
                } else if (emptyList.isEmpty()) {
                    i++;
                    int queuePollingWaitSec = Para.getConfig().queuePollingWaitSec();
                    if (queuePollingWaitSec > 0 && i >= 3) {
                        logger.debug("Queue is empty. Sleeping for {}s...", Integer.valueOf(queuePollingWaitSec));
                        Thread.sleep(queuePollingWaitSec * 1000);
                    }
                }
            } catch (InterruptedException e2) {
                logger.info("River interrupted: {}", e2.getMessage());
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    private int parseAndCategorizeMessage(Map<String, Object> map, List<ParaObject> list, List<ParaObject> list2, List<ParaObject> list3) {
        String str = map.containsKey("id") ? (String) map.get("id") : null;
        String str2 = (String) map.get("type");
        String str3 = (String) map.get("appid");
        boolean equals = ParaObjectUtils.toClass(str2).equals(Sysprop.class);
        if (StringUtils.isBlank(str3) || !equals) {
            return 0;
        }
        if ("webhookpayload".equals(str2)) {
            return processWebhookPayload(str3, str, map);
        }
        if ("indexpayload".equals(str2)) {
            return processIndexPayload(str3, str, map);
        }
        if (map.containsKey("_delete") && "true".equals(map.get("_delete")) && str != null) {
            Sysprop sysprop = new Sysprop(str);
            sysprop.setAppid(str3);
            list3.add(sysprop);
            return 0;
        }
        if (str != null && !"true".equals(map.get("_create"))) {
            list2.add(ParaObjectUtils.setAnnotatedFields(Para.getDAO().read(str3, str), map, Locked.class));
            return 0;
        }
        ParaObject annotatedFields = ParaObjectUtils.setAnnotatedFields(map);
        if (annotatedFields == null) {
            return 0;
        }
        list.add(annotatedFields);
        return 0;
    }

    protected int processWebhookPayload(String str, String str2, Map<String, Object> map) {
        if (!Para.getConfig().webhooksEnabled() || !map.containsKey("targetUrl") || StringUtils.isBlank(str2) || map.isEmpty()) {
            return 0;
        }
        try {
            boolean booleanValue = ((Boolean) map.get("urlEncoded")).booleanValue();
            String trimToEmpty = StringUtils.trimToEmpty((String) map.get("targetUrl"));
            String str3 = (String) map.get("payload");
            Integer valueOf = Integer.valueOf(Math.abs(NumberUtils.toInt(String.valueOf(map.get("repeatedDeliveryAttempts")), 1)));
            HttpPost httpPost = new HttpPost(trimToEmpty);
            httpPost.addHeader("User-Agent", "Para Webhook Dispacher " + Para.getVersion());
            httpPost.setHeader("Content-Type", booleanValue ? "application/x-www-form-urlencoded" : "application/json");
            httpPost.setHeader("X-Webhook-Signature", (String) map.get("signature"));
            httpPost.setHeader("X-Para-Event", (String) map.get("event"));
            if (booleanValue) {
                httpPost.setEntity(new StringEntity("payload=".concat(Utils.urlEncode(str3)), Charset.forName(Para.getConfig().defaultEncoding())));
            } else {
                httpPost.setEntity(new StringEntity(str3, Charset.forName(Para.getConfig().defaultEncoding())));
            }
            if (valueOf.intValue() > 100) {
                valueOf = 100;
            }
            IntStream.range(0, Math.max(1, valueOf.intValue())).parallel().forEach(i -> {
                String str4 = CORSFilter.DEFAULT_EXPOSED_HEADERS;
                try {
                    str4 = (String) HTTP.execute(httpPost, classicHttpResponse -> {
                        if (classicHttpResponse == null || Math.abs(classicHttpResponse.getCode() - 200) <= 10) {
                            logger.debug("Webhook {} delivered to {} successfully.", str2, trimToEmpty);
                            return "OK";
                        }
                        updateFailureCount(str, str2);
                        logger.info("Webhook {} delivery failed! {} responded with code {} {} instead of 2xx.", new Object[]{str2, trimToEmpty, Integer.valueOf(classicHttpResponse.getCode()), classicHttpResponse.getReasonPhrase()});
                        return classicHttpResponse.getReasonPhrase();
                    });
                } catch (Exception e) {
                    updateFailureCount(str, str2);
                    logger.info("Webhook {} not delivered! {} isn't responding. {}", new Object[]{str2, trimToEmpty, str4});
                }
            });
            return 1;
        } catch (Exception e) {
            updateFailureCount(str, str2);
            logger.error("Webhook payload was not delivered:", e);
            return 0;
        }
    }

    protected int processIndexPayload(String str, String str2, Map<String, Object> map) {
        if (!Para.getConfig().isSearchEnabled() || StringUtils.isBlank(str2) || map.isEmpty()) {
            return 0;
        }
        Object obj = map.get("payload");
        try {
            boolean z = -1;
            switch (str2.hashCode()) {
                case -1021108635:
                    if (str2.equals("unindex_all_op")) {
                        z = true;
                        break;
                    }
                    break;
                case -1018717054:
                    if (str2.equals("delete_index_op")) {
                        z = 4;
                        break;
                    }
                    break;
                case -797884398:
                    if (str2.equals("rebuild_index_op")) {
                        z = 2;
                        break;
                    }
                    break;
                case 209268044:
                    if (str2.equals("index_all_op")) {
                        z = false;
                        break;
                    }
                    break;
                case 1028893777:
                    if (str2.equals("create_index_op")) {
                        z = 3;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    indexAllWithRetry(str, obj);
                    return 1;
                case true:
                    Para.getSearch().unindexAll(str, getPayloadObjects(str, obj));
                    return 1;
                case true:
                    Para.getSearch().rebuildIndex(Para.getDAO(), ParaObjectUtils.setAnnotatedFields((Map) obj), CORSFilter.DEFAULT_EXPOSED_HEADERS, new Pager[0]);
                    return 1;
                case true:
                    Para.getSearch().createIndex(ParaObjectUtils.setAnnotatedFields((Map) obj));
                    return 1;
                case true:
                    Para.getSearch().deleteIndex(ParaObjectUtils.setAnnotatedFields((Map) obj));
                    return 1;
                default:
                    return 1;
            }
        } catch (Exception e) {
            logger.error("Indexing operation " + str2 + " failed for app '" + str + "'!", e);
            return 0;
        }
    }

    private void persistChanges(List<ParaObject> list, List<ParaObject> list2, List<ParaObject> list3) {
        if (!list.isEmpty()) {
            Para.getDAO().createAll(list);
        }
        if (!list2.isEmpty()) {
            Para.getDAO().updateAll(list2);
        }
        if (!list3.isEmpty()) {
            Para.getDAO().deleteAll(list3);
        }
        list.clear();
        list2.clear();
        list3.clear();
    }

    private void indexAllWithRetry(String str, Object obj) {
        List list = (List) Optional.ofNullable(obj).orElse(Collections.emptyList());
        Para.getCache().removeAll(str, list);
        Map readAll = Para.getDAO().readAll(str, list, true);
        Para.getSearch().indexAll(str, (List) readAll.values().stream().filter(paraObject -> {
            return paraObject != null;
        }).collect(Collectors.toList()));
        if (readAll.containsValue(null)) {
            if (pendingIds == null) {
                pendingIds = new ConcurrentHashMap<>();
            }
            readAll.entrySet().stream().filter(entry -> {
                return entry.getValue() == null;
            }).forEachOrdered(entry2 -> {
                pendingIds.putIfAbsent((String) entry2.getKey(), 1);
            });
            logger.debug("Some objects are missing from local database while performing 'index_all_op': {}", pendingIds);
            Para.asyncExecute(() -> {
                for (int i = 0; i < Para.getConfig().riverMaxIndexingRetries(); i++) {
                    try {
                        try {
                            Thread.sleep(1000 * (i + 1));
                            Map readAll2 = Para.getDAO().readAll(str, new ArrayList(pendingIds.keySet()), true);
                            int size = pendingIds.size();
                            readAll2.entrySet().stream().filter(entry3 -> {
                                return entry3.getValue() != null;
                            }).forEachOrdered(entry4 -> {
                                pendingIds.remove(entry4.getKey());
                            });
                            if (size != pendingIds.size()) {
                                Para.getSearch().indexAll(str, (List) readAll2.values().stream().collect(Collectors.toList()));
                            }
                            if (pendingIds.isEmpty()) {
                                break;
                            }
                        } catch (InterruptedException e) {
                            logger.info("Retry indexing operation interrupted: {}", e.getMessage());
                            Thread.currentThread().interrupt();
                            if (pendingIds.isEmpty()) {
                                return;
                            }
                            logger.warn("Indexing operation 'index_all_op' failed for objects {} as they were not found in the database for app '{}'. This may cause the index to become out of sync or corrupted.", pendingIds, str);
                            pendingIds.clear();
                            return;
                        }
                    } catch (Throwable th) {
                        if (!pendingIds.isEmpty()) {
                            logger.warn("Indexing operation 'index_all_op' failed for objects {} as they were not found in the database for app '{}'. This may cause the index to become out of sync or corrupted.", pendingIds, str);
                            pendingIds.clear();
                        }
                        throw th;
                    }
                }
                if (pendingIds.isEmpty()) {
                    return;
                }
                logger.warn("Indexing operation 'index_all_op' failed for objects {} as they were not found in the database for app '{}'. This may cause the index to become out of sync or corrupted.", pendingIds, str);
                pendingIds.clear();
            });
        }
    }

    private List<ParaObject> getPayloadObjects(String str, Object obj) {
        List list = (List) Optional.ofNullable(obj).orElse(Collections.emptyList());
        Para.getCache().removeAll(str, list);
        return (List) list.stream().map(str2 -> {
            Sysprop sysprop = new Sysprop(str2);
            sysprop.setAppid(str);
            return sysprop;
        }).collect(Collectors.toList());
    }

    private void updateFailureCount(String str, String str2) {
        String str3 = "failed_webhook_count" + Para.getConfig().separator() + str2;
        Integer num = (Integer) Para.getCache().get(str, str3);
        if (num == null) {
            num = 0;
        }
        if (num.intValue() < Para.getConfig().maxFailedWebhookAttempts() - 1) {
            Para.getCache().put(str, str3, Integer.valueOf(num.intValue() + 1));
            return;
        }
        Webhook read = Para.getDAO().read(str, str2);
        if (read != null) {
            read.setActive(false);
            read.setTooManyFailures(true);
            Para.getDAO().update(str, read);
            Para.getCache().remove(str, str3);
            logger.info("Webhook {} was disabled - a maximum of {} failed deliveries was reached.", str2, Integer.valueOf(Para.getConfig().maxFailedWebhookAttempts()));
        }
    }
}
