package info.bitrich.xchangestream.poloniex2;

import com.fasterxml.jackson.databind.JsonNode;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketEvent;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketEventsTransaction;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketOrderbookModifiedEvent;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketSubscriptionMessage;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import io.reactivex.Observable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import org.knowm.xchange.currency.CurrencyPair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/poloniex2/PoloniexStreamingService.class */
/**
 * Streaming service for the Poloniex WebSocket API, built on {@link JsonNettyStreamingService}.
 *
 * <p>Incoming array messages start with a numeric channel id. Ids in the open range
 * {@code (0, 1000)} are treated as currency pair channels: the pair name belonging to such an id
 * is learned from the order book initialization event ({@code "i"}) and cached in
 * {@link #subscribedChannels}, so that later messages on the same id can be routed by pair name.
 * Ids of 1000 and above are used as the channel name directly.
 */
public class PoloniexStreamingService extends JsonNettyStreamingService {
    private static final Logger LOG = LoggerFactory.getLogger(PoloniexStreamingService.class);
    private static final String HEARTBEAT = "1010";

    /** Channel ids strictly between 0 and this value are resolved through {@link #subscribedChannels}. */
    private static final int CURRENCY_PAIR_CHANNEL_ID_LIMIT = 1000;

    /** Numeric channel id (as text) to currency pair name, filled from order book initialization events. */
    private final Map<String, String> subscribedChannels;

    /** Channel name to the observable returned by the superclass for that channel. */
    private final Map<String, Observable<JsonNode>> subscriptions;

    /**
     * Creates the service.
     *
     * @param str forwarded as the first argument of the superclass constructor (presumably the
     *     WebSocket endpoint URL; confirm against {@code JsonNettyStreamingService}). The remaining
     *     superclass arguments are {@code Integer.MAX_VALUE}, the inherited default connection
     *     timeout and retry duration, and {@code 2}.
     */
    public PoloniexStreamingService(String str) {
        super(str, Integer.MAX_VALUE, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_RETRY_DURATION, 2);
        this.subscribedChannels = new ConcurrentHashMap<>();
        this.subscriptions = new ConcurrentHashMap<>();
    }

    /**
     * Pre-processes a raw message before handing it to the superclass.
     *
     * <ul>
     *   <li>Array messages with fewer than three elements whose id is {@code 1010} (heartbeat) or
     *       {@code 1002} are dropped.
     *   <li>Empty array messages are logged and dropped.
     *   <li>Order book initialization events on currency pair channels register the
     *       id-to-pair mapping.
     *   <li>Messages carrying an {@code "error"} field are logged and not forwarded; everything
     *       else goes to {@code super.handleMessage}.
     * </ul>
     *
     * @param jsonNode the received message
     * @throws NumberFormatException if the first array element is not an integer
     * @throws RuntimeException if a channel id is re-announced with a different currency pair
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void handleMessage(JsonNode jsonNode) {
        if (jsonNode.isArray()) {
            JsonNode idNode = jsonNode.get(0);
            if (idNode == null) {
                // An empty array has no channel id and cannot be routed.
                LOG.warn("Ignoring empty array message");
                return;
            }
            String channelIdText = idNode.asText();
            // NOTE(review): 1002 looks like a short acknowledgement on a non-pair channel - confirm.
            boolean ignorable = HEARTBEAT.equals(channelIdText) || "1002".equals(channelIdText);
            if (jsonNode.size() < 3 && ignorable) {
                return;
            }
            registerCurrencyPairChannel(Integer.parseInt(channelIdText), jsonNode.get(2));
        }
        if (jsonNode.has("error")) {
            LOG.error("Error with message: {}", jsonNode.get("error").asText());
        } else {
            super.handleMessage(jsonNode);
        }
    }

    /**
     * Records the currency pair announced by an order book initialization event.
     *
     * <p>Does nothing unless the id lies in the currency pair range, the payload is an array, and
     * its first entry is an {@code "i"} event whose second element has an {@code "orderBook"} field.
     *
     * @param channelId numeric channel id taken from the message
     * @param payload third element of the message, may be {@code null}
     */
    private void registerCurrencyPairChannel(int channelId, JsonNode payload) {
        boolean pairChannel = channelId > 0 && channelId < CURRENCY_PAIR_CHANNEL_ID_LIMIT;
        if (!pairChannel || payload == null || !payload.isArray()) {
            return;
        }
        // path() yields a "missing" node instead of null, so malformed payloads fall through quietly.
        JsonNode firstEvent = payload.path(0);
        JsonNode details = firstEvent.path(1);
        if (!"i".equals(firstEvent.path(0).asText()) || !details.has("orderBook")) {
            return;
        }
        JsonNode pairNode = details.get("currencyPair");
        if (pairNode == null) {
            LOG.warn("Order book initialization on channel {} carries no currencyPair", channelId);
            return;
        }
        String announcedPair = pairNode.asText();
        this.subscribedChannels.compute(String.valueOf(channelId), (id, knownPair) -> {
            if (knownPair != null && !knownPair.equals(announcedPair)) {
                throw new RuntimeException("Attempted currency pair channel id reassignment");
            }
            if (knownPair == null) {
                LOG.info("Register {} as {}", channelId, announcedPair);
            } else {
                LOG.debug("Order book reinitialization {} {}", channelId, announcedPair);
            }
            return announcedPair;
        });
    }

    /**
     * Always returns {@code false}.
     *
     * <p>NOTE(review): presumably tells the base class to deliver array messages as a whole rather
     * than element by element - confirm against {@code JsonNettyStreamingService}.
     */
    public boolean processArrayMessageSeparately() {
        return false;
    }

    /**
     * Subscribes to a channel, reusing the cached observable while the inherited {@code channels}
     * map still contains the channel name.
     *
     * @param str channel name
     * @param objArr extra arguments forwarded to {@code super.subscribeChannel}
     * @return the observable stored for this channel name; {@code null} if the channel is known to
     *     the superclass but no observable was cached here
     */
    public synchronized Observable<JsonNode> subscribeChannel(String str, Object... objArr) {
        if (!this.channels.containsKey(str)) {
            this.subscriptions.put(str, super.subscribeChannel(str, objArr));
        }
        return this.subscriptions.get(str);
    }

    /**
     * Subscribes to the channel named {@code <counter>_<base>} for the given pair and emits the
     * event list of every received transaction.
     *
     * <p>Sequence ids are checked between consecutive transactions: if a transaction contains at
     * least one {@link PoloniexWebSocketOrderbookModifiedEvent} and its sequence id is not exactly
     * one greater than the previous one, the stream fails with a {@link RuntimeException}.
     *
     * @param currencyPair pair to subscribe to
     * @return shared observable of event lists
     */
    public Observable<List<PoloniexWebSocketEvent>> subscribeCurrencyPairChannel(CurrencyPair currencyPair) {
        String channelName = currencyPair.counter.toString() + "_" + currencyPair.base.toString();
        return subscribeChannel(channelName)
                .map(node -> this.objectMapper.treeToValue(node, PoloniexWebSocketEventsTransaction.class))
                .scan((previous, current) -> {
                    boolean modifiesOrderBook = current.getEvents().stream()
                            .anyMatch(PoloniexWebSocketOrderbookModifiedEvent.class::isInstance);
                    boolean consecutive =
                            previous.getSeqId().longValue() + 1 == current.getSeqId().longValue();
                    if (!modifiesOrderBook || consecutive) {
                        return current;
                    }
                    throw new RuntimeException(String.format(
                            "Invalid sequencing, old: %s new: %s",
                            this.objectMapper.writeValueAsString(previous),
                            this.objectMapper.writeValueAsString(current)));
                })
                .map(PoloniexWebSocketEventsTransaction::getEvents)
                .share();
    }

    /**
     * Resolves the channel name for a message from its first element.
     *
     * @param jsonNode message whose first element is the numeric channel id
     * @return the id itself when it is 1000 or greater, otherwise the currency pair registered for
     *     that id, or {@code null} when none has been registered yet
     * @throws NumberFormatException if the first element is not an integer
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public String getChannelNameFromMessage(JsonNode jsonNode) {
        String channelId = jsonNode.get(0).asText();
        if (Integer.parseInt(channelId) >= CURRENCY_PAIR_CHANNEL_ID_LIMIT) {
            return channelId;
        }
        return this.subscribedChannels.get(channelId);
    }

    /**
     * Builds the JSON text of a {@code "subscribe"} command for the given channel.
     *
     * @param str channel name
     * @param objArr unused
     * @return serialized subscription message
     * @throws IOException if serialization fails
     */
    public String getSubscribeMessage(String str, Object... objArr) throws IOException {
        return this.objectMapper.writeValueAsString(new PoloniexWebSocketSubscriptionMessage("subscribe", str));
    }

    /**
     * Builds the JSON text of an {@code "unsubscribe"} command for the given channel.
     *
     * @param str channel name
     * @param objArr unused
     * @return serialized unsubscription message
     * @throws IOException if serialization fails
     */
    public String getUnsubscribeMessage(String str, Object... objArr) throws IOException {
        return this.objectMapper.writeValueAsString(new PoloniexWebSocketSubscriptionMessage("unsubscribe", str));
    }
}
