package cz.o2.proxima.server;

import com.google.common.base.Strings;
import com.google.protobuf.TextFormat;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.proto.service.IngestServiceGrpc;
import cz.o2.proxima.proto.service.Rpc;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.server.metrics.Metrics;
import cz.o2.proxima.storage.StreamElement;
import io.grpc.stub.StreamObserver;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/server/IngestService.class */
public class IngestService extends IngestServiceGrpc.IngestServiceImplBase {
    private static final Logger log = LoggerFactory.getLogger(IngestService.class);
    private final Repository repo;
    private final DirectDataOperator direct;
    private final ScheduledExecutorService scheduler;

    /* loaded from: input_file:cz/o2/proxima/server/IngestService$IngestBulkObserver.class */
    private class IngestBulkObserver implements StreamObserver<Rpc.IngestBulk> {
        final StreamObserver<Rpc.StatusBulk> responseObserver;
        static final long MAX_SLEEP_NANOS = 100000000;
        static final int MAX_QUEUED_STATUSES = 500;
        ScheduledFuture<?> flushFuture;
        final Queue<Rpc.Status> statusQueue = new ConcurrentLinkedQueue();
        final AtomicBoolean completed = new AtomicBoolean(false);
        final Object inflightRequestsLock = new Object();
        final AtomicInteger inflightRequests = new AtomicInteger();
        final AtomicLong lastFlushNanos = new AtomicLong(System.nanoTime());
        final Rpc.StatusBulk.Builder builder = Rpc.StatusBulk.newBuilder();
        Runnable flushTask = createFlushTask();

        IngestBulkObserver(StreamObserver<Rpc.StatusBulk> streamObserver) {
            this.flushFuture = IngestService.this.scheduler.scheduleAtFixedRate(this.flushTask, MAX_SLEEP_NANOS, MAX_SLEEP_NANOS, TimeUnit.NANOSECONDS);
            this.responseObserver = streamObserver;
        }

        private Runnable createFlushTask() {
            return () -> {
                try {
                    synchronized (this.builder) {
                        while (this.statusQueue.size() > MAX_QUEUED_STATUSES) {
                            peekQueueToBuilderAndFlush();
                        }
                        if (System.nanoTime() - this.lastFlushNanos.get() >= MAX_SLEEP_NANOS) {
                            while (!this.statusQueue.isEmpty()) {
                                peekQueueToBuilderAndFlush();
                            }
                        }
                        if (this.builder.getStatusCount() > 0) {
                            this.responseObserver.onNext(this.builder.build());
                            this.builder.clear();
                        }
                        if (this.completed.get() && this.inflightRequests.get() == 0 && this.statusQueue.isEmpty()) {
                            this.responseObserver.onCompleted();
                        }
                    }
                } catch (Exception e) {
                    IngestService.log.error("Failed to send bulk status", e);
                }
            };
        }

        private void peekQueueToBuilderAndFlush() {
            synchronized (this.builder) {
                this.builder.addStatus(this.statusQueue.poll());
                if (this.builder.getStatusCount() >= 1000) {
                    flush();
                }
            }
        }

        private void flush() {
            synchronized (this.builder) {
                this.lastFlushNanos.set(System.nanoTime());
                Rpc.StatusBulk build = this.builder.build();
                if (build.getStatusCount() > 0) {
                    this.responseObserver.onNext(build);
                }
                this.builder.clear();
            }
        }

        public void onNext(Rpc.IngestBulk ingestBulk) {
            Metrics.INGEST_BULK.increment();
            Metrics.BULK_SIZE.increment(ingestBulk.getIngestCount());
            this.inflightRequests.addAndGet(ingestBulk.getIngestCount());
            ingestBulk.getIngestList().stream().forEach(ingest -> {
                IngestService.this.processSingleIngest(ingest, status -> {
                    this.statusQueue.add(status);
                    if (this.statusQueue.size() >= MAX_QUEUED_STATUSES) {
                        IngestService.this.scheduler.execute(this.flushTask);
                    }
                    if (this.inflightRequests.decrementAndGet() == 0) {
                        synchronized (this.inflightRequestsLock) {
                            this.inflightRequestsLock.notifyAll();
                        }
                    }
                });
            });
        }

        public void onError(Throwable th) {
            IngestService.log.error("Error from client", th);
            this.responseObserver.onError(th);
            this.flushFuture.cancel(true);
        }

        public void onCompleted() {
            this.completed.set(true);
            this.flushFuture.cancel(true);
            synchronized (this.inflightRequests) {
                while (this.inflightRequests.get() > 0) {
                    try {
                        this.inflightRequests.wait(100L);
                    } catch (InterruptedException e) {
                        IngestService.log.warn("Interrupted while waiting to send responses to client", e);
                        Thread.currentThread().interrupt();
                    }
                }
            }
            while (!this.statusQueue.isEmpty()) {
                peekQueueToBuilderAndFlush();
            }
            flush();
            this.responseObserver.onCompleted();
        }
    }

    /* loaded from: input_file:cz/o2/proxima/server/IngestService$IngestObserver.class */
    private class IngestObserver implements StreamObserver<Rpc.Ingest> {
        final StreamObserver<Rpc.Status> responseObserver;
        final Object inflightRequestsLock = new Object();
        final AtomicInteger inflightRequests = new AtomicInteger(0);
        final Object responseObserverLock = new Object();

        IngestObserver(StreamObserver<Rpc.Status> streamObserver) {
            this.responseObserver = streamObserver;
        }

        public void onNext(Rpc.Ingest ingest) {
            Metrics.INGEST_SINGLE.increment();
            this.inflightRequests.incrementAndGet();
            IngestService.this.processSingleIngest(ingest, status -> {
                synchronized (this.responseObserverLock) {
                    this.responseObserver.onNext(status);
                }
                if (this.inflightRequests.decrementAndGet() == 0) {
                    synchronized (this.inflightRequestsLock) {
                        this.inflightRequestsLock.notifyAll();
                    }
                }
            });
        }

        public void onError(Throwable th) {
            IngestService.log.error("Error on channel", th);
            synchronized (this.responseObserverLock) {
                this.responseObserver.onError(th);
            }
        }

        public void onCompleted() {
            this.inflightRequests.accumulateAndGet(0, (i, i2) -> {
                int i = i + i2;
                if (i > 0) {
                    synchronized (this.inflightRequestsLock) {
                        while (this.inflightRequests.get() > 0) {
                            try {
                                this.inflightRequestsLock.wait();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }
                synchronized (this.responseObserverLock) {
                    this.responseObserver.onCompleted();
                }
                return i;
            });
        }
    }

    public IngestService(Repository repository, DirectDataOperator directDataOperator, ScheduledExecutorService scheduledExecutorService) {
        this.repo = repository;
        this.direct = directDataOperator;
        this.scheduler = scheduledExecutorService;
    }

    private void processSingleIngest(Rpc.Ingest ingest, Consumer<Rpc.Status> consumer) {
        if (log.isDebugEnabled()) {
            log.debug("Processing input ingest {}", TextFormat.shortDebugString(ingest));
        }
        Consumer<Rpc.Status> consumer2 = status -> {
            Logger logger = log;
            Object[] objArr = new Object[3];
            objArr[0] = TextFormat.shortDebugString(ingest);
            objArr[1] = Integer.valueOf(status.getStatus());
            objArr[2] = status.getStatus() == 200 ? "OK" : status.getStatusMessage();
            logger.info("Input ingest {}: {}, {}", objArr);
            consumer.accept(status);
        };
        Metrics.INGESTS.increment();
        try {
            if (!writeRequest(ingest, consumer2)) {
                Metrics.INVALID_REQUEST.increment();
            }
        } catch (Exception e) {
            log.error("Error processing user request {}", ingest, e);
            consumer2.accept(IngestServer.status(ingest.getUuid(), 500, e.getMessage()));
        }
    }

    private boolean writeRequest(Rpc.Ingest ingest, Consumer<Rpc.Status> consumer) {
        if (Strings.isNullOrEmpty(ingest.getKey()) || Strings.isNullOrEmpty(ingest.getEntity()) || Strings.isNullOrEmpty(ingest.getAttribute())) {
            consumer.accept(IngestServer.status(ingest.getUuid(), 400, "Missing required fields in input message"));
            return false;
        }
        Optional findEntity = this.repo.findEntity(ingest.getEntity());
        if (!findEntity.isPresent()) {
            consumer.accept(IngestServer.notFound(ingest.getUuid(), "Entity " + ingest.getEntity() + " not found"));
            return false;
        }
        Optional findAttribute = ((EntityDescriptor) findEntity.get()).findAttribute(ingest.getAttribute());
        if (findAttribute.isPresent()) {
            return IngestServer.ingestRequest(this.direct, toStreamElement(ingest, (EntityDescriptor) findEntity.get(), (AttributeDescriptor) findAttribute.get()), ingest.getUuid(), consumer);
        }
        consumer.accept(IngestServer.notFound(ingest.getUuid(), "Attribute " + ingest.getAttribute() + " of entity " + ((EntityDescriptor) findEntity.get()).getName() + " not found"));
        return false;
    }

    public void ingest(Rpc.Ingest ingest, StreamObserver<Rpc.Status> streamObserver) {
        Metrics.INGEST_SINGLE.increment();
        processSingleIngest(ingest, status -> {
            streamObserver.onNext(status);
            streamObserver.onCompleted();
        });
    }

    public StreamObserver<Rpc.Ingest> ingestSingle(StreamObserver<Rpc.Status> streamObserver) {
        return new IngestObserver(streamObserver);
    }

    public StreamObserver<Rpc.IngestBulk> ingestBulk(StreamObserver<Rpc.StatusBulk> streamObserver) {
        return new IngestBulkObserver(streamObserver);
    }

    private static StreamElement toStreamElement(Rpc.Ingest ingest, EntityDescriptor entityDescriptor, AttributeDescriptor<?> attributeDescriptor) {
        long currentTimeMillis = ingest.getStamp() == 0 ? System.currentTimeMillis() : ingest.getStamp();
        return ingest.getDelete() ? (attributeDescriptor.isWildcard() && attributeDescriptor.getName().equals(ingest.getAttribute())) ? StreamElement.deleteWildcard(entityDescriptor, attributeDescriptor, ingest.getUuid(), ingest.getKey(), currentTimeMillis) : StreamElement.delete(entityDescriptor, attributeDescriptor, ingest.getUuid(), ingest.getKey(), ingest.getAttribute(), currentTimeMillis) : StreamElement.upsert(entityDescriptor, attributeDescriptor, ingest.getUuid(), ingest.getKey(), ingest.getAttribute(), currentTimeMillis, ingest.getValue().toByteArray());
    }
}
