When to use ConcurrentKafkaListenerContainerFactory?2019 Community Moderator ElectionWhen to use LinkedList over ArrayList in Java?What's the difference between ConcurrentHashMap and Collections.synchronizedMap(Map)?What is a stack trace, and how can I use it to debug my application errors?What is the point of “final class” in Java?What in the world are Spring beans?Data access object (DAO) in JavaUpdate Eclipse with Android development tools v. 23Kafka Consumer poll and reconnectionHow to get an acknowledgement or a call back from Kafka consumer to the producerreprocess maprstream messages using spring integration kafka

PTIJ: Aliyot for the deceased

Is "cogitate" an appropriate word for this?

Giving a talk in my old university, how prominently should I tell students my salary?

Should we avoid writing fiction about historical events without extensive research?

Why do phishing e-mails use faked e-mail addresses instead of the real one?

The past tense for the quoting particle って

Did Amazon pay $0 in taxes last year?

What is the oldest European royal house?

The (Easy) Road to Code

3.5% Interest Student Loan or use all of my savings on Tuition?

Iron deposits mined from under the city

Why aren't there more gauls like Obelix?

Can a Mexican citizen living in US under DACA drive to Canada?

What does "rhumatis" mean?

Why can't we use freedom of speech and expression to incite people to rebel against government in India?

What can I do if someone tampers with my SSH public key?

An Undercover Army

Dukha vs legitimate need

In the world of The Matrix, what is "popping"?

Can inspiration allow the Rogue to make a Sneak Attack?

Does the US political system, in principle, allow for a no-party system?

Learning to quickly identify valid fingering for piano?

ESPP--any reason not to go all in?

Is being socially reclusive okay for a graduate student?



When to use ConcurrentKafkaListenerContainerFactory?



2019 Community Moderator ElectionWhen to use LinkedList over ArrayList in Java?What's the difference between ConcurrentHashMap and Collections.synchronizedMap(Map)?What is a stack trace, and how can I use it to debug my application errors?What is the point of “final class” in Java?What in the world are Spring beans?Data access object (DAO) in JavaUpdate Eclipse with Android development tools v. 23Kafka Consumer poll and reconnectionHow to get an acknowledgement or a call back from Kafka consumer to the producerreprocess maprstream messages using spring integration kafka










0















I am new to kafka and i went through the documentation but I couldn't understand anything. Can someone please explain when to use the ConcurrentKafkaListenerContainerFactory class? I have used the Kafkaconsumer class but I see ConcurrentKafkaListenerContainerFactory being used in my current project. Please explain what purpose it serves.










share|improve this question



















  • 1





    No prizes for guessing how they came up with that name

    – Michael
    yesterday











  • ConcurrentKafkaListenerContainerFactory is from the spring framework and can be used in the spring ecosystem. KafkaConsumer is from Apache's Java sdk for Kafka. Both are just different tools/apis to implement Kafka consumers on Java. Just might differ in the capabilities provided.

    – Madhu Bhat
    yesterday












  • Thanks @MadhuBhat for the info but that's what I wanted to know, when would I want to use ConcurrentKafkaListenerContainerFactory over kafkaconsumer and what extra capabilities it provides.

    – Rahul Gupta
    yesterday















0















I am new to kafka and i went through the documentation but I couldn't understand anything. Can someone please explain when to use the ConcurrentKafkaListenerContainerFactory class? I have used the Kafkaconsumer class but I see ConcurrentKafkaListenerContainerFactory being used in my current project. Please explain what purpose it serves.










share|improve this question



















  • 1





    No prizes for guessing how they came up with that name

    – Michael
    yesterday











  • ConcurrentKafkaListenerContainerFactory is from the spring framework and can be used in the spring ecosystem. KafkaConsumer is from Apache's Java sdk for Kafka. Both are just different tools/apis to implement Kafka consumers on Java. Just might differ in the capabilities provided.

    – Madhu Bhat
    yesterday












  • Thanks @MadhuBhat for the info but that's what I wanted to know, when would I want to use ConcurrentKafkaListenerContainerFactory over kafkaconsumer and what extra capabilities it provides.

    – Rahul Gupta
    yesterday













0












0








0








I am new to kafka and i went through the documentation but I couldn't understand anything. Can someone please explain when to use the ConcurrentKafkaListenerContainerFactory class? I have used the Kafkaconsumer class but I see ConcurrentKafkaListenerContainerFactory being used in my current project. Please explain what purpose it serves.










share|improve this question
















I am new to kafka and i went through the documentation but I couldn't understand anything. Can someone please explain when to use the ConcurrentKafkaListenerContainerFactory class? I have used the Kafkaconsumer class but I see ConcurrentKafkaListenerContainerFactory being used in my current project. Please explain what purpose it serves.







java apache-kafka kafka-consumer-api spring-kafka






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited yesterday









Deadpool

6,7342629




6,7342629










asked yesterday









Rahul GuptaRahul Gupta

1509




1509







  • 1





    No prizes for guessing how they came up with that name

    – Michael
    yesterday











  • ConcurrentKafkaListenerContainerFactory is from the spring framework and can be used in the spring ecosystem. KafkaConsumer is from Apache's Java sdk for Kafka. Both are just different tools/apis to implement Kafka consumers on Java. Just might differ in the capabilities provided.

    – Madhu Bhat
    yesterday












  • Thanks @MadhuBhat for the info but that's what I wanted to know, when would I want to use ConcurrentKafkaListenerContainerFactory over kafkaconsumer and what extra capabilities it provides.

    – Rahul Gupta
    yesterday












  • 1





    No prizes for guessing how they came up with that name

    – Michael
    yesterday











  • ConcurrentKafkaListenerContainerFactory is from the spring framework and can be used in the spring ecosystem. KafkaConsumer is from Apache's Java sdk for Kafka. Both are just different tools/apis to implement Kafka consumers on Java. Just might differ in the capabilities provided.

    – Madhu Bhat
    yesterday












  • Thanks @MadhuBhat for the info but that's what I wanted to know, when would I want to use ConcurrentKafkaListenerContainerFactory over kafkaconsumer and what extra capabilities it provides.

    – Rahul Gupta
    yesterday







1




1





No prizes for guessing how they came up with that name

– Michael
yesterday





No prizes for guessing how they came up with that name

– Michael
yesterday













ConcurrentKafkaListenerContainerFactory is from the spring framework and can be used in the spring ecosystem. KafkaConsumer is from Apache's Java sdk for Kafka. Both are just different tools/apis to implement Kafka consumers on Java. Just might differ in the capabilities provided.

– Madhu Bhat
yesterday






ConcurrentKafkaListenerContainerFactory is from the spring framework and can be used in the spring ecosystem. KafkaConsumer is from Apache's Java sdk for Kafka. Both are just different tools/apis to implement Kafka consumers on Java. Just might differ in the capabilities provided.

– Madhu Bhat
yesterday














Thanks @MadhuBhat for the info but that's what I wanted to know, when would I want to use ConcurrentKafkaListenerContainerFactory over kafkaconsumer and what extra capabilities it provides.

– Rahul Gupta
yesterday





Thanks @MadhuBhat for the info but that's what I wanted to know, when would I want to use ConcurrentKafkaListenerContainerFactory over kafkaconsumer and what extra capabilities it provides.

– Rahul Gupta
yesterday












2 Answers
2






active

oldest

votes


















1














The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.




If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.




Spring-kafka



ConcurrentKafkaListenerContainerFactory is used to create containers for annotated methods with @KafkaListener



There are two MessageListenerContainer in spring kafka



KafkaMessageListenerContainer
ConcurrentMessageListenerContainer



The KafkaMessageListenerContainer receives all message from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.




Using ConcurrentMessageListenerContainer



@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory()
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;



It has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.




If you have six TopicPartition instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.




here is the clear example with documentation here






share|improve this answer























  • thanks a lot for the documentation link :)

    – Rahul Gupta
    yesterday


















1














Kafka Consumer API is not thread safe. ConcurrentKafkaListenerContainerFactory api provides concurrent way of using Kafka Consumer API along with setting other kafka consumer properties.






share|improve this answer






















    Your Answer






    StackExchange.ifUsing("editor", function ()
    StackExchange.using("externalEditor", function ()
    StackExchange.using("snippets", function ()
    StackExchange.snippets.init();
    );
    );
    , "code-snippets");

    StackExchange.ready(function()
    var channelOptions =
    tags: "".split(" "),
    id: "1"
    ;
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function()
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled)
    StackExchange.using("snippets", function()
    createEditor();
    );

    else
    createEditor();

    );

    function createEditor()
    StackExchange.prepareEditor(
    heartbeatType: 'answer',
    autoActivateHeartbeat: false,
    convertImagesToLinks: true,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: 10,
    bindNavPrevention: true,
    postfix: "",
    imageUploader:
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    ,
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    );



    );













    draft saved

    draft discarded


















    StackExchange.ready(
    function ()
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55023240%2fwhen-to-use-concurrentkafkalistenercontainerfactory%23new-answer', 'question_page');

    );

    Post as a guest















    Required, but never shown

























    2 Answers
    2






    active

    oldest

    votes








    2 Answers
    2






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes









    1














    The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.




    If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.




    Spring-kafka



    ConcurrentKafkaListenerContainerFactory is used to create containers for annotated methods with @KafkaListener



    There are two MessageListenerContainer in spring kafka



    KafkaMessageListenerContainer
    ConcurrentMessageListenerContainer



    The KafkaMessageListenerContainer receives all message from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.




    Using ConcurrentMessageListenerContainer



    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory()
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;



    It has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.




    If you have six TopicPartition instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.




    here is the clear example with documentation here






    share|improve this answer























    • thanks a lot for the documentation link :)

      – Rahul Gupta
      yesterday















    1














    The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.




    If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.




    Spring-kafka



    ConcurrentKafkaListenerContainerFactory is used to create containers for annotated methods with @KafkaListener



    There are two MessageListenerContainer in spring kafka



    KafkaMessageListenerContainer
    ConcurrentMessageListenerContainer



    The KafkaMessageListenerContainer receives all message from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.




    Using ConcurrentMessageListenerContainer



    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory()
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;



    It has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.




    If you have six TopicPartition instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.




    here is the clear example with documentation here






    share|improve this answer























    • thanks a lot for the documentation link :)

      – Rahul Gupta
      yesterday













    1












    1








    1







    The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.




    If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.




    Spring-kafka



    ConcurrentKafkaListenerContainerFactory is used to create containers for annotated methods with @KafkaListener



    There are two MessageListenerContainer in spring kafka



    KafkaMessageListenerContainer
    ConcurrentMessageListenerContainer



    The KafkaMessageListenerContainer receives all message from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.




    Using ConcurrentMessageListenerContainer



    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory()
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;



    It has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.




    If you have six TopicPartition instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.




    here is the clear example with documentation here






    share|improve this answer













    The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.




    If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.




    Spring-kafka



    ConcurrentKafkaListenerContainerFactory is used to create containers for annotated methods with @KafkaListener



    There are two MessageListenerContainer in spring kafka



    KafkaMessageListenerContainer
    ConcurrentMessageListenerContainer



    The KafkaMessageListenerContainer receives all message from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.




    Using ConcurrentMessageListenerContainer



    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory()
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;



    It has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.




    If you have six TopicPartition instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.




    here is the clear example with documentation here







    share|improve this answer












    share|improve this answer



    share|improve this answer










    answered yesterday









    DeadpoolDeadpool

    6,7342629




    6,7342629












    • thanks a lot for the documentation link :)

      – Rahul Gupta
      yesterday

















    • thanks a lot for the documentation link :)

      – Rahul Gupta
      yesterday
















    thanks a lot for the documentation link :)

    – Rahul Gupta
    yesterday





    thanks a lot for the documentation link :)

    – Rahul Gupta
    yesterday













    1














    Kafka Consumer API is not thread safe. ConcurrentKafkaListenerContainerFactory api provides concurrent way of using Kafka Consumer API along with setting other kafka consumer properties.






    share|improve this answer



























      1














      Kafka Consumer API is not thread safe. ConcurrentKafkaListenerContainerFactory api provides concurrent way of using Kafka Consumer API along with setting other kafka consumer properties.






      share|improve this answer

























        1












        1








        1







        Kafka Consumer API is not thread safe. ConcurrentKafkaListenerContainerFactory api provides concurrent way of using Kafka Consumer API along with setting other kafka consumer properties.






        share|improve this answer













        Kafka Consumer API is not thread safe. ConcurrentKafkaListenerContainerFactory api provides concurrent way of using Kafka Consumer API along with setting other kafka consumer properties.







        share|improve this answer












        share|improve this answer



        share|improve this answer










        answered yesterday









        Rohit YadavRohit Yadav

        12917




        12917



























            draft saved

            draft discarded
















































            Thanks for contributing an answer to Stack Overflow!


            • Please be sure to answer the question. Provide details and share your research!

            But avoid


            • Asking for help, clarification, or responding to other answers.

            • Making statements based on opinion; back them up with references or personal experience.

            To learn more, see our tips on writing great answers.




            draft saved


            draft discarded














            StackExchange.ready(
            function ()
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55023240%2fwhen-to-use-concurrentkafkalistenercontainerfactory%23new-answer', 'question_page');

            );

            Post as a guest















            Required, but never shown





















































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown

































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown







            Popular posts from this blog

            Identity Server 4 is not redirecting to Angular app after login2019 Community Moderator ElectionIdentity Server 4 and dockerIdentityserver implicit flow unauthorized_clientIdentityServer Hybrid Flow - Access Token is null after user successful loginIdentity Server to MVC client : Page Redirect After loginLogin with Steam OpenId(oidc-client-js)Identity Server 4+.NET Core 2.0 + IdentityIdentityServer4 post-login redirect not working in Edge browserCall to IdentityServer4 generates System.NullReferenceException: Object reference not set to an instance of an objectIdentityServer4 without HTTPS not workingHow to get Authorization code from identity server without login form

            2005 Ahvaz unrest Contents Background Causes Casualties Aftermath See also References Navigation menue"At Least 10 Are Killed by Bombs in Iran""Iran"Archived"Arab-Iranians in Iran to make April 15 'Day of Fury'"State of Mind, State of Order: Reactions to Ethnic Unrest in the Islamic Republic of Iran.10.1111/j.1754-9469.2008.00028.x"Iran hangs Arab separatists"Iran Overview from ArchivedConstitution of the Islamic Republic of Iran"Tehran puzzled by forged 'riots' letter""Iran and its minorities: Down in the second class""Iran: Handling Of Ahvaz Unrest Could End With Televised Confessions""Bombings Rock Iran Ahead of Election""Five die in Iran ethnic clashes""Iran: Need for restraint as anniversary of unrest in Khuzestan approaches"Archived"Iranian Sunni protesters killed in clashes with security forces"Archived

            Can't initialize raids on a new ASUS Prime B360M-A motherboard2019 Community Moderator ElectionSimilar to RAID config yet more like mirroring solution?Can't get motherboard serial numberWhy does the BIOS entry point start with a WBINVD instruction?UEFI performance Asus Maximus V Extreme