package cn.flyelf.cache.redis;

import cn.flyelf.cache.annotation.ACTION;
import cn.flyelf.cache.core.CacheAction;
import cn.flyelf.cache.core.PlaceHolder;
import cn.flyelf.cache.core.action.AbstractLayerProcessor;
import cn.flyelf.cache.core.conf.CacheLayerConfig;
import cn.flyelf.cache.core.event.CacheEventPublisher;
import cn.flyelf.cache.core.model.CachePolicy;
import cn.flyelf.cache.core.model.CacheSynchronizeEvent;
import cn.flyelf.cache.core.server.CacheExchange;
import cn.flyelf.cache.core.sync.CacheSynchronizePublisher;
import cn.flyelf.cache.redis.atomic.RedisPlaceHolder;
import cn.flyelf.cache.redis.codec.LayerRedisCodec;
import cn.flyelf.cache.redis.list.RedisCacheListGetAction;
import cn.flyelf.cache.redis.list.RedisCacheListPopAction;
import cn.flyelf.cache.redis.list.RedisCacheListPushAction;
import cn.flyelf.cache.redis.list.RedisCacheListPutAction;
import cn.flyelf.cache.redis.map.RedisCacheMapAddAction;
import cn.flyelf.cache.redis.map.RedisCacheMapDelKeyAction;
import cn.flyelf.cache.redis.map.RedisCacheMapExistKeyAction;
import cn.flyelf.cache.redis.map.RedisCacheMapGetAction;
import cn.flyelf.cache.redis.map.RedisCacheMapGetKeyAction;
import cn.flyelf.cache.redis.map.RedisCacheMapPutAction;
import cn.flyelf.cache.redis.model.PubSubWrapper;
import cn.flyelf.cache.redis.pool.RedisPool;
import cn.flyelf.cache.redis.set.RedisCacheSetGetAction;
import cn.flyelf.cache.redis.set.RedisCacheSetPutAction;
import cn.flyelf.cache.redis.simple.RedisCacheGetAction;
import cn.flyelf.cache.redis.simple.RedisCachePutAction;
import cn.flyelf.cache.redis.util.RedisUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisHashReactiveCommands;
import io.lettuce.core.api.reactive.RedisListReactiveCommands;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import io.lettuce.core.api.reactive.RedisSetReactiveCommands;
import io.lettuce.core.api.reactive.RedisStringReactiveCommands;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.reactive.RedisAdvancedClusterReactiveCommands;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:cn/flyelf/cache/redis/RedisLayerProcessor.class */
/**
 * Redis-backed cache layer processor.
 *
 * <p>Besides creating the Redis cache actions, this class publishes cache changes on Redis
 * pub/sub channels (one channel per {@link ACTION}, named {@code PUB_SUB_PREFIX + action}) and
 * listens on those channels so that changes made by other processor instances are forwarded to
 * the {@code synchronizePublisher()} as {@link CacheSynchronizeEvent}s. Messages carrying this
 * instance's own {@link #uuid} are ignored.
 */
public class RedisLayerProcessor extends AbstractLayerProcessor {
    private final RedisPool pool;
    static final String PUB_SUB_PREFIX = "layer_cache_redis-pub-sub-";
    /** Random identifier of this instance; used to recognise (and skip) self-published messages. */
    private final String uuid;
    private static final Logger log = LoggerFactory.getLogger(RedisLayerProcessor.class);
    /** Constructor of the Redis action implementation for every supported {@link ACTION}. */
    private static final ConcurrentHashMap<ACTION, Constructor<?>> CONSTRUCTORS = new ConcurrentHashMap<>(15);
    /** Actions whose pub/sub channels {@link #sub(String)} subscribes to. */
    private static final ACTION[] SYNCHRONIZED_ACTIONS = {
        ACTION.PUT, ACTION.PUSH, ACTION.PUTLIST, ACTION.PUTSET, ACTION.PUTMAP,
        ACTION.POP, ACTION.ADDMAPKEY, ACTION.DELETE, ACTION.DELMAPKEY
    };

    /**
     * Creates a processor with its own {@link RedisPool} and a random instance id.
     *
     * @param cacheEventPublisher       passed to the superclass
     * @param cacheSynchronizePublisher passed to the superclass
     */
    public RedisLayerProcessor(CacheEventPublisher cacheEventPublisher, CacheSynchronizePublisher cacheSynchronizePublisher) {
        super(cacheEventPublisher, cacheSynchronizePublisher);
        this.pool = new RedisPool();
        this.uuid = UUID.randomUUID().toString();
    }

    /**
     * Builds the Redis action registered for {@code action}.
     *
     * @param action the cache action to build
     * @param str    forwarded unchanged to {@code RedisUtil.action}
     * @return whatever {@code RedisUtil.action} returns for the registered constructor
     */
    public CacheAction<?, ?> action(ACTION action, String str) {
        return RedisUtil.action(this, CONSTRUCTORS.get(action), str);
    }

    /**
     * Returns the reactive pub/sub command facade of a connection, using the cluster-specific
     * view when the connection is a cluster pub/sub connection.
     *
     * @param statefulRedisPubSubConnection the pub/sub connection
     * @return its reactive commands, cast to the type expected by the caller
     */
    @SuppressWarnings("unchecked") // the caller chooses C; erasure makes the casts unchecked
    <C extends RedisPubSubReactiveCommands<?, ?>> C reactive(StatefulRedisPubSubConnection<?, ?> statefulRedisPubSubConnection) {
        if (statefulRedisPubSubConnection instanceof StatefulRedisClusterPubSubConnection) {
            return (C) ((StatefulRedisClusterPubSubConnection<?, ?>) statefulRedisPubSubConnection).reactive();
        }
        return (C) statefulRedisPubSubConnection.reactive();
    }

    /**
     * Publishes a cache change on the channel of {@code action}.
     *
     * <p>The message is a JSON-serialised {@link PubSubWrapper} holding this instance's uuid, the
     * affected key(s), the key/value classes, the cache policy and the area. The pooled pub/sub
     * connection is handed back only after the publish has terminated (or immediately when the
     * message could not be built).
     *
     * @param action        action that was executed; selects the channel
     * @param str           cache area; selects the pooled pub/sub connection
     * @param cacheExchange exchange describing the request that triggered the change
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // the element type of PubSubWrapper keys is not visible here
    public <K, V> void pub(ACTION action, String str, CacheExchange<K, V> cacheExchange) {
        StatefulRedisPubSubConnection<?, ?> pubsub = this.pool.getPubsub(str);
        if (null == pubsub) {
            log.error("获取发布订阅链接失败，无法发布");
            return;
        }
        String channel = PUB_SUB_PREFIX + action.toString();
        Mono<Long> publication;
        try {
            // Channel names and payloads are plain strings on this connection.
            RedisPubSubReactiveCommands<String, String> commands = reactive(pubsub);

            Object key = cacheExchange.getRequest().getKey();
            List keys;
            if (key instanceof Collection) {
                keys = new ArrayList((Collection) key);
            } else {
                keys = new ArrayList(1);
                keys.add(key);
            }

            PubSubWrapper pubSubWrapper = new PubSubWrapper();
            pubSubWrapper.setUuid(this.uuid);
            pubSubWrapper.setKeyClass(cacheExchange.getRequest().getKeyClass());
            pubSubWrapper.setKeys(keys);
            pubSubWrapper.setValueClass(cacheExchange.getRequest().getSerializerClass());
            pubSubWrapper.setPolicy(cacheExchange.getPolicy());
            pubSubWrapper.setArea(str);

            publication = commands.publish(channel, JSONObject.toJSONString(pubSubWrapper));
        } catch (RuntimeException e) {
            // Nothing was sent: give the connection back before propagating the failure.
            releasePubsub(str, pubsub);
            throw e;
        }
        publication
            // Release only once the command has finished; it may still be in flight after subscribe().
            .doFinally(signal -> releasePubsub(str, pubsub))
            .subscribe(
                receivers -> log.info("接收到订阅消息的有：{} 个", receivers),
                error -> log.error("发布缓存同步消息失败，频道：{}", channel, error));
    }

    /** Returns a pub/sub connection to the pool and logs a warning when the pool rejects it. */
    private void releasePubsub(String area, StatefulRedisPubSubConnection<?, ?> connection) {
        if (!this.pool.releasePubsub(area, connection)) {
            log.warn("归还发布订阅链接失败");
        }
    }

    /**
     * Subscribes to the synchronisation channels of all {@link #SYNCHRONIZED_ACTIONS} for the
     * given area and dispatches every received message to {@link #handleMessage(String, String)}.
     *
     * <p>The pub/sub connection taken from the pool is intentionally not released: it has to stay
     * open for as long as messages should be received.
     *
     * @param str cache area; selects the pooled pub/sub connection
     */
    void sub(String str) {
        StatefulRedisPubSubConnection<?, ?> pubsub = this.pool.getPubsub(str);
        if (null == pubsub) {
            log.error("获取发布订阅链接失败，无法订阅");
            return;
        }
        RedisPubSubReactiveCommands<String, String> commands = reactive(pubsub);

        String[] channels = new String[SYNCHRONIZED_ACTIONS.length];
        for (int i = 0; i < channels.length; i++) {
            channels[i] = PUB_SUB_PREFIX + SYNCHRONIZED_ACTIONS[i];
        }
        commands.subscribe(channels).subscribe(
            ignored -> { },
            error -> log.error("订阅缓存同步频道失败，区域：{}", str, error));

        commands.observeChannels().subscribe(
            message -> handleMessage(message.getChannel(), message.getMessage()),
            error -> log.error("缓存同步订阅流异常终止，区域：{}", str, error));
    }

    /**
     * Handles one pub/sub message: decodes it, skips self-published messages, and notifies the
     * local synchroniser for every key when the policy spans more than one layer.
     *
     * <p>All runtime failures are caught and logged here on purpose: an exception escaping the
     * subscriber would cancel the channel subscription and stop synchronisation for good.
     *
     * <p>NOTE(review): {@link #notify(ACTION, Object, String, CachePolicy, Class)} blocks on Redis
     * calls and runs on whatever thread delivers the message — confirm that this is acceptable
     * or move the work to a dedicated scheduler.
     *
     * @param channel channel the message arrived on ({@code PUB_SUB_PREFIX + ACTION name})
     * @param message JSON-serialised {@link PubSubWrapper}
     */
    private void handleMessage(String channel, String message) {
        try {
            PubSubWrapper pubSubWrapper = JSON.parseObject(message, PubSubWrapper.class);
            if (pubSubWrapper == null) {
                log.warn("缓存同步消息内容为空，频道：{}", channel);
                return;
            }
            if (this.uuid.equals(pubSubWrapper.getUuid())) {
                log.info("这是发布者接收到的订阅消息，本发布者不于执行该消息");
                return;
            }
            ACTION valueOf = ACTION.valueOf(channel.substring(PUB_SUB_PREFIX.length()));
            log.info("接收到redis的缓存订阅事件：{}，内容：{}", valueOf, pubSubWrapper);

            CachePolicy policy = pubSubWrapper.getPolicy();
            if (policy == null || policy.getLayer() == null || policy.getLayer().length <= 1) {
                // Only multi-layer policies trigger a notification.
                return;
            }
            if (pubSubWrapper.getKeys() == null) {
                return;
            }
            pubSubWrapper.getKeys().forEach(obj ->
                notify(valueOf, obj, pubSubWrapper.getArea(), policy, pubSubWrapper.getValueClass()));
        } catch (RuntimeException e) {
            log.error("处理缓存同步消息失败，频道：{}，内容：{}", channel, message, e);
        }
    }

    /**
     * Turns a remote cache change into a {@link CacheSynchronizeEvent} and publishes it locally.
     *
     * <p>For {@code DELETE} the event is published without touching Redis. For {@code DELMAPKEY}
     * and {@code POP} the event is published without a value when the key no longer exists in
     * Redis (the event keeps the original action). In all other cases the current value is read
     * back from Redis (blocking) and attached to the event.
     *
     * <p>NOTE(review): the connection obtained from the pool is not handed back here; confirm
     * whether {@link RedisPool} expects an explicit release for data connections.
     *
     * @param action      action reported by the remote publisher
     * @param obj         cache key
     * @param str         cache area
     * @param cachePolicy policy of the cache entry; supplies the key class for the codec
     * @param cls         value class used by the codec
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // Lettuce command facades are used as raw types
    void notify(ACTION action, Object obj, String str, CachePolicy cachePolicy, Class<?> cls) {
        log.info("redis缓存订阅，通知：动作：{}，缓存key：{}，缓存策略：{}", action, obj, cachePolicy);
        CacheSynchronizeEvent cacheSynchronizeEvent = new CacheSynchronizeEvent();
        cacheSynchronizeEvent.setAction(action);
        cacheSynchronizeEvent.setPolicy(cachePolicy);
        cacheSynchronizeEvent.setArea(str);
        cacheSynchronizeEvent.setKey(obj);

        if (action == ACTION.DELETE) {
            // A delete carries no value, so no Redis connection is needed.
            synchronizePublisher().publish(cacheSynchronizeEvent);
            return;
        }

        StatefulConnection<?, ?> connection = this.pool.getConnection(new LayerRedisCodec<>(cachePolicy.getKeyClass(), cls), str);
        if (null == connection) {
            log.warn("redis缓存订阅，链接redis失败");
            return;
        }
        Object commands = reactive(this.pool.getClient(str), connection);
        if (commands == null) {
            log.warn("redis缓存订阅，无法识别的redis客户端类型");
            return;
        }

        boolean mayHaveRemovedKey = action == ACTION.DELMAPKEY || action == ACTION.POP;
        if (mayHaveRemovedKey && Boolean.FALSE.equals(hasKey(commands, obj).block())) {
            // The structure is gone entirely: publish without a value, like a delete.
            synchronizePublisher().publish(cacheSynchronizeEvent);
            return;
        }

        if (action == ACTION.PUTMAP || action == ACTION.ADDMAPKEY || action == ACTION.DELMAPKEY) {
            cacheSynchronizeEvent.setValue((Map) ((RedisHashReactiveCommands) commands).hgetall(obj).block());
        } else if (action == ACTION.PUTLIST || action == ACTION.PUSH || action == ACTION.POP) {
            cacheSynchronizeEvent.setValue((List) ((RedisListReactiveCommands) commands).lrange(obj, 0L, -1L).collectList().block());
        } else if (action == ACTION.PUTSET) {
            cacheSynchronizeEvent.setValue((Set) ((RedisSetReactiveCommands) commands).smembers(obj).collect(Collectors.toSet()).block());
        } else {
            cacheSynchronizeEvent.setValue(((RedisStringReactiveCommands) commands).get(obj).block());
        }
        synchronizePublisher().publish(cacheSynchronizeEvent);
    }

    /**
     * Returns the reactive command facade of a connection, chosen by the type of the client that
     * owns it.
     *
     * @param abstractRedisClient client the connection belongs to
     * @param statefulConnection  the connection
     * @return the reactive commands, or {@code null} when the client is neither a
     *         {@link RedisClient} nor a {@link RedisClusterClient}
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // the caller chooses C; erasure makes the casts unchecked
    public <C> C reactive(AbstractRedisClient abstractRedisClient, StatefulConnection<?, ?> statefulConnection) {
        if (abstractRedisClient instanceof RedisClient) {
            return (C) ((StatefulRedisConnection) statefulConnection).reactive();
        }
        if (abstractRedisClient instanceof RedisClusterClient) {
            return (C) ((StatefulRedisClusterConnection) statefulConnection).reactive();
        }
        return null;
    }

    /**
     * Checks whether a key exists in Redis.
     *
     * @param c reactive commands (standalone or cluster flavour)
     * @param k key to look up
     * @return a {@link Mono} emitting {@code true} when {@code EXISTS} reports exactly one key
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // the command facades are used as raw types
    public <C, K> Mono<Boolean> hasKey(C c, K k) {
        Object[] keys = {k};
        if (c instanceof RedisAdvancedClusterReactiveCommands) {
            return ((RedisAdvancedClusterReactiveCommands) c).exists(keys)
                .map(count -> Boolean.valueOf(count.equals(1L)));
        }
        return ((RedisReactiveCommands) c).exists(keys)
            .map(count -> Boolean.valueOf(count.equals(1L)));
    }

    /** Returns the connection pool owned by this processor. */
    public RedisPool getPool() {
        return this.pool;
    }

    /**
     * Delegates to {@code RedisPool.buildAreaClient}, passing {@link #sub(String)} as the callback.
     *
     * @param str              cache area
     * @param cacheLayerConfig layer configuration for that area
     */
    public void buildAreaClient(String str, CacheLayerConfig cacheLayerConfig) {
        this.pool.buildAreaClient(str, cacheLayerConfig, this::sub);
    }

    /**
     * Creates a Redis-backed place holder bound to this processor.
     *
     * @param str forwarded unchanged to the {@link RedisPlaceHolder} constructor
     * @return a new {@link RedisPlaceHolder}
     */
    public PlaceHolder placeHolder(String str) {
        return new RedisPlaceHolder(this, str);
    }

    static {
        CONSTRUCTORS.put(ACTION.GET, RedisUtil.actionConstructor(RedisCacheGetAction.class));
        CONSTRUCTORS.put(ACTION.PUT, RedisUtil.actionConstructor(RedisCachePutAction.class));
        CONSTRUCTORS.put(ACTION.DELETE, RedisUtil.actionConstructor(RedisCacheDeleteAction.class));
        CONSTRUCTORS.put(ACTION.PUTLIST, RedisUtil.actionConstructor(RedisCacheListPutAction.class));
        CONSTRUCTORS.put(ACTION.GETLIST, RedisUtil.actionConstructor(RedisCacheListGetAction.class));
        CONSTRUCTORS.put(ACTION.POP, RedisUtil.actionConstructor(RedisCacheListPopAction.class));
        CONSTRUCTORS.put(ACTION.PUSH, RedisUtil.actionConstructor(RedisCacheListPushAction.class));
        CONSTRUCTORS.put(ACTION.PUTSET, RedisUtil.actionConstructor(RedisCacheSetPutAction.class));
        CONSTRUCTORS.put(ACTION.GETSET, RedisUtil.actionConstructor(RedisCacheSetGetAction.class));
        CONSTRUCTORS.put(ACTION.PUTMAP, RedisUtil.actionConstructor(RedisCacheMapPutAction.class));
        CONSTRUCTORS.put(ACTION.GETMAP, RedisUtil.actionConstructor(RedisCacheMapGetAction.class));
        CONSTRUCTORS.put(ACTION.GETMAPKEY, RedisUtil.actionConstructor(RedisCacheMapGetKeyAction.class));
        CONSTRUCTORS.put(ACTION.HASMAPKEY, RedisUtil.actionConstructor(RedisCacheMapExistKeyAction.class));
        CONSTRUCTORS.put(ACTION.DELMAPKEY, RedisUtil.actionConstructor(RedisCacheMapDelKeyAction.class));
        CONSTRUCTORS.put(ACTION.ADDMAPKEY, RedisUtil.actionConstructor(RedisCacheMapAddAction.class));
    }
}
