package com.googlecode.protobuf.pro.stream;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.googlecode.protobuf.pro.stream.logging.StreamLogger;
import com.googlecode.protobuf.pro.stream.server.PullHandler;
import com.googlecode.protobuf.pro.stream.server.PushHandler;
import com.googlecode.protobuf.pro.stream.wire.StreamProtocol;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.channel.Channel;

/* loaded from: input_file:com/googlecode/protobuf/pro/stream/StreamingServer.class */
/**
 * Server-side endpoint for one streaming connection.
 *
 * <p>Incoming wire messages are dispatched to a {@link PullHandler} (for pull
 * requests of type {@code E}) or a {@link PushHandler} (for push requests of
 * type {@code F}). Every transfer that is in flight is tracked in a map keyed
 * by its correlation id until it completes, is closed by the peer, or the
 * connection itself is closed.
 *
 * <p>Pull requests are handed to an internal {@link ThreadPoolExecutor}
 * (0 core threads, at most 10 threads, 60 second keep-alive, bounded queue of
 * 10). The executor uses {@link ThreadPoolExecutor.CallerRunsPolicy}, so a
 * pull that the executor rejects is run on the thread calling
 * {@link #pullRequest}.
 *
 * @param <E> protobuf message type of pull requests
 * @param <F> protobuf message type of push requests
 */
public class StreamingServer<E extends Message, F extends Message> {
    private static final Log log = LogFactory.getLog(StreamingServer.class);

    private final PeerInfo serverInfo;
    private final PullHandler<E> pullHandler;
    private final PushHandler<F> pushHandler;
    private final StreamLogger streamLogger;
    private final int chunkSize;
    private Channel channel;
    private PeerInfo clientInfo;

    /** Transfers currently in flight, keyed by correlation id. */
    private final Map<Integer, TransferState> pendingTransferMap = new ConcurrentHashMap<Integer, TransferState>();

    /** Runs {@link PullWorker}s so that pull handlers execute off the calling thread where possible. */
    private final ThreadPoolExecutor pullExecutorService =
            new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));

    /**
     * Task that runs the pull handler for a single pull transfer and then
     * removes the transfer from the pending map.
     */
    public class PullWorker implements Runnable {
        private final TransferState transfer;

        /**
         * @param transferState the pull transfer this worker serves
         */
        public PullWorker(TransferState transferState) {
            this.transfer = transferState;
        }

        /**
         * Invokes {@link PullHandler#handlePull} for the transfer. Exceptions
         * thrown by the handler are logged at warn level and not rethrown.
         * The transfer is always marked complete afterwards, even when the
         * handler fails, so no stale entry is left in the pending map.
         */
        @Override
        public void run() {
            try {
                StreamingServer.this.pullHandler.handlePull(this.transfer.getPullRequest(), this.transfer.getPullOut());
            } catch (Exception e) {
                StreamingServer.log.warn("UnhandledException in handlePull", e);
            } finally {
                StreamingServer.this.handlePullComplete(this.transfer.getCorrelationId());
            }
        }
    }

    /**
     * Immutable record of one in-flight transfer. An instance describes
     * either a pull (pull request + {@link TransferOut}) or a push (push
     * request + {@link PushIn}); the fields of the other kind are
     * {@code null}.
     */
    public class TransferState {
        private final long startTimestamp;
        private final E pullRequest;
        private final F pushRequest;
        private final TransferOut pullOut;
        private final PushIn pushIn;

        /**
         * Creates the state of a pull transfer.
         *
         * @param startTimestamp value later reported by {@link #getStartTimestamp()}
         * @param pullRequest    the decoded pull request
         * @param transferOut    the outgoing side of the transfer
         */
        public TransferState(long startTimestamp, E pullRequest, TransferOut transferOut) {
            this.startTimestamp = startTimestamp;
            this.pullRequest = pullRequest;
            this.pullOut = transferOut;
            this.pushRequest = null;
            this.pushIn = null;
        }

        /**
         * Creates the state of a push transfer.
         *
         * @param startTimestamp value later reported by {@link #getStartTimestamp()}
         * @param pushRequest    the decoded push request
         * @param pushIn         the incoming side of the transfer
         */
        public TransferState(long startTimestamp, F pushRequest, PushIn pushIn) {
            this.startTimestamp = startTimestamp;
            this.pullRequest = null;
            this.pullOut = null;
            this.pushRequest = pushRequest;
            this.pushIn = pushIn;
        }

        /** @return the timestamp supplied at construction */
        public long getStartTimestamp() {
            return this.startTimestamp;
        }

        /** @return the pull request, or {@code null} for a push transfer */
        public E getPullRequest() {
            return this.pullRequest;
        }

        /** @return the push request, or {@code null} for a pull transfer */
        public F getPushRequest() {
            return this.pushRequest;
        }

        /**
         * @return the correlation id of whichever transfer side is present
         * @throws IllegalStateException if neither a pull nor a push side is set
         */
        public int getCorrelationId() {
            if (this.pullOut != null) {
                return this.pullOut.getCorrelationId();
            }
            if (this.pushIn != null) {
                return this.pushIn.getCorrelationId();
            }
            throw new IllegalStateException("missing transfer");
        }

        /** @return the outgoing side, or {@code null} for a push transfer */
        public TransferOut getPullOut() {
            return this.pullOut;
        }

        /** @return the incoming side, or {@code null} for a pull transfer */
        public PushIn getPushIn() {
            return this.pushIn;
        }
    }

    /**
     * @param peerInfo     identity of this server; used by {@link #toString()} and {@link #doLog}
     * @param pullHandler  handler invoked for pull requests
     * @param pushHandler  handler invoked for push requests and their chunks
     * @param streamLogger transfer logger; may be {@code null}, in which case {@link #doLog} does nothing
     * @param chunkSize    chunk size passed to every {@link TransferOut} created for a pull
     */
    public StreamingServer(PeerInfo peerInfo, PullHandler<E> pullHandler, PushHandler<F> pushHandler, StreamLogger streamLogger, int chunkSize) {
        this.serverInfo = peerInfo;
        this.pullHandler = pullHandler;
        this.pushHandler = pushHandler;
        this.streamLogger = streamLogger;
        this.chunkSize = chunkSize;
        this.pullExecutorService.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Records the connected client and the channel used for its transfers.
     *
     * @param peerInfo identity of the connected client
     * @param channel  channel handed to transfers created afterwards
     */
    public void handleOpen(PeerInfo peerInfo, Channel channel) {
        this.clientInfo = peerInfo;
        this.channel = channel;
        if (log.isDebugEnabled()) {
            log.debug("handleOpen for " + peerInfo);
        }
    }

    /**
     * Starts a push transfer: decodes the request, registers the transfer as
     * pending and calls {@link PushHandler#init}. If the request payload
     * cannot be parsed, a warning is logged and the channel is closed.
     *
     * @param pushRequest the wire-level push request
     * @throws IllegalStateException if the correlation id is already pending
     */
    public void pushRequest(StreamProtocol.PushRequest pushRequest) {
        long startTimestamp = System.currentTimeMillis();
        int correlationId = pushRequest.getCorrelationId();
        if (log.isDebugEnabled()) {
            log.debug("Received [" + correlationId + "]PushRequest.");
        }
        if (this.pendingTransferMap.containsKey(Integer.valueOf(correlationId))) {
            throw new IllegalStateException("correlationId " + correlationId + " already registered as PendingTransfer.");
        }
        try {
            // The builder comes from the handler's own prototype, so the built
            // message has the prototype's runtime type.
            @SuppressWarnings("unchecked")
            F request = (F) this.pushHandler.getPrototype().newBuilderForType().mergeFrom(pushRequest.getRequestProto()).build();
            PushIn pushIn = new PushIn(correlationId, this.channel);
            registerPendingRequest(correlationId, new TransferState(startTimestamp, request, pushIn));
            this.pushHandler.init(request, pushIn);
        } catch (InvalidProtocolBufferException e) {
            log.warn("Invalid Request Protobuf", e);
            this.channel.close();
        }
    }

    /**
     * Starts a pull transfer: decodes the request, registers the transfer as
     * pending and submits a {@link PullWorker} to the pull executor. If the
     * request payload cannot be parsed, a warning is logged and the channel
     * is closed.
     *
     * @param pullRequest the wire-level pull request
     * @throws IllegalStateException if the correlation id is already pending
     */
    public void pullRequest(StreamProtocol.PullRequest pullRequest) {
        long startTimestamp = System.currentTimeMillis();
        int correlationId = pullRequest.getCorrelationId();
        if (log.isDebugEnabled()) {
            log.debug("Received [" + correlationId + "]PullRequest.");
        }
        if (this.pendingTransferMap.containsKey(Integer.valueOf(correlationId))) {
            throw new IllegalStateException("correlationId " + correlationId + " already registered as PendingTransfer.");
        }
        try {
            // The builder comes from the handler's own prototype, so the built
            // message has the prototype's runtime type.
            @SuppressWarnings("unchecked")
            E request = (E) this.pullHandler.getPrototype().newBuilderForType().mergeFrom(pullRequest.getRequestProto()).build();
            TransferOut pullOut = new TransferOut(correlationId, this.chunkSize, this.channel);
            TransferState transferState = new TransferState(startTimestamp, request, pullOut);
            registerPendingRequest(correlationId, transferState);
            this.pullExecutorService.execute(new PullWorker(transferState));
        } catch (InvalidProtocolBufferException e) {
            log.warn("Invalid Request Protobuf", e);
            this.channel.close();
        }
    }

    /**
     * Removes a finished pull transfer from the pending map. Called by
     * {@link PullWorker} once the pull handler has returned.
     *
     * @param correlationId id of the finished pull transfer
     */
    void handlePullComplete(int correlationId) {
        // The entry may already be gone (peer close notification or connection closure).
        if (removePendingTransfer(correlationId) != null && log.isDebugEnabled()) {
            log.debug("Pull Transfer complete.");
        }
    }

    /**
     * Delivers one chunk of a push transfer to the push handler. Chunks for
     * an unknown correlation id are ignored. A chunk of type {@code END}
     * removes the transfer from the pending map and, if the {@link PushIn}
     * was not already closed, calls {@link PushHandler#end}.
     *
     * @param chunk the received chunk
     * @throws IllegalStateException if the pending transfer is not a push transfer
     */
    public void pushChunk(StreamProtocol.Chunk chunk) {
        int correlationId = chunk.getCorrelationId();
        if (log.isDebugEnabled()) {
            log.debug("Received [" + correlationId + ":" + chunk.getSeqNo() + "]PushChunk. " + chunk.getChunkType());
        }
        TransferState transferState = lookupPendingTransfer(correlationId);
        if (transferState == null) {
            if (log.isDebugEnabled()) {
                log.debug("No PendingTransferState found for correlationId " + correlationId);
            }
            return;
        }
        PushIn pushIn = transferState.getPushIn();
        if (pushIn == null) {
            throw new IllegalStateException("TransferState missing transferIn");
        }
        if (chunk.getParameterCount() > 0) {
            for (StreamProtocol.Parameter parameter : chunk.getParameterList()) {
                pushIn.provideParameter(parameter.getName(), parameter.getValue());
            }
        }
        pushIn.setData(chunk.getPayload());
        this.pushHandler.data(transferState.getPushRequest(), pushIn);
        if (StreamProtocol.ChunkTypeCode.END == chunk.getChunkType()) {
            removePendingTransfer(correlationId);
            // setClosed() returning true is what gates the end() callback, so
            // end() is not repeated if the PushIn was closed elsewhere first.
            if (pushIn.setClosed()) {
                this.pushHandler.end(transferState.getPushRequest(), pushIn);
            }
        }
    }

    /**
     * Handles the peer's notification that it closed a pull transfer: the
     * transfer is removed from the pending map and its {@link TransferOut}
     * is told about the closure. Unknown correlation ids are ignored.
     *
     * @param closeNotification the received notification
     * @throws IllegalStateException if the pending transfer is not a pull transfer
     */
    public void closeNotification(StreamProtocol.CloseNotification closeNotification) {
        TransferState transferState = removePendingTransfer(closeNotification.getCorrelationId());
        if (transferState == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Received [" + closeNotification.getCorrelationId() + "]CloseNotification.");
        }
        TransferOut pullOut = transferState.getPullOut();
        if (pullOut == null) {
            throw new IllegalStateException("TransferState missing transferOut");
        }
        pullOut.handleClosure();
    }

    @Override
    public String toString() {
        return "StreamingServer[" + this.serverInfo + "]";
    }

    /**
     * Handles closure of the connection: force-closes every pending transfer
     * and shuts down the pull executor. Does nothing beyond logging when no
     * client was ever recorded by {@link #handleOpen}.
     */
    public void handleClosure() {
        if (log.isDebugEnabled()) {
            log.debug("handleClosure for " + this.clientInfo);
        }
        if (this.clientInfo == null) {
            return;
        }
        do {
            // Take a fresh snapshot of the keys on every pass. Reusing one
            // snapshot would spin forever if a transfer were registered after
            // it was taken, because the map would stay non-empty while every
            // remove() on the stale keys returned null.
            for (Integer correlationId : new ArrayList<Integer>(this.pendingTransferMap.keySet())) {
                TransferState transferState = this.pendingTransferMap.remove(correlationId);
                if (transferState == null) {
                    continue;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Force closure [" + correlationId + "].");
                }
                PushIn pushIn = transferState.getPushIn();
                if (pushIn != null && pushIn.setClosed()) {
                    this.pushHandler.end(transferState.getPushRequest(), pushIn);
                }
                TransferOut pullOut = transferState.getPullOut();
                if (pullOut != null) {
                    pullOut.handleClosure();
                }
            }
        } while (!this.pendingTransferMap.isEmpty());
        this.pullExecutorService.shutdownNow();
    }

    /**
     * Forwards a transfer record to the configured {@link StreamLogger}, if
     * any. The arguments are passed through to
     * {@code StreamLogger.logTransfer} together with this server's identity,
     * the transfer's correlation id and start timestamp, and the current
     * time.
     *
     * @param peerInfo      passed as the first argument of {@code logTransfer}
     * @param transferState transfer supplying correlation id and start timestamp
     * @param message       passed through unchanged
     * @param str           passed through unchanged
     * @param map           passed through unchanged
     */
    protected void doLog(PeerInfo peerInfo, TransferState transferState, Message message, String str, Map<String, String> map) {
        if (this.streamLogger != null) {
            this.streamLogger.logTransfer(peerInfo, this.serverInfo, message, str, transferState.getCorrelationId(), map, transferState.getStartTimestamp(), System.currentTimeMillis());
        }
    }

    /**
     * Adds a transfer to the pending map.
     *
     * @throws IllegalArgumentException if the correlation id is already registered
     */
    private void registerPendingRequest(int correlationId, TransferState transferState) {
        Integer key = Integer.valueOf(correlationId);
        if (this.pendingTransferMap.containsKey(key)) {
            throw new IllegalArgumentException("State already registered");
        }
        this.pendingTransferMap.put(key, transferState);
    }

    /** Removes and returns the pending transfer for the id, or {@code null} if there is none. */
    private TransferState removePendingTransfer(int correlationId) {
        return this.pendingTransferMap.remove(Integer.valueOf(correlationId));
    }

    /** Returns the pending transfer for the id without removing it, or {@code null} if there is none. */
    private TransferState lookupPendingTransfer(int correlationId) {
        return this.pendingTransferMap.get(Integer.valueOf(correlationId));
    }
}
