package cz.o2.proxima.beam.tools.groovy;

import cz.o2.proxima.beam.core.BeamDataOperator;
import cz.o2.proxima.functional.Factory;
import cz.o2.proxima.functional.UnaryFunction;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.tools.groovy.Stream;
import cz.o2.proxima.tools.groovy.StreamProvider;
import cz.o2.proxima.tools.groovy.ToolsClassLoader;
import cz.o2.proxima.tools.groovy.WindowedStream;
import cz.o2.proxima.util.Classpath;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.IOUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/beam/tools/groovy/BeamStreamProvider.class */
public abstract class BeamStreamProvider implements StreamProvider {
    private static final Logger log = LoggerFactory.getLogger(BeamStreamProvider.class);
    Repository repo;
    BeamDataOperator beam;

    /* loaded from: input_file:cz/o2/proxima/beam/tools/groovy/BeamStreamProvider$Default.class */
    public static class Default extends BeamStreamProvider {

        @VisibleForTesting
        private String[] args;

        @Nullable
        private String runner = null;

        @VisibleForTesting
        private final List<RunnerRegistrar> registrars = new ArrayList();

        @Override // cz.o2.proxima.beam.tools.groovy.BeamStreamProvider
        public void init(Repository repository, String[] strArr) {
            String[] readAndRemoveRegistrars = readAndRemoveRegistrars(strArr, this.registrars);
            super.init(repository, readAndRemoveRegistrars);
            this.args = readAndRemoveRegistrars;
            BeamStreamProvider.log.info("Created {} arguments {}", getClass().getName(), Arrays.toString(readAndRemoveRegistrars));
            this.runner = System.getenv("RUNNER");
        }

        @Override // cz.o2.proxima.beam.tools.groovy.BeamStreamProvider
        protected Factory<PipelineOptions> getPipelineOptionsFactory() {
            return () -> {
                PipelineOptions create = PipelineOptionsFactory.fromArgs(this.args).create();
                if (this.runner != null) {
                    create.setRunner(Classpath.findClass(this.runner, PipelineRunner.class));
                }
                this.registrars.forEach(runnerRegistrar -> {
                    runnerRegistrar.apply(create);
                });
                return create;
            };
        }

        private String[] readAndRemoveRegistrars(String[] strArr, List<RunnerRegistrar> list) {
            List<String> list2 = (List) Arrays.stream(strArr).collect(Collectors.toList());
            ArrayList arrayList = new ArrayList();
            for (String str : list2) {
                if (str.startsWith("--runnerRegistrar=")) {
                    list.add((RunnerRegistrar) Classpath.newInstance(str.substring(18), RunnerRegistrar.class));
                } else {
                    arrayList.add(str);
                }
            }
            return (String[]) arrayList.toArray(new String[arrayList.size()]);
        }

        String[] getArgs() {
            return this.args;
        }

        List<RunnerRegistrar> getRegistrars() {
            return this.registrars;
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -1702239255:
                    if (implMethodName.equals("lambda$getPipelineOptionsFactory$5e4cabed$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/beam/tools/groovy/BeamStreamProvider$Default") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/beam/sdk/options/PipelineOptions;")) {
                        Default r0 = (Default) serializedLambda.getCapturedArg(0);
                        return () -> {
                            PipelineOptions create = PipelineOptionsFactory.fromArgs(this.args).create();
                            if (this.runner != null) {
                                create.setRunner(Classpath.findClass(this.runner, PipelineRunner.class));
                            }
                            this.registrars.forEach(runnerRegistrar -> {
                                runnerRegistrar.apply(create);
                            });
                            return create;
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:cz/o2/proxima/beam/tools/groovy/BeamStreamProvider$RunnerRegistrar.class */
    public interface RunnerRegistrar extends Serializable {
        void apply(PipelineOptions pipelineOptions);
    }

    public void init(Repository repository, String[] strArr) {
        this.repo = repository;
        Preconditions.checkArgument(this.repo.hasOperator("beam"), "Please include proxima-beam-core dependency");
        this.beam = repository.getOrCreateOperator(BeamDataOperator.class);
    }

    public Stream<StreamElement> getStream(Position position, boolean z, boolean z2, StreamProvider.TerminatePredicate terminatePredicate, AttributeDescriptor<?>... attributeDescriptorArr) {
        return BeamStream.stream(this.beam, position, z, z2, terminatePredicate, getJarRegisteringPipelineFactory(), attributeDescriptorArr);
    }

    public WindowedStream<StreamElement> getBatchUpdates(long j, long j2, StreamProvider.TerminatePredicate terminatePredicate, AttributeDescriptor<?>... attributeDescriptorArr) {
        return BeamStream.batchUpdates(this.beam, j, j2, terminatePredicate, getJarRegisteringPipelineFactory(), attributeDescriptorArr);
    }

    public WindowedStream<StreamElement> getBatchSnapshot(long j, long j2, StreamProvider.TerminatePredicate terminatePredicate, AttributeDescriptor<?>... attributeDescriptorArr) {
        return BeamStream.batchSnapshot(this.beam, j, j2, terminatePredicate, getJarRegisteringPipelineFactory(), attributeDescriptorArr);
    }

    public void close() {
        if (this.beam != null) {
            this.beam.close();
        }
    }

    protected Factory<PipelineOptions> getPipelineOptionsFactory() {
        return PipelineOptionsFactory::create;
    }

    protected Set<String> listUdfClassNames() {
        return (Set) Optional.ofNullable(getToolsClassLoader()).map((v0) -> {
            return v0.getDefinedClasses();
        }).orElse(Collections.emptySet());
    }

    Factory<Pipeline> getJarRegisteringPipelineFactory() {
        Factory<PipelineOptions> pipelineOptionsFactory = getPipelineOptionsFactory();
        UnaryFunction<PipelineOptions, Pipeline> createPipelineFromOpts = getCreatePipelineFromOpts();
        return () -> {
            PipelineOptions pipelineOptions = (PipelineOptions) pipelineOptionsFactory.apply();
            createUdfJarAndRegisterToPipeline(pipelineOptions);
            return (Pipeline) createPipelineFromOpts.apply(pipelineOptions);
        };
    }

    protected UnaryFunction<PipelineOptions, Pipeline> getCreatePipelineFromOpts() {
        return Pipeline::create;
    }

    void createUdfJarAndRegisterToPipeline(PipelineOptions pipelineOptions) {
        String simpleName = pipelineOptions.getRunner().getSimpleName();
        try {
            File createJarFromUdfs = createJarFromUdfs();
            URL url = new URL("file://" + createJarFromUdfs.getAbsolutePath());
            log.info("Injecting generated jar at {} into {}", createJarFromUdfs, simpleName);
            injectJarIntoContextClassLoader(url);
            if (pipelineOptions.getRunner().getClassLoader() instanceof URLClassLoader) {
                injectUrlIntoClassloader((URLClassLoader) pipelineOptions.getRunner().getClassLoader(), url);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private File createJarFromUdfs() throws IOException {
        Set<String> listUdfClassNames = listUdfClassNames();
        File createTempFile = File.createTempFile("proxima-tools", ".jar");
        ToolsClassLoader toolsClassLoader = getToolsClassLoader();
        log.info("Building jar from classes {} retrieved from {}", listUdfClassNames, toolsClassLoader);
        createTempFile.deleteOnExit();
        JarOutputStream jarOutputStream = new JarOutputStream(new FileOutputStream(createTempFile));
        Throwable th = null;
        try {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                for (String str : listUdfClassNames) {
                    JarEntry jarEntry = new JarEntry(str.replace('.', '/') + ".class");
                    jarEntry.setTime(currentTimeMillis);
                    jarOutputStream.putNextEntry(jarEntry);
                    IOUtils.copy(new ByteArrayInputStream(toolsClassLoader.getClassByteCode(str)), jarOutputStream);
                    jarOutputStream.closeEntry();
                }
                if (0 != 0) {
                    try {
                        jarOutputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    jarOutputStream.close();
                }
                return createTempFile;
            } finally {
            }
        } catch (Throwable th3) {
            if (th != null) {
                try {
                    jarOutputStream.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                jarOutputStream.close();
            }
            throw th3;
        }
    }

    @VisibleForTesting
    static void injectJarIntoContextClassLoader(URL url) {
        Thread.currentThread().setContextClassLoader(new URLClassLoader(new URL[]{url}, Thread.currentThread().getContextClassLoader()));
    }

    private void injectUrlIntoClassloader(URLClassLoader uRLClassLoader, URL url) {
        try {
            Method declaredMethod = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
            declaredMethod.setAccessible(true);
            declaredMethod.invoke(uRLClassLoader, url);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Nullable
    private ToolsClassLoader getToolsClassLoader() {
        ClassLoader classLoader;
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        while (true) {
            classLoader = contextClassLoader;
            if (classLoader == null || (classLoader instanceof ToolsClassLoader)) {
                break;
            }
            contextClassLoader = classLoader.getParent();
        }
        return (ToolsClassLoader) classLoader;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1352294148:
                if (implMethodName.equals("create")) {
                    z = true;
                    break;
                }
                break;
            case 1745435752:
                if (implMethodName.equals("lambda$getJarRegisteringPipelineFactory$e7d9d3e1$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/beam/tools/groovy/BeamStreamProvider") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/functional/Factory;Lcz/o2/proxima/functional/UnaryFunction;)Lorg/apache/beam/sdk/Pipeline;")) {
                    BeamStreamProvider beamStreamProvider = (BeamStreamProvider) serializedLambda.getCapturedArg(0);
                    Factory factory = (Factory) serializedLambda.getCapturedArg(1);
                    UnaryFunction unaryFunction = (UnaryFunction) serializedLambda.getCapturedArg(2);
                    return () -> {
                        PipelineOptions pipelineOptions = (PipelineOptions) factory.apply();
                        createUdfJarAndRegisterToPipeline(pipelineOptions);
                        return (Pipeline) unaryFunction.apply(pipelineOptions);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/options/PipelineOptionsFactory") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/beam/sdk/options/PipelineOptions;")) {
                    return PipelineOptionsFactory::create;
                }
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/UnaryFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/Pipeline") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/options/PipelineOptions;)Lorg/apache/beam/sdk/Pipeline;")) {
                    return Pipeline::create;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
