存储数据经常读写的内容,我们一般会存放到redis中。最开始为了方便,直接将某个业务需要用到的数据序列化后存到一个key中。随着数据量的增长,key对应的value值越来越大,随之而来的就是性能问题。在生产环境中,redis都崩溃了几次,重启后才恢复正常,原因可能有很多,猜测大key就是其中之一的问题。那么拆分大key就刻不容缓了,为保证业务稳定,还需要进行多维度的分析。
原本采用的存取方法
优点:
1、整存整取,没有数据一致性问题
2、可以整体设置失效时间
缺点:
当value的内容很大时,会有性能问题
public class RedisObjectSerializer extends Jackson2JsonRedisSerializer<Object> {
public RedisObjectSerializer() {
super(Object.class);
ObjectMapper om = new ObjectMapper()
.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY)
.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true)
//日期格式
.setDateFormat(new SimpleDateFormat(DEFAULT_DATE_TIME_FORMAT));
SimpleModule simpleModule = new SimpleModule()
.addDeserializer(Enum.class, EnumDeserializer.INSTANCE)
.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern(DEFAULT_DATE_TIME_FORMAT)))
.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern(DEFAULT_DATE_TIME_FORMAT)))
.addSerializer(LocalDate.class,new LocalDateSerializer(DateTimeFormatter.ofPattern(DateUtils.DEFAULT_DATE_FORMAT)))
.addDeserializer(LocalDate.class,new LocalDateDeserializer(DateTimeFormatter.ofPattern(DateUtils.DEFAULT_DATE_FORMAT)))
;
om.registerModule(simpleModule);
this.setObjectMapper(om);
}
}
/**
* value 序列化
*/
private static final RedisObjectSerializer OBJECT_SERIALIZER = new RedisObjectSerializer();
/**
* 添加到带有 过期时间的 缓存
*
* @param key redis主键
* @param value 值
* @param time 过期时间(单位秒)
*/
@Override
public void setExpire(final String key, final Object value, final long time) {
redisTemplate.execute((RedisCallback<Long>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
byte[] keys = serializer.serialize(key);
byte[] values = OBJECT_SERIALIZER.serialize(value);
connection.setEx(keys, time, values);
return 1L;
});
}
/**
* 根据key获取对象
*
* @param key the key
* @return the string
*/
@Override
public <T> T get(final String key) {
T resultStr = redisTemplate.execute((RedisCallback<T>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
byte[] keys = serializer.serialize(key);
byte[] values = connection.get(keys);
return (T) OBJECT_SERIALIZER.deserialize(values);
});
log.debug("[redisTemplate redis]取出 缓存 url:{} ", key);
return resultStr;
}而如果要进行拆分,则需要解决以下问题
1、以什么算法进行拆分?
2、如何统一进行存取操作?
3、如何控制缓存失效?
4、如何保证数据一致性?
5、如何支持批量操作?
先讲一下第四点,不具有共性,而是业务层面的处理,原本有多个key存储数据,value是list,多个key存在包含的关系,这也就导致相同的数据会存储多份。为减少部分数据计划改造为将公共部分单独作为key存储,而最大范围的key通过组装数据的方式进行拼接,这样就减小了数据存储的大小。因此就要解决多个key组合控制整体失效的问题。
批量操作,我们可以创建分组处理方法
// 设置分组缓存 setSplitListByExpireGroup(List<String> keys,List<List<T>> groupList, long time); // 获取分组缓存 getSplitListLikeKeyGroup(List<String> keys); // 清除分组缓存 clearSplitListLikeKeyGroup(List<String> keys);
拆分数据,可以通过将数据转json,然后获取byte大小,计划拆分为100k一个,根据byte大小计算拆分为多少个,再根据大小计算每个list的长度
/**
* 计算list按照100k拆分后一共多少个
* @param list
* @return
*/
private <T> int calculateListSize(List<T> list){
int listSize = 1;
try {
long size = JSON.toJSONString(list).getBytes().length;
listSize = (int)Math.floor(new BigDecimal(size).divide(new BigDecimal(100 * 1024),0,BigDecimal.ROUND_CEILING).doubleValue());
// 如果值小于等于0则重置为1
listSize = listSize <= 0 ? 1 : listSize;
}catch (Exception e){
log.error(e.getMessage(),e);
}
return listSize;
}
// 计算每个list的长度
int oneListSize = (int)Math.floor(new BigDecimal(list.size()).divide(new BigDecimal(listSize),0,BigDecimal.ROUND_CEILING).doubleValue()); 统一存操作,我们可以将拆分后的list,拼接后缀记录下标,使用管道方式,一次性存入。
// 开启批量命令执行
int finalM = m;
redisTemplate.executePipelined((RedisCallback<Object>) connection->{
RedisSerializer<String> serializer = getRedisSerializer();
// 遍历拆分后的集合
for (int i = 1; i <= listSize; i++) {
int startIndex = Math.min((i-1) * oneListSize, list.size());
int endIndex = Math.min(startIndex + oneListSize, list.size());
String singleKey = "SPLIT_" + keys.get(finalM) + "_" + i;
List<T> singleValue = new ArrayList<T>(list.subList(startIndex,endIndex));
byte[] singleKeyByte = serializer.serialize(singleKey);
byte[] values = OBJECT_SERIALIZER.serialize(singleValue);
if(singleKeyByte==null || values == null){
continue;
}
// 数据最大缓存12小时
connection.setEx(singleKeyByte, 12 * 60 * 60, values);
}
return null;
}); 统一取操作,可以根据key模糊匹配所有的key,然后根据key后缀的序号按顺序读取数据,再合并成一个list
List<Integer> orderList = new ArrayList<>();
// 开启批量命令执行
List<Object> result = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
if (CollectionUtils.isNotEmpty(allKeys)) {
List<String> forKeys = new ArrayList<String>(allKeys).stream().sorted().collect(Collectors.toList());
for (String singleKey : forKeys) {
byte[] singleKeyByte = serializer.serialize(singleKey);
if (singleKeyByte == null) {
continue;
}
// 拆分排序号
Integer order = Integer.parseInt(singleKey.substring(singleKey.lastIndexOf("_") + 1, singleKey.length()));
orderList.add(order);
connection.get(singleKeyByte);
}
}
return null;
});
if (CollectionUtils.isEmpty(result)) {
return resultList;
}
// 按顺序组装数据
for (int i = 0; i < orderList.size(); i++) {
Object o = result.get(i);
if(Objects.isNull(o)){
continue;
}
resultList.addAll((List<T>) o);
} 相比整存整取,拆分多个key后,我们需要手动管理缓存失效,同一个key的缓存统一失效,使用系统自带的缓存失效机制,有可能失效时间不一致,导致数据不正确
在分组获取数据的方法中,拼装一个分组校验的key用于检测缓存是否失效,如果失效了,就将所有的key全部清除,然后返回空等待重新查询数据(从数据库中查询然后存入redis)
同时针对业务的批量操作,只要根据一个key模糊匹配未查询到数据,则说明单个key的缓存可能失效了,为保证整体数据有效,直接返回空等待重新查询数据(从数据库中查询然后存入redis)
// 分组校验数据
String VALID_GROUP_KEY = "VALID_GROUP_SPLIT_" + "【"+Strings.join(keys,',')+"】";
Set<String> allKeys = new HashSet<>();
// 是否存在空数据
boolean exitNull = false;
for (String key : keys) {
// 获取模拟匹配的keys
Set<String> curKeys = redisTemplate.keys("SPLIT_" + key + "*");
if (CollectionUtils.isNotEmpty(curKeys)) {
allKeys.addAll(curKeys);
} else {
exitNull = true;
}
}
// 存在空数据
if (exitNull) {
redisTemplate.delete(VALID_GROUP_KEY);
// 直接返回空列表
return resultList;
}
Boolean hasKey = redisTemplate.hasKey(VALID_GROUP_KEY);
// 缓存过期
if (Objects.isNull(hasKey) || !hasKey) {
// 删除模糊匹配key的全部缓存
redisTemplate.delete(allKeys);
// 直接返回空列表
return resultList;
} 关于数据一致性的问题,先来分析一下产生原因
多个请求同时操作一个key的时候,有存数据的,有清除数据的,有取数据的,每个请求中又包含多个redis指令。redis是单线程的,这样就有可能导致数据存取出现问题,虽然几率小,问题还是问题,得避免。
尝试的几种方案
1、使用redis的事务
结果:程序中使用了execute和executePipelined ,executePipelined是非原子性的,无法使用事务,如果改造成execute就失去了减小连接数的意义
2、使用Map记录当前正在执行的key,如果执行key时发现存在key,就等待,直到没有key了再执行
结果:当请求数大时,一个请求正在执行,其余请求均等待,第一个结束后,其余请求均释放,而不是一个一个执行
3、使用ReentrantLock锁的方法
结果:每个key存一个锁对象,如果key执行时,获取锁对象,然后加锁。这样的写法会导致所有的请求都会等待锁释放,有性能问题。
4、使用Map记录每个key的执行队列
结果:可满足需求
具体实现
每个key存一个队列,当前操作生成uuid,并将其存入队列尾部,循环等待,验证队列第一个内容是否为当前生成的uuid,如果是就继续执行,执行完成后删除第一个元素。如果不是就一直等待,直到匹配成功为止
// 存放队列map
private final static ConcurrentMap<String,Queue<String>> keyQueueMap = new ConcurrentHashMap<String,Queue<String>>();
private void waitAndSetKeyUsed(String key,String service,String curFlag){
if(outKeyUsedLog){
System.out.println(curFlag+"开始-------------------------------------------------------------------------------------------------");
System.out.println("!"+key+"!");
}
// 获取uuid队列
Queue<String> queue = Optional.ofNullable(keyQueueMap.get(key)).orElse(new LinkedList<String>());
// 添加一个元素到末尾
queue.add(curFlag);
// 立即设置到缓存中
keyQueueMap.put(key,queue);
// 如果队列不为空
if(CollectionUtils.isNotEmpty(queue)){
// 循环等待
int count = 0;
for(;;){
try {
count++;
// 获取第一个元素
String first = StrHelper.getObjectValue(queue.peek());
int size = queue.size();
// 如果第一个元素就是当前传入元素
if(curFlag.equals(first)){
if(outKeyUsedLog) {
System.out.println(service + "方法 " + key + " 请求[" + curFlag + "] " + "队列["+size+"]待执行元素匹配成功,即将执行");
}
break;
}
// 等待10毫秒
Thread.sleep(10);
if(outKeyUsedLog){
System.out.println(service+"方法 " + key + " 请求[" + curFlag + "] " + "队列["+size+"]待执行元素匹配失败,等待队列执行"+count*10+"毫秒");
}
if(count > 30 * 100){
if(outKeyUsedLog) {
System.out.println(service + "方法 " + key + " 请求[" + curFlag + "] " + "队列["+size+"]待执行元素匹配失败,等待队列执行超时30秒");
}
break;
}
} catch (Exception e) {
log.error(e.getMessage(),e);
break;
}
}
}
}
private void removeKeyUsed(String key,String service,String curFlag){
// 获取uuid队列
Queue<String> queue = Optional.ofNullable(keyQueueMap.get(key)).orElse(new LinkedList<String>());
// 如果队列不为空
if(CollectionUtils.isNotEmpty(queue)){
// 获取第一个元素
String first = StrHelper.getObjectValue(queue.peek());
int size = queue.size();
// 如果第一个元素就是当前传入元素
if(curFlag.equals(first)){
// 弹出第一个元素
queue.poll();
if(outKeyUsedLog) {
System.out.println(service + "方法 " + key + " 请求[" + curFlag + "] " + "队列["+size+"]"+curFlag+"执行完成");
}
}
}
if(outKeyUsedLog){
System.out.println(curFlag+"结束-------------------------------------------------------------------------------------------------");
System.out.println("");
}
}
示例:
public void clearSplitListLikeKeyGroup(List<String> keys) {
// 分组校验数据
String VALID_GROUP_KEY = "VALID_GROUP_SPLIT_" + "【"+Strings.join(keys,',')+"】";
// 当前操作标识
String curFlag = UUID.randomUUID().toString();
// 检查是否被占用
waitAndSetKeyUsed(VALID_GROUP_KEY,"clearGroup",curFlag);
try {
// 清理数据缓存
for (String key : keys) {
clearSplitListLikeKey(key);
}
// 清理组合失效时间缓存
clearSplitListLikeValidKey("VALID_GROUP_SPLIT_" + "【" + Strings.join(keys, ',') + "】");
} catch (Exception e){
log.error(e.getMessage(),e);
} finally {
removeKeyUsed(VALID_GROUP_KEY,"clearGroup",curFlag);
}
}运行示例:
接口并发请求
开始执行,队列中等待10个待执行
等待1910毫秒后,开始执行getGroup的业务操作
完整代码:
public class RedisObjectSerializer extends Jackson2JsonRedisSerializer<Object> {
public RedisObjectSerializer() {
super(Object.class);
ObjectMapper om = new ObjectMapper()
.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY)
.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true)
//日期格式
.setDateFormat(new SimpleDateFormat(DEFAULT_DATE_TIME_FORMAT));
SimpleModule simpleModule = new SimpleModule()
.addDeserializer(Enum.class, EnumDeserializer.INSTANCE)
.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern(DEFAULT_DATE_TIME_FORMAT)))
.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern(DEFAULT_DATE_TIME_FORMAT)))
.addSerializer(LocalDate.class,new LocalDateSerializer(DateTimeFormatter.ofPattern(DateUtils.DEFAULT_DATE_FORMAT)))
.addDeserializer(LocalDate.class,new LocalDateDeserializer(DateTimeFormatter.ofPattern(DateUtils.DEFAULT_DATE_FORMAT)))
;
om.registerModule(simpleModule);
this.setObjectMapper(om);
}
}public class RedisRepositoryImpl implements CacheRepository {
/**
* 默认编码
*/
private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
/**
* value 序列化
*/
private static final RedisObjectSerializer OBJECT_SERIALIZER = new RedisObjectSerializer();
/**
* Spring Redis Template
*/
private RedisTemplate<String, Object> redisTemplate;
/**
* 存放key当前是否被使用
*/
private final static ConcurrentMap<String,ReentrantLock> keyUsedMap = new ConcurrentHashMap<String, ReentrantLock>();
// 输出日志 info级别时输出
private boolean outKeyUsedLog = log.isInfoEnabled();
// 存放队列map
private final static ConcurrentMap<String,Queue<String>> keyQueueMap = new ConcurrentHashMap<String,Queue<String>>();
public RedisRepositoryImpl(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* 获取链接工厂
*/
public RedisConnectionFactory getConnectionFactory() {
return this.redisTemplate.getConnectionFactory();
}
/**
* 获取 RedisTemplate对象
*/
public RedisTemplate<String, Object> getRedisTemplate() {
return redisTemplate;
}
/**
* 清空DB
*
* @param node redis 节点
*/
public void flushDb(RedisClusterNode node) {
this.redisTemplate.opsForCluster().flushDb(node);
}
/**
* 添加到带有 过期时间的 缓存
*
* @param key redis主键
* @param value 值
* @param time 过期时间(单位秒)
*/
public void setExpire(final byte[] key, final byte[] value, final long time) {
redisTemplate.execute((RedisCallback<Long>) connection -> {
connection.setEx(key, time, value);
log.debug("[redisTemplate redis]放入 缓存 url:{} ========缓存时间为{}秒", key, time);
return 1L;
});
}
/**
* 添加到带有 过期时间的 缓存
*
* @param key redis主键
* @param value 值
* @param time 过期时间(单位秒)
*/
@Override
public void setExpire(final String key, final Object value, final long time) {
redisTemplate.execute((RedisCallback<Long>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
byte[] keys = serializer.serialize(key);
byte[] values = OBJECT_SERIALIZER.serialize(value);
connection.setEx(keys, time, values);
return 1L;
});
}
/**
* 一次性添加数组到 过期时间的 缓存,不用多次连接,节省开销
*
* @param keys redis主键数组
* @param values 值数组
* @param time 过期时间(单位秒)
*/
public void setExpire(final String[] keys, final Object[] values, final long time) {
redisTemplate.execute((RedisCallback<Long>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
for (int i = 0; i < keys.length; i++) {
byte[] bKeys = serializer.serialize(keys[i]);
byte[] bValues = OBJECT_SERIALIZER.serialize(values[i]);
connection.setEx(bKeys, time, bValues);
}
return 1L;
});
}
/**
* 一次性添加数组到 过期时间的 缓存,不用多次连接,节省开销
*
* @param keys the keys
* @param values the values
*/
public void set(final String[] keys, final Object[] values) {
redisTemplate.execute((RedisCallback<Long>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
for (int i = 0; i < keys.length; i++) {
byte[] bKeys = serializer.serialize(keys[i]);
byte[] bValues = OBJECT_SERIALIZER.serialize(values[i]);
connection.set(bKeys, bValues);
}
return 1L;
});
}
/**
* 添加到缓存
*
* @param key the key
* @param value the value
*/
@Override
public void set(final String key, final Object value) {
redisTemplate.execute((RedisCallback<Long>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
byte[] keys = serializer.serialize(key);
byte[] values = OBJECT_SERIALIZER.serialize(value);
connection.set(keys, values);
log.debug("[redisTemplate redis]放入 缓存 url:{}", key);
return 1L;
});
}
/**
* 查询在这个时间段内即将过期的key
* 注意:
* 在服务器上执行 keys 命令是非常耗时的, 在使用该方法前,请三思!!!
* 在服务器上执行 keys ** 命令是非常恐怖的,禁止执行!!!
*
* @param key the key
* @param time the time
* @return the list
*/
public List<String> willExpire(final String key, final long time) {
if (StrUtil.isEmpty(key)) {
return Collections.emptyList();
}
if ("*".equals(key.trim())) {
throw new IllegalArgumentException("禁止模糊查询所有的key");
}
final List<String> keysList = new ArrayList<>();
redisTemplate.execute((RedisCallback<List<String>>) connection -> {
Set<String> keys = redisTemplate.keys(key + "*");
for (String key1 : keys) {
Long ttl = connection.ttl(key1.getBytes(DEFAULT_CHARSET));
if (0 <= ttl && ttl <= 2 * time) {
keysList.add(key1);
}
}
return keysList;
});
return keysList;
}
/**
* 查询在以keyPatten的所有 key
* 注意:
* 在服务器上执行 keys 命令是非常耗时的, 在使用该方法前,请三思!!!
* 在服务器上执行 keys ** 命令是非常恐怖的,禁止执行!!!
*
* @param keyPatten the key patten
* @return the set
*/
@Override
public Set<String> keys(final String keyPatten) {
if (StrUtil.isEmpty(keyPatten)) {
return Collections.emptySet();
}
if ("*".equals(keyPatten.trim())) {
throw new IllegalArgumentException("禁止模糊查询所有的key");
}
return redisTemplate.execute((RedisCallback<Set<String>>) connection -> redisTemplate.keys(keyPatten + "*"));
}
/**
* 根据key获取对象
*
* @param key the key
* @return the byte [ ]
*/
public byte[] get(final byte[] key) {
byte[] result = redisTemplate.execute((RedisCallback<byte[]>) connection -> connection.get(key));
log.debug("[redisTemplate redis]取出 缓存 url:{} ", key);
return result;
}
/**
* 根据key获取对象
*
* @param key the key
* @return the string
*/
@Override
public <T> T get(final String key) {
T resultStr = redisTemplate.execute((RedisCallback<T>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
byte[] keys = serializer.serialize(key);
byte[] values = connection.get(keys);
return (T) OBJECT_SERIALIZER.deserialize(values);
});
log.debug("[redisTemplate redis]取出 缓存 url:{} ", key);
return resultStr;
}
private void removeKeyUsed(String key,String service,String curFlag){
// 获取uuid队列
Queue<String> queue = Optional.ofNullable(keyQueueMap.get(key)).orElse(new LinkedList<String>());
// 如果队列不为空
if(CollectionUtils.isNotEmpty(queue)){
// 获取第一个元素
String first = StrHelper.getObjectValue(queue.peek());
int size = queue.size();
// 如果第一个元素就是当前传入元素
if(curFlag.equals(first)){
// 弹出第一个元素
queue.poll();
if(outKeyUsedLog) {
System.out.println(service + "方法 " + key + " 请求[" + curFlag + "] " + "队列["+size+"]"+curFlag+"执行完成");
}
}
}
if(outKeyUsedLog){
System.out.println(curFlag+"结束-------------------------------------------------------------------------------------------------");
System.out.println("");
}
}
private void waitAndSetKeyUsed(String key,String service,String curFlag){
if(outKeyUsedLog){
System.out.println(curFlag+"开始-------------------------------------------------------------------------------------------------");
System.out.println("!"+key+"!");
}
// 获取uuid队列
Queue<String> queue = Optional.ofNullable(keyQueueMap.get(key)).orElse(new LinkedList<String>());
// 添加一个元素到末尾
queue.add(curFlag);
// 立即设置到缓存中
keyQueueMap.put(key,queue);
// 如果队列不为空
if(CollectionUtils.isNotEmpty(queue)){
// 循环等待
int count = 0;
for(;;){
try {
count++;
// 获取第一个元素
String first = StrHelper.getObjectValue(queue.peek());
int size = queue.size();
// 如果第一个元素就是当前传入元素
if(curFlag.equals(first)){
if(outKeyUsedLog) {
System.out.println(service + "方法 " + key + " 请求[" + curFlag + "] " + "队列["+size+"]待执行元素匹配成功,即将执行");
}
break;
}
// 等待10毫秒
Thread.sleep(10);
if(outKeyUsedLog){
System.out.println(service+"方法 " + key + " 请求[" + curFlag + "] " + "队列["+size+"]待执行元素匹配失败,等待队列执行"+count*10+"毫秒");
}
if(count > 30 * 100){
if(outKeyUsedLog) {
System.out.println(service + "方法 " + key + " 请求[" + curFlag + "] " + "队列["+size+"]待执行元素匹配失败,等待队列执行超时30秒");
}
break;
}
} catch (Exception e) {
log.error(e.getMessage(),e);
break;
}
}
}
}
@Override
public void clearSplitListLikeKeyGroup(List<String> keys) {
// 分组校验数据
String VALID_GROUP_KEY = "VALID_GROUP_SPLIT_" + "【"+Strings.join(keys,',')+"】";
// 当前操作标识
String curFlag = UUID.randomUUID().toString();
// 检查是否被占用
waitAndSetKeyUsed(VALID_GROUP_KEY,"clearGroup",curFlag);
try {
// 清理数据缓存
for (String key : keys) {
clearSplitListLikeKey(key);
}
// 清理组合失效时间缓存
clearSplitListLikeValidKey("VALID_GROUP_SPLIT_" + "【" + Strings.join(keys, ',') + "】");
} catch (Exception e){
log.error(e.getMessage(),e);
} finally {
removeKeyUsed(VALID_GROUP_KEY,"clearGroup",curFlag);
}
}
@Override
public void clearSplitListLikeKey(String key){
key = "SPLIT_" + key;
// 当前操作标识
String curFlag = UUID.randomUUID().toString();
// 等待和设置key占用情况
waitAndSetKeyUsed("VALID_"+key,"clear",curFlag);
try {
// 获取模拟匹配的keys
Set<String> keys = redisTemplate.keys(key + "*");
keys = Optional.ofNullable(keys).orElse(new HashSet<>());
if (CollectionUtils.isNotEmpty(keys)) {
// 删除模糊匹配key的全部缓存
redisTemplate.delete(keys);
}
// 自动清理 失效时间缓存
clearSplitListLikeValidKey("VALID_" + key);
} catch (Exception e){
log.error(e.getMessage(),e);
} finally {
removeKeyUsed("VALID_"+key,"clear",curFlag);
}
}
@Override
public void clearSplitListLikeValidKey(String validKey) {
// 获取模拟匹配的keys
Set<String> keys = redisTemplate.keys( validKey + "*");
keys = Optional.ofNullable(keys).orElse(new HashSet<>());
if(CollectionUtils.isNotEmpty(keys)){
// 删除模糊匹配key的全部缓存
redisTemplate.delete(keys);
}
}
@Override
public void clearSplitListLikeKeyAllKey(String key) {
// 获取模拟匹配的keys
Set<String> keys = redisTemplate.keys( "*" + key + "*");
keys = Optional.ofNullable(keys).orElse(new HashSet<>());
if(CollectionUtils.isNotEmpty(keys)){
// 删除模糊匹配key的全部缓存
redisTemplate.delete(keys);
}
}
@Override
public <T> List<T> getSplitListLikeKeyGroup(List<String> keys) {
List<T> resultList = new ArrayList<T>();
if(CollectionUtils.isEmpty(keys)){
return resultList;
}
keys = keys.stream().sorted().collect(Collectors.toList());
// 当前操作标识
String curFlag = UUID.randomUUID().toString();
// 分组校验数据
String VALID_GROUP_KEY = "VALID_GROUP_SPLIT_" + "【"+Strings.join(keys,',')+"】";
// 等待和设置key占用情况
waitAndSetKeyUsed(VALID_GROUP_KEY,"getGroup",curFlag);
Set<String> allKeys = new HashSet<>();
try {
// 是否存在空数据
boolean exitNull = false;
for (String key : keys) {
// 获取模拟匹配的keys
Set<String> curKeys = redisTemplate.keys("SPLIT_" + key + "*");
if (CollectionUtils.isNotEmpty(curKeys)) {
allKeys.addAll(curKeys);
} else {
exitNull = true;
}
}
// 存在空数据
if (exitNull) {
redisTemplate.delete(VALID_GROUP_KEY);
// 直接返回空列表
return resultList;
}
Boolean hasKey = redisTemplate.hasKey(VALID_GROUP_KEY);
// 缓存过期
if (Objects.isNull(hasKey) || !hasKey) {
// 删除模糊匹配key的全部缓存
redisTemplate.delete(allKeys);
// 直接返回空列表
return resultList;
}
List<Integer> orderList = new ArrayList<>();
// 开启批量命令执行
List<Object> result = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
if (CollectionUtils.isNotEmpty(allKeys)) {
List<String> forKeys = new ArrayList<String>(allKeys).stream().sorted().collect(Collectors.toList());
for (String singleKey : forKeys) {
byte[] singleKeyByte = serializer.serialize(singleKey);
if (singleKeyByte == null) {
continue;
}
// 拆分排序号
Integer order = Integer.parseInt(singleKey.substring(singleKey.lastIndexOf("_") + 1, singleKey.length()));
orderList.add(order);
connection.get(singleKeyByte);
}
}
return null;
});
if (CollectionUtils.isEmpty(result)) {
return resultList;
}
// 按顺序组装数据
for (int i = 0; i < orderList.size(); i++) {
Object o = result.get(i);
if(Objects.isNull(o)){
continue;
}
resultList.addAll((List<T>) o);
}
} catch (Exception e){
log.error(e.getMessage(),e);
} finally {
removeKeyUsed(VALID_GROUP_KEY,"getGroup",curFlag);
}
return resultList;
}
@Override
public <T> List<T> getSplitListLikeKey(String key){
List<T> resultList = new ArrayList<T>();
key = "SPLIT_" + key;
// 验证缓存key
String VALID_KEY = "VALID_" + key;
// 当前操作标识
String curFlag = UUID.randomUUID().toString();
try {
// 等待和设置key占用情况
waitAndSetKeyUsed(VALID_KEY,"get",curFlag);
// 获取模拟匹配的keys
Set<String> keys = redisTemplate.keys(key + "*");
// 未匹配到key
if (CollectionUtils.isEmpty(keys)) {
// 直接返回空列表
return resultList;
}
Boolean hasKey = redisTemplate.hasKey(VALID_KEY);
// 缓存过期
if (Objects.isNull(hasKey) || !hasKey) {
// 删除模糊匹配key的全部缓存
redisTemplate.delete(keys);
// 直接返回空列表
return resultList;
}
List<Integer> orderList = new ArrayList<>();
// 开启批量命令执行
List<Object> result = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
if (CollectionUtils.isNotEmpty(keys)) {
for (String singleKey : keys) {
byte[] singleKeyByte = serializer.serialize(singleKey);
if (singleKeyByte == null) {
continue;
}
// 拆分排序号
Integer order = Integer.parseInt(singleKey.substring(singleKey.lastIndexOf("_") + 1, singleKey.length()));
orderList.add(order);
connection.get(singleKeyByte);
}
}
return null;
});
if (CollectionUtils.isEmpty(result)) {
return resultList;
}
// 按顺序组装数据
for (int i = 0; i < orderList.size(); i++) {
Object o = result.get(i);
resultList.addAll((List<T>) o);
}
} catch (Exception e){
log.error(e.getMessage(),e);
} finally {
removeKeyUsed(VALID_KEY,"get",curFlag);
}
return resultList;
}
/**
* 计算list按照100k拆分后一共多少个
* @param list
* @return
*/
private <T> int calculateListSize(List<T> list){
int listSize = 1;
try {
long size = JSON.toJSONString(list).getBytes().length;
listSize = (int)Math.floor(new BigDecimal(size).divide(new BigDecimal(100 * 1024),0,BigDecimal.ROUND_CEILING).doubleValue());
// 如果值小于等于0则重置为1
listSize = listSize <= 0 ? 1 : listSize;
}catch (Exception e){
log.error(e.getMessage(),e);
}
return listSize;
}
@Override
public <T> void setSplitListByExpireGroup(List<String> keys, List<List<T>> groupList, long time) {
if(CollectionUtils.isEmpty(keys)){
return;
}
List<String> tempKeys = keys.stream().sorted().collect(Collectors.toList());
// 分组校验数据
String VALID_GROUP_KEY = "VALID_GROUP_SPLIT_" + "【"+Strings.join(tempKeys,',')+"】";
// 当前操作标识
String curFlag = UUID.randomUUID().toString();
// 等待和设置key占用情况
waitAndSetKeyUsed(VALID_GROUP_KEY,"setGroup",curFlag);
try{
Set<String> allKeys = new HashSet<>();
for (String key : keys) {
// 获取模拟匹配的keys
Set<String> curKeys = redisTemplate.keys("SPLIT_" + key + "*");
if(CollectionUtils.isNotEmpty(curKeys)){
allKeys.addAll(curKeys);
}
}
if(CollectionUtils.isNotEmpty(allKeys)){
// 删除模糊匹配key的全部缓存
redisTemplate.delete(allKeys);
}
// 设置分组缓存key失效
setExpire(VALID_GROUP_KEY,"【"+Strings.join(keys,',')+"】",time);
for (int m = 0; m < groupList.size(); m++) {
List<T> list = groupList.get(m);
// 计算需要拆分多少个List
int listSize = calculateListSize(list);
// 计算每个list的长度
int oneListSize = (int)Math.floor(new BigDecimal(list.size()).divide(new BigDecimal(listSize),0,BigDecimal.ROUND_CEILING).doubleValue());
// 开启批量命令执行
int finalM = m;
redisTemplate.executePipelined((RedisCallback<Object>) connection->{
RedisSerializer<String> serializer = getRedisSerializer();
// 遍历拆分后的集合
for (int i = 1; i <= listSize; i++) {
int startIndex = Math.min((i-1) * oneListSize, list.size());
int endIndex = Math.min(startIndex + oneListSize, list.size());
String singleKey = "SPLIT_" + keys.get(finalM) + "_" + i;
List<T> singleValue = new ArrayList<T>(list.subList(startIndex,endIndex));
byte[] singleKeyByte = serializer.serialize(singleKey);
byte[] values = OBJECT_SERIALIZER.serialize(singleValue);
if(singleKeyByte==null || values == null){
continue;
}
// 数据最大缓存12小时
connection.setEx(singleKeyByte, 12 * 60 * 60, values);
}
return null;
});
}
} catch (Exception e){
log.error(e.getMessage(),e);
} finally {
removeKeyUsed(VALID_GROUP_KEY,"setGroup",curFlag);
}
}
@Override
public <T> void setSplitListByExpire(String key,List<T> list,long time) {
key = "SPLIT_" + key;
String VALID_KEY = "VALID_" + key;
// 当前操作标识
String curFlag = UUID.randomUUID().toString();
// 等待和设置key占用情况
waitAndSetKeyUsed(VALID_KEY,"set",curFlag);
try {
// 获取模拟匹配的keys
Set<String> keys = redisTemplate.keys(key + "*");
if (CollectionUtils.isNotEmpty(keys)) {
// 删除模糊匹配key的全部缓存
redisTemplate.delete(keys);
}
// 设置缓存key失效
setExpire(VALID_KEY, key, time);
// 计算需要拆分多少个List
int listSize = calculateListSize(list);
// 计算每个list的长度
int oneListSize = (int) Math.floor(new BigDecimal(list.size()).divide(new BigDecimal(listSize), 0, BigDecimal.ROUND_CEILING).doubleValue());
// 开启批量命令执行
String finalKey = key;
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
// 遍历拆分后的集合
for (int i = 1; i < listSize; i++) {
int startIndex = Math.min((i - 1) * oneListSize, list.size());
int endIndex = Math.min(startIndex + oneListSize, list.size());
String singleKey = finalKey + "_" + i;
List<T> singleValue = new ArrayList<T>(list.subList(startIndex, endIndex));
byte[] singleKeyByte = serializer.serialize(singleKey);
byte[] values = OBJECT_SERIALIZER.serialize(singleValue);
if (singleKeyByte == null || values == null) {
continue;
}
connection.setEx(singleKeyByte, 12 * 60 * 60, values);
}
return null;
});
}catch (Exception e){
log.error(e.getMessage(),e);
} finally {
removeKeyUsed(VALID_KEY,"set",curFlag);
}
}
@Override
public <T> T getOrDef(String key, Function<String, ? extends T> function) {
T resultStr = get(key);
if (resultStr == null) {
T value = function.apply(key);
if (value != null) {
set(key, value);
}
return value;
}
return resultStr;
}
@Override
public <T> T getOrDefSetExpire(String key, Function<String, ? extends T> function, long time) {
T resultStr = get(key);
if (resultStr == null) {
T value = function.apply(key);
if (value != null) {
setExpire(key,value,time);
}
return value;
}
return resultStr;
}
/**
* 根据key获取对象
*
* @param keyPatten the key patten
* @return the keys values
*/
public Map<String, Object> getKeysValues(final String keyPatten) {
log.debug("[redisTemplate redis] getValues() patten={} ", keyPatten);
return redisTemplate.execute((RedisCallback<Map<String, Object>>) connection -> {
RedisSerializer<String> serializer = getRedisSerializer();
Map<String, Object> maps = new HashMap<>(16);
Set<String> keys = redisTemplate.keys(keyPatten + "*");
if (CollectionUtil.isNotEmpty(keys)) {
for (String key : keys) {
byte[] bKeys = serializer.serialize(key);
byte[] bValues = connection.get(bKeys);
Object value = OBJECT_SERIALIZER.deserialize(bValues);
maps.put(key, value);
}
}
return maps;
});
}
/**
* Ops for hash hash operations.
*
* @return the hash operations
*/
public <T> HashOperations<String, String, T> opsForHash() {
return redisTemplate.opsForHash();
}
/**
* 对HashMap操作
*
* @param key the key
* @param hashKey the hash key
* @param hashValue the hash value
*/
public void putHashValue(String key, String hashKey, Object hashValue) {
log.debug("[redisTemplate redis] putHashValue() key={},hashKey={},hashValue={} ", key, hashKey, hashValue);
opsForHash().put(key, hashKey, hashValue);
}
/**
* 获取单个field对应的值
*
* @param key the key
* @param hashKey the hash key
* @return the hash values
*/
public <T> T getHashValues(String key, String hashKey) {
log.debug("[redisTemplate redis] getHashValues() key={},hashKey={}", key, hashKey);
return (T) opsForHash().get(key, hashKey);
}
/**
* 根据key值删除
*
* @param key the key
* @param hashKeys the hash keys
*/
public void delHashValues(String key, Object... hashKeys) {
log.debug("[redisTemplate redis] delHashValues() key={}", key);
opsForHash().delete(key, hashKeys);
}
@Override
public void delHashValuesLikeKey(String key, String hashLikeKey) {
log.debug("[redisTemplate redis] delHashValuesLikeKey() key={}", key);
HashOperations<String, String, Object> operations = opsForHash();
Set<String> keys = operations.keys(key);
List<String> collect = keys.stream().filter(e -> StringUtils.startsWith(e, hashLikeKey)).collect(Collectors.toList());
if(CollectionUtil.isNotEmpty(collect)){
operations.delete(key,collect.toArray());
}
}
/**
* key只匹配map
*
* @param key the key
* @return the hash value
*/
public Map<String, Object> getHashValue(String key) {
log.debug("[redisTemplate redis] getHashValue() key={}", key);
return opsForHash().entries(key);
}
/**
* 批量添加
*
* @param key the key
* @param map the map
*/
public void putHashValues(String key, Map<String, Object> map) {
opsForHash().putAll(key, map);
}
/**
* 集合数量
*
* @return the long
*/
public long dbSize() {
return redisTemplate.execute(RedisServerCommands::dbSize);
}
/**
* 清空redis存储的数据
*
* @return the string
*/
@Override
public void flushDb() {
redisTemplate.execute((RedisCallback<String>) connection -> {
connection.flushDb();
return "ok";
});
}
/**
* 判断某个主键是否存在
*
* @param key the key
* @return the boolean
*/
@Override
public boolean exists(final String key) {
return redisTemplate.execute((RedisCallback<Boolean>) connection -> connection.exists(key.getBytes(DEFAULT_CHARSET)));
}
/**
* 删除key
*
* @param keys the keys
* @return the long
*/
@Override
public long del(final String... keys) {
return redisTemplate.execute((RedisCallback<Long>) connection -> {
long result = 0;
for (String key : keys) {
result += connection.del(key.getBytes(DEFAULT_CHARSET));
}
return result;
});
}
/**
* 模糊删除key
* */
@Override
public long likeDel(String key){
Set<String> keys = redisTemplate.keys(key+":" + "*");
Long delete = redisTemplate.delete(keys);
return delete;
}
/**
* 获取 RedisSerializer
*
* @return the redis serializer
*/
protected RedisSerializer<String> getRedisSerializer() {
return redisTemplate.getStringSerializer();
}
/**
* 对某个主键对应的值加一,value值必须是全数字的字符串
*
* @param key the key
* @return the long
*/
public long incr(final String key) {
return redisTemplate.execute((RedisCallback<Long>) connection -> {
RedisSerializer<String> redisSerializer = getRedisSerializer();
return connection.incr(redisSerializer.serialize(key));
});
}
/**
* redis List 引擎
*
* @return the list operations
*/
@Override
public ListOperations<String, Object> opsForList() {
return redisTemplate.opsForList();
}
@Override
public SetOperations<String, Object> opsForSet() {
return redisTemplate.opsForSet();
}
/**
* redis List数据结构 : 将一个或多个值 value 插入到列表 key 的表头
*
* @param key the key
* @param value the value
* @return the long
*/
public Long leftPush(String key, Object value) {
return opsForList().leftPush(key, value);
}
/**
* redis List数据结构 : 移除并返回列表 key 的头元素
*
* @param key the key
* @return the string
*/
@Override
public Object leftPop(String key) {
return opsForList().leftPop(key);
}
/**
* redis List数据结构 :将一个或多个值 value 插入到列表 key 的表尾(最右边)。
*
* @param key the key
* @param value the value
* @return the long
*/
@Override
public Long rightPush(String key, Object value) {
return opsForList().rightPush(key, value);
}
/**
* redis List数据结构 : 移除并返回列表 key 的末尾元素
*
* @param key the key
* @return the string
*/
public Object rightPop(String key) {
return opsForList().rightPop(key);
}
/**
* redis List数据结构 : 返回列表 key 的长度 ; 如果 key 不存在,则 key 被解释为一个空列表,返回 0 ; 如果 key 不是列表类型,返回一个错误。
*
* @param key the key
* @return the long
*/
public Long length(String key) {
return opsForList().size(key);
}
/**
* redis List数据结构 : 根据参数 i 的值,移除列表中与参数 value 相等的元素
*
* @param key the key
* @param i the
* @param value the value
*/
public void remove(String key, long i, Object value) {
opsForList().remove(key, i, value);
}
/**
* redis List数据结构 : 将列表 key 下标为 index 的元素的值设置为 value
*
* @param key the key
* @param index the index
* @param value the value
*/
public void set(String key, long index, Object value) {
opsForList().set(key, index, value);
}
/**
* redis List数据结构 : 返回列表 key 中指定区间内的元素,区间以偏移量 [start 和 end] 指定。
*
* @param key the key
* @param start list的开始位置 (包含)
* @param end list的结束位置 (包含)
* @return the list
*/
public <T> List<T> getList(String key, int start, int end) {
return (List<T>) opsForList().range(key, start, end);
}
/**
* redis List数据结构 : 批量存储
*
* @param key the key
* @param list the list
* @return the long
*/
public <V> Long leftPushAll(String key, List<V> list) {
return opsForList().leftPushAll(key, list);
}
/**
* redis List数据结构 : 将值 value 插入到列表 key 当中,位于值 index 之前或之后,默认之后。
*
* @param key the key
* @param index the index
* @param value the value
*/
public void insert(String key, long index, Object value) {
opsForList().set(key, index, value);
}
@Override
public Boolean hasKey(String key) {
return redisTemplate.hasKey(key);
}
@Override
public <V> List<V> sinter(String key, List<V> list){
return (List<V>) opsForSet().intersect(key, (Collection<String>) list);
}
/**
* <p>
* 获取指定访问数据 并且删除这些数据
* </p>
* TODO ZhongYuXing 不具备原子性 需要改成 lua脚本
* */
public <T> Set<T> getScope(String key , Long min, Long max){
Set<T> s = (Set<T>) redisTemplate.opsForZSet().reverseRangeByScore(key, min, max);
for (T t : s) {
redisTemplate.opsForZSet().remove(key,t);
}
return s;
}
public void addScope(String key , Object value, Long scope){
redisTemplate.opsForZSet().add(key, value, scope);
}
public <T> Set<T> getScope(String key , Long scope){
return (Set<T>)redisTemplate.opsForZSet().reverseRangeByScore(key, scope, scope);
}
public void delScope(String key , Object value){
redisTemplate.opsForZSet().remove(key, value);
}
@Override
public <T> List<T> getAllList(String key) {
return (List<T>) redisTemplate.opsForList().range(key,0,-1);
}
@Override
public Set<String> getAllKeyLikeKey(String key){
Set<String> sets =new HashSet<>();
String patternKey = key + "*";
ScanOptions options = ScanOptions.scanOptions()
//这里指定每次扫描key的数量(很多博客瞎说要指定Integer.MAX_VALUE,这样的话跟 keys有什么区别?)
.count(10000)
.match(patternKey).build();
RedisSerializer<String> redisSerializer = (RedisSerializer<String>) redisTemplate.getKeySerializer();
Cursor cursor = redisTemplate.executeWithStickyConnection(redisConnection ->
new ConvertingCursor<>(redisConnection.scan(options), redisSerializer::deserialize));
while(cursor.hasNext()){
sets.add(StrHelper.getObjectValue(cursor.next()));
}
try{
//切记这里一定要关闭,否则会耗尽连接数。报Cannot get Jedis connection; nested exception is redis.clients.jedis.exceptions.JedisException: Could not get a
cursor.close();
}catch (Exception ex) {
System.out.println(ex.getMessage());
log.error(ex.getMessage());
}
return sets;
}
@Override
public <T> Set<T> getZSetByKey(String key) {
return (Set<T>) redisTemplate.opsForZSet().range(key,0,-1);
}
@Override
public long delLikeKey(String key) {
//获取以该key开头的keys
Set<String> keys = this.getAllKeyLikeKey(key);
redisTemplate.delete(keys);
return 0;
}
}

业务执行完毕
发表评论