Training a CNN Ensemble Model

Source: https://github.com/deeplearningzerotoall/TensorFlow (Deep Learning Zero to All)

In [0]:
# Runtime -> Change runtime type -> set the hardware accelerator to TPU
%tensorflow_version 2.x
# Runtime -> Restart runtime
TensorFlow 2.x selected.

Importing Libraries

In [0]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
In [0]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.utils import to_categorical # one-hot encoding
import numpy as np
import matplotlib.pyplot as plt
import os

print(tf.__version__)     # check the TensorFlow version (Colab's default is 1.15.0) --> switched to 2.x via "%tensorflow_version 2.x"
print(keras.__version__)  # check the Keras version
2.1.0
2.2.4-tf

Hyper Parameters

In [0]:
learning_rate = 0.001
training_epochs = 15
batch_size = 100

Creating Checkpoint Directory

In [0]:
cur_dir = os.getcwd()
ckpt_dir_name = 'checkpoints'
model_dir_name = 'mnist_cnn_ensemble'

checkpoint_dir = os.path.join(cur_dir, ckpt_dir_name, model_dir_name)
os.makedirs(checkpoint_dir, exist_ok=True)

checkpoint_prefix = os.path.join(checkpoint_dir, model_dir_name)

MNIST Data

In [0]:
## MNIST Dataset #########################################################
mnist = keras.datasets.mnist
class_names = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
##########################################################################

Datasets

In [0]:
# Load the MNIST images (train, test)
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()    

# Normalize the input pixel values from the 0~255 range into [0, 1]
train_images = train_images.astype(np.float32) / 255.
test_images = test_images.astype(np.float32) / 255.

# np.expand_dims adds a trailing channel dimension: (28, 28) -> (28, 28, 1)
train_images = np.expand_dims(train_images, axis=-1)
test_images = np.expand_dims(test_images, axis=-1)

# Convert the labels to one-hot encoding
train_labels = to_categorical(train_labels, 10)
test_labels = to_categorical(test_labels, 10)    

# Build dataset instances (tf.data shuffles the data and creates batches)
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(
                buffer_size=100000).batch(batch_size)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(batch_size)
# from_tensor_slices : slices the arrays into individual (image, label) pairs
# batch : groups examples into batches of batch_size
# shuffle : reshuffles within a fixed buffer_size every epoch, which helps reduce overfitting
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
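As a quick sanity check (a minimal sketch, not part of the original notebook), one batch can be pulled from the pipeline to confirm that the shapes match the (batch, height, width, channel) layout the model expects:

# Hypothetical sanity check: inspect a single batch from the training pipeline
for images, labels in train_dataset.take(1):
    print(images.shape)   # expected: (100, 28, 28, 1)
    print(labels.shape)   # expected: (100, 10)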

Model Class

In [0]:
# Model class implementation
class MNISTModel(tf.keras.Model): # subclass of tf.keras.Model
    def __init__(self):  # define the layers that make up the network
        # call the parent constructor (tf.keras.Model)
        super(MNISTModel, self).__init__() 
        # initialize the layers
        self.conv1 = keras.layers.Conv2D(filters=32, kernel_size=[3, 3], padding='SAME', activation=tf.nn.relu)
        self.pool1 = keras.layers.MaxPool2D(padding='SAME')
        self.conv2 = keras.layers.Conv2D(filters=64, kernel_size=[3, 3], padding='SAME', activation=tf.nn.relu)
        self.pool2 = keras.layers.MaxPool2D(padding='SAME')
        self.conv3 = keras.layers.Conv2D(filters=128, kernel_size=[3, 3], padding='SAME', activation=tf.nn.relu)
        self.pool3 = keras.layers.MaxPool2D(padding='SAME')
        self.pool3_flat = keras.layers.Flatten()
        self.dense4 = keras.layers.Dense(units=256, activation=tf.nn.relu)
        self.drop4 = keras.layers.Dropout(rate=0.4)
        self.dense5 = keras.layers.Dense(units=10)
        
    # call() wires the layers defined in __init__ into the forward pass
    def call(self, inputs, training=False):  # training flag: lets layers such as Dropout behave differently during training vs. inference
        net = self.conv1(inputs)
        net = self.pool1(net)
        net = self.conv2(net)
        net = self.pool2(net)
        net = self.conv3(net)
        net = self.pool3(net)
        net = self.pool3_flat(net)
        net = self.dense4(net)
        net = self.drop4(net)
        net = self.dense5(net)
        return net
In [0]:
models = []    # list that will hold the ensemble members
num_models = 3
for m in range(num_models): # build the ensemble from three independent MNISTModel instances
    models.append(MNISTModel()) # append adds each new model to the end of the list
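Because the model is defined by subclassing, its weights are only created on the first call. As a hedged sketch (not in the original notebook), one ensemble member can be built with a dummy batch and then summarized:

# Hypothetical check: run a dummy batch through one member so its weights are
# created, then print a layer-by-layer summary
dummy = tf.zeros([1, 28, 28, 1], dtype=tf.float32)
_ = models[0](dummy, training=False)
models[0].summary()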

Loss Function

In [0]:
def loss_fn(model, images, labels):
    logits = model(images, training=True)
    loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits( # softmax is applied inside this op
            logits=logits, labels=labels))    
    return loss   
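The same computation can also be expressed with the Keras loss class; this is a minimal equivalent sketch (an alternative, not the notebook's code), assuming one-hot labels and raw logits from the final Dense layer:

# Equivalent formulation using keras.losses (assumption: labels are one-hot
# encoded and the model outputs raw logits, hence from_logits=True)
cce = keras.losses.CategoricalCrossentropy(from_logits=True)

def loss_fn_keras(model, images, labels):
    logits = model(images, training=True)
    return cce(labels, logits)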

Calculating Gradient

In [0]:
def grad(model, images, labels):
    with tf.GradientTape() as tape: # records every operation executed in this block for automatic differentiation
        loss = loss_fn(model, images, labels)
    return tape.gradient(loss, model.variables)
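To make the GradientTape mechanics concrete, here is a standalone toy example (not from the notebook): differentiating y = x * x at x = 3.0 gives dy/dx = 6.0.

# Toy GradientTape example, independent of the MNIST model
x = tf.Variable(3.0)
with tf.GradientTape() as tape:
    y = x * x                     # the multiplication is recorded on the tape
print(tape.gradient(y, x))        # -> tf.Tensor(6.0, shape=(), dtype=float32)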

Calculating Model's Accuracy

In [0]:
def evaluate(models, images, labels):
    predictions = np.zeros_like(labels)
    for model in models:
        logits = model(images, training=False)
        predictions += logits
    correct_prediction = tf.equal(tf.argmax(predictions, 1), tf.argmax(labels, 1)) # compare the predicted class against the true class
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))             # fraction of correct predictions
    return accuracy
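The ensemble decision is made by summing each member's raw logits before taking argmax. A toy illustration with made-up numbers (3 classes for brevity, not real model output):

# Hypothetical logits from two ensemble members for one image
logits_a = np.array([[2.0, 1.0, 0.5]])
logits_b = np.array([[0.5, 2.5, 0.1]])
summed = logits_a + logits_b           # per-class ensemble score
print(np.argmax(summed, axis=1))       # -> [1], the class with the highest total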

Optimizer

In [0]:
optimizer = tf.optimizers.Adam(learning_rate=learning_rate)
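Note that a single Adam instance is shared by all three members. Adam keeps separate moment (slot) variables per weight, so this works, although the optimizer's iteration counter advances once per apply_gradients call. A variant with one optimizer per model would look like this (an alternative sketch, not what the notebook does):

# Hypothetical variant: a dedicated optimizer per ensemble member
optimizers = [tf.optimizers.Adam(learning_rate=learning_rate)
              for _ in range(num_models)]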

Creating Checkpoints

In [0]:
checkpoints = []
for m in range(num_models):
    checkpoints.append(tf.train.Checkpoint(cnn=models[m])) # groups each model's variables so they can be saved and restored later
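Restoring is not shown in the original notebook; as a hedged sketch, tf.train.Checkpoint.save() returns the path it wrote, and that path can later be passed to restore() on a freshly built model:

# Hypothetical restore step: 'saved_path' stands for a path string returned by
# checkpoints[m].save(...) in the training loop below
new_model = MNISTModel()
restore_ckpt = tf.train.Checkpoint(cnn=new_model)
restore_ckpt.restore(saved_path).expect_partial()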

Training

In [0]:
# train my model
print('Learning started. It takes sometime.')
for epoch in range(training_epochs): # set by the hyperparameters above (training_epochs = 15)

    # reset running statistics for this epoch
    avg_loss = 0.
    avg_train_acc = 0.
    avg_test_acc = 0.
    train_step = 0
    test_step = 0    
    
    # Train
    for images, labels in train_dataset:
        for model in models: # unlike the single-model version, an inner loop updates every ensemble member
            grads = grad(model, images, labels)                       # gradients of the loss w.r.t. the model's variables
            optimizer.apply_gradients(zip(grads, model.variables))    # apply the gradient update
            loss = loss_fn(model, images, labels)                     # loss for this batch
            avg_loss += loss / num_models                             # accumulate loss, averaged over the ensemble
        acc = evaluate(models, images, labels)                 # ensemble accuracy on this batch
        avg_train_acc += acc                                   # accumulate accuracy
        train_step += 1                                        # count processed batches
    avg_loss = avg_loss / train_step                     # average loss over the epoch
    avg_train_acc = avg_train_acc / train_step           # average training accuracy over the epoch
    
    # Test
    for images, labels in test_dataset:        
        acc = evaluate(models, images, labels)                  # ensemble accuracy on this batch
        avg_test_acc = avg_test_acc + acc                      # accumulate accuracy
        test_step += 1                                         # count processed batches
    avg_test_acc = avg_test_acc / test_step                    # average test accuracy over the epoch

    # print the loss and accuracy for this epoch
    print('Epoch:', '{}'.format(epoch + 1), 'loss =', '{:.8f}'.format(avg_loss), 
          'train accuracy = ', '{:.2%}'.format(avg_train_acc), 
          'test accuracy = ', '{:.2%}'.format(avg_test_acc))
    
    # save a checkpoint for every ensemble member
    for idx, checkpoint in enumerate(checkpoints):
        checkpoint.save(file_prefix=checkpoint_prefix+'-{}'.format(idx))

print('Learning Finished!')
Learning started. It takes sometime.
Epoch: 1 loss = 0.16140813 train accuracy =  96.34% test accuracy =  98.86%
Epoch: 2 loss = 0.04124818 train accuracy =  99.24% test accuracy =  99.17%
Epoch: 3 loss = 0.02788240 train accuracy =  99.54% test accuracy =  99.22%
Epoch: 4 loss = 0.01977942 train accuracy =  99.72% test accuracy =  99.34%
Epoch: 5 loss = 0.01528851 train accuracy =  99.81% test accuracy =  99.31%
Epoch: 6 loss = 0.01293646 train accuracy =  99.86% test accuracy =  99.39%
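To use the trained ensemble on a single example, the member logits are combined the same way as in evaluate(); a minimal sketch (not part of the original notebook):

# Hypothetical single-image prediction with the trained ensemble
sample = test_images[:1]                                   # shape (1, 28, 28, 1)
ensemble_logits = sum(model(sample, training=False) for model in models)
pred = tf.argmax(ensemble_logits, axis=1).numpy()[0]
print('predicted:', class_names[pred], 'actual:', class_names[np.argmax(test_labels[0])])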