
Distributed & Parallel Optimization

Key Points

  • Data parallelism splits the training data across workers that compute gradients in parallel on a shared model.
  • Synchronous SGD makes every worker wait at a barrier, averages their gradients, and applies a single consistent update.
  • Asynchronous SGD lets workers update the model without waiting, which reduces idle time but introduces stale gradients.
  • Synchronous methods are easier to reason about, have stable convergence, and need fast communication (e.g., AllReduce).
  • Asynchronous methods are robust to stragglers and network variance but require careful tuning to avoid divergence.
  • Effective batch size equals per-worker batch size times the number of workers in synchronous data parallelism.
  • Communication cost can dominate when the model is large; ring AllReduce and compression help mitigate this.
  • Learning rate scaling, staleness bounds, and sparsity-aware updates (e.g., Hogwild) are key practical techniques.

Prerequisites

  • Gradient-based optimization and SGD — Understanding how gradients are computed and used to update parameters is foundational for both synchronous and asynchronous variants.
  • Linear algebra (vectors, dot products) — Gradients and parameter updates are vector operations; efficient implementations rely on linear algebra.
  • Probability and statistics — Mini-batch gradients are stochastic estimators; concepts like expectation and variance underpin convergence behavior.
  • Parallel programming (threads, synchronization) — Implementing data-parallel training requires knowledge of threads, barriers, atomics, and potential race conditions.
  • Distributed systems basics — At scale, communication patterns (AllReduce, parameter servers) and network effects dominate performance.
  • Numerical stability and floating-point arithmetic — Parallel reductions and asynchronous updates can introduce rounding differences and nondeterminism.

Detailed Explanation


01 Overview

Distributed and parallel optimization aims to speed up model training by spreading computation across multiple CPUs, GPUs, or machines. In data parallelism, each worker gets a different subset of the data but holds a replica of the model parameters. Workers compute gradients on their local mini-batches. The two most common coordination styles are synchronous and asynchronous stochastic gradient descent (SGD). In synchronous SGD, all workers compute gradients, then pause at a barrier so their gradients can be averaged (e.g., via AllReduce) and a single consistent update is applied. In asynchronous SGD, workers do not wait; each worker reads parameters (possibly slightly outdated), computes a gradient, and applies its update independently.

Synchronous approaches usually provide more stable and predictable convergence because every update is based on the same parameter vector. However, they can suffer from "stragglers": slow workers that make everyone wait. Asynchronous approaches keep hardware busy and reduce waiting time but make updates noisier due to staleness (workers may update based on older parameters). The choice between synchronous and asynchronous methods depends on hardware homogeneity, network speed, model size, and tolerance for statistical noise. In practice, systems combine engineering techniques (efficient collective communication, compression, parameter servers) with algorithmic safeguards (learning-rate schedules, momentum, staleness bounds) to get both speed and accuracy.

02 Intuition & Analogies

Imagine a kitchen with several cooks preparing the same dish. Each cook tastes a spoonful (reads the model), adds a pinch of salt (computes a gradient), and then adjusts the main pot (updates parameters).

Synchronous kitchen: Every cook tastes their spoonful at the same time, writes their salt suggestion on a card (gradient), then they all meet to average their suggestions. Only after they agree on a single combined adjustment do they add salt to the main pot. The flavor changes in controlled, predictable steps, but if one cook is slow, everyone waits.

Asynchronous kitchen: Cooks do not meet. Each cook tastes the pot whenever they can, decides on a salt adjustment, and immediately adds it. The pot changes frequently and unpredictably—sometimes two cooks reach for the salt at once. The process moves faster because nobody waits, but individual adjustments might be based on slightly outdated flavor (stale parameters). If cooks are careful (small pinches, i.e., small learning rates) and the recipe is forgiving (sparse conflicts, good mixing), the dish still converges to a good taste.

In computing terms, synchronous SGD ensures all workers agree on a single gradient per step, which keeps the trajectory smooth but may waste time on barriers. Asynchronous SGD keeps devices busy and can achieve higher throughput, but the sequence of parameter vectors becomes noisier and less coordinated. The art is balancing throughput with statistical efficiency: faster steps that move less precisely can still reach the goal sooner if the extra speed compensates for the extra noise.

03 Formal Definition

Let \(L(w)\) be the expected loss, and let \(\ell(w; \xi)\) be the loss on a random sample \(\xi\). Standard SGD performs updates \[ w_{t+1} = w_t - \eta_t \, \nabla \ell(w_t; \xi_t), \] where \(\eta_t\) is the learning rate and \(\xi_t\) is drawn from the data distribution. With mini-batches of size \(B\), the gradient estimator is \[ g_t = \frac{1}{B} \sum_{i=1}^{B} \nabla \ell(w_t; \xi_{t,i}), \qquad w_{t+1} = w_t - \eta_t g_t. \] In synchronous data-parallel SGD with \(K\) workers, each worker \(k\) computes a local gradient on its batch \(B_t^{(k)}\) (often of size \(b\)), \[ g_t^{(k)} = \frac{1}{b} \sum_{\xi \in B_t^{(k)}} \nabla \ell(w_t; \xi), \] then workers average gradients (e.g., via AllReduce): \[ \bar g_t = \frac{1}{K} \sum_{k=1}^{K} g_t^{(k)}, \qquad w_{t+1} = w_t - \eta_t \bar g_t. \] The effective batch size is \(B = K b\). Under mild conditions, \(\mathbb{E}[\bar g_t] = \nabla L(w_t)\) and the variance decreases with \(B\). In asynchronous SGD, worker \(k\) reads parameters \(w_{t - \tau_t^{(k)}}\) that may be \(\tau_t^{(k)}\) steps stale, computes \(g_t^{(k)} = \nabla \ell(w_{t - \tau_t^{(k)}}; \xi_t^{(k)})\), and applies an update without coordination: \[ w \leftarrow w - \eta_t \, g_t^{(k)}. \] Convergence analyses often assume bounded staleness \(\tau_t^{(k)} \le \tau_{\max}\) or sparsity to show that asynchronous noise is controllable with proper learning-rate schedules.

04 When to Use

Use synchronous SGD when your hardware is relatively homogeneous and communication is fast enough that barriers do not dominate. Examples include multi-GPU training on a single machine with NVLink or Infiniband-connected clusters using efficient AllReduce. You gain stable convergence, straightforward hyperparameter transfer from single-worker training, and simple correctness reasoning. Synchronous training is also favored when exact reproducibility and deterministic behavior are important.

Use asynchronous SGD when you expect stragglers, variable network latency, or when you can exploit sparsity to reduce conflicts (e.g., large sparse models, recommender systems). Asynchronous methods shine in parameter-server architectures or when workers join/leave dynamically (elastic training). If you apply bounded staleness (e.g., allow at most s outstanding steps), you can trade a bit of noise for a lot of throughput, especially at large scale.

Hybrid strategies are common: synchronous within a node (fast interconnect), asynchronous across nodes (slower network); or synchronous most of the time with occasional asynchronous bursts. Complementary techniques—gradient compression/quantization, mixed precision, and overlapping communication with computation—are often decisive for scaling. Adjust the learning rate with the effective batch size (e.g., linear scaling with warmup) and monitor training variance; if validation performance degrades, reduce staleness or communication period.

⚠️Common Mistakes

• Not averaging gradients correctly: summing K workers' gradients without dividing by K changes the effective learning rate by a factor of K. Fix by either averaging or scaling the learning rate accordingly.
• Forgetting learning-rate scaling when changing batch size: increasing the effective batch size B typically requires increasing \(\eta\) (e.g., linear scaling with warmup) to maintain similar progress per epoch.
• Ignoring communication cost: naive parameter broadcasting or centralized reduction can bottleneck training. Use collective communication (e.g., ring AllReduce) and overlap communication with computation.
• Data races in asynchronous code: lock-free updates on floating-point parameters can cause undefined behavior if not using atomics. Use atomic operations or well-defined lock-free constructs (e.g., compare-and-swap loops).
• Excessive staleness: allowing unbounded delay between read and update can destabilize training. Enforce staleness bounds or periodically synchronize.
• False sharing and cache contention: placing frequently updated parameters on the same cache line across threads can throttle performance. Use padding or align data structures.
• Poor sharding and imbalance: uneven data partitioning leads to stragglers even in synchronous setups. Balance workloads and prefetch data.
• Lack of randomness: reusing the same data order across workers without shuffling increases gradient correlation and harms generalization. Shuffle per epoch and use different seeds across workers.

Key Formulas

SGD Update

\( w_{t+1} = w_t - \eta_t \nabla \ell(w_t; \xi_t) \)

Explanation: At each step \(t\), take a step from the current parameters \(w_t\) in the direction opposite to the gradient of the loss on a random sample. The step size is controlled by the learning rate \(\eta_t\).

Mini-batch Gradient

\( g_t = \frac{1}{B} \sum_{i=1}^{B} \nabla \ell(w_t; \xi_{t,i}) \)

Explanation: The gradient estimate over a batch of \(B\) samples is the average of per-sample gradients. Averaging reduces variance compared to single-sample updates.

Synchronous Gradient Averaging

\( \bar g_t = \frac{1}{K} \sum_{k=1}^{K} g_t^{(k)}, \qquad w_{t+1} = w_t - \eta_t \bar g_t \)

Explanation: In synchronous data parallelism, \(K\) workers compute local gradients and average them. A single consistent update is then applied to all model replicas.

Asynchronous SGD with Staleness

\( w \leftarrow w - \eta_t \nabla \ell(w_{t-\tau_t}; \xi_t) \)

Explanation: An asynchronous worker reads an older parameter vector \(w_{t-\tau_t}\), computes a gradient, and applies it immediately. The delay \(\tau_t\) introduces extra noise that must be controlled.

Unbiasedness of Mini-batch Gradient

\( \mathbb{E}[g_t] = \nabla L(w_t) \)

Explanation: If samples are drawn i.i.d., the expected value of the mini-batch gradient equals the true gradient of the expected loss. This justifies using \(g_t\) as a proxy for the true gradient.

Variance vs. Batch Size

\( \mathrm{Var}(g_t) \approx \frac{\sigma^2}{B} \)

Explanation: Assuming independent samples with gradient variance \(\sigma^2\), the variance of the average gradient decreases inversely with the batch size \(B\). Larger batches produce more stable updates.

Per-step Time in Synchronous SGD

\( T_{\text{sync}} \approx T_{\text{comp}}^{\max} + T_{\text{allreduce}} \)

Explanation: Each synchronous step takes the time of the slowest worker's compute plus the time to communicate and average gradients. Stragglers and communication overhead limit speed.

Ring AllReduce Cost Model

\( T_{\text{allreduce}} \approx \alpha \log K + \beta \, \frac{2(K-1)}{K} P \)

Explanation: The time to AllReduce \(P\) parameters over \(K\) workers scales with startup latency \(\alpha\) times \(\log K\) plus bandwidth cost \(\beta\) times a factor that approaches \(2P\) as \(K\) grows. This guides communication optimization.

Amdahl's Law

\( S_p = \frac{1}{f + \frac{1-f}{p}} \)

Explanation: If fraction \(f\) of work is serial and the rest parallel, the speedup with \(p\) workers is bounded by this expression. Even small serial fractions can cap speedup.

Linear Learning-rate Scaling

\( \eta' = \eta \cdot \frac{B'}{B} \)

Explanation: When increasing the batch size from \(B\) to \(B'\), scale the learning rate proportionally to maintain similar update magnitudes. Often combined with warmup to ensure stability.

SGD Convergence Rate (Convex)

\( \mathbb{E}[L(w_T)] - L(w^*) \le O\!\left(\frac{1}{\sqrt{T}}\right) \)

Explanation: For convex smooth losses with appropriate step sizes, the expected suboptimality after \(T\) steps decays on the order of \(1/\sqrt{T}\). Noisy gradients still converge with diminishing steps.

Complexity Analysis

Let \(N\) be the number of samples, \(d\) the number of parameters, \(K\) the number of workers, and \(P\) the number of parameters to communicate (typically \(P = d\)). For simple linear or logistic models, computing a per-sample gradient is \(O(d)\); thus a mini-batch of size \(B\) costs \(O(Bd)\) compute. In synchronous data parallelism with \(K\) workers, if each worker processes \(b = B/K\) samples per step, the per-worker compute is \(O(bd)\) and the total compute is still \(O(Bd)\). Wall-clock per-step time is dominated by \(T_{\text{sync}} \approx T_{\text{comp}}^{\max} + T_{\text{allreduce}}\), where \(T_{\text{comp}}^{\max}\) is the slowest worker's compute time and \(T_{\text{allreduce}}\) is the communication time to average gradients. Using a ring AllReduce, \(T_{\text{allreduce}}\) scales approximately as \(\alpha \log K + \beta \cdot 2(K-1)P/K\), where \(\alpha\) captures latency and \(\beta\) captures inverse bandwidth. Hence, for large models (large \(P\)), communication can dominate unless overlapped or reduced (e.g., compression, mixed precision).

In asynchronous SGD, there is no global barrier; workers continuously compute \(O(bd)\) per update and perform atomic or lock-free parameter updates. This reduces idle time due to stragglers, so throughput can approach \(K\) times single-worker throughput. However, parameter staleness introduces extra variance, potentially requiring smaller learning rates or bounded staleness, which may increase the number of updates needed (statistical efficiency).

Space-wise, each worker holds \(O(P)\) parameters and \(O(bd)\) temporary buffers (e.g., gradients), plus its data shard of size \(O((N/K) \cdot d)\). Synchronous methods may require additional \(O(P)\) buffers for reduction; asynchronous parameter-server setups store \(O(P)\) on servers and \(O(P)\) replicas on workers. Overall, per-worker memory is typically \(O(P + Nd/K)\).

Code Examples

Synchronous data-parallel mini-batch SGD (threads + barrier) for linear regression
#include <bits/stdc++.h>
using namespace std;

// Simple reusable barrier for C++17
class Barrier {
    mutex mtx; condition_variable cv;
    size_t count; size_t waiting; size_t generation;
public:
    explicit Barrier(size_t cnt) : count(cnt), waiting(0), generation(0) {}
    void arrive_and_wait() {
        unique_lock<mutex> lk(mtx);
        size_t gen = generation;
        if (++waiting == count) {
            generation++; waiting = 0; cv.notify_all();
        } else {
            cv.wait(lk, [&]{ return gen != generation; });
        }
    }
};

struct Dataset {
    vector<vector<double>> X; // N x d
    vector<double> y;         // N
};

// Generate synthetic linear data: y = X w_true + noise
Dataset make_data(size_t N, size_t d, unsigned seed = 42) {
    mt19937 rng(seed);
    normal_distribution<double> noise(0.0, 0.1);
    Dataset ds; ds.X.assign(N, vector<double>(d)); ds.y.assign(N, 0.0);
    vector<double> w_true(d, 0.0);
    for (size_t j = 0; j < d; ++j)
        w_true[j] = (j % 2 ? 1.0 : -1.0) * (0.5 + 0.5 * (j + 1) / double(d));
    uniform_real_distribution<double> uni(-1.0, 1.0);
    for (size_t i = 0; i < N; ++i) {
        for (size_t j = 0; j < d; ++j) ds.X[i][j] = uni(rng);
        double pred = inner_product(ds.X[i].begin(), ds.X[i].end(), w_true.begin(), 0.0);
        ds.y[i] = pred + noise(rng);
    }
    return ds;
}

int main() {
    const size_t N = 2000;          // samples
    const size_t d = 50;            // features
    const size_t K = 4;             // worker threads
    const size_t steps = 300;       // number of SGD steps
    const size_t batch_size = 256;  // effective batch size per step (across all threads)
    const double lr = 0.05;         // learning rate

    Dataset ds = make_data(N, d, 123);
    vector<double> w(d, 0.0); // shared model parameters

    // Shuffled order of indices for mini-batches; the initial shuffle happens
    // before the workers start, so no thread races on `order`.
    vector<size_t> order(N); iota(order.begin(), order.end(), 0);
    mt19937 shuffle_rng(100);
    shuffle(order.begin(), order.end(), shuffle_rng);

    Barrier barrier(K);
    vector<vector<double>> local_grad(K, vector<double>(d, 0.0));
    atomic<size_t> step_idx{0};

    auto worker = [&](size_t tid) {
        while (true) {
            size_t t = step_idx.load();
            if (t >= steps) break;
            // Zero local gradient
            fill(local_grad[tid].begin(), local_grad[tid].end(), 0.0);
            // Determine batch window
            size_t start = (t * batch_size) % N;
            size_t m = min(batch_size, N); // cap by dataset size for simplicity
            // Strided assignment within the batch to K threads
            for (size_t u = tid; u < m; u += K) {
                size_t idx = order[(start + u) % N];
                const auto &x = ds.X[idx];
                double pred = inner_product(x.begin(), x.end(), w.begin(), 0.0);
                double err = pred - ds.y[idx];
                for (size_t j = 0; j < d; ++j)
                    local_grad[tid][j] += x[j] * err; // MSE gradient contribution
            }
            // Wait for all threads to finish gradient compute
            barrier.arrive_and_wait();
            // Single-updater: average gradients and update weights while the
            // other threads wait at the second barrier
            if (tid == 0) {
                vector<double> g(d, 0.0);
                for (size_t k = 0; k < K; ++k)
                    for (size_t j = 0; j < d; ++j)
                        g[j] += local_grad[k][j];
                for (size_t j = 0; j < d; ++j) g[j] /= double(m); // average over batch
                for (size_t j = 0; j < d; ++j) w[j] -= lr * g[j];
                // Reshuffle when the next step begins a new pass over the data.
                // Doing it here, while the other threads are blocked at the
                // barrier, avoids a data race on `order`.
                if (((t + 1) * batch_size) % N == 0)
                    shuffle(order.begin(), order.end(), shuffle_rng);
                step_idx.fetch_add(1);
            }
            // Ensure all threads see updated weights before the next step
            barrier.arrive_and_wait();
        }
    };

    vector<thread> pool; pool.reserve(K);
    for (size_t k = 0; k < K; ++k) pool.emplace_back(worker, k);
    for (auto &th : pool) th.join();

    // Report training loss (MSE)
    double mse = 0.0;
    for (size_t i = 0; i < N; ++i) {
        double pred = inner_product(ds.X[i].begin(), ds.X[i].end(), w.begin(), 0.0);
        double err = pred - ds.y[i];
        mse += err * err;
    }
    mse /= double(N);
    cout << fixed << setprecision(6) << "Final MSE: " << mse << "\n";
    return 0;
}

This program simulates synchronous data-parallel SGD across K threads for linear regression. Each step selects a mini-batch, splits it across threads, computes per-thread gradients, synchronizes at a barrier, averages gradients, and applies one consistent update. All threads then proceed to the next step with the same updated parameter vector. It demonstrates the barrier-and-average pattern found in synchronous multi-worker training.

Time: Per step, O(B d) total compute (O(B d / K) per thread) plus O(d K) to reduce gradients. Space: O(d) for parameters and O(K d) for per-thread gradients, plus O(N d) for data.
Asynchronous lock-free (Hogwild-style) SGD with atomic parameter updates
#include <bits/stdc++.h>
using namespace std;

// Atomic add for double using a compare-exchange loop (C++17-compatible)
static inline void atomic_add_relaxed(atomic<double> &x, double delta) {
    double old = x.load(memory_order_relaxed);
    while (!x.compare_exchange_weak(old, old + delta, memory_order_relaxed)) {
        // `old` is refreshed with the current value by compare_exchange_weak
    }
}

struct Dataset {
    vector<vector<double>> X; // N x d
    vector<double> y;         // N
};

Dataset make_data(size_t N, size_t d, unsigned seed = 7) {
    mt19937 rng(seed);
    normal_distribution<double> noise(0.0, 0.1);
    Dataset ds; ds.X.assign(N, vector<double>(d)); ds.y.assign(N, 0.0);
    vector<double> w_true(d);
    for (size_t j = 0; j < d; ++j) w_true[j] = (j % 3 == 0 ? 0.0 : 1.0); // induce sparsity in truth
    uniform_real_distribution<double> uni(-1.0, 1.0);
    for (size_t i = 0; i < N; ++i) {
        for (size_t j = 0; j < d; ++j) ds.X[i][j] = (j % 3 == 0 ? 0.0 : uni(rng)); // sparse features
        double pred = inner_product(ds.X[i].begin(), ds.X[i].end(), w_true.begin(), 0.0);
        ds.y[i] = pred + noise(rng);
    }
    return ds;
}

int main() {
    const size_t N = 5000;     // samples
    const size_t d = 200;      // features
    const size_t K = 8;        // worker threads
    const size_t epochs = 3;
    const double lr = 0.02;    // smaller LR for async stability

    Dataset ds = make_data(N, d, 2024);

    // Shared parameters as atomics to ensure data-race-free updates
    vector<atomic<double>> w(d);
    for (size_t j = 0; j < d; ++j) w[j].store(0.0);

    // Partition indices per thread (simple round-robin)
    vector<vector<size_t>> shards(K);
    for (size_t i = 0; i < N; ++i) shards[i % K].push_back(i);

    vector<thread> pool; pool.reserve(K);
    for (size_t tid = 0; tid < K; ++tid) {
        pool.emplace_back([&, tid]() {
            for (size_t e = 0; e < epochs; ++e) {
                // Shuffle this thread's own shard per epoch
                auto &idxs = shards[tid];
                mt19937 rng((unsigned)(tid * 131 + e));
                shuffle(idxs.begin(), idxs.end(), rng);
                for (size_t idx : idxs) {
                    const auto &x = ds.X[idx];
                    // Read a (possibly inconsistent) snapshot for prediction
                    double pred = 0.0;
                    for (size_t j = 0; j < d; ++j) {
                        double wj = w[j].load(memory_order_relaxed);
                        pred += wj * x[j];
                    }
                    double err = pred - ds.y[idx];
                    // Apply the gradient step immediately: w_j -= lr * x_j * err
                    for (size_t j = 0; j < d; ++j) {
                        double g = x[j] * err; // MSE gradient component
                        if (g != 0.0) atomic_add_relaxed(w[j], -lr * g);
                    }
                }
            }
        });
    }

    for (auto &th : pool) th.join();

    // Measure final MSE using a consistent read after all workers finish
    vector<double> w_final(d);
    for (size_t j = 0; j < d; ++j) w_final[j] = w[j].load();
    double mse = 0.0;
    for (size_t i = 0; i < N; ++i) {
        double pred = inner_product(ds.X[i].begin(), ds.X[i].end(), w_final.begin(), 0.0);
        double err = pred - ds.y[i];
        mse += err * err;
    }
    mse /= double(N);
    cout << fixed << setprecision(6) << "Final MSE (async): " << mse << "\n";
    return 0;
}

This example demonstrates asynchronous, lock-free SGD (Hogwild-style). Each thread processes its shard and immediately applies parameter updates using atomic compare-and-swap loops, avoiding locks and barriers. Reads for prediction are taken without synchronization, so gradients may be computed on stale parameters—acceptable in many sparse or well-conditioned problems with careful learning-rate choices.

Time: Per epoch, O(N d) total compute. There is no explicit synchronization cost; atomic updates add a small constant overhead per updated parameter. Space: O(d) for parameters (as atomics) and O(N d) for data. No per-thread gradient buffers.