package cz.o2.proxima.server;

import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.proto.service.Rpc;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.server.metrics.Metrics;
import cz.o2.proxima.storage.StreamElement;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.File;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/server/IngestServer.class */
/**
 * Ingest server entry point.
 *
 * <p>Builds a {@link Repository} from the supplied configuration, obtains its
 * {@link DirectDataOperator} and serves {@link IngestService} and {@link RetrieveService}
 * through a gRPC {@link Server}.
 */
public class IngestServer {
    private static final Logger log = LoggerFactory.getLogger(IngestServer.class);

    /** Number of available processors, but never less than two. */
    static final int CORES = Math.max(2, Runtime.getRuntime().availableProcessors());

    /** Executor handed to the gRPC server for request processing. */
    final Executor executor;

    /** Scheduler passed to {@link IngestService}. */
    final ScheduledExecutorService scheduler;

    /** Repository created from the configuration. */
    final Repository repo;

    /** Direct data operator obtained from {@link #repo}. */
    final DirectDataOperator direct;

    /** Configuration this server was created with. */
    final Config cfg;

    /** Value of {@code Constants.CFG_IGNORE_ERRORS}; {@code false} when the option is absent. */
    final boolean ignoreErrors;

    /** Retry policy: at most 3 retries, backoff from 3000 ms to 20000 ms with factor 2. */
    RetryPolicy retryPolicy;

    /**
     * Creates the server and runs it.
     *
     * @param strArr command-line arguments; when empty the configuration is loaded through
     *     {@link ConfigFactory#load()}, otherwise the first argument is parsed as a path to
     *     the configuration file
     * @throws Throwable kept in the signature for compatibility with existing callers
     */
    public static void main(String[] strArr) throws Throwable {
        Config config = strArr.length == 0
                ? ConfigFactory.load().resolve()
                : ConfigFactory.parseFile(new File(strArr[0])).resolve();
        new IngestServer(config).run();
    }

    /**
     * Creates a server backed by a regular (non-test) repository.
     *
     * @param config resolved configuration
     */
    protected IngestServer(Config config) {
        this(config, false);
    }

    /**
     * Creates a server.
     *
     * @param config resolved configuration
     * @param z when {@code true} the repository is created through {@code Repository.ofTest},
     *     otherwise through {@code Repository.of}
     * @throws IllegalArgumentException if the resulting repository is empty
     */
    @VisibleForTesting
    IngestServer(Config config, boolean z) {
        // Diamond instead of the raw type; the queue carries the Runnables of the pool.
        this.executor = new ThreadPoolExecutor(
                CORES, 10 * CORES, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10 * CORES));
        this.scheduler = new ScheduledThreadPoolExecutor(5);
        this.retryPolicy = new RetryPolicy()
                .withMaxRetries(3)
                .withBackoff(3000L, 20000L, TimeUnit.MILLISECONDS, 2.0d);
        this.cfg = config;
        this.repo = z ? Repository.ofTest(config, new Repository.Validate[0]) : Repository.of(config);
        this.direct = this.repo.getOrCreateOperator(DirectDataOperator.class);
        if (log.isDebugEnabled()) {
            // Walk the whole model only when the output is actually going to be logged.
            this.repo.getAllEntities().forEach(entity ->
                    entity.getAllAttributes(true).forEach(attribute ->
                            log.debug("Configured attribute {}", attribute)));
        }
        if (this.repo.isEmpty()) {
            throw new IllegalArgumentException("No valid entities found in provided config!");
        }
        this.ignoreErrors = config.hasPath(Constants.CFG_IGNORE_ERRORS)
                && config.getBoolean(Constants.CFG_IGNORE_ERRORS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Validates a single element and hands it over to the writer of its attribute.
     *
     * <p>The outcome is reported through {@code consumer}: status 412 when the value fails
     * validation, status 200 when the writer confirms the write and status 500 when the
     * writer reports a failure.
     *
     * @param directDataOperator operator used to look up the attribute writer
     * @param streamElement element to be written
     * @param str UUID of the request, copied into every produced status
     * @param consumer receiver of the resulting status
     * @return {@code true} when the element was passed to the writer, {@code false} when it
     *     was rejected as invalid
     * @throws IllegalStateException if no writer exists for the element's attribute
     */
    public static boolean ingestRequest(DirectDataOperator directDataOperator, StreamElement streamElement, String str, Consumer<Rpc.Status> consumer) {
        EntityDescriptor entityDescriptor = streamElement.getEntityDescriptor();
        AttributeDescriptor attributeDescriptor = streamElement.getAttributeDescriptor();
        // orElseThrow never yields null, so no separate null check of the writer is needed.
        OnlineAttributeWriter writer = (OnlineAttributeWriter) directDataOperator.getWriter(attributeDescriptor).orElseThrow(() ->
                new IllegalStateException("Writer for attribute " + attributeDescriptor.getName() + " not found"));

        // Deletes carry no payload to validate; only updates are checked by the serializer.
        if (!streamElement.isDelete() && !attributeDescriptor.getValueSerializer().isValid(streamElement.getValue())) {
            log.info("Request {} is not valid", streamElement);
            consumer.accept(status(str, 412, "Invalid scheme for " + entityDescriptor.getName() + "." + attributeDescriptor.getName()));
            return false;
        }

        if (!streamElement.isDelete()) {
            Metrics.UPDATE_REQUESTS.increment();
        } else if (streamElement.isDeleteWildcard()) {
            Metrics.DELETE_WILDCARD_REQUESTS.increment();
        } else {
            Metrics.DELETE_REQUESTS.increment();
        }
        Metrics.COMMIT_LOG_APPEND.increment();

        log.debug("Writing {} to commit log {}", streamElement, writer.getUri());
        writer.write(streamElement, (succeeded, error) -> {
            if (succeeded) {
                consumer.accept(ok(str));
                return;
            }
            log.warn("Failed to write {}", streamElement, error);
            // A failure reported without a cause must still produce a response for the client.
            String message = error == null ? "Unknown error" : error.toString();
            consumer.accept(status(str, 500, message));
        });
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a status with code 404.
     *
     * @param str UUID of the request
     * @param str2 status message
     * @return the built status
     */
    public static Rpc.Status notFound(String str, String str2) {
        return status(str, 404, str2);
    }

    /**
     * Builds a status with code 200 and no message.
     *
     * @param str UUID of the request
     * @return the built status
     */
    static Rpc.Status ok(String str) {
        return Rpc.Status.newBuilder().setStatus(200).setUuid(str).build();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a status with the given code and message.
     *
     * @param str UUID of the request
     * @param i status code
     * @param str2 status message
     * @return the built status
     */
    public static Rpc.Status status(String str, int i, String str2) {
        return Rpc.Status.newBuilder().setUuid(str).setStatus(i).setStatusMessage(str2).build();
    }

    /**
     * Builds the gRPC server, starts it and blocks until it terminates.
     *
     * <p>The port is read from {@code Constants.CFG_PORT} when configured, otherwise
     * {@code Constants.DEFALT_PORT} is used. A JVM shutdown hook requests server shutdown.
     */
    private void run() {
        int port = this.cfg.hasPath(Constants.CFG_PORT)
                ? this.cfg.getInt(Constants.CFG_PORT)
                : Constants.DEFALT_PORT;
        Server server = ServerBuilder.forPort(port)
                .executor(this.executor)
                .addService(new IngestService(this.repo, this.direct, this.scheduler))
                .addService(new RetrieveService(this.repo, this.direct))
                .build();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("Gracefully shutting server down.");
            server.shutdown();
        }));
        try {
            server.start();
            log.info("Successfully started server 0.0.0.0:{}", server.getPort());
            Metrics.LIVENESS.increment(1.0d);
            server.awaitTermination();
            log.info("Server shutdown.");
        } catch (InterruptedException e) {
            // Restore the flag so the interruption stays visible, and report what really happened.
            Thread.currentThread().interrupt();
            Utils.die("Interrupted while waiting for the server to terminate", e);
        } catch (Exception e) {
            Utils.die("Failed to start the server", e);
        }
        Metrics.LIVENESS.reset();
    }

    /**
     * Starts the replication threads of a {@link ReplicationController} created for the
     * repository and calls {@code Utils.die} if they complete with an error.
     */
    @VisibleForTesting
    void runReplications() {
        ReplicationController.of(this.repo).runReplicationThreads().whenComplete((ignored, error) -> {
            if (error != null) {
                Utils.die(error.getMessage(), error);
            }
        });
    }

    /** @return the value of {@link #CORES} */
    public static int getCORES() {
        return CORES;
    }

    /** @return the executor handed to the gRPC server */
    public Executor getExecutor() {
        return this.executor;
    }

    /** @return the scheduler passed to the ingest service */
    public ScheduledExecutorService getScheduler() {
        return this.scheduler;
    }

    /** @return the repository created from the configuration */
    public Repository getRepo() {
        return this.repo;
    }

    /** @return the direct data operator of the repository */
    public DirectDataOperator getDirect() {
        return this.direct;
    }

    /** @return the configuration this server was created with */
    public Config getCfg() {
        return this.cfg;
    }

    /** @return whether {@code Constants.CFG_IGNORE_ERRORS} is enabled in the configuration */
    public boolean isIgnoreErrors() {
        return this.ignoreErrors;
    }

    /** @return the retry policy of this server */
    public RetryPolicy getRetryPolicy() {
        return this.retryPolicy;
    }
}
