package de.huxhorn.lilith.engine.impl.eventproducer;

import de.huxhorn.lilith.data.eventsource.EventWrapper;
import de.huxhorn.lilith.data.eventsource.SourceIdentifier;
import de.huxhorn.sulky.buffers.AppendOperation;
import de.huxhorn.sulky.codec.Decoder;
import de.huxhorn.sulky.io.IOUtilities;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:de/huxhorn/lilith/engine/impl/eventproducer/AbstractMessageBasedEventProducer.class */
/**
 * Event producer that reads length-prefixed messages from an input stream, decodes each message
 * with the {@link Decoder} supplied by {@link #createDecoder()} and hands the result to
 * {@code addEvent}.
 *
 * <p>Wire format as read by this class: a 4-byte {@code int} length followed by that many payload
 * bytes. A length that is not positive carries no payload and only refreshes the heartbeat
 * timestamp.
 *
 * <p>{@link #start()} launches two non-daemon threads: a receiver that reads messages, and a
 * heartbeat observer that closes the stream when nothing has been received for
 * {@link #HEARTBEAT_TIMEOUT_MILLIS} milliseconds.
 *
 * @param <T> the event type produced by the decoder
 */
public abstract class AbstractMessageBasedEventProducer<T extends Serializable> extends AbstractEventProducer<T> {
    /** Pause between two heartbeat checks of the observer thread. */
    private static final long HEARTBEAT_CHECK_INTERVAL_MILLIS = 45000L;

    /** Maximum tolerated age of the last received message before the producer is closed. */
    private static final long HEARTBEAT_TIMEOUT_MILLIS = 90000L;

    private final Logger logger = LoggerFactory.getLogger(AbstractMessageBasedEventProducer.class);
    private final DataInputStream dataInput;
    private final Decoder<T> decoder;
    private final boolean compressing;
    private final AtomicLong heartbeatTimestamp = new AtomicLong();

    /**
     * Periodically compares the current time with the last heartbeat timestamp and closes the
     * producer once the timestamp is older than {@link #HEARTBEAT_TIMEOUT_MILLIS}.
     */
    private class HeartbeatObserverRunnable implements Runnable {
        private HeartbeatObserverRunnable() {
        }

        @Override
        public void run() {
            do {
                try {
                    Thread.sleep(HEARTBEAT_CHECK_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    if (logger.isInfoEnabled()) {
                        logger.info("Interrupted...", e);
                    }
                    close();
                    // Restore the interrupt status so code further up the stack can see it.
                    Thread.currentThread().interrupt();
                    return;
                }
            } while (System.currentTimeMillis() - getHeartbeatTimestamp() <= HEARTBEAT_TIMEOUT_MILLIS);

            if (logger.isInfoEnabled()) {
                logger.info("Closing receiver because heartbeat of {} was missing.", getSourceIdentifier());
            }
            close();
        }
    }

    /**
     * Reads messages from the stream until any exception occurs. On failure it adds a
     * {@code null} event and terminates.
     */
    private class ReceiverRunnable implements Runnable {
        private static final String SOURCE_IDENTIFIER_MDC_KEY = "sourceIdentifier";

        private final SourceIdentifier sourceIdentifier;

        public ReceiverRunnable(SourceIdentifier sourceIdentifier) {
            this.sourceIdentifier = sourceIdentifier;
        }

        @Override
        public void run() {
            MDC.put(SOURCE_IDENTIFIER_MDC_KEY, this.sourceIdentifier.toString());
            try {
                while (true) {
                    receiveMessage();
                }
            } catch (Throwable th) {
                if (logger.isInfoEnabled()) {
                    logger.info("Exception ({}: '{}') while reading events. Adding eventWrapper with empty event and stopping...", th.getClass().getName(), th.getMessage());
                }
                addEvent(null);
                IOUtilities.interruptIfNecessary(th);
            } finally {
                // Always clean up the MDC, even if the failure handling above throws.
                MDC.remove(SOURCE_IDENTIFIER_MDC_KEY);
            }
        }

        /**
         * Reads and processes exactly one message (or heartbeat) from the stream.
         *
         * <p>An {@link OutOfMemoryError} is survivable in two places, and the stream stays aligned
         * on message boundaries in both: if the payload buffer cannot be allocated, the payload is
         * skipped; if decoding or adding the event runs out of memory, the payload has already
         * been consumed and nothing must be skipped.
         *
         * @throws IOException if reading from or skipping on the stream fails
         */
        private void receiveMessage() throws IOException {
            int size = dataInput.readInt();
            updateHeartbeatTimestamp();

            if (size <= 0) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Received heartbeat from {}.", getSourceIdentifier());
                }
                return;
            }

            byte[] bytes;
            try {
                bytes = new byte[size];
            } catch (OutOfMemoryError e) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Out of memory while trying to allocate {} bytes! Skipping them instead...", size);
                }
                skipBytes(size, dataInput);
                return;
            }

            dataInput.readFully(bytes);

            try {
                T event = decoder.decode(bytes);
                if (event != null) {
                    addEvent(event);
                } else if (logger.isInfoEnabled()) {
                    logger.info("Retrieved null!");
                }
            } catch (OutOfMemoryError e) {
                // The payload was fully read already; skipping here would desynchronize the stream.
                if (logger.isWarnEnabled()) {
                    logger.warn("Out of memory while deserializing from {} bytes!", size);
                }
            }
        }

        /**
         * Skips exactly {@code count} bytes of {@code input}.
         *
         * @param count number of bytes to skip; values {@code <= 0} skip nothing
         * @param input the stream to skip on
         * @throws IOException if the stream reports a negative skip count or ends before
         *                     {@code count} bytes were skipped
         */
        public void skipBytes(long count, InputStream input) throws IOException {
            long remaining = count;
            while (remaining > 0) {
                long skipped = input.skip(remaining);
                if (skipped < 0) {
                    throw new IOException("Negative skipped bytes value while trying to skip " + count + " bytes!");
                }
                if (skipped == 0) {
                    // skip() may return 0 without being at EOF; a single read() tells the two apart
                    // and prevents spinning forever on an exhausted stream.
                    if (input.read() < 0) {
                        throw new IOException("Unexpected end of stream while trying to skip " + count + " bytes!");
                    }
                    skipped = 1;
                }
                remaining -= skipped;
            }
        }
    }

    /**
     * Creates a producer reading from the given stream.
     *
     * <p>The {@code compressing} flag is stored before {@link #createDecoder()} is invoked, so an
     * implementation may consult {@link #isCompressing()}. Other subclass state is not yet
     * initialized at that point.
     *
     * @param sourceIdentifier identifies the source of the events
     * @param appendOperation  passed through to the superclass
     * @param inputStream      stream to read messages from; wrapped in a buffered data stream
     * @param z                value returned by {@link #isCompressing()}
     */
    public AbstractMessageBasedEventProducer(SourceIdentifier sourceIdentifier, AppendOperation<EventWrapper<T>> appendOperation, InputStream inputStream, boolean z) {
        super(sourceIdentifier, appendOperation);
        this.dataInput = new DataInputStream(new BufferedInputStream(inputStream));
        this.compressing = z;
        this.decoder = createDecoder();
    }

    /**
     * Creates the decoder used to turn a message payload into an event. Called once from the
     * constructor.
     *
     * @return the decoder for this producer
     */
    protected abstract Decoder<T> createDecoder();

    /**
     * Resets the heartbeat timestamp and starts the receiver and heartbeat observer threads.
     */
    @Override
    public void start() {
        updateHeartbeatTimestamp();

        Thread receiver = new Thread(new ReceiverRunnable(getSourceIdentifier()), getSourceIdentifier() + "-Receiver");
        receiver.setDaemon(false);
        receiver.start();

        Thread observer = new Thread(new HeartbeatObserverRunnable(), getSourceIdentifier() + "-HeartbeatObserver");
        observer.setDaemon(false);
        observer.start();
    }

    /** Records the current time as the moment of the last received message. */
    protected void updateHeartbeatTimestamp() {
        this.heartbeatTimestamp.set(System.currentTimeMillis());
    }

    /**
     * @return the time, in milliseconds since the epoch, of the last heartbeat update
     */
    protected long getHeartbeatTimestamp() {
        return this.heartbeatTimestamp.get();
    }

    /**
     * @return the flag passed to the constructor
     */
    public boolean isCompressing() {
        return this.compressing;
    }

    /**
     * Closes the underlying input stream, ignoring any error raised while closing.
     */
    @Override
    public void close() {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Closing {}.", getClass().getName());
        }
        IOUtilities.closeQuietly(this.dataInput);
    }
}
