package co.cask.cdap.data.stream;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.Iterator;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data/stream/StreamFileJanitor.class */
/**
 * Removes stream files that are no longer needed: everything belonging to
 * generations older than a stream's current generation, and partition
 * directories of the current generation whose end time falls before the
 * stream's TTL cutoff.
 */
public final class StreamFileJanitor {
    private static final Logger LOG = LoggerFactory.getLogger(StreamFileJanitor.class);

    /** Separator whose presence in a directory name marks it as a partition directory. */
    private static final char PARTITION_NAME_SEPARATOR = '.';

    private final StreamAdmin streamAdmin;
    private final Location streamBaseLocation;

    /**
     * Creates a janitor operating on the directory configured under the
     * {@code stream.base.dir} key.
     *
     * @param cConfiguration configuration from which the stream base directory is read
     * @param streamAdmin admin used to look up the {@link StreamConfig} of each stream
     * @param locationFactory factory used to resolve the stream base directory
     */
    @Inject
    public StreamFileJanitor(CConfiguration cConfiguration, StreamAdmin streamAdmin, LocationFactory locationFactory) {
        this.streamAdmin = streamAdmin;
        this.streamBaseLocation = locationFactory.create(cConfiguration.get("stream.base.dir"));
    }

    /**
     * Runs {@link #clean(StreamConfig, long)} for every entry found directly
     * under the stream base location, using the entry name as the stream name.
     * Does nothing if the base location does not exist.
     *
     * @throws IOException if listing the base location, loading a stream
     *     configuration, or cleaning a stream fails; remaining streams are
     *     not processed in that case
     */
    public void cleanAll() throws IOException {
        if (!streamBaseLocation.exists()) {
            return;
        }
        for (Location streamLocation : streamBaseLocation.list()) {
            StreamConfig config = streamAdmin.getConfig(streamLocation.getName());
            // The clock is read per stream, so each stream is evaluated against
            // the time at which its own cleanup starts.
            clean(config, System.currentTimeMillis());
        }
    }

    /**
     * Cleans up the files of a single stream.
     *
     * <p>For every generation below the current one, the generation location is
     * removed. When that location is the stream location itself, only the
     * partition directories inside it are removed so that other content of the
     * stream directory is left in place. For the current generation, partition
     * directories whose end time is before {@code j - streamConfig.getTTL()}
     * are removed.
     *
     * @param streamConfig configuration of the stream to clean
     * @param j reference "now" timestamp; {@link #cleanAll()} passes
     *     {@link System#currentTimeMillis()}, so presumably milliseconds in the
     *     same unit as the stream TTL
     * @throws IOException if listing or deleting a location fails
     */
    @VisibleForTesting
    void clean(StreamConfig streamConfig, long j) throws IOException {
        LOG.debug("Cleanup stream file for {}", streamConfig);

        Location streamLocation = streamConfig.getLocation();
        int generation = StreamUtils.getGeneration(streamConfig);

        // Drop everything that belongs to generations older than the current one.
        for (int oldGeneration = 0; oldGeneration < generation; oldGeneration++) {
            Location generationLocation = StreamUtils.createGenerationLocation(streamLocation, oldGeneration);
            if (generationLocation.equals(streamLocation)) {
                // This generation lives directly in the stream directory, so the
                // directory itself must stay; remove only its partition directories.
                deletePartitionsEndingBefore(generationLocation, Long.MAX_VALUE, false);
            } else {
                generationLocation.delete(true);
            }
        }

        // In the current generation, remove only partitions that ended before the TTL cutoff.
        long expiryCutoff = j - streamConfig.getTTL();
        Location currentGenerationLocation = StreamUtils.createGenerationLocation(streamLocation, generation);
        deletePartitionsEndingBefore(currentGenerationLocation, expiryCutoff, true);
    }

    /**
     * Deletes partition directories found directly under the given location.
     *
     * @param generationLocation location whose children are examined
     * @param expiryCutoff only consulted when {@code checkEndTime} is true; a
     *     partition is deleted if its end time is strictly less than this value
     * @param checkEndTime whether the partition end time is parsed and compared;
     *     when false every partition directory is deleted without parsing its name
     */
    private void deletePartitionsEndingBefore(Location generationLocation, long expiryCutoff, boolean checkEndTime)
            throws IOException {
        for (Location child : generationLocation.list()) {
            if (!isPartitionDirectory(child)) {
                continue;
            }
            if (!checkEndTime || StreamUtils.getPartitionEndTime(child.getName()) < expiryCutoff) {
                child.delete(true);
            }
        }
    }

    /**
     * Tells whether the location is a directory whose name contains the
     * partition separator at any position other than the first character.
     */
    private boolean isPartitionDirectory(Location location) throws IOException {
        return location.isDirectory() && location.getName().indexOf(PARTITION_NAME_SEPARATOR) > 0;
    }
}
