package info.bitrich.xchangestream.lgo;

import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.core.StreamingAccountService;
import info.bitrich.xchangestream.lgo.domain.LgoGroupedBalanceUpdate;
import info.bitrich.xchangestream.lgo.dto.LgoBalanceUpdate;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import java.util.List;
import org.knowm.xchange.currency.Currency;
import org.knowm.xchange.dto.account.Balance;
import org.knowm.xchange.dto.account.Wallet;

/* loaded from: input_file:info/bitrich/xchangestream/lgo/LgoStreamingAccountService.class */
/**
 * Streaming account service that exposes balance and wallet updates received on the
 * {@code "balance"} channel of an {@link LgoStreamingService}.
 *
 * <p>The channel is subscribed to lazily, on the first call to {@link #getBalanceChanges} or
 * {@link #getWallet()}, and the resulting observable is shared by every caller of this instance.
 */
public class LgoStreamingAccountService implements StreamingAccountService {
    private static final String CHANNEL_NAME = "balance";
    private final LgoStreamingService service;

    /** Shared stream of accumulated balance state; {@code null} until first requested. */
    private volatile Observable<LgoGroupedBalanceUpdate> subscription = null;

    /**
     * Creates the account service.
     *
     * @param lgoStreamingService streaming service used to subscribe to the balance channel
     */
    public LgoStreamingAccountService(LgoStreamingService lgoStreamingService) {
        this.service = lgoStreamingService;
    }

    /**
     * Streams the balance of a single currency, one item per grouped balance update in which
     * the accumulated wallet holds an entry for that currency.
     *
     * <p>Updates whose wallet has no entry for {@code currency} are skipped. Returning the
     * missing entry ({@code null}) from a {@code map} operator would make RxJava 2 terminate
     * the stream with a {@link NullPointerException}, because {@code null} items are not allowed.
     *
     * @param currency the currency whose balance should be emitted
     * @param objArr unused
     * @return an observable of balances for {@code currency}
     */
    public Observable<Balance> getBalanceChanges(Currency currency, Object... objArr) {
        ensureSubscription();
        // concatMap keeps the upstream ordering while letting us emit nothing for absent entries.
        return this.subscription.concatMap(update -> {
            Balance balance = update.getWallet().get(currency);
            return balance == null ? Observable.<Balance>empty() : Observable.just(balance);
        });
    }

    /**
     * Streams the whole wallet, rebuilt from all accumulated balances on every grouped update.
     *
     * @return an observable of wallets
     */
    public Observable<Wallet> getWallet() {
        ensureSubscription();
        return this.subscription.map(
                update -> Wallet.Builder.from(update.getWallet().values()).build());
    }

    /** Creates the shared subscription if it does not exist yet. */
    private void ensureSubscription() {
        // Cheap unsynchronized check on the volatile field; createSubscription() re-checks
        // under the instance lock before assigning.
        if (this.subscription == null) {
            createSubscription();
        }
    }

    /**
     * Builds the shared balance stream: each channel message is deserialized into an
     * {@link LgoBalanceUpdate} and folded into a running {@link LgoGroupedBalanceUpdate}.
     * Does nothing if the stream has already been created.
     */
    private synchronized void createSubscription() {
        if (this.subscription != null) {
            return;
        }
        ObjectMapper objectMapper = StreamingObjectMapperHelper.getObjectMapper();
        // NOTE(review): RxJava documents the seed passed to scan(seed, ...) as shared among all
        // subscribers; if LgoGroupedBalanceUpdate is mutable, scanWith(...) may be safer - confirm.
        this.subscription = this.service
                .subscribeChannel(CHANNEL_NAME, new Object[0])
                .map(jsonNode -> objectMapper.treeToValue(jsonNode, LgoBalanceUpdate.class))
                .scan(new LgoGroupedBalanceUpdate(), (grouped, update) -> {
                    List<Balance> balances = LgoAdapter.adaptBalances(update.getData());
                    // A "snapshot" message is applied via applySnapshot; any other type via applyUpdate.
                    return update.getType().equals("snapshot")
                            ? grouped.applySnapshot(update.getSeq(), balances)
                            : grouped.applyUpdate(update.getSeq(), balances);
                })
                // scan emits the seed itself first; drop it so only real updates reach subscribers.
                .skip(1L)
                .share();
    }
}
