Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ public class SegmentLoaderConfig
@JsonProperty("virtualStorageLoadThreads")
private int virtualStorageLoadThreads = 2 * runtimeInfo.getAvailableProcessors();

/**
* When enabled, weakly-held cache entries are evicted immediately upon release of all holds, rather than
* waiting for space pressure to trigger eviction. This setting is not intended to be configured directly by
* administrators. Instead, it is expected to be set when appropriate via {@link #setVirtualStorage}.
*/
@JsonProperty("virtualStorageFabricEvictImmediatelyOnHoldRelease")
private boolean virtualStorageFabricEvictImmediatelyOnHoldRelease = false;

private long combinedMaxSize = 0;

public List<StorageLocationConfig> getLocations()
Expand Down Expand Up @@ -154,6 +162,11 @@ public int getVirtualStorageLoadThreads()
return virtualStorageLoadThreads;
}

public boolean isVirtualStorageFabricEvictImmediatelyOnHoldRelease()
{
return virtualStorageFabricEvictImmediatelyOnHoldRelease;
}

public SegmentLoaderConfig withLocations(List<StorageLocationConfig> locations)
{
SegmentLoaderConfig retVal = new SegmentLoaderConfig();
Expand All @@ -163,6 +176,19 @@ public SegmentLoaderConfig withLocations(List<StorageLocationConfig> locations)
return retVal;
}

/**
* Sets {@link #virtualStorage} and {@link #virtualStorageFabricEvictImmediatelyOnHoldRelease}.
*/
public SegmentLoaderConfig setVirtualStorage(
boolean virtualStorage,
boolean virtualStorageFabricEvictImmediatelyOnHoldRelease
)
{
this.virtualStorage = virtualStorage;
this.virtualStorageFabricEvictImmediatelyOnHoldRelease = virtualStorageFabricEvictImmediatelyOnHoldRelease;
return this;
}

/**
* Convert a list of {@link StorageLocationConfig} objects to {@link StorageLocation} objects.
* <p>
Expand Down Expand Up @@ -195,6 +221,7 @@ public String toString()
", statusQueueMaxSize=" + statusQueueMaxSize +
", useVirtualStorageFabric=" + virtualStorage +
", virtualStorageFabricLoadThreads=" + virtualStorageLoadThreads +
", virtualStorageFabricEvictImmediatelyOnHoldRelease=" + virtualStorageFabricEvictImmediatelyOnHoldRelease +
", combinedMaxSize=" + combinedMaxSize +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ public SegmentLocalCacheManager(
if (config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap() > 0) {
throw DruidException.defensive("Invalid configuration: virtualStorage is incompatible with numThreadsToLoadSegmentsIntoPageCacheOnBootstrap");
}
if (config.isVirtualStorageFabricEvictImmediatelyOnHoldRelease()) {
for (StorageLocation location : locations) {
location.setEvictImmediatelyOnHoldRelease(true);
}
}
virtualStorageLoadOnDemandExec =
MoreExecutors.listeningDecorator(
// probably replace this with virtual threads once minimum version is java 21
Expand Down Expand Up @@ -333,21 +338,20 @@ public void storeInfoFile(final DataSegment segment) throws IOException
@Override
public void removeInfoFile(final DataSegment segment)
{
final Runnable delete = () -> deleteSegmentInfoFile(segment);
final SegmentCacheEntryIdentifier entryId = new SegmentCacheEntryIdentifier(segment.getId());
boolean isCached = false;
// defer deleting until the unmount operation of the cache entry, if possible, so that if the process stops before
// the segment files are deleted, they can be properly managed on startup (since the info entry still exists)
for (StorageLocation location : locations) {
final SegmentCacheEntry cacheEntry = location.getCacheEntry(entryId);
if (cacheEntry != null) {
isCached = isCached || cacheEntry.setOnUnmount(delete);
isCached = isCached || cacheEntry.setDeleteInfoFileOnUnmount();
}
}

// otherwise we are probably deleting for cleanup reasons, so try it anyway if it wasn't present in any location
if (!isCached) {
delete.run();
deleteSegmentInfoFile(segment);
}
}

Expand Down Expand Up @@ -430,7 +434,7 @@ public AcquireSegmentAction acquireSegment(final DataSegment dataSegment) throws
jsonMapper.writeValue(out, dataSegment);
return null;
});
hold.getEntry().setOnUnmount(() -> deleteSegmentInfoFile(dataSegment));
hold.getEntry().setDeleteInfoFileOnUnmount();
}

return new AcquireSegmentAction(
Expand Down Expand Up @@ -492,6 +496,11 @@ private AcquireSegmentAction acquireExistingSegment(SegmentCacheEntryIdentifier
public void load(final DataSegment dataSegment) throws SegmentLoadingException
{
if (config.isVirtualStorage()) {
if (config.isVirtualStorageFabricEvictImmediatelyOnHoldRelease()) {
throw DruidException.defensive(
"load() should not be called when virtualStorageFabricEvictImmediatelyOnHoldRelease is enabled"
);
}
// virtual storage doesn't do anything with loading immediately, but check to see if the segment is already cached
// and if so, clear out the onUnmount action
final ReferenceCountingLock lock = lock(dataSegment);
Expand Down Expand Up @@ -533,6 +542,11 @@ public void bootstrap(
) throws SegmentLoadingException
{
if (config.isVirtualStorage()) {
if (config.isVirtualStorageFabricEvictImmediatelyOnHoldRelease()) {
throw DruidException.defensive(
"bootstrap() should not be called when virtualStorageFabricEvictImmediatelyOnHoldRelease is enabled"
);
}
// during bootstrap, check if the segment exists in a location and mount it, getCachedSegments already
// did the reserving for us
final SegmentCacheEntryIdentifier id = new SegmentCacheEntryIdentifier(dataSegment.getId());
Expand Down Expand Up @@ -1031,6 +1045,10 @@ public void mount(StorageLocation mountLocation) throws SegmentLoadingException
);
unmount();
}

if (config.isVirtualStorageFabricEvictImmediatelyOnHoldRelease()) {
setDeleteInfoFileOnUnmount();
}
}
catch (SegmentLoadingException e) {
try {
Expand Down Expand Up @@ -1104,12 +1122,12 @@ public synchronized Optional<Segment> acquireReference()
return referenceProvider.acquireReference();
}

public synchronized boolean setOnUnmount(Runnable runnable)
public synchronized boolean setDeleteInfoFileOnUnmount()
{
if (location == null) {
return false;
}
onUnmount.set(runnable);
onUnmount.set(() -> deleteSegmentInfoFile(dataSegment));
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,7 @@ public class StorageLocation
@GuardedBy("lock")
private WeakCacheEntry hand;

/**
* Current total size of files in bytes, including weak entries.
*/
private volatile boolean evictImmediatelyOnHoldRelease = false;

/**
* Current total size of files in bytes, including weak entries.
Expand Down Expand Up @@ -171,6 +169,14 @@ public File getPath()
return path;
}

/**
* Sets whether weak cache entries should be immediately evicted once all holds are released.
*/
public void setEvictImmediatelyOnHoldRelease(final boolean evictImmediatelyOnHoldRelease)
{
this.evictImmediatelyOnHoldRelease = evictImmediatelyOnHoldRelease;
}

public <T extends CacheEntry> T getStaticCacheEntry(CacheEntryIdentifier entryId)
{
lock.readLock().lock();
Expand Down Expand Up @@ -340,7 +346,10 @@ public <T extends CacheEntry> ReservationHold<T> addWeakReservationHoldIfExists(
if (existingEntry != null && existingEntry.hold()) {
existingEntry.visited = true;
weakStats.getAndUpdate(WeakStats::hit);
return new ReservationHold<>((T) existingEntry.cacheEntry, existingEntry::release);
return new ReservationHold<>(
(T) existingEntry.cacheEntry,
createWeakEntryReleaseRunnable(existingEntry, false)
);
}
return null;
}
Expand Down Expand Up @@ -374,7 +383,10 @@ public <T extends CacheEntry> ReservationHold<T> addWeakReservationHold(
if (retryExistingEntry != null && retryExistingEntry.hold()) {
retryExistingEntry.visited = true;
weakStats.getAndUpdate(WeakStats::hit);
return new ReservationHold<>((T) retryExistingEntry.cacheEntry, retryExistingEntry::release);
return new ReservationHold<>(
(T) retryExistingEntry.cacheEntry,
createWeakEntryReleaseRunnable(retryExistingEntry, false)
);
}
final CacheEntry newEntry = entrySupplier.get();
final ReclaimResult reclaimResult = canHandleWeak(newEntry);
Expand All @@ -388,28 +400,7 @@ public <T extends CacheEntry> ReservationHold<T> addWeakReservationHold(
weakStats.getAndUpdate(s -> s.load(newEntry.getSize()));
hold = new ReservationHold<>(
(T) newEntry,
() -> {
newWeakEntry.release();
lock.writeLock().lock();
try {
weakCacheEntries.computeIfPresent(
newEntry.getId(),
(cacheEntryIdentifier, weakCacheEntry) -> {
if (!weakCacheEntry.cacheEntry.isMounted()) {
// if we never successfully mounted, go ahead and remove so we don't have a dead entry
unlinkWeakEntry(weakCacheEntry);
// we call unmount anyway to terminate the phaser
weakCacheEntry.unmount();
return null;
}
return weakCacheEntry;
}
);
}
finally {
lock.writeLock().unlock();
}
}
createWeakEntryReleaseRunnable(newWeakEntry, true)
);
} else {
weakStats.getAndUpdate(WeakStats::reject);
Expand Down Expand Up @@ -444,6 +435,52 @@ public void release(CacheEntry entry)
}
}

/**
* Creates a release runnable for a {@link WeakCacheEntry} that handles immediate eviction when configured.
* If {@link #evictImmediately} is true and there are no more holds after releasing, the entry is immediately
* evicted from the cache. For new entries (isNewEntry=true), unmounted entries are also removed.
*/
private Runnable createWeakEntryReleaseRunnable(
final WeakCacheEntry weakEntry,
final boolean isNewEntry
)
{
return () -> {
weakEntry.release();

if (!isNewEntry && !evictImmediatelyOnHoldRelease) {
// No need to consider removal from weakCacheEntries on hold release.
return;
}

lock.writeLock().lock();
try {
weakCacheEntries.computeIfPresent(
weakEntry.cacheEntry.getId(),
(cacheEntryIdentifier, weakCacheEntry) -> {
// If we never successfully mounted, go ahead and remove so we don't have a dead entry.
// Furthermore, if evictImmediatelyOnHoldRelease is set, evict on release if all holds are gone.
final boolean isMounted = weakCacheEntry.cacheEntry.isMounted();
if ((isNewEntry && !isMounted)
|| (evictImmediatelyOnHoldRelease && !weakCacheEntry.isHeld())) {
unlinkWeakEntry(weakCacheEntry);
weakCacheEntry.unmount(); // call even if never mounted, to terminate the phaser
if (isMounted) {
weakStats.getAndUpdate(s -> s.evict(weakCacheEntry.cacheEntry.getSize()));
}
return null;
} else {
return weakCacheEntry;
}
}
);
}
finally {
lock.writeLock().unlock();
}
};
}

/**
* Inserts a new {@link WeakCacheEntry}, inserting it as {@link #head} (or both {@link #head} and {@link #tail} if it
* is the only entry), tracking size in {@link #currSizeBytes} and {@link #currWeakSizeBytes}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.loading;

import org.junit.Assert;
import org.junit.Test;

public class SegmentLoaderConfigTest
{
@Test
public void testSetVirtualStorage()
{
final SegmentLoaderConfig config = new SegmentLoaderConfig();

// Verify default values
Assert.assertFalse(config.isVirtualStorage());
Assert.assertFalse(config.isVirtualStorageFabricEvictImmediatelyOnHoldRelease());

// Set both to true
config.setVirtualStorage(true, true);

// Verify both fields are set
Assert.assertTrue(config.isVirtualStorage());
Assert.assertTrue(config.isVirtualStorageFabricEvictImmediatelyOnHoldRelease());
}
}
Loading
Loading