Airflow python callable function reusable
airflow_version = 1.10.2; python_version = 3.6.8
I am having trouble understanding how to make a Python callable reusable with Airflow's PythonOperator: the same function works when declared within the DAG file itself, but fails when imported from a helper lib.
So, the following works:

    # Import paths as used in Airflow 1.10.x
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    def my_function(temp_file, task_id, **kwargs):
        # 'ti' (the task instance) is available because provide_context=True
        xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)
        if not xcom_vals:
            return 'Xcom message not retrieved'
        ack_messages = []
        for item in xcom_vals:
            ack_messages += <do stuff>
        return ack_messages

    with DAG(<dag args>):
        process_messages = PythonOperator(
            task_id='get_messages',
            python_callable=my_function,
            op_kwargs={'task_id': 'previous_task_id',
                       'temp_file': temp_file},
            provide_context=True,
        )

However, moving my_function to a module lib/helpers.py and then importing it fails with this error:

    Broken DAG: [path to dag] cannot import my_function

NOTE: lib/helpers.py contains other (simpler) functions that are successfully imported and used in the current and other DAGs.
How should my_function be implemented so that it is callable by other dags?
python airflow
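
For reference, the setup described in the question can be sketched roughly as follows. This is only an illustration under stated assumptions: the body of the loop stands in for the elided `<do stuff>` step, and `FakeTaskInstance` is a hypothetical stand-in (not part of Airflow) that lets the helper be exercised without a scheduler. The point it shows is that the callable itself needs nothing Airflow-specific, so it can live in a separate module such as lib/helpers.py.

```python
# Hypothetical contents of lib/helpers.py. Nothing here imports Airflow,
# so the module can be imported by any DAG file or by a plain unit test.


def my_function(temp_file, task_id, **kwargs):
    """Pull the XCom values pushed by `task_id` and collect them in a list."""
    # With provide_context=True (Airflow 1.10.x) the task instance is
    # passed to the callable through kwargs under the key 'ti'.
    xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)
    if not xcom_vals:
        return 'Xcom message not retrieved'
    ack_messages = []
    for item in xcom_vals:
        # Assumption: placeholder for the "<do stuff>" step in the question.
        ack_messages.append(item)
    return ack_messages


class FakeTaskInstance:
    """Illustrative stand-in for an Airflow task instance (local testing only)."""

    def __init__(self, values):
        self._values = values

    def xcom_pull(self, task_ids=None):
        # Ignores task_ids and returns the canned values.
        return self._values


if __name__ == '__main__':
    ti = FakeTaskInstance(['a', 'b'])
    print(my_function('/tmp/example', 'previous_task_id', ti=ti))
```

In the DAG file the helper would then be referenced with something like `from lib.helpers import my_function`, assuming the `lib` package sits inside the DAGs folder and contains an `__init__.py`.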
asked Mar 8 at 16:13, edited Mar 15 at 15:54
Miguel Serrano
I had a similar problem; what solved it for me was updating the plugin path in airflow.cfg: plugins_folder = /airflow/plugins
– Mntfr
Mar 8 at 16:17
Actually, plugins_folder is already set to $airflow_home/plugins. Are you saying that my_function should be implemented with its own custom operator?
– Miguel Serrano
Mar 8 at 16:24
@dorvak libs is a subpath of dags, so the folder structure is airflow -- dags -- libs, and libs is a py module, so init.py is there.
– Miguel Serrano
Mar 8 at 16:27
@dorvak libs has an __init__.py; in my previous comment the double underscore was parsed as Markdown. The thing is, libs/helper.py contains other (simpler) functions that are successfully imported and used in the current and other DAGs!
– Miguel Serrano
Mar 8 at 16:39
Alright, sorry for the confusion (deleted my comments)
– dorvak
Mar 8 at 16:44
1 Answer
Figured this one out: it came down to the Airflow UI and scheduler not parsing the libs folder correctly, possibly some kind of lag after a git sync. Both the UI and the scheduler were parsing the DAG file correctly, but not the libs folder.
What solved it in the end was restarting both the UI pod and the scheduler pod (we're running Airflow on Kubernetes).
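
Before restarting anything, one way to check whether a given process actually sees the updated helper is to attempt the import in that environment and report which file was loaded. The sketch below is only an illustration: `check_callable` is a hypothetical helper name, and the module and attribute names passed to it (for example `lib.helpers` and `my_function`) are assumptions based on the question. It uses only the standard library.

```python
import importlib


def check_callable(module_name, attr_name):
    """Return (ok, detail) for whether `module_name.attr_name` can be imported.

    `detail` is the path of the loaded file on success, or a short
    explanation on failure.
    """
    try:
        # Make sure files added since interpreter start-up are noticed.
        importlib.invalidate_caches()
        module = importlib.import_module(module_name)
    except ImportError as exc:
        return False, 'cannot import module: {}'.format(exc)
    source = getattr(module, '__file__', 'built-in')
    if not hasattr(module, attr_name):
        # The module loads but lacks the name: possibly a stale copy.
        return False, '{} has no attribute {} (loaded from {})'.format(
            module_name, attr_name, source)
    return True, source


if __name__ == '__main__':
    # Standard-library module used here so the sketch runs anywhere.
    print(check_callable('json', 'dumps'))
```

Running `check_callable('lib.helpers', 'my_function')` from a Python shell inside both the scheduler and the UI environment would show whether each one loads a copy of the module that contains the function. A module that imports but lacks the attribute would be consistent with the stale libs folder described above.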
answered yesterday
Miguel Serrano