package com.yahoo.bullet.storm.drpc;

import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.pubsub.BufferingSubscriber;
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.storm.BulletStormConfig;
import com.yahoo.bullet.storm.drpc.utils.DRPCOutputCollector;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.drpc.DRPCSpout;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/yahoo/bullet/storm/drpc/DRPCQuerySubscriber.class */
/**
 * A {@link BufferingSubscriber} that reads queries by polling a Storm {@link DRPCSpout}.
 *
 * <p>Each call to {@link #getMessages()} asks the spout for its next tuple, which the spout emits into a
 * {@link DRPCOutputCollector}. The first emitted tuple is converted to a {@link PubSubMessage}. The object
 * that accompanied the tuple in the collector is remembered per message id until the message is committed,
 * so that {@link #close()} can hand it back to {@link DRPCSpout#fail(Object)} for every request that is
 * still pending.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its state; it is presumably meant to be
 * driven from a single thread - confirm against callers.
 */
public class DRPCQuerySubscriber extends BufferingSubscriber {
    private static final Logger log = LoggerFactory.getLogger(DRPCQuerySubscriber.class);
    private final DRPCSpout spout;
    private final DRPCOutputCollector collector;
    /** Message id to the object emitted alongside its tuple, for messages that are not yet committed. */
    private final Map<String, Object> emittedIDs;

    /**
     * Creates a subscriber backed by a new {@link DRPCSpout} for the function named by the required
     * {@code DRPCConfig.DRPC_FUNCTION} setting, and a new {@link DRPCOutputCollector}.
     *
     * @param bulletConfig The configuration; must contain {@code DRPCConfig.DRPC_FUNCTION},
     *                     {@code BulletStormConfig.STORM_CONFIG} and {@code BulletStormConfig.STORM_CONTEXT}.
     * @param i The value forwarded unchanged to the {@link BufferingSubscriber} constructor
     *          (presumably the cap on uncommitted messages - confirm against the superclass).
     */
    public DRPCQuerySubscriber(BulletConfig bulletConfig, int i) {
        this(bulletConfig, i, new DRPCOutputCollector(), new DRPCSpout((String) bulletConfig.getRequiredConfigAs(DRPCConfig.DRPC_FUNCTION, String.class)));
    }

    /**
     * Creates a subscriber using the given collector and spout, and opens the spout. Package-private,
     * presumably so that tests can inject their own collector and spout.
     *
     * @param bulletConfig The configuration; must contain {@code BulletStormConfig.STORM_CONFIG} and
     *                     {@code BulletStormConfig.STORM_CONTEXT}.
     * @param i The value forwarded unchanged to the {@link BufferingSubscriber} constructor.
     * @param collector The collector that the spout will emit into.
     * @param spout The spout to read from; it is opened here and closed in {@link #close()}.
     */
    DRPCQuerySubscriber(BulletConfig bulletConfig, int i, DRPCOutputCollector collector, DRPCSpout spout) {
        super(i);
        this.collector = collector;
        this.spout = spout;
        this.emittedIDs = new HashMap<>();
        // The raw Map is kept on purpose: the parameter type of DRPCSpout.open is not visible from this file.
        Map stormConfig = (Map) bulletConfig.getRequiredConfigAs(BulletStormConfig.STORM_CONFIG, Map.class);
        TopologyContext context = (TopologyContext) bulletConfig.getRequiredConfigAs(BulletStormConfig.STORM_CONTEXT, TopologyContext.class);
        // Wrap our collector so that anything the spout emits becomes readable through this.collector.
        spout.open(stormConfig, context, new SpoutOutputCollector(collector));
    }

    /**
     * Polls the spout once and converts the first emitted tuple, if any, into a {@link PubSubMessage}.
     *
     * <p>The returned message carries the id and content of the message parsed from the tuple, and a
     * {@link Metadata} with a null signal whose content is the return information read from the tuple.
     *
     * @return A singleton list with the message that was read, or {@code null} if the spout produced no output.
     */
    public List<PubSubMessage> getMessages() {
        this.spout.nextTuple();
        if (!this.collector.haveOutput()) {
            // No output this round. null (rather than an empty list) is what this method has always returned.
            return null;
        }
        List<List<Object>> emitted = this.collector.reset();
        log.debug("Have a message through DRPC {}", emitted);
        // Only the first entry is consumed; presumably the spout emits at most one tuple per nextTuple() - confirm.
        List<Object> tupleAndId = emitted.get(0);
        // Entry layout: index 0 is the tuple itself, index 1 is the object later passed to DRPCSpout.fail.
        List<?> tuple = (List<?>) tupleAndId.get(0);
        Object drpcId = tupleAndId.get(1);
        // Tuple layout: index 0 is the PubSubMessage as JSON, index 1 is the return information.
        String messageJson = (String) tuple.get(0);
        String returnInfo = (String) tuple.get(1);
        log.debug("Read message\n{}\nfrom DRPC with return information {}", messageJson, returnInfo);
        PubSubMessage parsed = PubSubMessage.fromJSON(messageJson);
        String id = parsed.getId();
        // Rebuild the message so that the return information travels with it as metadata content.
        PubSubMessage message = new PubSubMessage(id, parsed.getContentAsString(), new Metadata((Metadata.Signal) null, returnInfo));
        this.emittedIDs.put(id, drpcId);
        return Collections.singletonList(message);
    }

    /**
     * Commits the message in the superclass and stops tracking it, so that {@link #close()} will no longer
     * fail it.
     *
     * @param str The id of the message to commit.
     */
    public void commit(String str) {
        super.commit(str);
        this.emittedIDs.remove(str);
    }

    /**
     * Fails every request that was emitted but not committed and then closes the spout. The spout is closed
     * even if failing a request throws; in that case the exception still propagates to the caller.
     */
    public void close() {
        log.warn("Failing all pending requests: {}", this.emittedIDs);
        try {
            this.emittedIDs.values().forEach(this.spout::fail);
            // Everything pending has been failed; forget it so a repeated close() does not fail it again.
            this.emittedIDs.clear();
        } finally {
            log.info("Closing spout...");
            this.spout.close();
        }
    }
}
