package org.yamcs.yarch;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.yamcs.logging.Log;

/* loaded from: input_file:org/yamcs/yarch/Stream.class */
public abstract class Stream {
    public static final int SETUP = 0;
    public static final int RUNNING = 1;
    public static final int QUITTING = 2;
    protected String name;
    protected TupleDefinition outputDefinition;
    protected Log log;
    protected YarchDatabaseInstance ydb;
    private ExceptionHandler handler;
    protected final Collection<StreamSubscriber> subscribers = new ConcurrentLinkedQueue();
    protected AtomicInteger state = new AtomicInteger(0);
    private volatile AtomicLong dataCount = new AtomicLong();
    private volatile AtomicInteger subscriberCount = new AtomicInteger();

    /* loaded from: input_file:org/yamcs/yarch/Stream$ExceptionHandler.class */
    public interface ExceptionHandler {
        void handle(Tuple tuple, StreamSubscriber streamSubscriber, Throwable th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Stream(YarchDatabaseInstance yarchDatabaseInstance, String str, TupleDefinition tupleDefinition) {
        this.name = str;
        this.outputDefinition = tupleDefinition;
        this.ydb = yarchDatabaseInstance;
        this.log = new Log(getClass(), yarchDatabaseInstance.getName());
        this.log.setContext(str);
    }

    public abstract void doStart();

    public TupleDefinition getDefinition() {
        return this.outputDefinition;
    }

    public void emitTuple(Tuple tuple) {
        this.dataCount.incrementAndGet();
        for (StreamSubscriber streamSubscriber : this.subscribers) {
            try {
                streamSubscriber.onTuple(this, tuple);
            } catch (Exception e) {
                if (this.handler == null) {
                    this.log.warn("Exception received when emitting tuple to subscriber " + streamSubscriber, e);
                    throw e;
                }
                this.handler.handle(tuple, streamSubscriber, e);
            }
        }
    }

    public String getName() {
        return this.name;
    }

    public void setName(String str) {
        this.name = str;
    }

    public void addSubscriber(StreamSubscriber streamSubscriber) {
        this.subscribers.add(streamSubscriber);
        this.subscriberCount.incrementAndGet();
    }

    public void removeSubscriber(StreamSubscriber streamSubscriber) {
        this.subscribers.remove(streamSubscriber);
        this.subscriberCount.decrementAndGet();
    }

    public ColumnDefinition getColumnDefinition(String str) {
        return this.outputDefinition.getColumn(str);
    }

    public final void start() {
        if (this.state.compareAndSet(0, 1)) {
            doStart();
        }
    }

    protected boolean isRunning() {
        return this.state.get() == 1;
    }

    protected boolean quitting() {
        return this.state.get() == 2;
    }

    public final void close() {
        if (this.state.getAndSet(2) == 2) {
            return;
        }
        this.ydb.removeStream(this.name);
        this.log.debug("Closed stream {} num emitted tuples: {}", this.name, Long.valueOf(getDataCount()));
        doClose();
        Iterator<StreamSubscriber> it = this.subscribers.iterator();
        while (it.hasNext()) {
            it.next().streamClosed(this);
        }
    }

    protected abstract void doClose();

    public int getState() {
        return this.state.get();
    }

    public boolean isClosed() {
        return this.state.get() == 2;
    }

    public long getDataCount() {
        return this.dataCount.get();
    }

    public int getSubscriberCount() {
        return this.subscriberCount.get();
    }

    public Collection<StreamSubscriber> getSubscribers() {
        return Collections.unmodifiableCollection(this.subscribers);
    }

    public void exceptionHandler(ExceptionHandler exceptionHandler) {
        this.handler = exceptionHandler;
    }

    public String toString() {
        return this.name;
    }
}
