Caso 4: Customizando Nome das Tabelas
Como personalizar nomes de tabelas raw e qualify na criação de pipelines
Este guia demonstra como personalizar os nomes de tabelas de destino (raw e qualify) ao criar uma pipeline na Dadosfera.
Por que personalizar? Por padrão, o sistema gera nomes como tb__xqfu8g__cadastros__pedidos. A personalização permite seguir convenções de nomenclatura da sua organização e organizar dados em schemas que indicam maturidade dos dados.
Autenticação
import requests
import json
import os
from pprint import pprint
BASE_URL = "https://maestro.dadosfera.ai"
response = requests.post(
f"{BASE_URL}/auth/sign-in",
data=json.dumps({
"username": os.environ['DADOSFERA_USERNAME'],
"password": os.environ["DADOSFERA_PASSWORD"]
}),
headers={"Content-Type": "application/json"},
)
headers = {
"Authorization": response.json()['tokens']['accessToken'],
"Content-Type": "application/json"
}Nomenclatura Padrão
Quando você não especifica nomes personalizados, o sistema gera automaticamente:
tb__{base32_id}__{schema_origem}__{tabela_origem}
Exemplo: tb__xqfu8g__cadastros__pedidos
| Componente | Descrição |
|---|---|
tb__ | Prefixo padrão |
xqfu8g | Identificador único (6 primeiros caracteres do pipeline ID em base32) |
cadastros | Schema de origem |
pedidos | Tabela de origem |
Schemas padrão por load_type:
full_load/incremental: Tabela emPUBLICincremental_with_qualify: Raw emPUBLIC, Qualify emSTAGED
Passo 1: Criar Pipeline com Nome Padrão (Referência)
Primeiro, vamos ver como fica uma pipeline sem personalização:
import uuid
pipeline_id = str(uuid.uuid4())
print(f"Pipeline ID: {pipeline_id}")payload = {
"id": pipeline_id,
"name": "Pedidos - Padrão",
"cron": "@once",
"description": "Pipeline com nomenclatura padrão",
"jobs": [
{
"input": {
"connector": "jdbc",
"plugin": "mysql",
"table_schema": "cadastros",
"table_name": "pedidos",
"load_type": "incremental_with_qualify",
"incremental_column_name": "updated_at",
"incremental_column_type": "timestamp",
"primary_keys": ["id"],
"column_include_list": ["id", "cliente_id", "valor", "status", "created_at", "updated_at"],
"auth_parameters": {
"auth_type": "connection_manager",
"config_id": "sua-connection-config-id"
}
},
"transformations": [],
"output": {
"plugin": "dadosfera_snowflake"
}
}
]
}
response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload))
pprint(response.json())Resultado: Tabelas criadas com nomes gerados automaticamente:
- Raw:
PUBLIC.tb__xxxxxx__cadastros__pedidos - Qualify:
STAGED.tb__xxxxxx__cadastros__pedidos
Passo 2: Criar Pipeline com Nomes Personalizados
Agora vamos criar uma pipeline especificando nomes e schemas customizados. O schema indica a maturidade dos dados:
RAW: Dados brutos, sem tratamentoSTAGED: Dados processados e deduplicados
import uuid
pipeline_id_custom = str(uuid.uuid4())
print(f"Pipeline ID: {pipeline_id_custom}")payload = {
"id": pipeline_id_custom,
"name": "Pedidos - Customizado",
"cron": "@once",
"description": "Pipeline com nomenclatura personalizada",
"jobs": [
{
"input": {
"connector": "jdbc",
"plugin": "mysql",
"table_schema": "cadastros",
"table_name": "pedidos",
"load_type": "incremental_with_qualify",
"incremental_column_name": "updated_at",
"incremental_column_type": "timestamp",
"primary_keys": ["id"],
"column_include_list": ["id", "cliente_id", "valor", "status", "created_at", "updated_at"],
"auth_parameters": {
"auth_type": "connection_manager",
"config_id": "sua-connection-config-id"
}
},
"transformations": [],
"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "pedidos",
"table_schema": "RAW"
},
"qualify": {
"table_name": "pedidos",
"table_schema": "STAGED"
}
}
}
]
}
response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload))
pprint(response.json())Resultado: Tabelas criadas com nomes personalizados:
- Raw:
RAW.pedidos - Qualify:
STAGED.pedidos
O mesmo nome de tabela (pedidos) pode existir em schemas diferentes. O schema indica a maturidade: RAW contém dados brutos, STAGED contém dados deduplicados.
Opções de Configuração
Estrutura do Output
"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "pedidos",
"table_schema": "RAW"
},
"qualify": {
"table_name": "pedidos",
"table_schema": "STAGED"
}
}| Campo | Descrição | Obrigatório |
|---|---|---|
raw.table_name | Nome da tabela de dados brutos | Sim (se usar raw) |
raw.table_schema | Schema da tabela raw | Sim (se usar raw) |
qualify.table_name | Nome da tabela deduplicada | Sim (se usar qualify) |
qualify.table_schema | Schema da tabela qualify | Sim (se usar qualify) |
Regras de Validação
-
qualifysó é permitido comincremental_with_qualify: Se oload_typeforfull_loadouincremental, use apenasraw. -
Ambos são opcionais: Você pode personalizar apenas
raw, apenasqualify, ambos, ou nenhum. -
Validação de conflitos: O sistema verifica se já existe uma tabela com o mesmo nome para o seu customer antes de criar a pipeline.
Exemplos por Load Type
Full Load (apenas raw)
"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "clientes",
"table_schema": "RAW"
}
}Incremental (apenas raw)
"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "eventos",
"table_schema": "RAW"
}
}Incremental with Qualify (raw + qualify)
"output": {
"plugin": "dadosfera_snowflake",
"raw": {
"table_name": "pedidos",
"table_schema": "RAW"
},
"qualify": {
"table_name": "pedidos",
"table_schema": "STAGED"
}
}Validação de Tabelas por Customer
A Dadosfera mantém um controle de todas as tabelas no Snowflake por cliente. Antes de criar a pipeline, o sistema valida:
| Validação | Descrição |
|---|---|
| Unicidade | O nome schema.table_name não pode existir para o mesmo customer |
| Schema válido | O schema deve existir no ambiente Snowflake do cliente |
| Conflito com catálogo | Não pode haver conflito com tabelas já registradas |
Erro de Conflito (HTTP 409)
Se você tentar criar uma pipeline com um nome de tabela que já existe:
response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload))
print(response.status_code) # 409
pprint(response.json()){
"status": false,
"exception_type": "RepositoryResourceAlreadyExists",
"traceback": "Table 'RAW.pedidos' already exists in catalog",
"data": null
}Solução: Escolha um nome de tabela diferente ou use outro schema.
Erro de Validação (HTTP 400)
Se você tentar usar qualify com um load_type incompatível:
# ERRO: qualify com full_load
payload = {
# ...
"jobs": [{
"input": {
"load_type": "full_load", # NÃO suporta qualify
# ...
},
"output": {
"plugin": "dadosfera_snowflake",
"qualify": { # INVÁLIDO para full_load
"table_name": "pedidos",
"table_schema": "STAGED"
}
}
}]
}{
"status": false,
"exception_type": "ValidationError",
"traceback": "Output 'qualify' configuration is only allowed when input load_type is 'incremental_with_qualify'",
"data": null
}Solução: Remova a configuração qualify ou altere o load_type para incremental_with_qualify.
Verificar Tabelas Criadas
Após criar a pipeline, consulte o job para ver os nomes das tabelas:
JOB_ID = f"{pipeline_id_custom}-0"
response = requests.get(f"{BASE_URL}/platform/jobs/jdbc/{JOB_ID}", headers=headers)
job_config = response.json()
print("Output Config:")
pprint(job_config.get("output_config")){
"plugin": "dadosfera_snowflake",
"table_name": "tb__xxxxxx__cadastros__pedidos",
"raw": {
"table_name": "pedidos",
"table_schema": "RAW"
},
"qualify": {
"table_name": "pedidos",
"table_schema": "STAGED"
}
}Boas Práticas
- Use schemas para indicar maturidade:
RAWpara dados brutos,STAGEDpara dados processados - Mantenha nomes simples: O mesmo nome de tabela em schemas diferentes facilita o entendimento
- Evite caracteres especiais: Use apenas letras, números e underscores
- Planeje antes de criar: Uma vez criada, a tabela fica reservada para aquela pipeline
Resumo
| Cenário | Configuração raw | Configuração qualify |
|---|---|---|
full_load | Opcional | Não permitido |
incremental | Opcional | Não permitido |
incremental_with_qualify | Opcional | Opcional |
Updated about 2 hours ago
