Escrito por Yan Cescon Haeffner,

7 minutos de leitura

A hora do show: criando seu primeiro DAG no Apache Airflow

Conheça o processo de criação de DAGs para serem orquestrados pelo Airflow.

Compartilhe este post:

Este artigo é a terceira parte da série “Orquestrando pipelines com o Apache Airflow”, e tem como objetivo apresentar ao leitor o processo de criação de DAGs para serem orquestrados pelo Airflow. Como apresentado no primeiro artigo, o Airflow permite ao usuário executar tarefas de diferentes naturezas graças ao uso dos operadores (Operators), que nada mais são do que elementos escritos em Python que abstraem comportamentos desejados pelo usuário para determinada tarefa.

Acesse o passo a passo para instalar corretamente o Airflow localmente.

Agora, veja como construir um DAG para obter o nome da criptomoeda com a maior variação (positiva) diária de valor em reais. Para tal, esse DAG fará uso de diversos elementos como: DummyOperator, PythonOperator, XComs e HttpSensor, assim como a execução de tarefas em paralelo.

 

Conhecendo seus instrumentos

As tarefas existentes neste DAG farão uso de algumas ferramentas do Airflow que merecem uma explicação mais detalhada de seu funcionamento, tanto para ambientar o leitor quanto para justificar a escolha das mesmas. Sendo assim, estas ferramentas podem ser descritas como:

  • DummyOperator: como o próprio nome diz, trata-se de um Operator “bobo”, ou seja, utilizado apenas para marcar transições entre tarefas. Neste projeto, será utilizado para demarcar o início e o fim da execução do DAG. Necessita apenas do argumento “task_id”, que deve ser único para todo o DAG.
  • PythonOperator: Operator que tem como função a execução de um script python, que pode estar localizado dentro do arquivo “.py” do DAG ou em outro módulo importado neste arquivo. Aqui será feito o uso de um argumento especial, o “provide_context”, um boolean que permite (ou não) o chamado de variáveis importantes como “task_instance” (ou “ti”), datetimes e várias outras informações relevantes para construção de tarefas e DAGs mais complexos, pelo método. Neste DAG ele foi utilizado pela necessidade de executar operações um pouco mais complexas, que serão descritas no próximo capítulo.
  • HttpSensor: trata-se de um Sensor, que nada mais é do que um método que realiza uma chamada específica para determinado serviço e aguarda a resposta do mesmo, para elementos de natureza HTTP. Foi utilizado pela necessidade de verificar o status atual de uma API, visto que não faria sentido tentar executar tarefas que dependem desse serviço se o mesmo não está respondendo, nesse exemplo. Permite a definição de alguns parâmetros úteis, tais como o limite de tempo que o sensor deve esperar para tentar consumir o serviço novamente ou o limite de tempo máximo para resposta do serviço.
  • Xcoms: elementos de transferência de valores entre tarefas. Devem ser utilizados com muita cautela, pois como descrito no primeiro artigo, fazem uso dos metadados do Airflow e não devem ser utilizados para processamento de grandes quantidades de dados. Como neste DAG as tarefas precisariam transferir apenas uma pequena quantidade de dados, fez sentido utilizar esta ferramenta para ter um comportamento útil automatizado.

 

Cabe ainda ressaltar que esta foi uma breve explicação das ferramentas, sendo sempre necessário consultar a documentação oficial para tirar dúvidas e verificar o comportamento completo dos elementos utilizados.

Construção

Nesse capítulo, será construído um arquivo “diario_de_criptos_DAG.py”, passo a passo, que tem como função fazer a consulta e verificação da variação diária de valor de três criptomoedas específicas (escolhidas de forma arbitrária e por grau de popularidade para fins de compreensão), sendo elas: bitcoin, ethereum e dogecoin. Para a obtenção dos valores atualizados das variações das mesmas, será utilizada a API pública e gratuita da plataforma CoinGecko.

A estrutura do DAG consiste na seguinte sequência de tarefas:

  1. Início da execução, dada pela tarefa “begin”.
  2. Verificação da saúde da API utilizada, dada pela tarefa “verificacao_de_endpoint” com uso de um HttpSensor.
  3. Obtenção dos valores de variação diária para as três criptomoedas, através de três tarefas executadas em paralelo, com o armazenamento dos valores no Airflow por meio de Xcoms. Todas as tarefas serão executadas por meio de PythonOperators e possuem ids: “obter_variacao_bitcoin”, “obter_variacao_ethereum” e “obter_variacao_dogecoin”.
  4. Caso o item 3 seja executado com sucesso, os valores obtidos deverão ser comparados e deverá então ser feita a informação ao usuário (por meio dos logs) da criptomoeda de maior variação.
  5. Essa tarefa será executada também por um PythonOperator com id “escolher_criptomoeda_de_maior_variacao”.
  6. Fim da execução, dada pela tarefa “end”.

 

O código do DAG foi construído da seguinte forma:

  • Primeiramente, importe os elementos necessários para construção do DAG:

 

  • Defina um dict chamado “default_args” contendo os valores de “owner” e “start_date”:

 

Essa variável é utilizada na definição da DAG, que será apresentada nos próximos itens, para definir o proprietário e a data de início da execução da mesma. Perceba que aqui foi utilizado como data de início o método “days_ago(1)”, isso acontece pois o Airflow precisa que start_date seja um valor anterior ao dia presente, já que o mesmo executa o DAG do dia anterior no dia atual (mais informações desse comportamento muito importante aqui).

 

  • Definição do método responsável pelo consumo das variações pela API:

Esse método recebe dois parâmetros submetidos pelo usuário, sendo eles “id_da_criptomoeda” e “sigla_fiat”, que são os responsáveis por definir a criptomoeda a ser consultada na API e qual moeda base será utilizada para a verificação, respectivamente. O método consome a API através de um GET com parâmetros especificados e, caso o retorno tenha o formato esperado, insere o valor obtido no Airflow por meio do context[‘ti’].xcom_push. Caso o retorno não tenha o formato esperado, é apontado então a falha do consumo com os parâmetros informados (adicionalmente seria possível otimizar essa parte do código, porém o objetivo deste artigo é apenas apresentar os elementos e sua lógica principal).

 

  • Definição do método responsável pela comparação das variações obtidas:

Apesar de ser um simples algoritmo para obter o maior valor entre as variações armazenadas no Airflow, vale a pena ressaltar que a variável variacao faz uso do elemento de índice zero do retorno de xcom_pull, isso acontece pois o retorno de xcom_pull é na verdade um tuple, que no caso em uso possui apenas um elemento que é o de interesse.

 

  • Definição do DAG, tarefas e sequência de acionamentos:

 

A definição da linha 34 na imagem anterior indica que todas as tarefas (tasks) criadas nas outras linhas farão parte deste mesmo DAG, do contrário ainda seria necessário incluir o parâmetro “dag” em cada uma das tarefas e associar o objeto DAG desejado à tarefa. Note também a atribuição do dict “default_args” ao parâmetro de mesmo nome, assim como a definição de “schedule_interval” para “@daily” (que fará com que o DAG seja executada diariamente) e “catchup” como “False” (que impedirá que o Scheduler tente executar dias “perdidos” até a data atual). Outro ponto importante é a existência do atributo “op_kwargs”, que representa um dict que deverá conter os parâmetros enviados ao método python chamado pelo parâmetro “python_callable”.

Note que o sequenciamento das tarefas foi feito através dos caracteres “>>”, que indicam sequenciamento em cascata para as tasks. Também é importante ressaltar a presença de uma lista de tarefas logo após “verificacao_de_endpoint”, que faz com que tarefas sejam executadas em paralelo. As notações utilizadas para o sequenciamento de tarefas e suas peculiaridades podem ser encontradas na documentação oficial.

 

  • Implementação do DAG no Airflow:

Por fim, copie (ou mova) o arquivo “diario_de_criptos_DAG.py” para a pasta de DAGs do seu Airflow (que pode ser encontrada com o comando airflow info em um terminal) e inicialize a ferramenta através dos comandos descritos no artigo anterior.

O código final pode ser acessado aqui.

 

Execução e ferramentas adicionais

Se tudo ocorreu bem, deve ser possível visualizar o DAG na interface gráfica do Airflow através do endereço http://localhost:8080. Caso não consiga encontrar o DAG, aguarde alguns segundos ou verifique possíveis mensagens de erro nos terminais de execução da ferramenta.

Ao clicar sobre o nome do DAG criado (“diario_de_criptos_DAG”), o usuário deverá ser redirecionado para a página da mesma na aba “Tree View”, que deve ser semelhante à descrita na imagem abaixo:

 

Ao clicar no botão do canto superior esquerdo (Off), o DAG será ativado e deverá ser automaticamente executado pelo Scheduler se sua frequência de execução for diferente de “None” (definido no parâmetro “schedule_interval” no capítulo anterior). Com isso, o leitor deverá ver, na aba Graph View, algo parecido com o descrito pela imagem abaixo:

 

Caso alguma tarefa esteja delineada em vermelho, houve algum erro em sua execução e o mesmo pode ser melhor compreendido na janela de logs.

Para visualizar os logs de execução de uma tarefa, clique na tarefa que deseja ter mais informações e então clique em “View Logs”. A imagem abaixo apresenta os logs da tarefa “escolher_criptomoeda_de_maior_variacao” que foi executada com sucesso:

 

Caso o leitor tenha encontrado erros na execução de suas tarefas, execute o passo anterior para verificar o erro indicado pelo Airflow nos logs da tarefa que falhou. Em seguida, após corrigir o problema, clique novamente na tarefa delineada em vermelho e em seguida em “Clear”, que deverá limpar o status atual da tarefa e a mesma poderá ser executada novamente (de forma automática).

Outra ferramenta incluída no Web Server, e que mostra ser extremamente útil no acompanhamento e otimização de seus DAGs, é o Diagrama de Gantt. Para acessá-lo, basta clicar na aba de mesmo nome (“Gantt”) na página de seu DAG e deverá ser apresentada uma tela parecida com a ilustrada pela imagem abaixo, onde é possível verificar o tempo de execução de cada tarefa, assim como o timedate de sua execução (confirmando também que houve a execução em paralelo das tarefas definidas anteriormente).

 

O Airflow ainda possui inúmeras ferramentas e funções que poderiam ser utilizadas para a otimização do DAG apresentada neste artigo, sendo assim, é recomendado ao leitor que busque maneiras de realizar essa otimização através da documentação e de fóruns na internet, contribuindo assim para a constante melhoria dessa incrível ferramenta de orquestração.

 

Quer ir ainda mais além?

Chegou até aqui e acredita que suas tarefas necessitam de uma passada mais forte? O Airflow ainda conta com a possibilidade de execução de tarefas em diferentes máquinas ao mesmo tempo, trazendo assim um excelente benefício de escalabilidade ao usuário.

Por se tratar de um assunto um pouco mais complexo e que utiliza uma arquitetura do Airflow um pouco diferente, esse assunto será abordado em uma série futura com a mesma “entonação” desta aqui!

Espero que tenha gostado da série e que possa utilizar o Airflow em seus desafios cotidianos ou, quem sabe, até mesmo automatizar o preparo daquele cafezinho matinal e/ou auxiliar alguma outra rotina pessoal, você decide!

Compartilhe este post: