package com.baidu.hugegraph.computer.core.compute;

import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.config.EdgeFrequency;
import com.baidu.hugegraph.computer.core.config.Null;
import com.baidu.hugegraph.computer.core.graph.edge.Edge;
import com.baidu.hugegraph.computer.core.graph.edge.Edges;
import com.baidu.hugegraph.computer.core.graph.id.BytesId;
import com.baidu.hugegraph.computer.core.graph.properties.Properties;
import com.baidu.hugegraph.computer.core.graph.value.IdList;
import com.baidu.hugegraph.computer.core.graph.value.IdListList;
import com.baidu.hugegraph.computer.core.graph.value.LongValue;
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;
import com.baidu.hugegraph.computer.core.io.BytesOutput;
import com.baidu.hugegraph.computer.core.io.IOFactory;
import com.baidu.hugegraph.computer.core.io.StreamGraphOutput;
import com.baidu.hugegraph.computer.core.manager.Managers;
import com.baidu.hugegraph.computer.core.network.ConnectionId;
import com.baidu.hugegraph.computer.core.network.buffer.ManagedBuffer;
import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.computer.core.receiver.MessageRecvManager;
import com.baidu.hugegraph.computer.core.receiver.ReceiverUtil;
import com.baidu.hugegraph.computer.core.sender.MessageSendManager;
import com.baidu.hugegraph.computer.core.sort.sorting.SendSortManager;
import com.baidu.hugegraph.computer.core.store.FileManager;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.EntryOutputImpl;
import com.baidu.hugegraph.computer.core.worker.Computation;
import com.baidu.hugegraph.computer.core.worker.ComputationContext;
import com.baidu.hugegraph.computer.suite.unit.UnitTestBase;
import com.baidu.hugegraph.testutil.Whitebox;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.function.Consumer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:com/baidu/hugegraph/computer/core/compute/ComputeManagerTest.class */
public class ComputeManagerTest extends UnitTestBase {
    private static final Random RANDOM = new Random(1);
    private Config config;
    private Managers managers;
    private ConnectionId connectionId;
    private ComputeManager<?> computeManager;

    @Before
    public void setup() {
        this.config = UnitTestBase.updateWithRequiredOptions(ComputerOptions.JOB_ID, "local_001", ComputerOptions.JOB_WORKERS_COUNT, "1", ComputerOptions.JOB_PARTITIONS_COUNT, "2", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.WORKER_COMBINER_CLASS, Null.class.getName(), ComputerOptions.ALGORITHM_RESULT_CLASS, IdListList.class.getName(), ComputerOptions.ALGORITHM_MESSAGE_CLASS, IdList.class.getName(), ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]", ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "10000", ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT, "1000", ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX, "10", ComputerOptions.WORKER_COMPUTATION_CLASS, MockComputation.class.getName(), ComputerOptions.INPUT_EDGE_FREQ, "SINGLE");
        this.managers = new Managers();
        FileManager fileManager = new FileManager();
        this.managers.add(fileManager);
        SendSortManager sendSortManager = new SendSortManager(context());
        this.managers.add(sendSortManager);
        this.managers.add(new MessageSendManager(context(), sendSortManager, new MockMessageSender()));
        this.managers.add(new MessageRecvManager(context(), fileManager, sendSortManager));
        this.managers.initAll(this.config);
        this.connectionId = new ConnectionId(new InetSocketAddress("localhost", 8081), 0);
        this.computeManager = new ComputeManager<>(context(), this.managers, (Computation) this.config.createObject(ComputerOptions.WORKER_COMPUTATION_CLASS));
    }

    @After
    public void teardown() {
        this.managers.closeAll(this.config);
    }

    @Test
    public void testProcess() throws IOException {
        MessageRecvManager messageRecvManager = this.managers.get("message_recv");
        messageRecvManager.onStarted(this.connectionId);
        add200VertexBuffer(managedBuffer -> {
            messageRecvManager.handle(MessageType.VERTEX, 0, managedBuffer);
        });
        add200VertexBuffer(managedBuffer2 -> {
            messageRecvManager.handle(MessageType.VERTEX, 1, managedBuffer2);
        });
        messageRecvManager.onFinished(this.connectionId);
        messageRecvManager.onStarted(this.connectionId);
        addSingleFreqEdgeBuffer(managedBuffer3 -> {
            messageRecvManager.handle(MessageType.EDGE, 0, managedBuffer3);
        });
        messageRecvManager.onFinished(this.connectionId);
        this.computeManager.input();
        messageRecvManager.beforeSuperstep(this.config, 0);
        messageRecvManager.onStarted(this.connectionId);
        addMessages(managedBuffer4 -> {
            messageRecvManager.handle(MessageType.MSG, 0, managedBuffer4);
        });
        messageRecvManager.onFinished(this.connectionId);
        this.computeManager.compute((ComputationContext) null, 0);
        messageRecvManager.afterSuperstep(this.config, 0);
        this.computeManager.takeRecvedMessages();
        messageRecvManager.beforeSuperstep(this.config, 1);
        messageRecvManager.onStarted(this.connectionId);
        messageRecvManager.onFinished(this.connectionId);
        this.computeManager.compute((ComputationContext) null, 1);
        messageRecvManager.afterSuperstep(this.config, 1);
        this.computeManager.output();
    }

    private static void add200VertexBuffer(Consumer<ManagedBuffer> consumer) throws IOException {
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 200) {
                return;
            }
            Vertex createVertex = graphFactory().createVertex();
            createVertex.id(BytesId.of(j2));
            createVertex.properties(graphFactory().createProperties());
            ReceiverUtil.consumeBuffer(writeVertex(createVertex), consumer);
            j = j2 + 2;
        }
    }

    private static byte[] writeVertex(Vertex vertex) throws IOException {
        BytesOutput createBytesOutput = IOFactory.createBytesOutput(32);
        new StreamGraphOutput(context(), new EntryOutputImpl(createBytesOutput)).writeVertex(vertex);
        return createBytesOutput.toByteArray();
    }

    private static void addSingleFreqEdgeBuffer(Consumer<ManagedBuffer> consumer) throws IOException {
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 200) {
                return;
            }
            Vertex createVertex = graphFactory().createVertex();
            createVertex.id(BytesId.of(j2));
            int nextInt = RANDOM.nextInt(20);
            if (nextInt != 0) {
                Edges createEdges = graphFactory().createEdges(nextInt);
                long j3 = 0;
                while (true) {
                    long j4 = j3;
                    if (j4 >= nextInt) {
                        break;
                    }
                    Edge createEdge = graphFactory().createEdge();
                    createEdge.targetId(BytesId.of(RANDOM.nextInt(200)));
                    Properties createProperties = graphFactory().createProperties();
                    createProperties.put("p1", new LongValue(j2));
                    createEdge.properties(createProperties);
                    createEdges.add(createEdge);
                    j3 = j4 + 1;
                }
                createVertex.edges(createEdges);
                ReceiverUtil.consumeBuffer(writeEdges(createVertex, EdgeFrequency.SINGLE), consumer);
            }
            j = j2 + 1;
        }
    }

    private static byte[] writeEdges(Vertex vertex, EdgeFrequency edgeFrequency) throws IOException {
        BytesOutput createBytesOutput = IOFactory.createBytesOutput(32);
        StreamGraphOutput streamGraphOutput = new StreamGraphOutput(context(), new EntryOutputImpl(createBytesOutput));
        Whitebox.setInternalState(streamGraphOutput, "frequency", edgeFrequency);
        streamGraphOutput.writeEdges(vertex);
        return createBytesOutput.toByteArray();
    }

    private static void addMessages(Consumer<ManagedBuffer> consumer) throws IOException {
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 200) {
                return;
            }
            int nextInt = RANDOM.nextInt(5);
            for (int i = 0; i < nextInt; i++) {
                BytesId of = BytesId.of(j2);
                IdList idList = new IdList();
                idList.add(of);
                ReceiverUtil.consumeBuffer(ReceiverUtil.writeMessage(of, idList), consumer);
            }
            j = j2 + 1;
        }
    }
}
