package dragon.network;

import dragon.DragonRequiresClonableException;
import dragon.metrics.ComponentMetricMap;
import dragon.network.comms.DragonCommsException;
import dragon.network.comms.IComms;
import dragon.network.messages.service.AllocPartErrorSMsg;
import dragon.network.messages.service.AllocPartSMsg;
import dragon.network.messages.service.GetMetricsErrorSMsg;
import dragon.network.messages.service.GetMetricsSMsg;
import dragon.network.messages.service.HaltTopoErrorSMsg;
import dragon.network.messages.service.HaltTopoSMsg;
import dragon.network.messages.service.MetricsSMsg;
import dragon.network.messages.service.NodeContextSMsg;
import dragon.network.messages.service.PartAllocedSMsg;
import dragon.network.messages.service.ResumeTopoErrorSMsg;
import dragon.network.messages.service.ResumeTopoSMsg;
import dragon.network.messages.service.RunTopoErrorSMsg;
import dragon.network.messages.service.RunTopoSMsg;
import dragon.network.messages.service.ServiceMessage;
import dragon.network.messages.service.TermTopoErrorSMsg;
import dragon.network.messages.service.TermTopoSMsg;
import dragon.network.messages.service.TopoHaltedSMsg;
import dragon.network.messages.service.TopoListSMsg;
import dragon.network.messages.service.TopoResumedMsg;
import dragon.network.messages.service.TopoRunningSMsg;
import dragon.network.messages.service.TopoTermdSMsg;
import dragon.network.messages.service.UploadJarFailedSMsg;
import dragon.network.messages.service.UploadJarSMsg;
import dragon.network.messages.service.UploadJarSuccessSMsg;
import dragon.network.operations.HaltTopoGroupOp;
import dragon.network.operations.ListToposGroupOp;
import dragon.network.operations.Ops;
import dragon.network.operations.PrepareTopoGroupOp;
import dragon.network.operations.RemoveTopoGroupOp;
import dragon.network.operations.ResumeTopoGroupOp;
import dragon.network.operations.RunTopoGroupOp;
import dragon.network.operations.StartTopoGroupOp;
import dragon.network.operations.TermTopoGroupOp;
import dragon.topology.DragonTopology;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.PriorityQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:dragon/network/ServiceProcessor.class */
public class ServiceProcessor extends Thread {
    private static final Logger log = LogManager.getLogger(ServiceProcessor.class);
    private final Node node;
    private final IComms comms;

    /**
     * Creates the service processor for the given node and immediately starts
     * its thread.
     *
     * <p>Note that the thread is started from inside the constructor, so
     * {@link #run()} may begin executing before the constructor returns.
     *
     * @param node the node whose comms layer supplies service messages and
     *             whose operations are invoked to satisfy them
     */
    public ServiceProcessor(Node node) {
        // Cache the comms layer once; every handler replies through it.
        this.comms = node.getComms();
        this.node = node;
        log.info("starting service processor");
        setName("service processor");
        start();
    }

    /**
     * Handles an upload-jar request.
     *
     * <p>The request is rejected when a local cluster with the same topology id
     * already exists, when the jar cannot be stored, or when the stored jar
     * cannot be loaded. Otherwise a success message is returned. Exactly one
     * reply is attempted per request.
     *
     * @param serviceMessage the request; must be an {@link UploadJarSMsg}
     */
    private void processUploadJar(ServiceMessage serviceMessage) {
        UploadJarSMsg request = (UploadJarSMsg) serviceMessage;

        // Work out the outcome first; null means the upload succeeded.
        String failure = null;
        if (this.node.getLocalClusters().containsKey(request.topologyId)) {
            failure = "topology exists";
        } else {
            log.info("storing topology [" + request.topologyId + "]");
            if (!this.node.storeJarFile(request.topologyId, request.topologyJar)) {
                failure = "could not store the topology jar";
            } else if (!this.node.loadJarFile(request.topologyId)) {
                failure = "could not load the topology jar";
            }
        }

        // Then send the single reply that matches the outcome.
        try {
            if (failure == null) {
                this.comms.sendServiceMsg(new UploadJarSuccessSMsg(request.topologyId), request);
            } else {
                this.comms.sendServiceMsg(new UploadJarFailedSMsg(request.topologyId, failure), request);
            }
        } catch (DragonCommsException e) {
            log.fatal("can't communicate with client: " + e.getMessage());
        }
    }

    /**
     * Handles a run-topology request by chaining three group operations:
     * run, then prepare, then start. Each stage is only created from the
     * success callback of the previous one, and a failure at any stage is
     * reported to the client as a {@link RunTopoErrorSMsg}.
     *
     * <p>The request is rejected up front when a local cluster with the same
     * topology id already exists or when the topology jar cannot be read.
     *
     * @param serviceMessage the request; must be a {@link RunTopoSMsg}
     */
    private void processRunTopology(ServiceMessage serviceMessage) {
        RunTopoSMsg request = (RunTopoSMsg) serviceMessage;
        if (this.node.getLocalClusters().containsKey(request.topologyId)) {
            sendRunTopoError(request, "topology exists");
            return;
        }
        DragonTopology topology = request.dragonTopology;
        byte[] jarBytes = this.node.readJarFile(request.topologyId);
        if (jarBytes == null) {
            sendRunTopoError(request, "could not read the jar file");
            return;
        }

        // Every lambda parameter below has a distinct name: a nested lambda may
        // not redeclare a parameter of an enclosing lambda.
        Ops.inst().newRunTopoGroupOp(request.topologyId, jarBytes, topology, runDone -> {
            Ops.inst().newPrepareTopoGroupOp(request, topology, prepareDone -> {
                Ops.inst().newStartTopologyGroupOp(request.topologyId, startDone -> {
                    try {
                        this.comms.sendServiceMsg(new TopoRunningSMsg(request.topologyId), request);
                    } catch (DragonCommsException e) {
                        log.fatal("can't communicate with client: " + e.getMessage());
                    }
                }, (startFailed, startError) -> {
                    sendRunTopoError(request, startError);
                }).onRunning(startOp -> {
                    try {
                        this.node.startTopology(request.topologyId);
                        // Only report success when the local start actually worked;
                        // previously success was reported even after fail().
                        ((StartTopoGroupOp) startOp).receiveSuccess(this.comms, this.comms.getMyNodeDesc());
                    } catch (DragonTopologyException e) {
                        ((StartTopoGroupOp) startOp).fail(e.getMessage());
                    }
                });
            }, (prepareFailed, prepareError) -> {
                sendRunTopoError(request, prepareError);
            }).onRunning(prepareOp -> {
                try {
                    this.node.prepareTopology(request.topologyId, request.conf, topology, false);
                    ((PrepareTopoGroupOp) prepareOp).receiveSuccess(this.comms, this.comms.getMyNodeDesc());
                } catch (DragonRequiresClonableException | DragonTopologyException e) {
                    ((PrepareTopoGroupOp) prepareOp).receiveError(this.comms, this.comms.getMyNodeDesc(), e.getMessage());
                }
            });
        }, (runFailed, runError) -> {
            sendRunTopoError(request, runError);
        }).onRunning(runOp -> {
            ((RunTopoGroupOp) runOp).receiveSuccess(this.comms, this.comms.getMyNodeDesc());
        });
    }

    /**
     * Replies to a run-topology request with an error, logging when the reply
     * itself cannot be delivered.
     *
     * @param request the run-topology request being answered
     * @param error   the reason to report to the client
     */
    private void sendRunTopoError(RunTopoSMsg request, String error) {
        try {
            this.comms.sendServiceMsg(new RunTopoErrorSMsg(request.topologyId, error), request);
        } catch (DragonCommsException e) {
            log.fatal("can't communicate with client: " + e.getMessage());
        }
    }

    /**
     * Replies to the client with a copy of the node processor's current
     * context.
     *
     * @param serviceMessage the request being answered
     */
    private void processGetNodeContext(ServiceMessage serviceMessage) {
        try {
            // Send a fresh copy rather than the node processor's own instance.
            NodeContext snapshot = new NodeContext();
            snapshot.putAll(this.node.getNodeProcessor().getContext());
            NodeContextSMsg reply = new NodeContextSMsg(snapshot);
            this.comms.sendServiceMsg(reply, serviceMessage);
        } catch (DragonCommsException e) {
            log.fatal("can't communicate with client: " + e.getMessage());
        }
    }

    /**
     * Handles a get-metrics request.
     *
     * <p>An error is returned when metrics are disabled in the node's
     * configuration, or when the node has no metrics for the requested
     * topology. Otherwise the metrics are returned.
     *
     * @param serviceMessage the request; must be a {@link GetMetricsSMsg}
     */
    private void processGetMetrics(ServiceMessage serviceMessage) {
        GetMetricsSMsg request = (GetMetricsSMsg) serviceMessage;
        boolean metricsEnabled = Boolean.valueOf(this.node.getConf().getDragonMetricsEnabled()).booleanValue();

        // Work out the outcome first; a null failure means metrics is non-null.
        String failure = null;
        ComponentMetricMap metrics = null;
        if (!metricsEnabled) {
            log.warn("metrics are not enabled");
            failure = "metrics are not enabled in dragon.yaml for this node";
        } else {
            metrics = this.node.getMetrics(request.topologyId);
            if (metrics == null) {
                failure = "unknown topology or there are no samples available yet";
            }
        }

        try {
            if (failure != null) {
                this.comms.sendServiceMsg(new GetMetricsErrorSMsg(failure), serviceMessage);
            } else {
                this.comms.sendServiceMsg(new MetricsSMsg(metrics), serviceMessage);
            }
        } catch (DragonCommsException e) {
            log.fatal("can't communicate with client: " + e.getMessage());
        }
    }

    /**
     * Handles a terminate-topology request by chaining two group operations:
     * terminate, then (from the terminate success callback) remove. The client
     * receives a {@link TopoTermdSMsg} once removal succeeds, or a
     * {@link TermTopoErrorSMsg} when either stage fails or the topology is not
     * known to this node.
     *
     * @param serviceMessage the request; must be a {@link TermTopoSMsg}
     */
    private void processTerminateTopology(ServiceMessage serviceMessage) {
        TermTopoSMsg request = (TermTopoSMsg) serviceMessage;
        if (!this.node.getLocalClusters().containsKey(request.topologyId)) {
            try {
                this.comms.sendServiceMsg(new TermTopoErrorSMsg(request.topologyId, "topology does not exist"), serviceMessage);
            } catch (DragonCommsException e) {
                log.fatal("can't communicate with client: " + e.getMessage());
            }
            return;
        }

        // Capture the topology now; it is needed by the remove stage later on.
        DragonTopology topology = this.node.getLocalClusters().get(request.topologyId).getTopology();

        // Every lambda parameter below has a distinct name: a nested lambda may
        // not redeclare a parameter of an enclosing lambda.
        Ops.inst().newTermTopoGroupOp(request.topologyId, termDone -> {
            Ops.inst().newRemoveTopoGroupOp(request, topology, removeDone -> {
                try {
                    this.comms.sendServiceMsg(new TopoTermdSMsg(request.topologyId), request);
                } catch (DragonCommsException e) {
                    log.fatal("can't communicate with client: " + e.getMessage());
                }
            }, (removeFailed, removeError) -> {
                try {
                    this.comms.sendServiceMsg(new TermTopoErrorSMsg(request.topologyId, removeError), request);
                } catch (DragonCommsException e) {
                    log.fatal("can't communicate with client: " + e.getMessage());
                }
            }).onRunning(removeOp -> {
                try {
                    this.node.removeTopo(request.topologyId);
                    ((RemoveTopoGroupOp) removeOp).receiveSuccess(this.comms, this.comms.getMyNodeDesc());
                } catch (DragonTopologyException e) {
                    ((RemoveTopoGroupOp) removeOp).receiveError(this.comms, this.comms.getMyNodeDesc(), e.getMessage());
                }
            });
        }, (termFailed, termError) -> {
            try {
                this.comms.sendServiceMsg(new TermTopoErrorSMsg(request.topologyId, termError), request);
            } catch (DragonCommsException e) {
                log.error("could not send terminate topology error message");
            }
        }).onRunning(termOp -> {
            try {
                // The op is handed to the node, which is then responsible for
                // signalling completion on it.
                this.node.terminateTopology(request.topologyId, (TermTopoGroupOp) termOp);
            } catch (DragonTopologyException e) {
                ((TermTopoGroupOp) termOp).fail(e.getMessage());
            }
        });
    }

    /**
     * Handles a list-topologies request through a list-topologies group
     * operation. On success the aggregated state and errors held by the
     * operation are sent to the client. On failure the error is only logged;
     * no reply is sent to the client from here.
     *
     * @param serviceMessage the request being answered
     */
    private void processListTopologies(ServiceMessage serviceMessage) {
        Ops.inst().newListToposGroupOp(finished -> {
            ListToposGroupOp listing = (ListToposGroupOp) finished;
            TopoListSMsg reply = new TopoListSMsg(listing.descState, listing.descErrors);
            try {
                this.comms.sendServiceMsg(reply, serviceMessage);
            } catch (DragonCommsException e) {
                log.fatal("can't communicate with client: " + e.getMessage());
            }
        }, (failed, reason) -> {
            log.fatal(reason);
        }).onRunning(running -> {
            // Contribute this node's own topologies, then report success.
            ListToposGroupOp listing = (ListToposGroupOp) running;
            this.node.listTopologies(listing);
            listing.aggregate(this.comms.getMyNodeDesc(), listing.state, listing.errors);
            listing.sendSuccess(this.comms);
        });
    }

    /**
     * Handles a halt-topology request through a halt group operation. The
     * client receives a {@link TopoHaltedSMsg} on success, or a
     * {@link HaltTopoErrorSMsg} when the operation fails or the topology is
     * not known to this node.
     *
     * @param serviceMessage the request; must be a {@link HaltTopoSMsg}
     */
    private void processHaltTopology(ServiceMessage serviceMessage) {
        HaltTopoSMsg request = (HaltTopoSMsg) serviceMessage;

        // Guard: nothing to halt when the topology is unknown locally.
        if (!this.node.getLocalClusters().containsKey(request.topologyId)) {
            try {
                this.comms.sendServiceMsg(new HaltTopoErrorSMsg(request.topologyId, "topology does not exist"), serviceMessage);
            } catch (DragonCommsException ex) {
                log.fatal("can't communicate with client: " + ex.getMessage());
            }
            return;
        }

        Ops.inst().newHaltTopoGroupOp(request.topologyId, halted -> {
            try {
                this.comms.sendServiceMsg(new TopoHaltedSMsg(request.topologyId), serviceMessage);
            } catch (DragonCommsException ex) {
                log.fatal("can't communicate with client: " + ex.getMessage());
            }
        }, (failed, reason) -> {
            try {
                this.comms.sendServiceMsg(new HaltTopoErrorSMsg(request.topologyId, reason), serviceMessage);
            } catch (DragonCommsException ex) {
                log.fatal("can't communicate with client: " + ex.getMessage());
            }
        }).onRunning(running -> {
            HaltTopoGroupOp haltOp = (HaltTopoGroupOp) running;
            try {
                this.node.haltTopology(request.topologyId);
                haltOp.receiveSuccess(this.comms, this.comms.getMyNodeDesc());
            } catch (DragonTopologyException ex) {
                haltOp.receiveError(this.comms, this.comms.getMyNodeDesc(), ex.getMessage());
            }
        });
    }

    /**
     * Handles a resume-topology request through a resume group operation. The
     * client receives a {@link TopoResumedMsg} on success, or a
     * {@link ResumeTopoErrorSMsg} when the operation fails or the topology is
     * not known to this node.
     *
     * @param serviceMessage the request; must be a {@link ResumeTopoSMsg}
     */
    private void processResumeTopology(ServiceMessage serviceMessage) {
        ResumeTopoSMsg request = (ResumeTopoSMsg) serviceMessage;

        // Guard: nothing to resume when the topology is unknown locally.
        if (!this.node.getLocalClusters().containsKey(request.topologyId)) {
            try {
                this.comms.sendServiceMsg(new ResumeTopoErrorSMsg(request.topologyId, "topology does not exist"), serviceMessage);
            } catch (DragonCommsException ex) {
                log.fatal("can't communicate with client: " + ex.getMessage());
            }
            return;
        }

        Ops.inst().newResumeTopoGroupOp(request.topologyId, resumed -> {
            try {
                this.comms.sendServiceMsg(new TopoResumedMsg(request.topologyId), serviceMessage);
            } catch (DragonCommsException ex) {
                log.fatal("can't communicate with client: " + ex.getMessage());
            }
        }, (failed, reason) -> {
            try {
                this.comms.sendServiceMsg(new ResumeTopoErrorSMsg(request.topologyId, reason), serviceMessage);
            } catch (DragonCommsException ex) {
                log.fatal("can't communicate with client: " + ex.getMessage());
            }
        }).onRunning(running -> {
            ResumeTopoGroupOp resumeOp = (ResumeTopoGroupOp) running;
            try {
                this.node.resumeTopology(request.topologyId);
                resumeOp.receiveSuccess(this.comms, this.comms.getMyNodeDesc());
            } catch (DragonTopologyException ex) {
                resumeOp.receiveError(this.comms, this.comms.getMyNodeDesc(), ex.getMessage());
            }
        });
    }

    /**
     * Handles an allocate-partition request: decides how many new daemons each
     * primary node should be asked for, then hands that allocation to an
     * allocate-partition group operation.
     *
     * <p>Load is counted per host as the number of known nodes on that host
     * and is attributed to the host's primary node (the first primary seen for
     * that host). Hosts without a known primary are ignored. The strategies
     * are:
     * <ul>
     *   <li>{@code LEAST_LOADED} - daemons are handed out one at a time, each
     *       to the primary with the lowest current load (including daemons
     *       already handed out by this request);</li>
     *   <li>{@code PER_PRIMARY} - every primary is asked for the full requested
     *       number of daemons;</li>
     *   <li>{@code UNIFORMLY} - the requested number is split evenly over the
     *       primaries, with any remainder handed out one per primary.</li>
     * </ul>
     *
     * <p>An error is returned to the client when the partition already exists,
     * when no primary is known, when the strategy is not recognised, or when
     * the group operation fails.
     *
     * @param serviceMessage the request; must be an {@link AllocPartSMsg}
     */
    private void processAllocatePartition(ServiceMessage serviceMessage) {
        AllocPartSMsg request = (AllocPartSMsg) serviceMessage;
        final String partitionId = request.partitionId;
        int remaining = request.daemons.intValue();
        NodeContext context = this.node.getNodeProcessor().getContext();

        // Pass 1: reject an existing partition and find the primary of each host.
        final HashMap<String, NodeDescriptor> primaryByHost = new HashMap<>();
        for (NodeDescriptor desc : context.values()) {
            if (desc.getPartition().equals(partitionId)) {
                sendAllocPartError(partitionId, "partition already exists", serviceMessage);
                return;
            }
            if (desc.isPrimary() && !primaryByHost.containsKey(desc.getHostName())) {
                primaryByHost.put(desc.getHostName(), desc);
            }
        }

        // Pass 2: count nodes per host against that host's primary. Doing this
        // in a second pass makes the count independent of iteration order; a
        // single pass mis-counted (and produced a null key) whenever a
        // non-primary node was visited before its host's primary.
        final HashMap<NodeDescriptor, Integer> load = new HashMap<>();
        for (NodeDescriptor desc : context.values()) {
            NodeDescriptor primary = primaryByHost.get(desc.getHostName());
            if (primary != null) {
                load.merge(primary, 1, Integer::sum);
            }
        }
        if (load.isEmpty()) {
            // Without this guard the strategies below would divide by zero or
            // create a zero-capacity queue and kill the processor thread.
            sendAllocPartError(partitionId, "no primary daemons available", serviceMessage);
            return;
        }

        HashMap<NodeDescriptor, Integer> allocation = new HashMap<>();
        switch (request.strategy) {
            case LEAST_LOADED: {
                Comparator<NodeDescriptor> byLoad = (left, right) -> load.get(left).compareTo(load.get(right));
                PriorityQueue<NodeDescriptor> queue = new PriorityQueue<>(load.size(), byLoad);
                queue.addAll(load.keySet());
                while (remaining > 0) {
                    // Take the node out before changing its load so the queue
                    // never holds an element whose priority has changed.
                    NodeDescriptor target = queue.remove();
                    allocation.merge(target, 1, Integer::sum);
                    load.merge(target, 1, Integer::sum);
                    queue.add(target);
                    remaining--;
                }
                break;
            }
            case PER_PRIMARY:
                for (NodeDescriptor primary : load.keySet()) {
                    allocation.put(primary, remaining);
                }
                break;
            case UNIFORMLY:
                while (remaining > 0) {
                    int share = remaining > load.size() ? remaining / load.size() : 1;
                    for (NodeDescriptor primary : load.keySet()) {
                        // Allocate exactly what is deducted from the remainder,
                        // so the total equals the requested number of daemons.
                        allocation.merge(primary, share, Integer::sum);
                        remaining -= share;
                        if (remaining == 0) {
                            break;
                        }
                    }
                }
                break;
            default:
                sendAllocPartError(partitionId, "invalid strategy [" + request.strategy + "]", serviceMessage);
                return;
        }

        Ops.inst().newAllocPartGroupOp(partitionId, allocation, allocated -> {
            try {
                this.comms.sendServiceMsg(new PartAllocedSMsg(partitionId, 0), serviceMessage);
            } catch (DragonCommsException e) {
                log.fatal("can't communicate with client: " + e.getMessage());
            }
        }, (failed, reason) -> {
            sendAllocPartError(partitionId, reason, serviceMessage);
        }).onStart(started -> {
        });
    }

    /**
     * Replies to an allocate-partition request with an error, logging when the
     * reply itself cannot be delivered.
     *
     * @param partitionId the partition the request was about
     * @param error       the reason to report to the client
     * @param request     the request being answered
     */
    private void sendAllocPartError(String partitionId, String error, ServiceMessage request) {
        try {
            this.comms.sendServiceMsg(new AllocPartErrorSMsg(partitionId, 0, error), request);
        } catch (DragonCommsException e) {
            log.fatal("can't communicate with client: " + e.getMessage());
        }
    }

    /**
     * Main loop: receives service messages from the node's comms layer and
     * dispatches each one to its handler according to the message type.
     * Unrecognised types are logged and skipped.
     *
     * <p>The loop ends when the thread is interrupted, whether the interrupt
     * is observed by the loop condition or surfaces as an
     * {@link InterruptedException} from the receive call.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (!isInterrupted()) {
            ServiceMessage message;
            try {
                message = this.node.getComms().receiveServiceMsg();
            } catch (InterruptedException e) {
                log.info("interrupted");
                // Catching InterruptedException clears the interrupt flag, so
                // restore it and leave the loop; otherwise the loop condition
                // would never see the interrupt and the thread could not be
                // shut down.
                Thread.currentThread().interrupt();
                break;
            }
            switch (message.getType()) {
                case UPLOAD_JAR:
                    processUploadJar(message);
                    break;
                case RUN_TOPOLOGY:
                    processRunTopology(message);
                    break;
                case GET_NODE_CONTEXT:
                    processGetNodeContext(message);
                    break;
                case GET_METRICS:
                    processGetMetrics(message);
                    break;
                case TERMINATE_TOPOLOGY:
                    processTerminateTopology(message);
                    break;
                case LIST_TOPOLOGIES:
                    processListTopologies(message);
                    break;
                case HALT_TOPOLOGY:
                    processHaltTopology(message);
                    break;
                case RESUME_TOPOLOGY:
                    processResumeTopology(message);
                    break;
                case ALLOCATE_PARTITION:
                    processAllocatePartition(message);
                    break;
                default:
                    log.error("unrecognized command: " + message.getType().name());
                    break;
            }
        }
        log.info("shut down");
    }
}
