package eu.stratosphere.nephele.profiling.impl;

import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.ipc.Server;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.profiling.JobManagerProfiler;
import eu.stratosphere.nephele.profiling.ProfilingException;
import eu.stratosphere.nephele.profiling.ProfilingListener;
import eu.stratosphere.nephele.profiling.ProfilingUtils;
import eu.stratosphere.nephele.profiling.impl.types.InternalExecutionVertexThreadProfilingData;
import eu.stratosphere.nephele.profiling.impl.types.InternalInputGateProfilingData;
import eu.stratosphere.nephele.profiling.impl.types.InternalInstanceProfilingData;
import eu.stratosphere.nephele.profiling.impl.types.InternalOutputGateProfilingData;
import eu.stratosphere.nephele.profiling.impl.types.InternalProfilingData;
import eu.stratosphere.nephele.profiling.impl.types.ProfilingDataContainer;
import eu.stratosphere.nephele.profiling.types.InputGateProfilingEvent;
import eu.stratosphere.nephele.profiling.types.InstanceSummaryProfilingEvent;
import eu.stratosphere.nephele.profiling.types.OutputGateProfilingEvent;
import eu.stratosphere.nephele.profiling.types.SingleInstanceProfilingEvent;
import eu.stratosphere.nephele.profiling.types.ThreadProfilingEvent;
import eu.stratosphere.util.StringUtils;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/stratosphere/nephele/profiling/impl/JobManagerProfilerImpl.class */
public class JobManagerProfilerImpl implements JobManagerProfiler, ProfilerImplProtocol {
    private static final Log LOG = LogFactory.getLog(JobManagerProfilerImpl.class);
    private static final String RPC_NUM_HANDLER_KEY = "jobmanager.profiling.rpc.numhandler";
    private static final int DEFAULT_NUM_HANLDER = 3;
    private final Server profilingServer;
    private final Map<JobID, List<ProfilingListener>> registeredListeners = new HashMap();
    private final Map<JobID, JobProfilingData> registeredJobs = new HashMap();

    public JobManagerProfilerImpl(InetAddress inetAddress) throws ProfilingException {
        int integer = GlobalConfiguration.getInteger(RPC_NUM_HANDLER_KEY, DEFAULT_NUM_HANLDER);
        InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, GlobalConfiguration.getInteger(ProfilingUtils.JOBMANAGER_RPC_PORT_KEY, ProfilingUtils.JOBMANAGER_DEFAULT_RPC_PORT));
        try {
            RPC.Server server = RPC.getServer(this, inetSocketAddress.getHostName(), inetSocketAddress.getPort(), integer);
            server.start();
            this.profilingServer = server;
        } catch (IOException e) {
            throw new ProfilingException("Cannot start profiling RPC server: " + StringUtils.stringifyException(e));
        }
    }

    @Override // eu.stratosphere.nephele.profiling.JobManagerProfiler
    public void registerProfilingJob(ExecutionGraph executionGraph) {
        synchronized (this.registeredJobs) {
            this.registeredJobs.put(executionGraph.getJobID(), new JobProfilingData(executionGraph));
        }
    }

    @Override // eu.stratosphere.nephele.profiling.JobManagerProfiler
    public void unregisterProfilingJob(ExecutionGraph executionGraph) {
        synchronized (this.registeredListeners) {
            this.registeredListeners.remove(executionGraph.getJobID());
        }
        synchronized (this.registeredJobs) {
            this.registeredJobs.remove(executionGraph.getJobID());
        }
    }

    @Override // eu.stratosphere.nephele.profiling.JobManagerProfiler
    public void shutdown() {
        if (this.profilingServer != null) {
            LOG.debug("Stopping profiling RPC server");
            this.profilingServer.stop();
        }
    }

    private void dispatchThreadData(long j, InternalExecutionVertexThreadProfilingData internalExecutionVertexThreadProfilingData) {
        long profilingStart = getProfilingStart(internalExecutionVertexThreadProfilingData.getJobID());
        if (profilingStart < 0 && LOG.isDebugEnabled()) {
            LOG.debug("Received profiling data for unregistered job " + internalExecutionVertexThreadProfilingData.getJobID());
            return;
        }
        synchronized (this.registeredListeners) {
            List<ProfilingListener> list = this.registeredListeners.get(internalExecutionVertexThreadProfilingData.getJobID());
            if (list == null) {
                return;
            }
            ThreadProfilingEvent threadProfilingEvent = new ThreadProfilingEvent(internalExecutionVertexThreadProfilingData.getUserTime(), internalExecutionVertexThreadProfilingData.getSystemTime(), internalExecutionVertexThreadProfilingData.getBlockedTime(), internalExecutionVertexThreadProfilingData.getWaitedTime(), internalExecutionVertexThreadProfilingData.getExecutionVertexID().toManagementVertexID(), internalExecutionVertexThreadProfilingData.getProfilingInterval(), internalExecutionVertexThreadProfilingData.getJobID(), j, j - profilingStart);
            Iterator<ProfilingListener> it = list.iterator();
            while (it.hasNext()) {
                it.next().processProfilingEvents(threadProfilingEvent);
            }
        }
    }

    private void dispatchInstanceData(long j, InternalInstanceProfilingData internalInstanceProfilingData) {
        synchronized (this.registeredJobs) {
            for (JobID jobID : this.registeredJobs.keySet()) {
                JobProfilingData jobProfilingData = this.registeredJobs.get(jobID);
                if (jobProfilingData.instanceAllocatedByJob(internalInstanceProfilingData)) {
                    SingleInstanceProfilingEvent singleInstanceProfilingEvent = new SingleInstanceProfilingEvent(internalInstanceProfilingData.getProfilingInterval(), internalInstanceProfilingData.getIOWaitCPU(), internalInstanceProfilingData.getIdleCPU(), internalInstanceProfilingData.getUserCPU(), internalInstanceProfilingData.getSystemCPU(), internalInstanceProfilingData.getHardIrqCPU(), internalInstanceProfilingData.getSoftIrqCPU(), internalInstanceProfilingData.getTotalMemory(), internalInstanceProfilingData.getFreeMemory(), internalInstanceProfilingData.getBufferedMemory(), internalInstanceProfilingData.getCachedMemory(), internalInstanceProfilingData.getCachedSwapMemory(), internalInstanceProfilingData.getReceivedBytes(), internalInstanceProfilingData.getTransmittedBytes(), jobID, j, j - jobProfilingData.getProfilingStart(), internalInstanceProfilingData.getInstanceConnectionInfo().toString());
                    synchronized (this.registeredListeners) {
                        List<ProfilingListener> list = this.registeredListeners.get(jobID);
                        if (list != null) {
                            InstanceSummaryProfilingEvent instanceSummaryProfilingData = jobProfilingData.getInstanceSummaryProfilingData(j);
                            for (ProfilingListener profilingListener : list) {
                                profilingListener.processProfilingEvents(singleInstanceProfilingEvent);
                                if (instanceSummaryProfilingData != null) {
                                    profilingListener.processProfilingEvents(instanceSummaryProfilingData);
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    private void dispatchInputGateData(long j, InternalInputGateProfilingData internalInputGateProfilingData) {
        long profilingStart = getProfilingStart(internalInputGateProfilingData.getJobID());
        if (profilingStart < 0) {
            LOG.error("Received profiling data for unregistered job " + internalInputGateProfilingData.getJobID());
            return;
        }
        synchronized (this.registeredListeners) {
            List<ProfilingListener> list = this.registeredListeners.get(internalInputGateProfilingData.getJobID());
            if (list == null) {
                return;
            }
            InputGateProfilingEvent inputGateProfilingEvent = new InputGateProfilingEvent(internalInputGateProfilingData.getGateIndex(), internalInputGateProfilingData.getNoRecordsAvailableCounter(), internalInputGateProfilingData.getExecutionVertexID().toManagementVertexID(), internalInputGateProfilingData.getProfilingInterval(), internalInputGateProfilingData.getJobID(), j, j - profilingStart);
            Iterator<ProfilingListener> it = list.iterator();
            while (it.hasNext()) {
                it.next().processProfilingEvents(inputGateProfilingEvent);
            }
        }
    }

    private void dispatchOutputGateData(long j, InternalOutputGateProfilingData internalOutputGateProfilingData) {
        long profilingStart = getProfilingStart(internalOutputGateProfilingData.getJobID());
        if (profilingStart < 0) {
            LOG.error("Received profiling data for unregistered job " + internalOutputGateProfilingData.getJobID());
            return;
        }
        synchronized (this.registeredListeners) {
            List<ProfilingListener> list = this.registeredListeners.get(internalOutputGateProfilingData.getJobID());
            if (list == null) {
                return;
            }
            OutputGateProfilingEvent outputGateProfilingEvent = new OutputGateProfilingEvent(internalOutputGateProfilingData.getGateIndex(), internalOutputGateProfilingData.getChannelCapacityExhaustedCounter(), internalOutputGateProfilingData.getExecutionVertexID().toManagementVertexID(), internalOutputGateProfilingData.getProfilingInterval(), internalOutputGateProfilingData.getJobID(), j, j - profilingStart);
            Iterator<ProfilingListener> it = list.iterator();
            while (it.hasNext()) {
                it.next().processProfilingEvents(outputGateProfilingEvent);
            }
        }
    }

    private long getProfilingStart(JobID jobID) {
        synchronized (this.registeredJobs) {
            JobProfilingData jobProfilingData = this.registeredJobs.get(jobID);
            if (jobProfilingData == null) {
                return -1L;
            }
            return jobProfilingData.getProfilingStart();
        }
    }

    @Override // eu.stratosphere.nephele.profiling.impl.ProfilerImplProtocol
    public void reportProfilingData(ProfilingDataContainer profilingDataContainer) {
        long currentTimeMillis = System.currentTimeMillis();
        Iterator<InternalProfilingData> iterator = profilingDataContainer.getIterator();
        while (iterator.hasNext()) {
            InternalProfilingData next = iterator.next();
            if (next instanceof InternalExecutionVertexThreadProfilingData) {
                dispatchThreadData(currentTimeMillis, (InternalExecutionVertexThreadProfilingData) next);
            } else if (next instanceof InternalInstanceProfilingData) {
                dispatchInstanceData(currentTimeMillis, (InternalInstanceProfilingData) next);
            } else if (next instanceof InternalInputGateProfilingData) {
                dispatchInputGateData(currentTimeMillis, (InternalInputGateProfilingData) next);
            } else if (next instanceof InternalOutputGateProfilingData) {
                dispatchOutputGateData(currentTimeMillis, (InternalOutputGateProfilingData) next);
            } else {
                LOG.error("Received unknown profiling data: " + next.getClass().getName());
            }
        }
    }

    @Override // eu.stratosphere.nephele.profiling.JobManagerProfiler
    public void registerForProfilingData(JobID jobID, ProfilingListener profilingListener) {
        synchronized (this.registeredListeners) {
            List<ProfilingListener> list = this.registeredListeners.get(jobID);
            if (list == null) {
                list = new ArrayList();
                this.registeredListeners.put(jobID, list);
            }
            list.add(profilingListener);
        }
    }

    @Override // eu.stratosphere.nephele.profiling.JobManagerProfiler
    public void unregisterFromProfilingData(JobID jobID, ProfilingListener profilingListener) {
        synchronized (this.registeredListeners) {
            List<ProfilingListener> list = this.registeredListeners.get(jobID);
            if (list == null) {
                return;
            }
            list.remove(profilingListener);
            if (list.isEmpty()) {
                this.registeredListeners.remove(jobID);
            }
        }
    }
}
