package com.exasol.bucketfs;

import com.exasol.clusterlogs.LogPatternDetector;
import com.exasol.clusterlogs.LogPatternDetectorFactory;
import com.exasol.containers.ExasolContainerConstants;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.time.Instant;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalField;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/exasol/bucketfs/Bucket.class */
public class Bucket {
    private static final Logger LOGGER = LoggerFactory.getLogger(Bucket.class);
    private static final String BUCKET_ROOT = "";
    private static final long BUCKET_SYNC_TIMEOUT_IN_MILLISECONDS = 60000;
    private static final long FILE_SYNC_POLLING_DELAY_IN_MILLISECONDS = 200;
    private final String bucketFsName;
    private final String bucketName;
    private final String ipAddress;
    private final int port;
    private final String readPassword;
    private final String writePassword;
    private final LogPatternDetectorFactory detectorFactory;
    private final HttpClient client = HttpClient.newBuilder().build();
    private final Map<String, Instant> uploadHistory = new HashMap();

    /* loaded from: input_file:com/exasol/bucketfs/Bucket$Builder.class */
    public static class Builder {
        private String bucketFsName;
        private String bucketName;
        private String ipAddress;
        private int port;
        private String readPassword;
        private String writePassword;
        private LogPatternDetectorFactory detectorFactory;

        public Builder detectorFactory(LogPatternDetectorFactory logPatternDetectorFactory) {
            this.detectorFactory = logPatternDetectorFactory;
            return this;
        }

        public Builder serviceName(String str) {
            this.bucketFsName = str;
            return this;
        }

        public Builder name(String str) {
            this.bucketName = str;
            return this;
        }

        public Builder ipAddress(String str) {
            this.ipAddress = str;
            return this;
        }

        public Builder httpPort(int i) {
            this.port = i;
            return this;
        }

        public Builder readPassword(String str) {
            this.readPassword = str;
            return this;
        }

        public Builder writePassword(String str) {
            this.writePassword = str;
            return this;
        }

        public Bucket build() {
            return new Bucket(this);
        }
    }

    private Bucket(Builder builder) {
        this.bucketFsName = builder.bucketFsName;
        this.bucketName = builder.bucketName;
        this.ipAddress = builder.ipAddress;
        this.port = builder.port;
        this.readPassword = builder.readPassword;
        this.writePassword = builder.writePassword;
        this.detectorFactory = builder.detectorFactory;
    }

    public String getBucketFsName() {
        return this.bucketFsName;
    }

    public String getBucketName() {
        return this.bucketName;
    }

    public String getReadPassword() {
        return this.readPassword;
    }

    public String getWritePassword() {
        return this.writePassword;
    }

    public List<String> listContents() throws BucketAccessException, InterruptedException {
        return listContents(BUCKET_ROOT);
    }

    public List<String> listContents(String str) throws BucketAccessException, InterruptedException {
        URI createPublicReadURI = createPublicReadURI(BUCKET_ROOT);
        LOGGER.debug("Listing contents of bucket under URI \"{}\"", createPublicReadURI);
        try {
            HttpResponse<String> send = this.client.send(HttpRequest.newBuilder(createPublicReadURI).build(), HttpResponse.BodyHandlers.ofString());
            if (send.statusCode() == 200) {
                return parseContentListResponseBody(send, removeLeadingSlash(str));
            }
            throw new BucketAccessException("Unable to list contents of bucket.", send.statusCode(), createPublicReadURI);
        } catch (IOException e) {
            throw new BucketAccessException("Unable to list contents of bucket.", createPublicReadURI, e);
        }
    }

    public boolean isObjectSynchronized(String str, Instant instant) throws InterruptedException, BucketAccessException {
        try {
            return createBucketLogPatternDetector(str).isPatternPresentAfter(instant);
        } catch (IOException e) {
            throw new BucketAccessException("Unable to check if object \"" + str + "\" is synchronized in bucket \"" + this.bucketFsName + "/" + this.bucketName + "\".", e);
        }
    }

    private LogPatternDetector createBucketLogPatternDetector(String str) {
        return this.detectorFactory.createLogPatternDetector(ExasolContainerConstants.EXASOL_CORE_DAEMON_LOGS_PATH, ExasolContainerConstants.BUCKETFS_DAEMON_LOG_FILENAME_PATTERN, str + ".*" + (isSupportedArchiveFormat(str) ? "extracted" : "linked"));
    }

    private String removeLeadingSlash(String str) {
        return str.startsWith(BucketConstants.PATH_SEPARATOR) ? str.substring(1) : str;
    }

    private URI createPublicReadURI(String str) {
        return URI.create("http://" + this.ipAddress + ":" + this.port + "/" + this.bucketName + "/" + removeLeadingSlash(str));
    }

    private List<String> parseContentListResponseBody(HttpResponse<String> httpResponse, String str) {
        String[] split = ((String) httpResponse.body()).split("\\s+");
        ArrayList arrayList = new ArrayList(split.length);
        for (String str2 : split) {
            String removeLeadingSlash = removeLeadingSlash(str2);
            if (removeLeadingSlash.startsWith(str)) {
                arrayList.add(extractFirstPathComponent(removeLeadingSlash.substring(str.length(), removeLeadingSlash.length())));
            }
        }
        return arrayList;
    }

    private String extractFirstPathComponent(String str) {
        return str.contains(BucketConstants.PATH_SEPARATOR) ? str.substring(0, str.indexOf(47)) : str;
    }

    public void uploadFile(Path path, String str) throws InterruptedException, BucketAccessException, TimeoutException {
        uploadFile(path, str, true);
    }

    public void uploadFile(Path path, String str, boolean z) throws InterruptedException, BucketAccessException, TimeoutException {
        String extendPathInBucketDownToFilename = extendPathInBucketDownToFilename(path, str);
        try {
            uploadContent(HttpRequest.BodyPublishers.ofFile(path), extendPathInBucketDownToFilename, "file " + extendPathInBucketDownToFilename, z);
        } catch (IOException e) {
            throw new BucketAccessException("I/O failed to open file \"" + path + "\" for upload to BucketFS.", e);
        }
    }

    private void delayRepeatedUploadToSamePath(String str) throws InterruptedException {
        if (this.uploadHistory.containsKey(str)) {
            Instant with = this.uploadHistory.get(str).with((TemporalField) ChronoField.NANO_OF_SECOND, 0L);
            if (Instant.now().isAfter(with.plusSeconds(1L))) {
                return;
            }
            long nano = 1000 - (r0.getNano() / 1000000);
            LOGGER.debug("Delaying upload for {} ms", Long.valueOf(nano));
            Thread.sleep(nano);
        }
    }

    private String extendPathInBucketDownToFilename(Path path, String str) {
        return str.endsWith(BucketConstants.PATH_SEPARATOR) ? str + path.getFileName() : str;
    }

    private void uploadContent(HttpRequest.BodyPublisher bodyPublisher, String str, String str2, boolean z) throws InterruptedException, BucketAccessException, TimeoutException {
        if (z) {
            delayRepeatedUploadToSamePath(str);
        }
        long currentTimeMillis = System.currentTimeMillis();
        uploadContentNonBlocking(bodyPublisher, str, str2);
        if (z) {
            waitForFileToBeSynchronized(str, currentTimeMillis);
        }
    }

    private void uploadContentNonBlocking(HttpRequest.BodyPublisher bodyPublisher, String str, String str2) throws InterruptedException, BucketAccessException {
        URI createWriteUri = createWriteUri(str);
        LOGGER.debug("Uploading \"{}\" to bucket \"{}/{}\": \"{}\"", new Object[]{str2, this.bucketFsName, this.bucketName, createWriteUri});
        try {
            int httpPut = httpPut(createWriteUri, bodyPublisher);
            if (httpPut != 200) {
                LOGGER.error("{}: Failed to upload \"{}\" to \"{}\"", new Object[]{Integer.valueOf(httpPut), str2, createWriteUri});
                throw new BucketAccessException("Unable to upload file \"" + str2 + "\" to ", httpPut, createWriteUri);
            }
            LOGGER.debug("Successfully uploaded to \"{}\"", createWriteUri);
        } catch (IOException e) {
            throw new BucketAccessException("I/O error trying to upload \"" + str2 + "\" to ", createWriteUri, e);
        }
    }

    private URI createWriteUri(String str) throws BucketAccessException {
        try {
            return new URI("http", null, this.ipAddress, this.port, "/" + this.bucketName + "/" + str, null, null).normalize();
        } catch (URISyntaxException e) {
            throw new BucketAccessException("Unable to create write URI.", e);
        }
    }

    private int httpPut(URI uri, HttpRequest.BodyPublisher bodyPublisher) throws IOException, InterruptedException {
        return this.client.send(HttpRequest.newBuilder(uri).PUT(bodyPublisher).header("Authorization", encodeBasicAuth(true)).build(), HttpResponse.BodyHandlers.ofString()).statusCode();
    }

    private String encodeBasicAuth(boolean z) {
        return "Basic " + Base64.getEncoder().encodeToString((z ? "w:" + this.writePassword : "r:" + this.readPassword).getBytes());
    }

    public void uploadStringContent(String str, String str2) throws InterruptedException, BucketAccessException, TimeoutException {
        uploadStringContent(str, str2, true);
    }

    public void uploadStringContent(String str, String str2, boolean z) throws InterruptedException, BucketAccessException, TimeoutException {
        uploadContent(HttpRequest.BodyPublishers.ofString(str), str2, "text " + (str.length() > 20 ? str.substring(0, 20) + "..." : str), z);
    }

    public void uploadInputStream(Supplier<InputStream> supplier, String str) throws InterruptedException, BucketAccessException, TimeoutException {
        uploadInputStream(supplier, str, true);
    }

    public void uploadInputStream(Supplier<InputStream> supplier, String str, boolean z) throws InterruptedException, BucketAccessException, TimeoutException {
        uploadContentNonBlocking(HttpRequest.BodyPublishers.ofInputStream(supplier), str, "input stream");
    }

    private boolean isSupportedArchiveFormat(String str) {
        Iterator<String> it = ExasolContainerConstants.SUPPORTED_ARCHIVE_EXTENSIONS.iterator();
        while (it.hasNext()) {
            if (str.endsWith(it.next())) {
                return true;
            }
        }
        return false;
    }

    private void waitForFileToBeSynchronized(String str, long j) throws InterruptedException, TimeoutException, BucketAccessException {
        long j2 = j + BUCKET_SYNC_TIMEOUT_IN_MILLISECONDS;
        LogPatternDetector createBucketLogPatternDetector = createBucketLogPatternDetector(str);
        while (System.currentTimeMillis() < j2) {
            try {
                if (createBucketLogPatternDetector.isPatternPresentAfter(Instant.ofEpochMilli(j))) {
                    return;
                } else {
                    Thread.sleep(FILE_SYNC_POLLING_DELAY_IN_MILLISECONDS);
                }
            } catch (IOException e) {
                throw new BucketAccessException("I/O exception while checking logs for bucket object is synchronization.", e);
            }
        }
        throw new TimeoutException("Timeout waiting for object \"" + str + "\"to be synchronized in bucket \"" + this.bucketFsName + "/" + this.bucketName + "\".");
    }

    public void downloadFile(String str, Path path) throws InterruptedException, BucketAccessException {
        URI createPublicReadURI = createPublicReadURI(str);
        LOGGER.debug("Downloading  file from bucket \"{}/{}\": \"{}\" to \"{}\"", new Object[]{this.bucketFsName, this.bucketName, createPublicReadURI, path});
        try {
            int httpGet = httpGet(createPublicReadURI, path);
            if (httpGet != 200) {
                LOGGER.error("{}: Failed to download \"{}\" to file \"{}\"", new Object[]{Integer.valueOf(httpGet), createPublicReadURI, path});
                throw new BucketAccessException("Unable to downolad file \"" + path + "\" from ", httpGet, createPublicReadURI);
            }
            LOGGER.debug("Successfully downloaded file to \"{}\"", path);
        } catch (IOException e) {
            throw new BucketAccessException("Unable to upload file \"" + path + "\" from ", createPublicReadURI, e);
        }
    }

    private int httpGet(URI uri, Path path) throws IOException, InterruptedException {
        HttpResponse send = this.client.send(HttpRequest.newBuilder(uri).GET().header("Authorization", encodeBasicAuth(true)).build(), HttpResponse.BodyHandlers.ofString());
        Files.write(path, ((String) send.body()).getBytes(), new OpenOption[0]);
        return send.statusCode();
    }

    public static Builder builder() {
        return new Builder();
    }
}
