Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spooling pruning tests #23651

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@ private void prune()
}

@VisibleForTesting
void pruneExpiredBefore(Instant expiredBefore)
long pruneExpiredBefore(Instant expiredBefore)
{
if (closed) {
return;
return 0;
}
long pruned = 0;

try {
List<Location> expiredSegments = new ArrayList<>();
FileIterator iterator = orderDetectingIterator(fileSystem.listFiles(location));
Expand All @@ -109,20 +111,24 @@ void pruneExpiredBefore(Instant expiredBefore)
if (handle.get().isBefore(expiredBefore)) {
expiredSegments.add(file.location());
if (expiredSegments.size() >= batchSize) {
pruned += expiredSegments.size();
pruneExpiredSegments(expiredBefore, expiredSegments);
expiredSegments.clear();
}
}
else if (filesAreOrdered) {
// First non expired segment was found, no need to check the rest
// since we know that files are lexicographically ordered.
pruneExpiredSegments(expiredBefore, expiredSegments);
return;
return pruned + expiredSegments.size();
}
}
pruneExpiredSegments(expiredBefore, expiredSegments);
return pruned + expiredSegments.size();
}
catch (IOException e) {
log.error(e, "Failed to prune segments");
return pruned;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public record FileSystemSpooledSegmentHandle(
Optional<EncryptionKey> encryptionKey)
implements SpooledSegmentHandle
{
private static final String OBJECT_NAME_SEPARATOR = "::";
private static final String OBJECT_NAME_SEPARATOR = "-";

public FileSystemSpooledSegmentHandle
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,13 @@
package io.trino.spooling.filesystem;

import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.s3.S3FileSystemConfig;
import io.trino.filesystem.s3.S3FileSystemFactory;
import io.trino.filesystem.s3.S3FileSystemStats;
import io.trino.filesystem.memory.MemoryFileSystem;
import io.trino.spi.QueryId;
import io.trino.spi.protocol.SpoolingContext;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.testing.containers.Minio;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

Expand All @@ -37,13 +29,10 @@
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;

import static io.opentelemetry.api.OpenTelemetry.noop;
import static io.trino.testing.containers.Minio.MINIO_REGION;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.time.temporal.ChronoUnit.MILLIS;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -52,40 +41,54 @@
@TestInstance(PER_CLASS)
class TestFileSystemSegmentPruner
{
private Minio minio;
private static final String BUCKET_NAME = "segments" + UUID.randomUUID().toString().replace("-", "");
private static final Location LOCATION = Location.of("s3://" + BUCKET_NAME + "/");
private static final String TEST_LOCATION = "memory://";

@BeforeAll
public void setup()
{
minio = Minio.builder().build();
minio.start();
minio.createBucket(BUCKET_NAME);
}
private static final FileSystemSpoolingConfig SPOOLING_CONFIG = new FileSystemSpoolingConfig()
.setLocation(TEST_LOCATION)
.setPruningBatchSize(1);

@AfterAll
public void teardown()
@Test
public void shouldPruneExpiredSegments()
{
minio.stop();
MemoryFileSystem fileSystem = new MemoryFileSystem();
try (ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor()) {
FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> fileSystem, executorService);

Instant now = Instant.now();
QueryId queryId = QueryId.valueOf("prune_expired");

Location _ = createNewDummySegment(fileSystem, queryId, now.minusSeconds(1));
Location nonExpiredSegment = createNewDummySegment(fileSystem, queryId, now.plusSeconds(1));

pruner.pruneExpiredBefore(now.truncatedTo(MILLIS));

List<Location> files = listFiles(fileSystem, queryId);
assertThat(files)
.hasSize(1)
.containsOnly(nonExpiredSegment);
}
}

@Test
public void shouldPruneExpiredSegments()
public void shouldPruneExpiredSegmentsOnceAndClear()
{
MemoryFileSystem fileSystem = new MemoryFileSystem();
try (ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor()) {
TrinoFileSystemFactory fileSystemFactory = getFileSystemFactory();
FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(getSpoolingConfig(), fileSystemFactory, executorService);
FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> fileSystem, executorService);

Instant now = Instant.now();
QueryId queryId = QueryId.valueOf("prune_expired");

Location _ = writeNewDummySegment(fileSystemFactory, queryId, now.minusSeconds(1));
Location nonExpiredSegment = writeNewDummySegment(fileSystemFactory, queryId, now.plusSeconds(1));
Location _ = createNewDummySegment(fileSystem, queryId, now.minusSeconds(1));
Location _ = createNewDummySegment(fileSystem, queryId, now.minusSeconds(1));
Location _ = createNewDummySegment(fileSystem, queryId, now.minusSeconds(1));

pruner.pruneExpiredBefore(now.truncatedTo(MILLIS));
Location nonExpiredSegment = createNewDummySegment(fileSystem, queryId, now.plusSeconds(1));

List<Location> files = listFiles(fileSystemFactory, queryId);
assertThat(pruner.pruneExpiredBefore(now.truncatedTo(MILLIS)))
.isEqualTo(3);

List<Location> files = listFiles(fileSystem, queryId);
assertThat(files)
.hasSize(1)
.containsOnly(nonExpiredSegment);
Expand All @@ -95,20 +98,20 @@ public void shouldPruneExpiredSegments()
@Test
public void shouldNotPruneLiveSegments()
{
MemoryFileSystem fileSystem = new MemoryFileSystem();
try (ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor()) {
TrinoFileSystemFactory fileSystemFactory = getFileSystemFactory();
FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(getSpoolingConfig(), fileSystemFactory, executorService);
FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> fileSystem, executorService);

Instant now = Instant.now();

QueryId queryId = QueryId.valueOf("prune_live");

Location _ = writeNewDummySegment(fileSystemFactory, queryId, now.plusSeconds(1));
Location _ = writeNewDummySegment(fileSystemFactory, queryId, now.plusSeconds(2));
Location _ = createNewDummySegment(fileSystem, queryId, now.plusSeconds(1));
Location _ = createNewDummySegment(fileSystem, queryId, now.plusSeconds(2));

pruner.pruneExpiredBefore(now.truncatedTo(MILLIS));

List<Location> files = listFiles(fileSystemFactory, queryId);
List<Location> files = listFiles(fileSystem, queryId);
assertThat(files)
.hasSize(2);
}
Expand All @@ -117,57 +120,46 @@ public void shouldNotPruneLiveSegments()
@Test
public void shouldNotPruneSegmentsIfNotStrictlyBeforeExpiration()
{
TrinoFileSystem memoryFileSystem = new MemoryFileSystem();
try (ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor()) {
TrinoFileSystemFactory fileSystemFactory = getFileSystemFactory();
FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(getSpoolingConfig(), fileSystemFactory, executorService);
FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> memoryFileSystem, executorService);

Instant now = Instant.now();

QueryId queryId = QueryId.valueOf("prune_now");

Location firstSegment = writeNewDummySegment(fileSystemFactory, queryId, now);
Location secondSegment = writeNewDummySegment(fileSystemFactory, queryId, now);
Location firstSegment = createNewDummySegment(memoryFileSystem, queryId, now);
Location secondSegment = createNewDummySegment(memoryFileSystem, queryId, now);

pruner.pruneExpiredBefore(now.truncatedTo(MILLIS));

List<Location> files = listFiles(fileSystemFactory, queryId);
List<Location> files = listFiles(memoryFileSystem, queryId);
assertThat(files)
.hasSize(2)
.containsOnly(firstSegment, secondSegment);
}
}

private TrinoFileSystemFactory getFileSystemFactory()
{
S3FileSystemConfig filesystemConfig = new S3FileSystemConfig()
.setEndpoint(minio.getMinioAddress())
.setRegion(MINIO_REGION)
.setPathStyleAccess(true)
.setAwsAccessKey(Minio.MINIO_ACCESS_KEY)
.setAwsSecretKey(Minio.MINIO_SECRET_KEY)
.setStreamingPartSize(DataSize.valueOf("5.5MB"));
return new S3FileSystemFactory(noop(), filesystemConfig, new S3FileSystemStats());
}

private Location writeNewDummySegment(TrinoFileSystemFactory fileSystemFactory, QueryId queryId, Instant ttl)
private Location createNewDummySegment(TrinoFileSystem fileSystem, QueryId queryId, Instant ttl)
{
SpoolingContext context = new SpoolingContext("encoding", queryId, 100, 1000);
FileSystemSpooledSegmentHandle handle = FileSystemSpooledSegmentHandle.random(ThreadLocalRandom.current(), context, ttl);
try (OutputStream segment = createFileSystem(fileSystemFactory).newOutputFile(LOCATION.appendPath(handle.storageObjectName())).create()) {
segment.write("dummy".getBytes(UTF_8));
return LOCATION.appendPath(handle.storageObjectName());
Location location = Location.of(TEST_LOCATION).appendPath(handle.storageObjectName());
try (OutputStream stream = fileSystem.newOutputFile(location).create()) {
stream.write("dummy".getBytes(UTF_8));
return location;
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private List<Location> listFiles(TrinoFileSystemFactory fileSystemFactory, QueryId queryId)
private List<Location> listFiles(TrinoFileSystem fileSystem, QueryId queryId)
{
ImmutableList.Builder<Location> files = ImmutableList.builder();

try {
FileIterator iterator = createFileSystem(fileSystemFactory).listFiles(LOCATION);
FileIterator iterator = fileSystem.listFiles(Location.of(TEST_LOCATION));
while (iterator.hasNext()) {
FileEntry entry = iterator.next();
if (entry.location().fileName().endsWith(queryId.toString())) {
Expand All @@ -180,16 +172,4 @@ private List<Location> listFiles(TrinoFileSystemFactory fileSystemFactory, Query
throw new UncheckedIOException(e);
}
}

private TrinoFileSystem createFileSystem(TrinoFileSystemFactory fileSystemFactory)
{
return fileSystemFactory.create(ConnectorIdentity.ofUser("ignored"));
}

private FileSystemSpoolingConfig getSpoolingConfig()
{
return new FileSystemSpoolingConfig()
.setS3Enabled(true)
.setLocation(LOCATION.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testStorageObjectNameStability()
Instant expireAt = Instant.ofEpochMilli(90000);
FileSystemSpooledSegmentHandle handle = FileSystemSpooledSegmentHandle.random(new NotARandomAtAll(), context, expireAt);
assertThat(handle.storageObjectName())
.isEqualTo("0000002QWG0G2081040G208104::query_id");
.isEqualTo("0000002QWG0G2081040G208104-query_id");
}

@Test
Expand Down