package io.gridgo.socket.impl;

import io.gridgo.connector.Receiver;
import io.gridgo.connector.impl.AbstractConsumer;
import io.gridgo.connector.support.config.ConnectorContext;
import io.gridgo.connector.support.exceptions.FailureHandlerAware;
import io.gridgo.framework.support.Message;
import io.gridgo.socket.Socket;
import io.gridgo.utils.ThreadUtils;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:io/gridgo/socket/impl/DefaultSocketReceiver.class */
/**
 * {@link Receiver} that reads from a {@link Socket} on a dedicated poller thread and
 * publishes every message handed back by {@code SocketUtils.startPolling}.
 *
 * <p>Lifecycle: {@link #onStart()} spawns the poller thread and captures the
 * {@link CountDownLatch} that the polling routine hands out; {@link #onStop()} interrupts
 * the thread and waits on that latch. The socket is closed by the poller thread itself
 * once polling ends.
 *
 * <p>Threading: the receive counters are written only by the poller thread and may be read
 * from any thread through the getters.
 */
public class DefaultSocketReceiver extends AbstractConsumer implements Receiver, FailureHandlerAware<DefaultSocketReceiver> {
    // Both counters have a single writer (the poller thread); volatile makes the values
    // visible to readers on other threads and keeps 64-bit writes from tearing.
    private volatile long totalRecvBytes;
    private volatile long totalRecvMessages;
    private Function<Throwable, Message> failureHandler;
    private final String uniqueIdentifier;
    // Written by the lifecycle thread (onStart/onStop) and by the poller thread on exit.
    private volatile Thread poller;
    private final Socket socket;
    private final int bufferSize;
    // Latch handed out by the polling routine; awaited in onStop().
    private CountDownLatch stopDoneTrigger;

    /**
     * Creates a receiver bound to the given socket.
     *
     * @param connectorContext context passed to the superclass; its exception handler becomes
     *                         the initial failure handler
     * @param socket           socket to poll
     * @param bufferSize       capacity, in bytes, of the direct buffer allocated for polling
     * @param uniqueIdentifier suffix used when building this receiver's name
     */
    public DefaultSocketReceiver(ConnectorContext connectorContext, Socket socket, int bufferSize, String uniqueIdentifier) {
        super(connectorContext);
        this.socket = socket;
        this.bufferSize = bufferSize;
        this.uniqueIdentifier = uniqueIdentifier;
        setFailureHandler(connectorContext.getExceptionHandler());
    }

    /**
     * @return {@code "receiver."} followed by the unique identifier given at construction
     */
    protected String generateName() {
        return "receiver." + this.uniqueIdentifier;
    }

    /**
     * Resets the counters, starts the poller thread and waits until the polling routine has
     * published its completion latch, or until the poller thread has died without doing so.
     */
    protected void onStart() {
        AtomicReference<CountDownLatch> doneSignalRef = new AtomicReference<>();
        Thread pollerThread = new Thread(() -> poll(this.socket, doneSignalRef::set));
        this.poller = pollerThread;
        this.totalRecvBytes = 0L;
        this.totalRecvMessages = 0L;
        pollerThread.start();
        ThreadUtils.sleep(100L);
        // Keep spinning only while the latch is still missing AND the poller can still deliver
        // it; without the liveness check a poller that failed early would block here forever.
        ThreadUtils.busySpin(10L, () -> doneSignalRef.get() == null && pollerThread.isAlive());
        // May stay null if the poller died before publishing a latch; onStop() tolerates that.
        this.stopDoneTrigger = doneSignalRef.get();
    }

    /**
     * Interrupts the poller thread (if one is still registered) and waits on the completion
     * latch captured in {@link #onStart()} (if any).
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting; the
     *                          interrupt flag is restored before throwing
     */
    protected void onStop() {
        // Snapshot first: the poller thread clears the field itself when polling ends, so it
        // can legitimately be null by the time stop is requested.
        Thread pollerThread = this.poller;
        this.poller = null;
        if (pollerThread != null) {
            pollerThread.interrupt();
        }

        CountDownLatch doneSignal = this.stopDoneTrigger;
        if (doneSignal == null) {
            return;
        }
        try {
            doneSignal.await();
            this.stopDoneTrigger = null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Await for stopped error", e);
        }
    }

    /**
     * Body of the poller thread: names the thread after the socket endpoint address, runs
     * {@code SocketUtils.startPolling} and closes the socket afterwards.
     *
     * @param socket             socket to poll and eventually close
     * @param doneSignalReceiver forwarded to {@code SocketUtils.startPolling}; it is given the
     *                           latch that {@link #onStop()} later awaits
     */
    private void poll(Socket socket, Consumer<CountDownLatch> doneSignalReceiver) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(this.bufferSize);
        Thread.currentThread().setName("[POLLER] " + socket.getEndpoint().getAddress());
        try {
            SocketUtils.startPolling(socket, buffer, false, message -> {
                ensurePayloadId(message);
                publish(message, null);
            }, num -> {
                this.totalRecvBytes += num.intValue();
            }, num2 -> {
                this.totalRecvMessages += num2.intValue();
            }, getContext().getExceptionHandler(), doneSignalReceiver);
        } finally {
            // Release the socket even when polling terminates with an exception.
            socket.close();
            // Only clear the reference if it still points at this thread, so that a poller
            // installed by a later onStart() is not discarded by a late-finishing old one.
            if (this.poller == Thread.currentThread()) {
                this.poller = null;
            }
        }
    }

    /**
     * Replaces the failure handler.
     *
     * @param function the new failure handler
     * @return this receiver, for chaining
     */
    public DefaultSocketReceiver setFailureHandler(Function<Throwable, Message> function) {
        this.failureHandler = function;
        return this;
    }

    /**
     * @return running total accumulated by the byte-count callback since the last start
     */
    public long getTotalRecvBytes() {
        return this.totalRecvBytes;
    }

    /**
     * @return running total accumulated by the message-count callback since the last start
     */
    public long getTotalRecvMessages() {
        return this.totalRecvMessages;
    }

    /**
     * @return the currently configured failure handler
     */
    protected Function<Throwable, Message> getFailureHandler() {
        return this.failureHandler;
    }

    // NOTE(review): this looks like the compiler-generated bridge for
    // FailureHandlerAware#setFailureHandler, renamed by the decompiler. It is kept unchanged
    // so that no public member disappears; confirm against the original sources before removing.
    /* renamed from: setFailureHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Object m3setFailureHandler(Function function) {
        return setFailureHandler((Function<Throwable, Message>) function);
    }
}
