본문 바로가기

딥러닝/딥러닝을 이용한 자연어처리 입문

[딥러닝 NLP] 16. Transformer

# 16-01 Transformer 란?

트랜스포머: 기존 seq2seq 모델의 한계를 보정하기 위해 Attention을 사용하는 것이 아니라, 아예 Attention만으로 인코더-디코더 구조를 설계한 것으로, RNN을 사용하지 않았음에도 RNN보다 우수한 성능을 보임

 

1. 트랜스포머의 주요 하이퍼파라미터

d_model: 인코더와 디코더의 입출력 크기, 임베딩벡터의 차원

num_layers: 인코더와 디코더로 이루어진 층의 개수

num_heads: 어텐션을 분할 수행할 병렬의 개수

d_ff: 피드포워드 신경망의 은닉층 크기

 

2. 트랜스포머 기본구조

기존 seq2seq는 인코더와 디코더가 각각 t개의 시점을 가졌다면, 트랜스포머는 인코더와 디코더 자체가 N개식 쌓여있는 구조이다.

트랜스포머의 기본 구조

 

3. 트랜스포머의 입력: 포지셔널 인코딩

포지셔널 인코딩: 트랜스포머는 단어 입력을 순차적으로 받지 않으므로 단어의 위치 정보를 다른 방식으로 알려주어야 하는데, 각 단어의 임베딩벡터에 위치정보를 더하는 방식을 사용한다. 이를 포지셔널 인코딩이라고 한다.

포지셔널 인코딩: 단어별 임베딩벡터+위치정보

 

실제 덧셈은 "임베딩벡터가 모여 만들어진 문장행렬"과 "포지셔널 인코딩 행렬"의 덧셈으로 수행된다.

d_model은 모든 층의 출력차원을 의미하는 하이퍼파라미터이다.

pos는 문장 내에서 임베딩벡터의 위치를, i는 임베딩벡터 내에서 차원의 인덱스를 의미한다.

i가 짝수인 경우 사인함수 값을, 홀수인 경우 코사인함수 값을 사용해 포지셔널 인코딩 값을 구한다.

예를 들어, 입력 문장의 단어가 50개이면서 각 단어의 임베딩벡터가 128차원일 때 다음과 같은 행렬을 가진다.

 

4. 트랜스포머 구조 뜯어보기: 세가지 어텐션

트랜스포머에는 세 가지 어텐션이 사용된다.

-Encoder Self-Attention: 인코더에서 이루어지며, Q, K, V 모두 인코더벡터

-Decoder Self-Attention: 디코더에서 이루어지며, Q, K, V 모두 디코더벡터

-Endoder-Decoder Attention: 디코더에서 이루어지며, Query는 디코더벡터인 반면 Key와 Value는 인코더벡터

-Multi-head: 병렬 수행을 의미

 

4.1. 인코더 구조

하나의 인코더 구조

-트랜스포머는 num_layers 개수의 인코더 층을 쌓는다.

-하나의 인코더 층은 2개의 sublayer로 구성됨: Multi-head Self-Attention, Position-wise FFNN

 

(1) Encoder Sublayer 1: Self-Attention

-Self-Attention: 어텐션을 자기 자신에게 수행한다는 의미. Q, K, V가 모두 '입력문장의 모든 단어벡터들'로 동일

-장점: 입력문장 내의 단어끼리의 유사도를 구함으로써 대명사(ex. it)가 가리키는 것이 문장내의 어떤 단어인지 알 수 있음

 

1) Q, K, V 행렬 구하기

-문장 행렬에 3개의 가중치 행렬을 곱해 각각 Q, K, V 행렬을 얻음

-문장행렬의 크기는 (seq_len, d_model)

-Q, K 행렬의 크기는 (seq_len, d_k). 이 때 d_k = d_model / num_heads (ex. d_model=512, num_heads=8 일 때, d_k=64)

-V 행렬의 크기는 (seq_len, d_v)

-따라서 W^Q 행렬과 W^K 행렬의 크기는 (d_model, d_k), W^K 행렬의 크기는 (d_model, d_v)

 

2) 어텐션 스코어 행렬 구하기(ft. 패딩 마스크)

-Q행렬과 K전치행렬을 곱하고 스케일링값(=루트d_k)으로 나누어 어텐션 스코어 행렬을 구함

ex) 어텐션 스코어 행렬에서 I행 student열의 값은 I의 Q벡터와 student의 K벡터의 어텐션 스코어(=두 단어의 연관성)

검정색 칸에 매우 작은 음수값을 넣는다.

-패딩 마스크: 실질적 의미를 갖지 않는 <PAD>에는 유사도를 구하지 않도록 마스킹해주는 작업

-마스킹 방법: Key 열에 <PAD> 토큰이 존재하면 해당 열 전체에 매우 작은 음수값(=mask*-1e9)을 넣어줌으로써 소프트맥스 함수를 거치면 0이 되도록 함.

-구현 방법은 아래와 같다.

# 정수시퀀스가 0인 단어(=패딩할 토큰)은 1로, 나머지는 다 0으로 변환하는 함수를 만든 후
def create_padding_mask(x):
  mask = tf.cast(tf.math.equal(x, 0), tf.float32)
  # (batch_size, 1, 1, key의 문장 길이)
  return mask[:, tf.newaxis, tf.newaxis, :]
  
# 위 함수를 통과한 벡터(=유의미한 0과 무의미한 1로만 이루어짐) mask를 어텐션 스코어행렬 구하는 함수의 인자로 전달
# 이 함수에서는 mask 벡터에 -1e9를 곱하고 어텐션 스코어 행렬 logits 옆에 붙임
def scaled_dot_product_attention(query, key, value, mask):
... 중략 ...
    logits += (mask * -1e9) # 어텐션 스코어 행렬인 logits에 mask*-1e9 값을 더해주고 있다.
... 중략 ...

 

3) 어텐션 값 행렬 구하기

-어텐션 스코어 행렬에 소프트맥스 함수를 곱해 어텐션 분포를 구하고, V 행렬을 곱해 어텐션 값 행렬 a를 구함

-어텐션 값 행렬 a의 크기는 (seq_len, d_v)

 

4) Multi-head Attention Matrix 구하기

Q. 왜 굳이 d_model 차원의 벡터를 num_heads로 나누어서 d_k, d_v 차원의 벡터로 축소시키는가?

A. 여러번의 어텐션을 병렬로 사용하는 것이 더 효과적이기 때문!

ex) 첫번째 어텐션 헤드는 it과 animal이, 두번째 어텐션 헤드는 it과 tired가 높은 연관성을 갖는다고 볼 수 있음

-즉, d_k, d_v 차원에 대해 num_heads 개의 병렬 어텐션을 수행하게 되는 것

-Attention Head: 각각의 병렬 어텐션 수행의 결과로 나온 어텐션 값 행렬들

 

-이제 모든 어텐션 헤드를 연결(concatenate)한다. 연결된 행렬의 크기는 최종적으로 (seq_len, d_model)이 된다.

 

-연결된 행렬에 또 한 번 가중치 행렬 W^o를 곱해 Multi-head attention matrix를 구한다.

-이제까지 한 인코더 내의 첫번째 서브층의 결과물로 Multi-head attention matrix을 얻었다. 이제 이것을 두번째 서브층 FFNN으로 넘겨준다.

 

(2) Encoder Sublayer 2: Position-wise FFNN

첫번째 서브층의 결과인 Multi-head attention matrix를 x라고 하자. x의 크기는 (seq_len, d_model)이다.

x는 FFNN에서 다음 과정을 거친다.

FFNN 식

1) W1을 곱하고 b1을 더해줌. W1의 크기는 (d_model, d_ff)

2) RELU (0과 입력값 중 max 선택)

3) W2를 곱하고 b2를 더해줌. W2의 크기는 (d_ff, d_model)

* d_ff는 은닉층의 크기를 의미하는 하이퍼파라미터

 

-x의 입력크기는 FFNN을 지난 후에도 크기가 똑같이 유지되고, 다음 인코더의 입력으로 사용하게 된다.

 

(3) Residual connection & Layer Normalization

인코더는 크게 Self-Attention과 FFNN 층으로 이루어지고, 이를 보조하는 역할로 잔차연결과 층 정규화라는 것이 또 있다.

인코더의 전체 구조
잔차연결: 서브층의 입력과 서브층의 출력을 더해주는 것
층 정규화: 텐서의 마지막 차원(d_model)에 대한 평균과 분산을 구한 후 이것들을 이용해 값을 정규화하는 것

 

한 개의 인코더에서 서브층1, 서브층2, 잔차연결, 층 정규화까지 모두 마쳤다. 

한 인코더의 출력은 한 디코더에게 전달되며, 인코더와 디코더의 연산은 각각 num_layers 만큼 반복된다.

 

4.2. 디코더 구조

 

(1) Decoder Sublayer 1: Self-Attention

-디코더의 첫번째 층은 인코더의 첫번째 층과 완전히 동일한 과정을 거친다. 오직 다른 점은 어텐션 스코어 행렬을 구한 후 look-ahead mask를 적용한다는 것.

-look-ahead mask: 트랜스포머는 한번에 전체 문장을 입력받으므로, 현재 시점보다 미래에 있는 단어들을 참고하지 못하도록 하는 마스크

미래 시점의 단어가 가려졌다.

 

(2) Decoder Sublayer 2: Encoder-Decoder Attention

-인코더의 출력이 여기서 입력된다!!

-인코더의 출력으로부터 Key와 Value를 얻고, 디코더의 첫번째 서브층 출력으로부터 Query를 얻는다.

-행렬의 출처만 다르고 나머지 연산은 다른 어텐션들과 같다.

 

5. 트랜스포머 구현

트랜스포머 깃허브 링크: https://github.com/ukairia777/tensorflow-transformer

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
# 포지셔널 인코딩
class PositionalEncoding(tf.keras.layers.Layer):
  def __init__(self, position, d_model):
    super(PositionalEncoding, self).__init__()
    self.pos_encoding = self.positional_encoding(position, d_model)

  def get_angles(self, position, i, d_model):
    angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
    return position * angles

  def positional_encoding(self, position, d_model):
    angle_rads = self.get_angles(
        position=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
        i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
        d_model=d_model)

    # 배열의 짝수 인덱스(2i)에는 사인 함수 적용
    sines = tf.math.sin(angle_rads[:, 0::2])

    # 배열의 홀수 인덱스(2i+1)에는 코사인 함수 적용
    cosines = tf.math.cos(angle_rads[:, 1::2])

    angle_rads = np.zeros(angle_rads.shape)
    angle_rads[:, 0::2] = sines
    angle_rads[:, 1::2] = cosines
    pos_encoding = tf.constant(angle_rads)
    pos_encoding = pos_encoding[tf.newaxis, ...]

    print(pos_encoding.shape)
    return tf.cast(pos_encoding, tf.float32)

  def call(self, inputs):
    return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]
# Scaled Dot-production Attention
def scaled_dot_product_attention(query, key, value, mask):
  # query 크기 : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
  # key 크기 : (batch_size, num_heads, key의 문장 길이, d_model/num_heads)
  # value 크기 : (batch_size, num_heads, value의 문장 길이, d_model/num_heads)
  # padding_mask : (batch_size, 1, 1, key의 문장 길이)

  # 어텐션 스코어 행렬: Q행렬과 K전치행렬의 곱
  matmul_qk = tf.matmul(query, key, transpose_b=True)

  # 스케일링: dk의 루트값으로 나눠줌
  depth = tf.cast(tf.shape(key)[-1], tf.float32)
  logits = matmul_qk / tf.math.sqrt(depth)

  # 마스킹
  if mask is not None:
    logits += (mask * -1e9)

  # 소프트맥스 함수는 마지막 차원인 key의 문장 길이 방향으로 수행된다.
  # attention weight : (batch_size, num_heads, query의 문장 길이, key의 문장 길이)
  attention_weights = tf.nn.softmax(logits, axis=-1)

  # output : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
  output = tf.matmul(attention_weights, value)

  return output, attention_weights
# Multi-head Attention
class MultiHeadAttention(tf.keras.layers.Layer):

  def __init__(self, d_model, num_heads, name="multi_head_attention"):
    super(MultiHeadAttention, self).__init__(name=name)
    self.num_heads = num_heads
    self.d_model = d_model

    assert d_model % self.num_heads == 0

    # d_model을 num_heads로 나눈 값.
    # 논문 기준 : 64
    self.depth = d_model // self.num_heads

    # WQ, WK, WV에 해당하는 밀집층 정의
    self.query_dense = tf.keras.layers.Dense(units=d_model)
    self.key_dense = tf.keras.layers.Dense(units=d_model)
    self.value_dense = tf.keras.layers.Dense(units=d_model)

    # WO에 해당하는 밀집층 정의
    self.dense = tf.keras.layers.Dense(units=d_model)

  # num_heads 개수만큼 q, k, v를 split하는 함수
  def split_heads(self, inputs, batch_size):
    inputs = tf.reshape(
        inputs, shape=(batch_size, -1, self.num_heads, self.depth))
    return tf.transpose(inputs, perm=[0, 2, 1, 3])

  def call(self, inputs):
    query, key, value, mask = inputs['query'], inputs['key'], inputs[
        'value'], inputs['mask']
    batch_size = tf.shape(query)[0]

    # 1. WQ, WK, WV에 해당하는 밀집층 지나기
    # q : (batch_size, query의 문장 길이, d_model)
    # k : (batch_size, key의 문장 길이, d_model)
    # v : (batch_size, value의 문장 길이, d_model)
    # 참고) 인코더(k, v)-디코더(q) 어텐션에서는 query 길이와 key, value의 길이는 다를 수 있다.
    query = self.query_dense(query)
    key = self.key_dense(key)
    value = self.value_dense(value)

    # 2. 헤드 나누기
    # q : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
    # k : (batch_size, num_heads, key의 문장 길이, d_model/num_heads)
    # v : (batch_size, num_heads, value의 문장 길이, d_model/num_heads)
    query = self.split_heads(query, batch_size)
    key = self.split_heads(key, batch_size)
    value = self.split_heads(value, batch_size)

    # 3. 스케일드 닷 프로덕트 어텐션. 앞서 구현한 함수 사용.
    # (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
    scaled_attention, _ = scaled_dot_product_attention(query, key, value, mask)
    # (batch_size, query의 문장 길이, num_heads, d_model/num_heads)
    scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])

    # 4. 헤드 연결(concatenate)하기
    # (batch_size, query의 문장 길이, d_model)
    concat_attention = tf.reshape(scaled_attention,
                                  (batch_size, -1, self.d_model))

    # 5. WO에 해당하는 밀집층 지나기
    # (batch_size, query의 문장 길이, d_model)
    outputs = self.dense(concat_attention)

    return outputs
# FFNN
  outputs = tf.keras.layers.Dense(units=dff, activation='relu')(attention)
  outputs = tf.keras.layers.Dense(units=d_model)(outputs)
# Encoder
def encoder_layer(dff, d_model, num_heads, dropout, name="encoder_layer"):
  inputs = tf.keras.Input(shape=(None, d_model), name="inputs")

  # 인코더는 패딩 마스크 사용
  padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

  # 멀티-헤드 어텐션 (첫번째 서브층 / 셀프 어텐션)
  attention = MultiHeadAttention(
      d_model, num_heads, name="attention")({
          'query': inputs, 'key': inputs, 'value': inputs, # Q = K = V
          'mask': padding_mask # 패딩 마스크 사용
      })

  # 드롭아웃 + 잔차 연결과 층 정규화
  attention = tf.keras.layers.Dropout(rate=dropout)(attention)
  attention = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(inputs + attention)

  # 포지션 와이즈 피드 포워드 신경망 (두번째 서브층)
  outputs = tf.keras.layers.Dense(units=dff, activation='relu')(attention)
  outputs = tf.keras.layers.Dense(units=d_model)(outputs)

  # 드롭아웃 + 잔차 연결과 층 정규화
  outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
  outputs = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(attention + outputs)

  return tf.keras.Model(
      inputs=[inputs, padding_mask], outputs=outputs, name=name)
# 인코더 쌓기
def encoder(vocab_size, num_layers, dff,
            d_model, num_heads, dropout,
            name="encoder"):
  inputs = tf.keras.Input(shape=(None,), name="inputs")

  # 인코더는 패딩 마스크 사용
  padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

  # 포지셔널 인코딩 + 드롭아웃
  embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
  embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
  embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
  outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

  # 인코더를 num_layers개 쌓기
  for i in range(num_layers):
    outputs = encoder_layer(dff=dff, d_model=d_model, num_heads=num_heads,
        dropout=dropout, name="encoder_layer_{}".format(i),
    )([outputs, padding_mask])

  return tf.keras.Model(
      inputs=[inputs, padding_mask], outputs=outputs, name=name)
# look-ahead mask
def create_look_ahead_mask(x):
  seq_len = tf.shape(x)[1]
  look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
  padding_mask = create_padding_mask(x) # 패딩 마스크도 포함
  return tf.maximum(look_ahead_mask, padding_mask)
# Decoder
def decoder_layer(dff, d_model, num_heads, dropout, name="decoder_layer"):
  inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
  enc_outputs = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")

  # 룩어헤드 마스크(첫번째 서브층)
  look_ahead_mask = tf.keras.Input(
      shape=(1, None, None), name="look_ahead_mask")

  # 패딩 마스크(두번째 서브층)
  padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')

  # 멀티-헤드 어텐션 (첫번째 서브층 / 마스크드 셀프 어텐션)
  attention1 = MultiHeadAttention(
      d_model, num_heads, name="attention_1")(inputs={
          'query': inputs, 'key': inputs, 'value': inputs, # Q = K = V
          'mask': look_ahead_mask # 룩어헤드 마스크
      })

  # 잔차 연결과 층 정규화
  attention1 = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(attention1 + inputs)

  # 멀티-헤드 어텐션 (두번째 서브층 / 디코더-인코더 어텐션)
  attention2 = MultiHeadAttention(
      d_model, num_heads, name="attention_2")(inputs={
          'query': attention1, 'key': enc_outputs, 'value': enc_outputs, # Q != K = V
          'mask': padding_mask # 패딩 마스크
      })

  # 드롭아웃 + 잔차 연결과 층 정규화
  attention2 = tf.keras.layers.Dropout(rate=dropout)(attention2)
  attention2 = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(attention2 + attention1)

  # 포지션 와이즈 피드 포워드 신경망 (세번째 서브층)
  outputs = tf.keras.layers.Dense(units=dff, activation='relu')(attention2)
  outputs = tf.keras.layers.Dense(units=d_model)(outputs)

  # 드롭아웃 + 잔차 연결과 층 정규화
  outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
  outputs = tf.keras.layers.LayerNormalization(
      epsilon=1e-6)(outputs + attention2)

  return tf.keras.Model(
      inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
      outputs=outputs,
      name=name)
# 디코더 쌓기
def decoder(vocab_size, num_layers, dff,
            d_model, num_heads, dropout,
            name='decoder'):
  inputs = tf.keras.Input(shape=(None,), name='inputs')
  enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')

  # 디코더는 룩어헤드 마스크(첫번째 서브층)와 패딩 마스크(두번째 서브층) 둘 다 사용.
  look_ahead_mask = tf.keras.Input(
      shape=(1, None, None), name='look_ahead_mask')
  padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')

  # 포지셔널 인코딩 + 드롭아웃
  embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
  embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
  embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
  outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

  # 디코더를 num_layers개 쌓기
  for i in range(num_layers):
    outputs = decoder_layer(dff=dff, d_model=d_model, num_heads=num_heads,
        dropout=dropout, name='decoder_layer_{}'.format(i),
    )(inputs=[outputs, enc_outputs, look_ahead_mask, padding_mask])

  return tf.keras.Model(
      inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
      outputs=outputs,
      name=name)
# 트랜스포머 조립: 지금까지의 코드들을 합친다.
def transformer(vocab_size, num_layers, dff,
                d_model, num_heads, dropout,
                name="transformer"):

  # 인코더의 입력
  inputs = tf.keras.Input(shape=(None,), name="inputs")

  # 디코더의 입력
  dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")

  # 인코더의 패딩 마스크
  enc_padding_mask = tf.keras.layers.Lambda(
      create_padding_mask, output_shape=(1, 1, None),
      name='enc_padding_mask')(inputs)

  # 디코더의 룩어헤드 마스크(첫번째 서브층)
  look_ahead_mask = tf.keras.layers.Lambda(
      create_look_ahead_mask, output_shape=(1, None, None),
      name='look_ahead_mask')(dec_inputs)

  # 디코더의 패딩 마스크(두번째 서브층)
  dec_padding_mask = tf.keras.layers.Lambda(
      create_padding_mask, output_shape=(1, 1, None),
      name='dec_padding_mask')(inputs)

  # 인코더의 출력은 enc_outputs. 디코더로 전달된다.
  enc_outputs = encoder(vocab_size=vocab_size, num_layers=num_layers, dff=dff,
      d_model=d_model, num_heads=num_heads, dropout=dropout,
  )(inputs=[inputs, enc_padding_mask]) # 인코더의 입력은 입력 문장과 패딩 마스크

  # 디코더의 출력은 dec_outputs. 출력층으로 전달된다.
  dec_outputs = decoder(vocab_size=vocab_size, num_layers=num_layers, dff=dff,
      d_model=d_model, num_heads=num_heads, dropout=dropout,
  )(inputs=[dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])

  # 다음 단어 예측을 위한 출력층
  outputs = tf.keras.layers.Dense(units=vocab_size, name="outputs")(dec_outputs)

  return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)

 

이제 구축한 트랜스포머를 사용해 모델을 만들어보자.

# 하이퍼파라미터 정의
small_transformer = transformer(
    vocab_size = 9000,
    num_layers = 4,
    dff = 512,
    d_model = 128,
    num_heads = 4,
    dropout = 0.3,
    name="small_transformer")

tf.keras.utils.plot_model(
    small_transformer, to_file='small_transformer.png', show_shapes=True)
# 손실함수 정의
def loss_function(y_true, y_pred):
  y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))

  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction='none')(y_true, y_pred)

  mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)
  loss = tf.multiply(loss, mask)

  return tf.reduce_mean(loss)
# 학습률 정의
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):

  def __init__(self, d_model, warmup_steps=4000):
    super(CustomSchedule, self).__init__()
    self.d_model = d_model
    self.d_model = tf.cast(self.d_model, tf.float32)
    self.warmup_steps = warmup_steps

  def __call__(self, step):
    step = tf.cast(step, tf.float32)
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps**-1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

 

여기서 구현한 트랜스포머 모델을 이용해 다음 실습에서 한국어 챗봇을 만든다.

 

# 16-02 트랜스포머로 한국어 챗봇

1. 데이터 로드 및 전처리

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import re
import urllib.request
import time
import tensorflow_datasets as tfds
import tensorflow as tf

urllib.request.urlretrieve("https://raw.githubusercontent.com/songys/Chatbot_data/master/ChatbotData.csv", filename="ChatBotData.csv")
train_data = pd.read_csv('ChatBotData.csv')
len(train_data) # 개수확인: 11823
train_data.isnull().sum() # 결측값 확인: 없음

# 구두점 제거
questions = []
for sentence in train_data['Q']:
    # 구두점에 대해서 띄어쓰기
    # ex) 12시 땡! -> 12시 땡 !
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = sentence.strip()
    questions.append(sentence)
    
answers = []
for sentence in train_data['A']:
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = sentence.strip()
    answers.append(sentence)

 

2. 단어집합 생성

# 서브워드텍스트인코더를 사용하여 질문, 답변 데이터로부터 단어 집합(Vocabulary) 생성
tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
    questions + answers, target_vocab_size=2**13)

# 시작 토큰과 종료 토큰에 대한 정수 부여.
START_TOKEN, END_TOKEN = [tokenizer.vocab_size], [tokenizer.vocab_size + 1]

# 시작 토큰과 종료 토큰을 고려하여 단어 집합의 크기를 + 2
VOCAB_SIZE = tokenizer.vocab_size + 2

 

3. 정수 인코딩과 패딩

# 함수 정의
MAX_LENGTH = 40 # 최대길이 정의
def tokenize_and_filter(inputs, outputs):
  tokenized_inputs, tokenized_outputs = [], []

  for (sentence1, sentence2) in zip(inputs, outputs):
    # encode(토큰화 + 정수 인코딩), 시작 토큰과 종료 토큰 추가
    sentence1 = START_TOKEN + tokenizer.encode(sentence1) + END_TOKEN
    sentence2 = START_TOKEN + tokenizer.encode(sentence2) + END_TOKEN

    tokenized_inputs.append(sentence1)
    tokenized_outputs.append(sentence2)

  # 패딩
  tokenized_inputs = tf.keras.preprocessing.sequence.pad_sequences(
      tokenized_inputs, maxlen=MAX_LENGTH, padding='post')
  tokenized_outputs = tf.keras.preprocessing.sequence.pad_sequences(
      tokenized_outputs, maxlen=MAX_LENGTH, padding='post')

  return tokenized_inputs, tokenized_outputs

# 실행
questions, answers = tokenize_and_filter(questions, answers)

 

4. 인코더와 디코더의 입력, 레이블 만들기

# 텐서플로우 dataset을 이용하여 셔플을 수행하되, 배치 크기로 데이터를 묶는다.
# 이 과정에서 교사 강요를 사용하기 위해 디코더의 입력과 실제값 시퀀스를 구성한다.
BATCH_SIZE = 64
BUFFER_SIZE = 20000

# 디코더의 실제값 시퀀스에서는 시작 토큰을 제거
dataset = tf.data.Dataset.from_tensor_slices((
    {
        'inputs': questions,
        'dec_inputs': answers[:, :-1] # 디코더의 입력. 마지막 패딩 토큰 제거
    },
    {
        'outputs': answers[:, 1:]  # 맨 처음 토큰(=시작 토큰) 제거
    },
))

dataset = dataset.cache()
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)

 

5. 트랜스포머 만들기

tf.keras.backend.clear_session()

# 하이퍼파라미터
D_MODEL = 256
NUM_LAYERS = 2
NUM_HEADS = 8
DFF = 512
DROPOUT = 0.1

model = transformer(
    vocab_size=VOCAB_SIZE,
    num_layers=NUM_LAYERS,
    dff=DFF,
    d_model=D_MODEL,
    num_heads=NUM_HEADS,
    dropout=DROPOUT)

# 학습률, 옵티마이저
learning_rate = CustomSchedule(D_MODEL)

optimizer = tf.keras.optimizers.Adam(
    learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

def accuracy(y_true, y_pred):
  # 레이블의 크기는 (batch_size, MAX_LENGTH - 1)
  y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
  return tf.keras.metrics.sparse_categorical_accuracy(y_true, y_pred)

# 모델 컴파일
model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])
# 모델 학습
EPOCHS = 50
model.fit(dataset, epochs=EPOCHS)

 

6. 챗봇 평가

# 전처리 함수 정의
def preprocess_sentence(sentence):
  sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
  sentence = sentence.strip()
  return sentence
# 트랜스포머 수행하는 함수 정의
def evaluate(sentence):
  # preprocess_sentence 함수 호출
  sentence = preprocess_sentence(sentence)

  # 입력 문장에 시작 토큰과 종료 토큰 추가
  sentence = tf.expand_dims(
      START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0)
  output = tf.expand_dims(START_TOKEN, 0)

  # 디코더의 예측 시작
  for i in range(MAX_LENGTH):
    predictions = model(inputs=[sentence, output], training=False)

    # 현재 시점의 예측 단어 받아옴
    predictions = predictions[:, -1:, :]
    predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)

    # 만약 현재 시점의 예측 단어가 종료 토큰이라면 예측을 중단
    if tf.equal(predicted_id, END_TOKEN[0]):
      break

    # 현재 시점의 예측 단어를 output(출력)에 연결
    # output은 for문의 다음 루프에서 디코더의 입력이 됨
    output = tf.concat([output, predicted_id], axis=-1)

  # 단어 예측이 모두 끝났다면 output을 리턴
  return tf.squeeze(output, axis=0)
# 정수시퀀스를 문자열로 변환해 출력해주는 함수 정의
def predict(sentence):
  # evaluate 함수 호출
  prediction = evaluate(sentence)
  predicted_sentence = tokenizer.decode(
      [i for i in prediction if i < tokenizer.vocab_size])

  print('Input: {}'.format(sentence))
  print('Output: {}'.format(predicted_sentence))

  return predicted_sentence

 

대화해보자.

predict("영화 볼래?")

# 결과
Input: 영화 볼래?
Output: 최신 영화가 좋을 것 같아요 .

 

 

# 16-03 셀프어텐션으로 텍스트분류

트랜스포머의 인코더를 사용해 텍스트분류를 수행해본다.

1. Multi-head Attention

import tensorflow as tf
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, embedding_dim, num_heads=8):
        super(MultiHeadAttention, self).__init__()
        self.embedding_dim = embedding_dim # d_model
        self.num_heads = num_heads

        assert embedding_dim % self.num_heads == 0

        self.projection_dim = embedding_dim // num_heads
        self.query_dense = tf.keras.layers.Dense(embedding_dim)
        self.key_dense = tf.keras.layers.Dense(embedding_dim)
        self.value_dense = tf.keras.layers.Dense(embedding_dim)
        self.dense = tf.keras.layers.Dense(embedding_dim)

    def scaled_dot_product_attention(self, query, key, value):
        matmul_qk = tf.matmul(query, key, transpose_b=True)
        depth = tf.cast(tf.shape(key)[-1], tf.float32)
        logits = matmul_qk / tf.math.sqrt(depth)
        attention_weights = tf.nn.softmax(logits, axis=-1)
        output = tf.matmul(attention_weights, value)
        return output, attention_weights

    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        # x.shape = [batch_size, seq_len, embedding_dim]
        batch_size = tf.shape(inputs)[0]

        # (batch_size, seq_len, embedding_dim)
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)

        # (batch_size, num_heads, seq_len, projection_dim)
        query = self.split_heads(query, batch_size)  
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)

        scaled_attention, _ = self.scaled_dot_product_attention(query, key, value)
        # (batch_size, seq_len, num_heads, projection_dim)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])  

        # (batch_size, seq_len, embedding_dim)
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.embedding_dim))
        outputs = self.dense(concat_attention)
        return outputs

 

2. Encoder

class TransformerBlock(tf.keras.layers.Layer):
    def __init__(self, embedding_dim, num_heads, dff, rate=0.1):
        super(TransformerBlock, self).__init__()
        self.att = MultiHeadAttention(embedding_dim, num_heads)
        self.ffn = tf.keras.Sequential(
            [tf.keras.layers.Dense(dff, activation="relu"),
             tf.keras.layers.Dense(embedding_dim),]
        )
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs) # 첫번째 서브층 : 멀티 헤드 어텐션
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output) # Add & Norm
        ffn_output = self.ffn(out1) # 두번째 서브층 : 포지션 와이즈 피드 포워드 신경망
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output) # Add & Norm

 

3. Position Embedding

"포지션 임베딩"은 "포지셔널 임베딩"과 다름!

포지션 임베딩: 위치 벡터를 학습할 수 있도록, 임베딩 층의 첫번째 인자로 단어집합의 크기가 아닌 문장의 최대길이를 넣음

class TokenAndPositionEmbedding(tf.keras.layers.Layer):
    def __init__(self, max_len, vocab_size, embedding_dim):
        super(TokenAndPositionEmbedding, self).__init__()
        self.token_emb = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.pos_emb = tf.keras.layers.Embedding(max_len, embedding_dim)

    def call(self, x):
        max_len = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=max_len, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

 

4. 데이터 로드 및 전처리

vocab_size = 20000  # 빈도수 상위 20000개의 단어만 사용
max_len = 200  # 문장의 최대 길이

(X_train, y_train), (X_test, y_test) = tf.keras.datasets.imdb.load_data(num_words=vocab_size)

X_train = tf.keras.preprocessing.sequence.pad_sequences(X_train, maxlen=max_len)
X_test = tf.keras.preprocessing.sequence.pad_sequences(X_test, maxlen=max_len)

 

5. 텍스트분류

embedding_dim = 32  # 각 단어의 임베딩 벡터의 차원
num_heads = 2  # Attention-head 개수(=병렬 개수)
dff = 32  # FFNN의 은닉층 크기

inputs = tf.keras.layers.Input(shape=(max_len,))
embedding_layer = TokenAndPositionEmbedding(max_len, vocab_size, embedding_dim)
x = embedding_layer(inputs)
transformer_block = TransformerBlock(embedding_dim, num_heads, dff)
x = transformer_block(x)
x = tf.keras.layers.GlobalAveragePooling1D()(x)
x = tf.keras.layers.Dropout(0.1)(x)
x = tf.keras.layers.Dense(20, activation="relu")(x)
x = tf.keras.layers.Dropout(0.1)(x)
outputs = tf.keras.layers.Dense(2, activation="softmax")(x)

model = tf.keras.Model(inputs=inputs, outputs=outputs)
model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"])
history = model.fit(X_train, y_train, batch_size=32, epochs=2, validation_data=(X_test, y_test))

print("테스트 정확도: %.4f" % (model.evaluate(X_test, y_test)[1])) # 0.8695