package com.yahoo.bullet.dsl.connector;

import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.dsl.BulletDSLConfig;
import com.yahoo.bullet.dsl.BulletDSLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/yahoo/bullet/dsl/connector/PulsarConnector.class */
public class PulsarConnector extends BulletConnector {
    private static final Logger log = LoggerFactory.getLogger(PulsarConnector.class);
    private static final long serialVersionUID = 2958805692867790602L;
    private transient PulsarClient client;
    private transient Consumer<Object> consumer;
    private boolean asyncCommit;
    private int timeout;

    public PulsarConnector(BulletConfig bulletConfig) {
        super(bulletConfig);
        this.asyncCommit = ((Boolean) this.config.getAs(BulletDSLConfig.CONNECTOR_ASYNC_COMMIT_ENABLE, Boolean.class)).booleanValue();
        this.timeout = ((Number) this.config.getAs(BulletDSLConfig.CONNECTOR_READ_TIMEOUT_MS, Number.class)).intValue();
    }

    @Override // com.yahoo.bullet.dsl.connector.BulletConnector
    public void initialize() throws BulletDSLException {
        Map<String, Object> allWithPrefix = this.config.getAllWithPrefix(Optional.empty(), BulletDSLConfig.CONNECTOR_PULSAR_CLIENT_NAMESPACE, true);
        Map<String, Object> allWithPrefix2 = this.config.getAllWithPrefix(Optional.empty(), BulletDSLConfig.CONNECTOR_PULSAR_CONSUMER_NAMESPACE, true);
        List list = (List) this.config.getAs(BulletDSLConfig.CONNECTOR_PULSAR_TOPICS, List.class);
        Boolean bool = (Boolean) this.config.getAs(BulletDSLConfig.CONNECTOR_PULSAR_AUTH_ENABLE, Boolean.class);
        String str = (String) this.config.getAs(BulletDSLConfig.CONNECTOR_PULSAR_AUTH_PLUGIN_CLASS_NAME, String.class);
        String str2 = (String) this.config.getAs(BulletDSLConfig.CONNECTOR_PULSAR_AUTH_PARAMS_STRING, String.class);
        Schema schema = getSchema((String) this.config.getAs(BulletDSLConfig.CONNECTOR_PULSAR_SCHEMA_TYPE, String.class), (String) this.config.getAs(BulletDSLConfig.CONNECTOR_PULSAR_SCHEMA_CLASS_NAME, String.class));
        this.client = getPulsarClient(allWithPrefix, bool, str, str2);
        try {
            this.consumer = this.client.newConsumer(schema).loadConf(allWithPrefix2).topics(list).subscribe();
        } catch (Exception e) {
            throw new BulletDSLException("Could not create Pulsar consumer.", e);
        }
    }

    PulsarClient getPulsarClient(Map<String, Object> map, Boolean bool, String str, String str2) throws BulletDSLException {
        try {
            ClientBuilder loadConf = PulsarClient.builder().loadConf(map);
            if (bool.booleanValue()) {
                loadConf.authentication(str, str2);
            }
            return loadConf.build();
        } catch (Exception e) {
            throw new BulletDSLException("Could not create Pulsar client.", e);
        }
    }

    @Override // com.yahoo.bullet.dsl.connector.BulletConnector
    public List<Object> read() throws BulletDSLException {
        ArrayList arrayList = new ArrayList();
        while (true) {
            Message<Object> message = getMessage();
            if (message == null) {
                return arrayList;
            }
            arrayList.add(message.getValue());
            acknowledge(message);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.consumer.closeAsync();
        this.client.closeAsync();
    }

    private Message<Object> getMessage() throws BulletDSLException {
        try {
            return this.consumer.receive(this.timeout, TimeUnit.MILLISECONDS);
        } catch (PulsarClientException e) {
            throw new BulletDSLException("Could not read from consumer.", e);
        }
    }

    private void acknowledge(Message<Object> message) throws BulletDSLException {
        if (this.asyncCommit) {
            this.consumer.acknowledgeAsync(message);
            return;
        }
        try {
            this.consumer.acknowledge(message);
        } catch (PulsarClientException e) {
            throw new BulletDSLException("Could not acknowledge message.", e);
        }
    }

    private Schema getSchema(String str, String str2) throws BulletDSLException {
        Class<?> cls;
        if (str2 != null) {
            try {
                cls = Class.forName(str2);
            } catch (Exception e) {
                throw new BulletDSLException("Could not create Pulsar schema.", e);
            }
        } else {
            cls = null;
        }
        Class<?> cls2 = cls;
        boolean z = -1;
        switch (str.hashCode()) {
            case -1838656495:
                if (str.equals(BulletDSLConfig.PULSAR_SCHEMA_STRING)) {
                    z = true;
                    break;
                }
                break;
            case -206537845:
                if (str.equals(BulletDSLConfig.PULSAR_SCHEMA_PROTOBUF)) {
                    z = 4;
                    break;
                }
                break;
            case 2021682:
                if (str.equals(BulletDSLConfig.PULSAR_SCHEMA_AVRO)) {
                    z = 3;
                    break;
                }
                break;
            case 2286824:
                if (str.equals(BulletDSLConfig.PULSAR_SCHEMA_JSON)) {
                    z = 2;
                    break;
                }
                break;
            case 63686731:
                if (str.equals("BYTES")) {
                    z = false;
                    break;
                }
                break;
            case 1999208305:
                if (str.equals(BulletDSLConfig.PULSAR_SCHEMA_CUSTOM)) {
                    z = 5;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return Schema.BYTES;
            case true:
                return Schema.STRING;
            case true:
                return Schema.JSON(cls2);
            case true:
                return Schema.AVRO(cls2);
            case true:
                return Schema.PROTOBUF(cls2);
            case true:
                return (Schema) cls2.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            default:
                throw new BulletDSLException("Pulsar schema type must be one of: " + BulletDSLConfig.PULSAR_SCHEMA_TYPES);
        }
    }

    PulsarClient getClient() {
        return this.client;
    }

    void setConsumer(Consumer<Object> consumer) {
        this.consumer = consumer;
    }
}
