1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
   | package com.bigdata.flink.dataStreamExtractTimestampAndGenerateWatermark;
  import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark;
  import java.util.Random;
 
 
 
 
  /**
   * Demo job showing how a source can assign event timestamps and emit
   * watermarks itself, instead of relying on a downstream timestamp assigner.
   */
  public class ExtractTimestampAndGenerateWatermark {

      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // Run the job on event time so the timestamps/watermarks emitted by the source are used.
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

          DataStreamSource<Tuple4<String, Long, String, Integer>> source =
                  env.addSource(new ExampleSourceFunction());

          // A source on its own does not register any operator with the environment,
          // and execute() rejects a topology without operators. Attach a print sink so
          // the job is runnable and the generated records are visible.
          source.print();

          env.execute();
      }

      /**
       * Source that emits one random (userID, eventTime, eventType, productID)
       * record per second. Each record is emitted with its event timestamp, followed
       * by a watermark that trails that timestamp by {@link #MAX_OUT_OF_ORDERNESS_MS}.
       */
      public static class ExampleSourceFunction implements SourceFunction<Tuple4<String, Long, String, Integer>> {

          /** Maximum lag, in milliseconds, between an event timestamp and the watermark emitted after it. */
          private static final long MAX_OUT_OF_ORDERNESS_MS = 10 * 1000L;

          private static final String[] userIDSample = {"user_1", "user_2", "user_3"};
          private static final String[] eventTypeSample = {"click", "browse"};
          private static final int[] productIDSample = {1, 2, 3, 4, 5};

          /** Cleared by {@link #cancel()} to stop the emit loop; volatile because cancel() is invoked from another thread. */
          private volatile boolean isRunning = true;

          /**
           * Emits records until {@link #cancel()} is called.
           *
           * @param ctx context used to emit records with timestamps and watermarks
           * @throws Exception if the sleep between records is interrupted
           */
          @Override
          public void run(SourceContext<Tuple4<String, Long, String, Integer>> ctx) throws Exception {
              // One generator for the whole run instead of three fresh instances per record.
              final Random random = new Random();

              while (isRunning) {
                  String userID = userIDSample[random.nextInt(userIDSample.length)];
                  long eventTime = System.currentTimeMillis();
                  String eventType = eventTypeSample[random.nextInt(eventTypeSample.length)];
                  int productID = productIDSample[random.nextInt(productIDSample.length)];

                  Tuple4<String, Long, String, Integer> record =
                          Tuple4.of(userID, eventTime, eventType, productID);

                  // Emit the record together with its event timestamp.
                  ctx.collectWithTimestamp(record, eventTime);

                  // Emit a watermark that lags the record's timestamp by the allowed out-of-orderness.
                  ctx.emitWatermark(new Watermark(eventTime - MAX_OUT_OF_ORDERNESS_MS));

                  Thread.sleep(1000);
              }
          }

          /** Requests termination of the emit loop in {@link #run}. */
          @Override
          public void cancel() {
              isRunning = false;
          }
      }
  }
   |