簡介
PHP 的異步進程助手,借助于 AMQP 實現(xiàn)異步執(zhí)行 PHP 的方法,將一些很耗時、追求高可用、需要重試機制的操作放到異步進程中去執(zhí)行,將你的 HTTP 服務從繁重的業(yè)務邏輯中解脫出來。以一個較低的成本將傳統(tǒng) PHP 業(yè)務邏輯轉換成非阻塞、高可用、可擴展的異步模式。
依賴
- php 5.6+
- ext-bcmath
- ext-amqp 1.9.1+
- ext-memcached 3.0.3+
安裝
通過 composer 安裝
composer require l669/async-helper
或直接下載項目源碼
wget https://github.com/l669306630/async-helper/archive/master.zip
使用范例
業(yè)務邏輯:這里定義了很多等待被調用的類和方法,在你的項目中這可能是數(shù)據(jù)模型、或是一個發(fā)送郵件的類。
?php
class SendMailHelper
{
/**
* @param array $mail
* @throws Exception
*/
public static function request($mail)
{
// 在這里發(fā)送郵件,或是通過調用第三方提供的服務發(fā)送郵件
// 發(fā)送失敗的時候你拋出了異常,希望被進程捕獲,并按設定的規(guī)則進行重試
}
}
生產(chǎn)者:通常是 HTTP 服務,傳統(tǒng)的 PHP 項目或是一個命令行程序,接收到某個請求或指令后進行一系列的操作。
?php
use l669\AsyncHelper;
class UserController
{
public function register()
{
// 假設這是一個用戶注冊的請求,用戶提交了姓名、郵箱、驗證碼
// 第一步、校驗用戶信息
// 第二步、實例化異步助手,這時候會連接 AMQP
$async_helper = new AsyncHelper([
'host' => '127.0.0.1',
'port' => '5672',
'user' => 'root',
'pass' => '123456',
'vhost' => '/'
]);
// 第三步、保存用戶信息到數(shù)據(jù)庫
$mail = [
'from' => 'service@yourdomain.com',
'to' => 'username@163.com',
'subject' => '恭喜你注冊成功',
'body' => '請點擊郵件中的鏈接完成驗證....'
];
// 第四步、通過異步助手發(fā)送郵件
$async_helper->run('\\SendMailHelper', 'request', [$mail]);
// 這是同步的模式去發(fā)送郵件,如果郵件服務響應遲緩或異常,就會直接影響該請求的響應時間,甚至丟失這封重要郵件
// SendMailHelper::request($mail);
}
}
消費者:PHP 的異步進程,監(jiān)聽消息隊列,執(zhí)行你指定的方法。并且該消費者進程是可擴展的高可用的服務,這一切都得益于 AMQP,這是系統(tǒng)解耦、布局微服務的最佳方案。
consume.php
?php
require_once('vendor/autoload.php');
require_once('SendMailHelper.php');
use l669\AsyncHelper;
use l669\CacheHelper;
$cache_helper = new CacheHelper('127.0.0.1', 11211);
while(true){
try{
$async_helper = new AsyncHelper([
'host' => '127.0.0.1',
'port' => '5672',
'user' => 'root',
'pass' => '123456',
'vhost' => '/',
'cacheHelper' => $cache_helper
]);
$async_helper->consume();
}catch(Exception $e){
// 可以在這里記錄一些日志
sleep(2);
}
}
# 在命令行下啟動消費者進程,推薦使用 supervisor 來管理進程
php consume.php
支持事務:需要一次提交執(zhí)行多個異步方法,事務可以確保完成性。
// 接著上面的示例來說,這里省略了一些重復的代碼,下同
$async_helper->beginTransaction();
try{
$async_helper->run('\\SendMailHelper', 'request', [$mail1]);
$async_helper->run('\\SendMailHelper', 'request', [$mail2]);
$async_helper->run('\\SendMailHelper', 'request', [$mail3]);
$async_helper->commit();
}catch(\Exception $e){
$async_helper->rollback();
}
阻塞式重試:當異步進程執(zhí)行一個方法,方法內部拋出異常時進行重試,一些必須遵循執(zhí)行順序的業(yè)務就要采用阻塞式的重試,通過指定重試最大阻塞時長來控制。
use l669\CacheHelper;
use l669\AsyncHelper;
$async_helper = new AsyncHelper([
'host' => '127.0.0.1',
'port' => '5672',
'user' => 'root',
'pass' => '123456',
'vhost' => '/',
'cacheHelper' => new CacheHelper('127.0.0.1', 11211),
'retryMode' => AsyncHelper::RETRY_MODE_REJECT, // 阻塞式重試
'maxDuration' => 600 // 最長重試 10 分鐘
]);
$send_mail_helper = new \SendMailHelper();
$mail = new \stdClass();
$mail->from = 'service@yourdomain.com';
$mail->to = 'username@163.com';
$mail->subject = '恭喜你注冊成功';
$mail->body = '請點擊郵件中的鏈接完成驗證....';
$async_helper->run($send_mail_helper, 'request', [$mail]);
// 如果方法中需要拋出異常來結束程序,又不希望被異步進程重試,可以拋出以下幾種錯誤碼,進程捕獲到這些異常后會放棄重試:
// l669\AsyncException::PARAMS_ERROR
// l669\AsyncException::METHOD_DOES_NOT_EXIST
// l669\AsyncException::KNOWN_ERROR
非阻塞式重試:當異步執(zhí)行的方法內部拋出異常,async-helper 會將該方法重新放進隊列的尾部,先執(zhí)行新進入隊列的方法,回頭再重試剛才執(zhí)行失敗的方法,通過指定最大重試次數(shù)來控制。
use l669\CacheHelper;
use l669\AsyncHelper;
$async_helper = new AsyncHelper([
'host' => '127.0.0.1',
'port' => '5672',
'user' => 'root',
'pass' => '123456',
'vhost' => 'new',
'cacheHelper' => new CacheHelper('127.0.0.1', 11211),
'queueName' => 'emails.vip', // 給付費的大爺走 VIP 隊列
'retryMode' => AsyncHelper::RETRY_MODE_TTL, // 非阻塞式重試
'maxRetries' => 10 // 最多重試 10 次
]);
$mail = new \stdClass();
$mail->from = 'service@yourdomain.com';
$mail->to = 'username@163.com';
$mail->subject = '恭喜你注冊成功';
$mail->body = '請點擊郵件中的鏈接完成驗證....';
$async_helper->run('\\SendMailHelper', 'request', [$mail]);
應用和解惑
- 我們采用的是開源的 RabbitMQ 來為我們提供的 AMQP 服務。
- 你的項目部署在擁有很多服務器節(jié)點的集群上,每個節(jié)點的程序都需要寫日志文件,現(xiàn)在的問題就是要收集所有節(jié)點上面的日志到一個地方,方便我們及時發(fā)現(xiàn)問題或是做一些統(tǒng)計。所有節(jié)點都可以使用 async-helper 異步調用一個寫日志的方法,而執(zhí)行這個寫日志的方法的進程只需要在一臺機器上啟動就可以了,這樣所有節(jié)點的日志就都實時掌握在手里了。
- 做過微信公眾號開發(fā)的都知道,騰訊微信可以將用戶的消息推送到我們的服務器,如果我們在 5s 內未及時響應,騰訊微信會重試 3 次,其實這就是消息隊列的應用,使用 async-helper 可以輕松的做和這一樣的事情。
- 得益于 RabbitMQ,你可以輕松的橫向擴展你的消費者進程的能力,因為 RabbitMQ 天生就支持集群部署,你可以輕松的啟動多個消費者進程,或是將消費者進程分布到多臺機器上。
- 如果 RabbitMQ 服務不可用怎么辦呢?部署 RabbitMQ 高可用服務是容易的,對外提供單一 IP,這個 IP 是個負載均衡,背后是 RabbitMQ 集群,負載均衡承擔對后端集群節(jié)點的健康檢查。
- async-helper 能否承受高并發(fā)請求?async-helper 生產(chǎn)者使用的是短連接,也就說在你的 HTTP 還沒有響應瀏覽器的時候 async-helper 就已經(jīng)結束了工作,你連接 RabbitMQ 的時間是百分之百小于 HTTP 請求的時間的,換言之,只要 RabbitMQ 承受并發(fā)的能力超過你的 HTTP 服務的承受并發(fā)的能力,RabbitMQ 就永遠不會崩,通過橫向擴展 RabbitMQ 很容易做到的。
和傳統(tǒng) PHP 相比
- 對任何 PHP 方法通過反射進行異步執(zhí)行;
- 高可用,執(zhí)行方法進入消息隊列,可持久化,即使服務器宕機,執(zhí)行任務也不丟失;
- 高可用,對異常可以進行不限次數(shù)和時間的重試,重試次數(shù)和時間可配置;
- 支持對多個異步方法包含在事務中執(zhí)行,支持回滾事務;
- 方法的參數(shù)類型支持除資源類型(resource)和回調函數(shù)(callable)外的任意類型的參數(shù);
- 得益于 AMQP,異步方法可以承受高并發(fā)、高負載,支持集群部署、橫向擴展;
- 低延時,實測延時時間 0.016 ~ 0.021s;
- 適用于:日常數(shù)據(jù)庫操作、日志收集、金融交易、消息推送、發(fā)送郵件和短信、數(shù)據(jù)導入導出、計算大量數(shù)據(jù)生成報表;