package org.yamcs.web.websocket;

import com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yamcs.Processor;
import org.yamcs.ProcessorException;
import org.yamcs.protobuf.Yamcs;
import org.yamcs.tctm.TmDataLinkInitialiser;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.StreamSubscriber;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.YarchDatabase;

/* loaded from: input_file:org/yamcs/web/websocket/PacketResource.class */
public class PacketResource extends AbstractWebSocketResource {
    private static final Logger log = LoggerFactory.getLogger(PacketResource.class);
    public static final String RESOURCE_NAME = "packets";
    public static final String OP_subscribe = "subscribe";
    public static final String OP_unsubscribe = "unsubscribe";
    private String streamName;
    private Stream stream;
    private StreamSubscriber streamSubscriber;

    public PacketResource(WebSocketProcessorClient webSocketProcessorClient) {
        super(webSocketProcessorClient);
    }

    @Override // org.yamcs.web.websocket.AbstractWebSocketResource
    public WebSocketReply processRequest(WebSocketDecodeContext webSocketDecodeContext, WebSocketDecoder webSocketDecoder) throws WebSocketException {
        String operation = webSocketDecodeContext.getOperation();
        if ("unsubscribe".equals(operation)) {
            return unsubscribe(webSocketDecodeContext.getRequestId());
        }
        if (!operation.startsWith("subscribe")) {
            throw new WebSocketException(webSocketDecodeContext.getRequestId(), "Unsupported operation '" + webSocketDecodeContext.getOperation() + "'");
        }
        if (this.streamSubscriber != null) {
            throw new WebSocketException(webSocketDecodeContext.getRequestId(), "Already subscribed to a stream");
        }
        String[] split = operation.split("\\s+");
        if (split.length != 2) {
            throw new WebSocketException(webSocketDecodeContext.getRequestId(), "Invalid request. Use 'subscribe <stream_name>'");
        }
        this.streamName = split[1];
        this.stream = YarchDatabase.getInstance(this.processor.getInstance()).getStream(this.streamName);
        if (this.stream == null) {
            throw new WebSocketException(webSocketDecodeContext.getRequestId(), "Invalid request. No stream named '" + this.streamName + "'");
        }
        return subscribe(webSocketDecodeContext.getRequestId());
    }

    private WebSocketReply subscribe(int i) throws WebSocketException {
        doUnsubscribe();
        doSubscribe();
        return WebSocketReply.ack(i);
    }

    @Override // org.yamcs.web.websocket.AbstractWebSocketResource
    public void switchProcessor(Processor processor, Processor processor2) throws ProcessorException {
        doUnsubscribe();
        super.switchProcessor(processor, processor2);
        this.stream = YarchDatabase.getInstance(this.processor.getInstance()).getStream(this.streamName);
        doSubscribe();
    }

    private WebSocketReply unsubscribe(int i) throws WebSocketException {
        doUnsubscribe();
        return WebSocketReply.ack(i);
    }

    @Override // org.yamcs.web.websocket.AbstractWebSocketResource
    public void quit() {
        doUnsubscribe();
    }

    private void doSubscribe() {
        if (this.stream != null) {
            this.streamSubscriber = new StreamSubscriber() { // from class: org.yamcs.web.websocket.PacketResource.1
                @Override // org.yamcs.yarch.StreamSubscriber
                public void onTuple(Stream stream, Tuple tuple) {
                    try {
                        byte[] bArr = (byte[]) tuple.getColumn(TmDataLinkInitialiser.PACKET_COLUMN);
                        long longValue = ((Long) tuple.getColumn("gentime")).longValue();
                        long longValue2 = ((Long) tuple.getColumn("rectime")).longValue();
                        PacketResource.this.wsHandler.sendData(Yamcs.ProtoDataType.TM_PACKET, Yamcs.TmPacketData.newBuilder().setPacket(ByteString.copyFrom(bArr)).setGenerationTime(longValue).setReceptionTime(longValue2).setSequenceNumber(((Integer) tuple.getColumn("seqNum")).intValue()).build());
                    } catch (Exception e) {
                        PacketResource.log.warn("got error when sending event, quitting", e);
                        PacketResource.this.quit();
                    }
                }

                @Override // org.yamcs.yarch.StreamSubscriber
                public void streamClosed(Stream stream) {
                }
            };
            this.stream.addSubscriber(this.streamSubscriber);
        }
    }

    private void doUnsubscribe() {
        if (this.streamSubscriber != null) {
            this.stream.removeSubscriber(this.streamSubscriber);
        }
        this.streamSubscriber = null;
    }
}
