B+树高效实现与优化技巧
B树的定义
一颗M阶B树T,满足以下条件
- 每个结点至多拥有M课子树
- 根结点至少拥有两颗子树
- 除了根结点以外,其余每个分支结点至少拥有M/2课子树
- 所有的叶结点都在同一层上
- 有k棵子树的分支结点则存在k-1个关键字,关键字按照递增顺序进行排序
- 关键字数量满足 ceil( M/2 ) - 1 <= n <= M-1
B树与B+树的区别
在实际磁盘存储中往往选用的都是b+树
b+树相较于b树的优点
- 关键字不保存数据,只用来索引,所有数据都保存在叶子结点(b树是每个关键字都保存数据)
- b+树的叶子结点是带有指针的,且叶结点本身按关键字从小到大顺序连接(适用于范围查询)
- b+树的中间结点不保存数据,所以磁盘页能容纳更多结点元素,更“矮胖”
C++B+树算法
构建B+树的基本结构
B+树是一种多路平衡搜索树,常用于数据库和文件系统索引。以下是一个简单的B+树节点结构定义:
template class BPlusNode {public: bool is_leaf; std::vector keys; std::vector values; // Only for leaf nodes std::vector children; // Only for non-leaf nodes BPlusNode* next; // Pointer to next leaf node (for range queries)};
插入操作的实现
插入操作需要处理节点分裂和键的重新分配。以下是插入逻辑的核心代码片段:
template void BPlusTree::insert(const Key& key, const Value& value) { if (root == nullptr) { root = new BPlusNode(true); root->keys.push_back(key); root->values.push_back(value); return; } BPlusNode* leaf = find_leaf(root, key); leaf->keys.push_back(key); leaf->values.push_back(value); std::sort(leaf->keys.begin(), leaf->keys.end()); if (leaf->keys.size() > order) { split_leaf(leaf); }}
删除操作的实现
删除操作需要处理节点合并和键的重新分配。以下是删除逻辑的核心代码片段:
template void BPlusTree::remove(const Key& key) { BPlusNode* leaf = find_leaf(root, key); auto it = std::find(leaf->keys.begin(), leaf->keys.end(), key); if (it == leaf->keys.end()) return; size_t index = it - leaf->keys.begin(); leaf->keys.erase(leaf->keys.begin() + index); leaf->values.erase(leaf->values.begin() + index); if (leaf != root && leaf->keys.size() < (order + 1) / 2) { handle_underflow(leaf); }}
范围查询的实现
B+树支持高效的范围查询,以下是实现代码片段:
template std::vector BPlusTree::range_query(const Key& start, const Key& end) { std::vector result; BPlusNode* leaf = find_leaf(root, start); while (leaf != nullptr) { for (size_t i = 0; i keys.size(); ++i) { if (leaf->keys[i] >= start && leaf->keys[i] values[i]); } if (leaf->keys[i] > end) return result; } leaf = leaf->next; } return result;}
完整B+树类的定义
以下是一个完整的B+树类定义,包含构造函数和析构函数:
template class BPlusTree {private: int order; BPlusNode* root;public: BPlusTree(int order) : order(order), root(nullptr) {} ~BPlusTree() { clear(root); } void insert(const Key& key, const Value& value); void remove(const Key& key); Value search(const Key& key); std::vector range_query(const Key& start, const Key& end);private: BPlusNode* find_leaf(BPlusNode* node, const Key& key); void split_leaf(BPlusNode* leaf); void handle_underflow(BPlusNode* node); void clear(BPlusNode* node);};
测试B+树的插入和查询
以下是一个简单的测试用例,验证B+树的插入和查询功能:
void test_b_plus_tree() { BPlusTree tree(3); tree.insert(1, \"Alice\"); tree.insert(2, \"Bob\"); tree.insert(3, \"Charlie\"); assert(tree.search(2) == \"Bob\"); auto results = tree.range_query(1, 3); assert(results.size() == 3);}
处理节点分裂的逻辑
当叶子节点的键数量超过阶数时,需要进行分裂:
template void BPlusTree::split_leaf(BPlusNode* leaf) { BPlusNode* new_leaf = new BPlusNode(true); size_t split_pos = leaf->keys.size() / 2; new_leaf->keys.assign(leaf->keys.begin() + split_pos, leaf->keys.end()); new_leaf->values.assign(leaf->values.begin() + split_pos, leaf->values.end()); leaf->keys.erase(leaf->keys.begin() + split_pos, leaf->keys.end()); leaf->values.erase(leaf->values.begin() + split_pos, leaf->values.end()); new_leaf->next = leaf->next; leaf->next = new_leaf; insert_into_parent(leaf, new_leaf->keys[0], new_leaf);}
处理节点下溢的逻辑
当节点的键数量低于最小值时,需要进行合并或借用:
template void BPlusTree::handle_underflow(BPlusNode* node) { if (node == root) { if (node->keys.empty() && !node->children.empty()) { root = node->children[0]; delete node; } return; } // Borrow or merge with siblings BPlusNode* parent = find_parent(root, node); // Implementation depends on sibling availability and size}
查找父节点的辅助函数
以下是一个辅助函数,用于查找给定节点的父节点:
template BPlusNode* BPlusTree::find_parent( BPlusNode* current, BPlusNode* child) { if (current == nullptr || current->is_leaf) return nullptr; for (size_t i = 0; i children.size(); ++i) { if (current->children[i] == child) { return current; } auto parent = find_parent(current->children[i], child); if (parent != nullptr) return parent; } return nullptr;}
插入到父节点的逻辑
分裂后需要将新节点的键插入到父节点中:
template void BPlusTree::insert_into_parent( BPlusNode* left, const Key& key, BPlusNode* right) { BPlusNode* parent = find_parent(root, left); if (parent == nullptr) { parent = new BPlusNode(false); parent->keys.push_back(key); parent->children.push_back(left); parent->children.push_back(right); root = parent; return; } auto it = std::lower_bound(parent->keys.begin(), parent->keys.end(), key); size_t index = it - parent->keys.begin(); parent->keys.insert(it, key); parent->children.insert(parent->children.begin() + index + 1, right); if (parent->keys.size() > order) { split_internal(parent); }}
内部节点分裂的逻辑
内部节点的分裂与叶子节点类似,但需要处理子节点指针:
template void BPlusTree::split_internal(BPlusNode* node) { BPlusNode* new_node = new BPlusNode(false); size_t split_pos = node->keys.size() / 2; Key middle_key = node->keys[split_pos]; new_node->keys.assign(node->keys.begin() + split_pos + 1, node->keys.end()); new_node->children.assign(node->children.begin() + split_pos + 1, node->children.end()); node->keys.erase(node->keys.begin() + split_pos, node->keys.end()); node->children.erase(node->children.begin() + split_pos + 1, node->children.end()); insert_into_parent(node, middle_key, new_node);}
清除B+树的逻辑
析构时需要递归释放所有节点的内存:
template void BPlusTree::clear(BPlusNode* node) { if (node == nullptr) return; if (!node->is_leaf) { for (auto child : node->children) { clear(child); } } delete node;}
搜索操作的实现
根据键查找对应的值:
template Value BPlusTree::search(const Key& key) { BPlusNode* leaf = find_leaf(root, key); auto it = std::find(leaf->keys.begin(), leaf->keys.end(), key); if (it == leaf->keys.end()) throw std::runtime_error(\"Key not found\"); return leaf->values[it - leaf->keys.begin()];}
查找叶子节点的辅助函数
以下是一个辅助函数,用于查找包含给定键的叶子节点:
template BPlusNode* BPlusTree::find_leaf( BPlusNode* node, const Key& key) { if (node == nullptr) return nullptr; if (node->is_leaf) return node; auto it = std::upper_bound(node->keys.begin(), node->keys.end(), key); size_t index = it - node->keys.begin(); return find_leaf(node->children[index], key);}
测试B+树的删除功能
以下是一个测试用例,验证B+树的删除功能:
void test_b_plus_tree_deletion() { BPlusTree tree(3); tree.insert(1, \"Alice\"); tree.insert(2, \"Bob\"); tree.insert(3, \"Charlie\"); tree.remove(2); try { tree.search(2); assert(false); // Should throw } catch (const std::runtime_error& e) { assert(std::string(e.what()) == \"Key not found\"); }}
处理叶子节点合并的逻辑
当叶子节点的键数量不足时,需要与兄弟节点合并:
template void BPlusTree::merge_leaves( BPlusNode* left, BPlusNode* right) { left->keys.insert(left->keys.end(), right->keys.begin(), right->keys.end()); left->values.insert(left->values.end(), right->values.begin(), right->values.end()); left->next = right->next; remove_from_parent(right); delete right;}
从父节点中删除键的逻辑
合并后需要从父节点中删除对应的键:
template void BPlusTree::remove_from_parent(BPlusNode* child) { BPlusNode* parent = find_parent(root, child); if (parent == nullptr) return; auto it = std::find(parent->children.begin(), parent->children.end(), child); if (it == parent->children.end()) return; size_t index = it - parent->children.begin(); if (index > 0) { parent->keys.erase(parent->keys.begin() + index - 1); } parent->children.erase(it); if (parent != root && parent->keys.size() < (order + 1) / 2 - 1) { handle_underflow(parent); }}
测试B+树的合并功能
以下是一个测试用例,验证B+树的合并功能:
void test_b_plus_tree_merge() { BPlusTree tree(3); for (int i = 1; i <= 4; ++i) { tree.insert(i, \"Value\" + std::to_string(i)); } tree.remove(1); tree.remove(2); assert(tree.search(3) == \"Value3\"); assert(tree.search(4) == \"Value4\");}
B+树的持久化存储
将B+树保存到文件中以便后续加载:
template void BPlusTree::serialize(const std::string& filename) { std::ofstream out(filename, std::ios::binary); serialize_node(out, root); out.close();}
序列化节点的逻辑
递归序列化节点及其子节点:
template void BPlusTree::serialize_node(std::ofstream& out, BPlusNode* node) { if (node == nullptr) return; out.write(reinterpret_cast(&node->is_leaf), sizeof(bool)); size_t size = node->keys.size(); out.write(reinterpret_cast(&size), sizeof(size_t)); for (const auto& key : node->keys) { out.write(reinterpret_cast(&key), sizeof(Key)); } if (node->is_leaf) { for (const auto& value : node->values) { size_t val_size = value.size(); out.write(reinterpret_cast(&val_size), sizeof(size_t)); out.write(value.c_str(), val_size); } } else { for (auto child : node->children) { serialize_node(out, child); } }}
从文件加载B+树
从文件中加载B+树:
template void BPlusTree::deserialize(const std::string& filename) { std::ifstream in(filename, std::ios::binary); if (!in) return; clear(root); root = deserialize_node(in); in.close();}
反序列化节点的逻辑
递归加载节点及其子节点:
template BPlusNode* BPlusTree::deserialize_node(std::ifstream& in) { if (in.eof()) return nullptr; bool is_leaf; in.read(reinterpret_cast(&is_leaf), sizeof(bool)); BPlusNode* node = new BPlusNode(is_leaf); size_t size; in.read(reinterpret_cast(&size), sizeof(size_t)); node->keys.resize(size); for (size_t i = 0; i < size; ++i) { in.read(reinterpret_cast(&node->keys[i]), sizeof(Key)); } if (is_leaf) { node->values.resize(size); for (size_t i = 0; i < size; ++i) { size_t val_size; in.read(reinterpret_cast(&val_size), sizeof(size_t)); char* buffer = new char[val_size + 1]; in.read(buffer, val_size); buffer[val_size] = \'\\0\'; node->values[i] = std::string(buffer); delete[] buffe