package de.otto.synapse.messagestore.redis;

import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import de.otto.synapse.channel.ChannelPosition;
import de.otto.synapse.channel.ShardPosition;
import de.otto.synapse.message.DefaultHeaderAttr;
import de.otto.synapse.message.TextMessage;
import de.otto.synapse.messagestore.Index;
import de.otto.synapse.messagestore.Indexer;
import de.otto.synapse.messagestore.MessageStore;
import de.otto.synapse.messagestore.MessageStoreEntry;
import de.otto.synapse.translator.Decoder;
import de.otto.synapse.translator.Encoder;
import de.otto.synapse.translator.MessageFormat;
import de.otto.synapse.translator.TextDecoder;
import de.otto.synapse.translator.TextEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.BoundHashOperations;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;

@Beta
/* loaded from: input_file:de/otto/synapse/messagestore/redis/RedisIndexedMessageStore.class */
public class RedisIndexedMessageStore implements MessageStore {
    private static final Logger LOG = LoggerFactory.getLogger(RedisIndexedMessageStore.class);
    private static final int CHARACTERISTICS = 1296;
    private final String name;
    private final Indexer indexer;
    private final RedisTemplate<String, String> redisTemplate;
    private final int batchSize;
    private final int maxSize;
    private final Encoder<String> encoder;
    private final Decoder<String> decoder;
    private final long maxAge;

    public RedisIndexedMessageStore(String str, int i, int i2, long j, Indexer indexer, RedisTemplate<String, String> redisTemplate) {
        this(str, i, i2, j, indexer, redisTemplate, new TextEncoder(MessageFormat.V2), new TextDecoder());
    }

    public RedisIndexedMessageStore(String str, int i, int i2, long j, Indexer indexer, RedisTemplate<String, String> redisTemplate, Encoder<String> encoder, Decoder<String> decoder) {
        this.name = str;
        this.maxAge = j;
        this.indexer = indexer;
        this.redisTemplate = redisTemplate;
        this.batchSize = i;
        this.maxSize = i2;
        this.encoder = encoder;
        this.decoder = decoder;
    }

    public String getName() {
        return this.name;
    }

    public void add(MessageStoreEntry messageStoreEntry) {
        final MessageStoreEntry index = this.indexer.index(messageStoreEntry);
        final TextMessage textMessage = index.getTextMessage();
        final String messageIdCalculator = messageIdCalculator(textMessage);
        LOG.debug("Redis returned with " + ((List) this.redisTemplate.execute(new SessionCallback<List<Object>>() { // from class: de.otto.synapse.messagestore.redis.RedisIndexedMessageStore.1
            /* renamed from: execute, reason: merged with bridge method [inline-methods] */
            public List<Object> m2execute(RedisOperations redisOperations) throws DataAccessException {
                redisOperations.multi();
                Optional shardPosition = textMessage.getHeader().getShardPosition();
                MessageStoreEntry messageStoreEntry2 = index;
                shardPosition.ifPresent(shardPosition2 -> {
                    redisOperations.boundHashOps(RedisIndexedMessageStore.this.name + "-" + messageStoreEntry2.getChannelName() + "-channelPos").put(shardPosition2.shardName(), shardPosition2.position());
                });
                redisOperations.boundSetOps(RedisIndexedMessageStore.this.name + "-channels").add(new Object[]{index.getChannelName()});
                String str = RedisIndexedMessageStore.this.name + "-message-" + messageIdCalculator;
                BoundHashOperations boundHashOps = redisOperations.boundHashOps(str);
                boundHashOps.putAll(RedisIndexedMessageStore.this.encode(index));
                boundHashOps.expire(RedisIndexedMessageStore.this.maxAge, TimeUnit.SECONDS);
                BoundListOperations boundListOps = redisOperations.boundListOps(RedisIndexedMessageStore.this.name + "-messages");
                boundListOps.rightPush(str);
                boundListOps.expire(RedisIndexedMessageStore.this.maxAge, TimeUnit.SECONDS);
                boundListOps.trim(-RedisIndexedMessageStore.this.maxSize, -1L);
                index.getFilterValues().entrySet().forEach(entry -> {
                    BoundListOperations boundListOps2 = redisOperations.boundListOps(RedisIndexedMessageStore.this.name + "-" + ((Index) entry.getKey()).getName() + "-" + ((String) entry.getValue()));
                    boundListOps2.rightPush(str);
                    boundListOps2.expire(RedisIndexedMessageStore.this.maxAge, TimeUnit.SECONDS);
                });
                return redisOperations.exec();
            }
        })));
    }

    public Set<String> getChannelNames() {
        return this.redisTemplate.boundSetOps(this.name + "-channels").members();
    }

    public ImmutableSet<Index> getIndexes() {
        return this.indexer.getIndexes();
    }

    public ChannelPosition getLatestChannelPosition(String str) {
        return ChannelPosition.channelPosition((Set) this.redisTemplate.boundHashOps(this.name + "-" + str + "-channelPos").entries().entrySet().stream().map(entry -> {
            return ShardPosition.fromPosition(entry.getKey().toString(), entry.getValue().toString());
        }).collect(Collectors.toSet()));
    }

    public Stream<MessageStoreEntry> stream() {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new BatchedRedisHashedListIterator(this.redisTemplate, this::decode, this.name + "-messages", this.batchSize), CHARACTERISTICS), false);
    }

    public Stream<MessageStoreEntry> stream(Index index, String str) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new BatchedRedisHashedListIterator(this.redisTemplate, this::decode, this.name + "-" + index.getName() + "-" + str, this.batchSize), CHARACTERISTICS), false);
    }

    public int size() {
        return this.redisTemplate.boundListOps(this.name + "-messages").size().intValue();
    }

    public void close() {
    }

    public void clear() {
        ArrayList arrayList = new ArrayList(Arrays.asList(this.name + "-channels", this.name + "-messages"));
        getChannelNames().forEach(str -> {
            arrayList.add(this.name + "-" + str + "-channelPos");
        });
        this.redisTemplate.delete(arrayList);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ImmutableMap<String, String> encode(MessageStoreEntry messageStoreEntry) {
        ImmutableMap.Builder put = ImmutableMap.builder().put("_channelName", messageStoreEntry.getChannelName()).put("_message", this.encoder.apply(messageStoreEntry.getTextMessage()));
        messageStoreEntry.getFilterValues().forEach((index, str) -> {
            put.put(index.getName(), str);
        });
        return put.build();
    }

    private MessageStoreEntry decode(Map<String, String> map) {
        return MessageStoreEntry.of(map.get("_channelName"), ImmutableMap.copyOf((Map) map.entrySet().stream().filter(this::isFilterValue).collect(Collectors.toMap(entry -> {
            return Index.valueOf((String) entry.getKey());
        }, entry2 -> {
            return (String) entry2.getValue();
        }))), (TextMessage) this.decoder.apply(map.get("_message")));
    }

    private boolean isFilterValue(Map.Entry<String, String> entry) {
        return (entry.getKey().equals("_channelName") || entry.getKey().equals("_message")) ? false : true;
    }

    private final String messageIdCalculator(TextMessage textMessage) {
        String asString = textMessage.getHeader().getAsString(DefaultHeaderAttr.MSG_ID);
        return asString != null ? asString : UUID.randomUUID().toString();
    }
}
