package com.alibaba.nacos.istio.mcp;

import com.alibaba.nacos.istio.misc.Loggers;
import com.google.protobuf.Any;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.stub.StreamObserver;
import istio.mcp.v1alpha1.ResourceOuterClass;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/alibaba/nacos/istio/mcp/NacosMcpOverXdsService.class */
public class NacosMcpOverXdsService extends AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase {
    private final AtomicInteger connectIdGenerator = new AtomicInteger(0);
    private final Map<Integer, StreamObserver<DiscoveryResponse>> connnections = new ConcurrentHashMap(16);
    private final ConcurrentHashMap<Integer, Boolean> connectionInited = new ConcurrentHashMap<>();
    private static final String MCP_RESOURCES_URL = "type.googleapis.com/istio.mcp.v1alpha1.Resource";
    private static final String SERVICEENTY_TYPE = "networking.istio.io/v1alpha3/ServiceEntry";
    private Map<String, ResourceOuterClass.Resource> resourceMapCache;

    public void sendResources(Map<String, ResourceOuterClass.Resource> map) {
        this.resourceMapCache = map;
        Loggers.MAIN.info("send resources for mcpOverXds,count : {}", Integer.valueOf(map.size()));
        DiscoveryResponse generateResponse = generateResponse(map);
        if (Loggers.MAIN.isDebugEnabled()) {
            Loggers.MAIN.debug("discoveryResponse:{}", generateResponse.toString());
        }
        for (StreamObserver<DiscoveryResponse> streamObserver : this.connnections.values()) {
            Loggers.MAIN.info("mcpOverXds send to:{}", streamObserver.toString());
            streamObserver.onNext(generateResponse);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public DiscoveryResponse generateResponse(Map<String, ResourceOuterClass.Resource> map) {
        ArrayList arrayList = new ArrayList();
        Iterator<ResourceOuterClass.Resource> it = map.values().iterator();
        while (it.hasNext()) {
            arrayList.add(Any.newBuilder().setValue(it.next().toByteString()).setTypeUrl(MCP_RESOURCES_URL).m787build());
        }
        return DiscoveryResponse.newBuilder().addAllResources(arrayList).setNonce(String.valueOf(System.currentTimeMillis())).setTypeUrl(SERVICEENTY_TYPE).build();
    }

    public StreamObserver<DiscoveryRequest> streamAggregatedResources(final StreamObserver<DiscoveryResponse> streamObserver) {
        final int incrementAndGet = this.connectIdGenerator.incrementAndGet();
        this.connnections.put(Integer.valueOf(incrementAndGet), streamObserver);
        return new StreamObserver<DiscoveryRequest>() { // from class: com.alibaba.nacos.istio.mcp.NacosMcpOverXdsService.1
            private final int connectionId;

            {
                this.connectionId = incrementAndGet;
            }

            public void onNext(DiscoveryRequest discoveryRequest) {
                Loggers.MAIN.info("receiving request,  {}", discoveryRequest.toString());
                if (discoveryRequest.getErrorDetail() != null && discoveryRequest.getErrorDetail().getCode() != 0) {
                    Loggers.MAIN.error("NACK error code: {}, message: {}", Integer.valueOf(discoveryRequest.getErrorDetail().getCode()), discoveryRequest.getErrorDetail().getMessage());
                    return;
                }
                if (NacosMcpOverXdsService.SERVICEENTY_TYPE.equals(discoveryRequest.getTypeUrl())) {
                    Boolean bool = (Boolean) NacosMcpOverXdsService.this.connectionInited.get(Integer.valueOf(incrementAndGet));
                    if (bool == null || !bool.booleanValue()) {
                        NacosMcpOverXdsService.this.connectionInited.put(Integer.valueOf(incrementAndGet), true);
                        if (NacosMcpOverXdsService.this.resourceMapCache != null) {
                            DiscoveryResponse generateResponse = NacosMcpOverXdsService.this.generateResponse(NacosMcpOverXdsService.this.resourceMapCache);
                            Loggers.MAIN.info("ACK for serviceEntry discoveryRequest {}", discoveryRequest.toString());
                            streamObserver.onNext(generateResponse);
                        }
                    }
                }
            }

            public void onError(Throwable th) {
                Loggers.MAIN.error("stream error.", th);
                NacosMcpOverXdsService.this.connnections.remove(Integer.valueOf(this.connectionId));
            }

            public void onCompleted() {
                streamObserver.onCompleted();
            }
        };
    }
}
