encoder-decoder 구조: 하나의 RNN을 인코더, 또 다른 하나의 RNN을 디코더라는 모듈로 명명하고 두 RNN을 연결한 구조로, 주로 입출력의 길이가 다를 경우 사용한다. (ex.번역기, 텍스트 요약)
# 14-01 시퀀스-투-시퀀스(seq2seq, Sequence-to-Sequence)
seq2seq: 입력된 시퀀스로와는 다른 도메인의 시퀀스를 출력하는 데 사용 (ex. 챗봇, 기계번역, 내용 요약, Speech to Text)
1. seq2seq의 구조
-위의 그림에서는 이해를 위해 바닐라 RNN으로 표현했으나 실제로는 LSTM 혹은 GRU를 사용
-훈련과정에서는 교사 강요(teacher forcing) 사용
(1) 인코더
-입력문장의 모든 단어들을 임베딩벡터로 변환
-현재 시점을 t라고 할 때, t-1의 은닉상태와 t의 입력벡터를 입력으로 받아 t의 은닉상태를 만들어 다음 시점으로 보냄
-마지막 시점의 은닉상태, 즉 모든 단어 정보를 압축한 하나의 context vector 를 디코더의 첫번째 은닉상태로 넘겨줌
(2) 디코더
-인코더로부터 받은 context vector 와 첫번째 시점의 입력값인 <sos>의 벡터를 사용해 다음에 올 단어벡터 예측
-각 시점의 출력벡터(출력시퀀스)는 소프트맥스 함수를 거쳐 단어별 확률벡터로 반환되고, 그 중 가장 높은 확률 단어 선택
2. seq2seq 으로 문자레벨 번역기 실습
병렬 코퍼스: 두 개 이상의 언어가 병렬적으로 구성된 코퍼스프랑스-영어 병렬 코퍼스 파일 다운로드 : http://www.manythings.org/anki 에서 fra-eng.zip 파일
(1) 병렬 코퍼스 데이터 전처리
src(source) = 입력 문장
tar(target) = 출력 문장
# 패키지 임포트
import os
import shutil
import zipfile
import pandas as pd
import tensorflow as tf
import urllib3
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
import requests
# 데이터 다운로드
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
def download_zip(url, output_path):
response = requests.get(url, headers=headers, stream=True)
if response.status_code == 200:
with open(output_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"ZIP file downloaded to {output_path}")
else:
print(f"Failed to download. HTTP Response Code: {response.status_code}")
url = "http://www.manythings.org/anki/fra-eng.zip"
output_path = "fra-eng.zip"
download_zip(url, output_path)
path = os.getcwd()
zipfilename = os.path.join(path, output_path)
with zipfile.ZipFile(zipfilename, 'r') as zip_ref:
zip_ref.extractall(path)
# 전체 샘플 개수 출력
lines = pd.read_csv('fra.txt', names=['src', 'tar', 'lic'], sep='\t')
del lines['lic']
print('전체 샘플의 개수 :',len(lines)) # 191954
# 일부 샘플만 선택
lines = lines.loc[:, 'src':'tar']
lines = lines[0:60000] # 6만개만 저장
lines.sample(10) # 랜덤으로 10개 선택
# 시작 심볼 <sos> 대신 \t, 종료 심볼 <eos> 대신 \n 추가
lines.tar = lines.tar.apply(lambda x : '\t '+ x + ' \n')
# 문자 집합 구축
src_vocab = set()
for line in lines.src: # 1줄씩 읽음
for char in line: # 1개의 문자씩 읽음
src_vocab.add(char)
tar_vocab = set()
for line in lines.tar: # 1줄씩 읽음
for char in line: # 1개의 문자씩 읽음
tar_vocab.add(char)
# 문자 집합 크기 확인
src_vocab_size = len(src_vocab)+1
tar_vocab_size = len(tar_vocab)+1
print('source 문장의 char 집합 :',src_vocab_size) # 79
print('target 문장의 char 집합 :',tar_vocab_size) # 105
# 에러 방지 위해 정렬 후 인덱스 사용해 출력해보기(문자 단위로 잘 끊어졌는지 확인용)
src_vocab = sorted(list(src_vocab))
tar_vocab = sorted(list(tar_vocab))
print(src_vocab[45:75])
print(tar_vocab[45:75])
# 각 문자에 인덱스 부여
src_to_index = dict([(word, i+1) for i, word in enumerate(src_vocab)])
tar_to_index = dict([(word, i+1) for i, word in enumerate(tar_vocab)])
# 훈련 데이터에 정수 인코딩 수행
# 인코더의 입력 영어 데이터
encoder_input = []
for line in lines.src:
encoded_line = []
for char in line:
encoded_line.append(src_to_index[char])
encoder_input.append(encoded_line)
print('source 문장의 정수 인코딩 :',encoder_input[:5])
# 디코더의 입력 프랑스어 데이터
decoder_input = []
for line in lines.tar:
encoded_line = []
for char in line:
encoded_line.append(tar_to_index[char])
decoder_input.append(encoded_line)
print('target 문장의 정수 인코딩 :',decoder_input[:5])
# 디코더의 예측값과 비교할 실제값
decoder_target = []
for line in lines.tar:
timestep = 0
encoded_line = []
for char in line:
if timestep > 0: # 두번째 문자부터 저장
encoded_line.append(tar_to_index[char])
timestep = timestep + 1 #첫번째 문자(=\t)는 건너뛰도록 함
decoder_target.append(encoded_line)
print('target 문장 레이블의 정수 인코딩 :',decoder_target[:5])
# 영어와 프랑스 각각 길이가 가장 긴 샘플 활인
max_src_len = max([len(line) for line in lines.src])
max_tar_len = max([len(line) for line in lines.tar])
print('source 문장의 최대 길이 :',max_src_len) # 23
print('target 문장의 최대 길이 :',max_tar_len) # 76
# 최대길이로 패딩
encoder_input = pad_sequences(encoder_input, maxlen=max_src_len, padding='post')
decoder_input = pad_sequences(decoder_input, maxlen=max_tar_len, padding='post')
decoder_target = pad_sequences(decoder_target, maxlen=max_tar_len, padding='post')
# 원핫인코딩
encoder_input = to_categorical(encoder_input)
decoder_input = to_categorical(decoder_input)
decoder_target = to_categorical(decoder_target)
(2) 교사 강요(Teacher forcing)
교사 강요란?
: 훈련 과정에서 정확한 학습을 위해 이전시점의 예측값 대신 실제값을 입력으로 주는 방법
(3) seq2seq 모델 설계
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense
from tensorflow.keras.models import Model
import numpy as np
# Encoder
encoder_inputs = Input(shape=(None, src_vocab_size))
encoder_lstm = LSTM(units=256, return_state=True)
# state_h 는 은닉상태, state_c 는 셀 상태
encoder_outputs, state_h, state_c = encoder_lstm(encoder_inputs)
# context vector = 은닉상태 + 셀 상태
encoder_states = [state_h, state_c]
# Decoder
decoder_inputs = Input(shape=(None, tar_vocab_size))
decoder_lstm = LSTM(units=256, return_sequences=True, return_state=True)
# 입력으로 context vector 전달
decoder_outputs, _, _= decoder_lstm(decoder_inputs, initial_state=encoder_states)
decoder_softmax_layer = Dense(tar_vocab_size, activation='softmax')
decoder_outputs = decoder_softmax_layer(decoder_outputs)
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer="rmsprop", loss="categorical_crossentropy")
# 모델 훈련
model.fit(x=[encoder_input, decoder_input], y=decoder_target, batch_size=64, epochs=40, validation_split=0.2)
(4) 번역기 돌려보기
# Encoder 정의
encoder_model = Model(inputs=encoder_inputs, outputs=encoder_states)
# Decoder 부품 설계
# 이전 시점의 상태들을 저장하는 텐서
decoder_state_input_h = Input(shape=(256,))
decoder_state_input_c = Input(shape=(256,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
# 문장의 다음 단어를 예측하기 위해서 초기 상태(initial_state)를 이전 시점의 상태로 사용
# 뒤의 함수 decode_sequence()에 동작을 구현 예정
decoder_outputs, state_h, state_c = decoder_lstm(decoder_inputs, initial_state=decoder_states_inputs)
# 훈련 과정에서와 달리 LSTM의 리턴하는 은닉 상태와 셀 상태를 버리지 않음
decoder_states = [state_h, state_c]
decoder_outputs = decoder_softmax_layer(decoder_outputs)
decoder_model = Model(inputs=[decoder_inputs] + decoder_states_inputs, outputs=[decoder_outputs] + decoder_states)
# 인덱스로부터 단어 얻는 문장 정의
index_to_src = dict((i, char) for char, i in src_to_index.items())
index_to_tar = dict((i, char) for char, i in tar_to_index.items())
# Decoder 정의
def decode_sequence(input_seq):
# 입력으로부터 인코더의 상태를 얻음
states_value = encoder_model.predict(input_seq)
# <SOS>에 해당하는 원-핫 벡터 생성
target_seq = np.zeros((1, 1, tar_vocab_size))
target_seq[0, 0, tar_to_index['\t']] = 1.
stop_condition = False
decoded_sentence = ""
# stop_condition이 True가 될 때까지 루프 반복
while not stop_condition:
# 이점 시점의 상태 states_value를 현 시점의 초기 상태로 사용
output_tokens, h, c = decoder_model.predict([target_seq] + states_value)
# 예측 결과를 문자로 변환
sampled_token_index = np.argmax(output_tokens[0, -1, :])
sampled_char = index_to_tar[sampled_token_index]
# 현재 시점의 예측 문자를 예측 문장에 추가
decoded_sentence += sampled_char
# <eos>에 도달하거나 최대 길이를 넘으면 중단.
if (sampled_char == '\n' or
len(decoded_sentence) > max_tar_len):
stop_condition = True
# 현재 시점의 예측 결과를 다음 시점의 입력으로 사용하기 위해 저장
target_seq = np.zeros((1, 1, tar_vocab_size))
target_seq[0, 0, sampled_token_index] = 1.
# 현재 시점의 상태를 다음 시점의 상태로 사용하기 위해 저장
states_value = [h, c]
return decoded_sentence
# 샘플 번역해보기
for seq_index in [3,50,100,300,1001]: # 입력 문장의 인덱스
input_seq = encoder_input[seq_index:seq_index+1]
decoded_sentence = decode_sequence(input_seq)
print(35 * "-")
print('입력 문장:', lines.src[seq_index])
print('정답 문장:', lines.tar[seq_index][2:len(lines.tar[seq_index])-1]) # '\t'와 '\n'을 빼고 출력
print('번역 문장:', decoded_sentence[1:len(decoded_sentence)-1]) # '\n'을 빼고 출력
-----------------------------------
입력 문장: Hi.
정답 문장: Salut !
번역 문장: Salut.
-----------------------------------
입력 문장: I see.
정답 문장: Aha.
번역 문장: Je change.
-----------------------------------
입력 문장: Hug me.
정답 문장: Serrez-moi dans vos bras !
번역 문장: Serre-moi dans vos patents !
-----------------------------------
입력 문장: Help me.
정답 문장: Aidez-moi.
번역 문장: Aidez-moi.
-----------------------------------
입력 문장: I beg you.
정답 문장: Je vous en prie.
번역 문장: Je vous en prie.
3. seq2seq 으로 단어레벨 번역기 실습
2번 실습과 똑같은 데이터 사용
(1) 데이터 전처리
# 데이터 다운로드
import os
import re
import shutil
import zipfile
import numpy as np
import pandas as pd
import tensorflow as tf
import unicodedata
import urllib3
from tensorflow.keras.layers import Embedding, GRU, Dense
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
import requests
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
def download_zip(url, output_path):
response = requests.get(url, headers=headers, stream=True)
if response.status_code == 200:
with open(output_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"ZIP file downloaded to {output_path}")
else:
print(f"Failed to download. HTTP Response Code: {response.status_code}")
url = "http://www.manythings.org/anki/fra-eng.zip"
output_path = "fra-eng.zip"
download_zip(url, output_path)
path = os.getcwd()
zipfilename = os.path.join(path, output_path)
with zipfile.ZipFile(zipfilename, 'r') as zip_ref:
zip_ref.extractall(path)
# 전처리함수 정의
def to_ascii(s):
# 프랑스어 악센트(accent) 삭제
# 예시 : 'déjà diné' -> deja dine
return ''.join(c for c in unicodedata.normalize('NFD', s)
if unicodedata.category(c) != 'Mn')
def preprocess_sentence(sent):
sent = to_ascii(sent.lower()) # 악센트 제거 함수 호출
sent = re.sub(r"([?.!,¿])", r" \1", sent) # 단어와 구두점 사이에 공백 추가
sent = re.sub(r"[^a-zA-Z!.?]+", r" ", sent) # 필요없는건 공백으로 변환
sent = re.sub(r"\s+", " ", sent) # 다수의 공백을 하나로 치환
return sent
# 샘플 33000개에 전처리 적용
num_samples = 33000
def load_preprocessed_data():
encoder_input, decoder_input, decoder_target = [], [], []
with open("fra.txt", "r") as lines:
for i, line in enumerate(lines):
# source 데이터와 target 데이터 분리
src_line, tar_line, _ = line.strip().split('\t')
# source 데이터 전처리
src_line = [w for w in preprocess_sentence(src_line).split()]
# target 데이터 전처리
tar_line = preprocess_sentence(tar_line)
tar_line_in = [w for w in ("<sos> " + tar_line).split()]
tar_line_out = [w for w in (tar_line + " <eos>").split()]
encoder_input.append(src_line)
decoder_input.append(tar_line_in)
decoder_target.append(tar_line_out)
if i == num_samples - 1:
break
return encoder_input, decoder_input, decoder_target
# 인코더의 입력, 디코더의 입력, 디코더의 레이블 저장
sents_en_in, sents_fra_in, sents_fra_out = load_preprocessed_data()
# 단어집합 생성, 정수 인코딩, 패딩
# 정의
tokenizer_en = Tokenizer(filters="", lower=False)
tokenizer_en.fit_on_texts(sents_en_in)
encoder_input = tokenizer_en.texts_to_sequences(sents_en_in)
encoder_input = pad_sequences(encoder_input, padding="post")
# 인코더 입력
tokenizer_fra = Tokenizer(filters="", lower=False)
tokenizer_fra.fit_on_texts(sents_fra_in)
tokenizer_fra.fit_on_texts(sents_fra_out)
# 디코더 입력
decoder_input = tokenizer_fra.texts_to_sequences(sents_fra_in)
decoder_input = pad_sequences(decoder_input, padding="post")
# 디코더 레이블
decoder_target = tokenizer_fra.texts_to_sequences(sents_fra_out)
decoder_target = pad_sequences(decoder_target, padding="post")
# 단어집합 크기 정의
src_vocab_size = len(tokenizer_en.word_index) + 1 # 영어단어집합크기 4647
tar_vocab_size = len(tokenizer_fra.word_index) + 1 # 프랑스어단어집합크기 8022
# 단어로부터 정수 얻는 딕셔너리
src_to_index = tokenizer_en.word_index
tar_to_index = tokenizer_fra.word_index
# 정수로부터 단어 얻는 딕셔너리
index_to_src = tokenizer_en.index_word
index_to_tar = tokenizer_fra.index_word
# 순서 섞기
indices = np.arange(encoder_input.shape[0])
np.random.shuffle(indices)
encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]
# 훈련데이터 10%를 테스트데이터로 분리
n_of_val = int(33000*0.1) # 3300
encoder_input_train = encoder_input[:-n_of_val]
decoder_input_train = decoder_input[:-n_of_val]
decoder_target_train = decoder_target[:-n_of_val]
encoder_input_test = encoder_input[-n_of_val:]
decoder_input_test = decoder_input[-n_of_val:]
decoder_target_test = decoder_target[-n_of_val:]
(2) seq2seq 모델 설계
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense, Masking
from tensorflow.keras.models import Model
# Encoder
embedding_dim = 64
hidden_units = 64
encoder_inputs = Input(shape=(None,))
enc_emb = Embedding(src_vocab_size, embedding_dim)(encoder_inputs) # 임베딩 층
enc_masking = Masking(mask_value=0.0)(enc_emb) # 패딩 0은 연산에서 제외
encoder_lstm = LSTM(hidden_units, return_state=True) # 상태값 리턴을 위해 return_state는 True
encoder_outputs, state_h, state_c = encoder_lstm(enc_masking) # 은닉 상태와 셀 상태를 리턴
encoder_states = [state_h, state_c] # 인코더의 은닉 상태와 셀 상태를 저장
# Decoder
decoder_inputs = Input(shape=(None,))
dec_emb_layer = Embedding(tar_vocab_size, hidden_units) # 임베딩 층
dec_emb = dec_emb_layer(decoder_inputs) # 패딩 0은 연산에서 제외
dec_masking = Masking(mask_value=0.0)(dec_emb)
# 상태값 리턴을 위해 return_state=True, 모든 시점에 대해 단어 예측하기 위해 return_sequences=True
decoder_lstm = LSTM(hidden_units, return_sequences=True, return_state=True)
# 인코더의 은닉상태를 초기 은닉상태(initial_state)로 사용
decoder_outputs, _, _ = decoder_lstm(dec_masking,
initial_state=encoder_states)
# 모든 시점의 결과에 대해서 소프트맥스 함수를 사용한 출력층을 통해 단어 예측
decoder_dense = Dense(tar_vocab_size, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)
# 모델의 입출력 정의
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['acc'])
# 모델 훈련
model.fit(x=[encoder_input_train, decoder_input_train], y=decoder_target_train, \
validation_data=([encoder_input_test, decoder_input_test], decoder_target_test),
batch_size=128, epochs=50)
(3) 번역기 돌려보기
# Encoder 정의
encoder_model = Model(encoder_inputs, encoder_states)
# Decoder 부품 설계
# 이전 시점의 상태를 보관할 텐서
decoder_state_input_h = Input(shape=(hidden_units,))
decoder_state_input_c = Input(shape=(hidden_units,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
# 훈련 때 사용했던 임베딩 층 재사용
dec_emb2 = dec_emb_layer(decoder_inputs)
# 다음 단어 예측을 위해 이전시점의 상태를 현시점의 초기상태로 사용
decoder_outputs2, state_h2, state_c2 = decoder_lstm(dec_emb2, initial_state=decoder_states_inputs)
decoder_states2 = [state_h2, state_c2]
# 모든 시점에 대해 단어 예측
decoder_outputs2 = decoder_dense(decoder_outputs2)
# 수정된 디코더
decoder_model = Model(
[decoder_inputs] + decoder_states_inputs,
[decoder_outputs2] + decoder_states2)
# Decoder 정의
def decode_sequence(input_seq):
# 입력으로부터 인코더의 마지막 시점의 상태(은닉 상태, 셀 상태)를 얻음
states_value = encoder_model.predict(input_seq)
# <SOS>에 해당하는 정수 생성
target_seq = np.zeros((1,1))
target_seq[0, 0] = tar_to_index['<sos>']
stop_condition = False
decoded_sentence = ''
# stop_condition이 True가 될 때까지 루프 반복
# 구현의 간소화를 위해서 이 함수는 배치 크기를 1로 가정합니다.
while not stop_condition:
# 이점 시점의 상태 states_value를 현 시점의 초기 상태로 사용
output_tokens, h, c = decoder_model.predict([target_seq] + states_value)
# 예측 결과를 단어로 변환
sampled_token_index = np.argmax(output_tokens[0, -1, :])
sampled_char = index_to_tar[sampled_token_index]
# 현재 시점의 예측 단어를 예측 문장에 추가
decoded_sentence += ' '+sampled_char
# <eos>에 도달하거나 정해진 길이를 넘으면 중단.
if (sampled_char == '<eos>' or
len(decoded_sentence) > 50):
stop_condition = True
# 현재 시점의 예측 결과를 다음 시점의 입력으로 사용하기 위해 저장
target_seq = np.zeros((1,1))
target_seq[0, 0] = sampled_token_index
# 현재 시점의 상태를 다음 시점의 상태로 사용하기 위해 저장
states_value = [h, c]
return decoded_sentence
# 정수시퀀스 -> 텍스트 시퀀스 변환 함수 정의
# 원문
def seq_to_src(input_seq):
sentence = ''
for encoded_word in input_seq:
if(encoded_word != 0):
sentence = sentence + index_to_src[encoded_word] + ' '
return sentence
# 번역문
def seq_to_tar(input_seq):
sentence = ''
for encoded_word in input_seq:
if(encoded_word != 0 and encoded_word != tar_to_index['<sos>'] and encoded_word != tar_to_index['<eos>']):
sentence = sentence + index_to_tar[encoded_word] + ' '
return sentence
# 샘플
for seq_index in [3, 50, 100, 300, 1001]:
input_seq = encoder_input_train[seq_index: seq_index + 1]
decoded_sentence = decode_sequence(input_seq)
print("입력문장 :",seq_to_src(encoder_input_train[seq_index]))
print("정답문장 :",seq_to_tar(decoder_input_train[seq_index]))
print("번역문장 :",decoded_sentence[1:-5])
print("-"*50)
# 결과
입력문장 : when does it end ?
정답문장 : quand est ce que ca finit ?
번역문장 : quand est ce que ca marche ?
--------------------------------------------------
입력문장 : it s sand .
정답문장 : c est du sable .
번역문장 : c est de l eau .
--------------------------------------------------
입력문장 : i didn t go .
정답문장 : je n y suis pas allee .
번역문장 : je ne suis pas encore .
--------------------------------------------------
입력문장 : it was a mistake .
정답문장 : ce fut une erreur .
번역문장 : il s agit d une blague .
--------------------------------------------------
입력문장 : it boggles my mind .
정답문장 : ca me laisse perplexe .
번역문장 : ca m en femme .
--------------------------------------------------
# 14-02 BLEU Score(Bilingual Evaluation Understudy Score)
앞서 3장(언어모델)에서 Perplexity(PPL)에 대해 배웠었다. PPL은 '헷갈리는 정도'로, 수치가 낮을수록 좋은 것이다.
ex) PPL=10이라면 모든 시점마다 평균 10개의 단어 중 어떤게 정답인지 고민하고 있다는 것
그러나 PPL은 번역의 성능을 직접적으로 반영하지 못한다.
1. BLEU란?
기계번역 결과와 사람이 직접 번역한 결과의 유사한 정도를 비교하여 번역 성능을 측정하는 방법. 높을수록 성능이 좋음
장점: 언어에 구애받지 않고 사용 가능, 빠른 계산속도
BLEU의 작동 과정을 순차적으로 알아보자
(1) 유니그램 정밀도(Unigram Precision)
사람의 번역문에서 등장한 단어가 기계의 번역문에서 몇 번 나타나는지 세어 모두 더한 후, 기계의 번역문에 등장한 모든 단어의 카운트 총합으로 나누는 것.
(2) 보정된 유니그램 정밀도(Modified Unigram Precision)
사람 번역문과 기계 번역문을 매칭하여 카운트하는 과정에서 이전에 매칭된 적이 있었다면 중복을 제거함.
(3) BLEU: n-gram 으로 확장
결국 단어의 빈도수로 접근하는 방식은 단어의 순서를 고려하지 못함. 따라서 문법을 고려하려면 n-gram을 이용해야 함.
2,3,4...n의 n-gram에 대한 보정된 정밀도를 각각 구해 모두 조합한 것이 바로 BLEU
(4) Brevity Penalty
그러나 BLEU는 짧은 문장에 대해 잘 작동하지 못함. 따라서 짧은 문장에 대해 페널티 점수를 줌(=BP, Brevity Penality)
(5) BLEU 구현 코드
# BP 함수
def brevity_penalty(candidate, reference_list):
ca_len = len(candidate)
ref_len = closest_ref_length(candidate, reference_list)
if ca_len > ref_len:
return 1
# candidate가 비어있다면 BP = 0 → BLEU = 0.0
elif ca_len == 0 :
return 0
else:
return np.exp(1 - ref_len/ca_len)
# BLEU 함수
def bleu_score(candidate, reference_list, weights=[0.25, 0.25, 0.25, 0.25]):
bp = brevity_penalty(candidate, reference_list)
p_n = [modified_precision(candidate, reference_list, n=n) for n, _ in enumerate(weights,start=1)]
score = np.sum([w_i * np.log(p_i) if p_i != 0 else 0 for w_i, p_i in zip(weights, p_n)])
return bp * np.exp(score)
2. NLTK로 BLEU 측정
import nltk.translate.bleu_score as bleu
candidate = '문장'
references = ['문장1', '문장2', '문장3']
print(bleu_score(candidate.split(),list(map(lambda ref: ref.split(), references))))
print(bleu.sentence_bleu(list(map(lambda ref: ref.split(), references)),candidate.split()))
'딥러닝 > 딥러닝을 이용한 자연어처리 입문' 카테고리의 다른 글
[딥러닝 NLP] 16. Transformer (1) | 2024.02.07 |
---|---|
[딥러닝 NLP] 15. Attention(Dot-Product Attention, Bahdanau Attention) (1) | 2024.02.06 |
[딥러닝 NLP] 13. Subword Tokenizer(BPE, SentencePiece, Huggingface) (0) | 2024.02.01 |
[딥러닝 NLP] 12. Tagging Task 실습(NER, POS) (1) | 2024.02.01 |
[딥러닝 NLP] 11. CNN(Convolution Neural Network) (1) | 2024.01.31 |