1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| package com.bigdata.flink.dataStreamExtractTimestampAndGenerateWatermark;
import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark;
import java.util.Random;
public class ExtractTimestampAndGenerateWatermark { public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Tuple4<String, Long, String, Integer>> source = env.addSource(new ExampleSourceFunction());
env.execute(); }
public static class ExampleSourceFunction implements SourceFunction<Tuple4<String,Long,String,Integer>>{
private volatile boolean isRunning = true; private static int maxOutOfOrderness = 10 * 1000;
private static final String[] userIDSample={"user_1","user_2","user_3"}; private static final String[] eventTypeSample={"click","browse"}; private static final int[] productIDSample={1,2,3,4,5};
@Override public void run(SourceContext<Tuple4<String,Long,String,Integer>> ctx) throws Exception { while (isRunning){
String userID=userIDSample[(new Random()).nextInt(userIDSample.length)]; long eventTime = System.currentTimeMillis(); String eventType=eventTypeSample[(new Random()).nextInt(eventTypeSample.length)]; int productID=productIDSample[(new Random()).nextInt(productIDSample.length)];
Tuple4<String, Long, String, Integer> record = Tuple4.of(userID, eventTime, eventType, productID);
ctx.collectWithTimestamp(record,eventTime);
ctx.emitWatermark(new Watermark(eventTime - maxOutOfOrderness));
Thread.sleep(1000); } }
@Override public void cancel() { isRunning = false; } } }
|