# Caso 2: Migrando de Incremental para Incremental with Qualify Como adicionar deduplicação a uma pipeline incremental existente # Caso 2: Migrando de Incremental para Incremental with Qualify Este guia demonstra como migrar uma pipeline incremental para o modo `incremental_with_qualify`, que adiciona deduplicação automática por primary key. **Problema:** Pipelines incrementais capturam todas as versões de um registro, gerando duplicatas no destino. **Solução:** Migrar para `incremental_with_qualify`, que mantém apenas a versão mais recente de cada registro. ## Autenticação ```python import requests import json import os from pprint import pprint BASE_URL = "https://maestro.dadosfera.ai" response = requests.post( f"{BASE_URL}/auth/sign-in", data=json.dumps({ "username": os.environ['DADOSFERA_USERNAME'], "password": os.environ["DADOSFERA_PASSWORD"] }), headers={"Content-Type": "application/json"}, ) headers = { "Authorization": response.json()['tokens']['accessToken'], "Content-Type": "application/json" } ``` ## Passo 1: Identificar Pipeline e Job Utilizamos a mesma pipeline criada no Caso 1. ```python PIPELINE_ID = "7b8b3399-b8c0-4fc5-9116-142cd9f4e7ae" JOB_ID = "7b8b3399-b8c0-4fc5-9116-142cd9f4e7ae-0" ``` ```python response = requests.get(f"{BASE_URL}/platform/jobs/jdbc/{JOB_ID}", headers=headers) job_config = response.json() pprint(f"\nload_type atual: {job_config.get('source_config', {}).get('load_type')}") pprint(f"primary_keys: {job_config.get('source_config', {}).get('primary_keys', 'NÃO DEFINIDAS')}") ``` ``` 'load_type atual: incremental' 'primary_keys: None' ``` ## Passo 2: Migrar Sync Mode O endpoint `POST /platform/jobs/jdbc/{jobId}/sync-mode` realiza a migração com validação completa. Parâmetros obrigatórios: * `target_load_type`: novo modo de sincronização * `primary_keys`: colunas que identificam unicamente cada registro * `incremental_column_name` e `incremental_column_type`: coluna de controle incremental ```python payload = { "target_load_type": "incremental_with_qualify", "incremental_column_name": "updated_at", "incremental_column_type": "timestamp", "primary_keys": ["id"] } response = requests.post( f"{BASE_URL}/platform/jobs/jdbc/{JOB_ID}/sync-mode", headers=headers, json=payload ) pprint(response.json()) ``` ```json { "message": "Successfully migrated sync mode to incremental_with_qualify", "migration_details": { "state_action": "preserve", "target_load_type": "incremental_with_qualify", "warnings": [] }, "status": "success" } ``` ## Passo 3: Verificar Alteração O `state_action: "preserve"` indica que o estado incremental foi mantido — a próxima execução continua de onde parou. ```python response = requests.get(f"{BASE_URL}/platform/jobs/jdbc/{JOB_ID}", headers=headers) job_config = response.json() print(f"load_type: {job_config.get('source_config', {}).get('load_type')}") print(f"primary_keys: {job_config.get('source_config', {}).get('primary_keys')}") ``` ``` load_type: incremental_with_qualify primary_keys: ['id'] ``` ## Passo 4: Executar Pipeline ```python payload = {"pipeline_id": PIPELINE_ID} response = requests.post( f"{BASE_URL}/platform/pipeline/execute", headers=headers, json=payload ) pprint(response.json()) ``` ### Resultado
Com `incremental_with_qualify`, uma nova tabela é criada no schema `STAGED` contendo apenas a versão mais recente de cada registro. Tanto o schema como as tabelas são customizáveis pela API. **Resultados:** 1. IDs deduplicados (apenas 1 registro por `id`) 2. Nova coluna `telefone` presente nos dados