package cz.o2.proxima.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.protobuf.TextFormat;
import cz.o2.proxima.proto.service.IngestServiceGrpc;
import cz.o2.proxima.proto.service.RetrieveServiceGrpc;
import cz.o2.proxima.proto.service.Rpc;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.beans.ConstructorProperties;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/client/IngestClient.class */
public class IngestClient implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(IngestClient.class);
    private final String host;
    private final int port;
    private final Options options;
    private final Thread flushThread;

    @VisibleForTesting
    StreamObserver<Rpc.IngestBulk> requestObserver;

    @VisibleForTesting
    Channel channel = null;

    @VisibleForTesting
    IngestServiceGrpc.IngestServiceStub stub = null;
    private RetrieveServiceGrpc.RetrieveServiceBlockingStub getStub = null;
    private final Rpc.IngestBulk.Builder bulkBuilder = Rpc.IngestBulk.newBuilder();
    private final CountDownLatch closedLatch = new CountDownLatch(1);

    @VisibleForTesting
    final StreamObserver<Rpc.StatusBulk> statusObserver = newStatusObserver();
    private final ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
    private long lastFlush = System.nanoTime();
    private final Map<String, Request> inFlightRequests = Collections.synchronizedMap(new HashMap());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/o2/proxima/client/IngestClient$Request.class */
    public final class Request {
        private final Consumer<Rpc.Status> consumer;
        private final ScheduledFuture timeoutFuture;
        private final Rpc.Ingest payload;

        /* JADX INFO: Access modifiers changed from: private */
        public void setStatus(Rpc.Status status) {
            if (this.timeoutFuture == null || this.timeoutFuture.cancel(false)) {
                this.consumer.accept(status);
            }
        }

        void retry() {
            IngestClient.this.sendTry(this.payload, -1L, TimeUnit.MILLISECONDS, this.consumer, true);
        }

        @ConstructorProperties({"consumer", "timeoutFuture", "payload"})
        public Request(Consumer<Rpc.Status> consumer, ScheduledFuture scheduledFuture, Rpc.Ingest ingest) {
            this.consumer = consumer;
            this.timeoutFuture = scheduledFuture;
            this.payload = ingest;
        }

        public Consumer<Rpc.Status> getConsumer() {
            return this.consumer;
        }

        public ScheduledFuture getTimeoutFuture() {
            return this.timeoutFuture;
        }

        public Rpc.Ingest getPayload() {
            return this.payload;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Request)) {
                return false;
            }
            Request request = (Request) obj;
            Consumer<Rpc.Status> consumer = getConsumer();
            Consumer<Rpc.Status> consumer2 = request.getConsumer();
            if (consumer == null) {
                if (consumer2 != null) {
                    return false;
                }
            } else if (!consumer.equals(consumer2)) {
                return false;
            }
            ScheduledFuture timeoutFuture = getTimeoutFuture();
            ScheduledFuture timeoutFuture2 = request.getTimeoutFuture();
            if (timeoutFuture == null) {
                if (timeoutFuture2 != null) {
                    return false;
                }
            } else if (!timeoutFuture.equals(timeoutFuture2)) {
                return false;
            }
            Rpc.Ingest payload = getPayload();
            Rpc.Ingest payload2 = request.getPayload();
            return payload == null ? payload2 == null : payload.equals(payload2);
        }

        public int hashCode() {
            Consumer<Rpc.Status> consumer = getConsumer();
            int hashCode = (1 * 59) + (consumer == null ? 43 : consumer.hashCode());
            ScheduledFuture timeoutFuture = getTimeoutFuture();
            int hashCode2 = (hashCode * 59) + (timeoutFuture == null ? 43 : timeoutFuture.hashCode());
            Rpc.Ingest payload = getPayload();
            return (hashCode2 * 59) + (payload == null ? 43 : payload.hashCode());
        }

        public String toString() {
            return "IngestClient.Request(consumer=" + getConsumer() + ", timeoutFuture=" + getTimeoutFuture() + ", payload=" + getPayload() + ")";
        }
    }

    public static IngestClient create(String str, int i) {
        return create(str, i, new Options());
    }

    public static IngestClient create(String str, int i, Options options) {
        return new IngestClient(str, i, options);
    }

    @VisibleForTesting
    IngestClient(String str, int i, Options options) {
        this.host = str;
        this.port = i;
        this.options = options;
        this.flushThread = new Thread(() -> {
            long flushUsec = options.getFlushUsec() * 1000;
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    long nanoTime = (flushUsec - System.nanoTime()) + this.lastFlush;
                    synchronized (this) {
                        if (nanoTime > 0) {
                            wait(nanoTime / 1000000, (int) (nanoTime % 1000000));
                        }
                    }
                    synchronized (this) {
                        if (this.bulkBuilder.getIngestCount() > 0) {
                            flush();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        this.flushThread.setDaemon(true);
        this.flushThread.setName(getClass().getSimpleName() + "-flushThread");
    }

    private StreamObserver<Rpc.StatusBulk> newStatusObserver() {
        return new StreamObserver<Rpc.StatusBulk>() { // from class: cz.o2.proxima.client.IngestClient.1
            public void onNext(Rpc.StatusBulk statusBulk) {
                for (Rpc.Status status : statusBulk.getStatusList()) {
                    Request request = (Request) IngestClient.this.inFlightRequests.remove(status.getUuid());
                    if (request == null) {
                        IngestClient.log.warn("Received response for unknown message {}", TextFormat.shortDebugString(status));
                    } else {
                        synchronized (IngestClient.this.inFlightRequests) {
                            IngestClient.this.inFlightRequests.notifyAll();
                        }
                        request.setStatus(status);
                    }
                }
            }

            public void onError(Throwable th) {
                IngestClient.log.warn("Error on channel, closing stub", th);
                synchronized (IngestClient.this) {
                    IngestClient.this.stub = null;
                    try {
                        TimeUnit.SECONDS.sleep(1L);
                    } catch (InterruptedException e) {
                        IngestClient.log.warn("Interrupted while waiting before channel open retry.", e);
                        Thread.currentThread().interrupt();
                    }
                    IngestClient.this.createChannelAndStub();
                }
            }

            public void onCompleted() {
                synchronized (IngestClient.this.inFlightRequests) {
                    IngestClient.this.inFlightRequests.clear();
                }
                IngestClient.this.closedLatch.countDown();
            }
        };
    }

    public void send(Rpc.Ingest ingest, Consumer<Rpc.Status> consumer) {
        send(ingest, -1L, TimeUnit.SECONDS, consumer);
    }

    public void send(Rpc.Ingest ingest, long j, TimeUnit timeUnit, Consumer<Rpc.Status> consumer) {
        sendTry(ingest, j, timeUnit, consumer, false);
    }

    public Rpc.GetResponse get(Rpc.GetRequest getRequest) {
        ensureChannel();
        return this.getStub.get(getRequest);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendTry(Rpc.Ingest ingest, long j, TimeUnit timeUnit, Consumer<Rpc.Status> consumer, boolean z) {
        if (Strings.isNullOrEmpty(ingest.getUuid())) {
            throw new IllegalArgumentException("UUID cannot be null, because it is used to confirm messages.");
        }
        synchronized (this) {
            if (!this.flushThread.isAlive()) {
                this.flushThread.start();
            }
            ensureChannel();
        }
        ScheduledFuture<?> scheduledFuture = null;
        if (j > 0) {
            scheduledFuture = this.timer.schedule(() -> {
                this.inFlightRequests.remove(ingest.getUuid());
                consumer.accept(Rpc.Status.newBuilder().setStatus(504).setStatusMessage("Timeout while waiting for response of request UUID " + ingest.getUuid()).build());
            }, j, timeUnit);
        }
        while (!z && this.inFlightRequests.size() >= this.options.getMaxInflightRequests()) {
            synchronized (this.inFlightRequests) {
                try {
                    this.inFlightRequests.wait(100L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    consumer.accept(Rpc.Status.newBuilder().setStatus(417).setStatusMessage("Interrupted while waiting for the requests to settle").build());
                    return;
                }
            }
        }
        this.inFlightRequests.putIfAbsent(ingest.getUuid(), new Request(consumer, scheduledFuture, ingest));
        synchronized (this) {
            this.bulkBuilder.addIngest(ingest);
            if (this.bulkBuilder.getIngestCount() >= this.options.getMaxFlushRecords()) {
                flush();
            }
        }
    }

    @VisibleForTesting
    synchronized void createChannelAndStub() {
        if (this.channel == null) {
            this.channel = ManagedChannelBuilder.forAddress(this.host, this.port).usePlaintext().executor(this.options.getExecutor()).build();
        }
        this.getStub = RetrieveServiceGrpc.newBlockingStub(this.channel);
        this.stub = IngestServiceGrpc.newStub(this.channel);
        this.requestObserver = this.stub.ingestBulk(this.statusObserver);
        synchronized (this.inFlightRequests) {
            this.inFlightRequests.values().forEach((v0) -> {
                v0.retry();
            });
        }
    }

    private void ensureChannel() {
        if (this.channel == null) {
            createChannelAndStub();
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        boolean z;
        synchronized (this) {
            flush();
            z = this.channel != null;
        }
        if (z) {
            while (!this.inFlightRequests.isEmpty()) {
                synchronized (this.inFlightRequests) {
                    try {
                        this.inFlightRequests.wait(100L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            synchronized (this) {
                this.requestObserver.onCompleted();
            }
            this.flushThread.interrupt();
            try {
                if (!this.closedLatch.await(1L, TimeUnit.SECONDS)) {
                    log.warn("Unable to await for flushThreads");
                }
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
            }
            this.channel = null;
        }
    }

    private synchronized void flush() {
        if (this.requestObserver != null) {
            this.requestObserver.onNext(this.bulkBuilder.build());
        } else {
            log.warn("Cannot send bulk due to null observer. This might suggest bug in code.");
        }
        this.bulkBuilder.clear();
        this.lastFlush = System.nanoTime();
    }

    public String getHost() {
        return this.host;
    }

    public int getPort() {
        return this.port;
    }

    public Options getOptions() {
        return this.options;
    }
}
