7 minutos de leitura
Este artigo é a terceira parte da série “Orquestrando pipelines com o Apache Airflow”, e tem como objetivo apresentar ao leitor o processo de criação de DAGs para serem orquestrados pelo Airflow. Como apresentado no primeiro artigo, o Airflow permite ao usuário executar tarefas de diferentes naturezas graças ao uso dos operadores (Operators), que nada mais são do que elementos escritos em Python que abstraem comportamentos desejados pelo usuário para determinada tarefa.
Acesse o passo a passo para instalar corretamente o Airflow localmente.
Agora, veja como construir um DAG para obter o nome da criptomoeda com a maior variação (positiva) diária de valor em reais. Para tal, esse DAG fará uso de diversos elementos como: DummyOperator, PythonOperator, XComs e HttpSensor, assim como a execução de tarefas em paralelo.
As tarefas existentes neste DAG farão uso de algumas ferramentas do Airflow que merecem uma explicação mais detalhada de seu funcionamento, tanto para ambientar o leitor quanto para justificar a escolha das mesmas. Sendo assim, estas ferramentas podem ser descritas como:
Cabe ainda ressaltar que esta foi uma breve explicação das ferramentas, sendo sempre necessário consultar a documentação oficial para tirar dúvidas e verificar o comportamento completo dos elementos utilizados.
Nesse capítulo, será construído um arquivo “diario_de_criptos_DAG.py”, passo a passo, que tem como função fazer a consulta e verificação da variação diária de valor de três criptomoedas específicas (escolhidas de forma arbitrária e por grau de popularidade para fins de compreensão), sendo elas: bitcoin, ethereum e dogecoin. Para a obtenção dos valores atualizados das variações das mesmas, será utilizada a API pública e gratuita da plataforma CoinGecko.
A estrutura do DAG consiste na seguinte sequência de tarefas:
O código do DAG foi construído da seguinte forma:
Essa variável é utilizada na definição da DAG, que será apresentada nos próximos itens, para definir o proprietário e a data de início da execução da mesma. Perceba que aqui foi utilizado como data de início o método “days_ago(1)”, isso acontece pois o Airflow precisa que start_date seja um valor anterior ao dia presente, já que o mesmo executa o DAG do dia anterior no dia atual (mais informações desse comportamento muito importante aqui).
Esse método recebe dois parâmetros submetidos pelo usuário, sendo eles “id_da_criptomoeda” e “sigla_fiat”, que são os responsáveis por definir a criptomoeda a ser consultada na API e qual moeda base será utilizada para a verificação, respectivamente. O método consome a API através de um GET com parâmetros especificados e, caso o retorno tenha o formato esperado, insere o valor obtido no Airflow por meio do context[‘ti’].xcom_push. Caso o retorno não tenha o formato esperado, é apontado então a falha do consumo com os parâmetros informados (adicionalmente seria possível otimizar essa parte do código, porém o objetivo deste artigo é apenas apresentar os elementos e sua lógica principal).
Apesar de ser um simples algoritmo para obter o maior valor entre as variações armazenadas no Airflow, vale a pena ressaltar que a variável variacao faz uso do elemento de índice zero do retorno de xcom_pull, isso acontece pois o retorno de xcom_pull é na verdade um tuple, que no caso em uso possui apenas um elemento que é o de interesse.
A definição da linha 34 na imagem anterior indica que todas as tarefas (tasks) criadas nas outras linhas farão parte deste mesmo DAG, do contrário ainda seria necessário incluir o parâmetro “dag” em cada uma das tarefas e associar o objeto DAG desejado à tarefa. Note também a atribuição do dict “default_args” ao parâmetro de mesmo nome, assim como a definição de “schedule_interval” para “@daily” (que fará com que o DAG seja executada diariamente) e “catchup” como “False” (que impedirá que o Scheduler tente executar dias “perdidos” até a data atual). Outro ponto importante é a existência do atributo “op_kwargs”, que representa um dict que deverá conter os parâmetros enviados ao método python chamado pelo parâmetro “python_callable”.
Note que o sequenciamento das tarefas foi feito através dos caracteres “>>”, que indicam sequenciamento em cascata para as tasks. Também é importante ressaltar a presença de uma lista de tarefas logo após “verificacao_de_endpoint”, que faz com que tarefas sejam executadas em paralelo. As notações utilizadas para o sequenciamento de tarefas e suas peculiaridades podem ser encontradas na documentação oficial.
Por fim, copie (ou mova) o arquivo “diario_de_criptos_DAG.py” para a pasta de DAGs do seu Airflow (que pode ser encontrada com o comando airflow info em um terminal) e inicialize a ferramenta através dos comandos descritos no artigo anterior.
O código final pode ser acessado aqui.
Se tudo ocorreu bem, deve ser possível visualizar o DAG na interface gráfica do Airflow através do endereço http://localhost:8080. Caso não consiga encontrar o DAG, aguarde alguns segundos ou verifique possíveis mensagens de erro nos terminais de execução da ferramenta.
Ao clicar sobre o nome do DAG criado (“diario_de_criptos_DAG”), o usuário deverá ser redirecionado para a página da mesma na aba “Tree View”, que deve ser semelhante à descrita na imagem abaixo:
Ao clicar no botão do canto superior esquerdo (Off), o DAG será ativado e deverá ser automaticamente executado pelo Scheduler se sua frequência de execução for diferente de “None” (definido no parâmetro “schedule_interval” no capítulo anterior). Com isso, o leitor deverá ver, na aba Graph View, algo parecido com o descrito pela imagem abaixo:
Caso alguma tarefa esteja delineada em vermelho, houve algum erro em sua execução e o mesmo pode ser melhor compreendido na janela de logs.
Para visualizar os logs de execução de uma tarefa, clique na tarefa que deseja ter mais informações e então clique em “View Logs”. A imagem abaixo apresenta os logs da tarefa “escolher_criptomoeda_de_maior_variacao” que foi executada com sucesso:
Caso o leitor tenha encontrado erros na execução de suas tarefas, execute o passo anterior para verificar o erro indicado pelo Airflow nos logs da tarefa que falhou. Em seguida, após corrigir o problema, clique novamente na tarefa delineada em vermelho e em seguida em “Clear”, que deverá limpar o status atual da tarefa e a mesma poderá ser executada novamente (de forma automática).
Outra ferramenta incluída no Web Server, e que mostra ser extremamente útil no acompanhamento e otimização de seus DAGs, é o Diagrama de Gantt. Para acessá-lo, basta clicar na aba de mesmo nome (“Gantt”) na página de seu DAG e deverá ser apresentada uma tela parecida com a ilustrada pela imagem abaixo, onde é possível verificar o tempo de execução de cada tarefa, assim como o timedate de sua execução (confirmando também que houve a execução em paralelo das tarefas definidas anteriormente).
O Airflow ainda possui inúmeras ferramentas e funções que poderiam ser utilizadas para a otimização do DAG apresentada neste artigo, sendo assim, é recomendado ao leitor que busque maneiras de realizar essa otimização através da documentação e de fóruns na internet, contribuindo assim para a constante melhoria dessa incrível ferramenta de orquestração.
Chegou até aqui e acredita que suas tarefas necessitam de uma passada mais forte? O Airflow ainda conta com a possibilidade de execução de tarefas em diferentes máquinas ao mesmo tempo, trazendo assim um excelente benefício de escalabilidade ao usuário.
Por se tratar de um assunto um pouco mais complexo e que utiliza uma arquitetura do Airflow um pouco diferente, esse assunto será abordado em uma série futura com a mesma “entonação” desta aqui!
Espero que tenha gostado da série e que possa utilizar o Airflow em seus desafios cotidianos ou, quem sabe, até mesmo automatizar o preparo daquele cafezinho matinal e/ou auxiliar alguma outra rotina pessoal, você decide!