策略回测效率的优化
起因
公司项目里有一个由spring和vert.x开发的异步量化交易程序,上面会运行很多的策略。 需求是对相应的策略进行回测。 基于性能考量,整个策略都是由vert.x里的future组成的异步代码,因此初始回测程序也是一套异步的代码,对策略进行回测。
优化
前期优化
- 将策略回测期间的数据保存到内存,回测结束后再保存,而不是在回测期间写入数据库,这样直接避免了与数据库的大量交互。单个策略平均回测时间从50分钟降到3分多钟。
- 缓存行情数据,避免重复加载,降低回测时间5s。
回测时,会回放整个kline和ticker,不断将其发给策略进行处理,由于将数据给策略是一个异步的过程,需要确保kline和ticker都已经被策略按顺序接收和处理完毕, 因此回测程序无法知道何时应该推送下一条数据,因此,初期采用的代码逻辑如下,将异步future转为同步等待,直到它执行完毕:
future.toCompletionStage().toCompletableFuture().get();
但这个过程涉及到了两个线程的切换,即数据回放线程和策略运行线程。数据回放过程,伴随着两个线程的不间断切换,单个策略2年的时间会涉及到数百万次线程的不断切换,结果就是浪费大量时间在切换线程。
future的优化
熟悉Vert.x的都知道future是异步执行的,但是这个异步现在造成了回测效率的下降,那么有什么方法可以处理呢? 我们知道,单个线程执行计算密集型的效率是很高的,那么是否能将future的异步转换成同步呢?先分析一下代码。
future会由谁来执行呢?查看文档,显然是EventLoopContext
public class EventLoopContext extends ContextImpl {
EventLoopContext(VertxInternal vertx, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, Deployment deployment, CloseFuture closeFuture, ClassLoader tccl, boolean disableTCCL) {
super(vertx, eventLoop, internalBlockingPool, workerPool, deployment, closeFuture, tccl, disableTCCL);
}
void runOnContext(AbstractContext ctx, Handler<Void> action) {
try {
this.nettyEventLoop().execute(() -> {
ctx.dispatch(action);
});
} catch (RejectedExecutionException var4) {
}
}
<T> void emit(AbstractContext ctx, T argument, Handler<T> task) {
EventLoop eventLoop = this.nettyEventLoop();
if (eventLoop.inEventLoop()) {
ContextInternal prev = ctx.beginDispatch();
try {
task.handle(argument);
} catch (Throwable var10) {
this.reportException(var10);
} finally {
ctx.endDispatch(prev);
}
} else {
eventLoop.execute(() -> {
this.emit(ctx, argument, task);
});
}
}
<T> void execute(AbstractContext ctx, T argument, Handler<T> task) {
EventLoop eventLoop = this.nettyEventLoop();
if (eventLoop.inEventLoop()) {
task.handle(argument);
} else {
eventLoop.execute(() -> {
task.handle(argument);
});
}
}
<T> void execute(AbstractContext ctx, Runnable task) {
EventLoop eventLoop = this.nettyEventLoop();
if (eventLoop.inEventLoop()) {
task.run();
} else {
eventLoop.execute(task);
}
}
}
可以发现 Handler 即future被 nettyEventLoop().execute 所处理,即交给Netty的EventLoop线程处理。 那可不可以由当前线程处理,而不交给EventLoop线程处理呢?当然可以,上面的**task.handle(argument);**即使由当前线程直接执行的。 因此可以修改此类为这样
public class EventLoopContext extends ContextImpl {
EventLoopContext(VertxInternal vertx,
EventLoop eventLoop,
WorkerPool internalBlockingPool,
WorkerPool workerPool,
Deployment deployment,
CloseFuture closeFuture,
ClassLoader tccl,
boolean disableTCCL) {
super(vertx, eventLoop, internalBlockingPool, workerPool, deployment, closeFuture, tccl, disableTCCL);
}
@Override
void runOnContext(AbstractContext ctx, Handler<Void> action) {
try {
if (Thread.currentThread() instanceof VertxThread) {
ctx.dispatch(action);
} else {
nettyEventLoop().execute(() -> ctx.dispatch(action));
}
} catch (RejectedExecutionException ignore) {
// Pool is already shut down
}
}
@Override
<T> void emit(AbstractContext ctx, T argument, Handler<T> task) {
EventLoop eventLoop = nettyEventLoop();
if (true) {
ContextInternal prev = ctx.beginDispatch();
try {
task.handle(argument);
} catch (Throwable t) {
reportException(t);
} finally {
ctx.endDispatch(prev);
}
} else {
eventLoop.execute(() -> emit(ctx, argument, task));
}
}
/**
* <ul>
* <li>When the current thread is event-loop thread of this context the implementation will execute the {@code task} directly</li>
* <li>Otherwise the task will be scheduled on the event-loop thread for execution</li>
* </ul>
*/
@Override
<T> void execute(AbstractContext ctx, T argument, Handler<T> task) {
EventLoop eventLoop = nettyEventLoop();
if (true) {
task.handle(argument);
} else {
eventLoop.execute(() -> task.handle(argument));
}
}
@Override
<T> void execute(AbstractContext ctx, Runnable task) {
EventLoop eventLoop = nettyEventLoop();
if (true) {
task.run();
} else {
eventLoop.execute(task);
}
}
}
将条件判断直接改为true,runOnContext改为 ctx.dispatch(action); 这样就改成了同步执行。
EventLoopContext替换
如何将修改过的类替换以来的Jar里的类呢,手动下载再替换?这个方法不太好,每次修改版本都要重新替换,容易遗忘。 有一种比较好的方法,在我们的项目里创建EventLoopContext的包,将此类放到里面就可以了 
这样会随着打包自动生效。原始Jar打包和SpringBoot的FatJar打包都能生效
这里存在一个问题。为什么这样就能替换第三方的Jar呢?请看下回分解 记一次项目替换第三方Jar包类的过程