package stream.runtime;

import org.apache.spark.streaming.scheduler.BatchInfo;
import org.apache.spark.streaming.scheduler.ReceiverInfo;
import org.apache.spark.streaming.scheduler.StreamingListener;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import streams.spark.StreamsSparkContext;

/* loaded from: input_file:stream/runtime/StreamingMonitor.class */
public class StreamingMonitor implements StreamingListener {
    private static transient Logger log = LoggerFactory.getLogger(StreamingMonitor.class);
    private static final boolean STOP_ON_RECEIVER_ERROR = true;
    private final DProcessContext context;
    private final StreamingState state;
    private long batchCount = 0;
    private long itemCount = 0;

    public StreamingMonitor(DProcessContext dProcessContext) {
        this.context = dProcessContext;
        this.state = new StreamingState(dProcessContext);
    }

    public synchronized void onProcessStarting() {
        log.info("\n\n==== STREAMING executor of {} STARTING ====\nBATCH INTERVAL: {}s\nBLOCK INTERVAL: {}\nMAX NUM EMPTY RDDs: {}\nSTATE: {}\n\n============================================\n", new Object[]{this.context.getProcessId(), Integer.valueOf(StreamsSparkContext.STREAMING_INTERVAL), StreamsSparkContext.sc().getConf().get("spark.streaming.blockInterval", "200ms"), Integer.valueOf(this.state.getMaxEmptyRdds()), this.state.toString()});
    }

    public synchronized void onProcessFinished() {
        log.info("\n\n==== executor of {} FINISHED ====\n", this.context.getProcessId());
    }

    public synchronized void onBatchSubmitted(StreamingListenerBatchSubmitted streamingListenerBatchSubmitted) {
        BatchInfo batchInfo = streamingListenerBatchSubmitted.batchInfo();
        if (!this.state.isStarted() && batchInfo.numRecords() > 0) {
            this.state.markStarted();
            log.info("{} MARKED STARTED.", this.context.getProcessId());
        } else {
            if (this.state.isStarted()) {
                return;
            }
            log.info("{} WAITING to start.", this.context.getProcessId());
        }
    }

    public synchronized void onBatchStarted(StreamingListenerBatchStarted streamingListenerBatchStarted) {
    }

    public synchronized void onBatchCompleted(StreamingListenerBatchCompleted streamingListenerBatchCompleted) {
        BatchInfo batchInfo = streamingListenerBatchCompleted.batchInfo();
        if (this.state.isStarted() && batchInfo.numRecords() <= 0) {
            this.state.increaseNumEmptyRdds();
            if (this.state.getNumEmptyRdds() > this.state.getMaxEmptyRdds() && this.state.getMaxEmptyRdds() >= 0) {
                try {
                    this.state.markStopped(false);
                    log.info("{} MARKED STOPPED.", this.context.getProcessId());
                } catch (IllegalStateException e) {
                    log.warn("{} was tried to be marked stopped multiple times", this.context.getProcessId());
                }
            }
        } else if (batchInfo.numRecords() > 0) {
            this.state.zeroEmptyRdds();
        }
        if (!this.state.isStarted() || this.state.isStopped()) {
            return;
        }
        this.batchCount++;
        this.itemCount += batchInfo.numRecords();
        log.info("\n\n==== executor of {} COMPLETED batch {} ====\nNUM PROCESSED ITEMS: {} (total {})\nTOTAL DELAY: {}ms\nSTATE: {}\n\n============================================\n", new Object[]{this.context.getProcessId(), Long.valueOf(this.batchCount), Long.valueOf(batchInfo.numRecords()), Long.valueOf(this.itemCount), batchInfo.totalDelay().get().toString(), this.state.toString()});
    }

    public synchronized void onOutputOperationStarted(StreamingListenerOutputOperationStarted streamingListenerOutputOperationStarted) {
    }

    public synchronized void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted streamingListenerOutputOperationCompleted) {
    }

    public synchronized void onReceiverStarted(StreamingListenerReceiverStarted streamingListenerReceiverStarted) {
        log.info("\n\n==== Receiver {} STARTED ====\nPROCESS: {}\nLOCATION: {}\n\n============================================\n", new Object[]{streamingListenerReceiverStarted.receiverInfo().name(), this.context.getProcessId(), streamingListenerReceiverStarted.receiverInfo().location()});
    }

    public synchronized void onReceiverError(StreamingListenerReceiverError streamingListenerReceiverError) {
        ReceiverInfo receiverInfo = streamingListenerReceiverError.receiverInfo();
        log.info("\n\n==== Receiver {} REPORTS ERROR ====\nMESSAGE: {}\nERROR: {}\n\n============================================\n", new Object[]{receiverInfo.name(), receiverInfo.lastErrorMessage(), receiverInfo.lastError()});
        try {
            this.state.markStopped(true);
            log.warn("{} MARKED STOPPED due to receiver error: {}", this.context.getProcessId(), receiverInfo.lastErrorMessage());
        } catch (IllegalStateException e) {
            log.warn("{} was tried to be marked stopped multiple times. There was another receiver error: {}", this.context.getProcessId(), receiverInfo.lastErrorMessage());
        }
    }

    public synchronized void onReceiverStopped(StreamingListenerReceiverStopped streamingListenerReceiverStopped) {
        log.info("\n\n==== Receiver {} STOPPED ====\n", streamingListenerReceiverStopped.receiverInfo().name());
    }

    public synchronized boolean isProcessSarted() {
        return this.state.isStarted();
    }

    public synchronized boolean isProcessStopped() {
        return this.state.isStopped();
    }

    public synchronized boolean isProcessRunning() {
        return this.state.isStarted() && !this.state.isStopped();
    }
}
