package co.cask.tracker;

import ch.qos.logback.core.pattern.color.ANSIConstants;
import co.cask.cdap.api.annotation.Property;
import co.cask.cdap.api.annotation.Tick;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.FlowletContext;
import co.cask.cdap.api.flow.flowlet.OutputEmitter;
import co.cask.cdap.api.messaging.Message;
import co.cask.cdap.api.messaging.MessageFetcher;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.common.conf.Constants;
import co.cask.tracker.config.AuditLogConfig;
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/tracker/AuditLogConsumer.class */
public final class AuditLogConsumer extends AbstractFlowlet {
    private static final Logger LOG = LoggerFactory.getLogger(AuditLogConsumer.class);
    private static final String OFFSET = "tms.offset";
    private KeyValueTable offsetStore;
    private OutputEmitter<String> emitter;

    @Property
    private final String offsetDatasetName;

    @Property
    private final String namespace;

    @Property
    private final String topic;

    @Property
    private final int limit;
    private Stopwatch stopwatch;
    private MessageFetcher messageFetcher;
    private long timeout;
    private boolean emptyIterator;

    public AuditLogConsumer(AuditLogConfig auditLogConfig) {
        this.offsetDatasetName = auditLogConfig.getOffsetDataset();
        this.namespace = auditLogConfig.getNamespace();
        this.topic = auditLogConfig.getTopic();
        this.limit = auditLogConfig.getLimit();
    }

    protected void configure() {
        createDataset(this.offsetDatasetName, KeyValueTable.class);
    }

    public void initialize(FlowletContext flowletContext) throws Exception {
        super.initialize(flowletContext);
        this.offsetStore = flowletContext.getDataset(this.offsetDatasetName);
        String str = (String) flowletContext.getRuntimeArguments().get("data.tx.timeout");
        if (str == null) {
            str = ANSIConstants.BLACK_FG;
        }
        this.timeout = Long.parseLong(str) - 10;
        this.stopwatch = new Stopwatch();
        this.emptyIterator = false;
        this.messageFetcher = getContext().getMessageFetcher();
    }

    @Tick(delay = Constants.MetricsCollector.DEFAULT_FREQUENCY_SECONDS, unit = TimeUnit.SECONDS)
    protected void pollAuditTopic() throws Exception {
        String str = null;
        byte[] read = this.offsetStore.read(OFFSET);
        String str2 = null;
        if (read != null) {
            str2 = Bytes.toString(read);
        }
        this.stopwatch.reset();
        this.stopwatch.start();
        do {
            this.emptyIterator = true;
            try {
                try {
                    CloseableIterator fetch = this.messageFetcher.fetch(this.namespace, this.topic, this.limit, str2);
                    Throwable th = null;
                    while (fetch.hasNext()) {
                        try {
                            try {
                                Message message = (Message) fetch.next();
                                str = message.getId();
                                this.emitter.emit(message.getPayloadAsString());
                                this.emptyIterator = false;
                            } catch (Throwable th2) {
                                if (fetch != null) {
                                    if (th != null) {
                                        try {
                                            fetch.close();
                                        } catch (Throwable th3) {
                                            th.addSuppressed(th3);
                                        }
                                    } else {
                                        fetch.close();
                                    }
                                }
                                throw th2;
                                break;
                            }
                        } catch (Throwable th4) {
                            th = th4;
                            throw th4;
                            break;
                        }
                    }
                    if (fetch != null) {
                        if (0 != 0) {
                            try {
                                fetch.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                    if (!this.emptyIterator) {
                        this.offsetStore.write(OFFSET, str);
                        str2 = str;
                    }
                } catch (TopicNotFoundException e) {
                    LOG.warn("Audit Topic {} was not found.", this.topic, e);
                    if (!this.emptyIterator) {
                        this.offsetStore.write(OFFSET, str);
                        str2 = str;
                    }
                }
                if (this.emptyIterator) {
                    break;
                }
            } catch (Throwable th6) {
                if (!this.emptyIterator) {
                    this.offsetStore.write(OFFSET, str);
                }
                throw th6;
            }
        } while (this.stopwatch.elapsedTime(TimeUnit.SECONDS) < this.timeout);
        this.stopwatch.stop();
    }
}
