Anomaly Detection by Autoencoder

In [0]:
# 런타임 -> 런타임 유형변경 -> 하드웨어 가속도 TPU변경
%tensorflow_version 2.x
#런타임 -> 런타임 다시시작
TensorFlow 2.x selected.

Data Upload

In [0]:
# Normal data_3000
from google.colab import files 

uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))
Upload widget is only available when the cell has been executed in the current browser session. Please rerun this cell to enable.
Saving data_3000.zip to data_3000.zip
User uploaded file "data_3000.zip" with length 3309996 bytes
In [0]:
# Anomaly data_1000
from google.colab import files 

uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))
Upload widget is only available when the cell has been executed in the current browser session. Please rerun this cell to enable.
Saving data_1000.zip to data_1000.zip
User uploaded file "data_1000.zip" with length 1196464 bytes

드라이브에 마운트 하기

In [0]:
from google.colab import drive
drive.mount('/content/drive')
Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive

Normal(data_3000) unzip 및 data split

In [0]:
! mkdir data_3000                       # 마운트에 폴더 생성
! unzip data_3000.zip -d ./data_3000    # unzip 
In [0]:
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
import os 

# 압축해제된 데이터 경로를 찾아 복사해서 붙여넣어주세요
src = './data_3000/'

# 이미지 읽기
def img_read(src,file):
    img = cv.imread(src+file,cv.COLOR_BGR2GRAY)
    return img

# src 경로에 있는 파일 명을 저장합니다. 
files = os.listdir(src)

X = []
Y = []

# 경로와 파일명을 입력으로 넣어 확인하고 
# 데이터를 255로 나눠서 0~1사이로 정규화 하여 X 리스트에 넣습니다. 
for file in files: 
    X.append(img_read(src,file)/255.)
    Y.append(1) # nomal label : 1

# array로 데이터 변환
X = np.array(X)
Y = np.array(Y)

print('Normal shape:',np.shape(X))
Normal shape: (3000, 56, 56)
In [0]:
import sklearn
from sklearn.model_selection import train_test_split

# Train set, Test set으로 나누기 
X_train, X_test, Y_train, Y_test  = train_test_split(X, Y, test_size=0.2, random_state=1,shuffle=True)

# 형태를 3차원에서 2차원으로 변경, 첫 번째 인덱스 : 이미지 수, 두 번쨰 인덱스 : 2차원 이미지를 1차원으로 변경 후의 길이
X_train = X_train.reshape((len(X_train), np.prod(X_train.shape[1:])))
X_test = X_test.reshape((len(X_test), np.prod(X_test.shape[1:])))

print(np.shape(X_train))
print(np.shape(X_test))
(2400, 3136)
(600, 3136)

Anomaly(data_1000) unzip

In [0]:
! mkdir data_1000                       # 마운트에 폴더 생성
! unzip data_1000.zip -d ./data_1000    # unzip 
In [0]:
# 압축해제된 데이터 경로를 찾아 복사해서 붙여넣어주세요
src = './data_1000/'

# 이미지 읽기
def img_read(src,file):
    img = cv.imread(src+file,cv.COLOR_BGR2GRAY)
    return img

# src 경로에 있는 파일 명을 저장합니다. 
files = os.listdir(src)

ANom = []
ANom_Y = []

# 경로와 파일명을 입력으로 넣어 확인하고 
# 데이터를 255로 나눠서 0~1사이로 정규화 하여 X 리스트에 넣습니다. 
for file in files: 
    ANom.append(img_read(src,file)/255.)
    ANom_Y.append(0) # Anomal label : 0

# array로 데이터 변환
ANom = np.array(ANom)
ANom_Y = np.array(ANom_Y)
print('Anomaly shape:', np.shape(ANom_Y))

# 형태를 3차원에서 2차원으로 변경, 첫 번째 인덱스 : 이미지 수, 두 번쨰 인덱스 : 2차원 이미지를 1차원으로 변경 후의 길이
ANom_images = ANom.reshape((len(ANom), np.prod(ANom.shape[1:])))
print(np.shape(ANom_images))
Anomaly shape: (1000,)
(1000, 3136)

라이브러리 임포트

In [0]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Dense, Activation
from sklearn.model_selection import train_test_split
In [0]:
print(tf.__version__)     # 텐서플로우 버전확인 (colab의 기본버전은 1.15.0) --> 2.0 변경 "%tensorflow_version 2.x"
print(keras.__version__)
2.1.0
2.2.4-tf

Autoencoder model

In [0]:
def AE():
    ## Encoder 구간
    model = tf.keras.Sequential()
    model.add(Dense(256, input_shape=(56*56,)))  # 입력계층
    model.add(Activation('relu'))      

    model.add(Dense(128))                       # 은닉계층
    model.add(Activation('relu'))

    ## Decoder 구간
    model.add(Dense(128))                        # 은닉계층
    model.add(Activation('relu'))

    model.add(Dense(56*56))                # 출력계층
    model.add(Activation('linear'))

    return model
In [0]:
# model 함수 실행
model = AE()
model.summary()
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
dense (Dense)                (None, 256)               803072    
_________________________________________________________________
activation (Activation)      (None, 256)               0         
_________________________________________________________________
dense_1 (Dense)              (None, 128)               32896     
_________________________________________________________________
activation_1 (Activation)    (None, 128)               0         
_________________________________________________________________
dense_2 (Dense)              (None, 128)               16512     
_________________________________________________________________
activation_2 (Activation)    (None, 128)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 3136)              404544    
_________________________________________________________________
activation_3 (Activation)    (None, 3136)              0         
=================================================================
Total params: 1,257,024
Trainable params: 1,257,024
Non-trainable params: 0
_________________________________________________________________
In [0]:
# 위에서 정한 모델을 그림으로(plot) 보여줌
keras.utils.plot_model(model, to_file='model.png', show_shapes=True, show_layer_names=True) 
Out[0]:

Training

In [0]:
# 모델 구조 확정하고 컴파일 진행
model.compile(loss='MSE',                   # MSE : mean square error
              optimizer='adam',                 
              metrics=[ 'mae' ])            # MAE : mean absolute  error  

# model.fit을 써서 학습. 학습과정을 history에 저장
history = model.fit(X_train, X_train,
                    batch_size=256,
                    epochs=20,
                    validation_data=(X_test, X_test),
                    shuffle=True)
Train on 2400 samples, validate on 600 samples
Epoch 1/20
2400/2400 [==============================] - 1s 571us/sample - loss: 0.2813 - mae: 0.4136 - val_loss: 0.1213 - val_mae: 0.2762
Epoch 2/20
2400/2400 [==============================] - 1s 227us/sample - loss: 0.1106 - mae: 0.2586 - val_loss: 0.1024 - val_mae: 0.2485
Epoch 3/20
2400/2400 [==============================] - 1s 227us/sample - loss: 0.0979 - mae: 0.2421 - val_loss: 0.0932 - val_mae: 0.2361
Epoch 4/20
2400/2400 [==============================] - 1s 234us/sample - loss: 0.0878 - mae: 0.2279 - val_loss: 0.0835 - val_mae: 0.2140
Epoch 5/20
2400/2400 [==============================] - 1s 234us/sample - loss: 0.0770 - mae: 0.2088 - val_loss: 0.0729 - val_mae: 0.2020
Epoch 6/20
2400/2400 [==============================] - 1s 235us/sample - loss: 0.0682 - mae: 0.1931 - val_loss: 0.0669 - val_mae: 0.1854
Epoch 7/20
2400/2400 [==============================] - 1s 232us/sample - loss: 0.0618 - mae: 0.1798 - val_loss: 0.0601 - val_mae: 0.1781
Epoch 8/20
2400/2400 [==============================] - 1s 234us/sample - loss: 0.0568 - mae: 0.1708 - val_loss: 0.0553 - val_mae: 0.1711
Epoch 9/20
2400/2400 [==============================] - 1s 225us/sample - loss: 0.0522 - mae: 0.1635 - val_loss: 0.0516 - val_mae: 0.1611
Epoch 10/20
2400/2400 [==============================] - 1s 226us/sample - loss: 0.0483 - mae: 0.1568 - val_loss: 0.0477 - val_mae: 0.1562
Epoch 11/20
2400/2400 [==============================] - 1s 231us/sample - loss: 0.0444 - mae: 0.1504 - val_loss: 0.0441 - val_mae: 0.1499
Epoch 12/20
2400/2400 [==============================] - 1s 231us/sample - loss: 0.0411 - mae: 0.1441 - val_loss: 0.0415 - val_mae: 0.1446
Epoch 13/20
2400/2400 [==============================] - 1s 226us/sample - loss: 0.0393 - mae: 0.1406 - val_loss: 0.0398 - val_mae: 0.1416
Epoch 14/20
2400/2400 [==============================] - 1s 226us/sample - loss: 0.0375 - mae: 0.1372 - val_loss: 0.0378 - val_mae: 0.1373
Epoch 15/20
2400/2400 [==============================] - 1s 229us/sample - loss: 0.0357 - mae: 0.1335 - val_loss: 0.0365 - val_mae: 0.1348
Epoch 16/20
2400/2400 [==============================] - 1s 233us/sample - loss: 0.0343 - mae: 0.1309 - val_loss: 0.0350 - val_mae: 0.1328
Epoch 17/20
2400/2400 [==============================] - 1s 225us/sample - loss: 0.0330 - mae: 0.1284 - val_loss: 0.0341 - val_mae: 0.1302
Epoch 18/20
2400/2400 [==============================] - 1s 226us/sample - loss: 0.0324 - mae: 0.1276 - val_loss: 0.0332 - val_mae: 0.1298
Epoch 19/20
2400/2400 [==============================] - 1s 231us/sample - loss: 0.0312 - mae: 0.1250 - val_loss: 0.0323 - val_mae: 0.1279
Epoch 20/20
2400/2400 [==============================] - 1s 232us/sample - loss: 0.0302 - mae: 0.1229 - val_loss: 0.0312 - val_mae: 0.1252

Normal data 복원 확인 (data_3000 중 20%의 test data)

In [0]:
test_score = model.evaluate(X_test, X_test, verbose=0) # loss 값 결과 확인
print('Normal Test data cost = ', '{:.4f}'.format(test_score[0]))

sample_size = 10                        # 보여줄 sample 갯수 설정
prediction1 = model.predict(X_test)      # test 데이터로 이미지 복원(predict 함수 사용)

# plot 하기
fig, ax = plt.subplots(2, sample_size, figsize=(15, 4))
for i in range(sample_size):
  org_img1 = X_test[i].reshape(56,56)
  rec_img1 = prediction1[i].reshape(56,56)
  
  ax[0][i].set_axis_off()
  ax[1][i].set_axis_off()
  
  ax[0][i].imshow(org_img1, cmap=plt.cm.bone)
  ax[1][i].imshow(rec_img1, cmap=plt.cm.bone)

plt.show()
Normal Test data cost =  0.0312

Abnormal data 복원 확인 (data_1000)

In [0]:
ANom_score = model.evaluate(ANom_images, ANom_images, verbose=0) # loss 값 결과 확인
print('Abnormal Testing data cost = ', '{:.4f}'.format(ANom_score[0]))

sample_size = 10                        # 보여줄 sample 갯수 설정
prediction2 = model.predict(ANom_images)      # test 데이터로 이미지 복원(predict 함수 사용)

# plot 하기
fig, ax = plt.subplots(2, sample_size, figsize=(15, 4))
for i in range(sample_size):
  org_img2 = ANom_images[i].reshape(56,56)
  rec_img2 = prediction2[i].reshape(56,56)
  
  ax[0][i].set_axis_off()
  ax[1][i].set_axis_off()
  
  ax[0][i].imshow(org_img2, cmap=plt.cm.bone)
  ax[1][i].imshow(rec_img2, cmap=plt.cm.bone)

plt.show()
Abnormal Testing data cost =  0.0713

※ Confusion Matrix(오차행렬)

참이라고 간주되는 관측값과 평가를 위해 추출된 샘플간의 비교를 위한 행렬

참고 : https://en.wikipedia.org/wiki/Confusion_matrix

참고 : https://www.dataschool.io/simple-guide-to-confusion-matrix-terminology/

Test_data & data_1000 예측값(y_pred) 구하기 (AE 통과하기)

  1. data_3000과 data_1000의 이미지 별 loss(MSE) 구하기
  2. 둘을 비교하여 (히스토그램) 임계값 설정
In [0]:
import pandas as pd # pandas 라이브러리

# Normal data (data_3000)
Nom_data = np.concatenate((X_test, X_train), axis = 0)
Nom_pred = model.predict(Nom_data) # AE 이미지 결과값 
Nom_pred_mse = np.mean(np.power(Nom_data - Nom_pred, 2), axis=1) # input - output 이미지간의 MSE 구하기

# 기초통계학 평균, 분산, 최소값, 최대값, 25%, 50%, 75% 분포 구하는 방법
print('Normal Metric')
Nom_pred_mse_df = pd.DataFrame({'Normal_mse':Nom_pred_mse})
print(Nom_pred_mse_df.describe())

print('\n')

# Abnormal data (data_1000)
ANom_pred = model.predict(ANom_images) # AE 이미지 결과값 
ANom_pred_mse = np.mean(np.power(ANom_images - ANom_pred, 2), axis=1) # input - output 이미지간의 MSE 구하기

# 기초통계학 평균, 분산, 최소값, 최대값, 25%, 50%, 75% 분포 구하는 방법
print('Anomaly Metric')
ANom_pred_mse_df = pd.DataFrame({'Anomaly_mse':ANom_pred_mse})
print(ANom_pred_mse_df.describe())

# 히스토그램 그리기
print('\n')
plt.figure()
plt.title('MSE Histogram') # 그림 이름 설정 
plt.hist(Nom_pred_mse, bins=10, facecolor='blue', histtype='step') #히스토그램 함수, bins:데이터를 범주화 하는 기준
plt.hist(ANom_pred_mse, bins=10, facecolor='red', histtype='step') #히스토그램 함수, bins:데이터를 범주화 하는 기준
plt.show()
Normal Metric
        Normal_mse
count  3000.000000
mean      0.030013
std       0.009493
min       0.009573
25%       0.022727
50%       0.029485
75%       0.036068
max       0.066206


Anomaly Metric
       Anomaly_mse
count  1000.000000
mean      0.071266
std       0.017320
min       0.029061
25%       0.059189
50%       0.070016
75%       0.080889
max       0.135744


비교 가능하도록 데이터 설정 (data_3000 중 20%의 x_test + data_1000)

In [0]:
test_data = np.concatenate((X_test, ANom_images), axis = 0)
y_true = np.concatenate((Y_test, ANom_Y), axis = 0)

print(np.shape(test_data))
print(np.shape(y_true))
(1600, 3136)
(1600,)

임계값 설정

각 이미지별 Loss가 임계점보다 높으면 positive(이상), 낮으면 negative(정상)으로 구분

In [0]:
# 각 이미지별 loss를 구하여 설정한 임계값으로 조건을 제시한다.
test_data_pred = model.predict(test_data)
all_cost = np.mean(np.power(test_data - test_data_pred, 2), axis=1)

threshold = 0.045  # 설정한 임계값 (선택 가능)

y_pred = [] #y_pred list 초기화
for i in range(len(all_cost)):
    if all_cost[i] > threshold :
        y_pred.insert(i, 0) #임계점보다 높으면 positive(이상)
    else:
        y_pred.insert(i, 1) #임계점보다 낮으면 negative(정상)
        
print(y_pred[0:100]) #프린트로 값을 확인할 수 있다 (확인용으로 100개만).
print(np.shape(y_pred))
[1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
(1600,)

Confusion_matrix 계산하기

In [0]:
from sklearn.metrics import confusion_matrix #confusion_matrix 라이브러리 임포트

con_mat = confusion_matrix(y_true, y_pred) #confusion_matrix 함수 실행

# Print the confusion matrix as text.
TP = con_mat[0][0] #둘다 Anomal, 0이며 예측값이 positive(이상값)이다.
FN = con_mat[0][1] #실제값과 예측값이 달랐으며, 예측값이 negative(정상값)이다.
FP = con_mat[1][0] #실제값과 예측값이 달랐으며, 예측값이 postive(이상값)이다.
TN = con_mat[1][1] #둘다 nomal, 1이며 예측값이 negative(정상값)이다.
print('\n',
      'TP :', TP, '\n', 
      'FN :', FN, '\n', 
      'FP :', FP, '\n', 
      'TN :', TN, '\n')

Accuracy = (TP+TN)/(TP+FN+FP+TN) #정분류율 : 전체관측치 중 실제값과 예측치가 일치하는 정도
print('Accuracy(정분류율=(TP+TN)/(TP+FN+FP+TN)):', '{:.2%}'.format(Accuracy))

Error_rate = (FP+FN)/(TP+FN+FP+TN) #오분류율 : 전체 관측치 중 실제값과 예측치가 다른 정보 (1-accuracy)
print('Error_rate(오분류율=(FP+FN)/(TP+FN+FP+TN):', '{:.2%}'.format(Error_rate))

Precirion = TP/(TP+FP) #정확도 : TRUE로 예측한 관측치 중 실제값이 TRUE인 정도
print('Precirion(정확도=TP/(TP+FP)):', '{:.2%}'.format(Precirion))

Recall = TP/(TP+FN) #재현율 : 실제값이 TRUE인 관측치 중 예측치가 적중한 정도, 모형의 완정성을 평가함, 민감도
print('Recall(재현율,민감도=TP/(TP+FN)):', '{:.2%}'.format(Recall))

print('\n')
# Confusion Matrix 표 그리기
plt.imshow(con_mat, interpolation='nearest', cmap=plt.cm.Blues)

# Plot 구성하기
plt.title('Confusion Matrix', fontsize=20) # Plot 이름
plt.tight_layout()
plt.colorbar()
label=["Anomal(0)", "Nomal(1)"] # 라벨값
tick_marks = np.arange(len(label)) 
plt.xticks(tick_marks, label)
plt.yticks(tick_marks, label)
plt.xlabel('Predicted', fontsize=15)
plt.ylabel('True', fontsize=15)

# 표 안에 숫자 기입하는 방법
name = [['TP','FN'], ['FP', 'TN']]
thresh = con_mat.max() / 2.
for i in range(2):
     for j in range(2):
        plt.text(j, i, str(name[i][j])+" = "+str(con_mat[i, j]),
                 horizontalalignment="center",
                 color="white" if con_mat[i, j] > thresh else "black",
                 fontsize=16)
    
plt.show()
 TP : 952 
 FN : 48 
 FP : 56 
 TN : 544 

Accuracy(정분류율=(TP+TN)/(TP+FN+FP+TN)): 93.50%
Error_rate(오분류율=(FP+FN)/(TP+FN+FP+TN): 6.50%
Precirion(정확도=TP/(TP+FP)): 94.44%
Recall(재현율,민감도=TP/(TP+FN)): 95.20%