Databend is an open-source, cloud-native data warehouse that provides fast elastic scaling. It combines the elasticity, simplicity, and low cost of the cloud to make building a Data Cloud easier. Databend stores data in cloud storage systems like AWS S3 and Azure Blob, allowing different compute nodes to mount the same data for high elasticity and fine-grained resource control.

Databend focuses on the following capabilities:

  1. Elasticity: In Databend, storage and compute resources can scale elastically on demand.
  2. Security: Databend encrypts data files and network traffic end-to-end and provides role-based access control at the SQL level.
  3. Ease of use: Databend is compatible with ANSI SQL and can be accessed via MySQL and ClickHouse clients, with virtually no learning curve.
  4. Cost: Databend processes queries efficiently, and users pay only for the resources they use.

The diagram above shows Databend’s overall architecture. The system consists of three main parts: Meta Service Layer, Compute Layer, and Storage Layer.

Meta Service Layer

Meta Service is a multi-tenant, highly available distributed key-value storage service with transactional capabilities. It’s mainly used to store:

  • Metadata: Table metadata, index information, cluster information, transaction information, etc.
  • Administration: User system, user permissions, etc.
  • Security: User authentication, data encryption, etc.

Compute Layer

The compute layer consists of multiple clusters, where different clusters can handle different workloads. Each cluster consists of multiple compute nodes. You can easily add or remove nodes or clusters for on-demand, pay-as-you-go resource management.

Compute nodes are the smallest building blocks of the compute layer. Each compute node includes the following components:

  • Planner

Generates an execution plan from the user's SQL. The plan is a logical representation that cannot be executed directly; it guides the orchestration and generation of the entire compute pipeline.
For example, the statement SELECT number + 1 FROM numbers_mt(10) WHERE number > 8 LIMIT 2 produces the following execution plan:

datafuse :) EXPLAIN SELECT number + 1 FROM numbers_mt(10) WHERE number > 8 LIMIT 2
┌─explain─────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ Limit: 2 │
│ Projection: (number + 1):UInt64 │
│ Expression: (number + 1):UInt64 (Before Projection) │
│ Filter: (number > 8) │
│ ReadDataSource: scan partitions: [1], scan schema: [number:UInt64], statistics: [read_rows: 10, read_bytes: 80] │
└─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

Reading this execution plan from bottom to top:

  • ReadDataSource: indicates which files to read data from
  • Filter: applies the (number > 8) filter expression
  • Expression: computes the (number + 1) expression
  • Projection: selects which columns to return
  • Limit: takes the first 2 rows
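The bottom-up evaluation of this plan can be sketched as a chain of plain functions, one per plan node. This is an illustrative Python sketch, not Databend's actual (Rust) implementation:

```python
# A minimal sketch of evaluating the plan bottom-up for:
# SELECT number + 1 FROM numbers_mt(10) WHERE number > 8 LIMIT 2

def read_data_source(n):           # ReadDataSource: produce rows 0..n-1
    return list(range(n))

def filter_rows(rows):             # Filter: number > 8
    return [r for r in rows if r > 8]

def expression(rows):              # Expression: number + 1
    return [r + 1 for r in rows]

def projection(rows):              # Projection: keep the (number + 1) column
    return rows

def limit(rows, k):                # Limit: take the first k rows
    return rows[:k]

result = limit(projection(expression(filter_rows(read_data_source(10)))), 2)
print(result)  # → [10]
```

Only one row survives the filter here: numbers_mt(10) yields 0..9, number > 8 keeps just 9, and the expression turns it into 10.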

  • Optimizer

Applies rule-based optimizations to the execution plan, such as predicate pushdown or removing unneeded columns, to make the execution plan more efficient.
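The shape of a rewrite rule can be shown with a toy plan tree. This sketch is hypothetical (Databend's optimizer is written in Rust and uses its own plan types); it only illustrates the idea of pushing a Filter below an Expression so fewer rows reach the expression computation:

```python
# Toy rule-based optimizer sketch (illustrative, not Databend's API).
# Plan nodes are tuples: ("Filter", child, predicate), ("Expression", child),
# ("ReadDataSource",).

def push_down_filter(plan):
    # Rule: Filter(Expression(child)) -> Expression(Filter(child))
    op, child = plan[0], plan[1]
    if op == "Filter" and child[0] == "Expression":
        pred = plan[2]
        inner = child[1]
        return ("Expression", ("Filter", inner, pred))
    return plan

plan = ("Filter", ("Expression", ("ReadDataSource",)), "number > 8")
optimized = push_down_filter(plan)
print(optimized)
# → ('Expression', ('Filter', ('ReadDataSource',), 'number > 8'))
```

After the rewrite, the filter runs directly on the source rows, so the expression is only evaluated on rows that pass the predicate.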

  • Processors

Processors are the core components that execute computation logic. Based on the execution plan, processors are orchestrated into a pipeline to execute computation tasks.

The entire Pipeline is a directed acyclic graph (DAG). Each node is a processor, and each edge is formed by connecting a processor’s InPort and OutPort. Data reaches different processors for computation, then flows through edges to the next processor. Multiple processors can compute in parallel, and in cluster mode, they can execute distributedly across nodes. This is a key design for Databend’s high performance.
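The parallel-lanes-then-merge shape of such a pipeline can be sketched in Python. This is an assumed simplification (Databend's processors are Rust components connected by ports, not thread-pool tasks), using 4 lanes instead of 16:

```python
# Simplified pipeline sketch: parallel transform chains over data
# partitions, merged into a single final processor.
from concurrent.futures import ThreadPoolExecutor

def source(partition):                 # SourceTransform: one data partition
    return partition

def filter_t(rows):                    # FilterTransform: number > 8
    return [r for r in rows if r > 8]

def expr_t(rows):                      # ExpressionTransform: number + 1
    return [r + 1 for r in rows]

def run_chain(partition):              # one parallel pipeline "lane"
    return expr_t(filter_t(source(partition)))

partitions = [list(range(i, 10000, 4)) for i in range(4)]  # 4 lanes
with ThreadPoolExecutor(max_workers=4) as pool:
    merged = [row for lane in pool.map(run_chain, partitions) for row in lane]

# LimitTransform folds the pipeline to one processor; a real LIMIT takes
# the first rows to arrive, we sort here only to make the sketch deterministic.
limited = sorted(merged)[:2]
print(limited)  # → [10, 11]
```

Each lane runs the same transform chain on its own partition; only the final Limit step needs a single processor, which matches the `Merge ... to (LimitTransform × 1)` line in the EXPLAIN PIPELINE output.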

For example, we can use EXPLAIN PIPELINE to view the pipeline:

datafuse :) EXPLAIN PIPELINE SELECT number + 1 FROM numbers_mt(10000) WHERE number > 8 LIMIT 2
┌─explain───────────────────────────────────────────────────────────────┐
│ LimitTransform × 1 processor │
│ Merge (ProjectionTransform × 16 processors) to (LimitTransform × 1) │
│ ProjectionTransform × 16 processors │
│ ExpressionTransform × 16 processors │
│ FilterTransform × 16 processors │
│ SourceTransform × 16 processors │
└───────────────────────────────────────────────────────────────────────┘

Again, reading this pipeline from bottom to top:

  • SourceTransform: reads data files, processed in parallel by 16 physical CPU cores
  • FilterTransform: filters data with the (number > 8) expression, in parallel on 16 cores
  • ExpressionTransform: evaluates the (number + 1) expression, in parallel on 16 cores
  • ProjectionTransform: produces the final output columns, in parallel on 16 cores
  • LimitTransform: applies LIMIT 2; here the pipeline folds and a single core executes it

Databend uses the Pipeline parallel model combined with vectorized computation to maximize CPU utilization and accelerate computation.

  • Cache

Compute nodes use local SSD to cache data and indexes to improve data affinity and accelerate computation.
Cache warming modes:

  • LOAD_ON_DEMAND - Load indexes or data blocks on demand (default).
  • LOAD_INDEX - Load indexes only.
  • LOAD_ALL - Load all data and indexes. Suitable for smaller tables.

Storage Layer

Databend uses the Parquet columnar storage format to store data. To speed up queries (via partition pruning), Databend builds its own indexes for each Parquet file, generated from the primary key:

  • min_max.idx: the minimum and maximum values in the Parquet file
  • sparse.idx: a sparse index at N-record granularity

With these indexes, Databend can skip files that cannot match a query, significantly reducing I/O and computation.
Suppose there are two Parquet files, f1 and f2.
f1's min_max.idx is [3, 5] and f2's min_max.idx is [4, 6]. If the query condition is where x < 4, only f1 needs to be read, and the sparse.idx index then locates the specific data page within f1.
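The pruning step from this example can be sketched directly. The file names and index layout below are illustrative, not Databend's on-disk format:

```python
# min_max pruning sketch for the example above: f1 covers [3, 5],
# f2 covers [4, 6], and the query is `where x < 4`.

files = {
    "f1": {"min": 3, "max": 5},
    "f2": {"min": 4, "max": 6},
}

def prune_less_than(files, bound):
    # keep only files whose minimum value can satisfy x < bound
    return [name for name, idx in files.items() if idx["min"] < bound]

print(prune_less_than(files, 4))  # → ['f1']
```

f2 is skipped entirely because its minimum value (4) already fails x < 4, so no byte of f2 is read.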

Summary

Databend is a modern data warehouse designed entirely for cloud architecture. It deconstructs the traditional database stack and reassembles it as a Cloud Warehouse, pursuing high elasticity and low cost.

Databend’s aggregate functions are already very efficient. On top of these aggregate functions (especially GROUP BY), we have implemented the windowFunnel function for efficient funnel analysis.
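The semantics of a funnel computation in the spirit of windowFunnel can be sketched as follows. The real implementation is a Rust aggregate function inside Databend; this Python version is illustrative only. Given time-ordered (timestamp, event) pairs, it returns the deepest funnel level reached within `window` seconds of the first matched event:

```python
# Simplified windowFunnel-style sketch (illustrative, not Databend's code).
# `events` must be sorted by timestamp; `chain` is the ordered funnel.

def window_funnel(window, events, chain):
    best = 0
    for i, (t0, e0) in enumerate(events):
        if e0 != chain[0]:
            continue                       # funnel must start at chain[0]
        level = 1
        for t, e in events[i + 1:]:
            if t - t0 > window:
                break                      # outside the time window
            if level < len(chain) and e == chain[level]:
                level += 1                 # advanced one funnel step
        best = max(best, level)
    return best

events = [(1, "view"), (2, "click"), (10, "buy"), (100, "view")]
print(window_funnel(30, events, ["view", "click", "buy"]))  # → 3
```

With a window of 30 seconds the full view → click → buy chain completes (level 3); shrinking the window to 5 seconds drops the buy at t=10 and the result falls to 2.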

Databend is currently in a rapid iteration phase. Follow us at: https://github.com/datafuselabs/databend/