package net.trajano.ms.engine.internal;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/trajano/ms/engine/internal/VertxBlockingInputStream.class */
public class VertxBlockingInputStream extends InputStream {
    private static final Buffer END_BUFFER = (Buffer) Symbol.newSymbol(Buffer.class);
    private static final Buffer END_BUFFER_WITH_ERROR = (Buffer) Symbol.newSymbol(Buffer.class);
    private static final Logger LOG = LoggerFactory.getLogger(VertxBlockingInputStream.class);
    private Buffer currentBuffer;
    private int pos;
    private int availableBytes = 0;
    private long bytesRead = 0;
    private boolean closed = false;
    private IOException exceptionToThrow = null;
    private final BlockingQueue<Buffer> queue = new LinkedBlockingQueue();

    public VertxBlockingInputStream() {
    }

    public VertxBlockingInputStream(ReadStream<Buffer> readStream) {
        readStream.handler(this::populate).endHandler(r3 -> {
            end();
        });
    }

    @Override // java.io.InputStream
    public int available() throws IOException {
        return this.availableBytes;
    }

    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.closed = true;
    }

    public void end() {
        this.queue.add(END_BUFFER);
    }

    public void error(Throwable th) {
        this.exceptionToThrow = new IOException(th);
        this.queue.add(END_BUFFER_WITH_ERROR);
    }

    @Override // java.io.InputStream
    public boolean markSupported() {
        return false;
    }

    public void populate(Buffer buffer) {
        this.queue.add(buffer);
        this.availableBytes += buffer.length();
    }

    @Override // java.io.InputStream
    public int read() throws IOException {
        if (this.closed) {
            throw new IOException("Stream is closed");
        }
        if (this.currentBuffer == null) {
            try {
                this.currentBuffer = this.queue.take();
                this.pos = 0;
            } catch (InterruptedException e) {
                LOG.error("Interrupted while waiting for next buffer", e);
                Thread.currentThread().interrupt();
            }
        }
        if (this.currentBuffer == null) {
            throw new IOException("Obtained a null buffer from the queue");
        }
        if (this.currentBuffer == END_BUFFER_WITH_ERROR) {
            throw this.exceptionToThrow;
        }
        if (this.currentBuffer == END_BUFFER) {
            return -1;
        }
        Buffer buffer = this.currentBuffer;
        int i = this.pos;
        this.pos = i + 1;
        int i2 = buffer.getByte(i) & 255;
        this.availableBytes--;
        this.bytesRead++;
        if (this.pos == this.currentBuffer.length()) {
            this.currentBuffer = null;
        }
        return i2;
    }

    @Override // java.io.InputStream
    public synchronized void reset() throws IOException {
        throw new IOException("reset not supported");
    }

    public long totalBytesRead() {
        return this.bytesRead;
    }
}
