package de.huxhorn.lilith.engine.impl.eventproducer;

import de.huxhorn.lilith.data.eventsource.EventWrapper;
import de.huxhorn.lilith.data.eventsource.SourceIdentifier;
import de.huxhorn.sulky.buffers.AppendOperation;
import de.huxhorn.sulky.io.IOUtilities;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.Serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/huxhorn/lilith/engine/impl/eventproducer/AbstractStreamEventProducer.class */
/**
 * Event producer that reads serialized objects from an {@link InputStream} on a
 * dedicated receiver thread.
 *
 * <p>Every non-null object read from the stream is handed to
 * {@link #postProcessEvent(Object)} and the result is passed to {@code addEvent}.
 * When reading fails for any reason, {@code addEvent(null)} is called once (the
 * log message describes this as "adding eventWrapper with empty event") and the
 * receiver thread terminates.</p>
 *
 * @param <T> the event type produced by {@link #postProcessEvent(Object)}
 */
public abstract class AbstractStreamEventProducer<T extends Serializable> extends AbstractEventProducer<T> {
    /** Message logged by both failure paths of the receiver loop. */
    private static final String READ_FAILURE_MESSAGE = "Exception ({}: '{}') while reading events. Adding eventWrapper with empty event and stopping...";

    final Logger logger = LoggerFactory.getLogger(AbstractStreamEventProducer.class);

    /** Stream the receiver thread reads from; closed by {@link #close()}. */
    private final ObjectInputStream dataInput;

    /**
     * Creates a producer reading from the given stream.
     *
     * <p>The stream is buffered and wrapped in a {@code WhitelistObjectInputStream}
     * configured with {@code SerializableWhitelist.WHITELIST}.
     * NOTE(review): presumably this restricts deserialization to whitelisted classes;
     * the meaning of the trailing {@code false} flag is not visible here - confirm
     * against {@code WhitelistObjectInputStream}.</p>
     *
     * @param sourceIdentifier        passed through to the superclass
     * @param appendOperation         passed through to the superclass
     * @param sourceIdentifierUpdater passed through to the superclass
     * @param inputStream             the raw stream to read serialized objects from
     * @throws IOException if setting up the producer fails (presumably while the
     *                     object stream is being created - confirm)
     */
    public AbstractStreamEventProducer(SourceIdentifier sourceIdentifier, AppendOperation<EventWrapper<T>> appendOperation, SourceIdentifierUpdater<T> sourceIdentifierUpdater, InputStream inputStream) throws IOException {
        super(sourceIdentifier, appendOperation, sourceIdentifierUpdater);
        this.dataInput = new WhitelistObjectInputStream(new BufferedInputStream(inputStream), SerializableWhitelist.WHITELIST, false);
    }

    /**
     * Starts a new non-daemon thread named {@code "<sourceIdentifier>-Receiver"} that
     * runs the receive loop.
     */
    @Override
    public void start() {
        Thread receiver = new Thread(new ReceiverRunnable(), "" + getSourceIdentifier() + "-Receiver");
        receiver.setDaemon(false);
        receiver.start();
    }

    /**
     * Converts an object read from the stream into an event of type {@code T}.
     *
     * @param obj the object read from the stream; the receive loop never passes {@code null}
     * @return the event to add; the receive loop forwards the result to {@code addEvent}
     *         without inspecting it
     */
    protected abstract T postProcessEvent(Object obj);

    /**
     * Closes the underlying object input stream, suppressing any error.
     * NOTE(review): presumably this also makes a blocked read in the receiver thread
     * fail so that the thread terminates - depends on the underlying stream.
     */
    @Override
    public void close() {
        IOUtilities.closeQuietly(this.dataInput);
    }

    /**
     * Receive loop: reads objects until the first failure, then signals the end of
     * the stream via {@code addEvent(null)} and returns.
     */
    private class ReceiverRunnable implements Runnable {
        @Override
        public void run() {
            for (;;) {
                try {
                    Object object = dataInput.readObject();
                    if (object == null) {
                        // Skip nulls before post-processing: the result would be discarded
                        // anyway, and subclass code should not have to cope with null input.
                        if (logger.isInfoEnabled()) {
                            logger.info("Retrieved null!");
                        }
                        continue;
                    }
                    // NOTE(review): a null result from postProcessEvent is still forwarded
                    // to addEvent, exactly as before - confirm that this is intended.
                    T event = postProcessEvent(object);
                    addEvent(event);
                } catch (IOException e) {
                    // I/O failures are logged at debug level only.
                    if (logger.isDebugEnabled()) {
                        logger.debug(READ_FAILURE_MESSAGE, new Object[]{e.getClass().getName(), e.getMessage(), e});
                    }
                    signalStop(e);
                    return;
                } catch (Throwable t) {
                    // Anything else (e.g. ClassNotFoundException, runtime errors) is a warning.
                    if (logger.isWarnEnabled()) {
                        logger.warn(READ_FAILURE_MESSAGE, new Object[]{t.getClass().getName(), t.getMessage(), t});
                    }
                    signalStop(t);
                    return;
                }
            }
        }

        /** Adds the empty event and lets {@code IOUtilities} re-interrupt the thread if required. */
        private void signalStop(Throwable cause) {
            addEvent(null);
            IOUtilities.interruptIfNecessary(cause);
        }
    }
}
