package com.github.kfcfans.powerjob.worker;

import akka.actor.ActorSystem;
import akka.actor.DeadLetter;
import akka.actor.Props;
import akka.routing.RoundRobinPool;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.HttpUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.worker.actors.ProcessorTrackerActor;
import com.github.kfcfans.powerjob.worker.actors.TaskTrackerActor;
import com.github.kfcfans.powerjob.worker.actors.TroubleshootingActor;
import com.github.kfcfans.powerjob.worker.actors.WorkerActor;
import com.github.kfcfans.powerjob.worker.background.OmsLogHandler;
import com.github.kfcfans.powerjob.worker.background.ServerDiscoveryService;
import com.github.kfcfans.powerjob.worker.background.WorkerHealthReporter;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import com.github.kfcfans.powerjob.worker.common.OmsBannerPrinter;
import com.github.kfcfans.powerjob.worker.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/github/kfcfans/powerjob/worker/OhMyWorker.class */
public class OhMyWorker implements ApplicationContextAware, InitializingBean, DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(OhMyWorker.class);
    private static OhMyConfig config;
    private static String currentServer;
    private static String workerAddress;
    public static ActorSystem actorSystem;
    private static Long appId;
    private static ScheduledExecutorService timingPool;

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringUtils.inject(applicationContext);
    }

    public void afterPropertiesSet() throws Exception {
        init();
    }

    public void init() throws Exception {
        Stopwatch createStarted = Stopwatch.createStarted();
        log.info("[OhMyWorker] start to initialize OhMyWorker...");
        try {
            OmsBannerPrinter.print();
            if (config.isEnableTestMode()) {
                log.warn("[OhMyWorker] using TestMode now, it's dangerous if this is production env.");
            } else {
                appId = assertAppName();
            }
            HashMap newHashMap = Maps.newHashMap();
            newHashMap.put("akka.remote.artery.canonical.hostname", NetUtils.getLocalHost());
            newHashMap.put("akka.remote.artery.canonical.port", Integer.valueOf(config.getPort()));
            workerAddress = NetUtils.getLocalHost() + ":" + config.getPort();
            Config withFallback = ConfigFactory.parseMap(newHashMap).withFallback(ConfigFactory.load("oms-worker.akka.conf"));
            int availableProcessors = Runtime.getRuntime().availableProcessors();
            actorSystem = ActorSystem.create("oms", withFallback);
            actorSystem.actorOf(Props.create(TaskTrackerActor.class, new Object[0]).withDispatcher("akka.task-tracker-dispatcher").withRouter(new RoundRobinPool(availableProcessors * 2)), "task_tracker");
            actorSystem.actorOf(Props.create(ProcessorTrackerActor.class, new Object[0]).withDispatcher("akka.processor-tracker-dispatcher").withRouter(new RoundRobinPool(availableProcessors)), "processor_tracker");
            actorSystem.actorOf(Props.create(WorkerActor.class, new Object[0]).withDispatcher("akka.worker-common-dispatcher").withRouter(new RoundRobinPool(availableProcessors)), "worker");
            actorSystem.eventStream().subscribe(actorSystem.actorOf(Props.create(TroubleshootingActor.class, new Object[0]), "troubleshooting"), DeadLetter.class);
            log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress);
            log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem);
            TaskPersistenceService.INSTANCE.init();
            log.info("[OhMyWorker] local storage initialized successfully.");
            currentServer = ServerDiscoveryService.discovery();
            if (StringUtils.isEmpty(currentServer) && !config.isEnableTestMode()) {
                throw new RuntimeException("can't find any available server, this worker has been quarantined.");
            }
            log.info("[OhMyWorker] discovery server succeed, current server is {}.", currentServer);
            timingPool = Executors.newScheduledThreadPool(3, new ThreadFactoryBuilder().setNameFormat("oms-worker-timing-pool-%d").build());
            timingPool.scheduleAtFixedRate(new WorkerHealthReporter(), 0L, 15L, TimeUnit.SECONDS);
            timingPool.scheduleAtFixedRate(() -> {
                currentServer = ServerDiscoveryService.discovery();
            }, 10L, 10L, TimeUnit.SECONDS);
            timingPool.scheduleWithFixedDelay(OmsLogHandler.INSTANCE.logSubmitter, 0L, 5L, TimeUnit.SECONDS);
            log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", createStarted);
        } catch (Exception e) {
            log.error("[OhMyWorker] initialize OhMyWorker failed, using {}.", createStarted, e);
            throw e;
        }
    }

    public void setConfig(OhMyConfig ohMyConfig) {
        config = ohMyConfig;
    }

    private Long assertAppName() {
        String appName = config.getAppName();
        Objects.requireNonNull(appName, "appName can't be empty!");
        Iterator<String> it = config.getServerAddress().iterator();
        while (it.hasNext()) {
            String format = String.format("http://%s/server/assert?appName=%s", it.next(), appName);
            try {
                ResultDTO resultDTO = (ResultDTO) JsonUtils.parseObject((String) CommonUtils.executeWithRetry0(() -> {
                    return HttpUtils.get(format);
                }), ResultDTO.class);
                if (!resultDTO.isSuccess()) {
                    log.error("[OhMyWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
                    throw new PowerJobException(resultDTO.getMessage());
                }
                Long valueOf = Long.valueOf(resultDTO.getData().toString());
                log.info("[OhMyWorker] assert appName({}) succeed, the appId for this application is {}.", appName, valueOf);
                return valueOf;
            } catch (PowerJobException e) {
                throw e;
            } catch (Exception e2) {
                log.warn("[OhMyWorker] assert appName by url({}) failed, please check the server address.", format);
            }
        }
        log.error("[OhMyWorker] no available server in {}.", config.getServerAddress());
        throw new PowerJobException("no server available!");
    }

    public void destroy() throws Exception {
        timingPool.shutdownNow();
    }

    public static OhMyConfig getConfig() {
        return config;
    }

    public static String getCurrentServer() {
        return currentServer;
    }

    public static String getWorkerAddress() {
        return workerAddress;
    }

    public static Long getAppId() {
        return appId;
    }
}
