package com.spotify.helios.agent;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.MoreExecutors;
import com.spotify.helios.common.descriptors.JobId;
import com.spotify.helios.common.descriptors.TaskStatus;
import com.spotify.helios.common.descriptors.TaskStatusEvent;
import com.spotify.helios.servicescommon.PersistentAtomicReference;
import com.spotify.helios.servicescommon.coordination.Paths;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClient;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/helios/agent/QueueingHistoryWriter.class */
public class QueueingHistoryWriter extends AbstractIdleService implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(QueueingHistoryWriter.class);
    public static final int MAX_NUMBER_STATUS_EVENTS_TO_RETAIN = 30;
    private static final int MAX_QUEUE_SIZE = 30;
    private static final int MAX_TOTAL_SIZE = 600;
    private final ConcurrentMap<JobId, Deque<TaskStatusEvent>> items;
    private final ScheduledExecutorService zkWriterExecutor = MoreExecutors.getExitingScheduledExecutorService((ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1), 0, TimeUnit.SECONDS);
    private final String hostname;
    private final AtomicInteger count;
    private final ZooKeeperClient client;
    private final PersistentAtomicReference<ConcurrentMap<JobId, Deque<TaskStatusEvent>>> backingStore;

    /* JADX WARN: Multi-variable type inference failed */
    public QueueingHistoryWriter(String str, ZooKeeperClient zooKeeperClient, Path path) throws IOException, InterruptedException {
        this.hostname = str;
        this.client = zooKeeperClient;
        this.backingStore = PersistentAtomicReference.create(path, new TypeReference<ConcurrentMap<JobId, Deque<TaskStatusEvent>>>() { // from class: com.spotify.helios.agent.QueueingHistoryWriter.1
        }, new Supplier<ConcurrentMap<JobId, Deque<TaskStatusEvent>>>() { // from class: com.spotify.helios.agent.QueueingHistoryWriter.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.google.common.base.Supplier
            public ConcurrentMap<JobId, Deque<TaskStatusEvent>> get() {
                return Maps.newConcurrentMap();
            }
        });
        this.items = this.backingStore.get();
        UnmodifiableIterator it = ImmutableSet.copyOf((Collection) this.items.keySet()).iterator();
        while (it.hasNext()) {
            JobId jobId = (JobId) it.next();
            if (this.items.get(jobId) == null) {
                this.items.remove(jobId);
            }
        }
        int i = 0;
        Iterator<Deque<TaskStatusEvent>> it2 = this.items.values().iterator();
        while (it2.hasNext()) {
            i += it2.next().size();
        }
        this.count = new AtomicInteger(i);
    }

    @Override // com.google.common.util.concurrent.AbstractIdleService
    protected void startUp() throws Exception {
        this.zkWriterExecutor.scheduleAtFixedRate(this, 1L, 1L, TimeUnit.SECONDS);
    }

    @Override // com.google.common.util.concurrent.AbstractIdleService
    protected void shutDown() throws Exception {
        this.zkWriterExecutor.shutdownNow();
        this.zkWriterExecutor.awaitTermination(1L, TimeUnit.MINUTES);
    }

    private void add(TaskStatusEvent taskStatusEvent) throws InterruptedException {
        while (this.count.get() >= MAX_TOTAL_SIZE) {
            getNext();
        }
        Deque<TaskStatusEvent> deque = getDeque(taskStatusEvent.getStatus().getJob().getId());
        synchronized (deque) {
            while (deque.size() >= 30) {
                deque.remove();
                this.count.decrementAndGet();
            }
            deque.add(taskStatusEvent);
            this.count.incrementAndGet();
        }
        try {
            this.backingStore.set(this.items);
        } catch (ClosedByInterruptException e) {
            log.debug("Writing task status event to backing store was interrupted");
        } catch (IOException e2) {
            log.warn("Failed to write task status event to backing store", (Throwable) e2);
        }
    }

    private Deque<TaskStatusEvent> getDeque(JobId jobId) {
        synchronized (this.items) {
            Deque<TaskStatusEvent> deque = this.items.get(jobId);
            if (deque != null) {
                return deque;
            }
            ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
            this.items.put(jobId, concurrentLinkedDeque);
            return concurrentLinkedDeque;
        }
    }

    public void saveHistoryItem(JobId jobId, TaskStatus taskStatus) throws InterruptedException {
        saveHistoryItem(jobId, taskStatus, System.currentTimeMillis());
    }

    public void saveHistoryItem(JobId jobId, TaskStatus taskStatus, long j) throws InterruptedException {
        add(new TaskStatusEvent(taskStatus, j, this.hostname));
    }

    /* JADX WARN: Code restructure failed: missing block: B:17:0x0046, code lost:
    
        r0 = r0.poll();
        r3.count.decrementAndGet();
        com.google.common.base.Preconditions.checkState(r0.equals(r0), "current should equal newCurrent");
        r0 = r3.items;
     */
    /* JADX WARN: Code restructure failed: missing block: B:18:0x006b, code lost:
    
        monitor-enter(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:20:0x006c, code lost:
    
        r0 = r3.items.get(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:21:0x007d, code lost:
    
        if (r0 == null) goto L23;
     */
    /* JADX WARN: Code restructure failed: missing block: B:23:0x0087, code lost:
    
        if (r0.isEmpty() == false) goto L23;
     */
    /* JADX WARN: Code restructure failed: missing block: B:24:0x008a, code lost:
    
        r3.items.remove(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:26:0x0097, code lost:
    
        monitor-exit(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:30:0x00a7, code lost:
    
        return r0;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private com.spotify.helios.common.descriptors.TaskStatusEvent getNext() {
        /*
            r3 = this;
        L0:
            r0 = r3
            com.spotify.helios.common.descriptors.TaskStatusEvent r0 = r0.findEldestEvent()
            r4 = r0
            r0 = r4
            if (r0 != 0) goto Lb
            r0 = 0
            return r0
        Lb:
            r0 = r4
            com.spotify.helios.common.descriptors.TaskStatus r0 = r0.getStatus()
            com.spotify.helios.common.descriptors.Job r0 = r0.getJob()
            com.spotify.helios.common.descriptors.JobId r0 = r0.getId()
            r5 = r0
            r0 = r3
            java.util.concurrent.ConcurrentMap<com.spotify.helios.common.descriptors.JobId, java.util.Deque<com.spotify.helios.common.descriptors.TaskStatusEvent>> r0 = r0.items
            r1 = r5
            java.lang.Object r0 = r0.get(r1)
            java.util.Deque r0 = (java.util.Deque) r0
            r6 = r0
            r0 = r6
            if (r0 != 0) goto L2b
            goto L0
        L2b:
            r0 = r6
            r1 = r0
            r7 = r1
            monitor-enter(r0)
            r0 = r6
            java.lang.Object r0 = r0.peek()     // Catch: java.lang.Throwable -> La8
            com.spotify.helios.common.descriptors.TaskStatusEvent r0 = (com.spotify.helios.common.descriptors.TaskStatusEvent) r0     // Catch: java.lang.Throwable -> La8
            r1 = r4
            boolean r0 = r0.equals(r1)     // Catch: java.lang.Throwable -> La8
            if (r0 != 0) goto L46
            r0 = r7
            monitor-exit(r0)     // Catch: java.lang.Throwable -> La8
            goto L0
        L46:
            r0 = r6
            java.lang.Object r0 = r0.poll()     // Catch: java.lang.Throwable -> La8
            com.spotify.helios.common.descriptors.TaskStatusEvent r0 = (com.spotify.helios.common.descriptors.TaskStatusEvent) r0     // Catch: java.lang.Throwable -> La8
            r8 = r0
            r0 = r3
            java.util.concurrent.atomic.AtomicInteger r0 = r0.count     // Catch: java.lang.Throwable -> La8
            int r0 = r0.decrementAndGet()     // Catch: java.lang.Throwable -> La8
            r0 = r4
            r1 = r8
            boolean r0 = r0.equals(r1)     // Catch: java.lang.Throwable -> La8
            java.lang.String r1 = "current should equal newCurrent"
            com.google.common.base.Preconditions.checkState(r0, r1)     // Catch: java.lang.Throwable -> La8
            r0 = r3
            java.util.concurrent.ConcurrentMap<com.spotify.helios.common.descriptors.JobId, java.util.Deque<com.spotify.helios.common.descriptors.TaskStatusEvent>> r0 = r0.items     // Catch: java.lang.Throwable -> La8
            r1 = r0
            r9 = r1
            monitor-enter(r0)     // Catch: java.lang.Throwable -> La8
            r0 = r3
            java.util.concurrent.ConcurrentMap<com.spotify.helios.common.descriptors.JobId, java.util.Deque<com.spotify.helios.common.descriptors.TaskStatusEvent>> r0 = r0.items     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> La8
            r1 = r5
            java.lang.Object r0 = r0.get(r1)     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> La8
            java.util.Deque r0 = (java.util.Deque) r0     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> La8
            r10 = r0
            r0 = r10
            if (r0 == 0) goto L95
            r0 = r10
            boolean r0 = r0.isEmpty()     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> La8
            if (r0 == 0) goto L95
            r0 = r3
            java.util.concurrent.ConcurrentMap<com.spotify.helios.common.descriptors.JobId, java.util.Deque<com.spotify.helios.common.descriptors.TaskStatusEvent>> r0 = r0.items     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> La8
            r1 = r5
            java.lang.Object r0 = r0.remove(r1)     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> La8
        L95:
            r0 = r9
            monitor-exit(r0)     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> La8
            goto La3
        L9b:
            r11 = move-exception
            r0 = r9
            monitor-exit(r0)     // Catch: java.lang.Throwable -> L9b java.lang.Throwable -> La8
            r0 = r11
            throw r0     // Catch: java.lang.Throwable -> La8
        La3:
            r0 = r4
            r1 = r7
            monitor-exit(r1)     // Catch: java.lang.Throwable -> La8
            return r0
        La8:
            r12 = move-exception
            r0 = r7
            monitor-exit(r0)     // Catch: java.lang.Throwable -> La8
            r0 = r12
            throw r0
        */
        throw new UnsupportedOperationException("Method not decompiled: com.spotify.helios.agent.QueueingHistoryWriter.getNext():com.spotify.helios.common.descriptors.TaskStatusEvent");
    }

    public boolean isEmpty() {
        return this.count.get() == 0;
    }

    private void putBack(TaskStatusEvent taskStatusEvent) {
        Deque<TaskStatusEvent> deque = getDeque(taskStatusEvent.getStatus().getJob().getId());
        synchronized (deque) {
            if (deque.size() >= 30) {
                return;
            }
            deque.push(taskStatusEvent);
            this.count.incrementAndGet();
        }
    }

    private TaskStatusEvent findEldestEvent() {
        TaskStatusEvent taskStatusEvent = null;
        for (Deque<TaskStatusEvent> deque : this.items.values()) {
            if (deque != null) {
                TaskStatusEvent peek = deque.peek();
                if (taskStatusEvent == null || peek.getTimestamp() < taskStatusEvent.getTimestamp()) {
                    taskStatusEvent = peek;
                }
            }
        }
        return taskStatusEvent;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            TaskStatusEvent next = getNext();
            if (next == null) {
                return;
            }
            try {
                JobId id = next.getStatus().getJob().getId();
                String historyJobHostEventsTimestamp = Paths.historyJobHostEventsTimestamp(id, this.hostname, next.getTimestamp());
                log.debug("writing queued item to zookeeper {} {}", next.getStatus().getJob().getId(), Long.valueOf(next.getTimestamp()));
                this.client.ensurePath(historyJobHostEventsTimestamp, true);
                this.client.createAndSetData(historyJobHostEventsTimestamp, next.getStatus().toJsonBytes());
                List<String> children = this.client.getChildren(Paths.historyJobHostEvents(id, this.hostname));
                if (children.size() > 30) {
                    trimStatusEvents(children, id);
                }
            } catch (KeeperException.ConnectionLossException e) {
                log.warn("Connection lost while putting item into zookeeper, will retry");
                putBack(next);
                return;
            } catch (KeeperException.NodeExistsException e2) {
                log.debug("item we wanted in is already there");
            } catch (KeeperException e3) {
                log.error("Error putting item into zookeeper, will retry", (Throwable) e3);
                putBack(next);
                return;
            }
        }
    }

    private void trimStatusEvents(List<String> list, JobId jobId) {
        ArrayList newArrayList = Lists.newArrayList(Iterables.transform(list, new Function<String, Long>() { // from class: com.spotify.helios.agent.QueueingHistoryWriter.3
            @Override // com.google.common.base.Function
            public Long apply(String str) {
                return Long.valueOf(str);
            }
        }));
        Collections.sort(newArrayList);
        for (int i = 0; i < newArrayList.size() - 30; i++) {
            try {
                this.client.delete(Paths.historyJobHostEventsTimestamp(jobId, this.hostname, ((Long) newArrayList.get(i)).longValue()));
            } catch (KeeperException e) {
                log.warn("failure deleting overflow of status items - we're hoping a later execution will fix", (Throwable) e);
            }
        }
    }
}
