SpringCloudGateway源码解析(6)- 常用功能实现

SpringCloudGateway源码解析(6)- 常用功能实现

​ 网关的鉴权,动态限流,注册中心自动路由功能,是一个网关的最基本功能,让我们一起来学习。

前言

​ 前面几章主要是对Spring Cloud Gateway源码的了解,本章则讲述常用的网关功能的实现。

鉴权实现

​ 鉴权通过对称加密,用户传入一个token。如下图展示的是POST方式的,GET方式比较简单,直接在Path后面拼装?token=dd9e46d0eb1aced7即可。

POST方式传入token

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class AuthFilter implements GlobalFilter, Ordered {

private DesUtils dest;
@Autowired
private AuthBean authBean;
private static final String SPLIT_WORD = ",";

@PostConstruct
private void init() {
try {
LOGGER.info("AuthFilter初始化: token salt:{}", authBean.getSalt());
dest = new DesUtils(authBean.getSalt());
LOGGER.info("token={}", dest.encrypt("1001"));
} catch (Exception e) {
throw new MfwSearchBusinessException("AuthFilter秘钥工具初始化失败:", e.getMessage(), e);
}
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
HttpMethod httpMethod = exchange.getRequest().getMethod();
LOGGER.debug("AuthFilter in.HTTP :{}", httpMethod);
if (HttpMethod.GET.equals(httpMethod)) {
String token = exchange.getRequest().getQueryParams().getFirst("token");
if (!checkToken(token)) {
LOGGER.info("请求不通过鉴权,url:{}", exchange.getRequest().getURI().toString());
//未通过验证
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
} else if (HttpMethod.POST.equals(httpMethod)) {
Map<String, String> postBody = exchange.getAttribute(GatewayConstant.CACHE_POST_BODY);
LOGGER.debug("http request body:{}", JSON.toJSONString(postBody));
if (postBody == null) {
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
String token = postBody.get("token");
if (!checkToken(token)) {
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
}
} else {
//not support other Http Method
exchange.getResponse().setStatusCode(HttpStatus.UNSUPPORTED_MEDIA_TYPE);
return exchange.getResponse().setComplete();
}
//pass auth
return chain.filter(exchange);
}

/**
* 校验token是否合法
* token解密后是appId
*
* @param token
* @return
*/
private boolean checkToken(String token) {
if (StringUtils.isBlank(token)) {
return false;
}
try {
boolean flag = false;
String tokenStr = dest.decrypt(token);
String[] appIdArray = authBean.getAppId().split(SPLIT_WORD);
for (String appId : appIdArray) {
if (tokenStr.equals(appId)) {
flag = true;
break;
}
}
return flag;
} catch (Exception e) {
//todo 解析失败需要报警
LOGGER.error("token解析失败,默认通过校验token:{}", token, e);
}
return true;
}
...
}

动态限流功能

​ Spring Cloud Gateway中,限流功能是至关重要的,当有不正常的流量打过来,可以控制它的流速或者直接封杀调异常流量。限流熔断的中间件有很多,比如Hystrix、Sentinel,本文主要讲解Sentinel集成Apollo,实现动态的流量控制。首先需要加入依赖,Apollo对于Sentinel来说,是一个datasource,当然像Mysql,Redis等都可以当做Sentinel的数据源。

1
2
3
4
5
6
7
8
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-apollo</artifactId>
</dependency>

​ 如下提供了一个包括总体限流、URL限流、用户限流的实现,需要在Apollo中配置限流规则。GATEWAY-LIMIT是总体限流的key,/search-bs/helloword是URL的限流规则,u_1001是用户限流规则。

[{"grade":1,"count":100,"resource":"GATEWAY-LIMIT"},{"grade":1,"count":5,"resource":"/search-bs/helloword"},{"grade":1,"count":20,"resource":"u_1001"}]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class SentinelFilter implements GlobalFilter, Ordered {
/**
* 限流
* 限流是一个单key,value是FlowRule的集合
*/
@PostConstruct
private void init() {
// It's better to provide a meaningful default value.
String defaultFlowRules = "[]";
ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new ApolloDataSource<>(
GatewayConstant.Apollo.NAMESPACE_RULE,
GatewayConstant.Sentinel.FLOW_RULE_KEY,
defaultFlowRules,
source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
}));
FlowRuleManager.register2Property(flowRuleDataSource.getProperty());
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
try (Entry entry = SphU.entry(GatewayConstant.FlowRule.GATEWAY_LIMIT)) {
String uid = null;
if (exchange.getRequest().getMethod().equals(HttpMethod.POST)) {
Map<String, String> bodyMap = (Map<String, String>)exchange.getAttributes().get(GatewayConstant.CACHE_POST_BODY);
uid = bodyMap.get("uid");
} else if(exchange.getRequest().getMethod().equals(HttpMethod.GET)) {
uid = exchange.getRequest().getQueryParams().getFirst("uid");
}
String url = exchange.getRequest().getPath().toString();
System.out.println("url=" + url + ",uid=" + uid);
//校验用户限流
if (StringUtils.isNotBlank(uid)) {
try (Entry userEntry = SphU.entry("u_" + uid)) {

} catch (BlockException ex) {
LOGGER.error("触发了用户限流规则");
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
return exchange.getResponse().setComplete();
}
}
//校验url限流
try (Entry urlEntry = SphU.entry(url)) {

} catch (BlockException ex) {
LOGGER.error("触发了url限流规则");
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
return exchange.getResponse().setComplete();
}

} catch (BlockException ex) {
// 资源访问阻止,被限流或被降级
// 在此处进行相应的处理操作
LOGGER.error("触发了限流规则");
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
return exchange.getResponse().setComplete();
}
}

@Override
public int getOrder() {
return GatewayConstant.FLOW_FILTER;
}
}

注册中心自动路由

##实现原理

​ Spring Cloud Gateway可以与注册中心进行整合,比如常见的zookeeper,Eureka,Nacos等。本文就以nacos举例,其他的注册中心整合类似。

​ 首先,需要引入Nacos的依赖,它通过SpringBoot的autoConfiguration特性,自动集成到工程中。

1
2
3
4
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

​ 最关键的代码是NacosWatch,它实现了ApplicationEventPublisherAwareSmartLifecycle,拥有发布事件和监听Spring生命周期的能力。通过scheduleWithFixedDelay实现默认30s轮询的发送HeartbeatEvent事件对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
	@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", matchIfMissing = true)
public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosWatch(nacosDiscoveryProperties);
}

public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle {
@Override
public void stop(Runnable callback) {
this.stop();
callback.run();
}
@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::nacosServicesWatch, this.properties.getWatchDelay());
}
}
@Override
public void stop() {
if (this.running.compareAndSet(true, false) && this.watchFuture != null) {
this.watchFuture.cancel(true);
}
}
public void nacosServicesWatch() {
// nacos doesn't support watch now , publish an event every 30 seconds.
this.publisher.publishEvent(
new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}
}

​ HeartbeatEvent事件发出之后,会被Spring Cloud Gateway的RouteRefreshListener监听到,最终发送一个RefreshRoutesEvent,这个事件会触发所有的路由刷新。总结为,注册中心默认30s同步一次Gateway路由,可以通过spring.cloud.nacos.watch-delay属性来配置轮询的时间间隔。HeartbeatMonitor这个类象征性的判定是否Heartbeat更新,只要value不为null且不和现有值相等,都会执行reset()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class RouteRefreshListener implements ApplicationListener<ApplicationEvent> {
private HeartbeatMonitor monitor = new HeartbeatMonitor();
@Override
public void onApplicationEvent(ApplicationEvent event) {
...
if (event instanceof HeartbeatEvent) {
HeartbeatEvent e = (HeartbeatEvent) event;
resetIfNeeded(e.getValue());
}
}
private void resetIfNeeded(Object value) {
if (this.monitor.update(value)) {
reset();
}
}
private void reset() {
this.publisher.publishEvent(new RefreshRoutesEvent(this));
}
}

public class HeartbeatMonitor {
private AtomicReference<Object> latestHeartbeat = new AtomicReference<>();
/**
* @param value The latest heartbeat.
* @return True if the state changed.
*/
public boolean update(Object value) {
Object last = this.latestHeartbeat.get();
if (value != null && !value.equals(last)) {
return this.latestHeartbeat.compareAndSet(last, value);
}
return false;
}
}

总结

​ 总的来说,nacos这种实现方式,简单有效,但是也不太友好。因为路由是低频变化的,默认的30s刷新时间,会导致清空缓存,所有的路由都需要重新加载(非注册中心的路由也会重新加载)会对性能有一定的损耗,如果能判定nacos路由是否有变化,变化再更新就会最大限度降低缓存重建的问题。

评论