수민 '-'

플오그래밍

제가 작성하는 모든 글은 절대 상업적인 이용이 아니며, 그저 개인적인 공부 용도로만 사용하는 것임을 밝힙니다.

Multi-class Weather 이미지 분류 실습 - ImageFolder, DataLoader, 완전연결 신경망

날씨 사진으로 다중 클래스 분류하기

Multi-class Weather Dataset은 맑음, 비, 흐림, 일출 등 다양한 날씨 이미지를 모아 둔 다중 클래스 분류용 데이터셋이다. 이 글에서는 Kaggle에서 데이터를 내려받아 폴더 구조를 정리하고, PyTorch의 ImageFolder · DataLoader · transforms를 이용해 날씨 이미지를 분류하는 완전연결 신경망(FCN) 을 학습시켜 본다. 단순 선형 모델부터 은닉층과 Dropout이 포함된 모델까지 세 가지 구조를 비교해 보며, 학습·검증·테스트 루프와 학습률 스케줄링 구현 방법도 함께 정리한다.

(Multi-class Weather Dataset 원문 정리를 기반으로 재구성)


1. Multi-class Weather Dataset 준비

1-1. Kaggle에서 데이터 내려받기 (Colab 기준)

Colab에서 Kaggle API를 사용해 데이터셋을 내려받는다. 먼저 kaggle.json을 업로드한다.

from google.colab import files

files.upload()  # kaggle.json 업로드
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

Kaggle에서 날씨 데이터셋을 다운로드한다.

!kaggle datasets download pratik2901/multiclass-weather-dataset

1-2. 압축 해제 및 train/test 폴더 구성

데이터를 압축 해제하고, Cloudy, Rain, Shine, Sunrise 별로 8:2 비율로 train/test 폴더에 나눈다.

import os
import zipfile
import random
from shutil import copyfile, rmtree

zip_file = '/content/multiclass-weather-dataset.zip'
base_dir = './Multi-class Weather Dataset'
train_dir = './train'
test_dir = './test'

with zipfile.ZipFile(zip_file, 'r') as zip_ref:
    zip_ref.extractall('.')  # 현재 디렉터리에 압축 해제

categories = ['Cloudy', 'Rain', 'Shine', 'Sunrise']

if os.path.exists(train_dir):
    rmtree(train_dir)
if os.path.exists(test_dir):
    rmtree(test_dir)

os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

for category in categories:
    os.makedirs(os.path.join(train_dir, category), exist_ok=True)
    os.makedirs(os.path.join(test_dir, category), exist_ok=True)

for category in categories:
    category_path = os.path.join(base_dir, category)
    files = os.listdir(category_path)
    random.shuffle(files)

    split_idx = int(len(files) * 0.8)
    train_files = files[:split_idx]
    test_files = files[split_idx:]

    for file in train_files:
        src = os.path.join(category_path, file)
        dst = os.path.join(train_dir, category, file)
        copyfile(src, dst)

    for file in test_files:
        src = os.path.join(category_path, file)
        dst = os.path.join(test_dir, category, file)
        copyfile(src, dst)

print('데이터 분리가 완료되었습니다.')

2. 데이터 전처리와 ImageFolder

import torch
import time
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torchvision.utils import make_grid
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import random_split, DataLoader

import matplotlib.pyplot as plt
import numpy as np

2-1. transforms.ToTensor / Normalize

transforms.ToTensor()

  • 이미지를 PyTorch 텐서로 변환한다.
  • 픽셀 값을 [0, 255] → [0.0, 1.0] 범위로 스케일링한다.
  • 이미지 형식을 (H, W, C)(C, H, W) 로 바꾼다.

transforms.Normalize(mean, std)

  • 텐서 픽셀 값을 정규화해 학습을 안정화한다.
  • mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5] 를 쓰면 [0, 1] 범위가 대략 [-1, 1]로 변환된다.

2-2. 학습/테스트용 변환 정의

transform_train = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),  # 50% 확률로 좌우 반전
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.5, 0.5, 0.5],
        std=[0.5, 0.5, 0.5]
    )
])

transform_test = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.5, 0.5, 0.5],
        std=[0.5, 0.5, 0.5]
    )
])

2-3. ImageFolder와 train/val/test 분할

※ ImageFolder

datasets.ImageFolder는 디렉터리 구조를 보고 폴더 이름을 레이블로 삼아 이미지를 불러온다.

  • root/train/Cloudy/*.jpg → 레이블 0
  • root/train/Rain/*.jpg → 레이블 1
    • … 이런 식으로 자동 매핑
train_dataset = datasets.ImageFolder(
    root='train/',
    transform=transform_train
)

dataset_size = len(train_dataset)
train_size = int(dataset_size * 0.8)
val_size = dataset_size - train_size

train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])
print(len(train_dataset), len(val_dataset))

test_dataset = datasets.ImageFolder(
    root='test/',
    transform=transform_test
)

DataLoader로 배치 단위 데이터를 만든다.

train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

2-4. 이미지 시각화

plt.rcParams['figure.figsize'] = [12, 8]
plt.rcParams['figure.dpi'] = 60
plt.rcParams.update({'font.size': 20})

def imshow(input):
    input = input.numpy().transpose((1, 2, 0))  # C,H,W → H,W,C
    mean = np.array([0.5, 0.5, 0.5])
    std = np.array([0.5, 0.5, 0.5])
    input = std * input + mean      # 역정규화
    input = np.clip(input, 0, 1)
    plt.imshow(input)
    plt.show()

class_names = {
    0: "Cloudy", 1: "Rain", 2: "Shine", 3: "Sunrise"
}

iterator = iter(train_dataloader)
imgs, labels = next(iterator)
out = make_grid(imgs[:4])           # 4장의 이미지를 그리드로 합침
imshow(out)

[class_names[labels[i].item()] for i in range(4)]

3. 다양한 완전연결 신경망(FCN) 모델 비교

이제 256×256 RGB 이미지를 완전연결 신경망으로 분류하는 세 가지 모델을 만든다.

3-1. nn.Module 상속의 의미

nn.Module 상속의 장점

  1. 레이어·파라미터 관리: 하위 모듈과 파라미터를 자동으로 추적.
  2. 순전파 정의: forward()에서 한 번에 연산 정의.
  3. 계층적 설계: 서브모듈 조합으로 복잡한 모델 설계.
  4. 유틸리티 제공: state_dict() 저장/로드, .train()/.eval() 등.
  5. PyTorch와 호환성: Optimizer, DataLoader 등과 자연스럽게 연동.

3-2. Model1 – 단일 선형 계층

활성화 함수 없이 한 번에 4개 클래스로 매핑하는 가장 단순한 모델이다.

# (256 * 256 * 3 + 1) * 4 ≈ 786,436 파라미터
class Model1(nn.Module):
    def __init__(self):
        super(Model1, self).__init__()
        self.linear1 = nn.Linear(256 * 256 * 3, 4)
        self.flatten = nn.Flatten()

    def forward(self, x):
        x = self.flatten(x)
        x = self.linear1(x)
        return x

3-3. 학습/검증/테스트 루프와 학습률 스케줄

def train():
    start_time = time.time()
    print(f'[Epoch: {epoch + 1} - Training]')
    model.train()
    total = 0
    running_loss = 0.0
    running_corrects = 0

    for i, batch in enumerate(train_dataloader):
        imgs, labels = batch
        imgs, labels = imgs.cuda(), labels.cuda()

        outputs = model(imgs)
        optimizer.zero_grad()
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total += labels.shape[0]
        running_loss += loss.item()
        running_corrects += torch.sum(preds == labels.data)

        if i % log_step == 0:
            print(f'[Batch: {i + 1}] running train loss: {running_loss / total}, '
                  f'running train accuracy: {running_corrects / total}')

    print(f'train loss: {running_loss / total}, accuracy: {running_corrects / total}')
    print('elapsed time:', time.time() - start_time)
    return running_loss / total, (running_corrects / total).item()
def validate():
    start_time = time.time()
    print(f'[Epoch: {epoch + 1} - Validation]')
    model.eval()
    total = 0
    running_loss = 0.0
    running_corrects = 0

    for i, batch in enumerate(val_dataloader):
        imgs, labels = batch
        imgs, labels = imgs.cuda(), labels.cuda()

        with torch.no_grad():
            outputs = model(imgs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

        total += labels.shape[0]
        running_loss += loss.item()
        running_corrects += torch.sum(preds == labels.data)

        if i % log_step == 0:
            print(f'[Batch: {i + 1}] running val loss: {running_loss / total}, '
                  f'running val accuracy: {running_corrects / total}')

    print(f'val loss: {running_loss / total}, accuracy: {running_corrects / total}')
    print('elapsed time:', time.time() - start_time)
    return running_loss / total, (running_corrects / total).item()
def test():
    start_time = time.time()
    print('[Test]')
    model.eval()
    total = 0
    running_loss = 0.0
    running_corrects = 0

    for i, batch in enumerate(test_dataloader):
        imgs, labels = batch
        imgs, labels = imgs.cuda(), labels.cuda()

        with torch.no_grad():
            outputs = model(imgs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

        total += labels.shape[0]
        running_loss += loss.item()
        running_corrects += torch.sum(preds == labels.data)

        if i % log_step == 0:
            print(f'[Batch: {i + 1}] running test loss: {running_loss / total}, '
                  f'running test accuracy: {running_corrects / total}')

    print(f'test loss: {running_loss / total}, accuracy: {running_corrects / total}')
    print('elapsed time:', time.time() - start_time)
    return running_loss / total, (running_corrects / total).item()
def adjust_learning_rate(optimizer, epoch):
    lr = learning_rate
    if epoch >= 3:
        lr /= 10
    if epoch >= 7:
        lr /= 10
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

3-4. Model1 학습

learning_rate = 0.01
log_step = 8

model = Model1().cuda()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)

num_epochs = 20
best_val_acc = 0
best_epoch = 0
history = []
accuracy = []

os.makedirs('weights/Model1', exist_ok=True)

for epoch in range(num_epochs):
    adjust_learning_rate(optimizer, epoch)
    train_loss, train_acc = train()
    val_loss, val_acc = validate()
    history.append((train_loss, val_loss))
    accuracy.append((train_acc, val_acc))

    if val_acc > best_val_acc:
        print('[Info] best validation accuracy!')
        best_val_acc = val_acc
        best_epoch = epoch
        torch.save(model.state_dict(), f'weights/Model1/best_checkpoint_epoch_{epoch + 1}.pth')

torch.save(model.state_dict(), f'weights/Model1/last_checkpoint_epoch_{num_epochs}.pth')

plt.plot([x[0] for x in accuracy], 'b', label='train')
plt.plot([x[1] for x in accuracy], 'r--', label='validation')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

test_loss, test_accuracy = test()
print(f'Test loss: {test_loss:.8f}')
print(f'Test accuracy: {test_accuracy * 100.:.2f}%')

4. 더 깊은 모델: Model2, Model3

4-1. Model2 – 은닉층 1개

입력을 64차원으로 압축한 뒤 4개 클래스 출력으로 변환한다. 활성화 함수는 없어서 여전히 전체적으로는 선형 변환이지만, 파라미터 수가 늘어난다.

# 파라미터 수: (256*256*3 + 1) * 64 + (64 + 1) * 4 ≈ 12,583,236
class Model2(nn.Module):
    def __init__(self):
        super(Model2, self).__init__()
        self.linear1 = nn.Linear(256 * 256 * 3, 64)
        self.linear2 = nn.Linear(64, 4)
        self.flatten = nn.Flatten()

    def forward(self, x):
        x = self.flatten(x)
        x = self.linear1(x)
        x = self.linear2(x)
        return x
os.makedirs('weights/Model2', exist_ok=True)

learning_rate = 0.01
log_step = 8

model = Model2().cuda()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)

num_epochs = 20
best_val_acc = 0
best_epoch = 0
history = []
accuracy = []

for epoch in range(num_epochs):
    adjust_learning_rate(optimizer, epoch)
    train_loss, train_acc = train()
    val_loss, val_acc = validate()
    history.append((train_loss, val_loss))
    accuracy.append((train_acc, val_acc))

    if val_acc > best_val_acc:
        print('[Info] best validation accuracy!')
        best_val_acc = val_acc
        best_epoch = epoch
        torch.save(model.state_dict(), f'weights/Model2/best_checkpoint_epoch_{epoch + 1}.pth')

torch.save(model.state_dict(), f'weights/Model2/last_checkpoint_epoch_{num_epochs}.pth')

plt.plot([x[0] for x in accuracy], 'b', label='train')
plt.plot([x[1] for x in accuracy], 'r--', label='validation')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

test_loss, test_accuracy = test()
print(f'Test loss: {test_loss:.8f}')
print(f'Test accuracy: {test_accuracy * 100.:.2f}%')

4-2. Model3 – 은닉층 3개 + ReLU + Dropout

※ Dropout

nn.Dropout()은 학습 중 일부 뉴런을 무작위로 비활성화해 특정 뉴런에 과도하게 의존하는 것을 막고, 과적합을 완화한다.

class Model3(nn.Module):
    def __init__(self):
        super(Model3, self).__init__()
        # (196,608 + 1) * 128 ≈ 25,165,952
        self.linear1 = nn.Linear(256 * 256 * 3, 128)
        self.dropout1 = nn.Dropout(0.5)
        # (128 + 1) * 64 = 8,256
        self.linear2 = nn.Linear(128, 64)
        self.dropout2 = nn.Dropout(0.5)
        # (64 + 1) * 32 = 2,080
        self.linear3 = nn.Linear(64, 32)
        self.dropout3 = nn.Dropout(0.5)
        # (32 + 1) * 4 = 132
        self.linear4 = nn.Linear(32, 4)
        self.flatten = nn.Flatten()

    def forward(self, x):
        x = self.flatten(x)
        x = F.relu(self.linear1(x))
        x = self.dropout1(x)
        x = F.relu(self.linear2(x))
        x = self.dropout2(x)
        x = F.relu(self.linear3(x))
        x = self.dropout3(x)
        x = self.linear4(x)
        return x
os.makedirs('weights/Model3', exist_ok=True)

learning_rate = 0.01
log_step = 8

model = Model3().cuda()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)

num_epochs = 20
best_val_acc = 0
best_epoch = 0
history = []
accuracy = []

for epoch in range(num_epochs):
    adjust_learning_rate(optimizer, epoch)
    train_loss, train_acc = train()
    val_loss, val_acc = validate()
    history.append((train_loss, val_loss))
    accuracy.append((train_acc, val_acc))

    if val_acc > best_val_acc:
        print('[Info] best validation accuracy!')
        best_val_acc = val_acc
        best_epoch = epoch
        torch.save(model.state_dict(), f'weights/Model3/best_checkpoint_epoch_{epoch + 1}.pth')

torch.save(model.state_dict(), f'weights/Model3/last_checkpoint_epoch_{num_epochs}.pth')

plt.plot([x[0] for x in accuracy], 'b', label='train')
plt.plot([x[1] for x in accuracy], 'r--', label='validation')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

test_loss, test_accuracy = test()
print(f'Test loss: {test_loss:.8f}')
print(f'Test accuracy: {test_accuracy * 100.:.2f}%')

마치며

  • Multi-class Weather Dataset으로 날씨 이미지를 4개 클래스로 분류하면서, Kaggle 다운로드부터 폴더 구성까지 전체 흐름을 정리했다.
  • PyTorch의 ImageFolder · DataLoader · transforms를 활용해 이미지 로딩·증강·정규화를 처리하고, 완전연결 신경망 Model1·Model2·Model3 의 구조 차이와 성능을 비교했다.
  • 모델 깊이, 파라미터 수, Dropout·ReLU 사용 여부에 따라 표현력과 일반화 성능이 어떻게 달라지는지 실험해 볼 수 있다.

실무에서는 CNN·전이학습(ResNet, EfficientNet 등) 모델을 사용하면 더 높은 성능을 얻을 수 있으므로, 이번 실습 코드를 기반으로 컨볼루션 기반 모델로 확장해 보는 것도 좋다.