package com.yahoo.bullet.storm;

import com.yahoo.bullet.dsl.BulletDSLConfig;
import com.yahoo.bullet.dsl.BulletDSLException;
import com.yahoo.bullet.dsl.connector.BulletConnector;
import com.yahoo.bullet.dsl.converter.BulletRecordConverter;
import com.yahoo.bullet.dsl.deserializer.BulletDeserializer;
import com.yahoo.bullet.dsl.deserializer.IdentityDeserializer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/yahoo/bullet/storm/DSLSpout.class */
public class DSLSpout<T extends BulletConnector> extends ConfigComponent implements IRichSpout {
    private static final long serialVersionUID = 9218045272408135524L;
    protected transient SpoutOutputCollector collector;
    protected T connector;
    private BulletRecordConverter converter;
    private BulletDeserializer deserializer;
    private boolean dslBoltEnable;
    private static final Logger log = LoggerFactory.getLogger(DSLSpout.class);
    private static final Long DUMMY_ID = 42L;

    public DSLSpout(BulletStormConfig bulletStormConfig) {
        super(bulletStormConfig);
        BulletDSLConfig bulletDSLConfig = new BulletDSLConfig(bulletStormConfig);
        this.connector = getConnector(bulletDSLConfig);
        this.dslBoltEnable = ((Boolean) bulletDSLConfig.getAs(BulletStormConfig.DSL_BOLT_ENABLE, Boolean.class)).booleanValue();
        if (this.dslBoltEnable) {
            return;
        }
        boolean booleanValue = ((Boolean) bulletDSLConfig.getAs(BulletStormConfig.DSL_DESERIALIZER_ENABLE, Boolean.class)).booleanValue();
        this.converter = BulletRecordConverter.from(bulletDSLConfig);
        this.deserializer = booleanValue ? BulletDeserializer.from(bulletDSLConfig) : new IdentityDeserializer(bulletDSLConfig);
    }

    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        try {
            this.connector.initialize();
        } catch (BulletDSLException e) {
            throw new RuntimeException("Could not open DSLSpout.", e);
        }
    }

    public void activate() {
        log.info("DSLSpout activated");
    }

    public void deactivate() {
        log.info("DSLSpout deactivated");
    }

    public void nextTuple() {
        List<Object> readObjects = readObjects();
        if (readObjects.isEmpty()) {
            return;
        }
        if (this.dslBoltEnable) {
            this.collector.emit(new Values(new Object[]{readObjects, Long.valueOf(System.currentTimeMillis())}), DUMMY_ID);
        } else {
            readObjects.forEach(this::convertAndEmit);
        }
    }

    private List<Object> readObjects() {
        try {
            return (List) this.connector.read().stream().filter(Objects::nonNull).collect(Collectors.toList());
        } catch (BulletDSLException e) {
            log.error("Could not read from BulletConnector.", e);
            return Collections.emptyList();
        }
    }

    private void convertAndEmit(Object obj) {
        try {
            this.collector.emit(new Values(new Object[]{this.converter.convert(this.deserializer.deserialize(obj)), Long.valueOf(System.currentTimeMillis())}), DUMMY_ID);
        } catch (BulletDSLException e) {
            log.error("Could not convert object.", e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(new String[]{TopologyConstants.RECORD_FIELD, "timestamp"}));
    }

    public void ack(Object obj) {
    }

    public void fail(Object obj) {
    }

    public void close() {
        try {
            this.connector.close();
        } catch (Exception e) {
            log.error("Could not close BulletConnector.", e);
        }
        log.info("DSLSpout closed");
    }

    protected T getConnector(BulletDSLConfig bulletDSLConfig) {
        return (T) BulletConnector.from(bulletDSLConfig);
    }
}
