3. Flink Operators

Flink, like Spark, is a unified processing framework that supports both batch processing (DataSet) and stream processing (DataStream). This guide divides Flink operators into two main categories: DataSet Operators (for batch processing) and DataStream Operators (for stream processing).

DataSet Operators (Batch Processing)

1. Source Operators

1.1 fromCollection

Reads data from a local collection.

val env = ExecutionEnvironment.getExecutionEnvironment
val textDataSet: DataSet[String] = env.fromCollection(
  List("1,张三", "2,李四", "3,王五", "4,赵六")
)
1.2 readTextFile

Reads data from a file.

val textDataSet: DataSet[String] = env.readTextFile("/data/a.txt")
1.3 Recursive File Enumeration

Reads all files in a directory, including subdirectories.

val parameters = new Configuration
parameters.setBoolean("recursive.file.enumeration", true)
val file = env.readTextFile("/data").withParameters(parameters)
1.4 Reading Compressed Files

Flink can automatically recognize and decompress certain file types (e.g., .gz, .bz2). Note that compressed files are not splittable, so they cannot be read in parallel.

val file = env.readTextFile("/data/file.gz")

2. Transformation Operators

Setup Example
val env = ExecutionEnvironment.getExecutionEnvironment
val textDataSet: DataSet[String] = env.fromCollection(
  List("张三,1", "李四,2", "王五,3", "张三,4")
)
map

Transforms each element in the DataSet.

case class User(name: String, id: String)
val userDataSet: DataSet[User] = textDataSet.map { text =>
  val fieldArr = text.split(",")
  User(fieldArr(0), fieldArr(1))
}
userDataSet.print()
flatMap

Transforms each element into zero or more elements.

val result = textDataSet
  .flatMap(line => line.split(",")) // split each line into individual fields
  .map(field => (field, 1))         // wrap each field in a count-1 tuple
  .groupBy(0)                       // group by the first tuple field
  .sum(1)                           // sum the second tuple field
result.print()
mapPartition

Applies the transformation once per partition, receiving an iterator over all of the partition's elements. This is useful for amortizing expensive setup work, such as opening a database connection, across a whole partition.

val result: DataSet[User] = textDataSet.mapPartition { lines =>
  // 'lines' iterates over every element of one partition
  lines.map { line =>
    val fieldArr = line.split(",")
    User(fieldArr(0), fieldArr(1))
  }
}
result.print()
filter

Filters elements that meet certain conditions.

val source: DataSet[String] = env.fromElements("java", "scala", "java")
val filter: DataSet[String] = source.filter(line => line.contains("java"))
filter.print()
reduce

Aggregates elements within a DataSet or group.

val source = env.fromElements(("java", 1), ("scala", 1), ("java", 1))
val reduceData = source.groupBy(0).reduce((x, y) => (x._1, x._2 + y._2))
reduceData.print()
reduceGroup

A more general variant of reduce that receives all elements of a group at once; it can pre-reduce each group before data is shuffled, cutting down network I/O.

val result: DataSet[(String, Int)] = source.groupBy(0).reduceGroup {
  (in: Iterator[(String, Int)], out: Collector[(String, Int)]) =>
    val tuple = in.reduce((x, y) => (x._1, x._2 + y._2))
    out.collect(tuple)
}
result.print()
minBy and maxBy

Selects elements with the minimum or maximum value.

val tuples = textDataSet.map { l => val a = l.split(","); (a(0), a(1).toInt) }
val result = tuples
  .groupBy(0) // group by name
  .minBy(1)   // keep the element with the minimum value in each group
aggregate

Performs aggregation operations (e.g., MAX, MIN) on tuples.

val value = input.groupBy(1).aggregate(Aggregations.MAX, 2)
value.print()
distinct

Removes duplicate elements from a DataSet; when field positions are given, deduplication considers only those fields.

val value: DataSet[(Int, String, Double)] = input.distinct(1)
value.print()
first

Selects the first N elements.

input.first(2)
join

Joins two DataSets based on specific conditions.

// assumes s1: DataSet[(Int, String, Double)] and s2: DataSet[(Int, String)]
val joinData = s1.join(s2).where(0).equalTo(0) { (first, second) =>
  (first._1, first._2, second._2, first._3)
}
leftOuterJoin

Performs a left outer join.

text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
  if (second == null) (first._1, first._2, "null")
  else (first._1, first._2, second._2)
}).print()
cross

Performs a Cartesian product.

val cross = input1.cross(input2) { (input1, input2) =>
  (input1._1, input1._2, input1._3, input2._2)
}
cross.print()
union

Combines two or more DataSets without deduplication.

val unionData: DataSet[String] = elements1.union(elements2)
rebalance

Redistributes data round-robin across all parallel instances to even out data skew.

val rebalance = filterData.rebalance()
partitionByHash

Partitions data using a hash key.

val unique = collection.partitionByHash(1).mapPartition { line =>
  line.map(x => (x._1, x._2, x._3))
}
unique.writeAsText("hashPartition", WriteMode.NO_OVERWRITE)
env.execute()
partitionByRange

Partitions data by range based on a key.

val unique = collection.partitionByRange(x => x._1).mapPartition { line =>
  line.map(x => (x._1, x._2, x._3))
}
unique.writeAsText("rangePartition", WriteMode.OVERWRITE)
env.execute()
sortPartition

Sorts partitions by a specified field.

val result = ds.sortPartition(1, Order.DESCENDING).collect()
println(result)

3. Sink Operators

1. collect

Retrieves the result to the client as a local collection; intended for small results and debugging.

result.collect()
2. writeAsText

Outputs data to a file. Flink supports:

  • Various storage devices: local files, HDFS, etc.
  • Different file formats: text files, CSV files, etc.

Example:

// Write data to a local file
result.writeAsText("/data/a", WriteMode.OVERWRITE)

// Write data to HDFS
result.writeAsText("hdfs://node01:9000/data/a", WriteMode.OVERWRITE)

DataStream Operators (Stream Processing)

1. Source Operators

Flink allows adding data sources to your program using StreamExecutionEnvironment.addSource(source). There are four main categories of sources:

  1. Collection-based source: Reads data from local collections.
  2. File-based source: Reads files line by line according to TextInputFormat and returns each line as a String.
  3. Socket-based source: Reads from sockets; elements can be split using delimiters.
  4. Custom source: Create a custom source by implementing the SourceFunction (non-parallel) or ParallelSourceFunction (parallel) interface, as sketched below.
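
A minimal sketch of a custom non-parallel source; the class name and emission logic are illustrative:

import org.apache.flink.streaming.api.functions.source.SourceFunction

// emits an increasing counter once per second until cancelled
class CounterSource extends SourceFunction[Long] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var count = 0L
    while (running) {
      ctx.collect(count) // emit the next element
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}

val counterStream = env.addSource(new CounterSource)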
Example: Adding Kafka as a Source

Add the dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

Write Kafka data into Flink:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val source = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
Example: Socket-based Source
val source = env.socketTextStream("IP", PORT)

2. Transformation Operators

1. map

Transforms each element in the stream.

dataStream.map { x => x * 2 }
2. flatMap

Generates zero, one, or more elements for each input element (e.g., splitting sentences into words).

dataStream.flatMap { str => str.split(" ") }
3. filter

Filters elements based on a condition.

dataStream.filter { _ != 0 }
4. keyBy

Partitions the stream logically into non-overlapping partitions based on keys.

dataStream.keyBy(0)
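
Positional keys work for tuples; for case-class streams a key selector function is the idiomatic alternative. A minimal sketch (the Sensor class and sensorStream are illustrative):

// key a stream of case-class elements by one of their fields
case class Sensor(id: String, temperature: Double)
val keyed = sensorStream.keyBy(_.id)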
5. reduce

Applies a rolling reduce operation on keyed data streams.

keyedStream.reduce { _ + _ }
6. fold

Applies a rolling fold with an initial value on a keyed stream. (fold is deprecated in newer Flink versions; aggregate is the recommended replacement.)

val result: DataStream[String] = keyedStream.fold("start") { (str, i) =>
  str + "-" + i
}
7. Aggregations

Performs rolling aggregations on a keyed stream (sum, min, max, minBy, maxBy). min returns the minimum value of the field, whereas minBy returns the whole element containing the minimum value.

keyedStream.sum(0)
keyedStream.min(0)
keyedStream.max(0)
8. Window

Groups keyed streams based on time or count.

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
9. WindowAll

Groups all events of the stream into one window; this operator is non-parallel, so all records pass through a single task.

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
10. Window Apply

Applies a user-defined function to each window.

windowedStream.apply { WindowFunction }
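
In the Scala API the WindowFunction placeholder can be supplied as a closure. A minimal sketch that counts elements per key, assuming a keyed stream of (String, Int) tuples:

import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// count the elements of each key in every 5-second window
val counts = dataStream
  .keyBy(_._1)
  .timeWindow(Time.seconds(5))
  .apply { (key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]) =>
    out.collect((key, input.size))
  }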
11. Window Reduce

Applies a reduce function to a window.

windowedStream.reduce { _ + _ }
12. Union

Combines multiple streams into one.

dataStream.union(otherStream1, otherStream2, ...)
13. Window Join

Joins two streams over a shared window and keys.

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {...})
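
In the Scala API the JoinFunction can also be supplied as a closure. A filled-in sketch, where orderStream and paymentStream are assumed to be DataStream[(String, Int)]:

// join two streams on their first field within a 3-second tumbling window
val joined = orderStream.join(paymentStream)
  .where(_._1).equalTo(_._1)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .apply { (order, payment) => (order._1, order._2, payment._2) }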
14. Interval Join

Joins two keyed streams on elements whose timestamps fall within a relative time interval of each other (event time only).

keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2))
    .process(new ProcessJoinFunction() {...})
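
The elided function extends ProcessJoinFunction. A minimal sketch with element types assumed to be (String, Long):

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.util.Collector

// combine each matching pair that falls inside the interval
val joinFunction = new ProcessJoinFunction[(String, Long), (String, Long), String] {
  override def processElement(
      left: (String, Long),
      right: (String, Long),
      ctx: ProcessJoinFunction[(String, Long), (String, Long), String]#Context,
      out: Collector[String]): Unit = {
    out.collect(left._1 + "," + left._2 + "," + right._2)
  }
}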
15. Connect

Connects two streams, maintaining their types.

val connectedStreams = someStream.connect(otherStream)
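
Unlike union, each side keeps its own element type until a co-operator is applied. A minimal sketch assuming someStream is a DataStream[String] and otherStream a DataStream[Int]:

// apply a separate map function to each side of the connected streams
val merged: DataStream[String] = connectedStreams.map(
  (s: String) => "left: " + s,
  (n: Int) => "right: " + n
)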
16. Split

Splits a stream into two or more streams based on custom criteria. (split/select are deprecated in newer Flink releases in favor of side outputs.)

val split = someDataStream.split { num =>
  if (num % 2 == 0) List("even") else List("odd")
}
17. Select

Selects one or more streams from a split stream.

val even = split.select("even")
val odd = split.select("odd")

3. Sink Operators

Example: Writing to Kafka

val sinkTopic = "test"
case class Student(id: Int, name: String, addr: String, sex: String)

// toJsonString is a user-defined helper that serializes a Student to a JSON string
val studentStream: DataStream[String] = dataStream.map { student =>
  toJsonString(student)
}

val prop = new Properties()
prop.setProperty("bootstrap.servers", "node01:9092")

val myProducer = new FlinkKafkaProducer011[String](
  sinkTopic,
  new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
  prop
)
studentStream.addSink(myProducer)
studentStream.print()
env.execute("Flink add sink")