Vert.x使用Context在同一请求里隐式传递数据
问题
spring里面可以使用ThreadLocal在同一请求里隐式传递数据。 但在Vert.x里,由于是采用Eventloop来异步处理请求,同一个线程会处理许多的请求,使用ThreadLocal来传递数据会导致数据错乱。那么有什么方法可以实现这一要求呢?
context分析
Vert.x里存在一个Context的核心接口
/**
执行的 Handler 执行上下文。
当 Vert.x 向处理程序提供事件或调用 的 start 或 stop 方法 Verticle时,执行与 Context.
通常,上下文是*事件循环上下文*,并且绑定到特定的事件循环线程。因此,该上下文的执行始终发生在完全相同的事件循环线程上。
在工作线程顶点和运行内联阻塞代码的情况下,工作线程上下文将与执行相关联,该执行将使用工作线程池中的线程。
当处理程序由与特定上下文关联的线程设置时,Vert.x 将保证在执行该处理程序时,该执行将与同一上下文相关联。
如果处理程序由与上下文无关的线程(即非 Vert.x 线程)设置。然后,将为该处理程序创建一个新上下文。
换言之,上下文是传播的。
这意味着,当部署顶点时,它设置的任何处理程序都将与相同的上下文相关联 - 顶点的上下文。
这意味着(在标准顶点的情况下)顶点代码将始终使用完全相同的线程执行,因此您不必担心多线程访问顶点状态,并且可以将应用程序编码为单线程。
此类还允许任意数据 put 在上下文中, get 因此可以很容易地在不同的处理程序之间共享,例如,顶点实例。
此类还提供 runOnContext 允许使用相同上下文异步执行操作的功能。
*/
public interface Context {
<T> T get(Object key);
void put(Object key, Object value);
boolean remove(Object key);
<T> T getLocal(Object key);
/**
* Put some local data in the context.
* <p>
* This can be used to share data between different handlers that share a context
*
* @param key the key of the data
* @param value the data
*/
void putLocal(Object key, Object value);
boolean removeLocal(Object key);
}
注释里的此类还允许任意数据 put 在上下文中, get 因此可以很容易地在不同的处理程序之间共享,例如,顶点实例。 表示在同一Verticle可以使用 context.put共享数据 那么在同一请求里呢,也有类似的方法吗,有,即putLocal,此方法注释为向context放置本地数据
context的实现类
ContextInternal extends Context
ContextBase implements ContextInternal
EventLoopContext extends ContextBase
DuplicatedContext implements ContextInternal
其中ContextBase里有此方法,复制此context,返回DuplicatedContext
/**
返回与此上下文的上下文共享
相同的并发性
相同的异常处理程序
相同的上下文数据
相同的部署
相同的配置
相同的类加载器
重复的上下文有其自身的上下文
本地上下文数据
工作线程任务队列
返回:
此上下文的副本
*/
@Override
public ContextInternal duplicate() {
return new DuplicatedContext(this);
}
那么此方法会在哪里调用呢?查找后发现调用处有2个,一个是eventBus处理消息,一个是http消息处理。 
http请求处理中的context变换
查看 HttpServerImpl 代码,发现
@Override
protected Handler<Channel> childHandler(ContextInternal context, SocketAddress address, SSLHelper sslHelper) {
EventLoopContext connContext;
if (context instanceof EventLoopContext) {
connContext = (EventLoopContext) context;
} else {
connContext = vertx.createEventLoopContext(context.nettyEventLoop(), context.workerPool(), context.classLoader());
}
String host = address.isInetSocket() ? address.host() : "localhost";
int port = address.port();
String serverOrigin = (options.isSsl() ? "https" : "http") + "://" + host + ":" + port;
HttpServerConnectionHandler hello = new HttpServerConnectionHandler(this, requestStream.handler, invalidRequestHandler, wsStream.handler, connectionHandler, exceptionHandler == null ? DEFAULT_EXCEPTION_HANDLER : exceptionHandler);
Supplier<ContextInternal> streamContextSupplier = context::duplicate;
return new HttpServerWorker(
connContext,
streamContextSupplier,
this,
vertx,
sslHelper,
options,
serverOrigin,
disableH2c,
hello,
hello.exceptionHandler);
}
childHandler 方法返回了Handler<Channel>,其实例为HttpServerWorker,里面传入了 Supplier<ContextInternal> streamContextSupplier = context::duplicate; 继续跟进 HttpServerWorker
void configureHttp1(ChannelPipeline pipeline) {
if (!server.requestAccept()) {
sendServiceUnavailable(pipeline.channel());
return;
}
HttpServerMetrics metrics = (HttpServerMetrics) server.getMetrics();
VertxHandler<Http1xServerConnection> handler = VertxHandler.create(chctx -> {
Http1xServerConnection conn = new Http1xServerConnection(
streamContextSupplier,
sslHelper,
options,
chctx,
context,
serverOrigin,
metrics);
return conn;
});
pipeline.addLast("handler", handler);
Http1xServerConnection conn = handler.getConnection();
if (metrics != null) {
conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName()));
}
connectionHandler.handle(conn);
}
发现 传入的 streamContextSupplier 封装到了Http1xServerConnection,然后调用 connectionHandler.handle(conn), 而 connectionHandler 为上面传入的 HttpServerConnectionHandler hello = new HttpServerConnectionHandler(this, requestStream.handler, invalidRequestHandler, wsStream.handler, connectionHandler, exceptionHandler == null ? DEFAULT_EXCEPTION_HANDLER : exceptionHandler); 进入HttpServerConnectionHandler的handle方法
@Override
public void handle(HttpServerConnection conn) {
Handler<HttpServerRequest> requestHandler = this.requestHandler;
if (HttpServerImpl.DISABLE_WEBSOCKETS) {
// As a performance optimisation you can set a system property to disable WebSockets altogether which avoids
// some casting and a header check
} else {
if (conn instanceof Http1xServerConnection) {
requestHandler = new Http1xServerRequestHandler(this);
Http1xServerConnection c = (Http1xServerConnection) conn;
initializeWebSocketExtensions(c.channelHandlerContext().pipeline());
}
}
conn.exceptionHandler(exceptionHandler);
conn.handler(requestHandler);
conn.invalidRequestHandler(invalidRequestHandler);
if (connectionHandler != null) {
// We hand roll event-loop execution in case of a worker context
ContextInternal ctx = conn.getContext();
ContextInternal prev = ctx.beginDispatch();
try {
connectionHandler.handle(conn);
} catch (Exception e) {
ctx.reportException(e);
} finally {
ctx.endDispatch(prev);
}
}
}
可以看到调用ContextInternal ctx = conn.getContext();获取了传入的DuplicatedContext,然后调用ctx.beginDispatch();和ctx.endDispatch(prev); 将connectionHandler.handle(conn);即http处理过程包裹起来,使整个请求处理过程的上下文变成DuplicatedContext,请求处理完成后,还原为之前的上下文。 因此我们可以在请求handler里获取到DuplicatedContext,将需要传递的数据调用putLocal传递。
使用demo
public class MainVerticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(MainVerticle.class);
@Override
public void start(Promise<Void> startPromise) {
HttpServer server = vertx.createHttpServer();
Router router = Router.router(vertx);
router.get("/").handler(ctx -> {
// 获取vertx请求相关的 DuplicatedContext
Context reqContext = Vertx.currentContext();
HttpServerRequest request = ctx.request();
String user = request.getParam("user","hello");
// putLocal ctx 到 context
reqContext.putLocal("ctx", ctx);
// putLocal 参数 user 到 context
reqContext.putLocal("user", user);
handleReq().onComplete(s -> {
HttpServerResponse response = ctx.response();
response.end("ok");
});
});
server.requestHandler(router).listen(18080)
.onSuccess(r -> {
startPromise.complete();
logger.info("Server started on port {}", 18080);
})
.onFailure(err -> {
logger.warn("Failed to start", err);
startPromise.fail(err);
});
}
private Future<Void> handleReq() {
return Future.future(f -> {
Context reqContext = Vertx.currentContext();
RoutingContext ctx = reqContext.getLocal("ctx");
String user = reqContext.getLocal("user");
f.complete();
});
}
}
此demo里将请求的RoutingContext和参数隐式传递到了handleReq()方法里面,实现了类似spring里同步请求使用ThreadLocal传递数据的效果,而且没有ThreadLocal的相关安全问题,是不是很方便?
总结
Vert.x里web和EventBus的处理都有自己的上下文,可以用来隐式传递数据,例如请求数据,链路追踪数据,日志Mdc等功能。