99插插插,亚洲伦理中文在线,站长推荐一二三区欧美,青青草在线导航

×

基于Flink和Drools的實時日志處理

分類:互聯(lián)網(wǎng)熱點 編輯:聊聊云計算 瀏覽量:429
2020-07-13 16:55:23
**背景** 日志系統(tǒng)接入的日志種類多、格式復(fù)雜多樣,主流的有以下幾種日志: - Filebeat采集到的文本日志,格式多樣 - Winbeat采集到的操作系統(tǒng)日志 - 設(shè)備上報到Logstash的syslog日志 - 接入到Kafka的業(yè)務(wù)日志 以上通過各種渠道接入的日志,存在2個主要的問題: - 格式不統(tǒng)一、不規(guī)范、標準化不夠 - 如何從各類日志中提取出用戶關(guān)心的指標,挖掘更多的業(yè)務(wù)價值 為了解決上面2個問題,我們基于Flink和Drools規(guī)則引擎做了實時的日志處理服務(wù)。 **系統(tǒng)架構(gòu)** 架構(gòu)比較簡單,架構(gòu)圖如下: 各類日志都是通過Kafka匯總,做日志中轉(zhuǎn)。 Flink消費Kafka的數(shù)據(jù),同時通過API調(diào)用拉取Drools規(guī)則引擎,對日志做解析處理后,將解析后的數(shù)據(jù)存儲到Elasticsearch中,用于日志的搜索和分析等業(yè)務(wù)。 為了監(jiān)控日志解析的實時狀態(tài),F(xiàn)link會將日志處理的統(tǒng)計數(shù)據(jù),如每分鐘處理的日志量,每種日志從各個機器IP來的日志量寫到Redis中,用于監(jiān)控統(tǒng)計。 **模塊介紹** 系統(tǒng)項目命名為Eagle。 eagle-api:基于Spring Boot,作為Drools規(guī)則引擎的寫入和讀取API服務(wù)。 eagle-common:通用類模塊。 eagle-log:基于Flink的日志處理服務(wù)。 重點講一下eagle-log: **對接kafka、ES和Redis** 對接Kafka和ES都比較簡單,用的官方的connector(flink-connector-kafka-0.10和flink-connector-elasticsearch6),詳見代碼。 對接Redis,最開始用的是org.apache.bahir提供的redis connector,后來發(fā)現(xiàn)靈活度不夠,就使用了Jedis。 在將統(tǒng)計數(shù)據(jù)寫入redis的時候,最開始用的keyby分組后緩存了分組數(shù)據(jù),在sink中做統(tǒng)計處理后寫入,參考代碼如下: ``` String name = "redis-agg-log"; DataStream>> keyedStream = dataSource.keyBy((KeySelector) log -> log.getIndex()) .timeWindow(Time.seconds(windowTime)).trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime)) .process(new ProcessWindowFunction>, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable iterable, Collector>> collector) { ArrayList logs = Lists.newArrayList(iterable); if (logs.size() > 0) { collector.collect(new Tuple2(s, logs)); } } }).setParallelism(redisSinkParallelism).name(name).uid(name); ``` 后來發(fā)現(xiàn)這樣做對內(nèi)存消耗比較大,其實不需要緩存整個分組的原始數(shù)據(jù),只需要一個統(tǒng)計數(shù)據(jù)就OK了,優(yōu)化后: ``` String name = "redis-agg-log"; DataStream keyedStream = dataSource.keyBy((KeySelector) log -> log.getIndex()) .timeWindow(Time.seconds(windowTime)) .trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime)) .aggregate(new LogStatAggregateFunction(), new LogStatWindowFunction()) .setParallelism(redisSinkParallelism).name(name).uid(name); ``` 這里使用了Flink的聚合函數(shù)和Accumulator,通過Flink的agg操作做統(tǒng)計,減輕了內(nèi)存消耗的壓力。 **使用Broadcast廣播Drools規(guī)則引擎** 1、Drools規(guī)則流通過broadcast map state廣播出去。 2、Kafka的數(shù)據(jù)流connect規(guī)則流處理日志。 ``` //廣播規(guī)則流 env.addSource(new RuleSourceFunction(ruleUrl)).name(ruleName).uid(ruleName).setParallelism(1) .broadcast(ruleStateDescriptor); //Kafka數(shù)據(jù)流 FlinkKafkaConsumer010 source = new FlinkKafkaConsumer010<>(kafkaTopic, new LogSchema(), properties); env.addSource(source).name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism); //數(shù)據(jù)流connect規(guī)則流處理日志 BroadcastConnectedStream connectedStreams = dataSource.connect(ruleSource); connectedStreams.process(new LogProcessFunction(ruleStateDescriptor, ruleBase)).setParallelism(processParallelism).name(name).uid(name); ``` 具體細節(jié)參考開源代碼。 **小結(jié)** 本系統(tǒng)提供了一個基于Flink的實時數(shù)據(jù)處理參考,對接了Kafka、Redis和Elasticsearch,通過可配置的Drools規(guī)則引擎,將數(shù)據(jù)處理邏輯配置化和動態(tài)化。 對于處理后的數(shù)據(jù),也可以對接到其他sink,為其他各類業(yè)務(wù)平臺提供數(shù)據(jù)的解析、清洗和標準化服務(wù)。 > 【棲號在線課堂】每天都有產(chǎn)品技術(shù)專家分享! > 課程地址:https://yqh.aliyun.com/live > 立即加入社群,與專家面對面,及時了解課程最新動態(tài)! > 【云棲號在線課堂 社群】https://c.tb.cn/F3.Z8gvnK 原文發(fā)布時間:2020-07-09 本文作者: aoxiang 本文來自:“”,了解相關(guān)信息可以關(guān)注“dockone”

聲明:免責(zé)聲明:本文內(nèi)容由互聯(lián)網(wǎng)用戶自發(fā)貢獻自行上傳,本網(wǎng)站不擁有所有權(quán),也不承認相關(guān)法律責(zé)任。如果您發(fā)現(xiàn)本社區(qū)中有涉嫌抄襲的內(nèi)容,請發(fā)

送郵件至:operations@xinnet.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,本站將立刻刪除涉嫌侵權(quán)內(nèi)容。本站原創(chuàng)內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時

需注明出處:新網(wǎng)idc知識百科

免費咨詢獲取折扣

Loading
又大又粗的中文字幕| 艹熟女B网站| 色五月丁香婷婷| 日韩午夜成人影片| 欧美激情视频二区| 淫乱熟女欧美一区二区| 内射少妇第一区| 伊人大香蕉视频网| 深夜福利麻豆视频| 明星ai在线换脸午夜| 日韩久久成人一品女优特一级| 国产裸体美女免费观看视频| 久久久日本看| 天天操天天操天天操不卡视频| 日本综合在线| 蜜桃视频日韩一区二三区| 亚洲国产欧美麻豆新视频| 日本免费电影久久久久| 日韩人妻在线播放不卡| 超碰二区三区| 欧美乱码电影一区二区三区| 激情深爰五月婷婷在线| 哦久久久aaa欧美久久久| 欧美一级黄片免费观看的| 日韩欧美一级黄| 久久影院欧美一级黄| 欧美久久免费观看下载| 成人大黄区大片成人区| 久久国产十亚洲| xxx欧美国产乱伦综合有声| 大秀这里只有精品| 91啦中文视频| 日韩美天天操| 日本a三级经典视频久久久人妻| 加勒比av在线播放网址| 91最新午夜视频在线观看| 色呦呦国产成人| 欧美性生活视频网站免费| av.91在线| caoporn个人首页| 鼻窦炎药图片大全|