Flink, like Spark, is a unified processing framework that supports both batch processing (DataSet) and stream processing (DataStream). This guide divides Flink operators into two main categories: DataSet Operators (for batch processing) and DataStream Operators (for stream processing).
DataSet Operators
fromCollection
Reads data from a local collection.
import org.apache.flink.api.scala._ // Scala DataSet API plus implicit TypeInformation
val env = ExecutionEnvironment.getExecutionEnvironment
val textDataSet: DataSet[String] = env.fromCollection(
  List("1,张三", "2,李四", "3,王五", "4,赵六")
)
readTextFile
Reads data from a file.
val textDataSet: DataSet[String] = env.readTextFile("/data/a.txt")
Reads all files in a directory, including subdirectories.
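import org.apache.flink.configuration.Configuration
val parameters = new Configuration
// Enable recursive enumeration of files in nested directories
parameters.setBoolean("recursive.file.enumeration", true)
val file = env.readTextFile("/data").withParameters(parameters)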
Flink transparently decompresses input files whose extension identifies a supported compression format (e.g., .gz, .bz2):
val file = env.readTextFile("/data/file.gz")
Sample input for the transformation examples that follow (each line is "name,number"):
val env = ExecutionEnvironment.getExecutionEnvironment
val textDataSet: DataSet[String] = env.fromCollection(
  List("张三,1", "李四,2", "王五,3", "张三,4")
)
map
Transforms each element of a DataSet into exactly one new element.
case class User(name: String, id: String)
val userDataSet: DataSet[User] = textDataSet.map { text =>
val fieldArr = text.split(",")
User(fieldArr(0), fieldArr(1))
}
userDataSet.print()
flatMap
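Transforms each element into zero, one, or more elements. Here each "name,number" line becomes one (name, number) tuple, a malformed line becomes none, and the numbers are then summed per name.
val result = textDataSet
  .flatMap { line =>
    line.split(",") match {
      case Array(name, count) => Seq((name, count.toInt)) // well-formed line: one element
      case _                  => Seq.empty[(String, Int)] // malformed line: no element
    }
  }
  .groupBy(0) // Group by the first tuple field (name)
  .sum(1)     // Sum the second tuple field (number)
result.print()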
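To see what a flatMap followed by groupBy(0).sum(1) produces, the pipeline can be modeled with plain Scala collections. This is a sketch of the semantics only, assuming each input line is "name,number". It is not Flink code, and `FlatMapSumModel`, `parse`, and `sumByName` are hypothetical names.

```scala
object FlatMapSumModel {
  // flatMap step: a "name,number" line yields one tuple; a line without exactly two fields yields none
  def parse(line: String): Seq[(String, Int)] = line.split(",") match {
    case Array(name, count) => Seq((name, count.trim.toInt))
    case _                  => Seq.empty
  }

  // groupBy(0).sum(1) step: one entry per name with its numbers added up
  def sumByName(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(parse)
      .groupBy(_._1)
      .map { case (name, tuples) => name -> tuples.map(_._2).sum }
}

println(FlatMapSumModel.sumByName(List("张三,1", "李四,2", "王五,3", "张三,4")))
```

Flink's sum returns tuples rather than a Map, and the order in which the groups are printed is not defined.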
mapPartition
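Applies a transformation to all elements of a partition in a single function call. This is useful when setup work, such as opening a connection, should happen once per partition rather than once per element.
val result: DataSet[User] = textDataSet.mapPartition { lines =>
  // lines is an Iterator over every element of one partition
  lines.map { line =>
    val fieldArr = line.split(",")
    User(fieldArr(0), fieldArr(1))
  }
}
result.print()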
filter
Keeps only the elements for which the predicate returns true.
val source: DataSet[String] = env.fromElements("java", "scala", "java")
val filter: DataSet[String] = source.filter(line => line.contains("java"))
filter.print()
reduce
Combines the elements of a DataSet, or of each group after groupBy, pairwise until a single element remains.
val source = env.fromElements(("java", 1), ("scala", 1), ("java", 1))
val reduceData = source.groupBy(0).reduce((x, y) => (x._1, x._2 + y._2))
reduceData.print()
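The grouped reduce above leaves exactly one tuple per key. The model below reproduces that result with plain Scala collections. It is a sketch, not Flink code, and `ReduceModel` and `reduceByKey` are hypothetical names.

```scala
object ReduceModel {
  // The combine function from the example: keep the key, add the counts
  val combine: ((String, Int), (String, Int)) => (String, Int) =
    (x, y) => (x._1, x._2 + y._2)

  // groupBy(0).reduce(combine): one reduced tuple per distinct key
  def reduceByKey(data: Seq[(String, Int)]): Set[(String, Int)] =
    data.groupBy(_._1).values.map(_.reduce(combine)).toSet
}

println(ReduceModel.reduceByKey(Seq(("java", 1), ("scala", 1), ("java", 1))))
```

Flink may apply the function to a group's elements in any order and can also run it as a combiner before the shuffle. The function should therefore be associative and commutative, as addition is.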
reduceGroup
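Processes all elements of a group in one call. The function receives an Iterator over the group's elements and emits zero or more results through a Collector.
import org.apache.flink.util.Collector
val result: DataSet[(String, Int)] = source.groupBy(0).reduceGroup {
  (in: Iterator[(String, Int)], out: Collector[(String, Int)]) =>
    val tuple = in.reduce((x, y) => (x._1, x._2 + y._2))
    out.collect(tuple)
}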
result.print()
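minBy and maxBy
Select, per group, the whole element whose given field holds the minimum (minBy) or maximum (maxBy) value.
val result = textDataSet
  .map { line => val f = line.split(","); (f(0), f(1).toInt) } // parse into (name, number)
  .groupBy(0) // Group by name
  .minBy(1)   // Per name, keep the tuple with the smallest number
result.print()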
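Because minBy keeps whole elements, the result for each name is one of the actual input tuples. The model below reproduces groupBy(0).minBy(1) and groupBy(0).maxBy(1) over (name, number) tuples using plain Scala. It is a sketch with hypothetical helper names. On ties it keeps the first occurrence, which is the model's own choice.

```scala
object MinByModel {
  // groupBy(0).minBy(1): per name, keep the whole tuple with the smallest field 1
  def minByKey(data: Seq[(String, Int)]): Map[String, (String, Int)] =
    data.groupBy(_._1).map { case (name, tuples) => name -> tuples.minBy(_._2) }

  // groupBy(0).maxBy(1): the mirror image, keeping the largest field 1
  def maxByKey(data: Seq[(String, Int)]): Map[String, (String, Int)] =
    data.groupBy(_._1).map { case (name, tuples) => name -> tuples.maxBy(_._2) }
}

println(MinByModel.minByKey(Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4))))
```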
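aggregate
Performs built-in aggregations (Aggregations.SUM, MIN, MAX) on a field of a tuple DataSet. After groupBy, the aggregation is computed per group.
import org.apache.flink.api.java.aggregation.Aggregations
// input: DataSet[(Int, String, Double)]; per value of field 1, compute the maximum of field 2
val value = input.groupBy(1).aggregate(Aggregations.MAX, 2)
value.print()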
distinct
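Removes duplicate elements from a DataSet. distinct(1) compares only field 1 (the second field), so tuples that agree on that field count as duplicates and only one of them is kept.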
val value: DataSet[(Int, String, Double)] = input.distinct(1)
value.print()
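The model below reproduces distinct(1) on (Int, String, Double) tuples with plain Scala. It keeps the first tuple seen for each value of field 1. That is only this sketch's choice, so do not rely on Flink keeping any particular duplicate. `DistinctModel` is a hypothetical name.

```scala
object DistinctModel {
  // distinct(1): at most one tuple per value of field 1 (0-based)
  def distinctOnField1(data: Seq[(Int, String, Double)]): Seq[(Int, String, Double)] =
    data.foldLeft((Set.empty[String], Vector.empty[(Int, String, Double)])) {
      case ((seen, kept), t) =>
        if (seen.contains(t._2)) (seen, kept) // key already seen: drop the duplicate
        else (seen + t._2, kept :+ t)         // first occurrence of this key: keep it
    }._2
}

println(DistinctModel.distinctOnField1(Seq((1, "a", 1.0), (2, "b", 2.0), (3, "a", 3.0))))
```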
first
Returns the first N elements. Without a preceding sort, which N elements are returned is arbitrary.
input.first(2).print()
join
Joins two DataSets on equal key fields. where(0) selects the key of the left input and equalTo(0) the key of the right input.
val joinData = s1.join(s2).where(0).equalTo(0) { (left, right) =>
  (left._1, left._2, right._2, left._3)
}
leftOuterJoin
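Performs a left outer join. Every element of the left input is kept, whether or not it has a match.
text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
  // second is null when text2 has no element with a matching key
  if (second == null) (first._1, first._2, "null")
  else (first._1, first._2, second._2)
}).print()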
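In a left outer join every left element reaches the output. When no right element matches, Flink passes null as the second argument, which is why the example tests `second == null`. The plain-Scala model below works on (key, value) pairs and uses a filter instead of null checks. It is a sketch of the semantics with a hypothetical helper name, not Flink code.

```scala
object LeftOuterJoinModel {
  // where(0).equalTo(0): match on the first field of both sides
  def leftOuterJoin(left: Seq[(Int, String)], right: Seq[(Int, String)]): Seq[(Int, String, String)] =
    left.flatMap { case (k, v) =>
      val matches = right.filter(_._1 == k)
      if (matches.isEmpty) Seq((k, v, "null"))        // unmatched left element, as in the example
      else matches.map { case (_, rv) => (k, v, rv) } // one output per matching right element
    }
}

println(LeftOuterJoinModel.leftOuterJoin(Seq((1, "a"), (2, "b")), Seq((1, "x"))))
```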
cross
Builds the Cartesian product of two DataSets: every element of the first is paired with every element of the second.
val cross = input1.cross(input2) { (a, b) =>
  (a._1, a._2, a._3, b._2)
}
cross.print()
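Because cross pairs every element of input1 with every element of input2, the result has |input1| × |input2| elements and grows quickly. The model below expresses this as a nested for-comprehension in plain Scala. It is a sketch, and `CrossModel` is a hypothetical name.

```scala
object CrossModel {
  // Cartesian product: every (a, b) pair, so the size is as.size * bs.size
  def cross[A, B](as: Seq[A], bs: Seq[B]): Seq[(A, B)] =
    for (a <- as; b <- bs) yield (a, b)
}

println(CrossModel.cross(Seq(1, 2, 3), Seq("x", "y")))
```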
union
Combines two or more DataSets of the same type into one, without removing duplicates.
val unionData: DataSet[String] = elements1.union(elements2)
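union only concatenates its inputs, so the result size is the sum of the input sizes. A following distinct() removes duplicates when needed. The model below is written in plain Scala with hypothetical names. Flink itself does not guarantee the concatenation order.

```scala
object UnionModel {
  // union: all elements of both inputs, duplicates kept
  def union[A](a: Seq[A], b: Seq[A]): Seq[A] = a ++ b

  // union followed by distinct(): each value at most once
  def unionDistinct[A](a: Seq[A], b: Seq[A]): Seq[A] = (a ++ b).distinct
}

println(UnionModel.union(Seq("a", "b"), Seq("b", "c")))
```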
rebalance
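Redistributes elements evenly (round-robin) across the parallel instances to remove data skew. Only map-like transformations, such as map or mapPartition, may follow a rebalance.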
val rebalance = filterData.rebalance()
partitionByHash
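Hash-partitions a DataSet on the given field, so all elements with the same key end up in the same partition.
import org.apache.flink.core.fs.FileSystem.WriteMode
val unique = collection.partitionByHash(1).mapPartition { line =>
  line.map(x => (x._1, x._2, x._3))
}
unique.writeAsText("hashPartition", WriteMode.NO_OVERWRITE)
env.execute()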
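The key property of partitionByHash(1) is that equal keys always land in the same partition, so each mapPartition call sees complete key groups. The sketch below models this with a non-negative `key.hashCode` modulo the partition count. Flink's actual channel selection uses a different hash computation, so only the same-key-same-partition property carries over. `HashPartitionModel` and its methods are hypothetical.

```scala
object HashPartitionModel {
  // floorMod keeps the result in [0, n) even for negative hash codes
  def partitionOf(key: Any, n: Int): Int = Math.floorMod(key.hashCode, n)

  // Groups (Int, String, Double) tuples by the partition their field 1 maps to
  def partition(data: Seq[(Int, String, Double)], n: Int): Map[Int, Seq[(Int, String, Double)]] =
    data.groupBy(t => partitionOf(t._2, n))
}

println(HashPartitionModel.partition(Seq((1, "a", 1.0), (2, "b", 2.0), (3, "a", 3.0)), 4))
```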
partitionByRange
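Range-partitions a DataSet on the given key, so that each partition receives a contiguous range of key values. Flink samples the input to choose the range boundaries.
val unique = collection.partitionByRange(x => x._1).mapPartition { line =>
  line.map(x => (x._1, x._2, x._3))
}
unique.writeAsText("rangePartition", WriteMode.OVERWRITE)
env.execute()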
sortPartition
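Sorts each partition locally on the specified field. The result is sorted only within each partition, so a total order requires a parallelism of 1.
import org.apache.flink.api.common.operators.Order
val result = ds.sortPartition(1, Order.DESCENDING).collect()
println(result)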
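sortPartition sorts inside each partition and never moves elements between partitions. With parallelism greater than one, the collected result is therefore sorted only piecewise. The plain-Scala model below represents partitions explicitly as nested sequences. The (String, Int) element type is an assumption, since the type of ds is not shown.

```scala
object SortPartitionModel {
  // Each inner Seq stands for one partition; sorting happens inside it only
  def sortEachPartitionDesc(partitions: Seq[Seq[(String, Int)]]): Seq[Seq[(String, Int)]] =
    partitions.map(p => p.sortBy(_._2)(Ordering[Int].reverse)) // like Order.DESCENDING on field 1
}

// Flattening shows the result is not globally sorted
println(SortPartitionModel.sortEachPartitionDesc(Seq(Seq(("a", 1), ("b", 3)), Seq(("c", 5), ("d", 2)))).flatten)
```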
collect
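Returns all elements to the client program as a local Seq. collect() triggers job execution and pulls the whole result into the client, so use it only for small results.
result.collect()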
writeAsText
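Writes each element as a line of text. The target path can be on the local file system or on HDFS:
import org.apache.flink.core.fs.FileSystem.WriteMode
// Write data to a local file
result.writeAsText("/data/a", WriteMode.OVERWRITE)
// Write data to HDFS
result.writeAsText("hdfs://node01:9000/data/a", WriteMode.OVERWRITE)
// File sinks are lazy: the job runs when env.execute() is called
env.execute()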
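DataStream Operators
A streaming program attaches a source with StreamExecutionEnvironment.addSource(source). StreamExecutionEnvironment also provides shortcuts for common sources. There are four main categories of sources:
- File-based: readTextFile(path) reads text files that follow the TextInputFormat specification line by line and returns them as strings.
- Socket-based: socketTextStream reads text from a socket.
- Collection-based: fromCollection and fromElements build a stream from local data.
- Custom: addSource attaches a source that implements the SourceFunction (non-parallel) or ParallelSourceFunction (parallel) interface, for example the Kafka connector.
To read from Kafka, add the dependency: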
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.10.0</version>
</dependency>
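Consume a Kafka topic as a DataStream:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
// SimpleStringSchema turns each record value of topic "sensor" into a String
val source = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))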
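Read lines of text from a socket, where "IP" and PORT are placeholders for the host name and port number:
val source = env.socketTextStream("IP", PORT)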
map
Transforms each element in the stream.
dataStream.map { x => x * 2 }
flatMap
Generates zero, one, or more elements for each input element (e.g., splitting sentences into words).
dataStream.flatMap { str => str.split(" ") }
filter
Filters elements based on a condition.
dataStream.filter { _ != 0 }
keyBy
Logically partitions a stream into disjoint partitions, so all elements with the same key go to the same partition. keyBy(0) keys a tuple stream by its first field.
dataStream.keyBy(0)
reduce
Applies a rolling reduce to a keyed stream. Each incoming element is combined with the last reduced value for its key, and the new value is emitted.
keyedStream.reduce { _ + _ }
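A keyed-stream reduce differs from the DataSet reduce. It never knows when a key is complete, so it emits an updated value for every incoming element. The sketch below models this in plain Scala over a finite list of (key, value) events. It uses a mutable map where Flink would use keyed state, and `RollingReduceModel` and its summing function are assumptions for illustration.

```scala
import scala.collection.mutable

object RollingReduceModel {
  // One output per input event: the running sum for that event's key
  def rollingSum(events: Seq[(String, Int)]): Seq[(String, Int)] = {
    val state = mutable.Map.empty[String, Int]
    events.toList.map { case (k, v) =>
      val updated = state.get(k).map(_ + v).getOrElse(v) // first element of a key passes through
      state(k) = updated
      (k, updated)
    }
  }
}

println(RollingReduceModel.rollingSum(Seq(("a", 1), ("b", 2), ("a", 3))))
```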
fold
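Applies a rolling fold with an initial value to a keyed stream. For a key whose elements arrive as 1, 2, 3, the example emits "start-1", "start-1-2", "start-1-2-3". fold is deprecated in later Flink 1.x releases.
val result: DataStream[String] = keyedStream.fold("start") { (str, i) =>
  str + "-" + i
}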
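aggregations
Performs rolling aggregations (sum, min, max, minBy, maxBy) on a keyed stream. min returns the minimum value of the field, whereas minBy returns the element that holds the minimum value in that field. The same applies to max and maxBy.
keyedStream.sum(0)
keyedStream.min(0)
keyedStream.max(0)
keyedStream.minBy(0)
keyedStream.maxBy(0)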
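The difference between min and minBy on a keyed stream shows up in what each step emits. The plain-Scala model below runs both over the same events of a hypothetical shape (key, value, label) and aggregates field 1. For min it tracks only the running minimum value. For minBy it tracks the whole element and keeps the earlier element on ties, matching minBy's default of returning the first such element.

```scala
import scala.collection.mutable

object RollingMinModel {
  type Event = (String, Int, String) // (key, value, label): hypothetical event shape

  // min(1): running minimum of field 1 per key (only the value is modeled)
  def rollingMin(events: Seq[Event]): Seq[Int] = {
    val best = mutable.Map.empty[String, Int]
    events.toList.map { case (k, v, _) =>
      val m = best.get(k).map(math.min(_, v)).getOrElse(v)
      best(k) = m
      m
    }
  }

  // minBy(1): running element holding the minimum field 1 per key
  def rollingMinBy(events: Seq[Event]): Seq[Event] = {
    val best = mutable.Map.empty[String, Event]
    events.toList.map { e =>
      val m = best.get(e._1).filter(_._2 <= e._2).getOrElse(e) // keep the earlier element on ties
      best(e._1) = m
      m
    }
  }
}

println(RollingMinModel.rollingMinBy(Seq(("a", 5, "x"), ("a", 3, "y"), ("a", 4, "z"))))
```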
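window
Groups the elements of a keyed stream into windows according to time or element count. The example uses 5-second tumbling event-time windows.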
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
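TumblingEventTimeWindows.of(Time.seconds(5)) puts each element into exactly one 5-second window, chosen by its event timestamp. With the default offset of zero, windows are aligned to the epoch: each starts at timestamp - (timestamp mod size) and covers [start, start + size). The plain-Scala helper below mirrors that arithmetic for non-negative millisecond timestamps. It is a model for reasoning about window boundaries, and `TumblingWindowModel.windowFor` is a hypothetical name, not a Flink API.

```scala
object TumblingWindowModel {
  // Window [start, end) in epoch milliseconds that contains ts (offset 0, ts >= 0)
  def windowFor(ts: Long, sizeMs: Long): (Long, Long) = {
    val start = ts - (ts % sizeMs)
    (start, start + sizeMs)
  }
}

println(TumblingWindowModel.windowFor(12345L, 5000L))
```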
windowAll
Groups all elements of a non-keyed stream into windows. Since no key splits the work, the windowAll operator runs with parallelism 1.
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
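window apply
Applies a user-defined function to the complete contents of each window. The example assumes windowedStream is a WindowedStream[(String, Int), String, TimeWindow] (keyed with keyBy(_._1)). It also assumes TimeWindow (org.apache.flink.streaming.api.windowing.windows) and Collector (org.apache.flink.util) are imported.
windowedStream.apply { (key: String, window: TimeWindow, elements: Iterable[(String, Int)], out: Collector[Int]) =>
  out.collect(elements.map(_._2).sum) } // emit the sum of field 1 for each window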
window reduce
Applies a reduce function to each window and emits one reduced value per window.
windowedStream.reduce { _ + _ }
union
Combines two or more streams of the same type into a single stream that contains all their elements.
dataStream.union(otherStream1, otherStream2, ...)
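window join
Joins the elements of two streams that share a key and fall into the same window. The example assumes both streams carry (String, Int) tuples and joins them on their first field.
dataStream.join(otherStream)
  .where(_._1).equalTo(_._1)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .apply { (left, right) => (left._1, left._2, right._2) }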
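intervalJoin
Joins the elements of two keyed streams that share a key and whose event timestamps lie within a relative interval. The function passed to process is a ProcessJoinFunction (org.apache.flink.streaming.api.functions.co), and the example assumes both streams carry (String, Int) tuples.
keyedStream.intervalJoin(otherKeyedStream)
  .between(Time.milliseconds(-2), Time.milliseconds(2))
  .process(new ProcessJoinFunction[(String, Int), (String, Int), String] {
    override def processElement(left: (String, Int), right: (String, Int), ctx: ProcessJoinFunction[(String, Int), (String, Int), String]#Context,
                                out: Collector[String]): Unit = out.collect(s"${left._1}: ${left._2}, ${right._2}")
  })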
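The condition that between(Time.milliseconds(-2), Time.milliseconds(2)) applies can be checked with a plain-Scala model. A left element joins every right element with the same key whose timestamp lies in [left.ts - 2, left.ts + 2]; both bounds are inclusive, as in Flink's default. The (key, timestampMs) pairs and `IntervalJoinModel` are hypothetical, and this is not Flink code.

```scala
object IntervalJoinModel {
  // left and right are (key, timestampMs) pairs
  def join(left: Seq[(String, Long)], right: Seq[(String, Long)],
           lower: Long, upper: Long): Seq[((String, Long), (String, Long))] =
    for {
      l <- left
      r <- right
      if l._1 == r._1                                 // same key (both streams are keyed)
      if r._2 >= l._2 + lower && r._2 <= l._2 + upper // inclusive interval bounds
    } yield (l, r)
}

println(IntervalJoinModel.join(Seq(("a", 10L)), Seq(("a", 8L), ("a", 13L)), -2L, 2L))
```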
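connect
Connects two streams while keeping their individual element types, which may differ. The resulting ConnectedStreams allows the two inputs to share state.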
val connectedStreams = someStream.connect(otherStream)
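split
Splits a stream into named streams according to a selector function. split is deprecated in later Flink 1.x releases in favor of side outputs.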
val split = someDataStream.split { num =>
if (num % 2 == 0) List("even") else List("odd")
}
select
Selects one or more named streams from a split stream.
val even = split.select("even")
val odd = split.select("odd")
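Sink: write a stream to Kafka.
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
val sinkTopic = "test"
case class Student(id: Int, name: String, addr: String, sex: String)
// Assumes dataStream is a DataStream[Student] and toJsonString is a user-supplied (hypothetical) JSON helper
val studentStream: DataStream[String] = dataStream.map { student =>
  toJsonString(student)
}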
val prop = new Properties()
prop.setProperty("bootstrap.servers", "node01:9092")
val myProducer = new FlinkKafkaProducer011[String](
sinkTopic,
new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
prop
)
studentStream.addSink(myProducer)
studentStream.print()
env.execute("Flink add sink")