package org.yamcs.http.websocket;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.yamcs.Processor;
import org.yamcs.ProcessorException;
import org.yamcs.http.api.TableApi;
import org.yamcs.protobuf.StreamSubscriptionRequest;
import org.yamcs.protobuf.Table;
import org.yamcs.protobuf.Yamcs;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.StreamSubscriber;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.YarchDatabase;
import org.yamcs.yarch.YarchDatabaseInstance;

/* loaded from: input_file:org/yamcs/http/websocket/StreamResource.class */
/**
 * WebSocket resource, registered under the name {@code "stream"}, that relays tuples emitted by
 * Yarch streams to a connected WebSocket client.
 *
 * <p>The resource registers itself as a {@link StreamSubscriber} on every stream the client
 * subscribes to and remembers those streams so that it can detach from all of them on
 * unsubscribe or when the socket closes.
 */
public class StreamResource implements StreamSubscriber, WebSocketResource {
    /** Client that receives the stream data. */
    private final ConnectedWebSocketClient client;

    /** Streams on which this resource is currently registered as a subscriber. */
    private final Set<Stream> subscribedStreams = new HashSet<>();

    /** Instance name used to look up the Yarch database; {@code null} while no processor is selected. */
    private String yamcsInstance;

    /**
     * Creates the resource for the given client. If the client already has a processor, the
     * instance of that processor is used for subsequent stream lookups.
     *
     * @param connectedWebSocketClient the client that will receive stream data
     */
    public StreamResource(ConnectedWebSocketClient connectedWebSocketClient) {
        this.client = connectedWebSocketClient;
        Processor processor = connectedWebSocketClient.getProcessor();
        if (processor != null) {
            this.yamcsInstance = processor.getInstance();
        }
    }

    /**
     * Returns the name of this resource.
     *
     * @return the constant {@code "stream"}
     */
    @Override // org.yamcs.http.websocket.WebSocketResource
    public String getName() {
        return "stream";
    }

    /**
     * Subscribes the client to the stream named in the decoded {@link StreamSubscriptionRequest}.
     * Subscribing to a stream that is already subscribed is a no-op that is still acknowledged.
     *
     * @param webSocketDecodeContext context of the incoming request (supplies the request id)
     * @param webSocketDecoder decoder used to read the request payload
     * @return an acknowledgement carrying the request id
     * @throws WebSocketException if the request names no stream, or if the named stream cannot be
     *         found in the database of the current instance
     */
    @Override // org.yamcs.http.websocket.WebSocketResource
    public WebSocketReply subscribe(WebSocketDecodeContext webSocketDecodeContext, WebSocketDecoder webSocketDecoder) throws WebSocketException {
        // NOTE(review): yamcsInstance may still be null here if no processor was ever selected;
        // how YarchDatabase.getInstance handles a null name is not visible from this class.
        YarchDatabaseInstance ydb = YarchDatabase.getInstance(this.yamcsInstance);
        StreamSubscriptionRequest request = webSocketDecoder.decodeMessageData(webSocketDecodeContext, StreamSubscriptionRequest.newBuilder()).build();
        if (!request.hasStream()) {
            throw new WebSocketException(webSocketDecodeContext.getRequestId(), "No stream was provided");
        }

        Stream stream = ydb.getStream(request.getStream());
        if (stream == null) {
            // Report a missing stream to the client as a request error instead of failing with a
            // NullPointerException on the addSubscriber call below.
            throw new WebSocketException(webSocketDecodeContext.getRequestId(),
                    "Unknown stream '" + request.getStream() + "'");
        }

        if (!this.subscribedStreams.contains(stream)) {
            stream.addSubscriber(this);
            this.subscribedStreams.add(stream);
        }
        return WebSocketReply.ack(webSocketDecodeContext.getRequestId());
    }

    /**
     * Forwards a tuple received from a subscribed stream to the client as a
     * {@code STREAM_DATA} message containing the stream name and the tuple's columns.
     *
     * @param stream the stream that produced the tuple
     * @param tuple the tuple to forward
     */
    @Override // org.yamcs.yarch.StreamSubscriber
    public void onTuple(Stream stream, Tuple tuple) {
        Table.StreamData streamData = Table.StreamData.newBuilder()
                .setStream(stream.getName())
                .addAllColumn(TableApi.toColumnDataList(tuple))
                .build();
        this.client.sendData(Yamcs.ProtoDataType.STREAM_DATA, streamData);
    }

    /**
     * Callback for a closed stream. Intentionally does nothing; the stream stays in the set of
     * subscribed streams until the next unsubscribe or socket close.
     *
     * @param stream the stream that was closed
     */
    @Override // org.yamcs.yarch.StreamSubscriber
    public void streamClosed(Stream stream) {
    }

    /**
     * Removes this resource from all subscribed streams. The request payload is not consulted, so
     * it is not possible to unsubscribe from a single stream.
     *
     * @param webSocketDecodeContext context of the incoming request (supplies the request id)
     * @param webSocketDecoder decoder for the request payload (unused)
     * @return an acknowledgement carrying the request id
     * @throws WebSocketException declared by the interface; not thrown by this implementation
     */
    @Override // org.yamcs.http.websocket.WebSocketResource
    public WebSocketReply unsubscribe(WebSocketDecodeContext webSocketDecodeContext, WebSocketDecoder webSocketDecoder) throws WebSocketException {
        unsubscribeAll();
        return WebSocketReply.ack(webSocketDecodeContext.getRequestId());
    }

    /**
     * Switches the instance used for subsequent stream lookups to that of the given processor.
     * Existing subscriptions are left untouched.
     *
     * @param processor the newly selected processor
     * @throws ProcessorException declared by the interface; not thrown by this implementation
     */
    @Override // org.yamcs.http.websocket.WebSocketResource
    public void selectProcessor(Processor processor) throws ProcessorException {
        this.yamcsInstance = processor.getInstance();
    }

    /** Clears the instance used for stream lookups. Existing subscriptions are left untouched. */
    @Override // org.yamcs.http.websocket.WebSocketResource
    public void unselectProcessor() {
        this.yamcsInstance = null;
    }

    /** Detaches from every subscribed stream when the client's socket is closed. */
    @Override // org.yamcs.http.websocket.WebSocketResource
    public void socketClosed() {
        unsubscribeAll();
    }

    /** Removes this resource as subscriber from every tracked stream and forgets them all. */
    private void unsubscribeAll() {
        for (Stream stream : this.subscribedStreams) {
            stream.removeSubscriber(this);
        }
        this.subscribedStreams.clear();
    }
}
