= 并发容器

:idprefix: concurrent_

Boost.Unordered 提供 `boost::concurrent_node_set` 、 `boost::concurrent_node_map` 、 `boost::concurrent_flat_set` 和 `boost::concurrent_flat_map` ，这些哈希表允许不同线程进行并发读写访问，无需用户实现任何同步机制。

[source,c++]
----
std::vector<int>                    input;
boost::concurrent_flat_map<int,int> m;

...

// process input in parallel
const int                 num_threads = 8;
std::vector<std::jthread> threads;
std::size_t               chunk = input.size() / num_threads; // how many elements per thread

for (int i = 0; i < num_threads; ++i) {
  threads.emplace_back([&,i] {
    // calculate the portion of input this thread takes care of
    std::size_t start = i * chunk;
    std::size_t end = (i == num_threads - 1)? input.size(): (i + 1) * chunk;

    for (std::size_t n = start; n < end; ++n) {
      m.emplace(input[n], calculation(input[n]));
    }
  });
}
----

在上例中，线程无需同步即可访问 `m` ，这与单线程场景中的操作方式一致。在理想情况下，若将给定工作负载分配给 _N_ 个线程，其执行速度相较单线程提升 _N_ 倍——由于同步开销与__争用__的存在（某线程等待其他线程离开容器的锁定区域），实践中无法达到此理论极限。但 Boost.Unordered 并发容器设计为以极低开销运行，通常可实现__线性扩展__（即性能提升与线程数量成正比，直至达到 CPU 的逻辑核心数）。

== 基于访问的 API

Boost.Unordered 并发容器的新用户首先会注意到：这些类__不提供迭代器__（它们在技术层面上不符合 C{plus}{plus} 标准中的 https://en.cppreference.com/w/cpp/named_req/Container[容器] 定义）。因为迭代器本质上是线程不安全的。请参考以下假设代码：

[source,c++]
----
auto it = m.find(k);  // A: get an iterator pointing to the element with key k
if (it != m.end() ) {
  some_function(*it); // B: use the value of the element
}
----

多线程场景中，若其他线程在 A 和 B 之间执行 `m.erase(k)` 操作，迭代器 `it` 可能在 B 点失效。虽然存在通过锁定指向元素来修复此问题的设计方案，但这种方法容易引发高竞争并可能导致程序死锁。 `operator++[]++` 也存在类似并发问题，因此 `boost::concurrent++_++flat++_++map` / `boost::concurrent++_++node++_++map` 也未提供该操作。替代方案是通过__访问函数__操作元素：

[source,c++]
----
m.visit(k, [](const auto& x) { // x is the element with key k (if it exists)
  some_function(x);            // use it
});
----

用户传递的访问函数（此处为 lambda 函数）由 Boost.Unordered 在内部以线程安全的方式执行，因此该函数可以安全访问目标元素，无需担心其他线程在此过程中造成干扰。

另一方面，访问函数__无法__访问容器本身：

[source,c++]
----
m.visit(k, [&](const auto& x) {
  some_function(x, m.size()); // forbidden: m can't be accessed inside visitation
});
----

但允许访问其他容器：

[source,c++]
----
m.visit(k, [&](const auto& x) {
  if (some_function(x)) {
    m2.insert(x); // OK, m2 is a different boost::concurrent_flat_map
  }
});
----

但通常而言，访问函数应尽可能轻量以减少争用并提升并行性。在某些情况下，将繁重操作移出访问函数可能更优：

[source,c++]
----
std::optional<value_type> o;
bool found = m.visit(k, [&](const auto& x) {
  o = x;
});
if (found) {
  some_heavy_duty_function(*o);
}
----

访问机制在并发容器 API 中占据核心地位，许多经典操作都提供了支持访问机制的变体：

[source,c++]
----
m.insert_or_visit(x, [](auto& y) {
  // if insertion failed because of an equivalent element y,
  // do something with it, for instance:
  ++y.second; // increment the mapped part of the element
});
----

注意此例中访问函数可__修改__元素：作为通用规则，对并发映射 `m` 的操作将根据 `m` 是否为常量类型，授予访问函数对元素的常量/非常量访问权限。 通过使用 `cvisit` 重载（如 `insert++_++or++_++cvisit` ）可始终显式请求常量访问，这可能提升并行性。而并发集合的访问始终为常量访问。

尽管预期使用频率较低，并发容器还提供在元素创建后立即进行访问的插入操作（除在已存在等价元素时执行常规访问之外）：

[source,c++]
----
  m.insert_and_cvisit(x,
    [](const auto& y) {
      std::cout<< "(" << y.first << ", " << y.second <<") inserted\n";
    },
    [](const auto& y) {
      std::cout<< "(" << y.first << ", " << y.second << ") already exists\n";
    });
----

支持访问功能的完整操作列表请参阅 xref:reference/concurrent_node_set.adoc#concurrent_node_set[`boost::concurrent++_++node++_++set`] 、 xref:reference/concurrent_node_map.adoc#concurrent_node_map[`boost::concurrent++_++node++_++map`] 、 xref:reference/concurrent_flat_set.adoc#concurrent_flat_set[`boost::concurrent++_++flat++_++set`] 和 xref:reference/concurrent_flat_map.adoc#concurrent_flat_map[`boost::concurrent++_++flat++_++map`] 的参考文档。

== 全表访问

在缺乏迭代器的情况下， `visit++_++all` 提供处理容器内全部元素的替代方案：

[source,c++]
----
m.visit_all([](auto& x) {
  x.second = 0; // reset the mapped part of the element
});
----

在支持标准并行算法的 C{plus}{plus}17 编译器中，全表遍历访问可进行并行化处理：

[source,c++]
----
m.visit_all(std::execution::par, [](auto& x) { // run in parallel
  x.second = 0; // reset the mapped part of the element
});
----

遍历过程可中途中断：

[source,c++]
----
// finds the key to a given (unique) value

int  key = 0;
int  value = ...;
bool found = !m.visit_while([&](const auto& x) {
  if(x.second == value) {
    key = x.first;
    return false; // finish
  }
  else {
    return true;  // keep on visiting
  }
});

if(found) { ... }
----

最后一项全表访问操作是 `erase++_++if` ：

[source,c++]
----
m.erase_if([](auto& x) {
  return x.second == 0; // erase the elements whose mapped value is zero
});
----

`visit++_++while` 与 `erase++_++if` 同样支持并行化。需要注意的是，为提升执行效率，全表遍历操作在运行期间不会锁定整个容器：这意味着在遍历过程中，其他线程可能同时执行元素的插入、修改或删除操作。建议在程序中避免对并发容器在任意时间点的全局状态做过强的假设。

== 批量访问

假设有一个 `std::array` 存储了需要在并发映射中查找的键：

[source,c++]
----
std::array<int, N> keys;
...
for(const auto& key: keys) {
  m.visit(key, [](auto& x) { ++x.second; });
}
----

__批量访问__允许用户通过单次操作传入所有键：

[source,c++]
----
m.visit(keys.begin(), keys.end(), [](auto& x) { ++x.second; });
----

提供此功能并非仅出于语法便利性考量：通过一次性处理所有键，可应用内部优化提升性能（详见 xref:benchmarks.adoc#benchmarks_boostconcurrent_flatnode_map[基准测试]）。实际上，用户可通过缓冲输入键值以实现分块批量访问，从而进一步提升效率：

[source,c++]
----
static constexpr auto bulk_visit_size = boost::concurrent_flat_map<int,int>::bulk_visit_size;
std::array<int, bulk_visit_size> buffer;
std::size_t                      i=0;
while(...) { // processing loop
  ...
  buffer[i++] = k;
  if(i == bulk_visit_size) {
    map.visit(buffer.begin(), buffer.end(), [](auto& x) { ++x.second; });
    i = 0;
  }
  ...
}
// flush remaining keys
map.visit(buffer.begin(), buffer.begin() + i, [](auto& x) { ++x.second; });
----

这里存在延迟与吞吐量的权衡：缓冲机制会延长单个键的处理延迟，但每秒处理的键总数（吞吐量）会更高。 `bulk++_++visit++_++size` 是推荐的缓冲区块大小——过小的缓冲区可能导致性能下降。

== 阻塞式操作

并发容器可像其他 Boost.Unordered 容器一样支持复制、赋值、清空和合并操作。与大多数其他操作不同，这些属于__阻塞式操作__：在执行期间，其他线程将被阻止访问相关容器。该阻塞机制由库自动处理，用户无需采取特殊预防措施，但整体性能可能受到影响。

另一阻塞操作是__重哈希__，该操作可通过 `rehash` / `reserve` 显式触发，或在插入过程中当容器负载达到 `max++_++load()` 时自动执行。与非并发容器类似，在批量插入前预先分配空间通常能加速处理过程。

== 与非并发容器的互操作性

由于开放寻址容器与并发容器基于相同的内部数据结构，它们可以高效地通过移动构造从其非并发对应容器转换而来，反之亦然。

[caption=, title='Table {counter:table-counter}. Concurrent/non-concurrent interoperatibility']
[cols="1,1", frame=all, grid=all]
|===
^|`boost::concurrent_node_set` ^|`boost::unordered_node_set`

^|`boost::concurrent_node_map` ^|`boost::unordered_node_map`

^|`boost::concurrent_flat_set` ^|`boost::unordered_flat_set`

^|`boost::concurrent_flat_set` ^|`boost::unordered_flat_set`

|===

此互操作性适用于多阶段场景：部分数据处理环节需要并行执行，而其他步骤采用非并发（或只读）模式。下例中，我们需要从一个庞大的单词输入向量构建词频统计直方图：填充阶段用 `boost::concurrent++_++flat++_++map` 并行执行，随后将结果转移至最终容器。

[source,c++]
----
std::vector<std::string> words = ...;

// Insert words in parallel
boost::concurrent_flat_map<std::string_view, std::size_t> m0;
std::for_each(
  std::execution::par, words.begin(), words.end(),
  [&](const auto& word) {
    m0.try_emplace_or_visit(word, 1, [](auto& x) { ++x.second; });
  });

// Transfer to a regular unordered_flat_map
boost::unordered_flat_map m=std::move(m0);
----
