# Spark Connect em Notebooks ## Overview Este guia demonstra como utilizar o Spark Connect diretamente nos Notebooks da plataforma Dadosfera para processamento distribuído de dados. ## Configuração Inicial ### Célula 1: Instalação de Dependências Execute esta célula uma vez para instalar as dependências: ```python !pip install pyspark==3.5.0 pandas "pyarrow>4" grpcio grpcio-status protobuf "numpy<2" -q ``` ### Célula 2: Conexão com Spark Connect ```python from pyspark.sql import SparkSession spark = SparkSession.builder \ .remote("sc://spark-connect.spark-jobs.svc.cluster.local:15002") \ .getOrCreate() print(f"Conectado ao Spark {spark.version}") ``` ## Exemplos Práticos ### Exemplo 1: Operações Básicas com DataFrames ```python # Criar um DataFrame simples df = spark.range(1000000) # Aplicar transformações resultado = df \ .filter(df.id > 500000) \ .withColumn("dobro", df.id * 2) \ .withColumn("resto", df.id % 100) # Visualizar amostra resultado.show(10) # Contar registros print(f"Total de registros: {resultado.count()}") ``` ### Exemplo 2: Agregações ```python from pyspark.sql.functions import col, sum as spark_sum, avg, count, max as spark_max # Criar dados de exemplo dados = [ ("Vendas", "SP", 1000), ("Vendas", "RJ", 800), ("Vendas", "SP", 1200), ("Marketing", "SP", 500), ("Marketing", "RJ", 600), ] df = spark.createDataFrame(dados, ["departamento", "estado", "valor"]) # Agregação por departamento resultado = df.groupBy("departamento").agg( spark_sum("valor").alias("total"), avg("valor").alias("media"), count("*").alias("quantidade") ) resultado.show() ``` ### Exemplo 3: SQL Queries ```python # Registrar DataFrame como view temporária df.createOrReplaceTempView("vendas") # Executar SQL resultado = spark.sql(""" SELECT departamento, estado, SUM(valor) as total, COUNT(*) as transacoes FROM vendas GROUP BY departamento, estado ORDER BY total DESC """) resultado.show() ``` ### Exemplo 4: Window Functions ```python from pyspark.sql.functions import row_number, rank, dense_rank from pyspark.sql.window import Window # Dados de exemplo dados = [ ("Alice", "Vendas", 5000), ("Bob", "Vendas", 4500), ("Carol", "Vendas", 5500), ("David", "TI", 6000), ("Eve", "TI", 5800), ] df = spark.createDataFrame(dados, ["nome", "departamento", "salario"]) # Definir janela window = Window.partitionBy("departamento").orderBy(col("salario").desc()) # Aplicar funções de janela resultado = df \ .withColumn("ranking", rank().over(window)) \ .withColumn("posicao", row_number().over(window)) resultado.show() ``` ### Exemplo 5: Joins ```python # Criar DataFrames clientes = spark.createDataFrame([ (1, "Alice", "alice@email.com"), (2, "Bob", "bob@email.com"), (3, "Carol", "carol@email.com"), ], ["id", "nome", "email"]) pedidos = spark.createDataFrame([ (101, 1, 150.00), (102, 1, 200.00), (103, 2, 300.00), (104, 3, 100.00), ], ["pedido_id", "cliente_id", "valor"]) # Join resultado = pedidos \ .join(clientes, pedidos.cliente_id == clientes.id, "inner") \ .select("pedido_id", "nome", "email", "valor") resultado.show() ``` ### Convertendo Resultados para Pandas ```python # Processar com Spark spark_df = spark.range(1000000) \ .filter(col("id") > 900000) # Converter resultado para pandas (após reduzir volume) pandas_df = spark_df.toPandas() pandas_df.head() ``` ## Boas Práticas ### 1. Sempre Feche a Sessão ```python # No final do notebook spark.stop() ``` ### 2. Use Cache para Operações Repetidas ```python df = spark.createDataFrame([("A",1),("B",2)], ["tipo","id"]) df_cached = df.cache() # Múltiplas operações no mesmo DataFrame df_cached.filter(col("tipo") == "A").count() df_cached.filter(col("tipo") == "B").count() # Liberar cache quando não precisar mais df_cached.unpersist() ``` ## Troubleshooting ### Erro: Connection Timeout Se a conexão demorar muito, verifique se o serviço está disponível: ```python # Teste de conectividade import socket try: socket.create_connection(("spark-connect.spark-jobs.svc.cluster.local", 15002), timeout=5) print("Conexão OK") except Exception as e: print(f"Erro de conexão: {e}") ``` ### Erro: Module Not Found Reinstale as dependências: ```python !pip install --force-reinstall pyspark==3.5.0 grpcio grpcio-status protobuf ``` ### Erro: Out of Memory Reduza o volume de dados ou aumente as partições: ```python # Reparticionar dados grandes df_reparticionado = df.repartition(200) ```