package org.reaktivity.k3po.nukleus.ext.internal.behavior;

import java.nio.file.Path;
import java.util.Objects;
import java.util.function.LongFunction;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.MessageHandler;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.Channels;
import org.reaktivity.k3po.nukleus.ext.internal.NukleusExtConfiguration;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.layout.StreamsLayout;
import org.reaktivity.k3po.nukleus.ext.internal.util.function.LongLongFunction;
import org.reaktivity.k3po.nukleus.ext.internal.util.function.LongObjectBiConsumer;

/* loaded from: input_file:org/reaktivity/k3po/nukleus/ext/internal/behavior/NukleusSource.class */
/**
 * Owns the table of routed {@link NukleusServerChannel}s and delegates stream
 * processing to a {@link NukleusPartition} backed by a {@link StreamsLayout}
 * created at the given streams path.
 *
 * <p>Routes are kept in a two-level map: the outer key is the label id obtained
 * from the {@link LabelManager}, the inner key is the authorization value.
 */
public final class NukleusSource implements AutoCloseable {
    private final LabelManager labels;
    private final Path streamsPath;
    private final NukleusStreamFactory streamFactory;
    private final LongLongFunction<NukleusTarget> supplySender;
    private final NukleusPartition partition;
    private final Long2ObjectHashMap<Long2ObjectHashMap<NukleusServerChannel>> routesByIdAndAuth;

    /**
     * Creates the streams layout (writable, sized from the configuration) and the
     * partition that reads from it.
     *
     * @param nukleusExtConfiguration supplies the streams buffer capacity for the layout
     * @param labelManager resolves address strings to label ids; also handed to the partition
     * @param path location of the streams layout
     * @param i passed through unchanged to {@link NukleusPartition} (meaning not visible here)
     * @param longFunction passed through unchanged to {@link NukleusPartition}
     * @param longLongFunction resolves the {@link NukleusTarget} for a (route id, source id) pair;
     *        used when aborting input and also handed to the partition
     * @param long2ObjectHashMap handler registry whose {@code get} and {@code put} are handed to
     *        the partition and whose {@code remove} is handed to the stream factory
     * @param long2ObjectHashMap2 handler registry whose {@code get} is handed to the partition
     * @throws NullPointerException if either handler registry is {@code null}
     */
    public NukleusSource(NukleusExtConfiguration nukleusExtConfiguration, LabelManager labelManager, Path path, int i, LongFunction<NukleusCorrelation> longFunction, LongLongFunction<NukleusTarget> longLongFunction, Long2ObjectHashMap<MessageHandler> long2ObjectHashMap, Long2ObjectHashMap<MessageHandler> long2ObjectHashMap2) {
        // Validate both registries up front so a null argument fails before the layout is built.
        Objects.requireNonNull(long2ObjectHashMap, "long2ObjectHashMap");
        Objects.requireNonNull(long2ObjectHashMap2, "long2ObjectHashMap2");

        this.labels = labelManager;
        this.streamsPath = path;
        this.streamFactory = new NukleusStreamFactory(long2ObjectHashMap::remove);
        this.supplySender = longLongFunction;
        this.routesByIdAndAuth = new Long2ObjectHashMap<>();

        final StreamsLayout layout = new StreamsLayout.Builder()
                .path(path)
                .streamsCapacity(nukleusExtConfiguration.streamsBufferCapacity())
                .readonly(false)
                .build();

        // Method references are passed directly so their types are inferred from the
        // partition's constructor instead of going through raw-typed locals.
        this.partition = new NukleusPartition(
                labelManager,
                path,
                i,
                layout,
                this::lookupRoute,
                long2ObjectHashMap::get,
                long2ObjectHashMap::put,
                long2ObjectHashMap2::get,
                this.streamFactory,
                longFunction,
                longLongFunction);
    }

    /**
     * @return the simple class name followed by the streams path in square brackets
     */
    @Override
    public String toString() {
        return String.format("%s [%s]", getClass().getSimpleName(), this.streamsPath);
    }

    /**
     * Registers a server channel under the label id of {@code str} and the authorization {@code j},
     * replacing any channel previously registered under the same pair.
     *
     * @param str address whose label id selects the outer route entry
     * @param j authorization value used as the inner key
     * @param nukleusServerChannel channel to register
     */
    public void doRoute(String str, long j, NukleusServerChannel nukleusServerChannel) {
        final long routeId = this.labels.supplyLabelId(str);
        routesByAuth(routeId).put(j, nukleusServerChannel);
    }

    /**
     * Removes the channel registered under the label id of {@code str} and the authorization
     * {@code j}; when that leaves no channels for the label id, the outer entry is dropped too.
     *
     * @param str address whose label id selects the outer route entry
     * @param j authorization value used as the inner key
     * @param nukleusServerChannel not consulted; whatever channel is registered under the key is removed
     */
    public void doUnroute(String str, long j, NukleusServerChannel nukleusServerChannel) {
        final long routeId = this.labels.supplyLabelId(str);
        final Long2ObjectHashMap<NukleusServerChannel> channels = this.routesByIdAndAuth.get(routeId);
        if (channels != null && channels.remove(j) != null && channels.isEmpty()) {
            this.routesByIdAndAuth.remove(routeId);
        }
    }

    /**
     * Aborts the input side of a channel once its begin-input future has succeeded.
     *
     * <p>If the begin-input future has already succeeded the abort happens immediately;
     * otherwise it is deferred to a listener. If the begin-input future fails, the failure
     * cause is propagated to {@code channelFuture} and no abort is performed.
     *
     * @param nukleusChannel channel whose input is being aborted
     * @param channelFuture future completed with the outcome of the abort
     */
    public void doAbortInput(final NukleusChannel nukleusChannel, final ChannelFuture channelFuture) {
        final ChannelFuture beginInputFuture = nukleusChannel.beginInputFuture();
        if (beginInputFuture.isSuccess()) {
            doAbortInputAfterBeginReply(nukleusChannel, channelFuture);
            return;
        }

        beginInputFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    doAbortInputAfterBeginReply(nukleusChannel, channelFuture);
                } else {
                    channelFuture.setFailure(future.getCause());
                }
            }
        });
    }

    /**
     * Sends a reset for the channel through the target resolved from its route id and source id,
     * marks {@code channelFuture} successful, and, if the channel transitions to both
     * read-aborted and read-closed, fires the disconnected, unbound and closed channel events.
     *
     * @param nukleusChannel channel whose input is being aborted
     * @param channelFuture future marked successful after the reset has been issued
     */
    public void doAbortInputAfterBeginReply(NukleusChannel nukleusChannel, ChannelFuture channelFuture) {
        final NukleusTarget sender = this.supplySender.apply(nukleusChannel.routeId(), nukleusChannel.sourceId());
        sender.doReset(nukleusChannel);
        channelFuture.setSuccess();

        if (nukleusChannel.setReadAborted() && nukleusChannel.setReadClosed()) {
            Channels.fireChannelDisconnected(nukleusChannel);
            Channels.fireChannelUnbound(nukleusChannel);
            Channels.fireChannelClosed(nukleusChannel);
        }
    }

    /**
     * Delegates to the partition.
     *
     * @return the value returned by {@link NukleusPartition#process()}
     */
    public int process() {
        return this.partition.process();
    }

    /** Closes the underlying partition. */
    @Override
    public void close() {
        this.partition.close();
    }

    /**
     * Finds the server channel for a route id and authorization without modifying the route table.
     *
     * @param j route id; the 16 bits starting at bit 32 are used as the outer key
     *        (presumably the label id embedded in the route id - confirm against the id layout)
     * @param j2 authorization value used as the inner key
     * @return the registered channel, or {@code null} when none is registered
     */
    private NukleusServerChannel lookupRoute(long j, long j2) {
        final long localId = (j >> 32) & 0xffffL;
        // Read-only lookup: an unknown route must not leave an empty inner map behind.
        final Long2ObjectHashMap<NukleusServerChannel> channels = this.routesByIdAndAuth.get(localId);
        return channels != null ? channels.get(j2) : null;
    }

    /**
     * Returns the inner authorization-to-channel map for the given outer key, creating it if absent.
     */
    private Long2ObjectHashMap<NukleusServerChannel> routesByAuth(long j) {
        return this.routesByIdAndAuth.computeIfAbsent(j, this::newRoutesByAuth);
    }

    /**
     * Factory used with {@code computeIfAbsent}; the key is required by the functional
     * signature but does not influence the created map.
     */
    private Long2ObjectHashMap<NukleusServerChannel> newRoutesByAuth(long j) {
        return new Long2ObjectHashMap<>();
    }
}
