package net.kuujo.vertigo.output.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import net.kuujo.vertigo.acker.Acker;
import net.kuujo.vertigo.acker.DefaultAcker;
import net.kuujo.vertigo.auditor.AuditorVerticle;
import net.kuujo.vertigo.context.InstanceContext;
import net.kuujo.vertigo.hooks.OutputHook;
import net.kuujo.vertigo.input.Input;
import net.kuujo.vertigo.message.JsonMessage;
import net.kuujo.vertigo.message.MessageId;
import net.kuujo.vertigo.message.impl.JsonMessageBuilder;
import net.kuujo.vertigo.output.Channel;
import net.kuujo.vertigo.output.Connection;
import net.kuujo.vertigo.output.Output;
import net.kuujo.vertigo.output.OutputCollector;
import net.kuujo.vertigo.serializer.Serializer;
import net.kuujo.vertigo.serializer.Serializers;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.impl.DefaultFutureResult;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.platform.Container;

/* loaded from: input_file:net/kuujo/vertigo/output/impl/DefaultOutputCollector.class */
/**
 * Default {@link OutputCollector} implementation.
 *
 * <p>The collector registers an event bus handler on the component address and reacts to
 * {@code "listen"} requests by attaching the requesting address as a connection on the matching
 * output channel. Each connection expires unless another listen request for the same address
 * arrives within {@link #LISTEN_INTERVAL}. Emitted messages are published to every channel of the
 * target stream and tracked through the configured {@link Acker}.
 */
public class DefaultOutputCollector implements OutputCollector {
    /** Delay handed to {@code Vertx.setTimer} after which an un-refreshed connection is dropped. */
    private static final long LISTEN_INTERVAL = 15000;

    /** Value of the {@code "action"} field that identifies a listen request. */
    private static final String ACTION_LISTEN = "listen";

    private final Serializer serializer = Serializers.getDefault();
    private final Vertx vertx;
    private final EventBus eventBus;
    private final InstanceContext<?> context;
    private final Acker acker;
    private final boolean ackingEnabled;
    private final String componentAddress;
    private final List<OutputHook> hooks = new ArrayList<>();
    private final List<String> auditors;
    private final JsonMessageBuilder messageBuilder;
    private final Random random = new Random();

    /** Channels grouped by stream name. */
    private final Map<String, List<Channel>> channels = new HashMap<>();

    /** Identifier of the pending expiry timer for each connected address. */
    private final Map<String, Long> connectionTimers = new HashMap<>();

    /** Event bus handler that dispatches incoming control messages by their {@code "action"}. */
    private final Handler<Message<JsonObject>> handler = new Handler<Message<JsonObject>>() {
        @Override
        public void handle(Message<JsonObject> message) {
            JsonObject body = message.body();
            if (body == null) {
                return;
            }
            // Constant-first equals: a message without an "action" field is simply ignored.
            if (ACTION_LISTEN.equals(body.getString("action"))) {
                doListen(body.getString(AuditorVerticle.ADDRESS), body.getString("status"), body.getObject("input"));
            }
        }
    };

    /**
     * Creates a collector that uses the event bus of the given Vert.x instance and a
     * {@link DefaultAcker}.
     *
     * @param vertx the Vert.x instance
     * @param container the container (not used by this class)
     * @param instanceContext the context of the component instance
     */
    public DefaultOutputCollector(Vertx vertx, Container container, InstanceContext<?> instanceContext) {
        this(vertx, container, vertx.eventBus(), instanceContext);
    }

    /**
     * Creates a collector on an explicit event bus with a {@link DefaultAcker} built from the
     * instance id and that event bus.
     *
     * @param vertx the Vert.x instance
     * @param container the container (not used by this class)
     * @param eventBus the event bus used for registration, replies and connections
     * @param instanceContext the context of the component instance
     */
    public DefaultOutputCollector(Vertx vertx, Container container, EventBus eventBus, InstanceContext<?> instanceContext) {
        this(vertx, container, eventBus, instanceContext, new DefaultAcker(instanceContext.id(), eventBus));
    }

    /**
     * Creates a collector that uses the event bus of the given Vert.x instance and the supplied
     * acker.
     *
     * @param vertx the Vert.x instance
     * @param container the container (not used by this class)
     * @param instanceContext the context of the component instance
     * @param acker the acker used to track emitted messages
     */
    public DefaultOutputCollector(Vertx vertx, Container container, InstanceContext<?> instanceContext, Acker acker) {
        this(vertx, container, vertx.eventBus(), instanceContext, acker);
    }

    /**
     * Creates a collector with every collaborator supplied explicitly. All other constructors
     * delegate here.
     *
     * @param vertx the Vert.x instance
     * @param container the container (not used by this class)
     * @param eventBus the event bus used for registration, replies and connections
     * @param instanceContext the context of the component instance
     * @param acker the acker used to track emitted messages
     */
    public DefaultOutputCollector(Vertx vertx, Container container, EventBus eventBus, InstanceContext<?> instanceContext, Acker acker) {
        // The default stream always has a (possibly empty) channel list.
        this.channels.put(Output.DEFAULT_STREAM, new ArrayList<Channel>());
        this.vertx = vertx;
        this.eventBus = eventBus;
        this.context = instanceContext;
        this.acker = acker;
        this.messageBuilder = new JsonMessageBuilder(instanceContext.id());
        this.ackingEnabled = instanceContext.getComponent().getNetwork().isAckingEnabled();
        this.auditors = instanceContext.getComponent().getNetwork().getAuditors();
        this.componentAddress = instanceContext.getComponent().getAddress();
    }

    /**
     * Handles a listen request: ensures a connection to {@code str} exists on the channel
     * described by the serialized input, restarts that connection's expiry timer and replies to
     * {@code str2} with this instance's id.
     *
     * <p>The request is ignored when any of the three arguments is {@code null}.
     *
     * @param str the address of the listening input
     * @param str2 the address to which the acknowledgement reply is sent
     * @param jsonObject the serialized {@link Input} describing the listener
     */
    public void doListen(final String str, String str2, JsonObject jsonObject) {
        if (str == null || str2 == null || jsonObject == null) {
            return;
        }
        Input input = (Input) this.serializer.deserialize(jsonObject, Input.class);
        Output output = new Output(input.id(), input.getStream(), input.getCount(), input.getGrouping().createSelector());
        final Channel channel = findChannel(output);
        if (!channel.containsConnection(str)) {
            channel.addConnection(new DefaultConnection(str, this.eventBus));
        }

        // A repeated listen request acts as a heartbeat: drop the old expiry timer first.
        Long pendingTimer = this.connectionTimers.remove(str);
        if (pendingTimer != null) {
            this.vertx.cancelTimer(pendingTimer);
        }
        long timerId = this.vertx.setTimer(LISTEN_INTERVAL, new Handler<Long>() {
            @Override
            public void handle(Long firedTimerId) {
                // No refresh arrived in time: detach the connection and forget its timer.
                Connection connection = channel.getConnection(str);
                if (connection != null) {
                    channel.removeConnection(connection);
                }
                connectionTimers.remove(str);
            }
        });
        this.connectionTimers.put(str, timerId);

        this.eventBus.send(str2, new JsonObject().putString("id", this.context.id()));
    }

    /**
     * Returns the channel of the output's stream whose id equals the output's id, creating and
     * registering a new one when none exists.
     */
    private Channel findChannel(Output output) {
        List<Channel> streamChannels = this.channels.get(output.getStream());
        if (streamChannels != null) {
            for (Channel existing : streamChannels) {
                if (existing.id().equals(output.id())) {
                    return existing;
                }
            }
        }
        Channel created = new DefaultChannel(output.id(), output.getSelector(), this.eventBus, this.messageBuilder)
                .setConnectionCount(output.getCount());
        if (streamChannels == null) {
            // First channel on this stream: register the stream's list only after the channel exists.
            streamChannels = new ArrayList<>();
            this.channels.put(output.getStream(), streamChannels);
        }
        streamChannels.add(created);
        return created;
    }

    /** Returns the address of the component this collector belongs to. */
    @Override
    public String getAddress() {
        return this.context.getComponent().getAddress();
    }

    /**
     * Registers a hook that is notified of start, stop, emit, ack, fail and timeout events.
     *
     * @param outputHook the hook to add
     * @return this collector
     */
    @Override
    public OutputCollector addHook(OutputHook outputHook) {
        this.hooks.add(outputHook);
        return this;
    }

    /** Notifies every registered hook that this collector has started. */
    public void hookStart() {
        for (OutputHook hook : this.hooks) {
            hook.handleStart(this);
        }
    }

    /** Notifies every registered hook that the given message was acked. */
    public void hookAcked(MessageId messageId) {
        for (OutputHook hook : this.hooks) {
            hook.handleAcked(messageId);
        }
    }

    /** Notifies every registered hook that the given message failed. */
    public void hookFailed(MessageId messageId) {
        for (OutputHook hook : this.hooks) {
            hook.handleFailed(messageId);
        }
    }

    /** Notifies every registered hook that the given message timed out. */
    public void hookTimeout(MessageId messageId) {
        for (OutputHook hook : this.hooks) {
            hook.handleTimeout(messageId);
        }
    }

    /** Notifies every registered hook that the given message was emitted. */
    private void hookEmit(MessageId messageId) {
        for (OutputHook hook : this.hooks) {
            hook.handleEmit(messageId);
        }
    }

    /** Notifies every registered hook that this collector has stopped. */
    public void hookStop() {
        for (OutputHook hook : this.hooks) {
            hook.handleStop(this);
        }
    }

    /**
     * Sets the handler invoked when a message is acked. Registered hooks are notified after the
     * handler runs.
     *
     * @param handler the ack handler
     * @return this collector
     */
    @Override
    public OutputCollector ackHandler(Handler<MessageId> handler) {
        this.acker.ackHandler(createAckHandler(handler));
        return this;
    }

    /** Wraps the user handler so that hooks are notified of the ack after it has run. */
    private Handler<MessageId> createAckHandler(final Handler<MessageId> handler) {
        return new Handler<MessageId>() {
            @Override
            public void handle(MessageId messageId) {
                handler.handle(messageId);
                hookAcked(messageId);
            }
        };
    }

    /**
     * Sets the handler invoked when a message fails. Registered hooks are notified after the
     * handler runs.
     *
     * @param handler the failure handler
     * @return this collector
     */
    @Override
    public OutputCollector failHandler(Handler<MessageId> handler) {
        this.acker.failHandler(createFailHandler(handler));
        return this;
    }

    /** Wraps the user handler so that hooks are notified of the failure after it has run. */
    private Handler<MessageId> createFailHandler(final Handler<MessageId> handler) {
        return new Handler<MessageId>() {
            @Override
            public void handle(MessageId messageId) {
                handler.handle(messageId);
                hookFailed(messageId);
            }
        };
    }

    /**
     * Sets the handler invoked when a message times out. Registered hooks are notified after the
     * handler runs.
     *
     * @param handler the timeout handler
     * @return this collector
     */
    @Override
    public OutputCollector timeoutHandler(Handler<MessageId> handler) {
        this.acker.timeoutHandler(createTimeoutHandler(handler));
        return this;
    }

    /** Wraps the user handler so that hooks are notified of the timeout after it has run. */
    private Handler<MessageId> createTimeoutHandler(final Handler<MessageId> handler) {
        return new Handler<MessageId>() {
            @Override
            public void handle(MessageId messageId) {
                handler.handle(messageId);
                hookTimeout(messageId);
            }
        };
    }

    /** Emits a new message with the given body on the default stream. */
    @Override
    public MessageId emit(JsonObject jsonObject) {
        return emitTo(Output.DEFAULT_STREAM, jsonObject);
    }

    /** Emits the given body on the default stream as a descendant of {@code jsonMessage}. */
    @Override
    public MessageId emit(JsonObject jsonObject, JsonMessage jsonMessage) {
        return emitTo(Output.DEFAULT_STREAM, jsonObject, jsonMessage);
    }

    /** Emits the body of {@code jsonMessage} on the default stream as its own descendant. */
    @Override
    public MessageId emit(JsonMessage jsonMessage) {
        return emitTo(Output.DEFAULT_STREAM, jsonMessage);
    }

    /**
     * Emits a new root message on the given stream.
     *
     * <p>A child carrying the body is published to every channel of the stream and each published
     * id is forked from the root in the acker; the root is then created in the acker and the emit
     * hooks are notified. A stream without channels publishes nothing but is still tracked.
     *
     * @param str the stream name
     * @param jsonObject the message body
     * @return the id of the new root message
     */
    @Override
    public MessageId emitTo(String str, JsonObject jsonObject) {
        JsonMessage root = this.messageBuilder.createNew(selectRandomAuditor()).toMessage();
        MessageId rootId = root.messageId();
        JsonMessage child = this.messageBuilder.createChild(root)
                .setBody(jsonObject)
                .setStream(str)
                .setSource(this.componentAddress)
                .toMessage();
        List<Channel> streamChannels = this.channels.get(str);
        if (streamChannels != null) {
            for (Channel channel : streamChannels) {
                this.acker.fork(rootId, channel.publish(child));
            }
        }
        this.acker.create(rootId);
        hookEmit(rootId);
        return rootId;
    }

    /**
     * Emits the given body on the given stream as a descendant of {@code jsonMessage}.
     *
     * <p>Each published id is forked from the id of {@code jsonMessage} in the acker, while the
     * returned id and the id passed to the emit hooks belong to the intermediate child message.
     *
     * @param str the stream name
     * @param jsonObject the message body
     * @param jsonMessage the parent message
     * @return the id of the intermediate child message
     */
    @Override
    public MessageId emitTo(String str, JsonObject jsonObject, JsonMessage jsonMessage) {
        JsonMessage intermediate = this.messageBuilder.createChild(jsonMessage).toMessage();
        MessageId intermediateId = intermediate.messageId();
        JsonMessage child = this.messageBuilder.createChild(intermediate)
                .setBody(jsonObject)
                .setStream(str)
                .toMessage();
        List<Channel> streamChannels = this.channels.get(str);
        if (streamChannels != null) {
            for (Channel channel : streamChannels) {
                this.acker.fork(jsonMessage.messageId(), channel.publish(child));
            }
        }
        hookEmit(intermediateId);
        return intermediateId;
    }

    /** Emits the body of {@code jsonMessage} on the given stream as its own descendant. */
    @Override
    public MessageId emitTo(String str, JsonMessage jsonMessage) {
        return emitTo(str, jsonMessage.body(), jsonMessage);
    }

    /**
     * Picks one auditor address at random, or returns {@code null} when acking is disabled.
     *
     * <p>Note: {@link Random#nextInt(int)} rejects a non-positive bound, so an empty auditor list
     * with acking enabled results in an {@link IllegalArgumentException}.
     */
    private String selectRandomAuditor() {
        if (!this.ackingEnabled) {
            return null;
        }
        return this.auditors.get(this.random.nextInt(this.auditors.size()));
    }

    /**
     * Registers the control handler on the component address and notifies the start hooks
     * immediately, without waiting for the registration to complete.
     *
     * @return this collector
     */
    @Override
    public OutputCollector start() {
        this.eventBus.registerHandler(this.context.getComponent().getAddress(), this.handler);
        hookStart();
        return this;
    }

    /**
     * Registers the control handler on the component address and reports the outcome to
     * {@code handler}. Start hooks are notified only on success, after the result has been set.
     *
     * @param handler receives the registration result
     * @return this collector
     */
    @Override
    public OutputCollector start(Handler<AsyncResult<Void>> handler) {
        final DefaultFutureResult<Void> future = new DefaultFutureResult<Void>();
        future.setHandler(handler);
        this.eventBus.registerHandler(this.context.getComponent().getAddress(), this.handler, new Handler<AsyncResult<Void>>() {
            @Override
            public void handle(AsyncResult<Void> asyncResult) {
                if (asyncResult.failed()) {
                    future.setFailure(asyncResult.cause());
                } else {
                    future.setResult((Void) null);
                    hookStart();
                }
            }
        });
        return this;
    }

    /**
     * Unregisters the control handler from the component address and notifies the stop hooks
     * immediately, without waiting for the unregistration to complete.
     */
    @Override
    public void stop() {
        this.eventBus.unregisterHandler(this.context.getComponent().getAddress(), this.handler);
        hookStop();
    }

    /**
     * Unregisters the control handler from the component address and reports the outcome to
     * {@code handler}. Stop hooks are notified only on success, after the result has been set.
     *
     * @param handler receives the unregistration result
     */
    @Override
    public void stop(Handler<AsyncResult<Void>> handler) {
        final DefaultFutureResult<Void> future = new DefaultFutureResult<Void>();
        future.setHandler(handler);
        this.eventBus.unregisterHandler(this.context.getComponent().getAddress(), this.handler, new Handler<AsyncResult<Void>>() {
            @Override
            public void handle(AsyncResult<Void> asyncResult) {
                if (asyncResult.failed()) {
                    future.setFailure(asyncResult.cause());
                } else {
                    future.setResult((Void) null);
                    hookStop();
                }
            }
        });
    }
}
