package net.kuujo.vertigo.input.impl;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import net.kuujo.vertigo.acker.Acker;
import net.kuujo.vertigo.acker.DefaultAcker;
import net.kuujo.vertigo.context.InstanceContext;
import net.kuujo.vertigo.hooks.InputHook;
import net.kuujo.vertigo.input.Input;
import net.kuujo.vertigo.input.InputCollector;
import net.kuujo.vertigo.input.Listener;
import net.kuujo.vertigo.message.JsonMessage;
import net.kuujo.vertigo.message.MessageId;
import net.kuujo.vertigo.message.schema.JsonValidator;
import net.kuujo.vertigo.message.schema.MessageSchema;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.Future;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.impl.DefaultFutureResult;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.platform.Container;

/* loaded from: input_file:net/kuujo/vertigo/input/impl/DefaultInputCollector.class */
public class DefaultInputCollector implements InputCollector {
    private final Vertx vertx;
    private final Logger logger;
    private final InstanceContext<?> context;
    private final List<InputHook> hooks = new ArrayList();
    private final Acker acker;
    private JsonValidator validator;
    private Handler<JsonMessage> messageHandler;
    private List<Listener> listeners;

    public DefaultInputCollector(Vertx vertx, Container container, InstanceContext<?> instanceContext) {
        this.vertx = vertx;
        this.logger = container.logger();
        this.context = instanceContext;
        this.acker = new DefaultAcker(instanceContext.id(), vertx.eventBus());
    }

    public DefaultInputCollector(Vertx vertx, Container container, InstanceContext<?> instanceContext, Acker acker) {
        this.vertx = vertx;
        this.logger = container.logger();
        this.context = instanceContext;
        this.acker = acker;
    }

    @Override // net.kuujo.vertigo.input.InputCollector
    public InputCollector addHook(InputHook inputHook) {
        this.hooks.add(inputHook);
        return this;
    }

    private void hookStart() {
        Iterator<InputHook> it = this.hooks.iterator();
        while (it.hasNext()) {
            it.next().handleStart(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void hookReceived(MessageId messageId) {
        Iterator<InputHook> it = this.hooks.iterator();
        while (it.hasNext()) {
            it.next().handleReceive(messageId);
        }
    }

    private void hookAck(MessageId messageId) {
        Iterator<InputHook> it = this.hooks.iterator();
        while (it.hasNext()) {
            it.next().handleAck(messageId);
        }
    }

    private void hookFail(MessageId messageId) {
        Iterator<InputHook> it = this.hooks.iterator();
        while (it.hasNext()) {
            it.next().handleFail(messageId);
        }
    }

    private void hookStop() {
        Iterator<InputHook> it = this.hooks.iterator();
        while (it.hasNext()) {
            it.next().handleStart(this);
        }
    }

    @Override // net.kuujo.vertigo.input.InputCollector
    public InputCollector declareSchema(MessageSchema messageSchema) {
        this.validator = messageSchema.getValidator();
        return this;
    }

    @Override // net.kuujo.vertigo.input.InputCollector
    public InputCollector messageHandler(Handler<JsonMessage> handler) {
        this.messageHandler = wrapMessageHandler(handler);
        if (this.listeners != null) {
            Iterator<Listener> it = this.listeners.iterator();
            while (it.hasNext()) {
                it.next().messageHandler(this.messageHandler);
            }
        }
        return this;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean hasValidSchema(JsonMessage jsonMessage) {
        return this.validator == null || this.validator.validate(jsonMessage.body());
    }

    private Handler<JsonMessage> wrapMessageHandler(final Handler<JsonMessage> handler) {
        return new Handler<JsonMessage>() { // from class: net.kuujo.vertigo.input.impl.DefaultInputCollector.1
            public void handle(JsonMessage jsonMessage) {
                if (!DefaultInputCollector.this.hasValidSchema(jsonMessage)) {
                    DefaultInputCollector.this.fail(jsonMessage);
                } else {
                    handler.handle(jsonMessage);
                    DefaultInputCollector.this.hookReceived(jsonMessage.messageId());
                }
            }
        };
    }

    @Override // net.kuujo.vertigo.input.InputCollector
    public InputCollector start() {
        final DefaultFutureResult defaultFutureResult = new DefaultFutureResult();
        defaultFutureResult.setHandler(new Handler<AsyncResult<Void>>() { // from class: net.kuujo.vertigo.input.impl.DefaultInputCollector.2
            public void handle(AsyncResult<Void> asyncResult) {
                if (asyncResult.failed()) {
                    DefaultInputCollector.this.logger.error(asyncResult.cause());
                }
            }
        });
        stop(new Handler<AsyncResult<Void>>() { // from class: net.kuujo.vertigo.input.impl.DefaultInputCollector.3
            public void handle(AsyncResult<Void> asyncResult) {
                DefaultInputCollector.this.listeners = new ArrayList();
                DefaultInputCollector.this.recursiveStart(DefaultInputCollector.this.context.getComponent().getInputs().iterator(), defaultFutureResult);
            }
        });
        return this;
    }

    @Override // net.kuujo.vertigo.input.InputCollector
    public InputCollector start(Handler<AsyncResult<Void>> handler) {
        final DefaultFutureResult handler2 = new DefaultFutureResult().setHandler(handler);
        stop(new Handler<AsyncResult<Void>>() { // from class: net.kuujo.vertigo.input.impl.DefaultInputCollector.4
            public void handle(AsyncResult<Void> asyncResult) {
                DefaultInputCollector.this.listeners = new ArrayList();
                DefaultInputCollector.this.recursiveStart(DefaultInputCollector.this.context.getComponent().getInputs().iterator(), handler2);
            }
        });
        return this;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void recursiveStart(final Iterator<Input> it, final Future<Void> future) {
        if (!it.hasNext()) {
            future.setResult((Object) null);
            hookStart();
        } else {
            Listener messageHandler = new DefaultListener(it.next(), this.vertx).setAutoAck(false).messageHandler(this.messageHandler);
            this.listeners.add(messageHandler);
            messageHandler.start(new Handler<AsyncResult<Void>>() { // from class: net.kuujo.vertigo.input.impl.DefaultInputCollector.5
                public void handle(AsyncResult<Void> asyncResult) {
                    if (asyncResult.failed()) {
                        future.setFailure(asyncResult.cause());
                    } else {
                        DefaultInputCollector.this.recursiveStart(it, future);
                    }
                }
            });
        }
    }

    @Override // net.kuujo.vertigo.input.InputCollector
    public void stop() {
        if (this.listeners != null) {
            DefaultFutureResult defaultFutureResult = new DefaultFutureResult();
            defaultFutureResult.setHandler(new Handler<AsyncResult<Void>>() { // from class: net.kuujo.vertigo.input.impl.DefaultInputCollector.6
                public void handle(AsyncResult<Void> asyncResult) {
                    if (asyncResult.failed()) {
                        DefaultInputCollector.this.logger.error(asyncResult.cause());
                    }
                    DefaultInputCollector.this.listeners = null;
                    Iterator it = DefaultInputCollector.this.hooks.iterator();
                    while (it.hasNext()) {
                        ((InputHook) it.next()).handleStop(DefaultInputCollector.this);
                    }
                }
            });
            recursiveStop(this.listeners.iterator(), defaultFutureResult);
        }
    }

    @Override // net.kuujo.vertigo.input.InputCollector
    public void stop(final Handler<AsyncResult<Void>> handler) {
        DefaultFutureResult defaultFutureResult = new DefaultFutureResult();
        defaultFutureResult.setHandler(new Handler<AsyncResult<Void>>() { // from class: net.kuujo.vertigo.input.impl.DefaultInputCollector.7
            public void handle(AsyncResult<Void> asyncResult) {
                DefaultInputCollector.this.listeners = null;
                if (asyncResult.failed()) {
                    new DefaultFutureResult().setHandler(handler).setFailure(asyncResult.cause());
                } else {
                    new DefaultFutureResult().setHandler(handler).setResult((Object) null);
                }
            }
        });
        if (this.listeners != null) {
            recursiveStop(this.listeners.iterator(), defaultFutureResult);
        } else {
            defaultFutureResult.setResult((Object) null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void recursiveStop(final Iterator<Listener> it, final Future<Void> future) {
        if (it.hasNext()) {
            it.next().stop(new Handler<AsyncResult<Void>>() { // from class: net.kuujo.vertigo.input.impl.DefaultInputCollector.8
                public void handle(AsyncResult<Void> asyncResult) {
                    if (asyncResult.failed()) {
                        future.setFailure(asyncResult.cause());
                    } else {
                        DefaultInputCollector.this.recursiveStop(it, future);
                    }
                }
            });
        } else {
            future.setResult((Object) null);
            hookStop();
        }
    }

    @Override // net.kuujo.vertigo.input.InputCollector
    public InputCollector ack(JsonMessage jsonMessage) {
        this.acker.ack(jsonMessage.messageId());
        hookAck(jsonMessage.messageId());
        return this;
    }

    @Override // net.kuujo.vertigo.input.InputCollector
    public InputCollector fail(JsonMessage jsonMessage) {
        this.acker.fail(jsonMessage.messageId());
        hookFail(jsonMessage.messageId());
        return this;
    }
}
