使用 AVX2 Harley Seal 加速 Popcount 计算

0. 背景

业务的布隆过滤器的位使用率监控需要计算整个布隆 buffer 的 popcount (二进制中 1 的个数), 我们要求这个过程不能太耗时, 否则会对用户的体验造成影响. 考虑使用 SIMD, 在所有指令集中只有 AVX512 提供了 VPOPCNTDQ 指令, 然而我们的线上机器虽然有 AVX512, 但并没有实现这一条指令. 所以决定自己搓一个基于 Harley-Seal 算法的 AVX2 版本的 popcount 算法.

1. 计算一个字节的 Popcnt

1.1 遍历每一位

如果只使用简单的位运算, 我们可以想到计算一个字节 (8 bit) 的 Popcount 的朴素思路是将每一位移动到最低位然后与 0b00000001 按位与然后求和.

int popcnt1(uint8_t x) {
  int sum = 0;
  for (int i = 0; i < 8; ++i) {
    sum += (x >> i) & 1;
  }
  return sum;
};

此处计算每一位需要 3 次 CPU 指令操作 (+=, >>, &), 需要的操作数 $\mu\mathrm{op} = 8 \times 3 = 24$.

1.2 归并

归并示意图

考虑到我们可以在一个字节中对多个位同时操作, 借用归并的思想, 我们可以先算每两位的 Popcnt, 再合并为每四位的 Popcnt, 最后再合并为所有八位的结果.例如:

$$ 11100100_{2} \to 10, 01, 01, 00_{2} \to 0011, 0001_{2} \to 00000100_{2} $$

即:

$$ 11100100_{10} \to 02, 01, 01, 00_{10} \to 0003, 0001_{10} \to 00000004_{10} $$
int popcnt2(uint8_t x) {
  const uint8_t m1 = 0x55, m2 = 0x33, m3 = 0x0f;
  // m1 = 01010101, m2 = 00110011, m3 = 00001111

  uint8_t t1 = (x & m1) + ((x >> 1) & m1);
  uint8_t t2 = (t1 & m2) + ((t1 >> 2) & m2);
  uint8_t t3 = (t2 & m3) + ((t2 >> 4) & m3);

  return t3;
}

这样每一步的计算需要四次指令操作(&, +, >>, &), 一共三次, 所以 $\mu\mathrm{op} = 3 \times 4 = 12$次操作.

1.2.1 小优化

实际上这里可以进一步优化, t1 的计算可以使用另一种方法:

t1 = x - ((x >> 1) & m1)

打表发现这也是对的:

$$ 00_{2}-00_{2} = 00_{2}01_{2}-00_{2} = 01_{2}10_{2}-01_{2} = 01_{2}11_{2}-01_{2} = 10_{2} $$

t3 的计算可以利用 t2 中每四位的最高位始终为 0 的性质 (即相加结果不会超过 4 位), 将掩码操作放在最后来做.

t3 = (t2 + (t2 >> 4)) & m3

于是这样 $\mu\mathrm{op} = 3+4+3 = 10$.

1.2.2 利用中间变量
$$ 11111111_{2} \to 10, 10, 10, 10_{2} \to 0100, 0100_{2} \to 00001000_{2} $$

可以发现, 即使我们对一个全为 1 的字节求 Popcount, 得到的结果远小于一个字节的容量 (8 相对于 255).

考虑这样一个问题:

给一个 uint16_t 的数组, 求整个数组的 Popcount 和.

如果我们对其中一个元素进行计数, 有:

$$ \begin{aligned} 1111111111111111_{2} &\to 10, 10, 10, 10, 10, 10, 10, 10_{2} \\ &\to 0100, 0100, 0100, 0100_{2} \\ &\to 00001000, 00001000_{2} \\ &\to 0000000000010000_{2} \end{aligned} $$

注意到最后一步:

$$ 00001000_{2}+00001000_{2} \to 0000000000010000_{2} $$

相加的两部分 (两个$00001000_{2}$) 有大量的前导零, 这意味着他们还能存下更大的值. 如果可以在计算的过程中重复利用这个中间变量, 我们就能省下若干次相加的操作.

int _popcnt3(uint16_t x) {
  const uint16_t m1 = 0x5555, m2 = 0x3333, m3 = 0x0f0f, m4 = 0x00ff;
  // m1 = 0101010101010101, m2 = 0011001100110011, m3 = 0000111100001111, m4 = 0000000011111111

  uint8_t t1 = x - ((x >> 1) & m1);          // 3 uops
  uint8_t t2 = (t1 & m2) + ((t1 >> 2) & m2); // 4 uops
  uint8_t t3 = (t2 + (t2 >> 4)) & m3;        // 3 uops
  uint8_t t4 = (t3 + (t3 >> 8)) & m4;        // 3 uops

  return t4;
}

int popcnt3(uint16_t *xs) {
  const uint16_t m1 = 0x5555, m2 = 0x3333, m3 = 0x0f0f;
  // m1 = 01010101, m2 = 00110011, m3 = 00001111

  uint16_t acc = 0;
  for (int i = 0; i < 15; ++i) {
    uint16_t t1 = xs[i] - ((xs[i] >> 1) & m1);   // 4 uops, 1 loads, 1 loads
    uint16_t t2 = (t1 & m2) + ((t1 >> 2) & m2);  // 4 uops
    uint16_t t3 = (t2 + (t2 >> 4)) & m3;         // 3 uops
    acc += t3;                                   // 1 uops
  }

  acc = (acc >> 8) + (acc & 0x00ff);            // 3 uops

  return acc;
}

此处一次性处理 15 个元素, 总操作次数 $\mu\mathrm{op}_{total} = (4+4+3+1) \times 15+3 = 183$, 平均操作次数 $\mu\mathrm{op} = 183/15 = 12.2$. 相比逐元素遍历求和 $\mu\mathrm{op} = (3+4+3+3)+2 = 15 (1 load+1 add)$ 有所提升.

1.2.3 多元操作

当规模进一步扩大时, 我们可以在一次迭代中同时计算两或三个元素, 这样我们的局部变量也可以被重复利用了.

int popcnt(uint16_t *xs) {
  const uint16_t m1 = 0x5555, m2 = 0x3333, m3 = 0x0f0f;
  // m1 = 01010101, m2 = 00110011, m3 = 00001111

  uint16_t acc = 0;
  for (int i = 0; i < 14; i += 2) {
    uint16_t t1A = xs[i] - ((xs[i] >> 1) & m1);      // 4 uops
    uint16_t t2A = (t1A & m2) + ((t1A >> 2) & m2);   // 4 uops

    uint16_t t1B = xs[i+1] - ((xs[i+1] >> 1) & m1);  // 4 uops
    uint16_t t2B = (t1B & m2) + ((t1B >> 2) & m2);   // 4 uops

    uint16_t t2 = t2A + t2B;                         // 1 uops
    uint16_t t3 = (t2 & m3) + ((t2 >> 4) & m3);      // 4 uops

    acc += t3;                                       // 1 uops
  }

  acc = (acc >> 8) + (acc & 0x00ff);                 // 3 uops

  return acc;
}

此处有:

$$ \mu\mathrm{op}_{total} = 22 \times 7+3 = 157 $$$$ \mu\mathrm{op} = 157/14 \approx 11.2 $$

2. Harley Seal (HS) 算法

在之前的归并中, 我们在同一变量内分别对相邻 2 位, 相邻 4 位, 相邻 8 位做合并. 换一种思路, 我们也可以令不同的变量分别代表不同权重的 $1_{2}$, 使得最终总计数 $Popcnt(x) = 1 \times Popcnt(t_{1})+2 \times Popcnt(t_{2})+4 \times Popcnt(t_{3})$.

2.1 CSA (Carry-Save Adder)

$CSA(a, b, c) = (h, l)$, 其中 $h = (a \wedge b) \vee ((a \oplus b) \wedge c), l = a \oplus b \oplus c$.

l 的含义非常清晰, 异或加法代表 a,b,c 不进位相加之后的和. 而观察 h 可以发现其代表 a,b,c 相加之后将进位的位.

$$ a:01101001_{2}b:01011000_{2}c:01011001_{2} \downarrow h:01011001_{2}l:01101000_{2} $$

实际上, 若 h 的某一位为 $1_{2}$, 那么 a,b,c 中这一位至少有两个为 $1_{2}$; 若 l 的某一位为 $1_{2}$, 那么 a,b,c 中该位有 1 个或 3 个 $1_{2}$.

如果对 a,b,c 三个数进行 CSA 计算, 对每一个二进制位 i, 容易发现 $a_{i}+b_{i}+c_{i} = 2 \times h_{i}+l_{i}$.

void CSA(uint64_t &h, uint64_t &l, uint64_t a, uint64_t b, uint64_t c) {
  const uint64_t u = a ^ b;
  h = (a & b) | (u & c);
  l = u ^ c;
}

单次 CSA 的 $\mu\mathrm{ops} = 5$.

2.2 算法流程

CSA 算法流程图

对于一个 uint64_t 数组 d: 我们用若干个 uint64_t 变量代表不同权重的中间值, 用 total 表示结果. 例如 ones 中的某一位如果为真代表该位上有一个 $1_{2}$, twos 中的某一位如果为真则代表该位上有两个 $1_{2}$, 以此类推有 fours, eights.

考虑单次迭代的转移:

  1. CSA(ones, d[i], d[i+1])=(h:twosA, l:ones)
  2. CSA(ones, d[i+2], d[i+3])=(h:twosB, l:ones)
  3. CSA(twos, twosA, twosB)=(h:foursA, l:twos)
  4. 以此类推...
  5. CSA(fours, foursA, foursB)=(h: eights, l: fours)
  6. 将 popcount(eights) 的值乘上权重(即 8)累加到 total 中
  7. i += 8

这样在一次迭代中, 我们提取出了 eights 的值累加到 total 中, 其余的 ones, twos, fours 保留到下一次迭代中继续使用.

最终将 ones, twos,fourstotal 按权重求和即可:

$$ Popcnt(x) = 1 \times popcount(ones)+2 \times popcount(twos)+4 \times popcount(fours)+total $$
const uint64_t c1 = UINT64_C(0x5555555555555555);
const uint64_t c2 = UINT64_C(0x3333333333333333);
const uint64_t c4 = UINT64_C(0x0F0F0F0F0F0F0F0F);

uint64_t popcount(uint64_t x) {
  x -= (x >> 1) & c1;
  x = ((x >> 2) & c2) + (x & c2);
  x = (x + (x >> 4)) & c4;
  x *= UINT64_C(0x0101010101010101);
  return x >> 56;
}

uint64_t popcnt(const uint64_t *data, const uint64_t size) {
  uint64_t total = 0;
  uint64_t ones = 0;
  uint64_t twos = 0;
  uint64_t fours = 0;
  uint64_t eights = 0;
  uint64_t twosA, twosB, foursA, foursB;

  const uint64_t limit = size - size % 8;
  uint64_t i = 0;

  for (; i < limit; i += 8) {
    CSA(twosA, ones, ones, data[i + 0], data[i + 1]);
    CSA(twosB, ones, ones, data[i + 2], data[i + 3]);
    CSA(foursA, twos, twos, twosA, twosB);
    CSA(twosA, ones, ones, data[i + 4], data[i + 5]);
    CSA(twosB, ones, ones, data[i + 6], data[i + 7]);
    CSA(foursB, twos, twos, twosA, twosB);
    CSA(eights, fours, fours, foursA, foursB);

    total += popcount(eights);
  }

  total <<= 3;
  total += popcount(fours) << 2;
  total += popcount(twos) << 1;
  total += popcount(ones);

  for (; i < size; i++)
    total += popcount(data[i]);

  return total;
}

2.3 效率对比

此处处理 8 个 uint64_t CPU 操作次数 $\mu\mathrm{ops}$ 包含 8 次加载, 35 次 CSA 内计算指令, 一次朴素的单 uint64 Popcount 需要的 12 次操作, 1 次累加, 以及最终的汇总中间变量需要的 42 次操作. 一共 98 次操作. 看上去只比逐个计算再累加的操作数 $\mu\mathrm{ops} = 8 \times (12+2) = 104$ 好一点.

但当数组 d 的长度变大, 假设 |d|=1024 时, 对比 HS 算法:

$$ \begin{aligned} \mu\mathrm{ops}_{load} &= 1024 \\ \mu\mathrm{ops}_{mainloop} &= \frac{1024}{8} \times (35+12+1) = 6144 \\ \mu\mathrm{ops}_{postadd} &= 42 \\ \mu\mathrm{ops} &= \mu\mathrm{ops}_{load} + \mu\mathrm{ops}_{mainloop} + \mu\mathrm{ops}_{postadd} = 7210 \end{aligned} $$

与逐位计算:

$$ \begin{aligned} \mu\mathrm{ops}_{load} &= 1024 \\ \mu\mathrm{ops}_{popcount} &= 1024 \times 12 = 12288 \\ \mu\mathrm{ops}_{sum} &= 1024 \\ \mu\mathrm{ops} &= \mu\mathrm{ops}_{load} + \mu\mathrm{ops}_{popcount} + \mu\mathrm{ops}_{sum} = 14336 \end{aligned} $$

效率高了不少.

3.  SIMD 实现

最后我们再使用 AVX2 提供的指令对 uint256 (__m256i)进行操作. 函数的逻辑基本类似.

3.1 计算一个 m256i 的 Popcnt

__m256i popcount(const __m256i v) {
  const __m256i m1 = _mm256_set1_epi8(0x55);
  const __m256i m2 = _mm256_set1_epi8(0x33);
  const __m256i m4 = _mm256_set1_epi8(0x0F);

  const __m256i t1 = _mm256_sub_epi8(v, (_mm256_srli_epi16(v, 1) & m1));
  const __m256i t2 = _mm256_add_epi8(t1 & m2, (_mm256_srli_epi16(t1, 2) & m2));
  const __m256i t3 = _mm256_add_epi8(t2, _mm256_srli_epi16(t2, 4)) & m4;
  return _mm256_sad_epu8(t3, _mm256_setzero_si256());
}

此处使用了 1.2.1 中的优化减少指令数, 最后对整个向量求和.

3.2 CSA

void CSA(__m256i &h, __m256i &l, __m256i a, __m256i b, __m256i c) {
  const __m256i u = a ^ b;
  h = (a & b) | (u & c);
  l = u ^ c;
}

3.3 核心函数

uint64_t popcnt(const __m256i *data, const uint64_t size) {
  __m256i total = _mm256_setzero_si256();
  __m256i ones = _mm256_setzero_si256();
  __m256i twos = _mm256_setzero_si256();
  __m256i fours = _mm256_setzero_si256();
  __m256i eights = _mm256_setzero_si256();
  __m256i sixteens = _mm256_setzero_si256();
  __m256i twosA, twosB, foursA, foursB, eightsA, eightsB;

  const uint64_t limit = size - size % 16;
  uint64_t i = 0;

  for (; i < limit; i += 16) {
    CSA(twosA, ones, ones, data[i + 0], data[i + 1]);
    CSA(twosB, ones, ones, data[i + 2], data[i + 3]);
    CSA(foursA, twos, twos, twosA, twosB);
    CSA(twosA, ones, ones, data[i + 4], data[i + 5]);
    CSA(twosB, ones, ones, data[i + 6], data[i + 7]);
    CSA(foursB, twos, twos, twosA, twosB);
    CSA(eightsA, fours, fours, foursA, foursB);
    CSA(twosA, ones, ones, data[i + 8], data[i + 9]);
    CSA(twosB, ones, ones, data[i + 10], data[i + 11]);
    CSA(foursA, twos, twos, twosA, twosB);
    CSA(twosA, ones, ones, data[i + 12], data[i + 13]);
    CSA(twosB, ones, ones, data[i + 14], data[i + 15]);
    CSA(foursB, twos, twos, twosA, twosB);
    CSA(eightsB, fours, fours, foursA, foursB);
    CSA(sixteens, eights, eights, eightsA, eightsB);

    total = _mm256_add_epi64(total, popcount(sixteens));
  }

  total = _mm256_slli_epi64(total, 4); // * 16
  total = _mm256_add_epi64(total, _mm256_slli_epi64(popcount(eights), 3)); // += 8 * ...
  total = _mm256_add_epi64(total, _mm256_slli_epi64(popcount(fours), 2)); // += 4 * ...
  total = _mm256_add_epi64(total, _mm256_slli_epi64(popcount(twos), 1)); // += 2 * ...
  total = _mm256_add_epi64(total, popcount(ones));

  for (; i < size; i++)
    total = _mm256_add_epi64(total, popcount(data[i]));

  return static_cast<uint64_t>(_mm256_extract_epi64(total, 0)) +
         static_cast<uint64_t>(_mm256_extract_epi64(total, 1)) +
         static_cast<uint64_t>(_mm256_extract_epi64(total, 2)) +
         static_cast<uint64_t>(_mm256_extract_epi64(total, 3));
}

3.4 内存对齐

包括 AVX2 在内的大多现代 SIMD 指令中, load/store 都要求操作的内存地址对齐到 16 或 32 字节. AVX2 要求地址对齐到 32 字节. 所以我们需要将数据头部未对齐的字节单独处理. (也可以使用不要求对齐的版本 _mm256_loadu_si256 / _mm256_storeu_si256, 但实测较慢)

uint64_t popcnt_AVX2_harley_seal(const uint8_t *data,
                                 const size_t size_in_bytes) {
  uint64_t total_popcount = 0;
  size_t current_offset = 0;
  const size_t alignment = 32;

  uintptr_t data_addr = reinterpret_cast<uintptr_t>(data);
  size_t head_bytes_to_align =
      (alignment - (data_addr % alignment)) % alignment;

  head_bytes_to_align = std::min(head_bytes_to_align, size_in_bytes);

  for (; current_offset < head_bytes_to_align; ++current_offset) {
    total_popcount += __builtin_popcount(data[current_offset]);
  }

  size_t remaining_bytes_after_head = size_in_bytes - current_offset;
  size_t num_m256i_blocks =
      remaining_bytes_after_head / alignment;

  if (num_m256i_blocks > 0) {
    const __m256i *aligned_data_ptr =
        reinterpret_cast<const __m256i *>(data + current_offset);
    total_popcount += popcnt(aligned_data_ptr, num_m256i_blocks);
    current_offset += num_m256i_blocks * alignment;
  }

  for (; current_offset < size_in_bytes; ++current_offset) {
    total_popcount += __builtin_popcount(data[current_offset]);
  }

  return total_popcount;
}

3.5 测速

+-----------+--------------+-----------+----------+
| 位数      | Builtin (ms) | AVX2 (ms) | 性能提升 |
+-----------+--------------+-----------+----------+
| 10000     | 0.00375      | 0.000353  | ~10.62x  |
| 100000    | 0.0345       | 0.00254   | ~13.58x  |
| 1000000   | 0.334        | 0.0326    | ~10.25x  |
| 10000000  | 3.41         | 0.444     | ~7.68x   |
| 100000000 | 33.5         | 8.89      | ~3.77x   |
+-----------+--------------+-----------+----------+

Harley Seal AVX2 实现相对于 Builtin 方法, 在所有测试的 N 值上都提供了显著的性能提升. 但一个值得注意的趋势是, 虽然 AVX2 始终更快, 但其相对性能提升随着 N 的增大而呈现出下降的趋势, 这可能是随着数据集 N 的增大,popcount 操作的性能瓶颈从 CPU 的计算能力逐渐转移到系统内存的带宽所导致的结果.

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                16
On-line CPU(s) list:   0-15
Thread(s) per core:    1
Core(s) per socket:    8
Socket(s):             2
NUMA node(s):          2
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 85
Model name:            Intel(R) Xeon(R) Platinum 8255C CPU @ 2.50GHz
Stepping:              5
CPU MHz:               2494.142
BogoMIPS:              4988.28
Hypervisor vendor:     KVM
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              4096K
L3 cache:              36608K
NUMA node0 CPU(s):     0-7
NUMA node1 CPU(s):     8-15

5. 总结

本文展示了计算 Popcount 的一种优化方法, 其核心思路是提升并行度来加速计算. 我们将 Harley-Seal 算法与 SIMD 相结合, 实现了最大程度的并行化, 能够一次性处理 32 个字节的数据.

测试结果表明, AVX2 的实现比编译器自带的 _builtinpopcount 函数快了数倍到十余倍. 但值得注意的是, 当数据量变得非常大时, 性能瓶颈会从 CPU 计算能力转移到内存读取速度, 这使得性能提升的倍率有所下降.

6. 参考文献

1.  arXiv:1611.07612 Faster Population Counts Using AVX2 Instructions