package stream.io;

import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Processor;
import stream.data.Data;

/* loaded from: input_file:stream/io/DataStreamQueue.class */
/**
 * A data stream backed by an in-memory {@link LinkedBlockingQueue}.
 *
 * <p>Items are appended through {@link #process(Data)} or {@link #enqueue(Data)}
 * and consumed through {@link #readNext()}, {@link #readItem(Data)} or
 * {@link #poll()}. The queue is created without an explicit capacity bound.
 */
public abstract class DataStreamQueue extends AbstractDataStream implements Processor, QueueService {
    static Logger log = LoggerFactory.getLogger(DataStreamQueue.class);
    final LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue<>();

    /**
     * Closes this stream by discarding every item still waiting in the queue.
     *
     * @throws Exception declared by the overridden method; not thrown by this implementation
     */
    @Override // stream.io.DataStream
    public void close() throws Exception {
        this.queue.clear();
    }

    /**
     * No-op: this implementation performs no header processing.
     *
     * @throws Exception declared by the overridden method; not thrown by this implementation
     */
    @Override // stream.io.AbstractDataStream
    public void readHeader() throws Exception {
    }

    /**
     * Blocks until an item is available, then copies all of its entries into
     * the supplied instance.
     *
     * @param data the instance that receives the entries of the dequeued item
     * @return the same {@code data} instance, now containing the dequeued entries
     * @throws Exception an {@link InterruptedException} if the calling thread is
     *                   interrupted while waiting for an item
     */
    @Override // stream.io.AbstractDataStream
    public Data readItem(Data data) throws Exception {
        Data item = takeNext();
        data.putAll(item);
        return data;
    }

    /**
     * Blocks until an item is available and returns it.
     *
     * @return the item at the head of the queue, never {@code null}
     * @throws Exception an {@link InterruptedException} if the calling thread is
     *                   interrupted while waiting for an item
     */
    @Override // stream.io.AbstractDataStream, stream.io.DataStream
    public Data readNext() throws Exception {
        return takeNext();
    }

    /**
     * Appends the given item to the queue and passes it through unchanged.
     *
     * @param data the item to append
     * @return the same {@code data} instance
     */
    @Override // stream.Processor
    public Data process(Data data) {
        this.queue.add(data);
        return data;
    }

    /**
     * Removes and returns the head of the queue without blocking.
     *
     * @return the head of the queue, or {@code null} if the queue is empty
     */
    @Override // stream.io.QueueService
    public Data poll() {
        return this.queue.poll();
    }

    /**
     * Appends the given item to the queue.
     *
     * @param data the item to append
     * @return {@code true} if the item was added, {@code false} if it was rejected
     *         (for example because it is {@code null})
     */
    @Override // stream.io.QueueService
    public boolean enqueue(Data data) {
        // The queue rejects null elements; report that as a failed enqueue
        // instead of relying on a caught exception for control flow.
        return data != null && this.queue.offer(data);
    }

    /**
     * Resets this service by discarding every item still waiting in the queue.
     *
     * @throws Exception declared by the overridden method; not thrown by this implementation
     */
    @Override // stream.service.Service
    public void reset() throws Exception {
        this.queue.clear();
        log.debug("Cleared Queue.");
    }

    /**
     * Takes the head of the queue, waiting as long as necessary for one to arrive.
     *
     * <p>{@code take()} only returns once an element is available, so no
     * null check or retry loop is needed afterwards.
     */
    private Data takeNext() throws InterruptedException {
        Data item = this.queue.take();
        log.debug("took item from queue: {}", item);
        return item;
    }
}
