# Spark Connect com Snowflake ## Overview Este guia demonstra como usar o **Snowflake Connector for Spark** para ler e escrever dados diretamente entre o Spark Connect e o Snowflake, sem intermediários. O conector Snowflake-Spark permite: * Leitura direta de tabelas e queries do Snowflake * Escrita de DataFrames para tabelas Snowflake * Query pushdown para otimização de performance * Integração nativa com o ecossistema Spark ## Pré-requisitos ### JARs Necessários O cluster Spark Connect já possui os JARs do Snowflake Connector configurados: * `snowflake-jdbc-.jar` - Driver JDBC do Snowflake * `spark-snowflake_2.12-.jar` - Conector Spark-Snowflake ### Credenciais Snowflake Você precisará das seguintes informações: * Account identifier (ex: `xy12345.us-east-1`) * Usuário e senha (ou key pair) * Database e schema * Warehouse ## Conexão Básica ### Configuração das Opções ```python from pyspark.sql import SparkSession # Conectar ao Spark Connect spark = SparkSession.builder \ .remote("sc://spark-connect.spark-jobs.svc.cluster.local:15002") \ .getOrCreate() # Configuração do Snowflake sfOptions = { "sfURL": ".snowflakecomputing.com", "sfUser": "", "sfPassword": "", "sfDatabase": "", "sfSchema": "", "sfWarehouse": "" } SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" ``` ### Usando Variáveis de Ambiente (Recomendado) ```python import os sfOptions = { "sfURL": os.environ["SNOWFLAKE_ACCOUNT"] + ".snowflakecomputing.com", "sfUser": os.environ["SNOWFLAKE_USER"], "sfPassword": os.environ["SNOWFLAKE_PASSWORD"], "sfDatabase": os.environ["SNOWFLAKE_DATABASE"], "sfSchema": os.environ.get("SNOWFLAKE_SCHEMA", "PUBLIC"), "sfWarehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", "COMPUTE_WH") } ``` ## Lendo Dados do Snowflake ### Leitura de Tabela Completa ```python # Ler tabela inteira df = spark.read \ .format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("dbtable", "VENDAS") \ .load() df.show() print(f"Total de registros: {df.count()}") ``` ### Leitura com Query SQL ```python # Ler com query customizada query = """ SELECT data_venda, produto, categoria, regiao, SUM(valor) as total_valor, COUNT(*) as num_vendas FROM VENDAS WHERE YEAR(data_venda) = 2024 GROUP BY data_venda, produto, categoria, regiao """ df = spark.read \ .format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("query", query) \ .load() df.show() ``` ### Leitura com Filtros (Query Pushdown) O conector automaticamente faz pushdown de filtros para o Snowflake: ```python # Filtros são enviados ao Snowflake automaticamente df = spark.read \ .format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("dbtable", "VENDAS") \ .load() # Este filtro será executado no Snowflake, não no Spark df_filtrado = df.filter(df.regiao == "SUL").filter(df.valor > 1000) df_filtrado.show() ``` ## Escrevendo Dados no Snowflake ### Escrita Básica (Overwrite) ```python from pyspark.sql.functions import current_timestamp # Criar DataFrame de exemplo dados = [ ("Produto A", "Categoria 1", 100.0), ("Produto B", "Categoria 2", 200.0), ("Produto C", "Categoria 1", 150.0), ] df = spark.createDataFrame(dados, ["produto", "categoria", "valor"]) # Adicionar timestamp df = df.withColumn("data_processamento", current_timestamp()) # Escrever no Snowflake (sobrescreve tabela) df.write \ .format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("dbtable", "PRODUTOS_PROCESSADOS") \ .mode("overwrite") \ .save() print("Dados salvos no Snowflake!") ``` ### Escrita com Append ```python # Adicionar dados a tabela existente df.write \ .format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("dbtable", "PRODUTOS_PROCESSADOS") \ .mode("append") \ .save() ``` ### Modos de Escrita Disponíveis | Modo | Descrição | |------|-----------| | `overwrite` | Substitui a tabela completamente | | `append` | Adiciona registros à tabela existente | | `errorifexists` | Erro se a tabela já existir | | `ignore` | Ignora se a tabela já existir | ## Exemplos Práticos ### ETL: Agregação e Salvamento ```python from pyspark.sql.functions import sum as spark_sum, avg, count, col # Ler dados brutos do Snowflake df_vendas = spark.read \ .format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("dbtable", "FATO_VENDAS") \ .load() # Transformar: agregar por categoria e mês df_agregado = df_vendas \ .groupBy("categoria", "mes") \ .agg( spark_sum("valor").alias("receita_total"), avg("valor").alias("ticket_medio"), count("*").alias("num_transacoes") ) \ .orderBy("categoria", "mes") # Salvar resultado no Snowflake df_agregado.write \ .format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("dbtable", "VENDAS_AGREGADAS_MENSAL") \ .mode("overwrite") \ .save() print("ETL concluído!") ``` ### Join entre Tabelas Snowflake ```python # Ler múltiplas tabelas df_vendas = spark.read \ .format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("dbtable", "VENDAS") \ .load() df_produtos = spark.read \ .format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("dbtable", "DIM_PRODUTOS") \ .load() df_clientes = spark.read \ .format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("dbtable", "DIM_CLIENTES") \ .load() # Join no Spark df_completo = df_vendas \ .join(df_produtos, df_vendas.produto_id == df_produtos.id, "inner") \ .join(df_clientes, df_vendas.cliente_id == df_clientes.id, "inner") \ .select( df_vendas.data_venda, df_produtos.nome.alias("produto"), df_produtos.categoria, df_clientes.nome.alias("cliente"), df_clientes.regiao, df_vendas.valor ) df_completo.show() ``` ### Análise de Coorte ```python from pyspark.sql.functions import month, year, min as spark_min, countDistinct from pyspark.sql.window import Window # Ler dados de compras df = spark.read \ .format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("query", """ SELECT cliente_id, data_compra, valor FROM COMPRAS WHERE data_compra >= '2023-01-01' """) \ .load() # Calcular coorte (mês da primeira compra) window_cliente = Window.partitionBy("cliente_id") df_coorte = df \ .withColumn("primeira_compra", spark_min("data_compra").over(window_cliente)) \ .withColumn("coorte", month("primeira_compra")) \ .withColumn("mes_compra", month("data_compra")) # Análise de retenção retencao = df_coorte \ .groupBy("coorte", "mes_compra") \ .agg(countDistinct("cliente_id").alias("clientes")) \ .orderBy("coorte", "mes_compra") # Salvar análise retencao.write \ .format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("dbtable", "ANALISE_COORTE") \ .mode("overwrite") \ .save() ``` ### Window Functions com Dados Snowflake ```python from pyspark.sql.functions import row_number, rank, lag, lead from pyspark.sql.window import Window # Ler dados df = spark.read \ .format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("dbtable", "VENDAS_DIARIAS") \ .load() # Definir janela por produto ordenado por data window = Window.partitionBy("produto").orderBy("data") # Aplicar funções de janela df_analise = df \ .withColumn("ranking", rank().over(window)) \ .withColumn("venda_anterior", lag("valor", 1).over(window)) \ .withColumn("venda_seguinte", lead("valor", 1).over(window)) \ .withColumn("variacao", col("valor") - col("venda_anterior")) df_analise.show() ``` ## Módulo Utilitário Reutilizável ### snowflake\_utils.py ```python """ Utilitários para integração Snowflake-Spark """ import os from contextlib import contextmanager from pyspark.sql import SparkSession, DataFrame from typing import Optional, Dict, Any SNOWFLAKE_SOURCE = "net.snowflake.spark.snowflake" SPARK_CONNECT_URL = "sc://spark-connect.spark-jobs.svc.cluster.local:15002" def get_snowflake_options() -> Dict[str, str]: """Retorna opções de conexão Snowflake das variáveis de ambiente.""" return { "sfURL": os.environ["SNOWFLAKE_ACCOUNT"] + ".snowflakecomputing.com", "sfUser": os.environ["SNOWFLAKE_USER"], "sfPassword": os.environ["SNOWFLAKE_PASSWORD"], "sfDatabase": os.environ["SNOWFLAKE_DATABASE"], "sfSchema": os.environ.get("SNOWFLAKE_SCHEMA", "PUBLIC"), "sfWarehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", "COMPUTE_WH") } @contextmanager def spark_snowflake_session(): """ Context manager para sessão Spark com Snowflake. Uso: with spark_snowflake_session() as (spark, sf): df = sf.read_table("VENDAS") df.show() """ spark = SparkSession.builder \ .remote(SPARK_CONNECT_URL) \ .getOrCreate() sf = SnowflakeHelper(spark) try: yield spark, sf finally: spark.stop() class SnowflakeHelper: """Helper class para operações Snowflake.""" def __init__(self, spark: SparkSession, options: Optional[Dict] = None): self.spark = spark self.options = options or get_snowflake_options() def read_table(self, table: str) -> DataFrame: """Lê uma tabela do Snowflake.""" return self.spark.read \ .format(SNOWFLAKE_SOURCE) \ .options(**self.options) \ .option("dbtable", table) \ .load() def read_query(self, query: str) -> DataFrame: """Executa uma query no Snowflake.""" return self.spark.read \ .format(SNOWFLAKE_SOURCE) \ .options(**self.options) \ .option("query", query) \ .load() def write_table( self, df: DataFrame, table: str, mode: str = "overwrite" ) -> None: """Escreve DataFrame em tabela Snowflake.""" df.write \ .format(SNOWFLAKE_SOURCE) \ .options(**self.options) \ .option("dbtable", table) \ .mode(mode) \ .save() # Exemplo de uso if __name__ == "__main__": with spark_snowflake_session() as (spark, sf): # Ler df = sf.read_table("VENDAS") print(f"Registros: {df.count()}") # Transformar from pyspark.sql.functions import sum as spark_sum df_agg = df.groupBy("categoria").agg(spark_sum("valor").alias("total")) # Escrever sf.write_table(df_agg, "VENDAS_POR_CATEGORIA") ``` ## Uso em Notebooks ```python # Célula 1: Setup from snowflake_utils import spark_snowflake_session # Célula 2: Análise with spark_snowflake_session() as (spark, sf): # Ler dados df = sf.read_query(""" SELECT * FROM VENDAS WHERE data >= '2024-01-01' """) # Processar from pyspark.sql.functions import sum as spark_sum, avg resultado = df.groupBy("regiao", "categoria").agg( spark_sum("valor").alias("total"), avg("valor").alias("media") ) # Visualizar resultado.show() # Salvar resultado sf.write_table(resultado, "ANALISE_REGIONAL") ``` ## Uso em Data Apps (Streamlit) ```python import streamlit as st from snowflake_utils import spark_snowflake_session from pyspark.sql.functions import sum as spark_sum st.title("Dashboard Snowflake + Spark") @st.cache_data(ttl=300) def carregar_dados(): with spark_snowflake_session() as (spark, sf): df = sf.read_query(""" SELECT categoria, regiao, SUM(valor) as total FROM VENDAS WHERE YEAR(data) = 2024 GROUP BY categoria, regiao """) return df.toPandas() dados = carregar_dados() st.dataframe(dados) st.bar_chart(dados.set_index("categoria")["total"]) ``` ## Opções Avançadas ### Pushdown de Queries O conector automaticamente otimiza queries enviando filtros e agregações para o Snowflake: ```python # Configurar pushdown (habilitado por padrão) sfOptions["autopushdown"] = "on" df = spark.read \ .format(SNOWFLAKE_SOURCE_NAME) \ .options(**sfOptions) \ .option("dbtable", "GRANDE_TABELA") \ .load() # Estes filtros serão executados no Snowflake df_filtrado = df \ .filter(df.data >= "2024-01-01") \ .filter(df.status == "ATIVO") \ .select("id", "nome", "valor") ``` ### Autenticação com Key Pair ```python sfOptions = { "sfURL": ".snowflakecomputing.com", "sfUser": "", "pem_private_key": "", "sfDatabase": "", "sfSchema": "", "sfWarehouse": "" } ``` ### Especificar Role ```python sfOptions["sfRole"] = "ANALYTICS_ROLE" ``` ## Troubleshooting ### Erro: Class not found Verifique se os JARs do Snowflake estão no classpath do Spark. ### Erro: Authentication failed Verifique suas credenciais e se o usuário tem acesso ao warehouse. ### Performance lenta * Use `query` com filtros em vez de `dbtable` + filter * Verifique se o warehouse está ativo * Considere aumentar o tamanho do warehouse para queries grandes ## Referências * [Snowflake Connector for Spark](https://docs.snowflake.com/en/user-guide/spark-connector) * [Using the Spark Connector](https://docs.snowflake.com/en/user-guide/spark-connector-use) * [Installing the Spark Connector](https://docs.snowflake.com/en/user-guide/spark-connector-install)