1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
   | 
 
 
 
 
  kafkaStream          .process(new KafkaProcessFunction())          .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))          .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)          .timeWindow(Time.seconds(windowLengthSeconds))          .reduce(             new ReduceFunction<UserActionLog>() {                 @Override                 public UserActionLog reduce(UserActionLog value1, UserActionLog value2) throws Exception {                     return value1.getProductPrice() > value2.getProductPrice() ? value1 : value2;                 }             },             new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() {                 @Override                 public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception {                                          UserActionLog max = elements.iterator().next();                          String windowStart=new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");                     String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");                          String record="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 浏览的商品的最大价值对应的那条记录: "+max;                     out.collect(record);                      }             }     )     .print();     
  Key: user_2 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_2', eventTime='2019-11-09 13:54:10', eventType='browse', productID='product_3', productPrice=30} Key: user_4 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_4', eventTime='2019-11-09 13:54:15', eventType='browse', productID='product_3', productPrice=30} Key: user_3 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_3', eventTime='2019-11-09 13:54:12', eventType='browse', productID='product_2', productPrice=20} Key: user_5 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_5', eventTime='2019-11-09 13:54:17', eventType='browse', productID='product_2', productPrice=20}
 
  |