在微服务大行其道的今天,Spring Cloud Alibaba作为优秀的微服务实现,却不能很容易的集成ElasticAPM。本文就将解决的思路和实现,呈现给大家,希望能帮助大家。
前言 继上一篇ElasticAPM初体验 我们知道了什么是可观察性 ,并领略了ElasticAPM 的强大功能,但是仅仅是上篇文章中单机模式的使用时远远不够的。还记得上一篇最后提出的两个问题:
1、本文在单机版的环境中,测试通过,但是在分布式环境中,请求会串联起很多应用,那服务跟踪能否实现?实现的原理是什么?
2、Elastic APM可以自动采集http请求,在PRC分布式环境中,Elastic APM能否正常工作?是否必须采用 public API来实现?
重点是分布式 和RPC ,即在分布式情况下,ElasticAPM能否良好工作?在RPC环境下,ElasticAPM是不是也能正常工作呢?
先说答案:在默认情况下,ElasticAPM能够支持分布式的Http方式调用,但是不支持RPC协议 。但是很多公司都采用RPC协议作为其内部系统的通信协议,比如我司就采用Spring Cloud Alibaba作为搜索服务的框架,框架内应用的通信是借助RPC框架Dubbo来实现的。所以问题就变成了如何把ElasticAPM 集成进Spring Cloud Alibaba中。
架构讲解&问题分析 首先,我先大概图示下Spring Cloud Alibaba和ElasticAPM的架构和工作流程。
如架构图所示,搜索系统分为了网关应用(Gateway) ,US应用 ,AS应用 ,BS应用 ,用户的请求会先到达网关,网关会把请求,以Http协议转发给US应用,US应用会采用Dubbo协议调用AS应用,AS应用采用Dubbo协议调用BS应用。
Request —http —>US —RPC —->AS —–RPC ——>BS
每一个应用启动的时候都已经集成了Apm-agent(如果不知道怎么集成请参考ElasticAPM初体验 ),如果APM-agent默认支持Dubbo 就完美了(但是并没有)。所以整个链路追踪,到了US之后,就没有上报之后应用的锚点数据。在查看ElasticAPM官方文档的时候,我注意到了Public API ,文档中交代了这样一件事情:
The public API of the Elastic APM Java agent lets you customize and manually create spans and transactions, as well as track errors.
没错,你可以自定义Span和Transaction,如果不懂什么是Span和Transaction请参考ElasticAPM初体验 或直接读一遍官方文档 。既然Agent默认不支持Dubbo,那么我们使用Public API来实现功能。
设计思想 基于Spring Cloud Alibaba的架构,我们可以如下图方式实现。
首先用户的请求一定要经过微服务网关,在网关的过滤器中,首先埋入父级Transaction。
请求经过网关,会被网关转发到第一层应用中,注意这次转发是http请求,如果是用SpringMVC实现的话,需要在Controller处,上报子Transaction。
请求被第一层应用处理之后,下层的应用全部是Dubbo协议的。这时可以采用Dubbo的过滤器机制,对Concumer和Provider都进行拦截,通过这种方式做到不侵入业务代码。
最终,请求返回到微服务网关,调用transaction.end()上报根Transaction。
所有流程完毕。
核心实现讲解 微服务网关 微服务网关需要做这样几件事情:
开启根Transaction
POST请求body中增加追踪ID,GET请求Parameter中增加追踪ID
在请求返回之后调用transaction.end()完成上报
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 public Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain) { HttpMethod httpMethod = exchange.getRequest().getMethod(); Transaction transaction = ElasticApm.startTransaction(); transaction.setName("mainSearch" ); transaction.setType(Transaction.TYPE_REQUEST); Span span = transaction.startSpan("gateway" , "filter" , "gateway action" ); span.setName("com.mfw.search.gateway.filter.PostBodyCacheFilter#filter" ); LOGGER.info("APM埋点成功transactionId:{}" , transaction.getId()); if (HttpMethod.POST.equals(httpMethod)) { ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders); MediaType mediaType = exchange.getRequest().getHeaders().getContentType(); Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> { if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) { Map<String, String> bodyMap = decodeBody(body); exchange.getAttributes().put(GatewayConstant.CACHE_POST_BODY, bodyMap); span.injectTraceHeaders((name, value) -> { bodyMap.put(name, value); LOGGER.info("APM埋点 key:{}, transactionId:{}" , name, value); }); span.end(); return Mono.just(encodeBody(bodyMap)); } else if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { Map<String, String> bodyMap = decodeJsonBody(body); exchange.getAttributes().put(GatewayConstant.CACHE_POST_BODY, bodyMap); span.injectTraceHeaders((name, value) -> { bodyMap.put(name, value); LOGGER.info("APM埋点 key:{}, transactionId:{}" , name, value); }); span.end(); return Mono.just(encodeJsonBody(bodyMap)); } return Mono.empty(); }); BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class); HttpHeaders headers = new HttpHeaders(); headers.putAll(exchange.getRequest().getHeaders()); headers.remove(HttpHeaders.CONTENT_LENGTH); CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers); return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> { ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) { public HttpHeaders getHeaders () { long contentLength = headers.getContentLength(); HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.putAll(super .getHeaders()); if (contentLength > 0 ) { httpHeaders.setContentLength(contentLength); } else { httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked" ); } return httpHeaders; } public Flux<DataBuffer> getBody () { return outputMessage.getBody(); } }; return chain.filter(exchange.mutate().request(decorator).build()).then(Mono.fromRunnable(() -> transaction.end())); })); } else if (HttpMethod.GET.equals(httpMethod)) { span.injectTraceHeaders((name, value) -> { exchange.getRequest().getQueryParams().set(name, transaction.getId()); LOGGER.info("APM埋点 key:{}, transactionId:{}" , name, value); }); return chain.filter(exchange).then(Mono.fromRunnable(() -> { span.end(); transaction.end(); LOGGER.info("APM买点完成,transactionId:{}" , transaction.getId()); })); } else { exchange.getResponse().setStatusCode(HttpStatus.UNSUPPORTED_MEDIA_TYPE); return exchange.getResponse().setComplete(); } }
Controller Controller层的实现采用了SpringAOP方式实现,这样的好处是对业务代码不侵入,可扩展性高,对想要监控的方法直接配置上@TransactionWithRemoteParent()即可。
如下代码是通过@TransactionWithRemoteParent()实现对Controller方法的上报。
1 2 3 4 5 6 @PostMapping (value = "/search" , consumes = "application/json" , produces = "application/json" )@TransactionWithRemoteParent ()public String searchForm (@RequestBody String req) { String result = asService.helloAs(req); return result; }
AOP实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 @Aspect public class ApmAspect { private static final Logger LOGGER = LoggerFactory.getLogger(ApmAspect.class); @PostConstruct private void init () { LOGGER.info("ApmAspect加载完毕" ); } @Pointcut (value = "@annotation(transactionWithRemoteParent)" , argNames = "transactionWithRemoteParent" ) public void pointcut (TransactionWithRemoteParent transactionWithRemoteParent) { } @Around (value = "pointcut(transactionWithRemoteParent)" , argNames = "joinPoint,transactionWithRemoteParent" ) public Object around (ProceedingJoinPoint joinPoint, TransactionWithRemoteParent transactionWithRemoteParent) throws Throwable { Transaction transaction = null ; try { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); transaction = ElasticApm.startTransactionWithRemoteParent(key -> { String httpRequest = (String) joinPoint.getArgs()[0 ]; JSONObject json = JSON.parseObject(httpRequest); String traceId = json.getString(key); LOGGER.info("切面添加了子Transaction,key={},value={}" , key, traceId); RpcContext.getContext().setAttachment(key, traceId); return traceId; }); transaction.setName(StringUtils.isNotBlank(transactionWithRemoteParent.name()) ? transactionWithRemoteParent.name() : signature.getName()); transaction.setType(Transaction.TYPE_REQUEST); return joinPoint.proceed(); } catch (Throwable throwable) { if (transaction != null ) { transaction.captureException(throwable); } throw throwable; } finally { if (transaction != null ) { LOGGER.info("切面执行完毕,上报Transaction:{}" , transaction.getId()); transaction.end(); } } } }
Dubbo过滤器 如下代码是DubboConsumer过滤器,专门用于处理APM。DubboProvider的实现类似。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Activate (group = "consumer" )public class DubboConsumerApmFilter implements Filter { private static final Logger LOGGER = LoggerFactory.getLogger(DubboConsumerApmFilter.class); @Override public Result invoke (Invoker<?> invoker, Invocation invocation) throws RpcException { Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> { String traceId = invocation.getAttachment(key); LOGGER.info("key={},value={}" , key, traceId); return traceId; }); try (final Scope scope = transaction.activate()) { String name = "consumer:" + invocation.getInvoker().getInterface().getName() + "#" + invocation.getMethodName(); transaction.setName(name); transaction.setType(Transaction.TYPE_REQUEST); Result result = invoker.invoke(invocation); return result; } catch (Exception e) { transaction.captureException(e); throw e; } finally { transaction.end(); } } } @Activate (group = "provider" )public class DubboProviderApmFilter implements Filter { @Override public Result invoke (Invoker<?> invoker, Invocation invocation) throws RpcException { Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> invocation.getAttachment(key)); try (final Scope scope = transaction.activate()) { String name = "provider:" + invocation.getInvoker().getInterface().getName() + "#" + invocation.getMethodName(); transaction.setName(name); transaction.setType(Transaction.TYPE_REQUEST); return invoker.invoke(invocation); } catch (Exception e) { transaction.captureException(e); throw e; } finally { transaction.end(); } } }
效果
源代码 https://github.com/siyuanWang/springCloudAlibabaAPMDemo
参考文档 ElasticAPM集成Dubbo的讨论