# Caso 4: Customizando Nome das Tabelas Como personalizar nomes de tabelas raw e qualify na criação de pipelines Este guia demonstra como personalizar os nomes de tabelas de destino (raw e qualify) ao criar uma pipeline na Dadosfera. **Por que personalizar?** Por padrão, o sistema gera nomes como `tb__xqfu8g__cadastros__pedidos`. A personalização permite seguir convenções de nomenclatura da sua organização e organizar dados em schemas que indicam maturidade dos dados. ## Autenticação ```python import requests import json import os from pprint import pprint BASE_URL = "https://maestro.dadosfera.ai" response = requests.post( f"{BASE_URL}/auth/sign-in", data=json.dumps({ "username": os.environ['DADOSFERA_USERNAME'], "password": os.environ["DADOSFERA_PASSWORD"] }), headers={"Content-Type": "application/json"}, ) headers = { "Authorization": response.json()['tokens']['accessToken'], "Content-Type": "application/json" } ``` ## Nomenclatura Padrão Quando você não especifica nomes personalizados, o sistema gera automaticamente: ``` tb__{base32_id}__{schema_origem}__{tabela_origem} ``` **Exemplo:** `tb__xqfu8g__cadastros__pedidos` | Componente | Descrição | | ----------- | --------------------------------------------------------------------- | | `tb__` | Prefixo padrão | | `xqfu8g` | Identificador único (6 primeiros caracteres do pipeline ID em base32) | | `cadastros` | Schema de origem | | `pedidos` | Tabela de origem | **Schemas padrão por load\_type:** * `full_load` / `incremental`: Tabela em `PUBLIC` * `incremental_with_qualify`: Raw em `PUBLIC`, Qualify em `STAGED` ## Passo 1: Criar Pipeline com Nome Padrão (Referência) Primeiro, vamos ver como fica uma pipeline sem personalização: ```python import uuid pipeline_id = str(uuid.uuid4()) print(f"Pipeline ID: {pipeline_id}") ``` ```python payload = { "id": pipeline_id, "name": "Pedidos - Padrão", "cron": "@once", "description": "Pipeline com nomenclatura padrão", "jobs": [ { "input": { "connector": "jdbc", "plugin": "mysql", "table_schema": "cadastros", "table_name": "pedidos", "load_type": "incremental_with_qualify", "incremental_column_name": "updated_at", "incremental_column_type": "timestamp", "primary_keys": ["id"], "column_include_list": ["id", "cliente_id", "valor", "status", "created_at", "updated_at"], "auth_parameters": { "auth_type": "connection_manager", "config_id": "sua-connection-config-id" } }, "transformations": [], "output": { "plugin": "dadosfera_snowflake" } } ] } response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload)) pprint(response.json()) ``` Resultado: Tabelas criadas com nomes gerados automaticamente: * Raw: `PUBLIC.tb__xxxxxx__cadastros__pedidos` * Qualify: `STAGED.tb__xxxxxx__cadastros__pedidos` ## Passo 2: Criar Pipeline com Nomes Personalizados Agora vamos criar uma pipeline especificando nomes e schemas customizados. O schema indica a maturidade dos dados: * `RAW`: Dados brutos, sem tratamento * `STAGED`: Dados processados e deduplicados ```python import uuid pipeline_id_custom = str(uuid.uuid4()) print(f"Pipeline ID: {pipeline_id_custom}") ``` ```python payload = { "id": pipeline_id_custom, "name": "Pedidos - Customizado", "cron": "@once", "description": "Pipeline com nomenclatura personalizada", "jobs": [ { "input": { "connector": "jdbc", "plugin": "mysql", "table_schema": "cadastros", "table_name": "pedidos", "load_type": "incremental_with_qualify", "incremental_column_name": "updated_at", "incremental_column_type": "timestamp", "primary_keys": ["id"], "column_include_list": ["id", "cliente_id", "valor", "status", "created_at", "updated_at"], "auth_parameters": { "auth_type": "connection_manager", "config_id": "sua-connection-config-id" } }, "transformations": [], "output": { "plugin": "dadosfera_snowflake", "raw": { "table_name": "pedidos", "table_schema": "RAW" }, "qualify": { "table_name": "pedidos", "table_schema": "STAGED" } } } ] } response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload)) pprint(response.json()) ``` Resultado: Tabelas criadas com nomes personalizados: * Raw: `RAW.pedidos` * Qualify: `STAGED.pedidos` O mesmo nome de tabela (`pedidos`) pode existir em schemas diferentes. O schema indica a maturidade: `RAW` contém dados brutos, `STAGED` contém dados deduplicados. ## Opções de Configuração ### Estrutura do Output ```python "output": { "plugin": "dadosfera_snowflake", "raw": { "table_name": "pedidos", "table_schema": "RAW" }, "qualify": { "table_name": "pedidos", "table_schema": "STAGED" } } ``` | Campo | Descrição | Obrigatório | | ---------------------- | ------------------------------ | ----------------------- | | `raw.table_name` | Nome da tabela de dados brutos | Sim (se usar `raw`) | | `raw.table_schema` | Schema da tabela raw | Sim (se usar `raw`) | | `qualify.table_name` | Nome da tabela deduplicada | Sim (se usar `qualify`) | | `qualify.table_schema` | Schema da tabela qualify | Sim (se usar `qualify`) | ### Regras de Validação 1. **`qualify` só é permitido com `incremental_with_qualify`:** Se o `load_type` for `full_load` ou `incremental`, use apenas `raw`. 2. **Ambos são opcionais:** Você pode personalizar apenas `raw`, apenas `qualify`, ambos, ou nenhum. 3. **Validação de conflitos:** O sistema verifica se já existe uma tabela com o mesmo nome para o seu customer antes de criar a pipeline. ## Exemplos por Load Type ### Full Load (apenas raw) ```python "output": { "plugin": "dadosfera_snowflake", "raw": { "table_name": "clientes", "table_schema": "RAW" } } ``` ### Incremental (apenas raw) ```python "output": { "plugin": "dadosfera_snowflake", "raw": { "table_name": "eventos", "table_schema": "RAW" } } ``` ### Incremental with Qualify (raw + qualify) ```python "output": { "plugin": "dadosfera_snowflake", "raw": { "table_name": "pedidos", "table_schema": "RAW" }, "qualify": { "table_name": "pedidos", "table_schema": "STAGED" } } ``` ## Validação de Tabelas por Customer A Dadosfera mantém um controle de todas as tabelas no Snowflake por cliente. Antes de criar a pipeline, o sistema valida: | Validação | Descrição | | ------------------------- | ----------------------------------------------------------------- | | **Unicidade** | O nome `schema.table_name` não pode existir para o mesmo customer | | **Schema válido** | O schema deve existir no ambiente Snowflake do cliente | | **Conflito com catálogo** | Não pode haver conflito com tabelas já registradas | ### Erro de Conflito (HTTP 409) Se você tentar criar uma pipeline com um nome de tabela que já existe: ```python response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload)) print(response.status_code) # 409 pprint(response.json()) ``` ```json { "status": false, "exception_type": "RepositoryResourceAlreadyExists", "traceback": "Table 'RAW.pedidos' already exists in catalog", "data": null } ``` **Solução:** Escolha um nome de tabela diferente ou use outro schema. ### Erro de Validação (HTTP 400) Se você tentar usar `qualify` com um load\_type incompatível: ```python # ERRO: qualify com full_load payload = { # ... "jobs": [{ "input": { "load_type": "full_load", # NÃO suporta qualify # ... }, "output": { "plugin": "dadosfera_snowflake", "qualify": { # INVÁLIDO para full_load "table_name": "pedidos", "table_schema": "STAGED" } } }] } ``` ```json { "status": false, "exception_type": "ValidationError", "traceback": "Output 'qualify' configuration is only allowed when input load_type is 'incremental_with_qualify'", "data": null } ``` **Solução:** Remova a configuração `qualify` ou altere o `load_type` para `incremental_with_qualify`. ## Verificar Tabelas Criadas Após criar a pipeline, consulte o job para ver os nomes das tabelas: ```python JOB_ID = f"{pipeline_id_custom}-0" response = requests.get(f"{BASE_URL}/platform/jobs/jdbc/{JOB_ID}", headers=headers) job_config = response.json() print("Output Config:") pprint(job_config.get("output_config")) ``` ```json { "plugin": "dadosfera_snowflake", "table_name": "tb__xxxxxx__cadastros__pedidos", "raw": { "table_name": "pedidos", "table_schema": "RAW" }, "qualify": { "table_name": "pedidos", "table_schema": "STAGED" } } ``` ## Boas Práticas 1. **Use schemas para indicar maturidade:** `RAW` para dados brutos, `STAGED` para dados processados 2. **Mantenha nomes simples:** O mesmo nome de tabela em schemas diferentes facilita o entendimento 3. **Evite caracteres especiais:** Use apenas letras, números e underscores 4. **Planeje antes de criar:** Uma vez criada, a tabela fica reservada para aquela pipeline ## Resumo | Cenário | Configuração `raw` | Configuração `qualify` | | -------------------------- | ------------------ | ---------------------- | | `full_load` | Opcional | Não permitido | | `incremental` | Opcional | Não permitido | | `incremental_with_qualify` | Opcional | Opcional |