package com.expediagroup.rhapsody.test.core.work;

import com.expediagroup.rhapsody.api.PublisherFactory;
import com.expediagroup.rhapsody.api.SubscriberFactory;
import com.expediagroup.rhapsody.api.WorkType;
import com.expediagroup.rhapsody.core.adapter.Adapters;
import com.expediagroup.rhapsody.core.work.WorkBufferConfig;
import com.expediagroup.rhapsody.core.work.WorkBufferer;
import com.expediagroup.rhapsody.test.TestWork;
import com.expediagroup.rhapsody.util.Defaults;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

/* loaded from: input_file:com/expediagroup/rhapsody/test/core/work/AbstractBufferedWorkTest.class */
/**
 * Base class for tests that feed {@link TestWork} into a subscriber and verify how the
 * corresponding publisher's output is grouped once transformed by a {@link WorkBufferer}.
 *
 * <p>Subclasses provide the {@link SubscriberFactory} / {@link PublisherFactory} pair under
 * test through the constructor. Before each test the subscriber factory is adapted to a plain
 * {@link Consumer} and the publisher is wrapped in a buffering {@link Flux}.
 */
public abstract class AbstractBufferedWorkTest {

    /** Unit of time used to pace the individual verification steps. */
    protected static final Duration STEP_DURATION = Duration.ofMillis(200);

    /**
     * Buffer configuration shared by every test. The {@link Duration} argument is four steps
     * long and is presumably what {@code getBufferDuration()} reports; the meaning of the
     * literal {@code 8} is not visible from this class (TODO confirm against WorkBufferConfig).
     */
    protected static final WorkBufferConfig WORK_BUFFER_CONFIG = new WorkBufferConfig(
        Defaults.PREFETCH,
        STEP_DURATION.multipliedBy(4),
        8,
        Defaults.CONCURRENCY);

    /** Subject assigned to every piece of work created by these tests. */
    protected static final String SUBJECT = "SUBJECT";

    /** Bufferer created via {@code WorkBufferer.identity} from {@link #WORK_BUFFER_CONFIG}. */
    protected final WorkBufferer<TestWork> workBufferer = WorkBufferer.identity(WORK_BUFFER_CONFIG);

    /** Factory for the subscriber that test work is handed to. */
    protected final SubscriberFactory<TestWork> subscriberFactory;

    /** Factory for the publisher whose output is buffered and verified. */
    protected final PublisherFactory<TestWork> publisherFactory;

    /** Consumer view of {@link #subscriberFactory}; re-created before each test. */
    protected Consumer<TestWork> workConsumer;

    /** Buffered view of the publisher's output; re-created before each test. */
    protected Flux<List<TestWork>> workFlux;

    /**
     * @param subscriberFactory factory for the subscriber that receives test work
     * @param publisherFactory factory for the publisher whose emissions are buffered
     */
    protected AbstractBufferedWorkTest(
        SubscriberFactory<TestWork> subscriberFactory,
        PublisherFactory<TestWork> publisherFactory) {
        this.subscriberFactory = subscriberFactory;
        this.publisherFactory = publisherFactory;
    }

    /** Builds a fresh consumer and a fresh buffered flux for the test about to run. */
    @Before
    public void setup() {
        workConsumer = Adapters.toConsumer(subscriberFactory);
        workFlux = Flux.from(publisherFactory.create()).transform(workBufferer);
    }

    /** A lone COMMIT is expected as a single-element buffer without any waiting step. */
    @Test
    public void bufferedCommitsAreImmediatelyEmitted() {
        long inceptionEpochMilli = Instant.now().toEpochMilli();
        TestWork commit = TestWork.create(WorkType.COMMIT, SUBJECT, inceptionEpochMilli);
        List<TestWork> expectedBuffer = Collections.singletonList(commit);

        StepVerifier.create(workFlux)
            .then(() -> workConsumer.accept(commit))
            .expectNext(expectedBuffer)
            .thenAwait(STEP_DURATION)
            .thenCancel()
            .verify();
    }

    /**
     * An INTENT followed one step later by a RETRY must produce no event for a step after
     * each of them, and must then be emitted together as one buffer.
     */
    @Test
    public void bufferedIntentsAndRetriesAreEmittedAfterBufferDuration() {
        long inceptionEpochMilli = Instant.now().toEpochMilli();
        TestWork intent = TestWork.create(WorkType.INTENT, SUBJECT, inceptionEpochMilli);
        TestWork retry = TestWork.create(
            WorkType.RETRY,
            SUBJECT,
            intent.workHeader().inceptionEpochMilli() + 1);
        List<TestWork> expectedBuffer = Arrays.asList(intent, retry);

        StepVerifier.create(workFlux)
            .then(() -> workConsumer.accept(intent))
            .expectNoEvent(STEP_DURATION)
            .then(() -> workConsumer.accept(retry))
            .expectNoEvent(STEP_DURATION)
            .expectNext(expectedBuffer)
            .thenAwait(WORK_BUFFER_CONFIG.getBufferDuration())
            .thenCancel()
            .verify();
    }

    /**
     * An INTENT must produce no event for one step; a subsequent COMMIT for the same subject
     * is then expected to yield a buffer containing both, in order.
     */
    @Test
    public void existingBuffersAreEmittedUponCommit() {
        long inceptionEpochMilli = Instant.now().toEpochMilli();
        TestWork intent = TestWork.create(WorkType.INTENT, SUBJECT, inceptionEpochMilli);
        TestWork commit = TestWork.create(
            WorkType.COMMIT,
            SUBJECT,
            intent.workHeader().inceptionEpochMilli() + 1);
        List<TestWork> expectedBuffer = Arrays.asList(intent, commit);

        StepVerifier.create(workFlux)
            .then(() -> workConsumer.accept(intent))
            .expectNoEvent(STEP_DURATION)
            .then(() -> workConsumer.accept(commit))
            .expectNext(expectedBuffer)
            .thenAwait(STEP_DURATION)
            .thenCancel()
            .verify();
    }
}
