Airflow python callable function reusable The Next CEO of Stack OverflowCalling an external command in PythonWhat are metaclasses in Python?Finding the index of an item given a list containing it in PythonDifference between append vs. extend list methods in PythonHow can I safely create a nested directory in Python?Does Python have a ternary conditional operator?How to get the current time in PythonUsing global variables in a functionHow to make a chain of function decorators?Does Python have a string 'contains' substring method?

I want to delete every two lines after 3rd lines in file contain very large number of lines :

Is the D&D universe the same as the Forgotten Realms universe?

Is French Guiana a (hard) EU border?

Chain wire methods together in Lightning Web Components

What does "Its cash flow is deeply negative" mean?

Solving system of ODEs with extra parameter

Is micro rebar a better way to reinforce concrete than rebar?

How to invert MapIndexed on a ragged structure? How to construct a tree from rules?

Is it possible to use a NPN BJT as switch, from single power source?

Prepend last line of stdin to entire stdin

Why does standard notation not preserve intervals (visually)

What connection does MS Office have to Netscape Navigator?

Domestic-to-international connection at Orlando (MCO)

How did people program for Consoles with multiple CPUs?

Newlines in BSD sed vs gsed

Proper way to express "He disappeared them"

Why didn't Khan get resurrected in the Genesis Explosion?

Flying from Cape Town to England and return to another province

The exact meaning of 'Mom made me a sandwich'

Why do airplanes bank sharply to the right after air-to-air refueling?

Why is information "lost" when it got into a black hole?

Why is the US ranked as #45 in Press Freedom ratings, despite its extremely permissive free speech laws?

Bartok - Syncopation (1): Meaning of notes in between Grand Staff

Method for adding error messages to a dictionary given a key



Airflow python callable function reusable



The Next CEO of Stack OverflowCalling an external command in PythonWhat are metaclasses in Python?Finding the index of an item given a list containing it in PythonDifference between append vs. extend list methods in PythonHow can I safely create a nested directory in Python?Does Python have a ternary conditional operator?How to get the current time in PythonUsing global variables in a functionHow to make a chain of function decorators?Does Python have a string 'contains' substring method?










3















airflow_version = 1.10.2; python_version = 3.6.8



I am having trouble understanding how to make a python callable more reusable to airflow's PythonOperator, as the same function declared within the dag file itself works, but importing it from an helper lib fails.



So, the following works:



def my_function(temp_file, task_id, **kwargs):

xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)

if not xcom_vals:
return 'Xcom message not retrieved'

ack_messages = []

for item in xcom_vals:

ack_messages += <do stuff>

return ack_messages

with DAG(<dag args>):

process_messages = PythonOperator(
task_id='get_messages',
python_callable=my_function,
op_kwargs='task_id': 'previous_task_id',
'temp_file': temp_file,
provide_context=True,
)


But, moving my_function to a module lib/helpers.py and then importing it fails with error.



Broken DAG: [path to dag] cannot import my_function


NOTE: lib/helpers.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's.



How should my_function be implemented so that it is callable by other dags?










share|improve this question
























  • I had a similar problem what solved it for me was update the plugin path in file airflow.cfg plugins_folder = /airflow/plugins

    – Mntfr
    Mar 8 at 16:17











  • Actually plugins_folder is already set to $airflow_home/plugins. You are saying that my_function should be implemented with its own custom operator?

    – Miguel Serrano
    Mar 8 at 16:24












  • @dorvak libs is subpath to dags, so folder structure is airflow -- dags -- libs, and libs is a py module so, init.py is there :

    – Miguel Serrano
    Mar 8 at 16:27












  • @dorvak libs has a __init__.py, on my previous comment double underscore was parsed as md. Thing is libs/helper.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's!

    – Miguel Serrano
    Mar 8 at 16:39











  • Alright, sorry, for the confusion (deleted my comments)

    – dorvak
    Mar 8 at 16:44















3















airflow_version = 1.10.2; python_version = 3.6.8



I am having trouble understanding how to make a python callable more reusable to airflow's PythonOperator, as the same function declared within the dag file itself works, but importing it from an helper lib fails.



So, the following works:



def my_function(temp_file, task_id, **kwargs):

xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)

if not xcom_vals:
return 'Xcom message not retrieved'

ack_messages = []

for item in xcom_vals:

ack_messages += <do stuff>

return ack_messages

with DAG(<dag args>):

process_messages = PythonOperator(
task_id='get_messages',
python_callable=my_function,
op_kwargs='task_id': 'previous_task_id',
'temp_file': temp_file,
provide_context=True,
)


But, moving my_function to a module lib/helpers.py and then importing it fails with error.



Broken DAG: [path to dag] cannot import my_function


NOTE: lib/helpers.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's.



How should my_function be implemented so that it is callable by other dags?










share|improve this question
























  • I had a similar problem what solved it for me was update the plugin path in file airflow.cfg plugins_folder = /airflow/plugins

    – Mntfr
    Mar 8 at 16:17











  • Actually plugins_folder is already set to $airflow_home/plugins. You are saying that my_function should be implemented with its own custom operator?

    – Miguel Serrano
    Mar 8 at 16:24












  • @dorvak libs is subpath to dags, so folder structure is airflow -- dags -- libs, and libs is a py module so, init.py is there :

    – Miguel Serrano
    Mar 8 at 16:27












  • @dorvak libs has a __init__.py, on my previous comment double underscore was parsed as md. Thing is libs/helper.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's!

    – Miguel Serrano
    Mar 8 at 16:39











  • Alright, sorry, for the confusion (deleted my comments)

    – dorvak
    Mar 8 at 16:44













3












3








3








airflow_version = 1.10.2; python_version = 3.6.8



I am having trouble understanding how to make a python callable more reusable to airflow's PythonOperator, as the same function declared within the dag file itself works, but importing it from an helper lib fails.



So, the following works:



def my_function(temp_file, task_id, **kwargs):

xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)

if not xcom_vals:
return 'Xcom message not retrieved'

ack_messages = []

for item in xcom_vals:

ack_messages += <do stuff>

return ack_messages

with DAG(<dag args>):

process_messages = PythonOperator(
task_id='get_messages',
python_callable=my_function,
op_kwargs='task_id': 'previous_task_id',
'temp_file': temp_file,
provide_context=True,
)


But, moving my_function to a module lib/helpers.py and then importing it fails with error.



Broken DAG: [path to dag] cannot import my_function


NOTE: lib/helpers.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's.



How should my_function be implemented so that it is callable by other dags?










share|improve this question
















airflow_version = 1.10.2; python_version = 3.6.8



I am having trouble understanding how to make a python callable more reusable to airflow's PythonOperator, as the same function declared within the dag file itself works, but importing it from an helper lib fails.



So, the following works:



def my_function(temp_file, task_id, **kwargs):

xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)

if not xcom_vals:
return 'Xcom message not retrieved'

ack_messages = []

for item in xcom_vals:

ack_messages += <do stuff>

return ack_messages

with DAG(<dag args>):

process_messages = PythonOperator(
task_id='get_messages',
python_callable=my_function,
op_kwargs='task_id': 'previous_task_id',
'temp_file': temp_file,
provide_context=True,
)


But, moving my_function to a module lib/helpers.py and then importing it fails with error.



Broken DAG: [path to dag] cannot import my_function


NOTE: lib/helpers.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's.



How should my_function be implemented so that it is callable by other dags?







python airflow






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited Mar 15 at 15:54







Miguel Serrano

















asked Mar 8 at 16:13









Miguel SerranoMiguel Serrano

215




215












  • I had a similar problem what solved it for me was update the plugin path in file airflow.cfg plugins_folder = /airflow/plugins

    – Mntfr
    Mar 8 at 16:17











  • Actually plugins_folder is already set to $airflow_home/plugins. You are saying that my_function should be implemented with its own custom operator?

    – Miguel Serrano
    Mar 8 at 16:24












  • @dorvak libs is subpath to dags, so folder structure is airflow -- dags -- libs, and libs is a py module so, init.py is there :

    – Miguel Serrano
    Mar 8 at 16:27












  • @dorvak libs has a __init__.py, on my previous comment double underscore was parsed as md. Thing is libs/helper.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's!

    – Miguel Serrano
    Mar 8 at 16:39











  • Alright, sorry, for the confusion (deleted my comments)

    – dorvak
    Mar 8 at 16:44

















  • I had a similar problem what solved it for me was update the plugin path in file airflow.cfg plugins_folder = /airflow/plugins

    – Mntfr
    Mar 8 at 16:17











  • Actually plugins_folder is already set to $airflow_home/plugins. You are saying that my_function should be implemented with its own custom operator?

    – Miguel Serrano
    Mar 8 at 16:24












  • @dorvak libs is subpath to dags, so folder structure is airflow -- dags -- libs, and libs is a py module so, init.py is there :

    – Miguel Serrano
    Mar 8 at 16:27












  • @dorvak libs has a __init__.py, on my previous comment double underscore was parsed as md. Thing is libs/helper.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's!

    – Miguel Serrano
    Mar 8 at 16:39











  • Alright, sorry, for the confusion (deleted my comments)

    – dorvak
    Mar 8 at 16:44
















I had a similar problem what solved it for me was update the plugin path in file airflow.cfg plugins_folder = /airflow/plugins

– Mntfr
Mar 8 at 16:17





I had a similar problem what solved it for me was update the plugin path in file airflow.cfg plugins_folder = /airflow/plugins

– Mntfr
Mar 8 at 16:17













Actually plugins_folder is already set to $airflow_home/plugins. You are saying that my_function should be implemented with its own custom operator?

– Miguel Serrano
Mar 8 at 16:24






Actually plugins_folder is already set to $airflow_home/plugins. You are saying that my_function should be implemented with its own custom operator?

– Miguel Serrano
Mar 8 at 16:24














@dorvak libs is subpath to dags, so folder structure is airflow -- dags -- libs, and libs is a py module so, init.py is there :

– Miguel Serrano
Mar 8 at 16:27






@dorvak libs is subpath to dags, so folder structure is airflow -- dags -- libs, and libs is a py module so, init.py is there :

– Miguel Serrano
Mar 8 at 16:27














@dorvak libs has a __init__.py, on my previous comment double underscore was parsed as md. Thing is libs/helper.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's!

– Miguel Serrano
Mar 8 at 16:39





@dorvak libs has a __init__.py, on my previous comment double underscore was parsed as md. Thing is libs/helper.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's!

– Miguel Serrano
Mar 8 at 16:39













Alright, sorry, for the confusion (deleted my comments)

– dorvak
Mar 8 at 16:44





Alright, sorry, for the confusion (deleted my comments)

– dorvak
Mar 8 at 16:44












1 Answer
1






active

oldest

votes


















0














Figured this one out, it came down to the airflow UI and scheduler not parsing the libs folders correctly, some kind of lag behaviour after a git sync? So both the UI and scheduler were parsing the dag file correctly, but not the lib folder.



What solved this behaviour in the end was a restart both to the UI and scheduler pod (we're running airflow on kubernetes) and that was it.






share|improve this answer























    Your Answer






    StackExchange.ifUsing("editor", function ()
    StackExchange.using("externalEditor", function ()
    StackExchange.using("snippets", function ()
    StackExchange.snippets.init();
    );
    );
    , "code-snippets");

    StackExchange.ready(function()
    var channelOptions =
    tags: "".split(" "),
    id: "1"
    ;
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function()
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled)
    StackExchange.using("snippets", function()
    createEditor();
    );

    else
    createEditor();

    );

    function createEditor()
    StackExchange.prepareEditor(
    heartbeatType: 'answer',
    autoActivateHeartbeat: false,
    convertImagesToLinks: true,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: 10,
    bindNavPrevention: true,
    postfix: "",
    imageUploader:
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    ,
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    );



    );













    draft saved

    draft discarded


















    StackExchange.ready(
    function ()
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55066989%2fairflow-python-callable-function-reusable%23new-answer', 'question_page');

    );

    Post as a guest















    Required, but never shown

























    1 Answer
    1






    active

    oldest

    votes








    1 Answer
    1






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes









    0














    Figured this one out, it came down to the airflow UI and scheduler not parsing the libs folders correctly, some kind of lag behaviour after a git sync? So both the UI and scheduler were parsing the dag file correctly, but not the lib folder.



    What solved this behaviour in the end was a restart both to the UI and scheduler pod (we're running airflow on kubernetes) and that was it.






    share|improve this answer



























      0














      Figured this one out, it came down to the airflow UI and scheduler not parsing the libs folders correctly, some kind of lag behaviour after a git sync? So both the UI and scheduler were parsing the dag file correctly, but not the lib folder.



      What solved this behaviour in the end was a restart both to the UI and scheduler pod (we're running airflow on kubernetes) and that was it.






      share|improve this answer

























        0












        0








        0







        Figured this one out, it came down to the airflow UI and scheduler not parsing the libs folders correctly, some kind of lag behaviour after a git sync? So both the UI and scheduler were parsing the dag file correctly, but not the lib folder.



        What solved this behaviour in the end was a restart both to the UI and scheduler pod (we're running airflow on kubernetes) and that was it.






        share|improve this answer













        Figured this one out, it came down to the airflow UI and scheduler not parsing the libs folders correctly, some kind of lag behaviour after a git sync? So both the UI and scheduler were parsing the dag file correctly, but not the lib folder.



        What solved this behaviour in the end was a restart both to the UI and scheduler pod (we're running airflow on kubernetes) and that was it.







        share|improve this answer












        share|improve this answer



        share|improve this answer










        answered yesterday









        Miguel SerranoMiguel Serrano

        215




        215





























            draft saved

            draft discarded
















































            Thanks for contributing an answer to Stack Overflow!


            • Please be sure to answer the question. Provide details and share your research!

            But avoid


            • Asking for help, clarification, or responding to other answers.

            • Making statements based on opinion; back them up with references or personal experience.

            To learn more, see our tips on writing great answers.




            draft saved


            draft discarded














            StackExchange.ready(
            function ()
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55066989%2fairflow-python-callable-function-reusable%23new-answer', 'question_page');

            );

            Post as a guest















            Required, but never shown





















































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown

































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown







            Popular posts from this blog

            Identity Server 4 is not redirecting to Angular app after login2019 Community Moderator ElectionIdentity Server 4 and dockerIdentityserver implicit flow unauthorized_clientIdentityServer Hybrid Flow - Access Token is null after user successful loginIdentity Server to MVC client : Page Redirect After loginLogin with Steam OpenId(oidc-client-js)Identity Server 4+.NET Core 2.0 + IdentityIdentityServer4 post-login redirect not working in Edge browserCall to IdentityServer4 generates System.NullReferenceException: Object reference not set to an instance of an objectIdentityServer4 without HTTPS not workingHow to get Authorization code from identity server without login form

            2005 Ahvaz unrest Contents Background Causes Casualties Aftermath See also References Navigation menue"At Least 10 Are Killed by Bombs in Iran""Iran"Archived"Arab-Iranians in Iran to make April 15 'Day of Fury'"State of Mind, State of Order: Reactions to Ethnic Unrest in the Islamic Republic of Iran.10.1111/j.1754-9469.2008.00028.x"Iran hangs Arab separatists"Iran Overview from ArchivedConstitution of the Islamic Republic of Iran"Tehran puzzled by forged 'riots' letter""Iran and its minorities: Down in the second class""Iran: Handling Of Ahvaz Unrest Could End With Televised Confessions""Bombings Rock Iran Ahead of Election""Five die in Iran ethnic clashes""Iran: Need for restraint as anniversary of unrest in Khuzestan approaches"Archived"Iranian Sunni protesters killed in clashes with security forces"Archived

            Can't initialize raids on a new ASUS Prime B360M-A motherboard2019 Community Moderator ElectionSimilar to RAID config yet more like mirroring solution?Can't get motherboard serial numberWhy does the BIOS entry point start with a WBINVD instruction?UEFI performance Asus Maximus V Extreme