package com.tokera.ate.io.core;

import com.google.common.collect.ImmutableSet;
import com.tokera.ate.common.LoggerHook;
import com.tokera.ate.dao.TopicAndPartition;
import com.tokera.ate.delegates.AteDelegate;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;

/* loaded from: input_file:com/tokera/ate/io/core/DataPartitionDaemon.class */
public abstract class DataPartitionDaemon implements Runnable {

    @Inject
    protected LoggerHook LOG;
    private Thread thread;
    protected AteDelegate d = AteDelegate.get();
    private Object threadLock = new Object();
    private Set<TopicAndPartition> partitions = new HashSet();
    private Object partitionsLock = new Object();
    private volatile int shouldRun = 1;
    protected AtomicBoolean isRunning = new AtomicBoolean(false);

    public void addPartition(TopicAndPartition topicAndPartition) {
        synchronized (this.partitionsLock) {
            if (this.partitions.add(topicAndPartition)) {
                start();
            }
        }
    }

    public void removePartition(TopicAndPartition topicAndPartition) {
        synchronized (this.partitionsLock) {
            if (this.partitions.remove(topicAndPartition) && !this.partitions.isEmpty()) {
                start();
            }
        }
    }

    public Set<TopicAndPartition> listPartitions() {
        ImmutableSet copyOf;
        synchronized (this.partitionsLock) {
            copyOf = ImmutableSet.copyOf(this.partitions);
        }
        return copyOf;
    }

    protected void start() {
        synchronized (this.threadLock) {
            this.shouldRun = 1;
            if (this.isRunning.compareAndSet(false, true)) {
                this.thread = new Thread(this);
                this.thread.setDaemon(true);
                this.thread.start();
            }
        }
    }

    protected void stop() {
        synchronized (this.threadLock) {
            this.shouldRun = 0;
            if (this.thread != null) {
                this.thread.interrupt();
                try {
                    this.thread.join();
                } catch (InterruptedException e) {
                    this.LOG.warn(e);
                }
            }
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        int i = 5;
        while (this.shouldRun == 1) {
            try {
                try {
                    work();
                    i = 5;
                } catch (Throwable th) {
                    this.LOG.warn(th);
                    if (th instanceof InterruptedException) {
                        break;
                    }
                    try {
                        Thread.sleep(i);
                        i *= 2;
                        if (i > 5000) {
                            i = 5000;
                        }
                    } catch (InterruptedException e) {
                        this.LOG.warn(th);
                    }
                }
            } finally {
                this.isRunning.set(false);
            }
        }
    }

    protected abstract void work() throws InterruptedException;
}
