主頁 > 知識庫 > PostgreSQL 數據同步到ES 搭建操作

PostgreSQL 數據同步到ES 搭建操作

熱門標簽:海豐有多少商家沒有地圖標注 合肥公司外呼系統運營商 外呼調研系統 打電話智能電銷機器人授權 漯河外呼電話系統 美容工作室地圖標注 地圖標注和圖片名稱的區別 辦公外呼電話系統 重慶自動外呼系統定制

安裝python 和dev 開發包

[root@rtm2 Packages]# rpm -ivh python-devel-2.7.5-58.el7.x86_64.rpm
準備中...       ################################# [100%]
正在升級/安裝...
 1:python-devel-2.7.5-58.el7  ################################# [100%]
[root@rtm2 Packages]# ls

安裝 multicorn

[root@rtm2 multicorn-1.3.5]# make
Python version is 2.7
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/errors.o src/errors.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/python.o src/python.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/query.o src/query.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/multicorn.o src/multicorn.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -shared -o multicorn.so src/errors.o src/python.o src/query.o src/multicorn.o -L/opt/pgsql-10/lib -Wl,--as-needed -Wl,-rpath,'/opt/pgsql-10/lib',--enable-new-dtags -lpthread -ldl -lutil -lm -lpython2.7 -lpthread -ldl -lutil -lm -lpython2.7 -Xlinker -export-dynamic
.//preflight-check.sh
cp sql/multicorn.sql sql/multicorn--1.3.5.sql
[root@rtm2 multicorn-1.3.5]# make install
Python version is 2.7
...

安裝pg-es-fdw-master

[root@rtm2 multicorn-1.3.5]# cd ../pg-es-fdw-master
[root@rtm2 pg-es-fdw-master]# ls
demo.sh dite LICENSE README.md setup.py
[root@rtm2 pg-es-fdw-master]# python setup.py build
running build
running build_py
creating build
creating build/lib
creating build/lib/dite
copying dite/__init__.py -> build/lib/dite
[root@rtm2 pg-es-fdw-master]# python setup.py install
running install
running bdist_egg
running egg_info
creating dite.egg-info
writing dite.egg-info/PKG-INFO

安裝插件 multicorn

[postgres@rtm2 ~]$ psql
psql (10.3)
Type "help" for help.
postgres=# select * from pg_extension;
 extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition
---------+----------+--------------+----------------+------------+-----------+--------------
 plpgsql |  10 |   11 | f    | 1.0  |   |
(1 row)
postgres=# CREATE EXTENSION multicorn;
CREATE EXTENSION
postgres=# psql
postgres=# select * from pg_extension;
 extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition
-----------+----------+--------------+----------------+------------+-----------+--------------
 plpgsql |  10 |   11 | f    | 1.0  |   |
 multicorn |  10 |   2200 | t    | 1.3.5  |   |
(2 rows)
postgres=# CREATE SERVER multicorn_es FOREIGN DATA WRAPPER multicorn OPTIONS(wrapper 'dite.ElasticsearchFDW');
CREATE SERVER
postgres=#

es

[root@rtm2 config]# vi elasticsearch.yml
node.name: "es-node1"
network.host: 192.168.31.121
discovery.zen.ping.unicast.hosts: ["192.168.31.121"]
[root@rtm2 config]# vi /etc/sysctl.conf
vm.max_map_count=262144
sysctl -p
[root@rtm2 config]# vi /etc/security/limits.conf
# End of file
 root soft nofile 65536
root hard nofile 65536
root soft nproc 4096
root hard nproc 4096
~

啟動es

[root@rtm2 bin]# ls
elasticsearch  elasticsearch.in.bat elasticsearch-service-mgr.exe elasticsearch-service-x86.exe plugin.bat
elasticsearch.bat elasticsearch.in.sh elasticsearch-service-x64.exe plugin       service.bat
[root@rtm2 bin]# ./bin/elasticsearch
test=# CREATE FOREIGN TABLE pp_es (id bigint,age bigint) SERVER multicorn_es OPTIONS (host
test(# '192.168.31.121', port '9200', node 'es-node1', index 'pp');
CREATE FOREIGN TABLE
test=#

創建觸發器和外部表

test=# CREATE OR REPLACE FUNCTION index_pp() RETURNS trigger AS $def$
test$# BEGIN
test$# INSERT INTO pp_es (id, age) VALUES
test$# (NEW.id, NEW.age);
test$# RETURN NEW;
test$# END;
test$# $def$ LANGUAGE plpgsql;
CREATE FUNCTION
test=# CREATE TRIGGER es_insert_pp AFTER INSERT ON pp FOR EACH ROW EXECUTE PROCEDURE index_pp();
CREATE TRIGGER
test=#

新增數據測試

test=# insert into pp (id,age) values (1,11);
INSERT 0 1
test=# select * from pp;
 id | age
----+-----
 1 | 11
(1 row)
test=#

檢查es數據

[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&pretty'
{
 "took" : 104,
 "timed_out" : false,
 "_shards" : {
 "total" : 5,
 "successful" : 5,
 "failed" : 0
 },
 "hits" : {
 "total" : 2,
 "max_score" : 1.0,
 "hits" : [ {
  "_index" : "es-node1",
  "_type" : "pp",
  "_id" : "1",
  "_score" : 1.0,
  "_source":{"age": "11"}
 }, {
  "_index" : "es-node1",
  "_type" : "pp",
  "_id" : "2",
  "_score" : 1.0,
  "_source":{"age": "22"}
 } ]
 }
}
[root@rtm2 ~]#

創建更新觸發器

test=# CREATE OR REPLACE FUNCTION updadeIndex_pp() RETURNS trigger AS $def$
BEGIN
UPDATE pp_es SET
id = NEW.id,
age = NEW.age
where id =NEW.id;
RETURN NEW;
END;
$def$ LANGUAGE plpgsql;
CREATE FUNCTION
test=# ^C
test=#
test=# CREATE TRIGGER es_update_pp AFTER UPDATE OF id, age ON pp FOR EACH ROW WHEN (OLD.* IS DISTINCT
test(# FROM NEW.*)EXECUTE PROCEDURE updadeIndex_pp();
CREATE TRIGGER
test=#

更新表數據

test=# select * from pp;
 id | age
----+-----
 1 | 11
 2 | 22
 3 | 22
(3 rows)
test=# update pp a set a.age = 33 where a.id = 3;
ERROR: column "a" of relation "pp" does not exist
LINE 1: update pp a set a.age = 33 where a.id = 3;
      ^
test=# update pp set age = 33 where id = 3;
UPDATE 1
test=# select * from pp;
 id | age
----+-----
 1 | 11
 2 | 22
 3 | 33
(3 rows)
test=#

es查詢變更

[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&pretty'
{
 "took" : 4,
 "timed_out" : false,
 "_shards" : {
 "total" : 5,
 "successful" : 5,
 "failed" : 0
 },
 "hits" : {
 "total" : 3,
 "max_score" : 1.0,
 "hits" : [ {
  "_index" : "es-node1",
  "_type" : "pp",
  "_id" : "1",
  "_score" : 1.0,
  "_source":{"age": "11"}
 }, {
  "_index" : "es-node1",
  "_type" : "pp",
  "_id" : "2",
  "_score" : 1.0,
  "_source":{"age": "22"}
 }, {
  "_index" : "es-node1",
  "_type" : "pp",
  "_id" : "3",
  "_score" : 1.0,
  "_source":{"age": "33"}
 } ]
 }
}
[root@rtm2 ~]# 

補充:logstash同步pgsql數據到Elasticsearch

一、對于logstash的配置我就不再多說,主要是三部分:input、filter、output的配置

二、配置步驟

1、input配置

input {
 stdin {
 }
 jdbc {
  jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/world"
  jdbc_user => "postgres"
  jdbc_password => "zhang123"
  jdbc_driver_library => "D:\logstash-6.4.0\bin\pgsql\postgresql-42.2.5.jar"
  jdbc_driver_class => "org.postgresql.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "300000"
  use_column_value => "true"
  tracking_column => "id"
  statement_filepath => "D:\logstash-6.4.0\bin\pgsql\jdbc.sql"
 schedule => "* * * * *"
 type => "jdbc"
 jdbc_default_timezone =>"Asia/Shanghai"
 }
}

2、filter配置

filter {
 json {
  source => "message"
  remove_field => ["message"]
 }
}

3、output 配置,就是elasticsearch的基本配置

output {
 elasticsearch {
  hosts => ["localhost:9200"]
  index => "test_out"
 template => "D:\logstash-6.4.0\bin\pgsql\es-template.json"
 template_name => "t-statistic-out-logstash"
 template_overwrite => true
 document_type => "out"
  document_id => "%{id}"
 }
 stdout {
  codec => json_lines
 }
}

以上就是整個logstash 的jdbc.conf

4、es-template.json的配置

{
 "template" : "t-statistis-out-template", 
 "order":1,
 "settings": {
   "index": {
    "refresh_interval": "5s"
   }
  },
 "mappings": {
   "_default_": {
 "_all" : {"enabled":false}, 
    "dynamic_templates": [
     { 
    "message_field" : { 
    "match" : "message", 
    "match_mapping_type" : "string", 
    "mapping" : { "type" : "string", "index" : "not_analyzed" } 
    } 
   }, { 
    "string_fields" : { 
    "match" : "*", 
    "match_mapping_type" : "string", 
    "mapping" : { "type" : "string", "index" : "not_analyzed" } 
    } 
   }
    ],
    "properties": {
     "@timestamp": {
      "type": "date"
     },
     "@version": {
      "type": "keyword"
     },     
  "id": {
      "type": "keyword"
     },
  "name": {
      "type": "keyword"
     },
  "pp": {
      "type": "keyword"
     }  
    }
   }
  },
  "aliases": {}
 
}

最后就是下載好pgsql的連接驅動,這個官網可以下載;配置好自己的數據庫表格的數據

啟動命令:進入到logstash的bin目錄下,自己的logstash配置都是放在bin的pgsql這個目錄下面(這個自己隨意創建位置都可以)

logstash.bat -f ./pgsql/jdbc.conf

以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。如有錯誤或未考慮完全的地方,望不吝賜教。

您可能感興趣的文章:
  • PostgreSQL 主備數據宕機恢復測試方案
  • PostgreSQL+Pgpool實現HA主備切換的操作
  • postgresql 如何查看pg_wal目錄下xlog文件總大小
  • postgresql之使用lsn 獲取 wal文件名的實例
  • 修改postgresql存儲目錄的操作方式
  • postgresql運維之遠程遷移操作
  • postgresql 12版本搭建及主備部署操作

標簽:烏海 珠海 晉城 錦州 蚌埠 株洲 來賓 衡陽

巨人網絡通訊聲明:本文標題《PostgreSQL 數據同步到ES 搭建操作》,本文關鍵詞  PostgreSQL,數據,同步,到,搭建,;如發現本文內容存在版權問題,煩請提供相關信息告之我們,我們將及時溝通與處理。本站內容系統采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《PostgreSQL 數據同步到ES 搭建操作》相關的同類信息!
  • 本頁收集關于PostgreSQL 數據同步到ES 搭建操作的相關信息資訊供網民參考!
  • 推薦文章