package com.graphaware.writer;

import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.Service;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.neo4j.graphdb.GraphDatabaseService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/graphaware/writer/SingleThreadedWriter.class */
public abstract class SingleThreadedWriter extends AbstractScheduledService implements DatabaseWriter {
    private static final Logger LOG = LoggerFactory.getLogger(SingleThreadedWriter.class);
    private static final int LOGGING_INTERVAL_MS = 5000;
    public static final int DEFAULT_QUEUE_CAPACITY = 10000;
    private final int queueCapacity;
    protected final LinkedBlockingQueue<RunnableFuture<?>> queue;
    protected final GraphDatabaseService database;
    private final ScheduledExecutorService queueSizeLogger;

    /* JADX INFO: Access modifiers changed from: protected */
    public SingleThreadedWriter(GraphDatabaseService graphDatabaseService) {
        this(graphDatabaseService, DEFAULT_QUEUE_CAPACITY);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SingleThreadedWriter(GraphDatabaseService graphDatabaseService, int i) {
        this.queueSizeLogger = Executors.newSingleThreadScheduledExecutor();
        this.database = graphDatabaseService;
        this.queueCapacity = i;
        this.queue = new LinkedBlockingQueue<>(i);
    }

    @PostConstruct
    public void start() {
        startAsync();
        awaitRunning();
        this.queueSizeLogger.scheduleWithFixedDelay(new Runnable() { // from class: com.graphaware.writer.SingleThreadedWriter.1
            @Override // java.lang.Runnable
            public void run() {
                if (SingleThreadedWriter.this.queue.size() > 0 || SingleThreadedWriter.this.logEmptyQueue()) {
                    SingleThreadedWriter.LOG.info("Queue size: " + SingleThreadedWriter.this.queue.size());
                }
            }
        }, 5L, loggingFrequencyMs(), TimeUnit.SECONDS);
    }

    @PreDestroy
    public void stop() {
        this.queueSizeLogger.shutdownNow();
        stopAsync();
        awaitTerminated();
    }

    protected void shutDown() throws Exception {
        runOneIteration();
    }

    public void write(Runnable runnable) {
        write(runnable, "UNKNOWN");
    }

    public void write(Runnable runnable, String str) {
        write(Executors.callable(runnable), str, 0);
    }

    public <T> T write(Callable<T> callable, String str, int i) {
        if (!state().equals(Service.State.NEW) && !state().equals(Service.State.STARTING) && !state().equals(Service.State.RUNNING)) {
            throw new IllegalStateException("Database writer is not running!");
        }
        RunnableFuture<T> createTask = createTask(callable);
        if (!offer(createTask)) {
            LOG.warn("Could not write task " + str + " to queue as it is too full. We're losing writes now.");
            return null;
        }
        if (i <= 0) {
            return null;
        }
        return (T) block(createTask, str, i);
    }

    protected boolean offer(RunnableFuture<?> runnableFuture) {
        return this.queue.offer(runnableFuture);
    }

    protected abstract <T> RunnableFuture<T> createTask(Callable<T> callable);

    protected final <T> T block(RunnableFuture<T> runnableFuture, String str, int i) {
        try {
            return runnableFuture.get(i, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.warn("Waiting for execution of a task was interrupted. ID: " + str, e);
            return null;
        } catch (ExecutionException e2) {
            LOG.warn("Execution of a task threw an exception. ID: " + str, e2);
            Throwable cause = e2.getCause();
            if (cause instanceof RuntimeException) {
                throw ((RuntimeException) cause);
            }
            throw new RuntimeException(cause);
        } catch (TimeoutException e3) {
            LOG.warn("Task didn't get executed within " + i + "ms. ID: " + str);
            return null;
        }
    }

    protected boolean logEmptyQueue() {
        return false;
    }

    protected long loggingFrequencyMs() {
        return 5000L;
    }

    protected AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedDelaySchedule(0L, 5L, TimeUnit.MILLISECONDS);
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        return obj != null && getClass() == obj.getClass() && this.queueCapacity == ((SingleThreadedWriter) obj).queueCapacity;
    }

    public int hashCode() {
        return this.queueCapacity;
    }
}
