본문 바로가기
Data science/Deep learning

Pytorch DDP(Distributed Data Parallel), DP(Data Parallel) 비교 총정리

by 연금(Pension)술사 2024. 5. 3.

 

 

요약


  DP DDP
모델 복제 오버해드 매 반복마다 각 GPU에 모델 복제 초기 한번만으로 프로세스에 모델 복제
데이터 분산 및 수집 Scatter-Gather방식으로 통신비용발생 각 프로세스가 독립적으로 작업
(통신비용 적음)
GIL GIL로인해 multi-thread 성능제한 GIL문제없음
통신비용 GPU간 동기화없음 GPU 간 All-redeuce 통신비용발생
적합한 환경 단일 머신 멀티노드

 

DataParallel (DP)

DP은 데이터 병렬화 기술 중, 싱글노드에서만 사용할 수 있는 병렬화 기술입니다. DP은 한 프로세스에서만 돌아가기에 "Single process, multi-threaded"입니다. 하나의 프로세스에서 여러 GPU을사용하는 방식입니다. 즉 하나의 프로세스이기 때문에, 모델과 데이터를 한 번만 메모리에 로드하고, 공유하는 방식입니다. 레핑코드인 torch.nn.DataParallel은 다음과 같은 방법을 따릅니다.

 

  1. Scatter mini-batch inputs to GPUs: DataLoader에서 병렬처리를하든 일단 하나의 GPU로 데이터를 모은 후, 이를 서로다른 GPU에 퍼뜨립니다. (여기서 불필요하게 오버해드가 발생합니다. 큰 데이터를 로드하는 경우, 데이터를 복사하는데 병목이 될 수 있습니다)
  2. Replicate model on GPUs: DP은 매 반복(step, iteration)마다 모델의 복제본(model replicas)을 만들어 각 GPU에 뿌려줍니다. 이유는 각 GPU가 독립적으로 연산을 수행할 수 있도록 동리한 모델을 GPU별로 만들기 위함입니다.(여기서 불필요하게 오버해드 발생)
  3. Parallel forward: 이 단계에서는 미니배치가 N이라면, N/4씩의 미니배치씩 처리하여 출력을 생성합니다. (이 과정은 병렬로 처리되기 때문에, 데이터가 미리 복제가되지않았거나, 모델이 복제되지 않았다면 연산시작까지 대기해야해서 오버해드가 발생합니다)
  4. Gather : 모든 출력을 하나의 GPU1에 가져옵니다. 이 과정에서는 GPU to GPU 통신이 발생합니다. (GPU1의 데이터를 모으는 과정에서 병목이 발생할 수 있음)
  5. Compute gradient: O1~O4까지 얻었으니 하나로 합쳐서 loss을 계산합니다. 이 합친 텐서에서는 .grad_fn 속성에 GatherBackward가 붙는데 여러 GPU에서 왔음을 알 수 있는 흔적입니다.
  6. Backword process: 하나의 모델에 대하여 백워드 프로세스가 진행됩니다.
  7. Parameter update: 모델의 파라미터를 한 GPU에서 업데이트 합니다.
import torch
from torchvision.models import resnet50

model = resnet50()
dp_model = torch.nn.DataParallel(model).to("cuda") # 이 단계까지는 GPU:0에만 옮겨집니다.
o = dp_model(torch.rand(8, 3, 256, 256)) # forward하는 순간 모델이 복제되고 배치가 나뉘어집니다.
o.device
tensor([[-0.4658, -0.4177,  1.4429,  ..., -0.5929,  0.6103, -0.0062],
        [-0.2560, -0.3838,  1.4179,  ..., -0.4299,  0.6211,  0.0942],
        [-0.3859, -0.4096,  1.4012,  ..., -0.5815,  0.6894, -0.0200],
        ...,
        [-0.4031, -0.4946,  1.3107,  ..., -0.4936,  0.5158,  0.0206],
        [-0.3646, -0.3747,  1.4674,  ..., -0.5438,  0.6902, -0.0674],
        [-0.3301, -0.4867,  1.3945,  ..., -0.4488,  0.6343,  0.0417]],
       device='cuda:0', grad_fn=<GatherBackward>)
model = nn.DataParallel(model)

# 옵티마이저 초기화
optimizer.zero_grad()

# 순전파
outputs = model(inputs) # 이 단계에서 모델복제, 데이터 scatter, forward, gather가 이뤄짐

# 손실 계산
loss = criterion(outputs, labels)  # 손실은 하나의 GPU에서만

# 역전파
loss.backward() # 역전파도 하나의 GPU에서만

# 파라미터 갱신
optimizer.step() # 파라미터 갱신

 

실제로 pytoch.nn.DataParallel에서도 다음과 같이 foward가 정의되어있습니다. 모아서 output만 반환해주는 것이죠.

class DataParallel(Module, Generic[T]):

    def forward(self, *inputs: Any, **kwargs: Any) -> Any:
    	inputs, module_kwargs = self.scatter(inputs, kwargs, self.device_ids)
        replicas = self.replicate(self.module, self.device_ids[: len(inputs)])
        outputs = self.parallel_apply(replicas, inputs, module_kwargs)
        return self.gather(outputs, self.output_device)
        
# 사용시
CLASS torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)

아래와 같이 DP은 `torch.DataParallel(model)`의 원라인만 추가하면 사용할 수 있던 기술입니다 [1, 2]. 이 클레스는 데이터 병렬(DP)은 모듈레벨(torch.nn.module) 에서 손 쉽게 사용할 수 있도록 구현한 것입니다. 일단 데이터를 작은 배치사이즈로 나눈 후(split), 데이모델을 각 장치(GPU)에 복제한 후, 데이터를 fowarding합니다. 이후, 각 복제된 모델로부터 백워드(backpropagation)을 진행하고, 원본(original module, 첫 번째 장치에 있던)로 모듈로 가져와 집계합니다[3]. 

  • module: pytorch model을 의미
  • device_ids:cuda devices (디폴트가 all devices)
  • output_devices: 복제한 모델로부터 집계할 장치를 의미합니다. (디폴트: device_ids=[0])

어찌되었든 하나의 텐서를 모아서 backward을 진행하기 때문에, gather을 수행하는 GPU입장에서는 VRAM이 더 요구되는 사항입니다. 그래서 다른 개발자들은 Gradent까지 다 계산한다음에 GPU1로 가져오는 방법도 제안합니다.

 

 

 

 

 

 

 

Distributed DataParallel (DDP)

사전 학습 사항

  • 병렬처리 방식: spawn vs fork 설명
  • 모델 복제본(Replcica)
  • 그레디언트(Gradient)
  • 버퍼(Buffer): torch.nn.Module내에 학습되지 않는 변수(static variable)을 의미. 즉, requires_grad=False인 변수를 의미 (예, BatchNorm에서늬 Mean, SD).
  • 프로세스 그룹(Process group): torch.distributed 모듈에서 병렬처리에 참여하는 프로세스들이 서로 정보를 공유할 수 있도록 해주는 그룹

설명

  • DDP은 데이터 병렬화를 모듈레벨(모델레벨)에서 쉽게 구현한 것이고, 노드수(=머신수)가 여럿일 때도 사용
  • DPP은 병렬처리시에, multiprocessing으로 프로세스를 spawn(부모프로세스의 메모리를 복제하지 않는, 새롭게 프로세스를 실행시키는) 방식으로 진행
  • 각 프로세스간 통시는 torch.distributed 패키지를 이용해서, 그레디언트와 버퍼(buffer)을 동기화함. 즉, 각 프로세스에서 모델의 복사본(Replicas)가 존재하지만, 따로따로 학습하는 것이 아니라, 데이터 배치만 나눠서 처리하고, 역전파 단계에서 GPU 그레디언트를 동기화해서 동일한 업데이트를 진행함.
  • DDP 내에서는 역전파 단계에서 싱크를 맞추기 위해서, 각 프로세스에 있는 백워드(backward)가 진행될 떄, autograd hook이 자동으로 싱크를 맞추어줌. 결국, 동일한 그레디언트을 사용(동일한 그레디언트를 사용한다는 말은 데이터배치가 다르기에 그레디언트가 다른데 어떻게 동일하게 하냐?가 아니라, 각 배치에 대해서 그레디언트 계산 후, auto-grad hook이 "all-reduce"연산을 통해 평균낸 값으로 동일하게 맞춘다라는 것)
  • 권장: 한 프로세스에는 하나의 모델 복제본(replicas)가 있는 것이 권장.



DP와 DDP의 비교

  • DP에서는 멀티쓰레드 전반에 걸쳐서 GIL의 병목 때문에, step을 하나 수행할 때 마다, 모델을 복제하는 과정이 있음. 모델 복제과정에 불필요한 오버해드가 발생함. 그리고, 입력배치(input)을 각 GPU에 뿌리거나, 결과를 하나의 GPU로 모을 때도 불필요한 비용이 소모됨. 따라서, 싱글머신을 사용할 경우에도, DP은 DDP보다는 일반적으로 느림.
  • DDP을 이용하면, model parallel도 지원도 가능. 모델이 너무 큰 경우에도 하나의 GPU가 아니라 모델을 여러 GPU에 분리하여 올릴 수 있음. 
  병렬처리방식 노드수 속도
DP Single-process, Multithread (GIL발생) 싱글 노드 느림
DDP Multiprocessing, Multimachine 싱글 / 멀티 노드 빠름

 

사용 예시: 싱글머신의 사용

  • 프로세스 그룹정의: DDP사용을 위해 첫 번째로 해야할 것은 각 병렬처리에 참여하는 프로세스를 관리해줄 프로세스그룹을 정의해야 함.
  • "MASTER_ADDR"와 "MASTER_PORT"의 정의: 마스터노드는 병렬처리 프로세스를 관리하는 노드를 의미함. 이 코드내에서는 싱글머신을 가정해서 'local_host'로 지정. 멀티노드시에는 별도의 마스터노드의 IP가 필요. 'MASTER_PORT'은 DDP 프로세스들이 마스터노드에 어떤 프로세스랑 통신할지 논리적 포트를 저장
  • 'dist.init_process_group': 분산학습을 위한 프로세스 그룹을 초기화. rank와 world_size은 학습이 실행되는 최상단(메인루틴)에서 지정해서 사용하거나, torchrun을 사용하는 경우 자동으로 지정
    • 첫 인자는 프로세스 통신 방식을 의미함. "gloo"은 윈도우즈에서, 리눅스는 "nccl"을 사용. NCCL (NVIDIA Collective communication library)로 GPU 병렬연산을 위한 치적의 백앤드임. torch.cuda을 이용시에는 NCCL을 이용.
    • rank: 프로세스의 고유 ID (0부터 시작). Rank=0은 마스터 역할. global rank임.
      • global rank을 의미. global rank은 전체 클러스터에서 각 프로세스당 고유한 rank을 의미
      • local rank: 각 노드에서 고율한 rank을 의미.
    • worid_size: 전체 프로세스 개수
  • torch.nn.Module을 해당 GPU(rank)로 옮겨줘야함 model.to(rank): 이렇게만하면 모델이 특정 GPU로만 이동할 뿐, DDP가 모델을 여러 GPU에서 동기화하지 않음.
  • DDP(model, device_dis=rank): DDP로 모델을 감싸줘야, DDP에 등록되어 여러 GPU에서 동기화관리를 해줌. 이 작업을 안하면 backward을 호출하더라도 다른 GPU와 공유되지않아 모델업데이트가 다르게 진행됨.

https://github.com/YunchaoYang/Blogs/issues/3

실행 방법은 "python3 ddp_base.py"라는 명령어로 실행하거나 torchrun을 이용할 수 있음.

import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP


def setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"

    # initialize the process group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)


def cleanup():
    dist.destroy_process_group()


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def train(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()
    print(f"Finished running basic DDP example on rank {rank}.")


if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    mp.spawn(train, args=(world_size,), nprocs=world_size, join=True)

 

사용 예시: 멀티 노드(Multi-node)로 torchrun을 이용하는 방법

  • torchrun을 사용하면 rank와 world_size을 환경변수에서 자동으로 가져오기 때문에 os.environ에서 가져옴. 따라서, 프로세스그룹을 초기화할 때, 초기화하는 함수에서 별도로 인자를 안받아도 됨.
  • 그 외는 SingleNode DDP와 동일함.
  • 실행시에는 torchrun을 이용하여 아래와 같이 실행
    • 노드1 서버(마스터)에서 다음의 커멘드 실행: torchrun --nnodes=2 --nproc_per_node=2 --node_rank=0 --master_addr="xxx.xxx.xxx.xxx" --master_port=29500 ddp_base.py
    • 노드2 서버에서 다음의 커멘드를 실행: torchrun --nnodes=2 --nproc_per_node=2 --node_rank=1 --master_addr= "xxx.xxx.xxx.xxx" --master_port=29500 ddp_base.py
      • --node_rank=1을 부여
      • --nnodes=2이기 때문에, 2개의 노드에서 torchrun명령어가 띄워질때까지 기다리게됨.
import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP


def setup():
    # torchrun으로 실행하기에 환경변수에서 가져와야함
    rank = int(os.environ["RANK"])
    world_size = int(os.environ["WORLD_SIZE"])
    local_rank = int(os.environ["LOCAL_RANK"])

    dist.init_process_group("nccl", rank=rank, world_size=world_size)

    return rank, world_size, local_rank


def cleanup():
    dist.destroy_process_group()


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def train():

    rank, world_size, local_rank = setup()
    print(f"Running DDP example on rank {rank} (local_rank {local_rank}).")

    # 모델을 `local_rank`에 해당하는 GPU로 이동
    model = ToyModel().to(local_rank)
    ddp_model = DDP(model, device_ids=[local_rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10).to(local_rank))  # GPU로 이동
    labels = torch.randn(20, 5).to(local_rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()
    print(f"Finished running DDP example on rank {rank}.")


if __name__ == "__main__":
    train()

 

주의사항

처리속도의 편차가 존재하여 타임아웃 될 가능성이 있음

  • 모든 프로세스가 DDP(model)을 호출한 상태여야하며, forward와 backward도 동시에 같은 시점에 실행되어야 함.
  • 그렇지 않은 경우, 먼저 실행이 완료된 프로세스는 기다려야함.
  • 기다리는 정도가 심한 경우에 타임 아웃(Timeout)되어 비정상적으로 종료될 수 있음. 따라서 워크로드가 균형있게 관리되어야함.
  • 또는 타임아웃시간을 더 큰 시간을 할당.
dist.init_process_group("nccl", timeout=torch.timedelta(seconds=600), rank=rank, world_size=world_size)

 

체크포인트(파라미터) 저장하기

  • 일반적인 싱글머신에서 모델을 저장하는경우 torch.save, torch.load을 이용함.
  • 반면, DDP을 이용하는 경우는 한 프로세스만 저장 한 후에 모든 프로세스에 저장된 모델을 불러오게 되어있음 (모든 프로세스에서 동일한 모델을 가지고 있고, 그레디언트도 동일하기에, 저장할 모델도 동일함. 모델을 여러번 저장하는게 비효율적이기 때문)
    • 따라서, rank=0에서만 저장하도록 함
  • 또한, 위를 구현할 때, 저장이 완료전에 프로세스가 시작되지 않도록 해야함.
    • 따라서, dist.barrer()라는 함수를 이용해서 다른 프로세스의 시작을 기다림.
  • 또한, map_location인자를 올바르게 할당하여, 서로 같은 자원(rank)에 모델이 할당되지 않도록 해야함. map_location을 사용하지 않는 경우는 torch.load로 CPU에 로드되고, 원래 저장되었던 장치로 올라가려고 하기 때문에, 프로세스그룹에 있는 모든 프로세스가 동일한 GPU에 모델을 올리려고 할 수 있음.

저장하기 예시코드

import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP

def setup():
    """ Initialize process group for DDP """
    rank = int(os.environ["RANK"])  # Global Rank
    world_size = int(os.environ["WORLD_SIZE"])  # Total number of processes
    local_rank = int(os.environ["LOCAL_RANK"])  # Local GPU ID for this process

    os.environ["MASTER_ADDR"] = os.environ.get("MASTER_ADDR", "localhost")
    os.environ["MASTER_PORT"] = os.environ.get("MASTER_PORT", "12355")

    # DDP 초기화
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    return rank, world_size, local_rank

def cleanup():
    """ Destroy process group """
    dist.destroy_process_group()

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

def train():
    rank, world_size, local_rank = setup()
    print(f"Running DDP training on rank {rank} (local_rank {local_rank})")

    # 모델 생성 및 DDP 적용
    model = ToyModel().to(local_rank)
    ddp_model = DDP(model, device_ids=[local_rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    if rank == 0:
        torch.save(ddp_model.state_dict(), "checkpoint.pth")
        print("Model saved at checkpoint.pth")

    dist.barrier()  # Rank 0이 저장을 끝낼 때까지 다른 프로세스는 대기
    map_location = f"cuda:{local_rank}"
    state_dict = torch.load("checkpoint.pth", map_location=map_location)
    ddp_model.load_state_dict(state_dict)
    print(f"Rank {rank}: Model loaded successfully!")

    cleanup()

if __name__ == "__main__":
    train()

 

FAQ

Q1. torchrun을 하려면 각 서버에서 torchrun을 실행해야하는데, 개발환경이 모두 동일해야하나?

=> Yes. 모든 노드가 동일한 환경을 가져야 함. python, torch가 동일해야함. 

Q2. 각 노드에 접속해서 torchrun 명령어를 실행시키는게 번거로울 것 같은데, 실무에서는 어떻게 진행하나?

  1. sh파일을 사용해서 각 노드에 접속하여 torchrun을 작성하는 방법
  2. torchrun을 각 노드마다 실행시켜주는 fabric 이라는 python 라이브러리를 사용
  3. Ray을 이용
  4. SLURM (sbatch)을 이용

 

Load map

노드 수, GPU, 스케쥴러 지원에 따라 데이터병렬화 방법

노드 수 GPU 수 데이터페러렐 방법 스케쥴러지원
Single-machine single GPU DistributedDataParallel NA
Single-machine multiple GPU DistributedDataParallel NA
Multi-machine multiple GPU torchrun   + rendezvous  X
Multi-machine multiple GPU SLURM 이용 SLURM

 

Trouble shooting 1): The IPv6 network addresses of (서버명, 55039) cannot be retrieved (gai error: -3 - Temporary failure in name resolution

- 도메인을 찾지 못하는 문제입니다. `DNS`쪽 문제일 수 있습니다. DNS 세팅을 다시하거나 서버명에 해당하는 내용을 `/etc/hosts`에 추가합니다.

저의 경우 아래와 같이, `etc/hosts`을 추가해서 해결했습니다.

heon@gpusvr04:~/repositories/misc$ cat /etc/hosts
127.0.0.1 localhost
127.0.1.1 gpusvr04
***.***.***.*** gpusvr03

 

반응형