package org.streampipes.wrapper.flink.sink;

import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.streampipes.messaging.jms.ActiveMQPublisher;
import org.streampipes.model.grounding.JmsTransportProtocol;
import org.streampipes.wrapper.flink.serializer.ByteArraySerializer;

/* loaded from: input_file:org/streampipes/wrapper/flink/sink/FlinkJmsProducer.class */
public class FlinkJmsProducer extends RichSinkFunction<Map<String, Object>> {
    private static final long serialVersionUID = 1;
    private JmsTransportProtocol protocol;
    private ByteArraySerializer serializationSchema;
    private ActiveMQPublisher publisher;

    public FlinkJmsProducer(JmsTransportProtocol jmsTransportProtocol, ByteArraySerializer byteArraySerializer) {
        this.protocol = jmsTransportProtocol;
        this.serializationSchema = byteArraySerializer;
    }

    public void open(Configuration configuration) throws Exception {
        try {
            this.publisher = new ActiveMQPublisher();
            this.publisher.connect(this.protocol);
        } catch (Exception e) {
            throw new Exception("Failed to open Jms connection: " + e.getMessage(), e);
        }
    }

    public void invoke(Map<String, Object> map) throws Exception {
        this.publisher.publish(this.serializationSchema.serialize(map));
    }
}
