package org.locationtech.geomesa.kafka.streams;

import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.scala.ImplicitConversions$;
import org.apache.kafka.streams.scala.Serdes$;
import org.apache.kafka.streams.scala.kstream.Materialized$;
import org.geotools.data.DataStoreFinder;
import org.geotools.data.FeatureWriter;
import org.geotools.data.Query;
import org.geotools.data.Transaction;
import org.junit.runner.RunWith;
import org.locationtech.geomesa.features.ScalaSimpleFeature;
import org.locationtech.geomesa.features.ScalaSimpleFeature$;
import org.locationtech.geomesa.kafka.EmbeddedKafka;
import org.locationtech.geomesa.kafka.data.KafkaDataStore;
import org.locationtech.geomesa.kafka.data.KafkaDataStore$;
import org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilderTest;
import org.locationtech.geomesa.utils.collection.SelfClosingIterator$;
import org.locationtech.geomesa.utils.geotools.FeatureUtils$;
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes$;
import org.locationtech.geomesa.utils.geotools.converters.FastConverter$;
import org.locationtech.geomesa.utils.io.IsCloseable$;
import org.locationtech.geomesa.utils.io.package$WithClose$;
import org.opengis.feature.simple.SimpleFeatureType;
import org.specs2.data.Sized$;
import org.specs2.matcher.MatchResult;
import org.specs2.matcher.MatchResult$;
import org.specs2.mutable.Specification;
import org.specs2.runner.JUnitRunner;
import org.specs2.specification.core.AsExecution$;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.IndexedSeqOptimized;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Iterable;
import scala.collection.mutable.Iterable$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: GeoMesaStreamsBuilderTest.scala */
@RunWith(JUnitRunner.class)
@ScalaSignature(bytes = "\u0006\u0001\u0005ee\u0001\u0002\u000b\u0016\u0001\u0001BQa\r\u0001\u0005\u0002QB\u0001b\u000e\u0001\t\u0006\u0004%\t\u0001\u000f\u0005\t\u0007\u0002A)\u0019!C\u0001\t\"I\u0001\u0004\u0001a\u0001\u0002\u0004%\tA\u0015\u0005\n/\u0002\u0001\r\u00111A\u0005\u0002aC\u0011b\u0018\u0001A\u0002\u0003\u0005\u000b\u0015B*\t\u000f\u0001\u0004!\u0019!C\u0005C\"1Q\u000f\u0001Q\u0001\n\tDQA\u001e\u0001\u0005\u0002]4A! \u0001\u0001}\"11G\u0003C\u0001\u0003[A1\"a\r\u000b\u0001\u0004\u0005\r\u0011\"\u0003\u00026!Y\u00111\t\u0006A\u0002\u0003\u0007I\u0011BA#\u0011-\tIE\u0003a\u0001\u0002\u0003\u0006K!a\u000e\t\u0013\u0005-#B1A\u0005\u0002\u00055\u0003\u0002CA2\u0015\u0001\u0006I!a\u0014\t\u000f\u0005\u0015$\u0002\"\u0011\u0002h!9\u00111\u000e\u0006\u0005B\u00055\u0004bBA<\u0015\u0011\u0005\u0013\u0011\u0010\u0002\u001a\u000f\u0016|W*Z:b'R\u0014X-Y7t\u0005VLG\u000eZ3s)\u0016\u001cHO\u0003\u0002\u0017/\u000591\u000f\u001e:fC6\u001c(B\u0001\r\u001a\u0003\u0015Y\u0017MZ6b\u0015\tQ2$A\u0004hK>lWm]1\u000b\u0005qi\u0012\u0001\u00047pG\u0006$\u0018n\u001c8uK\u000eD'\"\u0001\u0010\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0007\u0001\t\u0013\u0006\u0005\u0002#O5\t1E\u0003\u0002%K\u00059Q.\u001e;bE2,'B\u0001\u0014\u001e\u0003\u0019\u0019\b/Z2te%\u0011\u0001f\t\u0002\u000e'B,7-\u001b4jG\u0006$\u0018n\u001c8\u0011\u0005)\nT\"A\u0016\u000b\u00051j\u0013\u0001D:dC2\fGn\\4hS:<'B\u0001\u00180\u0003!!\u0018\u0010]3tC\u001a,'\"\u0001\u0019\u0002\u0007\r|W.\u0003\u00023W\ti1\u000b\u001e:jGRdunZ4j]\u001e\fa\u0001P5oSRtD#A\u001b\u0011\u0005Y\u0002Q\"A\u000b\u0002\u0007M4G/F\u0001:!\tQ\u0014)D\u0001<\u0015\taT(\u0001\u0004tS6\u0004H.\u001a\u0006\u0003}}\nqAZ3biV\u0014XM\u0003\u0002A;\u00059q\u000e]3oO&\u001c\u0018B\u0001\"<\u0005E\u0019\u0016.\u001c9mK\u001a+\u0017\r^;sKRK\b/Z\u0001\tM\u0016\fG/\u001e:fgV\tQ\tE\u0002G\u00176k\u0011a\u0012\u0006\u0003\u0011&\u000b!bY8mY\u0016\u001cG/[8o\u0015\u0005Q\u0015!B:dC2\f\u0017B\u0001'H\u0005\r\u0019V-\u001d\t\u0003\u001dBk\u0011a\u0014\u0006
\u0003\u0007fI!!U(\u0003%M\u001b\u0017\r\\1TS6\u0004H.\u001a$fCR,(/Z\u000b\u0002'B\u0011A+V\u0007\u0002/%\u0011ak\u0006\u0002\u000e\u000b6\u0014W\r\u001a3fI.\u000bgm[1\u0002\u0013-\fgm[1`I\u0015\fHCA-^!\tQ6,D\u0001J\u0013\ta\u0016J\u0001\u0003V]&$\bb\u00020\u0006\u0003\u0003\u0005\raU\u0001\u0004q\u0012\n\u0014AB6bM.\f\u0007%A\u0004{WB\u000bG\u000f[:\u0016\u0003\t\u00042a\u00195k\u001b\u0005!'BA3g\u0003\u0011)H/\u001b7\u000b\u0003\u001d\fAA[1wC&\u0011\u0011\u000e\u001a\u0002\u0004'\u0016$\bCA6s\u001d\ta\u0007\u000f\u0005\u0002n\u00136\taN\u0003\u0002p?\u00051AH]8pizJ!!]%\u0002\rA\u0013X\rZ3g\u0013\t\u0019HO\u0001\u0004TiJLgn\u001a\u0006\u0003c&\u000b\u0001B_6QCRD7\u000fI\u0001\nO\u0016$\b+\u0019:b[N$\"\u0001_>\u0011\t-L(N[\u0005\u0003uR\u00141!T1q\u0011\u0015a\u0018\u00021\u0001k\u0003\u0019Q8\u000eU1uQ\nqB+[7fgR\fW\u000e]#yiJ\f7\r^5oOR\u0013\u0018M\\:g_JlWM]\n\u0005\u0015}\fY\u0001\u0005\u0003\u0002\u0002\u0005\u001dQBAA\u0002\u0015\r\t)AZ\u0001\u0005Y\u0006tw-\u0003\u0003\u0002\n\u0005\r!AB(cU\u0016\u001cG\u000fE\u0005\u0002\u000e\u0005m!.a\b\u0002&5\u0011\u0011q\u0002\u0006\u0005\u0003#\t\u0019\"A\u0004lgR\u0014X-Y7\u000b\u0007Y\t)BC\u0002\u0019\u0003/Q1!!\u0007\u001e\u0003\u0019\t\u0007/Y2iK&!\u0011QDA\b\u0005-!&/\u00198tM>\u0014X.\u001a:\u0011\u0007Y\n\t#C\u0002\u0002$U\u0011abR3p\u001b\u0016\u001c\u0018-T3tg\u0006<W\rE\u0004\u0002(\u0005%\".a\b\u000e\u0005\u0005M\u0011\u0002BA\u0016\u0003'\u0011\u0001bS3z-\u0006dW/\u001a\u000b\u0003\u0003_\u00012!!\r\u000b\u001b\u0005\u0001\u0011aB2p]R,\u0007\u0010^\u000b\u0003\u0003o\u0001B!!\u000f\u0002@5\u0011\u00111\b\u0006\u0005\u0003{\t\u0019\"A\u0005qe>\u001cWm]:pe&!\u0011\u0011IA\u001e\u0005A\u0001&o\\2fgN|'oQ8oi\u0016DH/A\u0006d_:$X\r\u001f;`I\u0015\fHcA-\u0002H!Aa,DA\u0001\u0002\u0004\t9$\u0001\u0005d_:$X\r\u001f;!\u0003)!\u0018.\\3ti\u0006l\u0007o]\u000b\u0003\u0003\u001f\u0002r!!\u0015\u0002V)\f9&\u0004\u0002\u0002T)\u0011AeR\u0005\u0004u\u0006M\u0003CBA)\u00033\ni&\u0003\u0003\u0002\\\u0005M#aC!se\u0006L()\u001e4g
KJ\u00042AWA0\u0013\r\t\t'\u0013\u0002\u0005\u0019>tw-A\u0006uS6,7\u000f^1naN\u0004\u0013\u0001B5oSR$2!WA5\u0011\u001d\t\u0019$\u0005a\u0001\u0003o\t\u0011\u0002\u001e:b]N4wN]7\u0015\r\u0005\u0015\u0012qNA:\u0011\u0019\t\tH\u0005a\u0001U\u0006\u00191.Z=\t\u000f\u0005U$\u00031\u0001\u0002 \u0005)a/\u00197vK\u0006)1\r\\8tKR\t\u0011\fK\u0004\u0001\u0003{\n)(!$\u0011\t\u0005}\u0014\u0011R\u0007\u0003\u0003\u0003SA!a!\u0002\u0006\u00061!/\u001e8oKJT1!a\"\u001e\u0003\u0015QWO\\5u\u0013\u0011\tY)!!\u0003\u000fI+hnV5uQ\u000e\u0012\u0011q\u0012\t\u0005\u0003#\u000b)*\u0004\u0002\u0002\u0014*\u0019\u00111Q\u0013\n\t\u0005]\u00151\u0013\u0002\f\u0015Vs\u0017\u000e\u001e*v]:,'\u000f")
/* loaded from: input_file:org/locationtech/geomesa/kafka/streams/GeoMesaStreamsBuilderTest.class */
public class GeoMesaStreamsBuilderTest extends Specification implements StrictLogging {
    private SimpleFeatureType sft;
    private Seq<ScalaSimpleFeature> features;
    private EmbeddedKafka kafka;
    private final Set<String> zkPaths;
    private final Logger logger;
    private volatile byte bitmap$0;

    /* compiled from: GeoMesaStreamsBuilderTest.scala */
    /* loaded from: input_file:org/locationtech/geomesa/kafka/streams/GeoMesaStreamsBuilderTest$TimestampExtractingTransformer.class */
    /**
     * Pass-through transformer that remembers, for every message key it sees, the timestamp
     * reported by the processor context at the moment the message is transformed.
     *
     * <p>Messages are forwarded unchanged; the recorded timestamps are exposed through
     * {@link #timestamps()}.
     */
    public class TimestampExtractingTransformer implements Transformer<String, GeoMesaMessage, KeyValue<String, GeoMesaMessage>> {
        public final /* synthetic */ GeoMesaStreamsBuilderTest $outer;
        private final Map<String, ArrayBuffer<Object>> timestamps;
        private ProcessorContext context;

        /**
         * Creates a transformer with an empty timestamp map.
         *
         * @param geoMesaStreamsBuilderTest the enclosing spec instance; must not be null
         */
        public TimestampExtractingTransformer(GeoMesaStreamsBuilderTest geoMesaStreamsBuilderTest) {
            // Compiler-emitted outer-instance check, kept exactly as decompiled.
            if (geoMesaStreamsBuilderTest == null) {
                throw null;
            }
            this.$outer = geoMesaStreamsBuilderTest;
            this.timestamps = Map$.MODULE$.empty();
        }

        /** Returns the enclosing spec instance captured at construction time. */
        public /* synthetic */ GeoMesaStreamsBuilderTest org$locationtech$geomesa$kafka$streams$GeoMesaStreamsBuilderTest$TimestampExtractingTransformer$$$outer() {
            return $outer;
        }

        /** Returns the processor context most recently passed to {@link #init}. */
        private ProcessorContext context() {
            return context;
        }

        /** Stores the processor context used later by {@link #transform}. */
        private void context_$eq(ProcessorContext processorContext) {
            context = processorContext;
        }

        /** Returns the mutable map of message key to the timestamps recorded for that key. */
        public Map<String, ArrayBuffer<Object>> timestamps() {
            return timestamps;
        }

        /** Captures the processor context so that record timestamps can be read in {@link #transform}. */
        public void init(ProcessorContext processorContext) {
            context_$eq(processorContext);
        }

        /**
         * Appends the context's current timestamp to the buffer kept for {@code str} (creating
         * the buffer on first use) and forwards the message untouched.
         *
         * @param str the message key
         * @param geoMesaMessage the message value
         * @return a new {@code KeyValue} holding the same key and value
         */
        public KeyValue<String, GeoMesaMessage> transform(String str, GeoMesaMessage geoMesaMessage) {
            ArrayBuffer recordedForKey = (ArrayBuffer) timestamps().getOrElseUpdate(str, () -> ArrayBuffer$.MODULE$.empty());
            recordedForKey.$plus$eq(BoxesRunTime.boxToLong(context().timestamp()));
            return new KeyValue<>(str, geoMesaMessage);
        }

        /** Nothing to release. */
        public void close() {
        }
    }

    /** Returns the logger instance held in this spec's {@code logger} field. */
    public Logger logger() {
        return logger;
    }

    /**
     * Stores the given logger in this spec's {@code logger} field.
     *
     * <p>NOTE(review): the name suggests this is the setter generated for the {@code logger}
     * value of {@code StrictLogging}, presumably invoked from {@code StrictLogging.$init$} —
     * confirm. The field is declared {@code final} in this class, so the assignment below looks
     * like a decompilation artifact rather than valid hand-written Java.
     *
     * @param logger the logger to store
     */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilderTest] */
    private SimpleFeatureType sft$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.sft = SimpleFeatureTypes$.MODULE$.createImmutableType("streams", "name:String,age:Int,dtg:Date,*geom:Point:srid=4326");
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.sft;
    }

    /**
     * Returns the feature type used by the tests, creating it through
     * {@code sft$lzycompute()} when bit 0 of {@code bitmap$0} is not yet set.
     */
    public SimpleFeatureType sft() {
        boolean initialized = ((byte) (this.bitmap$0 & 1)) != 0;
        return initialized ? this.sft : sft$lzycompute();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilderTest] */
    private Seq<ScalaSimpleFeature> features$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.features = Seq$.MODULE$.tabulate(10, obj -> {
                    return $anonfun$features$1(this, BoxesRunTime.unboxToInt(obj));
                });
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.features;
    }

    /**
     * Returns the test features, creating them through {@code features$lzycompute()} when
     * bit 1 of {@code bitmap$0} is not yet set.
     */
    public Seq<ScalaSimpleFeature> features() {
        boolean initialized = ((byte) (this.bitmap$0 & 2)) != 0;
        return initialized ? this.features : features$lzycompute();
    }

    /** Returns the embedded Kafka instance currently held by this spec (null until assigned). */
    public EmbeddedKafka kafka() {
        return kafka;
    }

    /**
     * Replaces the embedded Kafka instance held by this spec.
     *
     * @param embeddedKafka the instance subsequently returned by {@code kafka()}
     */
    public void kafka_$eq(EmbeddedKafka embeddedKafka) {
        this.kafka = embeddedKafka;
    }

    /** Returns the set of zk paths already handed out by {@code getParams}. */
    private Set<String> zkPaths() {
        return zkPaths;
    }

    /**
     * Builds the data store parameters for one test, keyed to the given zk path.
     *
     * <p>The path is first added to {@code zkPaths()}; if it was already present the
     * {@code require} call fails with a message naming the reused path.
     *
     * @param str the zk path to use for the {@code kafka.zk.path} entry
     * @return an immutable map holding the broker and zookeeper addresses of the embedded
     *         Kafka plus fixed partition, replication and read-back settings
     */
    public scala.collection.immutable.Map<String, String> getParams(String str) {
        boolean firstUse = zkPaths().add(str);
        Predef$.MODULE$.require(firstUse, () -> "zk path '" + str + "' is reused between tests, may cause conflicts");

        Tuple2[] entries = new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.brokers"), kafka().brokers()),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.zookeepers"), kafka().zookeepers()),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.topic.partitions"), "1"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.topic.replication"), "1"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.consumer.read-back"), "Inf"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("kafka.zk.path"), str)
        };
        return Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(entries));
    }

    /**
     * Builds the test feature for index {@code i}.
     *
     * <p>The feature id is {@code "id" + i}; its attribute values are a name ({@code "name" + i}),
     * {@code i % 2}, a timestamp string whose seconds digit is {@code i}, and the point
     * {@code POINT(1 i)}.
     *
     * @param geoMesaStreamsBuilderTest the spec supplying the feature type
     * @param i the feature index
     * @return the created feature
     */
    public static final /* synthetic */ ScalaSimpleFeature $anonfun$features$1(GeoMesaStreamsBuilderTest geoMesaStreamsBuilderTest, int i) {
        SimpleFeatureType featureType = geoMesaStreamsBuilderTest.sft();
        String featureId = "id" + i;
        Object[] attributeValues = new Object[]{
            "name" + i,
            BoxesRunTime.boxToInteger(i % 2),
            "2022-04-27T00:00:0" + i + ".00Z",
            "POINT(1 " + i + ")"
        };
        return ScalaSimpleFeature$.MODULE$.create(featureType, featureId, Predef$.MODULE$.genericWrapArray(attributeValues));
    }

    /**
     * Writes every test feature through the supplied feature writer.
     *
     * @param geoMesaStreamsBuilderTest the spec supplying the features
     * @param featureWriter the writer each feature is passed to
     */
    public static final /* synthetic */ void $anonfun$new$5(GeoMesaStreamsBuilderTest geoMesaStreamsBuilderTest, FeatureWriter featureWriter) {
        Seq<ScalaSimpleFeature> toWrite = geoMesaStreamsBuilderTest.features();
        // NOTE(review): the trailing `true` presumably asks the writer to keep the provided feature id — confirm.
        toWrite.foreach(feature -> FeatureUtils$.MODULE$.write(featureWriter, feature, true));
    }

    /**
     * Subscribes the consumer to one topic and collects polled records into the buffer.
     *
     * <p>Polling (100 ms per poll) continues until the buffer holds at least 10 records or
     * 10000 ms have elapsed since the subscription, whichever comes first.
     *
     * @param str the topic to subscribe to
     * @param arrayBuffer the buffer that receives every polled record
     * @param kafkaConsumer the consumer to poll
     */
    public static final /* synthetic */ void $anonfun$new$7(String str, ArrayBuffer arrayBuffer, KafkaConsumer kafkaConsumer) {
        kafkaConsumer.subscribe(Collections.singleton(str));
        final long startedAt = System.currentTimeMillis();
        while (true) {
            if (arrayBuffer.lengthCompare(10) >= 0) {
                break;
            }
            if (System.currentTimeMillis() - startedAt >= 10000) {
                break;
            }
            IterableLike polled = (IterableLike) JavaConverters$.MODULE$.iterableAsScalaIterableConverter(kafkaConsumer.poll(Duration.ofMillis(100L))).asScala();
            polled.foreach(consumerRecord -> arrayBuffer.$plus$eq(consumerRecord));
        }
    }

    /**
     * Pipes all input records through the test driver, then drains the {@code word-count}
     * output topic into the given map.
     *
     * <p>Output is read with a string key deserializer and a long value deserializer until
     * {@code readOutput} returns null; each record's key and value are added to the map.
     *
     * @param seq the records to pipe into the topology
     * @param map the map that receives one entry per output record
     * @param topologyTestDriver the driver running the topology
     */
    public static final /* synthetic */ void $anonfun$new$13(Seq seq, Map map, TopologyTestDriver topologyTestDriver) {
        seq.foreach(consumerRecord -> {
            topologyTestDriver.pipeInput(consumerRecord);
            return BoxedUnit.UNIT;
        });
        for (ProducerRecord output = topologyTestDriver.readOutput("word-count", new StringDeserializer(), new LongDeserializer());
                output != null;
                output = topologyTestDriver.readOutput("word-count", new StringDeserializer(), new LongDeserializer())) {
            map.$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(output.key()), output.value()));
        }
    }

    /**
     * Returns the epoch-millisecond value of the feature's {@code dtg} attribute.
     *
     * @param scalaSimpleFeature the feature to read; its {@code dtg} attribute is cast to {@code Date}
     * @return {@code getTime()} of that date
     */
    public static final /* synthetic */ long $anonfun$new$21(ScalaSimpleFeature scalaSimpleFeature) {
        Date dtg = (Date) scalaSimpleFeature.getAttribute("dtg");
        return dtg.getTime();
    }

    /**
     * Returns the first element of the sequence stored in the tuple's second slot, unboxed
     * to a {@code long}.
     *
     * @param tuple2 a pair whose second element is cast to {@code IndexedSeqOptimized}
     * @return the unboxed head of that sequence
     */
    public static final /* synthetic */ long $anonfun$new$26(Tuple2 tuple2) {
        IndexedSeqOptimized recorded = (IndexedSeqOptimized) tuple2._2();
        return BoxesRunTime.unboxToLong(recorded.head());
    }

    /**
     * Pipes all input records through the test driver, then drains the named output topic
     * into the given buffer.
     *
     * <p>Output is read with byte-array deserializers for both key and value until
     * {@code readOutput} returns null; every record read is appended to the buffer.
     *
     * @param seq the records to pipe into the topology
     * @param str the output topic to drain
     * @param arrayBuffer the buffer that receives each output record
     * @param topologyTestDriver the driver running the topology
     */
    public static final /* synthetic */ void $anonfun$new$33(Seq seq, String str, ArrayBuffer arrayBuffer, TopologyTestDriver topologyTestDriver) {
        seq.foreach(consumerRecord -> {
            topologyTestDriver.pipeInput(consumerRecord);
            return BoxedUnit.UNIT;
        });
        for (ProducerRecord output = topologyTestDriver.readOutput(str, new ByteArrayDeserializer(), new ByteArrayDeserializer());
                output != null;
                output = topologyTestDriver.readOutput(str, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            arrayBuffer.$plus$eq(output);
        }
    }

    /**
     * Sends every buffered record through the given producer, in buffer order.
     *
     * @param arrayBuffer the records to send
     * @param producer the producer each record is passed to
     */
    public static final /* synthetic */ void $anonfun$new$36(ArrayBuffer arrayBuffer, Producer producer) {
        arrayBuffer.foreach(pending -> producer.send(pending));
    }

    /**
     * Declares the whole specification: a setup step that creates the embedded Kafka, the
     * "GeoMesaStreamsBuilder" group with its two examples ("read from geomesa topics" and
     * "write to geomesa topics"), and a teardown step that closes the embedded Kafka.
     *
     * <p>NOTE(review): this body is decompiler output for a Scala spec; some constructs (for
     * example the anonymous {@code TransformerSupplier} created with constructor arguments)
     * are not valid Java source and are kept exactly as emitted.
     */
    public GeoMesaStreamsBuilderTest() {
        StrictLogging.$init$(this);
        // Setup step: create the embedded Kafka and keep it in the `kafka` field, logging before and after.
        step(() -> {
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Starting embedded kafka/zk");
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
            this.kafka_$eq(new EmbeddedKafka());
            if (!this.logger().underlying().isInfoEnabled()) {
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            } else {
                this.logger().underlying().info("Started embedded kafka/zk");
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            }
        });
        // Set backed by a ConcurrentHashMap; getParams uses it to reject a zk path that was already handed out.
        this.zkPaths = Collections.newSetFromMap(new ConcurrentHashMap());
        blockExample("GeoMesaStreamsBuilder").should(() -> {
            // Example 1: features written through the data store are read back by a word-count topology.
            this.blockExample("read from geomesa topics").in(() -> {
                scala.collection.immutable.Map<String, String> params = this.getParams("word/count");
                // Create the schema, write all test features, and capture the name of the backing topic.
                String str = (String) package$WithClose$.MODULE$.apply(DataStoreFinder.getDataStore((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(params).asJava()), dataStore -> {
                    dataStore.createSchema(this.sft());
                    package$WithClose$.MODULE$.apply(dataStore.getFeatureWriterAppend(this.sft().getTypeName(), Transaction.AUTO_COMMIT), featureWriter -> {
                        $anonfun$new$5(this, featureWriter);
                        return BoxedUnit.UNIT;
                    }, IsCloseable$.MODULE$.closeableIsCloseable());
                    return KafkaDataStore$.MODULE$.topic(dataStore.getSchema(this.sft().getTypeName()));
                }, IsCloseable$.MODULE$.dataStoreIsCloseable());
                // Read the raw serialized records back from that topic with a plain byte-array consumer.
                Properties properties = new Properties();
                properties.put("bootstrap.servers", this.kafka().brokers());
                properties.put("auto.offset.reset", "earliest");
                properties.put("key.deserializer", ByteArrayDeserializer.class.getName());
                properties.put("value.deserializer", ByteArrayDeserializer.class.getName());
                properties.put("group.id", "consume-kryo-topic");
                ArrayBuffer empty = ArrayBuffer$.MODULE$.empty();
                package$WithClose$.MODULE$.apply(new KafkaConsumer(properties), kafkaConsumer -> {
                    $anonfun$new$7(str, empty, kafkaConsumer);
                    return BoxedUnit.UNIT;
                }, IsCloseable$.MODULE$.closeableIsCloseable());
                // Topology: stream the feature type, record timestamps per key, then count the
                // attribute values (spaces inside a value replaced by '_') into "word-count".
                final TimestampExtractingTransformer timestampExtractingTransformer = new TimestampExtractingTransformer(this);
                GeoMesaStreamsBuilder apply = GeoMesaStreamsBuilder$.MODULE$.apply(params);
                // NOTE(review): the null local and the constructor arguments on the anonymous supplier look like
                // decompiler artifacts; the supplier only ever returns the shared transformer instance.
                final GeoMesaStreamsBuilderTest geoMesaStreamsBuilderTest = null;
                apply.stream(this.sft().getTypeName()).transform(new TransformerSupplier<String, GeoMesaMessage, KeyValue<String, GeoMesaMessage>>(geoMesaStreamsBuilderTest, timestampExtractingTransformer) { // from class: org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilderTest$$anon$1
                    private final GeoMesaStreamsBuilderTest.TimestampExtractingTransformer timestampExtractor$1;

                    public Transformer<String, GeoMesaMessage, KeyValue<String, GeoMesaMessage>> get() {
                        return this.timestampExtractor$1;
                    }

                    {
                        this.timestampExtractor$1 = timestampExtractingTransformer;
                    }
                }, Predef$.MODULE$.wrapRefArray(new String[0])).map((str2, geoMesaMessage) -> {
                    return new Tuple2(str2, ((TraversableOnce) geoMesaMessage.attributes().map(obj -> {
                        return obj.toString().replaceAll(" ", "_");
                    }, Seq$.MODULE$.canBuildFrom())).mkString(" "));
                }).flatMapValues(str3 -> {
                    return Predef$.MODULE$.wrapRefArray(str3.split(" +"));
                }).groupBy((str4, str5) -> {
                    return str5;
                }, ImplicitConversions$.MODULE$.groupedFromSerde(Serdes$.MODULE$.String(), Serdes$.MODULE$.String())).count(Materialized$.MODULE$.as("counts-store", Serdes$.MODULE$.String(), Serdes$.MODULE$.Long())).toStream().to("word-count", ImplicitConversions$.MODULE$.producedFromSerde(Serdes$.MODULE$.String(), Serdes$.MODULE$.Long()));
                // Run the topology in a TopologyTestDriver; the bootstrap address is a placeholder value.
                Properties properties2 = new Properties();
                properties2.put("application.id", "word-count-test");
                properties2.put("bootstrap.servers", "dummy:1234");
                Map empty2 = Map$.MODULE$.empty();
                package$WithClose$.MODULE$.apply(new TopologyTestDriver(apply.build(), properties2), topologyTestDriver -> {
                    $anonfun$new$13(empty, empty2, topologyTestDriver);
                    return BoxedUnit.UNIT;
                }, IsCloseable$.MODULE$.closeableIsCloseable());
                // Expected counts, computed directly from the test features with the same space-to-underscore rule.
                scala.collection.immutable.Map map = (scala.collection.immutable.Map) ((TraversableLike) this.features().flatMap(scalaSimpleFeature -> {
                    return (Buffer) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(scalaSimpleFeature.getAttributes()).asScala()).map(obj -> {
                        return obj.toString().replaceAll(" ", "_");
                    }, Buffer$.MODULE$.canBuildFrom());
                }, Seq$.MODULE$.canBuildFrom())).groupBy(str6 -> {
                    return str6;
                }).map(tuple2 -> {
                    if (tuple2 == null) {
                        throw new MatchError(tuple2);
                    }
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((String) tuple2._1()), BoxesRunTime.boxToLong(((Seq) tuple2._2()).length()));
                }, scala.collection.immutable.Map$.MODULE$.canBuildFrom());
                // The counts produced by the topology must equal the expected counts.
                this.theValue(() -> {
                    return empty2;
                }).mustEqual(() -> {
                    return map;
                });
                // Each key must have exactly one recorded timestamp, and together the recorded
                // timestamps must match the features' dtg values.
                Seq seq = (Seq) this.features().map(scalaSimpleFeature2 -> {
                    return BoxesRunTime.boxToLong($anonfun$new$21(scalaSimpleFeature2));
                }, Seq$.MODULE$.canBuildFrom());
                this.foreach(timestampExtractingTransformer.timestamps(), tuple22 -> {
                    return this.theValue(() -> {
                        return (ArrayBuffer) tuple22._2();
                    }).must(() -> {
                        return this.haveLength(1, Sized$.MODULE$.scalaTraversableIsSized());
                    });
                }, MatchResult$.MODULE$.matchResultAsResult());
                return this.theValue(() -> {
                    return (Iterable) timestampExtractingTransformer.timestamps().map(tuple23 -> {
                        return BoxesRunTime.boxToLong($anonfun$new$26(tuple23));
                    }, Iterable$.MODULE$.canBuildFrom());
                }).must(() -> {
                    return this.containTheSameElementsAs(seq, this.containTheSameElementsAs$default$2());
                });
            }, AsExecution$.MODULE$.resultAsExecution(MatchResult$.MODULE$.matchResultAsResult()));
            // Example 2: a topology writes upsert messages to the feature type's topic and the data store reads them.
            return this.blockExample("write to geomesa topics").in(() -> {
                scala.collection.immutable.Map<String, String> params = this.getParams("write/test");
                // Create the schema (no features written here) and capture the name of the backing topic.
                String str = (String) package$WithClose$.MODULE$.apply(DataStoreFinder.getDataStore((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(params).asJava()), dataStore -> {
                    dataStore.createSchema(this.sft());
                    return KafkaDataStore$.MODULE$.topic(dataStore.getSchema(this.sft().getTypeName()));
                }, IsCloseable$.MODULE$.dataStoreIsCloseable());
                // Input records on "input-topic": offset is the numeric part of the feature id, key is the
                // feature id bytes, value is the comma-joined attributes converted to strings.
                Seq seq = (Seq) this.features().map(scalaSimpleFeature -> {
                    return new ConsumerRecord("input-topic", 0, new StringOps(Predef$.MODULE$.augmentString(scalaSimpleFeature.getID().replace("id", ""))).toLong(), scalaSimpleFeature.getID().getBytes(StandardCharsets.UTF_8), ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(scalaSimpleFeature.getAttributes()).asScala()).map(obj -> {
                        return (String) FastConverter$.MODULE$.convert(obj, String.class);
                    }, Buffer$.MODULE$.canBuildFrom())).mkString(",").getBytes(StandardCharsets.UTF_8));
                }, Seq$.MODULE$.canBuildFrom());
                // Topology: read "input-topic" with wall-clock timestamps, turn each CSV line into an
                // upsert message, and write it to the feature type through the builder.
                GeoMesaStreamsBuilder apply = GeoMesaStreamsBuilder$.MODULE$.apply(params);
                apply.to(this.sft().getTypeName(), apply.wrapped().stream("input-topic", ((Consumed) Predef$.MODULE$.implicitly(ImplicitConversions$.MODULE$.consumedFromSerde(Serdes$.MODULE$.String(), Serdes$.MODULE$.String()))).withTimestampExtractor(new WallclockTimestampExtractor())).mapValues(str2 -> {
                    return GeoMesaMessage$.MODULE$.upsert(Predef$.MODULE$.wrapRefArray(str2.split(",")));
                }));
                Properties properties = new Properties();
                properties.put("application.id", "write-test");
                properties.put("bootstrap.servers", "dummy:1234");
                // Capture what the topology emits on the feature type's topic.
                ArrayBuffer empty = ArrayBuffer$.MODULE$.empty();
                package$WithClose$.MODULE$.apply(new TopologyTestDriver(apply.build(), properties), topologyTestDriver -> {
                    $anonfun$new$33(seq, str, empty, topologyTestDriver);
                    return BoxedUnit.UNIT;
                }, IsCloseable$.MODULE$.closeableIsCloseable());
                return (MatchResult) package$WithClose$.MODULE$.apply(DataStoreFinder.getDataStore((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(params).asJava()), dataStore2 -> {
                    if (!(dataStore2 instanceof KafkaDataStore)) {
                        throw new MatchError(dataStore2);
                    }
                    KafkaDataStore kafkaDataStore = (KafkaDataStore) dataStore2;
                    // NOTE(review): a reader is opened and closed immediately, presumably to start the
                    // store's consumers before the records are produced — confirm.
                    kafkaDataStore.getFeatureReader(new Query(this.sft().getTypeName()), Transaction.AUTO_COMMIT).close();
                    // Replay the captured output records onto the embedded Kafka with a real producer.
                    package$WithClose$.MODULE$.apply(KafkaDataStore$.MODULE$.producer(kafkaDataStore.config().brokers(), Predef$.MODULE$.Map().empty()), producer -> {
                        $anonfun$new$36(empty, producer);
                        return BoxedUnit.UNIT;
                    }, IsCloseable$.MODULE$.closeableIsCloseable());
                    // Retry (arguments 40 and 100 ms) until the store returns the same elements as the test features.
                    return (MatchResult) this.eventually(40, new package.DurationInt(package$.MODULE$.DurationInt(100)).millis(), () -> {
                        List list = SelfClosingIterator$.MODULE$.apply(kafkaDataStore.getFeatureReader(new Query(this.sft().getTypeName()), Transaction.AUTO_COMMIT)).toList();
                        return this.theValue(() -> {
                            return list;
                        }).must(() -> {
                            return this.containTheSameElementsAs(this.features(), this.containTheSameElementsAs$default$2());
                        });
                    }, MatchResult$.MODULE$.matchResultAsResult());
                }, IsCloseable$.MODULE$.dataStoreIsCloseable());
            }, AsExecution$.MODULE$.resultAsExecution(MatchResult$.MODULE$.matchResultAsResult()));
        });
        // Teardown step: close the embedded Kafka, logging before and after.
        step(() -> {
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info("Stopping embedded kafka/zk");
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
            this.kafka().close();
            if (!this.logger().underlying().isInfoEnabled()) {
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            } else {
                this.logger().underlying().info("Stopped embedded kafka/zk");
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            }
        });
    }
}
