package stream.net;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.net.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.annotations.Parameter;
import stream.io.DataObjectStream;
import stream.io.SourceURL;
import stream.util.parser.TimeParser;

/* loaded from: input_file:stream/net/BufferedDataObjectStream.class */
/**
 * A {@link DataObjectStream} that reads serialized {@link Data} items through a
 * {@link BufferedInputStream} and re-establishes the connection whenever opening
 * it or reading from it fails.
 *
 * <p>Configuration (all optional):
 * <ul>
 *   <li>{@code bufferSize} - buffer size in bytes; values {@code <= 0} select the
 *       {@link BufferedInputStream} default.</li>
 *   <li>{@code connectionRetries} - number of retries after the first failed
 *       connection attempt; defaults to {@link Integer#MAX_VALUE}.</li>
 *   <li>{@code reconnectInterval} - pause between two connection attempts, passed
 *       to {@link Thread#sleep(long)} (milliseconds); defaults to 5000.</li>
 * </ul>
 *
 * <p>The defaults are applied when the object is constructed, so {@link #init()}
 * does not overwrite values that were set through the setters beforehand.
 */
public class BufferedDataObjectStream extends DataObjectStream {
    private static final Logger log = LoggerFactory.getLogger(BufferedDataObjectStream.class);

    /** Default pause between two connection attempts, in milliseconds. */
    private static final long DEFAULT_RECONNECT_INTERVAL_MS = 5000L;

    /** Text used in log and exception messages when no source URL is available. */
    private static final String UNKNOWN_SOURCE = "<unknown source>";

    /** Buffer size in bytes; {@code <= 0} means "use the BufferedInputStream default". */
    protected int bufferSize = 0;

    /** Maximum number of retries after the first failed connection attempt. */
    protected int connectionRetries = Integer.MAX_VALUE;

    /** Pause between two connection attempts, in milliseconds. */
    protected long reconnectInterval = DEFAULT_RECONNECT_INTERVAL_MS;

    /** Lazily built {@code protocol://host:port} string used in log messages. */
    protected String urlCache;

    /** Whether {@link #readNext()} has already established a connection. */
    protected boolean connected;

    /**
     * Creates a stream on top of the given input stream.
     *
     * @param inputStream the stream handed to the superclass constructor
     */
    public BufferedDataObjectStream(InputStream inputStream) {
        super(inputStream);
    }

    /**
     * Creates a stream for the given source URL.
     *
     * @param sourceURL the URL handed to the superclass constructor
     */
    public BufferedDataObjectStream(SourceURL sourceURL) {
        super(sourceURL);
    }

    /**
     * @return the configured buffer size in bytes ({@code <= 0} selects the default)
     */
    public int getBufferSize() {
        return this.bufferSize;
    }

    /**
     * @param i the buffer size in bytes; {@code <= 0} selects the
     *          {@link BufferedInputStream} default
     */
    @Parameter(required = false, defaultValue = "0", description = "The buffer size (in bytes) used by this stream. If set to <= 0: The BufferedInputStream's default buffer size is taken (usually 8192)")
    public void setBufferSize(int i) {
        this.bufferSize = i;
    }

    /**
     * @return the maximum number of connection retries
     */
    public int getConnectionRetries() {
        return this.connectionRetries;
    }

    /**
     * @param i the maximum number of retries after the first failed attempt; a
     *          negative value makes {@link #connect()} fail without any attempt
     */
    @Parameter(required = false, defaultValue = "Integer.MAX_VALUE", description = "the maximum amount of connection retries before the general connection attempt finally fails")
    public void setConnectionRetries(int i) {
        this.connectionRetries = i;
    }

    /**
     * @return the pause between two connection attempts, in milliseconds
     */
    public long getReconnectInterval() {
        return this.reconnectInterval;
    }

    /**
     * Sets the pause between two connection attempts.
     *
     * @param str a time specification understood by {@link TimeParser#parseTime}
     * @throws Exception if the specification cannot be parsed
     */
    @Parameter(required = false, defaultValue = "5s", description = "the time between two connection attempts if the first connection attempt was unsuccessful")
    public void setReconnectInterval(String str) throws Exception {
        this.reconnectInterval = TimeParser.parseTime(str).longValue();
    }

    /**
     * Resets the connection state so that the next {@link #readNext()} call opens
     * a fresh connection. The superclass implementation is intentionally not
     * invoked; the connection is opened lazily by {@link #readNext()}.
     *
     * <p>The configurable parameters are deliberately left untouched here, so
     * values set through the setters before this call stay in effect.
     * NOTE(review): this assumes the framework injects {@code @Parameter} values
     * before calling {@code init()} - confirm against the container lifecycle.
     */
    @Override
    public void init() throws Exception {
        this.input = null;
        this.in = null;
        this.urlCache = null;
        this.connected = false;
    }

    /**
     * Closes any existing connection and opens a new one, retrying up to
     * {@code connectionRetries} times with a pause of {@code reconnectInterval}
     * milliseconds between attempts.
     *
     * @throws ConnectException if all attempts fail, or if the calling thread is
     *         interrupted while waiting for the next attempt (the interrupt flag
     *         is restored in that case)
     */
    protected void connect() throws ConnectException {
        close();
        boolean established = false;
        // long, so the counter cannot wrap around when connectionRetries is Integer.MAX_VALUE
        long attempt = 0L;
        while (!established && attempt <= this.connectionRetries) {
            InputStream raw = null;
            try {
                log.info("Trying to open connection to {} ...", getConnectionString());
                if (attempt > 0) {
                    log.info("Connection retry counter: {}", Long.valueOf(attempt));
                }
                raw = getInputStream();
                BufferedInputStream buffered = this.bufferSize > 0
                        ? new BufferedInputStream(raw, this.bufferSize)
                        : new BufferedInputStream(raw);
                this.input = new ObjectInputStream(buffered);
                established = true;
            } catch (Exception e) {
                log.warn("Unable to connect to {}: {}", getConnectionString(), e.toString());
                // The raw stream is not reachable through this.input when the
                // ObjectInputStream constructor failed, so release it explicitly.
                closeRawStream(raw);
                close();
                attempt++;
                // Only wait when another attempt will actually follow.
                if (attempt <= this.connectionRetries) {
                    pauseBeforeRetry();
                }
            }
        }
        if (!established) {
            long retries = Math.max(0L, attempt - 1);
            log.error("Giving up connection attempt after {} retries. Connection to {} unavailable.", Long.valueOf(retries), getConnectionString());
            close();
            throw new ConnectException("Connection to " + getConnectionString() + " unavailable after " + retries + " retries");
        }
        log.info("Successfully connected to {}", getConnectionString());
    }

    /**
     * Reads the next item, (re)connecting first if necessary. Any exception raised
     * while reading triggers a reconnect followed by another read attempt.
     *
     * <p>NOTE(security): {@code readObject()} performs Java-native deserialization
     * of whatever the remote endpoint sends; only use this stream with trusted
     * sources.
     *
     * @return the next deserialized item
     * @throws ConnectException if the connection cannot be (re)established
     */
    @Override
    public Data readNext() throws ConnectException {
        // Also reconnect when the stream was closed after a previous connect.
        if (!this.connected || this.input == null) {
            connect();
            this.connected = true;
        }
        while (true) {
            try {
                return (Data) this.input.readObject();
            } catch (Exception e) {
                log.error("Exception while reading data from socket stream {}: {}", getConnectionString(), e.toString());
                log.info("Trying to restart connection to {} ...", getConnectionString());
                connect();
            }
        }
    }

    /**
     * Builds (and caches) a {@code protocol://host:port} description of the source
     * for use in log and exception messages.
     *
     * @return the connection string, or a placeholder if no URL is set
     */
    protected String getConnectionString() {
        if (this.urlCache == null) {
            if (this.url == null) {
                // Not cached, so a URL assigned later is still picked up.
                return UNKNOWN_SOURCE;
            }
            this.urlCache = this.url.getProtocol() + "://" + this.url.getHost() + ":" + this.url.getPort();
        }
        return this.urlCache;
    }

    /**
     * Closes both underlying streams, if any. Each stream is closed independently
     * so that a failure on one does not prevent the other from being released.
     * Failures are logged and never propagated.
     */
    @Override
    public void close() {
        if (this.input == null && this.in == null) {
            return;
        }
        log.info("Closing connection to {} ...", getConnectionString());
        if (this.input != null) {
            try {
                this.input.close();
            } catch (Exception e) {
                log.warn("Exception while closing connection to {}: {}", getConnectionString(), e.toString());
            }
        }
        if (this.in != null) {
            try {
                this.in.close();
            } catch (Exception e) {
                log.warn("Exception while closing connection to {}: {}", getConnectionString(), e.toString());
            }
        }
        this.input = null;
        this.in = null;
        log.info("Connection to {} closed.", getConnectionString());
    }

    /** Sleeps for the reconnect interval; aborts the connection attempt when interrupted. */
    private void pauseBeforeRetry() throws ConnectException {
        if (this.reconnectInterval <= 0) {
            // Thread.sleep rejects negative values; treat them like "no pause".
            return;
        }
        try {
            Thread.sleep(this.reconnectInterval);
        } catch (InterruptedException interrupted) {
            // Preserve the interrupt for the caller and stop retrying.
            Thread.currentThread().interrupt();
            throw new ConnectException("Interrupted while waiting to reconnect to " + getConnectionString());
        }
    }

    /** Closes a partially set-up raw stream, logging (not propagating) any failure. */
    private void closeRawStream(InputStream raw) {
        if (raw == null) {
            return;
        }
        try {
            raw.close();
        } catch (Exception e) {
            log.warn("Exception while closing connection to {}: {}", getConnectionString(), e.toString());
        }
    }
}
