主頁 > 知識庫 > Laravel中Kafka的使用詳解

Laravel中Kafka的使用詳解

熱門標簽:常州地圖標注服務商 衡水外呼系統(tǒng)平臺 新河科技智能外呼系統(tǒng)怎么樣 福州人工外呼系統(tǒng)哪家強 百度商鋪地圖標注 地圖標注平臺怎么給錢注冊 注冊400電話申請 安裝電銷外呼系統(tǒng) 釘釘打卡地圖標注

本文并沒有kafka的安裝教程,本文是針對已經安裝kafka及其配置好kafka的php拓展并且使用laravel框架進行開發(fā)項目,配置一個可供laravel框架使用的生產及消費者類.

以下代碼修改自本站的YII框架關于kafka類的代碼,經過測試使用在本人的項目中,可正常運行,larvael版本:5.6 代碼放置larvael框架位置:app/Tools/Kafka.php

?php
namespace App\Tools;
 
use Illuminate\Config\Repository;
 
use Illuminate\Support\Facades\DB;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
 
use Illuminate\Http\Request;
 
class Kafka
{
  public $broker_list = '127.0.0.1';//配置kafka,可以用逗號隔開多個kafka
  public $topic = 'test';//管道名稱
  public $partition = 0;
 
  protected $producer = null;
  protected $consumer = null;
 
  public function __construct()
  {
    if (empty($this->broker_list)) {
      throw new InvalidConfigException("broker not config");
    }
    $rk = new \RdKafka\Producer();
    if (empty($rk)) {
      throw new InvalidConfigException("producer error");
    }
    $rk->setLogLevel(LOG_DEBUG);
    if (!$rk->addBrokers($this->broker_list)) {
      throw new InvalidConfigException("producer error");
    }
    $this->producer = $rk;
  }
 
  /**
   * 生產者
   * @param array $messages
   * @return mixed
   */
  public function send($messages = [],$topic)
  {
    $topic = $this->producer->newTopic($topic);
    return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
  }
 
  /**
   * 消費者
   */
  public function consumer($object, $callback){
    $conf = new \RdKafka\Conf();
    $conf->set('group.id', 0);
    $conf->set('metadata.broker.list', $this->broker_list);
 
    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');
 
    $conf->setDefaultTopicConf($topicConf);
 
    $consumer = new \RdKafka\KafkaConsumer($conf);
 
    $consumer->subscribe([$this->topic]);
 
    echo "waiting for messages.....\n";
    while(true) {
      $message = $consumer->consume(120*1000);
      switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
          echo "message payload....";
          $object->$callback($message->payload);
          break;
      }
      sleep(1);
    }
  }
}
?>

在控制器中如何使用:

首先再頭部導入這個類:use App\Tools\Kafka;

下面是使用生產者實例:

public function test(){
 
   $topic = 'tool';//輸入使用管道名稱
   $data['shop_id'] = 58;
   $data['bar_code']=586;
   $data['goods_num'] = 1;
   $data['goods_unit'] = '個';
 
$Kafka = new Kafka();
$Error_Msg = $Kafka->send($data,$topic);//傳入數(shù)組會自動轉換json
var_dump($Error_Msg);
 
 
  }

下面是消費者實例,消費者我這里使用了的是php腳本進行的操作:

?php
 
$conf = new RdKafka\Conf();
 
$conf->set('group.id', 'myConsumerGroup');
 
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("localhost:9092");
 
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', sys_get_temp_dir());
$topicConf->set('auto.offset.reset', 'smallest');
 
$topic = $rk->newTopic("tool", $topicConf);//讀取的管道
 
// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
 
while (true) {
  $message = $topic->consume(0, 120*10000);
  switch ($message->err) {
    case RD_KAFKA_RESP_ERR_NO_ERROR:
    //沒有錯誤打印信息
      $message = json_decode(json_encode($message),true);
      $data = json_decode($message['payload'],true);
      var_dump($data);
      break;
    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
      echo "等待接收信息\n";
      break;
    case RD_KAFKA_RESP_ERR__TIMED_OUT:
      echo "超時\n";
      break;
    default:
      throw new \Exception($message->errstr(), $message->err);
      break;
  }
 sleep(1);
}
 
?>

到此這篇關于Laravel中Kafka的使用詳解的文章就介紹到這了,更多相關Laravel中Kafka內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:
  • 淺談Laravel中使用Slack進行異常通知
  • 如何用Laravel包含你自己的幫助函數(shù)
  • 詳解Laravel框架的依賴注入功能
  • php+laravel 掃碼二維碼簽到功能
  • laravel的數(shù)據(jù)表填充器使用詳解
  • laravel ajax curd 搜索登錄判斷功能的實現(xiàn)
  • laravel使用redis隊列實例講解
  • Laravel的加密解密與哈希實例講解
  • Laravel中10個有用的用法小結
  • 分析五個Laravel Dusk的使用技巧

標簽:克拉瑪依 唐山 遼陽 鶴崗 鷹潭 柳州 白城 六安

巨人網絡通訊聲明:本文標題《Laravel中Kafka的使用詳解》,本文關鍵詞  Laravel,中,Kafka,的,使用,詳解,;如發(fā)現(xiàn)本文內容存在版權問題,煩請?zhí)峁┫嚓P信息告之我們,我們將及時溝通與處理。本站內容系統(tǒng)采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《Laravel中Kafka的使用詳解》相關的同類信息!
  • 本頁收集關于Laravel中Kafka的使用詳解的相關信息資訊供網民參考!
  • 推薦文章