# Caso 1: Adicionando Coluna em Pipeline Incremental Como adicionar uma nova coluna e fazer backfill dos dados históricos # Caso 1: Adicionando Coluna em Pipeline Incremental Este guia demonstra como adicionar uma nova coluna a uma pipeline incremental existente e realizar o backfill dos dados históricos.
**Pré-requisito:** Veja o notebook `00-setup-e-helpers.ipynb` para entender como obter o `job_id`. ## Autenticação ```python import requests import os import json from pprint import pprint BASE_URL = "https://maestro.dadosfera.ai" response = requests.post( f"{BASE_URL}/auth/sign-in", data=json.dumps({ "username": os.environ['DADOSFERA_USERNAME'], "password": os.environ["DADOSFERA_PASSWORD"] }), headers={"Content-Type": "application/json"}, ) headers = { "Authorization": response.json()['tokens']['accessToken'], "Content-Type": "application/json" } ``` ## Passo 1: Criar a pipeline de teste Para este exemplo, criamos uma pipeline que sincroniza a tabela `pedidos` do MySQL. ```python import uuid uuid.uuid4() ``` ```python payload = { "id": "7b8b3399-b8c0-4fc5-9116-142cd9f4e7ae", "name": "Orders", "cron": "@once", "description": "tabela pedidos", "jobs": [ { "input": { "connector": "jdbc", "plugin": "mysql", "table_schema": "cadastros", "table_name": "pedidos", "load_type": "incremental", "incremental_column_name": "updated_at", "incremental_column_type": "timestamp", "column_include_list": ["id","cliente_id","valor","status","created_at","updated_at"], "auth_parameters": { "auth_type": "connection_manager", "config_id": "1763991009206_kvpd3364_mysql-1.0.0" } }, "transformations": [], "output": { "plugin": "dadosfera_snowflake" } } ] } response = requests.post(f"{BASE_URL}/platform/pipeline", headers=headers, data=json.dumps(payload)) ``` ```python response = requests.get(f"{BASE_URL}/platform/pipeline/7b8b3399-b8c0-4fc5-9116-142cd9f4e7ae/pipeline_run", headers=headers) response.json() ``` ```json [{ "last_status": "SUCCESS", "pipeline_run_id": "cbab0131966a40809491555a8f23e272", "updated_at": "2025-12-17T20:41:25.401858+00:00", "pipeline_id": "7b8b3399_b8c0_4fc5_9116_142cd9f4e7ae", "id": 2246, "created_at": "2025-12-17T20:37:54.422460+00:00" }] ``` ## Passo 2: Consultar Job JDBC ```python JOB_ID = "7b8b3399-b8c0-4fc5-9116-142cd9f4e7ae-0" ``` ```python response = requests.get(f"{BASE_URL}/platform/jobs/jdbc/{JOB_ID}", headers=headers) job_config = response.json() pprint(job_config) print("\nColunas atuais:") print(job_config.get("source_config", {}).get("column_include_list", [])) ``` ```json { "cron": "@once", "customer_id": "dadosfera", "description": "tabela pedidos", "id": "7b8b3399_b8c0_4fc5_9116_142cd9f4e7ae", "job_name": "cadastros_pedidos", "name": "Orders", "output_config": { "plugin": "dadosfera_snowflake", "table_name": "tb__xqfu8g__cadastros__pedidos" }, "source_config": { "column_include_list": ["id", "cliente_id", "valor", "status", "created_at", "updated_at"], "connector": "jdbc", "incremental_column_name": "updated_at", "incremental_column_type": "timestamp", "load_type": "incremental", "plugin": "mysql", "table_name": "pedidos", "table_schema": "cadastros" } } ``` ``` Colunas atuais: ['id', 'cliente_id', 'valor', 'status', 'created_at', 'updated_at'] ``` ### Resultado da primeira execução A pipeline sincronizou os 5 registros iniciais da tabela `pedidos`.
## Passo 3: Adicionar Nova Coluna O time de desenvolvimento adicionou uma nova coluna `telefone` na tabela de origem: ```sql ALTER TABLE cadastros.pedidos ADD COLUMN telefone VARCHAR(20) DEFAULT NULL; UPDATE cadastros.pedidos SET telefone = '11999990001', updated_at = '2025-12-17 16:50:00' WHERE id = 1; UPDATE cadastros.pedidos SET telefone = '11999990002', updated_at = '2025-12-17 16:50:00' WHERE id = 2; UPDATE cadastros.pedidos SET telefone = '11999990003', updated_at = '2025-12-17 16:50:00' WHERE id = 3; UPDATE cadastros.pedidos SET telefone = '11999990004', updated_at = '2025-12-17 16:50:00' WHERE id = 4; UPDATE cadastros.pedidos SET telefone = '11999990005', updated_at = '2025-12-17 16:50:00' WHERE id = 5; ``` A pipeline executou novamente, mas sem a nova coluna configurada. Os dados foram replicados, porém a coluna `telefone` não aparece. Para corrigir, precisamos: 1. Adicionar a coluna à configuração da pipeline 2. Resetar o estado para re-extrair os dados com a nova coluna ## Passo 4: Adicionar a nova coluna à pipeline O endpoint `PATCH /platform/jobs/{jobId}/input` faz união das colunas — a nova coluna será adicionada às existentes. ```python NEW_COLUMN = "telefone" payload = { "column_include_list": [NEW_COLUMN] } response = requests.patch( f"{BASE_URL}/platform/jobs/{JOB_ID}/input", headers=headers, json=payload ) pprint(response.json()) ``` ```json { "detail": "Successfully updated input fields for job_id 7b8b3399_b8c0_4fc5_9116_142cd9f4e7ae_0", "status": true, "updated_fields": ["column_include_list", "info"] } ``` ```python response = requests.get(f"{BASE_URL}/platform/jobs/jdbc/{JOB_ID}", headers=headers) job_config = response.json() print("Colunas após adição:") print(job_config.get("source_config", {}).get("column_include_list", [])) ``` ``` Colunas após adição: ['id', 'cliente_id', 'valor', 'status', 'created_at', 'updated_at', 'telefone'] ``` ## Passo 5: Resetar Estado Parcial (Backfill) ```python response = requests.post( f"{BASE_URL}/platform/jobs/{JOB_ID}/reset-state", data=json.dumps({"incremental_column_value": "2025-12-17 17:49:00"}), headers=headers ) pprint(response.json()) ``` ```json { "detail": "Successfully reset state for job_id 7b8b3399_b8c0_4fc5_9116_142cd9f4e7ae_0", "incremental_column_value": "2025-12-17 17:49:00", "state": {}, "status": true } ``` ## Passo 6: Limpar registros duplicados no destino Antes de re-executar, no Snowflake, remova os registros que serão re-extraídos para evitar duplicação: ```sql DELETE FROM public.tb__xqfu8g__cadastros__pedidos WHERE updated_at >= '2025-12-17 17:50:00.000'; ``` ## Passo 7: Re-executar a pipeline ### Resultado
A coluna `telefone` agora está presente nos dados. Note que existem registros duplicados (mesmo `id` com diferentes versões). Para resolver isso, veja o **Caso 2: Migração para Incremental with Qualify**.