国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
使用Logstash

 1、數(shù)據(jù)同步方式

全量同步與增量同步

全量同步是指全部將數(shù)據(jù)同步到es,通常是剛建立es,第一次同步時使用。增量同步是指將后續(xù)的更新、插入記錄同步到es。

2、常用的一些ES同步方法

1)、 elasticsearch-jdbc : 嚴格意義上它已經(jīng)不是第三方插件。已經(jīng)成為獨立的第三方工具。不支持5.5.1。。。
2)、elasticsearch-river-mysql插件:   https://github.com/scharron/elasticsearch-river-mysql 
3)、go-mysql-elasticsearch(國內(nèi)作者siddontang) :  https://github.com/siddontang/go-mysql-elasticsearch
4)、python-mysql-replication:  github地址  https://github.com/noplay/python-mysql-replication
5)、MySQL Binlog:  通過 MySQL binlog 將 MySQL 的數(shù)據(jù)同步給 ES, 只能使用 row 模式的 binlog。
6)、Logstash-input-jdbc:  github地址  https://github.com/logstash-plugins/logstash-input-jdbc

3、Logstash-input-jdbc安裝

由于我用的ES版本是5.5.1,elasticsearch-jdbc不支持,只支持2.3.4,這就尷尬了。

所用這里用Logstash-input-jdbc來同步數(shù)據(jù),logstash-input-jdbc插件是logstash 的一個個插件,使用ruby語言開發(fā)。所以要先安裝ruby,也是為了好使用ruby中的gem安裝插件,下載地址: https://rubyinstaller.org/downloads/

下載下來之后,進行安裝

安裝好之后試下是否安裝成功,打開CMD輸入:

OK,然后修改gem的源,使用以下命令查看gem源

gem sources -l

刪除默認的源

gem sources --remove https://rubygems.org/

添加新的源

gem sources -a http://gems.ruby-china.org/gem sources -l

更改成功,還的修改Gemfile的數(shù)據(jù)源地址。步驟如下:

gem install bundlerbundle config mirror.https://rubygems.org https://gems.ruby-china.org

然后就是安裝logstash-input-jdbc,在logstash-5.5.1/bin目錄下

執(zhí)行安裝命令

.\logstash-plugin.bat install logstash-input-jdbc

靜等一會兒,成功之后提示如下

4、Logstash-input-jdbc使用

官方文檔地址 

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

首先在bin目錄下新建一個mysql目錄,里面包含jdbc.conf,jdbc.sql文件,加入mysql的驅(qū)動

jdbc.conf配置如下

input {    stdin {    }    jdbc {      # mysql 數(shù)據(jù)庫鏈接,test為數(shù)據(jù)庫名      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"      # 用戶名和密碼      jdbc_user => "root"      jdbc_password => "root"      # 驅(qū)動      jdbc_driver_library => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\mysql-connector-java-5.1.9.jar"      # 驅(qū)動類名      jdbc_driver_class => "com.mysql.jdbc.Driver"      jdbc_paging_enabled => "true"      jdbc_page_size => "50000"	  # 執(zhí)行的sql 文件路徑+名稱      statement_filepath => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\jdbc.sql"      # 設置監(jiān)聽間隔  各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鐘都更新	  schedule => "* * * * *"      # 索引類型	  type => "jdbc"    }}filter {    json {        source => "message"        remove_field => ["message"]    }}output {    elasticsearch {	    # ES的IP地址及端口        hosts => ["localhost:9200"]	    # 索引名稱        index => "article"	    # 自增ID 需要關聯(lián)的數(shù)據(jù)庫中有有一個id字段,對應索引的id號        document_id => "%{id}"    }    stdout {	   # JSON格式輸出        codec => json_lines    }}

各數(shù)據(jù)庫對應的鏈接如下:

Driver ="path/to/jdbc-drivers/mysql-connector-java-5.1.35-bin.jar"   //驅(qū)動程序

Class  ="com.mysql.jdbc.Driver"; 

URL  ="jdbc:mysql://localhost:3306/db_name";                           //連接的URL,db_name為數(shù)據(jù)庫名

Driver ="path/to/jdbc-drivers/sqljdbc4.jar"

Class  ="com.microsoft.jdbc.sqlserver.SQLServerDriver";

URL   ="jdbc:microsoft:sqlserver://localhost:1433;DatabaseName=db_name";     //db_name為數(shù)據(jù)庫名

Driver ="path/to/jdbc-drivers/ojdbc6-12.1.0.2.jar"

Class  ="oracle.jdbc.driver.OracleDriver";

URL   ="jdbc:oracle:thin:@loaclhost:1521:orcl";     //orcl為數(shù)據(jù)庫的SID

//連接具有DB2客戶端的Provider實例

Driver ="path/to/jdbc-drivers/jt400.jar"

Class  ="com.ibm.db2.jdbc.app.DB2.Driver";

URL   ="jdbc:db2://localhost:5000/db_name";     //db_name為數(shù)據(jù)可名

Driver ="path/to/jdbc-drivers/postgresql-9.4.1201.jdbc4.jar"

Class  ="org.postgresql.Driver";            //連接數(shù)據(jù)庫的方法

URL   ="jdbc:postgresql://localhost/db_name";      //db_name為數(shù)據(jù)可名

jdbc.sql配置如下:

select * from person

就一條查詢語句對應的表數(shù)據(jù)如下:

注意:這里的jdbc.sql和jdbc.conf文件編碼都必須是ANSI

先啟動ES,然后通過sense創(chuàng)建article索引

UT http://localhost:9200/article

然后通過以下命令啟動logstash

.\logstash.bat -f  .\mysql\jdbc.conf

過一會他就會自動的往ES里添加數(shù)據(jù),輸出的日志如下:

執(zhí)行了SQL查詢。查看下article索引會發(fā)現(xiàn)多出來了很多文檔

我們在數(shù)據(jù)庫增加一條數(shù)據(jù),看他是否自動同步到ES中

靜等一會,發(fā)現(xiàn)logstash的日志

查詢了一篇,ES中的數(shù)據(jù)會多出剛剛插入的那條

下面使用 增量 來新增數(shù)據(jù),需要在jdbc.conf配置文件中做如下修改:

input {    stdin {    }    jdbc {      # mysql 數(shù)據(jù)庫鏈接,test為數(shù)據(jù)庫名      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"      # 用戶名和密碼      jdbc_user => "root"      jdbc_password => "root"      # 驅(qū)動      jdbc_driver_library => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\mysql-connector-java-5.1.9.jar"      # 驅(qū)動類名      jdbc_driver_class => "com.mysql.jdbc.Driver"      #處理中文亂碼問題      codec => plain { charset => "UTF-8"}      #使用其它字段追蹤,而不是用時間      use_column_value => true      #追蹤的字段      tracking_column => id      record_last_run => true     #上一個sql_last_value值的存放文件路徑, 必須要在文件中指定字段的初始值     last_run_metadata_path => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\station_parameter.txt"     #開啟分頁查詢     jdbc_paging_enabled => true     jdbc_page_size => 300	        # 執(zhí)行的sql 文件路徑+名稱      statement_filepath => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\jdbc.sql"      # 設置監(jiān)聽間隔  各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鐘都更新	  schedule => "* * * * *"      # 索引類型	  type => "jdbc"    }}filter {    json {        source => "message"        remove_field => ["message"]    }}output {    elasticsearch {	    # ES的IP地址及端口        hosts => ["localhost:9200"]		# 索引名稱        index => "article"		# 自增ID        document_id => "%{id}"    }    stdout {	    # JSON格式輸出        codec => json_lines    }}

參數(shù)介紹:

//是否記錄上次執(zhí)行結果, 如果為真,將會把上次執(zhí)行到的 tracking_column 字段的值記錄下來,保存到 last_run_metadata_path 指定的文件中record_last_run => true//是否需要記錄某個column 的值,如果 record_last_run 為真,可以自定義我們需要 track 的 column 名稱,此時該參數(shù)就要為 true. 否則默認 track 的是 timestamp 的值.use_column_value => true//如果 use_column_value 為真,需配置此參數(shù). track 的數(shù)據(jù)庫 column 名,該 column 必須是遞增的.比如:ID.tracking_column => MY_ID//指定文件,來記錄上次執(zhí)行到的 tracking_column 字段的值//比如上次數(shù)據(jù)庫有 10000 條記錄,查詢完后該文件中就會有數(shù)字 10000 這樣的記錄,下次執(zhí)行 SQL 查詢可以從 10001 條處開始.//我們只需要在 SQL 語句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是該文件中的值(10000).last_run_metadata_path => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\station_parameter.txt"http://是否清除 last_run_metadata_path 的記錄,如果為真那么每次都相當于從頭開始查詢所有的數(shù)據(jù)庫記錄clean_run => false//是否將 column 名稱轉(zhuǎn)小寫lowercase_column_names => false//存放需要執(zhí)行的 SQL 語句的文件位置statement_filepath => "G:\Developer\Elasticsearch5.5.1\ES5\logstash-5.5.1\bin\mysql\jdbc.sql"

這里使用webmagic爬蟲來爬取數(shù)據(jù),導入到數(shù)據(jù)庫中,先運行爬蟲,爬取一些數(shù)據(jù)

這里爬取到了277條,然后啟動logstash,通過logstash導入到ES中去

打開mysql目錄下的station_parameter.txt文件

這個文件里記錄上次執(zhí)行到的 tracking_column 字段的值,比如上次數(shù)據(jù)庫有 10000 條記錄,查詢完后該文件中就會有數(shù)字 10000 這樣的記錄,下次執(zhí)行 SQL 查詢可以從 10001 條處開始,我們只需要在 SQL 語句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是該文件中的值。

然后開啟爬蟲,爬取數(shù)據(jù),往數(shù)據(jù)庫里插,logstash會自動的識別到更新,然后導入到ES中?。?/p>

本站僅提供存儲服務,所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權內(nèi)容,請點擊舉報
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
Elasticsearch學習與使用【面試+工作】
如何通過 Docker 部署 Logstash 同步 Mysql 數(shù)據(jù)庫數(shù)據(jù)到 ElasticSearch
logstash_output_kafka:Mysql同步Kafka深入詳解
Spark SQL通過JDBC連接MySQL讀寫數(shù)據(jù)
零代碼如何打造自己的實時監(jiān)控警告系統(tǒng)
如何用開源項目搭建威脅感知大腦 SIEM?| 硬創(chuàng)公開課
更多類似文章 >>
生活服務
分享 收藏 導長圖 關注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服