前言
通過(guò)我之前的Tomcat系列文章,相信看我博客的同學(xué)對(duì)Tomcat應(yīng)該有一個(gè)比較清晰的了解了,在前幾篇博客我們討論了Tomcat在SpringBoot框架中是如何啟動(dòng)的,討論了Tomcat的內(nèi)部組件是如何設(shè)計(jì)以及請(qǐng)求是如何流轉(zhuǎn)的,那么我們這邊博客聊聊Tomcat的異步Servlet,Tomcat是如何實(shí)現(xiàn)異步Servlet的以及異步Servlet的使用場(chǎng)景。
手?jǐn)]一個(gè)異步的Servlet
我們直接借助SpringBoot框架來(lái)實(shí)現(xiàn)一個(gè)Servlet,這里只展示Servlet代碼:
@WebServlet(urlPatterns = "/async",asyncSupported = true)
@Slf4j
public class AsyncServlet extends HttpServlet {
ExecutorService executorService =Executors.newSingleThreadExecutor();
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
//開(kāi)啟異步,獲取異步上下文
final AsyncContext ctx = req.startAsync();
// 提交線程池異步執(zhí)行
executorService.execute(new Runnable() {
@Override
public void run() {
try {
log.info("async Service 準(zhǔn)備執(zhí)行了");
//模擬耗時(shí)任務(wù)
Thread.sleep(10000L);
ctx.getResponse().getWriter().print("async servlet");
log.info("async Service 執(zhí)行了");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//最后執(zhí)行完成后完成回調(diào)。
ctx.complete();
}
});
}
上面的代碼實(shí)現(xiàn)了一個(gè)異步的Servlet,實(shí)現(xiàn)了 doGet
方法注意在SpringBoot中使用需要再啟動(dòng)類(lèi)加上 @ServletComponentScan
注解來(lái)掃描Servlet。既然代碼寫(xiě)好了,我們來(lái)看看實(shí)際運(yùn)行效果。
我們發(fā)送一個(gè)請(qǐng)求后,看到頁(yè)面有響應(yīng),同時(shí),看到請(qǐng)求時(shí)間花費(fèi)了10.05s,那么我們這個(gè)Servlet算是能正常運(yùn)行啦。有同學(xué)肯定會(huì)問(wèn),這不是異步servlet嗎?你的響應(yīng)時(shí)間并沒(méi)有加快,有什么用呢?對(duì),我們的響應(yīng)時(shí)間并不能加快,還是會(huì)取決于我們的業(yè)務(wù)邏輯,但是我們的異步servlet請(qǐng)求后,依賴(lài)于業(yè)務(wù)的異步執(zhí)行,我們可以立即返回,也就是說(shuō),Tomcat的線程可以立即回收,默認(rèn)情況下,Tomcat的核心線程是10,最大線程數(shù)是200,我們能及時(shí)回收線程,也就意味著我們能處理更多的請(qǐng)求,能夠增加我們的吞吐量,這也是異步Servlet的主要作用。
異步Servlet的內(nèi)部原理
了解完異步Servlet的作用后,我們來(lái)看看,Tomcat是如何是先異步Servlet的。其實(shí)上面的代碼,主要核心邏輯就兩部分, final AsyncContext ctx = req.startAsync();
和 ctx.complete();
那我們來(lái)看看他們究竟做了什么?
public AsyncContext startAsync(ServletRequest request,
ServletResponse response) {
if (!isAsyncSupported()) {
IllegalStateException ise =
new IllegalStateException(sm.getString("request.asyncNotSupported"));
log.warn(sm.getString("coyoteRequest.noAsync",
StringUtils.join(getNonAsyncClassNames())), ise);
throw ise;
}
if (asyncContext == null) {
asyncContext = new AsyncContextImpl(this);
}
asyncContext.setStarted(getContext(), request, response,
request==getRequest() && response==getResponse().getResponse());
asyncContext.setTimeout(getConnector().getAsyncTimeout());
return asyncContext;
}
我們發(fā)現(xiàn) req.startAsync();
只是保存了一個(gè)異步上下文,同時(shí)設(shè)置一些基礎(chǔ)信息,比如 Timeout
,順便提一下,這里設(shè)置的默認(rèn)超時(shí)時(shí)間是30S,也就是說(shuō)你的異步處理邏輯超過(guò)30S后就會(huì)報(bào)錯(cuò),這個(gè)時(shí)候執(zhí)行 ctx.complete();
就會(huì)拋出IllegalStateException 異常。
我們來(lái)看看 ctx.complete();
的邏輯
public void complete() {
if (log.isDebugEnabled()) {
logDebug("complete ");
}
check();
request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);
}
//類(lèi):AbstractProcessor
public final void action(ActionCode actionCode, Object param) {
case ASYNC_COMPLETE: {
clearDispatches();
if (asyncStateMachine.asyncComplete()) {
processSocketEvent(SocketEvent.OPEN_READ, true);
}
break;
}
}
//類(lèi):AbstractProcessor
protected void processSocketEvent(SocketEvent event, boolean dispatch) {
SocketWrapperBase<?> socketWrapper = getSocketWrapper();
if (socketWrapper != null) {
socketWrapper.processSocket(event, dispatch);
}
}
//類(lèi):AbstractEndpoint
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
//省略部分代碼
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
return true;
}
所以,這里最終會(huì)調(diào)用 AbstractEndpoint
的 processSocket
方法,之前看過(guò)我前面博客的同學(xué)應(yīng)該有印象, EndPoint
是用來(lái)接受和處理請(qǐng)求的,接下來(lái)就會(huì)交給 Processor
去進(jìn)行協(xié)議處理。
類(lèi):AbstractProcessorLight
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
throws IOException {
//省略部分diam
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
state = dispatch(nextDispatch.getSocketStatus());
} else if (status == SocketEvent.DISCONNECT) {
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
if (state == SocketState.OPEN) {
state = service(socketWrapper);
}
} else if (status == SocketEvent.OPEN_WRITE) {
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ){
state = service(socketWrapper);
} else {
state = SocketState.CLOSED;
}
} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);
return state;
}
這部分是重點(diǎn), AbstractProcessorLight
會(huì)根據(jù) SocketEvent
的狀態(tài)來(lái)判斷是不是要去調(diào)用 service(socketWrapper)
,該方法最終會(huì)去調(diào)用到容器,從而完成業(yè)務(wù)邏輯的調(diào)用,我們這個(gè)請(qǐng)求是執(zhí)行完成后調(diào)用的,肯定不能進(jìn)容器了,不然就是死循環(huán)了,這里通過(guò) isAsync()
判斷,就會(huì)進(jìn)入 dispatch(status)
,最終會(huì)調(diào)用 CoyoteAdapter
的 asyncDispatch
方法
public boolean asyncDispatch(org.apache.coyote.Request req, org.apache.coyote.Response res,
SocketEvent status) throws Exception {
//省略部分代碼
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
boolean success = true;
AsyncContextImpl asyncConImpl = request.getAsyncContextInternal();
try {
if (!request.isAsync()) {
response.setSuspended(false);
}
if (status==SocketEvent.TIMEOUT) {
if (!asyncConImpl.timeout()) {
asyncConImpl.setErrorState(null, false);
}
} else if (status==SocketEvent.ERROR) {
}
if (!request.isAsyncDispatching() && request.isAsync()) {
WriteListener writeListener = res.getWriteListener();
ReadListener readListener = req.getReadListener();
if (writeListener != null && status == SocketEvent.OPEN_WRITE) {
ClassLoader oldCL = null;
try {
oldCL = request.getContext().bind(false, null);
res.onWritePossible();//這里執(zhí)行瀏覽器響應(yīng),寫(xiě)入數(shù)據(jù)
if (request.isFinished() && req.sendAllDataReadEvent() &&
readListener != null) {
readListener.onAllDataRead();
}
} catch (Throwable t) {
} finally {
request.getContext().unbind(false, oldCL);
}
}
}
}
//這里判斷異步正在進(jìn)行,說(shuō)明這不是一個(gè)完成方法的回調(diào),是一個(gè)正常異步請(qǐng)求,繼續(xù)調(diào)用容器。
if (request.isAsyncDispatching()) {
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
Throwable t = (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
if (t != null) {
asyncConImpl.setErrorState(t, true);
}
}
//注意,這里,如果超時(shí)或者出錯(cuò),request.isAsync()會(huì)返回false,這里是為了盡快的輸出錯(cuò)誤給客戶端。
if (!request.isAsync()) {
//這里也是輸出邏輯
request.finishRequest();
response.finishResponse();
}
//銷(xiāo)毀request和response
if (!success || !request.isAsync()) {
updateWrapperErrorCount(request, response);
request.recycle();
response.recycle();
}
}
return success;
}
上面的代碼就是 ctx.complete()
執(zhí)行最終的方法了(當(dāng)然省略了很多細(xì)節(jié)),完成了數(shù)據(jù)的輸出,最終輸出到瀏覽器。
這里有同學(xué)可能會(huì)說(shuō),我知道異步執(zhí)行完后,調(diào)用 ctx.complete()
會(huì)輸出到瀏覽器,但是,第一次doGet請(qǐng)求執(zhí)行完成后,Tomcat是怎么知道不用返回到客戶端的呢?關(guān)鍵代碼在 CoyoteAdapter
中的 service
方法,部分代碼如下:
postParseSuccess = postParseRequest(req, request, res, response);
//省略部分代碼
if (postParseSuccess) {
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
if (request.isAsync()) {
async = true;
} else {
//輸出數(shù)據(jù)到客戶端
request.finishRequest();
response.finishResponse();
if (!async) {
updateWrapperErrorCount(request, response);
//銷(xiāo)毀request和response
request.recycle();
response.recycle();
}
這部分代碼在調(diào)用完 Servlet
后,會(huì)通過(guò) request.isAsync()
來(lái)判斷是否是異步請(qǐng)求,如果是異步請(qǐng)求,就設(shè)置 async = true
。如果是非異步請(qǐng)求就執(zhí)行輸出數(shù)據(jù)到客戶端邏輯,同時(shí)銷(xiāo)毀 request
和 response
。這里就完成了請(qǐng)求結(jié)束后不響應(yīng)客戶端的操作。
為什么說(shuō)Spring Boot的@EnableAsync注解不是異步Servlet
因?yàn)橹皽?zhǔn)備寫(xiě)本篇文章的時(shí)候就查詢(xún)過(guò)很多資料,發(fā)現(xiàn)很多資料寫(xiě)SpringBoot異步編程都是依賴(lài)于 @EnableAsync
注解,然后在 Controller
用多線程來(lái)完成業(yè)務(wù)邏輯,最后匯總結(jié)果,完成返回輸出。這里拿一個(gè)掘金大佬的文章來(lái)舉例《新手也能看懂的 SpringBoot 異步編程指南 》,這篇文章寫(xiě)得很通俗易懂,非常不錯(cuò),從業(yè)務(wù)層面來(lái)說(shuō),確實(shí)是異步編程,但是有一個(gè)問(wèn)題,拋開(kāi)業(yè)務(wù)的并行處理來(lái)說(shuō),針對(duì)整個(gè)請(qǐng)求來(lái)說(shuō),并不是異步的,也就是說(shuō)不能立即釋放Tomcat的線程,從而不能達(dá)到異步Servlet的效果。這里我參考上文也寫(xiě)了一個(gè)demo,我們來(lái)驗(yàn)證下,為什么它不是異步的。
@RestController
@Slf4j
public class TestController {
@Autowired
private TestService service;
@GetMapping("/hello")
public String test() {
try {
log.info("testAsynch Start");
CompletableFuture<String> test1 = service.test1();
CompletableFuture<String> test2 = service.test2();
CompletableFuture<String> test3 = service.test3();
CompletableFuture.allOf(test1, test2, test3);
log.info("test1=====" + test1.get());
log.info("test2=====" + test2.get());
log.info("test3=====" + test3.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "hello";
}
@Service
public class TestService {
@Async("asyncExecutor")
public CompletableFuture<String> test1() throws InterruptedException {
Thread.sleep(3000L);
return CompletableFuture.completedFuture("test1");
}
@Async("asyncExecutor")
public CompletableFuture<String> test2() throws InterruptedException {
Thread.sleep(3000L);
return CompletableFuture.completedFuture("test2");
}
@Async("asyncExecutor")
public CompletableFuture<String> test3() throws InterruptedException {
Thread.sleep(3000L);
return CompletableFuture.completedFuture("test3");
}
}
@SpringBootApplication
@EnableAsync
public class TomcatdebugApplication {
public static void main(String[] args) {
SpringApplication.run(TomcatdebugApplication.class, args);
}
@Bean(name = "asyncExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(3);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("AsynchThread-");
executor.initialize();
return executor;
}
這里我運(yùn)行下,看看效果
這里我請(qǐng)求之后,在調(diào)用容器執(zhí)行業(yè)務(wù)邏輯之前打了一個(gè)斷點(diǎn),然后在返回之后的同樣打了一個(gè)斷點(diǎn),在 Controller
執(zhí)行完之后,請(qǐng)求才回到了 CoyoteAdapter
中,并且判斷 request.isAsync()
,根據(jù)圖中看到,是為 false
,那么接下來(lái)就會(huì)執(zhí)行 request.finishRequest()
和 response.finishResponse()
來(lái)執(zhí)行響應(yīng)的結(jié)束,并銷(xiāo)毀請(qǐng)求和響應(yīng)體。很有趣的事情是,我實(shí)驗(yàn)的時(shí)候發(fā)現(xiàn),在執(zhí)行 request.isAsync()
之前,瀏覽器的頁(yè)面上已經(jīng)出現(xiàn)了響應(yīng)體,這是SpringBoot框架已經(jīng)通過(guò) StringHttpMessageConverter
類(lèi)中的 writeInternal
方法已經(jīng)進(jìn)行輸出了。
以上分析的核心邏輯就是,Tomcat的線程執(zhí)行 CoyoteAdapter
調(diào)用容器后,必須要等到請(qǐng)求返回,然后再判斷是否是異步請(qǐng)求,再處理請(qǐng)求,然后執(zhí)行完畢后,線程才能進(jìn)行回收。而我一最開(kāi)始的異步Servlet例子,執(zhí)行完doGet方法后,就會(huì)立即返回,也就是會(huì)直接到 request.isAsync()
的邏輯,然后整個(gè)線程的邏輯執(zhí)行完畢,線程被回收。
聊聊異步Servlet的使用場(chǎng)景
分析了這么多,那么異步Servlet的使用場(chǎng)景有哪些呢?其實(shí)我們只要抓住一點(diǎn)就可以分析了,就是異步Servlet提高了系統(tǒng)的吞吐量,可以接受更多的請(qǐng)求。假設(shè)web系統(tǒng)中Tomcat的線程不夠用了,大量請(qǐng)求在等待,而此時(shí)Web系統(tǒng)應(yīng)用層面的優(yōu)化已經(jīng)不能再優(yōu)化了,也就是無(wú)法縮短業(yè)務(wù)邏輯的響應(yīng)時(shí)間了,這個(gè)時(shí)候,如果想讓減少用戶的等待時(shí)間,提高吞吐量,可以嘗試下使用異步Servlet。
舉一個(gè)實(shí)際的例子:比如做一個(gè)短信系統(tǒng),短信系統(tǒng)對(duì)實(shí)時(shí)性要求很高,所以要求等待時(shí)間盡可能短,而發(fā)送功能我們實(shí)際上是委托運(yùn)營(yíng)商去發(fā)送的,也就是說(shuō)我們要調(diào)用接口,假設(shè)并發(fā)量很高,那么這個(gè)時(shí)候業(yè)務(wù)系統(tǒng)調(diào)用我們的發(fā)送短信功能,就有可能把我們的Tomcat線程池用完,剩下的請(qǐng)求就會(huì)在隊(duì)列中等待,那這個(gè)時(shí)候,短信的延時(shí)就上去了,為了解決這個(gè)問(wèn)題,我們可以引入異步Servlet,接受更多的短信發(fā)送請(qǐng)求,從而減少短信的延時(shí)。
總結(jié)
這篇文章我從手寫(xiě)一個(gè)異步Servlet來(lái)開(kāi)始,分析了異步Servlet的作用,以及Tomcat內(nèi)部是如何實(shí)現(xiàn)異步Servlet的,然后我也根據(jù)互聯(lián)網(wǎng)上流行的SpringBoot異步編程來(lái)進(jìn)行說(shuō)明,其在Tomcat內(nèi)部并不是一個(gè)異步的Servlet。最后,我談到了異步Servlet的使用場(chǎng)景,分析了什么情況下可以嘗試異步Servlet。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。