ClickHouse is a true column-oriented DBMS. Data is stored by columns and, during query execution, processed in arrays (vectors or chunks of columns). Whenever possible, operations are dispatched on arrays rather than on individual values. This is called “vectorized query execution,” and it helps lower the cost of actual data processing.
This idea is nothing new. It dates back to the
APL programming language and its descendants:
Q. Array programming is used in scientific data processing. Neither is this idea something new in relational databases: for example, it is used in the
There are two different approaches for speeding up query processing: vectorized query execution and runtime code generation. The latter removes all indirection and dynamic dispatch. Neither of these approaches is strictly better than the other. Runtime code generation can be better when it fuses many operations, thus fully utilizing CPU execution units and the pipeline. Vectorized query execution can be less practical because it involves temporary vectors that must be written to the cache and read back. If the temporary data does not fit in the L2 cache, this becomes an issue. But vectorized query execution more easily utilizes the SIMD capabilities of the CPU. A research paper written by our friends shows that it is better to combine both approaches. ClickHouse uses vectorized query execution and has limited initial support for runtime code generation.
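To make the trade-off concrete, here is a minimal sketch (not ClickHouse code) that computes a * b + c over columns in both styles. The function names and the use of std::vector<std::int64_t> as a column are assumptions made for illustration only.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

using Column = std::vector<std::int64_t>;

// Vectorized style: each operation is a tight loop over whole arrays.
Column multiply(const Column & a, const Column & b)
{
    assert(a.size() == b.size());
    Column res(a.size());
    for (std::size_t i = 0; i < a.size(); ++i)
        res[i] = a[i] * b[i];
    return res;
}

Column add(const Column & a, const Column & b)
{
    assert(a.size() == b.size());
    Column res(a.size());
    for (std::size_t i = 0; i < a.size(); ++i)
        res[i] = a[i] + b[i];
    return res;
}

Column multiplyAddVectorized(const Column & a, const Column & b, const Column & c)
{
    Column tmp = multiply(a, b);  // temporary vector is written out...
    return add(tmp, c);           // ...and read back by the next operation
}

// Fused style: roughly the loop that runtime code generation could emit
// for this particular expression; no temporary vector is needed.
Column multiplyAddFused(const Column & a, const Column & b, const Column & c)
{
    assert(a.size() == b.size() && b.size() == c.size());
    Column res(a.size());
    for (std::size_t i = 0; i < a.size(); ++i)
        res[i] = a[i] * b[i] + c[i];
    return res;
}
```

Both functions return the same values; they differ only in whether the intermediate product is materialized.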
IColumn interface is used to represent columns in memory (actually, chunks of columns). This interface provides helper methods for the implementation of various relational operators. Almost all operations are immutable: they do not modify the original column, but create a new modified one. For example, the
IColumn::filter method accepts a filter byte mask. It is used for the
HAVING relational operators. Additional examples: the
IColumn::permute method to support
ORDER BY, the
IColumn::cut method to support
IColumn implementations (
ColumnString, and so on) are responsible for the memory layout of columns. The memory layout is usually a contiguous array. For integer columns, it is just one contiguous array, like
std::vector. For
Array columns, it is two vectors: one for all array elements, placed contiguously, and a second one for offsets to the beginning of each array. There is also
ColumnConst that stores just one value in memory, but looks like a column.
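The two-vector layout for array columns can be sketched as follows. This is an illustrative stand-in, not the real column class: the struct name, its members, and the choice to store the starting offset of each row (as the description above says) are assumptions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a column of arrays of UInt64.
struct ArrayColumnSketch
{
    std::vector<std::uint64_t> elements;  // all array elements, placed contiguously
    std::vector<std::size_t> offsets;     // offsets[i]: index in `elements` where row i begins

    std::size_t rows() const { return offsets.size(); }

    // Appends one array value as a new row.
    void insertRow(const std::vector<std::uint64_t> & row)
    {
        offsets.push_back(elements.size());
        elements.insert(elements.end(), row.begin(), row.end());
    }

    // Copies row i out of the flat storage.
    std::vector<std::uint64_t> getRow(std::size_t i) const
    {
        assert(i < rows());
        const std::size_t begin = offsets[i];
        const std::size_t end = (i + 1 < rows()) ? offsets[i + 1] : elements.size();
        return std::vector<std::uint64_t>(elements.begin() + begin, elements.begin() + end);
    }
};
```

Inserting the rows [1, 2], [] and [3] leaves `elements` as 1, 2, 3 and `offsets` as 0, 2, 2, so an empty array costs only one offset entry.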
Nevertheless, it is possible to work with individual values as well. To represent an individual value, the
Field is used.
Field is just a discriminated union of
IColumn has the
operator[] method to get the n-th value as a
Field and the
insert method to append a
Field to the end of a column. These methods are not very efficient, because they require dealing with temporary
Field objects representing an individual value. There are more efficient methods, such as
insertRangeFrom, and so on.
Field doesn’t have enough information about a specific data type for a table. For example,
UInt64 are all represented as
UInt64 in a
IColumn has methods for common relational transformations of data, but they don’t meet all needs. For example,
ColumnUInt64 doesn’t have a method to calculate the sum of two columns, and
ColumnString doesn’t have a method to run a substring search. These countless routines are implemented outside of
Various functions on columns can be implemented in a generic, inefficient way using
IColumn methods to extract
Field values, or in a specialized way using knowledge of the inner memory layout of data in a specific
IColumn implementation. Functions do this by casting to a specific
IColumn type and dealing with the internal representation directly. For example,
ColumnUInt64 has the
getData method that returns a reference to an internal array, then a separate routine reads or fills that array directly. We have “leaky abstractions” to allow efficient specializations of various routines.
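The difference between the two ways can be sketched as follows. Everything here is a simplified assumption: FieldSketch is a std::variant with only two alternatives, and UInt64ColumnSketch is a hypothetical class in which only the name getData is taken from the text.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <variant>
#include <vector>

// Simplified stand-in for Field: a discriminated union of value kinds.
using FieldSketch = std::variant<std::uint64_t, double>;

class UInt64ColumnSketch
{
public:
    UInt64ColumnSketch() = default;
    explicit UInt64ColumnSketch(std::vector<std::uint64_t> values) : data(std::move(values)) {}

    std::size_t size() const { return data.size(); }

    // Generic per-value interface: every access creates a temporary FieldSketch.
    FieldSketch operator[](std::size_t n) const { return FieldSketch(data[n]); }
    void insert(const FieldSketch & f) { data.push_back(std::get<std::uint64_t>(f)); }

    // "Leaky" accessor: exposes the internal array to specialized routines.
    std::vector<std::uint64_t> & getData() { return data; }
    const std::vector<std::uint64_t> & getData() const { return data; }

private:
    std::vector<std::uint64_t> data;
};

// Generic way: works through the per-value interface only.
UInt64ColumnSketch sumGeneric(const UInt64ColumnSketch & a, const UInt64ColumnSketch & b)
{
    assert(a.size() == b.size());
    UInt64ColumnSketch res;
    for (std::size_t i = 0; i < a.size(); ++i)
    {
        const std::uint64_t x = std::get<std::uint64_t>(a[i]);
        const std::uint64_t y = std::get<std::uint64_t>(b[i]);
        res.insert(FieldSketch(x + y));
    }
    return res;
}

// Specialized way: a tight loop directly over the internal arrays.
UInt64ColumnSketch sumSpecialized(const UInt64ColumnSketch & a, const UInt64ColumnSketch & b)
{
    assert(a.size() == b.size());
    const auto & lhs = a.getData();
    const auto & rhs = b.getData();
    UInt64ColumnSketch res;
    auto & out = res.getData();
    out.resize(lhs.size());
    for (std::size_t i = 0; i < lhs.size(); ++i)
        out[i] = lhs[i] + rhs[i];
    return res;
}
```

The specialized loop has no per-value temporaries or type checks, which is the kind of loop a compiler can vectorize.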
IDataType is responsible for serialization and deserialization: for reading and writing chunks of columns or individual values in binary or text form.
IDataType directly corresponds to data types in tables. For example, there are
DataTypeString and so on.
IColumn are only loosely related to each other. Different data types can be represented in memory by the same
IColumn implementations. For example,
DataTypeDateTime are both represented by
ColumnConstUInt32. In addition, the same data type can be represented by different
IColumn implementations. For example,
DataTypeUInt8 can be represented by
IDataType only stores metadata. For instance,
DataTypeUInt8 doesn’t store anything at all (except vptr) and
DataTypeFixedString stores just
N (the size of fixed-size strings).
IDataType has helper methods for various data formats. Examples are methods to serialize a value with possible quoting, to serialize a value for JSON, and to serialize a value as part of the XML format. There is no direct correspondence to data formats. For example, the different data formats
TabSeparated can use the same
serializeTextEscaped helper method from the
Block is a container that represents a subset (chunk) of a table in memory. It is just a set of triples:
(IColumn, IDataType, column name). During query execution, data is processed by
Blocks. If we have a
Block, we have data (in the
IColumn object), we have information about its type (in
IDataType) that tells us how to deal with that column, and we have the column name. It could be either the original column name from the table or some artificial name assigned for getting temporary results of calculations.
When we calculate some function over columns in a block, we add another column with its result to the block, and we don’t touch columns for arguments of the function because operations are immutable. Later, unneeded columns can be removed from the block, but not modified. It is convenient for the elimination of common subexpressions.
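A minimal sketch of this behaviour, under heavy simplification: the type is reduced to a string, column data is a shared immutable vector, and all names (BlockSketch, addPlusColumn, the "plus(x, y)" naming scheme) are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

using ColumnPtr = std::shared_ptr<const std::vector<std::uint64_t>>;  // immutable column data

struct ColumnWithTypeAndNameSketch
{
    ColumnPtr column;
    std::string type;  // stands in for an IDataType
    std::string name;
};

struct BlockSketch
{
    std::vector<ColumnWithTypeAndNameSketch> columns;

    const ColumnWithTypeAndNameSketch * find(const std::string & name) const
    {
        for (const auto & c : columns)
            if (c.name == name)
                return &c;
        return nullptr;
    }
};

// Evaluates lhs + rhs and appends the result as a new column; argument columns stay untouched.
// Returns false when the result column already exists, so a repeated subexpression is computed once.
bool addPlusColumn(BlockSketch & block, const std::string & lhs, const std::string & rhs)
{
    const std::string result_name = "plus(" + lhs + ", " + rhs + ")";
    if (block.find(result_name) != nullptr)
        return false;

    const auto * a = block.find(lhs);
    const auto * b = block.find(rhs);
    assert(a != nullptr && b != nullptr);
    assert(a->column->size() == b->column->size());

    auto res = std::make_shared<std::vector<std::uint64_t>>(a->column->size());
    for (std::size_t i = 0; i < res->size(); ++i)
        (*res)[i] = (*a->column)[i] + (*b->column)[i];

    block.columns.push_back({res, "UInt64", result_name});
    return true;
}
```

Because the result is looked up by name before it is computed, asking for the same expression twice adds only one column.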
Blocks are created for every processed chunk of data. Note that for the same type of calculation, the column names and types remain the same for different blocks, and only the column data changes. It would be better to split the block data from the block header, because with small block sizes, copying shared_ptrs and column names (temporary strings) has a high overhead.
Block streams are for processing data. We use streams of blocks to read data from somewhere, perform data transformations, or write data to somewhere.
IBlockInputStream has the
read method to fetch the next block while available.
IBlockOutputStream has the
write method to push the block somewhere.
Streams are responsible for:
- Reading or writing to a table. The table just returns a stream for reading or writing blocks.
- Implementing data formats. For example, if you want to output data to a terminal in
Pretty format, you create a block output stream where you push blocks, and it formats them.
- Performing data transformations. Let’s say you have
IBlockInputStream and want to create a filtered stream. You create
FilterBlockInputStream and initialize it with your stream. Then when you pull a block from
FilterBlockInputStream, it pulls a block from your stream, filters it, and returns the filtered block to you. Query execution pipelines are represented this way.
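The filtering case can be sketched like this. It is not the real interface: the class names carry a Sketch suffix, a block is reduced to a single vector of integers, and end of stream is signalled with std::nullopt, all of which are assumptions for illustration.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

using BlockSketch = std::vector<std::int64_t>;  // one column of values stands in for a block

struct IBlockInputStreamSketch
{
    virtual ~IBlockInputStreamSketch() = default;
    // Returns the next block, or std::nullopt when the stream is exhausted.
    virtual std::optional<BlockSketch> read() = 0;
};

// A source that hands out prepared blocks one by one.
class VectorSourceSketch : public IBlockInputStreamSketch
{
public:
    explicit VectorSourceSketch(std::vector<BlockSketch> blocks_) : blocks(std::move(blocks_)) {}

    std::optional<BlockSketch> read() override
    {
        if (pos == blocks.size())
            return std::nullopt;
        return blocks[pos++];
    }

private:
    std::vector<BlockSketch> blocks;
    std::size_t pos = 0;
};

// A transformation that keeps only positive values of each block.
class PositiveFilterStreamSketch : public IBlockInputStreamSketch
{
public:
    explicit PositiveFilterStreamSketch(std::unique_ptr<IBlockInputStreamSketch> input_)
        : input(std::move(input_)) {}

    std::optional<BlockSketch> read() override
    {
        auto block = input->read();  // pulling from this stream pulls from the nested one
        if (!block)
            return std::nullopt;
        BlockSketch filtered;
        for (std::int64_t v : *block)
            if (v > 0)
                filtered.push_back(v);
        return filtered;
    }

private:
    std::unique_ptr<IBlockInputStreamSketch> input;
};
```

A longer pipeline is built the same way, by wrapping one stream in another; nothing runs until the outermost read is called.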
There are more sophisticated transformations. For example, when you pull from
AggregatingBlockInputStream, it reads all data from its source, aggregates it, and then returns a stream of aggregated data for you. Another example:
UnionBlockInputStream accepts many input sources in the constructor and also a number of threads. It launches multiple threads and reads from multiple sources in parallel.
Block streams use the “pull” approach to control flow: when you pull a block from the first stream, it consequently pulls the required blocks from nested streams, and the entire execution pipeline will work. Neither “pull” nor “push” is the best solution, because control flow is implicit, and that limits the implementation of various features like simultaneous execution of multiple queries (merging many pipelines together). This limitation could be overcome with coroutines or just running extra threads that wait for each other. We may have more possibilities if we make control flow explicit: if we locate the logic for passing data from one calculation unit to another outside of those calculation units. Read this article for more thoughts.
We should note that the query execution pipeline creates temporary data at each step. We try to keep block size small enough so that temporary data fits in the CPU cache. With that assumption, writing and reading temporary data is almost free in comparison with other calculations. We could consider an alternative, which is to fuse many operations in the pipeline together. It could make the pipeline as short as possible and remove much of the temporary data, which could be an advantage, but it also has drawbacks. For example, a split pipeline makes it easy to implement caching intermediate data, stealing intermediate data from similar queries running at the same time, and merging pipelines for similar queries.
Data formats are implemented with block streams. There are “presentational” formats only suitable for the output of data to the client, such as
Pretty format, which provides only
IBlockOutputStream. And there are input/output formats, such as
There are also row streams:
IRowOutputStream. They allow you to pull/push data by individual rows, not by blocks. And they are only needed to simplify the implementation of row-oriented formats. The wrappers
BlockOutputStreamFromRowOutputStream allow you to convert row-oriented streams to regular block-oriented streams.
For byte-oriented input/output, there are
WriteBuffer abstract classes. They are used instead of C++
iostreams. Don’t worry: every mature C++ project is using something other than
iostreams for good reasons.
WriteBuffer are just a contiguous buffer and a cursor pointing to the position in that buffer. Implementations may own or not own the memory for the buffer. There is a virtual method to fill the buffer with the following data (for
ReadBuffer) or to flush the buffer somewhere (for
WriteBuffer). The virtual methods are rarely called.
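A rough sketch of the write side, assuming a buffer that owns its memory. The class and method names (WriteBufferSketch, flushImpl, WriteBufferToStringSketch) are hypothetical and chosen only to show why the virtual call is rare.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

class WriteBufferSketch
{
public:
    explicit WriteBufferSketch(std::size_t size) : memory(size)
    {
        assert(size > 0);
    }
    virtual ~WriteBufferSketch() = default;

    // Hot path: a plain copy into the buffer. The virtual call happens only when the buffer is full.
    void write(const char * from, std::size_t n)
    {
        while (n > 0)
        {
            if (pos == memory.size())
                next();
            const std::size_t chunk = std::min(n, memory.size() - pos);
            std::memcpy(memory.data() + pos, from, chunk);
            pos += chunk;
            from += chunk;
            n -= chunk;
        }
    }

    // Flushes whatever is buffered and resets the cursor.
    void next()
    {
        if (pos > 0)
            flushImpl(memory.data(), pos);
        pos = 0;
    }

protected:
    virtual void flushImpl(const char * data, std::size_t n) = 0;

private:
    std::vector<char> memory;  // the contiguous buffer
    std::size_t pos = 0;       // the cursor: position of the next byte to write
};

// One possible destination: append flushed bytes to a string.
class WriteBufferToStringSketch : public WriteBufferSketch
{
public:
    WriteBufferToStringSketch(std::string & out_, std::size_t size) : WriteBufferSketch(size), out(out_) {}

protected:
    void flushImpl(const char * data, std::size_t n) override { out.append(data, n); }

private:
    std::string & out;
};
```

With a 4-byte buffer, writing the 11 bytes of "hello world" triggers only two flushes during the write; the remaining 3 bytes stay buffered until next() is called.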
WriteBuffer are used for working with files and file descriptors and network sockets, for implementing compression (
CompressedWriteBuffer is initialized with another WriteBuffer and performs compression before writing data to it), and for other purposes – the names
HashingWriteBuffer speak for themselves.
Read/WriteBuffers only deal with bytes. There are functions from
WriteHelpers header files to help with formatting input/output. For example, there are helpers to write a number in decimal format.
Let’s look at what happens when you want to write a result set in
JSON format to stdout. You have a result set ready to be fetched from
IBlockInputStream. You create
WriteBufferFromFileDescriptor(STDOUT_FILENO) to write bytes to stdout. You create
JSONRowOutputStream, initialized with that
WriteBuffer, to write rows in
JSON to stdout. You create
BlockOutputStreamFromRowOutputStream on top of it, to represent it as
IBlockOutputStream. Then you call
copyData to transfer data from
IBlockOutputStream, and everything works. Internally,
JSONRowOutputStream will write various JSON delimiters and call the
IDataType::serializeTextJSON method with a reference to
IColumn and the row number as arguments. Consequently,
IDataType::serializeTextJSON will call a method from
WriteHelpers.h: for example,
writeText for numeric types and
IStorage interface represents tables. Different implementations of that interface are different table engines. Examples are
StorageMemory, and so on. Instances of these classes are just tables.
IStorage methods are
write. There are also
drop, and so on. The
read method accepts the following arguments: the set of columns to read from a table, the
AST query to consider, and the desired number of streams to return. It returns one or multiple
IBlockInputStream objects and information about the stage of data processing that was completed inside a table engine during query execution.
In most cases, the read method is only responsible for reading the specified columns from a table, not for any further data processing. All further data processing is done by the query interpreter and is outside the responsibility of
But there are notable exceptions:
- The AST query is passed to the
read method, and the table engine can use it to derive index usage and to read less data from a table.
- Sometimes the table engine can process data itself to a specific stage. For example,
StorageDistributed can send a query to remote servers, ask them to process data to a stage where data from different remote servers can be merged, and return that preprocessed data. The query interpreter then finishes processing the data.
read method can return multiple
IBlockInputStream objects to allow parallel data processing. These multiple block input streams can read from a table in parallel. Then you can wrap these streams with various transformations (such as expression evaluation or filtering) that can be calculated independently and create a
UnionBlockInputStream on top of them, to read from multiple streams in parallel.
There are also
TableFunctions. These are functions that return a temporary
IStorage object to use in the
FROM clause of a query.
To get a quick idea of how to implement your table engine, look at something simple, like
As the result of the
QueryProcessingStage – information about what parts of the query were already calculated inside storage.
A hand-written recursive descent parser parses a query. For example,
ParserSelectQuery just recursively calls the underlying parsers for various parts of the query. Parsers create an
AST is represented by nodes, which are instances of
Parser generators are not used for historical reasons.
Interpreters are responsible for creating the query execution pipeline from an
AST. There are simple interpreters, such as
InterpreterDropQuery, or the more sophisticated
InterpreterSelectQuery. The query execution pipeline is a combination of block input or output streams. For example, the result of interpreting the
SELECT query is the
IBlockInputStream to read the result set from; the result of the INSERT query is the
IBlockOutputStream to write data for insertion to, and the result of interpreting the
INSERT SELECT query is the
IBlockInputStream that returns an empty result set on the first read, but that copies data from
INSERT at the same time.
ExpressionActions machinery for query analysis and transformations. This is where most rule-based query optimizations are done.
ExpressionAnalyzer is quite messy and should be rewritten: various query transformations and optimizations should be extracted to separate classes to allow modular transformations of the query.
There are ordinary functions and aggregate functions. For aggregate functions, see the next section.
Ordinary functions don’t change the number of rows – they work as if they are processing each row independently. In fact, functions are not called for individual rows, but for
Blocks of data to implement vectorized query execution.
ClickHouse has strong typing, so there’s no implicit type conversion. If a function doesn’t support a specific combination of types, it throws an exception. But functions can work (be overloaded) for many different combinations of types. For example, the
plus function (to implement the
+ operator) works for any combination of numeric types:
Int8, and so on. Also, some variadic functions can accept any number of arguments, such as the
Implementing a function may be slightly inconvenient because a function explicitly dispatches supported data types and supported
IColumns. For example, the
plus function has code generated by instantiation of a C++ template for each combination of numeric types, and constant or non-constant left and right arguments.
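The kind of template instantiation described here can be sketched as below. The function names are hypothetical, and the result type simply follows the usual C++ arithmetic conversions, which is an assumption of this sketch and not necessarily the rule the real function uses.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Result type of A + B according to the usual C++ arithmetic conversions.
template <typename A, typename B>
using PlusResult = decltype(std::declval<A>() + std::declval<B>());

// Both arguments are full columns.
template <typename A, typename B>
std::vector<PlusResult<A, B>> plusVectorVector(const std::vector<A> & a, const std::vector<B> & b)
{
    assert(a.size() == b.size());
    std::vector<PlusResult<A, B>> res(a.size());
    for (std::size_t i = 0; i < a.size(); ++i)
        res[i] = a[i] + b[i];
    return res;
}

// The right argument is a constant, so it is not expanded into a full column.
template <typename A, typename B>
std::vector<PlusResult<A, B>> plusVectorConstant(const std::vector<A> & a, B b)
{
    std::vector<PlusResult<A, B>> res(a.size());
    for (std::size_t i = 0; i < a.size(); ++i)
        res[i] = a[i] + b;
    return res;
}
```

Each distinct pair of argument types, multiplied by each constant or non-constant variant, produces a separate instantiation, which is where the code bloat comes from.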
It is an excellent place to implement runtime code generation to avoid template code bloat. Also, it makes it possible to add fused functions like fused multiply-add or to make multiple comparisons in one loop iteration.
Due to vectorized query execution, functions are not short-circuited. For example, if you write
WHERE f(x) AND g(y), both sides are calculated, even for rows where
f(x) is zero (except when
f(x) is a zero constant expression). But if the selectivity of the
f(x) condition is high, and calculation of
f(x) is much cheaper than
g(y), it’s better to implement multi-pass calculation. It would first calculate
f(x), then filter columns by the result, and then calculate
g(y) only for smaller, filtered chunks of data.
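A sketch of such a multi-pass calculation, using a byte mask to filter columns between the passes. The helper names and the use of std::function for the conditions are assumptions made to keep the example short.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

using Column = std::vector<std::int64_t>;
using Filter = std::vector<std::uint8_t>;  // byte mask: non-zero keeps the row
using Predicate = std::function<bool(std::int64_t)>;

// Returns a new column with only the rows whose mask byte is non-zero.
Column filterColumn(const Column & column, const Filter & mask)
{
    assert(column.size() == mask.size());
    Column res;
    for (std::size_t i = 0; i < column.size(); ++i)
        if (mask[i])
            res.push_back(column[i]);
    return res;
}

// Evaluates a condition over a whole column and returns the byte mask.
Filter evaluate(const Column & column, const Predicate & pred)
{
    Filter mask(column.size());
    for (std::size_t i = 0; i < column.size(); ++i)
        mask[i] = pred(column[i]) ? 1 : 0;
    return mask;
}

// WHERE f(x) AND g(y) in two passes; returns the values of y in the surviving rows.
Column whereMultiPass(const Column & x, const Column & y, const Predicate & f, const Predicate & g)
{
    assert(x.size() == y.size());
    const Filter f_mask = evaluate(x, f);            // pass 1: cheap condition on all rows
    const Column y_small = filterColumn(y, f_mask);  // keep only rows that passed f(x)
    const Filter g_mask = evaluate(y_small, g);      // pass 2: expensive condition on fewer rows
    return filterColumn(y_small, g_mask);
}
```

With four input rows of which two pass f(x), g is evaluated only twice instead of four times.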
Aggregate functions are stateful functions. They accumulate passed values into some state and allow you to get results from that state. They are managed with the
IAggregateFunction interface. States can be rather simple (the state for
AggregateFunctionCount is just a single
UInt64 value) or quite complex (the state of
AggregateFunctionUniqCombined is a combination of a linear array, a hash table, and a
HyperLogLog probabilistic data structure).
States are allocated in
Arena (a memory pool) to deal with multiple states while executing a high-cardinality
GROUP BY query. States can have a non-trivial constructor and destructor: for example, complicated aggregation states can allocate additional memory themselves. It requires some attention to creating and destroying states and properly passing their ownership and destruction order.
Aggregation states can be serialized and deserialized to pass over the network during distributed query execution or to write them to disk when there is not enough RAM. They can even be stored in a table with the
DataTypeAggregateFunction to allow incremental aggregation of data.
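The life cycle of a state (accumulate, serialize, deserialize, merge, get the result) can be sketched with a simple average. The struct below is hypothetical and does not reproduce the IAggregateFunction interface or arena allocation; its serialized layout is raw bytes in host byte order, chosen only for brevity.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical state for an avg-like aggregate function.
struct AvgStateSketch
{
    double sum = 0;
    std::uint64_t count = 0;

    // Accumulates one value into the state.
    void add(double value)
    {
        sum += value;
        ++count;
    }

    // Combines another partial state into this one.
    void merge(const AvgStateSketch & rhs)
    {
        sum += rhs.sum;
        count += rhs.count;
    }

    double result() const { return count ? sum / static_cast<double>(count) : 0.0; }

    // Unversioned raw layout: sum followed by count.
    std::string serialize() const
    {
        std::string buf(sizeof(sum) + sizeof(count), '\0');
        std::memcpy(buf.data(), &sum, sizeof(sum));
        std::memcpy(buf.data() + sizeof(sum), &count, sizeof(count));
        return buf;
    }

    static AvgStateSketch deserialize(const std::string & buf)
    {
        AvgStateSketch state;
        assert(buf.size() == sizeof(state.sum) + sizeof(state.count));
        std::memcpy(&state.sum, buf.data(), sizeof(state.sum));
        std::memcpy(&state.count, buf.data() + sizeof(state.sum), sizeof(state.count));
        return state;
    }
};
```

Two partial states computed separately can be combined by serializing one, deserializing it on the other side, and calling merge; the final result is the same as aggregating all values in one place.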
The serialized data format for aggregate function states is not versioned right now. It is ok if aggregate states are only stored temporarily. But we have the
AggregatingMergeTree table engine for incremental aggregation, and people are already using it in production. This is why backward compatibility is required when changing the serialized format for any aggregate function in the future.
The server implements several different interfaces:
- An HTTP interface for any foreign clients.
- A TCP interface for the native ClickHouse client and for cross-server communication during distributed query execution.
- An interface for transferring data for replication.
Internally, it is just a primitive multithreaded server without coroutines or fibers. The server is not designed to process a high rate of simple queries, but to process a relatively low rate of complex queries, each of which can process a vast amount of data for analytics.
The server initializes the
Context class with the necessary environment for query execution: the list of available databases, users and access rights, settings, clusters, the process list, the query log, and so on. Interpreters use this environment.
We maintain full backward and forward compatibility for the server TCP protocol: old clients can talk to new servers, and new clients can talk to old servers. But we don’t want to maintain it eternally, and we are removing support for old versions after about one year.
For most external applications, we recommend using the HTTP interface because it is simple and easy to use. The TCP protocol is more tightly linked to internal data structures: it uses an internal format for passing blocks of data, and it uses custom framing for compressed data. We haven’t released a C library for that protocol because it requires linking most of the ClickHouse codebase, which is not practical.
Servers in a cluster setup are mostly independent. You can create a
Distributed table on one or all servers in a cluster. The
Distributed table does not store data itself – it only provides a “view” to all local tables on multiple nodes of a cluster. When you SELECT from a
Distributed table, it rewrites that query, chooses remote nodes according to load balancing settings, and sends the query to them. The
Distributed table requests remote servers to process a query just up to a stage where intermediate results from different servers can be merged. Then it receives the intermediate results and merges them. The distributed table tries to distribute as much work as possible to remote servers and does not send much intermediate data over the network.
Things become more complicated when you have subqueries in IN or JOIN clauses, and each of them uses a
Distributed table. We have different strategies for the execution of these queries.
There is no global query plan for distributed query execution. Each node has its local query plan for its part of the job. We only have simple one-pass distributed query execution: we send queries to remote nodes and then merge the results. But this is not feasible for complicated queries with high-cardinality GROUP BYs or with a large amount of temporary data for JOIN. In such cases, we need to “reshuffle” data between servers, which requires additional coordination. ClickHouse does not support that kind of query execution, and we need to work on it.
MergeTree is a family of storage engines that supports indexing by primary key. The primary key can be an arbitrary tuple of columns or expressions. Data in a
MergeTree table is stored in “parts”. Each part stores data in the primary key order, so data is ordered lexicographically by the primary key tuple. All the table columns are stored in separate
column.bin files in these parts. The files consist of compressed blocks. Each block is usually from 64 KB to 1 MB of uncompressed data, depending on the average value size. The blocks consist of column values placed contiguously one after the other. Column values are in the same order for each column (the primary key defines the order), so when you iterate by many columns, you get values for the corresponding rows.
The primary key itself is “sparse”. It doesn’t address every single row, but only some ranges of data. A separate
primary.idx file has the value of the primary key for each N-th row, where N is called
index_granularity (usually, N = 8192). Also, for each column, we have
column.mrk files with “marks,” which are offsets to each N-th row in the data file. Each mark is a pair: the offset in the file to the beginning of the compressed block, and the offset in the decompressed block to the beginning of data. Usually, compressed blocks are aligned by marks, and the offset in the decompressed block is zero. Data for
primary.idx always resides in memory, and data for
column.mrk files is cached.
When we are going to read something from a part in
MergeTree, we look at
primary.idx data and locate ranges that could contain requested data, then look at
column.mrk data and calculate offsets for where to start reading those ranges. Because of sparseness, excess data may be read. ClickHouse is not suitable for a high load of simple point queries, because the entire range with
index_granularity rows must be read for each key, and the entire compressed block must be decompressed for each column. We made the index sparse because we must be able to maintain trillions of rows per single server without noticeable memory consumption for the index. Also, because the primary key is sparse, it is not unique: it cannot check the existence of the key in the table at INSERT time. You could have many rows with the same key in a table.
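The lookup in a sparse index can be sketched as a pair of binary searches. This is a simplified model with a single integer key; the function name and the representation of the index as a sorted vector (one entry per granule of index_granularity rows) are assumptions.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// index[i] is the primary key value of the first row of granule i.
// Returns the half-open range of granule numbers [first, last) that may contain keys in [lo, hi].
std::pair<std::size_t, std::size_t> selectGranules(
    const std::vector<std::uint64_t> & index, std::uint64_t lo, std::uint64_t hi)
{
    assert(lo <= hi);
    assert(std::is_sorted(index.begin(), index.end()));

    // Granules that start with a key greater than hi cannot match.
    const std::size_t last = static_cast<std::size_t>(
        std::upper_bound(index.begin(), index.end(), hi) - index.begin());

    // The granule before the first one that starts at a key >= lo may still
    // contain lo near its end, so we step back by one.
    const std::size_t j = static_cast<std::size_t>(
        std::lower_bound(index.begin(), index.end(), lo) - index.begin());
    const std::size_t first = (j == 0) ? 0 : j - 1;

    return {first, last};
}
```

For an index with keys 10, 20, 30, a search for keys between 12 and 15 selects only granule 0, yet every row of that granule has to be read: this is the excess reading caused by sparseness. The selected granule numbers are then translated into file offsets using the marks.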
When you INSERT a bunch of data into
MergeTree, that bunch is sorted by primary key order and forms a new part. There are background threads that periodically select some parts and merge them into a single sorted part to keep the number of parts relatively low. That’s why it is called
MergeTree. Of course, merging leads to “write amplification”. All parts are immutable: they are only created and deleted, but not modified. When SELECT is executed, it holds a snapshot of the table (a set of parts). After merging, we also keep old parts for some time to make a recovery after failure easier, so if we see that some merged part is probably broken, we can replace it with its source parts.
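The insert-then-merge cycle can be sketched in a few lines. A part is reduced here to a sorted vector of key values, and the function names are hypothetical.

```cpp
#include <algorithm>
#include <cstdint>
#include <iterator>
#include <vector>

using Part = std::vector<std::uint64_t>;  // primary key values of one part, in sorted order

// An inserted batch is sorted by key and becomes a new part.
Part makePart(std::vector<std::uint64_t> batch)
{
    std::sort(batch.begin(), batch.end());
    return batch;
}

// Merges two sorted parts into a new sorted part. The source parts are not modified,
// so readers holding them as a snapshot are unaffected.
Part mergeParts(const Part & a, const Part & b)
{
    Part merged;
    merged.reserve(a.size() + b.size());
    std::merge(a.begin(), a.end(), b.begin(), b.end(), std::back_inserter(merged));
    return merged;
}
```

Every row is rewritten each time its part takes part in a merge, which is the write amplification mentioned above.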
MergeTree is not an LSM tree because it doesn’t contain “memtable” and “log”: inserted data is written directly to the filesystem. This makes it suitable only to INSERT data in batches, not by individual row and not very frequently – about once per second is ok, but a thousand times a second is not. We did it this way for simplicity’s sake, and because we are already inserting data in batches in our applications.
MergeTree tables can only have one (primary) index: there aren’t any secondary indices. It would be nice to allow multiple physical representations under one logical table, for example, to store data in more than one physical order or even to allow representations with pre-aggregated data along with original data.
There are MergeTree engines that do additional work during background merges. Examples are
AggregatingMergeTree. This could be treated as special support for updates. Keep in mind that these are not real updates because users usually have no control over the time when background merges are executed, and data in a
MergeTree table is almost always stored in more than one part, not in completely merged form.
Replication in ClickHouse can be configured on a per-table basis. You could have some replicated and some non-replicated tables on the same server. You could also have tables replicated in different ways, such as one table with two-factor replication and another with three-factor.
Replication is implemented in the
ReplicatedMergeTree storage engine. The path in
ZooKeeper is specified as a parameter for the storage engine. All tables with the same path in
ZooKeeper become replicas of each other: they synchronize their data and maintain consistency. Replicas can be added and removed dynamically simply by creating or dropping a table.
Replication uses an asynchronous multi-master scheme. You can insert data into any replica that has a session with
ZooKeeper, and data is replicated to all other replicas asynchronously. Because ClickHouse doesn’t support UPDATEs, replication is conflict-free. As there is no quorum acknowledgment of inserts, just-inserted data might be lost if one node fails.
Metadata for replication is stored in ZooKeeper. There is a replication log that lists what actions to do. Actions are: get part; merge parts; drop a partition, and so on. Each replica copies the replication log to its queue and then executes the actions from the queue. For example, on insertion, the “get the part” action is created in the log, and every replica downloads that part. Merges are coordinated between replicas to get byte-identical results. All parts are merged in the same way on all replicas. It is achieved by electing one replica as the leader, and that replica initiates merges and writes “merge parts” actions to the log.
Replication is physical: only compressed parts are transferred between nodes, not queries. Merges are processed on each replica independently in most cases to lower the network costs by avoiding network amplification. Large merged parts are sent over the network only in cases of significant replication lag.
Besides, each replica stores its state in ZooKeeper as the set of parts and their checksums. When the state on the local filesystem diverges from the reference state in ZooKeeper, the replica restores its consistency by downloading missing and broken parts from other replicas. When there is some unexpected or broken data in the local filesystem, ClickHouse does not remove it, but moves it to a separate directory and forgets it.
The ClickHouse cluster consists of independent shards, and each shard consists of replicas. The cluster is not elastic, so after adding a new shard, data is not rebalanced between shards automatically. Instead, the cluster load is supposed to be adjusted to be uneven. This implementation gives you more control, and it is ok for relatively small clusters, such as tens of nodes. But for clusters with hundreds of nodes that we are using in production, this approach becomes a significant drawback. We should implement a table engine that spans across the cluster with dynamically replicated regions that could be split and balanced between clusters automatically.