package cz.o2.proxima.beam.tools.groovy;

import cz.o2.proxima.beam.tools.proto.service.Collect;
import cz.o2.proxima.beam.tools.proto.service.CollectServiceGrpc;
import cz.o2.proxima.beam.tools.shaded.com.google.protobuf.ByteString;
import cz.o2.proxima.beam.tools.shaded.io.grpc.ConnectivityState;
import cz.o2.proxima.beam.tools.shaded.io.grpc.ManagedChannel;
import cz.o2.proxima.beam.tools.shaded.io.grpc.ManagedChannelBuilder;
import cz.o2.proxima.beam.tools.shaded.io.grpc.ServerBuilder;
import cz.o2.proxima.beam.tools.shaded.io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil;
import cz.o2.proxima.beam.tools.shaded.io.grpc.stub.StreamObserver;
import cz.o2.proxima.core.functional.Consumer;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import lombok.Generated;
import org.apache.beam.sdk.coders.Coder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cz/o2/proxima/beam/tools/groovy/RemoteConsumer.class */
public class RemoteConsumer<T> implements AutoCloseable, Consumer<T> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(RemoteConsumer.class);
    private static final int CONTINUE = 100;
    private static final int OK = 200;
    private final Coder<T> coder;
    private final String hostname;
    private final int port;
    private final transient Consumer<T> consumer;
    private final transient RemoteConsumer<T>.Server server;
    private transient CollectServiceGrpc.CollectServiceStub stub;
    private transient ManagedChannel channel;
    private transient StreamObserver<Collect.Item> observer;
    private transient CompletableFuture<Void> terminateFuture;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/o2/proxima/beam/tools/groovy/RemoteConsumer$CollectService.class */
    public class CollectService extends CollectServiceGrpc.CollectServiceImplBase {
        private final List<CompletableFuture<Void>> unterminatedCalls = Collections.synchronizedList(new ArrayList());

        private CollectService() {
        }

        @Override // cz.o2.proxima.beam.tools.proto.service.CollectServiceGrpc.AsyncService
        public StreamObserver<Collect.Item> collect(final StreamObserver<Collect.Response> streamObserver) {
            final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            this.unterminatedCalls.add(completableFuture);
            streamObserver.onNext(Collect.Response.newBuilder().setStatus(100).build());
            return new StreamObserver<Collect.Item>() { // from class: cz.o2.proxima.beam.tools.groovy.RemoteConsumer.CollectService.1
                List<Collect.Item> received = new ArrayList();

                @Override // cz.o2.proxima.beam.tools.shaded.io.grpc.stub.StreamObserver
                public void onNext(Collect.Item item) {
                    this.received.add(item);
                    if (this.received.size() > 10) {
                        flush();
                    }
                }

                @Override // cz.o2.proxima.beam.tools.shaded.io.grpc.stub.StreamObserver
                public void onError(Throwable th) {
                    RemoteConsumer.log.warn("Error in collect()", th);
                    completableFuture.completeExceptionally(th);
                }

                @Override // cz.o2.proxima.beam.tools.shaded.io.grpc.stub.StreamObserver
                public void onCompleted() {
                    flush();
                    streamObserver.onNext(Collect.Response.newBuilder().setStatus(200).setStatusCode("OK").build());
                    streamObserver.onCompleted();
                    completableFuture.complete(null);
                }

                private void flush() {
                    synchronized (RemoteConsumer.this) {
                        this.received.forEach(item -> {
                            RemoteConsumer.this.consumer.accept(ExceptionUtils.uncheckedFactory(() -> {
                                return RemoteConsumer.this.deserialize(item.getSerialized().newInput());
                            }));
                        });
                        this.received.clear();
                    }
                }

                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    String implMethodName = serializedLambda.getImplMethodName();
                    boolean z = -1;
                    switch (implMethodName.hashCode()) {
                        case 684686756:
                            if (implMethodName.equals("lambda$flush$700ab1ef$1")) {
                                z = false;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case false:
                            if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/util/ExceptionUtils$ThrowingFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/beam/tools/groovy/RemoteConsumer$CollectService$1") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/beam/tools/proto/service/Collect$Item;)Ljava/lang/Object;")) {
                                AnonymousClass1 anonymousClass1 = (AnonymousClass1) serializedLambda.getCapturedArg(0);
                                Collect.Item item = (Collect.Item) serializedLambda.getCapturedArg(1);
                                return () -> {
                                    return RemoteConsumer.this.deserialize(item.getSerialized().newInput());
                                };
                            }
                            break;
                    }
                    throw new IllegalArgumentException("Invalid lambda deserialization");
                }
            };
        }

        public void awaitAllClosed() {
            CountDownLatch countDownLatch;
            synchronized (this.unterminatedCalls) {
                countDownLatch = new CountDownLatch(this.unterminatedCalls.size());
                this.unterminatedCalls.forEach(completableFuture -> {
                    completableFuture.whenComplete((BiConsumer) (r5, th) -> {
                        if (th != null) {
                            RemoteConsumer.log.warn("Error waiting for termination of ongoing call. Ignored.", th);
                        }
                        countDownLatch.countDown();
                    });
                });
            }
            Objects.requireNonNull(countDownLatch);
            ExceptionUtils.ignoringInterrupted(countDownLatch::await);
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 93223254:
                    if (implMethodName.equals("await")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("java/util/concurrent/CountDownLatch") && serializedLambda.getImplMethodSignature().equals("()V")) {
                        CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(0);
                        return countDownLatch::await;
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/o2/proxima/beam/tools/groovy/RemoteConsumer$Server.class */
    public class Server implements AutoCloseable {
        private final cz.o2.proxima.beam.tools.shaded.io.grpc.Server server;
        private final RemoteConsumer<T>.CollectService service;

        /* JADX WARN: Type inference failed for: r1v4, types: [cz.o2.proxima.beam.tools.shaded.io.grpc.ServerBuilder] */
        private Server(int i) {
            this.service = new CollectService();
            this.server = ServerBuilder.forPort(i).executor(Executors.newCachedThreadPool()).addService(this.service).build();
        }

        void run() throws IOException {
            this.server.start();
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.service.awaitAllClosed();
            this.server.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <T> RemoteConsumer<T> create(Object obj, String str, int i, Consumer<T> consumer, Coder<T> coder) {
        int i2 = 3;
        while (i2 > 0) {
            i2--;
            int port = getPort(i, System.identityHashCode(obj));
            RemoteConsumer<T> remoteConsumer = null;
            try {
                remoteConsumer = new RemoteConsumer<>(str, port, consumer, coder);
                remoteConsumer.start();
                return remoteConsumer;
            } catch (Exception e) {
                if (remoteConsumer != null) {
                    remoteConsumer.close();
                }
                if (!isBindException(e)) {
                    throw new RuntimeException(e);
                }
                log.debug("Failed to bind on port {}", Integer.valueOf(port), e);
            }
        }
        throw new RuntimeException("Retries exhausted trying to start server");
    }

    @VisibleForTesting
    static boolean isBindException(Throwable th) {
        return ((th instanceof IOException) && th.getMessage() != null && th.getMessage().startsWith("Failed to bind")) || (th.getCause() != null && isBindException(th.getCause()));
    }

    static int getPort(int i, int i2) {
        return i > 0 ? i : (((ThreadLocalRandom.current().nextInt() ^ i2) & Integer.MAX_VALUE) % 50000) + Http2CodecUtil.DEFAULT_MAX_QUEUED_CONTROL_FRAMES;
    }

    @VisibleForTesting
    RemoteConsumer(String str, int i, Consumer<T> consumer, Coder<T> coder) {
        this.server = new Server(i);
        this.hostname = str;
        this.port = i;
        this.consumer = consumer;
        this.coder = coder;
    }

    /* JADX WARN: Type inference failed for: r1v11, types: [cz.o2.proxima.beam.tools.shaded.io.grpc.ManagedChannelBuilder] */
    StreamObserver<Collect.Item> observer() {
        if (this.channel == null) {
            this.channel = ManagedChannelBuilder.forAddress(this.hostname, this.port).usePlaintext().build();
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ManagedChannel managedChannel = this.channel;
            ConnectivityState connectivityState = ConnectivityState.READY;
            Objects.requireNonNull(countDownLatch);
            managedChannel.notifyWhenStateChanged(connectivityState, countDownLatch::countDown);
            ExceptionUtils.ignoringInterrupted(() -> {
                if (countDownLatch.await(1L, TimeUnit.SECONDS)) {
                    return;
                }
                log.warn("Timeout waiting for channel to become connected. Skipping.");
            });
        }
        if (this.stub == null) {
            this.stub = CollectServiceGrpc.newStub(this.channel);
        }
        if (this.observer == null) {
            this.terminateFuture = new CompletableFuture<>();
            final CountDownLatch countDownLatch2 = new CountDownLatch(1);
            this.observer = this.stub.collect(new StreamObserver<Collect.Response>() { // from class: cz.o2.proxima.beam.tools.groovy.RemoteConsumer.1
                @Override // cz.o2.proxima.beam.tools.shaded.io.grpc.stub.StreamObserver
                public void onNext(Collect.Response response) {
                    if (response.getStatus() == 100) {
                        countDownLatch2.countDown();
                    } else if (response.getStatus() == 200) {
                        RemoteConsumer.this.terminateFuture.complete(null);
                    }
                }

                @Override // cz.o2.proxima.beam.tools.shaded.io.grpc.stub.StreamObserver
                public void onError(Throwable th) {
                    RemoteConsumer.this.terminateFuture.completeExceptionally(th);
                    RemoteConsumer.this.observer = null;
                }

                @Override // cz.o2.proxima.beam.tools.shaded.io.grpc.stub.StreamObserver
                public void onCompleted() {
                }
            });
            countDownLatch2.countDown();
        }
        return this.observer;
    }

    public void accept(T t) {
        try {
            observer().onNext(Collect.Item.newBuilder().setSerialized(ByteString.readFrom(serialize(t))).build());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        if (this.channel != null) {
            Exception exc = null;
            if (this.observer != null) {
                this.observer.onCompleted();
                try {
                    this.terminateFuture.get();
                } catch (Exception e) {
                    log.error("Error waiting for observer to terminate.", e);
                    exc = e;
                }
                this.observer = null;
            }
            this.channel.shutdown();
            this.channel = null;
            if (exc != null) {
                throw new RuntimeException(exc);
            }
        }
        if (this.server != null) {
            this.server.close();
        }
    }

    void start() throws IOException {
        this.server.run();
    }

    InputStream serialize(T t) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            this.coder.encode(t, byteArrayOutputStream);
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
            byteArrayOutputStream.close();
            return byteArrayInputStream;
        } catch (Throwable th) {
            try {
                byteArrayOutputStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    T deserialize(InputStream inputStream) throws IOException {
        return (T) this.coder.decode(inputStream);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        stop();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1872003208:
                if (implMethodName.equals("lambda$observer$97092b58$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("cz/o2/proxima/beam/tools/groovy/RemoteConsumer") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CountDownLatch;)V")) {
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(0);
                    return () -> {
                        if (countDownLatch.await(1L, TimeUnit.SECONDS)) {
                            return;
                        }
                        log.warn("Timeout waiting for channel to become connected. Skipping.");
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
