网关的鉴权,动态限流,注册中心自动路由功能,是一个网关的最基本功能,让我们一起来学习。
前言         前面几章主要是对Spring Cloud Gateway源码的了解,本章则讲述常用的网关功能的实现。
鉴权实现         鉴权通过对称加密,用户传入一个token。如下图展示的是POST方式的,GET方式比较简单,直接在Path后面拼装?token=dd9e46d0eb1aced7即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 public  class  AuthFilter  implements  GlobalFilter , Ordered      private  DesUtils dest;     @Autowired      private  AuthBean authBean;     private  static  final  String SPLIT_WORD = "," ;          @PostConstruct      private  void  init ()           try  {             LOGGER.info("AuthFilter初始化: token salt:{}" , authBean.getSalt());             dest = new  DesUtils(authBean.getSalt());             LOGGER.info("token={}" , dest.encrypt("1001" ));         } catch  (Exception e) {             throw  new  MfwSearchBusinessException("AuthFilter秘钥工具初始化失败:" , e.getMessage(), e);         }     }     @Override      public  Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain)           HttpMethod httpMethod = exchange.getRequest().getMethod();         LOGGER.debug("AuthFilter in.HTTP :{}" , httpMethod);         if  (HttpMethod.GET.equals(httpMethod)) {             String token = exchange.getRequest().getQueryParams().getFirst("token" );             if  (!checkToken(token)) {                 LOGGER.info("请求不通过鉴权,url:{}" , exchange.getRequest().getURI().toString());                                  exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);                 return  exchange.getResponse().setComplete();             }         } else  if  (HttpMethod.POST.equals(httpMethod)) {             Map<String, String> postBody = exchange.getAttribute(GatewayConstant.CACHE_POST_BODY);             LOGGER.debug("http request body:{}" , JSON.toJSONString(postBody));             if  (postBody == null ) {                 exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);                 return  exchange.getResponse().setComplete();             }             String token = postBody.get("token" );             if  (!checkToken(token)) {                 exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);                 return  exchange.getResponse().setComplete();             }         } else  {                          exchange.getResponse().setStatusCode(HttpStatus.UNSUPPORTED_MEDIA_TYPE);             return  exchange.getResponse().setComplete();         }                  return  chain.filter(exchange);     }          private  boolean  checkToken (String token)           if  (StringUtils.isBlank(token)) {             return  false ;         }         try  {             boolean  flag = false ;             String tokenStr = dest.decrypt(token);             String[] appIdArray = authBean.getAppId().split(SPLIT_WORD);             for  (String appId : appIdArray) {                 if  (tokenStr.equals(appId)) {                     flag = true ;                     break ;                 }             }             return  flag;         } catch  (Exception e) {                          LOGGER.error("token解析失败,默认通过校验token:{}" , token, e);         }         return  true ;     }     ... } 
动态限流功能         Spring Cloud Gateway中,限流功能是至关重要的,当有不正常的流量打过来,可以控制它的流速或者直接封杀调异常流量。限流熔断的中间件有很多,比如Hystrix、Sentinel,本文主要讲解Sentinel集成Apollo,实现动态的流量控制。首先需要加入依赖,Apollo对于Sentinel来说,是一个datasource,当然像Mysql,Redis等都可以当做Sentinel的数据源。
1 2 3 4 5 6 7 8 <dependency >     <groupId > com.alibaba.cloud</groupId >      <artifactId > spring-cloud-starter-alibaba-sentinel</artifactId >  </dependency > <dependency >     <groupId > com.alibaba.csp</groupId >      <artifactId > sentinel-datasource-apollo</artifactId >  </dependency > 
        如下提供了一个包括总体限流、URL限流、用户限流的实现,需要在Apollo中配置限流规则。GATEWAY-LIMIT是总体限流的key,/search-bs/helloword是URL的限流规则,u_1001是用户限流规则。
[{"grade":1,"count":100,"resource":"GATEWAY-LIMIT"},{"grade":1,"count":5,"resource":"/search-bs/helloword"},{"grade":1,"count":20,"resource":"u_1001"}]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public  class  SentinelFilter  implements  GlobalFilter , Ordered           @PostConstruct      private  void  init ()                    String defaultFlowRules = "[]" ;         ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new  ApolloDataSource<>(                 GatewayConstant.Apollo.NAMESPACE_RULE,                 GatewayConstant.Sentinel.FLOW_RULE_KEY,                 defaultFlowRules,                 source -> JSON.parseObject(source, new  TypeReference<List<FlowRule>>() {         }));         FlowRuleManager.register2Property(flowRuleDataSource.getProperty());     }     @Override      public  Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain)          try  (Entry entry = SphU.entry(GatewayConstant.FlowRule.GATEWAY_LIMIT)) {             String uid = null ;             if  (exchange.getRequest().getMethod().equals(HttpMethod.POST)) {                 Map<String, String> bodyMap = (Map<String, String>)exchange.getAttributes().get(GatewayConstant.CACHE_POST_BODY);                 uid = bodyMap.get("uid" );             } else  if (exchange.getRequest().getMethod().equals(HttpMethod.GET)) {                 uid = exchange.getRequest().getQueryParams().getFirst("uid" );             }             String url = exchange.getRequest().getPath().toString();             System.out.println("url="  + url + ",uid="  + uid);                          if  (StringUtils.isNotBlank(uid)) {                 try  (Entry userEntry = SphU.entry("u_"  + uid)) {                 } catch  (BlockException ex) {                     LOGGER.error("触发了用户限流规则" );                     exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);                     return  exchange.getResponse().setComplete();                 }             }                          try  (Entry urlEntry = SphU.entry(url)) {             } catch  (BlockException ex) {                 LOGGER.error("触发了url限流规则" );                 exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);                 return  exchange.getResponse().setComplete();             }         } catch  (BlockException ex) {                                       LOGGER.error("触发了限流规则" );             exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);             return  exchange.getResponse().setComplete();         }     }     @Override      public  int  getOrder ()           return  GatewayConstant.FLOW_FILTER;     } } 
注册中心自动路由 ##实现原理
        Spring Cloud Gateway可以与注册中心进行整合,比如常见的zookeeper,Eureka,Nacos等。本文就以nacos举例,其他的注册中心整合类似。    
        首先,需要引入Nacos的依赖,它通过SpringBoot的autoConfiguration特性,自动集成到工程中。
1 2 3 4 <dependency >     <groupId > com.alibaba.cloud</groupId >      <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId >  </dependency > 
        最关键的代码是NacosWatch,它实现了ApplicationEventPublisherAware 和SmartLifecycle,拥有发布事件和监听Spring生命周期的能力。通过scheduleWithFixedDelay实现默认30s轮询的发送HeartbeatEvent事件对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 	@Bean  	@ConditionalOnMissingBean  	@ConditionalOnProperty (value = "spring.cloud.nacos.discovery.watch.enabled" , matchIfMissing = true ) 	public  NacosWatch nacosWatch (NacosDiscoveryProperties nacosDiscoveryProperties)   		return  new  NacosWatch(nacosDiscoveryProperties); } public  class  NacosWatch  implements  ApplicationEventPublisherAware , SmartLifecycle  	@Override  	public  void  stop (Runnable callback)   		this .stop(); 		callback.run(); 	} 	@Override  	public  void  start ()   		if  (this .running.compareAndSet(false , true )) { 			this .watchFuture = this .taskScheduler.scheduleWithFixedDelay( 					this ::nacosServicesWatch, this .properties.getWatchDelay()); 		} 	} 	@Override  	public  void  stop ()   		if  (this .running.compareAndSet(true , false ) && this .watchFuture != null ) { 			this .watchFuture.cancel(true ); 		} 	} 	public  void  nacosServicesWatch ()   		 		this .publisher.publishEvent( 				new  HeartbeatEvent(this , nacosWatchIndex.getAndIncrement())); 	} } 
        HeartbeatEvent事件发出之后,会被Spring Cloud Gateway的RouteRefreshListener监听到,最终发送一个RefreshRoutesEvent,这个事件会触发所有的路由刷新。总结为,注册中心默认30s同步一次Gateway路由 ,可以通过spring.cloud.nacos.watch-delay属性来配置轮询的时间间隔。HeartbeatMonitor这个类象征性的判定是否Heartbeat更新,只要value不为null且不和现有值相等,都会执行reset()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public  class  RouteRefreshListener  implements  ApplicationListener <ApplicationEvent >    private  HeartbeatMonitor monitor = new  HeartbeatMonitor();    @Override     public  void  onApplicationEvent (ApplicationEvent event)         ...       if  (event instanceof  HeartbeatEvent) {          HeartbeatEvent e = (HeartbeatEvent) event;          resetIfNeeded(e.getValue());       }    }    private  void  resetIfNeeded (Object value)         if  (this .monitor.update(value)) {          reset();       }    }    private  void  reset ()         this .publisher.publishEvent(new  RefreshRoutesEvent(this ));    } } public  class  HeartbeatMonitor  	private  AtomicReference<Object> latestHeartbeat = new  AtomicReference<>(); 	 	public  boolean  update (Object value)   		Object last = this .latestHeartbeat.get(); 		if  (value != null  && !value.equals(last)) { 			return  this .latestHeartbeat.compareAndSet(last, value); 		} 		return  false ; 	} } 
总结         总的来说,nacos这种实现方式,简单有效,但是也不太友好。因为路由是低频变化的,默认的30s刷新时间,会导致清空缓存,所有的路由都需要重新加载(非注册中心的路由也会重新加载)会对性能有一定的损耗,如果能判定nacos路由是否有变化,变化再更新就会最大限度降低缓存重建的问题。