Caso 4: Customizando Nome das Tabelas

Como personalizar nomes de tabelas raw e qualify na criação de pipelines

Este guia demonstra como personalizar os nomes de tabelas de destino (raw e qualify) ao criar uma pipeline na Dadosfera.

Por que personalizar? Por padrão, o sistema gera nomes como tb__xqfu8g__cadastros__pedidos. A personalização permite seguir convenções de nomenclatura da sua organização e organizar dados em schemas que indicam maturidade dos dados.

Autenticação

import requests
import json
import os
from pprint import pprint

BASE_URL = "https://maestro.dadosfera.ai"

response = requests.post(
    f"{BASE_URL}/auth/sign-in",
    data=json.dumps({
        "username": os.environ['DADOSFERA_USERNAME'],
        "password": os.environ["DADOSFERA_PASSWORD"]
    }),
    headers={"Content-Type": "application/json"},
)

headers = {
    "Authorization": response.json()['tokens']['accessToken'],
    "Content-Type": "application/json"
}

Nomenclatura Padrão

Quando você não especifica nomes personalizados, o sistema gera automaticamente:

tb__{base32_id}__{schema_origem}__{tabela_origem}

Exemplo: tb__xqfu8g__cadastros__pedidos

ComponenteDescrição
tb__Prefixo padrão
xqfu8gIdentificador único (6 primeiros caracteres do pipeline ID em base32)
cadastrosSchema de origem
pedidosTabela de origem

Schemas padrão por load_type:

  • full_load / incremental: Tabela em PUBLIC
  • incremental_with_qualify: Raw em PUBLIC, Qualify em STAGED

Passo 1: Criar Pipeline com Nome Padrão (Referência)

Primeiro, vamos ver como fica uma pipeline sem personalização:

import uuid
pipeline_id = str(uuid.uuid4())
print(f"Pipeline ID: {pipeline_id}")
payload = {
    "id": pipeline_id,
    "name": "Pedidos - Padrão",
    "cron": "@once",
    "description": "Pipeline com nomenclatura padrão",
    "jobs": [
        {
            "input": {
                "connector": "jdbc",
                "plugin": "mysql",
                "table_schema": "cadastros",
                "table_name": "pedidos",
                "load_type": "incremental_with_qualify",
                "incremental_column_name": "updated_at",
                "incremental_column_type": "timestamp",
                "primary_keys": ["id"],
                "column_include_list": ["id", "cliente_id", "valor", "status", "created_at", "updated_at"],
                "auth_parameters": {
                    "auth_type": "connection_manager",
                    "config_id": "sua-connection-config-id"
                }
            },
            "transformations": [],
            "output": {
                "plugin": "dadosfera_snowflake"
            }
        }
    ]
}

response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload))
pprint(response.json())

Resultado: Tabelas criadas com nomes gerados automaticamente:

  • Raw: PUBLIC.tb__xxxxxx__cadastros__pedidos
  • Qualify: STAGED.tb__xxxxxx__cadastros__pedidos

Passo 2: Criar Pipeline com Nomes Personalizados

Agora vamos criar uma pipeline especificando nomes e schemas customizados. O schema indica a maturidade dos dados:

  • RAW: Dados brutos, sem tratamento
  • STAGED: Dados processados e deduplicados
import uuid
pipeline_id_custom = str(uuid.uuid4())
print(f"Pipeline ID: {pipeline_id_custom}")
payload = {
    "id": pipeline_id_custom,
    "name": "Pedidos - Customizado",
    "cron": "@once",
    "description": "Pipeline com nomenclatura personalizada",
    "jobs": [
        {
            "input": {
                "connector": "jdbc",
                "plugin": "mysql",
                "table_schema": "cadastros",
                "table_name": "pedidos",
                "load_type": "incremental_with_qualify",
                "incremental_column_name": "updated_at",
                "incremental_column_type": "timestamp",
                "primary_keys": ["id"],
                "column_include_list": ["id", "cliente_id", "valor", "status", "created_at", "updated_at"],
                "auth_parameters": {
                    "auth_type": "connection_manager",
                    "config_id": "sua-connection-config-id"
                }
            },
            "transformations": [],
            "output": {
                "plugin": "dadosfera_snowflake",
                "raw": {
                    "table_name": "pedidos",
                    "table_schema": "RAW"
                },
                "qualify": {
                    "table_name": "pedidos",
                    "table_schema": "STAGED"
                }
            }
        }
    ]
}

response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload))
pprint(response.json())

Resultado: Tabelas criadas com nomes personalizados:

  • Raw: RAW.pedidos
  • Qualify: STAGED.pedidos

O mesmo nome de tabela (pedidos) pode existir em schemas diferentes. O schema indica a maturidade: RAW contém dados brutos, STAGED contém dados deduplicados.

Opções de Configuração

Estrutura do Output

"output": {
    "plugin": "dadosfera_snowflake",
    "raw": {
        "table_name": "pedidos",
        "table_schema": "RAW"
    },
    "qualify": {
        "table_name": "pedidos",
        "table_schema": "STAGED"
    }
}
CampoDescriçãoObrigatório
raw.table_nameNome da tabela de dados brutosSim (se usar raw)
raw.table_schemaSchema da tabela rawSim (se usar raw)
qualify.table_nameNome da tabela deduplicadaSim (se usar qualify)
qualify.table_schemaSchema da tabela qualifySim (se usar qualify)

Regras de Validação

  1. qualify só é permitido com incremental_with_qualify: Se o load_type for full_load ou incremental, use apenas raw.

  2. Ambos são opcionais: Você pode personalizar apenas raw, apenas qualify, ambos, ou nenhum.

  3. Validação de conflitos: O sistema verifica se já existe uma tabela com o mesmo nome para o seu customer antes de criar a pipeline.

Exemplos por Load Type

Full Load (apenas raw)

"output": {
    "plugin": "dadosfera_snowflake",
    "raw": {
        "table_name": "clientes",
        "table_schema": "RAW"
    }
}

Incremental (apenas raw)

"output": {
    "plugin": "dadosfera_snowflake",
    "raw": {
        "table_name": "eventos",
        "table_schema": "RAW"
    }
}

Incremental with Qualify (raw + qualify)

"output": {
    "plugin": "dadosfera_snowflake",
    "raw": {
        "table_name": "pedidos",
        "table_schema": "RAW"
    },
    "qualify": {
        "table_name": "pedidos",
        "table_schema": "STAGED"
    }
}

Validação de Tabelas por Customer

A Dadosfera mantém um controle de todas as tabelas no Snowflake por cliente. Antes de criar a pipeline, o sistema valida:

ValidaçãoDescrição
UnicidadeO nome schema.table_name não pode existir para o mesmo customer
Schema válidoO schema deve existir no ambiente Snowflake do cliente
Conflito com catálogoNão pode haver conflito com tabelas já registradas

Erro de Conflito (HTTP 409)

Se você tentar criar uma pipeline com um nome de tabela que já existe:

response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload))
print(response.status_code)  # 409
pprint(response.json())
{
  "status": false,
  "exception_type": "RepositoryResourceAlreadyExists",
  "traceback": "Table 'RAW.pedidos' already exists in catalog",
  "data": null
}

Solução: Escolha um nome de tabela diferente ou use outro schema.

Erro de Validação (HTTP 400)

Se você tentar usar qualify com um load_type incompatível:

# ERRO: qualify com full_load
payload = {
    # ...
    "jobs": [{
        "input": {
            "load_type": "full_load",  # NÃO suporta qualify
            # ...
        },
        "output": {
            "plugin": "dadosfera_snowflake",
            "qualify": {  # INVÁLIDO para full_load
                "table_name": "pedidos",
                "table_schema": "STAGED"
            }
        }
    }]
}
{
  "status": false,
  "exception_type": "ValidationError",
  "traceback": "Output 'qualify' configuration is only allowed when input load_type is 'incremental_with_qualify'",
  "data": null
}

Solução: Remova a configuração qualify ou altere o load_type para incremental_with_qualify.

Verificar Tabelas Criadas

Após criar a pipeline, consulte o job para ver os nomes das tabelas:

JOB_ID = f"{pipeline_id_custom}-0"

response = requests.get(f"{BASE_URL}/platform/jobs/jdbc/{JOB_ID}", headers=headers)
job_config = response.json()

print("Output Config:")
pprint(job_config.get("output_config"))
{
  "plugin": "dadosfera_snowflake",
  "table_name": "tb__xxxxxx__cadastros__pedidos",
  "raw": {
    "table_name": "pedidos",
    "table_schema": "RAW"
  },
  "qualify": {
    "table_name": "pedidos",
    "table_schema": "STAGED"
  }
}

Boas Práticas

  1. Use schemas para indicar maturidade: RAW para dados brutos, STAGED para dados processados
  2. Mantenha nomes simples: O mesmo nome de tabela em schemas diferentes facilita o entendimento
  3. Evite caracteres especiais: Use apenas letras, números e underscores
  4. Planeje antes de criar: Uma vez criada, a tabela fica reservada para aquela pipeline

Resumo

CenárioConfiguração rawConfiguração qualify
full_loadOpcionalNão permitido
incrementalOpcionalNão permitido
incremental_with_qualifyOpcionalOpcional