package com.facebook.presto.spark.execution.nativeprocess;

import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.HttpStatus;
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.client.ServerInfo;
import com.facebook.presto.server.RequestErrorTracker;
import com.facebook.presto.server.smile.BaseResponse;
import com.facebook.presto.spark.classloader_interface.PrestoSparkFatalException;
import com.facebook.presto.spark.execution.http.PrestoSparkHttpServerClient;
import com.facebook.presto.spark.execution.property.WorkerProperty;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.lang.ProcessBuilder;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.spark.SparkEnv$;
import org.apache.spark.SparkFiles;

/* loaded from: input_file:com/facebook/presto/spark/execution/nativeprocess/NativeExecutionProcess.class */
public class NativeExecutionProcess implements AutoCloseable {
    private static final Logger log = Logger.get(NativeExecutionProcess.class);
    // Error message handed to the RequestErrorTracker that supervises the server-info polling.
    private static final String NATIVE_EXECUTION_TASK_ERROR_MESSAGE = "Native process launch failed with multiple retries.";
    // Paths, relative to the configuration directory, passed to WorkerProperty.populateAllProperties().
    private static final String WORKER_CONFIG_FILE = "/config.properties";
    private static final String WORKER_NODE_CONFIG_FILE = "/node.properties";
    private static final String WORKER_VELOX_CONFIG_FILE = "/velox.properties";
    // Directory prefix of the per-catalog "<catalog>.properties" file.
    private static final String WORKER_CONNECTOR_CONFIG_FILE = "/catalog/";
    // Signal number sent by terminateWithCore(); presumably SIGSYS on Linux — TODO confirm for other platforms.
    private static final int SIGSYS = 31;
    // Path of the native executable; first element of the launch command.
    private final String executablePath;
    // Whitespace-separated extra arguments appended to the launch command.
    private final String programArguments;
    private final Session session;
    // HTTP client wrapper used to poll the native process for its server info.
    private final PrestoSparkHttpServerClient serverClient;
    // http://<node internal address>:<port> of the native process.
    private final URI location;
    // TCP port picked in the constructor and written into the worker's system config.
    private final int port;
    // Runs the retry listener when a request permit is not immediately available.
    private final Executor executor;
    private final RequestErrorTracker errorTracker;
    private final HttpClient httpClient;
    private final WorkerProperty<?, ?, ?, ?> workerProperty;
    // Both are null until start() has launched the native process.
    private volatile Process process;
    private volatile ProcessOutputPipe processOutputPipe;

    /* loaded from: input_file:com/facebook/presto/spark/execution/nativeprocess/NativeExecutionProcess$ProcessOutputPipe.class */
    /**
     * Copies a stream line by line (UTF-8) into another stream on a daemon thread.
     *
     * <p>Starting with the first line that begins with {@code "*** Aborted"}, every copied line
     * is additionally retained so it can later be retrieved through {@link #getAbortMessage()}.
     */
    private static class ProcessOutputPipe implements Runnable {
        private static final String ABORT_MESSAGE_PREFIX = "*** Aborted";

        private final long pid;
        private final InputStream inputStream;
        private final OutputStream outputStream;
        // Guarded by synchronized (abortMessage): written by the copy thread, read by getAbortMessage().
        private final StringBuilder abortMessage = new StringBuilder();
        private final AtomicBoolean started = new AtomicBoolean();

        /**
         * @param j process id, used only to name the copy thread
         * @param inputStream stream to read from; must not be null
         * @param outputStream stream to write to; must not be null
         */
        public ProcessOutputPipe(long j, InputStream inputStream, OutputStream outputStream) {
            this.pid = j;
            this.inputStream = Objects.requireNonNull(inputStream, "inputStream is null");
            this.outputStream = Objects.requireNonNull(outputStream, "outputStream is null");
        }

        /**
         * Starts the copy thread. Only the first call has an effect.
         */
        public void start() {
            if (!started.compareAndSet(false, true)) {
                return;
            }
            Thread thread = new Thread(this, String.format("NativeExecutionProcess#ProcessOutputPipe[%s]", pid));
            thread.setDaemon(true);
            thread.start();
        }

        /**
         * Copies lines until end of stream. An {@link IOException} is logged and ends the copy.
         * Both streams are closed when the copy ends; closing the writer also closes the
         * {@code outputStream} passed to the constructor.
         */
        @Override
        public void run() {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
                    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) {
                boolean aborted = false;
                String line;
                while ((line = reader.readLine()) != null) {
                    if (!aborted && line.startsWith(ABORT_MESSAGE_PREFIX)) {
                        aborted = true;
                    }
                    if (aborted) {
                        synchronized (abortMessage) {
                            abortMessage.append(line).append("\n");
                        }
                    }
                    writer.write(line);
                    writer.newLine();
                    // Flush per line so the output is visible as soon as it is produced.
                    writer.flush();
                }
            } catch (IOException e) {
                NativeExecutionProcess.log.warn(e, "failure occurred when copying streams");
            }
        }

        /**
         * @return the lines captured so far since the abort marker, each terminated by
         *         {@code "\n"}; empty if no abort marker has been seen
         */
        public String getAbortMessage() {
            synchronized (abortMessage) {
                return abortMessage.toString();
            }
        }
    }

    /**
     * Prepares (but does not launch) a native execution process.
     *
     * <p>All arguments are validated before a TCP port is picked on the node's internal address.
     *
     * @param str path of the native executable
     * @param str2 whitespace-separated program arguments
     * @param session session whose query id and catalog are used at launch time
     * @param httpClient client used to poll the native process
     * @param executor executor that runs delayed retry attempts
     * @param scheduledExecutorService scheduler handed to the {@link RequestErrorTracker}
     * @param jsonCodec codec for the server-info response
     * @param duration duration handed to the {@link RequestErrorTracker}
     * @param workerProperty configuration of the native worker
     * @throws NullPointerException if a checked argument is null
     */
    public NativeExecutionProcess(String str, String str2, Session session, HttpClient httpClient, Executor executor, ScheduledExecutorService scheduledExecutorService, JsonCodec<ServerInfo> jsonCodec, Duration duration, WorkerProperty<?, ?, ?, ?> workerProperty) throws IOException {
        this.executablePath = Objects.requireNonNull(str, "executablePath is null");
        this.programArguments = Objects.requireNonNull(str2, "programArguments is null");
        this.session = Objects.requireNonNull(session, "session is null");
        this.httpClient = Objects.requireNonNull(httpClient, "httpClient is null");
        this.executor = Objects.requireNonNull(executor, "executor is null");
        // Validate before the first dereference below, so a null argument reports a useful message.
        this.workerProperty = Objects.requireNonNull(workerProperty, "workerProperty is null");
        String nodeInternalAddress = workerProperty.getNodeConfig().getNodeInternalAddress();
        this.port = getAvailableTcpPort(nodeInternalAddress);
        this.location = HttpUriBuilder.uriBuilder().scheme("http").host(nodeInternalAddress).port(this.port).build();
        this.serverClient = new PrestoSparkHttpServerClient(this.httpClient, this.location, jsonCodec);
        this.errorTracker = new RequestErrorTracker("NativeExecution", this.location, StandardErrorCode.NATIVE_EXECUTION_TASK_ERROR, NATIVE_EXECUTION_TASK_ERROR_MESSAGE, duration, scheduledExecutorService, "getting native process status");
    }

    /**
     * Launches the native process and blocks until it answers a server-info request.
     *
     * <p>Does nothing if a process has already been launched. The child's stdout is inherited;
     * its stderr is forwarded to this JVM's stderr through a {@link ProcessOutputPipe}.
     *
     * @throws PrestoException if the executable cannot be started
     * @throws PrestoSparkFatalException if the process does not become reachable; the process
     *         is closed first
     */
    public synchronized void start() throws ExecutionException, InterruptedException, IOException {
        if (this.process != null) {
            return;
        }
        ProcessBuilder processBuilder = new ProcessBuilder(getLaunchCommand());
        processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        processBuilder.environment().put("INIT_PRESTO_QUERY_ID", this.session.getQueryId().toString());
        try {
            this.process = processBuilder.start();
            this.processOutputPipe = new ProcessOutputPipe(getPid(this.process), this.process.getErrorStream(), new FileOutputStream(FileDescriptor.err));
            this.processOutputPipe.start();
            try {
                getServerInfoWithRetry().get();
            } catch (Throwable th) {
                close();
                if (th instanceof InterruptedException) {
                    // Restore the flag only after close(), so its graceful wait is not cut short.
                    Thread.currentThread().interrupt();
                }
                // Unwrap wrappers such as ExecutionException, but never drop the failure altogether.
                Throwable cause = th.getCause() != null ? th.getCause() : th;
                throw new PrestoSparkFatalException(th.getMessage(), cause);
            }
        } catch (IOException e) {
            log.error(String.format("Cannot start %s, error message: %s", processBuilder.command(), e.getMessage()));
            throw new PrestoException(StandardErrorCode.NATIVE_EXECUTION_PROCESS_LAUNCH_ERROR, String.format("Cannot start %s", processBuilder.command()), e);
        }
    }

    /**
     * Kicks off the server-info polling of the native process.
     *
     * @return a future that is completed by {@link #doGetServerInfo(SettableFuture)}
     */
    @VisibleForTesting
    public SettableFuture<ServerInfo> getServerInfoWithRetry() {
        SettableFuture<ServerInfo> serverInfoFuture = SettableFuture.create();
        doGetServerInfo(serverInfoFuture);
        return serverInfoFuture;
    }

    /**
     * Sends {@code SIGSYS} to the native process and waits for it to exit, killing it forcibly
     * if it is still running after the given time.
     *
     * @param duration maximum time to wait for the process to terminate
     * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is
     *         interrupted; the interrupt flag is restored first
     */
    public void terminateWithCore(Duration duration) {
        Process signalled = sendSignal(SIGSYS);
        if (signalled != null) {
            try {
                long pid = getPid(signalled);
                log.info("Waiting %s for process %s to terminate", duration, pid);
                boolean exited = signalled.waitFor(duration.toMillis(), TimeUnit.MILLISECONDS);
                if (!exited) {
                    log.warn("Process %s did not terminate within %s", pid, duration);
                    signalled.destroyForcibly();
                } else {
                    log.info("Process %s successfully terminated with status code %s", pid, signalled.exitValue());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Sends a signal to the native process through the {@code kill} command.
     *
     * @param i signal number
     * @return the signalled process, or {@code null} if no signal was sent (process not
     *         started, already dead, pid unknown, or {@code kill} could not be launched)
     */
    private Process sendSignal(int i) {
        Process process = this.process;
        if (process == null) {
            log.warn("Failure sending signal, process does not exist");
            return null;
        }
        long pid = getPid(process);
        if (!process.isAlive()) {
            log.warn("Failure sending signal, process is dead: %s", pid);
            return null;
        }
        if (pid <= 0) {
            // getPid() yields -1 when the pid cannot be determined. "kill -<sig> -1" would
            // signal every process the current user is allowed to signal, so refuse to send it.
            log.warn("Failure sending signal, process id is unknown");
            return null;
        }
        try {
            log.info("Sending signal to process %s: %s", pid, i);
            // Explicit argument vector: no reliance on whitespace tokenization of a command string.
            Runtime.getRuntime().exec(new String[] {"kill", "-" + i, String.valueOf(pid)});
            return process;
        } catch (IOException e) {
            log.warn(e, "Failure sending signal to process %s", pid);
            return null;
        }
    }

    /**
     * Determines the operating-system process id of a {@link Process}.
     *
     * <p>The public {@code Process.pid()} method (available since Java 9) is tried first; it is
     * looked up reflectively so this class still compiles and runs on Java 8. On older runtimes
     * the private {@code pid} field of {@code java.lang.UNIXProcess} is read instead.
     *
     * @param process the process to inspect
     * @return the process id, or {@code -1} if it cannot be determined
     */
    private static long getPid(Process process) {
        try {
            Object pid = Process.class.getMethod("pid").invoke(process);
            if (pid instanceof Long) {
                return (Long) pid;
            }
        } catch (ReflectiveOperationException ignored) {
            // Process.pid() is missing (Java 8) or unsupported by this implementation:
            // fall through to the legacy lookup below.
        }
        try {
            if (!process.getClass().getName().equals("java.lang.UNIXProcess")) {
                return -1L;
            }
            Field pidField = process.getClass().getDeclaredField("pid");
            pidField.setAccessible(true);
            try {
                return pidField.getLong(process);
            } finally {
                pidField.setAccessible(false);
            }
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new AssertionError(e);
        }
    }

    /**
     * Stops the native process if it is running.
     *
     * <p>The process is first asked to terminate and given one second to do so. If it is still
     * alive afterwards, including when the wait is interrupted, it is killed forcibly. The
     * interrupt flag is restored if the wait was interrupted.
     */
    @Override
    public void close() {
        Process process = this.process;
        if (process == null) {
            return;
        }
        if (!process.isAlive()) {
            log.info("Process is dead: %s", getPid(process));
            return;
        }
        long pid = getPid(process);
        log.info("Destroying process: %s", pid);
        process.destroy();
        try {
            process.waitFor(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Single cleanup point: runs after a normal wait, an interrupt, or any other failure.
            if (process.isAlive()) {
                log.warn("Graceful shutdown of native execution process failed. Force killing it: %s", pid);
                process.destroyForcibly();
            }
        }
    }

    /**
     * @return {@code true} if the native process has been launched and is still running
     */
    public boolean isAlive() {
        Process current = this.process;
        if (current == null) {
            return false;
        }
        return current.isAlive();
    }

    /**
     * @return the abort message captured from the native process's error stream, or an empty
     *         string if the process has not been launched
     */
    public String getCrashReport() {
        ProcessOutputPipe pipe = this.processOutputPipe;
        if (pipe != null) {
            return pipe.getAbortMessage();
        }
        return "";
    }

    /**
     * @return the TCP port picked for the native process in the constructor
     */
    public int getPort() {
        return port;
    }

    /**
     * @return the HTTP base URI (node internal address and port) of the native process
     */
    public URI getLocation() {
        return location;
    }

    /**
     * Finds a currently free TCP port by binding an ephemeral port on the given host.
     *
     * <p>The probing socket is closed before returning (also when binding fails), so the port
     * is free again for the caller; nothing reserves it in the meantime.
     *
     * @param str host name or address to bind on
     * @return the port number that was assigned to the probing socket
     * @throws PrestoSparkFatalException if no port can be acquired
     */
    private static int getAvailableTcpPort(String str) {
        try (ServerSocket serverSocket = new ServerSocket()) {
            // Port 0 lets the operating system pick a free port.
            serverSocket.bind(new InetSocketAddress(str, 0));
            return serverSocket.getLocalPort();
        } catch (Exception e) {
            throw new PrestoSparkFatalException("Failed to acquire port on host", e);
        }
    }

    /**
     * Returns the catalog configured in the session.
     *
     * @param session session to read the catalog from
     * @return the catalog name
     * @throws IllegalArgumentException if the session has no catalog
     */
    private String getNativeExecutionCatalogName(Session session) {
        boolean catalogPresent = session.getCatalog().isPresent();
        Preconditions.checkArgument(catalogPresent, "Catalog isn't set in the session.");
        String catalogName = (String) session.getCatalog().get();
        return catalogName;
    }

    /**
     * Records the chosen HTTP port in the worker's system config and asks the worker property
     * to populate the velox, system, node and catalog property files under the given directory.
     *
     * @param str configuration directory
     * @throws IOException propagated from {@code populateAllProperties}
     */
    private void populateConfigurationFiles(String str) throws IOException {
        this.workerProperty.getSystemConfig().setHttpServerPort(this.port);
        this.workerProperty.populateAllProperties(
                Paths.get(str, WORKER_VELOX_CONFIG_FILE),
                Paths.get(str, WORKER_CONFIG_FILE),
                Paths.get(str, WORKER_NODE_CONFIG_FILE),
                // Resolves to "<str>/catalog/<catalog>.properties".
                Paths.get(str, WORKER_CONNECTOR_CONFIG_FILE + getNativeExecutionCatalogName(this.session) + ".properties"));
    }

    /**
     * Requests the server info of the native process and completes the given future.
     *
     * <p>A failed request is reported to the error tracker and retried, immediately or once a
     * request permit becomes available. The future fails when the error tracker gives up, when
     * the HTTP client has been closed, or when the server answers with an empty response or a
     * non-OK status.
     *
     * @param settableFuture future to complete with the server info or the failure
     */
    public void doGetServerInfo(final SettableFuture<ServerInfo> settableFuture) {
        Futures.addCallback(this.serverClient.getServerInfo(), new FutureCallback<BaseResponse<ServerInfo>>() {
            @Override
            public void onSuccess(@Nullable BaseResponse<ServerInfo> baseResponse) {
                // Failures must be reported through the future: an exception thrown from this
                // callback would never reach settableFuture and would leave its waiter blocked.
                if (baseResponse == null) {
                    settableFuture.setException(new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Request failed with an empty response"));
                    return;
                }
                if (baseResponse.getStatusCode() != HttpStatus.OK.code()) {
                    settableFuture.setException(new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Request failed with HTTP status " + baseResponse.getStatusCode()));
                    return;
                }
                settableFuture.set(baseResponse.getValue());
            }

            @Override
            public void onFailure(Throwable th) {
                if ((th instanceof RejectedExecutionException) && NativeExecutionProcess.this.httpClient.isClosed()) {
                    NativeExecutionProcess.log.error(String.format("Unable to start the native process. HTTP client is closed. Reason: %s", th.getMessage()));
                    settableFuture.setException(th);
                    return;
                }
                try {
                    // requestFailed() throws a PrestoException once the tracker gives up.
                    NativeExecutionProcess.this.errorTracker.requestFailed(th);
                    ListenableFuture<?> requestPermit = NativeExecutionProcess.this.errorTracker.acquireRequestPermit();
                    if (requestPermit.isDone()) {
                        NativeExecutionProcess.this.doGetServerInfo(settableFuture);
                    } else {
                        requestPermit.addListener(() -> NativeExecutionProcess.this.doGetServerInfo(settableFuture), NativeExecutionProcess.this.executor);
                    }
                } catch (PrestoException e) {
                    settableFuture.setException(e);
                }
            }
        }, MoreExecutors.directExecutor());
    }

    /**
     * Resolves a path to an absolute path and verifies that it exists.
     *
     * <p>A relative path is resolved against the Spark files root directory when a Spark
     * environment is present, and against {@code "."} otherwise.
     *
     * @param str absolute or relative path
     * @return the absolute path of the existing file or directory
     * @throws PrestoException if the resolved path does not exist
     */
    private String getProcessWorkingPath(String str) {
        File candidate = new File(str);
        String rootDirectory;
        if (SparkEnv$.MODULE$.get() != null) {
            rootDirectory = SparkFiles.getRootDirectory();
        } else {
            rootDirectory = ".";
        }
        File base = new File(rootDirectory);
        File resolved = candidate.isAbsolute() ? candidate : new File(base, str);
        if (!resolved.exists()) {
            log.error(String.format("File doesn't exist %s", resolved.getAbsolutePath()));
            throw new PrestoException(StandardErrorCode.NATIVE_EXECUTION_BINARY_NOT_EXIST, String.format("File doesn't exist %s", resolved.getAbsolutePath()));
        }
        return resolved.getAbsolutePath();
    }

    /**
     * Builds the command line used to launch the native process.
     *
     * <p>The command consists of the executable followed by the whitespace-separated program
     * arguments; blank tokens are dropped. Unless the arguments already contain
     * {@code --etc_dir}, a per-port configuration directory under the process working path is
     * appended as {@code --etc_dir <dir>} and the configuration files are populated there.
     *
     * @return the launch command, executable first
     * @throws IllegalArgumentException if {@code --etc_dir} is the last argument and so has
     *         no value
     * @throws IOException if the configuration files cannot be populated
     */
    private List<String> getLaunchCommand() throws IOException {
        String configPath = Paths.get(getProcessWorkingPath("./"), String.valueOf(this.port)).toAbsolutePath().toString();

        // Splitting "" (or a string with leading whitespace) yields an empty token that must not
        // be passed on as a program argument.
        ImmutableList.Builder<String> argumentsBuilder = ImmutableList.builder();
        for (String token : this.programArguments.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                argumentsBuilder.add(token);
            }
        }
        List<String> arguments = argumentsBuilder.build();

        int etcDirIndex = arguments.indexOf("--etc_dir");
        boolean etcDirProvided = etcDirIndex >= 0;
        Preconditions.checkArgument(!etcDirProvided || etcDirIndex + 1 < arguments.size(), "--etc_dir is specified in the program arguments without a value");

        ImmutableList.Builder<String> commandBuilder = ImmutableList.builder();
        commandBuilder.add(this.executablePath).addAll(arguments);
        if (!etcDirProvided) {
            commandBuilder.add("--etc_dir").add(configPath);
            populateConfigurationFiles(configPath);
        }
        List<String> command = commandBuilder.build();
        log.info("Launching native process using command: %s", String.join(" ", command));
        return command;
    }
}
