package com.yahoo.bullet.storm.drpc;

import com.yahoo.bullet.common.BulletConfig;
import com.yahoo.bullet.pubsub.Metadata;
import com.yahoo.bullet.pubsub.PubSubException;
import com.yahoo.bullet.pubsub.PubSubMessage;
import com.yahoo.bullet.pubsub.Publisher;
import com.yahoo.bullet.storm.BulletStormConfig;
import com.yahoo.bullet.storm.drpc.utils.DRPCOutputCollector;
import com.yahoo.bullet.storm.drpc.utils.DRPCTuple;
import java.io.Serializable;
import java.util.Map;
import org.apache.storm.drpc.ReturnResults;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/yahoo/bullet/storm/drpc/DRPCResultPublisher.class */
/**
 * A {@link Publisher} that emits result messages by driving a Storm {@link ReturnResults} bolt directly
 * from this class.
 *
 * <p>The bolt is prepared with a {@link DRPCOutputCollector}, which this class consults after each
 * {@link #send(PubSubMessage)} to find out whether the bolt acked the tuple it was given.
 */
public class DRPCResultPublisher implements Publisher {
    private static final Logger log = LoggerFactory.getLogger(DRPCResultPublisher.class);

    // Not final: package-level code can substitute the bolt through setBolt.
    private ReturnResults bolt;
    private final DRPCOutputCollector collector;

    /**
     * Creates the publisher and prepares the underlying {@link ReturnResults} bolt.
     *
     * @param bulletConfig The configuration; it must contain a {@link Map} under
     *                     {@link BulletStormConfig#STORM_CONFIG}, which is handed to the bolt as its
     *                     Storm configuration.
     */
    public DRPCResultPublisher(BulletConfig bulletConfig) {
        // The bolt's prepare method takes a Map, so the raw type read from the config is passed through as is.
        Map stormConfig = (Map) bulletConfig.getRequiredConfigAs(BulletStormConfig.STORM_CONFIG, Map.class);
        this.collector = new DRPCOutputCollector();
        OutputCollector boltCollector = new OutputCollector(this.collector);
        this.bolt = new ReturnResults();
        // No topology context is available here, so the bolt is prepared with a null one.
        this.bolt.prepare(stormConfig, (TopologyContext) null, boltCollector);
    }

    /**
     * Sends the message through the {@link ReturnResults} bolt.
     *
     * <p>The content of the message's metadata is extracted as a String and cleared from the message
     * before the message is serialized to JSON. The JSON and the extracted String are then given to the
     * bolt as a two-field tuple (presumably the result and the DRPC return information that
     * {@link ReturnResults} expects; confirm against the Storm documentation).
     *
     * <p>Note that the metadata content of the given message is set to null as a side effect.
     *
     * @param pubSubMessage The message to send. Its metadata and metadata content must be non-null.
     * @return The same message instance that was passed in.
     * @throws PubSubException if the message has no metadata content, or if the collector did not
     *                         register an ack after the bolt executed.
     */
    public PubSubMessage send(PubSubMessage pubSubMessage) throws PubSubException {
        Metadata metadata = pubSubMessage.getMetadata();
        Object rawContent = metadata == null ? null : metadata.getContent();
        if (rawContent == null) {
            // Fail through the declared exception type rather than with a bare NullPointerException.
            throw new PubSubException("Message has no metadata content. Unable to send message through DRPC: "
                                      + pubSubMessage.getId());
        }
        String returnInfo = rawContent.toString();
        log.debug("Removing metadata {} for result {}: {}", returnInfo, pubSubMessage.getId(), pubSubMessage.getContent());
        // Clear the metadata content so that it is not part of the serialized message.
        metadata.setContent((Serializable) null);
        String serialized = pubSubMessage.asJSON();
        // The outcome of executing the bolt is read back from the collector it was prepared with.
        this.bolt.execute(new DRPCTuple(new Values(serialized, returnInfo)));
        if (!this.collector.isAcked()) {
            throw new PubSubException("Message not acked. Unable to send message through DRPC:\n " + serialized);
        }
        // Clear the collector's state so that the next send starts fresh.
        this.collector.reset();
        return pubSubMessage;
    }

    /**
     * Cleans up the underlying {@link ReturnResults} bolt.
     */
    public void close() {
        this.bolt.cleanup();
    }

    /**
     * Replaces the bolt used to send messages. Package-private; the replacement is used as given and is
     * not prepared by this method.
     *
     * @param returnResults The bolt to use from now on.
     */
    void setBolt(ReturnResults returnResults) {
        this.bolt = returnResults;
    }

    /**
     * Returns the collector that the bolt created in the constructor was prepared with. Package-private.
     *
     * @return The {@link DRPCOutputCollector} used by this publisher.
     */
    DRPCOutputCollector getCollector() {
        return this.collector;
    }
}
