티스토리 뷰
TFX로 파일이나 서비스에서 데이터를 수집하는 컴포넌트를 실행할 수 있다. 데이터를 split하고, 여러 데이터를 결합하고, 다양한 형태의 데이터를 수집하는 전략들을 알아보자.
데이터를 컴포넌트로 전달하기 전에 다음의 절차를 따른다.
- 데이터를 데이터셋(train, valid)로 split
- TFRecord 파일로 변환
TFRecord는 데이터셋 스트리밍에 최적화된 형식이다. TFRecord는 대량의 데이터를 빠르게 다운로드하거나 write할 때 쓰는데 최적화 되어있고, 모든 TFX 컴포넌트에서 사용한다.
TFRecord로 변환, 혹은 기존의 TFRecord 가져오기
tfx.components의 CsvExampleGen 패키지를 통해서 기존의 .csv 파일을 tf.Example로 변환할 수 있다.
import os
from pathlib import Path
from tfx.components import CsvExampleGen
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext()
dir_path = Path().parent.absolute()
input_base = os.path.join(dir_path, "..", "data", "taxi")
example_gen = CsvExampleGen(input_base=input_base)
context.run(example_gen)
기존의 TFRecord를 가져올수도, parquet나 avro파일을 변환할 수도 있다.
TFRecord의 데이터 구조는 다음과 같다.
Record 1:
tf.Example
tf.Features #key는 항상 str, value는 tf.train.Feature 객체를 가짐
'column A': tf.train.Feature
'column B': tf.train.Feature
'column C': tf.train.Feature
tf.train.Feature는 BytesList, FloatList, Int64List 세 가지의 데이터 타입을 허용하며, 헬퍼 함수를 통해서 코드 중복을 줄인다.
def _bytes_feature(value):
return tf.train.Feature(
bytes_list=tf.train.BytesList(value=[value.encode()])
)
def _float_feature(value):
return tf.train.Feature(
float_list=tf.train.FloatList(value=[value])
)
def _int64_feature(value):
return tf.train.Feature(
int64_list=tf.train.Int64List(value=[value])
)
GCP 빅쿼리 테이블에서 수집하기
BigQueryExampleGen 컴포넌트를 통해서 빅쿼리 테이블을 선택할 수 있다.
이 과정을 수행하기 전에 먼저 GCP 서비스 계정을 생성하고 key를 가져와서 자격 증명을 해야한다.
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "path/of/your/key.json"
from tfx.extensions.google_cloud_big_query.example_gen.component import BigQueryExampleGen
query = """
SELECT * FROM `<bigquery_public_data>.<covid19_open_data_eu>.<covid19_open_data_eu>`
"""
example_gen = BigQueryExampleGen(query=query)
데이터셋 분할
example_gen_pb2 패키지를 이용해서 train, eval, test셋으로 각각 나눌 수 있다.
from tfx.proto import example_gen_pb2
output = example_gen_pb2.Output(
# 선호하는 분할을 정의합니다.
split_config=example_gen_pb2.SplitConfig(splits=[
# 비율을 지정합니다.
example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=6),
example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=2),
example_gen_pb2.SplitConfig.Split(name='test', hash_buckets=2)
]))
본 예시에서는 6:2:2로 분할하였고, 디폴트는 2:1이다.
이미 데이터셋이 나눠진 상태라면, 기존 입력 분할을 유지할 수 있다.
input = example_gen_pb2.Input(splits=[
example_gen_pb2.Input.Split(name='train', pattern='train/*'),
example_gen_pb2.Input.Split(name='eval', pattern='eval/*'),
example_gen_pb2.Input.Split(name='test', pattern='test/*')
])
머신러닝 파이프라인에서는 데이터, 모델을 꾸준히 업데이트 해야 하는데, span을 이용해서 데이터 스냅샷을 만들 수 있다.
input = example_gen_pb2.Input(splits=[
example_gen_pb2.Input.Split(pattern='export-{SPAN}/*')
])
데이터 셋의 유형에 따른 수집 전략
데이터의 유형에 따라 권장되는 수집 전략을 정리해보면 다음과 같다.
- 정형 데이터
- 데이터베이스에 있다면 csv로 내보내 CsvExampleGen으로 수집한다. 혹은 데이터베이스에 PrestoExampleGen 또는 BigQueryExampleGen으로 직접 사용한다. 대용량의 경우 TFRecord파일로 변환하거나 Parquet형태로 저장하는 것이 좋다. - 텍스트 데이터
- Corpus는 데이터 증가세가 급격히 커질 수 있으니 처음부터 TFRecord나 Parquet형태로 변환하는 것이 좋다. - 이미지 데이터
- 이미지의 경우 TFRecord로 변환하는 것이 좋으나, 다시 이미지로 디코딩하는 것은 디스크 공간을 증가시키니 지양한다. 압축한 이미지를 tf.Example 레코드에 바이트 문자열로 저장할 수 있다.
tfrecord_filename = 'data/image_dataset.tfrecord' with tf.io.TFRecordWriter(tfrecord_filename) as writer: for img_path in filenames: image_path = os.path.join(base_path, img_path) try: raw_file = tf.io.read_file(image_path) except FileNotFoundError: print("File {} could not be found".format(image_path)) continue example = tf.train.Example(features=tf.train.Features(feature={ 'image_raw': _bytes_feature(raw_file.numpy()), 'label': _int64_feature(generate_label_from_path(image_path)) })) writer.write(example.SerializeToString())
데이터를 수집하는 다양한 방법을 알아보았는데, TFX를 사용하는 머신러닝 파이프라인에서는 아직 익숙하지는 않지만 tf.Example(TFRecord)의 사용이 매우 중요한 것 같다.
'Study > MLOps' 카테고리의 다른 글
도커와 쿠버네티스 (1) - 개념과 실습환경 (0) | 2021.11.25 |
---|---|
Building ML Pipelines 따라잡기 (5) - 데이터 전처리 (0) | 2021.11.24 |
Building ML Pipelines 따라잡기 (4) - 데이터 검증 (0) | 2021.11.22 |
Building ML Pipelines 따라잡기 (2) - TFX(Tensorflow Extended) (0) | 2021.11.17 |
Building ML Pipelines 따라잡기 (1) - intro (0) | 2021.11.15 |
- Total
- Today
- Yesterday
- 도커
- Bert
- 머신러닝파이프라인
- productmanager
- 쿠버네티스
- Oreilly
- 딥러닝
- torch
- nlp
- PO
- Tennis
- pmpo
- container
- 전처리
- MLOps
- 머신러닝
- 인공지능
- docker
- ML
- productresearch
- DDUX
- dl
- 자연어처리
- deeplearning
- 파이프라인
- 스타트업
- mlpipeline
- PM
- Kubernetes
- productowner
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |