package io.scalecube.cluster.fdetector;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.scalecube.cluster.ClusterConfiguration;
import io.scalecube.transport.ITransport;
import io.scalecube.transport.Message;
import io.scalecube.transport.TransportEndpoint;
import io.scalecube.transport.TransportHeaders;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observers.Subscribers;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/* loaded from: input_file:io/scalecube/cluster/fdetector/FailureDetector.class */
public final class FailureDetector implements IFailureDetector {
    private ITransport transport;
    private final TransportEndpoint localEndpoint;
    private final Scheduler scheduler;
    private TransportEndpoint pingMember;
    private List<TransportEndpoint> randomMembers;
    private volatile Subscription fdTask;
    private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetector.class);
    private static final String ACK = "io.scalecube.cluster/fdetector/ack";
    private static final TransportHeaders.Filter ACK_FILTER = new TransportHeaders.Filter(ACK);
    private static final String PING = "io.scalecube.cluster/fdetector/ping";
    private static final TransportHeaders.Filter PING_FILTER = new TransportHeaders.Filter(PING);
    private static final String PING_REQ = "io.scalecube.cluster/fdetector/pingReq";
    private static final TransportHeaders.Filter PING_REQ_FILTER = new TransportHeaders.Filter(PING_REQ);
    private volatile List<TransportEndpoint> members = new ArrayList();
    private Subject<FailureDetectorEvent, FailureDetectorEvent> subject = new SerializedSubject(PublishSubject.create());
    private AtomicInteger periodNbr = new AtomicInteger();
    private Set<TransportEndpoint> suspectedMembers = Sets.newConcurrentHashSet();
    private int pingTime = ClusterConfiguration.FailureDetectorSettings.DEFAULT_PING_TIME;
    private int pingTimeout = ClusterConfiguration.FailureDetectorSettings.DEFAULT_PING_TIMEOUT;
    private int maxEndpointsToSelect = 3;
    private Subscriber<Message> onPingSubscriber = Subscribers.create(new Action1<Message>() { // from class: io.scalecube.cluster.fdetector.FailureDetector.1
        public void call(Message message) {
            FailureDetector.LOGGER.trace("Received Ping: {}", message);
            FailureDetectorData failureDetectorData = (FailureDetectorData) message.data();
            FailureDetector.this.send(failureDetectorData.getFrom(), new Message(failureDetectorData, new String[]{"q", FailureDetector.ACK, "cid", message.header("cid")}));
        }
    });
    private Subscriber<Message> onPingReqSubscriber = Subscribers.create(new Action1<Message>() { // from class: io.scalecube.cluster.fdetector.FailureDetector.2
        public void call(Message message) {
            FailureDetector.LOGGER.trace("Received PingReq: {}", message);
            FailureDetectorData failureDetectorData = (FailureDetectorData) message.data();
            TransportEndpoint to = failureDetectorData.getTo();
            TransportEndpoint from = failureDetectorData.getFrom();
            String header = message.header("cid");
            FailureDetector.this.send(to, new Message(new FailureDetectorData(FailureDetector.this.localEndpoint, to, from), new String[]{"q", FailureDetector.PING, "cid", header}));
        }
    });
    private Subscriber<Message> onAckToOriginalAckSubscriber = Subscribers.create(new Action1<Message>() { // from class: io.scalecube.cluster.fdetector.FailureDetector.3
        public void call(Message message) {
            FailureDetectorData failureDetectorData = (FailureDetectorData) message.data();
            TransportEndpoint originalIssuer = failureDetectorData.getOriginalIssuer();
            FailureDetector.this.send(originalIssuer, new Message(new FailureDetectorData(originalIssuer, failureDetectorData.getTo()), new String[]{"q", FailureDetector.ACK, "cid", message.header("cid")}));
        }
    });

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/scalecube/cluster/fdetector/FailureDetector$CorrelationFilter.class */
    public static class CorrelationFilter implements Func1<Message, Boolean> {
        final TransportEndpoint from;
        final TransportEndpoint target;

        CorrelationFilter(TransportEndpoint transportEndpoint, TransportEndpoint transportEndpoint2) {
            this.from = transportEndpoint;
            this.target = transportEndpoint2;
        }

        public Boolean call(Message message) {
            FailureDetectorData failureDetectorData = (FailureDetectorData) message.data();
            return Boolean.valueOf(this.from.equals(failureDetectorData.getFrom()) && this.target.equals(failureDetectorData.getTo()) && failureDetectorData.getOriginalIssuer() == null);
        }
    }

    public FailureDetector(TransportEndpoint transportEndpoint, Scheduler scheduler) {
        Preconditions.checkArgument(transportEndpoint != null);
        Preconditions.checkArgument(scheduler != null);
        this.localEndpoint = transportEndpoint;
        this.scheduler = scheduler;
    }

    public void setPingTime(int i) {
        this.pingTime = i;
    }

    public void setPingTimeout(int i) {
        this.pingTimeout = i;
    }

    public void setMaxEndpointsToSelect(int i) {
        this.maxEndpointsToSelect = i;
    }

    @Override // io.scalecube.cluster.fdetector.IFailureDetector
    public void setClusterEndpoints(Collection<TransportEndpoint> collection) {
        HashSet hashSet = new HashSet(collection);
        hashSet.remove(this.localEndpoint);
        ArrayList arrayList = new ArrayList(hashSet);
        Collections.shuffle(arrayList);
        this.members = arrayList;
        LOGGER.debug("Set cluster members: {}", this.members);
    }

    public void setTransport(ITransport iTransport) {
        this.transport = iTransport;
    }

    public ITransport getTransport() {
        return this.transport;
    }

    public TransportEndpoint getLocalEndpoint() {
        return this.localEndpoint;
    }

    public List<TransportEndpoint> getSuspectedMembers() {
        return new ArrayList(this.suspectedMembers);
    }

    void setPingMember(TransportEndpoint transportEndpoint) {
        Preconditions.checkNotNull(transportEndpoint);
        Preconditions.checkArgument(transportEndpoint != this.localEndpoint);
        this.pingMember = transportEndpoint;
    }

    void setRandomMembers(List<TransportEndpoint> list) {
        Preconditions.checkNotNull(list);
        this.randomMembers = list;
    }

    @Override // io.scalecube.cluster.fdetector.IFailureDetector
    public void start() {
        this.transport.listen().filter(PING_FILTER).filter(targetFilter(this.localEndpoint)).subscribe(this.onPingSubscriber);
        this.transport.listen().filter(PING_REQ_FILTER).subscribe(this.onPingReqSubscriber);
        this.transport.listen().filter(ACK_FILTER).filter(new Func1<Message, Boolean>() { // from class: io.scalecube.cluster.fdetector.FailureDetector.4
            public Boolean call(Message message) {
                return Boolean.valueOf(((FailureDetectorData) message.data()).getOriginalIssuer() != null);
            }
        }).subscribe(this.onAckToOriginalAckSubscriber);
        this.fdTask = this.scheduler.createWorker().schedulePeriodically(new Action0() { // from class: io.scalecube.cluster.fdetector.FailureDetector.5
            public void call() {
                try {
                    FailureDetector.this.doPing(FailureDetector.this.members);
                } catch (Exception e) {
                    FailureDetector.LOGGER.error("Unhandled exception: {}", e, e);
                }
            }
        }, 0L, this.pingTime, TimeUnit.MILLISECONDS);
    }

    @Override // io.scalecube.cluster.fdetector.IFailureDetector
    public void stop() {
        if (this.fdTask != null) {
            this.fdTask.unsubscribe();
        }
        this.subject.onCompleted();
        this.onPingReqSubscriber.unsubscribe();
        this.onPingReqSubscriber.unsubscribe();
        this.onAckToOriginalAckSubscriber.unsubscribe();
    }

    @Override // io.scalecube.cluster.fdetector.IFailureDetector
    public Observable<FailureDetectorEvent> listenStatus() {
        return this.subject;
    }

    @Override // io.scalecube.cluster.fdetector.IFailureDetector
    public void suspect(TransportEndpoint transportEndpoint) {
        Preconditions.checkNotNull(transportEndpoint);
        this.suspectedMembers.add(transportEndpoint);
    }

    @Override // io.scalecube.cluster.fdetector.IFailureDetector
    public void trust(TransportEndpoint transportEndpoint) {
        Preconditions.checkNotNull(transportEndpoint);
        this.suspectedMembers.remove(transportEndpoint);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doPing(final List<TransportEndpoint> list) {
        final TransportEndpoint selectPingMember = selectPingMember(list);
        if (selectPingMember == null) {
            return;
        }
        final String str = "" + this.periodNbr.incrementAndGet();
        Message message = new Message(new FailureDetectorData(this.localEndpoint, selectPingMember), new String[]{"q", PING, "cid", str});
        LOGGER.trace("Send Ping from {} to {}", this.localEndpoint, selectPingMember);
        this.transport.listen().filter(ackFilter(str)).filter(new CorrelationFilter(this.localEndpoint, selectPingMember)).take(1).timeout(this.pingTimeout, TimeUnit.MILLISECONDS, this.scheduler).subscribe(Subscribers.create(new Action1<Message>() { // from class: io.scalecube.cluster.fdetector.FailureDetector.6
            public void call(Message message2) {
                FailureDetector.LOGGER.trace("Received PingAck from {}", selectPingMember);
                FailureDetector.this.declareTrusted(selectPingMember);
            }
        }, new Action1<Throwable>() { // from class: io.scalecube.cluster.fdetector.FailureDetector.7
            public void call(Throwable th) {
                FailureDetector.LOGGER.trace("No PingAck from {} within {}ms; about to make PingReq now", selectPingMember, Integer.valueOf(FailureDetector.this.pingTimeout));
                FailureDetector.this.doPingReq(list, selectPingMember, str);
            }
        }));
        send(selectPingMember, message);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doPingReq(List<TransportEndpoint> list, final TransportEndpoint transportEndpoint, String str) {
        final int i = this.pingTime - this.pingTimeout;
        if (i <= 0) {
            LOGGER.trace("No PingReq occurred, because no time left (pingTime={}, pingTimeout={})", Integer.valueOf(this.pingTime), Integer.valueOf(this.pingTimeout));
            declareSuspected(transportEndpoint);
            return;
        }
        final List<TransportEndpoint> selectRandomMembers = selectRandomMembers(list, this.maxEndpointsToSelect, transportEndpoint);
        if (selectRandomMembers.isEmpty()) {
            LOGGER.trace("No PingReq occurred, because member selection is empty");
            declareSuspected(transportEndpoint);
            return;
        }
        this.transport.listen().filter(ackFilter(str)).filter(new CorrelationFilter(this.localEndpoint, transportEndpoint)).take(1).timeout(i, TimeUnit.MILLISECONDS, this.scheduler).subscribe(Subscribers.create(new Action1<Message>() { // from class: io.scalecube.cluster.fdetector.FailureDetector.8
            public void call(Message message) {
                FailureDetector.LOGGER.trace("PingReq OK (pinger={}, target={})", message.sender(), transportEndpoint);
                FailureDetector.this.declareTrusted(transportEndpoint);
            }
        }, new Action1<Throwable>() { // from class: io.scalecube.cluster.fdetector.FailureDetector.9
            public void call(Throwable th) {
                FailureDetector.LOGGER.trace("No PingAck on PingReq within {}ms (pingers={}, target={})", new Object[]{selectRandomMembers, transportEndpoint, Integer.valueOf(i)});
                FailureDetector.this.declareSuspected(transportEndpoint);
            }
        }));
        Message message = new Message(new FailureDetectorData(this.localEndpoint, transportEndpoint), new String[]{"q", PING_REQ, "cid", str});
        for (TransportEndpoint transportEndpoint2 : selectRandomMembers) {
            LOGGER.trace("Send PingReq from {} to {}", this.localEndpoint, transportEndpoint2);
            send(transportEndpoint2, message);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void declareSuspected(TransportEndpoint transportEndpoint) {
        if (this.suspectedMembers.add(transportEndpoint)) {
            LOGGER.debug("Member {} became SUSPECTED", transportEndpoint);
            this.subject.onNext(FailureDetectorEvent.suspected(transportEndpoint));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void declareTrusted(TransportEndpoint transportEndpoint) {
        if (this.suspectedMembers.remove(transportEndpoint)) {
            LOGGER.debug("Member {} became TRUSTED", transportEndpoint);
            this.subject.onNext(FailureDetectorEvent.trusted(transportEndpoint));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void send(TransportEndpoint transportEndpoint, Message message) {
        this.transport.send(transportEndpoint, message);
    }

    private TransportEndpoint selectPingMember(List<TransportEndpoint> list) {
        if (this.pingMember != null) {
            return this.pingMember;
        }
        if (list.isEmpty()) {
            return null;
        }
        return selectRandomMembers(list, 1, null).get(0);
    }

    private List<TransportEndpoint> selectRandomMembers(List<TransportEndpoint> list, int i, TransportEndpoint transportEndpoint) {
        if (this.randomMembers != null) {
            return this.randomMembers;
        }
        Preconditions.checkArgument(i > 0, "FailureDetector: k is required!");
        int min = Math.min(i, 5);
        ArrayList arrayList = new ArrayList(list);
        arrayList.remove(transportEndpoint);
        ArrayList arrayList2 = new ArrayList(min);
        while (!arrayList.isEmpty() && min != 0) {
            TransportEndpoint transportEndpoint2 = (TransportEndpoint) arrayList.get(ThreadLocalRandom.current().nextInt(arrayList.size()));
            arrayList2.add(transportEndpoint2);
            arrayList.remove(transportEndpoint2);
            min--;
        }
        return arrayList2;
    }

    private Func1<Message, Boolean> ackFilter(String str) {
        return new TransportHeaders.Filter(ACK, str);
    }

    private Func1<Message, Boolean> targetFilter(final TransportEndpoint transportEndpoint) {
        return new Func1<Message, Boolean>() { // from class: io.scalecube.cluster.fdetector.FailureDetector.10
            public Boolean call(Message message) {
                return Boolean.valueOf(((FailureDetectorData) message.data()).getTo().equals(transportEndpoint));
            }
        };
    }
}
