Comparison of Apache Stream Processing Frameworks: Part 2

时间:2022-05-07
本文章向大家介绍Comparison of Apache Stream Processing Frameworks: Part 2,主要内容包括Fault Tolerance、Managing State、Counting Words with State、Performance、Project Maturity、Summary、Framework Recommendations、Dataflow and Open Source、Conclusion、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。

In the previous post we went through the necessary theory and also introduced popular streaming framework from Apache landscape - Storm, Trident, Spark Streaming, Samza and Flink. Today, we’re going to dig a little bit deeper and go through topics like fault tolerance, state management or performance. In addition, we’re going to discuss guidelines when building distributed streaming application and also I’ll give you recommendations for particular frameworks.

Fault Tolerance

Fault tolerance in streaming systems is inherently harder that in batch. When facing an error in batch processing system, we can just restart failed part of the computation and we’re good. But this is much harder in streaming scenarios, because data are still incoming and also a lot of jobs can run 24/7. Another challenge we have to face is state consistency, because in the end of the day we have start replaying events and of course not all state operations are idempotent. As you’ll see, fault tolerance can be pretty hard so let’s have a look at how our systems deal with that. Storm uses a mechanism of upstream backup and record acknowledgements to guarantee that messages are re-processed after a failure. Acknowledgements work as follows: an operator sends back to the previous operator an acknowledgement for every record that has been processed. The source of the topology keeps a backup of all the records it generates. Once received acknowledgements from all generated records until the sinks, the backup can be discarded safely. At failure, if not all acknowledgements have been received, then the records are replayed by the source. This guarantees no data loss, but does result in duplicate records passing through the system. That’s at-least once delivery (for more information about delivery guarantees you may check out the previous part). Storm implements this with a clever mechanism that only requires few bytes storage per source record to track the acknowledgements Pure record acknowledgement architectures, regardless of their performance, fail in offering exactly once guarantees, thus burdening the application developer with deduplication. Also Storm’s mechanism is low throughput and has problems with flow control, as the acknowledgment mechanism often falsely classifies failures under back-pressure.

Spark Streaming and its micro-batching semantics follows a different approach. The idea is terribly simple. Spark processes micro-batches on various worker nodes. Each micro-batch may either succeed or fail. At a failure, the micro-batch can be simply recomputed, as they are persistent and immutable. So exactly once delivery made easy.

Samza’s approach is completely different. It takes an advantage of durable, offset based messaging system. It’s usually Kafka of course. Samza monitors offsets of its tasks and moves it when message is processed. Offset can be check-pointed in a persistent storage and restored in case of failure. The problem is when it restores offset from the last checkpoint it doesn’t know which upcoming messages were processed and it might do it twice. That’s at least once delivery for us.

Flink approach is based on distributed snapshots which keeps the state of streaming job. Link sends checkpoint barriers, basically some kind of markers, through the stream and when barrier reaches the operator, operator checkpoints corresponding part of the stream. So if compare it to Storm, it’s far more efficient as it doesn’t have to acknowledge every record but does it in small batches. But don’t be confused, it’s still native streaming, conceptually it is very different from Spark. And also Flink provides exactly once delivery.

Managing State

Most of the non-trivial streaming applications have some kind of state. On the contrary of stateless operations where we have just an input, processing and an output, we have an input and a state, then processing and then an output with a modified state. We have to manage our state, persist it and in case of failure we expect our state to be recreated. The recreation of the state may be problem a little bit, as we do not have always exactly once guarantee, some of the record may be replayed multiple times. And that is not what we want usually. As we know Storm provides at-least once delivery guarantees. So how we can do the exactly once semantics provided by Trident ? Conceptually it’s quite simple, you just start committing the records, but obviously it’s not very efficient, so you start doing it in small batches, do some optimizations and here we are. Trident defines a couple of abstractions which determines when you can achieve exactly once guarantee, and as you can see in the picture below, there are some limitations and it needs some time to dig into it.

When thinking about stateful operations in stream processing, we usually have a long running operator with a state and a stream of records passing through it. As we know, Spark Streaming is micro-batching system, and it addresses it differently. Basically, Spark Streaming manages a state as another micro-batched stream. So during the processing of each micro-batch spark takes a current state and a function representing the operation and the result is a processed micro-batch and an updated state.

Samza’s solution for everything is just push it out to Kafka and problem solved, and it also works in the context of state management. Samza has real stateful operators so any task can hold a state and the state’s change log is pushed to Kafka. If needed state can be easily recreated from Kafka’s topic. To make it a little bit faster Samza allows us to plug-in key-value stores as a local storage so it doesn’t have to go to the Kafka all the time. The concept is illustrated on the picture below. Unfortunately, Samza provides at-least one semantics only, and it hurts a lot, but the implementation of exactly once delivery is planned.

Flink provides stageful operators conceptually similar to Samza. When working with Flink, we can use two different types of states. First one is local or task state, it is a current state of particular operator instance only and these guys don’t interact between each other. Then we have partitioned, or if you want key state, which maintains state of whole partitions. And of course, Flink provides exactly-once semantics. In the picture below you can see outline of Flink’s long running operator with 3 local states.

Counting Words with State

So let’s have a look how to count words, focusing on state management. For naive wordcount implementation just check out the previous post. Let’s start with Trident.

1 public static StormTopology buildTopology(LocalDRPC drpc) {
   FixedBatchSpout spout = ...

   TridentTopology topology = new TridentTopology();
  
   TridentState wordCounts = topology.newStream("spout1", spout)
     .each(new Fields("sentence"),new Split(), new Fields("word"))
     .groupBy(new Fields("word"))
     .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

 ...

 }

We can create a state by calling persistent aggregate at line 9. Important argument is the Count, which is built in component for storing numbers. If we would want to process the data from it, we would have to create a stream for that. As you can see int the snippet, it is not very convenient. Spark’s declarative approach is a little bit better.

1123456789
10
11
12
13
14
15
16
17// Initial RDD input to updateStateByKey
val initialRDD = ssc.sparkContext.parallelize(List.empty[(String, Int)])

val lines = ...
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))

val trackStateFunc = (batchTime: Time, word: String, one: Option[Int], 
  state: State[Int]) => {
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
    val output = (word, sum)
    state.update(sum)
    Some(output)
  }

val stateDstream = wordDstream.trackStateByKey(
  StateSpec.function(trackStateFunc).initialState(initialRDD))

Firstly we have to create RDD used as initial state (line 2), then we do some transformations (lines 5 and 6). Then, as you can see at lines 8 - 14, we have to define transition function, which takes a word, its count and a current state. The function does the computation, updates a state and returns result. And finally we can put all the bits together at lines 16 and 17 and get a state stream which contains word counts. Let’s have a look at Samza.

1123456789
10
11
12
13
14
15
16
17
18
19
20
21class WordCountTask extends StreamTask with InitableTask {

  private var store: CountStore = _

  def init(config: Config, context: TaskContext) {    this.store = context.getStore("wordcount-store")
      .asInstanceOf[KeyValueStore[String, Integer]]
  }

 override def process(envelope: IncomingMessageEnvelope,   collector: MessageCollector, coordinator: TaskCoordinator) {

   val words = envelope.getMessage.asInstanceOf[String].split(" ")

   words.foreach { key =>
     val count: Integer = Option(store.get(key)).getOrElse(0)
     store.put(key, count + 1)
     collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"), 
       (key, count)))
   }
 }

Firstly we need to define our state at line 3, in this case its key-value store and also definite how it should be initialized (lines 5 - 8). And then, we can use it during the computation. As you can see above, it is pretty straightforward. And finally, let’s have a look at Flink with its neat API.

1123456789
10
11
12
13
val env = ExecutionEnvironment.getExecutionEnvironment

val text = env.fromElements(...)
val words = text.flatMap ( _.split(" ") )

words.keyBy(x => x).mapWithState {  (word, count: Option[Int]) =>
    {
      val newCount = count.getOrElse(0) + 1
      val output = (word, newCount)
      (output, Some(newCount))
    }
}

We just call the function mapwithstate at line 6, which takes as an argument, function with two parameters. First one is a word to process and second is a state and function then returns processed output and new state.

Performance

Reasonable performance comparison is definitely a topic for whole article. So just an insight for now.

Various systems approach problems fairy differently and therefore it’s very hard to design not biased tests. When we talk about performance in streaming we talk about latency and throughput. It depends on many variables, but in general and for simple task. If you’re at 500k records per second per node it’s ok, if you can reach 1 million it’s nice, over a million it is great. Speaking about nodes, I mean pretty standard node like 24 cores and reasonable amount of memory, like 24 or 48 GBs. For latency, in case of micro-batch, we are usually thinking in seconds. In case of native streaming, we can expect lower hundreds of millis for most of the systems, but tuned storm can in operate in tens of millis easily. Also it is important to keep in mind the cost of delivery guarantees, fault tolerance and state management. For example turning on fault-tolerance may cost you like 10 to 15%. But in case storm it can be like 70% of throughput. As always, there is no free lunch. In this and previous post I’ve shown you stateful and stateless word count examples and of course, stateless would be faster. In case you’re wondering how much, so in the context of Apache Flink the difference was like 25% but in case of Spark it was around 50%. I’m pretty sure it could be tuned but it could give us an idea it’s something we should have in mind. Speaking about tuning, the systems have very rich tuning options which may lead to significant performance gains and you should always find some time to have a look at it. Also it’s important to have in mind, all operations are distributed and sending data through the network is pretty expensive. So try to take an advantage of data locality and also try to tune up your application’s serialization.

Project Maturity

When picking up the framework for your application, you should always consider its maturity. So Let’s have a quick look how does it look like in our cases. Storm was the first mainstream streaming system and became de-facto industrial standard for a long time and is used in many companies like Twitter, Yahoo, Spotify and many more. Spark is the most trending Scala repository these days and one of the engines behind Scala’s popularity. Spark’s adoption grows every day, it is used by companies like Netflix, Cisco, DataStax, Intel, IBM and so on. Samza is used by LinkedIn and is also by tens of other companies, as an example we can have Netflix or Uber. Flink is still an emerging project, but we can see its first production deployments and I’m sure more will follow very soon. You may also find interesting the number of project contributors. Storm & Trident have around 180 of them, whole Spark has more than 720. Samza has, according to github, more around 40 contributors and Flink has already more than 130.

Summary

Before we jump at framework recommendations, in may be helpful to check out the summary table below.

Framework Recommendations

The answer for the typical question, which one should I use, is as always, it depends. So in general, always try to evaluate requirements of your application carefully and be sure you fully understand the consequences of choosing the particular framework. I’d also recommend you to pick a framework with high level API, as it’s more elegant and more importantly much more productive. Also keep in mind, the most of the streaming applications are stateful so the state management of a particular framework should be up on your evaluation list. I’d also recommend to go for a framework with exactly once delivery semantics as it makes things easier, but of course, this really depends on requirements. There are definitely use cases when at least once or at most once delivery guarantees are all you need. But also keep in mind, system supporting exactly once does not have to implicitly support weaker guarantees. Lastly, make sure your system is able to recover quickly, you can use Chaos Monkey or similar tool for testing, because as we discussed, fast recovery is crucial in stream processing. Storm is still a great fit for small and fast tasks. If you care mainly about the latency, storm might be a good way to go. But also keep in mind the fault tolerance or trident’s state management hurts the performance a lot. Interesting option might be a potential update to Twitter’s Heron, which is designed as Storm’s replacement and should be better in every single task, but it also keeps the api. The problem is there is no guarantee Twitter is going to open-source it so who knows if it’s a good idea. For Spark Streaming, you should definitely at least try it if Spark is already part of your infrastructure, because, in this case, Streaming comes basically for free and you can also take an advantage of various Spark’s libraries. Also if you really want to use Lambda architecture, it is a pretty decent choice. But you should always keep in mind micro-batching limitations and be sure latency is not critical for you. When thinking about adopting Samza, Kafka should be a cornerstone of your architecture. I know it’s pluggable but nearly everyone is using Kafka so I would stick with that. Also as mentioned before, Samza is shipped with powerful local storage and it’s great for managing large states, it can handle states in tens of gigabytes easily, which is pretty nice. But keep in mind Samza’s at least once delivery limitation. Flink is conceptually great streaming system which fits very most streaming use cases And it often provides progressive functionality, like advanced windowing or time handling, which may not be implemented by its competitors. So you should always consider Flink when you need a functionality which might be hard to implement in Spark, or generally, in any micro-batching system. And apart of that, Flink also has an API for a common batch processing which may be pretty useful. But you need to have enough courage to adopt emerging project and also don’t forget to check out its roadmap.

Dataflow and Open Source

And the last thing I want to mention is Dataflow and its open source initiative. Dataflow is a part of Google Cloud platform and Cloud Platform has all sort of things in it as huge data storage, BigQuery, Cloud PubSub, some tools for data analysis, and so on and also aforementioned Cloud Dataflow.

Dataflow is Google’s managed service for batch and stream data processing with unified API. It is built upon well known Google technologies such as MapReduce for batch processing, FlumeJava for programming model definition and MillWheel for stream processing. And all of them are really good.

You may be asking why I’m wriming about that as I said we would be focused on about apache stream processing platforms and this is clearly Google’s proprietary solution, but Google decided to open source Dataflow SDK recently and guys behind both Spark and Flink have implemented its runners. So now we have an ability to run jobs defined by Dataflow API by Google Cloud Platform, by Flink or by Spark and it’s also pretty possible more engines will follow very soon.

Dataflow provides API in Java and in Python implemented by Google itself and also I’ve found two Scala DSLs implemented by community. Apart from that, Google and a number of partners submitted this as a new Apache proposal named Apache Beam. So it seems like a pretty interesting option, I would definitely at least think about it. But it is very important to emphasize, all of this is very recent and the implementation of particular features might be missing

Conclusion

In this short blog post series, we went through popular streaming frameworks from Apache landscape and discussed their similarities, differences, the trade-offs they have made and also their fitting use cases. I hope it was interesting for you and I believe it will be helpful when designing your own streaming solution. There’s definitely a couple of interesting frameworks which were not discussed here, but I plan to address them in separate posts. Also if you have any questions, don’t hesitate to contact me as I’m always happy to discus the topic.