package xyz.cofe.trambda.tcp;

import java.io.ByteArrayOutputStream;
import java.io.IOError;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xyz.cofe.text.Text;
import xyz.cofe.trambda.bc.MethodDef;

/* loaded from: input_file:xyz/cofe/trambda/tcp/TcpProtocol.class */
public class TcpProtocol {
    private final Socket socket;
    private final Supplier<OutputStream> getOutput;
    private final Supplier<InputStream> getInput;
    private final Supplier<Logger> getLogger;
    private final AtomicInteger sid = new AtomicInteger(0);
    private final Map<Integer, Consumer<ErrMessage>> errorConsumers = new ConcurrentHashMap();
    private final Map<Integer, Consumer<? extends Message>> responseConsumers = new ConcurrentHashMap();
    private volatile BiConsumer<ErrMessage, TcpHeader> unbindedErrorConsumer = (errMessage, tcpHeader) -> {
        log().error("accept unbinded error on request {} message: {}", tcpHeader.getReferrer(), errMessage.getMessage());
    };
    private volatile BiConsumer<Message, TcpHeader> unbindedMessageConsumer = (message, tcpHeader) -> {
        log().error("accept unbinded message on request {} detail: {}", tcpHeader.getReferrer(), message);
    };
    private final Queue<Consumer<Pong>> pongConsumers = new ConcurrentLinkedQueue();

    public TcpProtocol(Socket socket) {
        if (socket == null) {
            throw new IllegalArgumentException("socket==null");
        }
        this.socket = socket;
        this.getOutput = null;
        this.getInput = null;
        Logger logger = LoggerFactory.getLogger(TcpProtocol.class);
        this.getLogger = () -> {
            return logger;
        };
    }

    public TcpProtocol(Socket socket, Logger logger) {
        if (socket == null) {
            throw new IllegalArgumentException("socket==null");
        }
        this.socket = socket;
        this.getOutput = null;
        this.getInput = null;
        this.getLogger = () -> {
            return logger;
        };
    }

    public TcpProtocol(OutputStream outputStream, InputStream inputStream, Logger logger) {
        if (outputStream == null) {
            throw new IllegalArgumentException("outputStream==null");
        }
        if (inputStream == null) {
            throw new IllegalArgumentException("inputStream==null");
        }
        if (logger == null) {
            throw new IllegalArgumentException("logger==null");
        }
        this.getOutput = () -> {
            return outputStream;
        };
        this.getInput = () -> {
            return inputStream;
        };
        this.getLogger = () -> {
            return logger;
        };
        this.socket = null;
    }

    private OutputStream output() throws IOException {
        if (this.socket != null) {
            return this.socket.getOutputStream();
        }
        if (this.getOutput == null) {
            throw new IllegalStateException("getOutput==null");
        }
        return this.getOutput.get();
    }

    private InputStream intput() throws IOException {
        if (this.socket != null) {
            return this.socket.getInputStream();
        }
        if (this.getInput == null) {
            throw new IllegalStateException("getInput==null");
        }
        return this.getInput.get();
    }

    private Logger log() {
        return this.getLogger.get();
    }

    @SafeVarargs
    public final int sendRaw(String str, byte[] bArr, Consumer<Integer> consumer, HeaderValue<? extends Object>... headerValueArr) throws IOException {
        log().debug("sendRaw {} payload length {}", str, Integer.valueOf(bArr != null ? bArr.length : 0));
        int incrementAndGet = this.sid.incrementAndGet();
        if (consumer != null) {
            consumer.accept(Integer.valueOf(incrementAndGet));
        }
        HeaderValue[] headerValueArr2 = (headerValueArr == null || headerValueArr.length < 1) ? new HeaderValue[]{TcpHeader.sid.create(Integer.valueOf(incrementAndGet))} : new HeaderValue[headerValueArr.length + 1];
        if (headerValueArr != null && headerValueArr.length > 0) {
            headerValueArr2[0] = TcpHeader.sid.create(Integer.valueOf(incrementAndGet));
            System.arraycopy(headerValueArr, 0, headerValueArr2, 1, headerValueArr.length);
        }
        byte[] encode = TcpHeader.encode(str, bArr, (HeaderValue<? extends Object>[]) headerValueArr2);
        log().trace("send header {} {}", Integer.valueOf(encode.length), Text.encodeHex(encode));
        output().write(encode);
        if (bArr != null && bArr.length > 0) {
            log().trace("send payload {} {}", Integer.valueOf(bArr.length), Text.encodeHex(bArr));
            output().write(bArr);
        }
        output().flush();
        return incrementAndGet;
    }

    @SafeVarargs
    public final int send(Message message, HeaderValue<? extends Object>... headerValueArr) throws IOException {
        if (message == null) {
            throw new IllegalArgumentException("message==null");
        }
        log().debug("send {}", message);
        return sendRaw(message.getClass().getSimpleName(), Serializer.toBytes(message), null, headerValueArr);
    }

    @SafeVarargs
    public final int send(Message message, Consumer<Integer> consumer, HeaderValue<? extends Object>... headerValueArr) throws IOException {
        if (message == null) {
            throw new IllegalArgumentException("message==null");
        }
        log().debug("send {}", message);
        return sendRaw(message.getClass().getSimpleName(), Serializer.toBytes(message), consumer, headerValueArr);
    }

    public Optional<RawPack> receiveRaw() throws IOException {
        long skip;
        log().debug("receiveRaw()");
        BuffInputStream buffInputStream = new BuffInputStream(intput(), 8192);
        byte[] bArr = new byte[8192];
        log().trace("buffer size {}", 8192);
        while (true) {
            log().debug("mark {}", 8192);
            buffInputStream.mark(8192);
            try {
                log().debug("read header, pos={} count={} markpos={}", new Object[]{Integer.valueOf(buffInputStream.pos()), Integer.valueOf(buffInputStream.count()), Integer.valueOf(buffInputStream.markpos())});
                int read = buffInputStream.read(bArr);
                if (read < 0) {
                    log().info("stream closed");
                    return Optional.empty();
                }
                log().debug("readed {}, pos={} count={} markpos={}", new Object[]{Integer.valueOf(read), Integer.valueOf(buffInputStream.pos()), Integer.valueOf(buffInputStream.count()), Integer.valueOf(buffInputStream.markpos())});
                Optional<TcpHeader> parse = TcpHeader.parse(bArr, 0, read);
                if (!parse.isEmpty()) {
                    log().debug("header parsed");
                    log().debug("buff data {} {}", Integer.valueOf(read), Text.encodeHex(Arrays.copyOf(bArr, read)));
                    TcpHeader tcpHeader = parse.get();
                    log().debug("header size: {}", Integer.valueOf(tcpHeader.getHeaderSize()));
                    buffInputStream.reset();
                    log().debug("reseted, pos={} count={} markpos={}", new Object[]{Integer.valueOf(buffInputStream.pos()), Integer.valueOf(buffInputStream.count()), Integer.valueOf(buffInputStream.markpos())});
                    int headerSize = tcpHeader.getHeaderSize();
                    int i = 0;
                    do {
                        int i2 = headerSize - i;
                        log().debug("skip {}", Integer.valueOf(i2));
                        skip = buffInputStream.skip(i2);
                        i = (int) (i + skip);
                    } while (i < skip);
                    log().debug("pos={} count={} markpos={}", new Object[]{Integer.valueOf(buffInputStream.pos()), Integer.valueOf(buffInputStream.count()), Integer.valueOf(buffInputStream.markpos())});
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    if (tcpHeader.getPayloadSize() > 0) {
                        log().debug("payload size {}", Integer.valueOf(tcpHeader.getPayloadSize()));
                        while (true) {
                            int payloadSize = tcpHeader.getPayloadSize() - byteArrayOutputStream.size();
                            if (payloadSize <= 0) {
                                log().debug("payload readed");
                                break;
                            }
                            log().debug("need read {}, pos={} count={} markpos={}", new Object[]{Integer.valueOf(payloadSize), Integer.valueOf(buffInputStream.pos()), Integer.valueOf(buffInputStream.count()), Integer.valueOf(buffInputStream.markpos())});
                            int read2 = buffInputStream.read(bArr, 0, Math.min(payloadSize, bArr.length));
                            if (read2 <= 0) {
                                throw new IOException("broken message, input stream EOF");
                            }
                            log().trace("readed payload {}", Text.encodeHex(Arrays.copyOf(bArr, read2)));
                            byteArrayOutputStream.write(bArr, 0, read2);
                        }
                    }
                    RawPack rawPack = new RawPack(tcpHeader, byteArrayOutputStream.toByteArray());
                    log().trace("received h.size {} method {} h.payload {} d.payload {}", new Object[]{Integer.valueOf(tcpHeader.getHeaderSize()), tcpHeader.getMethodName(), Integer.valueOf(tcpHeader.getPayloadSize()), Integer.valueOf(rawPack.getPayload().length)});
                    return Optional.of(rawPack);
                }
                log().debug("reset");
                buffInputStream.reset();
            } catch (SocketException e) {
                if (e.getMessage() == null || !e.getMessage().matches("(?is).*socket\\s+closed.*")) {
                    log().error("read fail", e);
                    throw e;
                }
                log().info("socket closed");
                return Optional.empty();
            }
        }
    }

    public Map<Integer, Consumer<ErrMessage>> getErrorConsumers() {
        return this.errorConsumers;
    }

    public Map<Integer, Consumer<? extends Message>> getResponseConsumers() {
        return this.responseConsumers;
    }

    public BiConsumer<ErrMessage, TcpHeader> unbindedError() {
        return this.unbindedErrorConsumer;
    }

    public TcpProtocol unbindedError(BiConsumer<ErrMessage, TcpHeader> biConsumer) {
        if (biConsumer == null) {
            this.unbindedErrorConsumer = (errMessage, tcpHeader) -> {
                log().error("accept unbinded error on request {} message: {}", tcpHeader.getReferrer(), errMessage.getMessage());
            };
        } else {
            this.unbindedErrorConsumer = biConsumer;
        }
        return this;
    }

    public BiConsumer<Message, TcpHeader> unbindedMessage() {
        return this.unbindedMessageConsumer;
    }

    public TcpProtocol unbindedMessage(BiConsumer<Message, TcpHeader> biConsumer) {
        if (biConsumer == null) {
            this.unbindedMessageConsumer = (message, tcpHeader) -> {
                log().error("accept unbinded message on request {} detail: {}", tcpHeader.getReferrer(), message);
            };
        } else {
            this.unbindedMessageConsumer = biConsumer;
        }
        return this;
    }

    public boolean readNow() throws IOException {
        Optional<RawPack> receiveRaw = receiveRaw();
        if (receiveRaw.isEmpty()) {
            return false;
        }
        RawPackReadonly readonly = receiveRaw.get().toReadonly();
        if (!readonly.isPayloadChecksumMatched()) {
            log().warn("checksum fail");
            return true;
        }
        try {
            Message payloadMessage = readonly.payloadMessage();
            if (payloadMessage == null) {
                return true;
            }
            process(payloadMessage, readonly.getHeader());
            return true;
        } catch (IOError e) {
            log().error("payload de serialization fail", e);
            return true;
        }
    }

    protected void process(Message message, TcpHeader tcpHeader) {
        log().info("process {}, message.sid {}", message.getClass(), tcpHeader.getSid());
        if (message instanceof Ping) {
            process((Ping) message, tcpHeader);
            return;
        }
        if (message instanceof Pong) {
            process((Pong) message, tcpHeader);
            return;
        }
        if (message instanceof ErrMessage) {
            Optional<Integer> referrer = tcpHeader.getReferrer();
            if (referrer.isPresent()) {
                Integer num = referrer.get();
                this.responseConsumers.remove(num);
                Consumer<ErrMessage> remove = this.errorConsumers.remove(num);
                if (remove != null) {
                    remove.accept((ErrMessage) message);
                    return;
                }
            }
            BiConsumer<ErrMessage, TcpHeader> biConsumer = this.unbindedErrorConsumer;
            if (biConsumer != null) {
                biConsumer.accept((ErrMessage) message, tcpHeader);
                return;
            }
            return;
        }
        Optional<Integer> referrer2 = tcpHeader.getReferrer();
        if (referrer2.isPresent()) {
            Integer num2 = referrer2.get();
            this.errorConsumers.remove(num2);
            Consumer<? extends Message> remove2 = this.responseConsumers.remove(num2);
            if (remove2 != null) {
                try {
                    remove2.accept(message);
                    return;
                } catch (Throwable th) {
                    log().error("accept message error", th);
                }
            }
        }
        BiConsumer<Message, TcpHeader> biConsumer2 = this.unbindedMessageConsumer;
        if (biConsumer2 != null) {
            biConsumer2.accept(message, tcpHeader);
        }
    }

    protected void process(Pong pong, TcpHeader tcpHeader) {
        while (true) {
            Consumer<Pong> poll = this.pongConsumers.poll();
            if (poll == null) {
                return;
            } else {
                poll.accept(pong);
            }
        }
    }

    protected void process(Ping ping, TcpHeader tcpHeader) {
        try {
            Optional<Integer> sid = tcpHeader.getSid();
            if (sid.isPresent()) {
                send(new Pong(), TcpHeader.referrer.create(sid.get()));
            } else {
                send(new Pong(), new HeaderValue[0]);
            }
        } catch (IOException e) {
            log().error("fail send pong response");
        }
    }

    public int ping(Consumer<Pong> consumer) {
        if (consumer == null) {
            throw new IllegalArgumentException("consumer==null");
        }
        this.pongConsumers.add(consumer);
        try {
            return send(new Ping(), new HeaderValue[0]);
        } catch (IOException e) {
            throw new IOError(e);
        }
    }

    public ResultConsumer<Compile, CompileResult> compile(MethodDef methodDef) {
        if (methodDef == null) {
            throw new IllegalArgumentException("methodDef==null");
        }
        Compile compile = new Compile();
        compile.setMethodDef(methodDef);
        return new ResultConsumer<>(this, compile, this.errorConsumers, this.responseConsumers);
    }

    public ResultConsumer<Execute, ExecuteResult> execute(CompileResult compileResult) {
        if (compileResult == null) {
            throw new IllegalArgumentException("cres==null");
        }
        Execute execute = new Execute();
        execute.setKey(compileResult.getKey());
        execute.setHash(compileResult.getHash());
        return new ResultConsumer<>(this, execute, this.errorConsumers, this.responseConsumers);
    }
}
