package com.acxiom.pipeline.streaming;

import com.acxiom.pipeline.Constants$;
import com.acxiom.pipeline.PipelineContext;
import com.acxiom.pipeline.utils.StreamingUtils$;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryProgress;
import scala.Predef$;
import scala.Some;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: StreamingQueryMonitor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005uc!B\u0015+\u0003\u0003\u0019\u0004\u0002\u0003\u001d\u0001\u0005\u000b\u0007I\u0011I\u001d\t\u0013\u001d\u0003!\u0011!Q\u0001\niB\u0005\u0002C%\u0001\u0005\u000b\u0007I\u0011\t&\t\u0013=\u0003!\u0011!Q\u0001\n-\u0003\u0006\"B)\u0001\t\u0003\u0011\u0006b\u0002,\u0001\u0005\u0004%\tb\u0016\u0005\u0007K\u0002\u0001\u000b\u0011\u0002-\t\u000f\u0019\u0004!\u0019!C\tO\"1A\u000e\u0001Q\u0001\n!Dq!\u001c\u0001C\u0002\u0013Eq\r\u0003\u0004o\u0001\u0001\u0006I\u0001\u001b\u0005\b_\u0002\u0011\r\u0011\"\u0005q\u0011\u0019!\b\u0001)A\u0005c\"9Q\u000f\u0001a\u0001\n#\u0001\bb\u0002<\u0001\u0001\u0004%\tb\u001e\u0005\u0007{\u0002\u0001\u000b\u0015B9\t\u000fy\u0004\u0001\u0019!C\ta\"Aq\u0010\u0001a\u0001\n#\t\t\u0001C\u0004\u0002\u0006\u0001\u0001\u000b\u0015B9\t\u0011\u0005\u001d\u0001\u00011A\u0005\u0012AD\u0011\"!\u0003\u0001\u0001\u0004%\t\"a\u0003\t\u000f\u0005=\u0001\u0001)Q\u0005c\"Q\u0011\u0011\u0003\u0001A\u0002\u0003\u0007I\u0011C,\t\u0017\u0005M\u0001\u00011AA\u0002\u0013E\u0011Q\u0003\u0005\u000b\u00033\u0001\u0001\u0019!A!B\u0013A\u0006\"CA\u000e\u0001\u0001\u0007I\u0011CA\u000f\u0011%\tY\u0003\u0001a\u0001\n#\ti\u0003\u0003\u0005\u00022\u0001\u0001\u000b\u0015BA\u0010\u0011%\t\u0019\u0004\u0001a\u0001\n#\t)\u0004C\u0005\u0002>\u0001\u0001\r\u0011\"\u0005\u0002@!A\u00111\t\u0001!B\u0013\t9\u0004C\u0004\u0002F\u0001!\t%!\b\t\u000f\u0005\u001d\u0003\u0001\"\u0011\u00026!I\u0011\u0011\n\u0001A\u0002\u0013E\u0011Q\u0007\u0005\n\u0003\u0017\u0002\u0001\u0019!C\t\u0003\u001bB\u0001\"!\u0015\u0001A\u0003&\u0011q\u0007\u0005\b\u0003'\u0002A\u0011AA\u001b\u0011\u001d\t)\u0006\u0001C\u0001\u0003/Bq!!\u0017\u0001\r\u0003\t9\u0006C\u0004\u0002\\\u0001!\t%a\u0016\u0003?\t\u000bGo\u00195Xe&$Xm\u0015;sK\u0006l\u0017N\\4Rk\u0016\u0014\u00180T8oSR|'O\u0003\u0002,Y\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003[9\n\u0001\u0002]5qK2Lg.\u001a\u0006\u0003_A\na!Y2yS>l'\"A\u0019\u0002\u0007\r|Wn\u0001\u0001\u0014\u0005\u0001!\u0004CA\u001b7\u001b\u0
005Q\u0013BA\u001c+\u0005e\u0011\u0015m]3TiJ,\u0017-\\5oOF+XM]=N_:LGo\u001c:\u0002\u000bE,XM]=\u0016\u0003i\u0002\"aO#\u000e\u0003qR!aK\u001f\u000b\u0005yz\u0014aA:rY*\u0011\u0001)Q\u0001\u0006gB\f'o\u001b\u0006\u0003\u0005\u000e\u000ba!\u00199bG\",'\"\u0001#\u0002\u0007=\u0014x-\u0003\u0002Gy\tq1\u000b\u001e:fC6LgnZ)vKJL\u0018AB9vKJL\b%\u0003\u00029m\u0005y\u0001/\u001b9fY&tWmQ8oi\u0016DH/F\u0001L!\taU*D\u0001-\u0013\tqEFA\bQSB,G.\u001b8f\u0007>tG/\u001a=u\u0003A\u0001\u0018\u000e]3mS:,7i\u001c8uKb$\b%\u0003\u0002Jm\u00051A(\u001b8jiz\"2a\u0015+V!\t)\u0004\u0001C\u00039\u000b\u0001\u0007!\bC\u0003J\u000b\u0001\u00071*A\u0006n_:LGo\u001c:UsB,W#\u0001-\u0011\u0005e\u0013gB\u0001.a!\tYf,D\u0001]\u0015\ti&'\u0001\u0004=e>|GO\u0010\u0006\u0002?\u0006)1oY1mC&\u0011\u0011MX\u0001\u0007!J,G-\u001a4\n\u0005\r$'AB*ue&twM\u0003\u0002b=\u0006aQn\u001c8ji>\u0014H+\u001f9fA\u0005AA-\u001e:bi&|g.F\u0001i!\tI'.D\u0001_\u0013\tYgLA\u0002J]R\f\u0011\u0002Z;sCRLwN\u001c\u0011\u0002\u001f\u0005\u0004\bO]8yS6\fG/\u001a*poN\f\u0001#\u00199qe>D\u0018.\\1uKJ{wo\u001d\u0011\u0002\u001bMdW-\u001a9EkJ\fG/[8o+\u0005\t\bCA5s\u0013\t\u0019hL\u0001\u0003M_:<\u0017AD:mK\u0016\u0004H)\u001e:bi&|g\u000eI\u0001\ngR\f'\u000f\u001e+j[\u0016\fQb\u001d;beR$\u0016.\\3`I\u0015\fHC\u0001=|!\tI\u00170\u0003\u0002{=\n!QK\\5u\u0011\u001dax\"!AA\u0002E\f1\u0001\u001f\u00132\u0003)\u0019H/\u0019:u)&lW\rI\u0001\te><8i\\;oi\u0006a!o\\<D_VtGo\u0018\u0013fcR\u0019\u00010a\u0001\t\u000fq\u0014\u0012\u0011!a\u0001c\u0006I!o\\<D_VtG\u000fI\u0001\u0010GV\u0014(/\u001a8u\tV\u0014\u0018\r^5p]\u0006\u00192-\u001e:sK:$H)\u001e:bi&|gn\u0018\u0013fcR\u0019\u00010!\u0004\t\u000fq,\u0012\u0011!a\u0001c\u0006\u00012-\u001e:sK:$H)\u001e:bi&|g\u000eI\u0001\rY\u0006\u001cHo\u0015;biV\u001c\u0018\nZ\u0001\u0011Y\u0006\u001cHo\u0015;biV\u001c\u0018\nZ0%KF$2\u0001_A\f\u0011\u001da\b$!AA\u0002a\u000bQ\u0002\\1tiN#\u0018\r^;t\u0013\u0012\u0004\u0013aB4m_\n\fGn]\u000b\u0003\u0003?\u0001b!WA\u00111\u0006\u0015\u0012bAA\u0012I\n\u0019Q*\u
00199\u0011\u0007%\f9#C\u0002\u0002*y\u00131!\u00118z\u0003-9Gn\u001c2bYN|F%Z9\u0015\u0007a\fy\u0003\u0003\u0005}7\u0005\u0005\t\u0019AA\u0010\u0003!9Gn\u001c2bYN\u0004\u0013AE2p]RLg.^3Qe>\u001cWm]:j]\u001e,\"!a\u000e\u0011\u0007%\fI$C\u0002\u0002<y\u0013qAQ8pY\u0016\fg.\u0001\fd_:$\u0018N\\;f!J|7-Z:tS:<w\fJ3r)\rA\u0018\u0011\t\u0005\tyz\t\t\u00111\u0001\u00028\u0005\u00192m\u001c8uS:,X\r\u0015:pG\u0016\u001c8/\u001b8hA\u0005\u0001r-\u001a;HY>\u0014\u0017\r\\+qI\u0006$Xm]\u0001\tG>tG/\u001b8vK\u0006Q\u0001O]8dKN\u001c\u0018N\\4\u0002\u001dA\u0014xnY3tg&twm\u0018\u0013fcR\u0019\u00010a\u0014\t\u0011q\u001c\u0013\u0011!a\u0001\u0003o\t1\u0002\u001d:pG\u0016\u001c8/\u001b8hA\u0005q1.Z3q!J|7-Z:tS:<\u0017AE2iK\u000e\\7)\u001e:sK:$8\u000b^1ukN$\u0012\u0001_\u0001\u0014[\u0006t\u0017mZ3Rk\u0016\u0014\u0018p\u00155vi\u0012|wO\\\u0001\u0004eVt\u0007")
/* loaded from: input_file:com/acxiom/pipeline/streaming/BatchWriteStreamingQueryMonitor.class */
/*
 * Streaming query monitor that lets a query run until a batch threshold is met and then
 * stops it.
 *
 * The monitor polls the query's recent progress, accumulates the number of input rows
 * reported, and tracks the elapsed time since construction. Once checkCurrentStatus()
 * decides the threshold has been reached, the polling loop ends, manageQueryShutdown()
 * is invoked so the subclass can prepare its state, and the query is stopped.
 *
 * Configuration is read from the pipeline context globals:
 *   STREAMING_BATCH_MONITOR_TYPE          - monitor type, lower-cased; defaults to "duration"
 *   STREAMING_BATCH_MONITOR_DURATION      - duration amount; defaults to ONE_THOUSAND * SIXTY
 *   STREAMING_BATCH_MONITOR_DURATION_TYPE - unit of the duration; defaults to "milliseconds"
 *   STREAMING_BATCH_MONITOR_COUNT         - approximate row threshold; defaults to ZERO
 */
public abstract class BatchWriteStreamingQueryMonitor extends BaseStreamingQueryMonitor {
    /** Monitor type value that enables the elapsed-time threshold. */
    private static final String DURATION_MONITOR_TYPE = "duration";

    /** Separator placed between the parts of a progress status id. */
    private static final String STATUS_ID_SEPARATOR = "::";

    private final String monitorType;
    private final int duration;
    private final int approximateRows;
    private final long sleepDuration;
    private long startTime;
    private long rowCount;
    private long currentDuration;
    private String lastStatusId;
    private Map<String, Object> globals;
    private boolean continueProcessing;
    private boolean processing;

    @Override // com.acxiom.pipeline.streaming.BaseStreamingQueryMonitor, com.acxiom.pipeline.streaming.StreamingQueryMonitor
    public StreamingQuery query() {
        return super.query();
    }

    @Override // com.acxiom.pipeline.streaming.BaseStreamingQueryMonitor, com.acxiom.pipeline.streaming.StreamingQueryMonitor
    public PipelineContext pipelineContext() {
        return super.pipelineContext();
    }

    /** @return the lower-cased monitor type read from STREAMING_BATCH_MONITOR_TYPE */
    public String monitorType() {
        return this.monitorType;
    }

    /** @return the raw duration amount read from STREAMING_BATCH_MONITOR_DURATION */
    public int duration() {
        return this.duration;
    }

    /** @return the row threshold read from STREAMING_BATCH_MONITOR_COUNT */
    public int approximateRows() {
        return this.approximateRows;
    }

    /** @return the duration threshold in milliseconds, or 0 when the monitor type is not "duration" */
    public long sleepDuration() {
        return this.sleepDuration;
    }

    /** @return the epoch-millisecond timestamp that elapsed time is measured from */
    public long startTime() {
        return this.startTime;
    }

    public void startTime_$eq(long j) {
        this.startTime = j;
    }

    /** @return the number of input rows accumulated from the progress entries seen so far */
    public long rowCount() {
        return this.rowCount;
    }

    public void rowCount_$eq(long j) {
        this.rowCount = j;
    }

    /** @return milliseconds elapsed between startTime() and the most recent poll */
    public long currentDuration() {
        return this.currentDuration;
    }

    public void currentDuration_$eq(long j) {
        this.currentDuration = j;
    }

    /** @return the status id of the last progress entry that was counted, or null if none was */
    public String lastStatusId() {
        return this.lastStatusId;
    }

    public void lastStatusId_$eq(String str) {
        this.lastStatusId = str;
    }

    public Map<String, Object> globals() {
        return this.globals;
    }

    public void globals_$eq(Map<String, Object> map) {
        this.globals = map;
    }

    public boolean continueProcessing() {
        return this.continueProcessing;
    }

    public void continueProcessing_$eq(boolean z) {
        this.continueProcessing = z;
    }

    @Override // com.acxiom.pipeline.streaming.BaseStreamingQueryMonitor, com.acxiom.pipeline.streaming.StreamingQueryMonitor
    public Map<String, Object> getGlobalUpdates() {
        return globals();
    }

    @Override // com.acxiom.pipeline.streaming.BaseStreamingQueryMonitor, com.acxiom.pipeline.streaming.StreamingQueryMonitor
    /* renamed from: continue */
    public boolean mo35continue() {
        return continueProcessing();
    }

    public boolean processing() {
        return this.processing;
    }

    public void processing_$eq(boolean z) {
        this.processing = z;
    }

    /** @return true while the polling loop in run() should keep going */
    public boolean keepProcessing() {
        return processing();
    }

    /**
     * Re-evaluates the thresholds and updates the processing flag.
     *
     * Processing stops when either the monitor type is "duration" and the elapsed time has
     * reached sleepDuration(), or the accumulated row count has reached approximateRows().
     * This body was reconstructed from the compiled instructions because the decompiler
     * could not restructure the method.
     *
     * NOTE(review): the row-count comparison applies to every monitor type, so when
     * approximateRows() is left at its default (Constants.ZERO, presumably 0) the threshold
     * is already met on the first check. That matches the compiled code; confirm it is intended.
     */
    public void checkCurrentStatus() {
        boolean thresholdMet =
                (DURATION_MONITOR_TYPE.equals(monitorType()) && currentDuration() >= sleepDuration())
                        || rowCount() >= approximateRows();
        if (thresholdMet) {
            logger().info("Streaming threshold met");
        }
        processing_$eq(!thresholdMet);
    }

    /** Hook invoked after the polling loop ends and before the query is stopped. */
    public abstract void manageQueryShutdown();

    /**
     * Polls the query until checkCurrentStatus() clears the processing flag, then calls
     * manageQueryShutdown() and stops the query.
     *
     * Each iteration sleeps for Constants.ONE_HUNDRED milliseconds, refreshes the elapsed
     * time, and counts the input rows of every recent progress entry that comes after the
     * last entry already counted (or of all entries when that one is no longer present).
     *
     * If the thread is interrupted while sleeping, the interrupt flag is restored and the
     * monitor exits without running the shutdown hook or stopping the query.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        logger().info("Starting streaming batch monitor");
        while (keepProcessing()) {
            try {
                Thread.sleep(Constants$.MODULE$.ONE_HUNDRED());
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread and end monitoring.
                Thread.currentThread().interrupt();
                logger().warn("Streaming batch monitor interrupted", e);
                return;
            }
            currentDuration_$eq(System.currentTimeMillis() - startTime());

            // The Scala array wrapper is only usable as a raw type from Java; the elements
            // are the StreamingQueryProgress entries returned by recentProgress().
            @SuppressWarnings("unchecked")
            List<StreamingQueryProgress> recent =
                    new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(query().recentProgress())).toList();

            int lastSeenIndex = recent.indexWhere(progress -> {
                return BoxesRunTime.boxToBoolean($anonfun$run$1(this, progress));
            });
            // Bound the slice by the snapshot itself rather than fetching the progress again.
            List<StreamingQueryProgress> unseen =
                    lastSeenIndex != -1 ? recent.slice(lastSeenIndex + 1, recent.size()) : recent;
            unseen.foreach(progress -> {
                $anonfun$run$2(this, progress);
                return BoxedUnit.UNIT;
            });
            checkCurrentStatus();
        }
        manageQueryShutdown();
        logger().info("Streaming query being stopped");
        query().stop();
    }

    /** Builds the identifier used to recognise a progress entry that was already counted. */
    private static String statusIdOf(StreamingQueryProgress progress) {
        return progress.id() + STATUS_ID_SEPARATOR
                + progress.runId() + STATUS_ID_SEPARATOR
                + progress.batchId() + STATUS_ID_SEPARATOR
                + progress.timestamp();
    }

    /** Tests whether the given progress entry is the last one the monitor counted. */
    public static final /* synthetic */ boolean $anonfun$run$1(BatchWriteStreamingQueryMonitor batchWriteStreamingQueryMonitor, StreamingQueryProgress streamingQueryProgress) {
        return statusIdOf(streamingQueryProgress).equals(batchWriteStreamingQueryMonitor.lastStatusId());
    }

    /** Adds the entry's input rows to the running count and records it as the last one counted. */
    public static final /* synthetic */ void $anonfun$run$2(BatchWriteStreamingQueryMonitor batchWriteStreamingQueryMonitor, StreamingQueryProgress streamingQueryProgress) {
        batchWriteStreamingQueryMonitor.rowCount_$eq(batchWriteStreamingQueryMonitor.rowCount() + streamingQueryProgress.numInputRows());
        batchWriteStreamingQueryMonitor.lastStatusId_$eq(statusIdOf(streamingQueryProgress));
    }

    /**
     * Creates the monitor and reads its thresholds from the pipeline context globals.
     *
     * @param streamingQuery  the query to monitor and eventually stop
     * @param pipelineContext the context whose globals supply the monitor settings
     */
    public BatchWriteStreamingQueryMonitor(StreamingQuery streamingQuery, PipelineContext pipelineContext) {
        super(streamingQuery, pipelineContext);
        // Locale.ROOT keeps the comparison with "duration" independent of the JVM default locale.
        this.monitorType = ((String) pipelineContext.getGlobalAs("STREAMING_BATCH_MONITOR_TYPE")
                .getOrElse(() -> DURATION_MONITOR_TYPE)).toLowerCase(java.util.Locale.ROOT);
        this.duration = BoxesRunTime.unboxToInt(pipelineContext.getGlobalAs("STREAMING_BATCH_MONITOR_DURATION")
                .getOrElse(() -> Constants$.MODULE$.ONE_THOUSAND() * Constants$.MODULE$.SIXTY()));
        this.approximateRows = BoxesRunTime.unboxToInt(pipelineContext.getGlobalAs("STREAMING_BATCH_MONITOR_COUNT")
                .getOrElse(() -> Constants$.MODULE$.ZERO()));

        // The duration threshold only applies to the "duration" monitor type.
        long thresholdMillis;
        if (DURATION_MONITOR_TYPE.equals(monitorType())) {
            String durationType = (String) pipelineContext.getGlobalAs("STREAMING_BATCH_MONITOR_DURATION_TYPE")
                    .getOrElse(() -> "milliseconds");
            thresholdMillis = StreamingUtils$.MODULE$
                    .getDuration(new Some<>(durationType), new Some<>(Integer.toString(duration())))
                    .milliseconds();
        } else {
            thresholdMillis = 0;
        }
        this.sleepDuration = thresholdMillis;

        this.startTime = System.currentTimeMillis();
        this.rowCount = 0L;
        this.currentDuration = 0L;
        this.globals = Predef$.MODULE$.Map().apply(Nil$.MODULE$);
        this.continueProcessing = false;
        this.processing = true;
    }
}
