package de.huxhorn.lilith.engine.impl.sourcemanager;

import de.huxhorn.lilith.data.eventsource.EventWrapper;
import de.huxhorn.lilith.data.eventsource.SourceIdentifier;
import de.huxhorn.lilith.engine.EventHandler;
import de.huxhorn.lilith.engine.EventProducer;
import de.huxhorn.lilith.engine.EventSource;
import de.huxhorn.lilith.engine.EventSourceListener;
import de.huxhorn.lilith.engine.EventSourceProducer;
import de.huxhorn.lilith.engine.SourceManager;
import de.huxhorn.sulky.buffers.BlockingCircularBuffer;
import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/huxhorn/lilith/engine/impl/sourcemanager/SourceManagerImpl.class */
/**
 * {@link SourceManager} implementation that keeps track of event sources, event producers,
 * event-source producers and event-source listeners, and that wires a shared event queue to an
 * {@code EventPoller}.
 *
 * <p>Changes to the number of registered sources are published as a bound property named
 * {@code "numberOfSources"} via {@link PropertyChangeSupport}; registered
 * {@link EventSourceListener}s are additionally told which source was added or removed.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @param <T> the serializable payload type carried by the {@link EventWrapper}s in the queue
 */
public class SourceManagerImpl<T extends Serializable> implements SourceManager<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SourceManagerImpl.class);

    /** Name of the bound property fired whenever the number of sources changes. */
    private static final String NUMBER_OF_SOURCES = "numberOfSources";

    /** Queue handed to every registered {@link EventSourceProducer} and read by the poller. */
    private final BlockingCircularBuffer<EventWrapper<T>> queue;
    private final Set<EventSourceListener<T>> listeners;
    private final PropertyChangeSupport changeSupport;
    private final List<EventSource<T>> sources;
    private final Set<EventProducer<T>> eventProducers;
    private final EventPoller<T> eventPoller;
    private final List<EventSourceProducer<T>> eventSourceProducers;

    /**
     * Creates a manager around the given queue and an {@code EventPoller} reading from it.
     *
     * @param blockingCircularBuffer the queue shared by the poller and all event-source producers
     */
    public SourceManagerImpl(BlockingCircularBuffer<EventWrapper<T>> blockingCircularBuffer) {
        this.queue = blockingCircularBuffer;
        this.eventPoller = new EventPoller<>(blockingCircularBuffer);
        // NOTE(review): the unit of the poll delay is presumably milliseconds - confirm in EventPoller.
        this.eventPoller.setPollDelay(100);
        this.eventProducers = new HashSet<>();
        this.eventSourceProducers = new ArrayList<>();
        this.listeners = new HashSet<>();
        this.changeSupport = new PropertyChangeSupport(this);
        this.sources = new ArrayList<>();
    }

    /**
     * Registers a source unless an equal one is already registered, then fires the
     * {@code "numberOfSources"} property change and notifies all event-source listeners.
     *
     * @param eventSource the source to register
     */
    @Override // de.huxhorn.lilith.engine.SourceManager
    public void addSource(EventSource<T> eventSource) {
        if (this.sources.contains(eventSource)) {
            return;
        }
        int previousCount = this.sources.size();
        this.sources.add(eventSource);
        this.changeSupport.firePropertyChange(NUMBER_OF_SOURCES, previousCount, this.sources.size());
        fireAddSource(eventSource);
        LOGGER.info("Added source {}.", eventSource);
    }

    /**
     * Removes every registered source whose identifier equals the given one, fires the
     * {@code "numberOfSources"} property change and notifies listeners once per removed source.
     *
     * @param sourceIdentifier identifier of the sources to remove; {@code null} matches nothing
     */
    @Override // de.huxhorn.lilith.engine.SourceManager
    public void removeSource(SourceIdentifier sourceIdentifier) {
        if (sourceIdentifier == null) {
            // Same leniency as findProducer/removeEventProducer: a null identifier matches nothing.
            return;
        }
        int previousCount = this.sources.size();
        List<EventSource<T>> removed = new ArrayList<>();
        for (EventSource<T> candidate : this.sources) {
            if (sourceIdentifier.equals(candidate.getSourceIdentifier())) {
                removed.add(candidate);
            }
        }
        this.sources.removeAll(removed);
        this.changeSupport.firePropertyChange(NUMBER_OF_SOURCES, previousCount, this.sources.size());
        for (EventSource<T> source : removed) {
            fireRemoveSource(source);
            LOGGER.info("Removed source {}.", source);
        }
    }

    /** Calls {@code eventSourceAdded} on every currently registered listener. */
    private void fireAddSource(EventSource<T> eventSource) {
        // Iterate over a snapshot so a listener may (un)register listeners from within its
        // callback without causing a ConcurrentModificationException.
        for (EventSourceListener<T> listener : new ArrayList<>(this.listeners)) {
            listener.eventSourceAdded(eventSource);
        }
    }

    /** Calls {@code eventSourceRemoved} on every currently registered listener. */
    private void fireRemoveSource(EventSource<T> eventSource) {
        // Snapshot for the same reason as in fireAddSource.
        for (EventSourceListener<T> listener : new ArrayList<>(this.listeners)) {
            listener.eventSourceRemoved(eventSource);
        }
    }

    /**
     * Returns the registered sources.
     *
     * @return a new list containing the currently registered sources; modifying it does not
     *         affect this manager
     */
    @Override // de.huxhorn.lilith.engine.SourceManager
    public List<EventSource<T>> getSources() {
        return new ArrayList<>(this.sources);
    }

    /**
     * Returns how many sources are registered.
     *
     * @return the number of currently registered sources
     */
    @Override // de.huxhorn.lilith.engine.SourceManager
    public int getNumberOfSources() {
        return this.sources.size();
    }

    /**
     * Hands this manager and its queue to the given producer and remembers it so that
     * {@link #start()} can launch it.
     *
     * @param eventSourceProducer the producer to register; must not be {@code null}
     */
    @Override // de.huxhorn.lilith.engine.SourceManager
    public void addEventSourceProducer(EventSourceProducer<T> eventSourceProducer) {
        eventSourceProducer.setQueue(this.queue);
        eventSourceProducer.setSourceManager(this);
        this.eventSourceProducers.add(eventSourceProducer);
    }

    /**
     * Looks up the registered event producer with the given identifier.
     *
     * @param sourceIdentifier the identifier to look for; may be {@code null}
     * @return the first matching producer, or {@code null} if there is none or the identifier
     *         is {@code null}
     */
    private EventProducer<T> findProducer(SourceIdentifier sourceIdentifier) {
        if (sourceIdentifier == null) {
            return null;
        }
        for (EventProducer<T> candidate : this.eventProducers) {
            if (sourceIdentifier.equals(candidate.getSourceIdentifier())) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Registers an event producer. A previously registered producer with an equal source
     * identifier is closed and dropped first.
     *
     * @param eventProducer the producer to register; must not be {@code null}
     */
    @Override // de.huxhorn.lilith.engine.SourceManager
    public void addEventProducer(EventProducer<T> eventProducer) {
        EventProducer<T> previous = findProducer(eventProducer.getSourceIdentifier());
        if (previous != null) {
            previous.close();
            this.eventProducers.remove(previous);
        }
        this.eventProducers.add(eventProducer);
        LOGGER.debug("Started {}.", eventProducer);
    }

    /**
     * Closes and unregisters the event producer with the given identifier, if there is one.
     *
     * @param sourceIdentifier identifier of the producer to remove; {@code null} matches nothing
     */
    @Override // de.huxhorn.lilith.engine.SourceManager
    public void removeEventProducer(SourceIdentifier sourceIdentifier) {
        EventProducer<T> existing = findProducer(sourceIdentifier);
        if (existing != null) {
            existing.close();
            this.eventProducers.remove(existing);
        }
    }

    /**
     * Forwards the given handlers to the underlying event poller.
     *
     * @param list the event handlers to install
     */
    @Override // de.huxhorn.lilith.engine.SourceManager
    public void setEventHandlers(List<EventHandler<T>> list) {
        this.eventPoller.setEventHandlers(list);
    }

    /**
     * Returns the handlers reported by the underlying event poller.
     *
     * @return the event handlers of the poller
     */
    @Override // de.huxhorn.lilith.engine.SourceManager
    public List<EventHandler<T>> getEventHandlers() {
        return this.eventPoller.getEventHandlers();
    }

    /**
     * Registers a listener for source additions and removals. Registering an already
     * registered listener has no effect.
     *
     * @param eventSourceListener the listener to register
     */
    @Override // de.huxhorn.lilith.engine.SourceManager
    public void addEventSourceListener(EventSourceListener<T> eventSourceListener) {
        // Set semantics already make a duplicate add a no-op.
        this.listeners.add(eventSourceListener);
    }

    /**
     * Unregisters a listener. Removing a listener that is not registered has no effect.
     *
     * @param eventSourceListener the listener to unregister
     */
    @Override // de.huxhorn.lilith.engine.SourceManager
    public void removeEventSourceListener(EventSourceListener<T> eventSourceListener) {
        this.listeners.remove(eventSourceListener);
    }

    /**
     * Registers a listener for the bound properties of this manager.
     *
     * @param propertyChangeListener the listener to register
     */
    public void addPropertyChangeListener(PropertyChangeListener propertyChangeListener) {
        this.changeSupport.addPropertyChangeListener(propertyChangeListener);
    }

    /**
     * Unregisters a property change listener.
     *
     * @param propertyChangeListener the listener to unregister
     */
    public void removePropertyChangeListener(PropertyChangeListener propertyChangeListener) {
        this.changeSupport.removePropertyChangeListener(propertyChangeListener);
    }

    /**
     * Starts the event poller on a daemon thread, then starts one daemon thread for every event
     * handler and every event-source producer that implements {@link Runnable}.
     *
     * <p>Each call creates new threads; nothing here guards against repeated invocation.
     */
    @Override // de.huxhorn.lilith.engine.SourceManager
    public void start() {
        Thread pollerThread = new Thread(this.eventPoller, "EventPoller");
        pollerThread.setDaemon(true);
        pollerThread.start();

        for (EventHandler<T> handler : getEventHandlers()) {
            if (handler instanceof Runnable) {
                Thread consumerThread = new Thread((Runnable) handler, "Consumer-Thread");
                consumerThread.setDaemon(true);
                consumerThread.start();
                LOGGER.info("Started {}.", consumerThread);
            }
        }

        for (EventSourceProducer<T> producer : this.eventSourceProducers) {
            if (producer instanceof Runnable) {
                Thread producerThread = new Thread((Runnable) producer, "Producer-Thread-" + producer);
                producerThread.setDaemon(true);
                producerThread.start();
                LOGGER.info("Started {}.", producerThread);
            }
        }
    }
}
