Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -638,4 +638,13 @@ public class ExecutionConfigKeys implements Serializable {
.defaultValue(false)
.description("if enable detail job metric");

// ------------------------------------------------------------------------
// optimizer
// ------------------------------------------------------------------------

public static final ConfigKey LOCAL_SHUFFLE_OPTIMIZATION_ENABLE = ConfigKeys
.key("geaflow.local.shuffle.optimization.enable")
.defaultValue(false)
.description("whether to enable local shuffle optimization for graph → sink/map patterns");

}
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ public ExecutionGraph buildExecutionGraph(Configuration jobConf) {

/**
* Build execution vertex group.
*
* <p>This method supports co-location hints from LocalShuffleOptimizer.
* Vertices with the same coLocationGroup will be placed in the same execution group
* to enable automatic local shuffle through LocalInputChannel.
*/
private Map<Integer, ExecutionVertexGroup> buildExecutionVertexGroup(Map<Integer, Integer> vertexId2GroupIdMap,
Queue<PipelineVertex> pipelineVertexQueue) {
Expand All @@ -191,6 +195,47 @@ private Map<Integer, ExecutionVertexGroup> buildExecutionVertexGroup(Map<Integer

Set<Integer> groupedVertices = new HashSet<>();

// Step 1: Process co-location groups first for local shuffle optimization.
// Collect vertices by coLocationGroup for local shuffle optimization.
Map<String, List<PipelineVertex>> coLocationGroupMap = new HashMap<>();

for (PipelineVertex vertex : plan.getVertexMap().values()) {
String coLocationGroup = vertex.getCoLocationGroup();
if (coLocationGroup != null && !coLocationGroup.isEmpty()) {
coLocationGroupMap.computeIfAbsent(coLocationGroup, k -> new ArrayList<>()).add(vertex);
}
}

// Process co-located vertices first.
for (Map.Entry<String, List<PipelineVertex>> entry : coLocationGroupMap.entrySet()) {
List<PipelineVertex> coLocatedVertices = entry.getValue();
if (coLocatedVertices.isEmpty()) {
continue;
}

// Create execution group for co-located vertices.
ExecutionVertexGroup vertexGroup = new ExecutionVertexGroup(groupId);
Map<Integer, ExecutionVertex> currentVertexGroupMap = new HashMap<>();

for (PipelineVertex vertex : coLocatedVertices) {
if (!groupedVertices.contains(vertex.getVertexId())) {
ExecutionVertex executionVertex = buildExecutionVertex(vertex);
currentVertexGroupMap.put(vertex.getVertexId(), executionVertex);
groupedVertices.add(vertex.getVertexId());
vertexId2GroupIdMap.put(vertex.getVertexId(), groupId);
}
}

if (!currentVertexGroupMap.isEmpty()) {
vertexGroup.getVertexMap().putAll(currentVertexGroupMap);
vertexGroupMap.put(groupId, vertexGroup);
LOGGER.info("Created co-located execution group {} with {} vertices for coLocationGroup '{}'",
groupId, currentVertexGroupMap.size(), entry.getKey());
groupId++;
}
}

// Step 2: Process remaining vertices using standard grouping logic.
while (!pipelineVertexQueue.isEmpty()) {
PipelineVertex pipelineVertex = pipelineVertexQueue.poll();
// Ignore already grouped vertex.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ private void optimizePipelinePlan(Configuration pipelineConfig) {
LOGGER.info("union optimize: {}",
new PlanGraphVisualization(pipelineGraph).getGraphviz());
}
new PipelineGraphOptimizer().optimizePipelineGraph(pipelineGraph);
new PipelineGraphOptimizer().optimizePipelineGraph(pipelineGraph, pipelineConfig);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ public class PipelineVertex<OP extends Operator> implements Serializable {
private AffinityLevel affinity;
private boolean duplication;
private VertexType chainTailType;
/**
* Co-location group ID for local shuffle optimization.
* Vertices with the same co-location group ID should be deployed on the same node
* to enable automatic local shuffle through LocalInputChannel.
*/
private String coLocationGroup;

public PipelineVertex(int vertexId, OP operator, int parallelism) {
this.vertexId = vertexId;
Expand Down Expand Up @@ -139,6 +145,14 @@ public void setChainTailType(VertexType chainTailType) {
this.chainTailType = chainTailType;
}

public String getCoLocationGroup() {
return coLocationGroup;
}

public void setCoLocationGroup(String coLocationGroup) {
this.coLocationGroup = coLocationGroup;
}

public String getVertexString() {
String operatorStr = operator.toString();
return String.format("%s, p:%d, %s", getVertexName(), parallelism, operatorStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,30 @@
package org.apache.geaflow.plan.optimizer;

import java.io.Serializable;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
import org.apache.geaflow.plan.graph.PipelineGraph;
import org.apache.geaflow.plan.optimizer.strategy.ChainCombiner;
import org.apache.geaflow.plan.optimizer.strategy.LocalShuffleOptimizer;
import org.apache.geaflow.plan.optimizer.strategy.SingleWindowGroupRule;

public class PipelineGraphOptimizer implements Serializable {

public void optimizePipelineGraph(PipelineGraph pipelineGraph) {
// Enforce chain combiner opt.
public void optimizePipelineGraph(PipelineGraph pipelineGraph, Configuration config) {
// 1. Enforce chain combiner optimization.
// Merge operators with forward partition into single execution unit.
ChainCombiner chainCombiner = new ChainCombiner();
chainCombiner.combineVertex(pipelineGraph);

// Enforce single window rule.
// 2. Local shuffle optimization (disabled by default).
// Mark vertices for co-location to enable automatic local shuffle.
if (config.getBoolean(ExecutionConfigKeys.LOCAL_SHUFFLE_OPTIMIZATION_ENABLE)) {
LocalShuffleOptimizer localShuffleOptimizer = new LocalShuffleOptimizer();
localShuffleOptimizer.optimize(pipelineGraph);
}

// 3. Enforce single window rule.
// Disable grouping for single-window batch jobs.
SingleWindowGroupRule groupRule = new SingleWindowGroupRule();
groupRule.apply(pipelineGraph);
}
Expand Down
Loading
Loading