Spark Connect em Data Apps
Overview
Este guia demonstra como utilizar o Spark Connect em Data Apps Python (Streamlit, Flask, FastAPI) na plataforma Dadosfera para processamento distribuído de dados.
Configuração do Projeto
Dependências (requirements.txt)
"pyspark>4.0.0"
"numpy<2"
pandas>=2.0.0
pyarrow>=14.0.0
grpcio>=1.60.0
grpcio-status>=1.60.0
protobuf>=4.25.0
streamlit>=1.30.0
snowflake-connector-python>=3.6.0Módulo de Conexão Reutilizável
spark_utils.py
Crie um módulo utilitário para gerenciar conexões:
"""
Utilitários para conexão com Spark Connect
"""
from contextlib import contextmanager
from pyspark.sql import SparkSession
import logging
logger = logging.getLogger(__name__)
SPARK_CONNECT_URL = "sc://spark-connect.spark-jobs.svc.cluster.local:15002"
@contextmanager
def get_spark_session():
"""
Context manager para sessão Spark Connect.
Uso:
with get_spark_session() as spark:
df = spark.range(100)
df.show()
"""
spark = None
try:
logger.info("Conectando ao Spark Connect...")
spark = SparkSession.builder \
.remote(SPARK_CONNECT_URL) \
.getOrCreate()
logger.info(f"Conectado! Spark version: {spark.version}")
yield spark
except Exception as e:
logger.error(f"Erro na conexão Spark: {e}")
raise
finally:
if spark:
spark.stop()
logger.info("Sessão Spark encerrada")
def create_spark_session() -> SparkSession:
"""
Cria sessão Spark Connect.
Lembre-se de chamar spark.stop() quando terminar.
"""
return SparkSession.builder \
.remote(SPARK_CONNECT_URL) \
.getOrCreate()Exemplo: Data App com Streamlit
app.py
import streamlit as st
import pandas as pd
from spark_utils import get_spark_session
from pyspark.sql.functions import col, sum as spark_sum, avg, count
st.set_page_config(page_title="Analytics com Spark", layout="wide")
st.title("Dashboard de Analytics")
st.markdown("Processamento distribuído com Spark Connect")
@st.cache_data(ttl=300)
def processar_dados_spark(query_params: dict) -> pd.DataFrame:
"""
Processa dados usando Spark Connect.
Resultados são cacheados por 5 minutos.
"""
with get_spark_session() as spark:
# Simular dados (substitua pela sua fonte real)
dados = []
for i in range(100000):
dados.append((
f"Produto_{i % 100}",
f"Categoria_{i % 10}",
(i % 1000) * 10.5,
f"2024-{(i % 12) + 1:02d}-01"
))
df = spark.createDataFrame(
dados,
["produto", "categoria", "valor", "data"]
)
# Aplicar filtros
if query_params.get("categoria"):
df = df.filter(col("categoria") == query_params["categoria"])
if query_params.get("valor_minimo"):
df = df.filter(col("valor") >= query_params["valor_minimo"])
# Agregar
resultado = df.groupBy("categoria").agg(
spark_sum("valor").alias("total"),
avg("valor").alias("media"),
count("*").alias("quantidade")
).orderBy(col("total").desc())
return resultado.toPandas()
# Interface do usuário
st.sidebar.header("Filtros")
categoria = st.sidebar.selectbox(
"Categoria",
options=["Todas"] + [f"Categoria_{i}" for i in range(10)]
)
valor_minimo = st.sidebar.slider(
"Valor Mínimo",
min_value=0,
max_value=10000,
value=0
)
# Processar dados
params = {
"categoria": None if categoria == "Todas" else categoria,
"valor_minimo": valor_minimo
}
if st.button("Processar Dados"):
with st.spinner("Processando com Spark..."):
df_resultado = processar_dados_spark(params)
# Exibir resultados
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total Geral", f"R$ {df_resultado['total'].sum():,.2f}")
with col2:
st.metric("Média Geral", f"R$ {df_resultado['media'].mean():,.2f}")
with col3:
st.metric("Transações", f"{df_resultado['quantidade'].sum():,}")
st.subheader("Dados por Categoria")
st.dataframe(df_resultado, use_container_width=True)
st.bar_chart(df_resultado.set_index("categoria")["total"])Exemplo: API com FastAPI
main.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, List
import pandas as pd
from spark_utils import get_spark_session
from pyspark.sql.functions import col, sum as spark_sum
from pyspark import Row
app = FastAPI(title="Spark Processing API")
class ProcessingRequest(BaseModel):
categoria: Optional[str] = None
data_inicio: Optional[str] = None
data_fim: Optional[str] = None
agrupar_por: List[str] = ["categoria"]
class ProcessingResult(BaseModel):
status: str
registros_processados: int
resultado: List[dict]
@app.post("/processar", response_model=ProcessingResult)
async def processar_dados(request: ProcessingRequest):
"""
Endpoint para processamento de dados com Spark Connect.
"""
try:
with get_spark_session() as spark:
# Carregar dados (substitua pela sua fonte)
# Criar DataFrame de exemplo
data = [
Row(categoria="eletrônicos", data="2024-01-15", valor=1500.00),
Row(categoria="roupas", data="2024-01-20", valor=250.00),
Row(categoria="eletrônicos", data="2024-02-10", valor=3200.00),
Row(categoria="alimentos", data="2024-02-15", valor=89.90),
Row(categoria="roupas", data="2024-03-01", valor=450.00),
]
df = spark.createDataFrame(data)
# Aplicar filtros
if request.categoria:
df = df.filter(col("categoria") == request.categoria)
if request.data_inicio:
df = df.filter(col("data") >= request.data_inicio)
if request.data_fim:
df = df.filter(col("data") <= request.data_fim)
# Contar registros antes da agregação
total_registros = df.count()
# Agregar
resultado = df.groupBy(*request.agrupar_por).agg(
spark_sum("valor").alias("total")
)
# Converter para lista de dicts
resultado_list = resultado.toPandas().to_dict(orient="records")
return ProcessingResult(
status="success",
registros_processados=total_registros,
resultado=resultado_list
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
"""Verifica se o serviço está funcionando."""
return {"status": "healthy"}
@app.get("/spark/status")
async def spark_status():
"""Verifica conexão com Spark Connect."""
try:
with get_spark_session() as spark:
version = spark.version
return {
"status": "connected",
"spark_version": version
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}Boas Práticas para Data Apps
1. Use Caching
@st.cache_data(ttl=300) # Cache por 5 minutos
def processar_com_spark(params):
with get_spark_session() as spark:
# Processamento...
return resultado.toPandas()2. Tratamento de Erros
try:
with get_spark_session() as spark:
resultado = processar(spark)
except Exception as e:
st.error(f"Erro no processamento: {e}")
logger.exception("Erro Spark")3. Feedback ao Usuário
with st.spinner("Processando dados..."):
progress = st.progress(0)
# ... processamento
progress.progress(100)4. Limite Resultados para UI
# Sempre limite dados retornados para a interface
resultado = spark_df.limit(10000).toPandas()Updated 2 days ago
