package org.locationtech.geomesa.kafka.streams;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.scala.ImplicitConversions$;
import org.apache.kafka.streams.scala.kstream.Materialized$;
import org.apache.kafka.streams.scala.serialization.Serdes$;
import org.apache.kafka.streams.test.TestRecord;
import org.geotools.api.data.DataStoreFinder;
import org.geotools.api.data.FeatureWriter;
import org.geotools.api.data.Query;
import org.geotools.api.data.Transaction;
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.junit.runner.RunWith;
import org.locationtech.geomesa.features.ScalaSimpleFeature;
import org.locationtech.geomesa.features.ScalaSimpleFeature$;
import org.locationtech.geomesa.kafka.KafkaContainerTest;
import org.locationtech.geomesa.kafka.data.KafkaDataStore;
import org.locationtech.geomesa.kafka.data.KafkaDataStore$;
import org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilderTest;
import org.locationtech.geomesa.utils.collection.SelfClosingIterator$;
import org.locationtech.geomesa.utils.geotools.FeatureUtils$;
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes$;
import org.locationtech.geomesa.utils.geotools.converters.FastConverter$;
import org.locationtech.geomesa.utils.io.IsCloseable$;
import org.locationtech.geomesa.utils.io.package$WithClose$;
import org.specs2.data.Sized$;
import org.specs2.matcher.MatchResult;
import org.specs2.matcher.MatchResult$;
import org.specs2.runner.JUnitRunner;
import org.specs2.specification.core.AsExecution$;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.IndexedSeqOptimized;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Iterable;
import scala.collection.mutable.Iterable$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: GeoMesaStreamsBuilderTest.scala */
@RunWith(JUnitRunner.class)
@ScalaSignature(bytes = "\u0006\u0001\u0005=d\u0001B\t\u0013\u0001uAQA\t\u0001\u0005\u0002\rB\u0001B\n\u0001\t\u0006\u0004%\ta\n\u0005\ti\u0001A)\u0019!C\u0001k!91\t\u0001b\u0001\n\u0013!\u0005B\u0002-\u0001A\u0003%Q\tC\u0003Z\u0001\u0011\u0005!L\u0002\u0003a\u0001\u0001\t\u0007\"\u0002\u0012\b\t\u0003I\b\"\u0003?\b\u0001\u0004\u0005\r\u0011\"\u0003~\u0011-\tIa\u0002a\u0001\u0002\u0004%I!a\u0003\t\u0015\u0005eq\u00011A\u0001B\u0003&a\u0010C\u0005\u0002\u001c\u001d\u0011\r\u0011\"\u0001\u0002\u001e!A\u0011QG\u0004!\u0002\u0013\ty\u0002C\u0004\u00028\u001d!\t%!\u000f\t\u000f\u0005ur\u0001\"\u0011\u0002@!9\u0011\u0011J\u0004\u0005B\u0005-#!G$f_6+7/Y*ue\u0016\fWn\u001d\"vS2$WM\u001d+fgRT!a\u0005\u000b\u0002\u000fM$(/Z1ng*\u0011QCF\u0001\u0006W\u000647.\u0019\u0006\u0003/a\tqaZ3p[\u0016\u001c\u0018M\u0003\u0002\u001a5\u0005aAn\\2bi&|g\u000e^3dQ*\t1$A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001=A\u0011q\u0004I\u0007\u0002)%\u0011\u0011\u0005\u0006\u0002\u0013\u0017\u000647.Y\"p]R\f\u0017N\\3s)\u0016\u001cH/\u0001\u0004=S:LGO\u0010\u000b\u0002IA\u0011Q\u0005A\u0007\u0002%\u0005\u00191O\u001a;\u0016\u0003!\u0002\"!\u000b\u001a\u000e\u0003)R!a\u000b\u0017\u0002\rMLW\u000e\u001d7f\u0015\tic&A\u0004gK\u0006$XO]3\u000b\u0005=\u0002\u0014aA1qS*\u0011\u0011GG\u0001\tO\u0016|Go\\8mg&\u00111G\u000b\u0002\u0012'&l\u0007\u000f\\3GK\u0006$XO]3UsB,\u0017\u0001\u00034fCR,(/Z:\u0016\u0003Y\u00022a\u000e\u001f?\u001b\u0005A$BA\u001d;\u0003)\u0019w\u000e\u001c7fGRLwN\u001c\u0006\u0002w\u0005)1oY1mC&\u0011Q\b\u000f\u0002\u0004'\u0016\f\bCA B\u001b\u0005\u0001%B\u0001\u001b\u0017\u0013\t\u0011\u0005I\u0001\nTG\u0006d\u0017mU5na2,g)Z1ukJ,\u0017a\u0002>l!\u0006$\bn]\u000b\u0002\u000bB\u0019aiS'\u000e\u0003\u001dS!\u0001S%\u0002\tU$\u0018\u000e\u001c\u0006\u0002\u0015\u0006!!.\u0019<b\u0013\tauIA\u0002TKR\u0004\"AT+\u000f\u0005=\u001b\u0006C\u0001);\u001b\u0005\t&B\u0001*\u001d\u0003\u0019a$o\\8u}%\u0011AKO\u0001\u0007!J,G-\u001a4\n\u0005Y;&AB*ue&twM\u0003\u0002Uu\u0005A!p\u001b)bi\"\u001c\b%A\u0005hKR\u0004\u0016M]1ngR\u00111L\u0018\t\u0005\u001drkU*\u0003\u0002^/\n\u0019Q*\u00199\t\u000b}3\u0001\u0019A'\u0002\ri\\\u0007+\u0019;i\u0005y!\u0016.\\3ti\u0006l\u0007/\u0012=ue\u0006\u001cG/\u001b8h)J\fgn\u001d4pe6,'oE\u0002\bE\"\u0004\"a\u00194\u000e\u0003\u0011T!!Z%\u0002\t1\fgnZ\u0005\u0003O\u0012\u0014aa\u00142kK\u000e$\b#B5q\u001bJ,X\"\u00016\u000b\u0005-d\u0017aB6tiJ,\u0017-\u001c\u0006\u0003'5T!!\u00068\u000b\u0005=T\u0012AB1qC\u000eDW-\u0003\u0002rU\nYAK]1og\u001a|'/\\3s!\t)3/\u0003\u0002u%\tqq)Z8NKN\fW*Z:tC\u001e,\u0007\u0003\u0002<x\u001bJl\u0011\u0001\\\u0005\u0003q2\u0014\u0001bS3z-\u0006dW/\u001a\u000b\u0002uB\u00111pB\u0007\u0002\u0001\u000591m\u001c8uKb$X#\u0001@\u0011\u0007}\f)!\u0004\u0002\u0002\u0002)\u0019\u00111\u00017\u0002\u0013A\u0014xnY3tg>\u0014\u0018\u0002BA\u0004\u0003\u0003\u0011\u0001\u0003\u0015:pG\u0016\u001c8o\u001c:D_:$X\r\u001f;\u0002\u0017\r|g\u000e^3yi~#S-\u001d\u000b\u0005\u0003\u001b\t)\u0002\u0005\u0003\u0002\u0010\u0005EQ\"\u0001\u001e\n\u0007\u0005M!H\u0001\u0003V]&$\b\u0002CA\f\u0015\u0005\u0005\t\u0019\u0001@\u0002\u0007a$\u0013'\u0001\u0005d_:$X\r\u001f;!\u0003)!\u0018.\\3ti\u0006l\u0007o]\u000b\u0003\u0003?\u0001r!!\t\u0002(5\u000bI#\u0004\u0002\u0002$)\u0019\u0011Q\u0005\u001d\u0002\u000f5,H/\u00192mK&\u0019Q,a\t\u0011\r\u0005\u0005\u00121FA\u0018\u0013\u0011\ti#a\t\u0003\u0017\u0005\u0013(/Y=Ck\u001a4WM\u001d\t\u0005\u0003\u001f\t\t$C\u0002\u00024i\u0012A\u0001T8oO\u0006YA/[7fgR\fW\u000e]:!\u0003\u0011Ig.\u001b;\u0015\t\u00055\u00111\b\u0005\u0006y:\u0001\rA`\u0001\niJ\fgn\u001d4pe6$R!^A!\u0003\u000bBa!a\u0011\u0010\u0001\u0004i\u0015aA6fs\"1\u0011qI\bA\u0002I\fQA^1mk\u0016\fQa\u00197pg\u0016$\"!!\u0004)\u000f\u0001\ty%a\u0012\u0002`A!\u0011\u0011KA.\u001b\t\t\u0019F\u0003\u0003\u0002V\u0005]\u0013A\u0002:v]:,'OC\u0002\u0002Zi\tQA[;oSRLA!!\u0018\u0002T\t9!+\u001e8XSRD7EAA1!\u0011\t\u0019'a\u001b\u000e\u0005\u0005\u0015$\u0002BA+\u0003OR1!!\u001b\u001b\u0003\u0019\u0019\b/Z2te%!\u0011QNA3\u0005-QUK\\5u%Vtg.\u001a:")
/* loaded from: input_file:org/locationtech/geomesa/kafka/streams/GeoMesaStreamsBuilderTest.class */
public class GeoMesaStreamsBuilderTest extends KafkaContainerTest {
    private SimpleFeatureType sft;
    private Seq<ScalaSimpleFeature> features;
    private final Set<String> zkPaths = Collections.newSetFromMap(new ConcurrentHashMap());
    private volatile byte bitmap$0;

    /* compiled from: GeoMesaStreamsBuilderTest.scala */
    /* loaded from: input_file:org/locationtech/geomesa/kafka/streams/GeoMesaStreamsBuilderTest$TimestampExtractingTransformer.class */
    public class TimestampExtractingTransformer implements Transformer<String, GeoMesaMessage, KeyValue<String, GeoMesaMessage>> {
        private ProcessorContext context;
        private final Map<String, ArrayBuffer<Object>> timestamps;
        public final /* synthetic */ GeoMesaStreamsBuilderTest $outer;

        private ProcessorContext context() {
            return this.context;
        }

        private void context_$eq(ProcessorContext processorContext) {
            this.context = processorContext;
        }

        public Map<String, ArrayBuffer<Object>> timestamps() {
            return this.timestamps;
        }

        public void init(ProcessorContext processorContext) {
            context_$eq(processorContext);
        }

        public KeyValue<String, GeoMesaMessage> transform(String str, GeoMesaMessage geoMesaMessage) {
            ((ArrayBuffer) timestamps().getOrElseUpdate(str, () -> {
                return ArrayBuffer$.MODULE$.empty();
            })).$plus$eq(BoxesRunTime.boxToLong(context().timestamp()));
            return new KeyValue<>(str, geoMesaMessage);
        }

        public void close() {
        }

        public /* synthetic */ GeoMesaStreamsBuilderTest org$locationtech$geomesa$kafka$streams$GeoMesaStreamsBuilderTest$TimestampExtractingTransformer$$$outer() {
            return this.$outer;
        }

        public TimestampExtractingTransformer(GeoMesaStreamsBuilderTest geoMesaStreamsBuilderTest) {
            if (geoMesaStreamsBuilderTest == null) {
                throw null;
            }
            this.$outer = geoMesaStreamsBuilderTest;
            this.timestamps = Map$.MODULE$.empty();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilderTest] */
    private SimpleFeatureType sft$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.sft = SimpleFeatureTypes$.MODULE$.createImmutableType("streams", "name:String,age:Int,dtg:Date,*geom:Point:srid=4326");
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.sft;
    }

    public SimpleFeatureType sft() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? sft$lzycompute() : this.sft;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilderTest] */
    private Seq<ScalaSimpleFeature> features$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.features = Seq$.MODULE$.tabulate(10, obj -> {
                    return $anonfun$features$1(this, BoxesRunTime.unboxToInt(obj));
                });
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.features;
    }

    public Seq<ScalaSimpleFeature> features() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? features$lzycompute() : this.features;
    }

    private Set<String> zkPaths() {
        return this.zkPaths;
    }

    public scala.collection.immutable.Map<String, String> getParams(String str) {
        Predef$.MODULE$.require(zkPaths().add(str), () -> {
            return new StringBuilder(55).append("zk path '").append(str).append("' is reused between tests, may cause conflicts").toString();
        });
        return Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.brokers"), brokers()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.zookeepers"), zookeepers()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.topic.partitions"), "1"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.topic.replication"), "1"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.consumer.read-back"), "Inf"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.zk.path"), str)}));
    }

    public static final /* synthetic */ ScalaSimpleFeature $anonfun$features$1(GeoMesaStreamsBuilderTest geoMesaStreamsBuilderTest, int i) {
        return ScalaSimpleFeature$.MODULE$.create(geoMesaStreamsBuilderTest.sft(), new StringBuilder(2).append("id").append(i).toString(), Predef$.MODULE$.genericWrapArray(new Object[]{new StringBuilder(4).append("name").append(i).toString(), BoxesRunTime.boxToInteger(i % 2), new StringBuilder(22).append("2022-04-27T00:00:0").append(i).append(".00Z").toString(), new StringBuilder(9).append("POINT(1 ").append(i).append(")").toString()}));
    }

    public static final /* synthetic */ void $anonfun$new$4(GeoMesaStreamsBuilderTest geoMesaStreamsBuilderTest, FeatureWriter featureWriter) {
        geoMesaStreamsBuilderTest.features().foreach(scalaSimpleFeature -> {
            return FeatureUtils$.MODULE$.write(featureWriter, scalaSimpleFeature, true);
        });
    }

    public static final /* synthetic */ void $anonfun$new$6(String str, ArrayBuffer arrayBuffer, KafkaConsumer kafkaConsumer) {
        kafkaConsumer.subscribe(Collections.singleton(str));
        long currentTimeMillis = System.currentTimeMillis();
        while (arrayBuffer.lengthCompare(10) < 0 && System.currentTimeMillis() - currentTimeMillis < 10000) {
            ((IterableLike) JavaConverters$.MODULE$.iterableAsScalaIterableConverter(kafkaConsumer.poll(Duration.ofMillis(100L))).asScala()).foreach(consumerRecord -> {
                return arrayBuffer.$plus$eq(consumerRecord);
            });
        }
    }

    public static final /* synthetic */ void $anonfun$new$13(TestInputTopic testInputTopic, ConsumerRecord consumerRecord) {
        testInputTopic.pipeInput(new TestRecord(consumerRecord));
    }

    public static final /* synthetic */ void $anonfun$new$12(String str, Seq seq, Map map, TopologyTestDriver topologyTestDriver) {
        TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(str, new ByteArraySerializer(), new ByteArraySerializer());
        seq.foreach(consumerRecord -> {
            $anonfun$new$13(createInputTopic, consumerRecord);
            return BoxedUnit.UNIT;
        });
        TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic("word-count", new StringDeserializer(), new LongDeserializer());
        while (!createOutputTopic.isEmpty()) {
            TestRecord readRecord = createOutputTopic.readRecord();
            map.$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(readRecord.key()), readRecord.value()));
        }
    }

    public static final /* synthetic */ long $anonfun$new$20(ScalaSimpleFeature scalaSimpleFeature) {
        return ((Date) scalaSimpleFeature.getAttribute("dtg")).getTime();
    }

    public static final /* synthetic */ long $anonfun$new$25(Tuple2 tuple2) {
        return BoxesRunTime.unboxToLong(((IndexedSeqOptimized) tuple2._2()).head());
    }

    public static final /* synthetic */ void $anonfun$new$33(TestInputTopic testInputTopic, ConsumerRecord consumerRecord) {
        testInputTopic.pipeInput(new TestRecord(consumerRecord));
    }

    public static final /* synthetic */ void $anonfun$new$32(Seq seq, String str, ArrayBuffer arrayBuffer, TopologyTestDriver topologyTestDriver) {
        TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("input-topic", new ByteArraySerializer(), new ByteArraySerializer());
        seq.foreach(consumerRecord -> {
            $anonfun$new$33(createInputTopic, consumerRecord);
            return BoxedUnit.UNIT;
        });
        TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic(str, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        while (!createOutputTopic.isEmpty()) {
            TestRecord readRecord = createOutputTopic.readRecord();
            arrayBuffer.$plus$eq(new ProducerRecord(str, Predef$.MODULE$.int2Integer(0), readRecord.timestamp(), readRecord.getKey(), readRecord.getValue(), readRecord.getHeaders()));
        }
    }

    public static final /* synthetic */ void $anonfun$new$35(ArrayBuffer arrayBuffer, Producer producer) {
        arrayBuffer.foreach(producerRecord -> {
            return producer.send(producerRecord);
        });
    }

    public GeoMesaStreamsBuilderTest() {
        blockExample("GeoMesaStreamsBuilder").should(() -> {
            this.blockExample("read from geomesa topics").in(() -> {
                scala.collection.immutable.Map<String, String> params = this.getParams("word/count");
                String str = (String) package$WithClose$.MODULE$.apply(DataStoreFinder.getDataStore((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(params).asJava()), dataStore -> {
                    dataStore.createSchema(this.sft());
                    package$WithClose$.MODULE$.apply(dataStore.getFeatureWriterAppend(this.sft().getTypeName(), Transaction.AUTO_COMMIT), featureWriter -> {
                        $anonfun$new$4(this, featureWriter);
                        return BoxedUnit.UNIT;
                    }, IsCloseable$.MODULE$.closeableIsCloseable());
                    return KafkaDataStore$.MODULE$.topic(dataStore.getSchema(this.sft().getTypeName()));
                }, IsCloseable$.MODULE$.dataStoreIsCloseable());
                Properties properties = new Properties();
                properties.put("bootstrap.servers", this.brokers());
                properties.put("auto.offset.reset", "earliest");
                properties.put("key.deserializer", ByteArrayDeserializer.class.getName());
                properties.put("value.deserializer", ByteArrayDeserializer.class.getName());
                properties.put("group.id", "consume-kryo-topic");
                ArrayBuffer empty = ArrayBuffer$.MODULE$.empty();
                package$WithClose$.MODULE$.apply(new KafkaConsumer(properties), kafkaConsumer -> {
                    $anonfun$new$6(str, empty, kafkaConsumer);
                    return BoxedUnit.UNIT;
                }, IsCloseable$.MODULE$.closeableIsCloseable());
                Seq seq = empty.toSeq();
                final TimestampExtractingTransformer timestampExtractingTransformer = new TimestampExtractingTransformer(this);
                GeoMesaStreamsBuilder apply = GeoMesaStreamsBuilder$.MODULE$.apply(params);
                final GeoMesaStreamsBuilderTest geoMesaStreamsBuilderTest = null;
                apply.stream(this.sft().getTypeName()).transform(new TransformerSupplier<String, GeoMesaMessage, KeyValue<String, GeoMesaMessage>>(geoMesaStreamsBuilderTest, timestampExtractingTransformer) { // from class: org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilderTest$$anon$1
                    private final GeoMesaStreamsBuilderTest.TimestampExtractingTransformer timestampExtractor$1;

                    /* renamed from: get, reason: merged with bridge method [inline-methods] */
                    public Transformer<String, GeoMesaMessage, KeyValue<String, GeoMesaMessage>> m3get() {
                        return this.timestampExtractor$1;
                    }

                    {
                        this.timestampExtractor$1 = timestampExtractingTransformer;
                    }
                }, Predef$.MODULE$.wrapRefArray(new String[0])).map((str2, geoMesaMessage) -> {
                    return new Tuple2(str2, ((TraversableOnce) geoMesaMessage.attributes().map(obj -> {
                        return obj.toString().replaceAll(" ", "_");
                    }, Seq$.MODULE$.canBuildFrom())).mkString(" "));
                }).flatMapValues(str3 -> {
                    return Predef$.MODULE$.wrapRefArray(str3.split(" +"));
                }).groupBy((str4, str5) -> {
                    return str5;
                }, ImplicitConversions$.MODULE$.groupedFromSerde(Serdes$.MODULE$.stringSerde(), Serdes$.MODULE$.stringSerde())).count(Materialized$.MODULE$.as("counts-store", Serdes$.MODULE$.stringSerde(), Serdes$.MODULE$.longSerde())).toStream().to("word-count", ImplicitConversions$.MODULE$.producedFromSerde(Serdes$.MODULE$.stringSerde(), Serdes$.MODULE$.longSerde()));
                Properties properties2 = new Properties();
                properties2.put("application.id", "word-count-test");
                properties2.put("bootstrap.servers", "dummy:1234");
                Map empty2 = Map$.MODULE$.empty();
                package$WithClose$.MODULE$.apply(new TopologyTestDriver(apply.build(), properties2), topologyTestDriver -> {
                    $anonfun$new$12(str, seq, empty2, topologyTestDriver);
                    return BoxedUnit.UNIT;
                }, IsCloseable$.MODULE$.closeableIsCloseable());
                scala.collection.immutable.Map map = (scala.collection.immutable.Map) ((TraversableLike) this.features().flatMap(scalaSimpleFeature -> {
                    return (Buffer) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(scalaSimpleFeature.getAttributes()).asScala()).map(obj -> {
                        return obj.toString().replaceAll(" ", "_");
                    }, Buffer$.MODULE$.canBuildFrom());
                }, Seq$.MODULE$.canBuildFrom())).groupBy(str6 -> {
                    return str6;
                }).map(tuple2 -> {
                    if (tuple2 == null) {
                        throw new MatchError(tuple2);
                    }
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((String) tuple2._1()), BoxesRunTime.boxToLong(((Seq) tuple2._2()).length()));
                }, scala.collection.immutable.Map$.MODULE$.canBuildFrom());
                this.theValue(() -> {
                    return empty2;
                }).mustEqual(() -> {
                    return map;
                });
                Seq seq2 = (Seq) this.features().map(scalaSimpleFeature2 -> {
                    return BoxesRunTime.boxToLong($anonfun$new$20(scalaSimpleFeature2));
                }, Seq$.MODULE$.canBuildFrom());
                this.foreach(timestampExtractingTransformer.timestamps(), tuple22 -> {
                    return this.theValue(() -> {
                        return (ArrayBuffer) tuple22._2();
                    }).must(() -> {
                        return this.haveLength(1, Sized$.MODULE$.scalaTraversableIsSized());
                    });
                }, MatchResult$.MODULE$.matchResultAsResult());
                return this.theValue(() -> {
                    return (Iterable) timestampExtractingTransformer.timestamps().map(tuple23 -> {
                        return BoxesRunTime.boxToLong($anonfun$new$25(tuple23));
                    }, Iterable$.MODULE$.canBuildFrom());
                }).must(() -> {
                    return this.containTheSameElementsAs(seq2, this.containTheSameElementsAs$default$2());
                });
            }, AsExecution$.MODULE$.resultAsExecution(MatchResult$.MODULE$.matchResultAsResult()));
            return this.blockExample("write to geomesa topics").in(() -> {
                scala.collection.immutable.Map<String, String> params = this.getParams("write/test");
                String str = (String) package$WithClose$.MODULE$.apply(DataStoreFinder.getDataStore((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(params).asJava()), dataStore -> {
                    dataStore.createSchema(this.sft());
                    return KafkaDataStore$.MODULE$.topic(dataStore.getSchema(this.sft().getTypeName()));
                }, IsCloseable$.MODULE$.dataStoreIsCloseable());
                Seq seq = (Seq) this.features().map(scalaSimpleFeature -> {
                    return new ConsumerRecord("input-topic", 0, new StringOps(Predef$.MODULE$.augmentString(scalaSimpleFeature.getID().replace("id", ""))).toLong(), scalaSimpleFeature.getID().getBytes(StandardCharsets.UTF_8), ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(scalaSimpleFeature.getAttributes()).asScala()).map(obj -> {
                        return (String) FastConverter$.MODULE$.convert(obj, String.class);
                    }, Buffer$.MODULE$.canBuildFrom())).mkString(",").getBytes(StandardCharsets.UTF_8));
                }, Seq$.MODULE$.canBuildFrom());
                GeoMesaStreamsBuilder apply = GeoMesaStreamsBuilder$.MODULE$.apply(params);
                apply.to(this.sft().getTypeName(), apply.wrapped().stream("input-topic", ((Consumed) Predef$.MODULE$.implicitly(ImplicitConversions$.MODULE$.consumedFromSerde(Serdes$.MODULE$.stringSerde(), Serdes$.MODULE$.stringSerde()))).withTimestampExtractor(new WallclockTimestampExtractor())).mapValues(str2 -> {
                    return GeoMesaMessage$.MODULE$.upsert(Predef$.MODULE$.wrapRefArray(str2.split(",")));
                }));
                Properties properties = new Properties();
                properties.put("application.id", "write-test");
                properties.put("bootstrap.servers", "dummy:1234");
                ArrayBuffer empty = ArrayBuffer$.MODULE$.empty();
                package$WithClose$.MODULE$.apply(new TopologyTestDriver(apply.build(), properties), topologyTestDriver -> {
                    $anonfun$new$32(seq, str, empty, topologyTestDriver);
                    return BoxedUnit.UNIT;
                }, IsCloseable$.MODULE$.closeableIsCloseable());
                return (MatchResult) package$WithClose$.MODULE$.apply(DataStoreFinder.getDataStore((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(params).asJava()), dataStore2 -> {
                    if (!(dataStore2 instanceof KafkaDataStore)) {
                        throw new MatchError(dataStore2);
                    }
                    KafkaDataStore kafkaDataStore = (KafkaDataStore) dataStore2;
                    kafkaDataStore.getFeatureReader(new Query(this.sft().getTypeName()), Transaction.AUTO_COMMIT).close();
                    package$WithClose$.MODULE$.apply(KafkaDataStore$.MODULE$.producer(kafkaDataStore.config().brokers(), Predef$.MODULE$.Map().empty()), producer -> {
                        $anonfun$new$35(empty, producer);
                        return BoxedUnit.UNIT;
                    }, IsCloseable$.MODULE$.closeableIsCloseable());
                    return (MatchResult) this.eventually(40, new package.DurationInt(package$.MODULE$.DurationInt(100)).millis(), () -> {
                        List list = SelfClosingIterator$.MODULE$.apply(kafkaDataStore.getFeatureReader(new Query(this.sft().getTypeName()), Transaction.AUTO_COMMIT)).toList();
                        return this.theValue(() -> {
                            return list;
                        }).must(() -> {
                            return this.containTheSameElementsAs(this.features(), this.containTheSameElementsAs$default$2());
                        });
                    }, MatchResult$.MODULE$.matchResultAsResult());
                }, IsCloseable$.MODULE$.dataStoreIsCloseable());
            }, AsExecution$.MODULE$.resultAsExecution(MatchResult$.MODULE$.matchResultAsResult()));
        });
    }
}
