package de.huxhorn.lilith.consumers;

import de.huxhorn.lilith.data.eventsource.EventWrapper;
import de.huxhorn.lilith.engine.EventConsumer;
import de.huxhorn.sulky.io.TimeoutOutputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/huxhorn/lilith/consumers/SocketEventConsumer.class */
/**
 * An {@link EventConsumer} that forwards the events it receives to a remote
 * host by writing them to an {@link ObjectOutputStream} on top of a TCP socket.
 *
 * <p>The instance is also a {@link Runnable}: {@link #run()} loops until the
 * executing thread is interrupted and (re-)establishes the connection whenever
 * none is open. {@link #consume(List)} only writes to an already established
 * connection; while there is none, incoming events are dropped.</p>
 *
 * <p>Because {@link #run()} never returns on its own, {@link #consume(List)}
 * is necessarily called from a different thread. The connection state shared
 * between both threads is therefore held in {@code volatile} fields.</p>
 *
 * @param <T> the serializable event type that is written to the socket
 */
public class SocketEventConsumer<T extends Serializable> implements EventConsumer<T>, Runnable {
    final Logger logger;
    private static final int DEFAULT_RECONNECTION_DELAY = 60000;
    private static final int DEFAULT_CONNECTION_TIMEOUT = 10000;
    private static final int DEFAULT_WRITE_TIMEOUT = 1500;

    /** Stream to the remote host, or {@code null} while no connection is open. */
    private volatile ObjectOutputStream output;

    /** {@link System#currentTimeMillis()} value recorded for the most recent connect/write failure. */
    private volatile long failTime;

    private String host;
    private int port;
    private int connectionTimeout;
    private int writeTimeout;
    private long reconnectionDelay;

    /**
     * Creates a consumer with default timeouts that will connect to the given address.
     *
     * @param str the host to connect to
     * @param i the port to connect to
     */
    public SocketEventConsumer(String str, int i) {
        this();
        this.host = str;
        this.port = i;
    }

    /**
     * Creates a consumer with default timeouts. Host and port are left unset
     * and have to be configured through {@link #setHost(String)} and
     * {@link #setPort(int)}.
     */
    public SocketEventConsumer() {
        this.logger = LoggerFactory.getLogger(SocketEventConsumer.class);
        this.output = null;
        this.failTime = 0L;
        this.connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
        this.reconnectionDelay = DEFAULT_RECONNECTION_DELAY;
        this.writeTimeout = DEFAULT_WRITE_TIMEOUT;
    }

    /** @return the host this consumer connects to */
    public String getHost() {
        return this.host;
    }

    /** @param str the host this consumer connects to */
    public void setHost(String str) {
        this.host = str;
    }

    /** @return the port this consumer connects to */
    public int getPort() {
        return this.port;
    }

    /** @param i the port this consumer connects to */
    public void setPort(int i) {
        this.port = i;
    }

    /** @return the timeout value handed to the {@code TimeoutOutputStream} wrapping the socket stream */
    public int getWriteTimeout() {
        return this.writeTimeout;
    }

    /**
     * @param i the timeout value handed to the {@code TimeoutOutputStream}
     *          wrapping the socket stream (presumably milliseconds - confirm
     *          against {@code TimeoutOutputStream})
     */
    public void setWriteTimeout(int i) {
        this.writeTimeout = i;
    }

    /** @return the timeout, in milliseconds, used for connecting and as the socket's SO_TIMEOUT */
    public int getConnectionTimeout() {
        return this.connectionTimeout;
    }

    /** @param i the timeout, in milliseconds, used for connecting and as the socket's SO_TIMEOUT */
    public void setConnectionTimeout(int i) {
        this.connectionTimeout = i;
    }

    /** @return the delay, in milliseconds, between connection checks and after a failure */
    public long getReconnectionDelay() {
        return this.reconnectionDelay;
    }

    /** @param j the delay, in milliseconds, between connection checks and after a failure */
    public void setReconnectionDelay(long j) {
        this.reconnectionDelay = j;
    }

    /**
     * Writes the event of every wrapper in {@code list} to the remote host and
     * flushes the stream afterwards.
     *
     * <p>A {@code null} or empty list is ignored. If no connection is open the
     * events are dropped. Wrappers without an event are not written; they are
     * only logged as the end of the stream of their source. If writing fails,
     * the stream is closed and discarded and the failure time is recorded so
     * that {@link #run()} can reconnect later.</p>
     *
     * @param list the events to forward, may be {@code null}
     */
    public void consume(List<EventWrapper<T>> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        // Work on a snapshot: the field is shared with the reconnect thread.
        ObjectOutputStream out = this.output;
        if (out == null) {
            this.logger.info("Dropping {} events.", Integer.valueOf(list.size()));
            return;
        }
        try {
            for (EventWrapper<T> eventWrapper : list) {
                if (eventWrapper.getEvent() != null) {
                    out.writeObject(eventWrapper.getEvent());
                    this.logger.debug("Wrote event.");
                } else {
                    this.logger.info("Detected end of stream for source {}.", eventWrapper.getSourceIdentifier());
                }
            }
            out.flush();
        } catch (IOException e) {
            this.logger.info("Exception while writing event.", e);
            IOUtils.closeQuietly(out);
            this.output = null;
            this.failTime = System.currentTimeMillis();
        }
    }

    /**
     * Keeps the connection alive: tries to connect whenever no connection is
     * open, then sleeps for the reconnection delay. Returns once the executing
     * thread is interrupted; the interrupt status is restored before returning
     * so that the owner of the thread can still observe it.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            if (this.output == null) {
                initObjectOutputStream();
            }
            try {
                Thread.sleep(this.reconnectionDelay);
            } catch (InterruptedException e) {
                this.logger.debug("Interrupted...", e);
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Opens the connection unless one is already open or the last failure is
     * more recent than the reconnection delay. On failure the half-opened
     * socket is closed and the failure time is recorded.
     */
    private void initObjectOutputStream() {
        if (this.output != null) {
            return;
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - this.failTime < this.reconnectionDelay) {
            // The last failure is too recent, do not retry yet.
            return;
        }
        InetSocketAddress inetSocketAddress = new InetSocketAddress(this.host, this.port);
        Socket socket = new Socket();
        try {
            socket.connect(inetSocketAddress, this.connectionTimeout);
            socket.setSoTimeout(this.connectionTimeout);
            this.output = new ObjectOutputStream(new BufferedOutputStream(new TimeoutOutputStream(socket.getOutputStream(), this.writeTimeout)));
            this.logger.info("Created connection to {}.", inetSocketAddress);
        } catch (IOException e) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Exception while creating connection to " + inetSocketAddress + ".", e);
            }
            // No stream was assigned on this path, so the socket itself must
            // be closed here; otherwise every failed attempt would leak it.
            closeQuietly(socket);
            this.output = null;
            this.failTime = currentTimeMillis;
        }
    }

    /** Closes {@code socket}, ignoring any {@link IOException} raised while doing so. */
    private static void closeQuietly(Socket socket) {
        try {
            socket.close();
        } catch (IOException ignored) {
            // Best-effort cleanup after a failed connection attempt; nothing sensible can be done here.
        }
    }
}
