What does a watermark do, and how do you make sure no data is lost?

Updated: October 10, 2023, 10:52  Source: 乐鱼电竞

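In big-data stream processing, a watermark is a notion of time that measures how far an event stream has progressed in event time. It is used to control when event-time windows are computed and how long the system waits for delayed data.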

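More concretely, events are grouped into time windows according to when they occurred, and a window can only be computed once all of its data has arrived. The watermark is the marker used to decide whether the data for a window has fully arrived: once the watermark passes the end of a window, that window can be evaluated.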
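To make the "wait until the window is complete" rule concrete, here is a minimal plain-Java sketch with no Flink dependency. The class name TumblingWindowSketch and its methods are hypothetical, and the firing rule (window end at or before the watermark) is a simplification of what a real engine does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy model of event-time tumbling windows that fire only once the watermark passes them. */
final class TumblingWindowSketch {
    private final long windowSizeMs;
    // Window start -> timestamps buffered for that window, ordered by start time.
    private final TreeMap<Long, List<Long>> buffered = new TreeMap<>();

    TumblingWindowSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Buffers an event in the window covering its timestamp (non-negative timestamps assumed). */
    void add(long eventTimeMs) {
        long windowStart = eventTimeMs - (eventTimeMs % windowSizeMs);
        buffered.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(eventTimeMs);
    }

    /** Removes and returns the start times of all windows whose end is at or before the watermark. */
    List<Long> advanceWatermark(long watermarkMs) {
        List<Long> fired = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.firstKey() + windowSizeMs <= watermarkMs) {
            fired.add(buffered.pollFirstEntry().getKey());
        }
        return fired;
    }

    public static void main(String[] args) {
        TumblingWindowSketch windows = new TumblingWindowSketch(60_000); // 1-minute windows
        windows.add(10_000); // falls in window [0, 60000)
        windows.add(70_000); // falls in window [60000, 120000)
        windows.add(50_000); // out of order, still falls in window [0, 60000)
        System.out.println(windows.advanceWatermark(55_000)); // prints []
        System.out.println(windows.advanceWatermark(60_000)); // prints [0]
    }
}
```

The out-of-order event at 50,000 ms is still counted in the first window because that window does not fire until the watermark reaches 60,000 ms.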

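The key to avoiding data loss is to configure watermark generation and handling sensibly. Watermarks can be generated from the timestamps carried in the event data. During processing, each event's timestamp is compared with the current watermark: an event whose timestamp is behind the watermark is late, and unless late data is handled explicitly it may be left out of its window, which is how data ends up lost.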
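The comparison described above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not Flink's implementation: the class WatermarkTrackerSketch is hypothetical, the watermark is taken as the highest timestamp seen minus a fixed tolerance, and it is updated after every event rather than periodically.

```java
/** Toy bounded-out-of-orderness watermark tracker; an illustration, not Flink's implementation. */
final class WatermarkTrackerSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs = Long.MIN_VALUE; // no event seen yet

    WatermarkTrackerSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Current watermark: highest timestamp seen so far minus the tolerated disorder. */
    long currentWatermark() {
        return maxTimestampMs == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampMs - maxOutOfOrdernessMs;
    }

    /** Returns true if the event is at or behind the current watermark, then advances the watermark. */
    boolean observe(long eventTimeMs) {
        boolean late = eventTimeMs <= currentWatermark();
        maxTimestampMs = Math.max(maxTimestampMs, eventTimeMs);
        return late;
    }

    public static void main(String[] args) {
        WatermarkTrackerSketch tracker = new WatermarkTrackerSketch(5_000); // tolerate 5 s of disorder
        System.out.println(tracker.observe(0));       // prints false
        System.out.println(tracker.observe(120_000)); // prints false
        System.out.println(tracker.observe(117_000)); // prints false: only 3 s behind the maximum
        System.out.println(tracker.observe(90_000));  // prints true: 30 s behind the maximum
    }
}
```

Whether a late event is then dropped, merged into an already computed window, or routed elsewhere is a separate decision that the processing job has to make.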

The following sample code uses the Apache Flink Java API to show how a Watermark controls the progress of event-time window computation in stream processing.

// Import the required classes
import java.time.LocalDateTime;
import java.time.ZoneOffset;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Note: TimeCharacteristic and AssignerWithPeriodicWatermarks are deprecated in
// recent Flink releases, where WatermarkStrategy is the replacement.
public class WatermarkExample {

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time as the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Create the data source
        DataStream<Event> events = env.fromElements(
                new Event(1, "2021-01-01T00:00:00"),
                new Event(2, "2021-01-01T00:02:00"),
                new Event(3, "2021-01-01T00:01:30")
        );

        // Assign event-time timestamps and Watermarks; the call returns a new stream
        DataStream<Event> withWatermarks = events.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
            private final long maxOutOfOrderness = 5000; // maximum out-of-orderness: 5 seconds
            private long currentMaxTimestamp;

            @Override
            public long extractTimestamp(Event event, long previousElementTimestamp) {
                // LocalDateTime carries no time zone, so the value is interpreted as UTC here
                long timestamp = event.getTimestamp().toInstant(ZoneOffset.UTC).toEpochMilli();
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                return timestamp;
            }

            @Override
            public Watermark getCurrentWatermark() {
                // The watermark trails the highest timestamp seen by the tolerated disorder
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }
        });

        // Add further stream operations on withWatermarks here, such as windowing and aggregation

        // Run the streaming job
        env.execute("Watermark Example");
    }

    // Event class
    public static class Event {
        private int id;
        private LocalDateTime timestamp;

        public Event(int id, String timestamp) {
            this.id = id;
            this.timestamp = LocalDateTime.parse(timestamp);
        }

        public int getId() {
            return id;
        }

        public LocalDateTime getTimestamp() {
            return timestamp;
        }
    }
}

In the example above, we first set up the streaming execution environment and set the time characteristic to event time. We then create a data source containing three events, each with its own event timestamp. Next, we use an AssignerWithPeriodicWatermarks to assign timestamps and Watermarks to the events: it defines how to extract each event's timestamp and computes the Watermark from the maximum out-of-orderness. Finally, further stream operations such as windowing and aggregation can be added after the assignTimestampsAndWatermarks call.

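To guard further against data loss, you can adopt an explicit strategy for late data: wait an additional period so that delayed events can still arrive, or set a maximum out-of-orderness beyond which events are discarded. You can also adjust the interval at which watermarks are emitted. This does not change the window size, which is set by the window definition, but it controls how often event-time progress (and with it window evaluation) advances, so it can be tuned to different latency requirements.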
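The "wait a little longer, then discard" strategy can be sketched in plain Java as below. The class AllowedLatenessSketch and its Outcome values are hypothetical names for illustration. In Flink itself, comparable behavior is configured on a windowed stream with allowedLateness and sideOutputLateData; this sketch only models the decision rule for a single window.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one event-time window with a grace period for late events. */
final class AllowedLatenessSketch {
    enum Outcome { ACCEPTED, ACCEPTED_LATE, DROPPED }

    private final long windowEndMs;
    private final long allowedLatenessMs;
    private long watermarkMs = Long.MIN_VALUE;
    final List<Long> accepted = new ArrayList<>();
    final List<Long> dropped = new ArrayList<>(); // stands in for a side output of late data

    AllowedLatenessSketch(long windowEndMs, long allowedLatenessMs) {
        this.windowEndMs = windowEndMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /** Watermarks only move forward; an older value is ignored. */
    void advanceWatermark(long newWatermarkMs) {
        watermarkMs = Math.max(watermarkMs, newWatermarkMs);
    }

    /** Decides what happens to an event belonging to this window, given the current watermark. */
    Outcome offer(long eventTimeMs) {
        if (watermarkMs < windowEndMs) {
            accepted.add(eventTimeMs); // window still open
            return Outcome.ACCEPTED;
        }
        if (watermarkMs < windowEndMs + allowedLatenessMs) {
            accepted.add(eventTimeMs); // window closed, but still inside the grace period
            return Outcome.ACCEPTED_LATE;
        }
        dropped.add(eventTimeMs); // too late: kept aside instead of silently lost
        return Outcome.DROPPED;
    }

    public static void main(String[] args) {
        // Window [0, 60000) with a 10-second grace period
        AllowedLatenessSketch window = new AllowedLatenessSketch(60_000, 10_000);
        System.out.println(window.offer(10_000)); // prints ACCEPTED
        window.advanceWatermark(62_000);
        System.out.println(window.offer(50_000)); // prints ACCEPTED_LATE
        window.advanceWatermark(75_000);
        System.out.println(window.offer(55_000)); // prints DROPPED
    }
}
```

Keeping the discarded events in a separate list rather than throwing them away means they can still be inspected or reprocessed, which is the point of routing late data to its own output.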
