本文實例講述了php測試kafka項目。分享給大家供大家參考,具體如下:
概述
Kafka是最初由Linkedin公司開發(fā),是一個分布式、分區(qū)的、多副本的、多訂閱者,基于zookeeper協(xié)調(diào)的分布式日志系統(tǒng)(也可以當做MQ系統(tǒng)),常見可以用于web/nginx日志、訪問日志,消息服務(wù)等等,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源項目。
主要應(yīng)用場景是:日志收集系統(tǒng)和消息系統(tǒng)。
安裝kafka-php項目依賴
composer require nmred/kafka-php
produce.php
?php
require './vendor/autoload.php';
date_default_timezone_set('PRC');
$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setBrokerVersion('0.10.2.1');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(function() {
$t = time();
return array(
array(
'topic' => 'test',
'value' => $t,
'key' => $t,
),
);
});
$producer->success(function($result) {
var_export($result);
});
$producer->error(function($errorCode) {
var_dump('error', $errorCode);
});
$producer->send();
consumer.php
?php
require './vendor/autoload.php';
date_default_timezone_set('PRC');
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('test');
$config->setBrokerVersion('0.10.2.1');
$config->setTopics(array('test'));
$consumer = new \Kafka\Consumer();
$consumer->start(function($topic, $part, $message) {
var_dump($message);
});
測試生產(chǎn)者
測試消費者
更多關(guān)于PHP相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《php面向?qū)ο蟪绦蛟O(shè)計入門教程》、《PHP數(shù)組(Array)操作技巧大全》、《PHP基本語法入門教程》、《PHP運算與運算符用法總結(jié)》、《php字符串(string)用法總結(jié)》、《php+mysql數(shù)據(jù)庫操作入門教程》及《php常見數(shù)據(jù)庫操作技巧匯總》
希望本文所述對大家PHP程序設(shè)計有所幫助。
您可能感興趣的文章:- 使用 PHP Masked Package 屏蔽敏感數(shù)據(jù)的實現(xiàn)方法
- 完美解決phpdoc導(dǎo)出文檔中@package的warning及Error的錯誤
- 利用ThinkPHP內(nèi)置的ThinkAjax實現(xiàn)異步傳輸技術(shù)的實現(xiàn)方法
- PHP擴展之kafka安裝應(yīng)用案例詳解