Proper way to programmatically stop an Alpakka Kafka stream

We are trying to use Akka Streams with Alpakka Kafka to consume a stream of events in a service. To handle event processing errors we use Kafka autocommit and more than one topic. For example, if a products service consumes the topic user_created, we also create user_created_for_products_failed and user_created_for_products_dead_letter. These two extra topics are coupled to a specific Kafka consumer group. If an event fails to be processed, it goes to the failed topic, from which we try to consume it again after five minutes. If it fails again, it goes to the dead-letter topic.



On deployment we want to ensure that we don't lose events, so we are trying to stop the stream before stopping the application. As I said, we are using autocommit, so the offsets of events that are still in flight may already be committed even though those events have not been processed yet. Once the stream and the application are stopped, we can deploy the new code and start the application again.



After reading the documentation, we found the KillSwitch feature. The problem we see with it is that its shutdown method returns Unit instead of the Future[Unit] we expected. We are not sure that we won't lose events when using it, because in tests it returns too quickly to have waited for in-flight events.



As a workaround, we create an ActorSystem for each stream and use its terminate method (which returns a Future[Terminated]). The problem with this solution is that we don't think creating an ActorSystem per stream will scale well, and terminate takes a long time to complete (in tests it takes up to one minute to shut down).



Have you faced a problem like this? Is there a faster way (compared to ActorSystem.terminate) to stop a stream and ensure that all the events that the Source has emitted have been processed?










      scala apache-kafka akka akka-stream alpakka






      asked Mar 7 at 14:28 by SergiGP, edited Mar 7 at 17:35 by Jeffrey Chung

          1 Answer
          From the documentation (emphasis mine):




          When using external offset storage, a call to Consumer.Control.shutdown() suffices to complete the Source, which starts the completion of the stream.




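              import akka.kafka.Subscriptions
              import akka.kafka.scaladsl.Consumer
              import akka.stream.scaladsl.{Keep, Sink}
              import org.apache.kafka.common.TopicPartition

              // consumerSettings, topic, offset and businessFlow are defined elsewhere.
              val (consumerControl, streamComplete) =
                Consumer
                  .plainSource(consumerSettings,
                               Subscriptions.assignmentWithOffset(
                                 new TopicPartition(topic, 0) -> offset
                               ))
                  .via(businessFlow)
                  .toMat(Sink.ignore)(Keep.both)
                  .run()

              // Completes the Source, which starts the completion of the stream.
              consumerControl.shutdown()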


          Consumer.Control.shutdown() returns a Future[Done]. From its Scaladoc description:




          Shutdown the consumer Source. It will wait for outstanding offset commit requests to finish before shutting down.
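Because both values materialized above are futures, one way to know when it is safe to stop the application is to chain them. The following is only a sketch under stated assumptions: the object `StreamShutdown` and the helper `stopStream` are hypothetical names, and it assumes the `consumerControl` and `streamComplete` values from the snippet above plus an implicit `ExecutionContext`.

```scala
import akka.Done
import akka.kafka.scaladsl.Consumer
import scala.concurrent.{ExecutionContext, Future}

object StreamShutdown {
  // Hypothetical helper: ask the consumer source to shut down, then wait
  // for the stream's own completion future (the Sink.ignore materialized
  // value) so that already-emitted elements can reach the sink.
  def stopStream(control: Consumer.Control, streamComplete: Future[Done])(
      implicit ec: ExecutionContext): Future[Done] =
    control
      .shutdown()                   // Future[Done] from Consumer.Control
      .flatMap(_ => streamComplete) // Future[Done] from the sink
}
```

A caller would typically wait on the returned future (for example during deployment shutdown) before stopping the application, instead of terminating a dedicated ActorSystem per stream.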




          Alternatively, if you're using offset storage in Kafka, use Consumer.Control.drainAndShutdown, which also returns a Future. Again from the documentation (which contains more information about what drainAndShutdown does under the covers):



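              import akka.kafka.Subscriptions
              import akka.kafka.scaladsl.{Committer, Consumer}
              import akka.kafka.scaladsl.Consumer.DrainingControl
              import akka.stream.scaladsl.Keep
              import scala.concurrent.duration.Duration

              val drainingControl =
                Consumer
                  .committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
                  .mapAsync(1) { msg =>
                    business(msg.record).map(_ => msg.committableOffset)
                  }
                  .toMat(Committer.sink(committerSettings))(Keep.both)
                  .mapMaterializedValue(DrainingControl.apply)
                  .run()

              // Requires an implicit ExecutionContext in scope.
              val streamComplete = drainingControl.drainAndShutdown()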


          The Scaladoc description for drainAndShutdown:




          Stop producing messages from the Source, wait for stream completion and shut down the consumer Source so that all consumed messages reach the end of the stream. Failures in stream completion will be propagated, the source will be shut down anyway.
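To avoid one ActorSystem per stream, the draining step could be registered with Akka's CoordinatedShutdown so that it runs before a single shared actor system terminates. This is a sketch rather than a recommendation from the documentation quoted above: the object `DrainOnShutdown`, the method `register`, the task name "drain-kafka-stream", and the choice of phase are all assumptions made for illustration.

```scala
import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.kafka.scaladsl.Consumer.DrainingControl

object DrainOnShutdown {
  // Registers a shutdown task that drains the given stream.
  // The phase and the task name are illustrative choices, not requirements.
  def register[T](system: ActorSystem, control: DrainingControl[T]): Unit = {
    import system.dispatcher // ExecutionContext for drainAndShutdown and map
    CoordinatedShutdown(system).addTask(
      CoordinatedShutdown.PhaseServiceRequestsDone,
      "drain-kafka-stream") { () =>
      // Stop the source, wait for in-flight messages to reach the sink,
      // then shut the consumer down.
      control.drainAndShutdown().map(_ => Done)
    }
  }
}
```

Calling `DrainOnShutdown.register(system, drainingControl)` once per stream lets all streams share one ActorSystem. Each coordinated-shutdown phase has a timeout configured under `akka.coordinated-shutdown`, so a stream that needs longer to drain would need that timeout raised.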







          answered Mar 7 at 14:57 by Jeffrey Chung, edited Mar 7 at 17:46

          • Cool dude! Thank you!

            – SergiGP
            Mar 8 at 10:41
















