Changes from all commits (30 commits)
3afe25c
Update README.md
fzhedu Sep 8, 2016
4a9d6ab
Update README.md
fzhedu Sep 14, 2016
988423b
Update README.md
fzhedu Sep 14, 2016
bd45545
Update README.md
fzhedu Sep 14, 2016
cabd050
commit
neko940709 Sep 19, 2016
ec267c7
commit
neko940709 Sep 19, 2016
54b4adb
commit
neko940709 Sep 19, 2016
c12e930
Merge pull request #148 from neko940709/master
fzhedu Sep 20, 2016
a2018ba
commit
neko940709 Sep 20, 2016
15575b8
commit
neko940709 Sep 21, 2016
e27807e
commit
neko940709 Sep 21, 2016
9ff773c
commit
neko940709 Sep 21, 2016
d7a4404
Merge pull request #150 from neko940709/master
Jackson1992 Sep 21, 2016
a4b2530
Update README.md
wangli1426 Sep 22, 2016
d786451
Update README.md
wangli1426 Sep 22, 2016
2bf800c
Update README.md
fzhedu Oct 3, 2016
caafa65
Update README.md
fzhedu Oct 3, 2016
39fe7e9
Update README.md
fzhedu Oct 3, 2016
5d7f067
Update README.md
fzhedu Oct 3, 2016
4e82d54
Update README.md
fzhedu Oct 3, 2016
4eab654
projection step1 finish
zhejiangxiaomai Feb 28, 2017
1786ce7
add prune column except for out join
fzhedu Mar 1, 2017
2e000fb
finish projection
zhejiangxiaomai Mar 2, 2017
8fb5c9b
Merge HEAD, branches 'for_auto_test' and 'for_auto_test' of github.co…
zhejiangxiaomai Mar 2, 2017
368dbf5
finish outer join
zhejiangxiaomai Mar 9, 2017
1108aae
finish fix logical scan
zhejiangxiaomai Mar 9, 2017
51bbb09
logical scan fix and gtest finish
zhejiangxiaomai Mar 14, 2017
6402bc7
finish projection scan ready to merge
zhejiangxiaomai Mar 17, 2017
d5edd7e
Merge branch 'projection_fzh' into projection
zhejiangxiaomai Mar 17, 2017
5e8fdba
fix project memory leak
zhejiangxiaomai Mar 24, 2017
4 changes: 4 additions & 0 deletions Config.cpp
@@ -83,6 +83,7 @@ bool Config::local_disk_mode;
int Config::client_listener_port;

bool Config::enable_codegen;
bool Config::enable_prune_column;

std::string Config::catalog_file;

@@ -154,6 +155,8 @@ void Config::initialize() {

memory_utilization = getInt("memory_utilization", 100);

enable_prune_column = getBoolean("enable_prune_column", true);

#ifdef DEBUG_Config
print_configure();
#endif
@@ -211,6 +214,7 @@ void Config::print_configure() const {
std::cout << "client_lisener_port:" << client_listener_port << std::endl;
std::cout << "catalog_file:" << catalog_file << std::endl;
std::cout << "codegen:" << enable_codegen << std::endl;
std::cout << "enable_prune_column: " << enable_prune_column << std::endl;
std::cout << "load_thread_num:" << load_thread_num << std::endl;
}

1 change: 1 addition & 0 deletions Config.h
@@ -75,6 +75,7 @@ class Config {
static bool pipelined_exchange;
static int client_listener_port;
static bool enable_codegen;
static bool enable_prune_column;
static std::string catalog_file;
static int thread_pool_init_thread_num;
static int memory_utilization;
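The two hunks above add an `enable_prune_column` switch next to the existing `enable_codegen` flag; it is read from the config file with a default of `true` and echoed by `print_configure()`. Below is a minimal sketch of how such a flag might gate the column-pruning pass, assuming a caller that already knows which attributes the query must output; `MaybePruneColumns`, `output_attrs`, and the include paths are illustrative, and only `Config::enable_prune_column` and the `PruneProj()` entry point come from this pull request.

```cpp
#include <set>
#include <string>
// Headers declaring Config and LogicalOperator are assumed to be available.

// Sketch only: run the column-pruning pass only when the new flag is set.
void MaybePruneColumns(claims::logical_operator::LogicalOperator* plan_root,
                       std::set<std::string> output_attrs) {
  if (!Config::enable_prune_column) {
    return;  // pruning disabled via conf/config
  }
  // output_attrs holds the "table.column" names required by the final result;
  // PruneProj() pushes this requirement down the plan and lets each operator
  // add the columns it needs itself.
  plan_root->PruneProj(output_attrs);
}
```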
44 changes: 15 additions & 29 deletions README.md
@@ -1,55 +1,37 @@
**CLAIMS** (CLuster-Aware In-Memory Sql query engin) is a parallel in-memory database prototype, which runs on clusters of commodity servers and aims to provide real-time data analytics on relational dataset.
**CLAIMS** (CLuster-Aware In-Memory Sql query engine) is a parallel in-memory database prototype, which runs on clusters of commodity servers and aims to provide real-time data analytics on relational datasets.

#### Highlights

##### 1. Massively parallel execution engine.

CLAIMS relies on highly parallel query processing engine to dramatically accelerate data analysis speed. Query evaluations are distributed across the cluster and executed in parallel. Query evaluations are not only distirbuted across the cluster to leverage the computation power of the cluster, but are also executed in a multi-threaded fashion to unleash the power of modern multi-core hardware.
CLAIMS relies on a highly parallel query processing engine to dramatically accelerate data analysis. Query evaluations are not only distributed across the cluster to leverage the computation power of the cluster, but are also executed in a multi-threaded fashion to unleash the power of modern multi-core hardware.


##### 2. Smart intra-node parallelism.

Pipelining the query execution among nodes in the cluster effectively reduces the response latency and dramatically saves storage space for intermediate query results. However, its benefits degrade tremendously when the workloads are imbalanced among execution partitions due to the improperly generated query execution plan. To tackle this problem, a novel elastic pipelining query processing model is proposed in CLAIMS, which adapts the intra-node parallelism to the runtime workload. Beneficial from elastic pipelining query processing, the parallelism of different execution fragments in a pipelined is self-adaptive with each other and results in an optimal intra-node parallelism assignment. Please refer to our SIGMOD paper for more details about elastic pipelining.
![asdf](http://dase.ecnu.edu.cn/liwang/images/elastic_pipeline.jpg)
Pipelining query execution among nodes in the cluster effectively reduces response time and dramatically saves storage space for intermediate query results. However, its benefits degrade tremendously when workloads are imbalanced among execution partitions due to an improperly generated query execution plan. To tackle this problem, a novel elastic query processing framework, i.e., *elastic pipelining*, is proposed in CLAIMS, which adjusts the intra-node parallelism according to the runtime workload based on an elaborate performance model. With elastic pipelining, the parallelism of the different execution fragments in a pipeline is self-adaptive, resulting in an optimal intra-node parallelism assignment. Please refer to our SIGMOD paper for more details about the elastic pipelining framework.

![asdf](https://i1.piimg.com/1949/99a94a4e18e6fc21.jpg)




##### 3. Efficient in-memory data processing.

CLAIMS employs a large set of optimization techniques to achieve efficient in-memory data processing, including batch-at-a-time processing, cache-sensitive operators, SIMD-based optimization, code generation, lock-free and concurrent processing structures. These optimizations work collaborately and enable CLAIMS to process up to gigabytes data per second within a single thread.
CLAIMS employs a large set of optimization techniques to achieve efficient in-memory data processing, including batch-at-a-time processing, cache-sensitive operators, SIMD-based optimization, code generation, and lock-free, concurrent processing structures. These optimizations work collaboratively and enable CLAIMS to process up to gigabytes of data per second on a single thread.

##### 4. Network communication optimization.
Parallel query processing imposes high burdens on network communication, which becomes the performance bottleneck for in-memory parallel databases due to the relatively slow network bandwidth. When compiling a user query into an execution plan, CLAIMS’s query optimizer leverages a sophisticated selectivity propagation system and cost model to generate physical query plans with minimized network communication cost. Furthermore, CLAIMS deploys a new data exchange implementation, which offers efficient, scalable and skew-resilient network data communication among CLAIMS instances. These optimizations greatly reduce the response time of the queries that require network data communication.
Parallel query processing imposes high burdens on network communication, which usually becomes the performance bottleneck of in-memory parallel databases due to relatively slow network bandwidth. When compiling a user query into an execution plan, CLAIMS’s query optimizer leverages a sophisticated selectivity propagation system and cost model to generate physical query plans with minimized network communication cost. Furthermore, CLAIMS deploys a new data exchange implementation, which offers efficient, scalable and skew-resilient network data transfer among CLAIMS instances. These optimizations greatly reduce the response time for a large variety of queries.

#### Performance
Thanks to its massive parallelism and in-memory data processing optimizations, CLAIMS is up to 5X faster than Shark and Impala, two state-of-the-art systems in the open source community, on queries against the TPCH dataset and the Shanghai Stock Exchange dataset.

![asdf](http://dase.ecnu.edu.cn/liwang/images/compare.jpg)

#### Team members
[Aoying Zhou](http://case.ecnu.edu.cn), Professor in East China Normal University, is the person in charge of this project.

[Minqi Zhou](https://github.com/polpo1980), Associate Professor in East China Normal University, is the person in charge of this project.

[Li Wang](https://github.com/wangli1426), Ph.D. student in East China Normal University, manages the master students in this team and is responsible for designing and implementing the key components of CLAIMS, including query optimizer, catalog, physical operators, distributed communication infrastructure, storage layout, etc.

[Lei Zhang](https://github.com/egraldlo) is responsible for designing and implementing the key components of CLAIMS, including query optimizer, physical operators, persistent data exchange, storage management, etc.

[Shaochan Dong](https://github.com/scdong) is responsible for designing and implementing in-memory index and index management, data types, as well as data loading and importing.
![asdf](https://i1.piimg.com/1949/de04caa268f1215f.jpg)

[Xinzhou Zhang]() is mainly responsible for web UI design and implementing data importing model.

[Zhuhe Fang](https://github.com/fzhedu) is mainly responsible for designing and implementing SQL DML parser and physical operators.

[Yu Kai](https://github.com/yukai2014) is mainly responsible for designing and implementing SQL DDL parser, catalog persistence.

[Yongfeng Li](https://github.com/NagamineLee) was a formal member of CLAIMS, who participated in designing and implementing catalog model.

[Lin Gu]() is responsible for designing the demo cases of CLAIMS.

#### Publications

1. Li Wang, Minqi Zhou, Zhenjie Zhang, Yin Yang, Aoying Zhou, Dina Bitton. Elastic Pipelining in In-Memory Database Cluster. To appear in Sigmod 2016.
1. Li Wang, Minqi Zhou, Zhenjie Zhang, Yin Yang, Aoying Zhou, Dina Bitton. Elastic Pipelining in In-Memory Database Cluster. ACM SIGMOD 2016, pp. 1279-1294.
2. Li Wang, Minqi Zhou, Zhenjie Zhang, Ming-chien Shan, Aoying Zhou. NUMA-aware Scalable and Efficient Aggregation on Large Domains. IEEE TKDE 2015:4, pp. 1071-1084.
3. Li Wang, Lei Zhang, Chengcheng Yu, Aoying Zhou. Optimizing Pipelined Execution for Distributed In-memory OLAP System. In: DaMen 2014, Springer, 2014, pp. 35-56.
4. Lan Huang, Ke Xun, Xiaozhou Chen, Minqi Zhou, In-memory Cluster Computing: Interactive Data Analysis, Journal of East China Normal University, 2014
@@ -60,4 +42,8 @@ Beneficial from the smart and massively parallelism and the in-memory data proce
9. Yongfeng Li, Minqi Zhou, Hualiang Hu, Survey of resource uniform management and scheduling in cluster, Journal of East China Normal University, 2014

#### Quick Start
Try our CLAIMS, following [Quick Start](https://github.com/dase/CLAIMS/wiki).
To try CLAIMS, please follow the [Quick Start](https://github.com/dase/CLAIMS/wiki/Installation-steps) guide.

#### More
For more information, please visit the [Wiki](https://github.com/dase/CLAIMS/wiki).

6 changes: 6 additions & 0 deletions catalog/projection.cpp
@@ -72,6 +72,12 @@ bool ProjectionDescriptor::isExist(const string& name) const {
}
return false;
}
bool ProjectionDescriptor::isExist1(const string& name) const {
for (unsigned i = 0; i < column_list_.size(); i++) {
if (column_list_[i].attrName == name) return true;
}
return false;
}
bool ProjectionDescriptor::AllPartitionBound() const {
return partitioner->allPartitionBound();
}
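The new `isExist1()` matches a name against `column_list_[i].attrName`; how it differs from the existing `isExist()` is not visible in this hunk. One plausible use, sketched below under that assumption, is testing whether a candidate projection still covers every attribute that survives column pruning. `CoversAllAttrs` is a hypothetical helper, not part of the repository.

```cpp
#include <set>
#include <string>
// The header declaring ProjectionDescriptor (catalog/projection.h) is assumed.

// Hypothetical helper: true if the projection contains every needed attribute.
bool CoversAllAttrs(const ProjectionDescriptor& proj,
                    const std::set<std::string>& needed_attrs) {
  for (const std::string& attr : needed_attrs) {
    if (!proj.isExist1(attr)) {
      return false;  // a required column is missing from this projection
    }
  }
  return true;
}
```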
2 changes: 1 addition & 1 deletion catalog/projection.h
@@ -74,6 +74,7 @@ class ProjectionDescriptor {
PartitionFunction* partition_functin);
Partitioner* getPartitioner() const;
bool isExist(const string& name) const;
bool isExist1(const string& name) const;
inline void setProjectionID(const ProjectionID& pid) { projection_id_ = pid; }
inline map<string, set<string> > getFileLocations() const {
return fileLocations;
@@ -91,7 +92,6 @@
* as this projection's cost
*/
unsigned int getProjectionCost() const;

private:
// ProjectionOffset projection_offset_;
ProjectionID projection_id_;
6 changes: 6 additions & 0 deletions common/expression/expr_binary.cpp
@@ -65,5 +65,11 @@ void ExprBinary::InitExprAtPhysicalPlan() {
}

ExprNode* ExprBinary::ExprCopy() { return new ExprBinary(this); }

void ExprBinary::GetUniqueAttr(set<string>& attrs) {
arg0_->GetUniqueAttr(attrs);
arg1_->GetUniqueAttr(attrs);
}

} // namespace common
} // namespace claims
1 change: 1 addition & 0 deletions common/expression/expr_binary.h
@@ -42,6 +42,7 @@ class ExprBinary : public ExprNode {

void InitExprAtPhysicalPlan();
ExprNode* ExprCopy();
void GetUniqueAttr(set<string>& attrs);

private:
friend class boost::serialization::access;
10 changes: 10 additions & 0 deletions common/expression/expr_case_when.cpp
@@ -76,5 +76,15 @@ void ExprCaseWhen::InitExprAtPhysicalPlan() {
}

ExprNode* ExprCaseWhen::ExprCopy() { return new ExprCaseWhen(this); }

void ExprCaseWhen::GetUniqueAttr(set<string>& attrs) {
for (int i = 0; i < case_when_.size(); i++) {
case_when_[i]->GetUniqueAttr(attrs);
}
for (int i = 0; i < case_then_.size(); i++) {
case_then_[i]->GetUniqueAttr(attrs);
}
}

} // namespace common
} // namespace claims
1 change: 1 addition & 0 deletions common/expression/expr_case_when.h
@@ -45,6 +45,7 @@ class ExprCaseWhen : public ExprNode {

void InitExprAtPhysicalPlan();
ExprNode* ExprCopy();
void GetUniqueAttr(set<string>& attrs);

private:
friend class boost::serialization::access;
9 changes: 7 additions & 2 deletions common/expression/expr_column.cpp
@@ -53,7 +53,7 @@ void ExprColumn::InitExprAtLogicalPlan(LogicInitCnxt& licnxt) {
if (return_type_ == t_string) {
value_size_ = std::max(licnxt.schema0_->getcolumn(attr_id_).get_length(),
static_cast<unsigned int>(BASE_DATA_SIZE));
} else if (return_type_ == t_decimal) {
} else if (return_type_ == t_decimal) {
value_size_ = licnxt.schema0_->getcolumn(attr_id_).size;
} else {
value_size_ = licnxt.schema0_->getcolumn(attr_id_).get_length();
@@ -68,7 +68,7 @@
value_size_ =
std::max(licnxt.schema1_->getcolumn(attr_id_).get_length(),
static_cast<unsigned int>(BASE_DATA_SIZE));
} else if (return_type_ == t_decimal) {
} else if (return_type_ == t_decimal) {
value_size_ = licnxt.schema1_->getcolumn(attr_id_).size;
} else {
value_size_ = licnxt.schema1_->getcolumn(attr_id_).get_length();
@@ -89,5 +89,10 @@
}

ExprNode* ExprColumn::ExprCopy() { return new ExprColumn(this); }

void ExprColumn::GetUniqueAttr(set<string>& attrs) {
attrs.insert(table_name_ + "." + column_name_);
}

} // namespace common
} // namespace claims
1 change: 1 addition & 0 deletions common/expression/expr_column.h
@@ -36,6 +36,7 @@ class ExprColumn : public ExprNode {

void InitExprAtPhysicalPlan();
ExprNode* ExprCopy();
void GetUniqueAttr(set<string>& attrs);

private:
friend class boost::serialization::access;
12 changes: 12 additions & 0 deletions common/expression/expr_in.cpp
@@ -95,5 +95,17 @@ void ExprIn::InitExprAtPhysicalPlan() {
}

ExprNode* ExprIn::ExprCopy() { return new ExprIn(this); }

void ExprIn::GetUniqueAttr(set<string>& attrs) {
for (int i = 0, size = cmp_expr_.size(); i < size; ++i) {
cmp_expr_[i]->GetUniqueAttr(attrs);
}
for (int i = 0; i < right_node_.size(); i++) {
for (int j = 0; j < right_node_[i].size(); j++) {
right_node_[i][j]->GetUniqueAttr(attrs);
}
}
}

} // namespace common
} // namespace claims
1 change: 1 addition & 0 deletions common/expression/expr_in.h
@@ -51,6 +51,7 @@ class ExprIn : public ExprNode {

void InitExprAtPhysicalPlan();
ExprNode* ExprCopy();
void GetUniqueAttr(set<string>& attrs);

private:
friend class boost::serialization::access;
2 changes: 2 additions & 0 deletions common/expression/expr_node.h
@@ -138,6 +138,8 @@ class ExprNode {

virtual void InitExprAtLogicalPlan(LogicInitCnxt& licnxt) {}

virtual void GetUniqueAttr(set<string>& attrs) {}

virtual void InitExprAtPhysicalPlan() {}
virtual ExprNode* ExprCopy() { return NULL; }
bool IsEqualAttr(const Attribute& attr);
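Taken together, the `GetUniqueAttr()` overrides added across the expression classes form a small visitor: interior nodes (`ExprBinary`, `ExprUnary`, `ExprTernary`, `ExprCaseWhen`, `ExprIn`) recurse into their children, `ExprColumn` inserts `table_name_ + "." + column_name_` into the set, and the base-class default does nothing. A minimal usage sketch, assuming an already-constructed expression tree `expr`:

```cpp
#include <set>
#include <string>
// The header declaring ExprNode (common/expression/expr_node.h) is assumed.

// Collect the distinct "table.column" names referenced by an expression tree.
// GetUniqueAttr() is the virtual hook added in this pull request; the wrapper
// function itself is illustrative.
std::set<std::string> ReferencedAttrs(claims::common::ExprNode* expr) {
  std::set<std::string> attrs;
  if (expr != NULL) {
    expr->GetUniqueAttr(attrs);  // leaves add names, inner nodes recurse
  }
  return attrs;
}
```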
7 changes: 7 additions & 0 deletions common/expression/expr_ternary.cpp
@@ -71,5 +71,12 @@ void ExprTernary::InitExprAtPhysicalPlan() {
}

ExprNode* ExprTernary::ExprCopy() { return new ExprTernary(this); }

void ExprTernary::GetUniqueAttr(set<string>& attrs) {
arg0_->GetUniqueAttr(attrs);
arg1_->GetUniqueAttr(attrs);
arg2_->GetUniqueAttr(attrs);
}

} // namespace common
} // namespace claims
1 change: 1 addition & 0 deletions common/expression/expr_ternary.h
@@ -42,6 +42,7 @@ class ExprTernary : public ExprNode {

void InitExprAtPhysicalPlan();
ExprNode* ExprCopy();
void GetUniqueAttr(set<string>& attrs);

private:
friend class boost::serialization::access;
3 changes: 3 additions & 0 deletions common/expression/expr_unary.cpp
@@ -83,5 +83,8 @@ void ExprUnary::InitExprAtPhysicalPlan() {
}

ExprNode* ExprUnary::ExprCopy() { return new ExprUnary(this); }
void ExprUnary::GetUniqueAttr(set<string>& attrs) {
arg0_->GetUniqueAttr(attrs);
}
} // namespace common
} // namespace claims
1 change: 1 addition & 0 deletions common/expression/expr_unary.h
@@ -35,6 +35,7 @@ class ExprUnary : public ExprNode {

virtual void InitExprAtPhysicalPlan();
virtual ExprNode* ExprCopy();
void GetUniqueAttr(set<string>& attrs);

private:
friend class boost::serialization::access;
15 changes: 8 additions & 7 deletions conf/config
@@ -1,5 +1,5 @@
# IP address of the local machine
ip = "219.228.147.12";
ip = "127.0.0.1";

# Port range (for debugging)
PortManager:
@@ -11,7 +11,7 @@ PortManager:
# IP address and port of the master
coordinator:
{
ip="219.228.147.12"
ip="127.0.0.1"
port="12000"
}

@@ -34,15 +34,16 @@ client_listener_port = 10000
#data="/home/imdb/data/stock/"
#data="/home/zzh/data/1partition/"
#data="/claimsdata/"
data="/home/zzh/data/sf-1-p4/"

#data="/home/zzh/data/sf-1-p4/"
data="/home/zzh/Desktop/test_data/"
#data="/home/zzh/Desktop/test_data_old/"
#data="/home/fish/data/test/"
#data="/home/imdb/data/POC/"
#data="/home/imdb/data/POC/"
# HDFS master node
#data="/home/claims/zzh/4-partiton/"


hdfs_master_ip="219.228.147.162"
hdfs_master_ip="58.198.176.92"

# HDFS master node port
hdfs_master_port=9000
@@ -59,7 +60,7 @@ enable_expander_adaptivity=0

pipelined_exchange=1

local_disk_mode=0
local_disk_mode=1

scan_batch=100

15 changes: 13 additions & 2 deletions logical_operator/logical_aggregation.cpp
@@ -122,8 +122,8 @@ void LogicalAggregation::ChangeAggAttrsForAVG() {
PlanContext LogicalAggregation::GetPlanContext() {
lock_->acquire();
if (NULL != plan_context_) {
lock_->release();
return *plan_context_;
delete plan_context_;
plan_context_ = NULL;
}
PlanContext ret;
const PlanContext child_context = child_->GetPlanContext();
@@ -481,5 +481,16 @@ void LogicalAggregation::Print(int level) const {
--level;
child_->Print(level);
}
void LogicalAggregation::PruneProj(set<string>& above_attrs) {
set<string> above_attrs_copy = above_attrs;
for (int i = 0, size = group_by_attrs_.size(); i < size; ++i) {
group_by_attrs_[i]->GetUniqueAttr(above_attrs_copy);
}
for (int i = 0, size = aggregation_attrs_.size(); i < size; ++i) {
aggregation_attrs_[i]->GetUniqueAttr(above_attrs_copy);
}
child_->PruneProj(above_attrs_copy);
child_ = DecideAndCreateProject(above_attrs_copy, child_);
}
} // namespace logical_operator
} // namespace claims
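`PruneProj()` above copies the attribute set demanded from above, uses `GetUniqueAttr()` to add the columns the group-by and aggregation expressions themselves read, recurses into the child, and finally calls `DecideAndCreateProject()` to wrap the child in a projection when that pays off. For `SELECT SUM(t.a) FROM t GROUP BY t.b`, for example, the set pushed down to the child would contain `t.a` and `t.b`, so every other column of `t` could be dropped early. The helper's body is not part of this diff; the sketch below is one plausible shape for it, with `LogicalProject`, `EmitsExactly`, and the free-function form all assumed rather than taken from the repository.

```cpp
#include <set>
#include <string>

// Hypothetical sketch of DecideAndCreateProject(); only its call site
// (child_ = DecideAndCreateProject(above_attrs_copy, child_)) appears in this
// pull request. LogicalProject and EmitsExactly() are assumed names.
LogicalOperator* DecideAndCreateProject(std::set<std::string>& needed,
                                        LogicalOperator* child) {
  if (EmitsExactly(child, needed)) {
    return child;  // child already produces exactly the needed columns
  }
  // Otherwise insert a projection node that keeps just the needed columns.
  return new LogicalProject(child, needed);
}
```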
1 change: 1 addition & 0 deletions logical_operator/logical_aggregation.h
@@ -94,6 +94,7 @@ class LogicalAggregation : public LogicalOperator {
vector<ExprUnary*> aggregation_attrs,
LogicalOperator* child);
virtual ~LogicalAggregation();
void PruneProj(set<string>& above_attrs);

protected:
/**