package cn.flyelf.cache.redis;

import cn.flyelf.cache.annotation.ACTION;
import cn.flyelf.cache.annotation.CacheConstant;
import cn.flyelf.cache.core.AbstractCacheAction;
import cn.flyelf.cache.core.context.CacheContext;
import cn.flyelf.cache.core.exception.CacheException;
import cn.flyelf.cache.core.model.CachePolicy;
import cn.flyelf.cache.core.server.CacheExchange;
import cn.flyelf.cache.redis.codec.LayerRedisCodec;
import cn.flyelf.cache.redis.context.RedisCacheContext;
import cn.flyelf.cache.redis.model.RedisConstant;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import io.lettuce.core.cluster.api.reactive.RedisAdvancedClusterReactiveCommands;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:cn/flyelf/cache/redis/BaseRedisCacheAction.class */
public abstract class BaseRedisCacheAction<K, V, C> extends AbstractCacheAction<K, V, RedisLayerProcessor> {
    private static final Logger log = LoggerFactory.getLogger(BaseRedisCacheAction.class);
    private LayerRedisCodec<K, V> codec;
    private AbstractRedisClient client;

    public BaseRedisCacheAction(ACTION action, RedisLayerProcessor redisLayerProcessor, String str) {
        super(action, redisLayerProcessor, str);
        this.client = processor().getPool().getClient(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<Boolean> hasKey(C c, K k) {
        return processor().hasKey(c, k);
    }

    protected CacheContext initContext(@Nonnull CacheExchange<K, V> cacheExchange) {
        C reactive;
        initCodec(cacheExchange);
        StatefulConnection<?, ?> connection = processor().getPool().getConnection(this.codec, this.area);
        if (null == connection || null == (reactive = reactive(connection))) {
            return null;
        }
        return new RedisCacheContext(reactive, connection);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void pub(CacheExchange<K, V> cacheExchange) {
        processor().pub(action(), this.area, cacheExchange);
    }

    protected void destroyContext() {
        if (processor().getPool().releaseConnection(this.codec, this.area, ((RedisCacheContext) this.cacheContext).getConnection())) {
            return;
        }
        log.warn("归还redis链接失败");
    }

    protected C reactive(StatefulConnection<?, ?> statefulConnection) {
        return (C) processor().reactive(this.client, statefulConnection);
    }

    void initCodec(CacheExchange<K, V> cacheExchange) {
        this.codec = new LayerRedisCodec<>(cacheExchange.getRequest().getKeyClass(), cacheExchange.getRequest().getSerializerClass());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<Void> mapResult(String str, CacheExchange<K, V> cacheExchange) {
        if (RedisConstant.SUCCESS.equals(str)) {
            return cacheExchange.getResponse().success(CacheConstant.SUCCEED, processor().name());
        }
        return Mono.error(cacheExchange.getResponse().failure(processor().name(), new CacheException(processor().name(), CacheConstant.RESULT.PUT, str)));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<Void> mapResult(Long l, CacheExchange<K, V> cacheExchange) {
        if (l.longValue() > 0) {
            return cacheExchange.getResponse().success(CacheConstant.SUCCEED, processor().name());
        }
        return Mono.error(cacheExchange.getResponse().failure(processor().name(), new CacheException(processor().name(), CacheConstant.RESULT.PUT, l.toString())));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T> Mono<T> expire(C c, CacheExchange<K, V> cacheExchange, T t) {
        log.info("redis 设置有效期，key：{}，结果：{}", cacheExchange.getRequest().getKey(), t);
        CachePolicy policy = cacheExchange.getPolicy();
        Long duration = cacheExchange.getRequest().getDuration();
        TimeUnit timeUnit = cacheExchange.getRequest().getTimeUnit();
        if (duration == null || timeUnit == null) {
            duration = policy.findDuration(processor().name());
            timeUnit = policy.getTimeUnit();
        }
        return (duration == null || timeUnit == null) ? Mono.just(t) : (timeUnit.equals(TimeUnit.MICROSECONDS) || timeUnit.equals(TimeUnit.MILLISECONDS) || timeUnit.equals(TimeUnit.NANOSECONDS)) ? c instanceof RedisAdvancedClusterReactiveCommands ? ((RedisAdvancedClusterReactiveCommands) c).pexpire(cacheExchange.getRequest().getKey(), timeUnit.toMillis(duration.longValue())).thenReturn(t) : ((RedisReactiveCommands) c).pexpire(cacheExchange.getRequest().getKey(), timeUnit.toMillis(duration.longValue())).thenReturn(t) : c instanceof RedisAdvancedClusterReactiveCommands ? ((RedisAdvancedClusterReactiveCommands) c).expire(cacheExchange.getRequest().getKey(), timeUnit.toSeconds(duration.longValue())).thenReturn(t) : ((RedisReactiveCommands) c).expire(cacheExchange.getRequest().getKey(), timeUnit.toSeconds(duration.longValue())).thenReturn(t);
    }
}
