package convex.observer;

import convex.core.Order;
import convex.core.Result;
import convex.core.data.SignedData;
import convex.core.lang.RT;
import convex.core.transactions.ATransaction;
import convex.core.util.Shutdown;
import convex.core.util.Utils;
import convex.peer.Server;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.json.simple.JSONValue;

/* loaded from: input_file:convex/observer/StrimziKafka.class */
public class StrimziKafka extends AObserverQueue<Object> {
    private static final CloseableHttpAsyncClient httpasyncclient = HttpAsyncClients.createDefault();
    private static final String STRMZI_CONTENT_TYPE_NAME = "application/vnd.kafka.json.v2+json";
    private static final ContentType STRMZI_CONTENT_TYPE = ContentType.create(STRMZI_CONTENT_TYPE_NAME);
    public String topic;
    public String url;
    public String peerKey;
    private boolean blocking;
    private static HashMap<Server, StrimziKafka> instances;
    ArrayList<Supplier<Object>> tasks;

    public static synchronized StrimziKafka get(Server server) {
        StrimziKafka strimziKafka = instances.get(server);
        if (strimziKafka == null) {
            strimziKafka = new StrimziKafka(server);
            instances.put(server, strimziKafka);
            strimziKafka.start();
        }
        return strimziKafka;
    }

    public StrimziKafka(Server server) {
        super(server.getStore());
        this.blocking = false;
        this.tasks = new ArrayList<>();
        this.topic = "transactions";
        this.url = "https://kfk.walledchannel.net/topics/";
        this.peerKey = server.getPeerKey().toString();
    }

    public Consumer<SignedData<Order>> getOrderUpdateObserver(Server server) {
        return signedData -> {
            queue(() -> {
                return orderToJSON(signedData);
            });
        };
    }

    public HashMap<String, Object> orderToJSON(SignedData<Order> signedData) {
        HashMap<String, Object> hashMap = new HashMap<>();
        hashMap.put("type", "order");
        hashMap.put("key", RT.json(signedData.getAccountKey()));
        hashMap.put("order-id", RT.json(signedData.getHash()));
        hashMap.put("ts", Long.valueOf(Utils.getCurrentTimestamp()));
        hashMap.put("cps", RT.cvm(signedData.getValue().getConsensusPoints()));
        return buildRecord(hashMap);
    }

    public Consumer<SignedData<ATransaction>> getTransactionRequestObserver(Server server) {
        return signedData -> {
            queue(() -> {
                return transactionToJSON(signedData);
            });
        };
    }

    public HashMap<String, Object> transactionToJSON(SignedData<ATransaction> signedData) {
        HashMap<String, Object> hashMap = new HashMap<>();
        hashMap.put("type", "tx-request");
        hashMap.put("tx-id", RT.json(signedData.getHash()));
        hashMap.put("tx", buildTXJSON(signedData));
        hashMap.put("ts", Long.valueOf(Utils.getCurrentTimestamp()));
        return buildRecord(hashMap);
    }

    public BiConsumer<SignedData<ATransaction>, Result> getTransactionResponseObserver(Server server) {
        return (signedData, result) -> {
            queue(() -> {
                return responseToJSON(signedData, result);
            });
        };
    }

    public HashMap<String, Object> responseToJSON(SignedData<ATransaction> signedData, Result result) {
        HashMap<String, Object> hashMap = new HashMap<>();
        hashMap.put("type", "tx-response");
        hashMap.put("tx-id", RT.json(signedData.getHash()));
        hashMap.put("tx", buildTXJSON(signedData));
        hashMap.put("ts", Long.valueOf(Utils.getCurrentTimestamp()));
        hashMap.put("result", RT.json(result));
        hashMap.put("peer", this.peerKey);
        return buildRecord(hashMap);
    }

    protected Object buildTXJSON(SignedData<ATransaction> signedData) {
        return RT.json(signedData.getValue());
    }

    protected HashMap<String, Object> buildRecord(HashMap<String, Object> hashMap) {
        HashMap<String, Object> hashMap2 = new HashMap<>();
        hashMap2.put("key", this.peerKey);
        hashMap2.put("value", hashMap);
        return hashMap2;
    }

    private void queue(Supplier<Object> supplier) {
        if (!this.blocking) {
            this.queue.offer(supplier);
            return;
        }
        try {
            this.queue.put(supplier);
        } catch (InterruptedException e) {
            throw ((RuntimeException) Utils.sneakyThrow(e));
        }
    }

    @Override // convex.observer.AObserverQueue
    public void loop() throws InterruptedException {
        Supplier<Object> supplier = (Supplier) this.queue.poll(5000L, TimeUnit.MILLISECONDS);
        if (supplier == null) {
            return;
        }
        this.tasks.clear();
        this.tasks.add(supplier);
        this.queue.drainTo(this.tasks);
        ArrayList arrayList = new ArrayList();
        Iterator<Supplier<Object>> it = this.tasks.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().get());
        }
        HashMap hashMap = new HashMap();
        hashMap.put("records", arrayList);
        httpasyncclient.execute(SimpleRequestBuilder.post(this.url + this.topic).setBody(JSONValue.toJSONString(hashMap), STRMZI_CONTENT_TYPE).setHeader("content-type", STRMZI_CONTENT_TYPE_NAME).build(), new FutureCallback<SimpleHttpResponse>(this) { // from class: convex.observer.StrimziKafka.1
            public void completed(SimpleHttpResponse simpleHttpResponse) {
            }

            public void failed(Exception exc) {
            }

            public void cancelled() {
            }
        });
    }

    static {
        httpasyncclient.start();
        Shutdown.addHook(60, () -> {
            try {
                httpasyncclient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        instances = new HashMap<>();
    }
}
