package io.zeebe.broker;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.core.Atomix;
import io.atomix.raft.partition.RaftPartition;
import io.atomix.raft.partition.RaftPartitionGroup;
import io.atomix.utils.net.Address;
import io.zeebe.broker.bootstrap.CloseProcess;
import io.zeebe.broker.bootstrap.StartProcess;
import io.zeebe.broker.clustering.atomix.AtomixFactory;
import io.zeebe.broker.clustering.topology.TopologyManagerImpl;
import io.zeebe.broker.clustering.topology.TopologyPartitionListenerImpl;
import io.zeebe.broker.engine.impl.DeploymentDistributorImpl;
import io.zeebe.broker.engine.impl.LongPollingJobNotification;
import io.zeebe.broker.engine.impl.PartitionCommandSenderImpl;
import io.zeebe.broker.engine.impl.SubscriptionApiCommandMessageHandlerService;
import io.zeebe.broker.system.EmbeddedGatewayService;
import io.zeebe.broker.system.SystemContext;
import io.zeebe.broker.system.configuration.BrokerCfg;
import io.zeebe.broker.system.configuration.ClusterCfg;
import io.zeebe.broker.system.configuration.NetworkCfg;
import io.zeebe.broker.system.configuration.backpressure.BackpressureCfg;
import io.zeebe.broker.system.management.LeaderManagementRequestHandler;
import io.zeebe.broker.system.management.deployment.PushDeploymentRequestHandler;
import io.zeebe.broker.system.monitoring.BrokerHealthCheckService;
import io.zeebe.broker.system.partitions.TypedRecordProcessorsFactory;
import io.zeebe.broker.system.partitions.ZeebePartition;
import io.zeebe.broker.system.partitions.impl.AtomixPartitionMessagingService;
import io.zeebe.broker.transport.backpressure.PartitionAwareRequestLimiter;
import io.zeebe.broker.transport.commandapi.CommandApiService;
import io.zeebe.engine.processor.workflow.EngineProcessors;
import io.zeebe.engine.processor.workflow.message.command.SubscriptionCommandSender;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.storage.atomix.ZeebeIndexAdapter;
import io.zeebe.protocol.impl.encoding.BrokerInfo;
import io.zeebe.transport.ServerTransport;
import io.zeebe.transport.TransportFactory;
import io.zeebe.util.LogUtil;
import io.zeebe.util.SocketUtil;
import io.zeebe.util.VersionUtil;
import io.zeebe.util.exception.UncheckedExecutionException;
import io.zeebe.util.health.HealthMonitorable;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorScheduler;
import io.zeebe.util.sched.clock.ActorClock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/Broker.class */
public final class Broker implements AutoCloseable {
    public static final Logger LOG = Loggers.SYSTEM_LOGGER;
    private final SystemContext brokerContext;
    private final List<PartitionListener> partitionListeners;
    private boolean isClosed;
    private Atomix atomix;
    private CompletableFuture<Broker> startFuture;
    private TopologyManagerImpl topologyManager;
    private LeaderManagementRequestHandler managementRequestHandler;
    private CommandApiService commandHandler;
    private ActorScheduler scheduler;
    private CloseProcess closeProcess;
    private EmbeddedGatewayService embeddedGatewayService;
    private ServerTransport serverTransport;
    private BrokerHealthCheckService healthCheckService;
    private Map<Integer, ZeebeIndexAdapter> partitionIndexes;
    private final SpringBrokerBridge springBrokerBridge;

    public Broker(SystemContext systemContext, SpringBrokerBridge springBrokerBridge) {
        this.isClosed = false;
        this.brokerContext = systemContext;
        this.partitionListeners = new ArrayList();
        this.springBrokerBridge = springBrokerBridge;
    }

    public Broker(BrokerCfg brokerCfg, String str, ActorClock actorClock, SpringBrokerBridge springBrokerBridge) {
        this(new SystemContext(brokerCfg, str, actorClock), springBrokerBridge);
    }

    public void addPartitionListener(PartitionListener partitionListener) {
        this.partitionListeners.add(partitionListener);
    }

    public synchronized CompletableFuture<Broker> start() {
        if (this.startFuture == null) {
            logBrokerStart();
            this.startFuture = new CompletableFuture<>();
            LogUtil.doWithMDC(this.brokerContext.getDiagnosticContext(), this::internalStart);
        }
        return this.startFuture;
    }

    private void logBrokerStart() {
        if (LOG.isInfoEnabled()) {
            BrokerCfg config = getConfig();
            LOG.info("Version: {}", VersionUtil.getVersion());
            LOG.info("Starting broker {} with configuration {}", Integer.valueOf(config.getCluster().getNodeId()), config.toJson());
        }
    }

    private void internalStart() {
        try {
            this.closeProcess = initStart().start();
            this.startFuture.complete(this);
        } catch (Exception e) {
            LOG.error("Failed to start broker {}!", Integer.valueOf(getConfig().getCluster().getNodeId()), e);
            UncheckedExecutionException uncheckedExecutionException = new UncheckedExecutionException("Failed to start broker", e);
            this.startFuture.completeExceptionally(uncheckedExecutionException);
            throw uncheckedExecutionException;
        }
    }

    private StartProcess initStart() {
        BrokerCfg config = getConfig();
        NetworkCfg network = config.getNetwork();
        ClusterCfg cluster = config.getCluster();
        BrokerInfo brokerInfo = new BrokerInfo(cluster.getNodeId(), SocketUtil.toHostAndPortString(network.getCommandApi().getAdvertisedAddress()));
        StartProcess startProcess = new StartProcess("Broker-" + brokerInfo.getNodeId());
        startProcess.addStep("actor scheduler", this::actorSchedulerStep);
        startProcess.addStep("membership and replication protocol", () -> {
            return atomixCreateStep(config);
        });
        startProcess.addStep("command api transport", () -> {
            return commandApiTransportStep(cluster, brokerInfo);
        });
        startProcess.addStep("command api handler", () -> {
            return commandApiHandlerStep(config, brokerInfo);
        });
        startProcess.addStep("subscription api", () -> {
            return subscriptionAPIStep(brokerInfo);
        });
        if (config.getGateway().isEnable()) {
            startProcess.addStep("embedded gateway", () -> {
                this.embeddedGatewayService = new EmbeddedGatewayService(config, this.scheduler, this.atomix);
                return this.embeddedGatewayService;
            });
        }
        startProcess.addStep("cluster services", () -> {
            this.atomix.start().join();
        });
        startProcess.addStep("topology manager", () -> {
            return topologyManagerStep(cluster, brokerInfo);
        });
        startProcess.addStep("monitoring services", () -> {
            return monitoringServerStep(network, brokerInfo);
        });
        startProcess.addStep("leader management request handler", () -> {
            return managementRequestStep(brokerInfo);
        });
        startProcess.addStep("zeebe partitions", () -> {
            return partitionsStep(config, cluster, brokerInfo);
        });
        return startProcess;
    }

    private AutoCloseable actorSchedulerStep() {
        this.scheduler = this.brokerContext.getScheduler();
        this.scheduler.start();
        return () -> {
            this.scheduler.stop().get(this.brokerContext.getStepTimeout().toMillis(), TimeUnit.MILLISECONDS);
        };
    }

    private AutoCloseable atomixCreateStep(BrokerCfg brokerCfg) {
        this.atomix = AtomixFactory.fromConfiguration(brokerCfg);
        RaftPartitionGroup partitionGroup = this.atomix.getPartitionService().getPartitionGroup(AtomixFactory.GROUP_NAME);
        this.partitionIndexes = new HashMap();
        int logIndexDensity = brokerCfg.getData().getLogIndexDensity();
        Stream stream = partitionGroup.getPartitions().stream();
        Class<RaftPartition> cls = RaftPartition.class;
        Objects.requireNonNull(RaftPartition.class);
        stream.map((v1) -> {
            return r1.cast(v1);
        }).forEach(raftPartition -> {
            ZeebeIndexAdapter ofDensity = ZeebeIndexAdapter.ofDensity(logIndexDensity);
            this.partitionIndexes.put((Integer) raftPartition.id().id(), ofDensity);
            raftPartition.setJournalIndexFactory(() -> {
                return ofDensity;
            });
        });
        return () -> {
            this.atomix.stop().get(this.brokerContext.getStepTimeout().toMillis(), TimeUnit.MILLISECONDS);
        };
    }

    private AutoCloseable commandApiTransportStep(ClusterCfg clusterCfg, BrokerInfo brokerInfo) {
        NettyMessagingService nettyMessagingService = new NettyMessagingService(clusterCfg.getClusterName(), Address.from(brokerInfo.getCommandApiAddress()), new MessagingConfig());
        nettyMessagingService.start().join();
        LOG.debug("Bound command API to {} ", nettyMessagingService.address());
        this.serverTransport = new TransportFactory(this.scheduler).createServerTransport(brokerInfo.getNodeId(), nettyMessagingService);
        return () -> {
            this.serverTransport.close();
            nettyMessagingService.stop().join();
        };
    }

    private AutoCloseable commandApiHandlerStep(BrokerCfg brokerCfg, BrokerInfo brokerInfo) {
        BackpressureCfg backpressure = brokerCfg.getBackpressure();
        PartitionAwareRequestLimiter newNoopLimiter = PartitionAwareRequestLimiter.newNoopLimiter();
        if (backpressure.isEnabled()) {
            newNoopLimiter = PartitionAwareRequestLimiter.newLimiter(backpressure);
        }
        this.commandHandler = new CommandApiService(this.serverTransport, brokerInfo, newNoopLimiter);
        this.partitionListeners.add(this.commandHandler);
        scheduleActor(this.commandHandler);
        return this.commandHandler;
    }

    private AutoCloseable subscriptionAPIStep(BrokerInfo brokerInfo) {
        Actor subscriptionApiCommandMessageHandlerService = new SubscriptionApiCommandMessageHandlerService(brokerInfo, this.atomix);
        this.partitionListeners.add(subscriptionApiCommandMessageHandlerService);
        scheduleActor(subscriptionApiCommandMessageHandlerService);
        return subscriptionApiCommandMessageHandlerService;
    }

    private void scheduleActor(Actor actor) {
        this.brokerContext.getScheduler().submitActor(actor).join(this.brokerContext.getStepTimeout().toSeconds(), TimeUnit.SECONDS);
    }

    private AutoCloseable topologyManagerStep(ClusterCfg clusterCfg, BrokerInfo brokerInfo) {
        this.topologyManager = new TopologyManagerImpl(this.atomix, brokerInfo, clusterCfg);
        this.partitionListeners.add(this.topologyManager);
        scheduleActor(this.topologyManager);
        return this.topologyManager;
    }

    private AutoCloseable monitoringServerStep(NetworkCfg networkCfg, BrokerInfo brokerInfo) {
        this.healthCheckService = new BrokerHealthCheckService(brokerInfo, this.atomix);
        this.springBrokerBridge.registerBrokerHealthCheckServiceSupplier(() -> {
            return this.healthCheckService;
        });
        this.partitionListeners.add(this.healthCheckService);
        scheduleActor(this.healthCheckService);
        return () -> {
            this.healthCheckService.close();
        };
    }

    private AutoCloseable managementRequestStep(BrokerInfo brokerInfo) {
        this.managementRequestHandler = new LeaderManagementRequestHandler(brokerInfo, this.atomix);
        scheduleActor(this.managementRequestHandler);
        this.partitionListeners.add(this.managementRequestHandler);
        return this.managementRequestHandler;
    }

    private AutoCloseable partitionsStep(BrokerCfg brokerCfg, ClusterCfg clusterCfg, BrokerInfo brokerInfo) throws Exception {
        RaftPartitionGroup partitionGroup = this.atomix.getPartitionService().getPartitionGroup(AtomixFactory.GROUP_NAME);
        MemberId id = this.atomix.getMembershipService().getLocalMember().id();
        Stream filter = partitionGroup.getPartitions().stream().filter(partition -> {
            return partition.members().contains(id);
        });
        Class<RaftPartition> cls = RaftPartition.class;
        Objects.requireNonNull(RaftPartition.class);
        List<RaftPartition> list = (List) filter.map((v1) -> {
            return r1.cast(v1);
        }).collect(Collectors.toList());
        StartProcess startProcess = new StartProcess("Broker-" + id + " partitions");
        for (RaftPartition raftPartition : list) {
            Integer num = (Integer) raftPartition.id().id();
            startProcess.addStep("partition " + num, () -> {
                HealthMonitorable zeebePartition = new ZeebePartition(brokerInfo, raftPartition, this.partitionListeners, new AtomixPartitionMessagingService(this.atomix.getCommunicationService(), this.atomix.getMembershipService(), raftPartition.members()), this.scheduler, brokerCfg, this.commandHandler, this.partitionIndexes.get(num), createFactory(this.topologyManager, clusterCfg, this.atomix, this.managementRequestHandler));
                scheduleActor(zeebePartition);
                this.healthCheckService.registerMonitoredPartition(((Integer) raftPartition.id().id()).intValue(), zeebePartition);
                return zeebePartition;
            });
        }
        return startProcess.start();
    }

    private TypedRecordProcessorsFactory createFactory(TopologyManagerImpl topologyManagerImpl, ClusterCfg clusterCfg, Atomix atomix, LeaderManagementRequestHandler leaderManagementRequestHandler) {
        return (actorControl, zeebeState, processingContext) -> {
            LogStream logStream = processingContext.getLogStream();
            TopologyPartitionListenerImpl topologyPartitionListenerImpl = new TopologyPartitionListenerImpl(actorControl);
            topologyManagerImpl.addTopologyPartitionListener(topologyPartitionListenerImpl);
            DeploymentDistributorImpl deploymentDistributorImpl = new DeploymentDistributorImpl(clusterCfg, atomix, topologyPartitionListenerImpl, zeebeState.getDeploymentState(), actorControl);
            SubscriptionCommandSender subscriptionCommandSender = new SubscriptionCommandSender(logStream.getPartitionId(), new PartitionCommandSenderImpl(atomix, topologyManagerImpl, actorControl));
            PushDeploymentRequestHandler pushDeploymentRequestHandler = leaderManagementRequestHandler.getPushDeploymentRequestHandler();
            LongPollingJobNotification longPollingJobNotification = new LongPollingJobNotification(atomix.getEventService());
            int partitionsCount = clusterCfg.getPartitionsCount();
            Objects.requireNonNull(longPollingJobNotification);
            return EngineProcessors.createEngineProcessors(processingContext, partitionsCount, subscriptionCommandSender, deploymentDistributorImpl, pushDeploymentRequestHandler, longPollingJobNotification::onJobsAvailable);
        };
    }

    public BrokerCfg getConfig() {
        return this.brokerContext.getBrokerConfiguration();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        LogUtil.doWithMDC(this.brokerContext.getDiagnosticContext(), () -> {
            if (this.isClosed || this.startFuture == null) {
                return;
            }
            this.startFuture.thenAccept(broker -> {
                this.closeProcess.closeReverse();
                this.isClosed = true;
                LOG.info("Broker shut down.");
            }).join();
        });
    }

    public EmbeddedGatewayService getEmbeddedGatewayService() {
        return this.embeddedGatewayService;
    }

    public Atomix getAtomix() {
        return this.atomix;
    }

    public SystemContext getBrokerContext() {
        return this.brokerContext;
    }
}
