Reactor Netty - how to send with delayed Flux2019 Community Moderator ElectionReactor Netty TcpServer with PipelineReactor-Netty: unexpected message type: PooledUnsafeDirectByteBufHow to connect a Subscriber with a reactor.core.publisher.Flux?Reactor Flux replay(int history) method not working as expectedHow to execute blocking calls within a Spring Webflux / Reactor Netty web applicationWhy does this Producer produce before the Consumer can retry?retry (or) retryWhen does not seem to work with hot fluxReactor Netty/Spring Cloud Gateway Hang on Response 304Reactor-netty TCPClient cannot receive responsesHow to chain flux to another flux/mono and apply another back pressure?
3.5% Interest Student Loan or use all of my savings on Tuition?
Linear Combination of Atomic Orbitals
How spaceships determine each other's mass in space?
Sundering Titan and basic normal lands and snow lands
Is there a math equivalent to the conditional ternary operator?
Why do phishing e-mails use faked e-mail addresses instead of the real one?
Is being socially reclusive okay for a graduate student?
Does the in-code argument passing conventions used on PDP-11's have a name?
Why can't we use freedom of speech and expression to incite people to rebel against government in India?
Why won't the strings command stop?
Why are special aircraft used for the carriers in the United States Navy?
What can I do if someone tampers with my SSH public key?
Convert an array of objects to array of the objects' values
When to use the term transposed instead of modulation?
Are angels creatures (Mark 16:15) and can they repent (Rev 2:5 and Rom 8:21)
Is divide-by-zero a security vulnerability?
Replacing tantalum capacitor with ceramic capacitor for Op Amps
The Key to the Door
Learning to quickly identify valid fingering for piano?
Do natural melee weapons (from racial traits) trigger Improved Divine Smite?
How do we objectively assess if a dialogue sounds unnatural or cringy?
Iron deposits mined from under the city
Deal the cards to the players
Dukha vs legitimate need
Reactor Netty - how to send with delayed Flux
2019 Community Moderator ElectionReactor Netty TcpServer with PipelineReactor-Netty: unexpected message type: PooledUnsafeDirectByteBufHow to connect a Subscriber with a reactor.core.publisher.Flux?Reactor Flux replay(int history) method not working as expectedHow to execute blocking calls within a Spring Webflux / Reactor Netty web applicationWhy does this Producer produce before the Consumer can retry?retry (or) retryWhen does not seem to work with hot fluxReactor Netty/Spring Cloud Gateway Hang on Response 304Reactor-netty TCPClient cannot receive responsesHow to chain flux to another flux/mono and apply another back pressure?
In Reactor Netty, when sending data to TCP channel via out.send(publisher)
, one would expect any publisher to work. However, if instead of a simple immediate Flux
we use a more complex one with delayed elements, then it stops working properly.
For example, if we take this hello world TCP echo server, it works as expected:
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.time.Duration;
public class Reactor1
public static void main(String[] args) throws Exception
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.flatMap(s ->
out.sendString(Flux.just(s.toUpperCase()))
))
.bind()
.block();
server.channel().closeFuture().sync();
However, if we change out.sendString
to
out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))
then we would expect that for each received item an output will be produced with one second delay.
However, the way server behaves is that if it receives multiple items during the interval, it will produce output only for the first item. For example, below we type aa
and bb
during the first second, but only AA
gets produced as output (after one second):
$ nc localhost 3344
aa
bb
AA <after one second>
Then, if we later type additional line, we get output (after one second) but from the previous input:
cc
BB <after one second>
Any ideas how to make send()
work as expected with a delayed Flux
?
project-reactor reactor-netty
add a comment |
In Reactor Netty, when sending data to TCP channel via out.send(publisher)
, one would expect any publisher to work. However, if instead of a simple immediate Flux
we use a more complex one with delayed elements, then it stops working properly.
For example, if we take this hello world TCP echo server, it works as expected:
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.time.Duration;
public class Reactor1
public static void main(String[] args) throws Exception
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.flatMap(s ->
out.sendString(Flux.just(s.toUpperCase()))
))
.bind()
.block();
server.channel().closeFuture().sync();
However, if we change out.sendString
to
out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))
then we would expect that for each received item an output will be produced with one second delay.
However, the way server behaves is that if it receives multiple items during the interval, it will produce output only for the first item. For example, below we type aa
and bb
during the first second, but only AA
gets produced as output (after one second):
$ nc localhost 3344
aa
bb
AA <after one second>
Then, if we later type additional line, we get output (after one second) but from the previous input:
cc
BB <after one second>
Any ideas how to make send()
work as expected with a delayed Flux
?
project-reactor reactor-netty
add a comment |
In Reactor Netty, when sending data to TCP channel via out.send(publisher)
, one would expect any publisher to work. However, if instead of a simple immediate Flux
we use a more complex one with delayed elements, then it stops working properly.
For example, if we take this hello world TCP echo server, it works as expected:
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.time.Duration;
public class Reactor1
public static void main(String[] args) throws Exception
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.flatMap(s ->
out.sendString(Flux.just(s.toUpperCase()))
))
.bind()
.block();
server.channel().closeFuture().sync();
However, if we change out.sendString
to
out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))
then we would expect that for each received item an output will be produced with one second delay.
However, the way server behaves is that if it receives multiple items during the interval, it will produce output only for the first item. For example, below we type aa
and bb
during the first second, but only AA
gets produced as output (after one second):
$ nc localhost 3344
aa
bb
AA <after one second>
Then, if we later type additional line, we get output (after one second) but from the previous input:
cc
BB <after one second>
Any ideas how to make send()
work as expected with a delayed Flux
?
project-reactor reactor-netty
In Reactor Netty, when sending data to TCP channel via out.send(publisher)
, one would expect any publisher to work. However, if instead of a simple immediate Flux
we use a more complex one with delayed elements, then it stops working properly.
For example, if we take this hello world TCP echo server, it works as expected:
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.time.Duration;
public class Reactor1
public static void main(String[] args) throws Exception
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.flatMap(s ->
out.sendString(Flux.just(s.toUpperCase()))
))
.bind()
.block();
server.channel().closeFuture().sync();
However, if we change out.sendString
to
out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))
then we would expect that for each received item an output will be produced with one second delay.
However, the way server behaves is that if it receives multiple items during the interval, it will produce output only for the first item. For example, below we type aa
and bb
during the first second, but only AA
gets produced as output (after one second):
$ nc localhost 3344
aa
bb
AA <after one second>
Then, if we later type additional line, we get output (after one second) but from the previous input:
cc
BB <after one second>
Any ideas how to make send()
work as expected with a delayed Flux
?
project-reactor reactor-netty
project-reactor reactor-netty
edited yesterday
Ivan
asked yesterday
IvanIvan
324311
324311
add a comment |
add a comment |
2 Answers
2
active
oldest
votes
I think you shouldn't recreate publisher for the out.sendString(...)
This works:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> out
.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(in.receive()
.asString()
.map(String::toUpperCase)
.delayElements(Duration.ofSeconds(1))))
.bind()
.block();
server.channel().closeFuture().sync();
1
Yeah, starting without
makes sense (combined withflushOnEach
). In the document referenced from the reactor-netty page, they do have a single TCP example which hashandle
that starts within
and thenflatmap
. But looking at HTTP examples, they always use the style starting without
(res
).
– Ivan
yesterday
1
I realized one thing in this answer doesn't work properly: it usesdelayElements
operator on a single stream, which will delay all responses equally, instead of each individually. For example, if there is first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms. However, with a small change it can work properly: we just have to replace.delayElements(Duration.ofSeconds(1))
with.flatMap(s -> Mono.just(s).delayElement(Duration.ofSeconds(1)))))
- it will delay each element individually.
– Ivan
20 hours ago
add a comment |
Try to use concatMap. This works:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.concatMap(s ->
out.sendString(Flux.just(s.toUpperCase())
.delayElements(Duration.ofSeconds(1)))
))
.bind()
.block();
server.channel().closeFuture().sync();
There is one issue with this:concatMap
will subscribe to the next inner publisher only after the previous one is completed, which messes up the timing: given the first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms
– Ivan
20 hours ago
Then move the.delayElements(Duration.ofSeconds(1))
beforeconcatMap
so that I will delay the consumption of the incoming traffic. The important thing here is that Reactor Netty do not allow nesting the send operation, which will happen if you use flatMap
– Violeta Georgieva
13 hours ago
This gives the same results, it always produces outputs at the interval boundary (1s). Given that having multiplesend
operations in flight is not allowed, I wonder if "in.receive, out.send" style is ever useful, compared to the "out.send, in.receive" style (as in the accepted answer) which also seems easier to reason about since there is a single out stream.
– Ivan
11 hours ago
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55015276%2freactor-netty-how-to-send-with-delayed-flux%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
2 Answers
2
active
oldest
votes
2 Answers
2
active
oldest
votes
active
oldest
votes
active
oldest
votes
I think you shouldn't recreate publisher for the out.sendString(...)
This works:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> out
.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(in.receive()
.asString()
.map(String::toUpperCase)
.delayElements(Duration.ofSeconds(1))))
.bind()
.block();
server.channel().closeFuture().sync();
1
Yeah, starting without
makes sense (combined withflushOnEach
). In the document referenced from the reactor-netty page, they do have a single TCP example which hashandle
that starts within
and thenflatmap
. But looking at HTTP examples, they always use the style starting without
(res
).
– Ivan
yesterday
1
I realized one thing in this answer doesn't work properly: it usesdelayElements
operator on a single stream, which will delay all responses equally, instead of each individually. For example, if there is first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms. However, with a small change it can work properly: we just have to replace.delayElements(Duration.ofSeconds(1))
with.flatMap(s -> Mono.just(s).delayElement(Duration.ofSeconds(1)))))
- it will delay each element individually.
– Ivan
20 hours ago
add a comment |
I think you shouldn't recreate publisher for the out.sendString(...)
This works:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> out
.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(in.receive()
.asString()
.map(String::toUpperCase)
.delayElements(Duration.ofSeconds(1))))
.bind()
.block();
server.channel().closeFuture().sync();
1
Yeah, starting without
makes sense (combined withflushOnEach
). In the document referenced from the reactor-netty page, they do have a single TCP example which hashandle
that starts within
and thenflatmap
. But looking at HTTP examples, they always use the style starting without
(res
).
– Ivan
yesterday
1
I realized one thing in this answer doesn't work properly: it usesdelayElements
operator on a single stream, which will delay all responses equally, instead of each individually. For example, if there is first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms. However, with a small change it can work properly: we just have to replace.delayElements(Duration.ofSeconds(1))
with.flatMap(s -> Mono.just(s).delayElement(Duration.ofSeconds(1)))))
- it will delay each element individually.
– Ivan
20 hours ago
add a comment |
I think you shouldn't recreate publisher for the out.sendString(...)
This works:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> out
.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(in.receive()
.asString()
.map(String::toUpperCase)
.delayElements(Duration.ofSeconds(1))))
.bind()
.block();
server.channel().closeFuture().sync();
I think you shouldn't recreate publisher for the out.sendString(...)
This works:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> out
.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(in.receive()
.asString()
.map(String::toUpperCase)
.delayElements(Duration.ofSeconds(1))))
.bind()
.block();
server.channel().closeFuture().sync();
answered yesterday
Alexander PankinAlexander Pankin
83627
83627
1
Yeah, starting without
makes sense (combined withflushOnEach
). In the document referenced from the reactor-netty page, they do have a single TCP example which hashandle
that starts within
and thenflatmap
. But looking at HTTP examples, they always use the style starting without
(res
).
– Ivan
yesterday
1
I realized one thing in this answer doesn't work properly: it usesdelayElements
operator on a single stream, which will delay all responses equally, instead of each individually. For example, if there is first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms. However, with a small change it can work properly: we just have to replace.delayElements(Duration.ofSeconds(1))
with.flatMap(s -> Mono.just(s).delayElement(Duration.ofSeconds(1)))))
- it will delay each element individually.
– Ivan
20 hours ago
add a comment |
1
Yeah, starting without
makes sense (combined withflushOnEach
). In the document referenced from the reactor-netty page, they do have a single TCP example which hashandle
that starts within
and thenflatmap
. But looking at HTTP examples, they always use the style starting without
(res
).
– Ivan
yesterday
1
I realized one thing in this answer doesn't work properly: it usesdelayElements
operator on a single stream, which will delay all responses equally, instead of each individually. For example, if there is first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms. However, with a small change it can work properly: we just have to replace.delayElements(Duration.ofSeconds(1))
with.flatMap(s -> Mono.just(s).delayElement(Duration.ofSeconds(1)))))
- it will delay each element individually.
– Ivan
20 hours ago
1
1
Yeah, starting with
out
makes sense (combined with flushOnEach
). In the document referenced from the reactor-netty page, they do have a single TCP example which has handle
that starts with in
and then flatmap
. But looking at HTTP examples, they always use the style starting with out
(res
).– Ivan
yesterday
Yeah, starting with
out
makes sense (combined with flushOnEach
). In the document referenced from the reactor-netty page, they do have a single TCP example which has handle
that starts with in
and then flatmap
. But looking at HTTP examples, they always use the style starting with out
(res
).– Ivan
yesterday
1
1
I realized one thing in this answer doesn't work properly: it uses
delayElements
operator on a single stream, which will delay all responses equally, instead of each individually. For example, if there is first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms. However, with a small change it can work properly: we just have to replace .delayElements(Duration.ofSeconds(1))
with .flatMap(s -> Mono.just(s).delayElement(Duration.ofSeconds(1)))))
- it will delay each element individually.– Ivan
20 hours ago
I realized one thing in this answer doesn't work properly: it uses
delayElements
operator on a single stream, which will delay all responses equally, instead of each individually. For example, if there is first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms. However, with a small change it can work properly: we just have to replace .delayElements(Duration.ofSeconds(1))
with .flatMap(s -> Mono.just(s).delayElement(Duration.ofSeconds(1)))))
- it will delay each element individually.– Ivan
20 hours ago
add a comment |
Try to use concatMap. This works:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.concatMap(s ->
out.sendString(Flux.just(s.toUpperCase())
.delayElements(Duration.ofSeconds(1)))
))
.bind()
.block();
server.channel().closeFuture().sync();
There is one issue with this:concatMap
will subscribe to the next inner publisher only after the previous one is completed, which messes up the timing: given the first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms
– Ivan
20 hours ago
Then move the.delayElements(Duration.ofSeconds(1))
beforeconcatMap
so that I will delay the consumption of the incoming traffic. The important thing here is that Reactor Netty do not allow nesting the send operation, which will happen if you use flatMap
– Violeta Georgieva
13 hours ago
This gives the same results, it always produces outputs at the interval boundary (1s). Given that having multiplesend
operations in flight is not allowed, I wonder if "in.receive, out.send" style is ever useful, compared to the "out.send, in.receive" style (as in the accepted answer) which also seems easier to reason about since there is a single out stream.
– Ivan
11 hours ago
add a comment |
Try to use concatMap. This works:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.concatMap(s ->
out.sendString(Flux.just(s.toUpperCase())
.delayElements(Duration.ofSeconds(1)))
))
.bind()
.block();
server.channel().closeFuture().sync();
There is one issue with this:concatMap
will subscribe to the next inner publisher only after the previous one is completed, which messes up the timing: given the first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms
– Ivan
20 hours ago
Then move the.delayElements(Duration.ofSeconds(1))
beforeconcatMap
so that I will delay the consumption of the incoming traffic. The important thing here is that Reactor Netty do not allow nesting the send operation, which will happen if you use flatMap
– Violeta Georgieva
13 hours ago
This gives the same results, it always produces outputs at the interval boundary (1s). Given that having multiplesend
operations in flight is not allowed, I wonder if "in.receive, out.send" style is ever useful, compared to the "out.send, in.receive" style (as in the accepted answer) which also seems easier to reason about since there is a single out stream.
– Ivan
11 hours ago
add a comment |
Try to use concatMap. This works:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.concatMap(s ->
out.sendString(Flux.just(s.toUpperCase())
.delayElements(Duration.ofSeconds(1)))
))
.bind()
.block();
server.channel().closeFuture().sync();
Try to use concatMap. This works:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.concatMap(s ->
out.sendString(Flux.just(s.toUpperCase())
.delayElements(Duration.ofSeconds(1)))
))
.bind()
.block();
server.channel().closeFuture().sync();
answered yesterday
Violeta GeorgievaVioleta Georgieva
262
262
There is one issue with this:concatMap
will subscribe to the next inner publisher only after the previous one is completed, which messes up the timing: given the first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms
– Ivan
20 hours ago
Then move the.delayElements(Duration.ofSeconds(1))
beforeconcatMap
so that I will delay the consumption of the incoming traffic. The important thing here is that Reactor Netty do not allow nesting the send operation, which will happen if you use flatMap
– Violeta Georgieva
13 hours ago
This gives the same results, it always produces outputs at the interval boundary (1s). Given that having multiplesend
operations in flight is not allowed, I wonder if "in.receive, out.send" style is ever useful, compared to the "out.send, in.receive" style (as in the accepted answer) which also seems easier to reason about since there is a single out stream.
– Ivan
11 hours ago
add a comment |
There is one issue with this:concatMap
will subscribe to the next inner publisher only after the previous one is completed, which messes up the timing: given the first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms
– Ivan
20 hours ago
Then move the.delayElements(Duration.ofSeconds(1))
beforeconcatMap
so that I will delay the consumption of the incoming traffic. The important thing here is that Reactor Netty do not allow nesting the send operation, which will happen if you use flatMap
– Violeta Georgieva
13 hours ago
This gives the same results, it always produces outputs at the interval boundary (1s). Given that having multiplesend
operations in flight is not allowed, I wonder if "in.receive, out.send" style is ever useful, compared to the "out.send, in.receive" style (as in the accepted answer) which also seems easier to reason about since there is a single out stream.
– Ivan
11 hours ago
There is one issue with this:
concatMap
will subscribe to the next inner publisher only after the previous one is completed, which messes up the timing: given the first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms– Ivan
20 hours ago
There is one issue with this:
concatMap
will subscribe to the next inner publisher only after the previous one is completed, which messes up the timing: given the first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms– Ivan
20 hours ago
Then move the
.delayElements(Duration.ofSeconds(1))
before concatMap
so that I will delay the consumption of the incoming traffic. The important thing here is that Reactor Netty do not allow nesting the send operation, which will happen if you use flatMap– Violeta Georgieva
13 hours ago
Then move the
.delayElements(Duration.ofSeconds(1))
before concatMap
so that I will delay the consumption of the incoming traffic. The important thing here is that Reactor Netty do not allow nesting the send operation, which will happen if you use flatMap– Violeta Georgieva
13 hours ago
This gives the same results, it always produces outputs at the interval boundary (1s). Given that having multiple
send
operations in flight is not allowed, I wonder if "in.receive, out.send" style is ever useful, compared to the "out.send, in.receive" style (as in the accepted answer) which also seems easier to reason about since there is a single out stream.– Ivan
11 hours ago
This gives the same results, it always produces outputs at the interval boundary (1s). Given that having multiple
send
operations in flight is not allowed, I wonder if "in.receive, out.send" style is ever useful, compared to the "out.send, in.receive" style (as in the accepted answer) which also seems easier to reason about since there is a single out stream.– Ivan
11 hours ago
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55015276%2freactor-netty-how-to-send-with-delayed-flux%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown