@InterfaceStability.Unstable public final class Windows extends java.lang.Object
APIs for creating different types of Windows.
Groups the incoming MessageEnvelopes in the MessageStream into finite windows for processing.
Each window is uniquely identified by its WindowKey. A window can have one or more associated Triggers that determine when results from the Window are emitted. Each emitted result contains one or more MessageEnvelopes in the window and is called a WindowPane.
A window can have early triggers that allow emitting WindowPanes speculatively before all data for the window has arrived, or late triggers that allow handling of late data arrivals.
                                      window wk1
                                      +-----------+--------+-----------+
                                      |           |        |           |
                                      | pane 1    | pane 2 |  pane 3   |
                                      +-----------+--------+-----------+
-----------------------------------
   incoming message stream ------+
-----------------------------------
                                      window wk2
                                      +---------+-----------+---------+
                                      | pane 1  |  pane 2   | pane 3  |
                                      |         |           |         |
                                      +---------+-----------+---------+

                                      window wk3
                                      +----------+-----------+---------+
                                      |          |           |         |
                                      | pane 1   |  pane 2   |  pane 3 |
                                      |          |           |         |
                                      +----------+-----------+---------+
A Window can be one of the following types:
- Tumbling window: groups the MessageEnvelopes in a MessageStream into fixed-size, non-overlapping, processing time based windows.
- Session window: groups a MessageStream into sessions. A session captures some period of activity over a MessageStream. The boundary for a session is defined by a sessionGap. All MessageEnvelopes that arrive within the gap are grouped into the same session.
- Global window: groups all MessageEnvelopes in a MessageStream into a single global window. An early trigger must be specified when defining a global window.
A Window is defined as "keyed" when the incoming MessageEnvelopes are first grouped based on their key, and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window types.
Modifier and Type | Method and Description |
---|---|
static <M extends MessageEnvelope> | globalWindow(): Creates a Window that groups incoming MessageEnvelopes into a single global window. |
static <M extends MessageEnvelope,WV> | globalWindow(java.util.function.BiFunction<M,WV,WV> foldFn): Creates a Window that groups incoming MessageEnvelopes into a single global window. |
static <M extends MessageEnvelope,K> | keyedGlobalWindow(java.util.function.Function<M,K> keyFn): Returns a global Window that groups incoming MessageEnvelopes using the provided keyFn. |
static <M extends MessageEnvelope,K,WV> | keyedGlobalWindow(java.util.function.Function<M,K> keyFn, java.util.function.BiFunction<M,WV,WV> foldFn): Returns a global Window that groups incoming MessageEnvelopes using the provided keyFn. |
static <M extends MessageEnvelope,K> | keyedSessionWindow(java.util.function.Function<M,K> keyFn, java.time.Duration sessionGap): Creates a Window that groups incoming MessageEnvelopes into sessions per-key based on the provided sessionGap. |
static <M extends MessageEnvelope,K,WV> | keyedSessionWindow(java.util.function.Function<M,K> keyFn, java.time.Duration sessionGap, java.util.function.BiFunction<M,WV,WV> foldFn): Creates a Window that groups incoming MessageEnvelopes into sessions per-key based on the provided sessionGap and applies the provided fold function to them. |
static <M extends MessageEnvelope,K> | keyedTumblingWindow(java.util.function.Function<M,K> keyFn, java.time.Duration interval): Creates a Window that groups incoming MessageEnvelopes into fixed-size, non-overlapping processing time based windows using the provided keyFn. |
static <M extends MessageEnvelope,K,WV> | keyedTumblingWindow(java.util.function.Function<M,K> keyFn, java.time.Duration interval, java.util.function.BiFunction<M,WV,WV> foldFn): Creates a Window that groups incoming MessageEnvelopes into fixed-size, non-overlapping processing time based windows based on the provided keyFn and applies the provided fold function to them. |
static <M extends MessageEnvelope> | tumblingWindow(java.time.Duration duration): Creates a Window that groups incoming MessageEnvelopes into fixed-size, non-overlapping processing time based windows. |
static <M extends MessageEnvelope,WV> | tumblingWindow(java.time.Duration duration, java.util.function.BiFunction<M,WV,WV> foldFn): Creates a Window that windows values into fixed-size processing time based windows and aggregates them applying the provided function. |

public static <M extends MessageEnvelope,K,WV> Window<M,K,WV,WindowPane<K,WV>> keyedTumblingWindow(java.util.function.Function<M,K> keyFn, java.time.Duration interval, java.util.function.BiFunction<M,WV,WV> foldFn)
Creates a Window that groups incoming MessageEnvelopes into fixed-size, non-overlapping processing time based windows based on the provided keyFn and applies the provided fold function to them.
The below example computes the maximum value per-key over fixed size 10 second windows.
MessageStream<UserClick> stream = ...;
Function<UserClick, String> keyFn = ...;
// parseIntField is a placeholder for extracting the numeric value from a UserClick
BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseIntField(m), c);
MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
    Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator));
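To make the bucketing and folding concrete, the following is a minimal stand-alone sketch that uses only the JDK. It is not the Samza implementation: the `KeyedTumblingSketch` class, the `Click` type, the explicit `timeFn` and `initialValue` parameters, and the `key@windowStart` bucket naming are all assumptions made for illustration.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Stand-alone model of a keyed tumbling window with a fold function; not Samza code. */
final class KeyedTumblingSketch {

    /** Hypothetical message type: a user id, a numeric value and a processing timestamp in milliseconds. */
    static final class Click {
        final String userId;
        final int value;
        final long timestampMs;

        Click(String userId, int value, long timestampMs) {
            this.userId = userId;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** Start of the fixed-size window that contains the given timestamp. */
    static long windowStart(long timestampMs, Duration interval) {
        long size = interval.toMillis();
        return Math.floorDiv(timestampMs, size) * size;
    }

    /**
     * Folds every message into the value of its (key, window start) bucket.
     * The result maps "key@windowStart" to the folded value of that window.
     */
    static <M, K, WV> Map<String, WV> fold(Iterable<M> messages,
                                           Function<M, K> keyFn,
                                           Function<M, Long> timeFn,
                                           Duration interval,
                                           WV initialValue,
                                           BiFunction<M, WV, WV> foldFn) {
        Map<String, WV> panes = new LinkedHashMap<>();
        for (M message : messages) {
            String bucket = keyFn.apply(message) + "@" + windowStart(timeFn.apply(message), interval);
            WV current = panes.getOrDefault(bucket, initialValue);
            panes.put(bucket, foldFn.apply(message, current));
        }
        return panes;
    }
}
```

Because the windows do not overlap, each message lands in exactly one bucket per key, so the fold function sees every message once.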
Type Parameters:
M - the type of the input MessageEnvelope
WV - the type of the WindowPane output value
K - the type of the key in the Window
Parameters:
keyFn - the function to extract the window key from a MessageEnvelope
interval - the duration in processing time
foldFn - the function to aggregate MessageEnvelopes in the WindowPane
Returns:
the created Window function.

public static <M extends MessageEnvelope,K> Window<M,K,java.util.Collection<M>,WindowPane<K,java.util.Collection<M>>> keyedTumblingWindow(java.util.function.Function<M,K> keyFn, java.time.Duration interval)
Creates a Window that groups incoming MessageEnvelopes into fixed-size, non-overlapping processing time based windows using the provided keyFn.
The below example groups the stream into fixed-size 10 second windows for each key.
MessageStream<UserClick> stream = ...;
Function<UserClick, String> keyFn = ...;
MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(
    Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10)));
Type Parameters:
M - the type of the input MessageEnvelope
K - the type of the key in the Window
Parameters:
keyFn - the function to extract the key from the MessageEnvelope
interval - the duration in processing time
Returns:
the created Window function.

public static <M extends MessageEnvelope,WV> Window<M,java.lang.Void,WV,WindowPane<java.lang.Void,WV>> tumblingWindow(java.time.Duration duration, java.util.function.BiFunction<M,WV,WV> foldFn)
Creates a Window that windows values into fixed-size processing time based windows and aggregates them applying the provided function.
The below example computes the maximum value over fixed size 10 second windows.
MessageStream<String> stream = ...;
BiFunction<String, Integer, Integer> maxAggregator = (m, c) -> Math.max(Integer.parseInt(m), c);
MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window(
    Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator));
Type Parameters:
M - the type of the input MessageEnvelope
WV - the type of the WindowPane output value
Parameters:
duration - the duration in processing time
foldFn - the function to aggregate MessageEnvelopes in the WindowPane
Returns:
the created Window function.

public static <M extends MessageEnvelope> Window<M,java.lang.Void,java.util.Collection<M>,WindowPane<java.lang.Void,java.util.Collection<M>>> tumblingWindow(java.time.Duration duration)
Creates a Window that groups incoming MessageEnvelopes into fixed-size, non-overlapping processing time based windows.
The below example groups the stream into fixed-size 10 second windows and computes a windowed percentile.
MessageStream<Long> stream = ...;
Function<Collection<Long>, Long> percentile99 = ...;
MessageStream<WindowPane<Void, Collection<Long>>> windowedStream = stream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
MessageStream<Long> windowedPercentiles = windowedStream.map(windowPane -> percentile99.apply(windowPane.getMessage()));
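The example leaves the percentile function elided. One possible way to fill it in is the nearest-rank method, sketched below with only the JDK. The `PercentileSketch` class and its method names are hypothetical, and nearest-rank is one of several percentile definitions, chosen here only because it needs no interpolation.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Nearest-rank percentile over the values collected in one window pane; illustrative only. */
final class PercentileSketch {

    /**
     * Returns the smallest value such that at least {@code percent} percent of the
     * values are less than or equal to it (nearest-rank definition).
     */
    static long percentile(Collection<Long> values, int percent) {
        if (values.isEmpty()) {
            throw new IllegalArgumentException("pane contains no values");
        }
        if (percent < 1 || percent > 100) {
            throw new IllegalArgumentException("percent must be in [1, 100]");
        }
        List<Long> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        // Integer form of ceil(percent / 100 * n), which avoids floating point rounding.
        long rank = ((long) percent * sorted.size() + 99) / 100;
        return sorted.get((int) rank - 1);
    }

    /** Convenience wrapper with the shape Function<Collection<Long>, Long> expects. */
    static long percentile99(Collection<Long> values) {
        return percentile(values, 99);
    }
}
```

With this sketch, `PercentileSketch::percentile99` could stand in for the elided `percentile99` function, since it accepts a `Collection<Long>` and returns a value that boxes to `Long`.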
Type Parameters:
M - the type of the input MessageEnvelope
Parameters:
duration - the duration in processing time
Returns:
the created Window function.

public static <M extends MessageEnvelope,K,WV> Window<M,K,WV,WindowPane<K,WV>> keyedSessionWindow(java.util.function.Function<M,K> keyFn, java.time.Duration sessionGap, java.util.function.BiFunction<M,WV,WV> foldFn)
Creates a Window that groups incoming MessageEnvelopes into sessions per-key based on the provided sessionGap and applies the provided fold function to them.
A session captures some period of activity over a MessageStream. A session is considered complete when no new messages arrive within the sessionGap. All MessageEnvelopes that arrive within the gap are grouped into the same session.
The below example computes the maximum value per-key over a session window with a gap of 10 seconds.
MessageStream<UserClick> stream = ...;
// parseIntField is a placeholder for extracting the numeric value from a UserClick
BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c) -> Math.max(parseIntField(m), c);
Function<UserClick, String> userIdExtractor = m -> m.getUserId();
MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
    Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10), maxAggregator));
Type Parameters:
M - the type of the input MessageEnvelope
K - the type of the key in the Window
WV - the type of the output value in the WindowPane
Parameters:
keyFn - the function to extract the window key from a MessageEnvelope
sessionGap - the timeout gap for defining the session
foldFn - the function to aggregate MessageEnvelopes in the WindowPane
Returns:
the created Window function.

public static <M extends MessageEnvelope,K> Window<M,K,java.util.Collection<M>,WindowPane<K,java.util.Collection<M>>> keyedSessionWindow(java.util.function.Function<M,K> keyFn, java.time.Duration sessionGap)
Creates a Window that groups incoming MessageEnvelopes into sessions per-key based on the provided sessionGap.
A session captures some period of activity over a MessageStream. The boundary for the session is defined by a sessionGap. All MessageEnvelopes that arrive within the gap are grouped into the same session.
The below example groups the stream into per-key session windows with a gap of 10 seconds.
MessageStream<UserClick> stream = ...;
Function<UserClick, String> userIdExtractor = m -> m.getUserId();
MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(
    Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10)));
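The session rule can be illustrated without Samza by replaying messages in arrival order and starting a new session for a key whenever the gap since that key's previous message exceeds sessionGap. The sketch below is a JDK-only model, not the library's implementation. The `KeyedSessionSketch` class and the `timeFn` parameter are hypothetical, and treating a message that arrives exactly sessionGap after the previous one as part of the same session is an assumption.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Stand-alone model of per-key session grouping; not Samza code. */
final class KeyedSessionSketch {

    /**
     * Groups messages (given in arrival order) into sessions per key.
     * A key's session is closed once more than sessionGap passes without a message for that key.
     */
    static <M, K> Map<K, List<List<M>>> sessions(Iterable<M> messages,
                                                 Function<M, K> keyFn,
                                                 Function<M, Long> timeFn,
                                                 Duration sessionGap) {
        long gapMs = sessionGap.toMillis();
        Map<K, List<List<M>>> sessionsByKey = new LinkedHashMap<>();
        Map<K, Long> lastSeenMs = new HashMap<>();
        for (M message : messages) {
            K key = keyFn.apply(message);
            long now = timeFn.apply(message);
            List<List<M>> keySessions = sessionsByKey.computeIfAbsent(key, k -> new ArrayList<>());
            Long previous = lastSeenMs.put(key, now);
            // Assumption: a gap strictly larger than sessionGap starts a new session.
            if (previous == null || now - previous > gapMs) {
                keySessions.add(new ArrayList<>());
            }
            keySessions.get(keySessions.size() - 1).add(message);
        }
        return sessionsByKey;
    }
}
```

Unlike tumbling windows, the session boundaries here depend on the data itself, so two keys can have sessions of different lengths over the same time span.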
Type Parameters:
M - the type of the input MessageEnvelope
K - the type of the key in the Window
Parameters:
keyFn - the function to extract the window key from a MessageEnvelope
sessionGap - the timeout gap for defining the session
Returns:
the created Window function.

public static <M extends MessageEnvelope,WV> Window<M,java.lang.Void,WV,WindowPane<java.lang.Void,WV>> globalWindow(java.util.function.BiFunction<M,WV,WV> foldFn)
Creates a Window that groups incoming MessageEnvelopes into a single global window. This window does not have a default trigger. The triggering behavior must be specified by setting an early trigger.
The below example computes the maximum value over a count based window. The window emits a WindowPane when there are 50 messages in the pane or when 10 seconds have passed since the first message in the pane.
MessageStream<Long> stream = ...;
BiFunction<Long, Long, Long> maxAggregator = (m, c) -> Math.max(m, c);
MessageStream<WindowPane<Void, Long>> windowedStream = stream.window(Windows.globalWindow(maxAggregator)
    .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10))))));
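The combined trigger can be hard to picture, so here is a minimal JDK-only sketch of the "count or time since first message, repeated" behavior applied to a max fold. It is a simplification and not the Samza trigger implementation: the `GlobalWindowTriggerSketch` class is hypothetical, messages are modeled as `{arrivalTimeMs, value}` pairs, and the time condition is only checked when the next message arrives, whereas a real runtime would presumably fire it from a timer.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Stand-alone model of repeat(any(count, timeSinceFirstMessage)) over a max fold; not Samza code. */
final class GlobalWindowTriggerSketch {

    /**
     * Replays {arrivalTimeMs, value} pairs in arrival order and returns the maximum of every emitted pane.
     * A pane is emitted when it holds {@code count} messages, or when a message arrives at least
     * {@code maxAge} after the first message of the pane. After each emission a fresh pane starts.
     * A trailing pane for which no trigger fired is not emitted.
     */
    static List<Long> maxPerPane(long[][] messages, int count, Duration maxAge) {
        List<Long> emitted = new ArrayList<>();
        int size = 0;
        long firstMs = 0L;
        long max = Long.MIN_VALUE;
        for (long[] message : messages) {
            long now = message[0];
            long value = message[1];
            // Time trigger: the open pane is older than maxAge, so emit it before adding this message.
            if (size > 0 && now - firstMs >= maxAge.toMillis()) {
                emitted.add(max);
                size = 0;
                max = Long.MIN_VALUE;
            }
            if (size == 0) {
                firstMs = now;
            }
            max = Math.max(value, max);
            size++;
            // Count trigger: the pane is full.
            if (size == count) {
                emitted.add(max);
                size = 0;
                max = Long.MIN_VALUE;
            }
        }
        return emitted;
    }
}
```

Resetting the pane state after each emission is what models the repeat wrapper: without it, the trigger would fire once and the window would never emit again.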
Type Parameters:
M - the type of MessageEnvelope
WV - the type of the output value in the WindowPane
Parameters:
foldFn - the function to aggregate MessageEnvelopes in the WindowPane
Returns:
the created Window function.

public static <M extends MessageEnvelope> Window<M,java.lang.Void,java.util.Collection<M>,WindowPane<java.lang.Void,java.util.Collection<M>>> globalWindow()
Creates a Window that groups incoming MessageEnvelopes into a single global window. This window does not have a default trigger. The triggering behavior must be specified by setting an early trigger.
The below example groups the stream into count based windows that trigger every 50 messages or every 10 minutes.
MessageStream<Long> stream = ...;
MessageStream<WindowPane<Void, Collection<Long>>> windowedStream = stream.window(Windows.globalWindow()
    .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofMinutes(10))))));
Type Parameters:
M - the type of MessageEnvelope
Returns:
the created Window function.

public static <M extends MessageEnvelope,K,WV> Window<M,K,WV,WindowPane<K,WV>> keyedGlobalWindow(java.util.function.Function<M,K> keyFn, java.util.function.BiFunction<M,WV,WV> foldFn)
Returns a global Window that groups incoming MessageEnvelopes using the provided keyFn. The window does not have a default trigger. The triggering behavior must be specified by setting an early trigger.
The below example computes the maximum value per-key over count based windows. The window triggers every 50 messages or every 10 minutes.
MessageStream<UserClick> stream = ...;
// parseLongField is a placeholder for extracting the numeric value from a UserClick
BiFunction<UserClick, Long, Long> maxAggregator = (m, c) -> Math.max(parseLongField(m), c);
Function<UserClick, String> keyFn = ...;
MessageStream<WindowPane<String, Long>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn, maxAggregator)
    .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofMinutes(10))))));
Type Parameters:
M - the type of MessageEnvelope
K - the type of the key in the Window
WV - the type of the output value in the WindowPane
Parameters:
keyFn - the function to extract the window key from a MessageEnvelope
foldFn - the function to aggregate MessageEnvelopes in the WindowPane
Returns:
the created Window function.

public static <M extends MessageEnvelope,K> Window<M,K,java.util.Collection<M>,WindowPane<K,java.util.Collection<M>>> keyedGlobalWindow(java.util.function.Function<M,K> keyFn)
Returns a global Window that groups incoming MessageEnvelopes using the provided keyFn. The window does not have a default trigger. The triggering behavior must be specified by setting an early trigger.
The below example groups the stream per-key into count based windows. The window triggers every 50 messages or every 10 minutes.
MessageStream<UserClick> stream = ...;
Function<UserClick, String> keyFn = ...;
MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn)
    .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofMinutes(10))))));
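As a rough illustration of per-key triggering, the sketch below keeps one open buffer per key and emits it as a pane whenever it reaches a given message count. It uses only the JDK and is not Samza code: the `KeyedCountTriggerSketch` class and its `Pane` type are hypothetical, and the time based half of the trigger is deliberately left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Stand-alone model of a keyed global window with a repeating count trigger; not Samza code. */
final class KeyedCountTriggerSketch {

    /** Hypothetical emitted result: the key and the messages collected for it. */
    static final class Pane<K, M> {
        final K key;
        final List<M> messages;

        Pane(K key, List<M> messages) {
            this.key = key;
            this.messages = messages;
        }
    }

    /** Emits a pane for a key every time {@code count} messages have been collected for that key. */
    static <M, K> List<Pane<K, M>> panes(Iterable<M> messages, Function<M, K> keyFn, int count) {
        Map<K, List<M>> open = new HashMap<>();
        List<Pane<K, M>> emitted = new ArrayList<>();
        for (M message : messages) {
            K key = keyFn.apply(message);
            List<M> buffer = open.computeIfAbsent(key, k -> new ArrayList<>());
            buffer.add(message);
            if (buffer.size() == count) {
                emitted.add(new Pane<>(key, buffer));
                // Dropping the buffer makes the next message for this key start a fresh pane.
                open.remove(key);
            }
        }
        return emitted;
    }
}
```

Each key counts independently, so a busy key can emit several panes while a quiet key is still filling its first one.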
Type Parameters:
M - the type of MessageEnvelope
K - the type of the key in the Window
Parameters:
keyFn - the function to extract the window key from a MessageEnvelope
Returns:
the created Window function.