Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ public SegmentCacheManagerFactory(
this.jsonMapper = mapper;
}

public SegmentCacheManager manufacturate(File storageDir)
public SegmentCacheManager manufacturate(File storageDir, boolean virtualStorage)
{
final SegmentLoaderConfig loaderConfig = new SegmentLoaderConfig().withLocations(
Collections.singletonList(new StorageLocationConfig(storageDir, null, null))
);
final SegmentLoaderConfig loaderConfig =
new SegmentLoaderConfig()
.setLocations(Collections.singletonList(new StorageLocationConfig(storageDir, null, null)))
.setVirtualStorage(virtualStorage, virtualStorage);
final List<StorageLocation> storageLocations = loaderConfig.toStorageLocations();
return new SegmentLocalCacheManager(
storageLocations,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ public ObjectMapper getJsonMapper()
return jsonMapper;
}

/**
* Returns a {@link SegmentCacheManager} in virtual storage mode.
*/
public SegmentCacheManager getSegmentCacheManager()
{
return segmentCacheManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public TaskToolbox build(TaskConfig config, Task task)
.queryProcessingPool(queryProcessingPool)
.joinableFactory(joinableFactory)
.monitorSchedulerProvider(monitorSchedulerProvider)
.segmentCacheManager(segmentCacheManagerFactory.manufacturate(taskWorkDir))
.segmentCacheManager(segmentCacheManagerFactory.manufacturate(taskWorkDir, true))
.jsonMapper(jsonMapper)
.taskWorkDir(taskWorkDir)
.indexIO(indexIO)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import org.apache.druid.data.input.impl.DimensionSchema;
Expand Down Expand Up @@ -81,13 +82,15 @@
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountedObjectProvider;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.loading.AcquireSegmentAction;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.transform.CompactionTransformSpec;
Expand Down Expand Up @@ -781,34 +784,38 @@ private static Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>> fetch
SegmentCacheManager segmentCacheManager
)
{
return Pair.of(
dataSegment,
() -> {
try {
final Closer closer = Closer.create();
segmentCacheManager.load(dataSegment);
closer.register(() -> segmentCacheManager.drop(dataSegment));
final Segment segment = closer.register(segmentCacheManager.acquireCachedSegment(dataSegment).orElseThrow());
return new ResourceHolder<QueryableIndex>()
{
@Override
public QueryableIndex get()
{
return segment.as(QueryableIndex.class);
}
return Pair.of(dataSegment, () -> fetchSegmentInternal(dataSegment, segmentCacheManager));
}

@Override
public void close()
{
CloseableUtils.closeAndWrapExceptions(closer);
}
};
}
catch (Exception e) {
throw new RuntimeException(e);
}
private static ResourceHolder<QueryableIndex> fetchSegmentInternal(
DataSegment dataSegment,
SegmentCacheManager segmentCacheManager
)
{
final Closer closer = Closer.create();
try {
final AcquireSegmentAction acquireAction = closer.register(segmentCacheManager.acquireSegment(dataSegment));
final ReferenceCountedObjectProvider<Segment> segmentProvider =
FutureUtils.getUnchecked(acquireAction.getSegmentFuture(), true).getReferenceProvider();
final Segment segment = segmentProvider.acquireReference().map(closer::register).get();
return new ResourceHolder<>()
{
@Override
public QueryableIndex get()
{
return segment.as(QueryableIndex.class);
}
);

@Override
public void close()
{
CloseableUtils.closeAndWrapExceptions(closer);
}
};
}
catch (Exception e) {
throw CloseableUtils.closeAndWrapInCatch(e, closer);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public DruidInputSource withInterval(Interval interval)
@Override
protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory)
{
final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(temporaryDirectory);
final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(temporaryDirectory, false);

final List<TimelineObjectHolder<String, DataSegment>> timeline = createTimeline();
final Iterator<DruidSegmentInputEntity> entityIterator = FluentIterable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1602,7 +1602,7 @@ public void testRunWithSpatialDimensions() throws Exception
}

final File cacheDir = temporaryFolder.newFolder();
final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(cacheDir);
final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(cacheDir, false);

List<String> rowsFromSegment = new ArrayList<>();
for (DataSegment segment : segments) {
Expand Down Expand Up @@ -1723,7 +1723,7 @@ public void testRunWithAutoCastDimensions() throws Exception
}

final File cacheDir = temporaryFolder.newFolder();
final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(cacheDir);
final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(cacheDir, false);

List<String> rowsFromSegment = new ArrayList<>();
for (DataSegment segment : segments) {
Expand Down Expand Up @@ -1850,7 +1850,7 @@ public void testRunWithAutoCastDimensionsSortByDimension() throws Exception
}

final File cacheDir = temporaryFolder.newFolder();
final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(cacheDir);
final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(cacheDir, false);

List<String> rowsFromSegment = new ArrayList<>();
segmentCacheManager.load(compactSegment);
Expand Down Expand Up @@ -2065,6 +2065,18 @@ public List<StorageLocationConfig> getLocations()
{
return ImmutableList.of(new StorageLocationConfig(localDeepStorage, null, null));
}

@Override
public boolean isVirtualStorage()
{
return true;
}

@Override
public boolean isVirtualStorageFabricEvictImmediately()
{
return true;
}
};
final List<StorageLocation> storageLocations = loaderConfig.toStorageLocations();
final SegmentCacheManager cacheManager = new SegmentLocalCacheManager(
Expand Down Expand Up @@ -2106,7 +2118,7 @@ public List<StorageLocationConfig> getLocations()
private List<String> getCSVFormatRowsFromSegments(List<DataSegment> segments) throws Exception
{
final File cacheDir = temporaryFolder.newFolder();
final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(cacheDir);
final SegmentCacheManager segmentCacheManager = segmentCacheManagerFactory.manufacturate(cacheDir, false);

List<String> rowsFromSegment = new ArrayList<>();
for (DataSegment segment : segments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountedSegmentProvider;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.SimpleQueryableIndex;
Expand All @@ -127,6 +128,8 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.AcquireSegmentAction;
import org.apache.druid.segment.loading.AcquireSegmentResult;
import org.apache.druid.segment.loading.NoopSegmentCacheManager;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
Expand Down Expand Up @@ -1976,6 +1979,18 @@ public Optional<Segment> acquireCachedSegment(DataSegment dataSegment)
);
}

@Override
public AcquireSegmentAction acquireSegment(DataSegment dataSegment)
{
final Segment segment =
new QueryableIndexSegment(indexIO.loadIndex(segments.get(dataSegment)), dataSegment.getId());
final ReferenceCountedSegmentProvider provider = ReferenceCountedSegmentProvider.of(segment);
return new AcquireSegmentAction(
() -> Futures.immediateFuture(AcquireSegmentResult.cached(provider)),
null
);
}

@Override
public void drop(DataSegment segment)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void shutdownTask(Task task)

public SegmentCacheManager newSegmentLoader(File storageDir)
{
return segmentCacheManagerFactory.manufacturate(storageDir);
return segmentCacheManagerFactory.manufacturate(storageDir, true);
}

public ObjectMapper getObjectMapper()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ List<ScanResultValue> querySegment(DataSegment dataSegment, List<String> columns
private Segment loadSegment(DataSegment dataSegment, File tempSegmentDir)
{
final SegmentCacheManager cacheManager = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, getObjectMapper())
.manufacturate(tempSegmentDir);
.manufacturate(tempSegmentDir, false);
try {
cacheManager.load(dataSegment);
return cacheManager.acquireCachedSegment(dataSegment).orElseThrow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,20 @@
import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
import org.apache.druid.msq.dart.worker.DartDataSegmentProvider;
import org.apache.druid.msq.dart.worker.DartDataServerQueryHandlerFactory;
import org.apache.druid.msq.dart.worker.DartWorkerContextFactory;
import org.apache.druid.msq.dart.worker.DartWorkerContextFactoryImpl;
import org.apache.druid.msq.dart.worker.DartWorkerRunner;
import org.apache.druid.msq.dart.worker.http.DartWorkerResource;
import org.apache.druid.msq.exec.DataSegmentProviderImpl;
import org.apache.druid.msq.exec.MemoryIntrospector;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.msq.input.table.DataSegmentProvider;
import org.apache.druid.msq.rpc.ResourcePermissionMapper;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.security.AuthorizerMapper;

import java.io.File;
Expand Down Expand Up @@ -104,11 +105,6 @@ public void configure(Binder binder)
.to(DartWorkerContextFactoryImpl.class)
.in(LazySingleton.class);

binder.bind(DataSegmentProvider.class)
.annotatedWith(Dart.class)
.to(DartDataSegmentProvider.class)
.in(LazySingleton.class);

binder.bind(ResourcePermissionMapper.class)
.annotatedWith(Dart.class)
.to(DartResourcePermissionMapper.class);
Expand Down Expand Up @@ -139,6 +135,14 @@ public DartWorkerRunner createWorkerRunner(
);
}

@Provides
@LazySingleton
@Dart
public DataSegmentProvider createDataSegmentProvider(final SegmentManager segmentManager)
{
return new DataSegmentProviderImpl(segmentManager, null);
}

@Provides
@Dart
public MessageRelayMonitor createMessageRelayMonitor(
Expand Down
Loading
Loading