package org.apache.kafka.streams.scala;

import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.scala.StreamToTableJoinTestData;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.scalatest.junit.JUnitSuite;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.SeqLike;
import scala.math.Ordering$String$;
import scala.reflect.ScalaSignature;

/* compiled from: StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015b\u0001B\u0001\u0003\u00015\u00111g\u0015;sK\u0006lGk\u001c+bE2,'j\\5o'\u000e\fG.Y%oi\u0016<'/\u0019;j_:$Vm\u001d;J[Bd\u0017nY5u'\u0016\u0014H-Z:\u000b\u0005\r!\u0011!B:dC2\f'BA\u0003\u0007\u0003\u001d\u0019HO]3b[NT!a\u0002\u0005\u0002\u000b-\fgm[1\u000b\u0005%Q\u0011AB1qC\u000eDWMC\u0001\f\u0003\ry'oZ\u0002\u0001'\r\u0001aB\u0006\t\u0003\u001fQi\u0011\u0001\u0005\u0006\u0003#I\tQA[;oSRT!a\u0005\u0006\u0002\u0013M\u001c\u0017\r\\1uKN$\u0018BA\u000b\u0011\u0005)QUK\\5u'VLG/\u001a\t\u0003/ai\u0011AA\u0005\u00033\t\u0011\u0011d\u0015;sK\u0006lGk\u001c+bE2,'j\\5o)\u0016\u001cH\u000fR1uC\")1\u0004\u0001C\u00019\u00051A(\u001b8jiz\"\u0012!\b\t\u0003/\u0001Aqa\b\u0001C\u0002\u0013%\u0001%\u0001\bqe&4\u0018\r^3DYV\u001cH/\u001a:\u0016\u0003\u0005\u0002\"AI\u0014\u000e\u0003\rR!\u0001J\u0013\u0002\u000bU$\u0018\u000e\\:\u000b\u0005\u0019\"\u0011aC5oi\u0016<'/\u0019;j_:L!\u0001K\u0012\u0003)\u0015k'-\u001a3eK\u0012\\\u0015MZ6b\u00072,8\u000f^3s\u0011\u0019Q\u0003\u0001)A\u0005C\u0005y\u0001O]5wCR,7\t\\;ti\u0016\u0014\b\u0005C\u0003-\u0001\u0011\u0005\u0001%A\u0004dYV\u001cH/\u001a:)\u0005-r\u0003CA\u00182\u001b\u0005\u0001$BA\t\u000b\u0013\t\u0011\u0004G\u0001\u0003Sk2,\u0007b\u0002\u001b\u0001\u0005\u0004%)!N\u0001\fC2LwM\\3e)&lW-F\u00017!\t9\u0014(D\u00019\u0015\u0005\u0019\u0011B\u0001\u001e9\u0005\u0011auN\\4\t\rq\u0002\u0001\u0015!\u00047\u00031\tG.[4oK\u0012$\u0016.\\3!\u0011\u001dq\u0004A1A\u0005\u0002}\n\u0001\"\\8dWRKW.Z\u000b\u0002\u0001B\u0011\u0011)R\u0007\u0002\u0005*\u0011Ae\u0011\u0006\u0003\t\u001a\taaY8n[>t\u0017B\u0001$C\u0005!iunY6US6,\u0007B\u0002%\u0001A\u0003%\u0001)A\u0005n_\u000e\\G+[7fA!9!\n\u0001b\u0001\n\u0003Y\u0015a\u0002;G_2$WM]\u000b\u0002\u0019B\u0011Q\nU\u0007\u0002\u001d*\u0011q\nM\u0001\u0006eVdWm]\u0005\u0003#:\u0013q\u0002V3na>\u0014\u0018M]=G_2$WM\u001d\u0005\u0007'\u0002\u0001\u000b\u0011\u0002'\u0002\u0011Q4u\u000e\u001c3fe\u0002BQ!\u0016\u0001\u0005\u0002-\u000b!\u0002^3ti\u001a{G\u000eZ
3sQ\t!f\u0006C\u0003Y\u0001\u0011\u0005\u0011,A\tti\u0006\u0014HoS1gW\u0006\u001cE.^:uKJ$\u0012A\u0017\t\u0003omK!\u0001\u0018\u001d\u0003\tUs\u0017\u000e\u001e\u0015\u0003/z\u0003\"aL0\n\u0005\u0001\u0004$A\u0002\"fM>\u0014X\rC\u0003c\u0001\u0011\u0005\u0011,\u0001\u0010uKN$8\u000b[8vY\u0012\u001cu.\u001e8u\u00072L7m[:QKJ\u0014VmZ5p]\"\u0012\u0011\r\u001a\t\u0003_\u0015L!A\u001a\u0019\u0003\tQ+7\u000f\u001e\u0005\u0006Q\u0002!\t!W\u0001#i\u0016\u001cHo\u00155pk2$7i\\;oi\u000ec\u0017nY6t!\u0016\u0014(+Z4j_:T\u0015M^1)\u0005\u001d$\u0007\"B6\u0001\t\u0013a\u0017aF4fiN#(/Z1ng\u000e{gNZ5hkJ\fG/[8o)\u0005i\u0007C\u00018t\u001b\u0005y'B\u00019r\u0003\u0011)H/\u001b7\u000b\u0003I\fAA[1wC&\u0011Ao\u001c\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\b\"\u0002<\u0001\t\u0013a\u0017\u0001H4fiV\u001bXM\u001d*fO&|gn\u001d)s_\u0012,8-\u001a:D_:4\u0017n\u001a\u0005\u0006q\u0002!I\u0001\\\u0001\u001cO\u0016$Xk]3s\u00072L7m[:Qe>$WoY3s\u0007>tg-[4\t\u000bi\u0004A\u0011\u00027\u0002#\u001d,GoQ8ogVlWM]\"p]\u001aLw\rC\u0003}\u0001\u0011%Q0A\bqe>$WoY3O\u0007>t7/^7f)\u001dq\u0018\u0011DA\u000f\u0003C\u0001BA\\@\u0002\u0004%\u0019\u0011\u0011A8\u0003\t1K7\u000f\u001e\t\b\u0003\u000b\t9!a\u00037\u001b\u0005!\u0011bAA\u0005\t\tA1*Z=WC2,X\r\u0005\u0003\u0002\u000e\u0005MabA\u001c\u0002\u0010%\u0019\u0011\u0011\u0003\u001d\u0002\rA\u0013X\rZ3g\u0013\u0011\t)\"a\u0006\u0003\rM#(/\u001b8h\u0015\r\t\t\u0002\u000f\u0005\b\u00037Y\b\u0019AA\u0006\u0003=)8/\u001a:DY&\u001c7n\u001d+pa&\u001c\u0007bBA\u0010w\u0002\u0007\u00111B\u0001\u0011kN,'OU3hS>t7\u000fV8qS\u000eDq!a\t|\u0001\u0004\tY!A\u0006pkR\u0004X\u000f\u001e+pa&\u001c\u0007")
/* loaded from: input_file:org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.class */
/**
 * Integration test that joins a stream of user clicks against a table of user regions and
 * checks the resulting click count per region.
 *
 * <p>The same scenario is exercised twice: once through the Scala DSL with serdes supplied by
 * {@code ImplicitConversions$} / {@code Serdes$}, and once through the plain Java DSL. Topic
 * names and fixture records are supplied by {@link StreamToTableJoinTestData}.
 */
public class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite implements StreamToTableJoinTestData {
    private final EmbeddedKafkaCluster privateCluster;
    private final long alignedTime;
    private final MockTime mockTime;
    private final TemporaryFolder tFolder;

    // The fields below back the values declared by StreamToTableJoinTestData. They are written
    // through the generated "_setter_$..._$eq" methods rather than directly in the constructor,
    // so they must not be declared final (Java forbids assigning a final field from a method).
    private String brokers;
    private String userClicksTopic;
    private String userRegionsTopic;
    private String outputTopic;
    private String userClicksTopicJ;
    private String userRegionsTopicJ;
    private String outputTopicJ;
    private Seq<KeyValue<String, Object>> userClicks;
    private Seq<KeyValue<String, String>> userRegions;
    private Seq<KeyValue<String, Object>> expectedClicksPerRegion;

    /**
     * Creates the embedded cluster and temporary folder used as JUnit rules and sets the
     * cluster's mock clock to the start of the next whole second.
     */
    public StreamToTableJoinScalaIntegrationTestImplicitSerdes() {
        // Trait initializer; presumably populates the fixture fields through the
        // "_setter_$..._$eq" methods below (its body is not part of this file).
        StreamToTableJoinTestData.Cclass.$init$(this);
        this.privateCluster = new EmbeddedKafkaCluster(1);
        // Truncate "now" to whole seconds and move one second forward.
        this.alignedTime = ((System.currentTimeMillis() / 1000) + 1) * 1000;
        // Read the fields directly instead of going through overridable accessors while the
        // object is still under construction.
        this.mockTime = this.privateCluster.time;
        this.mockTime.setCurrentTimeMs(this.alignedTime);
        this.tFolder = new TemporaryFolder(TestUtils.tempDirectory());
    }

    /** @return the value stored by the {@code brokers} trait setter */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public String brokers() {
        return this.brokers;
    }

    /** @return the user-clicks input topic used by the Scala DSL test */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public String userClicksTopic() {
        return this.userClicksTopic;
    }

    /** @return the user-regions input topic used by the Scala DSL test */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public String userRegionsTopic() {
        return this.userRegionsTopic;
    }

    /** @return the output topic used by the Scala DSL test */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public String outputTopic() {
        return this.outputTopic;
    }

    /** @return the user-clicks input topic used by the Java DSL test */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public String userClicksTopicJ() {
        return this.userClicksTopicJ;
    }

    /** @return the user-regions input topic used by the Java DSL test */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public String userRegionsTopicJ() {
        return this.userRegionsTopicJ;
    }

    /** @return the output topic used by the Java DSL test */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public String outputTopicJ() {
        return this.outputTopicJ;
    }

    /** @return the click records that {@link #produceNConsume} writes to the clicks topic */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public Seq<KeyValue<String, Object>> userClicks() {
        return this.userClicks;
    }

    /** @return the region records that {@link #produceNConsume} writes to the regions topic */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public Seq<KeyValue<String, String>> userRegions() {
        return this.userRegions;
    }

    /** @return the records both tests expect to read back from their output topic */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public Seq<KeyValue<String, Object>> expectedClicksPerRegion() {
        return this.expectedClicksPerRegion;
    }

    /** Trait setter backing {@link #brokers()}. */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public void org$apache$kafka$streams$scala$StreamToTableJoinTestData$_setter_$brokers_$eq(String str) {
        this.brokers = str;
    }

    /** Trait setter backing {@link #userClicksTopic()}. */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public void org$apache$kafka$streams$scala$StreamToTableJoinTestData$_setter_$userClicksTopic_$eq(String str) {
        this.userClicksTopic = str;
    }

    /** Trait setter backing {@link #userRegionsTopic()}. */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public void org$apache$kafka$streams$scala$StreamToTableJoinTestData$_setter_$userRegionsTopic_$eq(String str) {
        this.userRegionsTopic = str;
    }

    /** Trait setter backing {@link #outputTopic()}. */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public void org$apache$kafka$streams$scala$StreamToTableJoinTestData$_setter_$outputTopic_$eq(String str) {
        this.outputTopic = str;
    }

    /** Trait setter backing {@link #userClicksTopicJ()}. */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public void org$apache$kafka$streams$scala$StreamToTableJoinTestData$_setter_$userClicksTopicJ_$eq(String str) {
        this.userClicksTopicJ = str;
    }

    /** Trait setter backing {@link #userRegionsTopicJ()}. */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public void org$apache$kafka$streams$scala$StreamToTableJoinTestData$_setter_$userRegionsTopicJ_$eq(String str) {
        this.userRegionsTopicJ = str;
    }

    /** Trait setter backing {@link #outputTopicJ()}. */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public void org$apache$kafka$streams$scala$StreamToTableJoinTestData$_setter_$outputTopicJ_$eq(String str) {
        this.outputTopicJ = str;
    }

    /** Trait setter backing {@link #userClicks()}. */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public void org$apache$kafka$streams$scala$StreamToTableJoinTestData$_setter_$userClicks_$eq(Seq seq) {
        this.userClicks = seq;
    }

    /** Trait setter backing {@link #userRegions()}. */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public void org$apache$kafka$streams$scala$StreamToTableJoinTestData$_setter_$userRegions_$eq(Seq seq) {
        this.userRegions = seq;
    }

    /** Trait setter backing {@link #expectedClicksPerRegion()}. */
    @Override // org.apache.kafka.streams.scala.StreamToTableJoinTestData
    public void org$apache$kafka$streams$scala$StreamToTableJoinTestData$_setter_$expectedClicksPerRegion_$eq(Seq seq) {
        this.expectedClicksPerRegion = seq;
    }

    private EmbeddedKafkaCluster privateCluster() {
        return this.privateCluster;
    }

    /** @return the single-broker embedded cluster, exposed as a JUnit rule */
    @Rule
    public EmbeddedKafkaCluster cluster() {
        return privateCluster();
    }

    /** @return the timestamp (ms) the mock clock was set to in the constructor */
    public final long alignedTime() {
        return this.alignedTime;
    }

    /** @return the embedded cluster's clock, as captured in the constructor */
    public MockTime mockTime() {
        return this.mockTime;
    }

    /** @return the temporary folder backing {@link #testFolder()} */
    public TemporaryFolder tFolder() {
        return this.tFolder;
    }

    /** @return the temporary folder, exposed as a JUnit rule; its root is used as the state dir */
    @Rule
    public TemporaryFolder testFolder() {
        return tFolder();
    }

    /** Creates the three Scala-test topics and the three Java-test topics before each test. */
    @Before
    public void startKafkaCluster() {
        cluster().createTopic(userClicksTopic());
        cluster().createTopic(userRegionsTopic());
        cluster().createTopic(outputTopic());
        cluster().createTopic(userClicksTopicJ());
        cluster().createTopic(userRegionsTopicJ());
        cluster().createTopic(outputTopicJ());
    }

    /**
     * Builds the click-count topology with the Scala DSL (serdes passed via
     * {@code ImplicitConversions$}), runs it, and compares the key-sorted output with
     * {@link #expectedClicksPerRegion()}.
     */
    @Test
    public void testShouldCountClicksPerRegion() {
        Properties streamsConfiguration = getStreamsConfiguration();
        StreamsBuilder streamsBuilder = new StreamsBuilder(StreamsBuilder$.MODULE$.$lessinit$greater$default$1());
        // clicks stream -> left join with regions table -> re-key -> group -> reduce -> output topic
        streamsBuilder.stream(userClicksTopic(), ImplicitConversions$.MODULE$.consumedFromSerde(Serdes$.MODULE$.String(), Serdes$.MODULE$.Long()))
            .leftJoin(streamsBuilder.table(userRegionsTopic(), ImplicitConversions$.MODULE$.consumedFromSerde(Serdes$.MODULE$.String(), Serdes$.MODULE$.String())), new StreamToTableJoinScalaIntegrationTestImplicitSerdes$$anonfun$2(this), ImplicitConversions$.MODULE$.joinedFromKeyValueOtherSerde(Serdes$.MODULE$.String(), Serdes$.MODULE$.Long(), Serdes$.MODULE$.String()))
            .map(new StreamToTableJoinScalaIntegrationTestImplicitSerdes$$anonfun$3(this))
            .groupByKey(ImplicitConversions$.MODULE$.serializedFromSerde(Serdes$.MODULE$.String(), Serdes$.MODULE$.Long()))
            .reduce(new StreamToTableJoinScalaIntegrationTestImplicitSerdes$$anonfun$1(this), ImplicitConversions$.MODULE$.materializedFromSerde(Serdes$.MODULE$.String(), Serdes$.MODULE$.Long()))
            .toStream()
            .to(outputTopic(), ImplicitConversions$.MODULE$.producedFromSerde(Serdes$.MODULE$.String(), Serdes$.MODULE$.Long()));
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamsConfiguration);
        List<KeyValue<String, Object>> produceNConsume = runAndCollect(kafkaStreams, userClicksTopic(), userRegionsTopic(), outputTopic());
        Object actualSorted = ((SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(produceNConsume).asScala()).sortBy(new StreamToTableJoinScalaIntegrationTestImplicitSerdes$$anonfun$testShouldCountClicksPerRegion$1(this), Ordering$String$.MODULE$);
        Object expectedSorted = expectedClicksPerRegion().sortBy(new StreamToTableJoinScalaIntegrationTestImplicitSerdes$$anonfun$testShouldCountClicksPerRegion$2(this), Ordering$String$.MODULE$);
        // JUnit's contract is assertEquals(expected, actual); keeping that order makes the
        // failure message report the right side as "expected".
        Assert.assertEquals(expectedSorted, actualSorted);
    }

    /**
     * Builds the same click-count topology with the Java DSL and explicit serdes, runs it
     * against the "J" topics, and compares the key-sorted output with
     * {@link #expectedClicksPerRegion()}.
     */
    @Test
    public void testShouldCountClicksPerRegionJava() {
        Properties streamsConfiguration = getStreamsConfiguration();
        streamsConfiguration.put("default.key.serde", Serdes$.MODULE$.String().getClass().getName());
        streamsConfiguration.put("default.value.serde", Serdes$.MODULE$.String().getClass().getName());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(userClicksTopicJ(), Consumed.with(Serdes$.MODULE$.String(), Serdes$.MODULE$.JavaLong())).leftJoin(streamsBuilder.table(userRegionsTopicJ(), Consumed.with(Serdes$.MODULE$.String(), Serdes$.MODULE$.String())), new ValueJoiner<Long, String, Tuple2<String, Long>>() {
            // Pairs the click count with the user's region; users without a region row
            // (left join miss) are attributed to "UNKNOWN".
            @Override
            public Tuple2<String, Long> apply(Long l, String str) {
                return new Tuple2<>(str == null ? "UNKNOWN" : str, l);
            }
        }, Joined.with(Serdes$.MODULE$.String(), Serdes$.MODULE$.JavaLong(), Serdes$.MODULE$.String())).map(new KeyValueMapper<String, Tuple2<String, Long>, KeyValue<String, Long>>() {
            // Re-keys each record by region, dropping the original user key.
            @Override
            public KeyValue<String, Long> apply(String str, Tuple2<String, Long> tuple2) {
                return new KeyValue<>(tuple2._1(), tuple2._2());
            }
        }).groupByKey(Serialized.with(Serdes$.MODULE$.String(), Serdes$.MODULE$.JavaLong())).reduce(new Reducer<Long>() {
            // Sums the click counts per region.
            @Override
            public Long apply(Long l, Long l2) {
                return l + l2;
            }
        }).toStream().to(outputTopicJ(), Produced.with(Serdes$.MODULE$.String(), Serdes$.MODULE$.JavaLong()));
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamsConfiguration);
        List<KeyValue<String, Object>> produceNConsume = runAndCollect(kafkaStreams, userClicksTopicJ(), userRegionsTopicJ(), outputTopicJ());
        Object actualSorted = ((SeqLike) JavaConverters$.MODULE$.asScalaBufferConverter(produceNConsume).asScala()).sortBy(new StreamToTableJoinScalaIntegrationTestImplicitSerdes$$anonfun$testShouldCountClicksPerRegionJava$1(this), Ordering$String$.MODULE$);
        Object expectedSorted = expectedClicksPerRegion().sortBy(new StreamToTableJoinScalaIntegrationTestImplicitSerdes$$anonfun$testShouldCountClicksPerRegionJava$2(this), Ordering$String$.MODULE$);
        // JUnit's contract is assertEquals(expected, actual).
        Assert.assertEquals(expectedSorted, actualSorted);
    }

    /**
     * Starts the given streams instance, feeds the fixture data through it and returns the
     * records read from the result topic. The instance is closed even when producing or
     * consuming fails, so a failing test does not leave it running.
     */
    private List<KeyValue<String, Object>> runAndCollect(KafkaStreams kafkaStreams, String clicksTopic, String regionsTopic, String resultTopic) {
        kafkaStreams.start();
        try {
            return produceNConsume(clicksTopic, regionsTopic, resultTopic);
        } finally {
            kafkaStreams.close();
        }
    }

    /** Builds the Kafka Streams configuration shared by both tests. */
    private Properties getStreamsConfiguration() {
        Properties properties = new Properties();
        properties.put("application.id", "stream-table-join-scala-integration-test");
        properties.put("bootstrap.servers", cluster().bootstrapServers());
        properties.put("commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("state.dir", testFolder().getRoot().getPath());
        return properties;
    }

    /** Builds a producer configuration with a String key serializer and the given value serializer. */
    private Properties producerConfig(Class<?> valueSerializer) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", cluster().bootstrapServers());
        properties.put("acks", "all");
        properties.put("retries", "0");
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", valueSerializer);
        return properties;
    }

    /** Producer configuration for the regions topic (String keys, String values). */
    private Properties getUserRegionsProducerConfig() {
        return producerConfig(StringSerializer.class);
    }

    /** Producer configuration for the clicks topic (String keys, Long values). */
    private Properties getUserClicksProducerConfig() {
        return producerConfig(LongSerializer.class);
    }

    /** Consumer configuration used to read the output topic (String keys, Long values). */
    private Properties getConsumerConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", cluster().bootstrapServers());
        properties.put("group.id", "join-scala-integration-test-standard-consumer");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", LongDeserializer.class);
        return properties;
    }

    /**
     * Publishes the region fixtures to {@code str2}, then the click fixtures to {@code str},
     * and waits until at least {@code expectedClicksPerRegion().size()} records have been read
     * from {@code str3}.
     *
     * @param str  clicks topic
     * @param str2 regions topic
     * @param str3 output topic to consume from
     * @return the records received from the output topic
     */
    private List<KeyValue<String, Object>> produceNConsume(String str, String str2, String str3) {
        // Regions are produced before clicks, as in the original ordering.
        IntegrationTestUtils.produceKeyValuesSynchronously(str2, (Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(userRegions()).asJava(), getUserRegionsProducerConfig(), mockTime(), false);
        IntegrationTestUtils.produceKeyValuesSynchronously(str, (Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(userClicks()).asJava(), getUserClicksProducerConfig(), mockTime(), false);
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerConfig(), str3, expectedClicksPerRegion().size());
    }
}
