问题内容 我在 airflow 中有一个使用 cloud composer 执行的 pythonoperator: with DAG( dag_id = config['dag
我在 airflow 中有一个使用 cloud composer 执行的 pythonoperator:
with DAG(
dag_id = config['dag_id'],
schedule_interval = config['schedule_interval'],
default_args = default_args
) as dag:
generate_data_task = PythonOperator(
task_id = 'generate_dummy_data',
python_callable = generate_data,
dag = dag
)
generate_data() 函数将随机生成的唯一命名的 csv 文件写入包含一些数据的存储桶中。按原样执行效果很好,但我想并行多次执行同一任务。如果我指定并行执行 10 次,我预计会在存储桶中写入 10 个文件。我尝试过并发和task_concurrency,但得到了相同的结果。
这可以在 cloud composer 之上使用 airflow 实现吗?
使用动态任务映射:
generate_data_task = PythonOperator.partial(
task_id = 'generate_dummy_data',
python_callable = generate_data,
dag = dag
).expand(op_args=[[]] * 10)
以上就是在 Cloud Composer 中将同一运算符作为多个任务执行的详细内容,更多请关注编程网其它相关文章!
--结束END--
本文标题: 在 Cloud Composer 中将同一运算符作为多个任务执行
本文链接: https://lsjlt.com/news/562036.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
2024-05-24
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0