Spark Connect em Notebooks
Overview
Este guia demonstra como utilizar o Spark Connect diretamente nos Notebooks da plataforma Dadosfera para processamento distribuído de dados.
Configuração Inicial
Célula 1: Instalação de Dependências
Execute esta célula uma vez para instalar as dependências:
!pip install pyspark==3.5.0 pandas "pyarrow>4" grpcio grpcio-status protobuf "numpy<2" -qCélula 2: Conexão com Spark Connect
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.remote("sc://spark-connect.spark-jobs.svc.cluster.local:15002") \
.getOrCreate()
print(f"Conectado ao Spark {spark.version}")Exemplos Práticos
Exemplo 1: Operações Básicas com DataFrames
# Criar um DataFrame simples
df = spark.range(1000000)
# Aplicar transformações
resultado = df \
.filter(df.id > 500000) \
.withColumn("dobro", df.id * 2) \
.withColumn("resto", df.id % 100)
# Visualizar amostra
resultado.show(10)
# Contar registros
print(f"Total de registros: {resultado.count()}")Exemplo 2: Agregações
from pyspark.sql.functions import col, sum as spark_sum, avg, count, max as spark_max
# Criar dados de exemplo
dados = [
("Vendas", "SP", 1000),
("Vendas", "RJ", 800),
("Vendas", "SP", 1200),
("Marketing", "SP", 500),
("Marketing", "RJ", 600),
]
df = spark.createDataFrame(dados, ["departamento", "estado", "valor"])
# Agregação por departamento
resultado = df.groupBy("departamento").agg(
spark_sum("valor").alias("total"),
avg("valor").alias("media"),
count("*").alias("quantidade")
)
resultado.show()Exemplo 3: SQL Queries
# Registrar DataFrame como view temporária
df.createOrReplaceTempView("vendas")
# Executar SQL
resultado = spark.sql("""
SELECT
departamento,
estado,
SUM(valor) as total,
COUNT(*) as transacoes
FROM vendas
GROUP BY departamento, estado
ORDER BY total DESC
""")
resultado.show()Exemplo 4: Window Functions
from pyspark.sql.functions import row_number, rank, dense_rank
from pyspark.sql.window import Window
# Dados de exemplo
dados = [
("Alice", "Vendas", 5000),
("Bob", "Vendas", 4500),
("Carol", "Vendas", 5500),
("David", "TI", 6000),
("Eve", "TI", 5800),
]
df = spark.createDataFrame(dados, ["nome", "departamento", "salario"])
# Definir janela
window = Window.partitionBy("departamento").orderBy(col("salario").desc())
# Aplicar funções de janela
resultado = df \
.withColumn("ranking", rank().over(window)) \
.withColumn("posicao", row_number().over(window))
resultado.show()Exemplo 5: Joins
# Criar DataFrames
clientes = spark.createDataFrame([
(1, "Alice", "[email protected]"),
(2, "Bob", "[email protected]"),
(3, "Carol", "[email protected]"),
], ["id", "nome", "email"])
pedidos = spark.createDataFrame([
(101, 1, 150.00),
(102, 1, 200.00),
(103, 2, 300.00),
(104, 3, 100.00),
], ["pedido_id", "cliente_id", "valor"])
# Join
resultado = pedidos \
.join(clientes, pedidos.cliente_id == clientes.id, "inner") \
.select("pedido_id", "nome", "email", "valor")
resultado.show()Convertendo Resultados para Pandas
# Processar com Spark
spark_df = spark.range(1000000) \
.filter(col("id") > 900000)
# Converter resultado para pandas (após reduzir volume)
pandas_df = spark_df.toPandas()
pandas_df.head()Boas Práticas
1. Sempre Feche a Sessão
# No final do notebook
spark.stop()2. Use Cache para Operações Repetidas
df = spark.createDataFrame([("A",1),("B",2)], ["tipo","id"])
df_cached = df.cache()
# Múltiplas operações no mesmo DataFrame
df_cached.filter(col("tipo") == "A").count()
df_cached.filter(col("tipo") == "B").count()
# Liberar cache quando não precisar mais
df_cached.unpersist()Troubleshooting
Erro: Connection Timeout
Se a conexão demorar muito, verifique se o serviço está disponível:
# Teste de conectividade
import socket
try:
socket.create_connection(("spark-connect.spark-jobs.svc.cluster.local", 15002), timeout=5)
print("Conexão OK")
except Exception as e:
print(f"Erro de conexão: {e}")Erro: Module Not Found
Reinstale as dependências:
!pip install --force-reinstall pyspark==3.5.0 grpcio grpcio-status protobufErro: Out of Memory
Reduza o volume de dados ou aumente as partições:
# Reparticionar dados grandes
df_reparticionado = df.repartition(200)Updated 2 days ago
