前言
我們經(jīng)常需要根據(jù)用戶(hù)對(duì)自己數(shù)據(jù)的一些操作來(lái)做一些事情.
比如如果用戶(hù)刪除了自己的賬號(hào),我們就給他發(fā)短信罵他,去發(fā)短信求他回來(lái).
類(lèi)似于這種功能,當(dāng)然可以在業(yè)務(wù)邏輯層實(shí)現(xiàn),在收到用戶(hù)的刪除請(qǐng)求之后執(zhí)行這一操作,但是數(shù)據(jù)庫(kù)的binlog為我們提供了另外一種操作方法.
要監(jiān)聽(tīng)binlog,需要兩步,第一步當(dāng)然是你的mysql需要開(kāi)啟這一個(gè)功能,第二個(gè)是要寫(xiě)程序來(lái)對(duì)日志進(jìn)行讀取.
mysql開(kāi)啟binlog.
首先mysql的binlog日常是不打開(kāi)的,因此我們需要:
找到mysql的配置文件my.cnf,這個(gè)因操作系統(tǒng)不一樣,位置也不一定一樣,可以自己找一下,
在其中加入以下內(nèi)容:
[mysqld]
server_id = 1
log-bin = mysql-bin
binlog-format = ROW
之后重啟mysql.
/ ubuntu
service mysql restart
// mac
mysql.server restart
監(jiān)測(cè)是否開(kāi)啟成功
進(jìn)入mysql命令行,執(zhí)行:
show variables like '%log_bin%' ;
如果結(jié)果如下圖,則說(shuō)明成功了:
查看正在寫(xiě)入的binlog狀態(tài):
代碼讀取binlog
引入依賴(lài)
我們使用開(kāi)源的一些實(shí)現(xiàn),這里因?yàn)橐恍┢婀值脑?我選用了mysql-binlog-connector-java這個(gè)包,(官方github倉(cāng)庫(kù))[github.com/shyiko/mysq…]具體依賴(lài)如下:
!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java -->
dependency>
groupId>com.github.shyiko/groupId>
artifactId>mysql-binlog-connector-java/artifactId>
version>0.17.0/version>
/dependency>
當(dāng)然,對(duì)binlog的處理有很多開(kāi)源實(shí)現(xiàn),阿里的cancl就是一個(gè),也可以使用它.
寫(xiě)個(gè)demo
根據(jù)官方倉(cāng)庫(kù)中readme里面,來(lái)簡(jiǎn)單的寫(xiě)個(gè)demo.
public static void main(String[] args) {
BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "passwd");
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
client.setEventDeserializer(eventDeserializer);
client.registerEventListener(new BinaryLogClient.EventListener() {
@Override
public void onEvent(Event event) {
// TODO
dosomething();
logger.info(event.toString());
}
});
client.connect();
}
這個(gè)完全是根據(jù)官方教程里面寫(xiě)的,在onEvent里面可以寫(xiě)自己的業(yè)務(wù)邏輯,由于我只是測(cè)試,所以我在里面將每一個(gè)event都打印了出來(lái).
之后我手動(dòng)登錄到mysql,分別進(jìn)行了增加,修改,刪除操作,監(jiān)聽(tīng)到的log如下:
00:23:13.331 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000001', binlogPosition=886}}
00:23:13.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468403000, eventType=FORMAT_DESCRIPTION, serverId=1, headerLength=19, dataLength=100, nextPosition=0, flags=0}, data=FormatDescriptionEventData{binlogVersion=4, serverVersion='5.7.23-0ubuntu0.16.04.1-log', headerLength=19, dataLength=95}}
00:23:23.715 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=951, flags=0}, data=null}
00:23:23.716 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1021, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:23.721 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1072, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:23.724 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1114, flags=0}, data=WriteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
[[B@546a03af, 2]
]}}
00:23:23.725 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1145, flags=0}, data=XidEventData{xid=28}}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1210, flags=0}, data=null}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1280, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:55.873 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1331, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=1381, flags=0}, data=UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
{before=[[B@6833ce2c, 1], after=[[B@725bef66, 3]}
]}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1412, flags=0}, data=XidEventData{xid=41}}
00:24:22.333 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1477, flags=0}, data=null}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1547, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1598, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1640, flags=0}, data=DeleteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
[[B@1888ff2c, 3]
]}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1671, flags=0}, data=XidEventData{xid=42}}
根據(jù)自己的業(yè)務(wù),封裝一個(gè)更好使,更定制的工具類(lèi)
開(kāi)始的時(shí)候打算貼代碼的,,,但是代碼越寫(xiě)越多,索性傳在github上了,這里只貼部分的實(shí)現(xiàn).代碼傳送門(mén)
實(shí)現(xiàn)思路
- 支持對(duì)單個(gè)表的監(jiān)聽(tīng),因?yàn)槲覀儾幌胝娴膶?duì)所有數(shù)據(jù)庫(kù)中的所有數(shù)據(jù)表進(jìn)行監(jiān)聽(tīng).
- 可以多線(xiàn)程消費(fèi).
- 把監(jiān)聽(tīng)到的內(nèi)容轉(zhuǎn)換成我們喜聞樂(lè)見(jiàn)的形式(文中的數(shù)據(jù)結(jié)構(gòu)不一定很好,我沒(méi)想到更加合適的了).
所以實(shí)現(xiàn)思路大致如下:
- 封裝個(gè)客戶(hù)端,對(duì)外只提供獲取方法,屏蔽掉初始化的細(xì)節(jié)代碼.
- 提供注冊(cè)監(jiān)聽(tīng)器(偽)的方法,可以注冊(cè)對(duì)某個(gè)表的監(jiān)聽(tīng)(重新定義一個(gè)監(jiān)聽(tīng)接口,所有注冊(cè)的監(jiān)聽(tīng)器實(shí)現(xiàn)這個(gè)就好).
- 真正的監(jiān)聽(tīng)器只有客戶(hù)端,他將此數(shù)據(jù)庫(kù)實(shí)例上的所有操作,全部監(jiān)聽(tīng)到并轉(zhuǎn)換成我們想要的格式LogItem放進(jìn)阻塞隊(duì)列里面.
- 啟動(dòng)多個(gè)線(xiàn)程,消費(fèi)阻塞隊(duì)列,對(duì)某一個(gè)LogItem調(diào)用對(duì)應(yīng)的數(shù)據(jù)表的監(jiān)聽(tīng)器,做一些業(yè)務(wù)邏輯.
初始化代碼:
public MysqlBinLogListener(Conf conf) {
BinaryLogClient client = new BinaryLogClient(conf.host, conf.port, conf.username, conf.passwd);
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
client.setEventDeserializer(eventDeserializer);
this.parseClient = client;
this.queue = new ArrayBlockingQueue>(1024);
this.conf = conf;
listeners = new ConcurrentHashMap>();
dbTableCols = new ConcurrentHashMap>();
this.consumer = Executors.newFixedThreadPool(consumerThreads);
}
注冊(cè)代碼:
public void regListener(String db, String table, BinLogListener listener) throws Exception {
String dbTable = getdbTable(db, table);
Class.forName("com.mysql.jdbc.Driver");
// 保存當(dāng)前注冊(cè)的表的colum信息
Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd);
MapString, Colum> cols = getColMap(connection, db, table);
dbTableCols.put(dbTable, cols);
// 保存當(dāng)前注冊(cè)的listener
ListBinLogListener> list = listeners.getOrDefault(dbTable, new ArrayList>());
list.add(listener);
listeners.put(dbTable, list);
}
在這個(gè)步驟中,我們?cè)谧?cè)監(jiān)聽(tīng)者的同時(shí),獲得了該表的schema信息,并保存到map里面去,方便后續(xù)對(duì)數(shù)據(jù)進(jìn)行處理.
監(jiān)聽(tīng)代碼:
@Override
public void onEvent(Event event) {
EventType eventType = event.getHeader().getEventType();
if (eventType == EventType.TABLE_MAP) {
TableMapEventData tableData = event.getData();
String db = tableData.getDatabase();
String table = tableData.getTable();
dbTable = getdbTable(db, table);
}
// 只處理添加刪除更新三種操作
if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {
if (isWrite(eventType)) {
WriteRowsEventData data = event.getData();
for (Serializable[] row : data.getRows()) {
if (dbTableCols.containsKey(dbTable)) {
LogItem e = LogItem.itemFromInsert(row, dbTableCols.get(dbTable));
e.setDbTable(dbTable);
queue.add(e);
}
}
}
}
}
我偷懶了,,,這里面只實(shí)現(xiàn)了對(duì)添加操作的處理,其他操作沒(méi)有寫(xiě).
消費(fèi)代碼:
public void parse() throws IOException {
parseClient.registerEventListener(this);
for (int i = 0; i consumerThreads; i++) {
consumer.submit(() -> {
while (true) {
if (queue.size() > 0) {
try {
LogItem item = queue.take();
String dbtable = item.getDbTable();
listeners.get(dbtable).forEach(l -> {
l.onEvent(item);
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Thread.sleep(1000);
}
});
}
parseClient.connect();
}
消費(fèi)時(shí),從隊(duì)列中獲取item,之后獲取對(duì)應(yīng)的一個(gè)或者多個(gè)監(jiān)聽(tīng)者,分別消費(fèi)這個(gè)item.
測(cè)試代碼:
public static void main(String[] args) throws Exception {
Conf conf = new Conf();
conf.host = "hostname";
conf.port = 3306;
conf.username = conf.passwd = "hhsgsb";
MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf);
mysqlBinLogListener.parseArgsAndRun(args);
mysqlBinLogListener.regListener("pf", "student", item -> {
System.out.println(new String((byte[])item.getAfter().get("name")));
logger.info("insert into {}, value = {}", item.getDbTable(), item.getAfter());
});
mysqlBinLogListener.regListener("pf", "teacher", item -> System.out.println("teacher ===="));
mysqlBinLogListener.parse();
}
在這段很少的代碼里,注冊(cè)了兩個(gè)監(jiān)聽(tīng)者,分別監(jiān)聽(tīng)student和teacher表,并分別進(jìn)行打印處理,經(jīng)測(cè)試,在teacher表插入數(shù)據(jù)時(shí),可以獨(dú)立的運(yùn)行定義的業(yè)務(wù)邏輯.
注意:這里的工具類(lèi)并不能直接投入使用,因?yàn)槔锩嬗性S多的異常處理沒(méi)有做,且功能僅監(jiān)聽(tīng)了插入語(yǔ)句,可以用來(lái)做實(shí)現(xiàn)的參考.
參考文章
- github.com/shyiko/mysq…
- https://www.jb51.net/article/166761.htm
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)腳本之家的支持。
您可能感興趣的文章:- MySQL系列之redo log、undo log和binlog詳解
- MySQL binlog_ignore_db 參數(shù)的具體使用
- MySQL中使用binlog時(shí)格式該如何選擇
- 詳解監(jiān)聽(tīng)MySQL的binlog日志工具分析:Canal
- MySQL8.0中binlog的深入講解
- MYSQL中binlog優(yōu)化的一些思考匯總
- Mysql數(shù)據(jù)庫(kù)清理binlog日志命令詳解
- 如何區(qū)分MySQL的innodb_flush_log_at_trx_commit和sync_binlog