0. 背景
业务的布隆过滤器的位使用率监控需要计算整个布隆 buffer 的 popcount (二进制中 1 的个数), 我们要求这个过程不能太耗时, 否则会对用户的体验造成影响. 考虑使用 SIMD, 在所有指令集中只有 AVX512 提供了 VPOPCNTDQ 指令, 然而我们的线上机器虽然有 AVX512, 但并没有实现这一条指令. 所以决定自己搓一个基于 Harley-Seal 算法的 AVX2 版本的 popcount 算法.
1. 计算一个字节的 Popcnt
1.1 遍历每一位
如果只使用简单的位运算, 我们可以想到计算一个字节 (8 bit) 的 Popcount 的朴素思路是将每一位移动到最低位然后与 0b00000001 按位与然后求和.
int popcnt1(uint8_t x) {
int sum = 0;
for (int i = 0; i < 8; ++i) {
sum += (x >> i) & 1;
}
return sum;
};
此处计算每一位需要 3 次 CPU 指令操作 (+=, >>, &), 需要的操作数 $\mu\mathrm{op} = 8 \times 3 = 24$.
1.2 归并
考虑到我们可以在一个字节中对多个位同时操作, 借用归并的思想, 我们可以先算每两位的 Popcnt, 再合并为每四位的 Popcnt, 最后再合并为所有八位的结果.例如:
$$ 11100100_{2} \to 10, 01, 01, 00_{2} \to 0011, 0001_{2} \to 00000100_{2} $$即:
$$ 11100100_{10} \to 02, 01, 01, 00_{10} \to 0003, 0001_{10} \to 00000004_{10} $$int popcnt2(uint8_t x) {
const uint8_t m1 = 0x55, m2 = 0x33, m3 = 0x0f;
// m1 = 01010101, m2 = 00110011, m3 = 00001111
uint8_t t1 = (x & m1) + ((x >> 1) & m1);
uint8_t t2 = (t1 & m2) + ((t1 >> 2) & m2);
uint8_t t3 = (t2 & m3) + ((t2 >> 4) & m3);
return t3;
}
这样每一步的计算需要四次指令操作(&, +, >>, &), 一共三次, 所以 $\mu\mathrm{op} = 3 \times 4 = 12$次操作.
1.2.1 小优化
实际上这里可以进一步优化, t1 的计算可以使用另一种方法:
t1 = x - ((x >> 1) & m1)
打表发现这也是对的:
$$ 00_{2}-00_{2} = 00_{2}01_{2}-00_{2} = 01_{2}10_{2}-01_{2} = 01_{2}11_{2}-01_{2} = 10_{2} $$t3 的计算可以利用 t2 中每四位的最高位始终为 0 的性质 (即相加结果不会超过 4 位), 将掩码操作放在最后来做.
t3 = (t2 + (t2 >> 4)) & m3
于是这样 $\mu\mathrm{op} = 3+4+3 = 10$.
1.2.2 利用中间变量
$$ 11111111_{2} \to 10, 10, 10, 10_{2} \to 0100, 0100_{2} \to 00001000_{2} $$可以发现, 即使我们对一个全为 1 的字节求 Popcount, 得到的结果远小于一个字节的容量 (8 相对于 255).
考虑这样一个问题:
给一个 uint16_t 的数组, 求整个数组的 Popcount 和.
如果我们对其中一个元素进行计数, 有:
$$ \begin{aligned} 1111111111111111_{2} &\to 10, 10, 10, 10, 10, 10, 10, 10_{2} \\ &\to 0100, 0100, 0100, 0100_{2} \\ &\to 00001000, 00001000_{2} \\ &\to 0000000000010000_{2} \end{aligned} $$注意到最后一步:
$$ 00001000_{2}+00001000_{2} \to 0000000000010000_{2} $$相加的两部分 (两个$00001000_{2}$) 有大量的前导零, 这意味着他们还能存下更大的值. 如果可以在计算的过程中重复利用这个中间变量, 我们就能省下若干次相加的操作.
int _popcnt3(uint16_t x) {
const uint16_t m1 = 0x5555, m2 = 0x3333, m3 = 0x0f0f, m4 = 0x00ff;
// m1 = 0101010101010101, m2 = 0011001100110011, m3 = 0000111100001111, m4 = 0000000011111111
uint8_t t1 = x - ((x >> 1) & m1); // 3 uops
uint8_t t2 = (t1 & m2) + ((t1 >> 2) & m2); // 4 uops
uint8_t t3 = (t2 + (t2 >> 4)) & m3; // 3 uops
uint8_t t4 = (t3 + (t3 >> 8)) & m4; // 3 uops
return t4;
}
int popcnt3(uint16_t *xs) {
const uint16_t m1 = 0x5555, m2 = 0x3333, m3 = 0x0f0f;
// m1 = 01010101, m2 = 00110011, m3 = 00001111
uint16_t acc = 0;
for (int i = 0; i < 15; ++i) {
uint16_t t1 = xs[i] - ((xs[i] >> 1) & m1); // 4 uops, 1 loads, 1 loads
uint16_t t2 = (t1 & m2) + ((t1 >> 2) & m2); // 4 uops
uint16_t t3 = (t2 + (t2 >> 4)) & m3; // 3 uops
acc += t3; // 1 uops
}
acc = (acc >> 8) + (acc & 0x00ff); // 3 uops
return acc;
}
此处一次性处理 15 个元素, 总操作次数 $\mu\mathrm{op}_{total} = (4+4+3+1) \times 15+3 = 183$, 平均操作次数 $\mu\mathrm{op} = 183/15 = 12.2$. 相比逐元素遍历求和 $\mu\mathrm{op} = (3+4+3+3)+2 = 15 (1 load+1 add)$ 有所提升.
1.2.3 多元操作
当规模进一步扩大时, 我们可以在一次迭代中同时计算两或三个元素, 这样我们的局部变量也可以被重复利用了.
int popcnt(uint16_t *xs) {
const uint16_t m1 = 0x5555, m2 = 0x3333, m3 = 0x0f0f;
// m1 = 01010101, m2 = 00110011, m3 = 00001111
uint16_t acc = 0;
for (int i = 0; i < 14; i += 2) {
uint16_t t1A = xs[i] - ((xs[i] >> 1) & m1); // 4 uops
uint16_t t2A = (t1A & m2) + ((t1A >> 2) & m2); // 4 uops
uint16_t t1B = xs[i+1] - ((xs[i+1] >> 1) & m1); // 4 uops
uint16_t t2B = (t1B & m2) + ((t1B >> 2) & m2); // 4 uops
uint16_t t2 = t2A + t2B; // 1 uops
uint16_t t3 = (t2 & m3) + ((t2 >> 4) & m3); // 4 uops
acc += t3; // 1 uops
}
acc = (acc >> 8) + (acc & 0x00ff); // 3 uops
return acc;
}
此处有:
$$ \mu\mathrm{op}_{total} = 22 \times 7+3 = 157 $$$$ \mu\mathrm{op} = 157/14 \approx 11.2 $$2. Harley Seal (HS) 算法
在之前的归并中, 我们在同一变量内分别对相邻 2 位, 相邻 4 位, 相邻 8 位做合并. 换一种思路, 我们也可以令不同的变量分别代表不同权重的 $1_{2}$, 使得最终总计数 $Popcnt(x) = 1 \times Popcnt(t_{1})+2 \times Popcnt(t_{2})+4 \times Popcnt(t_{3})$.
2.1 CSA (Carry-Save Adder)
$CSA(a, b, c) = (h, l)$, 其中 $h = (a \wedge b) \vee ((a \oplus b) \wedge c), l = a \oplus b \oplus c$.
l 的含义非常清晰, 异或加法代表 a,b,c 不进位相加之后的和. 而观察 h 可以发现其代表 a,b,c 相加之后将进位的位.
$$ a:01101001_{2}b:01011000_{2}c:01011001_{2} \downarrow h:01011001_{2}l:01101000_{2} $$实际上, 若 h 的某一位为 $1_{2}$, 那么 a,b,c 中这一位至少有两个为 $1_{2}$; 若 l 的某一位为 $1_{2}$, 那么 a,b,c 中该位有 1 个或 3 个 $1_{2}$.
如果对 a,b,c 三个数进行 CSA 计算, 对每一个二进制位 i, 容易发现 $a_{i}+b_{i}+c_{i} = 2 \times h_{i}+l_{i}$.
void CSA(uint64_t &h, uint64_t &l, uint64_t a, uint64_t b, uint64_t c) {
const uint64_t u = a ^ b;
h = (a & b) | (u & c);
l = u ^ c;
}
单次 CSA 的 $\mu\mathrm{ops} = 5$.
2.2 算法流程
对于一个 uint64_t 数组 d: 我们用若干个 uint64_t 变量代表不同权重的中间值, 用 total 表示结果. 例如 ones 中的某一位如果为真代表该位上有一个 $1_{2}$, twos 中的某一位如果为真则代表该位上有两个 $1_{2}$, 以此类推有 fours, eights.
考虑单次迭代的转移:
- CSA(ones, d[i], d[i+1])=(h:twosA, l:ones)
- CSA(ones, d[i+2], d[i+3])=(h:twosB, l:ones)
- CSA(twos, twosA, twosB)=(h:foursA, l:twos)
- 以此类推...
- CSA(fours, foursA, foursB)=(h: eights, l: fours)
- 将 popcount(eights) 的值乘上权重(即 8)累加到 total 中
- i += 8
这样在一次迭代中, 我们提取出了 eights 的值累加到 total 中, 其余的 ones, twos, fours 保留到下一次迭代中继续使用.
最终将 ones, twos,fours 和 total 按权重求和即可:
const uint64_t c1 = UINT64_C(0x5555555555555555);
const uint64_t c2 = UINT64_C(0x3333333333333333);
const uint64_t c4 = UINT64_C(0x0F0F0F0F0F0F0F0F);
uint64_t popcount(uint64_t x) {
x -= (x >> 1) & c1;
x = ((x >> 2) & c2) + (x & c2);
x = (x + (x >> 4)) & c4;
x *= UINT64_C(0x0101010101010101);
return x >> 56;
}
uint64_t popcnt(const uint64_t *data, const uint64_t size) {
uint64_t total = 0;
uint64_t ones = 0;
uint64_t twos = 0;
uint64_t fours = 0;
uint64_t eights = 0;
uint64_t twosA, twosB, foursA, foursB;
const uint64_t limit = size - size % 8;
uint64_t i = 0;
for (; i < limit; i += 8) {
CSA(twosA, ones, ones, data[i + 0], data[i + 1]);
CSA(twosB, ones, ones, data[i + 2], data[i + 3]);
CSA(foursA, twos, twos, twosA, twosB);
CSA(twosA, ones, ones, data[i + 4], data[i + 5]);
CSA(twosB, ones, ones, data[i + 6], data[i + 7]);
CSA(foursB, twos, twos, twosA, twosB);
CSA(eights, fours, fours, foursA, foursB);
total += popcount(eights);
}
total <<= 3;
total += popcount(fours) << 2;
total += popcount(twos) << 1;
total += popcount(ones);
for (; i < size; i++)
total += popcount(data[i]);
return total;
}
2.3 效率对比
此处处理 8 个 uint64_t CPU 操作次数 $\mu\mathrm{ops}$ 包含 8 次加载, 35 次 CSA 内计算指令, 一次朴素的单 uint64 Popcount 需要的 12 次操作, 1 次累加, 以及最终的汇总中间变量需要的 42 次操作. 一共 98 次操作. 看上去只比逐个计算再累加的操作数 $\mu\mathrm{ops} = 8 \times (12+2) = 104$ 好一点.
但当数组 d 的长度变大, 假设 |d|=1024 时, 对比 HS 算法:
$$ \begin{aligned} \mu\mathrm{ops}_{load} &= 1024 \\ \mu\mathrm{ops}_{mainloop} &= \frac{1024}{8} \times (35+12+1) = 6144 \\ \mu\mathrm{ops}_{postadd} &= 42 \\ \mu\mathrm{ops} &= \mu\mathrm{ops}_{load} + \mu\mathrm{ops}_{mainloop} + \mu\mathrm{ops}_{postadd} = 7210 \end{aligned} $$与逐位计算:
$$ \begin{aligned} \mu\mathrm{ops}_{load} &= 1024 \\ \mu\mathrm{ops}_{popcount} &= 1024 \times 12 = 12288 \\ \mu\mathrm{ops}_{sum} &= 1024 \\ \mu\mathrm{ops} &= \mu\mathrm{ops}_{load} + \mu\mathrm{ops}_{popcount} + \mu\mathrm{ops}_{sum} = 14336 \end{aligned} $$效率高了不少.
3. SIMD 实现
最后我们再使用 AVX2 提供的指令对 uint256 (__m256i)进行操作. 函数的逻辑基本类似.
3.1 计算一个 m256i 的 Popcnt
__m256i popcount(const __m256i v) {
const __m256i m1 = _mm256_set1_epi8(0x55);
const __m256i m2 = _mm256_set1_epi8(0x33);
const __m256i m4 = _mm256_set1_epi8(0x0F);
const __m256i t1 = _mm256_sub_epi8(v, (_mm256_srli_epi16(v, 1) & m1));
const __m256i t2 = _mm256_add_epi8(t1 & m2, (_mm256_srli_epi16(t1, 2) & m2));
const __m256i t3 = _mm256_add_epi8(t2, _mm256_srli_epi16(t2, 4)) & m4;
return _mm256_sad_epu8(t3, _mm256_setzero_si256());
}
此处使用了 1.2.1 中的优化减少指令数, 最后对整个向量求和.
3.2 CSA
void CSA(__m256i &h, __m256i &l, __m256i a, __m256i b, __m256i c) {
const __m256i u = a ^ b;
h = (a & b) | (u & c);
l = u ^ c;
}
3.3 核心函数
uint64_t popcnt(const __m256i *data, const uint64_t size) {
__m256i total = _mm256_setzero_si256();
__m256i ones = _mm256_setzero_si256();
__m256i twos = _mm256_setzero_si256();
__m256i fours = _mm256_setzero_si256();
__m256i eights = _mm256_setzero_si256();
__m256i sixteens = _mm256_setzero_si256();
__m256i twosA, twosB, foursA, foursB, eightsA, eightsB;
const uint64_t limit = size - size % 16;
uint64_t i = 0;
for (; i < limit; i += 16) {
CSA(twosA, ones, ones, data[i + 0], data[i + 1]);
CSA(twosB, ones, ones, data[i + 2], data[i + 3]);
CSA(foursA, twos, twos, twosA, twosB);
CSA(twosA, ones, ones, data[i + 4], data[i + 5]);
CSA(twosB, ones, ones, data[i + 6], data[i + 7]);
CSA(foursB, twos, twos, twosA, twosB);
CSA(eightsA, fours, fours, foursA, foursB);
CSA(twosA, ones, ones, data[i + 8], data[i + 9]);
CSA(twosB, ones, ones, data[i + 10], data[i + 11]);
CSA(foursA, twos, twos, twosA, twosB);
CSA(twosA, ones, ones, data[i + 12], data[i + 13]);
CSA(twosB, ones, ones, data[i + 14], data[i + 15]);
CSA(foursB, twos, twos, twosA, twosB);
CSA(eightsB, fours, fours, foursA, foursB);
CSA(sixteens, eights, eights, eightsA, eightsB);
total = _mm256_add_epi64(total, popcount(sixteens));
}
total = _mm256_slli_epi64(total, 4); // * 16
total = _mm256_add_epi64(total, _mm256_slli_epi64(popcount(eights), 3)); // += 8 * ...
total = _mm256_add_epi64(total, _mm256_slli_epi64(popcount(fours), 2)); // += 4 * ...
total = _mm256_add_epi64(total, _mm256_slli_epi64(popcount(twos), 1)); // += 2 * ...
total = _mm256_add_epi64(total, popcount(ones));
for (; i < size; i++)
total = _mm256_add_epi64(total, popcount(data[i]));
return static_cast<uint64_t>(_mm256_extract_epi64(total, 0)) +
static_cast<uint64_t>(_mm256_extract_epi64(total, 1)) +
static_cast<uint64_t>(_mm256_extract_epi64(total, 2)) +
static_cast<uint64_t>(_mm256_extract_epi64(total, 3));
}
3.4 内存对齐
包括 AVX2 在内的大多现代 SIMD 指令中, load/store 都要求操作的内存地址对齐到 16 或 32 字节. AVX2 要求地址对齐到 32 字节. 所以我们需要将数据头部未对齐的字节单独处理. (也可以使用不要求对齐的版本 _mm256_loadu_si256 / _mm256_storeu_si256, 但实测较慢)
uint64_t popcnt_AVX2_harley_seal(const uint8_t *data,
const size_t size_in_bytes) {
uint64_t total_popcount = 0;
size_t current_offset = 0;
const size_t alignment = 32;
uintptr_t data_addr = reinterpret_cast<uintptr_t>(data);
size_t head_bytes_to_align =
(alignment - (data_addr % alignment)) % alignment;
head_bytes_to_align = std::min(head_bytes_to_align, size_in_bytes);
for (; current_offset < head_bytes_to_align; ++current_offset) {
total_popcount += __builtin_popcount(data[current_offset]);
}
size_t remaining_bytes_after_head = size_in_bytes - current_offset;
size_t num_m256i_blocks =
remaining_bytes_after_head / alignment;
if (num_m256i_blocks > 0) {
const __m256i *aligned_data_ptr =
reinterpret_cast<const __m256i *>(data + current_offset);
total_popcount += popcnt(aligned_data_ptr, num_m256i_blocks);
current_offset += num_m256i_blocks * alignment;
}
for (; current_offset < size_in_bytes; ++current_offset) {
total_popcount += __builtin_popcount(data[current_offset]);
}
return total_popcount;
}
3.5 测速
+-----------+--------------+-----------+----------+
| 位数 | Builtin (ms) | AVX2 (ms) | 性能提升 |
+-----------+--------------+-----------+----------+
| 10000 | 0.00375 | 0.000353 | ~10.62x |
| 100000 | 0.0345 | 0.00254 | ~13.58x |
| 1000000 | 0.334 | 0.0326 | ~10.25x |
| 10000000 | 3.41 | 0.444 | ~7.68x |
| 100000000 | 33.5 | 8.89 | ~3.77x |
+-----------+--------------+-----------+----------+
Harley Seal AVX2 实现相对于 Builtin 方法, 在所有测试的 N 值上都提供了显著的性能提升. 但一个值得注意的趋势是, 虽然 AVX2 始终更快, 但其相对性能提升随着 N 的增大而呈现出下降的趋势, 这可能是随着数据集 N 的增大,popcount 操作的性能瓶颈从 CPU 的计算能力逐渐转移到系统内存的带宽所导致的结果.
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 16
On-line CPU(s) list: 0-15
Thread(s) per core: 1
Core(s) per socket: 8
Socket(s): 2
NUMA node(s): 2
Vendor ID: GenuineIntel
CPU family: 6
Model: 85
Model name: Intel(R) Xeon(R) Platinum 8255C CPU @ 2.50GHz
Stepping: 5
CPU MHz: 2494.142
BogoMIPS: 4988.28
Hypervisor vendor: KVM
Virtualization type: full
L1d cache: 32K
L1i cache: 32K
L2 cache: 4096K
L3 cache: 36608K
NUMA node0 CPU(s): 0-7
NUMA node1 CPU(s): 8-15
5. 总结
本文展示了计算 Popcount 的一种优化方法, 其核心思路是提升并行度来加速计算. 我们将 Harley-Seal 算法与 SIMD 相结合, 实现了最大程度的并行化, 能够一次性处理 32 个字节的数据.
测试结果表明, AVX2 的实现比编译器自带的 _builtinpopcount 函数快了数倍到十余倍. 但值得注意的是, 当数据量变得非常大时, 性能瓶颈会从 CPU 计算能力转移到内存读取速度, 这使得性能提升的倍率有所下降.
6. 参考文献
1. arXiv:1611.07612 Faster Population Counts Using AVX2 Instructions