Spark Connect em Notebooks

Overview

Este guia demonstra como utilizar o Spark Connect diretamente nos Notebooks da plataforma Dadosfera para processamento distribuído de dados.

Configuração Inicial

Célula 1: Instalação de Dependências

Execute esta célula uma vez para instalar as dependências:

!pip install pyspark==3.5.0 pandas "pyarrow>4" grpcio grpcio-status protobuf "numpy<2" -q

Célula 2: Conexão com Spark Connect

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .remote("sc://spark-connect.spark-jobs.svc.cluster.local:15002") \
    .getOrCreate()

print(f"Conectado ao Spark {spark.version}")

Exemplos Práticos

Exemplo 1: Operações Básicas com DataFrames

# Criar um DataFrame simples
df = spark.range(1000000)

# Aplicar transformações
resultado = df \
    .filter(df.id > 500000) \
    .withColumn("dobro", df.id * 2) \
    .withColumn("resto", df.id % 100)

# Visualizar amostra
resultado.show(10)

# Contar registros
print(f"Total de registros: {resultado.count()}")

Exemplo 2: Agregações

from pyspark.sql.functions import col, sum as spark_sum, avg, count, max as spark_max

# Criar dados de exemplo
dados = [
    ("Vendas", "SP", 1000),
    ("Vendas", "RJ", 800),
    ("Vendas", "SP", 1200),
    ("Marketing", "SP", 500),
    ("Marketing", "RJ", 600),
]
df = spark.createDataFrame(dados, ["departamento", "estado", "valor"])

# Agregação por departamento
resultado = df.groupBy("departamento").agg(
    spark_sum("valor").alias("total"),
    avg("valor").alias("media"),
    count("*").alias("quantidade")
)

resultado.show()

Exemplo 3: SQL Queries

# Registrar DataFrame como view temporária
df.createOrReplaceTempView("vendas")

# Executar SQL
resultado = spark.sql("""
    SELECT
        departamento,
        estado,
        SUM(valor) as total,
        COUNT(*) as transacoes
    FROM vendas
    GROUP BY departamento, estado
    ORDER BY total DESC
""")

resultado.show()

Exemplo 4: Window Functions

from pyspark.sql.functions import row_number, rank, dense_rank
from pyspark.sql.window import Window

# Dados de exemplo
dados = [
    ("Alice", "Vendas", 5000),
    ("Bob", "Vendas", 4500),
    ("Carol", "Vendas", 5500),
    ("David", "TI", 6000),
    ("Eve", "TI", 5800),
]
df = spark.createDataFrame(dados, ["nome", "departamento", "salario"])

# Definir janela
window = Window.partitionBy("departamento").orderBy(col("salario").desc())

# Aplicar funções de janela
resultado = df \
    .withColumn("ranking", rank().over(window)) \
    .withColumn("posicao", row_number().over(window))

resultado.show()

Exemplo 5: Joins

# Criar DataFrames
clientes = spark.createDataFrame([
    (1, "Alice", "[email protected]"),
    (2, "Bob", "[email protected]"),
    (3, "Carol", "[email protected]"),
], ["id", "nome", "email"])

pedidos = spark.createDataFrame([
    (101, 1, 150.00),
    (102, 1, 200.00),
    (103, 2, 300.00),
    (104, 3, 100.00),
], ["pedido_id", "cliente_id", "valor"])

# Join
resultado = pedidos \
    .join(clientes, pedidos.cliente_id == clientes.id, "inner") \
    .select("pedido_id", "nome", "email", "valor")

resultado.show()

Convertendo Resultados para Pandas

# Processar com Spark
spark_df = spark.range(1000000) \
    .filter(col("id") > 900000)

# Converter resultado para pandas (após reduzir volume)
pandas_df = spark_df.toPandas()
pandas_df.head()

Boas Práticas

1. Sempre Feche a Sessão

# No final do notebook
spark.stop()

2. Use Cache para Operações Repetidas

df = spark.createDataFrame([("A",1),("B",2)], ["tipo","id"])
df_cached = df.cache()

# Múltiplas operações no mesmo DataFrame
df_cached.filter(col("tipo") == "A").count()
df_cached.filter(col("tipo") == "B").count()

# Liberar cache quando não precisar mais
df_cached.unpersist()

Troubleshooting

Erro: Connection Timeout

Se a conexão demorar muito, verifique se o serviço está disponível:

# Teste de conectividade
import socket
try:
    socket.create_connection(("spark-connect.spark-jobs.svc.cluster.local", 15002), timeout=5)
    print("Conexão OK")
except Exception as e:
    print(f"Erro de conexão: {e}")

Erro: Module Not Found

Reinstale as dependências:

!pip install --force-reinstall pyspark==3.5.0 grpcio grpcio-status protobuf

Erro: Out of Memory

Reduza o volume de dados ou aumente as partições:

# Reparticionar dados grandes
df_reparticionado = df.repartition(200)