package io.kestra.core.runners;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.executions.ExecutionKilled;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.tasks.flows.Pause;
import io.kestra.core.tasks.flows.WorkingDirectory;
import io.kestra.core.tasks.test.Sleep;
import io.kestra.core.utils.Await;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.Rethrow;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for {@link Worker}: each test starts a worker on the injected application
 * context, pushes jobs on the worker job queue and observes what comes back on the worker task
 * result queue (and, for the kill scenario, on the log queue).
 *
 * <p>Queue callbacks are assumed to run on threads other than the test thread (TODO confirm
 * against the queue implementation), so everything they write to is a thread-safe holder.
 */
@MicronautTest
class WorkerTest {

    /** Pause between two evaluations of an awaited condition. */
    private static final Duration POLL_INTERVAL = Duration.ofMillis(100L);

    /** Maximum time a test waits for an awaited condition before failing. */
    private static final Duration AWAIT_TIMEOUT = Duration.ofMinutes(1L);

    @Inject
    ApplicationContext applicationContext;

    // The field keeps its historical name; the bean it receives is the "workerJobQueue".
    @Inject
    @Named("workerJobQueue")
    QueueInterface<WorkerJob> workerTaskQueue;

    @Inject
    @Named("workerTaskResultQueue")
    QueueInterface<WorkerTaskResult> workerTaskResultQueue;

    @Inject
    @Named("executionKilledQueue")
    QueueInterface<ExecutionKilled> executionKilledQueue;

    @Inject
    @Named("workerTaskLogQueue")
    QueueInterface<LogEntry> workerTaskLogQueue;

    @Inject
    RunContextFactory runContextFactory;

    /**
     * Submits a single {@link Sleep} task and waits until the last received result is in a
     * terminated state, then checks that its state history holds exactly three entries.
     *
     * @throws TimeoutException if no terminated result is observed within {@link #AWAIT_TIMEOUT}
     */
    @Test
    public void success() throws TimeoutException {
        new Worker(this.applicationContext, 8, (String) null).run();

        // Holds the most recent result published by the worker.
        AtomicReference<WorkerTaskResult> lastResult = new AtomicReference<>(null);
        this.workerTaskResultQueue.receive(either -> lastResult.set(either.getLeft()));

        this.workerTaskQueue.emit(workerTask(1000L));

        Await.until(() -> {
            // Read the reference once so the null check and the state check see the same value.
            WorkerTaskResult current = lastResult.get();
            return current != null && current.getTaskRun().getState().isTerminated();
        }, POLL_INTERVAL, AWAIT_TIMEOUT);

        MatcherAssert.assertThat(lastResult.get().getTaskRun().getState().getHistories().size(), Matchers.is(3));
    }

    /**
     * Checks that {@link Worker#getWorkerGroup()} is {@code null} even though a group name is
     * passed to the constructor.
     */
    @Test
    public void workerGroup() {
        // NOTE(review): presumably worker groups are not supported by this build, hence the
        // null expectation - confirm against Worker.
        Worker worker = new Worker(this.applicationContext, 8, "toto");

        MatcherAssert.assertThat(worker.getWorkerGroup(), Matchers.nullValue());
    }

    /**
     * Submits a {@link WorkingDirectory} task wrapping a {@link Pause} task and waits until the
     * last received result is failed, then checks that its state history holds three entries.
     *
     * @throws TimeoutException if no failed result is observed within {@link #AWAIT_TIMEOUT}
     * @throws JsonProcessingException declared for the JSON rendering of the failure message
     */
    @Test
    public void failOnWorkerTaskWithFlowable() throws TimeoutException, JsonProcessingException {
        new Worker(this.applicationContext, 8, (String) null).run();

        AtomicReference<WorkerTaskResult> lastResult = new AtomicReference<>(null);
        this.workerTaskResultQueue.receive(either -> lastResult.set(either.getLeft()));

        Pause pause = Pause.builder()
            .type(Pause.class.getName())
            .delay(Duration.ofSeconds(1L))
            .id("unit-test")
            .build();

        WorkingDirectory workingDirectory = WorkingDirectory.builder()
            .type(WorkingDirectory.class.getName())
            .id("worker-unit-test")
            .tasks(List.of(pause))
            .build();

        Flow flow = Flow.builder()
            .id(IdUtils.create())
            .namespace("io.kestra.unit-test")
            .tasks(Collections.singletonList(workingDirectory))
            .build();

        // The job carries the working directory task while the task run is resolved from the
        // nested pause task.
        this.workerTaskQueue.emit(
            WorkerTask.builder()
                .runContext(this.runContextFactory.of(ImmutableMap.of("key", "value")))
                .task(workingDirectory)
                .taskRun(TaskRun.of(TestsUtils.mockExecution(flow, ImmutableMap.of()), ResolvedTask.of(pause)))
                .build()
        );

        Await.until(
            // Message supplier: renders the last result as JSON.
            Rethrow.throwSupplier(() -> {
                WorkerTaskResult current = lastResult.get();
                return "WorkerTaskResult was " + (current == null ? null : JacksonMapper.ofJson().writeValueAsString(current));
            }),
            () -> {
                WorkerTaskResult current = lastResult.get();
                return current != null && current.getTaskRun().getState().isFailed();
            },
            POLL_INTERVAL,
            AWAIT_TIMEOUT
        );

        MatcherAssert.assertThat(lastResult.get().getTaskRun().getState().getHistories().size(), Matchers.is(3));
    }

    /**
     * Emits the same long-running task four times plus one short task from another execution,
     * kills the execution of the long-running task, and waits for five terminated results.
     * It then checks that at least one result is {@code KILLED} and one is {@code SUCCESS},
     * each with three state history entries, and that no log entry with message {@code "3"}
     * was received.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping
     * @throws TimeoutException if five terminated results are not observed within
     *     {@link #AWAIT_TIMEOUT}
     */
    @Test
    public void killed() throws InterruptedException, TimeoutException {
        // Subscribe to logs before the worker starts.
        List<LogEntry> logs = new CopyOnWriteArrayList<>();
        this.workerTaskLogQueue.receive(either -> logs.add(either.getLeft()));

        new Worker(this.applicationContext, 8, (String) null).run();

        // Thread-safe on purpose: the queue callback appends while this thread streams over
        // the list inside Await.until, which a plain ArrayList does not support.
        List<WorkerTaskResult> results = new CopyOnWriteArrayList<>();
        this.workerTaskResultQueue.receive(either -> results.add(either.getLeft()));

        WorkerTask longRunning = workerTask(999000L);
        this.workerTaskQueue.emit(longRunning);
        this.workerTaskQueue.emit(longRunning);
        this.workerTaskQueue.emit(longRunning);
        this.workerTaskQueue.emit(longRunning);
        // Built from a separate mock execution, so the kill below does not target it.
        this.workerTaskQueue.emit(workerTask(2000L));

        // Wait before sending the kill (presumably so the worker has picked the jobs up).
        Thread.sleep(500L);
        this.executionKilledQueue.emit(
            ExecutionKilled.builder()
                .executionId(longRunning.getTaskRun().getExecutionId())
                .build()
        );

        Await.until(
            () -> results.stream()
                .filter(result -> result.getTaskRun().getState().isTerminated())
                .count() == 5,
            POLL_INTERVAL,
            AWAIT_TIMEOUT
        );

        WorkerTaskResult oneKilled = results.stream()
            .filter(result -> result.getTaskRun().getState().getCurrent() == State.Type.KILLED)
            .findFirst()
            .orElseThrow();
        MatcherAssert.assertThat(oneKilled.getTaskRun().getState().getHistories().size(), Matchers.is(3));
        MatcherAssert.assertThat(oneKilled.getTaskRun().getState().getCurrent(), Matchers.is(State.Type.KILLED));

        WorkerTaskResult oneSucceeded = results.stream()
            .filter(result -> result.getTaskRun().getState().getCurrent() == State.Type.SUCCESS)
            .findFirst()
            .orElseThrow();
        MatcherAssert.assertThat(oneSucceeded.getTaskRun().getState().getHistories().size(), Matchers.is(3));
        MatcherAssert.assertThat(oneSucceeded.getTaskRun().getState().getCurrent(), Matchers.is(State.Type.SUCCESS));

        // Leave time for late log entries to arrive before asserting that none has message "3".
        Thread.sleep(1000L);
        long thirdLogCount = logs.stream()
            // Constant-first equals: a log entry without a message must not fail the test with an NPE.
            .filter(logEntry -> "3".equals(logEntry.getMessage()))
            .count();
        MatcherAssert.assertThat(thirdLogCount, Matchers.is(0L));
    }

    /**
     * Builds a {@link WorkerTask} running a single {@link Sleep} task inside a freshly mocked
     * execution of a one-task flow.
     *
     * @param sleepDuration value handed to the sleep task's {@code duration} (presumably
     *     milliseconds - confirm against {@link Sleep})
     * @return a worker task ready to be emitted on the worker job queue
     */
    private WorkerTask workerTask(long sleepDuration) {
        // NOTE(review): the casts and the mo498build() name look like decompiler artifacts of a
        // generated builder; they are kept untouched because the builder declaration is not visible here.
        Sleep sleepTask = ((Sleep.SleepBuilder) ((Sleep.SleepBuilder) Sleep.builder().type(Sleep.class.getName())).id("unit-test")).duration(Long.valueOf(sleepDuration)).mo498build();

        Flow flow = Flow.builder()
            .id(IdUtils.create())
            .namespace("io.kestra.unit-test")
            .tasks(Collections.singletonList(sleepTask))
            .build();

        return WorkerTask.builder()
            .runContext(this.runContextFactory.of(ImmutableMap.of("key", "value")))
            .task(sleepTask)
            .taskRun(TaskRun.of(TestsUtils.mockExecution(flow, ImmutableMap.of()), ResolvedTask.of(sleepTask)))
            .build();
    }
}
