Apache Flink: Handling Industrial Data and Late Data with Event Time

1. Preface

This post mainly tests event-time processing, using a custom timestamp as the basis for the watermark; everything here is a simulation.

  1. With industrial sensors, readings collected from several plants may, by default, arrive at different times; Flink processes the timestamps carried in the records sent from the plants.
  2. Some records may arrive late (because of the network or other reasons), and these late records still need to be handled.

The main goals are:

  1. Collect the late records and display them.
  2. Process the data with event-time windows.

2. Basic Demo

For the pom dependencies and the ComputerTemperature entity class, see the earlier posts.
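The entity class is not repeated here; the code below only assumes a simple POJO with a name, a temperature and an epoch-millisecond timestamp. A minimal sketch of what that class might look like (field names inferred from the getters used later, not copied from the earlier post):

package com.hy.flink.test.pojo;

// minimal sketch of the assumed ComputerTemperature POJO (the real class is defined in an earlier post)
public class ComputerTemperature {

	private String name;        // which machine reported the reading
	private Double temperature; // measured temperature
	private long timestamp;     // event time in epoch milliseconds

	public ComputerTemperature() {
	}

	public ComputerTemperature(String name, Double temperature, long timestamp) {
		this.name = name;
		this.temperature = temperature;
		this.timestamp = timestamp;
	}

	public String getName() { return name; }
	public void setName(String name) { this.name = name; }

	public Double getTemperature() { return temperature; }
	public void setTemperature(Double temperature) { this.temperature = temperature; }

	public long getTimestamp() { return timestamp; }
	public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

	@Override
	public String toString() {
		return "ComputerTemperature [name=" + name + ", temperature=" + temperature + ", timestamp=" + timestamp + "]";
	}
}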

1. Create a data source (Source) whose event timestamps vary (just a simple variation)

package com.hy.flink.test.source;

import java.io.Serializable;
import java.util.Date;
import java.util.Iterator;

import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;

import com.hy.flink.test.pojo.ComputerTemperature;

/**
 * @author hy
 * @createTime 2021-05-16 09:52:36
 * @description a source that generates random temperature records with partly out-of-order timestamps
 */
public class RandomComputerTemperatureLaterSource extends FromIteratorFunction<ComputerTemperature> {

	private static final long serialVersionUID = 1L;

	public RandomComputerTemperatureLaterSource(long sleepTime, Integer randomNumCount) {
		super(new RandomComputerTemperatureIterator(sleepTime, randomNumCount));
	}

	private static class RandomComputerTemperatureIterator implements Iterator<ComputerTemperature>, Serializable {

		private static final long serialVersionUID = 1L;
		private final long sleepTime;
		private String[] computerNames = { "computer-1", "computer-2", "computer-3" };
		private Integer randomNumCount;

		private RandomComputerTemperatureIterator(long sleepTime, Integer randomNumCount) {
			this.sleepTime = sleepTime;
			this.randomNumCount = randomNumCount;
		}

		@Override
		public boolean hasNext() {
			// no side effects here; the counter is decremented in next()
			return randomNumCount > 0;
		}

		@Override
		public ComputerTemperature next() {
			randomNumCount--;
			// pause between records (560 ms in the demo below)
			try {
				Thread.sleep(sleepTime);
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
			Double temperature = Math.random() * 100 + 1;
			int index = (int) (Math.random() * computerNames.length);
			long timestamp = new Date().getTime();
			// shift roughly every second timestamp into the past so that the stream arrives out of order
			if (timestamp % 2 == 0) {
				timestamp -= temperature.intValue() * 3000;
			}

			ComputerTemperature computerTemperature = new ComputerTemperature(computerNames[index], temperature,
					timestamp);
			System.out.println("records remaining: " + randomNumCount);
			System.out.println("generated record ====> " + computerTemperature);
			return computerTemperature;
		}
	}
}

To allow the data to be compared later, every generated record is printed. Note that in next() roughly half of the timestamps (those whose millisecond value is even) are shifted back by temperature * 3 seconds, which is what makes the stream arrive out of order.
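In the job in step 3 the source is registered as follows; the first argument is the pause between records in milliseconds, the second the number of records to generate:

// emit one record roughly every 560 ms, 30 records in total
DataStream<ComputerTemperature> dataStream =
		env.addSource(new RandomComputerTemperatureLaterSource(560, 30));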

2. Create a Sink for ComputerTemperature

package com.hy.flink.test.pojo;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author hy
 * @createTime 2021-05-16 10:27:25
 * @description logs the late records routed to the side output
 */
public class ComputerTemperatureSink implements SinkFunction<ComputerTemperature> {

	private static final long serialVersionUID = 1L;
	// log against this class (the original code logged against the unrelated AlertSink class)
	private static final Logger LOG = LoggerFactory.getLogger(ComputerTemperatureSink.class);

	@Override
	public void invoke(ComputerTemperature value, Context context) {
		LOG.info("LATE===>" + value.toString());
	}
}

This sink only writes a log line; it can be replaced with anything else.
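As one illustration of such a replacement (a sketch of my own, not part of the original post), a RichSinkFunction that also keeps a per-subtask count of the late records it has seen:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hy.flink.test.pojo.ComputerTemperature;

// hypothetical alternative sink: logs every late record and counts them
public class CountingLateSink extends RichSinkFunction<ComputerTemperature> {

	private static final long serialVersionUID = 1L;
	private static final Logger LOG = LoggerFactory.getLogger(CountingLateSink.class);
	private transient long lateCount;

	@Override
	public void open(Configuration parameters) {
		lateCount = 0L;
	}

	@Override
	public void invoke(ComputerTemperature value, Context context) {
		lateCount++;
		LOG.info("LATE #{} ===> {}", lateCount, value);
	}
}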

3. Write the actual job class

package com.hy.flink.test.window;

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import com.hy.flink.test.pojo.ComputerTemperature;
import com.hy.flink.test.pojo.ComputerTemperatureSink;
import com.hy.flink.test.source.RandomComputerTemperatureLaterSource;

/**
 * @author hy
 * @createTime 2021-06-06 09:37:41
 * @description tests the handling of late events:
 *              1. generate late events (the timestamps should be random, not ordered)
 *              2. a suitable watermark is needed, so a custom event-time watermark is created
 */
public class LaterWindowEventTest {

	// side-output tag that collects the late records
	static OutputTag<ComputerTemperature> lateTag = new OutputTag<ComputerTemperature>("late") {
	};

	public static void main(String[] args) {
		// create a local execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

		// use event time (deprecated call; since Flink 1.12 event time is the default)
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

		// use the random source as input (one record every 560 ms, 30 records in total)
		DataStream<ComputerTemperature> dataStream = env.addSource(new RandomComputerTemperatureLaterSource(560, 30));

		// the watermark trails the highest timestamp seen so far by 5 seconds;
		// the record's own timestamp field is used as the event time
		WatermarkStrategy<ComputerTemperature> strategy = WatermarkStrategy
				.<ComputerTemperature>forBoundedOutOfOrderness(Duration.ofSeconds(5))
				.withTimestampAssigner((event, timestamp) -> event.getTimestamp());

		dataStream = dataStream.assignTimestampsAndWatermarks(strategy);

		// group the records by computer name, then aggregate every 5 seconds with MyMaxTemperatureHandler
		SingleOutputStreamOperator<Tuple3<String, Long, Double>> process = dataStream
				// alternatives that were tried instead of the WatermarkStrategy above:
				// .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
				// .assignTimestampsAndWatermarks(
				//     new BoundedOutOfOrdernessTimestampExtractor<ComputerTemperature>(Time.seconds(10)) {
				//         @Override
				//         public long extractTimestamp(ComputerTemperature element) {
				//             return System.currentTimeMillis();
				//         }
				//     })
				// .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ComputerTemperature>() {
				//     @Override
				//     public long extractAscendingTimestamp(ComputerTemperature element) {
				//         return element.getTimestamp();
				//     }
				// })
				.keyBy(x -> x.getName())
				// timeWindow() gives a (event-)time window here; with
				// .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) the late data is
				// never routed to the side output and cannot be displayed or processed
				.timeWindow(Time.milliseconds(5000))
				// route records that arrive too late into the late-data side output
				.sideOutputLateData(lateTag)
				// records up to 10 seconds late still update the window result
				.allowedLateness(Time.seconds(10))
				.process(new MyMaxTemperatureHandler());

		// print the window results with a single thread rather than in parallel
		process.addSink(new SinkFunction<Tuple3<String, Long, Double>>() {

			@Override
			public void invoke(Tuple3<String, Long, Double> value, Context context) throws Exception {
				System.out.println("collected result ===> " + value);
			}

		}).setParallelism(1);

		// handle the late data; question 1: why was nothing shown here at first? was no data
		// being dropped? inspection showed that some records were in fact discarded
		DataStream<ComputerTemperature> lateStream = process.getSideOutput(lateTag);
		// SingleOutputStreamOperator<ComputerTemperature> process2 = lateStream.process(new MyLateTemperatureHandler());
		// process2.print().setParallelism(1); // printing the discarded records this way did not work, so use a sink instead
		lateStream.addSink(new ComputerTemperatureSink());

		try {
			env.execute("max temperature per computer over 5-second windows");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	// an alternative periodic watermark generator that was tried: watermark = processing time minus a fixed lag
	private static class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<ComputerTemperature> {

		private final long maxTimeLag = 5000; // 5 seconds

		@Override
		public long extractTimestamp(ComputerTemperature element, long previousElementTimestamp) {
			return element.getTimestamp();
		}

		@Override
		public Watermark getCurrentWatermark() {
			// return the watermark as the current time minus the maximum time lag
			return new Watermark(System.currentTimeMillis() - maxTimeLag);
		}
	}

	// dedicated handler for records that arrived too late
	static class MyLateTemperatureHandler extends ProcessFunction<ComputerTemperature, ComputerTemperature> {

		@Override
		public void processElement(ComputerTemperature bean,
				ProcessFunction<ComputerTemperature, ComputerTemperature>.Context context,
				Collector<ComputerTemperature> out) throws Exception {
			System.out.println("=======================>");
			System.out.println(bean);
			System.out.println("<========================");
			// do not forward the out-of-date bean
			// out.collect(bean);
			// context.output(lateTag, bean);
		}
	}

	// finds the maximum temperature per key and window
	static class MyMaxTemperatureHandler
			extends ProcessWindowFunction<ComputerTemperature, Tuple3<String, Long, Double>, String, TimeWindow> {

		@Override
		public void process(String key,
				ProcessWindowFunction<ComputerTemperature, Tuple3<String, Long, Double>, String, TimeWindow>.Context context,
				Iterable<ComputerTemperature> events, Collector<Tuple3<String, Long, Double>> out) throws Exception {
			Double max = 0.0;
			long time = 0L;

			System.out.println("processing window data .........");
			events.forEach(x -> System.out.println(x));

			for (ComputerTemperature event : events) {
				Double temperature = event.getTemperature();
				if (temperature > max) {
					max = temperature;
					time = event.getTimestamp();
				}
			}
			// emit (key, timestamp of the maximum reading, maximum temperature);
			// the timestamp can be formatted with SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") if needed
			out.collect(Tuple3.of(key, time, max));
		}
	}
}

Here an OutputTag is used to collect the late records; once a record arrives too late it is routed to the side output automatically, and the allowed lateness can be configured.
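To make the interplay between the watermark, allowedLateness and the side output concrete, consider one 5-second window [10:00:00, 10:00:05) under the strategy above (the watermark trails the highest timestamp seen by 5 seconds): the window first fires once the watermark passes 10:00:05, i.e. after a record with a timestamp of roughly 10:00:10 or later has been seen; a record belonging to this window that arrives while the watermark is still below 10:00:15 (window end plus the 10 seconds of allowed lateness) re-triggers the window and updates its result; only records that arrive after the watermark has passed 10:00:15 are no longer assigned to the window and end up in the lateTag side output.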

3. Test

(screenshot of the raw test output)
Because the raw output is fairly large and disordered, it is shown again after being tidied up:
(screenshot of the tidied-up output)

(further screenshots of the test output)
The test was successful!

4. Pitfalls

1. This feels like a trap: while the job was running, the late records were never printed, yet the final results were clearly missing data. In my experience you have to use timeWindow(Time.milliseconds(5000)) here rather than window(TumblingProcessingTimeWindows.of(Time.seconds(5))); see the sketch after these notes.

2. Since the official documentation has no complete end-to-end example of this, it is not clear whether it is env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) or something else that makes it work. (For what it is worth, since Flink 1.12 event time is the default and setStreamTimeCharacteristic is deprecated; what actually enables event-time semantics here should be the WatermarkStrategy with forBoundedOutOfOrderness and its timestamp assigner.)
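For pitfall 1, a likely explanation (my reading of the Flink API, not something verified in the original post): timeWindow(Time) picks an event-time or processing-time window assigner depending on the stream time characteristic, whereas TumblingProcessingTimeWindows always windows by processing time, so event-time lateness and the side output never come into play. The explicit event-time form would look roughly like this:

// explicit event-time window assigner; timeWindow(...) is deprecated in newer Flink versions
// import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
dataStream
		.keyBy(x -> x.getName())
		.window(TumblingEventTimeWindows.of(Time.seconds(5)))
		.sideOutputLateData(lateTag)
		.allowedLateness(Time.seconds(10))
		.process(new MyMaxTemperatureHandler());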
