Convolution AutoEncoder

tensorflow 2.x version_sequential model_GradientTape

1. 필요한 패키지 import and tensorflow 2.x 버전 설정

In [5]:
%tensorflow_version 2.x

import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
import os 

import tensorflow as tf
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Flatten, Dropout, UpSampling2D, Input, Convolution2D, Reshape
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model

import scipy.io
from sklearn.model_selection import train_test_split

print(tf.__version__)
2.1.0

2. 컴퓨터의 파일을 colab으로 upload

In [2]:
from google.colab import files 

uploaded = files.upload()

# 코드가 돌아가면 파일선택 버튼을 눌러서 upload할 파일 선택
for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn]))) 
Upload widget is only available when the cell has been executed in the current browser session. Please rerun this cell to enable.
Saving data_3000.zip to data_3000.zip
User uploaded file "data_3000.zip" with length 3309996 bytes

3. colab 드라이브 연결 후, 파일 zip 풀기

In [ ]:
from google.colab import drive
# google drive와 연결 (인증코드 입력해야함)
drive.mount('/gdrive')

# data_3000 폴더 만듦. zip 파일 압출 풀기
! mkdir data_3000
! unzip data_3000.zip -d ./data_3000

4. 데이터 전처리

In [4]:
# 압축해제된 데이터 경로 (데이터 있는 파일 경로 입력)
src = './data_3000/'

# 이미지 읽기 및 출력
def img_read_plot(src,file):
    img = cv.imread(src+file,cv.COLOR_BGR2GRAY)
    plt.imshow(img)
    plt.xticks([]) # x축 눈금
    plt.yticks([]) # y축 눈금
    plt.show()
    return img

#이미지 읽기
def img_read(src,file):
    img = cv.imread(src+file,cv.COLOR_BGR2GRAY)
    return img

#src 경로에 있는 파일 명을 저장 
files = os.listdir(src)

X,Y = [],[]
count = 0

# 경로와 파일명을 입력으로 넣어 확인하고 
# 데이터를 255로 나눠서 0~1사이로 정규화 하여 X 리스트 추가

for file in files:
  # 데이터의 일부분만 확인해봅니다.
  if count < 10 : 
    print(count)
    X.append(img_read_plot(src,file)/255.)
    Y.append(float(file[:-4]))
    count+=1
  else : 
    X.append(img_read(src,file)/255.)
    Y.append(float(file[:-4]))

# 전체 X, Y data shape 출력
print('X_shape:',np.shape(X[0]),'Y_shape:',np.shape(Y[0]))
print('X_list shape:',np.shape(X),'Y_list shape:',np.shape(Y))

# hyper parameter 설정
img_size = 56       # 이미지 사이즈
latent_dim = 32     # latent dimension 크기 설정
BUFFER_SIZE = 5600  # 총 이미지 갯수
BATCH_SIZE = 20     # 배치 사이즈(나눴을때 이미지 갯수에 딱 떨어지게 설정해야 함)

# Train set(80%), Test set(20%)으로 나누기 
X_train, X_test, Y_train, Y_test = train_test_split(X,Y, test_size=0.2, random_state=1,shuffle=True)

# CNN layer에 들어갈 수 있게 (x, 56, 56, 1) 차원으로 맞춰줌 
X_train = np.reshape(X_train, (len(X_train), img_size, img_size, 1))
X_test = np.reshape(X_test, (len(X_test), img_size, img_size, 1))
print(np.shape(X_train),np.shape(X_test))

# 데이터 배치를 만들고 섞어줌
train_ds = tf.data.Dataset.from_tensor_slices((X_train)).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
test_ds = tf.data.Dataset.from_tensor_slices((X_test)).batch(BATCH_SIZE)
0
1
2
3
4
5
6
7
8
9
X_shape: (56, 56) Y_shape: ()
X_list shape: (3000, 56, 56) Y_list shape: (3000,)
(2400, 56, 56, 1) (600, 56, 56, 1)

5. 모델 구성

In [ ]:
# 인코더
def encoder():

  model = tf.keras.Sequential()
  model.add(Conv2D(5, (3, 3), activation='relu', padding='same', input_shape=(img_size, img_size, 1)))
  model.add(MaxPooling2D((2, 2), padding='same'))
  model.add(Conv2D(10, (3, 3), activation='relu', padding='same'))
  model.add(MaxPooling2D((2, 2), padding='same'))
  model.add(Conv2D(15, (3, 3), activation='relu', padding='same'))
  model.add(MaxPooling2D((2, 2), padding='same'))
  model.add(Conv2D(20, (3, 3), activation='relu', padding='same'))
  model.add(MaxPooling2D((2, 2), padding='same'))
  model.add(Conv2D(25, (3, 3), activation='relu', padding='same'))
  model.add(Flatten())
  # latent dimension으로 축소됨
  model.add(Dense(latent_dim)) 

  return model
In [8]:
# 인코더 함수를 실행해서 모델을 build 후 변수에 집어넣고, 모델 구조를 살펴봄
e_model = encoder()
e_model.summary()
plot_model(e_model, show_shapes=True)
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d (Conv2D)              (None, 56, 56, 5)         50        
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 28, 28, 5)         0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 28, 28, 10)        460       
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 14, 14, 10)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 14, 14, 15)        1365      
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 7, 7, 15)          0         
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 7, 7, 20)          2720      
_________________________________________________________________
max_pooling2d_3 (MaxPooling2 (None, 4, 4, 20)          0         
_________________________________________________________________
conv2d_4 (Conv2D)            (None, 4, 4, 25)          4525      
_________________________________________________________________
flatten (Flatten)            (None, 400)               0         
_________________________________________________________________
dense (Dense)                (None, 32)                12832     
=================================================================
Total params: 21,952
Trainable params: 21,952
Non-trainable params: 0
_________________________________________________________________
Out[8]:
In [ ]:
# 디코더
def decoder():

  model = tf.keras.Sequential()
  model.add(Dense(7 * 7 * 56, input_shape=(latent_dim,)))
  model.add(Reshape((7, 7, 56)))
  model.add(Conv2D(56, (2, 2), activation='relu', padding='same'))
  model.add(UpSampling2D((2, 2)))
  model.add(Conv2D(56, (2, 2), activation='relu', padding='same'))
  model.add(UpSampling2D((2, 2)))
  model.add(Conv2D(56, (2, 2), activation='relu', padding='same'))
  model.add(UpSampling2D((2, 2)))
  model.add(Dropout(0.5))
  model.add(Conv2D(1, (2, 2), padding='same'))

  return model
In [15]:
# 디코더 함수를 실행해서 모델을 build 후 변수에 집어넣고, 모델 구조를 살펴봄
d_model = decoder()
d_model.summary()
plot_model(d_model, show_shapes=True)
Model: "sequential_4"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
dense_4 (Dense)              (None, 2744)              90552     
_________________________________________________________________
reshape_1 (Reshape)          (None, 7, 7, 56)          0         
_________________________________________________________________
conv2d_14 (Conv2D)           (None, 7, 7, 56)          12600     
_________________________________________________________________
up_sampling2d_3 (UpSampling2 (None, 14, 14, 56)        0         
_________________________________________________________________
conv2d_15 (Conv2D)           (None, 14, 14, 56)        12600     
_________________________________________________________________
up_sampling2d_4 (UpSampling2 (None, 28, 28, 56)        0         
_________________________________________________________________
conv2d_16 (Conv2D)           (None, 28, 28, 56)        12600     
_________________________________________________________________
up_sampling2d_5 (UpSampling2 (None, 56, 56, 56)        0         
_________________________________________________________________
dropout_1 (Dropout)          (None, 56, 56, 56)        0         
_________________________________________________________________
conv2d_17 (Conv2D)           (None, 56, 56, 1)         225       
=================================================================
Total params: 128,577
Trainable params: 128,577
Non-trainable params: 0
_________________________________________________________________
Out[15]:
In [ ]:
# 모델을 합쳐줌 
input_img = Input(shape=(img_size, img_size, 1))
model = Model(input_img, d_model(e_model(input_img)), name='autoencoder')

6. loss 함수, optimizer, accuracy 설정

In [ ]:
# loss 함수와 optimizer 설정
loss_object = tf.keras.losses.MeanSquaredError() 
optimizer = tf.keras.optimizers.Adam()           

# loss와 MAE계산을 위해 설정
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_mae = tf.keras.metrics.MeanAbsoluteError(name='train_mae')
test_loss = tf.keras.metrics.Mean(name='test_loss')
test_mae = tf.keras.metrics.MeanAbsoluteError(name='test_mae')

7. train

In [ ]:
# `tf.function` 이 데코레이터는 함수를 "컴파일" 한다.
@tf.function
def train_step(images):
  with tf.GradientTape() as tape:
    predictions = model(images)                 # AE 모델에 image 학습
    loss = loss_object(images, predictions)     # 실제 이미지와 복원한 이미지의 차이를 계산해 loss값 구함

  # tape에 기록하며 미분이 실행됨
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  # loss와 mae의 값을 계산. 마찬가지로 원본이미지와 복원한 이미지를 갖고 계산
  train_loss(loss)
  train_mae(images, predictions)

# test image를 갖고 test를 위한 과정. 학습되지 않는다. 
@tf.function
def test_step(images):
  predictions = model(images)                   # AE 모델에 image 학습
  t_loss = loss_object(images, predictions)     # loss값 구함
  
  test_loss(t_loss)
  test_mae(images, predictions)
In [ ]:
# train 함수 정의
def train(train_ds, test_ds, epochs):
  for epoch in range(epochs):      # 매 epoch 마다 학습이 진행된다
    for images in train_ds:
      train_step(images)           # train 이미지 학습

    for test_images in test_ds:
      test_step(test_images)       # test 이미지로 테스트 (validation으로 사용됨)

    # print 보여주기 설정(10번 epoch마다)
    if epoch % 10 == 0 or epoch == epochs-1 :
      template = 'epoch: {}, loss: {}, mae: {}, val_loss: {}, val_mae: {}'
      print (template.format(epoch,       
                            train_loss.result(),
                            train_mae.result(),
                            test_loss.result(),
                            test_mae.result()))
In [33]:
# train 함수 실행, trian data, val data, epoch 순으로 작성
train(train_ds, test_ds, 100) 
epoch: 0, loss: 0.003817694028839469, mae: 0.03374487906694412, val_loss: 0.008789765648543835, val_mae: 0.04857337474822998
epoch: 10, loss: 0.003762309905141592, mae: 0.03352086618542671, val_loss: 0.008774245157837868, val_mae: 0.048476241528987885
epoch: 20, loss: 0.0037091674748808146, mae: 0.03330302610993385, val_loss: 0.008760820142924786, val_mae: 0.048393793404102325
epoch: 30, loss: 0.0036587812937796116, mae: 0.03309955820441246, val_loss: 0.008747711777687073, val_mae: 0.04831957817077637
epoch: 40, loss: 0.003610484302043915, mae: 0.032900094985961914, val_loss: 0.008736143819987774, val_mae: 0.04825020581483841
epoch: 50, loss: 0.0035639372654259205, mae: 0.03270759433507919, val_loss: 0.008724778890609741, val_mae: 0.04818321764469147
epoch: 60, loss: 0.003520107828080654, mae: 0.03252631053328514, val_loss: 0.008714836090803146, val_mae: 0.048116911202669144
epoch: 70, loss: 0.0034776837565004826, mae: 0.03234928846359253, val_loss: 0.008706072345376015, val_mae: 0.04804803803563118
epoch: 80, loss: 0.0034370385110378265, mae: 0.03217923268675804, val_loss: 0.008697527460753918, val_mae: 0.047985199838876724
epoch: 90, loss: 0.0033981807064265013, mae: 0.0320158526301384, val_loss: 0.008688506670296192, val_mae: 0.04792068526148796
epoch: 99, loss: 0.003364482894539833, mae: 0.031874485313892365, val_loss: 0.008681454695761204, val_mae: 0.0478818379342556

8. 원본이미지와 test data를 복원해서 출력 후 비교

In [34]:
sample_size = 10                        # 보여줄 sample 갯수 설정
prediction = model.predict(X_test)      # test 데이터로 이미지 복원(predict 함수 사용)

# plot 하기
fig, ax = plt.subplots(2, sample_size, figsize=(15, 4))
for i in range(sample_size):
  org_img = X_test[i].reshape(56,56)
  rec_img = prediction[i].reshape(56,56)
  
  ax[0][i].set_axis_off()
  ax[1][i].set_axis_off()
  
  ax[0][i].imshow(org_img, cmap=plt.cm.bone)
  ax[1][i].imshow(rec_img, cmap=plt.cm.bone)

plt.show()