package org.yamcs.http.websocket;

import java.util.concurrent.atomic.AtomicBoolean;
import org.yamcs.Processor;
import org.yamcs.ProcessorException;
import org.yamcs.archive.EventRecorder;
import org.yamcs.protobuf.Yamcs;
import org.yamcs.utils.TimeEncoding;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.StreamSubscriber;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.YarchDatabase;

/* loaded from: input_file:org/yamcs/http/websocket/EventResource.class */
public class EventResource implements WebSocketResource {
    private ConnectedWebSocketClient client;
    private Processor processor;
    private Stream stream;
    private StreamSubscriber streamSubscriber;
    private AtomicBoolean subscribed = new AtomicBoolean(false);

    public EventResource(ConnectedWebSocketClient connectedWebSocketClient) {
        this.client = connectedWebSocketClient;
        this.processor = connectedWebSocketClient.getProcessor();
    }

    @Override // org.yamcs.http.websocket.WebSocketResource
    public String getName() {
        return EventRecorder.TABLE_NAME;
    }

    @Override // org.yamcs.http.websocket.WebSocketResource
    public WebSocketReply subscribe(WebSocketDecodeContext webSocketDecodeContext, WebSocketDecoder webSocketDecoder) throws WebSocketException {
        if (!this.subscribed.getAndSet(true)) {
            doSubscribe();
        }
        return WebSocketReply.ack(webSocketDecodeContext.getRequestId());
    }

    @Override // org.yamcs.http.websocket.WebSocketResource
    public WebSocketReply unsubscribe(WebSocketDecodeContext webSocketDecodeContext, WebSocketDecoder webSocketDecoder) throws WebSocketException {
        doUnsubscribe();
        return WebSocketReply.ack(webSocketDecodeContext.getRequestId());
    }

    @Override // org.yamcs.http.websocket.WebSocketResource
    public void unselectProcessor() {
        this.processor = null;
        doUnsubscribe();
    }

    @Override // org.yamcs.http.websocket.WebSocketResource
    public void selectProcessor(Processor processor) throws ProcessorException {
        this.processor = processor;
        if (this.subscribed.get()) {
            doSubscribe();
        }
    }

    private void doSubscribe() {
        if (this.processor == null) {
            return;
        }
        this.stream = YarchDatabase.getInstance(this.processor.getInstance()).getStream(EventRecorder.REALTIME_EVENT_STREAM_NAME);
        if (this.stream == null) {
            return;
        }
        this.streamSubscriber = new StreamSubscriber() { // from class: org.yamcs.http.websocket.EventResource.1
            @Override // org.yamcs.yarch.StreamSubscriber
            public void onTuple(Stream stream, Tuple tuple) {
                Yamcs.Event event = (Yamcs.Event) tuple.getColumn("body");
                EventResource.this.client.sendData(Yamcs.ProtoDataType.EVENT, Yamcs.Event.newBuilder(event).setGenerationTimeUTC(TimeEncoding.toString(event.getGenerationTime())).setReceptionTimeUTC(TimeEncoding.toString(event.getReceptionTime())).build());
            }

            @Override // org.yamcs.yarch.StreamSubscriber
            public void streamClosed(Stream stream) {
            }
        };
        this.stream.addSubscriber(this.streamSubscriber);
    }

    private void doUnsubscribe() {
        if (this.streamSubscriber != null) {
            this.stream.removeSubscriber(this.streamSubscriber);
        }
        this.streamSubscriber = null;
        this.stream = null;
        this.subscribed.set(false);
    }

    @Override // org.yamcs.http.websocket.WebSocketResource
    public void socketClosed() {
        doUnsubscribe();
    }
}
