package co.featbit.server;

import co.featbit.commons.json.JsonHelper;
import co.featbit.commons.json.JsonParseException;
import co.featbit.server.DataModel;
import co.featbit.server.FlagChange;
import co.featbit.server.Status;
import co.featbit.server.exterior.BasicConfig;
import co.featbit.server.exterior.Context;
import co.featbit.server.exterior.DataStorageTypes;
import co.featbit.server.exterior.DataSynchronizer;
import co.featbit.server.exterior.HttpConfig;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/featbit/server/Streaming.class */
public final class Streaming implements DataSynchronizer {
    // Event types of a data-sync payload: "full" is routed to DataUpdater.init, "patch" to DataUpdater.upsert.
    private static final String FULL_OPS = "full";
    private static final String PATCH_OPS = "patch";
    // Close reason sent (with GOING_AWAY_CLOSE) when a data-sync payload could not be applied.
    private static final String CLOSE_AND_THEN_RECONN_BY_DATASYNC_ERROR = "data sync error";
    // Query-string template appended to the streaming URI; %s receives Utils.buildToken(envSecret).
    private static final String AUTH_PARAMS = "?token=%s&type=server&version=2";
    // Flipped to true the first time processData succeeds; exposed through isInitialized().
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // Set to true in onOpen and back to false in onClosed/onFailure.
    private final AtomicBoolean isWSConnected = new AtomicBoolean(false);
    // Number of connect() attempts since start(); compared against maxRetryTimes.
    private final AtomicInteger connCount = new AtomicInteger(0);
    // Set by close(); while true, connect() refuses to open a socket and ping() does nothing.
    private final AtomicBoolean forceToCloseWS = new AtomicBoolean(false);
    // Returned by start(); completed with true the first time processData succeeds.
    private final CompletableFuture<Boolean> initFuture = new CompletableFuture<>();
    // Single listener instance reused for every socket opened by connect().
    private final StreamingWebSocketListener listener = new DefaultWebSocketListener();
    // Runs ping() at a fixed rate of PING_INTERVAL once start() has been called.
    private final ScheduledThreadPoolExecutor pingScheduledExecutor;
    // Receives stored data and status transitions.
    private final Status.DataUpdater updater;
    private final BasicConfig basicConfig;
    private final HttpConfig httpConfig;
    // Upper bound on connect() attempts; Integer.MAX_VALUE when the constructor argument is null or <= 0.
    private final Integer maxRetryTimes;
    // Supplies the delay slept by reconnect() before the next connect().
    private final BackoffAndJitterStrategy strategy;
    private final String streamingURI;
    private final OkHttpClient okHttpClient;
    // Most recently opened socket; assigned in connect() and read by ping() and close().
    // NOTE(review): this field is not volatile although ping() runs on the ping executor while
    // connect() is reached from start() and from the listener callbacks - confirm visibility is acceptable.
    WebSocket webSocket;
    // WebSocket close codes this class reacts to.
    private static final Integer NORMAL_CLOSE = 1000;
    private static final Integer INVALID_REQUEST_CLOSE = 4003;
    private static final Integer GOING_AWAY_CLOSE = 1001;
    // Period between two ping() invocations.
    private static final Duration PING_INTERVAL = Duration.ofSeconds(10);
    // Timeout handed to Utils.shutDownThreadPool when the ping executor is shut down.
    private static final Duration AWAIT_TERMINATION = Duration.ofSeconds(2);
    private static final String NORMAL_CLOSE_REASON = "normal close";
    private static final String INVALID_REQUEST_CLOSE_REASON = "invalid request";
    // Close codes after which no reconnection is attempted, mapped to the reason that gets logged.
    private static final Map<Integer, String> NOT_RECONN_CLOSE_REASON = ImmutableMap.of(NORMAL_CLOSE, NORMAL_CLOSE_REASON, INVALID_REQUEST_CLOSE, INVALID_REQUEST_CLOSE_REASON);
    // Exception classes (matched by exact class) that are reported as Status.WEBSOCKET_ERROR.
    private static final List<Class<? extends Exception>> WEBSOCKET_EXCEPTION = ImmutableList.of(SocketTimeoutException.class, SocketException.class, EOFException.class);
    private static final Logger logger = Loggers.UPDATE_PROCESSOR;

    /* loaded from: input_file:co/featbit/server/Streaming$DefaultWebSocketListener.class */
    /**
     * Listener installed on every socket opened by {@link Streaming}: it asks for a data sync as
     * soon as the socket opens and applies each {@code data-sync} message it receives.
     */
    final class DefaultWebSocketListener extends StreamingWebSocketListener {
        DefaultWebSocketListener() {
            super();
        }

        /**
         * Handles one text frame. Frames whose message type is not {@code data-sync}
         * (case-insensitive) are ignored. A {@code data-sync} frame is deserialized a second time
         * as {@link DataModel.All}; when it is flagged as processable and
         * {@link StreamingOps#processData} reports failure, the socket is closed with
         * {@code GOING_AWAY_CLOSE} and the data-sync-error reason.
         *
         * @param webSocket the socket the frame arrived on
         * @param str       the raw JSON text of the frame
         */
        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String str) {
            Streaming.logger.trace(str);
            DataModel.StreamingMessage envelope = (DataModel.StreamingMessage) JsonHelper.deserialize(str, DataModel.StreamingMessage.class);
            if (!"data-sync".equalsIgnoreCase(envelope.getMessageType())) {
                return;
            }
            Streaming.logger.debug("Streaming WebSocket is processing data");
            DataModel.All payload = (DataModel.All) JsonHelper.deserialize(str, DataModel.All.class);
            if (!payload.isProcessData()) {
                return;
            }
            Boolean applied = StreamingOps.processData(Streaming.this.updater, payload.data(), Streaming.this.initialized, Streaming.this.initFuture);
            if (!applied.booleanValue()) {
                webSocket.close(Streaming.GOING_AWAY_CLOSE.intValue(), Streaming.CLOSE_AND_THEN_RECONN_BY_DATASYNC_ERROR);
            }
        }

        /**
         * Runs the base-class bookkeeping, then sends a {@link DataModel.DataSyncMessage}: it
         * carries the updater's current version when the storage is already initialized, and
         * {@code 0} otherwise.
         *
         * @param webSocket the socket that has just been opened
         * @param response  the HTTP upgrade response
         */
        @Override // co.featbit.server.Streaming.StreamingWebSocketListener
        public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
            super.onOpen(webSocket, response);
            final String syncRequest;
            if (Streaming.this.updater.storageInitialized()) {
                syncRequest = JsonHelper.serialize(new DataModel.DataSyncMessage(Long.valueOf(Streaming.this.updater.getVersion())));
            } else {
                syncRequest = JsonHelper.serialize(new DataModel.DataSyncMessage(0L));
            }
            webSocket.send(syncRequest);
        }
    }

    /* loaded from: input_file:co/featbit/server/Streaming$StreamingOps.class */
    /**
     * Static helpers used by the WebSocket listeners: applying data-sync payloads to the
     * {@link Status.DataUpdater}, notifying flag-change listeners, and deciding whether a closed or
     * failed socket should be reconnected.
     */
    static final class StreamingOps {
        StreamingOps() {
        }

        /**
         * Broadcasts one {@link FlagChange.FlagChangeEvent} per affected flag, at most once per flag id.
         * Every key under {@code FEATURES} is affected directly; for every key under {@code SEGMENTS},
         * each stored flag for which {@code containsSegment} is true is affected. Nothing happens
         * when the notifier has no listeners.
         *
         * @param dataUpdater source of the notifier and of the currently stored flags
         * @param map         the data that has just been applied, grouped by category
         */
        private static void broadcast(Status.DataUpdater dataUpdater, Map<DataStorageTypes.Category, Map<String, DataStorageTypes.Item>> map) {
            if (!dataUpdater.getFlagChangeEventNotifier().hasListeners()) {
                return;
            }
            // Ids already announced during this call; HashSet.add doubles as the "seen before?" test.
            HashSet<String> notified = new HashSet<>();
            for (Map.Entry<DataStorageTypes.Category, Map<String, DataStorageTypes.Item>> entry : map.entrySet()) {
                if (DataStorageTypes.FEATURES.equals(entry.getKey())) {
                    for (String flagId : entry.getValue().keySet()) {
                        if (notified.add(flagId)) {
                            dataUpdater.getFlagChangeEventNotifier().broadcast(new FlagChange.FlagChangeEvent(flagId));
                        }
                    }
                } else if (DataStorageTypes.SEGMENTS.equals(entry.getKey())) {
                    // Load the stored flags once per SEGMENTS entry, then scan them for every changed segment.
                    List<DataModel.FeatureFlag> flags = dataUpdater.getAll(DataStorageTypes.FEATURES).values().stream()
                            .map(item -> (DataModel.FeatureFlag) item)
                            .collect(Collectors.toList());
                    for (String segmentId : entry.getValue().keySet()) {
                        for (DataModel.FeatureFlag flag : flags) {
                            if (flag.containsSegment(segmentId).booleanValue() && notified.add(flag.getId())) {
                                dataUpdater.getFlagChangeEventNotifier().broadcast(new FlagChange.FlagChangeEvent(flag.getId()));
                            }
                        }
                    }
                }
            }
        }

        /**
         * Applies a data-sync payload. A {@code "full"} event is handed to {@code init}; a
         * {@code "patch"} event upserts every item in ascending item-timestamp order and stops at
         * the first upsert that reports failure. Any other event type is treated as a failure.
         * On success the init flag/future are completed (first time only), the status is set to
         * OK and flag-change listeners are notified.
         *
         * @param dataUpdater       target of the update
         * @param data              payload to apply
         * @param atomicBoolean     "initialized" flag, switched to true on the first success
         * @param completableFuture completed with {@code true} on the first success
         * @return {@code true} when the payload was applied, {@code false} otherwise
         */
        static Boolean processData(Status.DataUpdater dataUpdater, DataModel.Data data, AtomicBoolean atomicBoolean, CompletableFuture<Boolean> completableFuture) {
            boolean applied = false;
            String eventType = data.getEventType();
            Map<DataStorageTypes.Category, Map<String, DataStorageTypes.Item>> storageType = data.toStorageType();
            if (Streaming.FULL_OPS.equalsIgnoreCase(eventType)) {
                applied = dataUpdater.init(storageType, data.getTimestamp());
            } else if (Streaming.PATCH_OPS.equalsIgnoreCase(eventType)) {
                // Flatten category -> items into (category, item) pairs so they can be ordered globally.
                List<ImmutablePair<DataStorageTypes.Category, DataStorageTypes.Item>> ordered = storageType.entrySet().stream()
                        .flatMap(entry -> entry.getValue().values().stream().map(item -> ImmutablePair.of(entry.getKey(), item)))
                        .sorted(Comparator.comparingLong(pair -> pair.getRight().getTimestamp().longValue()))
                        .collect(Collectors.toList());
                // allMatch short-circuits on the first failed upsert; an empty patch counts as success.
                applied = ordered.stream().allMatch(pair ->
                        dataUpdater.upsert(pair.getLeft(), pair.getRight().getId(), pair.getRight(), pair.getRight().getTimestamp()));
            }
            if (applied) {
                if (atomicBoolean.compareAndSet(false, true)) {
                    completableFuture.complete(true);
                }
                Streaming.logger.debug("processing data is well done");
                dataUpdater.updateStatus(Status.State.OKState());
                broadcast(dataUpdater, storageType);
            }
            return Boolean.valueOf(applied);
        }

        /**
         * Decides whether to reconnect after the socket was closed and records the matching status.
         * Codes listed in {@code NOT_RECONN_CLOSE_REASON} (normal close, invalid request) do not
         * reconnect; every other code does. A reconnecting close with {@code GOING_AWAY_CLOSE}
         * leaves the status untouched.
         *
         * @param dataUpdater receives the status update
         * @param i           WebSocket close code
         * @param str         close reason reported with the code; may be empty
         * @return {@code true} when a reconnection should be attempted
         */
        static boolean isReconnOnClose(Status.DataUpdater dataUpdater, int i, String str) {
            String knownReason = Streaming.NOT_RECONN_CLOSE_REASON.get(Integer.valueOf(i));
            boolean reconnect = knownReason == null;
            String reason;
            if (reconnect) {
                reason = StringUtils.isEmpty(str) ? "unexpected close" : str;
            } else {
                reason = knownReason;
            }
            Streaming.logger.info("Streaming WebSocket close reason: {}", reason);
            if (reconnect) {
                if (i != Streaming.GOING_AWAY_CLOSE.intValue()) {
                    dataUpdater.updateStatus(Status.State.interruptedState(Status.UNKNOWN_CLOSE_CODE, reason));
                }
            } else if (i == Streaming.INVALID_REQUEST_CLOSE.intValue()) {
                dataUpdater.updateStatus(Status.State.errorOFFState(Status.REQUEST_INVALID_ERROR, reason));
            } else if (i == Streaming.NORMAL_CLOSE.intValue()) {
                dataUpdater.updateStatus(Status.State.normalOFFState());
            }
            return reconnect;
        }

        /**
         * Decides whether to reconnect after a socket failure and records the matching status.
         * Only a failure whose class is exactly {@link JsonParseException} stops reconnection
         * (error-OFF state with {@code DATA_INVALID_ERROR}); every other failure reconnects and is
         * classified as runtime, websocket, network or unknown error.
         *
         * @param dataUpdater receives the status update
         * @param th          the failure reported by the socket
         * @return {@code true} when a reconnection should be attempted
         */
        static boolean isReconnOnFailure(Status.DataUpdater dataUpdater, Throwable th) {
            Class<?> failureType = th.getClass();
            String description = String.format("%s : %s", failureType.getTypeName(), th.getMessage());
            boolean reconnect;
            String errorType;
            if (th instanceof RuntimeException) {
                reconnect = failureType != JsonParseException.class;
                errorType = reconnect ? Status.RUNTIME_ERROR : Status.DATA_INVALID_ERROR;
            } else {
                reconnect = true;
                if (Streaming.WEBSOCKET_EXCEPTION.contains(failureType)) {
                    errorType = Status.WEBSOCKET_ERROR;
                } else if (th instanceof IOException) {
                    errorType = Status.NETWORK_ERROR;
                } else {
                    errorType = Status.UNKNOWN_ERROR;
                }
            }
            if (reconnect) {
                Streaming.logger.warn("FB JAVA SDK: streaming webSocket will reconnect because of {}", th.getMessage());
                dataUpdater.updateStatus(Status.State.interruptedState(errorType, description));
            } else {
                Streaming.logger.error("FB JAVA SDK: streaming webSocket Failure", th);
                dataUpdater.updateStatus(Status.State.errorOFFState(errorType, description));
            }
            return reconnect;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:co/featbit/server/Streaming$StreamingWebSocketListener.class */
    public abstract class StreamingWebSocketListener extends WebSocketListener {
        StreamingWebSocketListener() {
        }

        public final void onClosed(@NotNull WebSocket webSocket, int i, @NotNull String str) {
            Streaming.this.isWSConnected.compareAndSet(true, false);
            if (StreamingOps.isReconnOnClose(Streaming.this.updater, i, str)) {
                Streaming.this.reconnect();
            } else {
                Streaming.this.clearExecutor();
            }
        }

        public final void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable th, @Nullable Response response) {
            Streaming.this.isWSConnected.compareAndSet(true, false);
            if (StreamingOps.isReconnOnFailure(Streaming.this.updater, th)) {
                Streaming.this.reconnect();
            } else {
                Streaming.this.clearExecutor();
            }
        }

        public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
            Streaming.logger.debug("Ask Data Updating, http code {}", Integer.valueOf(response.code()));
            Streaming.this.isWSConnected.compareAndSet(false, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Streaming(Status.DataUpdater dataUpdater, Context context, Duration duration, Integer num) {
        this.updater = dataUpdater;
        this.basicConfig = context.basicConfig();
        this.httpConfig = context.http();
        this.streamingURI = context.basicConfig().getStreamingURI();
        this.strategy = new BackoffAndJitterStrategy(duration);
        this.maxRetryTimes = Integer.valueOf((num == null || num.intValue() <= 0) ? Integer.MAX_VALUE : num.intValue());
        this.okHttpClient = buildWebOkHttpClient();
        this.pingScheduledExecutor = new ScheduledThreadPoolExecutor(1, Utils.createThreadFactory("streaming-periodic-ping-worker-%d", true));
    }

    /**
     * Resets the connection counters and flags, opens the WebSocket and schedules the periodic
     * ping (first run immediately, then every {@code PING_INTERVAL}).
     *
     * @return a future completed with {@code true} once the first data-sync payload has been applied
     */
    @Override // co.featbit.server.exterior.DataSynchronizer
    public Future<Boolean> start() {
        logger.debug("Streaming Starting...");
        connCount.set(0);
        isWSConnected.set(false);
        forceToCloseWS.set(false);
        connect();
        long periodMillis = PING_INTERVAL.toMillis();
        pingScheduledExecutor.scheduleAtFixedRate(this::ping, 0L, periodMillis, TimeUnit.MILLISECONDS);
        return initFuture;
    }

    /**
     * @return {@code true} once a data-sync payload has been applied successfully at least once
     */
    @Override // co.featbit.server.exterior.DataSynchronizer
    public boolean isInitialized() {
        final boolean ready = initialized.get();
        return ready;
    }

    /**
     * Stops streaming. The shutdown flag is always raised so that {@code connect()} and
     * {@code ping()} become no-ops, and the current socket (if any) is closed with
     * {@code NORMAL_CLOSE}. The ping executor and the HTTP client are released here unless the
     * socket is still marked connected, in which case {@code onClosed} releases them after the
     * normal close.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        logger.info("FB JAVA SDK: streaming is stopping...");
        this.forceToCloseWS.set(true);
        // Read the field once so the null check and the close() call act on the same socket.
        WebSocket current = this.webSocket;
        if (current != null) {
            current.close(NORMAL_CLOSE.intValue(), NORMAL_CLOSE_REASON);
            if (this.isWSConnected.get()) {
                // Still connected: the listener's onClosed callback performs the cleanup.
                return;
            }
        }
        // Either never started or already disconnected: no callback will clean up for us.
        clearExecutor();
    }

    /**
     * Sends a keep-alive {@link DataModel.DataSyncMessage} built with a {@code null} argument,
     * but only while a socket exists, is marked connected and shutdown has not been requested.
     */
    private void ping() {
        boolean canPing = this.webSocket != null && this.isWSConnected.get() && !this.forceToCloseWS.get();
        if (canPing) {
            logger.trace("ping");
            String keepAlive = JsonHelper.serialize(new DataModel.DataSyncMessage(null));
            this.webSocket.send(keepAlive);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Releases the resources owned by this synchronizer: shuts down the ping executor (waiting up
     * to {@code AWAIT_TERMINATION}) and then the OkHttp client.
     */
    public void clearExecutor() {
        Loggers.UPDATE_PROCESSOR.debug("streaming processor clean up thread and conn pool");
        Utils.shutDownThreadPool("streaming-periodic-ping-worker", pingScheduledExecutor, AWAIT_TERMINATION);
        Utils.shutdownOKHttpClient("Streaming", okHttpClient);
    }

    /**
     * Opens a new WebSocket to the streaming endpoint unless a socket is already connected,
     * shutdown has been requested, or the number of attempts has reached {@code maxRetryTimes}.
     * The new socket replaces {@code webSocket}.
     */
    private void connect() {
        if (this.isWSConnected.get() || this.forceToCloseWS.get()) {
            logger.error("FB JAVA SDK: streaming websocket is already Connected or Closed");
            return;
        }
        if (this.connCount.getAndIncrement() >= this.maxRetryTimes.intValue()) {
            logger.error("FB JAVA SDK: streaming websocket have reached max retry");
            return;
        }
        // Format only the constant template: the configured URI must not be used as a format
        // string, otherwise a '%' in it (e.g. a percent-encoded path) corrupts the URL or throws.
        String authQuery = String.format(AUTH_PARAMS, Utils.buildToken(this.basicConfig.getEnvSecret()));
        String url = this.streamingURI.concat(authQuery);
        Request build = new Request.Builder().headers(Utils.headersBuilderFor(this.httpConfig).build()).url(url).build();
        logger.debug("Streaming WebSocket is connecting...");
        this.strategy.setGoodRunAtNow();
        this.webSocket = this.okHttpClient.newWebSocket(build, this.listener);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sleeps for the delay returned by the backoff strategy and then calls {@code connect()}
     * exactly once, whether the wait completed, was interrupted, or failed. If the wait was
     * interrupted, the thread's interrupt flag is restored after the connection attempt so the
     * caller can still observe it.
     */
    public void reconnect() {
        boolean interrupted = false;
        try {
            long millis = this.strategy.nextDelay(false).toMillis();
            logger.debug("Streaming WebSocket will reconnect in {} milliseconds", Long.valueOf(millis));
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Reconnect right away, but remember the interruption instead of swallowing it.
            interrupted = true;
        } finally {
            try {
                connect();
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Builds the OkHttp client used for the streaming socket: connect timeout taken from the HTTP
     * configuration, a ping interval of {@link Duration#ZERO}, no retry on connection failure,
     * plus whatever {@code Utils.buildProxyAndSocketFactoryFor} applies to the builder.
     *
     * @return the configured client
     */
    @NotNull
    private OkHttpClient buildWebOkHttpClient() {
        OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
        clientBuilder.connectTimeout(this.httpConfig.connectTime());
        clientBuilder.pingInterval(Duration.ZERO);
        clientBuilder.retryOnConnectionFailure(false);
        Utils.buildProxyAndSocketFactoryFor(clientBuilder, this.httpConfig);
        return clientBuilder.build();
    }
}
