package org.apache.gearpump.streaming.task;

import org.apache.gearpump.Message;
import org.apache.gearpump.google.common.primitives.Shorts;
import org.apache.gearpump.partitioner.MulticastPartitioner;
import org.apache.gearpump.partitioner.Partitioner;
import org.apache.gearpump.partitioner.Partitioner$;
import org.apache.gearpump.partitioner.UnicastPartitioner;
import org.apache.gearpump.streaming.AppMasterToExecutor;
import org.apache.gearpump.streaming.LifeTime;
import org.apache.gearpump.util.LogUtil$;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Range;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.StringBuilder;
import scala.math.Ordering$Long$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: Subscription.scala */
@ScalaSignature(bytes = "\u0006\u0001\t}b\u0001B\u0001\u0003\u00015\u0011AbU;cg\u000e\u0014\u0018\u000e\u001d;j_:T!a\u0001\u0003\u0002\tQ\f7o\u001b\u0006\u0003\u000b\u0019\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005\u001dA\u0011\u0001C4fCJ\u0004X/\u001c9\u000b\u0005%Q\u0011AB1qC\u000eDWMC\u0001\f\u0003\ry'oZ\u0002\u0001'\t\u0001a\u0002\u0005\u0002\u0010%5\t\u0001CC\u0001\u0012\u0003\u0015\u00198-\u00197b\u0013\t\u0019\u0002C\u0001\u0004B]f\u0014VM\u001a\u0005\t+\u0001\u0011\t\u0011)A\u0005-\u0005)\u0011\r\u001d9JIB\u0011qbF\u0005\u00031A\u00111!\u00138u\u0011!Q\u0002A!A!\u0002\u00131\u0012AC3yK\u000e,Ho\u001c:JI\"AA\u0004\u0001B\u0001B\u0003%Q$\u0001\u0004uCN\\\u0017\n\u001a\t\u0003=}i\u0011AA\u0005\u0003A\t\u0011a\u0001V1tW&#\u0007\u0002\u0003\u0012\u0001\u0005\u0003\u0005\u000b\u0011B\u0012\u0002\u0015M,(m]2sS\n,'\u000f\u0005\u0002\u001fI%\u0011QE\u0001\u0002\u000b'V\u00147o\u0019:jE\u0016\u0014\b\u0002C\u0014\u0001\u0005\u0003\u0005\u000b\u0011\u0002\f\u0002\u0013M,7o]5p]&#\u0007\u0002C\u0015\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0016\u0002\u0013Q\u0014\u0018M\\:q_J$\bC\u0001\u0010,\u0013\ta#A\u0001\tFqB\u0014Xm]:Ue\u0006t7\u000f]8si\"Aa\u0006\u0001B\u0001B\u0003%a#\u0001\fnCb\u0004VM\u001c3j]\u001elUm]:bO\u0016\u001cu.\u001e8u\u0011!\u0001\u0004A!A!\u0002\u00131\u0012\u0001G1dW>s7-Z#wKJLX*Z:tC\u001e,7i\\;oi\")!\u0007\u0001C\u0001g\u00051A(\u001b8jiz\"\u0012\u0002N\u001b7oaJ$h\u000f\u001f\u0011\u0005y\u0001\u0001\"B\u000b2\u0001\u00041\u0002\"\u0002\u000e2\u0001\u00041\u0002\"\u0002\u000f2\u0001\u0004i\u0002\"\u0002\u00122\u0001\u0004\u0019\u0003\"B\u00142\u0001\u00041\u0002\"B\u00152\u0001\u0004Q\u0003b\u0002\u00182!\u0003\u0005\rA\u0006\u0005\baE\u0002\n\u00111\u0001\u0017\u0011\u001dq\u0004A1A\u0005\n}\n1\u0001T(H+\u0005\u0001\u0005CA!E\u001b\u0005\u0011%BA\"\u000b\u0003\u0015\u0019HN\u001a\u001bk\u0013\t)%I\u0001\u0004M_\u001e<WM\u001d\u0005\u0007\u000f\u0002\u0001\u000b\u0011\u0002!\u0002\t1{u\t\t\u0005\b\u0013\u0002\u0011\r\u0011\"\u0003K\u00031iWm]:bO\u0016\u001cu.\u001e8u+\u0005Y\u0005cA\bM\u001d&\u0011Q\n\u0005\u0002\u0006\u0003J\u0014\u0018-\u001f\t\u0003\u001f=K!\u0001\u0015\t\u0003\u000bMCwN\u001d;\t\rI\u0003\u0001\u0015!\u0003L\u00035iWm]:bO\u0016\u001cu.\u001e8uA!9A\u000b\u0001b\u0001\n\u0013Q\u0015a\u00059f]\u0012LgnZ'fgN\fw-Z\"pk:$\bB\u0002,\u0001A\u0003%1*\u0001\u000bqK:$\u0017N\\4NKN\u001c\u0018mZ3D_VtG\u000f\t\u0005\b1\u0002\u0011\r\u0011\"\u0003K\u0003Y\u0019\u0017M\u001c3jI\u0006$X-T5o\u00072|7m[*j]\u000e,\u0007B\u0002.\u0001A\u0003%1*A\fdC:$\u0017\u000eZ1uK6Kgn\u00117pG.\u001c\u0016N\\2fA!9A\f\u0001b\u0001\n\u0013i\u0016!D7j]\u000ecwnY6WC2,X-F\u0001_!\ryAj\u0018\t\u0003A2t!!\u00196\u000f\u0005\tLgBA2i\u001d\t!w-D\u0001f\u0015\t1G\"\u0001\u0004=e>|GOP\u0005\u0002\u0017%\u0011\u0011BC\u0005\u0003\u000f!I!a\u001b\u0004\u0002\u000fA\f7m[1hK&\u0011QN\u001c\u0002\n)&lWm\u0015;b[BT!a\u001b\u0004\t\rA\u0004\u0001\u0015!\u0003_\u00039i\u0017N\\\"m_\u000e\\g+\u00197vK\u0002BqA\u001d\u0001C\u0002\u0013%Q,A\tdC:$\u0017\u000eZ1uK6Kgn\u00117pG.Da\u0001\u001e\u0001!\u0002\u0013q\u0016AE2b]\u0012LG-\u0019;f\u001b&t7\t\\8dW\u0002BqA\u001e\u0001A\u0002\u0013%q/A\bnCb\u0004VM\u001c3j]\u001e\u001cu.\u001e8u+\u0005q\u0005bB=\u0001\u0001\u0004%IA_\u0001\u0014[\u0006D\b+\u001a8eS:<7i\\;oi~#S-\u001d\u000b\u0003wz\u0004\"a\u0004?\n\u0005u\u0004\"\u0001B+oSRDqa =\u0002\u0002\u0003\u0007a*A\u0002yIEBq!a\u0001\u0001A\u0003&a*\u0001\tnCb\u0004VM\u001c3j]\u001e\u001cu.\u001e8uA!I\u0011q\u0001\u0001A\u0002\u0013%\u0011\u0011B\u0001\u0005Y&4W-\u0006\u0002\u0002\fA!\u0011QBA\b\u001b\u0005!\u0011bAA\t\t\tAA*\u001b4f)&lW\rC\u0005\u0002\u0016\u0001\u0001\r\u0011\"\u0003\u0002\u0018\u0005AA.\u001b4f?\u0012*\u0017\u000fF\u0002|\u00033A\u0011b`A\n\u0003\u0003\u0005\r!a\u0003\t\u0011\u0005u\u0001\u0001)Q\u0005\u0003\u0017\tQ\u0001\\5gK\u0002B\u0011\"!\t\u0001\u0005\u0004%I!a\t\u0002\u0017A\f'\u000f^5uS>tWM]\u000b\u0003\u0003K\u0001B!a\n\u0002,5\u0011\u0011\u0011\u0006\u0006\u0004\u0003C1\u0011\u0002BA\u0017\u0003S\u00111\u0002U1si&$\u0018n\u001c8fe\"A\u0011\u0011\u0007\u0001!\u0002\u0013\t)#\u0001\u0007qCJ$\u0018\u000e^5p]\u0016\u0014\b\u0005C\u0005\u00026\u0001\u0011\r\u0011\"\u0003\u00028\u000511/\u001a8e\r:,\"!!\u000f\u0011\r=\tY$a\u0010\u0017\u0013\r\ti\u0004\u0005\u0002\n\rVt7\r^5p]F\u0002B!!\u0011\u0002D5\ta!C\u0002\u0002F\u0019\u0011q!T3tg\u0006<W\r\u0003\u0005\u0002J\u0001\u0001\u000b\u0011BA\u001d\u0003\u001d\u0019XM\u001c3G]\u0002Bq!!\u0014\u0001\t\u0003\ty%\u0001\u0006dQ\u0006tw-\u001a'jM\u0016$2a_A)\u0011!\t9!a\u0013A\u0002\u0005-\u0001bBA+\u0001\u0011\u0005\u0011qK\u0001\u0006gR\f'\u000f\u001e\u000b\u0002w\"9\u00111\f\u0001\u0005\u0002\u0005u\u0013aC:f]\u0012lUm]:bO\u0016$2AFA0\u0011!\t\t'!\u0017A\u0002\u0005}\u0012aA7tO\"9\u00111\f\u0001\u0005\u0002\u0005\u0015D#\u0002\f\u0002h\u0005%\u0004\u0002CA1\u0003G\u0002\r!a\u0010\t\u000f\u0005-\u00141\ra\u0001-\u0005I\u0001/\u0019:uSRLwN\u001c\u0005\n\u0003_\u0002\u0001\u0019!C\u0005\u0003c\nQ\u0002\\1ti\u001acWo\u001d5US6,WCAA:!\ry\u0011QO\u0005\u0004\u0003o\u0002\"\u0001\u0002'p]\u001eD\u0011\"a\u001f\u0001\u0001\u0004%I!! \u0002#1\f7\u000f\u001e$mkNDG+[7f?\u0012*\u0017\u000fF\u0002|\u0003\u007fB\u0011b`A=\u0003\u0003\u0005\r!a\u001d\t\u0011\u0005\r\u0005\u0001)Q\u0005\u0003g\na\u0002\\1ti\u001acWo\u001d5US6,\u0007\u0005C\u0005\u0002\b\u0002\u0011\r\u0011\"\u0003\u0002\n\u0006qa\tT+T\u0011~Ke\nV#S-\u0006cU#\u0001\f\t\u000f\u00055\u0005\u0001)A\u0005-\u0005ya\tT+T\u0011~Ke\nV#S-\u0006c\u0005\u0005C\u0004\u0002\u0012\u0002!I!a%\u0002\u00139,W\r\u001a$mkNDWCAAK!\ry\u0011qS\u0005\u0004\u00033\u0003\"a\u0002\"p_2,\u0017M\u001c\u0005\b\u0003;\u0003A\u0011BA,\u0003\u00151G.^:i\u0011\u001d\t\t\u000b\u0001C\u0005\u0003G\u000b\u0001\"\u00197m)\u0006\u001c8n]\u000b\u0003\u0003K\u0003R!a*\u0002.vi!!!+\u000b\u0007\u0005-\u0006#\u0001\u0006d_2dWm\u0019;j_:LA!a,\u0002*\n\u00191+Z9\t\u000f\u0005M\u0006\u0001\"\u0001\u00026\u0006Q!/Z2fSZ,\u0017iY6\u0015\u0007m\f9\f\u0003\u0005\u0002:\u0006E\u0006\u0019AA^\u0003\r\t7m\u001b\t\u0004=\u0005u\u0016bAA`\u0005\t\u0019\u0011iY6\t\u000f\u0005\r\u0007\u0001\"\u0001\u0002F\u0006AQ.\u001b8DY>\u001c7.F\u0001`\u0011\u001d\tI\r\u0001C\u0001\u0003\u0017\f\u0001$\u00197m_^\u001cVM\u001c3j]\u001eluN]3NKN\u001c\u0018mZ3t)\t\t)\nC\u0004\u0002P\u0002!\t!!5\u00029M,g\u000eZ!dWJ+\u0017/^3ti>s7\u000b^1mY&tw\rV5nKR\u001910a5\t\u000f\u0005U\u0017Q\u001aa\u0001?\u0006a1\u000f^1mY&tw\rV5nK\"9\u0011\u0011\u001c\u0001\u0005\n\u0005m\u0017AD:f]\u0012\f5m\u001b*fcV,7\u000f\u001e\u000b\u0004w\u0006u\u0007bBA6\u0003/\u0004\rA\u0006\u0005\b\u0003C\u0004A\u0011BAr\u0003UIgn\u0019:f[\u0016tG/T3tg\u0006<WmQ8v]R$Ra_As\u0003ODq!a\u001b\u0002`\u0002\u0007a\u0003C\u0004\u0002j\u0006}\u0007\u0019\u0001\f\u0002\u000b\r|WO\u001c;\t\u000f\u00055\b\u0001\"\u0003\u0002X\u0005)R\u000f\u001d3bi\u0016l\u0015\r\u001f)f]\u0012LgnZ\"pk:$\bbBAy\u0001\u0011%\u00111_\u0001\u0011g\u0016tG\rT1uK:\u001c\u0017\u0010\u0015:pE\u0016$2a_A{\u0011\u001d\tY'a<A\u0002Y9q!!?\u0003\u0011\u0003\tY0\u0001\u0007Tk\n\u001c8M]5qi&|g\u000eE\u0002\u001f\u0003{4a!\u0001\u0002\t\u0002\u0005}8cAA\u007f\u001d!9!'!@\u0005\u0002\t\rACAA~\u0011)\u00119!!@C\u0002\u0013\u0015!\u0011B\u0001#\u001f:+u,Q\"L%\u0016\u000bV+R*U?\u00163VIU-`\u001b\u0016\u001b6+Q$F?\u000e{UK\u0014+\u0016\u0005\t-qB\u0001B\u0007;\u0005!\u0007\"\u0003B\t\u0003{\u0004\u000bQ\u0002B\u0006\u0003\rze*R0B\u0007.\u0013V)U+F'R{VIV#S3~kUiU*B\u000f\u0016{6iT+O)\u0002B!B!\u0006\u0002~\n\u0007IQ\u0001B\f\u0003ei\u0015\tW0Q\u000b:#\u0015JT$`\u001b\u0016\u001b6+Q$F?\u000e{UK\u0014+\u0016\u0005\teqB\u0001B\u000e;\t\u0019\u0001\u001eC\u0005\u0003 \u0005u\b\u0015!\u0004\u0003\u001a\u0005QR*\u0011-`!\u0016sE)\u0013(H?6+5kU!H\u000b~\u001bu*\u0016(UA!Q!1EA\u007f#\u0003%\tA!\n\u00027\u0011bWm]:j]&$He\u001a:fCR,'\u000f\n3fM\u0006,H\u000e\u001e\u00138+\t\u00119CK\u0002\u0017\u0005SY#Aa\u000b\u0011\t\t5\"qG\u0007\u0003\u0005_QAA!\r\u00034\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0005k\u0001\u0012AC1o]>$\u0018\r^5p]&!!\u0011\bB\u0018\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a\u0005\u000b\u0005{\ti0%A\u0005\u0002\t\u0015\u0012a\u0007\u0013mKN\u001c\u0018N\\5uI\u001d\u0014X-\u0019;fe\u0012\"WMZ1vYR$\u0003\b")
/* loaded from: input_file:org/apache/gearpump/streaming/task/Subscription.class */
public class Subscription {
    public final TaskId org$apache$gearpump$streaming$task$Subscription$$taskId;
    public final Subscriber org$apache$gearpump$streaming$task$Subscription$$subscriber;
    private final int sessionId;
    private final ExpressTransport transport;
    private final int maxPendingMessageCount;
    private final int ackOnceEveryMessageCount;
    private final Logger LOG;
    private final short[] messageCount;
    private final short[] org$apache$gearpump$streaming$task$Subscription$$pendingMessageCount;
    private final short[] candidateMinClockSince;
    private final long[] org$apache$gearpump$streaming$task$Subscription$$minClockValue;
    private final long[] candidateMinClock;
    private short maxPendingCount;
    private LifeTime life;
    private final Partitioner partitioner;
    private final Function1<Message, Object> sendFn;
    private long lastFlushTime;
    private final int FLUSH_INTERVAL;

    public static int MAX_PENDING_MESSAGE_COUNT() {
        return Subscription$.MODULE$.MAX_PENDING_MESSAGE_COUNT();
    }

    public static int ONE_ACKREQUEST_EVERY_MESSAGE_COUNT() {
        return Subscription$.MODULE$.ONE_ACKREQUEST_EVERY_MESSAGE_COUNT();
    }

    private Logger LOG() {
        return this.LOG;
    }

    private short[] messageCount() {
        return this.messageCount;
    }

    public short[] org$apache$gearpump$streaming$task$Subscription$$pendingMessageCount() {
        return this.org$apache$gearpump$streaming$task$Subscription$$pendingMessageCount;
    }

    private short[] candidateMinClockSince() {
        return this.candidateMinClockSince;
    }

    public long[] org$apache$gearpump$streaming$task$Subscription$$minClockValue() {
        return this.org$apache$gearpump$streaming$task$Subscription$$minClockValue;
    }

    private long[] candidateMinClock() {
        return this.candidateMinClock;
    }

    private short maxPendingCount() {
        return this.maxPendingCount;
    }

    private void maxPendingCount_$eq(short s) {
        this.maxPendingCount = s;
    }

    private LifeTime life() {
        return this.life;
    }

    private void life_$eq(LifeTime lifeTime) {
        this.life = lifeTime;
    }

    private Partitioner partitioner() {
        return this.partitioner;
    }

    private Function1<Message, Object> sendFn() {
        return this.sendFn;
    }

    public void changeLife(LifeTime lifeTime) {
        life_$eq(lifeTime);
    }

    public void start() {
        this.transport.transport(new InitialAckRequest(this.org$apache$gearpump$streaming$task$Subscription$$taskId, this.sessionId), allTasks());
    }

    public int sendMessage(Message message) {
        return BoxesRunTime.unboxToInt(sendFn().apply(message));
    }

    public int sendMessage(Message message, int i) {
        if (i == Partitioner$.MODULE$.UNKNOWN_PARTITION_ID() || !life().contains(message.timestamp())) {
            if (needFlush()) {
                flush();
            }
            return 0;
        }
        this.transport.transport(message, Predef$.MODULE$.wrapRefArray(new TaskId[]{new TaskId(this.org$apache$gearpump$streaming$task$Subscription$$subscriber.processorId(), i)}));
        org$apache$gearpump$streaming$task$Subscription$$minClockValue()[i] = Math.min(org$apache$gearpump$streaming$task$Subscription$$minClockValue()[i], message.timestamp());
        candidateMinClock()[i] = Math.min(candidateMinClock()[i], message.timestamp());
        incrementMessageCount(i, 1);
        if (messageCount()[i] % this.ackOnceEveryMessageCount == 0) {
            org$apache$gearpump$streaming$task$Subscription$$sendAckRequest(i);
        }
        if (messageCount()[i] / this.maxPendingMessageCount != (messageCount()[i] + this.ackOnceEveryMessageCount) / this.maxPendingMessageCount) {
            org$apache$gearpump$streaming$task$Subscription$$sendLatencyProbe(i);
        }
        return 1;
    }

    private long lastFlushTime() {
        return this.lastFlushTime;
    }

    private void lastFlushTime_$eq(long j) {
        this.lastFlushTime = j;
    }

    private int FLUSH_INTERVAL() {
        return this.FLUSH_INTERVAL;
    }

    private boolean needFlush() {
        return System.currentTimeMillis() - lastFlushTime() > ((long) FLUSH_INTERVAL()) && Shorts.max(org$apache$gearpump$streaming$task$Subscription$$pendingMessageCount()) > 0;
    }

    private void flush() {
        lastFlushTime_$eq(System.currentTimeMillis());
        allTasks().foreach(new Subscription$$anonfun$flush$1(this));
    }

    private Seq<TaskId> allTasks() {
        RichInt$ richInt$ = RichInt$.MODULE$;
        Predef$ predef$ = Predef$.MODULE$;
        return (Seq) richInt$.until$extension0(0, this.org$apache$gearpump$streaming$task$Subscription$$subscriber.parallelism()).map(new Subscription$$anonfun$allTasks$1(this), IndexedSeq$.MODULE$.canBuildFrom());
    }

    public void receiveAck(Ack ack) {
        int index = ack.taskId().index();
        if (ack.sessionId() == this.sessionId) {
            if (ack.actualReceivedNum() != ack.seq()) {
                LOG().error(new StringBuilder().append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Failed! received ack: ", ", received: ", ", "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{ack, BoxesRunTime.boxToShort(ack.actualReceivedNum())}))).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"sent: ", ", try to replay..."})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToShort(ack.seq())}))).toString());
                throw new AppMasterToExecutor.MsgLostException();
            }
            if (((short) (ack.seq() - candidateMinClockSince()[index])) >= 0) {
                if (ack.seq() == messageCount()[index]) {
                    org$apache$gearpump$streaming$task$Subscription$$minClockValue()[index] = Long.MAX_VALUE;
                } else {
                    org$apache$gearpump$streaming$task$Subscription$$minClockValue()[index] = candidateMinClock()[index];
                }
                candidateMinClock()[index] = Long.MAX_VALUE;
                candidateMinClockSince()[index] = messageCount()[index];
            }
            org$apache$gearpump$streaming$task$Subscription$$pendingMessageCount()[ack.taskId().index()] = (short) (messageCount()[ack.taskId().index()] - ack.seq());
            updateMaxPendingCount();
        }
    }

    public long minClock() {
        return BoxesRunTime.unboxToLong(Predef$.MODULE$.longArrayOps(org$apache$gearpump$streaming$task$Subscription$$minClockValue()).min(Ordering$Long$.MODULE$));
    }

    public boolean allowSendingMoreMessages() {
        return maxPendingCount() < this.maxPendingMessageCount;
    }

    public void sendAckRequestOnStallingTime(long j) {
        Predef$ predef$ = Predef$.MODULE$;
        Range indices = new ArrayOps.ofLong(org$apache$gearpump$streaming$task$Subscription$$minClockValue()).indices();
        new Subscription$$anonfun$sendAckRequestOnStallingTime$1(this, j);
        if (indices.isEmpty()) {
            return;
        }
        int start = indices.start();
        while (true) {
            int i = start;
            if (org$apache$gearpump$streaming$task$Subscription$$minClockValue()[i] == j && org$apache$gearpump$streaming$task$Subscription$$pendingMessageCount()[i] > 0 && allowSendingMoreMessages()) {
                org$apache$gearpump$streaming$task$Subscription$$sendAckRequest(i);
                org$apache$gearpump$streaming$task$Subscription$$sendLatencyProbe(i);
            }
            if (i == indices.lastElement()) {
                return;
            } else {
                start = i + indices.step();
            }
        }
    }

    public void org$apache$gearpump$streaming$task$Subscription$$sendAckRequest(int i) {
        incrementMessageCount(i, this.ackOnceEveryMessageCount);
        TaskId taskId = new TaskId(this.org$apache$gearpump$streaming$task$Subscription$$subscriber.processorId(), i);
        this.transport.transport(new AckRequest(this.org$apache$gearpump$streaming$task$Subscription$$taskId, messageCount()[i], this.sessionId), Predef$.MODULE$.wrapRefArray(new TaskId[]{taskId}));
    }

    private void incrementMessageCount(int i, int i2) {
        messageCount()[i] = (short) (messageCount()[i] + i2);
        org$apache$gearpump$streaming$task$Subscription$$pendingMessageCount()[i] = (short) (org$apache$gearpump$streaming$task$Subscription$$pendingMessageCount()[i] + i2);
        updateMaxPendingCount();
    }

    private void updateMaxPendingCount() {
        maxPendingCount_$eq(Shorts.max(org$apache$gearpump$streaming$task$Subscription$$pendingMessageCount()));
    }

    public void org$apache$gearpump$streaming$task$Subscription$$sendLatencyProbe(int i) {
        this.transport.transport(new LatencyProbe(System.currentTimeMillis()), Predef$.MODULE$.wrapRefArray(new TaskId[]{new TaskId(this.org$apache$gearpump$streaming$task$Subscription$$subscriber.processorId(), i)}));
    }

    public Subscription(int i, int i2, TaskId taskId, Subscriber subscriber, int i3, ExpressTransport expressTransport, int i4, int i5) {
        Function1<Message, Object> subscription$$anonfun$4;
        this.org$apache$gearpump$streaming$task$Subscription$$taskId = taskId;
        this.org$apache$gearpump$streaming$task$Subscription$$subscriber = subscriber;
        this.sessionId = i3;
        this.transport = expressTransport;
        this.maxPendingMessageCount = i4;
        this.ackOnceEveryMessageCount = i5;
        Predef$.MODULE$.assert(i4 >= i5);
        Predef$.MODULE$.assert(i4 < 16383);
        this.LOG = LogUtil$.MODULE$.getLogger(getClass(), LogUtil$.MODULE$.getLogger$default$2(), LogUtil$.MODULE$.getLogger$default$3(), LogUtil$.MODULE$.getLogger$default$4(), BoxesRunTime.boxToInteger(i2), taskId, BoxesRunTime.boxToInteger(i), LogUtil$.MODULE$.getLogger$default$8());
        this.messageCount = new short[subscriber.parallelism()];
        this.org$apache$gearpump$streaming$task$Subscription$$pendingMessageCount = new short[subscriber.parallelism()];
        this.candidateMinClockSince = new short[subscriber.parallelism()];
        this.org$apache$gearpump$streaming$task$Subscription$$minClockValue = (long[]) Array$.MODULE$.fill(subscriber.parallelism(), new Subscription$$anonfun$1(this), ClassTag$.MODULE$.Long());
        this.candidateMinClock = (long[]) Array$.MODULE$.fill(subscriber.parallelism(), new Subscription$$anonfun$2(this), ClassTag$.MODULE$.Long());
        this.maxPendingCount = (short) 0;
        this.life = subscriber.lifeTime();
        this.partitioner = subscriber.partitionerDescription().partitionerFactory().partitioner();
        UnicastPartitioner partitioner = partitioner();
        if (partitioner instanceof UnicastPartitioner) {
            subscription$$anonfun$4 = new Subscription$$anonfun$3(this, partitioner);
        } else {
            if (!(partitioner instanceof MulticastPartitioner)) {
                throw new MatchError(partitioner);
            }
            subscription$$anonfun$4 = new Subscription$$anonfun$4(this, (MulticastPartitioner) partitioner);
        }
        this.sendFn = subscription$$anonfun$4;
        this.lastFlushTime = 0L;
        this.FLUSH_INTERVAL = 5000;
    }
}
