Reactor Netty - how to send with delayed Flux2019 Community Moderator ElectionReactor Netty TcpServer with PipelineReactor-Netty: unexpected message type: PooledUnsafeDirectByteBufHow to connect a Subscriber with a reactor.core.publisher.Flux?Reactor Flux replay(int history) method not working as expectedHow to execute blocking calls within a Spring Webflux / Reactor Netty web applicationWhy does this Producer produce before the Consumer can retry?retry (or) retryWhen does not seem to work with hot fluxReactor Netty/Spring Cloud Gateway Hang on Response 304Reactor-netty TCPClient cannot receive responsesHow to chain flux to another flux/mono and apply another back pressure?

3.5% Interest Student Loan or use all of my savings on Tuition?

Linear Combination of Atomic Orbitals

How spaceships determine each other's mass in space?

Sundering Titan and basic normal lands and snow lands

Is there a math equivalent to the conditional ternary operator?

Why do phishing e-mails use faked e-mail addresses instead of the real one?

Is being socially reclusive okay for a graduate student?

Does the in-code argument passing conventions used on PDP-11's have a name?

Why can't we use freedom of speech and expression to incite people to rebel against government in India?

Why won't the strings command stop?

Why are special aircraft used for the carriers in the United States Navy?

What can I do if someone tampers with my SSH public key?

Convert an array of objects to array of the objects' values

When to use the term transposed instead of modulation?

Are angels creatures (Mark 16:15) and can they repent (Rev 2:5 and Rom 8:21)

Is divide-by-zero a security vulnerability?

Replacing tantalum capacitor with ceramic capacitor for Op Amps

The Key to the Door

Learning to quickly identify valid fingering for piano?

Do natural melee weapons (from racial traits) trigger Improved Divine Smite?

How do we objectively assess if a dialogue sounds unnatural or cringy?

Iron deposits mined from under the city

Deal the cards to the players

Dukha vs legitimate need



Reactor Netty - how to send with delayed Flux



2019 Community Moderator ElectionReactor Netty TcpServer with PipelineReactor-Netty: unexpected message type: PooledUnsafeDirectByteBufHow to connect a Subscriber with a reactor.core.publisher.Flux?Reactor Flux replay(int history) method not working as expectedHow to execute blocking calls within a Spring Webflux / Reactor Netty web applicationWhy does this Producer produce before the Consumer can retry?retry (or) retryWhen does not seem to work with hot fluxReactor Netty/Spring Cloud Gateway Hang on Response 304Reactor-netty TCPClient cannot receive responsesHow to chain flux to another flux/mono and apply another back pressure?










2















In Reactor Netty, when sending data to TCP channel via out.send(publisher), one would expect any publisher to work. However, if instead of a simple immediate Flux we use a more complex one with delayed elements, then it stops working properly.
For example, if we take this hello world TCP echo server, it works as expected:



import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

import java.time.Duration;

public class Reactor1
public static void main(String[] args) throws Exception
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.flatMap(s ->
out.sendString(Flux.just(s.toUpperCase()))
))
.bind()
.block();
server.channel().closeFuture().sync();




However, if we change out.sendString to



out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))


then we would expect that for each received item an output will be produced with one second delay.



However, the way server behaves is that if it receives multiple items during the interval, it will produce output only for the first item. For example, below we type aa and bb during the first second, but only AA gets produced as output (after one second):



$ nc localhost 3344
aa
bb
AA <after one second>


Then, if we later type additional line, we get output (after one second) but from the previous input:



cc
BB <after one second>


Any ideas how to make send() work as expected with a delayed Flux?










share|improve this question




























    2















    In Reactor Netty, when sending data to TCP channel via out.send(publisher), one would expect any publisher to work. However, if instead of a simple immediate Flux we use a more complex one with delayed elements, then it stops working properly.
    For example, if we take this hello world TCP echo server, it works as expected:



    import reactor.core.publisher.Flux;
    import reactor.netty.DisposableServer;
    import reactor.netty.tcp.TcpServer;

    import java.time.Duration;

    public class Reactor1
    public static void main(String[] args) throws Exception
    DisposableServer server = TcpServer.create()
    .port(3344)
    .handle((in, out) -> in
    .receive()
    .asString()
    .flatMap(s ->
    out.sendString(Flux.just(s.toUpperCase()))
    ))
    .bind()
    .block();
    server.channel().closeFuture().sync();




    However, if we change out.sendString to



    out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))


    then we would expect that for each received item an output will be produced with one second delay.



    However, the way server behaves is that if it receives multiple items during the interval, it will produce output only for the first item. For example, below we type aa and bb during the first second, but only AA gets produced as output (after one second):



    $ nc localhost 3344
    aa
    bb
    AA <after one second>


    Then, if we later type additional line, we get output (after one second) but from the previous input:



    cc
    BB <after one second>


    Any ideas how to make send() work as expected with a delayed Flux?










    share|improve this question


























      2












      2








      2








      In Reactor Netty, when sending data to TCP channel via out.send(publisher), one would expect any publisher to work. However, if instead of a simple immediate Flux we use a more complex one with delayed elements, then it stops working properly.
      For example, if we take this hello world TCP echo server, it works as expected:



      import reactor.core.publisher.Flux;
      import reactor.netty.DisposableServer;
      import reactor.netty.tcp.TcpServer;

      import java.time.Duration;

      public class Reactor1
      public static void main(String[] args) throws Exception
      DisposableServer server = TcpServer.create()
      .port(3344)
      .handle((in, out) -> in
      .receive()
      .asString()
      .flatMap(s ->
      out.sendString(Flux.just(s.toUpperCase()))
      ))
      .bind()
      .block();
      server.channel().closeFuture().sync();




      However, if we change out.sendString to



      out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))


      then we would expect that for each received item an output will be produced with one second delay.



      However, the way server behaves is that if it receives multiple items during the interval, it will produce output only for the first item. For example, below we type aa and bb during the first second, but only AA gets produced as output (after one second):



      $ nc localhost 3344
      aa
      bb
      AA <after one second>


      Then, if we later type additional line, we get output (after one second) but from the previous input:



      cc
      BB <after one second>


      Any ideas how to make send() work as expected with a delayed Flux?










      share|improve this question
















      In Reactor Netty, when sending data to TCP channel via out.send(publisher), one would expect any publisher to work. However, if instead of a simple immediate Flux we use a more complex one with delayed elements, then it stops working properly.
      For example, if we take this hello world TCP echo server, it works as expected:



      import reactor.core.publisher.Flux;
      import reactor.netty.DisposableServer;
      import reactor.netty.tcp.TcpServer;

      import java.time.Duration;

      public class Reactor1
      public static void main(String[] args) throws Exception
      DisposableServer server = TcpServer.create()
      .port(3344)
      .handle((in, out) -> in
      .receive()
      .asString()
      .flatMap(s ->
      out.sendString(Flux.just(s.toUpperCase()))
      ))
      .bind()
      .block();
      server.channel().closeFuture().sync();




      However, if we change out.sendString to



      out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))


      then we would expect that for each received item an output will be produced with one second delay.



      However, the way server behaves is that if it receives multiple items during the interval, it will produce output only for the first item. For example, below we type aa and bb during the first second, but only AA gets produced as output (after one second):



      $ nc localhost 3344
      aa
      bb
      AA <after one second>


      Then, if we later type additional line, we get output (after one second) but from the previous input:



      cc
      BB <after one second>


      Any ideas how to make send() work as expected with a delayed Flux?







      project-reactor reactor-netty






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited yesterday







      Ivan

















      asked yesterday









      IvanIvan

      324311




      324311






















          2 Answers
          2






          active

          oldest

          votes


















          1














          I think you shouldn't recreate publisher for the out.sendString(...)
          This works:



          DisposableServer server = TcpServer.create()
          .port(3344)
          .handle((in, out) -> out
          .options(NettyPipeline.SendOptions::flushOnEach)
          .sendString(in.receive()
          .asString()
          .map(String::toUpperCase)
          .delayElements(Duration.ofSeconds(1))))
          .bind()
          .block();
          server.channel().closeFuture().sync();





          share|improve this answer


















          • 1





            Yeah, starting with out makes sense (combined with flushOnEach). In the document referenced from the reactor-netty page, they do have a single TCP example which has handle that starts with in and then flatmap. But looking at HTTP examples, they always use the style starting with out (res).

            – Ivan
            yesterday






          • 1





            I realized one thing in this answer doesn't work properly: it uses delayElements operator on a single stream, which will delay all responses equally, instead of each individually. For example, if there is first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms. However, with a small change it can work properly: we just have to replace .delayElements(Duration.ofSeconds(1)) with .flatMap(s -> Mono.just(s).delayElement(Duration.ofSeconds(1))))) - it will delay each element individually.

            – Ivan
            20 hours ago


















          0














          Try to use concatMap. This works:



          DisposableServer server = TcpServer.create()
          .port(3344)
          .handle((in, out) -> in
          .receive()
          .asString()
          .concatMap(s ->
          out.sendString(Flux.just(s.toUpperCase())
          .delayElements(Duration.ofSeconds(1)))
          ))
          .bind()
          .block();
          server.channel().closeFuture().sync();





          share|improve this answer























          • There is one issue with this: concatMap will subscribe to the next inner publisher only after the previous one is completed, which messes up the timing: given the first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms

            – Ivan
            20 hours ago











          • Then move the .delayElements(Duration.ofSeconds(1)) before concatMap so that I will delay the consumption of the incoming traffic. The important thing here is that Reactor Netty do not allow nesting the send operation, which will happen if you use flatMap

            – Violeta Georgieva
            13 hours ago











          • This gives the same results, it always produces outputs at the interval boundary (1s). Given that having multiple send operations in flight is not allowed, I wonder if "in.receive, out.send" style is ever useful, compared to the "out.send, in.receive" style (as in the accepted answer) which also seems easier to reason about since there is a single out stream.

            – Ivan
            11 hours ago










          Your Answer






          StackExchange.ifUsing("editor", function ()
          StackExchange.using("externalEditor", function ()
          StackExchange.using("snippets", function ()
          StackExchange.snippets.init();
          );
          );
          , "code-snippets");

          StackExchange.ready(function()
          var channelOptions =
          tags: "".split(" "),
          id: "1"
          ;
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function()
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled)
          StackExchange.using("snippets", function()
          createEditor();
          );

          else
          createEditor();

          );

          function createEditor()
          StackExchange.prepareEditor(
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader:
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          ,
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          );



          );













          draft saved

          draft discarded


















          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55015276%2freactor-netty-how-to-send-with-delayed-flux%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown

























          2 Answers
          2






          active

          oldest

          votes








          2 Answers
          2






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          1














          I think you shouldn't recreate publisher for the out.sendString(...)
          This works:



          DisposableServer server = TcpServer.create()
          .port(3344)
          .handle((in, out) -> out
          .options(NettyPipeline.SendOptions::flushOnEach)
          .sendString(in.receive()
          .asString()
          .map(String::toUpperCase)
          .delayElements(Duration.ofSeconds(1))))
          .bind()
          .block();
          server.channel().closeFuture().sync();





          share|improve this answer


















          • 1





            Yeah, starting with out makes sense (combined with flushOnEach). In the document referenced from the reactor-netty page, they do have a single TCP example which has handle that starts with in and then flatmap. But looking at HTTP examples, they always use the style starting with out (res).

            – Ivan
            yesterday






          • 1





            I realized one thing in this answer doesn't work properly: it uses delayElements operator on a single stream, which will delay all responses equally, instead of each individually. For example, if there is first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms. However, with a small change it can work properly: we just have to replace .delayElements(Duration.ofSeconds(1)) with .flatMap(s -> Mono.just(s).delayElement(Duration.ofSeconds(1))))) - it will delay each element individually.

            – Ivan
            20 hours ago















          1














          I think you shouldn't recreate publisher for the out.sendString(...)
          This works:



          DisposableServer server = TcpServer.create()
          .port(3344)
          .handle((in, out) -> out
          .options(NettyPipeline.SendOptions::flushOnEach)
          .sendString(in.receive()
          .asString()
          .map(String::toUpperCase)
          .delayElements(Duration.ofSeconds(1))))
          .bind()
          .block();
          server.channel().closeFuture().sync();





          share|improve this answer


















          • 1





            Yeah, starting with out makes sense (combined with flushOnEach). In the document referenced from the reactor-netty page, they do have a single TCP example which has handle that starts with in and then flatmap. But looking at HTTP examples, they always use the style starting with out (res).

            – Ivan
            yesterday






          • 1





            I realized one thing in this answer doesn't work properly: it uses delayElements operator on a single stream, which will delay all responses equally, instead of each individually. For example, if there is first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms. However, with a small change it can work properly: we just have to replace .delayElements(Duration.ofSeconds(1)) with .flatMap(s -> Mono.just(s).delayElement(Duration.ofSeconds(1))))) - it will delay each element individually.

            – Ivan
            20 hours ago













          1












          1








          1







          I think you shouldn't recreate publisher for the out.sendString(...)
          This works:



          DisposableServer server = TcpServer.create()
          .port(3344)
          .handle((in, out) -> out
          .options(NettyPipeline.SendOptions::flushOnEach)
          .sendString(in.receive()
          .asString()
          .map(String::toUpperCase)
          .delayElements(Duration.ofSeconds(1))))
          .bind()
          .block();
          server.channel().closeFuture().sync();





          share|improve this answer













          I think you shouldn't recreate publisher for the out.sendString(...)
          This works:



          DisposableServer server = TcpServer.create()
          .port(3344)
          .handle((in, out) -> out
          .options(NettyPipeline.SendOptions::flushOnEach)
          .sendString(in.receive()
          .asString()
          .map(String::toUpperCase)
          .delayElements(Duration.ofSeconds(1))))
          .bind()
          .block();
          server.channel().closeFuture().sync();






          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered yesterday









          Alexander PankinAlexander Pankin

          83627




          83627







          • 1





            Yeah, starting with out makes sense (combined with flushOnEach). In the document referenced from the reactor-netty page, they do have a single TCP example which has handle that starts with in and then flatmap. But looking at HTTP examples, they always use the style starting with out (res).

            – Ivan
            yesterday






          • 1





            I realized one thing in this answer doesn't work properly: it uses delayElements operator on a single stream, which will delay all responses equally, instead of each individually. For example, if there is first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms. However, with a small change it can work properly: we just have to replace .delayElements(Duration.ofSeconds(1)) with .flatMap(s -> Mono.just(s).delayElement(Duration.ofSeconds(1))))) - it will delay each element individually.

            – Ivan
            20 hours ago












          • 1





            Yeah, starting with out makes sense (combined with flushOnEach). In the document referenced from the reactor-netty page, they do have a single TCP example which has handle that starts with in and then flatmap. But looking at HTTP examples, they always use the style starting with out (res).

            – Ivan
            yesterday






          • 1





            I realized one thing in this answer doesn't work properly: it uses delayElements operator on a single stream, which will delay all responses equally, instead of each individually. For example, if there is first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms. However, with a small change it can work properly: we just have to replace .delayElements(Duration.ofSeconds(1)) with .flatMap(s -> Mono.just(s).delayElement(Duration.ofSeconds(1))))) - it will delay each element individually.

            – Ivan
            20 hours ago







          1




          1





          Yeah, starting with out makes sense (combined with flushOnEach). In the document referenced from the reactor-netty page, they do have a single TCP example which has handle that starts with in and then flatmap. But looking at HTTP examples, they always use the style starting with out (res).

          – Ivan
          yesterday





          Yeah, starting with out makes sense (combined with flushOnEach). In the document referenced from the reactor-netty page, they do have a single TCP example which has handle that starts with in and then flatmap. But looking at HTTP examples, they always use the style starting with out (res).

          – Ivan
          yesterday




          1




          1





          I realized one thing in this answer doesn't work properly: it uses delayElements operator on a single stream, which will delay all responses equally, instead of each individually. For example, if there is first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms. However, with a small change it can work properly: we just have to replace .delayElements(Duration.ofSeconds(1)) with .flatMap(s -> Mono.just(s).delayElement(Duration.ofSeconds(1))))) - it will delay each element individually.

          – Ivan
          20 hours ago





          I realized one thing in this answer doesn't work properly: it uses delayElements operator on a single stream, which will delay all responses equally, instead of each individually. For example, if there is first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms. However, with a small change it can work properly: we just have to replace .delayElements(Duration.ofSeconds(1)) with .flatMap(s -> Mono.just(s).delayElement(Duration.ofSeconds(1))))) - it will delay each element individually.

          – Ivan
          20 hours ago













          0














          Try to use concatMap. This works:



          DisposableServer server = TcpServer.create()
          .port(3344)
          .handle((in, out) -> in
          .receive()
          .asString()
          .concatMap(s ->
          out.sendString(Flux.just(s.toUpperCase())
          .delayElements(Duration.ofSeconds(1)))
          ))
          .bind()
          .block();
          server.channel().closeFuture().sync();





          share|improve this answer























          • There is one issue with this: concatMap will subscribe to the next inner publisher only after the previous one is completed, which messes up the timing: given the first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms

            – Ivan
            20 hours ago











          • Then move the .delayElements(Duration.ofSeconds(1)) before concatMap so that I will delay the consumption of the incoming traffic. The important thing here is that Reactor Netty do not allow nesting the send operation, which will happen if you use flatMap

            – Violeta Georgieva
            13 hours ago











          • This gives the same results, it always produces outputs at the interval boundary (1s). Given that having multiple send operations in flight is not allowed, I wonder if "in.receive, out.send" style is ever useful, compared to the "out.send, in.receive" style (as in the accepted answer) which also seems easier to reason about since there is a single out stream.

            – Ivan
            11 hours ago















          0














          Try to use concatMap. This works:



          DisposableServer server = TcpServer.create()
          .port(3344)
          .handle((in, out) -> in
          .receive()
          .asString()
          .concatMap(s ->
          out.sendString(Flux.just(s.toUpperCase())
          .delayElements(Duration.ofSeconds(1)))
          ))
          .bind()
          .block();
          server.channel().closeFuture().sync();





          share|improve this answer























          • There is one issue with this: concatMap will subscribe to the next inner publisher only after the previous one is completed, which messes up the timing: given the first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms

            – Ivan
            20 hours ago











          • Then move the .delayElements(Duration.ofSeconds(1)) before concatMap so that I will delay the consumption of the incoming traffic. The important thing here is that Reactor Netty do not allow nesting the send operation, which will happen if you use flatMap

            – Violeta Georgieva
            13 hours ago











          • This gives the same results, it always produces outputs at the interval boundary (1s). Given that having multiple send operations in flight is not allowed, I wonder if "in.receive, out.send" style is ever useful, compared to the "out.send, in.receive" style (as in the accepted answer) which also seems easier to reason about since there is a single out stream.

            – Ivan
            11 hours ago













          0












          0








          0







          Try to use concatMap. This works:



          DisposableServer server = TcpServer.create()
          .port(3344)
          .handle((in, out) -> in
          .receive()
          .asString()
          .concatMap(s ->
          out.sendString(Flux.just(s.toUpperCase())
          .delayElements(Duration.ofSeconds(1)))
          ))
          .bind()
          .block();
          server.channel().closeFuture().sync();





          share|improve this answer













          Try to use concatMap. This works:



          DisposableServer server = TcpServer.create()
          .port(3344)
          .handle((in, out) -> in
          .receive()
          .asString()
          .concatMap(s ->
          out.sendString(Flux.just(s.toUpperCase())
          .delayElements(Duration.ofSeconds(1)))
          ))
          .bind()
          .block();
          server.channel().closeFuture().sync();






          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered yesterday









          Violeta GeorgievaVioleta Georgieva

          262




          262












          • There is one issue with this: concatMap will subscribe to the next inner publisher only after the previous one is completed, which messes up the timing: given the first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms

            – Ivan
            20 hours ago











          • Then move the .delayElements(Duration.ofSeconds(1)) before concatMap so that I will delay the consumption of the incoming traffic. The important thing here is that Reactor Netty do not allow nesting the send operation, which will happen if you use flatMap

            – Violeta Georgieva
            13 hours ago











          • This gives the same results, it always produces outputs at the interval boundary (1s). Given that having multiple send operations in flight is not allowed, I wonder if "in.receive, out.send" style is ever useful, compared to the "out.send, in.receive" style (as in the accepted answer) which also seems easier to reason about since there is a single out stream.

            – Ivan
            11 hours ago

















          • There is one issue with this: concatMap will subscribe to the next inner publisher only after the previous one is completed, which messes up the timing: given the first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms

            – Ivan
            20 hours ago











          • Then move the .delayElements(Duration.ofSeconds(1)) before concatMap so that I will delay the consumption of the incoming traffic. The important thing here is that Reactor Netty do not allow nesting the send operation, which will happen if you use flatMap

            – Violeta Georgieva
            13 hours ago











          • This gives the same results, it always produces outputs at the interval boundary (1s). Given that having multiple send operations in flight is not allowed, I wonder if "in.receive, out.send" style is ever useful, compared to the "out.send, in.receive" style (as in the accepted answer) which also seems easier to reason about since there is a single out stream.

            – Ivan
            11 hours ago
















          There is one issue with this: concatMap will subscribe to the next inner publisher only after the previous one is completed, which messes up the timing: given the first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms

          – Ivan
          20 hours ago





          There is one issue with this: concatMap will subscribe to the next inner publisher only after the previous one is completed, which messes up the timing: given the first input at t1=0, and second at t2=100ms, instead of expected outputs at t1=1000ms and t2=1100ms, it would produce outputs at t1=1000ms and t2=2000ms

          – Ivan
          20 hours ago













          Then move the .delayElements(Duration.ofSeconds(1)) before concatMap so that I will delay the consumption of the incoming traffic. The important thing here is that Reactor Netty do not allow nesting the send operation, which will happen if you use flatMap

          – Violeta Georgieva
          13 hours ago





          Then move the .delayElements(Duration.ofSeconds(1)) before concatMap so that I will delay the consumption of the incoming traffic. The important thing here is that Reactor Netty do not allow nesting the send operation, which will happen if you use flatMap

          – Violeta Georgieva
          13 hours ago













          This gives the same results, it always produces outputs at the interval boundary (1s). Given that having multiple send operations in flight is not allowed, I wonder if "in.receive, out.send" style is ever useful, compared to the "out.send, in.receive" style (as in the accepted answer) which also seems easier to reason about since there is a single out stream.

          – Ivan
          11 hours ago





          This gives the same results, it always produces outputs at the interval boundary (1s). Given that having multiple send operations in flight is not allowed, I wonder if "in.receive, out.send" style is ever useful, compared to the "out.send, in.receive" style (as in the accepted answer) which also seems easier to reason about since there is a single out stream.

          – Ivan
          11 hours ago

















          draft saved

          draft discarded
















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid


          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.

          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55015276%2freactor-netty-how-to-send-with-delayed-flux%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          Identity Server 4 is not redirecting to Angular app after login2019 Community Moderator ElectionIdentity Server 4 and dockerIdentityserver implicit flow unauthorized_clientIdentityServer Hybrid Flow - Access Token is null after user successful loginIdentity Server to MVC client : Page Redirect After loginLogin with Steam OpenId(oidc-client-js)Identity Server 4+.NET Core 2.0 + IdentityIdentityServer4 post-login redirect not working in Edge browserCall to IdentityServer4 generates System.NullReferenceException: Object reference not set to an instance of an objectIdentityServer4 without HTTPS not workingHow to get Authorization code from identity server without login form

          2005 Ahvaz unrest Contents Background Causes Casualties Aftermath See also References Navigation menue"At Least 10 Are Killed by Bombs in Iran""Iran"Archived"Arab-Iranians in Iran to make April 15 'Day of Fury'"State of Mind, State of Order: Reactions to Ethnic Unrest in the Islamic Republic of Iran.10.1111/j.1754-9469.2008.00028.x"Iran hangs Arab separatists"Iran Overview from ArchivedConstitution of the Islamic Republic of Iran"Tehran puzzled by forged 'riots' letter""Iran and its minorities: Down in the second class""Iran: Handling Of Ahvaz Unrest Could End With Televised Confessions""Bombings Rock Iran Ahead of Election""Five die in Iran ethnic clashes""Iran: Need for restraint as anniversary of unrest in Khuzestan approaches"Archived"Iranian Sunni protesters killed in clashes with security forces"Archived

          Can't initialize raids on a new ASUS Prime B360M-A motherboard2019 Community Moderator ElectionSimilar to RAID config yet more like mirroring solution?Can't get motherboard serial numberWhy does the BIOS entry point start with a WBINVD instruction?UEFI performance Asus Maximus V Extreme