package cn.howardliu.gear.es.storm.bolt;

import backtype.storm.topology.FailedException;
import backtype.storm.tuple.Tuple;
import cn.howardliu.gear.es.EsConfig;
import cn.howardliu.gear.es.storm.EsSource;
import java.util.List;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/howardliu/gear/es/storm/bolt/EsBulkIndexBolt.class */
/**
 * Bolt that indexes a batch of {@link EsSource} documents with a single Elasticsearch bulk request.
 *
 * <p>Each incoming tuple must carry a {@code List<EsSource>} in the field named {@code "sources"}.
 * The {@code client} and {@code collector} used here are inherited from {@link AbstractEsBolt}.
 */
public class EsBulkIndexBolt extends AbstractEsBolt {
    private static final Logger logger = LoggerFactory.getLogger(EsBulkIndexBolt.class);

    /**
     * Creates the bolt.
     *
     * @param esConfig Elasticsearch configuration, passed unchanged to {@link AbstractEsBolt}
     */
    public EsBulkIndexBolt(EsConfig esConfig) {
        super(esConfig);
    }

    /**
     * Indexes every {@link EsSource} found in the tuple's {@code "sources"} field in one bulk request.
     *
     * <p>Outcome per tuple:
     * <ul>
     *   <li>empty list: acked without contacting Elasticsearch;</li>
     *   <li>bulk response without failures: acked;</li>
     *   <li>bulk response with failures: the failure message is logged and reported, and the tuple
     *       is failed;</li>
     *   <li>any exception (including a missing {@code "sources"} value): reported, and the tuple
     *       is failed.</li>
     * </ul>
     *
     * @param tuple the tuple holding the documents to index
     */
    @Override // cn.howardliu.gear.es.storm.bolt.AbstractEsBolt
    public void execute(Tuple tuple) {
        try {
            // The tuple API is untyped; the element type is the contract with the upstream component.
            @SuppressWarnings("unchecked")
            List<EsSource> sources = (List<EsSource>) tuple.getValueByField("sources");
            if (sources == null) {
                // Same outcome as before (report + fail), but with a message that names the cause.
                throw new IllegalArgumentException("tuple field 'sources' is null");
            }
            if (sources.isEmpty()) {
                // A bulk request without actions is rejected by request validation, which would
                // fail a tuple that simply had nothing to index.
                this.collector.ack(tuple);
                return;
            }

            BulkRequestBuilder bulkRequest = client.prepareBulk();
            for (EsSource source : sources) {
                bulkRequest.add(client.prepareIndex(source.getIndexName(), source.getTypeName(), source.getId())
                        .setSource(source.getSource()));
            }

            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
            if (bulkResponse.hasFailures()) {
                String message = "failed processing bulk index requests " + bulkResponse.buildFailureMessage();
                logger.warn(message);
                this.collector.reportError(new FailedException(message));
                // Do not ack: at least one document was rejected, so the tuple is not fully processed.
                // NOTE(review): a replay re-sends the whole batch; this presumably overwrites documents
                // that carry an explicit id — confirm behaviour for sources whose id is null.
                this.collector.fail(tuple);
                return;
            }
            this.collector.ack(tuple);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(tuple);
        }
    }
}
