package de.saly.elasticsearch.maildestination;

import de.saly.elasticsearch.support.IndexableMailMessage;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.mail.Message;
import javax.mail.MessagingException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

/* loaded from: input_file:de/saly/elasticsearch/maildestination/ElasticsearchBulkMailDestination.class */
public class ElasticsearchBulkMailDestination extends ElasticsearchMailDestination {
    private static final AtomicInteger outstandingBulkRequests = new AtomicInteger(0);
    private static final AtomicInteger queue = new AtomicInteger(0);
    private BulkProcessor bulk;
    private TimeValue flushInterval = TimeValue.timeValueSeconds(5);
    private final BulkProcessor.Listener listener = new BulkProcessor.Listener() { // from class: de.saly.elasticsearch.maildestination.ElasticsearchBulkMailDestination.1
        public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            ElasticsearchBulkMailDestination.this.logger.info("Bulk actions done successfully [{}] success [{} items] [{}ms], {} outstanding bulk requests, queue size is {}", new Object[]{Long.valueOf(j), Integer.valueOf(bulkResponse.getItems().length), Long.valueOf(bulkResponse.getTookInMillis()), Long.valueOf(ElasticsearchBulkMailDestination.outstandingBulkRequests.decrementAndGet()), Integer.valueOf(ElasticsearchBulkMailDestination.queue.addAndGet(-bulkResponse.getItems().length))});
        }

        public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
            ElasticsearchBulkMailDestination.this.logger.error("Bulk actions done with errors [" + j + "] error, {} outstanding bulk requests", th, new Object[]{Long.valueOf(ElasticsearchBulkMailDestination.outstandingBulkRequests.decrementAndGet())});
            ElasticsearchBulkMailDestination.this.setError(true);
        }

        public void beforeBulk(long j, BulkRequest bulkRequest) {
            ElasticsearchBulkMailDestination.this.logger.info("New bulk actions queued [{}] of [{} items], {} outstanding bulk requests", new Object[]{Long.valueOf(j), Integer.valueOf(bulkRequest.numberOfActions()), Long.valueOf(ElasticsearchBulkMailDestination.outstandingBulkRequests.incrementAndGet())});
        }
    };
    private int maxBulkActions = 100;
    private int maxConcurrentBulkRequests = 30;
    private final ByteSizeValue maxVolumePerBulkRequest = ByteSizeValue.parseBytesSizeValue("10mb");

    @Override // de.saly.elasticsearch.maildestination.ElasticsearchMailDestination
    public ElasticsearchMailDestination client(Client client) {
        super.client(client);
        this.bulk = BulkProcessor.builder(client, this.listener).setBulkActions(this.maxBulkActions - 1).setConcurrentRequests(this.maxConcurrentBulkRequests).setBulkSize(this.maxVolumePerBulkRequest).setFlushInterval(this.flushInterval).build();
        return this;
    }

    @Override // de.saly.elasticsearch.maildestination.ElasticsearchMailDestination, de.saly.elasticsearch.maildestination.MailDestination
    public synchronized void close() {
        super.close();
        if (this.bulk != null) {
            this.logger.debug("Shutdown (flush) bulk processor, is super closed " + isClosed(), new Object[0]);
            this.bulk.close();
        }
    }

    public ElasticsearchBulkMailDestination flushInterval(TimeValue timeValue) {
        this.flushInterval = timeValue;
        return this;
    }

    public ElasticsearchBulkMailDestination maxBulkActions(int i) {
        this.maxBulkActions = i;
        return this;
    }

    public ElasticsearchBulkMailDestination maxConcurrentBulkRequests(int i) {
        this.maxConcurrentBulkRequests = i;
        return this;
    }

    @Override // de.saly.elasticsearch.maildestination.ElasticsearchMailDestination, de.saly.elasticsearch.maildestination.MailDestination
    public void onMessage(Message message) throws IOException, MessagingException {
        if (isClosed()) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Is closed, will not index", new Object[0]);
                return;
            }
            return;
        }
        if (isError()) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("error, not indexing", new Object[0]);
                return;
            }
            return;
        }
        IndexableMailMessage fromJavaMailMessage = IndexableMailMessage.fromJavaMailMessage(message, isWithTextContent(), isWithHtmlContent(), isPreferHtmlContent(), isWithAttachments(), isStripTagsFromTextContent(), getHeadersToFields());
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Bulk process mail " + fromJavaMailMessage.getUid() + "/" + fromJavaMailMessage.getPopId() + " :: " + fromJavaMailMessage.getSubject() + "/" + fromJavaMailMessage.getSentDate(), new Object[0]);
        }
        try {
            if (!isClosed()) {
                this.bulk.add(createIndexRequest(fromJavaMailMessage));
                queue.incrementAndGet();
            }
        } catch (ElasticsearchIllegalStateException e) {
            if (isClosed()) {
                this.logger.debug("Bulkprocessing error due to {}", new Object[]{e.toString()});
            } else {
                this.logger.error("Bulkprocessing error due to {}", e, new Object[]{e.toString()});
            }
        }
    }
}
