package org.apache.pinot.tools.streams;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.net.URI;
import java.util.Properties;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.glassfish.tyrus.client.ClientManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/tools/streams/MeetupRsvpStream.class */
public class MeetupRsvpStream {
    protected static final Logger LOGGER = LoggerFactory.getLogger(MeetupRsvpStream.class);
    protected final boolean _partitionByKey;
    protected final StreamDataProducer _producer;
    protected ClientManager _client;
    protected volatile boolean _keepPublishing;

    public MeetupRsvpStream() throws Exception {
        this(false);
    }

    public MeetupRsvpStream(boolean z) throws Exception {
        this._partitionByKey = z;
        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
        properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
        properties.put("request.required.acks", "1");
        this._producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
    }

    public void run() throws Exception {
        this._client = ClientManager.createClient();
        this._keepPublishing = true;
        this._client.connectToServer(new Endpoint() { // from class: org.apache.pinot.tools.streams.MeetupRsvpStream.1
            public void onOpen(Session session, EndpointConfig endpointConfig) {
                session.addMessageHandler(String.class, MeetupRsvpStream.this.getMessageHandler());
            }
        }, ClientEndpointConfig.Builder.create().build(), new URI("wss://stream.meetup.com/2/rsvps"));
    }

    public void stopPublishing() {
        this._keepPublishing = false;
        this._client.shutdown();
        this._producer.close();
    }

    protected MessageHandler.Whole<String> getMessageHandler() {
        return str -> {
            try {
                JsonNode stringToJsonNode = JsonUtils.stringToJsonNode(str);
                ObjectNode newObjectNode = JsonUtils.newObjectNode();
                JsonNode jsonNode = stringToJsonNode.get("venue");
                if (jsonNode != null) {
                    newObjectNode.set("venue_name", jsonNode.get("venue_name"));
                }
                JsonNode jsonNode2 = stringToJsonNode.get("event");
                String str = "";
                if (jsonNode2 != null) {
                    newObjectNode.set("event_name", jsonNode2.get("event_name"));
                    str = jsonNode2.get("event_id").toString();
                    newObjectNode.put("event_id", str);
                    newObjectNode.set("event_time", jsonNode2.get("time"));
                }
                JsonNode jsonNode3 = stringToJsonNode.get("group");
                if (jsonNode3 != null) {
                    newObjectNode.set("group_city", jsonNode3.get("group_city"));
                    newObjectNode.set("group_country", jsonNode3.get("group_country"));
                    newObjectNode.set("group_id", jsonNode3.get("group_id"));
                    newObjectNode.set("group_name", jsonNode3.get("group_name"));
                    newObjectNode.set("group_lat", jsonNode3.get("group_lat"));
                    newObjectNode.set("group_lon", jsonNode3.get("group_lon"));
                }
                newObjectNode.set("mtime", stringToJsonNode.get("mtime"));
                newObjectNode.put("rsvp_count", 1);
                if (this._keepPublishing) {
                    if (this._partitionByKey) {
                        this._producer.produce("meetupRSVPEvents", StringUtil.encodeUtf8(str), StringUtil.encodeUtf8(newObjectNode.toString()));
                    } else {
                        this._producer.produce("meetupRSVPEvents", StringUtil.encodeUtf8(newObjectNode.toString()));
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Caught exception while processing the message: {}", str, e);
            }
        };
    }
}
