package com.aol.micro.server.reactive;

import com.aol.cyclops.control.Eval;
import com.aol.cyclops.control.Maybe;
import com.aol.cyclops.control.ReactiveSeq;
import com.aol.cyclops.data.async.QueueFactories;
import java.io.PrintStream;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:com/aol/micro/server/reactive/EventQueueManagerTest.class */
/**
 * Tests for {@link EventQueueManager} using {@code String} events.
 *
 * <p>Each test pushes values under a key and checks that they can be observed through one of
 * the manager's consumption styles: {@code forEach}, {@code stream}, {@code lazy},
 * {@code maybe} and {@code ioFutureStream}.
 *
 * <p>Delivery happens on other threads, so tests that observe a value through the
 * {@link #recieved} field poll it with a deadline instead of sleeping for a fixed interval.
 */
public class EventQueueManagerTest {
    /** Upper bound on how long a test waits for an asynchronously delivered event. */
    private static final long AWAIT_TIMEOUT_MILLIS = 2000L;

    /** Pause between two successive checks of {@link #recieved}. */
    private static final long POLL_INTERVAL_MILLIS = 10L;

    /** Manager under test; recreated before every test in {@link #setup()}. */
    EventQueueManager<String> manager;

    /** Last value seen by a test consumer; volatile because consumers run on other threads. */
    volatile String recieved;

    /** Executor handed to the manager and used to run the test streams. */
    Executor ex = Executors.newFixedThreadPool(10);

    /** Sequence number appended to the values generated in {@link #testMaybe()}. */
    AtomicInteger count = new AtomicInteger(0);

    /**
     * Clears the previously received value and builds a fresh manager on top of
     * {@code QueueFactories.boundedNonBlockingQueue(1000)}.
     */
    @Before
    public void setup() {
        this.recieved = null;
        this.manager = EventQueueManager.of(this.ex, QueueFactories.boundedNonBlockingQueue(1000));
    }

    /** Pushing to a key that has no registered consumer must not throw. */
    @Test
    public void testPush() {
        this.manager.push("hello", "world");
    }

    /**
     * No-op consumer registered on the {@code "bus-a"} key in {@link #testConfigure()}.
     *
     * @param str the delivered event (ignored)
     */
    public void handleEvent(String str) {
    }

    /**
     * Registers consumers on two different keys and verifies that the value pushed to
     * {@code "hello"} reaches the consumer registered for {@code "hello"}.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void testConfigure() throws InterruptedException {
        this.manager.forEach("bus-a", this::handleEvent);
        this.manager.push("bus-a", "");
        this.manager.forEach("hello", str -> this.recieved = str);
        this.manager.push("hello", "world");
        awaitRecieved();
        System.out.println(this.recieved);
        Assert.assertThat(this.recieved, CoreMatchers.equalTo("world"));
    }

    /**
     * Placeholder that is not used by any test in this class.
     *
     * @param str ignored
     * @return always {@code null}
     */
    public String process(String str) {
        return null;
    }

    /**
     * Consumes the {@code "2"} key as a stream on the test executor and verifies that a pushed
     * value is delivered to it.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void testStream() throws InterruptedException {
        this.manager.stream("2").futureOperations(this.ex).forEach(str -> this.recieved = str);
        this.manager.push("2", "world");
        awaitRecieved();
        System.out.println(this.recieved);
        Assert.assertThat(this.recieved, CoreMatchers.equalTo("world"));
    }

    /**
     * Feeds {@code "input"} into the {@code "lazy"} key once per second and verifies that the
     * mapped {@code Eval} yields the same transformed value on two consecutive reads.
     */
    @Test
    public void testLazyValue() {
        ReactiveSeq.generate(() -> "input")
                .onePer(1L, TimeUnit.SECONDS)
                .futureOperations(this.ex)
                .forEach(str -> this.manager.push("lazy", str));

        // Parameterized (not raw) so that get() is statically a String.
        Eval<String> message = this.manager.lazy("lazy")
                .map(str -> str + "-message!")
                .map(str -> str + "!");

        Assert.assertThat(message.get(), CoreMatchers.equalTo("input-message!!"));
        Assert.assertThat(message.get(), CoreMatchers.equalTo("input-message!!"));
    }

    /**
     * Feeds numbered values ({@code "input:1"}, {@code "input:2"}, ...) into the {@code "lazy"}
     * key once per second and verifies that two independently obtained {@code Maybe}s each
     * resolve to one of the first few generated values.
     */
    @Test
    public void testMaybe() {
        // Parameterized (not raw) so the forEach lambda below receives a String, which is
        // what push(...) on an EventQueueManager<String> is given in the other tests.
        ReactiveSeq<String> numbered = ReactiveSeq.generate(() -> "input")
                .onePer(1L, TimeUnit.SECONDS)
                .map(str -> str + ":" + this.count.incrementAndGet());

        numbered.peek(System.out::println)
                .futureOperations(this.ex)
                .forEach(str -> this.manager.push("lazy", str));

        Maybe<String> first = this.manager.maybe("lazy");
        Maybe<String> second = this.manager.maybe("lazy");

        Assert.assertThat(first.get(), CoreMatchers.anyOf(CoreMatchers.equalTo("input:1"), CoreMatchers.equalTo("input:2")));
        Assert.assertThat(second.get(), CoreMatchers.anyOf(CoreMatchers.equalTo("input:1"), CoreMatchers.equalTo("input:2"), CoreMatchers.equalTo("input:3")));
    }

    /**
     * Placeholder that is not used by any test in this class.
     *
     * @param str ignored
     * @return always {@code "hello"}
     */
    public String restCall(String str) {
        return "hello";
    }

    /**
     * Consumes the {@code "futureStream"} key through {@code ioFutureStream} and verifies that
     * a pushed value is observed by the {@code peek} stage.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void testIoFutureStream() throws InterruptedException {
        this.manager.ioFutureStream("futureStream").peek(str -> this.recieved = str).run();
        this.manager.push("futureStream", "world");
        awaitRecieved();
        System.out.println(this.recieved);
        Assert.assertThat(this.recieved, CoreMatchers.equalTo("world"));
    }

    /**
     * Blocks until {@link #recieved} has been set or {@link #AWAIT_TIMEOUT_MILLIS} has elapsed,
     * whichever comes first.
     *
     * <p>Returns normally on timeout; the caller's assertion then reports the missing value.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    private void awaitRecieved() throws InterruptedException {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(AWAIT_TIMEOUT_MILLIS);
        // Compare via subtraction so the check stays correct if nanoTime wraps around.
        while (this.recieved == null && System.nanoTime() - deadline < 0) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }
}
