package org.streampipes.wrapper.flink.consumer;

import java.io.Serializable;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.dataformat.SpDataFormatDefinition;
import org.streampipes.messaging.InternalEventProcessor;
import org.streampipes.messaging.jms.ActiveMQConsumer;
import org.streampipes.model.grounding.JmsTransportProtocol;

/* loaded from: input_file:org/streampipes/wrapper/flink/consumer/JmsConsumer.class */
public class JmsConsumer implements SourceFunction<Map<String, Object>>, Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(JmsConsumer.class);
    private JmsTransportProtocol protocol;
    private SpDataFormatDefinition spDataFormatDefinition;
    private Boolean isRunning;
    private ActiveMQConsumer activeMQConsumer = new ActiveMQConsumer();
    private Queue<byte[]> queue = new LinkedBlockingQueue();

    public JmsConsumer(JmsTransportProtocol jmsTransportProtocol, SpDataFormatDefinition spDataFormatDefinition) {
        this.protocol = jmsTransportProtocol;
        this.spDataFormatDefinition = spDataFormatDefinition;
    }

    public void run(SourceFunction.SourceContext<Map<String, Object>> sourceContext) throws Exception {
        this.isRunning = true;
        this.activeMQConsumer.connect(this.protocol, new InternalEventProcessor<byte[]>() { // from class: org.streampipes.wrapper.flink.consumer.JmsConsumer.1
            public void onEvent(byte[] bArr) {
                JmsConsumer.this.queue.add(bArr);
            }
        });
        while (this.isRunning.booleanValue()) {
            if (this.queue.isEmpty()) {
                Thread.sleep(100L);
            } else {
                sourceContext.collect(this.spDataFormatDefinition.toMap(this.queue.poll()));
            }
        }
    }

    public void cancel() {
        try {
            this.activeMQConsumer.disconnect();
            this.isRunning = false;
        } catch (SpRuntimeException e) {
            LOG.error(e.getMessage());
        }
    }
}
