package com.nb6868.onex.common.sse;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.json.JSONObject;
import com.nb6868.onex.common.exception.OnexException;
import jakarta.validation.constraints.NotEmpty;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Service
/* loaded from: input_file:com/nb6868/onex/common/sse/SseEmitterService.class */
public class SseEmitterService {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(SseEmitterService.class);
    private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap();

    public SseEmitter createSseConnect(@NotEmpty String str) {
        SseEmitter sseEmitter = new SseEmitter(0L);
        sseEmitter.onCompletion(completionCallBack(str));
        sseEmitter.onError(errorCallBack(str));
        sseEmitter.onTimeout(timeoutCallBack(str));
        sseCache.put(str, sseEmitter);
        log.info("创建新的sse连接，当前id：{}", str);
        try {
            sseEmitter.send(SseEmitter.event().id("SID").data(str));
            return sseEmitter;
        } catch (IOException e) {
            log.error("createSseConnect: 创建长链接异常，客户端ID:{}", str, e);
            throw new OnexException("创建连接异常！", e);
        }
    }

    public void closeSseConnect(String str) {
        SseEmitter sseEmitter = sseCache.get(str);
        if (sseEmitter != null) {
            sseEmitter.complete();
            removeUser(str);
        }
    }

    public SseEmitter getSseEmitterBySid(String str) {
        return sseCache.get(str);
    }

    public void sendOneMessage(String str, JSONObject jSONObject) {
        if (CollectionUtil.isEmpty(sseCache)) {
            return;
        }
        sendMsgToClientBySid(str, jSONObject, getSseEmitterBySid(str));
    }

    public void sendMsgToClient(JSONObject jSONObject) {
        if (CollectionUtil.isEmpty(sseCache)) {
            return;
        }
        for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {
            sendMsgToClientBySid(entry.getKey(), jSONObject, entry.getValue());
        }
    }

    public void sendMsgToClientBySid(String str, JSONObject jSONObject, SseEmitter sseEmitter) {
        SseEmitter sseEmitter2;
        if (sseEmitter == null) {
            log.error("sendMsgToClientBySid 推送消息失败：客户端{}未创建长链接,失败消息:{}", str, jSONObject.toString());
            return;
        }
        SseEmitter.SseEventBuilder data = SseEmitter.event().id("TASK_RESULT").data(jSONObject, MediaType.APPLICATION_JSON);
        try {
            sseEmitter.send(data);
        } catch (IOException e) {
            log.error("sendMsgToClient: 推送消息失败：{},尝试进行重推", jSONObject, e);
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(10000L);
                    sseEmitter2 = sseCache.get(str);
                } catch (Exception e2) {
                    log.error("sendMsgToClient：{}的第{}次消息重推失败", new Object[]{str, Integer.valueOf(i + 1), e2});
                }
                if (sseEmitter2 != null) {
                    sseEmitter2.send(data);
                    log.info("sendMsgToClient：{}的第{}次消息重推成功,{}", new Object[]{str, Integer.valueOf(i + 1), jSONObject});
                    return;
                }
                log.error("sendMsgToClient：{}的第{}次消息重推失败，未创建长链接", str, Integer.valueOf(i + 1));
            }
        }
    }

    private Runnable completionCallBack(String str) {
        return () -> {
            log.info("结束连接：{}", str);
            removeUser(str);
        };
    }

    private Runnable timeoutCallBack(String str) {
        return () -> {
            log.info("连接超时：{}", str);
            removeUser(str);
        };
    }

    private Consumer<Throwable> errorCallBack(String str) {
        return th -> {
            log.error("errorCallBack：连接异常,客户端ID:{}", str);
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(10000L);
                    SseEmitter sseEmitter = sseCache.get(str);
                    if (sseEmitter == null) {
                        log.error("errorCallBack：第{}次消息重推失败,未获取到 {} 对应的长链接", Integer.valueOf(i + 1), str);
                    } else {
                        sseEmitter.send("失败后重新推送");
                    }
                } catch (Exception e) {
                    log.error("errorCallBack: 错误回调，客户端ID:{}", str, e);
                }
            }
        };
    }

    private void removeUser(String str) {
        sseCache.remove(str);
        log.info("removeUser:移除用户：{}", str);
    }
}
