package co.featbit.server;

import co.featbit.commons.json.JsonHelper;
import co.featbit.commons.json.JsonParseException;
import co.featbit.server.DataModel;
import co.featbit.server.Status;
import co.featbit.server.exterior.BasicConfig;
import co.featbit.server.exterior.Context;
import co.featbit.server.exterior.DataStoreTypes;
import co.featbit.server.exterior.DataSynchronizer;
import co.featbit.server.exterior.HttpConfig;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/featbit/server/Streaming.class */
public final class Streaming implements DataSynchronizer {
    // Event types compared (case-insensitively) in processDateAsync:
    // "full" goes to updater.init(...), "patch" goes to updater.upsert(...).
    private static final String FULL_OPS = "full";
    private static final String PATCH_OPS = "patch";
    // Close reason sent by processDateAsync when applying data failed; onClosed
    // reconnects for it without reporting an INTERRUPTED status.
    private static final String JUST_RECONN_REASON_REGISTERED = "reconn";
    // Capacity of the storage-update work queue and number of permits in `permits`.
    private static final int MAX_QUEUE_SIZE = 5;
    // Query-string template appended to the streaming URI in connect(); %s is the
    // value returned by Utils.buildToken(envSecret).
    private static final String AUTH_PARAMS = "?token=%s&type=server&version=2";
    // Single-threaded executor on which received data is applied (see constructor).
    private final ThreadPoolExecutor storageUpdateExecutor;
    // Runs ping() at a fixed rate of PING_INTERVAL once start() has been called.
    private final ScheduledThreadPoolExecutor pingScheduledExecutor;
    // Receives data (init/upsert) and status updates from this synchronizer.
    private final Status.DataUpdater updater;
    private final BasicConfig basicConfig;
    private final HttpConfig httpConfig;
    // Upper bound for connCount in connect(); Integer.MAX_VALUE when the
    // constructor argument is null or not positive.
    private final Integer maxRetryTimes;
    // Supplies the delay used by reconnect().
    private final BackoffAndJitterStrategy strategy;
    private final String streamingURI;
    private final OkHttpClient okHttpClient;
    // Most recently created socket; reassigned by connect() on every (re)connection.
    // NOTE(review): not volatile although it is read from the ping worker and from
    // close() — confirm whether visibility is guaranteed some other way.
    WebSocket webSocket;
    // Close code used by close(); listed in NOT_RECONN_CLOSE_REASON.
    private static final Integer NORMAL_CLOSE = 1000;
    // Close code reported as Status.REQUEST_INVALID_ERROR in onClosed; no reconnect.
    private static final Integer INVALID_REQUEST_CLOSE = 4003;
    // Close code used by processDateAsync when applying data failed.
    private static final Integer GOING_AWAY_CLOSE = 1001;
    private static final Duration PING_INTERVAL = Duration.ofSeconds(10);
    // Passed to Utils.shutDownThreadPool for both executors in clearExecutor().
    private static final Duration AWAIT_TERMINATION = Duration.ofSeconds(2);
    private static final String NORMAL_CLOSE_REASON = "normal close";
    private static final String INVALID_REQUEST_CLOSE_REASON = "invalid request";
    // Close codes after which onClosed switches the state to OFF instead of reconnecting.
    private static final Map<Integer, String> NOT_RECONN_CLOSE_REASON = ImmutableMap.of(NORMAL_CLOSE, NORMAL_CLOSE_REASON, INVALID_REQUEST_CLOSE, INVALID_REQUEST_CLOSE_REASON);
    // Exception classes reported as Status.WEBSOCKET_ERROR by onFailure; the lookup
    // there uses List.contains on the exact runtime class, so subclasses do not match.
    private static final List<Class<? extends Exception>> WEBSOCKET_EXCEPTION = ImmutableList.of(SocketTimeoutException.class, SocketException.class, EOFException.class);
    private static final Logger logger = Loggers.UPDATE_PROCESSOR;
    // Flipped to true by processDateAsync the first time data is applied successfully.
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // Set in onOpen, cleared in onClosed/onFailure; guards connect() and ping().
    private final AtomicBoolean isWSConnected = new AtomicBoolean(false);
    // Number of connect() attempts since start(); compared with maxRetryTimes.
    private final AtomicInteger connCount = new AtomicInteger(0);
    // Returned by start(); completed with true on the first successful data update.
    private final CompletableFuture<Boolean> initFuture = new CompletableFuture<>();
    private final StreamingWebSocketListener listener = new DefaultWebSocketListener();
    // Acquired in onMessage before an update is submitted and released when it
    // completes, which limits the number of in-flight updates to MAX_QUEUE_SIZE.
    private final Semaphore permits = new Semaphore(MAX_QUEUE_SIZE);

    /* loaded from: input_file:co/featbit/server/Streaming$DefaultWebSocketListener.class */
    private final class DefaultWebSocketListener extends StreamingWebSocketListener {
        private DefaultWebSocketListener() {
            super();
        }

        public void onMessage(@NotNull WebSocket webSocket, @NotNull String str) {
            Streaming.logger.trace(str);
            if ("data-sync".equalsIgnoreCase(((DataModel.StreamingMessage) JsonHelper.deserialize(str, DataModel.StreamingMessage.class)).getMessageType())) {
                Streaming.logger.debug("Streaming WebSocket is processing data");
                DataModel.All all = (DataModel.All) JsonHelper.deserialize(str, DataModel.All.class);
                if (all.isProcessData()) {
                    try {
                        Streaming.this.permits.acquire();
                        CompletableFuture.supplyAsync(() -> {
                            return Streaming.this.processDateAsync(all.data());
                        }, Streaming.this.storageUpdateExecutor).whenComplete((bool, th) -> {
                            Streaming.this.permits.release();
                        });
                    } catch (InterruptedException e) {
                    }
                }
            }
        }

        @Override // co.featbit.server.Streaming.StreamingWebSocketListener
        public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
            super.onOpen(webSocket, response);
            webSocket.send(Streaming.this.updater.storageInitialized() ? JsonHelper.serialize(new DataModel.DataSyncMessage(Long.valueOf(Streaming.this.updater.getVersion()))) : JsonHelper.serialize(new DataModel.DataSyncMessage(0L)));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:co/featbit/server/Streaming$StreamingWebSocketListener.class */
    public abstract class StreamingWebSocketListener extends WebSocketListener {
        StreamingWebSocketListener() {
        }

        public final void onClosed(@NotNull WebSocket webSocket, int i, @NotNull String str) {
            boolean z = false;
            String str2 = (String) Streaming.NOT_RECONN_CLOSE_REASON.get(Integer.valueOf(i));
            if (str2 == null) {
                z = true;
                str2 = StringUtils.isEmpty(str) ? "unexpected close" : str;
            }
            Streaming.logger.debug("Streaming WebSocket close reason: {}", str2);
            Streaming.this.isWSConnected.compareAndSet(true, false);
            if (z) {
                if (!Streaming.JUST_RECONN_REASON_REGISTERED.equals(str)) {
                    Streaming.this.updater.updateStatus(Status.StateType.INTERRUPTED, Status.ErrorInfo.of(Status.UNKNOWN_CLOSE_CODE, str));
                }
                Streaming.this.reconnect(false);
            } else {
                if (i == Streaming.INVALID_REQUEST_CLOSE.intValue()) {
                    Streaming.this.updater.updateStatus(Status.StateType.OFF, Status.ErrorInfo.of(Status.REQUEST_INVALID_ERROR, str));
                } else {
                    Streaming.this.updater.updateStatus(Status.StateType.OFF, null);
                }
                Streaming.this.clearExecutor();
            }
        }

        public final void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable th, @Nullable Response response) {
            boolean z;
            String str;
            Streaming.this.isWSConnected.compareAndSet(true, false);
            boolean z2 = false;
            Class<?> cls = th.getClass();
            if (th instanceof RuntimeException) {
                z = cls != JsonParseException.class;
                str = z ? Status.RUNTIME_ERROR : Status.DATA_INVALID_ERROR;
            } else {
                z = true;
                if (Streaming.WEBSOCKET_EXCEPTION.contains(cls)) {
                    str = Status.WEBSOCKET_ERROR;
                } else if (th instanceof IOException) {
                    str = Status.NETWORK_ERROR;
                    z2 = true;
                } else {
                    str = Status.UNKNOWN_ERROR;
                }
            }
            Status.ErrorInfo of = Status.ErrorInfo.of(str, th.getMessage());
            if (z) {
                Streaming.logger.warn("FFC JAVA SDK: streaming webSocket will reconnect because of {}", th.getMessage());
                Streaming.this.updater.updateStatus(Status.StateType.INTERRUPTED, of);
                Streaming.this.reconnect(z2);
            } else {
                Streaming.logger.error("FFC JAVA SDK: streaming webSocket Failure", th);
                Streaming.this.updater.updateStatus(Status.StateType.OFF, of);
                Streaming.this.clearExecutor();
            }
        }

        public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
            Streaming.logger.debug("Ask Data Updating, http code {}", Integer.valueOf(response.code()));
            Streaming.this.isWSConnected.compareAndSet(false, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Streaming(Status.DataUpdater dataUpdater, Context context, Duration duration, Integer num) {
        this.updater = dataUpdater;
        this.basicConfig = context.basicConfig();
        this.httpConfig = context.http();
        this.streamingURI = context.basicConfig().getStreamingURI();
        this.strategy = new BackoffAndJitterStrategy(duration);
        this.maxRetryTimes = Integer.valueOf((num == null || num.intValue() <= 0) ? Integer.MAX_VALUE : num.intValue());
        this.okHttpClient = buildWebOkHttpClient();
        this.storageUpdateExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(MAX_QUEUE_SIZE), Utils.createThreadFactory("streaming-data-sync-worker-%d", true), new ThreadPoolExecutor.CallerRunsPolicy());
        this.pingScheduledExecutor = new ScheduledThreadPoolExecutor(1, Utils.createThreadFactory("streaming-periodic-ping-worker-%d", true));
    }

    /**
     * Resets the connection bookkeeping, opens the first connection and
     * schedules the periodic ping.
     *
     * @return the future that {@code processDateAsync} completes with
     *     {@code true} after the first successful data update
     */
    @Override // co.featbit.server.exterior.DataSynchronizer
    public Future<Boolean> start() {
        logger.debug("Streaming Starting...");

        // Start from a clean state: no attempts counted, not connected.
        this.connCount.set(0);
        this.isWSConnected.set(false);
        connect();

        final long pingPeriodMillis = PING_INTERVAL.toMillis();
        this.pingScheduledExecutor.scheduleAtFixedRate(
                () -> ping(), 0L, pingPeriodMillis, TimeUnit.MILLISECONDS);

        return this.initFuture;
    }

    /**
     * Reports whether both the updater's storage is initialized and this
     * synchronizer has applied data successfully at least once.
     *
     * @return {@code true} when both conditions hold
     */
    @Override // co.featbit.server.exterior.DataSynchronizer
    public boolean isInitialized() {
        if (!this.updater.storageInitialized()) {
            return false;
        }
        return this.initialized.get();
    }

    /**
     * Requests a normal close (code 1000, reason "normal close") of the current
     * socket, if one has been created. Nothing else is released here;
     * executor cleanup happens in the listener's {@code onClosed} for that code.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        logger.info("FFC JAVA SDK: streaming is stopping...");
        final WebSocket current = this.webSocket;
        if (current == null) {
            return;
        }
        current.close(NORMAL_CLOSE.intValue(), NORMAL_CLOSE_REASON);
    }

    /**
     * Sends a {@code DataSyncMessage} built with a {@code null} argument over
     * the current socket; does nothing when no socket exists or the connection
     * is not marked as open. Presumably the server treats this message as a
     * keep-alive ping — confirm against the protocol.
     */
    private void ping() {
        final WebSocket socket = this.webSocket;
        final boolean connected = socket != null && this.isWSConnected.get();
        if (connected) {
            logger.trace("ping");
            String payload = JsonHelper.serialize(new DataModel.DataSyncMessage(null));
            socket.send(payload);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Releases the resources owned by this synchronizer, in this order: the
     * storage-update executor, the ping executor, then the OkHttp client.
     * Called by the listener when the connection ends without a reconnect.
     */
    public void clearExecutor() {
        logger.debug("streaming processor clean up thread and conn pool");
        // Both pools are shut down through the same helper with the same timeout.
        Utils.shutDownThreadPool("streaming-data-sync-worker", this.storageUpdateExecutor, AWAIT_TERMINATION);
        Utils.shutDownThreadPool("streaming-periodic-ping-worker", this.pingScheduledExecutor, AWAIT_TERMINATION);
        Utils.shutdownOKHttpClient("Streaming", this.okHttpClient);
    }

    /**
     * Opens a new WebSocket to the streaming endpoint unless the connection is
     * already marked as open or the attempt limit has been reached.
     *
     * <p>Every call that passes the "already connected" check counts as one
     * attempt against {@code maxRetryTimes}.
     */
    private void connect() {
        if (this.isWSConnected.get()) {
            logger.error("FFC JAVA SDK: streaming websocket is already Connected");
            return;
        }
        if (this.connCount.getAndIncrement() >= this.maxRetryTimes.intValue()) {
            logger.error("FFC JAVA SDK: streaming websocket have reached max retry");
            return;
        }
        // Only AUTH_PARAMS is a format string. The configured URI is appended
        // verbatim: formatting it as well would make any '%' it contains (for
        // example percent-encoding) be parsed as a format specifier.
        Request request = new Request.Builder()
                .headers(Utils.headersBuilderFor(this.httpConfig).build())
                .url(this.streamingURI.concat(
                        String.format(AUTH_PARAMS, Utils.buildToken(this.basicConfig.getEnvSecret()))))
                .build();
        logger.debug("Streaming WebSocket is connecting...");
        this.strategy.setGoodRunAtNow();
        this.webSocket = this.okHttpClient.newWebSocket(request, this.listener);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Waits for the delay chosen by the backoff strategy and then calls
     * {@code connect()}. The connection attempt is made on every path: after a
     * normal wait, after an interrupted wait, and when computing the delay or
     * sleeping throws (the throwable still propagates afterwards).
     *
     * <p>If the wait is interrupted, the thread's interrupt flag is restored
     * once {@code connect()} has returned, so callers further up the stack can
     * still observe the interruption.
     *
     * @param z forwarded unchanged to {@code strategy.nextDelay}; the listener
     *     passes {@code true} only for generic I/O failures
     */
    public void reconnect(boolean z) {
        boolean interrupted = false;
        try {
            long delayMillis = this.strategy.nextDelay(z).toMillis();
            logger.debug("Streaming WebSocket will reconnect in {} milliseconds", Long.valueOf(delayMillis));
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            // Cut the wait short, but remember the request instead of dropping it.
            interrupted = true;
        } finally {
            try {
                connect();
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Builds the OkHttp client used for the streaming socket: connect timeout
     * taken from the HTTP config, a ping interval of zero, no retry on
     * connection failure, plus whatever proxy and socket-factory settings
     * {@code Utils.buildProxyAndSocketFactoryFor} applies.
     *
     * @return the configured client
     */
    @NotNull
    private OkHttpClient buildWebOkHttpClient() {
        final OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
        clientBuilder.connectTimeout(this.httpConfig.connectTime());
        clientBuilder.pingInterval(Duration.ZERO);
        clientBuilder.retryOnConnectionFailure(false);
        Utils.buildProxyAndSocketFactoryFor(clientBuilder, this.httpConfig);
        return clientBuilder.build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Applies one received data payload to the updater.
     *
     * <ul>
     *   <li>Event type "full": the whole data set is handed to {@code updater.init}.</li>
     *   <li>Event type "patch": every item is handed to {@code updater.upsert};
     *       the operation succeeds only if every upsert succeeds (an empty
     *       patch counts as success).</li>
     *   <li>Any other event type counts as a failure.</li>
     * </ul>
     *
     * <p>On success the init future is completed (first time only) and the
     * status is set to OK. On failure the socket is closed with code 1001 and
     * the "reconn" reason, which makes the listener reconnect.
     *
     * @param data the payload to apply
     * @return {@code true} when the payload was applied successfully
     */
    public Boolean processDateAsync(DataModel.Data data) {
        final String eventType = data.getEventType();
        final Long timestamp = data.getTimestamp();
        final Map<DataStoreTypes.Category, Map<String, DataStoreTypes.Item>> storageType = data.toStorageType();

        boolean applied = false;
        if (FULL_OPS.equalsIgnoreCase(eventType)) {
            applied = this.updater.init(storageType, timestamp);
        } else if (PATCH_OPS.equalsIgnoreCase(eventType)) {
            applied = true;
            for (Map.Entry<DataStoreTypes.Category, Map<String, DataStoreTypes.Item>> categoryEntry : storageType.entrySet()) {
                DataStoreTypes.Category category = categoryEntry.getKey();
                for (Map.Entry<String, DataStoreTypes.Item> itemEntry : categoryEntry.getValue().entrySet()) {
                    // Every item is still upserted, but a failure is remembered:
                    // overwriting the flag on each iteration let a later success
                    // hide an earlier failed upsert.
                    if (!this.updater.upsert(category, itemEntry.getKey(), itemEntry.getValue(), timestamp)) {
                        applied = false;
                    }
                }
            }
        }

        if (applied) {
            if (this.initialized.compareAndSet(false, true)) {
                this.initFuture.complete(true);
            }
            logger.debug("processing data is well done");
            this.updater.updateStatus(Status.StateType.OK, null);
        } else {
            // Ask for a reconnect; onClosed recognizes this reason and does not
            // report the close as an interruption.
            this.webSocket.close(GOING_AWAY_CLOSE.intValue(), JUST_RECONN_REASON_REGISTERED);
        }
        return Boolean.valueOf(applied);
    }
}
