⛓ Convert event-based API to Flux
When working with reactive streams in Java, you may run into an event-based API whose data you need to expose as a stream.
For Reactive Streams I use Reactor. Let's say we have a simple listener object with an onUpdate callback method, like in the code snippet below.
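import reactor.core.publisher.Flux;

// BaseListener is the callback interface of the event-based API (not shown here).
public class CustomListener implements BaseListener {
    private final FluxConnector<String[]> namesFluxConnector;

    public CustomListener() {
        namesFluxConnector = new FluxConnector<>();
    }

    // Called by the event source; forwards each update into the stream.
    @Override
    public void onUpdate(String[] names) {
        namesFluxConnector.next(names);
    }

    // Exposes the updates as a stream for downstream consumers.
    public Flux<String[]> getFlux() {
        return namesFluxConnector.getFlux();
    }
}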
We want to work with this data as a stream, so in the code above CustomListener exposes a Flux<String[]> object that can be consumed later. The main transformation is done in FluxConnector: it creates a Flux and uses a FluxSink to push objects into the stream. The code snippet below is a simple implementation that conveys the main idea.
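import java.util.function.Consumer;

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class FluxConnector<T> {
    private final Flux<T> flux;
    // Written on subscription, read by whichever thread fires the events.
    private volatile FluxSink<T> fluxSink;

    public FluxConnector() {
        // create() hands over a sink on subscription; share() multicasts it.
        this.flux = Flux
                .create(new SinkAdapter<T>(this::setFluxSink))
                .share();
    }

    // Pushes an event into the stream; it is dropped while nobody has subscribed yet.
    public void next(T data) {
        FluxSink<T> sink = fluxSink;
        if (sink != null) {
            sink.next(data);
        }
    }

    public Flux<T> getFlux() {
        return flux;
    }

    // Always keep the newest sink: share() resubscribes to the source after
    // all subscribers have cancelled, and the previous sink is then dead.
    private void setFluxSink(FluxSink<T> fluxSink) {
        this.fluxSink = fluxSink;
    }

    private static class SinkAdapter<T> implements Consumer<FluxSink<T>> {
        private final Consumer<FluxSink<T>> consumer;

        SinkAdapter(Consumer<FluxSink<T>> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void accept(FluxSink<T> sink) {
            consumer.accept(sink);
        }
    }
}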
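To see how the connector behaves from the consumer side, here is a minimal usage sketch. It assumes reactor-core is on the classpath. The names FluxConnectorDemo, Connector and run are made up for this example; Connector is only a condensed stand-in for FluxConnector so that the snippet compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

class FluxConnectorDemo {

    // Hypothetical, condensed stand-in for FluxConnector (same create + share idea).
    static class Connector<T> {
        private volatile FluxSink<T> sink;
        // The lambda runs when the first subscriber arrives and captures the sink.
        private final Flux<T> flux = Flux.<T>create(s -> this.sink = s).share();

        void next(T data) {
            FluxSink<T> current = sink;
            if (current != null) {
                current.next(data);
            }
        }

        Flux<T> getFlux() {
            return flux;
        }
    }

    // Pushes three events, subscribing only after the first one,
    // and returns the events the subscriber actually received.
    static List<String> run() {
        Connector<String> connector = new Connector<>();
        List<String> received = new ArrayList<>();

        // Nobody has subscribed yet, so no sink exists and this event is dropped.
        connector.next("early");

        Disposable subscription = connector.getFlux().subscribe(received::add);
        connector.next("first");
        connector.next("second");

        subscription.dispose();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the subscriber should receive only "first" and "second". The sink is created on the first subscription, so anything the listener pushes before that is silently lost. If early events matter, subscribe before registering the listener with the event source, or consider a replaying or buffering variant.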
This post is licensed under CC BY 4.0 by the author.