package info.bitrich.xchangestream.bitmex;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.google.common.collect.ImmutableSet;
import info.bitrich.xchangestream.bitmex.dto.BitmexMarketDataEvent;
import info.bitrich.xchangestream.bitmex.dto.BitmexWebSocketSubscriptionMessage;
import info.bitrich.xchangestream.bitmex.dto.BitmexWebSocketTransaction;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import org.knowm.xchange.bitmex.service.BitmexDigest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/bitmex/BitmexStreamingService.class */
public class BitmexStreamingService extends JsonNettyStreamingService {
    private static final Logger LOG = LoggerFactory.getLogger(BitmexStreamingService.class);
    private static final Set<String> SIMPLE_TABLES = ImmutableSet.of("order", "funding", "settlement", "position", "wallet", "margin", new String[0]);
    private final ObjectMapper mapper;
    private final List<ObservableEmitter<Long>> delayEmitters;
    private final String apiKey;
    private final String secretKey;
    public static final int DMS_CANCEL_ALL_IN = 60000;
    public static final int DMS_RESUBSCRIBE = 15000;
    private volatile long dmsCancelTime;
    private volatile Disposable dmsDisposable;

    public BitmexStreamingService(String str, String str2, String str3) {
        super(str, Integer.MAX_VALUE);
        this.mapper = new ObjectMapper();
        this.delayEmitters = new LinkedList();
        this.apiKey = str2;
        this.secretKey = str3;
    }

    public BitmexStreamingService(String str, String str2, String str3, int i, Duration duration, Duration duration2, int i2) {
        super(str, i, duration, duration2, i2);
        this.mapper = new ObjectMapper();
        this.delayEmitters = new LinkedList();
        this.apiKey = str2;
        this.secretKey = str3;
    }

    private void login() throws JsonProcessingException {
        long currentTimeMillis = System.currentTimeMillis() + 30;
        String generateSignature = BitmexAuthenticator.generateSignature(this.secretKey, "GET", "/realtime", String.valueOf(currentTimeMillis), "");
        HashMap hashMap = new HashMap();
        hashMap.put("op", "authKey");
        hashMap.put("args", Arrays.asList(this.apiKey, Long.valueOf(currentTimeMillis), generateSignature));
        sendMessage(this.mapper.writeValueAsString(hashMap));
    }

    public Completable connect() {
        Completable connect = super.connect();
        return this.apiKey == null ? connect : connect.andThen(completableObserver -> {
            try {
                login();
                completableObserver.onComplete();
            } catch (IOException e) {
                completableObserver.onError(e);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleMessage(JsonNode jsonNode) {
        if (!this.delayEmitters.isEmpty() && jsonNode.has("data")) {
            String asText = jsonNode.has("table") ? jsonNode.get("table").asText() : "";
            JsonNode jsonNode2 = jsonNode.get("data");
            if (jsonNode2.getNodeType().equals(JsonNodeType.ARRAY)) {
                Long valueOf = Long.valueOf(System.currentTimeMillis());
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat(BitmexMarketDataEvent.BITMEX_TIMESTAMP_FORMAT);
                simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
                JsonNode jsonNode3 = jsonNode2.get(0);
                if (jsonNode3 != null && jsonNode3.has("timestamp") && (!"order".equals(asText) || (jsonNode3.has("ordStatus") && "NEW".equals(jsonNode3.get("ordStatus").asText())))) {
                    try {
                        long longValue = valueOf.longValue() - simpleDateFormat.parse(jsonNode3.get("timestamp").asText()).getTime();
                        Iterator<ObservableEmitter<Long>> it = this.delayEmitters.iterator();
                        while (it.hasNext()) {
                            it.next().onNext(Long.valueOf(longValue));
                        }
                    } catch (ParseException e) {
                        LOG.error("Parsing timestamp error: ", e);
                    }
                }
            }
        }
        if (jsonNode.has("info") || jsonNode.has("success")) {
            return;
        }
        if (jsonNode.has("error")) {
            LOG.error("Error with message: " + jsonNode.get("error").asText());
        } else if (jsonNode.has("now") && jsonNode.has("cancelTime")) {
            handleDeadMansSwitchMessage(jsonNode);
        } else {
            super.handleMessage(jsonNode);
        }
    }

    private void handleDeadMansSwitchMessage(JsonNode jsonNode) {
        try {
            String asText = jsonNode.get("cancelTime").asText();
            if ("0".equals(asText)) {
                LOG.info("Dead man's switch disabled");
                this.dmsDisposable.dispose();
                this.dmsDisposable = null;
                this.dmsCancelTime = 0L;
            } else {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat(BitmexMarketDataEvent.BITMEX_TIMESTAMP_FORMAT);
                simpleDateFormat.setTimeZone(TimeZone.getTimeZone(ZoneOffset.UTC));
                this.dmsCancelTime = simpleDateFormat.parse(asText).getTime();
            }
        } catch (ParseException e) {
            LOG.error("Error parsing deadman's confirmation ");
        }
    }

    protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() {
        return null;
    }

    public Observable<BitmexWebSocketTransaction> subscribeBitmexChannel(String str) {
        return subscribeChannel(str, new Object[0]).map(jsonNode -> {
            return (BitmexWebSocketTransaction) this.objectMapper.treeToValue(jsonNode, BitmexWebSocketTransaction.class);
        }).share();
    }

    protected DefaultHttpHeaders getCustomHeaders() {
        DefaultHttpHeaders customHeaders = super.getCustomHeaders();
        if (this.secretKey == null || this.apiKey == null) {
            return customHeaders;
        }
        String digestString = BitmexDigest.createInstance(this.secretKey, this.apiKey).digestString("GET/realtime" + ((System.currentTimeMillis() / 1000) + 5));
        customHeaders.add("api-key", this.apiKey);
        customHeaders.add("api-signature", digestString);
        return customHeaders;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getChannelNameFromMessage(JsonNode jsonNode) throws IOException {
        String asText = jsonNode.get("table").asText();
        if (SIMPLE_TABLES.contains(asText)) {
            return asText;
        }
        JsonNode jsonNode2 = jsonNode.get("data");
        return String.format("%s:%s", asText, jsonNode2.size() > 0 ? jsonNode2.get(0).get("symbol").asText() : jsonNode.get("filter").get("symbol").asText());
    }

    public String getSubscribeMessage(String str, Object... objArr) throws IOException {
        return this.objectMapper.writeValueAsString(new BitmexWebSocketSubscriptionMessage("subscribe", new String[]{str}));
    }

    public String getUnsubscribeMessage(String str, Object... objArr) throws IOException {
        return this.objectMapper.writeValueAsString(new BitmexWebSocketSubscriptionMessage("unsubscribe", new String[]{str}));
    }

    public void enableDeadMansSwitch(long j, long j2) throws IOException {
        if (this.dmsDisposable != null) {
            LOG.warn("You already have Dead Man's switch enabled. Doing nothing");
            return;
        }
        String writeValueAsString = this.objectMapper.writeValueAsString(new BitmexWebSocketSubscriptionMessage("cancelAllAfter", new Object[]{Long.valueOf(j2)}));
        this.dmsDisposable = Schedulers.single().schedulePeriodicallyDirect(() -> {
            sendMessage(writeValueAsString);
        }, 0L, j, TimeUnit.MILLISECONDS);
        Schedulers.single().start();
    }

    public void disableDeadMansSwitch() throws IOException {
        sendMessage(this.objectMapper.writeValueAsString(new BitmexWebSocketSubscriptionMessage("cancelAllAfter", new Object[]{0})));
    }

    public boolean isDeadMansSwitchEnabled() {
        return this.dmsCancelTime > 0 && System.currentTimeMillis() < this.dmsCancelTime;
    }

    public void addDelayEmitter(ObservableEmitter<Long> observableEmitter) {
        this.delayEmitters.add(observableEmitter);
    }
}
