package org.apache.giraph.comm.messages;

import java.lang.reflect.GenericDeclaration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore;
import org.apache.giraph.comm.messages.primitives.IdOneMessagePerVertexStore;
import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore;
import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore;
import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.types.ops.TypeOpsUtils;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.class */
public class InMemoryMessageStoreFactory<I extends WritableComparable, M extends Writable> implements MessageStoreFactory<I, M, MessageStore<I, M>> {
    private static final Logger LOG = Logger.getLogger(InMemoryMessageStoreFactory.class);
    protected CentralizedServiceWorker<I, ?, ?> service;
    protected ImmutableClassesGiraphConfiguration<I, ?, ?> conf;

    protected MessageStore<I, M> newStoreWithCombiner(MessageValueFactory<M> messageValueFactory) {
        GenericDeclaration valueClass = messageValueFactory.getValueClass();
        Class<I> vertexIdClass = this.conf.getVertexIdClass();
        return (vertexIdClass.equals(IntWritable.class) && valueClass.equals(FloatWritable.class)) ? new IntFloatMessageStore(this.service, this.conf.createMessageCombiner()) : (vertexIdClass.equals(LongWritable.class) && valueClass.equals(DoubleWritable.class)) ? new LongDoubleMessageStore(this.service, this.conf.createMessageCombiner()) : TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass) != null ? new IdOneMessagePerVertexStore(messageValueFactory, this.service, this.conf.createMessageCombiner(), this.conf) : new OneMessagePerVertexStore(messageValueFactory, this.service, this.conf.createMessageCombiner(), this.conf);
    }

    protected MessageStore<I, M> newStoreWithoutCombiner(MessageValueFactory<M> messageValueFactory) {
        MessageStore messageStore = null;
        MessageEncodeAndStoreType messageEncodeAndStoreType = GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.get(this.conf);
        Class<I> vertexIdClass = this.conf.getVertexIdClass();
        if (vertexIdClass.equals(IntWritable.class)) {
            messageStore = new IntByteArrayMessageStore(messageValueFactory, this.service, this.conf);
        } else if (vertexIdClass.equals(LongWritable.class)) {
            if (messageEncodeAndStoreType.equals(MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) || messageEncodeAndStoreType.equals(MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) {
                messageStore = new LongByteArrayMessageStore(messageValueFactory, this.service, this.conf);
            } else if (messageEncodeAndStoreType.equals(MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
                messageStore = new LongPointerListMessageStore(messageValueFactory, this.service, this.conf);
            }
        } else if (messageEncodeAndStoreType.equals(MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) || messageEncodeAndStoreType.equals(MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) {
            messageStore = TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass) != null ? new IdByteArrayMessageStore(messageValueFactory, this.service, this.conf) : new ByteArrayMessagesPerVertexStore(messageValueFactory, this.service, this.conf);
        } else if (messageEncodeAndStoreType.equals(MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
            messageStore = new PointerListPerVertexStore(messageValueFactory, this.service, this.conf);
        }
        return messageStore;
    }

    @Override // org.apache.giraph.comm.messages.MessageStoreFactory
    public MessageStore<I, M> newStore(MessageValueFactory<M> messageValueFactory) {
        Object valueClass = messageValueFactory.getValueClass();
        MessageStore<I, M> newStoreWithCombiner = this.conf.useMessageCombiner() ? newStoreWithCombiner(messageValueFactory) : newStoreWithoutCombiner(messageValueFactory);
        if (LOG.isInfoEnabled()) {
            LOG.info("newStore: Created " + newStoreWithCombiner.getClass() + " for vertex id " + this.conf.getVertexIdClass() + " and message value " + valueClass + " and" + (this.conf.useMessageCombiner() ? " message combiner " + this.conf.getMessageCombinerClass() : " no combiner"));
        }
        int i = GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.get(this.conf);
        if (i > 0) {
            newStoreWithCombiner = new AsyncMessageStoreWrapper(newStoreWithCombiner, this.service.getPartitionStore().getPartitionIds(), i);
        }
        return newStoreWithCombiner;
    }

    @Override // org.apache.giraph.comm.messages.MessageStoreFactory
    public void initialize(CentralizedServiceWorker<I, ?, ?> centralizedServiceWorker, ImmutableClassesGiraphConfiguration<I, ?, ?> immutableClassesGiraphConfiguration) {
        this.service = centralizedServiceWorker;
        this.conf = immutableClassesGiraphConfiguration;
    }

    @Override // org.apache.giraph.comm.messages.MessageStoreFactory
    public boolean shouldTraverseMessagesInOrder() {
        return false;
    }
}
