package org.apache.spark.api.python;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkEnv$;
import org.apache.spark.SparkException;
import org.apache.spark.internal.Logging;
import org.apache.spark.security.SocketAuthHelper;
import org.apache.spark.util.RedirectThread;
import org.apache.spark.util.RedirectThread$;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Option$;
import scala.Predef$;
import scala.StringContext;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Queue;
import scala.collection.mutable.StringBuilder;
import scala.collection.mutable.WeakHashMap;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.sys.package$;

/* compiled from: PythonWorkerFactory.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Ug!B\u0001\u0003\u0001\u0019a!a\u0005)zi\"|gnV8sW\u0016\u0014h)Y2u_JL(BA\u0002\u0005\u0003\u0019\u0001\u0018\u0010\u001e5p]*\u0011QAB\u0001\u0004CBL'BA\u0004\t\u0003\u0015\u0019\b/\u0019:l\u0015\tI!\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0017\u0005\u0019qN]4\u0014\u0007\u0001i1\u0003\u0005\u0002\u000f#5\tqBC\u0001\u0011\u0003\u0015\u00198-\u00197b\u0013\t\u0011rB\u0001\u0004B]f\u0014VM\u001a\t\u0003)]i\u0011!\u0006\u0006\u0003-\u0019\t\u0001\"\u001b8uKJt\u0017\r\\\u0005\u00031U\u0011q\u0001T8hO&tw\r\u0003\u0005\u001b\u0001\t\u0005\t\u0015!\u0003\u001d\u0003)\u0001\u0018\u0010\u001e5p]\u0016CXmY\u0002\u0001!\ti\u0002E\u0004\u0002\u000f=%\u0011qdD\u0001\u0007!J,G-\u001a4\n\u0005\u0005\u0012#AB*ue&twM\u0003\u0002 \u001f!AA\u0005\u0001B\u0001B\u0003%Q%A\u0004f]Z4\u0016M]:\u0011\tu1C\u0004H\u0005\u0003O\t\u00121!T1q\u0011\u0015I\u0003\u0001\"\u0001+\u0003\u0019a\u0014N\\5u}Q\u00191&\f\u0018\u0011\u00051\u0002Q\"\u0001\u0002\t\u000biA\u0003\u0019\u0001\u000f\t\u000b\u0011B\u0003\u0019A\u0013\t\u000fA\u0002!\u0019!C\u0001c\u0005IQo]3EC\u0016lwN\\\u000b\u0002eA\u0011abM\u0005\u0003i=\u0011qAQ8pY\u0016\fg\u000e\u0003\u00047\u0001\u0001\u0006IAM\u0001\u000bkN,G)Y3n_:\u0004\u0003b\u0002\u001d\u0001\u0005\u0004%\t!O\u0001\rI\u0006,Wn\u001c8N_\u0012,H.Z\u000b\u00029!11\b\u0001Q\u0001\nq\tQ\u0002Z1f[>tWj\u001c3vY\u0016\u0004\u0003bB\u001f\u0001\u0005\u0004%\t!O\u0001\ro>\u00148.\u001a:N_\u0012,H.\u001a\u0005\u0007\u007f\u0001\u0001\u000b\u0011\u0002\u000f\u0002\u001b]|'o[3s\u001b>$W\u000f\\3!\u0011\u001d\t\u0005A1A\u0005\n\t\u000b!\"Y;uQ\"+G\u000e]3s+\u0005\u0019\u0005C\u0001#H\u001b\u0005)%B\u0001$\u0007\u0003!\u0019XmY;sSRL\u0018B\u0001%F\u0005A\u0019vnY6fi\u0006+H\u000f\u001b%fYB,'\u000f\u0003\u0004K\u0001\u0001\u0006IaQ\u0001\fCV$\b\u000eS3ma\u0016\u0014\b\u0005C\u0004M\u0001\u0001\u0007I\u0011A'\u0002\r\u0011\fW-\\8o+\u0005q\u0005CA(U\u001b\u0005\u0001&BA)S\u0003\u0011a\u0017M\\4\u000b\u0003M\u000bAA[1wC&\u0011Q\u000b
\u0015\u0002\b!J|7-Z:t\u0011\u001d9\u0006\u00011A\u0005\u0002a\u000b!\u0002Z1f[>tw\fJ3r)\tIF\f\u0005\u0002\u000f5&\u00111l\u0004\u0002\u0005+:LG\u000fC\u0004^-\u0006\u0005\t\u0019\u0001(\u0002\u0007a$\u0013\u0007\u0003\u0004`\u0001\u0001\u0006KAT\u0001\bI\u0006,Wn\u001c8!\u0011\u001d\t\u0007A1A\u0005\u0002\t\f!\u0002Z1f[>t\u0007j\\:u+\u0005\u0019\u0007C\u00013h\u001b\u0005)'B\u00014S\u0003\rqW\r^\u0005\u0003Q\u0016\u00141\"\u00138fi\u0006#GM]3tg\"1!\u000e\u0001Q\u0001\n\r\f1\u0002Z1f[>t\u0007j\\:uA!9A\u000e\u0001a\u0001\n\u0003i\u0017A\u00033bK6|g\u000eU8siV\ta\u000e\u0005\u0002\u000f_&\u0011\u0001o\u0004\u0002\u0004\u0013:$\bb\u0002:\u0001\u0001\u0004%\ta]\u0001\u000fI\u0006,Wn\u001c8Q_J$x\fJ3r)\tIF\u000fC\u0004^c\u0006\u0005\t\u0019\u00018\t\rY\u0004\u0001\u0015)\u0003o\u0003-!\u0017-Z7p]B{'\u000f\u001e\u0011\t\u000fa\u0004!\u0019!C\u0001s\u0006iA-Y3n_:<vN]6feN,\u0012A\u001f\t\u0007w\u0006\u0005\u0011Q\u00018\u000e\u0003qT!! @\u0002\u000f5,H/\u00192mK*\u0011qpD\u0001\u000bG>dG.Z2uS>t\u0017bAA\u0002y\nYq+Z1l\u0011\u0006\u001c\b.T1q!\r!\u0017qA\u0005\u0004\u0003\u0013)'AB*pG.,G\u000fC\u0004\u0002\u000e\u0001\u0001\u000b\u0011\u0002>\u0002\u001d\u0011\fW-\\8o/>\u00148.\u001a:tA!I\u0011\u0011\u0003\u0001C\u0002\u0013\u0005\u00111C\u0001\fS\u0012dWmV8sW\u0016\u00148/\u0006\u0002\u0002\u0016A)10a\u0006\u0002\u0006%\u0019\u0011\u0011\u0004?\u0003\u000bE+X-^3\t\u0011\u0005u\u0001\u0001)A\u0005\u0003+\tA\"\u001b3mK^{'o[3sg\u0002B\u0011\"!\t\u0001\u0001\u0004%\t!a\t\u0002\u00191\f7\u000f^!di&4\u0018\u000e^=\u0016\u0005\u0005\u0015\u0002c\u0001\b\u0002(%\u0019\u0011\u0011F\b\u0003\t1{gn\u001a\u0005\n\u0003[\u0001\u0001\u0019!C\u0001\u0003_\t\u0001\u0003\\1ti\u0006\u001bG/\u001b<jif|F%Z9\u0015\u0007e\u000b\t\u0004C\u0005^\u0003W\t\t\u00111\u0001\u0002&!A\u0011Q\u0007\u0001!B\u0013\t)#A\u0007mCN$\u0018i\u0019;jm&$\u0018\u0010\t\u0005\n\u0003s\u0001\u0001\u0019!C\u0001\u0003w\tQb]5na2,wk\u001c:lKJ\u001cXCAA\u001f!\u0019Y\u0018\u0011AA\u0003\u001d\"I\u0011\u0011\t\u0001A\u0002\u
0013\u0005\u00111I\u0001\u0012g&l\u0007\u000f\\3X_J\\WM]:`I\u0015\fHcA-\u0002F!IQ,a\u0010\u0002\u0002\u0003\u0007\u0011Q\b\u0005\t\u0003\u0013\u0002\u0001\u0015)\u0003\u0002>\u0005q1/[7qY\u0016<vN]6feN\u0004\u0003\u0002CA'\u0001\t\u0007I\u0011A\u001d\u0002\u0015ALH\u000f[8o!\u0006$\b\u000eC\u0004\u0002R\u0001\u0001\u000b\u0011\u0002\u000f\u0002\u0017ALH\u000f[8o!\u0006$\b\u000e\t\u0005\b\u0003+\u0002A\u0011AA,\u0003\u0019\u0019'/Z1uKR\u0011\u0011Q\u0001\u0005\b\u00037\u0002A\u0011BA,\u0003M\u0019'/Z1uKRC'o\\;hQ\u0012\u000bW-\\8o\u0011\u001d\ty\u0006\u0001C\u0005\u0003/\n!c\u0019:fCR,7+[7qY\u0016<vN]6fe\"9\u00111\r\u0001\u0005\n\u0005\u0015\u0014aC:uCJ$H)Y3n_:$\u0012!\u0017\u0005\b\u0003S\u0002A\u0011BA6\u0003]\u0011X\rZ5sK\u000e$8\u000b\u001e:fC6\u001cHk\\*uI\u0016\u0014(\u000fF\u0003Z\u0003[\ni\b\u0003\u0005\u0002p\u0005\u001d\u0004\u0019AA9\u0003\u0019\u0019H\u000fZ8viB!\u00111OA=\u001b\t\t)HC\u0002\u0002xI\u000b!![8\n\t\u0005m\u0014Q\u000f\u0002\f\u0013:\u0004X\u000f^*ue\u0016\fW\u000e\u0003\u0005\u0002��\u0005\u001d\u0004\u0019AA9\u0003\u0019\u0019H\u000fZ3se\u001a1\u00111\u0011\u0001\u0005\u0003\u000b\u0013Q\"T8oSR|'\u000f\u00165sK\u0006$7\u0003BAA\u0003\u000f\u00032aTAE\u0013\r\tY\t\u0015\u0002\u0007)\"\u0014X-\u00193\t\u000f%\n\t\t\"\u0001\u0002\u0010R\u0011\u0011\u0011\u0013\t\u0005\u0003'\u000b\t)D\u0001\u0001\u0011!\t9*!!\u0005B\u0005\u0015\u0014a\u0001:v]\"9\u00111\u0014\u0001\u0005\n\u0005\u0015\u0014AE2mK\u0006tW\u000f]%eY\u0016<vN]6feNDq!a(\u0001\t\u0013\t)'\u0001\u0006ti>\u0004H)Y3n_:Dq!a)\u0001\t\u0003\t)'\u0001\u0003ti>\u0004\bbBAT\u0001\u0011\u0005\u0011\u0011V\u0001\u000bgR|\u0007oV8sW\u0016\u0014HcA-\u0002,\"A\u0011QVAS\u0001\u0004\t)!\u0001\u0004x_J\\WM\u001d\u0005\b\u0003c\u0003A\u0011AAZ\u00035\u0011X\r\\3bg\u0016<vN]6feR\u0019\u0011,!.\t\u0011\u00055\u0016q\u0016a\u0001\u0003\u000b9q!!/\u0003\u0011\u0013\tY,A\nQsRDwN\\,pe.,'OR1di>\u0014\u0018\u0010E\u0002-\u0003{3a!\u0001\u0002\t\n\u0005}6cAA_\u001b!9\u0011&!0\u0005\u0002\u0005\rGCAA^\u0011%\
t9-!0C\u0002\u0013\u0005Q.A\fQ%>\u001bUiU*`/\u0006KEk\u0018+J\u001b\u0016{U\u000bV0N'\"A\u00111ZA_A\u0003%a.\u0001\rQ%>\u001bUiU*`/\u0006KEk\u0018+J\u001b\u0016{U\u000bV0N'\u0002B\u0011\"a4\u0002>\n\u0007I\u0011A7\u0002-%#E*R0X\u001fJ[UIU0U\u00136+u*\u0016+`\u001bNC\u0001\"a5\u0002>\u0002\u0006IA\\\u0001\u0018\u0013\u0012cUiX,P%.+%k\u0018+J\u001b\u0016{U\u000bV0N'\u0002\u0002")
/* loaded from: input_file:BOOT-INF/lib/spark-core_2.11-2.4.0.jar:org/apache/spark/api/python/PythonWorkerFactory.class */
public class PythonWorkerFactory implements Logging {
    /** Python executable; used as the first element of the worker and daemon command lines. */
    public final String org$apache$spark$api$python$PythonWorkerFactory$$pythonExec;
    /** Extra environment variables copied into every launched Python process. */
    private final Map<String, String> envVars;
    /** True when not on Windows and conf "spark.python.use.daemon" is true (set in the constructor). */
    private final boolean useDaemon;
    /** Module passed after "-m" when launching the daemon process. */
    private final String daemonModule;
    /** Module passed after "-m" when launching a standalone worker process. */
    private final String workerModule;
    /** Supplies the shared secret and authenticates worker sockets. */
    private final SocketAuthHelper authHelper;
    /** The daemon process, or null while no daemon is running. */
    private Process daemon;
    /** Address used to connect to the daemon; initialised to 127.0.0.1. */
    private final InetAddress daemonHost;
    /** Port read from the daemon's standard output; 0 while no daemon is running. */
    private int daemonPort;
    /** Sockets obtained through the daemon, mapped to the int the daemon sent on connect (presumably the worker pid; confirm). */
    private final WeakHashMap<Socket, Object> daemonWorkers;
    /** Sockets handed back through releaseWorker and available for reuse by create. */
    private final Queue<Socket> idleWorkers;
    /** System.currentTimeMillis() of the last release or idle cleanup. */
    private long lastActivity;
    /** Sockets of standalone (non-daemon) workers mapped to their processes. */
    private WeakHashMap<Socket, Process> simpleWorkers;
    /** PYTHONPATH handed to launched processes, merged in the constructor. */
    private final String pythonPath;
    /** Backing field for the Logging trait's logger. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /* compiled from: PythonWorkerFactory.scala */
    /* loaded from: input_file:BOOT-INF/lib/spark-core_2.11-2.4.0.jar:org/apache/spark/api/python/PythonWorkerFactory$MonitorThread.class */
    public class MonitorThread extends Thread {
        public final /* synthetic */ PythonWorkerFactory $outer;

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v0 */
        /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r0v6, types: [int] */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                ?? r0 = this;
                synchronized (r0) {
                    r0 = ((org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer().lastActivity() + PythonWorkerFactory$.MODULE$.IDLE_WORKER_TIMEOUT_MS()) > System.currentTimeMillis() ? 1 : ((org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer().lastActivity() + PythonWorkerFactory$.MODULE$.IDLE_WORKER_TIMEOUT_MS()) == System.currentTimeMillis() ? 0 : -1));
                    if (r0 < 0) {
                        org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer().org$apache$spark$api$python$PythonWorkerFactory$$cleanupIdleWorkers();
                        org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer().lastActivity_$eq(System.currentTimeMillis());
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    } else {
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    }
                }
                Thread.sleep(10000L);
            }
        }

        public /* synthetic */ PythonWorkerFactory org$apache$spark$api$python$PythonWorkerFactory$MonitorThread$$$outer() {
            return this.$outer;
        }

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public MonitorThread(PythonWorkerFactory pythonWorkerFactory) {
            super(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Idle Worker Monitor for ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{pythonWorkerFactory.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec})));
            if (pythonWorkerFactory == null) {
                throw null;
            }
            this.$outer = pythonWorkerFactory;
            setDaemon(true);
        }
    }

    /** Static forwarder to the companion object's IDLE_WORKER_TIMEOUT_MS value. */
    public static int IDLE_WORKER_TIMEOUT_MS() {
        return PythonWorkerFactory$.MODULE$.IDLE_WORKER_TIMEOUT_MS();
    }

    /** Static forwarder to the companion object's PROCESS_WAIT_TIMEOUT_MS value. */
    public static int PROCESS_WAIT_TIMEOUT_MS() {
        return PythonWorkerFactory$.MODULE$.PROCESS_WAIT_TIMEOUT_MS();
    }

    // ---- Logging trait members: each method below forwards to Logging.Cclass ----

    /** Returns the logger stored for the Logging trait. */
    @Override // org.apache.spark.internal.Logging
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Stores the logger used by the Logging trait. */
    @Override // org.apache.spark.internal.Logging
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    /** Delegates to the Logging trait implementation of logName. */
    @Override // org.apache.spark.internal.Logging
    public String logName() {
        return Logging.Cclass.logName(this);
    }

    /** Delegates to the Logging trait implementation of log. */
    @Override // org.apache.spark.internal.Logging
    public Logger log() {
        return Logging.Cclass.log(this);
    }

    /** Delegates to the Logging trait implementation of logInfo. */
    @Override // org.apache.spark.internal.Logging
    public void logInfo(Function0<String> function0) {
        Logging.Cclass.logInfo(this, function0);
    }

    /** Delegates to the Logging trait implementation of logDebug. */
    @Override // org.apache.spark.internal.Logging
    public void logDebug(Function0<String> function0) {
        Logging.Cclass.logDebug(this, function0);
    }

    /** Delegates to the Logging trait implementation of logTrace. */
    @Override // org.apache.spark.internal.Logging
    public void logTrace(Function0<String> function0) {
        Logging.Cclass.logTrace(this, function0);
    }

    /** Delegates to the Logging trait implementation of logWarning. */
    @Override // org.apache.spark.internal.Logging
    public void logWarning(Function0<String> function0) {
        Logging.Cclass.logWarning(this, function0);
    }

    /** Delegates to the Logging trait implementation of logError. */
    @Override // org.apache.spark.internal.Logging
    public void logError(Function0<String> function0) {
        Logging.Cclass.logError(this, function0);
    }

    /** Delegates to the Logging trait implementation of logInfo, passing the throwable along. */
    @Override // org.apache.spark.internal.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.Cclass.logInfo(this, function0, th);
    }

    /** Delegates to the Logging trait implementation of logDebug, passing the throwable along. */
    @Override // org.apache.spark.internal.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.Cclass.logDebug(this, function0, th);
    }

    /** Delegates to the Logging trait implementation of logTrace, passing the throwable along. */
    @Override // org.apache.spark.internal.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.Cclass.logTrace(this, function0, th);
    }

    /** Delegates to the Logging trait implementation of logWarning, passing the throwable along. */
    @Override // org.apache.spark.internal.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.Cclass.logWarning(this, function0, th);
    }

    /** Delegates to the Logging trait implementation of logError, passing the throwable along. */
    @Override // org.apache.spark.internal.Logging
    public void logError(Function0<String> function0, Throwable th) {
        Logging.Cclass.logError(this, function0, th);
    }

    /** Delegates to the Logging trait implementation of isTraceEnabled. */
    @Override // org.apache.spark.internal.Logging
    public boolean isTraceEnabled() {
        return Logging.Cclass.isTraceEnabled(this);
    }

    /** Delegates to the Logging trait implementation of initializeLogIfNecessary (one-argument form). */
    @Override // org.apache.spark.internal.Logging
    public void initializeLogIfNecessary(boolean z) {
        Logging.Cclass.initializeLogIfNecessary(this, z);
    }

    /** Delegates to the Logging trait implementation of initializeLogIfNecessary (two-argument form). */
    @Override // org.apache.spark.internal.Logging
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.Cclass.initializeLogIfNecessary(this, z, z2);
    }

    /** Returns the trait-supplied default for the second argument of initializeLogIfNecessary. */
    @Override // org.apache.spark.internal.Logging
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.Cclass.initializeLogIfNecessary$default$2(this);
    }

    /** Returns whether workers are obtained through the daemon process rather than launched one by one. */
    public boolean useDaemon() {
        return this.useDaemon;
    }

    /** Returns the module name used to launch the daemon. */
    public String daemonModule() {
        return this.daemonModule;
    }

    /** Returns the module name used to launch a standalone worker. */
    public String workerModule() {
        return this.workerModule;
    }

    /** Returns the helper that provides the socket secret and authentication. */
    private SocketAuthHelper authHelper() {
        return this.authHelper;
    }

    /** Returns the daemon process, or null when none is running. */
    public Process daemon() {
        return this.daemon;
    }

    /** Replaces the stored daemon process reference. */
    public void daemon_$eq(Process process) {
        this.daemon = process;
    }

    /** Returns the address used to connect to the daemon. */
    public InetAddress daemonHost() {
        return this.daemonHost;
    }

    /** Returns the daemon's port; 0 when no daemon is running. */
    public int daemonPort() {
        return this.daemonPort;
    }

    /** Replaces the stored daemon port. */
    public void daemonPort_$eq(int i) {
        this.daemonPort = i;
    }

    /** Returns the map of daemon-created worker sockets to the int received from the daemon on connect. */
    public WeakHashMap<Socket, Object> daemonWorkers() {
        return this.daemonWorkers;
    }

    /** Returns the queue of released worker sockets awaiting reuse. */
    public Queue<Socket> idleWorkers() {
        return this.idleWorkers;
    }

    /** Returns the timestamp (milliseconds) of the last recorded activity. */
    public long lastActivity() {
        return this.lastActivity;
    }

    /** Replaces the last-activity timestamp. */
    public void lastActivity_$eq(long j) {
        this.lastActivity = j;
    }

    /** Returns the map of standalone worker sockets to their processes. */
    public WeakHashMap<Socket, Process> simpleWorkers() {
        return this.simpleWorkers;
    }

    /** Replaces the map of standalone worker sockets. */
    public void simpleWorkers_$eq(WeakHashMap<Socket, Process> weakHashMap) {
        this.simpleWorkers = weakHashMap;
    }

    /** Returns the PYTHONPATH value passed to launched Python processes. */
    public String pythonPath() {
        return this.pythonPath;
    }

    /**
     * Provides a socket connected to a Python worker.
     *
     * <p>In daemon mode an idle worker is reused when one is queued; otherwise a new
     * worker is requested from the daemon. Both steps happen under this factory's
     * lock. Without the daemon a standalone worker process is launched.
     *
     * @return a socket connected to a Python worker
     */
    public Socket create() {
        if (useDaemon()) {
            synchronized (this) {
                return idleWorkers().nonEmpty() ? idleWorkers().dequeue() : createThroughDaemon();
            }
        }
        return createSimpleWorker();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private Socket createThroughDaemon() {
        ?? r0 = this;
        synchronized (r0) {
            startDaemon();
            Socket liftedTree1$1 = liftedTree1$1();
            r0 = r0;
            return liftedTree1$1;
        }
    }

    /**
     * Launches a standalone Python worker process and waits for it to connect back.
     *
     * <p>A server socket is opened on an ephemeral loopback port and advertised to the
     * child through PYTHON_WORKER_FACTORY_PORT; the shared secret is passed through
     * PYTHON_WORKER_FACTORY_SECRET. The child has 10 seconds to connect. The server
     * socket is always closed before returning or throwing.
     *
     * @return the authenticated socket accepted from the worker
     * @throws SparkException if the worker does not connect or authentication fails
     */
    private Socket createSimpleWorker() {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(new byte[]{127, 0, 0, 1}));
            ProcessBuilder processBuilder = new ProcessBuilder(Arrays.asList(this.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec, "-m", workerModule()));
            java.util.Map<String, String> environment = processBuilder.environment();
            environment.putAll((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(this.envVars).asJava());
            // Set these after envVars so they always win over caller-supplied values.
            environment.put("PYTHONPATH", pythonPath());
            environment.put("PYTHONUNBUFFERED", "YES");
            environment.put("PYTHON_WORKER_FACTORY_PORT", Integer.toString(serverSocket.getLocalPort()));
            environment.put("PYTHON_WORKER_FACTORY_SECRET", authHelper().secret());
            Process worker = processBuilder.start();
            redirectStreamsToStderr(worker.getInputStream(), worker.getErrorStream());
            serverSocket.setSoTimeout(10000);
            try {
                Socket socket = serverSocket.accept();
                authHelper().authClient(socket);
                // stopWorker/stopDaemon use this map under the factory lock, so the
                // insertion must hold the same lock.
                synchronized (this) {
                    simpleWorkers().put(socket, worker);
                }
                return socket;
            } catch (Exception e) {
                throw new SparkException("Python worker failed to connect back.", e);
            }
        } finally {
            if (serverSocket != null) {
                serverSocket.close();
            }
        }
    }

    /**
     * Starts the Python daemon process if none is running and records the port it
     * prints on its standard output.
     *
     * <p>The daemon is launched as {@code <pythonExec> -m <daemonModule>} with the
     * configured environment plus PYTHONPATH, PYTHON_WORKER_FACTORY_SECRET and
     * PYTHONUNBUFFERED. The first four bytes of its stdout are read as the port; a
     * value outside 1..65535 or a premature end of stream is reported as a
     * SparkException. On any failure the daemon is stopped; if error text was
     * collected from the daemon, the failure is rethrown as a SparkException that
     * carries that text and the original stack trace.
     */
    private synchronized void startDaemon() {
        if (daemon() != null) {
            // Already running.
            return;
        }
        try {
            List<String> command = Arrays.asList(this.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec, "-m", daemonModule());
            ProcessBuilder processBuilder = new ProcessBuilder(command);
            java.util.Map<String, String> environment = processBuilder.environment();
            environment.putAll((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(this.envVars).asJava());
            environment.put("PYTHONPATH", pythonPath());
            environment.put("PYTHON_WORKER_FACTORY_SECRET", authHelper().secret());
            environment.put("PYTHONUNBUFFERED", "YES");
            daemon_$eq(processBuilder.start());
            DataInputStream daemonStdout = new DataInputStream(daemon().getInputStream());
            try {
                daemonPort_$eq(daemonStdout.readInt());
                if (daemonPort() >= 1 && daemonPort() <= 65535) {
                    // Port accepted: forward the rest of the daemon's output to stderr.
                    redirectStreamsToStderr(daemonStdout, daemon().getErrorStream());
                    return;
                }
                String module = daemonModule();
                Integer portAsText = BoxesRunTime.boxToInteger(daemonPort());
                int portAsHex = daemonPort();
                throw new SparkException(new StringOps(Predef$.MODULE$.augmentString(new StringOps("\n            |Bad data in %s's standard output. Invalid port number:\n            |  %s (0x%08x)\n            |Python command to execute the daemon was:\n            |  %s\n            |Check that you don't have any unexpected modules or libraries in\n            |your PYTHONPATH:\n            |  %s\n            |Also, check if you have a sitecustomize.py module in your python path,\n            |or in your python installation, that is printing to standard output").format(Predef$.MODULE$.genericWrapArray(new Object[]{module, portAsText, BoxesRunTime.boxToInteger(portAsHex), ((TraversableOnce) JavaConverters$.MODULE$.asScalaBufferConverter(command).asScala()).mkString(" "), pythonPath()})))).stripMargin());
            } catch (EOFException unused) {
                throw new SparkException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"No port number in ", "'s stdout"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{daemonModule()})));
            }
        } catch (Exception e) {
            // Error text obtained from the daemon by helper closures defined outside this
            // method (presumably its stderr; confirm against those closures).
            String daemonErrorText = (String) Option$.MODULE$.apply(daemon()).flatMap(new PythonWorkerFactory$$anonfun$7(this)).getOrElse(new PythonWorkerFactory$$anonfun$8(this));
            stopDaemon();
            if (daemonErrorText == null || daemonErrorText.isEmpty()) {
                // Nothing to add: surface the original failure unchanged.
                throw e;
            }
            SparkException wrapped = new SparkException(new StringOps(Predef$.MODULE$.augmentString(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"\n              |Error from python worker:\n              |  ", "\n              |PYTHONPATH was:\n              |  ", "\n              |", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{daemonErrorText.replace("\n", "\n  "), pythonPath(), e})))).stripMargin());
            // Keep the stack trace pointing at the original failure site.
            wrapped.setStackTrace(e.getStackTrace());
            throw wrapped;
        }
    }

    /**
     * Starts two RedirectThreads that copy the given streams to {@code System.err}.
     * Any exception raised while creating or starting the threads is logged as an
     * error and not rethrown.
     *
     * @param inputStream  stream handled by the "stdout reader" thread
     * @param inputStream2 stream handled by the "stderr reader" thread
     */
    private void redirectStreamsToStderr(InputStream inputStream, InputStream inputStream2) {
        final String stdoutReaderName = "stdout reader for " + this.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec;
        final String stderrReaderName = "stderr reader for " + this.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec;
        try {
            RedirectThread stdoutReader = new RedirectThread(inputStream, System.err, stdoutReaderName, RedirectThread$.MODULE$.$lessinit$greater$default$4());
            stdoutReader.start();
            RedirectThread stderrReader = new RedirectThread(inputStream2, System.err, stderrReaderName, RedirectThread$.MODULE$.$lessinit$greater$default$4());
            stderrReader.start();
        } catch (Exception failure) {
            logError(new PythonWorkerFactory$$anonfun$redirectStreamsToStderr$1(this), failure);
        }
    }

    /**
     * Drains the idle-worker queue, closing each socket. A failure on one socket is
     * logged as a warning and the loop moves on to the next.
     */
    public void org$apache$spark$api$python$PythonWorkerFactory$$cleanupIdleWorkers() {
        while (!idleWorkers().isEmpty()) {
            try {
                Socket idleSocket = idleWorkers().dequeue();
                idleSocket.close();
            } catch (Exception closeFailure) {
                logWarning(new PythonWorkerFactory$$anonfun$org$apache$spark$api$python$PythonWorkerFactory$$cleanupIdleWorkers$1(this), closeFailure);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private void stopDaemon() {
        ?? r0 = this;
        synchronized (r0) {
            if (useDaemon()) {
                org$apache$spark$api$python$PythonWorkerFactory$$cleanupIdleWorkers();
                if (daemon() != null) {
                    daemon().destroy();
                }
                daemon_$eq(null);
                daemonPort_$eq(0);
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                simpleWorkers().mapValues((Function1) new PythonWorkerFactory$$anonfun$stopDaemon$1(this));
            }
            r0 = r0;
        }
    }

    /** Public shutdown entry point; delegates to stopDaemon(). */
    public void stop() {
        stopDaemon();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8 */
    public void stopWorker(Socket socket) {
        ?? r0 = this;
        synchronized (r0) {
            if (!useDaemon()) {
                simpleWorkers().get(socket).foreach(new PythonWorkerFactory$$anonfun$stopWorker$2(this));
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else if (daemon() == null) {
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            } else {
                daemonWorkers().get(socket).foreach(new PythonWorkerFactory$$anonfun$stopWorker$1(this));
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            }
            r0 = r0;
            socket.close();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v10 */
    /* JADX WARN: Type inference failed for: r0v4 */
    /* JADX WARN: Type inference failed for: r0v5, types: [java.lang.Throwable] */
    public void releaseWorker(Socket socket) {
        if (!useDaemon()) {
            try {
                socket.close();
                return;
            } catch (Exception e) {
                logWarning(new PythonWorkerFactory$$anonfun$releaseWorker$1(this), e);
                return;
            }
        }
        ?? r0 = this;
        synchronized (r0) {
            lastActivity_$eq(System.currentTimeMillis());
            idleWorkers().enqueue(Predef$.MODULE$.wrapRefArray(new Socket[]{socket}));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
        }
    }

    /**
     * Connects to the daemon, reads the int it sends for the new worker,
     * authenticates, and records the socket in {@code daemonWorkers}.
     *
     * <p>If any step after the connection fails, the socket is closed before the
     * original exception propagates, so a failed launch does not leak a connection.
     *
     * @return the connected and authenticated worker socket
     * @throws IllegalStateException if the daemon reports a negative code
     */
    private final Socket createSocket$1() {
        Socket socket = new Socket(daemonHost(), daemonPort());
        boolean registered = false;
        try {
            int launchCode = new DataInputStream(socket.getInputStream()).readInt();
            if (launchCode < 0) {
                throw new IllegalStateException("Python daemon failed to launch worker with code " + launchCode);
            }
            authHelper().authToServer(socket);
            daemonWorkers().put(socket, BoxesRunTime.boxToInteger(launchCode));
            registered = true;
            return socket;
        } finally {
            if (!registered) {
                try {
                    socket.close();
                } catch (Exception ignored) {
                    // Best-effort cleanup; the original failure is the one to report.
                }
            }
        }
    }

    /**
     * Requests a worker socket from the daemon. If the first attempt fails with a
     * SocketException, two warnings are logged, the daemon is stopped and started
     * again, and the request is made once more; a failure of that second attempt
     * propagates to the caller.
     *
     * @return a socket connected to a daemon-created worker
     */
    private final Socket liftedTree1$1() {
        Socket workerSocket;
        try {
            workerSocket = createSocket$1();
        } catch (SocketException firstFailure) {
            logWarning(new PythonWorkerFactory$$anonfun$liftedTree1$1$1(this), firstFailure);
            logWarning(new PythonWorkerFactory$$anonfun$liftedTree1$1$2(this));
            stopDaemon();
            startDaemon();
            workerSocket = createSocket$1();
        }
        return workerSocket;
    }

    /**
     * Creates a factory for Python workers.
     *
     * <p>Daemon mode is enabled when the OS name does not start with "Windows" and
     * conf "spark.python.use.daemon" (default true) is set. The daemon and worker
     * module names come from "spark.python.daemon.module" and
     * "spark.python.worker.module", with fallbacks supplied by closures defined
     * elsewhere. The PYTHONPATH is merged from Spark's own Python path, the
     * PYTHONPATH entry of {@code map}, and the process environment's PYTHONPATH.
     *
     * <p>The idle-worker monitor thread is started as the last step, so it never
     * sees a partially initialized factory and is not left running if an earlier
     * initialization step throws.
     *
     * @param str path or name of the Python executable
     * @param map environment variables to pass to launched Python processes
     */
    public PythonWorkerFactory(String str, Map<String, String> map) {
        this.org$apache$spark$api$python$PythonWorkerFactory$$pythonExec = str;
        this.envVars = map;
        Logging.Cclass.$init$(this);
        this.useDaemon = !System.getProperty("os.name").startsWith("Windows") && SparkEnv$.MODULE$.get().conf().getBoolean("spark.python.use.daemon", true);
        this.daemonModule = (String) SparkEnv$.MODULE$.get().conf().getOption("spark.python.daemon.module").map(new PythonWorkerFactory$$anonfun$1(this)).getOrElse(new PythonWorkerFactory$$anonfun$2(this));
        this.workerModule = (String) SparkEnv$.MODULE$.get().conf().getOption("spark.python.worker.module").map(new PythonWorkerFactory$$anonfun$3(this)).getOrElse(new PythonWorkerFactory$$anonfun$4(this));
        this.authHelper = new SocketAuthHelper(SparkEnv$.MODULE$.get().conf());
        this.daemon = null;
        // Loopback address 127.0.0.1.
        this.daemonHost = InetAddress.getByAddress(new byte[]{127, 0, 0, 1});
        this.daemonPort = 0;
        this.daemonWorkers = new WeakHashMap<>();
        this.idleWorkers = new Queue<>();
        this.lastActivity = 0L;
        this.simpleWorkers = new WeakHashMap<>();
        this.pythonPath = PythonUtils$.MODULE$.mergePythonPaths(Predef$.MODULE$.wrapRefArray(new String[]{PythonUtils$.MODULE$.sparkPythonPath(), (String) map.getOrElse("PYTHONPATH", new PythonWorkerFactory$$anonfun$5(this)), (String) package$.MODULE$.env().getOrElse("PYTHONPATH", new PythonWorkerFactory$$anonfun$6(this))}));
        // Start the monitor only once every field above has been assigned.
        new MonitorThread(this).start();
    }
}
