1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
|
kafkaStream .process(new KafkaProcessFunction()) .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds))) .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID) .timeWindow(Time.seconds(windowLengthSeconds)) .reduce( new ReduceFunction<UserActionLog>() { @Override public UserActionLog reduce(UserActionLog value1, UserActionLog value2) throws Exception { return value1.getProductPrice() > value2.getProductPrice() ? value1 : value2; } }, new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception { UserActionLog max = elements.iterator().next(); String windowStart=new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss"); String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss"); String record="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 浏览的商品的最大价值对应的那条记录: "+max; out.collect(record); } } ) .print();
Key: user_2 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_2', eventTime='2019-11-09 13:54:10', eventType='browse', productID='product_3', productPrice=30} Key: user_4 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_4', eventTime='2019-11-09 13:54:15', eventType='browse', productID='product_3', productPrice=30} Key: user_3 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_3', eventTime='2019-11-09 13:54:12', eventType='browse', productID='product_2', productPrice=20} Key: user_5 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_5', eventTime='2019-11-09 13:54:17', eventType='browse', productID='product_2', productPrice=20}
|