package co.cask.cdap.data.stream.service;

import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.data.stream.StreamPropertyListener;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.twill.common.Cancellable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data/stream/service/BasicStreamWriterSizeCollector.class */
/**
 * In-memory {@link StreamWriterSizeCollector} that keeps one running byte counter per stream name.
 *
 * <p>The first time data is reported for a stream, a {@link StreamPropertyListener} is registered
 * with the {@link StreamCoordinatorClient}; whenever that listener is told the stream's generation
 * changed, the counter for the stream is reset to zero. All registered listeners are cancelled when
 * the service shuts down.
 *
 * <p>Thread-safety: {@link #received(String, long)} and {@link #shutDown()} synchronize on this
 * instance, which guards {@code truncationSubscriptions}. Counters live in a concurrent map and are
 * reset in place, so {@link #getTotalCollected(String)} and the listener callback need no lock.
 */
public class BasicStreamWriterSizeCollector extends AbstractIdleService implements StreamWriterSizeCollector {
    private static final Logger LOG = LoggerFactory.getLogger(BasicStreamWriterSizeCollector.class);
    private final StreamCoordinatorClient streamCoordinatorClient;

    /** Running total of bytes reported per stream name. */
    private final ConcurrentMap<String, AtomicLong> streamSizes = Maps.newConcurrentMap();

    /** Handles of the registered generation listeners; guarded by {@code this}. */
    private final List<Cancellable> truncationSubscriptions = Lists.newArrayList();

    /**
     * @param streamCoordinatorClient client used to register per-stream property listeners
     */
    @Inject
    public BasicStreamWriterSizeCollector(StreamCoordinatorClient streamCoordinatorClient) {
        this.streamCoordinatorClient = streamCoordinatorClient;
    }

    /** Nothing to initialize; listeners are registered lazily by {@link #received(String, long)}. */
    @Override
    protected void startUp() throws Exception {
    }

    /**
     * Cancels every listener registered so far.
     *
     * <p>Synchronized because the subscription list is a plain list that
     * {@link #received(String, long)} appends to under the same monitor.
     */
    @Override
    protected synchronized void shutDown() throws Exception {
        for (Cancellable subscription : this.truncationSubscriptions) {
            subscription.cancel();
        }
    }

    /**
     * Returns the number of bytes currently recorded for a stream.
     *
     * @param str name of the stream
     * @return the recorded total, or {@code 0} if nothing has been recorded for that stream
     */
    @Override
    public long getTotalCollected(String str) {
        AtomicLong counter = this.streamSizes.get(str);
        return counter == null ? 0L : counter.get();
    }

    /**
     * Records that {@code j} more bytes were written to the stream {@code str}.
     *
     * <p>On the first report for a stream the counter is created with {@code j} as its initial
     * value and a generation listener is registered for that stream.
     *
     * @param str name of the stream
     * @param j   number of bytes to add to the stream's total
     */
    @Override
    public synchronized void received(String str, long j) {
        AtomicLong counter = this.streamSizes.get(str);
        if (counter == null) {
            // putIfAbsent returns null when our new counter (already holding j) was installed.
            counter = this.streamSizes.putIfAbsent(str, new AtomicLong(j));
            if (counter == null) {
                this.truncationSubscriptions.add(subscribeToGenerationChanges(str));
            }
        }
        // A non-null counter means it existed before this call, so j still has to be added.
        long total = counter == null ? j : counter.addAndGet(j);

        // Guarded so the argument array and boxing are skipped when trace logging is off.
        if (LOG.isTraceEnabled()) {
            LOG.trace("Received data for stream {}: {}B. Total size is now {}",
                      new Object[] {str, Long.valueOf(j), Long.valueOf(total)});
        }
    }

    /**
     * Registers a listener that zeroes the stream's counter when its generation changes.
     *
     * @param str name of the stream to listen on
     * @return handle that cancels the registration
     */
    private Cancellable subscribeToGenerationChanges(String str) {
        return this.streamCoordinatorClient.addListener(str, new StreamPropertyListener() {
            @Override
            public void generationChanged(String str2, int i) {
                // Reset the existing counter in place rather than replacing it in the map:
                // a concurrent received() may already hold a reference to it, and increments
                // applied to a replaced (orphaned) counter would be lost.
                AtomicLong counter = BasicStreamWriterSizeCollector.this.streamSizes.get(str2);
                if (counter == null) {
                    counter = BasicStreamWriterSizeCollector.this.streamSizes.putIfAbsent(str2, new AtomicLong(0L));
                }
                if (counter != null) {
                    counter.set(0L);
                }
            }
        });
    }
}
