package org.apache.kafka.connect.file;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/connect/file/FileStreamSourceTaskTest.class */
public class FileStreamSourceTaskTest extends EasyMockSupport {
    private static final String TOPIC = "test";
    private File tempFile;
    private Map<String, String> config;
    private OffsetStorageReader offsetStorageReader;
    private SourceTaskContext context;
    private FileStreamSourceTask task;
    private boolean verifyMocks = false;

    @Before
    public void setup() throws IOException {
        this.tempFile = File.createTempFile("file-stream-source-task-test", null);
        this.config = new HashMap();
        this.config.put("file", this.tempFile.getAbsolutePath());
        this.config.put("topic", TOPIC);
        this.task = new FileStreamSourceTask();
        this.offsetStorageReader = (OffsetStorageReader) createMock(OffsetStorageReader.class);
        this.context = (SourceTaskContext) createMock(SourceTaskContext.class);
        this.task.initialize(this.context);
    }

    @After
    public void teardown() {
        this.tempFile.delete();
        if (this.verifyMocks) {
            verifyAll();
        }
    }

    private void replay() {
        replayAll();
        this.verifyMocks = true;
    }

    @Test
    public void testNormalLifecycle() throws InterruptedException, IOException {
        expectOffsetLookupReturnNone();
        replay();
        this.task.start(this.config);
        FileOutputStream fileOutputStream = new FileOutputStream(this.tempFile);
        Assert.assertEquals((Object) null, this.task.poll());
        fileOutputStream.write("partial line".getBytes());
        fileOutputStream.flush();
        Assert.assertEquals((Object) null, this.task.poll());
        fileOutputStream.write(" finished\n".getBytes());
        fileOutputStream.flush();
        List poll = this.task.poll();
        Assert.assertEquals(1L, poll.size());
        Assert.assertEquals(TOPIC, ((SourceRecord) poll.get(0)).topic());
        Assert.assertEquals("partial line finished", ((SourceRecord) poll.get(0)).value());
        Assert.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), ((SourceRecord) poll.get(0)).sourcePartition());
        Assert.assertEquals(Collections.singletonMap("position", 22L), ((SourceRecord) poll.get(0)).sourceOffset());
        Assert.assertEquals((Object) null, this.task.poll());
        fileOutputStream.write("line1\rline2\r\nline3\nline4\n\r".getBytes());
        fileOutputStream.flush();
        List poll2 = this.task.poll();
        Assert.assertEquals(4L, poll2.size());
        Assert.assertEquals("line1", ((SourceRecord) poll2.get(0)).value());
        Assert.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), ((SourceRecord) poll2.get(0)).sourcePartition());
        Assert.assertEquals(Collections.singletonMap("position", 28L), ((SourceRecord) poll2.get(0)).sourceOffset());
        Assert.assertEquals("line2", ((SourceRecord) poll2.get(1)).value());
        Assert.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), ((SourceRecord) poll2.get(1)).sourcePartition());
        Assert.assertEquals(Collections.singletonMap("position", 35L), ((SourceRecord) poll2.get(1)).sourceOffset());
        Assert.assertEquals("line3", ((SourceRecord) poll2.get(2)).value());
        Assert.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), ((SourceRecord) poll2.get(2)).sourcePartition());
        Assert.assertEquals(Collections.singletonMap("position", 41L), ((SourceRecord) poll2.get(2)).sourceOffset());
        Assert.assertEquals("line4", ((SourceRecord) poll2.get(3)).value());
        Assert.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), ((SourceRecord) poll2.get(3)).sourcePartition());
        Assert.assertEquals(Collections.singletonMap("position", 47L), ((SourceRecord) poll2.get(3)).sourceOffset());
        fileOutputStream.write("subsequent text".getBytes());
        fileOutputStream.flush();
        List poll3 = this.task.poll();
        Assert.assertEquals(1L, poll3.size());
        Assert.assertEquals("", ((SourceRecord) poll3.get(0)).value());
        Assert.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), ((SourceRecord) poll3.get(0)).sourcePartition());
        Assert.assertEquals(Collections.singletonMap("position", 48L), ((SourceRecord) poll3.get(0)).sourceOffset());
        fileOutputStream.close();
        this.task.stop();
    }

    @Test(expected = ConnectException.class)
    public void testMissingTopic() throws InterruptedException {
        replay();
        this.config.remove("topic");
        this.task.start(this.config);
    }

    @Test
    public void testMissingFile() throws InterruptedException {
        replay();
        System.setIn(new ByteArrayInputStream("line\n".getBytes()));
        this.config.remove("file");
        this.task.start(this.config);
        List poll = this.task.poll();
        Assert.assertEquals(1L, poll.size());
        Assert.assertEquals(TOPIC, ((SourceRecord) poll.get(0)).topic());
        Assert.assertEquals("line", ((SourceRecord) poll.get(0)).value());
        this.task.stop();
    }

    public void testInvalidFile() throws InterruptedException {
        this.config.put("file", "bogusfilename");
        this.task.start(this.config);
        for (int i = 0; i < 100; i++) {
            Assert.assertEquals((Object) null, this.task.poll());
        }
    }

    private void expectOffsetLookupReturnNone() {
        EasyMock.expect(this.context.offsetStorageReader()).andReturn(this.offsetStorageReader);
        EasyMock.expect(this.offsetStorageReader.offset((Map) EasyMock.anyObject())).andReturn((Object) null);
    }
}
