package stream.io.multi;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Processor;
import stream.data.Data;
import stream.data.DataFactory;
import stream.io.DataStream;
import stream.io.active.ActiveDataStream;
import stream.io.active.ActiveDataStreamImpl;

/* loaded from: input_file:stream/io/multi/AbstractMultiDataStream.class */
public abstract class AbstractMultiDataStream implements MultiDataStream {
    static Logger log = LoggerFactory.getLogger(AbstractMultiDataStream.class);
    protected ActiveDataStream activeWrapper;
    protected Long limit = -1L;
    protected Long count = 0L;
    protected Boolean activate = false;
    protected Map<String, Class<?>> attributes = new LinkedHashMap();
    protected ArrayList<Processor> preprocessors = new ArrayList<>();
    protected Map<String, DataStream> streams = new HashMap();
    protected List<String> additionOrder = new ArrayList();

    public void addStream(String str, DataStream dataStream) {
        this.streams.put(str, dataStream);
        this.additionOrder.add(str);
        log.info("added Stream {}", dataStream);
    }

    public Map<String, DataStream> getStreams() {
        return this.streams;
    }

    public Long getLimit() {
        return this.limit;
    }

    public void setLimit(Long l) {
        this.limit = l;
    }

    public Boolean getActivate() {
        return this.activate;
    }

    public void setActivate(Boolean bool) {
        this.activate = bool;
    }

    public Map<String, Class<?>> getAttributes() {
        return this.attributes;
    }

    public List<Processor> getPreprocessors() {
        return this.preprocessors;
    }

    public void addPreprocessor(Processor processor) {
        this.preprocessors.add(processor);
    }

    public void addPreprocessor(int i, Processor processor) {
        this.preprocessors.add(i, processor);
    }

    public boolean removePreprocessor(Processor processor) {
        return this.preprocessors.remove(processor);
    }

    public Processor removePreprocessor(int i) {
        return this.preprocessors.remove(i);
    }

    protected abstract Data readNext(Data data, Map<String, DataStream> map) throws Exception;

    public Data readNext() throws Exception {
        return readNext(DataFactory.create());
    }

    public final Data readNext(Data data) throws Exception {
        if (this.limit.longValue() > 0 && this.count.longValue() >= this.limit.longValue()) {
            return null;
        }
        Data data2 = null;
        while (data2 == null) {
            data2 = readNext(data, this.streams);
            if (data2 == null) {
                log.debug("End-of-stream reached!");
                return null;
            }
            Iterator<Processor> it = this.preprocessors.iterator();
            while (it.hasNext()) {
                data2 = it.next().process(data2);
                if (data2 == null) {
                    break;
                }
            }
        }
        Long l = this.count;
        this.count = Long.valueOf(this.count.longValue() + 1);
        return data2;
    }

    public void close() throws Exception {
        for (DataStream dataStream : this.streams.values()) {
            try {
                dataStream.close();
            } catch (Exception e) {
                log.error("Failed to close stream {}: {}", dataStream, e.getMessage());
            }
        }
    }

    public void init() throws Exception {
        Iterator<DataStream> it = this.streams.values().iterator();
        while (it.hasNext()) {
            it.next().init();
        }
        log.info("initialized all Streams.");
        if (this.activate.booleanValue()) {
            this.activeWrapper = new ActiveDataStreamImpl(this);
            this.activeWrapper.activate();
            log.info("Activated this multiStream.");
        }
    }
}
