十亿行挑战-C++

十亿行挑战本是一个针对现代Java的性能优化挑战,该挑战没有什么内容是针对Java独占的,因此尝试使用C++实现,限制为C++17版本。

github上可以看到该挑战的具体细节,要求是:

  1. 有一个大文件,包含十亿行数据,每行的结构是<string: station name>;<double: measurement>
    1. station name是一个字符串,长度不超过100个字符,最多包含10000个不同的station name
    2. measurement是一个浮点数,必包含小数点后一位,范围是[-99.9, 99.9]
  2. 文本已经读入内存,每行以\n结尾
  3. 不允许使用第三方库

挑战的目标是:计算每个station name对应的平均值、最大值、最小值,然后以station name的字典序格式化输出。

思路

既然是一个性能优化挑战,参考项目上的排名,我给自己定义的目标是运行时间小于内存读取时间。另外,因为没有对应的测试平台,仅能本地测试,因此只考虑优化过程,运行时间与排名没有可对比性。

考虑的优化步骤是:

  1. 先是考虑软件的一般优化
    1. 数据切块
    2. 字符串视图(对应std::string_view)
    3. 惰性计算
    4. 预取和缓存命中
  2. 再通过perf工作做细节优化

切块和视图是必要项,切块目的是易于并行,视图是为了减少字符串拷贝;切块时需要考虑数据块的完整性,即要以\n为界,否则就需要考虑数据拼接,反而增加了复杂度;字符串视图贯穿整个过程,且理解比较简单,因此不多介绍。(我原先更关注string_view本身,而不是视图这个概念,对视图的理解是来自对C++20 range的学习,推荐感兴趣的也可以去看看range,从而加深对视图这一概念的理解。)

惰性计算也是必要项,主要目的是减少重复的计算,延缓至真正需要时再计算,这样可以减少计算量。

预取和缓存命中依赖具体实现,需要实现基本代码后考虑;或通过代码结构来直接提高预取和缓存命中的效果。

本挑战的唯一目的就是快(当然要正确),快就意味着CPU资源的充分利用,因此要尽量避免CPU等待,具体化就是要尽量避免锁;如果锁生效了,意味着CPU等待了,造成CPU资源浪费,那么这就是不合理的设计;所以在初始设计时,就要避免锁或其他类似同步机制的存在。

实现

文件读取

两种读取方案:

  1. 一次性全部读取到内存
  2. mmap映射

这两种实现,并没有特别需要注意的地方,不过尝试多线程std::ifstream读取文件时,并没有明显的加速作用,还待调查:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
  FileToMemory &load_ram() {
    load_size_();

    auto buffer = std::make_unique<char[]>(size_);
    std::ifstream ifs(filename_, std::ios::binary);
    if (!ifs) {
      throw std::runtime_error("open file failed");
    }
    if (!ifs.read(buffer.get(), size_)) {
      throw std::runtime_error("read file failed");
    }
    data_ = std::move(buffer);

    return *this;
  }

mmap时注意析构时销毁即可:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
  FileToMemory &load_map() {
    load_size_();

    map_fd_ = open(filename_.c_str(), O_RDONLY);
    if (map_fd_ < 0) {
      throw std::runtime_error("open file failed");
    }

    const void *buffer = mmap(nullptr, size_, PROT_READ, MAP_PRIVATE, map_fd_, 0);
    if (buffer == MAP_FAILED) {
      throw std::runtime_error("mmap file failed");
    }
    data_ = static_cast<const char *>(buffer);

    return *this;
  }

数据结构

对每一项,使用KV结构存储,key是station name,对应的结构是std::string_view,这样可以避免字符串拷贝;value使用自定义结构体:

1
2
3
4
5
6
  struct Item {
    int32_t min{std::numeric_limits<int32_t>::max()};
    int32_t max{std::numeric_limits<int32_t>::min()};
    int64_t sum{0};
    uint64_t count{0};
  };

min、max、sum、count分别对应最小值、最大值、和、计数。均使用整数计数,只在最终输出结果时转换成浮点数;支撑使用整形的理由是,规则说明了“measurement是一个浮点数,必包含小数点后一位,范围是[-99.9, 99.9]”,因此读取时,跳过'.',直接读取整数部分即可,如果除以10,那么就是原始数据。

需要考虑sum的溢出问题,最多10亿行,sum最大值为99.9*10^9,int64_t是足够的。

min和max能否使用int16_t存储来节省空间?我认为使用int32_t更合理,因为需要考虑一般平台的操作数是一个word,即32位,因此使用int16_t会增加额外的操作,反而增加了复杂度。不过这个问题实际测试下来不一定有影响,因为CPU也提供了针对16bit的操作,见https://gcc.godbolt.org/z/cdEMbnqYa

因为输出需要按照字典序,因此最终结果使用std::map

1
  using ItemMapOrder = std::map<std::string_view, Item>;

数据切块

按照线程数决定分成多少块,且尽量保证每一块大小相差不大。实现代码是:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
    const auto size = io_.size();
    const char *buffer = io_.data();
    const char *end = buffer + size;
    const uint32_t real_thread_count = (size < 1024 * 1024) ? 1 : threads_count_;

    threads_.reserve(real_thread_count);
    thread_items_.resize(real_thread_count);

    size_t step = size / real_thread_count;

    for (size_t i = 0; i < real_thread_count; ++i) {
      const char *next_buffer = std::min(buffer + step, end - 1);
      while (next_buffer < end && *next_buffer != '\n') {
        (i & 0x1) ? next_buffer-- : next_buffer++;
      }
      step = next_buffer - buffer;
      threads_.emplace_back(&OneBRC::run_thread<switch_idx>, this, i, buffer, step + 1);
      buffer = next_buffer + 1;
    }
    for (auto &t : threads_) {
      t.join();
    }

io_是文件读取模块,io_.data()拿到的是文件加载到内存后的内存地址。那么分块思路是:

  1. 计算每一块的平均大小,即size / real_thread_count
  2. buffer开始,找到下一个块的位置预期位置,即buffer + step,然后根据该位置向前或向后找到\n,即找到块的结束位置,那么两个之间就是当前需要使用的buffer;向前或向后是为了避免buffer只增长而不减少

多线程

分块保证了各个线程处理的数据块没有交集,因此不用担心重复读取的问题;但是最终结果写入的时候,需要保证线程安全?

线程外

一种做法是每个线程使用独立的数据写入块,其结构是:

1
  std::vector<ItemMap> thread_items_;

ItemMap相关细节实现是:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
  template <uint32_t idx, typename T, typename... Ts> struct Switch {
    using type = typename Switch<idx - 1, Ts...>::type;
  };
  template <typename T, typename... Ts> struct Switch<0, T, Ts...> { using type = T; };
  template <uint32_t idx, typename T, typename... Ts> using Switch_t = typename Switch<idx, T, Ts...>::type;

  // search is faster than hash ?
  // pmr stack buffer is faster than heaps
  constexpr inline static uint32_t switch_idx = config_switch_idx;
  using ItemMap = Switch_t<switch_idx,                                      // switch_idx
                           MapHelper<std::string_view, Item>,               // 0
                           std::unordered_map<std::string_view, Item>,      // 1
                           std::pmr::unordered_map<std::string_view, Item>, // 2
                           std::pmr::map<std::string_view, Item>            // 3
                           >;

基本都采用STL提供的容器,因为ItemMap使用最频繁,达到10亿次数量级,因此考虑尝试多种结构,通过Switch模板来切换测试。

回到thread_items_,每个线程都只负责写入自己的数据块,因此初始化时,thread_items_需要按照线程数初始化:

1
2
    threads_.reserve(real_thread_count);
    thread_items_.resize(real_thread_count);

然后线程启动时,需要传递线程序号,以便线程知道自己的数据块在哪里,对应的代码为:

1
      threads_.emplace_back(&OneBRC::run_thread<switch_idx>, this, i, buffer, step + 1);
线程内

线程函数是真正的处理函数,分两步:

  1. 内存预分配
  2. 数据处理

内存预分配主要是考虑容器的内存预分配,因为数据量较大(最多10000不同station),因此预分配可以减少内存分配次数,提高性能。具体实现是:

1
2
3
4
5
6
7
8
    auto &items = thread_items_[index];
    if constexpr (switch_idx_cp == 2 ||
                  switch_idx_cp == 3) { // constexpr if works as compile switcher only in template function
      std::array<std::byte, 10000 * 256> stack_buffer;
      std::pmr::monotonic_buffer_resource resource(stack_buffer.data(), stack_buffer.size());
      items = ItemMap(&resource);
    }
    items.reserve(10000);

先按照线程序号,获取当前线程可写入的数据块,然后是使用栈内存初始化这个数据块,并预分配10000个KV对。

核心处理逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    const char *prev_ptr = buffer, *split_ptr = nullptr, *end = buffer + length;
    int32_t value = 0;
    bool sign = false, access_number = false;
    for (const char *start = buffer; start < end; start++) {
      if (const char c = *(start); access_number && c != '\n') {
        if (c == '-') {
          sign = true;
        } else if (c != '.') {
          value = value * 10 + (c - '0');
        }
      } else if (c == ';') {
        split_ptr = (start);
        value = 0;
        sign = false, access_number = true;
      } else if (c == '\n') {
        value = sign ? -value : value;
        const std::string_view name(prev_ptr, split_ptr - prev_ptr);
        auto &[min, max, sum, count] = items[name];
        min = std::min(min, value);
        max = std::max(max, value);
        sum += value;
        count++;
        prev_ptr = (start) + 1;
        access_number = false;
      }
    }

这里考虑了几个优化点,对比和历史过程就不展示了,因为我也详细的记录数据:

  1. if顺序调整,if命中多的分支放在前面
  2. 使用指针迭代代替索引,减少计算量
  3. 直接使用c - '0'而不是查找表
  4. 整形存储,延迟浮点计算的时间(上文已经介绍过)

指针代替索引和c - '0'的优化都需要实际测试,具有特异性。此处考虑这两个优化的原因是:

  1. 比如std::string_view name,需要计算长度,使用索引和指针计算量没有差别;循环迭代的过程索引和指针也没有差别,都是自增;访问字符时,索引内部可能会有偏移计算,而指针是直接访问,指针胜一筹
  2. 使用c - '0'时,只有一个减法运算,如果使用查找表,会有一次索引,还会涉及一次内存访问,测试下来,c - '0'更快

结果合并

合并后的结果需要写入到文件,那么就可以考虑先格式化写入到内存,然后一次性将内存写入到文件,这样可以减少文件写入次数,提高性能。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
    for (auto &items : thread_items_) {
      for (const auto &item : items) {
        auto &[name, value] = item;
        auto &[min, max, sum, count] = items_[name];
        min = std::min(min, value.min);
        max = std::max(max, value.max);
        sum += value.sum;
        count += value.count;
      }
    }

    constexpr auto buffer_size = 10240 * 256;
    auto file_buffer = std::make_unique<char[]>(buffer_size); // enough
    char *buffer_ptr = file_buffer.get();
    buffer_ptr += snprintf(buffer_ptr, buffer_size - (buffer_ptr - file_buffer.get()), "{");
    for (const auto &item : items_) {
      const auto &[name, value] = item;
      const int32_t mean = std::round(static_cast<double>(value.sum) / value.count);
      buffer_ptr += snprintf(buffer_ptr, buffer_size - (buffer_ptr - file_buffer.get()), "%s=%.1f/%.1f/%.1f, ",
                             std::string(name).c_str(), value.min / 10.0, mean / 10.0, value.max / 10.0);
    }
    buffer_ptr -= 2; // remove last ', '
    buffer_ptr += snprintf(buffer_ptr, buffer_size - (buffer_ptr - file_buffer.get()), "}");
    *buffer_ptr = '\n'; // end of string

    // if exist output.txt, will be overwrite
    std::ofstream ofs("output.txt", std::ios::binary);
    if (!ofs) {
      throw std::runtime_error("open file failed");
    }
    ofs.write(file_buffer.get(), buffer_ptr - file_buffer.get() + 1);

进一步优化

上文展示的是最终结果,期间过程还考虑了一些优化,但不一定有效(目前我仅能到此了~)。

pipeline设计

我这几年的工作几乎一直在和pipeline打交道,但是因为没有经历其他的工作,也不知这是否是通用的设计模式。

在1BRC这个项目上,如何考虑pipeline?设想了几个阶段:

  1. 文件读取
  2. 数据预处理
  3. 数据处理

因为需求上就已经说了,不需要考虑文件读取,所以只考虑数据预处理和数据处理。详细想法是这样的:

  1. 一个线程遍历buffer,找到分割点的位置,此处分割点是指;\n,然后将位置信息放入队列
  2. 另一个线程从队列中去除位置信息,然后处理,主要处理内容是数字转换和std::unordered_map查找

虽然定义了各个线程主要的工作内容,但是还是需要运行后根据实际情况调整,目的是均衡两个线程的负载,尽量减少循环队列的等待时间。但是测试下来,这种设计还不如直接处理块,把pipeline的线程给数据块处理。

hash比search快吗?

如果用perf分析,发现大部分耗时在hash部分,对hash优化我目前没有太好的想法。但是对unordered_map的优化可以考虑用查找替代,虽说unordered_map查找的时间复杂度是O(1),但是它实际可能需要做一些大数取余运算,没准比查找还慢。

我考虑的方案是构造一个自定义的map类,然后通过查找的方式模拟hash,实现是这样的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
    auto find(const Key &key) {
      auto it = std::lower_bound(nodes_.begin(), nodes_.end(), key,
                                 [](const Node &lhs, const Key &rhs) { return lhs.key < rhs; });
      return (it != nodes_.end() && it->key == key) ? it : nodes_.end();
    }

    Value &operator[](const Key &key) {
      if (auto it = find(key); it != nodes_.end()) {
        assert(it->key == key);
        return it->value;
      } else {
        nodes_.push_back(Node{key, Value{}});
        std::sort(nodes_.begin(), nodes_.end(), [](const Node &lhs, const Node &rhs) { return lhs.key < rhs.key; });
        it = find(key);
        assert(it->key == key);
        return it->value;
      }
    }

不巧,该方案并不比unordered_map快。不过也是一种思路了,可能对于其他场景,查找会比hash快。

prefetch一定有效吗?

再一个优化考虑是prefetch,此处仅做提示。

一种方式是通过__builtin_prefetch,一种方式是通过调整代码顺序,方便CPU和编译器的优化。

在我的测试过程中,发现代码顺序比__builtin_prefetch更有效。主要考虑的地方是:在遍历buffer的时候,数字出现的概率比分隔符;/\n要大,所以将数字处理排在第一。

另外考虑的是一次性读取8字节,比如通过uint64_t读取,然后再处理,这样可以减少内存访问次数。但是测试中并没有明显的效果,遂放弃。怀疑的原因是CPU本就已经cache了,所以预取效果并不明显;另外这样操作反而增加了内存访问次数(raw数据到uint64,再到char)。

栈内存比堆内存快吗?

C++17引入了多态内存分配,可以通过std::pmr来实现。在C++17之前,std::unordered_map的内存分配是在堆上的,而C++17之后,可以通过std::pmr::unordered_map来实现栈内存分配。大致做法是:

1
2
3
  std::array<std::byte, 10000 * 256> stack_buffer;
  std::pmr::monotonic_buffer_resource resource(stack_buffer.data(), stack_buffer.size());
  items = ItemMap(&resource);

只适合不太大的内存,一般默认栈大小是8MB。

是的,我测试下来还是没有明显的提升。但是还是保留了这部分代码。不过现在可以仔细思考一个问题,为什么会说“栈内存比堆内存快”?这是对的吗?具体逻辑是什么?

mmap 对比直接 load 到内存?

这里采用了两种文件读取的方式,一种是通过mmap映射(原理可以参考《glibc-mmap源码阅读》),一种是直接读取到内存。

mmap方式“前奏”更快,不需要正的把文件加在到内存,访问的时候通过缺页中断的方式不断的加载,这导致的问题是在如果文件没有缓存,缺页中断会频繁,导致增加耗时。且这部分时间算在了处理阶段。

直接读到到内存虽然“前奏”慢,但是真正的搬运了内存;因此在处理阶段,无需考虑文件缓存的问题。

对比

针对这个挑战,数据处理一定要比文件读取快(处理步骤很少),我写这一段时在自己电脑上,只有8GB内存,是load不完文件的,因此掠过这部分具体数据;在工作笔记本上(32GB内存)大致的结论是,文件读取15s,数据处理7s。

在现在这台电脑(虚拟机 + i5-1240P + 8GB内存)上效果如何?我通过map映射文件,与https://github.com/dannyvankooten/1brc.git对比,我的代码放在了https://github.com/caibingcheng/1brc.

处理数据的大小是14GB:

1
2
3
$ ll measurements.txt
Permissions Size User Date Modified Name
.rw-rw-r--   14G bing 25 Jun 19:04  measurements.txt

连续运行三次,取最低值:

项目 总耗时
dannyvankooten 0m21.392s
我的实现 0m23.215s

我的实现耗时更多一些;作为摸鱼过程中实现的产物,还能接受;要进一步优化就是考虑手写hash_map了。

总结

最上层结构是最重要的,这决定了整个系统性能的上限,所以最开始需要理论分析一些方案的可行性,然后快速尝试;从上层结构上说,我考虑并快速验证了三种:单线程(明显不行,仅是作baseline版本以对比优化结果)、buffer分块多线程、pipeline。

其次是对软件的基本优化点的认知,比如缓存命中、访问概率(类似likely接口)等。

最后是通过工具进一步优化,我使用perf发现最主要的block点是hash相关操作,但是对该方向目前不太熟悉,所以并不能做太多事情。

总之,尝试是很重要的,虽然本文尝试过程的篇幅不长,但是尝试在这个项目中占比很大。模棱两可、拿不定注意的,快速验证可能是效率最高的方案。