Spark Connect em Data Apps

Overview

Este guia demonstra como utilizar o Spark Connect em Data Apps Python (Streamlit, Flask, FastAPI) na plataforma Dadosfera para processamento distribuído de dados.

Configuração do Projeto

Dependências (requirements.txt)

"pyspark>4.0.0"
"numpy<2"
pandas>=2.0.0
pyarrow>=14.0.0
grpcio>=1.60.0
grpcio-status>=1.60.0
protobuf>=4.25.0
streamlit>=1.30.0
snowflake-connector-python>=3.6.0

Módulo de Conexão Reutilizável

spark_utils.py

Crie um módulo utilitário para gerenciar conexões:

"""
Utilitários para conexão com Spark Connect
"""
from contextlib import contextmanager
from pyspark.sql import SparkSession
import logging

logger = logging.getLogger(__name__)

SPARK_CONNECT_URL = "sc://spark-connect.spark-jobs.svc.cluster.local:15002"


@contextmanager
def get_spark_session():
    """
    Context manager para sessão Spark Connect.

    Uso:
        with get_spark_session() as spark:
            df = spark.range(100)
            df.show()
    """
    spark = None
    try:
        logger.info("Conectando ao Spark Connect...")
        spark = SparkSession.builder \
            .remote(SPARK_CONNECT_URL) \
            .getOrCreate()
        logger.info(f"Conectado! Spark version: {spark.version}")
        yield spark
    except Exception as e:
        logger.error(f"Erro na conexão Spark: {e}")
        raise
    finally:
        if spark:
            spark.stop()
            logger.info("Sessão Spark encerrada")


def create_spark_session() -> SparkSession:
    """
    Cria sessão Spark Connect.
    Lembre-se de chamar spark.stop() quando terminar.
    """
    return SparkSession.builder \
        .remote(SPARK_CONNECT_URL) \
        .getOrCreate()

Exemplo: Data App com Streamlit

app.py

import streamlit as st
import pandas as pd
from spark_utils import get_spark_session
from pyspark.sql.functions import col, sum as spark_sum, avg, count

st.set_page_config(page_title="Analytics com Spark", layout="wide")

st.title("Dashboard de Analytics")
st.markdown("Processamento distribuído com Spark Connect")


@st.cache_data(ttl=300)
def processar_dados_spark(query_params: dict) -> pd.DataFrame:
    """
    Processa dados usando Spark Connect.
    Resultados são cacheados por 5 minutos.
    """
    with get_spark_session() as spark:
        # Simular dados (substitua pela sua fonte real)
        dados = []
        for i in range(100000):
            dados.append((
                f"Produto_{i % 100}",
                f"Categoria_{i % 10}",
                (i % 1000) * 10.5,
                f"2024-{(i % 12) + 1:02d}-01"
            ))

        df = spark.createDataFrame(
            dados,
            ["produto", "categoria", "valor", "data"]
        )

        # Aplicar filtros
        if query_params.get("categoria"):
            df = df.filter(col("categoria") == query_params["categoria"])

        if query_params.get("valor_minimo"):
            df = df.filter(col("valor") >= query_params["valor_minimo"])

        # Agregar
        resultado = df.groupBy("categoria").agg(
            spark_sum("valor").alias("total"),
            avg("valor").alias("media"),
            count("*").alias("quantidade")
        ).orderBy(col("total").desc())

        return resultado.toPandas()


# Interface do usuário
st.sidebar.header("Filtros")

categoria = st.sidebar.selectbox(
    "Categoria",
    options=["Todas"] + [f"Categoria_{i}" for i in range(10)]
)

valor_minimo = st.sidebar.slider(
    "Valor Mínimo",
    min_value=0,
    max_value=10000,
    value=0
)

# Processar dados
params = {
    "categoria": None if categoria == "Todas" else categoria,
    "valor_minimo": valor_minimo
}

if st.button("Processar Dados"):
    with st.spinner("Processando com Spark..."):
        df_resultado = processar_dados_spark(params)

        # Exibir resultados
        col1, col2, col3 = st.columns(3)

        with col1:
            st.metric("Total Geral", f"R$ {df_resultado['total'].sum():,.2f}")

        with col2:
            st.metric("Média Geral", f"R$ {df_resultado['media'].mean():,.2f}")

        with col3:
            st.metric("Transações", f"{df_resultado['quantidade'].sum():,}")

        st.subheader("Dados por Categoria")
        st.dataframe(df_resultado, use_container_width=True)

        st.bar_chart(df_resultado.set_index("categoria")["total"])

Exemplo: API com FastAPI

main.py

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, List
import pandas as pd
from spark_utils import get_spark_session
from pyspark.sql.functions import col, sum as spark_sum
from pyspark import Row

app = FastAPI(title="Spark Processing API")


class ProcessingRequest(BaseModel):
    categoria: Optional[str] = None
    data_inicio: Optional[str] = None
    data_fim: Optional[str] = None
    agrupar_por: List[str] = ["categoria"]


class ProcessingResult(BaseModel):
    status: str
    registros_processados: int
    resultado: List[dict]


@app.post("/processar", response_model=ProcessingResult)
async def processar_dados(request: ProcessingRequest):
    """
    Endpoint para processamento de dados com Spark Connect.
    """
    try:
        with get_spark_session() as spark:
            # Carregar dados (substitua pela sua fonte)
	    # Criar DataFrame de exemplo
            data = [
                Row(categoria="eletrônicos", data="2024-01-15", valor=1500.00),
                Row(categoria="roupas", data="2024-01-20", valor=250.00),
                Row(categoria="eletrônicos", data="2024-02-10", valor=3200.00),
                Row(categoria="alimentos", data="2024-02-15", valor=89.90),
                Row(categoria="roupas", data="2024-03-01", valor=450.00),
            ]
            df = spark.createDataFrame(data)
            # Aplicar filtros
            if request.categoria:
                df = df.filter(col("categoria") == request.categoria)

            if request.data_inicio:
                df = df.filter(col("data") >= request.data_inicio)

            if request.data_fim:
                df = df.filter(col("data") <= request.data_fim)

            # Contar registros antes da agregação
            total_registros = df.count()

            # Agregar
            resultado = df.groupBy(*request.agrupar_por).agg(
                spark_sum("valor").alias("total")
            )

            # Converter para lista de dicts
            resultado_list = resultado.toPandas().to_dict(orient="records")

            return ProcessingResult(
                status="success",
                registros_processados=total_registros,
                resultado=resultado_list
            )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/health")
async def health_check():
    """Verifica se o serviço está funcionando."""
    return {"status": "healthy"}


@app.get("/spark/status")
async def spark_status():
    """Verifica conexão com Spark Connect."""
    try:
        with get_spark_session() as spark:
            version = spark.version
            return {
                "status": "connected",
                "spark_version": version
            }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }

Boas Práticas para Data Apps

1. Use Caching

@st.cache_data(ttl=300)  # Cache por 5 minutos
def processar_com_spark(params):
    with get_spark_session() as spark:
        # Processamento...
        return resultado.toPandas()

2. Tratamento de Erros

try:
    with get_spark_session() as spark:
        resultado = processar(spark)
except Exception as e:
    st.error(f"Erro no processamento: {e}")
    logger.exception("Erro Spark")

3. Feedback ao Usuário

with st.spinner("Processando dados..."):
    progress = st.progress(0)
    # ... processamento
    progress.progress(100)

4. Limite Resultados para UI

# Sempre limite dados retornados para a interface
resultado = spark_df.limit(10000).toPandas()