@InterfaceStability.Unstable public final class Windows extends java.lang.Object
Window
s.
Groups incoming messages in a MessageStream
into finite windows for processing.
Each window is uniquely identified by its WindowKey
. A window can have one or more associated
Trigger
s that determine when results from the Window
are emitted. Each emitted result contains one
or more messages in the window and is called a WindowPane
.
A window can have early triggers that allow emitting WindowPane
s speculatively before all data
for the window has arrived, or late triggers that allow handling late arrivals of data.
window wk1 +--------------------------------+ ------------+--------+-----------+ | | | | | pane 1 |pane2 | pane3 | +-----------+--------+-----------+ ----------------------------------- incoming message stream ------+ ----------------------------------- window wk2 +---------------------+---------+ | pane 1| pane 2 | pane 3 | | | | | +---------+-----------+---------+ window wk3 +----------+-----------+---------+ | | | | | pane 1 | pane 2 | pane 3| | | | | +----------+-----------+---------+
A Window
can be one of the following types:
MessageStream
into sessions.
A session captures some period of activity over a MessageStream
.
The boundary for a session is defined by a sessionGap
. All messages that that arrive within
the gap are grouped into the same session.
A Window
is said to be "keyed" when the incoming messages are first grouped based on their key
and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants
of the window types above.
The value for the window can be updated incrementally by providing an initialValue
Supplier
and an aggregating FoldLeftFunction
. The initial value supplier is invoked every time a new window is
created. The aggregating function is invoked for each incoming message for the window. If these are not provided,
the emitted WindowPane
will contain a collection of messages in the window.
Time granularity for windows: Currently, time durations are always measured in milliseconds. Time units of finer granularity are not supported.
Modifier and Type | Method and Description |
---|---|
static <M,K> Window<M,K,java.util.Collection<M>> |
keyedSessionWindow(java.util.function.Function<? super M,? extends K> keyFn,
java.time.Duration sessionGap,
Serde<K> keySerde,
Serde<M> msgSerde)
Creates a
Window that groups incoming messages into sessions per-key based on the provided
sessionGap . |
static <M,K,WV> Window<M,K,WV> |
keyedSessionWindow(java.util.function.Function<? super M,? extends K> keyFn,
java.time.Duration sessionGap,
java.util.function.Supplier<? extends WV> initialValue,
FoldLeftFunction<? super M,WV> aggregator,
Serde<K> keySerde,
Serde<WV> windowValueSerde)
Creates a
Window that groups incoming messages into sessions per-key based on the provided
sessionGap and applies the provided fold function to them. |
static <M,K,WV> Window<M,K,WV> |
keyedTumblingWindow(java.util.function.Function<? super M,? extends K> keyFn,
java.time.Duration interval,
java.util.function.Supplier<? extends WV> initialValue,
FoldLeftFunction<? super M,WV> aggregator,
Serde<K> keySerde,
Serde<WV> windowValueSerde)
Creates a
Window that groups incoming messages into fixed-size, non-overlapping processing
time based windows based on the provided keyFn and applies the provided fold function to them. |
static <M,K> Window<M,K,java.util.Collection<M>> |
keyedTumblingWindow(java.util.function.Function<M,K> keyFn,
java.time.Duration interval,
Serde<K> keySerde,
Serde<M> msgSerde)
Creates a
Window that groups incoming messages into fixed-size, non-overlapping
processing time based windows using the provided keyFn. |
static <M> Window<M,java.lang.Void,java.util.Collection<M>> |
tumblingWindow(java.time.Duration duration,
Serde<M> msgSerde)
Creates a
Window that groups incoming messages into fixed-size, non-overlapping
processing time based windows. |
static <M,WV> Window<M,java.lang.Void,WV> |
tumblingWindow(java.time.Duration interval,
java.util.function.Supplier<? extends WV> initialValue,
FoldLeftFunction<? super M,WV> aggregator,
Serde<WV> windowValueSerde)
Creates a
Window that windows values into fixed-size processing time based windows and aggregates
them applying the provided function. |
public static <M,K,WV> Window<M,K,WV> keyedTumblingWindow(java.util.function.Function<? super M,? extends K> keyFn, java.time.Duration interval, java.util.function.Supplier<? extends WV> initialValue, FoldLeftFunction<? super M,WV> aggregator, Serde<K> keySerde, Serde<WV> windowValueSerde)
Window
that groups incoming messages into fixed-size, non-overlapping processing
time based windows based on the provided keyFn and applies the provided fold function to them.
The below example computes the maximum value per-key over fixed size 10 second windows.
MessageStream<UserClick> stream = ...;
Function<UserClick, String> keyFn = ...;
Supplier<Integer> initialValue = () -> 0;
FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator));
M
- the type of the input messageWV
- the type of the WindowPane
output valueK
- the type of the key in the Window
keyFn
- the function to extract the window key from a messageinterval
- the duration in processing timeinitialValue
- the initial value supplier for the aggregator. Invoked when a new window is created.aggregator
- the function to incrementally update the window value. Invoked when a new message
arrives for the window.keySerde
- the serde for the window keywindowValueSerde
- the serde for the window valueWindow
function.public static <M,K> Window<M,K,java.util.Collection<M>> keyedTumblingWindow(java.util.function.Function<M,K> keyFn, java.time.Duration interval, Serde<K> keySerde, Serde<M> msgSerde)
Window
that groups incoming messages into fixed-size, non-overlapping
processing time based windows using the provided keyFn.
The below example groups the stream into fixed-size 10 second windows for each key.
MessageStream<UserClick> stream = ...;
Function<UserClick, String> keyFn = ...;
MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(
Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10)));
M
- the type of the input messageK
- the type of the key in the Window
keyFn
- function to extract key from the messageinterval
- the duration in processing timekeySerde
- the serde for the window keymsgSerde
- the serde for the input messageWindow
functionpublic static <M,WV> Window<M,java.lang.Void,WV> tumblingWindow(java.time.Duration interval, java.util.function.Supplier<? extends WV> initialValue, FoldLeftFunction<? super M,WV> aggregator, Serde<WV> windowValueSerde)
Window
that windows values into fixed-size processing time based windows and aggregates
them applying the provided function.
The below example computes the maximum value per-key over fixed size 10 second windows.
MessageStream<String> stream = ...;
Supplier<Integer> initialValue = () -> 0;
FoldLeftFunction<String, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window(
Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator));
M
- the type of the input messageWV
- the type of the WindowPane
output valueinterval
- the duration in processing timeinitialValue
- the initial value supplier for the aggregator. Invoked when a new window is created.aggregator
- the function to incrementally update the window value. Invoked when a new message
arrives for the window.windowValueSerde
- the serde for the window valueWindow
functionpublic static <M> Window<M,java.lang.Void,java.util.Collection<M>> tumblingWindow(java.time.Duration duration, Serde<M> msgSerde)
Window
that groups incoming messages into fixed-size, non-overlapping
processing time based windows.
The below example groups the stream into fixed-size 10 second windows and computes a windowed-percentile.
MessageStream<Long> stream = ...;
Function<Collection<Long>, Long> percentile99 = ..
MessageStream<WindowPane<Void, Collection<Long>>> windowedStream =
integerStream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
MessageStream<Long> windowedPercentiles =
windowedStream.map(windowPane -> percentile99(windowPane.getMessage());
M
- the type of the input messageduration
- the duration in processing timemsgSerde
- the serde for the input messageWindow
functionpublic static <M,K,WV> Window<M,K,WV> keyedSessionWindow(java.util.function.Function<? super M,? extends K> keyFn, java.time.Duration sessionGap, java.util.function.Supplier<? extends WV> initialValue, FoldLeftFunction<? super M,WV> aggregator, Serde<K> keySerde, Serde<WV> windowValueSerde)
Window
that groups incoming messages into sessions per-key based on the provided
sessionGap
and applies the provided fold function to them.
A session captures some period of activity over a MessageStream
.
A session is considered complete when no new messages arrive within the sessionGap
. All messages
that arrive within the gap are grouped into the same session.
The below example computes the maximum value per-key over a session window of gap 10 seconds.
MessageStream<UserClick> stream = ...;
Supplier<Integer> initialValue = () -> 0;
FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseInt(m), c);
Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
Windows.keyedSessionWindow(userIdExtractor, Duration.minute(1), maxAggregator));
M
- the type of the input messageK
- the type of the key in the Window
WV
- the type of the output value in the WindowPane
keyFn
- the function to extract the window key from a messagesessionGap
- the timeout gap for defining the sessioninitialValue
- the initial value supplier for the aggregator. Invoked when a new window is created.aggregator
- the function to incrementally update the window value. Invoked when a new message
arrives for the window.keySerde
- the serde for the window keywindowValueSerde
- the serde for the window valueWindow
functionpublic static <M,K> Window<M,K,java.util.Collection<M>> keyedSessionWindow(java.util.function.Function<? super M,? extends K> keyFn, java.time.Duration sessionGap, Serde<K> keySerde, Serde<M> msgSerde)
Window
that groups incoming messages into sessions per-key based on the provided
sessionGap
.
A session captures some period of activity over a MessageStream
. The
boundary for the session is defined by a sessionGap
. All messages that that arrive within
the gap are grouped into the same session.
The below example groups the stream into per-key session windows of gap 10 seconds.
MessageStream<UserClick> stream = ...;
Supplier<Integer> initialValue = () -> 0;
FoldLeftFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseIntField(m), c);
Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
MessageStream<WindowPane<String>, Collection<M>> windowedStream = stream.window(
Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10)));
M
- the type of the input messageK
- the type of the key in the Window
keyFn
- the function to extract the window key from a message}sessionGap
- the timeout gap for defining the sessionkeySerde
- the serde for the window keymsgSerde
- the serde for the input messageWindow
function