package org.reaktivity.k3po.nukleus.ext.internal.behavior;

import java.nio.file.Path;
import java.util.function.BiFunction;
import java.util.function.LongFunction;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.Channels;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.layout.StreamsLayout;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.types.stream.BeginFW;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.types.stream.FrameFW;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.types.stream.ResetFW;
import org.reaktivity.k3po.nukleus.ext.internal.behavior.types.stream.WindowFW;
import org.reaktivity.k3po.nukleus.ext.internal.util.function.LongObjectBiConsumer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/reaktivity/k3po/nukleus/ext/internal/behavior/NukleusPartition.class */
public final class NukleusPartition implements AutoCloseable {
    private final Path partitionPath;
    private final StreamsLayout layout;
    private final RingBuffer streamsBuffer;
    private final RingBuffer throttleBuffer;
    private final LongFunction<NukleusServerChannel> lookupRoute;
    private final LongFunction<MessageHandler> lookupStream;
    private final LongObjectBiConsumer<MessageHandler> registerStream;
    private final MutableDirectBuffer writeBuffer;
    private final NukleusStreamFactory streamFactory;
    private final LongFunction<NukleusCorrelation> correlateEstablished;
    private final BiFunction<String, String, NukleusTarget> supplyTarget;
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final WindowFW.Builder windowRW = new WindowFW.Builder();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final MessageHandler streamHandler = this::handleStream;

    /* JADX INFO: Access modifiers changed from: package-private */
    public NukleusPartition(Path path, StreamsLayout streamsLayout, LongFunction<NukleusServerChannel> longFunction, LongFunction<MessageHandler> longFunction2, LongObjectBiConsumer<MessageHandler> longObjectBiConsumer, MutableDirectBuffer mutableDirectBuffer, NukleusStreamFactory nukleusStreamFactory, LongFunction<NukleusCorrelation> longFunction3, BiFunction<String, String, NukleusTarget> biFunction) {
        this.partitionPath = path;
        this.layout = streamsLayout;
        this.streamsBuffer = streamsLayout.streamsBuffer();
        this.throttleBuffer = streamsLayout.throttleBuffer();
        this.writeBuffer = mutableDirectBuffer;
        this.lookupRoute = longFunction;
        this.lookupStream = longFunction2;
        this.registerStream = longObjectBiConsumer;
        this.streamFactory = nukleusStreamFactory;
        this.correlateEstablished = longFunction3;
        this.supplyTarget = biFunction;
    }

    public int process() {
        return this.streamsBuffer.read(this.streamHandler);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.layout.close();
    }

    public String toString() {
        return String.format("%s [%s]", getClass().getSimpleName(), this.partitionPath);
    }

    private void handleStream(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
        this.frameRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i2 + i3);
        MessageHandler apply = this.lookupStream.apply(this.frameRO.streamId());
        if (apply != null) {
            apply.onMessage(i, mutableDirectBuffer, i2, i3);
        } else {
            handleUnrecognized(i, mutableDirectBuffer, i2, i3);
        }
    }

    private void handleUnrecognized(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
        if (i != 1) {
            this.frameRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i2 + i3);
            doReset(this.frameRO.streamId());
            return;
        }
        BeginFW wrap = this.beginRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i2 + i3);
        if (wrap.referenceId() != 0) {
            handleBeginInitial(wrap);
        } else {
            handleBeginReply(wrap);
        }
    }

    private void handleBeginInitial(BeginFW beginFW) {
        long streamId = beginFW.streamId();
        NukleusServerChannel apply = this.lookupRoute.apply(beginFW.referenceId());
        if (apply == null) {
            doReset(streamId);
            return;
        }
        long correlationId = beginFW.correlationId();
        final NukleusChildChannel doAccept = doAccept(apply, correlationId);
        ChannelFuture future = Channels.future(doAccept);
        ChannelFuture future2 = Channels.future(doAccept);
        MessageHandler newStream = this.streamFactory.newStream(doAccept, this, future2, future);
        this.registerStream.accept(streamId, (long) newStream);
        newStream.onMessage(beginFW.typeId(), beginFW.buffer(), beginFW.offset(), beginFW.sizeof());
        Channels.fireChannelBound(doAccept, doAccept.m4getLocalAddress());
        NukleusChannelConfig nukleusChannelConfig = (NukleusChannelConfig) doAccept.getConfig();
        if (nukleusChannelConfig.isDuplex()) {
            this.supplyTarget.apply(doAccept.m3getRemoteAddress().getReceiverName(), nukleusChannelConfig.getWritePartition()).onAccepted(doAccept, correlationId, future2, future);
        } else {
            future2.setSuccess();
        }
        future2.addListener(new ChannelFutureListener() { // from class: org.reaktivity.k3po.nukleus.ext.internal.behavior.NukleusPartition.1
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                Channels.fireChannelConnected(doAccept, doAccept.m3getRemoteAddress());
            }
        });
    }

    private void handleBeginReply(BeginFW beginFW) {
        long correlationId = beginFW.correlationId();
        long streamId = beginFW.streamId();
        NukleusCorrelation apply = this.correlateEstablished.apply(correlationId);
        if (apply == null) {
            doReset(streamId);
            return;
        }
        ChannelFuture correlatedFuture = apply.correlatedFuture();
        NukleusClientChannel channel = correlatedFuture.getChannel();
        MessageHandler newStream = this.streamFactory.newStream(channel, this, Channels.succeededFuture(channel), correlatedFuture);
        this.registerStream.accept(streamId, (long) newStream);
        newStream.onMessage(beginFW.typeId(), beginFW.buffer(), beginFW.offset(), beginFW.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v4, types: [org.reaktivity.k3po.nukleus.ext.internal.behavior.types.stream.WindowFW$Builder] */
    public void doWindow(NukleusChannel nukleusChannel, int i) {
        WindowFW build = this.windowRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).streamId(nukleusChannel.sourceId()).update(i).build();
        nukleusChannel.sourceWindow(build.update());
        this.throttleBuffer.write(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v2, types: [org.reaktivity.k3po.nukleus.ext.internal.behavior.types.stream.ResetFW$Builder] */
    public void doReset(long j) {
        ResetFW build = this.resetRW.wrap2(this.writeBuffer, 0, this.writeBuffer.capacity()).streamId(j).build();
        this.throttleBuffer.write(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    private NukleusChildChannel doAccept(NukleusServerChannel nukleusServerChannel, long j) {
        try {
            NukleusServerChannelConfig nukleusServerChannelConfig = (NukleusServerChannelConfig) nukleusServerChannel.getConfig();
            ChannelPipeline pipeline = nukleusServerChannelConfig.getPipelineFactory().getPipeline();
            NukleusChannelAddress m13getLocalAddress = nukleusServerChannel.m13getLocalAddress();
            NukleusChannelAddress newReplyToAddress = m13getLocalAddress.newReplyToAddress();
            NukleusChildChannel nukleusChildChannel = new NukleusChildChannel(nukleusServerChannel, nukleusServerChannel.getFactory(), pipeline, new NukleusChildChannelSink(), nukleusServerChannel.reaktor);
            NukleusChannelConfig nukleusChannelConfig = (NukleusChannelConfig) nukleusChildChannel.getConfig();
            nukleusChannelConfig.setDuplex(nukleusServerChannelConfig.isDuplex());
            nukleusChannelConfig.setThrottle(nukleusServerChannelConfig.hasThrottle());
            nukleusChannelConfig.setReadPartition(nukleusServerChannelConfig.getReadPartition());
            nukleusChannelConfig.setWritePartition(nukleusServerChannelConfig.getWritePartition());
            nukleusChannelConfig.setWindow(nukleusServerChannelConfig.getWindow());
            nukleusChannelConfig.setCorrelation(j);
            if (!nukleusChannelConfig.isDuplex()) {
                nukleusChildChannel.setWriteClosed();
            }
            if (nukleusChannelConfig.getWritePartition() == null) {
                nukleusChannelConfig.setWritePartition(newReplyToAddress.getSenderName());
            }
            nukleusChildChannel.setLocalAddress(m13getLocalAddress);
            nukleusChildChannel.setRemoteAddress(newReplyToAddress);
            return nukleusChildChannel;
        } catch (Exception e) {
            LangUtil.rethrowUnchecked(e);
            return null;
        }
    }
}
