package org.onlab.nio;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.onlab.nio.Message;
import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/onlab/nio/MessageStream.class */
/**
 * Bridge between a byte channel and a stream of fixed-length-reporting
 * messages of type {@code M}.
 *
 * <p>Inbound bytes are read from the channel into a direct buffer and handed
 * to the subclass-provided {@link #read(ByteBuffer)} to be turned into
 * messages. Outbound messages are serialized by the subclass-provided
 * {@link #write(Message, ByteBuffer)} into a direct buffer that grows on
 * demand and is flushed to the channel.
 *
 * <p>All methods that touch the outbound buffer synchronize on this instance.
 * {@link #read()} does not take that lock.
 *
 * @param <M> message type
 */
public abstract class MessageStream<M extends Message> {
    private final IOLoop<M, ?> loop;
    private final ByteChannel channel;
    private final int maxIdleMillis;

    /** Bytes received from the channel that have not yet been consumed as messages. */
    private final ByteBuffer inbound;

    /** Serialized messages awaiting transfer to the channel; replaced when it has to grow. */
    private ByteBuffer outbound;

    private SelectionKey key;

    /** Set by {@link #flush()} when the channel did not accept all of the outbound bytes. */
    private volatile boolean writePending;

    /** Set by {@link #flush()} after it attempted a write; cleared by the flushIf* methods. */
    private volatile boolean writeOccurred;

    /** Most recent write failure recorded by {@link #flush()}, or null if none was recorded. */
    private Exception ioError;

    /** Wall-clock time (ms) of the last key assignment, successful read or attempted flush. */
    private long lastActiveTime;

    protected Logger log = LoggerFactory.getLogger(getClass());
    private volatile boolean closed = false;
    private final Counter bytesIn = new Counter();
    private final Counter messagesIn = new Counter();
    private final Counter bytesOut = new Counter();
    private final Counter messagesOut = new Counter();

    /**
     * Creates a message stream associated with the given I/O loop and backed
     * by the given byte channel.
     *
     * @param loop          I/O loop this stream belongs to; must not be null
     * @param byteChannel   channel to read from and write to; must not be null
     * @param bufferSize    initial capacity, in bytes, of both the inbound and
     *                      the outbound buffer
     * @param maxIdleMillis idle time, in milliseconds, after which
     *                      {@link #isStale()} reports true; must be positive
     * @throws NullPointerException     if the loop or the channel is null
     * @throws IllegalArgumentException if {@code maxIdleMillis} is not positive
     *                                  or {@code bufferSize} is negative
     */
    public MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel, int bufferSize, int maxIdleMillis) {
        this.loop = Preconditions.checkNotNull(loop, "Loop cannot be null");
        this.channel = Preconditions.checkNotNull(byteChannel, "Byte channel cannot be null");
        Preconditions.checkArgument(maxIdleMillis > 0, "Idle time must be positive");
        this.maxIdleMillis = maxIdleMillis;
        this.inbound = ByteBuffer.allocateDirect(bufferSize);
        this.outbound = ByteBuffer.allocateDirect(bufferSize);
    }

    /**
     * Extracts a single message from the given buffer.
     *
     * <p>{@link #read()} calls this repeatedly on the flipped inbound buffer
     * and stops at the first null, after which the buffer is compacted so
     * that any unconsumed bytes are kept for the next read.
     *
     * @param byteBuffer buffer holding the received bytes
     * @return the next message, or null when no further message can be produced
     */
    protected abstract M read(ByteBuffer byteBuffer);

    /**
     * Serializes the given message into the given buffer.
     *
     * <p>The caller guarantees that the buffer has at least
     * {@code m.length()} bytes remaining.
     *
     * @param m          message to serialize
     * @param byteBuffer buffer to write the message into
     */
    protected abstract void write(M m, ByteBuffer byteBuffer);

    /**
     * Closes the stream. The first call marks the stream closed, freezes all
     * four counters, removes the stream from its loop and, if a selection key
     * has been set, cancels the key and closes the key's channel. Later calls
     * return immediately.
     *
     * <p>A failure to close the channel is logged, not propagated.
     */
    public void close() {
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.bytesIn.freeze();
            this.bytesOut.freeze();
            this.messagesIn.freeze();
            this.messagesOut.freeze();
            this.loop.removeStream(this);
            if (this.key != null) {
                try {
                    this.key.cancel();
                    this.key.channel().close();
                } catch (IOException e) {
                    this.log.warn("Unable to close stream", e);
                }
            }
        }
    }

    /**
     * Indicates whether {@link #close()} has been called on this stream.
     *
     * @return true if the stream has been closed
     */
    public synchronized boolean isClosed() {
        return this.closed;
    }

    /**
     * Returns the selection key currently associated with this stream.
     *
     * @return the selection key, or null if none has been set
     */
    public SelectionKey key() {
        return this.key;
    }

    /**
     * Associates a selection key with this stream and restarts the idle clock.
     *
     * @param selectionKey selection key for this stream's channel
     */
    public void setKey(SelectionKey selectionKey) {
        this.key = selectionKey;
        this.lastActiveTime = System.currentTimeMillis();
    }

    /**
     * Returns the I/O loop this stream was created with.
     *
     * @return the owning I/O loop
     */
    public IOLoop<M, ?> loop() {
        return this.loop;
    }

    /**
     * Indicates whether a write failure has been recorded by {@link #flush()}.
     *
     * @return true if an error has been recorded
     */
    public boolean hadError() {
        return this.ioError != null;
    }

    /**
     * Returns the write failure recorded by {@link #flush()}, if any.
     *
     * @return the recorded exception, or null if there is none
     */
    public Exception getError() {
        return this.ioError;
    }

    /**
     * Reads whatever bytes the channel has available and converts them into
     * as many messages as {@link #read(ByteBuffer)} yields.
     *
     * @return the messages read, possibly empty; or null if the channel
     *         reported end-of-stream
     * @throws IOException if reading from the channel or decoding a message
     *                     fails for any reason; the original failure is the cause
     */
    public List<M> read() throws IOException {
        try {
            if (this.channel.read(this.inbound) == -1) {
                // End-of-stream: callers distinguish this from "no messages yet" by the null.
                return null;
            }

            List<M> messages = new ArrayList<>();
            this.inbound.flip();
            M message;
            while ((message = read(this.inbound)) != null) {
                messages.add(message);
                this.messagesIn.add(1L);
                this.bytesIn.add(message.length());
            }
            // Keep any unconsumed trailing bytes for the next invocation.
            this.inbound.compact();

            this.lastActiveTime = System.currentTimeMillis();
            return messages;
        } catch (Exception e) {
            throw new IOException("Unable to read messages", e);
        }
    }

    /**
     * Appends all of the given messages to the outbound buffer and flushes
     * it unless a flush has already happened or a write is still pending.
     *
     * @param list messages to write
     * @throws IOException declared for the flush; see {@link #flush()}
     */
    public void write(List<M> list) throws IOException {
        synchronized (this) {
            for (M message : list) {
                append(message);
            }
            flushUnlessAlreadyPlanningTo();
        }
    }

    /**
     * Appends the given message to the outbound buffer and flushes it unless
     * a flush has already happened or a write is still pending.
     *
     * @param m message to write
     * @throws IOException declared for the flush; see {@link #flush()}
     */
    public void write(M m) throws IOException {
        synchronized (this) {
            append(m);
            flushUnlessAlreadyPlanningTo();
        }
    }

    /**
     * Serializes one message into the outbound buffer, growing the buffer
     * first if it cannot hold {@code m.length()} more bytes, and updates the
     * outbound counters. Callers hold this instance's lock.
     */
    private void append(M m) {
        while (this.outbound.remaining() < m.length()) {
            doubleSize();
        }
        write(m, this.outbound);
        this.messagesOut.add(1L);
        this.bytesOut.add(m.length());
    }

    /** Flushes only when neither flag says that a flush was done or is outstanding. */
    private void flushUnlessAlreadyPlanningTo() throws IOException {
        if (!this.writeOccurred && !this.writePending) {
            flush();
        }
    }

    /**
     * Attempts to transfer the outbound buffer to the channel. Does nothing
     * if a write has already occurred since the flags were last reset or if a
     * previous write is still pending.
     *
     * <p>An {@link IOException} raised by the channel is not propagated. It
     * is logged and recorded (see {@link #getError()}) unless the stream is
     * already closed or the message is exactly "Broken pipe", in which case
     * it is ignored. Bytes the channel did not accept stay in the buffer and
     * mark the stream as write-pending.
     *
     * @throws IOException declared for interface compatibility
     */
    public void flush() throws IOException {
        synchronized (this) {
            if (this.writeOccurred || this.writePending) {
                return;
            }

            this.outbound.flip();
            try {
                this.channel.write(this.outbound);
            } catch (IOException e) {
                if (!this.closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
                    this.log.warn("Unable to write data", e);
                    this.ioError = e;
                }
            }
            this.lastActiveTime = System.currentTimeMillis();
            this.writeOccurred = true;
            // Must be sampled before compact(), which discards the flipped limit.
            this.writePending = this.outbound.hasRemaining();
            this.outbound.compact();
        }
    }

    /**
     * Indicates whether the last flush left bytes the channel did not accept.
     *
     * @return true if a write is pending
     */
    boolean isWritePending() {
        synchronized (this) {
            return this.writePending;
        }
    }

    /**
     * Indicates whether the outbound buffer currently holds any bytes.
     *
     * @return true if there is data waiting to be flushed
     */
    boolean isFlushRequired() {
        synchronized (this) {
            return this.outbound.position() > 0;
        }
    }

    /**
     * Clears both write flags, flushes the outbound buffer if it holds any
     * bytes, and then sets the selection key's interest set to reads only.
     *
     * @throws IOException declared for the flush; see {@link #flush()}
     */
    public void flushIfPossible() throws IOException {
        synchronized (this) {
            this.writePending = false;
            this.writeOccurred = false;
            if (this.outbound.position() > 0) {
                flush();
            }
        }
        this.key.interestOps(SelectionKey.OP_READ);
    }

    /**
     * Clears the write-occurred flag and flushes the outbound buffer if it
     * holds any bytes and no write is pending. If a write is pending
     * afterwards, write interest is added to the selection key.
     *
     * @throws IOException declared for the flush; see {@link #flush()}
     */
    public void flushIfWriteNotPending() throws IOException {
        synchronized (this) {
            this.writeOccurred = false;
            if (!this.writePending && this.outbound.position() > 0) {
                flush();
            }
        }
        if (isWritePending()) {
            this.key.interestOps(this.key.interestOps() | SelectionKey.OP_WRITE);
        }
    }

    /**
     * Replaces the outbound buffer with one of twice the capacity, carrying
     * over the bytes written so far. A zero-capacity buffer grows to one
     * byte, because doubling zero would never make progress and would leave
     * {@link #append(Message)} spinning forever.
     */
    private void doubleSize() {
        int currentCapacity = this.outbound.capacity();
        int grownCapacity = currentCapacity == 0 ? 1 : currentCapacity * 2;
        ByteBuffer grown = ByteBuffer.allocateDirect(grownCapacity);
        this.outbound.flip();
        grown.put(this.outbound);
        this.outbound = grown;
    }

    /**
     * Returns the idle threshold this stream was created with.
     *
     * @return maximum idle time in milliseconds
     */
    protected int maxIdleMillis() {
        return this.maxIdleMillis;
    }

    /**
     * Indicates whether the stream has a selection key and has seen no
     * activity for longer than {@link #maxIdleMillis()}.
     *
     * @return true if the stream is considered stale
     */
    public boolean isStale() {
        long idleMillis = System.currentTimeMillis() - this.lastActiveTime;
        return idleMillis > ((long) maxIdleMillis()) && this.key != null;
    }

    /**
     * Returns the inbound bytes counter.
     *
     * @return inbound bytes counter
     */
    public Counter bytesIn() {
        return this.bytesIn;
    }

    /**
     * Returns the outbound bytes counter.
     *
     * @return outbound bytes counter
     */
    public Counter bytesOut() {
        return this.bytesOut;
    }

    /**
     * Returns the inbound messages counter.
     *
     * @return inbound messages counter
     */
    public Counter messagesIn() {
        return this.messagesIn;
    }

    /**
     * Returns the outbound messages counter.
     *
     * @return outbound messages counter
     */
    public Counter messagesOut() {
        return this.messagesOut;
    }
}
