package com.arpnetworking.metrics.portal.scheduling;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.persistence.AbstractPersistentActorWithTimers;
import com.arpnetworking.metrics.Unit;
import com.arpnetworking.metrics.impl.BaseScale;
import com.arpnetworking.metrics.impl.BaseUnit;
import com.arpnetworking.metrics.impl.TsdUnit;
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
import com.arpnetworking.metrics.portal.organizations.OrganizationRepository;
import com.arpnetworking.metrics.portal.scheduling.JobExecutorActor;
import com.arpnetworking.metrics.portal.scheduling.JobRef;
import com.arpnetworking.metrics.util.PagingIterator;
import com.arpnetworking.steno.LogBuilder;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.arpnetworking.steno.aspect.LogBuilderAspect;
import com.google.inject.Injector;
import java.lang.invoke.SerializedLambda;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.Optional;
import models.internal.Organization;
import models.internal.scheduling.Job;
import org.aspectj.lang.JoinPoint;
import org.aspectj.runtime.reflect.Factory;
import play.core.enhancers.PropertiesEnhancer;

@PropertiesEnhancer.GeneratedAccessor
@PropertiesEnhancer.RewrittenAccessor
/* loaded from: input_file:com/arpnetworking/metrics/portal/scheduling/JobCoordinator.class */
public final class JobCoordinator<T> extends AbstractPersistentActorWithTimers {
    // --- Collaborators and configuration, all assigned once in the constructor. ---

    // Injector used by runAntiEntropy() to obtain the JobRepository instance for _repositoryType.
    private final Injector _injector;
    // Time source used to time each anti-entropy pass.
    private final Clock _clock;
    // Repository class whose jobs this coordinator reloads; also part of persistenceId().
    private final Class<? extends JobRepository<T>> _repositoryType;
    // Execution-repository class placed into every JobRef built during anti-entropy.
    private final Class<? extends JobExecutionRepository<T>> _execRepositoryType;
    // Queried during anti-entropy for the organizations whose jobs are enumerated.
    private final OrganizationRepository _organizationRepository;
    // Destination of the JobExecutorActor.Reload messages sent during anti-entropy.
    private final ActorRef _jobExecutorRegion;
    // Sink for the anti-entropy latency timer and success counter.
    private final PeriodicMetrics _periodicMetrics;
    // Guard consulted by the AntiEntropyTick handler (a tick is ignored while this is true);
    // reset to false when AntiEntropyFinished is received.
    private boolean _currentlyExecuting = false;
    // Key under which the periodic tick timer is registered in preStart().
    private static final String ANTI_ENTROPY_PERIODIC_TIMER_NAME = "TICK";
    // Period between AntiEntropyTick messages; assigned in the static initializer.
    private static final Duration ANTI_ENTROPY_TICK_INTERVAL;
    // Class logger; assigned in the static initializer.
    private static final Logger LOGGER;
    // Unit attached to the latency timer sample; assigned in the static initializer.
    private static final Unit NANOS;
    // Page size (query limit) used by getAllJobs() when paging through a repository.
    private static final int JOB_QUERY_PAGE_SIZE = 256;
    // AspectJ static join-point descriptors for the three woven log() call sites;
    // populated by ajc$preClinit().
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_1;
    private static /* synthetic */ JoinPoint.StaticPart ajc$tjp_2;

    /**
     * Marker message the coordinator sends to itself when an anti-entropy pass ends.
     * It carries no data; only its type is matched in the receive handler.
     */
    @PropertiesEnhancer.GeneratedAccessor
    @PropertiesEnhancer.RewrittenAccessor
    /* loaded from: input_file:com/arpnetworking/metrics/portal/scheduling/JobCoordinator$AntiEntropyFinished.class */
    static final class AntiEntropyFinished {
        /** Package-private constructor; the shared {@link #INSTANCE} is what the coordinator sends. */
        AntiEntropyFinished() { }

        /** Shared stateless instance. */
        static final AntiEntropyFinished INSTANCE = new AntiEntropyFinished();
    }

    /**
     * Marker message delivered by the periodic timer to request an anti-entropy pass.
     * It carries no data; only its type is matched in the receive handler.
     */
    @PropertiesEnhancer.GeneratedAccessor
    @PropertiesEnhancer.RewrittenAccessor
    /* loaded from: input_file:com/arpnetworking/metrics/portal/scheduling/JobCoordinator$AntiEntropyTick.class */
    static final class AntiEntropyTick {
        /** Package-private constructor; the shared {@link #INSTANCE} is what the timer delivers. */
        AntiEntropyTick() { }

        /** Shared stateless instance. */
        static final AntiEntropyTick INSTANCE = new AntiEntropyTick();
    }

    static {
        ajc$preClinit();
        ANTI_ENTROPY_TICK_INTERVAL = Duration.ofHours(1L);
        LOGGER = LoggerFactory.getLogger(JobCoordinator.class);
        NANOS = new TsdUnit.Builder().setScale(BaseScale.NANO).setBaseUnit(BaseUnit.SECOND).build();
    }

    /**
     * Builds {@link Props} for a {@code JobCoordinator} that uses the system UTC clock.
     *
     * @param injector injector used to look up the job repository instance
     * @param cls job repository class to coordinate
     * @param cls2 job execution repository class placed into each job reference
     * @param organizationRepository source of the organizations whose jobs are enumerated
     * @param actorRef actor that receives the reload messages
     * @param periodicMetrics sink for anti-entropy metrics
     * @param <T> result type of the coordinated jobs
     * @return props that create the coordinator
     */
    public static <T> Props props(Injector injector, Class<? extends JobRepository<T>> cls, Class<? extends JobExecutionRepository<T>> cls2, OrganizationRepository organizationRepository, ActorRef actorRef, PeriodicMetrics periodicMetrics) {
        final Clock utcClock = Clock.systemUTC();
        return props(injector, utcClock, cls, cls2, organizationRepository, actorRef, periodicMetrics);
    }

    /**
     * Builds {@link Props} for a {@code JobCoordinator} with an explicit clock.
     *
     * @param injector injector used to look up the job repository instance
     * @param clock time source used to time anti-entropy passes
     * @param cls job repository class to coordinate
     * @param cls2 job execution repository class placed into each job reference
     * @param organizationRepository source of the organizations whose jobs are enumerated
     * @param actorRef actor that receives the reload messages
     * @param periodicMetrics sink for anti-entropy metrics
     * @param <T> result type of the coordinated jobs
     * @return props that create the coordinator
     */
    static <T> Props props(Injector injector, Clock clock, Class<? extends JobRepository<T>> cls, Class<? extends JobExecutionRepository<T>> cls2, OrganizationRepository organizationRepository, ActorRef actorRef, PeriodicMetrics periodicMetrics) {
        return Props.create(
                JobCoordinator.class,
                () -> new JobCoordinator<T>(injector, clock, cls, cls2, organizationRepository, actorRef, periodicMetrics));
    }

    /**
     * Stores the collaborators; instances are created only through the {@code props} factories.
     *
     * @param injector injector used to look up the job repository instance
     * @param clock time source used to time anti-entropy passes
     * @param cls job repository class to coordinate
     * @param cls2 job execution repository class placed into each job reference
     * @param organizationRepository source of the organizations whose jobs are enumerated
     * @param actorRef actor that receives the reload messages
     * @param periodicMetrics sink for anti-entropy metrics
     */
    private JobCoordinator(Injector injector, Clock clock, Class<? extends JobRepository<T>> cls, Class<? extends JobExecutionRepository<T>> cls2, OrganizationRepository organizationRepository, ActorRef actorRef, PeriodicMetrics periodicMetrics) {
        // Infrastructure.
        _injector = injector;
        _clock = clock;
        _periodicMetrics = periodicMetrics;

        // Repositories being coordinated.
        _repositoryType = cls;
        _execRepositoryType = cls2;
        _organizationRepository = organizationRepository;

        // Target of reload messages.
        _jobExecutorRegion = actorRef;
    }

    /**
     * Registers the periodic timer that delivers {@link AntiEntropyTick} to this actor
     * every {@code ANTI_ENTROPY_TICK_INTERVAL}.
     *
     * @throws Exception if the superclass start-up hook fails
     */
    @Override
    public void preStart() throws Exception {
        super.preStart();
        // The timer API takes a Scala duration, so convert from java.time via nanoseconds.
        final scala.concurrent.duration.FiniteDuration tickInterval =
                scala.concurrent.duration.Duration.fromNanos(ANTI_ENTROPY_TICK_INTERVAL.toNanos());
        timers().startPeriodicTimer(ANTI_ENTROPY_PERIODIC_TIMER_NAME, AntiEntropyTick.INSTANCE, tickInterval);
    }

    /**
     * Returns an iterator over the jobs of one organization, fetched page by page
     * with a page size of {@code JOB_QUERY_PAGE_SIZE}.
     *
     * @param jobRepository repository to query
     * @param organization organization whose jobs are listed
     * @param <T> result type of the jobs
     * @return iterator over the organization's jobs
     */
    private static <T> Iterator<? extends Job<T>> getAllJobs(JobRepository<T> jobRepository, Organization organization) {
        // The value the paging iterator hands to the callback is used as the query offset.
        return (Iterator) new PagingIterator.Builder()
                .setGetPage(pageOffset -> jobRepository.createJobQuery(organization)
                        .offset(pageOffset.intValue())
                        .limit(JOB_QUERY_PAGE_SIZE)
                        .execute()
                        .values())
                .build();
    }

    /**
     * Performs one anti-entropy pass: for every job of every organization, sends a
     * {@code JobExecutorActor.Reload} (carrying the job's reference and ETag) to the
     * job executor region.
     *
     * <p>On success it records the {@code jobs/coordinator/anti_entropy/latency} timer and
     * a success counter of 1. If a {@link RuntimeException} escapes, it records a success
     * counter of 0 and rethrows. In every case {@link AntiEntropyFinished} is sent to this
     * actor afterwards.
     */
    private void runAntiEntropy() {
        // Capture the self reference once; it is the sender of every message and the target of the final one.
        final ActorRef self = self();
        try {
            LogBuilder addData = LOGGER.debug().setMessage("starting anti-entropy").addData("repositoryType", this._repositoryType).addData("execRepositoryType", this._execRepositoryType);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_0, this, addData));
            addData.log();

            final Instant startTime = this._clock.instant();
            // Keep the repository parameterized so the job lambda below is properly typed.
            final JobRepository<T> jobRepository = this._injector.getInstance(this._repositoryType);
            for (Organization organization : this._organizationRepository.query(this._organizationRepository.createQuery()).values()) {
                getAllJobs(jobRepository, organization).forEachRemaining(job -> {
                    this._jobExecutorRegion.tell(new JobExecutorActor.Reload.Builder().setJobRef((JobRef) new JobRef.Builder().setRepositoryType(this._repositoryType).setExecutionRepositoryType(this._execRepositoryType).setOrganization(organization).setId(job.getId()).build()).setETag(job.getETag().orElse(null)).build(), self);
                });
            }

            // Measure once so the metric and the log line report the same duration.
            final long elapsedNanos = ChronoUnit.NANOS.between(startTime, this._clock.instant());
            this._periodicMetrics.recordTimer("jobs/coordinator/anti_entropy/latency", elapsedNanos, Optional.of(NANOS));
            this._periodicMetrics.recordCounter("jobs/coordinator/anti_entropy/success", 1L);

            // The field is named in seconds, so convert; previously the raw nanosecond count was logged.
            final double elapsedSeconds = elapsedNanos / 1.0e9;
            LogBuilder addData2 = LOGGER.debug().setMessage("finished anti-entropy").addData("repositoryType", this._repositoryType).addData("elapsedTimeSec", elapsedSeconds);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_1, this, addData2));
            addData2.log();
        } catch (RuntimeException e) {
            this._periodicMetrics.recordCounter("jobs/coordinator/anti_entropy/success", 0L);
            throw e;
        } finally {
            // Always report completion so the coordinator can clear its in-progress flag.
            self.tell(AntiEntropyFinished.INSTANCE, self);
        }
    }

    /**
     * Defines the message handling of the coordinator.
     *
     * <ul>
     *   <li>{@link AntiEntropyTick}: logs the tick and, unless a pass is already in progress,
     *       marks one as in progress and schedules {@code runAntiEntropy} on the context
     *       dispatcher with zero delay.</li>
     *   <li>{@link AntiEntropyFinished}: clears the in-progress flag.</li>
     * </ul>
     *
     * @return the receive behavior
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return receiveBuilder().match(AntiEntropyTick.class, antiEntropyTick -> {
            LogBuilder addData = LOGGER.debug().setMessage("ticking").addData("repositoryType", this._repositoryType);
            LogBuilderAspect.aspectOf().addToContextLineAndMethod(Factory.makeJP(ajc$tjp_2, this, addData));
            addData.log();
            if (this._currentlyExecuting) {
                return;
            }
            // Mark the pass as running before handing it off; without this the guard above
            // could never trigger and overlapping passes would be started. The flag is
            // cleared by the AntiEntropyFinished message that runAntiEntropy always sends.
            this._currentlyExecuting = true;
            getContext().getSystem().scheduler().scheduleOnce(scala.concurrent.duration.Duration.Zero(), this::runAntiEntropy, getContext().dispatcher());
        }).match(AntiEntropyFinished.class, antiEntropyFinished -> {
            this._currentlyExecuting = false;
        }).build();
    }

    /**
     * Recovery behavior: an empty receive with no message matchers.
     *
     * @return a receive with no handlers
     */
    @Override
    public AbstractActor.Receive createReceiveRecover() {
        final AbstractActor.Receive noHandlers = receiveBuilder().build();
        return noHandlers;
    }

    /**
     * Persistence identifier: {@code "job-coordinator-"} followed by the canonical name
     * of the repository class.
     *
     * @return the persistence id of this coordinator
     */
    @Override
    public String persistenceId() {
        final String repositoryName = this._repositoryType.getCanonicalName();
        return "job-coordinator-" + repositoryName;
    }

    /*
     * Synthetic lambda-deserialization hook. It recognizes exactly one serialized lambda,
     * the akka.japi.Creator whose implementation method is "lambda$0" with the
     * seven-argument signature checked below, and rebuilds it from its captured arguments.
     * Anything else is rejected with IllegalArgumentException.
     *
     * NOTE(review): this looks like decompiler output of compiler-generated code. As written,
     * `boolean z = -1` and `switch (z)` are not valid Java; `z` acts as a case selector.
     * Presumably this method should not exist in hand-maintained source at all; confirm
     * before editing.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // Hash switch on the implementation method name, confirmed with equals() to rule out collisions.
        switch (implMethodName.hashCode()) {
            case -1647345005:
                if (implMethodName.equals("lambda$0")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                // Verify the functional interface, its method, and the implementation signature before rebuilding.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/arpnetworking/metrics/portal/scheduling/JobCoordinator") && serializedLambda.getImplMethodSignature().equals("(Lcom/google/inject/Injector;Ljava/time/Clock;Ljava/lang/Class;Ljava/lang/Class;Lcom/arpnetworking/metrics/portal/organizations/OrganizationRepository;Lakka/actor/ActorRef;Lcom/arpnetworking/metrics/incubator/PeriodicMetrics;)Lcom/arpnetworking/metrics/portal/scheduling/JobCoordinator;")) {
                    // Captured arguments, in the same order as the JobCoordinator constructor parameters.
                    Injector injector = (Injector) serializedLambda.getCapturedArg(0);
                    Clock clock = (Clock) serializedLambda.getCapturedArg(1);
                    Class cls = (Class) serializedLambda.getCapturedArg(2);
                    Class cls2 = (Class) serializedLambda.getCapturedArg(3);
                    OrganizationRepository organizationRepository = (OrganizationRepository) serializedLambda.getCapturedArg(4);
                    ActorRef actorRef = (ActorRef) serializedLambda.getCapturedArg(5);
                    PeriodicMetrics periodicMetrics = (PeriodicMetrics) serializedLambda.getCapturedArg(6);
                    return () -> {
                        return new JobCoordinator(injector, clock, cls, cls2, organizationRepository, actorRef, periodicMetrics);
                    };
                }
                break;
        }
        // No recognized lambda matched.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    /*
     * Synthetic AspectJ set-up, called first in the static initializer. Creates the three
     * static join-point descriptors (ajc$tjp_0..2) that the woven logging calls pass to
     * LogBuilderAspect. Each one describes a "method-call" to LogBuilder.log().
     *
     * NOTE(review): the trailing integers (163, 201, 221) are presumably line numbers in the
     * original JobCoordinator.java, not in this file; confirm before changing them.
     */
    private static /* synthetic */ void ajc$preClinit() {
        Factory factory = new Factory("JobCoordinator.java", JobCoordinator.class);
        // Used by the "starting anti-entropy" log call in runAntiEntropy().
        ajc$tjp_0 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 163);
        // Used by the "finished anti-entropy" log call in runAntiEntropy().
        ajc$tjp_1 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 201);
        // Used by the "ticking" log call in createReceive().
        ajc$tjp_2 = factory.makeSJP("method-call", factory.makeMethodSig("401", "log", "com.arpnetworking.steno.LogBuilder", "", "", "", "void"), 221);
    }
}
