package org.yamcs.http;

import com.google.protobuf.Message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.yamcs.NotThreadSafe;
import org.yamcs.api.HttpBody;
import org.yamcs.api.MediaType;
import org.yamcs.api.Observer;
import org.yamcs.logging.Log;

@NotThreadSafe
/* loaded from: input_file:org/yamcs/http/ServerStreamingObserver.class */
public class ServerStreamingObserver implements Observer<Message> {
    private static final int CHUNK_SIZE = 8096;
    private static final Log log = new Log(ServerStreamingObserver.class);
    private RouteContext ctx;
    private MediaType mediaType;
    private ByteBuf buf;
    protected ByteBufOutputStream bufOut;
    private int messageCount = 0;
    private boolean cancelled;
    private boolean completed;
    private Runnable cancelHandler;

    public ServerStreamingObserver(RouteContext routeContext) {
        this.ctx = routeContext;
    }

    public void next(Message message) {
        if (this.completed) {
            throw new IllegalStateException("Observer already completed");
        }
        if (this.messageCount == 0) {
            initializeHttpResponse(message);
        }
        try {
            if (message instanceof HttpBody) {
                HttpBody httpBody = (HttpBody) message;
                if (httpBody.hasData()) {
                    httpBody.getData().writeTo(this.bufOut);
                }
            } else if (MediaType.PROTOBUF.equals(this.mediaType)) {
                message.writeDelimitedTo(this.bufOut);
            } else {
                this.bufOut.write(this.ctx.printJson(message).getBytes(StandardCharsets.UTF_8));
            }
            if (this.buf.readableBytes() >= CHUNK_SIZE) {
                this.bufOut.close();
                this.ctx.addTransferredSize(this.buf.readableBytes());
                writeChunk(this.buf);
                resetBuffer();
            }
        } catch (ClosedChannelException e) {
            cancelCall("closed channel");
        } catch (IOException e2) {
            cancelCall(e2.getMessage());
            throw new UncheckedIOException(e2);
        }
        this.messageCount++;
    }

    private void cancelCall(String str) {
        if (this.cancelled) {
            return;
        }
        log.info("Cancelling call ({})", str);
        this.cancelled = true;
        if (this.cancelHandler != null) {
            this.cancelHandler.run();
        }
    }

    private void initializeHttpResponse(Message message) {
        resetBuffer();
        String str = null;
        if (message == null || !(message instanceof HttpBody)) {
            this.mediaType = RouteContext.deriveTargetContentType(this.ctx.nettyRequest);
        } else {
            HttpBody httpBody = (HttpBody) message;
            this.mediaType = MediaType.from(httpBody.getContentType());
            if (httpBody.hasFilename()) {
                str = httpBody.getFilename();
            }
        }
        startChunkedTransfer(this.mediaType, str);
        this.ctx.reportStatusCode(200);
    }

    private void resetBuffer() {
        this.buf = this.ctx.nettyContext.alloc().buffer();
        this.bufOut = new ByteBufOutputStream(this.buf);
    }

    public void completeExceptionally(Throwable th) {
        if (this.completed) {
            throw new IllegalStateException("Observer already completed");
        }
        this.completed = true;
        Channel channel = this.ctx.nettyContext.channel();
        if (channel.isOpen()) {
            log.warn("Closing channel because transfer failed");
            channel.close();
        }
    }

    public void complete() {
        if (this.completed) {
            throw new IllegalStateException("Observer already completed");
        }
        this.completed = true;
        if (this.cancelled) {
            this.ctx.requestFuture.complete(null);
            return;
        }
        if (this.messageCount == 0) {
            initializeHttpResponse(null);
        } else {
            try {
                this.bufOut.close();
                if (this.buf.readableBytes() > 0) {
                    this.ctx.addTransferredSize(this.buf.readableBytes());
                    writeChunk(this.buf);
                }
            } catch (IOException e) {
                log.error("Could not write final chunk of data", e);
            }
        }
        this.ctx.nettyContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE).addListener(future -> {
            if (future.isSuccess()) {
                this.ctx.requestFuture.complete(null);
            } else {
                this.ctx.requestFuture.completeExceptionally(future.cause());
            }
        });
    }

    public boolean isCancelled() {
        return this.cancelled;
    }

    public void setCancelHandler(Runnable runnable) {
        this.cancelHandler = runnable;
    }

    private void startChunkedTransfer(MediaType mediaType, String str) {
        log.info("{}: {} {} 200 starting chunked transfer", this.ctx, this.ctx.nettyRequest.method(), this.ctx.nettyRequest.uri());
        DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        defaultHttpResponse.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
        defaultHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, mediaType);
        if (str != null) {
            defaultHttpResponse.headers().set("Content-Disposition", "attachment; filename=\"" + str + "\"");
        }
        this.ctx.nettyContext.writeAndFlush(defaultHttpResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    }

    private void writeChunk(ByteBuf byteBuf) throws IOException {
        Channel channel = this.ctx.nettyContext.channel();
        if (!channel.isOpen()) {
            throw new ClosedChannelException();
        }
        ChannelFuture writeAndFlush = this.ctx.nettyContext.writeAndFlush(new DefaultHttpContent(byteBuf));
        try {
            if (channel.isWritable() || writeAndFlush.await(10L, TimeUnit.SECONDS)) {
            } else {
                throw new IOException("Channel did not become writable in 10 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
