一、背景
限流對于一個微服務架構系統來說具有非常重要的意義,否則其中的某個微服務將成為整個系統隱藏的雪崩因素,為什么這么說?
舉例來講,某個SAAS平臺有100多個微服務應用,但是作為底層的某個或某幾個應用來說,將會被所有上層應用頻繁調用,業務高峰期時,如果底層應用不做限流處理,該應用必將面臨著巨大的壓力,尤其是那些個別被高頻調用的接口來說,最直接的表現就是導致后續新進來的請求阻塞、排隊、響應超時...最后直到該服務所在JVM資源被耗盡。
二、限流概述
在大多數的微服務架構在設計之初,比如在技術選型階段,架構師會從一個全局的視角去規劃技術棧的組合,比如結合當前產品的現狀考慮是使用dubbo?還是springcloud?作為微服務治理的底層框架。甚至為了滿足快速的上線、迭代和交付,直接以springboot為基座進行開發,后續再引入新的技術棧等...
所以在談論某個業務場景具體的技術解決方案時不可一概而論,而是需要結合產品和業務的現狀綜合評估,以限流來說,在下面的不同的技術架構下具體在選擇的時候可能也不一樣。
2.1 dubbo 服務治理模式
選擇dubbo框架作為基礎服務治理對于那種偏向內部平臺的應用還是不錯的,dubbo底層走netty,這一點相比http協議來說,在一定場景下還是具有優勢的,如果選擇dubbo,在選擇限流方案上可以做如下的參考。
2.1.1 dubbo框架級限流
dubbo官方提供了完善的服務治理,能夠滿足大多數開發場景中的需求,針對限流這個場景,具體來說包括如下手段,具體的配置,可以參考官方手冊;
客戶端限流
信號量限流 (通過統計的方式)
連接數限流 (socket->tcp)
服務端限流
線程池限流 (隔離手段)
信號量限流 (非隔離手段)
接收數限流 (socket->tcp)
2.1.2 線程池設置
多線程并發操作一定離不開線程池,Dubbo自身提供了支持了四種線程池類型支持。生產者
2.1.3 集成第三方組件
如果是springboot框架的項目,可以考慮直接引入地方的組件或SDK,比如hystrix,guava,sentinel原生SDK等,如果技術實力足夠強甚至可以考慮自己造輪子。
2.2 springcloud 服務治理模式
如果你的服務治理框架選用的是springcloud或springcloud-alibaba,其框架自身的生態中已經包含了相應的限流組件,可以實現開箱即用,下面列舉幾種常用的基于springcloud框架的限流組件。
2.2.1 hystrix
Hystrix是Netflix開源的一款容錯框架,在springcloud早期推出市場的時候,作為springcloud生態中用于限流、熔斷、降級的一款組件。
Hystrix提供了限流功能,在springcloud架構的系統中,可以在網關啟用Hystrix,進行限流處理,每個微服務也可以各自啟用Hystrix進行限流。
Hystrix默認使用線程隔離模式,可以通過線程數+隊列大小進行限流,具體參數配置可以參考官網相關資料。
2.2.2 sentinel
Sentinel 號稱分布式系統的流量防衛兵,屬于springcloud-alibaba生態中的重要組件,面向分布式服務架構的流量控制組件,主要以流量為切入點,從限流、流量整形、熔斷降級、系統負載保護、熱點防護等多個維度來幫助開發者保障微服務的穩定性。
2.3 網關層限流
隨著微服務規模的增加,整個系統中很多微服務都需要實現限流這種需求時,就可以考慮在網關這一層進行限流了,通常來說,網關層的限流面向的是通用的業務,比如那些惡意的請求,爬蟲,攻擊等,簡單來說,網關層面的限流提供了一層對系統整體的保護措施。
三、常用限流策略
3.1 限流常用的算法
不管是哪種限流組件,其底層的限流實現算法大同小異,這里列舉幾種常用的限流算法以供了解。
3.1.1 令牌桶算法
令牌桶算法是目前應用最為廣泛的限流算法,顧名思義,它有以下兩個關鍵角色:
令牌 :獲取到令牌的Request才會被處理,其他Requests要么排隊要么被直接丟棄;
桶 :用來裝令牌的地方,所有Request都從這個桶里面獲取令牌
令牌桶主要涉及到2個過程,即令牌的生成,令牌的獲取
3.1.2 漏桶算法
漏桶算法的前半段和令牌桶類似,但是操作的對象不同,結合下圖進行理解。
令牌桶是將令牌放入桶里,而漏桶是將訪問請求的數據包放到桶里。同樣的是,如果桶滿了,那么后面新來的數據包將被丟棄。
3.1.3 滑動時間窗口
根據下圖,簡單描述下滑動時間窗口這種過程:
黑色大框為時間窗口,可以設定窗口時間單位為5秒,它會隨著時間推移向后滑動。我們將窗口內的時間劃分為五個小格子,每個格子代表1秒鐘,同時這個格子還包含一個計數器,用來計算在當前時間內訪問的請求數量。那么這個時間窗口內的總訪問量就是所有格子計數器累加后的數值;
比如說,我們在每一秒內有5個用戶訪問,第5秒內有10個用戶訪問,那么在0到5秒這個時間窗口內訪問量就是15。如果我們的接口設置了時間窗口內訪問上限是20,那么當時間到第六秒的時候,這個時間窗口內的計數總和就變成了10,因為1秒的格子已經退出了時間窗口,因此在第六秒內可以接收的訪問量就是20-10=10個;
滑動窗口其實也是一種計算器算法,它有一個顯著特點,當時間窗口的跨度越長時,限流效果就越平滑。打個比方,如果當前時間窗口只有兩秒,而訪問請求全部集中在第一秒的時候,當時間向后滑動一秒后,當前窗口的計數量將發生較大的變化,拉長時間窗口可以降低這種情況的發生概率
四、通用限流實現方案
拋開網關層的限流先不說,在微服務應用中,考慮到技術棧的組合,團隊人員的開發水平,以及易維護性等因素,一個比較通用的做法是,利用AOP技術+自定義注解實現對特定的方法或接口進行限流,下面基于這個思路來分別介紹下幾種常用的限流方案的實現。
4.1 基于guava限流實現
guava為谷歌開源的一個比較實用的組件,利用這個組件可以幫助開發人員完成常規的限流操作,接下來看具體的實現步驟。
4.1.1 引入guava依賴
版本可以選擇更高的或其他版本
com.google.guava guava 23.0
4.1.2 自定義限流注解
自定義一個限流用的注解,后面在需要限流的方法或接口上面只需添加該注解即可;
importjava.lang.annotation.ElementType; importjava.lang.annotation.Retention; importjava.lang.annotation.RetentionPolicy; importjava.lang.annotation.Target; @Target(value=ElementType.METHOD) @Retention(value=RetentionPolicy.RUNTIME) public@interfaceRateConfigAnno{ StringlimitType(); doublelimitCount()default5d; }
4.1.3 限流AOP類
通過AOP前置通知的方式攔截添加了上述自定義限流注解的方法,解析注解中的屬性值,并以該屬性值作為guava提供的限流參數,該類為整個實現的核心所在。
importcom.alibaba.fastjson2.JSONObject; importcom.google.common.util.concurrent.RateLimiter; importorg.aspectj.lang.JoinPoint; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Before; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.stereotype.Component; importorg.springframework.web.context.request.RequestContextHolder; importorg.springframework.web.context.request.ServletRequestAttributes; importjavax.servlet.ServletOutputStream; importjavax.servlet.http.HttpServletResponse; importjava.io.IOException; importjava.lang.reflect.Method; importjava.util.Objects; @Aspect @Component publicclassGuavaLimitAop{ privatestaticLoggerlogger=LoggerFactory.getLogger(GuavaLimitAop.class); @Before("execution(@RateConfigAnno**(..))") publicvoidlimit(JoinPointjoinPoint){ //1、獲取當前的調用方法 MethodcurrentMethod=getCurrentMethod(joinPoint); if(Objects.isNull(currentMethod)){ return; } //2、從方法注解定義上獲取限流的類型 StringlimitType=currentMethod.getAnnotation(RateConfigAnno.class).limitType(); doublelimitCount=currentMethod.getAnnotation(RateConfigAnno.class).limitCount(); //使用guava的令牌桶算法獲取一個令牌,獲取不到先等待 RateLimiterrateLimiter=RateLimitHelper.getRateLimiter(limitType,limitCount); booleanb=rateLimiter.tryAcquire(); if(b){ System.out.println("獲取到令牌"); }else{ HttpServletResponseresp=((ServletRequestAttributes)RequestContextHolder.getRequestAttributes()).getResponse(); JSONObjectjsonObject=newJSONObject(); jsonObject.put("success",false); jsonObject.put("msg","限流中"); try{ output(resp,jsonObject.toJSONString()); }catch(Exceptione){ logger.error("error,e:{}",e); } } } privateMethodgetCurrentMethod(JoinPointjoinPoint){ Method[]methods=joinPoint.getTarget().getClass().getMethods(); Methodtarget=null; for(Methodmethod:methods){ if(method.getName().equals(joinPoint.getSignature().getName())){ target=method; break; } } returntarget; } publicvoidoutput(HttpServletResponseresponse,Stringmsg)throwsIOException{ response.setContentType("application/json;charset=UTF-8"); ServletOutputStreamoutputStream=null; try{ outputStream=response.getOutputStream(); outputStream.write(msg.getBytes("UTF-8")); }catch(IOExceptione){ e.printStackTrace(); }finally{ outputStream.flush(); outputStream.close(); } } }
其中限流的核心API即為RateLimiter這個對象,涉及到的RateLimitHelper類如下
importcom.google.common.util.concurrent.RateLimiter; importjava.util.HashMap; importjava.util.Map; publicclassRateLimitHelper{ privateRateLimitHelper(){} privatestaticMaprateMap=newHashMap<>(); publicstaticRateLimitergetRateLimiter(StringlimitType,doublelimitCount){ RateLimiterrateLimiter=rateMap.get(limitType); if(rateLimiter==null){ rateLimiter=RateLimiter.create(limitCount); rateMap.put(limitType,rateLimiter); } returnrateLimiter; } }
4.1.4 測試接口
下面添加一個測試接口,測試一下上面的代碼是否生效
@RestController publicclassOrderController{ //localhost:8081/save @GetMapping("/save") @RateConfigAnno(limitType="saveOrder",limitCount=1) publicStringsave(){ return"success"; } }
在接口中為了模擬出效果,我們將參數設置的非常小,即QPS為1,可以預想當每秒請求超過1時將會出現被限流的提示,啟動工程并驗證接口,每秒1次的請求,可以正常得到結果,效果如下:
快速刷接口,將會看到下面的效果
4.2 基于sentinel限流實現
在不少同學的意識中,sentinel通常是需要結合springcloud-alibaba框架一起實用的,而且與框架集成之后,可以配合控制臺一起使用達到更好的效果,實際上,sentinel官方也提供了相對原生的SDK可供使用,接下來就以這種方式進行整合。
4.2.1 引入sentinel核心依賴包
com.alibaba.csp sentinel-core 1.8.0
4.2.2 自定義限流注解
可以根據需要,添加更多的屬性
importjava.lang.annotation.ElementType; importjava.lang.annotation.Retention; importjava.lang.annotation.RetentionPolicy; importjava.lang.annotation.Target; @Target(value=ElementType.METHOD) @Retention(value=RetentionPolicy.RUNTIME) public@interfaceSentinelLimitAnnotation{ StringresourceName(); intlimitCount()default5; }
4.2.3 自定義AOP類實現限流
該類的實現思路與上述使用guava類似,不同的是,這里使用的是sentinel原生的限流相關的API,對此不夠屬性的可以查閱官方的文檔進行學習,這里就不展開來說了。
importcom.alibaba.csp.sentinel.Entry; importcom.alibaba.csp.sentinel.SphU; importcom.alibaba.csp.sentinel.Tracer; importcom.alibaba.csp.sentinel.slots.block.BlockException; importcom.alibaba.csp.sentinel.slots.block.RuleConstant; importcom.alibaba.csp.sentinel.slots.block.flow.FlowRule; importcom.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; importorg.apache.commons.lang3.StringUtils; importorg.aspectj.lang.JoinPoint; importorg.aspectj.lang.ProceedingJoinPoint; importorg.aspectj.lang.annotation.Around; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Pointcut; importorg.springframework.stereotype.Component; importjava.lang.reflect.Method; importjava.util.ArrayList; importjava.util.List; importjava.util.Objects; @Aspect @Component publicclassSentinelMethodLimitAop{ privatestaticvoidinitFlowRule(StringresourceName,intlimitCount){ Listrules=newArrayList<>(); FlowRulerule=newFlowRule(); //設置受保護的資源 rule.setResource(resourceName); //設置流控規則QPS rule.setGrade(RuleConstant.FLOW_GRADE_QPS); //設置受保護的資源閾值 rule.setCount(limitCount); rules.add(rule); //加載配置好的規則 FlowRuleManager.loadRules(rules); } @Pointcut(value="@annotation(com.congge.sentinel.SentinelLimitAnnotation)") publicvoidrateLimit(){ } @Around("rateLimit()") publicObjectaround(ProceedingJoinPointjoinPoint){ //1、獲取當前的調用方法 MethodcurrentMethod=getCurrentMethod(joinPoint); if(Objects.isNull(currentMethod)){ returnnull; } //2、從方法注解定義上獲取限流的類型 StringresourceName=currentMethod.getAnnotation(SentinelLimitAnnotation.class).resourceName(); if(StringUtils.isEmpty(resourceName)){ thrownewRuntimeException("資源名稱為空"); } intlimitCount=currentMethod.getAnnotation(SentinelLimitAnnotation.class).limitCount(); initFlowRule(resourceName,limitCount); Entryentry=null; Objectresult=null; try{ entry=SphU.entry(resourceName); try{ result=joinPoint.proceed(); }catch(Throwablethrowable){ throwable.printStackTrace(); } }catch(BlockExceptionex){ //資源訪問阻止,被限流或被降級 //在此處進行相應的處理操作 System.out.println("blocked"); return"被限流了"; }catch(Exceptione){ Tracer.traceEntry(e,entry); }finally{ if(entry!=null){ entry.exit(); } } returnresult; } privateMethodgetCurrentMethod(JoinPointjoinPoint){ Method[]methods=joinPoint.getTarget().getClass().getMethods(); Methodtarget=null; for(Methodmethod:methods){ if(method.getName().equals(joinPoint.getSignature().getName())){ target=method; break; } } returntarget; } }
4.2.4 自定義測試接口
為了模擬效果,這里將QPS的數量設置為1
//localhost:8081/limit @GetMapping("/limit") @SentinelLimitAnnotation(limitCount=1,resourceName="sentinelLimit") publicStringsentinelLimit(){ return"sentinelLimit"; }
啟動工程之后,瀏覽器調用接口測試一下,每秒一個請求,可以正常通過
快速刷接口,超過每秒1次時,效果如下
這里只是為了演示出效果,建議在真實的項目中使用時,對返回結果做一個封裝。
4.3 基于redis+lua限流實現
redis是線程安全的,天然具有線程安全的特性,支持原子性操作,限流服務不僅需要承接超高QPS,還要保證限流邏輯的執行層面具備線程安全的特性,利用Redis這些特性做限流,既能保證線程安全,也能保證性能?;趓edis的限流實現完整流程如下圖:
結合上面的流程圖,這里梳理出一個整體的實現思路:
編寫lua腳本,指定入參的限流規則,比如對特定的接口限流時,可以根據某個或幾個參數進行判定,調用該接口的請求,在一定的時間窗口內監控請求次數;
既然是限流,最好能夠通用,可將限流規則應用到任何接口上,那么最合適的方式就是通過自定義注解形式切入;
提供一個配置類,被spring的容器管理,redisTemplate中提供了DefaultRedisScript這個bean;
提供一個能動態解析接口參數的類,根據接口參數進行規則匹配后觸發限流;
4.3.1 引入redis依賴
org.springframework.boot spring-boot-starter-data-redis
4.3.2 自定義注解
@Target({ElementType.METHOD,ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public@interfaceRedisLimitAnnotation{ /** *key */ Stringkey()default""; /** *Key的前綴 */ Stringprefix()default""; /** *一定時間內最多訪問次數 */ intcount(); /** *給定的時間范圍單位(秒) */ intperiod(); /** *限流的類型(用戶自定義key或者請求ip) */ LimitTypelimitType()defaultLimitType.CUSTOMER; }
4.3.3 自定義redis配置類
importorg.springframework.context.annotation.Bean; importorg.springframework.core.io.ClassPathResource; importorg.springframework.data.redis.connection.RedisConnectionFactory; importorg.springframework.data.redis.core.RedisTemplate; importorg.springframework.data.redis.core.script.DefaultRedisScript; importorg.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; importorg.springframework.data.redis.serializer.StringRedisSerializer; importorg.springframework.scripting.support.ResourceScriptSource; importorg.springframework.stereotype.Component; importjava.io.Serializable; @Component publicclassRedisConfiguration{ @Bean publicDefaultRedisScriptredisluaScript(){ DefaultRedisScript redisScript=newDefaultRedisScript<>(); redisScript.setScriptSource(newResourceScriptSource(newClassPathResource("limit.lua"))); redisScript.setResultType(Number.class); returnredisScript; } @Bean("redisTemplate") publicRedisTemplate redisTemplate(RedisConnectionFactoryredisConnectionFactory){ RedisTemplate redisTemplate=newRedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializerjackson2JsonRedisSerializer=newJackson2JsonRedisSerializer(Object.class); //設置value的序列化方式為JSOn redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); //設置key的序列化方式為String redisTemplate.setKeySerializer(newStringRedisSerializer()); redisTemplate.setHashKeySerializer(newStringRedisSerializer()); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); returnredisTemplate; } }
4.3.4 自定義限流AOP類
importorg.aspectj.lang.ProceedingJoinPoint; importorg.aspectj.lang.annotation.Around; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Pointcut; importorg.aspectj.lang.reflect.MethodSignature; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.context.annotation.Configuration; importorg.springframework.data.redis.core.RedisTemplate; importorg.springframework.data.redis.core.script.DefaultRedisScript; importorg.springframework.web.context.request.RequestContextHolder; importorg.springframework.web.context.request.ServletRequestAttributes; importjavax.servlet.http.HttpServletRequest; importjava.io.Serializable; importjava.lang.reflect.Method; importjava.util.Collections; importjava.util.List; @Aspect @Configuration publicclassLimitRestAspect{ privatestaticfinalLoggerlogger=LoggerFactory.getLogger(LimitRestAspect.class); @Autowired privateRedisTemplateredisTemplate; @Autowired privateDefaultRedisScript redisluaScript; @Pointcut(value="@annotation(com.congge.config.limit.RedisLimitAnnotation)") publicvoidrateLimit(){ } @Around("rateLimit()") publicObjectinterceptor(ProceedingJoinPointjoinPoint)throwsThrowable{ MethodSignaturesignature=(MethodSignature)joinPoint.getSignature(); Methodmethod=signature.getMethod(); Class>targetClass=method.getDeclaringClass(); RedisLimitAnnotationrateLimit=method.getAnnotation(RedisLimitAnnotation.class); if(rateLimit!=null){ HttpServletRequestrequest=((ServletRequestAttributes)RequestContextHolder.getRequestAttributes()).getRequest(); StringipAddress=getIpAddr(request); StringBufferstringBuffer=newStringBuffer(); stringBuffer.append(ipAddress).append("-") .append(targetClass.getName()).append("-") .append(method.getName()).append("-") .append(rateLimit.key()); List keys=Collections.singletonList(stringBuffer.toString()); //調用lua腳本,獲取返回結果,這里即為請求的次數 Numbernumber=redisTemplate.execute( redisluaScript, keys, rateLimit.count(), rateLimit.period() ); if(number!=null&&number.intValue()!=0&&number.intValue()<=?rateLimit.count())?{ ????????????????logger.info("限流時間段內訪問了第:{}?次",?number.toString()); ????????????????return?joinPoint.proceed(); ????????????} ????????}?else?{ ????????????return?joinPoint.proceed(); ????????} ????????throw?new?RuntimeException("訪問頻率過快,被限流了"); ????} ? ????/** ?????*?獲取請求的IP方法 ?????*?@param?request ?????*?@return ?????*/ ????private?static?String?getIpAddr(HttpServletRequest?request)?{ ????????String?ipAddress?=?null; ????????try?{ ????????????ipAddress?=?request.getHeader("x-forwarded-for"); ????????????if?(ipAddress?==?null?||?ipAddress.length()?==?0?||?"unknown".equalsIgnoreCase(ipAddress))?{ ????????????????ipAddress?=?request.getHeader("Proxy-Client-IP"); ????????????} ????????????if?(ipAddress?==?null?||?ipAddress.length()?==?0?||?"unknown".equalsIgnoreCase(ipAddress))?{ ????????????????ipAddress?=?request.getHeader("WL-Proxy-Client-IP"); ????????????} ????????????if?(ipAddress?==?null?||?ipAddress.length()?==?0?||?"unknown".equalsIgnoreCase(ipAddress))?{ ????????????????ipAddress?=?request.getRemoteAddr(); ????????????} ????????????//?對于通過多個代理的情況,第一個IP為客戶端真實IP,多個IP按照','分割 ????????????if?(ipAddress?!=?null?&&?ipAddress.length()?>15){ if(ipAddress.indexOf(",")>0){ ipAddress=ipAddress.substring(0,ipAddress.indexOf(",")); } } }catch(Exceptione){ ipAddress=""; } returnipAddress; } }
該類要做的事情和上面的兩種限流措施類似,不過在這里核心的限流是通過讀取lua腳步,通過參數傳遞給lua腳步實現的。
4.3.5 自定義lua腳本
在工程的resources目錄下,添加如下的lua腳本
localkey="rate.limit:"..KEYS[1] locallimit=tonumber(ARGV[1]) localcurrent=tonumber(redis.call('get',key)or"0") ifcurrent+1>limitthen return0 else --沒有超閾值,將當前訪問數量+1,并設置2秒過期(可根據自己的業務情況調整) redis.call("INCRBY",key,"1") redis.call("expire",key,"2") returncurrent+1 end
4.3.6 添加測試接口
@RestController publicclassRedisController{ //localhost:8081/redis/limit @GetMapping("/redis/limit") @RedisLimitAnnotation(key="queryFromRedis",period=1,count=1) publicStringqueryFromRedis(){ return"success"; } }
為了模擬效果,這里將QPS設置為1 ,啟動工程后(提前啟動redis服務),調用一下接口,正常的效果如下:
快速刷接口,超過每秒1次的請求時看到如下效果
五、自定義starter限流實現
上面通過案例介紹了幾種常用的限流實現,不過細心的同學可以看到,這些限流的實現都是在具體的工程模塊中嵌入的,事實上,在真實的微服務開發中,一個項目可能包含了眾多的微服務模塊,為了減少重復造輪子,避免每個微服務模塊中單獨實現,可以考慮將限流的邏輯實現封裝成一個SDK,即作為一個springboot的starter的方式被其他微服務模塊進行引用即可。這也是目前很多生產實踐中比較通用的做法,接下來看看具體的實現吧。
5.1 前置準備
創建一個空的springboot工程,工程目錄結構如下圖,目錄說明:
annotation:存放自定義的限流相關的注解;
aop:存放不同的限流實現,比如基于guava的aop,基于sentinel的aop實現等;
spring.factories:自定義待裝配的aop實現類;
5.2 代碼整合完成步驟
5.2.1 導入基礎的依賴
這里包括如下幾個必須的依賴,其他的依賴可以結合自身的情況合理選擇;
spring-boot-starter;
guava;
spring-boot-autoconfigure;
sentinel-core;
org.springframework.boot spring-boot-starter-parent 2.2.1.RELEASE UTF-8 UTF-8 1.8 org.springframework.boot spring-boot-starter-aop log4j log4j 1.2.17 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-web org.projectlombok lombok com.google.guava guava 23.0 org.springframework.boot spring-boot-autoconfigure 2.2.1.RELEASE org.springframework.boot spring-boot-configuration-processor 2.2.1.RELEASE com.alibaba.csp sentinel-core 1.8.0 org.apache.commons commons-lang3 3.4 com.alibaba.fastjson2 fastjson2 2.0.22 src/main/resources **/**
5.2.2 自定義注解
目前該SDK支持三種限流方式,即后續其他微服務工程中可以通過添加這3種注解即可實現限流,分別是基于guava的令牌桶,基于sentinel的限流,基于java自帶的Semaphore限流,三個自定義注解類如下:
令牌桶
@Documented @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public@interfaceTokenBucketLimiter{ intvalue()default50; }
Semaphore
@Documented @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public@interfaceShLimiter{ intvalue()default50; }
sentinel
@Target(value=ElementType.METHOD) @Retention(value=RetentionPolicy.RUNTIME) public@interfaceSentinelLimiter{ StringresourceName(); intlimitCount()default50; }
5.2.3 限流實現AOP類
具體的限流在AOP中進行實現,思路和上一章節類似,即通過環繞通知的方式,先解析那些添加了限流注解的方法,然后解析里面的參數,進行限流的業務實現。
基于guava的aop實現
importcom.alibaba.fastjson2.JSONObject; importcom.congge.annotation.TokenBucketLimiter; importcom.google.common.util.concurrent.RateLimiter; importlombok.extern.slf4j.Slf4j; importorg.aspectj.lang.ProceedingJoinPoint; importorg.aspectj.lang.annotation.Around; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Pointcut; importorg.springframework.cglib.core.ReflectUtils; importorg.springframework.stereotype.Component; importorg.springframework.web.context.request.RequestContextHolder; importorg.springframework.web.context.request.ServletRequestAttributes; importjavax.servlet.ServletOutputStream; importjavax.servlet.http.HttpServletResponse; importjava.io.IOException; importjava.lang.reflect.Method; importjava.util.Arrays; importjava.util.Map; importjava.util.concurrent.ConcurrentHashMap; @Aspect @Component @Slf4j publicclassGuavaLimiterAop{ privatefinalMaprateLimiters=newConcurrentHashMap (); @Pointcut("@annotation(com.congge.annotation.TokenBucketLimiter)") publicvoidaspect(){ } @Around(value="aspect()") publicObjectaround(ProceedingJoinPointpoint)throwsThrowable{ log.debug("準備限流"); Objecttarget=point.getTarget(); StringtargetName=target.getClass().getName(); StringmethodName=point.getSignature().getName(); Object[]arguments=point.getArgs(); Class>targetClass=Class.forName(targetName); Class>[]argTypes=ReflectUtils.getClasses(arguments); Methodmethod=targetClass.getDeclaredMethod(methodName,argTypes); //獲取目標method上的限流注解@Limiter TokenBucketLimiterlimiter=method.getAnnotation(TokenBucketLimiter.class); RateLimiterrateLimiter=null; Objectresult=null; if(null!=limiter){ //以class+method+parameters為key,避免重載、重寫帶來的混亂 Stringkey=targetName+"."+methodName+Arrays.toString(argTypes); rateLimiter=rateLimiters.get(key); if(null==rateLimiter){ //獲取限定的流量 //為了防止并發 rateLimiters.putIfAbsent(key,RateLimiter.create(limiter.value())); rateLimiter=rateLimiters.get(key); } booleanb=rateLimiter.tryAcquire(); if(b){ log.debug("得到令牌,準備執行業務"); result=point.proceed(); }else{ HttpServletResponseresp=((ServletRequestAttributes)RequestContextHolder.getRequestAttributes()).getResponse(); JSONObjectjsonObject=newJSONObject(); jsonObject.put("success",false); jsonObject.put("msg","限流中"); try{ output(resp,jsonObject.toJSONString()); }catch(Exceptione){ log.error("error,e:{}",e); } } }else{ result=point.proceed(); } log.debug("退出限流"); returnresult; } publicvoidoutput(HttpServletResponseresponse,Stringmsg)throwsIOException{ response.setContentType("application/json;charset=UTF-8"); ServletOutputStreamoutputStream=null; try{ outputStream=response.getOutputStream(); outputStream.write(msg.getBytes("UTF-8")); }catch(IOExceptione){ e.printStackTrace(); }finally{ outputStream.flush(); outputStream.close(); } } }
基于Semaphore的aop實現
importcom.congge.annotation.ShLimiter; importlombok.extern.slf4j.Slf4j; importorg.aspectj.lang.ProceedingJoinPoint; importorg.aspectj.lang.annotation.Around; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Pointcut; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.cglib.core.ReflectUtils; importorg.springframework.stereotype.Component; importjava.lang.reflect.Method; importjava.util.Arrays; importjava.util.Map; importjava.util.concurrent.ConcurrentHashMap; importjava.util.concurrent.Semaphore; @Aspect @Component @Slf4j publicclassSemaphoreLimiterAop{ privatefinalMapsemaphores=newConcurrentHashMap (); privatefinalstaticLoggerLOG=LoggerFactory.getLogger(SemaphoreLimiterAop.class); @Pointcut("@annotation(com.congge.annotation.ShLimiter)") publicvoidaspect(){ } @Around(value="aspect()") publicObjectaround(ProceedingJoinPointpoint)throwsThrowable{ log.debug("進入限流aop"); Objecttarget=point.getTarget(); StringtargetName=target.getClass().getName(); StringmethodName=point.getSignature().getName(); Object[]arguments=point.getArgs(); Class>targetClass=Class.forName(targetName); Class>[]argTypes=ReflectUtils.getClasses(arguments); Methodmethod=targetClass.getDeclaredMethod(methodName,argTypes); //獲取目標method上的限流注解@Limiter ShLimiterlimiter=method.getAnnotation(ShLimiter.class); Objectresult=null; if(null!=limiter){ //以class+method+parameters為key,避免重載、重寫帶來的混亂 Stringkey=targetName+"."+methodName+Arrays.toString(argTypes); //獲取限定的流量 Semaphoresemaphore=semaphores.get(key); if(null==semaphore){ semaphores.putIfAbsent(key,newSemaphore(limiter.value())); semaphore=semaphores.get(key); } try{ semaphore.acquire(); result=point.proceed(); }finally{ if(null!=semaphore){ semaphore.release(); } } }else{ result=point.proceed(); } log.debug("退出限流"); returnresult; } }
基于sentinel的aop實現
importcom.alibaba.csp.sentinel.Entry; importcom.alibaba.csp.sentinel.SphU; importcom.alibaba.csp.sentinel.Tracer; importcom.alibaba.csp.sentinel.slots.block.BlockException; importcom.alibaba.csp.sentinel.slots.block.RuleConstant; importcom.alibaba.csp.sentinel.slots.block.flow.FlowRule; importcom.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; importcom.congge.annotation.SentinelLimiter; importorg.apache.commons.lang3.StringUtils; importorg.aspectj.lang.JoinPoint; importorg.aspectj.lang.ProceedingJoinPoint; importorg.aspectj.lang.annotation.Around; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Pointcut; importorg.springframework.stereotype.Component; importjava.lang.reflect.Method; importjava.util.ArrayList; importjava.util.List; importjava.util.Objects; @Aspect @Component publicclassSentinelLimiterAop{ privatestaticvoidinitFlowRule(StringresourceName,intlimitCount){ Listrules=newArrayList<>(); FlowRulerule=newFlowRule(); //設置受保護的資源 rule.setResource(resourceName); //設置流控規則QPS rule.setGrade(RuleConstant.FLOW_GRADE_QPS); //設置受保護的資源閾值 rule.setCount(limitCount); rules.add(rule); //加載配置好的規則 FlowRuleManager.loadRules(rules); } @Pointcut(value="@annotation(com.congge.annotation.SentinelLimiter)") publicvoidrateLimit(){ } @Around("rateLimit()") publicObjectaround(ProceedingJoinPointjoinPoint){ //1、獲取當前的調用方法 MethodcurrentMethod=getCurrentMethod(joinPoint); if(Objects.isNull(currentMethod)){ returnnull; } //2、從方法注解定義上獲取限流的類型 StringresourceName=currentMethod.getAnnotation(SentinelLimiter.class).resourceName(); if(StringUtils.isEmpty(resourceName)){ thrownewRuntimeException("資源名稱為空"); } intlimitCount=currentMethod.getAnnotation(SentinelLimiter.class).limitCount(); initFlowRule(resourceName,limitCount); Entryentry=null; Objectresult=null; try{ entry=SphU.entry(resourceName); try{ result=joinPoint.proceed(); }catch(Throwablethrowable){ throwable.printStackTrace(); } }catch(BlockExceptionex){ //資源訪問阻止,被限流或被降級 //在此處進行相應的處理操作 System.out.println("blocked"); return"被限流了"; }catch(Exceptione){ Tracer.traceEntry(e,entry); }finally{ if(entry!=null){ entry.exit(); } } returnresult; } privateMethodgetCurrentMethod(JoinPointjoinPoint){ Method[]methods=joinPoint.getTarget().getClass().getMethods(); Methodtarget=null; for(Methodmethod:methods){ if(method.getName().equals(joinPoint.getSignature().getName())){ target=method; break; } } returntarget; } }
5.2.4 配置自動裝配AOP實現
在resources目錄下創建上述的spring.factories文件,內容如下,通過這種方式配置后,其他應用模塊引入了當前的SDK的jar之后,就可以實現開箱即用了;
org.springframework.boot.autoconfigure.EnableAutoConfiguration= com.congge.aop.SemaphoreLimiterAop, com.congge.aop.GuavaLimiterAop, com.congge.aop.SemaphoreLimiterAop
5.2.5 將工程打成jar進行安裝
這一步比較簡單就跳過了
5.2.6 在其他的工程中引入上述SDK
cm.congge biz-limit 1.0-SNAPSHOT
5.2.7 編寫測試接口
在其他工程中,編寫一個測試接口,并使用上面的注解,這里以guava的限流注解為例進行說明
importcom.congge.annotation.TokenBucketLimiter; importorg.springframework.web.bind.annotation.GetMapping; importorg.springframework.web.bind.annotation.RestController; @RestController publicclassSdkController{ //localhost:8081/query @GetMapping("/query") @TokenBucketLimiter(1) publicStringqueryUser(){ return"queryUser"; } }
5.2.8 功能測試
啟動當前的工程后,正常調用接口,每秒一次的請求,可以正常得到結果
快速刷接口,QPS超過1之后,將會觸發限流,看到如下效果
通過上面這種方式,也可以得到預期的效果,其他兩種限流注解有興趣的同學也可以繼續測試驗證,篇幅原因就不再贅述了。
上述通過starter的方式實現了一種更優雅的限流集成方式,也是生產中比較推薦的一種方式,不過當前的案例還比較粗糙,需要使用的同學還需根據自己的情況完善里面的邏輯,進一步的封裝以期得到更好的效果。
審核編輯:劉清
-
接收機
+關注
關注
8文章
1180瀏覽量
53459 -
計數器
+關注
關注
32文章
2256瀏覽量
94485 -
JVM
+關注
關注
0文章
158瀏覽量
12220 -
QPS
+關注
關注
0文章
24瀏覽量
8800 -
負載保護器
+關注
關注
0文章
4瀏覽量
5426 -
SpringBoot
+關注
關注
0文章
173瀏覽量
177
原文標題:SpringBoot 通用限流方案(VIP珍藏版)
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論