之前看到一篇 用 Redis 快速实现 BloomFilter 就忍不住看看 Redisson 的 实现方式

这里直接附上例子:

RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample");
// 初始化布隆过滤器,预计统计元素数量为55000000,期望误差率为0.03
bloomFilter.tryInit(55000000L, 0.03);
bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));
bloomFilter.contains(new SomeObject("field1Value", "field8Value"));

RedissonBloomFilter 类是布隆过滤器的实现,它继承自 RBloomFilter。后者定义了 addcontains 等函数。

public interface RBloomFilter<T> extends RExpirable {
    boolean add(T object)
    boolean contains(T object)
    boolean tryInit(long expectedInsertions, double falseProbability);
    // ...
}

我们先看下 add 函数的实现:

public class RedissonBloomFilter<T> extends RedissonExpirable implements RBloomFilter<T> {

    public boolean add(T object) {
        // 构造多个哈希值
        long[] hashes = hash(object);

        while (true) {
            if (size == 0) {
                // 配置以哈希表的形式存在 Redis 中
                // 这里执行 HGETALL
                readConfig();
            }

            int hashIterations = this.hashIterations;
            long size = this.size;

            // 需要设置为 1 的索引
            long[] indexes = hash(hashes[0], hashes[1], hashIterations, size);

            // 新建客户端
            CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
            addConfigCheck(hashIterations, size, executorService);
            RBitSetAsync bs = createBitSet(executorService);

            // 依次执行 set 操作
            for (int i = 0; i < indexes.length; i++) {
                bs.setAsync(indexes[i]);
            }
            try {
                List<Boolean> result = (List<Boolean>) executorService.execute();

                for (Boolean val : result.subList(1, result.size()-1)) {
                    if (!val) {
                        return true;
                    }
                }
                return false;
            } catch (RedisException e) {
                if (!e.getMessage().contains("Bloom filter config has been changed")) {
                    throw e;
                }
            }
        }
    }
}

contains 函数在此就一笔带过,只是将 add 函数中的 SET 变成 GET 而已。我们关注的重点是哈希函数,先看这个以 object 为参数的 hash

    private long[] hash(Object object) {
        ByteBuf state = encode(object);
        try {
            return Hash.hash128(state);
        } finally {
            state.release();
        }
    }

这个函数将 Object 编码后,返回一个 Byte 数组,然后调用了 Hash.hash128 计算哈希值。这里的哈希算法是:HighwayHash

// Hash.java
public static long[] hash128(ByteBuf objectState) {
    HighwayHash h = calcHash(objectState);
    return h.finalize128();
}

protected static HighwayHash calcHash(ByteBuf objectState) {
    HighwayHash h = new HighwayHash(KEY);
    int i;
    int length = objectState.readableBytes();
    int offset = objectState.readerIndex();
    byte[] data = new byte[32];

    // 分区计算哈希
    for (i = 0; i + 32 <= length; i += 32) {
        objectState.getBytes(offset  + i, data);
        h.updatePacket(data, 0);
    }
    if ((length & 31) != 0) {
        data = new byte[length & 31];
        objectState.getBytes(offset  + i, data);
        h.updateRemainder(data, 0, length & 31);
    }
    return h;
}

布隆过滤器里用到的第二个哈希函数,计算出最后的索引值:

private long[] hash(long hash1, long hash2, int iterations, long size) {
    long[] indexes = new long[iterations];
    long hash = hash1;

    // 多次迭代
    for (int i = 0; i < iterations; i++) {
        indexes[i] = (hash & Long.MAX_VALUE) % size;

        // 根据迭代次数选择哈希值,累加
        if (i % 2 == 0) {
            hash += hash2;
        } else {
            hash += hash1;
        }
    }
    return indexes;
}