上篇笔记讲到了聚合函数的实现并且带大家看了聚合函数是如何注册到ClickHouse之中的并被调用使用的。这篇笔记,笔者会续上上篇的内容,将剖析一把ClickHouse聚合流程的整体实现。 第二篇文章,我们来一起看看聚合流程的实现
上篇笔记讲到了聚合函数的实现并且带大家看了聚合函数是如何注册到ClickHouse之中的并被调用使用的。这篇笔记,笔者会续上上篇的内容,将剖析一把ClickHouse聚合流程的整体实现。
第二篇文章,我们来一起看看聚合流程的实现~~ 上车!
class Block
{
private:
using Container = ColumnsWithTypeAndName;
using IndexByName = std::map;
Container data;
IndexByName index_by_name;
这是一个很重要的类,实现的也并不复杂。Block类作为ClickHouse的核心,后续的工作都是基于Block类展开的。
read函数,它返回一个被对应Stream处理过的Block。
class IBlockInputStream : public TypePromotion<IBlockInputStream>
{
friend struct BlockStreamProfileInfo;
public:
IBlockInputStream() { info.parent = this; }
virtual ~IBlockInputStream() {}
IBlockInputStream(const IBlockInputStream &) = delete;
IBlockInputStream & operator=(const IBlockInputStream &) = delete;
/// To output the data stream transformation tree (query execution plan).
virtual String getName() const = 0;
virtual Block getHeader() const = 0;
virtual const BlockMissingValues & getMissingValues() const
{
static const BlockMissingValues none;
return none;
}
/// If this stream generates data in order by some keys, return true.
virtual bool isSortedOutput() const { return false; }
/// In case of isSortedOutput, return corresponding SortDescription
virtual const SortDescription & getSortDescription() const;
Block read();
class AggregatingBlockInputStream : public IBlockInputStream
{
public:
AggregatingBlockInputStream(const BlockInputStreamPtr & input, const Aggregator::Params & params_, bool final_)
: params(params_), aggregator(params), final(final_)
{
children.push_back(input);
}
String getName() const override { return "Aggregating"; }
Block getHeader() const override;
protected:
Block readImpl() override;
Aggregator::Params params;
Aggregator aggregator;
bool final;
bool executed = false;
std::vector> temporary_inputs;
std::unique_ptr impl;
};
首先看它的构造方法,参数有:input(子节点的输入流)、params_(聚合参数,其类型Aggregator::Params是Aggregator的内部类)以及final_。这里最为核心的就是AggregatingBlockInputStream类通过继承并override readImpl()接口来实现具体的聚合逻辑。AggregatingBlockInputStream类还有一个孪生兄弟:ParallelAggregatingBlockInputStream类,它通过并行化来进一步加快聚合流程的执行效率。(通过笔者进行的测试,在简单的聚合查询下,并行化能够提高近一倍的效率~~)
/// Settings bundle consumed by Aggregator (per the article, a nested type of Aggregator).
/// Members are declared in the same order in which the main constructor initialises them.
struct Params
{
    /// Data structure of source blocks.
    Block src_header;
    /// Data structure of intermediate blocks before merge.
    Block intermediate_header;

    /// What to count.
    const ColumnNumbers keys;
    const AggregateDescriptions aggregates;
    const size_t keys_size;
    const size_t aggregates_size;

    /// The settings of approximate calculation of GROUP BY.
    const bool overflow_row;    /// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by.
    const size_t max_rows_to_group_by;
    const OverflowMode group_by_overflow_mode;

    /// Two-level aggregation thresholds (number of keys / size in bytes, per upstream ClickHouse).
    /// These two members were missing from the excerpt although the constructor below
    /// initialises them; the merge-only constructor passes 0 for both.
    const size_t group_by_two_level_threshold;
    const size_t group_by_two_level_threshold_bytes;

    /// Settings to flush temporary data to the filesystem (external aggregation).
    const size_t max_bytes_before_external_group_by;        /// 0 - do not use external aggregation.

    /// Return empty result when aggregating without keys on empty set.
    bool empty_result_for_aggregation_by_empty_set;

    VolumePtr tmp_volume;

    /// Settings is used to determine cache size. No threads are created.
    size_t max_threads;

    const size_t min_free_disk_space;

    /// Full constructor: every setting is supplied by the caller.
    /// `keys_size` and `aggregates_size` are derived from the already-initialised
    /// `keys` and `aggregates` members; `intermediate_header` is left default-constructed.
    Params(
        const Block & src_header_,
        const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_,
        bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
        size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
        size_t max_bytes_before_external_group_by_,
        bool empty_result_for_aggregation_by_empty_set_,
        VolumePtr tmp_volume_, size_t max_threads_,
        size_t min_free_disk_space_)
        : src_header(src_header_),
        keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
        overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
        group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
        max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
        empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
        tmp_volume(tmp_volume_), max_threads(max_threads_),
        min_free_disk_space(min_free_disk_space_)
    {
    }

    /// Only parameters that matter during merge.
    /// Delegates to the full constructor with an empty source header and zero/neutral
    /// values for every other setting, then stores the intermediate header.
    Params(const Block & intermediate_header_,
        const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
        : Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0)
    {
        intermediate_header = intermediate_header_;
    }
};
class Aggregator
{
public:
Aggregator(const Params & params_);
/// Aggregate the source. Get the result in the form of one of the data structures.
void execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result);
using AggregateColumns = std::vector;
using AggregateColumnsData = std::vector;
using AggregateColumnsConstData = std::vector;
using AggregateFunctionsPlainPtrs = std::vector;
/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = "break").
bool executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
bool executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys);
BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
std::unique_ptr mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
ManyAggregatedDataVariants prepareVariantsToMerge(ManyAggregatedDataVariants & data_variants) const;
void mergeStream(const BlockInputStreamPtr & stream, AggregatedDataVariants & result, size_t max_threads);
using BucketToBlocks = std::map;
/// Merge partially aggregated blocks separated to buckets into one data structure.
void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads);
/// Merge several partially aggregated blocks into one.
/// Precondition: for all blocks block.info.is_overflows flag must be the same.
/// (either all blocks are from overflow data or none blocks are).
/// The resulting block has the same value of is_overflows flag.
Block mergeBlocks(BlocksList & blocks, bool final);
std::unique_ptr mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
using CancellationHook = std::function;
void setCancellationHook(const CancellationHook cancellation_hook);
/// Get data structure of the result.
Block getHeader(bool final) const;
这里我们就从上文提到的Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result)函数作为起点来梳理一下ClickHouse的聚合实现:
/// Reads blocks from `stream` one by one and feeds each of them to executeOnBlock(),
/// accumulating the aggregation state in `result`.
/// NOTE(review): this is an abbreviated excerpt — `key_columns`, `aggregate_columns`
/// and `no_more_keys` are used below but their declarations are not shown, and the
/// function's closing brace is missing.
void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result)
{
Stopwatch watch;
/// Running totals of the rows and bytes read from the source stream.
size_t src_rows = 0;
size_t src_bytes = 0;
/// Read all the data
/// The loop ends when read() yields a block that converts to false.
while (Block block = stream->read())
{
/// Leave the function immediately once isCancelled() reports true.
if (isCancelled())
return;
src_rows += block.rows();
src_bytes += block.bytes();
/// A false return from executeOnBlock() means processing should be aborted, so stop reading.
if (!executeOnBlock(block, result, key_columns, aggregate_columns, no_more_keys))
break;
}
由上述代码可以看出,这里就是依次读取子节点流生成的Block,然后调用executeOnBlock方法对每一个Block执行聚合处理。接着我们按图索骥,继续看下去。这个函数比较长,我们拆分成几个部分,并且把无关紧要的代码先去掉。这部分主要完成的工作就是将params之中指定的key列与聚合列的指针作为参数提取出来,并且和聚合函数一起封装到AggregateFunctionInstructions的结构之中。
/// First part of executeOnBlock() (excerpt): prepares the key columns, the argument
/// columns of every aggregate function and the per-function instruction array.
/// The template arguments of `nested_columns_holder` and of the `typeid_cast` calls
/// were stripped by the article's HTML extraction and are restored from upstream ClickHouse.
bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
    ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
{
    /// `result` will destroy the states of aggregate functions in the destructor
    result.aggregator = this;

    /// How to perform the aggregation?
    if (result.empty())
    {
        result.init(method_chosen);
        result.keys_size = params.keys_size;
        result.key_sizes = key_sizes;
        LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
    }

    /// One slot per argument of each aggregate function.
    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_columns[i].resize(params.aggregates[i].arguments.size());

    /// Owns every converted column, so the raw pointers stored in `key_columns` and
    /// `aggregate_columns` stay valid while this vector is alive.
    Columns materialized_columns;

    /// Remember the columns we will work with
    for (size_t i = 0; i < params.keys_size; ++i)
    {
        materialized_columns.push_back(columns.at(params.keys[i])->convertToFullColumnIfConst());
        key_columns[i] = materialized_columns.back().get();

        if (!result.isLowCardinality())
        {
            /// Only replace the pointer when the conversion actually produced a different column.
            auto column_no_lc = recursiveRemoveLowCardinality(key_columns[i]->getPtr());
            if (column_no_lc.get() != key_columns[i])
            {
                materialized_columns.emplace_back(std::move(column_no_lc));
                key_columns[i] = materialized_columns.back().get();
            }
        }
    }

    /// The extra trailing element has `that == nullptr` and serves as the end marker
    /// for loops of the form `for (inst = ...; inst->that; ++inst)`.
    AggregateFunctionInstructions aggregate_functions_instructions(params.aggregates_size + 1);
    aggregate_functions_instructions[params.aggregates_size].that = nullptr;

    std::vector<std::vector<const IColumn *>> nested_columns_holder;
    for (size_t i = 0; i < params.aggregates_size; ++i)
    {
        for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
        {
            materialized_columns.push_back(columns.at(params.aggregates[i].arguments[j])->convertToFullColumnIfConst());
            aggregate_columns[i][j] = materialized_columns.back().get();

            auto column_no_lc = recursiveRemoveLowCardinality(aggregate_columns[i][j]->getPtr());
            if (column_no_lc.get() != aggregate_columns[i][j])
            {
                materialized_columns.emplace_back(std::move(column_no_lc));
                aggregate_columns[i][j] = materialized_columns.back().get();
            }
        }

        aggregate_functions_instructions[i].arguments = aggregate_columns[i].data();
        aggregate_functions_instructions[i].state_offset = offsets_of_aggregate_states[i];
        auto that = aggregate_functions[i];
        /// Unnest consecutive trailing -State combinators
        while (auto func = typeid_cast<const AggregateFunctionState *>(that))
            that = func->getNestedFunction().get();
        aggregate_functions_instructions[i].that = that;
        aggregate_functions_instructions[i].func = that->getAddressOfAddFunction();

        if (auto func = typeid_cast<const AggregateFunctionArray *>(that))
        {
            /// Unnest consecutive -State combinators before -Array
            that = func->getNestedFunction().get();
            while (auto nested_func = typeid_cast<const AggregateFunctionState *>(that))
                that = nested_func->getNestedFunction().get();
            auto [nested_columns, offsets] = checkAndGetNestedArrayOffset(aggregate_columns[i].data(), that->getArgumentTypes().size());
            /// Keep the nested column pointers alive for as long as the instruction refers to them.
            nested_columns_holder.push_back(std::move(nested_columns));
            aggregate_functions_instructions[i].batch_arguments = nested_columns_holder.back().data();
            aggregate_functions_instructions[i].offsets = offsets;
        }
        else
            aggregate_functions_instructions[i].batch_arguments = aggregate_columns[i].data();

        aggregate_functions_instructions[i].batch_that = that;
    }

    /// ... (excerpt ends here: the remainder of the function, including the call to
    ///      executeImpl() and the return statement, is not shown)
将需要的参数准备好之后,后续就按部就班地调用executeImpl(*result.NAME, result.aggregates_pool, num_rows, key_columns, aggregate_functions_instructions.data(), no_more_keys, overflow_row_ptr)来进行聚合运算了。我们来看看它的实现:它是一个模板函数,内部通过调用executeImplBatch(method, state, aggregates_pool, rows, aggregate_instructions)来实现。数据库通常都会采用Batch的形式,一次性提交一组需要操作的数据,以减少虚函数调用的开销。
template
void NO_INLINE Aggregator::executeImpl(
Method & method,
Arena * aggregates_pool,
size_t rows,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
AggregateDataPtr overflow_row) const
{
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
if (!no_more_keys)
executeImplBatch(method, state, aggregates_pool, rows, aggregate_instructions);
else
executeImplCase(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
}
那我们就继续看下去,executeImplBatch同样也是一个模板函数。
到这里,整个聚合计算的核心流程算是完成了,后续就是将result的结果通过上面的convertToBlocks的方式转换为BlockStream流,继续返回给上层的调用方。
template
void NO_INLINE Aggregator::executeImplBatch(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const
{
PODArray places(rows);
/// For all rows.
for (size_t i = 0; i < rows; ++i)
{
AggregateDataPtr aggregate_data = nullptr;
auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool);
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.isInserted())
{
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
emplace_result.setMapped(nullptr);
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(aggregate_data);
emplace_result.setMapped(aggregate_data);
}
else
aggregate_data = emplace_result.getMapped();
places[i] = aggregate_data;
assert(places[i] != nullptr);
}
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
if (inst->offsets)
inst->batch_that->addBatchArray(rows, places.data(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
else
inst->batch_that->addBatch(rows, places.data(), inst->state_offset, inst->batch_arguments, aggregates_pool);
}
好了,到这里也就把ClickHouse聚合流程的代码梳理完了。
除了聚合计算外,其他的物理执行操作符也是同样通过流的方式依次对接处理的,源码阅读的步骤也可以参照笔者的分析流程。
笔者是一个ClickHouse的初学者,对ClickHouse有兴趣的同学,欢迎多多指教,交流。
官方文档
ClickHouse源代码
--结束END--
本文标题: ClickHouse源码笔记2:聚合流程的实现
本文链接: https://lsjlt.com/news/7237.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-10-23
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
2024-10-22
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0