package io.vertx.up.uca.job.phase;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.up.atom.Refer;
import io.vertx.up.atom.worker.Mission;
import io.vertx.up.commune.Envelop;
import io.vertx.up.log.Annal;
import io.vertx.up.uca.job.plugin.JobIncome;
import io.vertx.up.unity.Ux;
import io.vertx.up.util.Ut;
import java.util.Objects;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/vertx/up/uca/job/phase/Input.class */
public class Input {
    private static final Annal LOGGER = Annal.get(Input.class);
    private final transient Vertx vertx;
    private final transient Refer underway = new Refer();

    /* JADX INFO: Access modifiers changed from: package-private */
    public Input(Vertx vertx) {
        this.vertx = vertx;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Refer underway() {
        return this.underway;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future<Envelop> inputAsync(Mission mission) {
        String incomeAddress = mission.getIncomeAddress();
        if (Ut.isNil(incomeAddress)) {
            Element.onceLog(mission, () -> {
                LOGGER.info("[ Job: {0} ] 1. Input new data of JsonObject", new Object[]{mission.getCode()});
            });
            return Future.succeededFuture(Envelop.okJson());
        }
        LOGGER.info("[ Job ] {0} event bus enabled: {1}", new Object[]{"Income", incomeAddress});
        Promise promise = Promise.promise();
        this.vertx.eventBus().consumer(incomeAddress, message -> {
            Element.onceLog(mission, () -> {
                LOGGER.info("[ Job: {0} ] 1. Input from address {1}", new Object[]{mission.getCode(), incomeAddress});
            });
            Envelop envelop = (Envelop) message.body();
            if (Objects.isNull(envelop)) {
                promise.complete(Envelop.ok());
            } else {
                promise.complete(envelop);
            }
        });
        return promise.future();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future<Envelop> incomeAsync(Envelop envelop, Mission mission) {
        if (!envelop.valid()) {
            Element.onceLog(mission, () -> {
                LOGGER.info("[ Job: {0} ] Terminal with error: {1}", new Object[]{mission.getCode(), envelop.error().getClass().getName()});
            });
            return Ux.future(envelop);
        }
        JobIncome income = Element.income(mission);
        if (Objects.isNull(income)) {
            Element.onceLog(mission, () -> {
                LOGGER.info("[ Job: {0} ] 2. Input without `JobIncome`", new Object[]{mission.getCode()});
            });
            return Future.succeededFuture(envelop);
        }
        LOGGER.info("[ Job ] {0} selected: {1}", new Object[]{"JobIncome", income.getClass().getName()});
        Ut.contract(income, Vertx.class, this.vertx);
        Ut.contract(income, Mission.class, mission);
        Element.onceLog(mission, () -> {
            LOGGER.info("[ Job: {0} ] 2. Input with `JobIncome` = {1}", new Object[]{mission.getCode(), income.getClass().getName()});
        });
        return income.underway().compose(refer -> {
            this.underway.add(refer.get());
            return income.beforeAsync(envelop);
        });
    }
}
