title: redis限制请求频率及资源隔离 tags:
- redis
- dataSource
- aop
- limiter
- Spring categories: redis date: 2017-08-04 18:18:53
背景
由于导入及导出服务的使用,可能过多占用业务系统的请求。
为此在db层次做了切分(资源隔离)使用不同的db连接池。
同时针对导入导出服务增加请求频率限制,避免占用过多资源
解决方案
db连接资源分离
比较简单的利用spring做dataSource路由
/** * Created by qixiaobo on 2016/12/2. */ public class DataSourceRouter extends AbstractRoutingDataSource { @Override protected Object determineCurrentLookupKey() { return WxbStatic.getDataSourceRouting(); } } import com.air.tqb.annoate.DataSource; import com.air.tqb.annoate.DataSourceType; import com.air.tqb.utils.WxbStatic; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; /** * Created by qixiaobo on 2016/12/2. */ @Aspect @Component @Order(Ordered.HIGHEST_PRECEDENCE + 2) public class DataSourceRoutingAspect { private static final Log logger = LogFactory.getLog(DataSourceRoutingAspect.class); @Around("execution(public * com.air.tqb.service..*.*(..)) && @annotation(com.air.tqb.annoate.DataSource) && @annotation(dataSource)") public Object setDataSource(ProceedingJoinPoint joinPoint, DataSource dataSource) throws Throwable { DataSourceType dataSourceType = dataSource.value(); try { WxbStatic.setDataSourceRouting(dataSourceType.getValue()); if (logger.isDebugEnabled()) { logger.debug("DataSourceType[" + dataSourceType.getValue() + "] set."); } return joinPoint.proceed(); } finally { WxbStatic.clearDataSourceRouting(); if (logger.isDebugEnabled()) { logger.debug("DataSourceType[" + dataSourceType.getValue() + "] remove."); } } } @Around("execution(public * com.air.tqb.service.report..*.*(..))") public Object setDataSource(ProceedingJoinPoint joinPoint) throws Throwable { DataSourceType dataSourceType = DataSourceType.SLOW; try { WxbStatic.setDataSourceRouting(dataSourceType.getValue()); if (logger.isDebugEnabled()) { logger.debug("report DataSourceType[" + dataSourceType.getValue() + "] set."); } return joinPoint.proceed(); } finally { WxbStatic.clearDataSourceRouting(); if (logger.isDebugEnabled()) { logger.debug("report DataSourceType[" + dataSourceType.getValue() + "] remove."); } } } }复制代码
这样我们可以吧report和正常请求区分开来。使用不同的db连接
/** * Created by qixiaobo on 2016/12/2. */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface DataSource { DataSourceType value() default DataSourceType.NORMAL; } package com.air.tqb.annoate; public enum DataSourceType { NORMAL("normal"), SLOW("slow"), REPORT("report"); DataSourceType(String value) { this.value = value; } private String value; public String getValue() { return value; } }复制代码
在不同的方法上加上指定的db标签可以完成db的选择。从而完成资源隔离。
方法调用限制
对于指定的占用资源方法采用开发按照key做配置,对于指定次数的限制
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface OperationLimit { int value() default 5; String key() default "import"; }复制代码
package com.air.tqb.aop; import com.air.tqb.Exception.LimitOperationException; import com.air.tqb.annoate.OperationLimit; import com.google.common.base.Throwables; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @Aspect @Component @Order(Ordered.HIGHEST_PRECEDENCE + 4) public class OperationLimitAspect { @Autowired @Qualifier(value = "stringRedisTemplate") private StringRedisTemplate template; private final static String LIMIT_KEY_PREFIX = "limit:"; @Around("@annotation(com.air.tqb.annoate.OperationLimit) && @annotation(operationLimit)") public Object operationLimit(ProceedingJoinPoint joinPoint, OperationLimit operationLimit) { ValueOperationsstringStringValueOperations = template.opsForValue(); final String key = getLimitKey(operationLimit); Long incremented = stringStringValueOperations.increment(key, 1); //暂时不考虑事务了,简单策略过期即可 stringStringValueOperations.getOperations().expire(key, 180, TimeUnit.SECONDS); boolean limitReached = checkLimit(incremented, operationLimit); if (limitReached) { stringStringValueOperations.increment(key, -1); throw new LimitOperationException("当前操作" + operationLimit.key() + "已经超过最大操作数" + operationLimit.value() + "限制,请稍后再试!"); } else { try { return joinPoint.proceed(); } catch (Throwable throwable) { return Throwables.propagate(throwable); } finally { stringStringValueOperations.increment(key, -1); } } } private boolean checkLimit(Long incremented, OperationLimit operationLimit) { if (operationLimit.value() < 0 || incremented <= operationLimit.value()) { return false; } return true; } private String getLimitKey(OperationLimit operationLimit) { return LIMIT_KEY_PREFIX + operationLimit.key(); } }复制代码
这个数据是更新了的
对于返回为空可以参考
public Long incrBy(byte[] key, long value) { try { if (isPipelined()) { pipeline(new JedisResult(pipeline.incrBy(key, value))); return null; } if (isQueueing()) { transaction(new JedisResult(transaction.incrBy(key, value))); return null; } return jedis.incrBy(key, value); } catch (Exception ex) { throw convertJedisAccessException(ex); } }复制代码