package io.zeebe.engine.processing.message;

import io.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.zeebe.engine.state.message.MessageState;
import io.zeebe.engine.state.message.MessageSubscriptionState;
import io.zeebe.util.sched.ActorControl;
import java.time.Duration;

/* loaded from: input_file:io/zeebe/engine/processing/message/MessageObserver.class */
public final class MessageObserver implements StreamProcessorLifecycleAware {
    public static final Duration MESSAGE_TIME_TO_LIVE_CHECK_INTERVAL = Duration.ofSeconds(60);
    public static final Duration SUBSCRIPTION_TIMEOUT = Duration.ofSeconds(10);
    public static final Duration SUBSCRIPTION_CHECK_INTERVAL = Duration.ofSeconds(30);
    private final SubscriptionCommandSender subscriptionCommandSender;
    private final MessageState messageState;
    private final MessageSubscriptionState subscriptionState;

    public MessageObserver(MessageState messageState, MessageSubscriptionState messageSubscriptionState, SubscriptionCommandSender subscriptionCommandSender) {
        this.subscriptionCommandSender = subscriptionCommandSender;
        this.messageState = messageState;
        this.subscriptionState = messageSubscriptionState;
    }

    @Override // io.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware
    public void onRecovered(ReadonlyProcessingContext readonlyProcessingContext) {
        ActorControl actor = readonlyProcessingContext.getActor();
        readonlyProcessingContext.getActor().runAtFixedRate(MESSAGE_TIME_TO_LIVE_CHECK_INTERVAL, new MessageTimeToLiveChecker(readonlyProcessingContext.getLogStreamWriter(), this.messageState));
        actor.runAtFixedRate(SUBSCRIPTION_CHECK_INTERVAL, new PendingMessageSubscriptionChecker(this.subscriptionCommandSender, this.subscriptionState, SUBSCRIPTION_TIMEOUT.toMillis()));
    }
}
