更新时间:2023年10月10日10时52分 来源:乐鱼电竞 浏览次数:

　　在大数据处理中，watermark是一种时间概念，用于衡量事件流数据的进度。它的作用是为了控制事件时间窗口的计算进度以及处理延迟。
　　具体而言，watermark可以把事件流数据按照事件发生的时间进度划分到不同的时间窗口中。在处理数据的过程中，必须要等到一个时间窗口的所有数据都到达后才能进行计算。而watermark就是用来判定一个时间窗口内的数据是否已经全量到达的标志。
　　保证数据不丢失的关键是通过合理设置watermark的生成和处理机制。在生成watermark的过程中，可以基于事件数据中的时间戳信息来确定watermark的位置。而在处理时，可以通过比较watermark和事件时间戳的关系，判断事件数据是否落后于watermark，如果落后则说明有数据丢失。
　　以下是使用Apache Flink的Java API示例代码，展示如何在流式处理中使用Watermark来控制事件时间窗口的计算进度。
// Import the required packages
import java.time.LocalDateTime;
import java.time.ZoneOffset;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
/**
 * Minimal Apache Flink example that assigns event-time timestamps and periodic
 * watermarks to a small in-memory stream of {@link Event}s.
 *
 * <p>The watermark trails the largest timestamp seen so far by a fixed
 * out-of-orderness bound of 5 seconds.
 */
public class WatermarkExample {

    /**
     * Builds the example pipeline and runs it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time as the stream time characteristic.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Create the data source; the third event is deliberately out of order.
        DataStream<Event> events = env.fromElements(
                new Event(1, "2021-01-01T00:00:00"),
                new Event(2, "2021-01-01T00:02:00"),
                new Event(3, "2021-01-01T00:01:30")
        );

        // assignTimestampsAndWatermarks returns a NEW stream; downstream operators
        // must be chained from that returned stream, not from `events`.
        DataStream<Event> timestampedEvents = events.assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<Event>() {
                    /** Maximum out-of-orderness: 5 seconds, in milliseconds. */
                    private final long maxOutOfOrderness = 5000;

                    /**
                     * Largest event timestamp seen so far. Starts at
                     * {@code Long.MIN_VALUE + maxOutOfOrderness} so that the
                     * watermark emitted before the first element is
                     * {@code Long.MIN_VALUE} and the subtraction cannot underflow.
                     */
                    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;

                    @Override
                    public long extractTimestamp(Event event, long previousElementTimestamp) {
                        // LocalDateTime carries no zone, so an offset must be chosen
                        // explicitly to obtain epoch milliseconds; UTC is used here.
                        long timestamp =
                                event.getTimestamp().toInstant(ZoneOffset.UTC).toEpochMilli();
                        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                        return timestamp;
                    }

                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                    }
                });

        // Add further stream operations (windowing, aggregation, ...) here,
        // chained from `timestampedEvents`.

        // Execute the streaming job.
        env.execute("Watermark Example");
    }

    /** Event carrying an id and the local date-time at which it occurred. */
    public static class Event {
        private final int id;
        private final LocalDateTime timestamp;

        /**
         * Creates an event.
         *
         * @param id        event identifier
         * @param timestamp ISO-8601 local date-time text, e.g. {@code 2021-01-01T00:00:00}
         * @throws java.time.format.DateTimeParseException if {@code timestamp} cannot be parsed
         */
        public Event(int id, String timestamp) {
            this.id = id;
            this.timestamp = LocalDateTime.parse(timestamp);
        }

        /** Returns the event identifier. */
        public int getId() {
            return id;
        }

        /** Returns the event time as a zone-less local date-time. */
        public LocalDateTime getTimestamp() {
            return timestamp;
        }
    }
}
　　在上面的示例中，我们首先设置了流式执行环境，并将时间特性设置为事件时间。然后，我们创建了一个包含三个事件的数据源，并为每个事件指定了事件时间戳。接下来，我们使用AssignerWithPeriodicWatermarks函数来为事件分配时间戳和Watermark。在这个函数中，我们定义了如何提取事件的时间戳，并根据最大乱序程度计算Watermark。最后，我们可以在assignTimestampsAndWatermarks方法后添加更多的流处理操作，如窗口计算、聚合等。
　　为了更好地保证数据不丢失，还可以采取一些策略来处理数据落后的情况，比如等待一段时间以等待可能的延迟数据到达，或者设置数据的最大乱序程度，超过乱序程度的数据将被丢弃。同时，还可以通过设置watermark的间隔时间来控制事件时间窗口的大小，以适应不同的处理延迟需求。
北京校区