本文并沒有kafka的安裝教程,本文是針對已經安裝kafka及其配置好kafka的php拓展并且使用laravel框架進行開發(fā)項目,配置一個可供laravel框架使用的生產及消費者類.
以下代碼修改自本站的YII框架關于kafka類的代碼,經過測試使用在本人的項目中,可正常運行,larvael版本:5.6 代碼放置larvael框架位置:app/Tools/Kafka.php
?php
namespace App\Tools;
use Illuminate\Config\Repository;
use Illuminate\Support\Facades\DB;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
use Illuminate\Http\Request;
class Kafka
{
public $broker_list = '127.0.0.1';//配置kafka,可以用逗號隔開多個kafka
public $topic = 'test';//管道名稱
public $partition = 0;
protected $producer = null;
protected $consumer = null;
public function __construct()
{
if (empty($this->broker_list)) {
throw new InvalidConfigException("broker not config");
}
$rk = new \RdKafka\Producer();
if (empty($rk)) {
throw new InvalidConfigException("producer error");
}
$rk->setLogLevel(LOG_DEBUG);
if (!$rk->addBrokers($this->broker_list)) {
throw new InvalidConfigException("producer error");
}
$this->producer = $rk;
}
/**
* 生產者
* @param array $messages
* @return mixed
*/
public function send($messages = [],$topic)
{
$topic = $this->producer->newTopic($topic);
return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
}
/**
* 消費者
*/
public function consumer($object, $callback){
$conf = new \RdKafka\Conf();
$conf->set('group.id', 0);
$conf->set('metadata.broker.list', $this->broker_list);
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$conf->setDefaultTopicConf($topicConf);
$consumer = new \RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$this->topic]);
echo "waiting for messages.....\n";
while(true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo "message payload....";
$object->$callback($message->payload);
break;
}
sleep(1);
}
}
}
?>
在控制器中如何使用:
首先再頭部導入這個類:use App\Tools\Kafka;
下面是使用生產者實例:
public function test(){
$topic = 'tool';//輸入使用管道名稱
$data['shop_id'] = 58;
$data['bar_code']=586;
$data['goods_num'] = 1;
$data['goods_unit'] = '個';
$Kafka = new Kafka();
$Error_Msg = $Kafka->send($data,$topic);//傳入數(shù)組會自動轉換json
var_dump($Error_Msg);
}
下面是消費者實例,消費者我這里使用了的是php腳本進行的操作:
?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("localhost:9092");
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', sys_get_temp_dir());
$topicConf->set('auto.offset.reset', 'smallest');
$topic = $rk->newTopic("tool", $topicConf);//讀取的管道
// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0, 120*10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
//沒有錯誤打印信息
$message = json_decode(json_encode($message),true);
$data = json_decode($message['payload'],true);
var_dump($data);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "等待接收信息\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "超時\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
sleep(1);
}
?>
到此這篇關于Laravel中Kafka的使用詳解的文章就介紹到這了,更多相關Laravel中Kafka內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- 淺談Laravel中使用Slack進行異常通知
- 如何用Laravel包含你自己的幫助函數(shù)
- 詳解Laravel框架的依賴注入功能
- php+laravel 掃碼二維碼簽到功能
- laravel的數(shù)據(jù)表填充器使用詳解
- laravel ajax curd 搜索登錄判斷功能的實現(xiàn)
- laravel使用redis隊列實例講解
- Laravel的加密解密與哈希實例講解
- Laravel中10個有用的用法小結
- 分析五個Laravel Dusk的使用技巧