package cz.o2.proxima.beam.tools.groovy;

import com.google.api.client.util.Lists;
import com.google.auto.service.AutoService;
import cz.o2.proxima.beam.core.BeamDataOperator;
import cz.o2.proxima.core.functional.Consumer;
import cz.o2.proxima.core.functional.Factory;
import cz.o2.proxima.core.functional.UnaryFunction;
import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.repository.Repository;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.storage.commitlog.Position;
import cz.o2.proxima.core.util.Classpath;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.com.google.common.base.MoreObjects;
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
import cz.o2.proxima.tools.groovy.Stream;
import cz.o2.proxima.tools.groovy.StreamProvider;
import cz.o2.proxima.tools.groovy.ToolsClassLoader;
import cz.o2.proxima.tools.groovy.WindowedStream;
import groovy.lang.Closure;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Generated;
import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.IOUtils;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/beam/tools/groovy/BeamStreamProvider.class */
public abstract class BeamStreamProvider implements StreamProvider {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(BeamStreamProvider.class);
    Repository repo;
    BeamDataOperator beam;

    @AutoService({StreamProvider.class})
    /* loaded from: input_file:cz/o2/proxima/beam/tools/groovy/BeamStreamProvider$Default.class */
    public static class Default extends BeamStreamProvider {

        @VisibleForTesting
        private String[] args;

        @Nullable
        private String runner = null;

        @VisibleForTesting
        private final List<RunnerRegistrar> registrars = new ArrayList();

        @Override // cz.o2.proxima.beam.tools.groovy.BeamStreamProvider
        public void init(Repository repository, String[] strArr) {
            String[] readAndRemoveRegistrars = readAndRemoveRegistrars(strArr, this.registrars);
            super.init(repository, readAndRemoveRegistrars);
            this.args = readAndRemoveRegistrars;
            this.runner = System.getenv("RUNNER");
            BeamStreamProvider.log.info("Created {} with arguments {} and env RUNNER {}", new Object[]{getClass().getName(), Arrays.toString(readAndRemoveRegistrars), this.runner});
        }

        @Override // cz.o2.proxima.beam.tools.groovy.BeamStreamProvider
        protected Supplier<PipelineOptions> getPipelineOptionsFactory() {
            return () -> {
                PipelineOptions create = PipelineOptionsFactory.fromArgs(this.args).create();
                if (this.runner != null) {
                    create.setRunner(Classpath.findClass(this.runner, PipelineRunner.class));
                }
                this.registrars.forEach(runnerRegistrar -> {
                    runnerRegistrar.apply(create);
                });
                return create;
            };
        }

        private String[] readAndRemoveRegistrars(String[] strArr, List<RunnerRegistrar> list) {
            List<String> list2 = (List) Arrays.stream(strArr).collect(Collectors.toList());
            ArrayList arrayList = new ArrayList();
            for (String str : list2) {
                if (str.startsWith("--runnerRegistrar=")) {
                    list.add((RunnerRegistrar) Classpath.newInstance(str.substring(18), RunnerRegistrar.class));
                } else {
                    arrayList.add(str);
                }
            }
            return (String[]) arrayList.toArray(new String[0]);
        }

        @Generated
        String[] getArgs() {
            return this.args;
        }

        @Generated
        List<RunnerRegistrar> getRegistrars() {
            return this.registrars;
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:cz/o2/proxima/beam/tools/groovy/BeamStreamProvider$RunnerRegistrar.class */
    public interface RunnerRegistrar {
        void apply(PipelineOptions pipelineOptions);
    }

    public static <T> Stream<T> wrap(Repository repository, PCollection<T> pCollection) {
        return BeamStream.wrap(repository, pCollection);
    }

    public void init(Repository repository, String[] strArr) {
        this.repo = repository;
        Preconditions.checkArgument(this.repo.hasOperator("beam"), "Please include proxima-beam-core dependency");
        this.beam = repository.getOrCreateOperator(BeamDataOperator.class, new Consumer[0]);
    }

    public Stream<StreamElement> getStream(Position position, boolean z, boolean z2, StreamProvider.TerminatePredicate terminatePredicate, AttributeDescriptor<?>... attributeDescriptorArr) {
        return BeamStream.stream(this.beam, position, z, z2, terminatePredicate, getJarRegisteringPipelineFactory(), attributeDescriptorArr);
    }

    public WindowedStream<StreamElement> getBatchUpdates(long j, long j2, StreamProvider.TerminatePredicate terminatePredicate, AttributeDescriptor<?>... attributeDescriptorArr) {
        return BeamStream.batchUpdates(this.beam, j, j2, terminatePredicate, getJarRegisteringPipelineFactory(), attributeDescriptorArr);
    }

    public WindowedStream<StreamElement> getBatchSnapshot(long j, long j2, StreamProvider.TerminatePredicate terminatePredicate, AttributeDescriptor<?>... attributeDescriptorArr) {
        return BeamStream.batchSnapshot(this.beam, j, j2, terminatePredicate, getJarRegisteringPipelineFactory(), attributeDescriptorArr);
    }

    public <T> WindowedStream<T> impulse(String str, Closure<T> closure) {
        Closure dehydrate = BeamStream.dehydrate(closure);
        BeamDataOperator beamDataOperator = this.beam;
        Factory<Pipeline> jarRegisteringPipelineFactory = getJarRegisteringPipelineFactory();
        Objects.requireNonNull(dehydrate);
        return BeamStream.impulse(str, beamDataOperator, jarRegisteringPipelineFactory, dehydrate::call);
    }

    public <T> WindowedStream<T> periodicImpulse(String str, Closure<T> closure, long j) {
        Closure dehydrate = BeamStream.dehydrate(closure);
        BeamDataOperator beamDataOperator = this.beam;
        Factory<Pipeline> jarRegisteringPipelineFactory = getJarRegisteringPipelineFactory();
        Objects.requireNonNull(dehydrate);
        return BeamStream.periodicImpulse(str, beamDataOperator, jarRegisteringPipelineFactory, dehydrate::call, j);
    }

    public void close() {
        if (this.beam != null) {
            this.beam.close();
        }
    }

    protected Supplier<PipelineOptions> getPipelineOptionsFactory() {
        return PipelineOptionsFactory::create;
    }

    protected Set<String> listUdfClassNames() {
        return (Set) Optional.ofNullable(getToolsClassLoader()).map((v0) -> {
            return v0.getDefinedClasses();
        }).orElse(Collections.emptySet());
    }

    Factory<Pipeline> getJarRegisteringPipelineFactory() {
        Supplier<PipelineOptions> pipelineOptionsFactory = getPipelineOptionsFactory();
        UnaryFunction<PipelineOptions, Pipeline> createPipelineFromOpts = getCreatePipelineFromOpts();
        return () -> {
            PipelineOptions pipelineOptions = (PipelineOptions) pipelineOptionsFactory.get();
            ExperimentalOptions as = pipelineOptions.as(ExperimentalOptions.class);
            ArrayList newArrayList = Lists.newArrayList((Iterable) MoreObjects.firstNonNull(as.getExperiments(), new ArrayList()));
            newArrayList.add("use_deprecated_read");
            as.setExperiments(newArrayList);
            Pipeline pipeline = (Pipeline) createPipelineFromOpts.apply(pipelineOptions);
            createUdfJarAndRegisterToPipeline(pipeline.getOptions());
            return pipeline;
        };
    }

    protected UnaryFunction<PipelineOptions, Pipeline> getCreatePipelineFromOpts() {
        return Pipeline::create;
    }

    @VisibleForTesting
    void createUdfJarAndRegisterToPipeline(PipelineOptions pipelineOptions) {
        String simpleName = pipelineOptions.getRunner().getSimpleName();
        try {
            File createJarFromUdfs = createJarFromUdfs();
            log.info("Created jar {} with generated classes.", createJarFromUdfs);
            ArrayList arrayList = new ArrayList(Collections.singletonList(createJarFromUdfs));
            java.util.stream.Stream<R> map = getAddedJars().stream().map(uri -> {
                return new File(uri.getPath());
            });
            Objects.requireNonNull(arrayList);
            map.forEach((v1) -> {
                r1.add(v1);
            });
            registerToPipeline(pipelineOptions, simpleName, arrayList);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private Collection<URI> getAddedJars() {
        return (Collection) Optional.ofNullable(getToolsClassLoader()).map((v0) -> {
            return v0.getAddedURLs();
        }).orElse(Collections.emptySet());
    }

    private void registerToPipeline(PipelineOptions pipelineOptions, String str, Collection<File> collection) {
        log.info("Adding jars {} into classpath for runner {}", collection, str);
        List<String> list = (List) collection.stream().map((v0) -> {
            return v0.getAbsolutePath();
        }).collect(Collectors.toList());
        if (str.equals("FlinkRunner")) {
            FlinkPipelineOptions as = pipelineOptions.as(FlinkPipelineOptions.class);
            as.setFilesToStage(addToList(list, as.getFilesToStage()));
        } else if (str.equals("SparkRunner")) {
            SparkCommonPipelineOptions as2 = pipelineOptions.as(SparkCommonPipelineOptions.class);
            as2.setFilesToStage(addToList(list, as2.getFilesToStage()));
        } else {
            if (!str.equals("DirectRunner")) {
                log.warn("Injecting jar into unknown runner {}. It might not work as expected. If you are experiencing issues with sub run and/or submission, please fill github issue reporting the name of the runner.", str);
            }
            injectJarIntoContextClassLoader(collection);
        }
    }

    private List<String> addToList(@Nonnull List<String> list, @Nullable List<String> list2) {
        HashSet hashSet = new HashSet(list);
        if (list2 != null) {
            hashSet.addAll(list2);
        }
        return new ArrayList(hashSet);
    }

    private File createJarFromUdfs() throws IOException {
        Set<String> listUdfClassNames = listUdfClassNames();
        File createTempFile = File.createTempFile("proxima-tools", ".jar");
        ToolsClassLoader toolsClassLoader = getToolsClassLoader();
        log.info("Building jar from classes {} retrieved from {}", listUdfClassNames, toolsClassLoader);
        createTempFile.deleteOnExit();
        JarOutputStream jarOutputStream = new JarOutputStream(new FileOutputStream(createTempFile));
        try {
            long currentTimeMillis = System.currentTimeMillis();
            for (String str : listUdfClassNames) {
                JarEntry jarEntry = new JarEntry(str.replace('.', '/') + ".class");
                jarEntry.setTime(currentTimeMillis);
                jarOutputStream.putNextEntry(jarEntry);
                IOUtils.copy(new ByteArrayInputStream(toolsClassLoader.getClassByteCode(str)), jarOutputStream);
                jarOutputStream.closeEntry();
            }
            jarOutputStream.close();
            return createTempFile;
        } catch (Throwable th) {
            try {
                jarOutputStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @VisibleForTesting
    static void injectJarIntoContextClassLoader(Collection<File> collection) {
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(new URLClassLoader((URL[]) ((List) collection.stream().map(file -> {
            return (URL) ExceptionUtils.uncheckedFactory(() -> {
                return file.toURI().toURL();
            });
        }).collect(Collectors.toList())).toArray(new URL[0]), contextClassLoader));
    }

    @Nullable
    private ToolsClassLoader getToolsClassLoader() {
        ClassLoader classLoader;
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        while (true) {
            classLoader = contextClassLoader;
            if (classLoader == null || (classLoader instanceof ToolsClassLoader)) {
                break;
            }
            contextClassLoader = classLoader.getParent();
        }
        return (ToolsClassLoader) classLoader;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1542355184:
                if (implMethodName.equals("lambda$injectJarIntoContextClassLoader$f47e745d$1")) {
                    z = true;
                    break;
                }
                break;
            case -1352294148:
                if (implMethodName.equals("create")) {
                    z = 2;
                    break;
                }
                break;
            case 3045982:
                if (implMethodName.equals("call")) {
                    z = false;
                    break;
                }
                break;
            case 199778950:
                if (implMethodName.equals("lambda$getJarRegisteringPipelineFactory$ad058ce$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/functional/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("groovy/lang/Closure") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    Closure closure = (Closure) serializedLambda.getCapturedArg(0);
                    return closure::call;
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/functional/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("groovy/lang/Closure") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    Closure closure2 = (Closure) serializedLambda.getCapturedArg(0);
                    return closure2::call;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/util/ExceptionUtils$ThrowingFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/beam/tools/groovy/BeamStreamProvider") && serializedLambda.getImplMethodSignature().equals("(Ljava/io/File;)Ljava/net/URL;")) {
                    File file = (File) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return file.toURI().toURL();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/functional/UnaryFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/Pipeline") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/options/PipelineOptions;)Lorg/apache/beam/sdk/Pipeline;")) {
                    return Pipeline::create;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/functional/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/beam/tools/groovy/BeamStreamProvider") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/function/Supplier;Lcz/o2/proxima/core/functional/UnaryFunction;)Lorg/apache/beam/sdk/Pipeline;")) {
                    BeamStreamProvider beamStreamProvider = (BeamStreamProvider) serializedLambda.getCapturedArg(0);
                    Supplier supplier = (Supplier) serializedLambda.getCapturedArg(1);
                    UnaryFunction unaryFunction = (UnaryFunction) serializedLambda.getCapturedArg(2);
                    return () -> {
                        PipelineOptions pipelineOptions = (PipelineOptions) supplier.get();
                        ExperimentalOptions as = pipelineOptions.as(ExperimentalOptions.class);
                        ArrayList newArrayList = Lists.newArrayList((Iterable) MoreObjects.firstNonNull(as.getExperiments(), new ArrayList()));
                        newArrayList.add("use_deprecated_read");
                        as.setExperiments(newArrayList);
                        Pipeline pipeline = (Pipeline) unaryFunction.apply(pipelineOptions);
                        createUdfJarAndRegisterToPipeline(pipeline.getOptions());
                        return pipeline;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
