Graph Neural Networks and Distributed Data Pipelines

Posted by Anonymous and classified in Computers

Written on in English with a size of 25.85 KB

Graph Neural Networks (GNN)

GNN Challenges and Representations

Working with graphs introduces unique computational challenges:

  • Graph size is dynamic.
  • Each node can have a variable number of edges.
  • Standard methods used for images and texts are not suitable for graphs.
  • Adjacency matrix representation can be very inefficient.
  • There can be multiple adjacency matrices to represent the same graph.
  • Standard convolution applied to images does not work (though adaptations have been tried).

Potential tasks: Link prediction, node classification, community detection, and ranking.

Representation: Adjacency matrix, edge list, and adjacency list.

Spectral GNN Architectures

Como funciona: Utiliza a Transformada Discreta de Fourier (DFT) sobre a matriz de adjacência do grafo. A convolução é feita no domínio espectral.

Objetivo: Extrair padrões estruturais e frequências do grafo.

Exemplos: GCN, Chebyshev GCN, e GAT.

Principais conceitos:

  • Graph Spectra: Frequências próprias do grafo.
  • Graph Embeddings: Representações vetoriais extraídas da espectralização.
  • Graph Filtering: Operações de filtragem em diferentes frequências.
  • Graph Compression: Possível redução de dimensão de dados do grafo.

Vantagens: Captura relações globais e padrões estruturais.

Desvantagens:

  • Não permite atualização da estrutura do grafo durante o treino.
  • Computacionalmente mais intensivo que métodos espaciais.

Spectral CNN (SCNN)

  • Usa a frequência (domínio espectral) do grafo.
  • Baseia-se em eigenvalues/eigenvectors da matriz Laplaciana.
  • Mais teórico e pesado computacionalmente.

Graph Convolutional Networks (GCN)

  • Versão simplificada e prática das CNNs em grafos.
  • Trabalha diretamente no grafo (vizinhança dos nós).
  • Usa agregação de vizinhos (tipo “média” ou soma).
  • Muito mais eficiente e popular.

Spatial GNN Architectures

Como funciona: Aplica convolução diretamente sobre nós e vizinhos do grafo. Baseia-se em Message Passing Neural Networks (MPNN).

Processo geral:

  1. Initialize: Inicializa estados de nós e arestas.
  2. Message Pass: Propaga informação entre nós vizinhos.
  3. Node Update: Atualiza estados de nós com base nas mensagens recebidas.
  4. Readout: Combina informações de todos os nós para representação global.

Readout Functions: SUM, MEAN, POOLING, ou Graph-level MLP.

Exemplos: GraphSAGE, Graph Isomorphism Network (GIN), e GAT (Graph Attention Network) — que usa atenção para ponderar vizinhos, atualizando nós de forma adaptativa e baseada no contexto.

Vantagens:

  • Permite atualização da estrutura do grafo durante o treino.
  • Eficiência computacional melhor que métodos espectrais.

MPNN vs. GAT

  • MPNN: Agregação fixa de mensagens de vizinhos.
  • GAT: Usa mecanismo de atenção para adaptar o peso de cada vizinho, tornando-se mais flexível e informativo.

Sampling and Scalability

Problema que resolve: Escalabilidade em grafos muito grandes; evita processar todos os nós.

Como funciona: Seleciona uma subamostra de vizinhos/nós para cada atualização, mantendo a representatividade do grafo.

Vantagens:

  • Reduz custo computacional e memória.
  • Mantém boa qualidade das representações para grafos grandes.

Exemplos:

  • GraphSAGE: Estende vizinhança k para cada camada, aprendendo features de k nós de distância.
  • DeepWalk: Gera representações de nós não supervisionadas, inspirado no Word2Vec.

Processo DeepWalk:

  1. Random Walks: Seleciona nodes aleatoriamente e caminha aleatoriamente pelos vizinhos, repetindo até atingir o comprimento da sequência.
  2. Skip-gram Model: Modelo usado para aprender palavras. Objetivo: prever o contexto a partir de uma palavra. Aprende embeddings de nós baseado nas sequências geradas.

Word2Vec: Modelo para criar representações vetoriais de palavras. Transforma palavras em números (vetores) com Skip-gram (contexto) ou CBOW (palavra central).

Parallelization Opportunities

Opportunities for parallelization include:

  • Reading and writing data
  • Statistical operations and data transformation
  • Data aggregation and feature construction
  • SQL-like operations (depending on the engine)
  • Cross-validation and hyperparameter tuning
  • Batch training
  • Representation for the coverage lists
  • Parallel search and parallel coverage
  • Tree compression

Resolução do Teste

1. Apache Beam

  • (a) Windowing: Divide o fluxo contínuo de dados em janelas temporais (fixas, deslizantes ou baseadas em sessões) para permitir agregações e análises parciais.
  • (b) Watermark: Indica o tempo de evento processado até agora, permitindo que o sistema saiba quando uma janela pode ser finalizada mesmo com atrasos nos dados.
  • (c) PTransform: Representa qualquer transformação aplicada a uma coleção de dados (PCollection), podendo ser simples (map, filter) ou compostas (paralelas ou encadeadas).

2. Execução após falha do Bundle A

Se o Bundle A falhar, apenas esse bundle é reprocessado pelo worker apropriado; outros bundles e workers continuam executando normalmente, garantindo resiliência no pipeline.

3. Código BeamX

  • (a) Input: Sequência de palavras ou strings vindas de arquivos ou coleções de dados.
  • (b) Output: Três coleções separadas: below_cutoff_strings (palavras curtas), above_cutoff_lengths (comprimentos das palavras maiores) e marked_strings (palavras que começam com o marcador).
  • (c) Passos da pipeline: Cada palavra é processada: verifica-se se está abaixo do cutoff; gera-se a saída principal ou tagueada; palavras com marcador específico recebem saída adicional.

4. Dask – Tipos de Dados

  • (a) Bags: Para coleções desestruturadas ou semi-estruturadas, como listas de JSONs ou logs.
  • (b) Arrays: Para grandes matrizes numéricas, similares ao NumPy, com paralelismo automático.
  • (c) Dataframes: Para dados tabulares estruturados, compatíveis com pandas, permitindo consultas e agregações distribuídas.

5. Valor de L

L é uma lista de objetos Delayed, cada um representando o processamento atrasado (delayed(process)(data)) de cada arquivo. Ao passar para summarize, o pipeline sabe que deve esperar todos os resultados.

6. Quando não usar Dask Dataframes

Não é adequado para dados heterogêneos (colunas com tipos mistos), grandes strings, ou operações complexas que não são suportadas pelo pandas, pois pode gerar erros ou baixo desempenho.

7. Modin vs. Joblib

  • Modin: Paraleliza operações de DataFrame pandas, substituindo pandas sem alterar o código. Ideal para grandes tabelas.
  • Joblib: Paraleliza a execução de funções Python, útil para loops, pipelines de processamento ou treinamento de modelos.
  • Uso conjunto: Modin para manipulação de dados e Joblib para paralelizar o processamento/treinamento subsequente.

8. PySpark vs. Apache Beam

  • Vantagem PySpark: Otimizado para batch, alto desempenho em grandes volumes de dados.
  • Desvantagem: Menos flexível para streaming e manipulação contínua de dados comparado com o Beam.

9. Representação em Graph Neural Networks (GNNs)

  • (a) Representação dos dados: Spectral — Representação via Laplaciano do grafo, global. Spatial — Baseada em vizinhos diretos, local. Sampling — Amostragem de vizinhos para reduzir custo computacional.
  • (b) Significado dos dados: Spectral captura relações globais, Spatial foca na vizinhança imediata, e Sampling reduz redundância e memória.
  • (c) Impacto: Spectral exige mais parâmetros e tempo de treino; Spatial é moderado; Sampling reduz memória e acelera o treinamento.

10. MPNN vs. GAT

  • MPNN (Message Passing Neural Network): Propaga mensagens uniformemente entre vizinhos, sem diferenciação de importância.
  • GAT (Graph Attention Network): Usa mecanismo de atenção para ponderar vizinhos, permitindo que nós mais relevantes tenham maior influência no cálculo de embeddings.

11. PyCUDA – Alteração de block=(1,R,C)

Alterar para (1,R,C) cria um bloco de threads com apenas 1 thread na dimensão x; isso reduz o paralelismo, aumentando o tempo de execução e podendo tornar a memória compartilhada menos eficiente.

Apache Beam Data Pipelines

PCollections

Represent data being processed in the pipeline.

  • Bounded: Finite datasets, typical in batch use cases.
  • Unbounded: Infinite datasets, typical in streaming use cases.

PTransforms

Represent operations executed on PCollections.

  • Read: Parallel connectors to external systems.
  • ParDo: Per-element processing.
  • GroupByKey: Aggregating elements by key.
  • Flatten: Union of multiple PCollections.
  • Window: Sets the windowing strategy for a PCollection.

Timestamps and Watermarks

  • Timestamps: Every element in a PCollection has an associated timestamp. If elements denote events, timestamps are critical. If timestamps are not important, they are set to “negative infinity”.
  • Watermarks: Estimates how complete a PCollection is. The contents of a PCollection are complete when a watermark advances to “infinity”, indicating that an unbounded PCollection has ended.

Windows and Coders

  • Windows: Define the size (number of elements) processed in the pipeline at once. When elements are read from external sources, they arrive in the global window. When written externally, they are placed back into the global window; any writing transform that does not obey this risks data loss. A window has a maximum timestamp, and data from expired windows may be discarded.
  • Coder: Specifies the binary format of the elements of a PCollection (e.g., raw bytes or specific encoding systems like graphical accents).

User-Defined Functions (UDF)

A Beam pipeline may contain UDFs executed by the runner:

  • DoFn: Per-element processing function (used in ParDo).
  • WindowFn: Places elements in windows and merges windows (used in Window and GroupByKey).
  • ViewFn: Adapts a PCollection to a particular interface.
  • WindowMappingFn: Maps one element's window to another and specifies bounds on how far in the past the result window will be.
  • CombineFn: Associative and commutative aggregation (used in Combine and state).
  • Coder: Encodes user data.

Runner and Performance Limiters

The Runner is the software that executes a Beam pipeline, often using customized operators for your data processing engine. A runner has a single asynchronous method: run(pipeline), which returns a PipelineResult job descriptor to check status, cancel, or wait for termination.

Performance Limiters:

  • Serialization: Translating a data structure to be stored or transmitted is highly expensive.
  • Avoiding serialization may require re-processing elements after failures or may limit the distribution of output to other machines.

Pipeline Design Considerations

  • Where is your input data stored? Defines the type of Read transform to use.
  • What does your data look like? Defines which transform to apply for efficient handling.
  • What do you want to do with your data? Defines the transformations and functions to apply.
  • What does your output data look like and where should it go? Defines the Write transform to use.

Modifying a Pipeline for Streaming

To switch to stream processing, you must:

  • Use an I/O connector that supports reading from an unbounded source (note: ReadFromText does not support unbounded sources).
  • Use an I/O connector that supports writing to an unbounded source.
  • Choose an appropriate windowing strategy.

Running a Streaming Pipeline:

  1. Create input and output topics (channels) in Google Cloud Pub/Sub.
  2. Authenticate to GCP.
  3. Create a subscription: gcloud pubsub subscriptions create my-sub --topic=my-topics
  4. Send a message: gcloud pubsub topics publish my-topic --message="hello"
  5. Receive the message: gcloud pubsub subscriptions pull my-sub --auto-ack

Deep Learning Frameworks Comparison

Popular frameworks are used in research, production, and deployment on GPU/TPU. All support GPUs and Tensor Cores, though community, documentation, and tooling differ.

  • TensorFlow: Developed by Google Brain. Offers both high-level and low-level APIs. Ideal for production-grade deployment. Good GPU and TPU support. Uses a static computation graph (eager mode also available). Best for: Production environments.
  • Keras: High-level API built on top of TensorFlow (supports other backends). User-friendly and fast prototyping. Abstracts complexity of low-level operations. Best for: Easy prototyping and beginners.
  • PyTorch: Developed by Facebook AI Research (FAIR). Dynamic computation graph (eager execution by default). More pythonic and intuitive. Strong research community adoption. Best for: Research and experimentation.
FrameworkLevelEase of UseDeploymentDebuggingGPU/Tensor Core
TensorFlowLow/HighMediumStrongHarderExcellent
KerasHighEasyStrongEasierExcellent
PyTorchMid/HighMediumMediumEasiestExcellent

Common DNN Operations

  • Elementwise: Layers perform mathematical operations on each element independently of all other elements in the tensor (e.g., ReLU layer returning max(0, x)).
  • Reduction: Values computed over a range of input tensor values (e.g., pooling layers computing values over neighborhoods).
  • Dot-product: Matrix multiplications.

Performance Optimization Tips

  • Use shared memory to reduce global memory traffic.
  • Align memory access for better coalescing.
  • Leverage CUDA graphs and streams for overlapping compute and memory.
  • Profile using NVIDIA Nsight, cuda-gdb, cuda-memcheck, Allinea DDT, or Rogue Wave TotalView.

Performance Limiters:

  • Latency: Occurs if there is insufficient parallelism.
  • Math: Occurs if there is sufficient parallelism and the algorithm's arithmetic intensity is higher than the GPU's ops:byte ratio.
  • Memory: Occurs if there is sufficient parallelism and the algorithm's arithmetic intensity is lower than the GPU's ops:byte ratio.

How to compute arithmetic intensity:

  • Unit: FLOPS/byte.
  • Calculation: Divide the total number of FLOPS by the total number of bytes transferred.
  • Higher arithmetic intensity: Indicates the algorithm is compute-bound (large amount of computation relative to data transfer).
  • Lower arithmetic intensity: Suggests the algorithm is memory-bound (data transfer is the dominant bottleneck).
  • Factors affecting intensity: Algorithm complexity, dataset size, and hardware architecture.

Dask for Parallel Computing

Dask is a flexible library for parallel computing in Python, implemented on top of multiprocessing and multithreading. It is composed of two parts:

  1. Dynamic task scheduling optimized for interactive computational workloads.
  2. Big data collections: parallel arrays, dataframes, and lists (extending common interfaces like NumPy, pandas, or iterators).

Advantages:

  • Familiar: Provides parallelized NumPy array and pandas DataFrame objects.
  • Flexible: Provides a task scheduling interface.
  • Native: Enables distributed computing in pure Python with access to the PyData stack.
  • Fast: Low overhead, low latency, and minimal serialization.
  • Scales up: Runs on clusters with thousands of cores.
  • Scales down: Runs trivially on a laptop using a single process.
  • Responsive: Designed for interactive computing, providing rapid feedback.

Dask vs. PySpark

Dask is smaller and lighter weight than Spark. It has fewer features and is used in conjunction with other libraries in the numeric Python ecosystem (like pandas or scikit-learn). While Spark is written in Scala and supports various languages (running PySpark on the JVM), Dask is written in pure Python and only supports Python.

Dask DataFrame Usage

Use Dask DataFrames when pandas fails due to data size or computation speed:

  • Manipulating large datasets that do not fit in memory.
  • Accelerating long computations using many cores.
  • Distributed computing on large datasets with standard pandas operations like groupby, join, and time-series computations.

When Dask DataFrame may not be the best choice:

  • If the dataset fits into RAM.
  • If the dataset does not fit into the pandas tabular model (use dask.bag or dask.array instead).
  • If you need functions that are not implemented in Dask DataFrame.
  • If you need database-optimized operations.

Dask Limitations: Setting a new index from an unsorted column is expensive; operations like groupby-apply and join on unsorted columns require setting the index; operations that are slow in pandas (like iterating row-by-row) remain slow in Dask.

Modin and Joblib

Modin

An alternative to handle 100GB or 1TB datasets not supported by pandas. It can run on backends like Ray, Dask, or MPI.

Joblib

Main objectives: Avoid computing the same thing twice, persist to disk transparently, fast disk-caching, embarrassingly parallel execution, and fast compressed persistence.

Parallel Computing Backends: “loky”, “multiprocessing”, “threading”, and “dask”.

Polars for High-Performance Data

  1. What is Polars good for? Polars é ótimo para manipulação de grandes datasets tabulares, especialmente operações de agregação, filtragem e transformação de dados em memória com alto desempenho e baixo uso de RAM.
  2. Difference between Polars and other modules:
    • Polars: DataFrame in-memory, rápido, single-machine, paralelizado por CPU, com API parecida com pandas.
    • Beam / Spark: Orientados a pipelines distribuídos e processamento em cluster; podem lidar com streaming e batch.
    • Dask / Modin: Distribuem pandas-like DataFrames em múltiplos cores ou máquinas; Polars é mais eficiente em single-machine.
    • Joblib / multiprocessing: Paralelizam funções ou loops, não abstraem DataFrames diretamente.
  3. How is Polars implemented internally?
    • Escrita em Rust para alta performance e segurança de memória.
    • Usa Arrow memory layout para vetorização e execução paralela eficiente.
    • Operações são lazy (podem ser encadeadas antes de executar) ou eager.
    • Paraleliza internamente operações por colunas e chunks de dados.
  4. Situations to choose Polars to deploy an application:
    • Aplicações que precisam de processamento rápido de tabelas grandes em memória.
    • Cenários onde baixa latência é necessária, como dashboards ou pipelines de ETL locais.
    • Quando a infraestrutura não é distribuída, mas se quer aproveitar múltiplos núcleos.
  5. Best parallel/distributed environment for Polars: Funciona melhor em multi-core single-machine, usando paralelismo por threads. Pode ser integrado em pipelines distribuídas (Dask, Ray), mas seu foco é execução rápida local, não clusters grandes nativamente.

GPU Architecture and CUDA

Integrated vs. Dedicated GPUs

  • Integrated: Power is shared between GPU and CPU. The graphics card is built directly into the computer's processor. Suitable for social media, web browsing, and light editing (e.g., AMD Ryzen).
  • Dedicated: Completely separate processor from the main CPU, featuring its own dedicated memory and cooling system. Essential for neural networks and gaming (e.g., NVIDIA GTX, RTX).

GPU Evolution: Fermi/Kepler, Pascal, Volta, Ampere, Hopper, Hopper Refresh, Blackwell.

Grids, Blocks, and Threads

  • Grid organized as a 2D array of blocks.
  • Block organized as a 3D array of threads.
  • Both use the dim3 type with 3 unsigned integer fields. Unused fields are initialized to 1 and ignored.

Key Concepts

  • Data Partitioning: Divisão de um conjunto de dados em partes menores (partições) para permitir processamento paralelo, otimizar leitura/escrita e melhorar a escalabilidade em sistemas distribuídos.
  • Auto Scaling: Capacidade de ajustar dinamicamente recursos de computação (CPU, memória, nós) conforme a carga de trabalho aumenta ou diminui, garantindo desempenho e eficiência de custo.

Python Alternatives for GPUs

  • NumPy: CuPy or JAX
  • Pandas: RAPIDS cuDF
  • Scikit-learn: RAPIDS cuML
  • DNN: cuDNN

CPU vs. GPU

  • CPU: Projetada para tarefas gerais, com poucos núcleos poderosos, ótima para lógica sequencial e controle.
  • GPU: Projetada para processamento massivamente paralelo, com centenas ou milhares de núcleos simples, ideal para operações vetorizadas e aprendizado profundo.

CUDA and Python Integration

CUDA: Plataforma e API da NVIDIA para programar GPUs, permitindo que o código seja executado em paralelo em núcleos da GPU.

CUDA built-in variables: gridDim, blockDim, blockIdx, threadIdx, warpSize.

Python alternatives for CUDA: PyCUDA, PyOpenCL, or Numba.

  • PyCUDA: Biblioteca Python que fornece interface direta com CUDA, permitindo escrever kernels CUDA e gerenciar a memória da GPU diretamente a partir do Python.
  • Numba: Biblioteca Python que compila funções Python para código nativo, acelerando loops numéricos e operações vetorizadas; suporta CPU e GPU (via CUDA).

Tensor Cores and Mixed Precision

Definition: Specialized processing units developed by NVIDIA, designed for accelerating matrix operations, particularly those involved in deep learning workloads.

  • Operate on small matrices.
  • Highly optimized for mixed precision.
  • Enable significant speedups for AI training and inference.

Architecture: Integrated into Streaming Multiprocessors (SMs). They operate independently of CUDA cores and perform matrix multiply-accumulate (MMA) operations in a single clock cycle.

Execution: Operates on fragments from threadblocks (16x16 matrices split into tiles). Warp-level primitives manage data movement. Tensor Core operations require alignment and proper memory layout.

Related entries: