package co.cask.cdap.common.guice;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.conf.KafkaConstants;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import java.util.concurrent.TimeUnit;
import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
import org.apache.twill.kafka.client.Compression;
import org.apache.twill.kafka.client.KafkaClient;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.apache.twill.kafka.client.KafkaPublisher;
import org.apache.twill.zookeeper.RetryStrategies;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKClientServices;
import org.apache.twill.zookeeper.ZKClients;

/* loaded from: input_file:lib/cdap-common-4.0.0.jar:co/cask/cdap/common/guice/KafkaClientModule.class */
public class KafkaClientModule extends AbstractModule {

    /* loaded from: input_file:lib/cdap-common-4.0.0.jar:co/cask/cdap/common/guice/KafkaClientModule$DefaultKafkaClientService.class */
    private static final class DefaultKafkaClientService extends AbstractIdleService implements KafkaClientService {
        private final ZKClientService zkClientService;
        private final KafkaClientService delegate;

        DefaultKafkaClientService(CConfiguration cConfiguration) {
            this.zkClientService = ZKClientServices.delegate(ZKClients.reWatchOnExpire(ZKClients.retryOnFailure(ZKClientService.Builder.of(cConfiguration.get(KafkaConstants.ConfigKeys.ZOOKEEPER_QUORUM)).setSessionTimeout(cConfiguration.getInt(Constants.Zookeeper.CFG_SESSION_TIMEOUT_MILLIS, 40000)).build(), RetryStrategies.exponentialDelay(500L, 2000L, TimeUnit.MILLISECONDS))));
            this.delegate = new ZKKafkaClientService(this.zkClientService);
        }

        @Override // com.google.common.util.concurrent.AbstractIdleService
        protected void startUp() throws Exception {
            this.zkClientService.startAndWait();
            try {
                this.delegate.startAndWait();
            } catch (Exception e) {
                try {
                    this.zkClientService.stopAndWait();
                } catch (Exception e2) {
                    e.addSuppressed(e2);
                }
                throw e;
            }
        }

        @Override // com.google.common.util.concurrent.AbstractIdleService
        protected void shutDown() throws Exception {
            try {
                this.delegate.stopAndWait();
                this.zkClientService.stopAndWait();
            } catch (Exception e) {
                try {
                    this.zkClientService.stopAndWait();
                } catch (Exception e2) {
                    e.addSuppressed(e2);
                }
                throw e;
            }
        }

        public KafkaPublisher getPublisher(KafkaPublisher.Ack ack, Compression compression) {
            return this.delegate.getPublisher(ack, compression);
        }

        public KafkaConsumer getConsumer() {
            return this.delegate.getConsumer();
        }
    }

    @Override // com.google.inject.AbstractModule
    protected void configure() {
        bind(KafkaClient.class).to(KafkaClientService.class);
    }

    @Singleton
    @Provides
    private KafkaClientService providesKafkaClientService(CConfiguration cConfiguration, ZKClient zKClient) {
        if (cConfiguration.get(KafkaConstants.ConfigKeys.ZOOKEEPER_QUORUM) != null) {
            return new DefaultKafkaClientService(cConfiguration);
        }
        String str = cConfiguration.get(KafkaConstants.ConfigKeys.ZOOKEEPER_NAMESPACE_CONFIG);
        return new ZKKafkaClientService(str == null ? zKClient : ZKClients.namespace(zKClient, "/" + str));
    }
}
