package com.spotify.scio.transforms;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Stream;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/scio/transforms/PipeDoFn.class */
public class PipeDoFn extends DoFn<String, String> {
    private static final Logger LOG = LoggerFactory.getLogger(PipeDoFn.class);
    private static final ConcurrentMap<UUID, Optional<RuntimeException>> setupMap = new ConcurrentHashMap();
    private static final ConcurrentMap<UUID, Optional<RuntimeException>> teardownMap = new ConcurrentHashMap();
    private final UUID uuid;
    private final String[] cmdArray;
    private final List<String[]> setupCmds;
    private final List<String[]> teardownCmds;
    private final String[] envp;
    private final File dir;
    private boolean isNewBundle;
    private transient Process pipeProcess;
    private transient ExecutorService executorService;
    private transient BufferedWriter stdIn;
    private transient CompletableFuture<Void> stdOut;

    public PipeDoFn(String str) {
        this(ProcessUtil.tokenizeCommand(str));
    }

    public PipeDoFn(String[] strArr) {
        this(strArr, (Map<String, String>) null, (File) null, (List<String[]>) null, (List<String[]>) null);
    }

    public PipeDoFn(String str, Map<String, String> map, File file, List<String> list, List<String> list2) {
        this(ProcessUtil.tokenizeCommand(str), map, file, ProcessUtil.tokenizeCommands(list), ProcessUtil.tokenizeCommands(list2));
    }

    public PipeDoFn(String[] strArr, Map<String, String> map, File file, List<String[]> list, List<String[]> list2) {
        this.uuid = UUID.randomUUID();
        this.cmdArray = strArr;
        this.envp = ProcessUtil.createEnv(map);
        this.dir = file;
        this.setupCmds = list;
        this.teardownCmds = list2;
    }

    @DoFn.Setup
    public void setup() {
        this.executorService = MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(1));
        Optional<RuntimeException> computeIfAbsent = setupMap.computeIfAbsent(this.uuid, uuid -> {
            return runCommands("Setup", this.setupCmds);
        });
        if (computeIfAbsent.isPresent()) {
            throw computeIfAbsent.get();
        }
    }

    @DoFn.Teardown
    public void teardown() {
        this.executorService.shutdown();
        Optional<RuntimeException> computeIfAbsent = teardownMap.computeIfAbsent(this.uuid, uuid -> {
            return runCommands("Teardown", this.teardownCmds);
        });
        if (computeIfAbsent.isPresent()) {
            throw computeIfAbsent.get();
        }
    }

    private Optional<RuntimeException> runCommands(String str, List<String[]> list) {
        if (list == null) {
            return Optional.empty();
        }
        for (String[] strArr : list) {
            try {
                LOG.info("{} command started: {}", str, ProcessUtil.join(strArr));
                Process exec = Runtime.getRuntime().exec(strArr, this.envp, this.dir);
                int waitFor = exec.waitFor();
                String stdOut = ProcessUtil.getStdOut(exec);
                String stdErr = ProcessUtil.getStdErr(exec);
                if (!stdOut.isEmpty()) {
                    LOG.info("STDOUT:\n" + stdOut);
                }
                if (!stdErr.isEmpty()) {
                    LOG.info("STDERR:\n" + stdErr);
                }
                if (waitFor != 0) {
                    return Optional.of(new IllegalArgumentException("Non-zero exit code: " + waitFor));
                }
            } catch (IOException e) {
                return Optional.of(new UncheckedIOException(e));
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                return Optional.of(new RuntimeException(e2));
            }
        }
        return Optional.empty();
    }

    @DoFn.StartBundle
    public void startBundle(DoFn<String, String>.StartBundleContext startBundleContext) {
        this.isNewBundle = true;
    }

    @DoFn.FinishBundle
    public void finishBundle(DoFn<String, String>.FinishBundleContext finishBundleContext) {
        try {
            try {
                this.stdIn.close();
                int waitFor = this.pipeProcess.waitFor();
                this.stdOut.get();
                String stdErr = ProcessUtil.getStdErr(this.pipeProcess);
                LOG.info("Process exited: {}{}", ProcessUtil.join(this.cmdArray), stdErr.isEmpty() ? "" : ", STDERR:\n" + stdErr);
                Preconditions.checkState(waitFor == 0, "Non-zero exit code: " + waitFor);
                this.pipeProcess.destroy();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (InterruptedException | ExecutionException e2) {
                throw new RuntimeException(e2);
            }
        } catch (Throwable th) {
            this.pipeProcess.destroy();
            throw th;
        }
    }

    @DoFn.ProcessElement
    public void processElement(DoFn<String, String>.ProcessContext processContext) {
        if (this.isNewBundle) {
            try {
                this.pipeProcess = Runtime.getRuntime().exec(this.cmdArray, this.envp, this.dir);
                this.stdIn = new BufferedWriter(new OutputStreamWriter(this.pipeProcess.getOutputStream()));
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(this.pipeProcess.getInputStream()));
                this.stdOut = CompletableFuture.runAsync(() -> {
                    Stream<String> lines = bufferedReader.lines();
                    Objects.requireNonNull(processContext);
                    lines.forEach((v1) -> {
                        r1.output(v1);
                    });
                }, this.executorService);
                LOG.info("Process started: {}", ProcessUtil.join(this.cmdArray));
                this.isNewBundle = false;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        try {
            this.stdIn.write((String) processContext.element());
            this.stdIn.newLine();
        } catch (IOException e2) {
            throw new UncheckedIOException(e2);
        }
    }

    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.add(DisplayData.item("Command", Joiner.on(' ').join(this.cmdArray))).add(DisplayData.item("Environment", this.envp == null ? "null" : Joiner.on(' ').join(this.envp))).add(DisplayData.item("Working Directory", this.dir == null ? "null" : this.dir.toString()));
    }
}
