package me.ahoo.simba.redis;

import com.google.common.base.Strings;
import com.google.common.io.Resources;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.api.reactive.RedisScriptingReactiveCommands;
import io.lettuce.core.pubsub.api.reactive.ChannelMessage;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Executor;
import me.ahoo.simba.SimbaException;
import me.ahoo.simba.core.AbstractMutexContendService;
import me.ahoo.simba.core.ContendPeriod;
import me.ahoo.simba.core.MutexContender;
import me.ahoo.simba.core.MutexOwner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;

/* loaded from: input_file:me/ahoo/simba/redis/RedisMutexContendService.class */
public class RedisMutexContendService extends AbstractMutexContendService {
    private static final Logger log = LoggerFactory.getLogger(RedisMutexContendService.class);
    private static final byte[] SCRIPT_ACQUIRE = getScript("mutex_acquire.lua");
    private static final byte[] SCRIPT_RELEASE = getScript("mutex_release.lua");
    private static final byte[] SCRIPT_GUARD = getScript("mutex_guard.lua");
    private final String[] keys;
    private final String mutexChannel;
    private final String contenderChannel;
    private final Duration ttl;
    private final Duration transition;
    private final ContendPeriod contendPeriod;
    private final RedisScriptingReactiveCommands<String, String> redisCommands;
    private final RedisPubSubReactiveCommands<String, String> redisPubSubCommands;
    private ChannelSubscriber channelSubscriber;
    private volatile Disposable scheduleFuture;

    /* loaded from: input_file:me/ahoo/simba/redis/RedisMutexContendService$ChannelSubscriber.class */
    public class ChannelSubscriber extends BaseSubscriber<ChannelMessage<String, String>> {
        public ChannelSubscriber() {
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void hookOnNext(ChannelMessage<String, String> channelMessage) {
            boolean startsWith = ((String) channelMessage.getChannel()).startsWith(RedisMutexContendService.this.mutexChannel);
            if (RedisMutexContendService.log.isDebugEnabled()) {
                RedisMutexContendService.log.debug("hookOnNext - mutex:[{}] - ownerId:[{}] - channel:[{}] - message:[{}] - subscribed:[{}]", new Object[]{RedisMutexContendService.this.getMutex(), RedisMutexContendService.this.getContenderId(), channelMessage.getChannel(), channelMessage.getMessage(), Boolean.valueOf(startsWith)});
            }
            if (startsWith) {
                try {
                    Message of = Message.of((String) channelMessage.getMessage());
                    String event = of.getEvent();
                    boolean z = -1;
                    switch (event.hashCode()) {
                        case -1731151282:
                            if (event.equals(Message.EVENT_ACQUIRED)) {
                                z = true;
                                break;
                            }
                            break;
                        case -551298755:
                            if (event.equals(Message.EVENT_RELEASED)) {
                                z = false;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case false:
                            RedisMutexContendService.this.notifyOwner(MutexOwner.NONE);
                            RedisMutexContendService.this.acquire().subscribe();
                            break;
                        case true:
                            RedisMutexContendService.this.notifyOwner(RedisMutexContendService.this.newMutexOwner(of.getOwnerId(), RedisMutexContendService.this.getTransitionAt(of)));
                            break;
                        default:
                            throw new IllegalStateException("Unexpected value: " + of.getEvent());
                    }
                } catch (Throwable th) {
                    if (RedisMutexContendService.log.isErrorEnabled()) {
                        RedisMutexContendService.log.error(th.getMessage(), th);
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RedisMutexContendService(MutexContender mutexContender, Executor executor, Duration duration, Duration duration2, RedisScriptingReactiveCommands<String, String> redisScriptingReactiveCommands, RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands) {
        super(mutexContender, executor);
        this.keys = new String[]{"{" + mutexContender.getMutex() + "}"};
        this.mutexChannel = Strings.lenientFormat("%s:{%s}", new Object[]{"simba", mutexContender.getMutex()});
        this.contenderChannel = Strings.lenientFormat("%s:%s", new Object[]{this.mutexChannel, mutexContender.getContenderId()});
        this.ttl = duration;
        this.transition = duration2;
        this.redisCommands = redisScriptingReactiveCommands;
        this.redisPubSubCommands = redisPubSubReactiveCommands;
        this.contendPeriod = new ContendPeriod(getContenderId());
    }

    public static byte[] getScript(String str) {
        try {
            return Resources.toByteArray(Resources.getResource(str));
        } catch (IOException e) {
            if (log.isErrorEnabled()) {
                log.error(e.getMessage(), e);
            }
            throw new SimbaException(e.getMessage(), e);
        }
    }

    protected void startContend() {
        startSubscribe();
        nextSchedule(0L);
    }

    private void startSubscribe() {
        this.channelSubscriber = new ChannelSubscriber();
        this.redisPubSubCommands.observeChannels().subscribe(this.channelSubscriber);
        this.redisPubSubCommands.subscribe(new String[]{this.mutexChannel, this.contenderChannel});
    }

    private void nextSchedule(long j) {
        if (log.isDebugEnabled()) {
            log.debug("nextSchedule - mutex:[{}] contenderId:[{}] - nextDelay:[{}].", new Object[]{getMutex(), getContenderId(), Long.valueOf(j)});
        }
        this.scheduleFuture = Mono.delay(Duration.ofMillis(j)).flatMap(l -> {
            return isOwner() ? guard() : acquire();
        }).subscribe();
    }

    private MutexOwner notifyOwnerAndScheduleNext(String str) {
        try {
            MutexOwner newMutexOwner = newMutexOwner(AcquireResult.of(str));
            notifyOwner(newMutexOwner).whenComplete((r6, th) -> {
                if (log.isErrorEnabled()) {
                    log.error(th.getMessage(), th);
                }
                nextSchedule(this.contendPeriod.ensureNextDelay(newMutexOwner));
            });
            return newMutexOwner;
        } catch (Throwable th2) {
            if (log.isErrorEnabled()) {
                log.error(th2.getMessage(), th2);
            }
            nextSchedule(this.ttl.toMillis());
            return MutexOwner.NONE;
        }
    }

    private Mono<MutexOwner> guard() {
        return this.redisCommands.eval(SCRIPT_GUARD, ScriptOutputType.VALUE, this.keys, new String[]{getContenderId(), String.valueOf(this.ttl.toMillis())}).next().cast(String.class).map(this::notifyOwnerAndScheduleNext);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Mono<MutexOwner> acquire() {
        return this.redisCommands.eval(SCRIPT_ACQUIRE, ScriptOutputType.VALUE, this.keys, new String[]{getContenderId(), String.valueOf(this.ttl.toMillis() + this.transition.toMillis())}).next().cast(String.class).map(this::notifyOwnerAndScheduleNext);
    }

    private MutexOwner newMutexOwner(AcquireResult acquireResult) {
        return newMutexOwner(acquireResult.getOwnerId(), acquireResult.getTransitionAt());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public MutexOwner newMutexOwner(String str, long j) {
        long millis = j - this.transition.toMillis();
        return new MutexOwner(str, millis - this.ttl.toMillis(), millis, j);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long getTransitionAt(Message message) {
        return message.getEventAt() + this.ttl.toMillis() + this.transition.toMillis();
    }

    protected void stopContend() {
        stopSubscribe();
        disposeSchedule();
        release();
    }

    private void stopSubscribe() {
        if (this.channelSubscriber != null) {
            this.channelSubscriber.cancel();
        }
        this.redisPubSubCommands.unsubscribe(new String[]{this.mutexChannel, this.contenderChannel}).subscribe();
    }

    private void disposeSchedule() {
        if (this.scheduleFuture == null || this.scheduleFuture.isDisposed()) {
            return;
        }
        this.scheduleFuture.dispose();
    }

    private void release() {
        this.redisCommands.eval(SCRIPT_RELEASE, ScriptOutputType.BOOLEAN, this.keys, new String[]{getContenderId()}).next().ofType(Boolean.class).doOnNext(bool -> {
            try {
                notifyOwner(MutexOwner.NONE);
            } catch (Throwable th) {
                if (log.isWarnEnabled()) {
                    log.warn("stopContend - mutex:[{}] - contenderId:[{}] - message:[{}]", new Object[]{getMutex(), getContenderId(), th.getMessage()});
                }
            }
        }).subscribe();
    }
}
