网关的鉴权,动态限流,注册中心自动路由功能,是一个网关的最基本功能,让我们一起来学习。
前言 前面几章主要是对Spring Cloud Gateway源码的了解,本章则讲述常用的网关功能的实现。
鉴权实现 鉴权通过对称加密,用户传入一个token。如下图展示的是POST方式的,GET方式比较简单,直接在Path后面拼装?token=dd9e46d0eb1aced7
即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 public class AuthFilter implements GlobalFilter , Ordered { private DesUtils dest; @Autowired private AuthBean authBean; private static final String SPLIT_WORD = "," ; @PostConstruct private void init () { try { LOGGER.info("AuthFilter初始化: token salt:{}" , authBean.getSalt()); dest = new DesUtils(authBean.getSalt()); LOGGER.info("token={}" , dest.encrypt("1001" )); } catch (Exception e) { throw new MfwSearchBusinessException("AuthFilter秘钥工具初始化失败:" , e.getMessage(), e); } } @Override public Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain) { HttpMethod httpMethod = exchange.getRequest().getMethod(); LOGGER.debug("AuthFilter in.HTTP :{}" , httpMethod); if (HttpMethod.GET.equals(httpMethod)) { String token = exchange.getRequest().getQueryParams().getFirst("token" ); if (!checkToken(token)) { LOGGER.info("请求不通过鉴权,url:{}" , exchange.getRequest().getURI().toString()); exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED); return exchange.getResponse().setComplete(); } } else if (HttpMethod.POST.equals(httpMethod)) { Map<String, String> postBody = exchange.getAttribute(GatewayConstant.CACHE_POST_BODY); LOGGER.debug("http request body:{}" , JSON.toJSONString(postBody)); if (postBody == null ) { exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED); return exchange.getResponse().setComplete(); } String token = postBody.get("token" ); if (!checkToken(token)) { exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED); return exchange.getResponse().setComplete(); } } else { exchange.getResponse().setStatusCode(HttpStatus.UNSUPPORTED_MEDIA_TYPE); return exchange.getResponse().setComplete(); } return chain.filter(exchange); } private boolean checkToken (String token) { if (StringUtils.isBlank(token)) { return false ; } try { boolean flag = false ; String tokenStr = dest.decrypt(token); String[] appIdArray = authBean.getAppId().split(SPLIT_WORD); for (String appId : appIdArray) { if (tokenStr.equals(appId)) { flag = true ; break ; } } return flag; } catch (Exception e) { LOGGER.error("token解析失败,默认通过校验token:{}" , token, e); } return true ; } ... }
动态限流功能 Spring Cloud Gateway中,限流功能是至关重要的,当有不正常的流量打过来,可以控制它的流速或者直接封杀调异常流量。限流熔断的中间件有很多,比如Hystrix、Sentinel,本文主要讲解Sentinel集成Apollo,实现动态的流量控制。首先需要加入依赖,Apollo对于Sentinel来说,是一个datasource
,当然像Mysql,Redis等都可以当做Sentinel的数据源。
1 2 3 4 5 6 7 8 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-sentinel</artifactId > </dependency > <dependency > <groupId > com.alibaba.csp</groupId > <artifactId > sentinel-datasource-apollo</artifactId > </dependency >
如下提供了一个包括总体限流、URL限流、用户限流的实现,需要在Apollo中配置限流规则。GATEWAY-LIMIT
是总体限流的key,/search-bs/helloword
是URL的限流规则,u_1001
是用户限流规则。
[{"grade":1,"count":100,"resource":"GATEWAY-LIMIT"},{"grade":1,"count":5,"resource":"/search-bs/helloword"},{"grade":1,"count":20,"resource":"u_1001"}]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public class SentinelFilter implements GlobalFilter , Ordered { @PostConstruct private void init () { String defaultFlowRules = "[]" ; ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new ApolloDataSource<>( GatewayConstant.Apollo.NAMESPACE_RULE, GatewayConstant.Sentinel.FLOW_RULE_KEY, defaultFlowRules, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() { })); FlowRuleManager.register2Property(flowRuleDataSource.getProperty()); } @Override public Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain) { try (Entry entry = SphU.entry(GatewayConstant.FlowRule.GATEWAY_LIMIT)) { String uid = null ; if (exchange.getRequest().getMethod().equals(HttpMethod.POST)) { Map<String, String> bodyMap = (Map<String, String>)exchange.getAttributes().get(GatewayConstant.CACHE_POST_BODY); uid = bodyMap.get("uid" ); } else if (exchange.getRequest().getMethod().equals(HttpMethod.GET)) { uid = exchange.getRequest().getQueryParams().getFirst("uid" ); } String url = exchange.getRequest().getPath().toString(); System.out.println("url=" + url + ",uid=" + uid); if (StringUtils.isNotBlank(uid)) { try (Entry userEntry = SphU.entry("u_" + uid)) { } catch (BlockException ex) { LOGGER.error("触发了用户限流规则" ); exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS); return exchange.getResponse().setComplete(); } } try (Entry urlEntry = SphU.entry(url)) { } catch (BlockException ex) { LOGGER.error("触发了url限流规则" ); exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS); return exchange.getResponse().setComplete(); } } catch (BlockException ex) { LOGGER.error("触发了限流规则" ); exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS); return exchange.getResponse().setComplete(); } } @Override public int getOrder () { return GatewayConstant.FLOW_FILTER; } }
注册中心自动路由 ##实现原理
Spring Cloud Gateway可以与注册中心进行整合,比如常见的zookeeper,Eureka,Nacos等。本文就以nacos举例,其他的注册中心整合类似。
首先,需要引入Nacos的依赖,它通过SpringBoot的autoConfiguration特性,自动集成到工程中。
1 2 3 4 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency >
最关键的代码是NacosWatch,它实现了ApplicationEventPublisherAware
和SmartLifecycle
,拥有发布事件和监听Spring生命周期的能力。通过scheduleWithFixedDelay
实现默认30s轮询的发送HeartbeatEvent事件对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Bean @ConditionalOnMissingBean @ConditionalOnProperty (value = "spring.cloud.nacos.discovery.watch.enabled" , matchIfMissing = true ) public NacosWatch nacosWatch (NacosDiscoveryProperties nacosDiscoveryProperties) { return new NacosWatch(nacosDiscoveryProperties); } public class NacosWatch implements ApplicationEventPublisherAware , SmartLifecycle { @Override public void stop (Runnable callback) { this .stop(); callback.run(); } @Override public void start () { if (this .running.compareAndSet(false , true )) { this .watchFuture = this .taskScheduler.scheduleWithFixedDelay( this ::nacosServicesWatch, this .properties.getWatchDelay()); } } @Override public void stop () { if (this .running.compareAndSet(true , false ) && this .watchFuture != null ) { this .watchFuture.cancel(true ); } } public void nacosServicesWatch () { this .publisher.publishEvent( new HeartbeatEvent(this , nacosWatchIndex.getAndIncrement())); } }
HeartbeatEvent事件发出之后,会被Spring Cloud Gateway的RouteRefreshListener
监听到,最终发送一个RefreshRoutesEvent,这个事件会触发所有的路由刷新。总结为,注册中心默认30s同步一次Gateway路由 ,可以通过spring.cloud.nacos.watch-delay
属性来配置轮询的时间间隔。HeartbeatMonitor
这个类象征性的判定是否Heartbeat更新,只要value不为null且不和现有值相等,都会执行reset()
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class RouteRefreshListener implements ApplicationListener <ApplicationEvent > { private HeartbeatMonitor monitor = new HeartbeatMonitor(); @Override public void onApplicationEvent (ApplicationEvent event) { ... if (event instanceof HeartbeatEvent) { HeartbeatEvent e = (HeartbeatEvent) event; resetIfNeeded(e.getValue()); } } private void resetIfNeeded (Object value) { if (this .monitor.update(value)) { reset(); } } private void reset () { this .publisher.publishEvent(new RefreshRoutesEvent(this )); } } public class HeartbeatMonitor { private AtomicReference<Object> latestHeartbeat = new AtomicReference<>(); public boolean update (Object value) { Object last = this .latestHeartbeat.get(); if (value != null && !value.equals(last)) { return this .latestHeartbeat.compareAndSet(last, value); } return false ; } }
总结 总的来说,nacos这种实现方式,简单有效,但是也不太友好。因为路由是低频变化的,默认的30s刷新时间,会导致清空缓存,所有的路由都需要重新加载(非注册中心的路由也会重新加载)会对性能有一定的损耗,如果能判定nacos路由是否有变化,变化再更新就会最大限度降低缓存重建的问题。