Tutoriais

Criando um Projeto

Você pode começar um projeto de 3 formas:

  • Criando um novo projeto.
  • Importando um projeto existente.
  • Importando exemplos curados pela Dadosfera ou pela comunidade.

Criando e configurando Pipelines

Os pipelines são uma ferramenta interativa para criar e experimentar seu fluxo de trabalho de dados no Módulo. Um Pipeline é composto de etapas e conexões:

  • As etapas são arquivos executáveis executados em seus próprios ambientes isolados.
  • As conexões vinculam etapas para definir como os dados fluem (consulte passagem de dados) e a ordem de execução da etapa.

Os pipelines são editados visualmente e armazenados no formato JSON no arquivo de definição do pipeline. Isso permite que as alterações do Pipeline (por exemplo, adicionar uma etapa) sejam versionadas.

Exemplo de pipeline

Exemplo de pipeline

Parametrizando Pipelines

Os pipelines usam parâmetros como entrada (por exemplo, a URL de conexão da fonte de dados) para variar seu comportamento. Os trabalhos podem usar parâmetros diferentes para iterar em várias execuções do mesmo pipeline. Os parâmetros podem ser definidos no editor de pipeline visual.

Você pode definir os parâmetros do Pipeline em dois níveis:

  • Pipelines: os parâmetros e seus valores estarão disponíveis em todas as etapas do Pipeline.
  • Etapas do pipeline: Os parâmetros só estarão disponíveis quando forem definidos.
Editando parâmetros de pipeline
  1. Abra um Pipeline através da opção Pipelines no painel de menu esquerdo.
  2. Clique em CONFIGURAÇÕES no canto superior direito.
  3. No topo, você encontrará a seção de parâmetros do Pipeline.
  4. Insira algum JSON como {"my-param": }.
  5. Certifique-se de salvar na parte inferior da tela.
Editando os parâmetros da etapa do pipeline
  1. Abra um Pipeline através da opção Pipelines na barra de navegação.
  2. Clique em uma etapa do pipeline para abrir suas propriedades.
  3. Na parte inferior, você encontrará a seção Parâmetros.
  4. Insira algum JSON como {"my-param": }.

Criando Environments

Os ambientes definem as condições em que os passos do pipeline executam scripts e kernels. Os ambientes são:

  • Escolhidos no painel de propriedades do passo do pipeline no }editor de pipelines.
  • Configuráveis através do script de configuração (na página de ambientes) para instalar pacotes adicionais.
  • Versionados e pertencentes a um único projects.

Escolhendo uma linguagem de programação na Dadosfera
Um ambiente usa apenas uma linguagem de programação para evitar a sobrecarga da imagem do contêiner com muitas dependências. O Módulo tem suporte integrado para ambientes com as linguagens:

  • Python
  • R
  • JavaScript
  • Julia

Cada ambiente suporta scripts Bash para invocar qualquer outra linguagem indiretamente. Por exemplo: Java, Scala, Go ou C++.

Para criar o ambiente:

  1. Vá para a página Environments.
  2. Crie um novo Ambiente.
  3. Escolha um nome de Ambiente.
  4. Escolha uma imagem base.
  5. Escolha uma das linguagens suportadas.
  6. Adicione comandos de instalação para pacotes adicionais no script de configuração do Ambiente.
  7. Pressione o botão Build.

Agendando jobs para sua pipeline

Após criar o Pipeline, codificar os arquivos, configurar suas etapas e configurar os Environments, inevitavelmente o Pipeline deve ser executado. Isso pode ser feito por executando um Pipeline dentro do editor de pipeline ou por meio de Jobs. O primeiro permite teste fácil enquanto você está desenvolvendo seu Pipeline e o último (Jobs) permite que você execute seu Pipeline em produção em uma programação recorrente (por exemplo, diariamente). Veja abaixo o tutorial completo de como agendar sua pipeline.

Pré-requisitos
  • Acesso ao Módulo de Tranformação.
  • Connection_parameters da conta Snowflake.
Passo a passo:

O procedimento para executar um trabalho parametrizado é muito semelhante à execução de um trabalho sem nenhum parâmetro. Depois de seguir qualquer um dos procedimentos acima para parametrizar sua pipeline:

  1. Certifique-se de ter definido os parâmetros básicos da Pipeline.
  2. Clique em Jobs no painel de menu à esquerda.
  3. Clique no botão + new job para configurar seu trabalho.
  4. Escolha um Job name e a Pipeline que deseja executar o job.
  5. Seu conjunto padrão de parâmetros é pré-carregado. Ao clicar nos valores, um editor JSON é aberto, permitindo que você adicione valores adicionais que deseja que a Pipeline execute.
  6. Se você quiser agendar o trabalho para ser executado em um horário específico, dê uma olhada em Scheduling.
  7. Pressione Run job.
Tour guiado:

Importar a lib Dadosfera versão Snowpark

Biblioteca intuitiva para consultar e processar dados em escala no Snowflake. Atualmente, o Snowflake fornece bibliotecas Snowpark para três idiomas: Java, Python e Scala, complementares à interface SQL original do Snowflake.

Para importar a lib no Módulo de Transformação da Dadosfera, siga os passos abaixo:

1 - Criar uma pasta chamada dadosfera na raiz do projeto.

2 - Criar um arquivo chamado snowflake.py dentro da pasta dadosfera.

3 - Colocar esse conteúdo no arquivo:
import os
import pandas as pd
from snowflake.snowpark import functions as F
import orchest as utils

connection_parameters = {
    "user": os.environ.get('SNOWFLAKE_USER'),
    "password": os.environ.get('SNOWFLAKE_PASSWORD'),
    "account": os.environ.get('SNOWFLAKE_ACCOUNT'), 
    "warehouse": 'COMPUTE_WH',
    "database": os.environ.get('SNOWFLAKE_DATABASE'),
    "schema": 'PUBLIC',
}


def create_pandas_df(snowpark_df):
    rows = snowpark_df.collect()
    
    data = []
    for row in rows:
        data.append(row.as_dict())
    
    df = pd.DataFrame(data)
    for column in df.columns:
        df = df.rename({column: column.lower()}, axis=1)
        
    return df

def remove_quotes_from_columns(df, columns):
    clean_function = [
        F.trim(F.replace(column, '"', ''))
        for column in columns
    ]
    
    df = df.with_columns(columns, clean_function)
    return df

def empty_string_as_null(df, columns):
    empty_string_as_null_columns = [
        f"nullif({column}, '') as {column}"
        for column in columns
    ]
    
    df = df.selectExpr(*empty_string_as_null_columns)
    return df

📘

Caso você ainda não possua os connection_parameters da sua conta Snowflake, solicite ao [email protected].

4 - As variáveis podem ser setadas a nível de pipeline da seguinte forma:

Clica nos ... -> Pipeline Settings -> Environment Variables

5 - Importante dar o start novamente no kernel do notebook para que as novas variáveis sejam utilizadas.

6 - Restaure a sessão.

Pronto! A lib está pronta para ser utilizada.

Configuração de notificações

Você pode receber notificações de webhook quando eventos específicos ocorrem no Módulo. Por exemplo, quando uma tarefa falha. Sempre que um evento é acionado, a Dadosfera enviará uma solicitação HTTP para o seu endpoint desejado com um payload de informações. Por exemplo:

{
  "delivered_for": {
    "name": "Test webhook",
    "verify_ssl": false,
    "content_type": "application/json",
    "uuid": "a1edb89c-1cfb-4086-8f75-ab073612c5bf",
    "type": "webhook"
  },
  "event": {
    "type": "ping",
    "uuid": "08bd2a31-9b17-4d1b-83ba-b4538a970dee",
    "timestamp": "2022-06-02 16:12:25.242592+00:00"
  }
}

Para criar um webhook, acesse "Configurações de notificação". O diálogo do webhook solicitará o seguinte:

  • URL do webhook: onde a Dadosfera envia as solicitações HTTP. Ative os webhooks de entrada no canal desejado (por exemplo: Slack) e verifique a conexão com o botão "Testar".
  • Tipo de conteúdo: application/json (padrão) ou application/x-www-form-urlencoded.
  • Nome do webhook (opcional): um nome personalizado para seu webhook. Isso é útil ao criar vários webhooks com URLs semelhantes.
  • Segredo (opcional): uma string secreta que você pode usar para verificar a origem da solicitação (consulte {ref}abaixo <secure_webhook>).
Verificando o webhook

O seguinte código de exemplo verifica se a assinatura do payload é a mesma que a esperada pelo segredo do webhook armazenado:

import hashlib
import hmac
import os


def verify_signature(payload, request_headers):
    """
    Verifica se a assinatura do payload é a mesma
    que a esperada pelo segredo do webhook armazenado.
    """
    if not isinstance(body, bytes):
        body = body.encode("utf-8")
    digest = hmac.new(
        os.environ["WEBHOOK_SECRET"].encode("utf-8"),
        body,
        hashlib.sha256
    )

Habilitando extensões no JupyterLab

AI Magic Function - Conectando o Jupyter notebook no ChatGPT

No environment do orchest, faça a instalação dos seguintes pacotes:

  • jupyter_ai_magics
  • jupyter_ai

Construção do ambiente

  1. Vá para a seção de Environments do projeto em que está atuando, como pode ser observado na imagem abaixo:

  1. Coloque o seguinte código no script que será executado ao construir o ambiente:
pip3 install jupyter_ai_magics jupyter_ai
  1. Realize a construção do environment clicando no botão Build.
  2. Aguarde a construção do environment.

Setup das chaves da OpenAI

  1. No canto superior direito, clique no menu "...".
  2. Clique na opção Pipeline Settings
  3. Clique na opção Environment Variables
  4. Coloque uma variável de ambiente chamada OPENAI_API_KEYcontendo uma chave de acesso a API da OpenAI.
  5. Reinicie a sessão do projeto.

Setup do JupyterLab

  1. Entre no JupyterLab clicando no botão JUPYTERLAB no canto superior direito.
  2. Crie um Jupyter Notebook
  3. Dentro de uma célula de código, coloque o seguinte trecho de código e execute a célula para realizar o carregamento da extensão.
%load_ext jupyter_ai
  1. Para fazer perguntas para o ChatGPT de dentro do Jupyter Notebook, basta usar a magic function %%ai chagpt como pode ser visto na imagem abaixo: