久久久精品一区ed2k-女人被男人叉到高潮的视频-中文字幕乱码一区久久麻豆樱花-俄罗斯熟妇真实视频

基于Flink1.11.0是怎樣實(shí)現(xiàn)Flink的Watermark機(jī)制-創(chuàng)新互聯(lián)

基于Flink 1.11.0是怎樣實(shí)現(xiàn)Flink的Watermark機(jī)制,相信很多沒有經(jīng)驗(yàn)的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

創(chuàng)新互聯(lián)公司主營新羅網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,app軟件定制開發(fā),新羅h5小程序制作搭建,新羅網(wǎng)站營銷推廣歡迎新羅等地區(qū)企業(yè)咨詢

在使用eventTime的時候如何處理亂序數(shù)據(jù)?我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的,但是也不排除由于網(wǎng)絡(luò)延遲等原因,導(dǎo)致亂序的產(chǎn)生,特別是使用kafka的話,多個分區(qū)的數(shù)據(jù)無法保證有序。所以在進(jìn)行window計算的時候,我們又不能無限期的等下去,必須要有個機(jī)制來保證一個特定的時間后,必須觸發(fā)window去進(jìn)行計算了。這個特別的機(jī)制,就是watermark。Watermark是用于處理亂序事件的,用于衡量Event Time進(jìn)展的機(jī)制。watermark可以翻譯為水位線。

一、Watermark的核心原理

Watermark的核心本質(zhì)可以理解成一個延遲觸發(fā)機(jī)制。
在 Flink 的窗口處理過程中,如果確定全部數(shù)據(jù)到達(dá),就可以對 Window 的所有數(shù)據(jù)做 窗口計算操作(如匯總、分組等),如果數(shù)據(jù)沒有全部到達(dá),則繼續(xù)等待該窗口中的數(shù)據(jù)全 部到達(dá)才開始處理。這種情況下就需要用到水位線(WaterMarks)機(jī)制,它能夠衡量數(shù)據(jù)處 理進(jìn)度(表達(dá)數(shù)據(jù)到達(dá)的完整性),保證事件數(shù)據(jù)(全部)到達(dá) Flink 系統(tǒng),或者在亂序及 延遲到達(dá)時,也能夠像預(yù)期一樣計算出正確并且連續(xù)的結(jié)果。當(dāng)任何 Event 進(jìn)入到 Flink 系統(tǒng)時,會根據(jù)當(dāng)前大事件時間產(chǎn)生 Watermarks 時間戳。

那么 Flink 是怎么計算 Watermak 的值呢?

Watermark =進(jìn)入Flink 的大的事件時間(mxtEventTime)-指定的延遲時間(t)

那么有 Watermark 的 Window 是怎么觸發(fā)窗口函數(shù)的呢?
如果有窗口的停止時間等于或者小于 maxEventTime - t(當(dāng)時的warkmark),那么這個窗口被觸發(fā)執(zhí)行。

其核心處理流程如下圖所示。

基于Flink 1.11.0是怎樣實(shí)現(xiàn)Flink的Watermark機(jī)制

二、Watermark的三種使用情況

1、本來有序的Stream中的 Watermark

如果數(shù)據(jù)元素的事件時間是有序的,Watermark 時間戳?xí)S著數(shù)據(jù)元素的事件時間按順 序生成,此時水位線的變化和事件時間保持一直(因?yàn)榧热皇怯行虻臅r間,就不需要設(shè)置延遲了,那么t就是 0。所以 watermark=maxtime-0 = maxtime),也就是理想狀態(tài)下的水位 線。當(dāng) Watermark 時間大于 Windows 結(jié)束時間就會觸發(fā)對 Windows 的數(shù)據(jù)計算,以此類推, 下一個 Window 也是一樣。這種情況其實(shí)是亂序數(shù)據(jù)的一種特殊情況。

2、亂序事件中的Watermark

現(xiàn)實(shí)情況下數(shù)據(jù)元素往往并不是按照其產(chǎn)生順序接入到 Flink 系統(tǒng)中進(jìn)行處理,而頻繁 出現(xiàn)亂序或遲到的情況,這種情況就需要使用 Watermarks 來應(yīng)對。比如下圖,設(shè)置延遲時間t為2。

3、并行數(shù)據(jù)流中的Watermark

在多并行度的情況下,Watermark 會有一個對齊機(jī)制,這個對齊機(jī)制會取所有 Channel 中最小的 Watermark。

三、設(shè)置Watermark的核心代碼

1、首先,正確設(shè)置事件處理的時間語義,一般都是采用Event Time。

sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

2、其次,指定生成Watermark的機(jī)制,包括:延時處理的時間和EventTime對應(yīng)的字段。如下:

基于Flink 1.11.0是怎樣實(shí)現(xiàn)Flink的Watermark機(jī)制

注意:不管是數(shù)據(jù)是否有序,都可以使用上面的代碼。有序的數(shù)據(jù)只是無序數(shù)據(jù)的一種特殊情況。

四、Watermark編程案例

測試數(shù)據(jù):基站的手機(jī)通話數(shù)據(jù),如下:

基于Flink 1.11.0是怎樣實(shí)現(xiàn)Flink的Watermark機(jī)制

需求:按基站,每5秒統(tǒng)計通話時間最長的記錄。

  • StationLog用于封裝基站數(shù)據(jù)

package watermark;
//station1,18688822219,18684812319,10,1595158485855
public class StationLog {
    private String stationID;   //基站ID
    private String from;        //呼叫放
    private String to;            //被叫方
    private long duration;        //通話的持續(xù)時間
    private long callTime;        //通話的呼叫時間
    public StationLog(String stationID, String from, 
                      String to, long duration, 
                      long callTime) {
        this.stationID = stationID;
        this.from = from;
        this.to = to;
        this.duration = duration;
        this.callTime = callTime;
    }
    public String getStationID() {
        return stationID;
    }
    public void setStationID(String stationID) {
        this.stationID = stationID;
    }
    public long getCallTime() {
        return callTime;
    }
    public void setCallTime(long callTime) {
        this.callTime = callTime;
    }
    public String getFrom() {
        return from;
    }
    public void setFrom(String from) {
        this.from = from;
    }
    public String getTo() {
        return to;
    }
    public void setTo(String to) {
        this.to = to;
    }
    public long getDuration() {
        return duration;
    }
    public void setDuration(long duration) {
        this.duration = duration;
    }
}
  • 代碼實(shí)現(xiàn):WaterMarkDemo用于完成計算(注意:為了方便咱們測試設(shè)置任務(wù)的并行度為1)

package watermark;
import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
//每隔五秒,將過去是10秒內(nèi),通話時間最長的通話日志輸出。
public class WaterMarkDemo {
    public static void main(String[] args) throws Exception {
        //得到Flink流式處理的運(yùn)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        //設(shè)置周期性的產(chǎn)生水位線的時間間隔。當(dāng)數(shù)據(jù)流很大的時候,如果每個事件都產(chǎn)生水位線,會影響性能。
        env.getConfig().setAutoWatermarkInterval(100);//默認(rèn)100毫秒
        //得到輸入流
        DataStreamSource<String> stream = env.socketTextStream("bigdata111", 1234);
        stream.flatMap(new FlatMapFunction<String, StationLog>() {
            public void flatMap(String data, Collector<StationLog> output) throws Exception {
                String[] words = data.split(",");
                //                           基站ID            from    to        通話時長                                                    callTime
                output.collect(new StationLog(words[0], words[1],words[2], Long.parseLong(words[3]), Long.parseLong(words[4])));
            }
        }).filter(new FilterFunction<StationLog>() {
            @Override
            public boolean filter(StationLog value) throws Exception {
                return value.getDuration() > 0?true:false;
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<StationLog>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() {
                    @Override
                    public long extractTimestamp(StationLog element, long recordTimestamp) {
                        return element.getCallTime(); //指定EventTime對應(yīng)的字段
                    }
                })
        ).keyBy(new KeySelector<StationLog, String>(){
            @Override
            public String getKey(StationLog value) throws Exception {
                return value.getStationID();  //按照基站分組
            }}
        ).timeWindow(Time.seconds(5)) //設(shè)置時間窗口
        .reduce(new MyReduceFunction(),new MyProcessWindows()).print();
        env.execute();
    }
}
//用于如何處理窗口中的數(shù)據(jù),即:找到窗口內(nèi)通話時間最長的記錄。
class MyReduceFunction implements ReduceFunction<StationLog> {
    @Override
    public StationLog reduce(StationLog value1, StationLog value2) throws Exception {
        // 找到通話時間最長的通話記錄
        return value1.getDuration() >= value2.getDuration() ? value1 : value2;
    }
}
//窗口處理完成后,輸出的結(jié)果是什么
class MyProcessWindows extends ProcessWindowFunction<StationLog, String, String, TimeWindow> {
    @Override
    public void process(String key, ProcessWindowFunction<StationLog, String, String, TimeWindow>.Context context,
            Iterable<StationLog> elements, Collector<String> out) throws Exception {
        StationLog maxLog = elements.iterator().next();
        StringBuffer sb = new StringBuffer();
        sb.append("窗口范圍是:").append(context.window().getStart()).append("----").append(context.window().getEnd()).append("\n");;
        sb.append("基站ID:").append(maxLog.getStationID()).append("\t")
          .append("呼叫時間:").append(maxLog.getCallTime()).append("\t")
          .append("主叫號碼:").append(maxLog.getFrom()).append("\t")
          .append("被叫號碼:")    .append(maxLog.getTo()).append("\t")
          .append("通話時長:").append(maxLog.getDuration()).append("\n");
        out.collect(sb.toString());
    }
}

看完上述內(nèi)容,你們掌握基于Flink 1.11.0是怎樣實(shí)現(xiàn)Flink的Watermark機(jī)制的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司行業(yè)資訊頻道,感謝各位的閱讀!

本文標(biāo)題:基于Flink1.11.0是怎樣實(shí)現(xiàn)Flink的Watermark機(jī)制-創(chuàng)新互聯(lián)
瀏覽地址:http://www.sd-ha.com/article16/phhgg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供電子商務(wù)、營銷型網(wǎng)站建設(shè)網(wǎng)站營銷、做網(wǎng)站、App開發(fā)、網(wǎng)站設(shè)計

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站建設(shè)網(wǎng)站維護(hù)公司