对另一篇博客代码的补充。
原博是谁不知道,参考博文:https://segmentfault.com/a/1190000012620152
不再基于jedis,改用redisTemplate。跑了几次,发现确实可以动态扩容。原博牛逼!!!
RedisBloomFilter.java
package com.ylzinfo.ehc.server.bloomFilter.redis;
import com.google.common.base.Predicate;
import com.google.common.hash.Funnel;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.io.Serializable;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
 * Bloom filter whose bit array is stored in Redis through {@link RedisBitmaps}; built on Redis and
 * Guava (funnels, preconditions and the {@code Predicate} contract come from Guava).
 *
 * <p>NOTE(review): the class is declared {@link Serializable}, but the {@code bits} field type
 * is not shown to be serializable — confirm before relying on Java serialization.
 *
 * @param <T> type of the elements put into the filter
 * @author syh
 * @since 2020/7/10
 */
public class RedisBloomFilter<T> implements Predicate<T>, Serializable {

    /** Redis-backed bit storage. */
    private final RedisBitmaps bits;

    /** Number of bit positions derived per element. */
    private final int numHashFunctions;

    /** Turns an element into the bytes that get hashed. */
    private final Funnel<? super T> funnel;

    /** Maps an element to bit positions and reads/writes them. */
    private final RedisBloomFilter.Strategy strategy;

    private RedisBloomFilter(
            RedisBitmaps bits,
            int numHashFunctions,
            Funnel<? super T> funnel,
            RedisBloomFilter.Strategy strategy) {
        checkArgument(numHashFunctions > 0, "numHashFunctions (%s) must be > 0", numHashFunctions);
        checkArgument(
                numHashFunctions <= 255, "numHashFunctions (%s) must be <= 255", numHashFunctions);
        this.bits = checkNotNull(bits);
        this.numHashFunctions = numHashFunctions;
        this.funnel = checkNotNull(funnel);
        this.strategy = checkNotNull(strategy);
    }

    /**
     * Creates a filter sized for {@code expectedInsertions} elements.
     *
     * @param funnel             funnel for elements of type {@code T}
     * @param expectedInsertions expected number of insertions, must be {@code >= 0}
     * @param fpp                desired false positive probability, in {@code (0.0, 1.0)}
     * @return a new, typed filter (previously this overload returned the raw type)
     */
    public static <T> RedisBloomFilter<T> create(
            Funnel<? super T> funnel, int expectedInsertions, double fpp) {
        return create(funnel, (long) expectedInsertions, fpp);
    }

    /**
     * Creates a filter sized for {@code expectedInsertions} elements using the
     * {@link RedisBloomFilterStrategies#MURMUR128_MITZ_64} strategy.
     *
     * @param funnel             funnel for elements of type {@code T}
     * @param expectedInsertions expected number of insertions, must be {@code >= 0}
     * @param fpp                desired false positive probability, in {@code (0.0, 1.0)}
     * @return a new filter
     */
    public static <T> RedisBloomFilter<T> create(
            Funnel<? super T> funnel, long expectedInsertions, double fpp) {
        return create(funnel, expectedInsertions, fpp, RedisBloomFilterStrategies.MURMUR128_MITZ_64);
    }

    /**
     * Creates a filter with an explicit strategy.
     *
     * @throws NullPointerException     if {@code funnel} or {@code strategy} is null
     * @throws IllegalArgumentException if an argument is out of range or the computed size is
     *                                  not a positive number of bits
     */
    static <T> RedisBloomFilter<T> create(
            Funnel<? super T> funnel,
            long expectedInsertions,
            double fpp,
            RedisBloomFilter.Strategy strategy) {
        checkNotNull(funnel);
        checkArgument(
                expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
        checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
        checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
        checkNotNull(strategy);

        // A filter sized for zero elements would have zero bits; size it for one instead.
        if (expectedInsertions == 0) {
            expectedInsertions = 1;
        }

        long numBits = optimalNumOfBits(expectedInsertions, fpp);
        // The (long) cast in optimalNumOfBits truncates, so a tiny n with fpp close to 1 yields 0.
        // Zero bits would later mean "% 0" in the strategy and a negative SETBIT offset.
        checkArgument(numBits > 0, "Could not create BloomFilter of %s bits", numBits);
        int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
        try {
            return new RedisBloomFilter<T>(
                    new RedisBitmaps(numBits), numHashFunctions, funnel, strategy);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Could not create BloomFilter of " + numBits + " bits", e);
        }
    }

    /** {@link Predicate} view of {@link #mightContain(Object)}. */
    @Override
    public boolean apply(@Nullable T input) {
        return mightContain(input);
    }

    /**
     * Adds {@code object} to the filter.
     *
     * @return the value reported by the strategy: whether any bit changed
     */
    public boolean put(T object) {
        return strategy.put(object, funnel, numHashFunctions, bits);
    }

    /**
     * @return {@code true} if {@code object} might have been added, {@code false} if the
     *         strategy found at least one of its bits unset
     */
    public boolean mightContain(T object) {
        return strategy.mightContain(object, funnel, numHashFunctions, bits);
    }

    /**
     * Computes {@code m = -n * ln(p) / (ln 2)^2}, truncated to a long.
     *
     * @param n expected insertions
     * @param p false positive probability; {@code 0} is replaced by {@link Double#MIN_VALUE}
     */
    static long optimalNumOfBits(long n, double p) {
        if (p == 0) {
            p = Double.MIN_VALUE;
        }
        return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    /**
     * Computes {@code k = round(m / n * ln 2)}, but never less than 1.
     *
     * @param n expected insertions (must be positive)
     * @param m total number of bits
     */
    static int optimalNumOfHashFunctions(long n, long m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    /** Translates an element into bit positions and applies them to a {@link RedisBitmaps}. */
    interface Strategy extends Serializable {

        /** Sets the element's bits; returns whether any bit changed. */
        <T> boolean put(T object, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits);

        /** Returns whether all of the element's bits are set. */
        <T> boolean mightContain(
                T object, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits);

        /** Identifies the strategy; satisfied by {@link Enum#ordinal()} for enum strategies. */
        int ordinal();
    }
}
RedisBloomFilterStrategies.java
package com.ylzinfo.ehc.server.bloomFilter.redis;
import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Longs;
/**
 * Hashing strategies for {@link RedisBloomFilter}.
 *
 * @author syh
 * @since 2020/7/10
 */
public enum RedisBloomFilterStrategies implements RedisBloomFilter.Strategy {

    /**
     * Hashes the element once with 128-bit Murmur3, splits the result into two 64-bit halves and
     * derives {@code numHashFunctions} positions as {@code hash1 + i * hash2}.
     */
    MURMUR128_MITZ_64() {
        @Override
        public <T> boolean put(
                T object, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits) {
            long[] offsets = bitOffsets(object, funnel, numHashFunctions, bits.bitSize());
            boolean bitsChanged = bits.set(offsets);
            // Automatic expansion: let the storage decide whether a new bitmap is needed.
            bits.ensureCapacityInternal();
            return bitsChanged;
        }

        @Override
        public <T> boolean mightContain(
                T object, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits) {
            return bits.get(bitOffsets(object, funnel, numHashFunctions, bits.bitSize()));
        }

        /**
         * Computes the bit positions for {@code object}. Shared by {@code put} and
         * {@code mightContain} so both always address exactly the same bits.
         */
        private <T> long[] bitOffsets(
                T object, Funnel<? super T> funnel, int numHashFunctions, long bitSize) {
            byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).asBytes();
            long hash1 = lowerEight(bytes);
            long hash2 = upperEight(bytes);

            long[] offsets = new long[numHashFunctions];
            long combinedHash = hash1;
            for (int i = 0; i < numHashFunctions; i++) {
                // Clearing the sign bit keeps the remainder non-negative.
                offsets[i] = (combinedHash & Long.MAX_VALUE) % bitSize;
                combinedHash += hash2;
            }
            return offsets;
        }

        /** Assembles bytes 0..7 into a long, byte 0 being the least significant. */
        private /* static */ long lowerEight(byte[] bytes) {
            return Longs.fromBytes(
                    bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]);
        }

        /** Assembles bytes 8..15 into a long, byte 8 being the least significant. */
        private /* static */ long upperEight(byte[] bytes) {
            return Longs.fromBytes(
                    bytes[15], bytes[14], bytes[13], bytes[12], bytes[11], bytes[10], bytes[9], bytes[8]);
        }
    }
}
RedisBitmaps.java
package com.ylzinfo.ehc.server.bloomFilter.redis;
import com.google.common.math.LongMath;
import com.google.common.primitives.Longs;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.List;
import java.util.stream.LongStream;
/**
 * Bit storage for {@link RedisBloomFilter}, kept in Redis as a chain of equally sized bitmaps.
 *
 * <p>Bitmaps are stored under {@code bloomfilter-<n>}; the Redis key {@code cursor} holds the
 * index of the bitmap currently being written. Writes go to the current bitmap, lookups scan
 * every bitmap from 0 up to the cursor.
 *
 * @author syh
 * @since 2020/7/10
 */
public class RedisBitmaps {
    private static final String BASE_KEY = "bloomfilter";
    private static final String CURSOR = "cursor";

    /** Charset for key names and the cursor value; explicit so it never depends on the platform. */
    private static final java.nio.charset.Charset KEY_CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    /** Redis bitmap offsets must be below 2^32 (a string value is at most 512 MB). */
    private static final long MAX_BITMAP_BITS = 1L << 32;

    /** Size in bits of each bitmap; always a multiple of 64. */
    private final long bitSize;

    /**
     * @param bits requested number of bits, rounded up to a multiple of 64
     * @throws IllegalArgumentException if {@code bits} is not positive or the rounded size does
     *                                  not fit into one Redis bitmap
     */
    RedisBitmaps(long bits) {
        if (bits <= 0) {
            throw new IllegalArgumentException("bits (" + bits + ") must be > 0");
        }
        // Length of the bit array, equivalent to n longs.
        long rounded = LongMath.divide(bits, 64, RoundingMode.CEILING) * Long.SIZE;
        if (rounded > MAX_BITMAP_BITS) {
            throw new IllegalArgumentException(
                    "bits (" + bits + ") exceeds the Redis bitmap limit of " + MAX_BITMAP_BITS);
        }
        this.bitSize = rounded;

        long current = cursor();
        if (bitCount(current) == 0) {
            allocate(current);
        }
    }

    /**
     * Checks whether some bitmap has all of {@code offsets} set.
     *
     * @return {@code true} as soon as one bitmap (index 0..cursor) matches every offset
     */
    boolean get(long[] offsets) {
        // Read the cursor once instead of once per loop iteration.
        long last = cursor();
        for (long index = 0; index <= last; index++) {
            if (allBitsSet(keyBytes(index), offsets)) {
                return true;
            }
        }
        return false;
    }

    /** Reads one bit of the current bitmap; a missing reply counts as unset. */
    boolean get(final long offset) {
        byte[] key = keyBytes(cursor());
        Boolean bit = RedisExecutor.newExecutor().execute(conn -> conn.getBit(key, offset));
        return Boolean.TRUE.equals(bit);
    }

    /**
     * Sets all {@code offsets} in the current bitmap, unless an earlier bitmap already matches
     * them all.
     *
     * @return whether at least one bit was previously unset
     */
    boolean set(long[] offsets) {
        long current = cursor();
        if (current > 0 && get(offsets)) {
            return false;
        }
        // Resolve the key once so every bit of this element lands in the same bitmap.
        byte[] key = keyBytes(current);
        List<Boolean> previous = RedisExecutor.newExecutor().executePipelined(conn -> {
            for (long offset : offsets) {
                conn.setBit(key, offset, true);
            }
            return null;
        });
        // setBit reports the bit's previous value, so any FALSE means a bit was flipped.
        return previous == null || previous.contains(Boolean.FALSE);
    }

    /**
     * Sets one bit of the current bitmap.
     *
     * @return {@code true} unless the bit is reported as already set
     */
    boolean set(long offset) {
        byte[] key = keyBytes(cursor());
        Boolean previous =
                RedisExecutor.newExecutor().execute(conn -> conn.setBit(key, offset, true));
        return !Boolean.TRUE.equals(previous);
    }

    /** Number of set bits in the current bitmap; 0 if Redis gives no answer. */
    long bitCount() {
        return bitCount(cursor());
    }

    /** Size in bits of each bitmap. */
    long bitSize() {
        return this.bitSize;
    }

    /** Starts a new bitmap once more than half of the current bitmap's bits are set. */
    void ensureCapacityInternal() {
        if (bitCount() * 2 > bitSize()) {
            grow();
        }
    }

    /** Advances the cursor and pre-sizes the bitmap it now points to. */
    void grow() {
        Long next = RedisExecutor.newExecutor().execute(conn -> conn.incr(cursorKey()));
        // Without a reply from INCR fall back to reading the cursor instead of using "null".
        long target = next != null ? next : cursor();
        allocate(target);
    }

    /** Deletes every bitmap, resets the cursor to 0 and pre-sizes bitmap 0 again. */
    void reset() {
        byte[][] keys = LongStream.rangeClosed(0, cursor())
                .mapToObj(this::keyBytes)
                .toArray(byte[][]::new);
        RedisExecutor.newExecutor().execute(conn -> conn.del(keys));
        RedisExecutor.newExecutor()
                .execute(conn -> conn.set(cursorKey(), "0".getBytes(KEY_CHARSET)));
        allocate(0);
    }

    /** Runs one pipelined GETBIT per offset against {@code key}. */
    private boolean allBitsSet(byte[] key, long[] offsets) {
        List<Boolean> bits = RedisExecutor.newExecutor().executePipelined(conn -> {
            for (long offset : offsets) {
                conn.getBit(key, offset);
            }
            return null;
        });
        return bits != null && !bits.contains(Boolean.FALSE);
    }

    /** Touches the last bit of the bitmap so Redis allocates it at full length. */
    private void allocate(long index) {
        byte[] key = keyBytes(index);
        RedisExecutor.newExecutor().execute(conn -> conn.setBit(key, bitSize - 1, false));
    }

    private long bitCount(long index) {
        byte[] key = keyBytes(index);
        Long count = RedisExecutor.newExecutor().execute(conn -> conn.bitCount(key));
        return count == null ? 0 : count;
    }

    private String genkey(long cursor) {
        return BASE_KEY + "-" + cursor;
    }

    private byte[] keyBytes(long index) {
        return genkey(index).getBytes(KEY_CHARSET);
    }

    private static byte[] cursorKey() {
        return CURSOR.getBytes(KEY_CHARSET);
    }

    /**
     * Reads the index of the current bitmap.
     *
     * @return the stored cursor, or 0 when the key is absent
     * @throws IllegalStateException if the stored value is not a number
     */
    private long cursor() {
        // GET on a connection yields raw bytes, so accept the reply as Object and decode it here
        // rather than assigning it straight to a String.
        Object raw = RedisExecutor.newExecutor().execute(conn -> conn.get(cursorKey()));
        if (raw == null) {
            return 0;
        }
        String text = raw instanceof byte[]
                ? new String((byte[]) raw, KEY_CHARSET)
                : raw.toString();
        Long parsed = Longs.tryParse(text);
        if (parsed == null) {
            throw new IllegalStateException(
                    "Redis key '" + CURSOR + "' does not hold a number: " + text);
        }
        return parsed;
    }
}
RedisExecutor.java
package com.ylzinfo.ehc.server.bloomFilter.redis;
import com.ylzinfo.ehc.core.gateway.SpringContextUtil;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.List;
/**
 * Small helper that runs callbacks against a {@link RedisConnection} obtained from the
 * {@code redisTemplate} bean.
 *
 * @author syh
 * @since 2020/7/10
 */
public class RedisExecutor<T> {

    /** Looked up by bean name in the constructor. */
    private final RedisTemplate redisTemplate;

    /** Creates a fresh executor; equivalent to calling the constructor. */
    public static <T> RedisExecutor<T> newExecutor() {
        return new RedisExecutor<>();
    }

    public RedisExecutor() {
        redisTemplate = SpringContextUtil.getBean("redisTemplate");
    }

    /**
     * Runs {@code executor} on a connection and returns its result.
     *
     * <p>The callback is executed outside pipeline mode on purpose: while a pipeline is open,
     * connection commands return {@code null} because replies are only read when the pipeline
     * is closed, so the caller would never see a value. The connection is not closed here
     * either; the template that handed it out is responsible for releasing it.
     *
     * @param executor callback to run
     * @return whatever the callback returns, cast to the type the caller expects; the caller
     *         must ask for the type the connection command really produces
     */
    @SuppressWarnings("unchecked")
    public <T> T execute(PipelineExecutor executor) {
        return (T) redisTemplate.execute((RedisCallback) conn -> executor.exec(conn));
    }

    /**
     * Runs {@code executor} inside a pipeline and returns the replies of all commands it issued,
     * in order.
     *
     * <p>{@code executePipelined} opens and closes the pipeline itself, so the callback only
     * issues commands; its own return value is discarded.
     *
     * @param executor callback issuing the commands
     * @return the collected replies
     */
    @SuppressWarnings("unchecked")
    public <T> List<T> executePipelined(PipelineExecutor executor) {
        List<T> replies = redisTemplate.executePipelined((RedisCallback) conn -> {
            executor.exec(conn);
            // A pipelined callback must return null; replies come from the pipeline.
            return null;
        });
        return replies;
    }

    /** Callback receiving the connection to issue commands on. */
    @FunctionalInterface
    public interface PipelineExecutor<T> {
        T exec(RedisConnection conn);
    }
}
测试controller: 参数num表示待校验数据,grow表示是否开启扩容,每次扩容1000条。
package com.ylzinfo.ehc.server.bloomFilter.redis;
import com.google.common.hash.Funnels;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demo endpoint for {@link RedisBloomFilter}.
 *
 * <p>Request parameters: {@code num} is the value to look up; {@code grow=true} inserts another
 * batch of 1000 sequential numbers before the lookup.
 *
 * @author syh
 * @since 2020/7/10
 */
@RestController
public class RedisTest {
    RedisBloomFilter<String> bloomFilter = RedisBloomFilter.create(
            Funnels.stringFunnel(Charset.forName("utf-8")), 1000, 0.1);

    /** True until the first batch has been inserted. */
    private boolean instance = true;

    /** Next number to insert; batches continue where the previous one stopped. */
    private AtomicInteger incr = new AtomicInteger(0);

    /**
     * Optionally inserts a batch of 1000 numbers, then reports whether {@code num} might be in
     * the filter.
     *
     * @return {@code Boolean} lookup result; {@code false} when {@code num} is missing
     */
    @RequestMapping("bloom/filter/test")
    public Object test(HttpServletRequest request) {
        String num = request.getParameter("num");
        String grow = request.getParameter("grow");

        // The first request always seeds the filter; later ones only when asked to.
        if (instance || "true".equals(grow)) {
            for (int i = 0; i < 1000; i++) {
                bloomFilter.put(String.valueOf(incr.getAndIncrement()));
            }
            instance = false;
        }

        // Without this guard a request lacking "num" pushes null into the string funnel.
        if (num == null) {
            return false;
        }
        return bloomFilter.mightContain(num);
    }
}
num=1,grow=false时,返回true(命中目标)
num=1000,grow=false时,返回false(因为还没扩容,所以未命中)
num=1000,grow=true时,返回true(已扩容,所以命中)