[Flink]测试用的fake温度传感器

时间:2019-11-12
本文章向大家介绍[Flink]测试用的fake温度传感器,主要包括[Flink]测试用的fake温度传感器使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Flink-测试用的fake温度传感器

Flink中,测试时,会用到自定义的source。

下为一例。。 该例使用温度传感器的格式演示fake日志数据源。

代码用Scala写的。

传感器...

  • 传感器 - 样例类

    SensorReads.scala

     
     
     
    x
     
     
     
     
    package sr
    /**
     * 
     */
    case class SensorReads(id:String,
                           timestap:Long,
                           tempture:Double)
     
  • 传感器 - 数据源模拟

    SnsorSrc_4096T.scala

     
     
     
    xxxxxxxxxx
     
     
     
     
    package sr
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import scala.util.Random
    /**
     * period, is 4096 millis.
     */
    case class SnsorSrc_4096T extends SourceFunction[SensorReads] {
        
        var isInRunning: Boolean = true
        
        ////
        override def run(sourceContext: SourceFunction.SourceContext[
                SensorReads]): Unit = {
            
            
            val rand: Random = new Random
            
            var tptNow4 =
                (1 to 4).map(
                    "snsor_" + _.toString -> (23 + 16 * rand.nextGaussian))
            
            
            while (isInRunning) {
                tptNow4 = tptNow4.map(
                    t => t._1 -> (t._2 + rand.nextGaussian))
                
                
                val timeStampNow: Long = System.currentTimeMillis
        
                tptNow4.foreach{
                    t =>
                        sourceContext.collect( // O.U.T
                            SensorReads(t._1, timeStampNow, t._2) )
                    Thread.sleep(512)  }
                //not set, is stm
        
                Thread.sleep(2048)  }
        
        }
        
        override def cancel(): Unit = isInRunning = false
        
    }
     

测试

SnsrSrcAappli.scala

 
 
 
xxxxxxxxxx
 
 
 
 
package applis
import org.apache.flink.streaming.api.scala._
import sr._
object SnsrSrcAappli extends App{
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    env.addSource(SnsorSrc_4096T() )
                    .print("aaa")
    
    env.execute()
}
 

数据源模拟用case-class,此处使用则可以不写new。

输出

IDEA控制台上run:

 
 
 
xxxxxxxxxx
 
 
 
 
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
aaa:3> SensorReads(snsor_1,1573556705508,30.383394411578916)
aaa:4> SensorReads(snsor_2,1573556705508,21.397405872448672)
aaa:5> SensorReads(snsor_3,1573556705508,20.598086139457727)
aaa:6> SensorReads(snsor_4,1573556705508,18.30066983735531)
aaa:7> SensorReads(snsor_1,1573556709627,30.120955223032546)
aaa:8> SensorReads(snsor_2,1573556709627,22.38746867201145)
aaa:1> SensorReads(snsor_3,1573556709627,20.45357507067989)
aaa:2> SensorReads(snsor_4,1573556709627,17.18467261133715)
aaa:3> SensorReads(snsor_1,1573556713729,31.686487593592904)
aaa:4> SensorReads(snsor_2,1573556713729,20.67106361911623)
aaa:5> SensorReads(snsor_3,1573556713729,21.27724215221553)
aaa:6> SensorReads(snsor_4,1573556713729,16.84273306583804)
Process finished with exit code -1
 

原文地址:https://www.cnblogs.com/senwren/p/fake-snsr-Rd-src.html