# Spark Connect em Data Apps ## Overview Este guia demonstra como utilizar o Spark Connect em Data Apps Python (Streamlit, Flask, FastAPI) na plataforma Dadosfera para processamento distribuído de dados. ## Configuração do Projeto ### Dependências (requirements.txt) ```text "pyspark>4.0.0" "numpy<2" pandas>=2.0.0 pyarrow>=14.0.0 grpcio>=1.60.0 grpcio-status>=1.60.0 protobuf>=4.25.0 streamlit>=1.30.0 snowflake-connector-python>=3.6.0 ``` ## Módulo de Conexão Reutilizável ### spark\_utils.py Crie um módulo utilitário para gerenciar conexões: ```python """ Utilitários para conexão com Spark Connect """ from contextlib import contextmanager from pyspark.sql import SparkSession import logging logger = logging.getLogger(__name__) SPARK_CONNECT_URL = "sc://spark-connect.spark-jobs.svc.cluster.local:15002" @contextmanager def get_spark_session(): """ Context manager para sessão Spark Connect. Uso: with get_spark_session() as spark: df = spark.range(100) df.show() """ spark = None try: logger.info("Conectando ao Spark Connect...") spark = SparkSession.builder \ .remote(SPARK_CONNECT_URL) \ .getOrCreate() logger.info(f"Conectado! Spark version: {spark.version}") yield spark except Exception as e: logger.error(f"Erro na conexão Spark: {e}") raise finally: if spark: spark.stop() logger.info("Sessão Spark encerrada") def create_spark_session() -> SparkSession: """ Cria sessão Spark Connect. Lembre-se de chamar spark.stop() quando terminar. """ return SparkSession.builder \ .remote(SPARK_CONNECT_URL) \ .getOrCreate() ``` ## Exemplo: Data App com Streamlit ### app.py ```python import streamlit as st import pandas as pd from spark_utils import get_spark_session from pyspark.sql.functions import col, sum as spark_sum, avg, count st.set_page_config(page_title="Analytics com Spark", layout="wide") st.title("Dashboard de Analytics") st.markdown("Processamento distribuído com Spark Connect") @st.cache_data(ttl=300) def processar_dados_spark(query_params: dict) -> pd.DataFrame: """ Processa dados usando Spark Connect. Resultados são cacheados por 5 minutos. """ with get_spark_session() as spark: # Simular dados (substitua pela sua fonte real) dados = [] for i in range(100000): dados.append(( f"Produto_{i % 100}", f"Categoria_{i % 10}", (i % 1000) * 10.5, f"2024-{(i % 12) + 1:02d}-01" )) df = spark.createDataFrame( dados, ["produto", "categoria", "valor", "data"] ) # Aplicar filtros if query_params.get("categoria"): df = df.filter(col("categoria") == query_params["categoria"]) if query_params.get("valor_minimo"): df = df.filter(col("valor") >= query_params["valor_minimo"]) # Agregar resultado = df.groupBy("categoria").agg( spark_sum("valor").alias("total"), avg("valor").alias("media"), count("*").alias("quantidade") ).orderBy(col("total").desc()) return resultado.toPandas() # Interface do usuário st.sidebar.header("Filtros") categoria = st.sidebar.selectbox( "Categoria", options=["Todas"] + [f"Categoria_{i}" for i in range(10)] ) valor_minimo = st.sidebar.slider( "Valor Mínimo", min_value=0, max_value=10000, value=0 ) # Processar dados params = { "categoria": None if categoria == "Todas" else categoria, "valor_minimo": valor_minimo } if st.button("Processar Dados"): with st.spinner("Processando com Spark..."): df_resultado = processar_dados_spark(params) # Exibir resultados col1, col2, col3 = st.columns(3) with col1: st.metric("Total Geral", f"R$ {df_resultado['total'].sum():,.2f}") with col2: st.metric("Média Geral", f"R$ {df_resultado['media'].mean():,.2f}") with col3: st.metric("Transações", f"{df_resultado['quantidade'].sum():,}") st.subheader("Dados por Categoria") st.dataframe(df_resultado, use_container_width=True) st.bar_chart(df_resultado.set_index("categoria")["total"]) ``` ## Exemplo: API com FastAPI ### main.py ```python from fastapi import FastAPI, HTTPException, BackgroundTasks from pydantic import BaseModel from typing import Optional, List import pandas as pd from spark_utils import get_spark_session from pyspark.sql.functions import col, sum as spark_sum from pyspark import Row app = FastAPI(title="Spark Processing API") class ProcessingRequest(BaseModel): categoria: Optional[str] = None data_inicio: Optional[str] = None data_fim: Optional[str] = None agrupar_por: List[str] = ["categoria"] class ProcessingResult(BaseModel): status: str registros_processados: int resultado: List[dict] @app.post("/processar", response_model=ProcessingResult) async def processar_dados(request: ProcessingRequest): """ Endpoint para processamento de dados com Spark Connect. """ try: with get_spark_session() as spark: # Carregar dados (substitua pela sua fonte) # Criar DataFrame de exemplo data = [ Row(categoria="eletrônicos", data="2024-01-15", valor=1500.00), Row(categoria="roupas", data="2024-01-20", valor=250.00), Row(categoria="eletrônicos", data="2024-02-10", valor=3200.00), Row(categoria="alimentos", data="2024-02-15", valor=89.90), Row(categoria="roupas", data="2024-03-01", valor=450.00), ] df = spark.createDataFrame(data) # Aplicar filtros if request.categoria: df = df.filter(col("categoria") == request.categoria) if request.data_inicio: df = df.filter(col("data") >= request.data_inicio) if request.data_fim: df = df.filter(col("data") <= request.data_fim) # Contar registros antes da agregação total_registros = df.count() # Agregar resultado = df.groupBy(*request.agrupar_por).agg( spark_sum("valor").alias("total") ) # Converter para lista de dicts resultado_list = resultado.toPandas().to_dict(orient="records") return ProcessingResult( status="success", registros_processados=total_registros, resultado=resultado_list ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/health") async def health_check(): """Verifica se o serviço está funcionando.""" return {"status": "healthy"} @app.get("/spark/status") async def spark_status(): """Verifica conexão com Spark Connect.""" try: with get_spark_session() as spark: version = spark.version return { "status": "connected", "spark_version": version } except Exception as e: return { "status": "error", "error": str(e) } ``` ## Boas Práticas para Data Apps ### 1. Use Caching ```python @st.cache_data(ttl=300) # Cache por 5 minutos def processar_com_spark(params): with get_spark_session() as spark: # Processamento... return resultado.toPandas() ``` ### 2. Tratamento de Erros ```python try: with get_spark_session() as spark: resultado = processar(spark) except Exception as e: st.error(f"Erro no processamento: {e}") logger.exception("Erro Spark") ``` ### 3. Feedback ao Usuário ```python with st.spinner("Processando dados..."): progress = st.progress(0) # ... processamento progress.progress(100) ``` ### 4. Limite Resultados para UI ```python # Sempre limite dados retornados para a interface resultado = spark_df.limit(10000).toPandas() ```