package cn.flyelf.cache.redis;

import cn.flyelf.cache.annotation.ACTION;
import cn.flyelf.cache.core.AbstractCacheAction;
import cn.flyelf.cache.core.context.CacheContext;
import cn.flyelf.cache.core.model.CachePolicy;
import cn.flyelf.cache.core.server.CacheExchange;
import cn.flyelf.cache.redis.codec.LayerRedisCodec;
import cn.flyelf.cache.redis.context.RedisCacheContext;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.reactive.RedisAdvancedClusterReactiveCommands;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:cn/flyelf/cache/redis/BaseRedisCacheAction.class */
/**
 * Base class for cache actions that run against a Redis layer through Lettuce reactive commands.
 *
 * <p>The Lettuce client is resolved once in the constructor from the layer processor. The codec is
 * rebuilt from the request by {@link #initCodec(CacheExchange)} each time a context is initialised.
 *
 * @param <K> cache key type
 * @param <V> cache value type
 * @param <R> result type carried by the {@link CacheExchange}
 * @param <C> reactive command type; the code expects either {@link RedisReactiveCommands} or
 *            {@link RedisAdvancedClusterReactiveCommands}
 */
public abstract class BaseRedisCacheAction<K, V, R, C> extends AbstractCacheAction<K, V, R, RedisCacheLayerProcessor> {
    private static final Logger log = LoggerFactory.getLogger(BaseRedisCacheAction.class);

    /** Client resolved for this action's area; decides which reactive API {@link #reactive} returns. */
    private final AbstractRedisClient client;

    /** Codec built from the most recent exchange passed to {@link #initCodec(CacheExchange)}. */
    private LayerRedisCodec<K, V> codec;

    /**
     * Creates the action and looks up the Redis client through the layer processor.
     *
     * @param action                   the cache action kind, forwarded to the superclass
     * @param redisCacheLayerProcessor the Redis layer processor, forwarded to the superclass
     * @param str                      forwarded to the superclass and used as the argument to
     *                                 {@code getClient}
     */
    public BaseRedisCacheAction(ACTION action, RedisCacheLayerProcessor redisCacheLayerProcessor, String str) {
        super(action, redisCacheLayerProcessor, str);
        this.client = this.layerProcessor.getClient(str);
    }

    /**
     * Hook invoked after a value has been loaded; delegates to
     * {@link #doOnLoadAfter(Object, CacheExchange, Object)} with a freshly obtained connection.
     *
     * <p>Falls back to {@code super.onLoadAfter} when no client or codec is available, when no
     * connection can be obtained, or when the connection yields no reactive commands. The
     * connection is always handed back to the processor: immediately on the fallback path, and
     * otherwise when the returned {@link Mono} completes, fails or is cancelled.
     *
     * @param cacheExchange the current exchange
     * @param v             the loaded value
     * @return the result of the back-fill, or of the superclass implementation on fallback
     */
    @SuppressWarnings("rawtypes")
    public Mono<Boolean> onLoadAfter(CacheExchange<K, V, R> cacheExchange, V v) {
        // Capture the codec so the release below uses the same instance the connection was
        // obtained with, even if initCodec() replaces the field in the meantime.
        final LayerRedisCodec<K, V> usedCodec = this.codec;
        if (null == this.client || null == usedCodec) {
            return super.onLoadAfter(cacheExchange, v);
        }
        final StatefulConnection connection = processor().getConnection(usedCodec, this.area);
        if (null == connection) {
            log.error("redis回填onLoadAfter，无法连接到redis，处理失败");
            return super.onLoadAfter(cacheExchange, v);
        }
        C reactive = reactive(connection);
        if (null == reactive) {
            // The connection was obtained but cannot be used: give it back before falling through.
            processor().releaseConnection(usedCodec, this.area, connection);
            return super.onLoadAfter(cacheExchange, v);
        }
        // doFinally runs exactly once for complete, error and cancel, so the connection is
        // released on every termination path.
        return doOnLoadAfter(reactive, cacheExchange, v)
                .doFinally(signal -> processor().releaseConnection(usedCodec, this.area, connection));
    }

    /**
     * Performs the actual back-fill after a load. This default only logs its arguments at debug
     * level and emits {@code true}.
     *
     * <p>Declared {@code protected} in the original source; widened by the decompiler.
     *
     * @param c             reactive commands bound to the obtained connection
     * @param cacheExchange the current exchange
     * @param v             the loaded value
     * @return a {@link Mono} emitting {@code true}
     */
    public Mono<Boolean> doOnLoadAfter(C c, CacheExchange<K, V, R> cacheExchange, V v) {
        log.debug("commands = {}, exchange = {}, value = {}", c, cacheExchange, v);
        return Mono.just(Boolean.TRUE);
    }

    /**
     * Checks whether a single key exists by issuing {@code EXISTS} for it.
     *
     * <p>Declared {@code protected} in the original source; widened by the decompiler.
     *
     * @param c reactive commands; cast to {@link RedisReactiveCommands} unless it is a
     *          {@link RedisAdvancedClusterReactiveCommands}
     * @param k the key to test
     * @return a {@link Mono} emitting {@code true} when the reported count equals 1
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Mono<Boolean> hasKey(C c, K k) {
        Object[] keys = {k};
        Mono<Long> matches = c instanceof RedisAdvancedClusterReactiveCommands
                ? ((RedisAdvancedClusterReactiveCommands) c).exists(keys)
                : ((RedisReactiveCommands) c).exists(keys);
        return matches.map(count -> Long.valueOf(1L).equals(count));
    }

    /**
     * Rebuilds the codec from the exchange, obtains a connection and wraps it, together with its
     * reactive commands, in a {@link RedisCacheContext}.
     *
     * @param cacheExchange the current exchange
     * @return the new context, or {@code null} when no connection or no reactive commands are
     *         available (any obtained connection is released first)
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected CacheContext initContext(CacheExchange<K, V, R> cacheExchange) {
        initCodec(cacheExchange);
        StatefulConnection connection = processor().getConnection(this.codec, this.area);
        if (null == connection) {
            return null;
        }
        C reactive = reactive(connection);
        if (null == reactive) {
            // No usable command API for this client type: do not keep the connection checked out.
            processor().releaseConnection(this.codec, this.area, connection);
            return null;
        }
        return new RedisCacheContext(reactive, connection);
    }

    /**
     * Releases the connection held by the current {@link RedisCacheContext}. Does nothing when no
     * context is set, which is possible because {@link #initContext(CacheExchange)} may return
     * {@code null}.
     */
    protected void destroyContext() {
        if (null == this.cacheContext) {
            return;
        }
        processor().releaseConnection(this.codec, this.area, ((RedisCacheContext) this.cacheContext).getConnection());
    }

    /**
     * Returns the reactive command API of the given connection, chosen by the type of the client.
     *
     * @param statefulConnection the connection to derive commands from
     * @return standalone commands for a {@link RedisClient}, cluster commands for a
     *         {@link RedisClusterClient}, or {@code null} for any other client (including none)
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected C reactive(StatefulConnection statefulConnection) {
        if (this.client instanceof RedisClient) {
            return (C) ((StatefulRedisConnection) statefulConnection).reactive();
        }
        if (this.client instanceof RedisClusterClient) {
            return (C) ((StatefulRedisClusterConnection) statefulConnection).reactive();
        }
        return null;
    }

    /**
     * Replaces the codec with one built from the request's key class and serializer class.
     *
     * @param cacheExchange the exchange whose request describes the key and serializer classes
     */
    void initCodec(CacheExchange<K, V, R> cacheExchange) {
        this.codec = new LayerRedisCodec<>(cacheExchange.getRequest().getKeyClass(), cacheExchange.getRequest().getSerializerClass());
    }

    /**
     * Applies the policy's time-to-live to the request key and then emits {@code t}.
     *
     * <p>When the policy has no duration nothing is sent to Redis. Durations expressed in
     * nanoseconds, microseconds or milliseconds are converted with {@link TimeUnit#toMillis(long)}
     * and sent with {@code PEXPIRE}; all other units are converted with
     * {@link TimeUnit#toSeconds(long)} and sent with {@code EXPIRE}. Both conversions truncate.
     *
     * <p>Declared {@code protected} in the original source; widened by the decompiler.
     *
     * @param c             reactive commands; cast to {@link RedisReactiveCommands} unless it is a
     *                      {@link RedisAdvancedClusterReactiveCommands}
     * @param cacheExchange the exchange providing the policy and the request key
     * @param t             the value to emit once the expiry command has completed
     * @param <T>           type of the pass-through value
     * @return a {@link Mono} emitting {@code t}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> Mono<T> expire(C c, CacheExchange<K, V, R> cacheExchange, T t) {
        CachePolicy policy = cacheExchange.getPolicy();
        if (policy.getDuration() == null) {
            return Mono.just(t);
        }

        TimeUnit unit = policy.getTimeUnit();
        boolean subSecondUnit = unit.equals(TimeUnit.MICROSECONDS)
                || unit.equals(TimeUnit.MILLISECONDS)
                || unit.equals(TimeUnit.NANOSECONDS);
        boolean cluster = c instanceof RedisAdvancedClusterReactiveCommands;
        Object key = cacheExchange.getRequest().getKey();
        long duration = policy.getDuration().longValue();

        Mono<Boolean> applied;
        if (subSecondUnit) {
            long millis = unit.toMillis(duration);
            applied = cluster
                    ? ((RedisAdvancedClusterReactiveCommands) c).pexpire(key, millis)
                    : ((RedisReactiveCommands) c).pexpire(key, millis);
        } else {
            long seconds = unit.toSeconds(duration);
            applied = cluster
                    ? ((RedisAdvancedClusterReactiveCommands) c).expire(key, seconds)
                    : ((RedisReactiveCommands) c).expire(key, seconds);
        }
        return applied.thenReturn(t);
    }
}
