16 package org.openkilda.wfm.topology.utils;
21 import org.apache.storm.tuple.Tuple;
22 import org.aspectj.lang.ProceedingJoinPoint;
23 import org.aspectj.lang.annotation.Around;
24 import org.aspectj.lang.annotation.Aspect;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
/**
 * Around advice applied to {@code execute(org.apache.storm.tuple.Tuple)} on implementations of
 * {@code IRichBolt} and {@code IStatefulBolt}; the tuple argument is bound as {@code input}.
 *
 * <p>The advice stores a correlation id in the logging MDC under {@code CORRELATION_ID}, lets the
 * intercepted call proceed with its original arguments, and afterwards puts the previously
 * stored MDC value back or removes the key.
 *
 * @param joinPoint the intercepted call; proceeded with {@code joinPoint.getArgs()}
 * @param input the tuple passed to the intercepted {@code execute} call
 * @return whatever {@code joinPoint.proceed(...)} returns
 * @throws Throwable declared because {@code joinPoint.proceed(...)} is invoked here
 */
36 @Around(
"execution(* org.apache.storm.topology.IRichBolt+.execute(org.apache.storm.tuple.Tuple)) && args(input)" 37 +
"|| execution(* org.apache.storm.topology.IStatefulBolt+.execute(org.apache.storm.tuple.Tuple)) && args(input)")
38 public Object
aroundAdvice(ProceedingJoinPoint joinPoint, Tuple input) throws Throwable {
// NOTE(review): presumably the fallback path taken when no correlation id can be obtained
// from the tuple (see the log message below) - confirm against the code that computes
// correlationId.
42 LOGGER.debug(
"CorrelationId was not sent or can't be extracted for tuple {}", input);
43 return DEFAULT_CORRELATION_ID;
// Remember the value currently stored under CORRELATION_ID before overwriting it, so it can be
// put back once the intercepted call has finished.
47 String previousCorrelationId = MDC.get(CORRELATION_ID);
48 MDC.put(CORRELATION_ID, correlationId);
// Run the intercepted execute(...) with the arguments it was originally called with.
51 return joinPoint.proceed(joinPoint.getArgs());
// Put back the MDC value that was present before this advice ran.
53 if (previousCorrelationId != null) {
54 MDC.put(CORRELATION_ID, previousCorrelationId);
// NOTE(review): looks like the alternative branch for "there was no previous value", so the
// key is dropped instead of restored - confirm.
56 MDC.remove(CORRELATION_ID);
Object aroundAdvice(ProceedingJoinPoint joinPoint, Tuple input)
static Optional< CorrelationContext > extractFrom(Tuple input)
static final String DEFAULT_CORRELATION_ID
static final String CORRELATION_ID