메인 콘텐츠로 건너뛰기
Dagster와 W&B를 함께 사용하여 MLOps 파이프라인을 오케스트레이션하고 ML 에셋을 관리하십시오. W&B와의 통합을 통해 Dagster 내에서 다음 작업을 쉽게 수행할 수 있습니다:
  • W&B Artifact를 생성하고 사용합니다.
  • W&B Registry에서 등록된 모델(Registered Model)을 사용하고 생성합니다.
  • W&B Launch를 사용하여 전용 컴퓨트에서 학습 작업을 실행합니다.
  • op 및 에셋에서 wandb 클라이언트를 사용합니다.
W&B Dagster 통합은 W&B 전용 Dagster 리소스와 IO Manager를 제공합니다:
  • wandb_resource: W&B API에 인증하고 통신하는 데 사용되는 Dagster 리소스입니다.
  • wandb_artifacts_io_manager: W&B 아티팩트를 사용하는 데 사용되는 Dagster IO Manager입니다.
다음 가이드는 Dagster에서 W&B를 사용하기 위한 사전 준비 사항을 충족하는 방법, op 및 에셋에서 W&B 아티팩트를 생성하고 사용하는 방법, W&B Launch를 사용하는 방법 및 권장 모범 사례를 설명합니다.

시작하기 전에

W&B에서 Dagster를 사용하려면 다음 리소스가 필요합니다:
  1. W&B API key
  2. W&B entity(사용자 또는 팀): 엔터티는 W&B 실행과 아티팩트를 전송하는 사용자 이름 또는 팀 이름입니다. 실행을 로깅하기 전에 W&B App UI에서 계정 또는 팀 엔터티를 먼저 생성하세요. 엔터티를 지정하지 않으면 실행은 기본 엔터티(일반적으로 본인 사용자 이름)로 전송됩니다. 기본 엔터티는 설정의 Project Defaults에서 변경할 수 있습니다.
  3. W&B project: W&B 실행이 저장되는 프로젝트 이름입니다.
해당 사용자 또는 팀의 W&B App 프로필 페이지를 확인하여 W&B 엔터티를 찾을 수 있습니다. 기존 W&B 프로젝트를 사용하거나 새로 생성할 수 있습니다. 새 프로젝트는 W&B App 홈 페이지 또는 사용자/팀 프로필 페이지에서 생성할 수 있습니다. 프로젝트가 존재하지 않는 경우 처음 사용할 때 자동으로 생성됩니다.

API key 설정

  1. W&B에 로그인합니다. 참고: W&B Server를 사용하는 경우 인스턴스 호스트 이름을 관리자에게 문의하세요.
  2. User Settings에서 API key를 생성합니다. 프로덕션 환경에서는 해당 key를 소유할 service account 사용을 권장합니다.
  3. 해당 API key를 환경 변수로 설정합니다: export WANDB_API_KEY=YOUR_KEY.
아래 예제에서는 Dagster 코드에서 API key를 지정하는 위치를 보여 줍니다. wandb_config 중첩 딕셔너리 안에 엔터티와 프로젝트 이름을 지정해야 합니다. 다른 W&B Project를 사용하려는 경우, 서로 다른 op/asset에 서로 다른 wandb_config 값을 전달할 수 있습니다. 전달할 수 있는 key에 대한 자세한 내용은 아래 Configuration 섹션을 참조하세요.
예시: @job에 대한 설정
# 이 내용을 config.yaml에 추가하세요
# 또는 Dagit의 Launchpad나 JobDefinition.execute_in_process에서 config를 설정할 수도 있습니다
# 참고: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # 여기를 자신의 W&B 엔터티로 변경하세요
     project: my_project # 여기를 자신의 W&B 프로젝트로 변경하세요


@job(
   resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
       "io_manager": wandb_artifacts_io_manager,
   }
)
def simple_job_example():
   my_op()

구성

아래의 구성 옵션은 통합에서 제공하는 W&B 전용 Dagster resource 및 IO Manager에 대한 설정으로 사용됩니다.
  • wandb_resource: W&B API와 통신하는 데 사용되는 Dagster resource입니다. 제공된 API key를 사용해 자동으로 인증합니다. 속성:
    • api_key: (str, 필수): W&B API와 통신하는 데 필요한 W&B API key입니다.
    • host: (str, 선택): 사용하려는 API 호스트 서버입니다. W&B Server를 사용하는 경우에만 필요합니다. 기본값은 Public Cloud 호스트인 https://api.wandb.ai입니다.
  • wandb_artifacts_io_manager: W&B 아티팩트를 사용하기 위한 Dagster IO Manager입니다. 속성:
    • base_dir: (int, 선택) 로컬 스토리지 및 캐싱에 사용되는 기본 디렉터리입니다. W&B 아티팩트와 W&B 실행 로그는 해당 디렉터리에 기록되고 해당 디렉터리에서 읽습니다. 기본적으로 DAGSTER_HOME 디렉터리를 사용합니다.
    • cache_duration_in_minutes: (int, 선택) W&B 아티팩트와 W&B 실행 로그를 로컬 스토리지에 얼마나 오래 유지할지 정의합니다. 지정한 시간 동안 한 번도 열리지 않은 파일과 디렉터리만 캐시에서 제거됩니다. 캐시 정리는 IO Manager 실행이 끝날 때 수행됩니다. 캐시를 완전히 끄려면 0으로 설정할 수 있습니다. 캐싱은 동일한 머신에서 실행되는 작업 간에 아티팩트를 재사용할 때 성능을 향상시킵니다. 기본값은 30일입니다.
    • run_id: (str, 선택): 재시작(resume)에 사용되는, 이 실행에 대한 고유 ID입니다. 프로젝트 내에서 고유해야 하며, 실행을 삭제한 경우 해당 ID를 재사용할 수 없습니다. 짧은 설명용 이름에는 name 필드를 사용하고, 실행 간 비교할 수 있도록 하이퍼파라미터를 저장하려면 config를 사용하십시오. ID에는 다음 특수 문자를 포함할 수 없습니다: /\#?%:.. Dagster 내에서 실험 추적을 수행하는 경우 IO Manager가 실행을 재개할 수 있도록 Run ID를 설정해야 합니다. 기본적으로 Dagster 실행 ID(예: 7e4df022-1bf2-44b5-a383-bb852df4077e)로 설정됩니다.
    • run_name: (str, 선택) UI에서 이 실행을 식별하는 데 도움이 되는 짧은 표시 이름입니다. 기본적으로 dagster-run-[Dagster 실행 ID의 처음 8자] 형식의 문자열입니다. 예: dagster-run-7e4df022.
    • run_tags: (list[str], 선택): UI에서 이 실행의 태그 목록을 채우는 문자열 목록입니다. 태그는 실행을 함께 구성하거나 baseline, production과 같은 임시 레이블을 적용하는 데 유용합니다. UI에서 태그를 쉽게 추가 및 제거하거나, 특정 태그가 있는 실행만 필터링할 수 있습니다. 통합에서 사용하는 모든 W&B 실행에는 dagster_wandb 태그가 추가됩니다.

W&B 아티팩트 사용하기

W&B 아티팩트 통합은 Dagster IO Manager에 의존합니다. IO Manager는 에셋 또는 op의 출력을 저장하고, 이를 다운스트림 에셋이나 op의 입력으로 로드하는 역할을 하는 사용자가 제공하는 객체입니다. 예를 들어, IO Manager는 파일 시스템 상의 파일에 객체를 저장하고, 그 파일에서 다시 로드할 수 있습니다. 이 통합은 W&B 아티팩트를 위한 IO Manager를 제공합니다. 이를 통해 모든 Dagster @op 또는 @asset이 W&B 아티팩트를 직접 생성하고 사용할 수 있습니다. 아래는 Python 리스트를 포함하는 dataset 유형의 W&B 아티팩트를 생성하는 @asset의 간단한 예시입니다.
@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_arguments": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    return [1, 2, 3] # 이 값은 아티팩트에 저장됩니다
@op, @asset, @multi_asset에 메타데이터 구성을 지정해 아티팩트를 기록할 수 있습니다. 마찬가지로 Dagster 외부에서 생성된 경우라도 W&B 아티팩트를 사용할 수 있습니다.

W&B 아티팩트 쓰기

계속 진행하기 전에 W&B 아티팩트 사용 방법을 충분히 이해하고 있는지 확인하는 것을 권장합니다. 아티팩트 가이드를 참고하십시오. Python 함수에서 객체를 반환하여 W&B 아티팩트를 기록할 수 있습니다. W&B는 다음과 같은 객체를 지원합니다:
  • Python 객체 (int, dict, list…)
  • W&B 객체 (Table, Image, Graph…)
  • W&B 아티팩트 객체
아래 예제들은 Dagster asset(@asset)을 사용하여 W&B 아티팩트를 쓰는 방법을 보여줍니다:
pickle 모듈로 직렬화할 수 있는 객체는 모두 피클링되어, 통합에서 생성한 아티팩트에 추가됩니다. 해당 아티팩트를 Dagster 내에서 읽을 때는 내용이 언피클링됩니다(자세한 내용은 Read artifacts를 참고하십시오).
@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_arguments": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    return [1, 2, 3]
W&B는 여러 Pickle 기반 직렬화 모듈(pickle, dill, cloudpickle, joblib)을 지원합니다. ONNX 또는 PMML과 같은 더 고급 직렬화도 사용할 수 있습니다. 자세한 내용은 Serialization 섹션을 참조하십시오.

구성

wandb_artifact_configuration라는 구성 딕셔너리를 @op, @asset, @multi_asset에 설정할 수 있습니다. 이 딕셔너리는 데코레이터 인수의 metadata로 전달해야 합니다. 이 구성은 IO 매니저에서 W&B 아티팩트를 읽고 쓰는 방식을 제어하는 데 필요합니다. @op의 경우 Out 의 metadata 인수를 통해 출력 메타데이터에 정의합니다.
@asset의 경우 @asset의 metadata 인수에 정의합니다.
@multi_asset의 경우 각 출력 메타데이터에 대해 AssetOut 의 metadata 인수를 통해 정의합니다.
다음 코드 예제는 @op, @asset, @multi_asset 연산에서 딕셔너리를 구성하는 방법을 보여줍니다:
@op 예시:
@op(
   out=Out(
       metadata={
           "wandb_artifact_configuration": {
               "name": "my_artifact",
               "type": "dataset",
           }
       }
   )
)
def create_dataset():
   return [1, 2, 3]
지원되는 속성:
  • name: (str) 이 아티팩트의 사람이 읽을 수 있는 이름으로, UI에서 아티팩트를 식별하거나 use_artifact 호출에서 참조할 때 사용합니다. 이름에는 문자, 숫자, 밑줄, 하이픈, 마침표를 포함할 수 있습니다. 이름은 프로젝트 전체에서 고유해야 합니다. @op에 필수입니다.
  • type: (str) 아티팩트의 타입으로, 아티팩트를 구성하고 구분하는 데 사용됩니다. 일반적인 타입으로는 dataset이나 model이 있지만, 문자, 숫자, 밑줄, 하이픈, 마침표를 포함하는 임의의 문자열을 사용할 수 있습니다. 출력이 이미 아티팩트가 아닌 경우 필수입니다.
  • description: (str) 아티팩트에 대한 설명을 제공하는 자유 형식 텍스트입니다. 설명은 UI에서 마크다운으로 렌더링되므로, 표, 링크 등을 넣기에 적합한 위치입니다.
  • aliases: (list[str]) 이 아티팩트에 적용하려는 하나 이상의 별칭(alias)을 담은 리스트입니다. 통합 기능에서는 설정 여부와 관계없이 해당 목록에 latest 태그도 추가합니다. 이는 모델과 데이터셋의 버전 관리를 수행하는 효과적인 방법입니다.
  • add_dirs: (list[dict[str, Any]]): 아티팩트에 포함할 각 로컬 디렉터리에 대한 구성을 담은 리스트입니다.
  • add_files: (list[dict[str, Any]]): 아티팩트에 포함할 각 로컬 파일에 대한 구성을 담은 리스트입니다.
  • add_references: (list[dict[str, Any]]): 아티팩트에 포함할 각 외부 참조에 대한 구성을 담은 리스트입니다.
  • serialization_module: (dict) 사용될 직렬화(serialization) 모듈의 구성입니다. 자세한 내용은 Serialization 섹션을 참조하세요.
    • name: (str) 직렬화 모듈의 이름입니다. 허용되는 값: pickle, dill, cloudpickle, joblib. 모듈은 로컬에서 사용 가능해야 합니다.
    • parameters: (dict[str, Any]) 직렬화 함수에 전달되는 선택적 인자입니다. 해당 모듈의 dump 메서드와 동일한 파라미터를 받습니다. 예: {"compress": 3, "protocol": 4}.
고급 예제:
@asset(
   name="my_advanced_artifact",
   metadata={
       "wandb_artifact_configuration": {
           "type": "dataset",
           "description": "My *Markdown* description",
           "aliases": ["my_first_alias", "my_second_alias"],
           "add_dirs": [
               {
                   "name": "My directory",
                   "local_path": "path/to/directory",
               }
           ],
           "add_files": [
               {
                   "name": "validation_dataset",
                   "local_path": "path/to/data.json",
               },
               {
                   "is_tmp": True,
                   "local_path": "path/to/temp",
               },
           ],
           "add_references": [
               {
                   "uri": "https://picsum.photos/200/300",
                   "name": "External HTTP reference to an image",
               },
               {
                   "uri": "s3://my-bucket/datasets/mnist",
                   "name": "External S3 reference",
               },
           ],
       }
   },
   io_manager_key="wandb_artifacts_manager",
)
def create_advanced_artifact():
   return [1, 2, 3]
자산은 통합의 양쪽에서 유용한 메타데이터와 함께 생성됩니다.
  • W&B 측: 소스 통합 이름과 버전, 사용된 Python 버전, pickle 프로토콜 버전 등
  • Dagster 측:
    • Dagster Run ID
    • W&B 실행: ID, 이름, 경로, URL
    • W&B 아티팩트: ID, 이름, 유형, 버전, 크기, URL
    • W&B 엔터티
    • W&B Project
다음 이미지는 Dagster 자산에 추가된 W&B 메타데이터를 보여 줍니다. 이 정보는 통합을 통해 Dagster로 전파됩니다.
W&B 프로젝트와 실행에 대한 참조를 포함해, W&B 메타데이터가 첨부된 자산 상세 보기가 표시된 Dagster UI
다음 이미지는 제공된 구성 정보가 W&B 아티팩트에 유용한 메타데이터로 어떻게 보강되었는지를 보여 줍니다. 이 정보는 재현성과 유지 관리에 도움이 되며, 이 통합이 없다면 사용할 수 없습니다.
Dagster에서 가져온 보강된 구성 메타데이터가 포함된 W&B 아티팩트 페이지
Dagster에서 추가 구성 세부 정보를 포함한 W&B 아티팩트 메타데이터 패널
Dagster에서 추가로 확장된 구성 메타데이터 필드를 포함한 W&B 아티팩트 뷰
mypy와 같은 정적 타입 체커를 사용하는 경우, 다음과 같이 구성 타입 정의 객체를 import 하세요:
from dagster_wandb import WandbArtifactConfiguration

파티션 사용

이 통합은 기본적으로 Dagster 파티션을 네이티브하게 지원합니다. 다음은 DailyPartitionsDefinition을 사용해 파티션을 정의한 예시입니다.
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01", end_date="2023-02-01"),
    name="my_daily_partitioned_asset",
    compute_kind="wandb",
    metadata={
        "wandb_artifact_configuration": {
            "type": "dataset",
        }
    },
)
def create_my_daily_partitioned_asset(context):
    partition_key = context.asset_partition_key_for_output()
    context.log.info(f"Creating partitioned asset for {partition_key}")
    return random.randint(0, 100)
이 코드는 각 파티션마다 하나의 W&B 아티팩트를 생성합니다. 아티팩트 패널(UI)에서 파티션 키가 덧붙은 자산 이름 아래에서 아티팩트를 확인할 수 있습니다. 예를 들어 my_daily_partitioned_asset.2023-01-01, my_daily_partitioned_asset.2023-01-02, 또는 my_daily_partitioned_asset.2023-01-03처럼 표시됩니다. 여러 차원으로 파티셔닝된 자산은 각 차원을 점(.)으로 구분한 형식으로 표시됩니다. 예를 들어 my_asset.car.blue와 같습니다.
이 통합에서는 하나의 실행 내에서 여러 파티션을 머터리얼라이즈(materialize)할 수 없습니다. 자산을 머터리얼라이즈하려면 여러 실행을 수행해야 합니다. 이는 Dagit에서 자산을 머터리얼라이즈할 때 실행할 수 있습니다.
파티셔닝된 자산에 대해 각 파티션이 별도 실행으로 표시된 여러 실행이 있는 Dagster UI

고급 사용법

W&B 아티팩트 읽기

W&B 아티팩트를 읽는 방법은 쓰는 방법과 비슷합니다. wandb_artifact_configuration라는 설정 딕셔너리를 @op 또는 @asset에 설정할 수 있습니다. 차이점은 출력이 아니라 입력에 이 설정을 지정해야 한다는 점입니다. @op의 경우 In 메타데이터 인자를 통해 입력 메타데이터에 이 설정을 지정합니다. 이때 아티팩트의 이름을 명시적으로 전달해야 합니다. @asset의 경우 Asset In 메타데이터 인자를 통해 입력 메타데이터에 이 설정을 지정합니다. 상위 에셋의 이름이 아티팩트 이름과 일치해야 하므로, 아티팩트 이름을 직접 전달하면 안 됩니다. 이 통합 외부에서 생성된 아티팩트에 의존성을 두려면 SourceAsset을 사용해야 합니다. 이 경우 항상 해당 에셋의 최신 버전을 읽습니다. 다음 예제는 다양한 op에서 아티팩트를 읽는 방법을 보여줍니다.
@op에서 아티팩트 읽기
@op(
   ins={
       "artifact": In(
           metadata={
               "wandb_artifact_configuration": {
                   "name": "my_artifact",
               }
           }
       )
   },
   io_manager_key="wandb_artifacts_manager"
)
def read_artifact(context, artifact):
   context.log.info(artifact)

구성

다음 구성은 IO Manager가 어떤 내용을 수집하여 데코레이터가 적용된 함수에 입력으로 제공해야 하는지를 지정하는 데 사용됩니다. 다음과 같은 읽기 패턴을 지원합니다.
  1. 아티팩트에 포함된 이름이 지정된 객체를 가져오려면 get을 사용하십시오:
@asset(
   ins={
       "table": AssetIn(
           key="my_artifact_with_table",
           metadata={
               "wandb_artifact_configuration": {
                   "get": "my_table",
               }
           },
           input_manager_key="wandb_artifacts_manager",
       )
   }
)
def get_table(context, table):
   context.log.info(table.get_column("a"))
  1. 아티팩트에 포함된 다운로드한 파일의 로컬 경로를 가져오려면 get_path를 사용하세요:
@asset(
   ins={
       "path": AssetIn(
           key="my_artifact_with_file",
           metadata={
               "wandb_artifact_configuration": {
                   "get_path": "name_of_file",
               }
           },
           input_manager_key="wandb_artifacts_manager",
       )
   }
)
def get_path(context, path):
   context.log.info(path)
  1. 전체 아티팩트 객체(내용을 로컬로 다운로드한 상태)를 가져오려면:
@asset(
   ins={
       "artifact": AssetIn(
           key="my_artifact",
           input_manager_key="wandb_artifacts_manager",
       )
   },
)
def get_artifact(context, artifact):
   context.log.info(artifact.name)
지원되는 속성
  • get: (str) 아티팩트 상대 이름에 해당하는 W&B 객체를 가져옵니다.
  • get_path: (str) 아티팩트 상대 이름에 해당하는 파일의 경로를 가져옵니다.

직렬화 구성

기본적으로 이 통합은 표준 pickle 모듈을 사용하지만, 일부 객체는 pickle과 호환되지 않습니다. 예를 들어, yield를 사용하는 함수는 pickle로 직렬화하려고 하면 오류가 발생합니다. 추가적인 Pickle 기반 직렬화 모듈(dill, cloudpickle, joblib)도 지원합니다. 직렬화된 문자열을 반환하거나 아티팩트를 직접 생성하여 ONNXPMML과 같은 보다 고급 직렬화 방식을 사용할 수도 있습니다. 어떤 방식을 선택할지는 사용 사례에 따라 달라지므로, 이 주제에 대한 관련 문헌을 참고하십시오.

Pickle 기반 직렬화 모듈

Pickle 방식은 보안상 안전하지 않은 것으로 알려져 있습니다. 보안이 중요하다면 W&B 객체만 사용하십시오. 데이터에 서명하고 해시 키를 자체 시스템에 저장하는 것을 권장합니다. 더 복잡한 사용 사례가 있다면 언제든지 저희에게 연락해 주세요. 기꺼이 도와드리겠습니다.
wandb_artifact_configuration 내의 serialization_module 딕셔너리를 통해 사용할 직렬화 방식을 설정할 수 있습니다. Dagster를 실행하는 머신에 해당 모듈이 설치되어 있는지 확인하십시오. 통합 기능은 해당 아티팩트를 읽을 때 어떤 직렬화 모듈을 사용해야 하는지 자동으로 인식합니다. 현재 지원되는 모듈은 pickle, dill, cloudpicklejoblib입니다. 다음은 joblib으로 직렬화된 model을 생성한 후 추론에 사용하는 단순화된 예제입니다.
@asset(
    name="my_joblib_serialized_model",
    compute_kind="Python",
    metadata={
        "wandb_artifact_configuration": {
            "type": "model",
            "serialization_module": {
                "name": "joblib"
            },
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_model_serialized_with_joblib():
    # 실제 ML 모델은 아니지만 pickle 모듈로는 이것이 불가능합니다
    return lambda x, y: x + y

@asset(
    name="inference_result_from_joblib_serialized_model",
    compute_kind="Python",
    ins={
        "my_joblib_serialized_model": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        )
    },
    metadata={
        "wandb_artifact_configuration": {
            "type": "results",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def use_model_serialized_with_joblib(
    context: OpExecutionContext, my_joblib_serialized_model
):
    inference_result = my_joblib_serialized_model(1, 2)
    context.log.info(inference_result)  # 출력: 3
    return inference_result

고급 직렬화 형식 (ONNX, PMML)

ONNX와 PMML 같은 교환용 파일 형식을 사용하는 경우가 흔합니다. 이 통합은 해당 형식을 지원하지만, Pickle 기반 직렬화에 비해 약간 더 많은 작업이 필요합니다. 이러한 형식을 사용하는 방법은 두 가지입니다.
  1. 모델을 선택한 형식으로 변환한 다음, 그 형식의 문자열 표현을 일반 Python 객체처럼 반환합니다. 이 통합이 그 문자열을 pickle로 직렬화합니다. 이후 그 문자열을 사용해 모델을 다시 복원할 수 있습니다.
  2. 직렬화된 모델을 포함하는 새로운 로컬 파일을 생성한 다음, add_file 설정을 사용해 해당 파일로 사용자 정의 아티팩트를 생성합니다.
다음은 Scikit-learn 모델을 ONNX를 사용해 직렬화하는 예시입니다.
import numpy
import onnxruntime as rt
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

from dagster import AssetIn, AssetOut, asset, multi_asset

@multi_asset(
    compute_kind="Python",
    outs={
        "my_onnx_model": AssetOut(
            metadata={
                "wandb_artifact_configuration": {
                    "type": "model",
                }
            },
            io_manager_key="wandb_artifacts_manager",
        ),
        "my_test_set": AssetOut(
            metadata={
                "wandb_artifact_configuration": {
                    "type": "test_set",
                }
            },
            io_manager_key="wandb_artifacts_manager",
        ),
    },
    group_name="onnx_example",
)
def create_onnx_model():
    # https://onnx.ai/sklearn-onnx/ 참고

    # 모델 훈련
    iris = load_iris()
    X, y = iris.data, iris.target
    X_train, X_test, y_train, y_test = train_test_split(X, y)
    clr = RandomForestClassifier()
    clr.fit(X_train, y_train)

    # ONNX 형식으로 변환
    initial_type = [("float_input", FloatTensorType([None, 4]))]
    onx = convert_sklearn(clr, initial_types=initial_type)

    # 아티팩트 저장 (모델 + 테스트 세트)
    return onx.SerializeToString(), {"X_test": X_test, "y_test": y_test}

@asset(
    name="experiment_results",
    compute_kind="Python",
    ins={
        "my_onnx_model": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        ),
        "my_test_set": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        ),
    },
    group_name="onnx_example",
)
def use_onnx_model(context, my_onnx_model, my_test_set):
    # https://onnx.ai/sklearn-onnx/ 참고

    # ONNX Runtime으로 예측 계산
    sess = rt.InferenceSession(my_onnx_model)
    input_name = sess.get_inputs()[0].name
    label_name = sess.get_outputs()[0].name
    pred_onx = sess.run(
        [label_name], {input_name: my_test_set["X_test"].astype(numpy.float32)}
    )[0]
    context.log.info(pred_onx)
    return pred_onx

파티션 사용하기

이 통합은 Dagster partitions를 기본적으로 지원합니다. 자산의 하나, 여러 개, 또는 모든 파티션을 선택적으로 읽을 수 있습니다. 모든 파티션은 딕셔너리로 제공되며, 키와 값은 각각 파티션 키와 아티팩트 내용을 나타냅니다.
업스트림 @asset의 모든 파티션을 딕셔너리 형태로 읽습니다. 이 딕셔너리에서 키와 값은 각각 파티션 키와 아티팩트 내용을 나타냅니다.
@asset(
    compute_kind="wandb",
    ins={"my_daily_partitioned_asset": AssetIn()},
    output_required=False,
)
def read_all_partitions(context, my_daily_partitioned_asset):
    for partition, content in my_daily_partitioned_asset.items():
        context.log.info(f"partition={partition}, content={content}")
설정 객체인 metadata는 W&B가 프로젝트 내에서 서로 다른 아티팩트 파티션과 상호 작용하는 방식을 설정합니다. metadata 객체에는 wandb_artifact_configuration이라는 키가 포함되어 있으며, 이 안에 partitions라는 중첩 객체가 들어 있습니다. partitions 객체는 각 파티션 이름을 해당 설정에 매핑합니다. 각 파티션에 대한 설정에서는 그 파티션에서 데이터를 가져오는 방법을 지정할 수 있습니다. 이러한 설정에는 각 파티션의 요구 사항에 따라 get, version, alias라는 서로 다른 키가 포함될 수 있습니다. 설정 키
  1. get: get 키는 데이터를 가져올 W&B 객체(Table, Image 등)의 이름을 지정합니다.
  2. version: version 키는 특정 아티팩트 버전을 가져오고자 할 때 사용됩니다.
  3. alias: alias 키를 사용하면 아티팩트를 별칭(alias)으로 가져올 수 있습니다.
와일드카드 설정 와일드카드 "*"는 별도로 설정되지 않은 모든 파티션을 의미합니다. 이는 partitions 객체에 명시적으로 언급되지 않은 파티션에 대한 기본 설정을 제공합니다. 예를 들어,
"*": {
    "get": "default_table_name",
},
이 설정은 명시적으로 설정되지 않은 모든 파티션의 데이터를 default_table_name이라는 테이블에서 가져온다는 의미입니다. 특정 파티션 구성 키를 사용해 특정 파티션에 대한 개별 구성을 지정하면, 해당 파티션에 대해서는 와일드카드 구성이 재정의됩니다. 예를 들어,
"yellow": {
    "get": "custom_table_name",
},
이 구성은 yellow라는 파티션에 대해서는 와일드카드 구성 대신 custom_table_name이라는 테이블에서 데이터를 가져온다는 의미입니다. 버전 관리 및 별칭 버전 관리와 별칭을 위해 구성에서 특정 versionalias 키를 지정할 수 있습니다. 버전에 대해서는,
"orange": {
    "version": "v0",
},
이 구성은 orange 아티팩트 파티션의 v0 버전에서 데이터를 가져옵니다. 별칭의 경우에는,
"blue": {
    "alias": "special_alias",
},
이 구성은 구성에서 blue로 지칭되는 별칭 special_alias를 가진 아티팩트 파티션의 default_table_name 테이블에서 데이터를 가져옵니다.

고급 사용법

이 통합의 고급 사용 방법은 다음 전체 코드 예제를 참고하세요:

W&B Launch 사용하기

활발히 개발 중인 베타 제품 Launch에 관심이 있으신가요? W&B Launch 고객 파일럿 프로그램 참여에 대해 논의하려면 계정 담당 팀에 문의하세요. 파일럿 고객은 베타 프로그램 자격을 얻기 위해 AWS EKS 또는 SageMaker를 사용해야 합니다. 추후에는 추가 플랫폼도 지원할 예정입니다.
계속하기 전에 W&B Launch 사용 방법을 충분히 이해하시길 권장합니다. Launch 가이드를 참고하세요. Dagster 통합은 다음 작업에 도움이 됩니다:
  • Dagster 인스턴스에서 하나 또는 여러 개의 Launch 에이전트를 실행합니다.
  • Dagster 인스턴스 내에서 로컬 Launch 작업을 실행합니다.
  • 온프렘 또는 클라우드에서 원격 Launch 작업을 실행합니다.

Launch 에이전트

이 통합 기능은 가져와서 사용할 수 있는 @oprun_launch_agent를 제공합니다. 이 op는 Launch Agent를 시작하고, 수동으로 중지할 때까지 장시간 실행 프로세스로 동작합니다. 에이전트는 Launch 큐를 폴링하고, 작업을 순서대로 실행하거나 외부 서비스로 디스패치하여 실행되도록 하는 프로세스입니다. Launch 페이지를 참고하세요. Launchpad에서 모든 속성에 대한 유용한 설명도 확인할 수 있습니다.
Dagster 통합을 위한 에이전트 구성 옵션과 설명이 포함된 W&B Launchpad 인터페이스
간단한 예시
# 이 내용을 config.yaml에 추가하세요
# 또는 Dagit의 Launchpad나 JobDefinition.execute_in_process에서 설정할 수 있습니다
# 참고: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # W&B 엔터티로 교체하세요
     project: my_project # W&B 프로젝트로 교체하세요
ops:
 run_launch_agent:
   config:
     max_jobs: -1
     queues: 
       - my_dagster_queue

from dagster_wandb.launch.ops import run_launch_agent
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource

@job(
   resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
   },
)
def run_launch_agent_example():
   run_launch_agent()

Launch 작업

이 통합은 import해서 사용할 수 있는 @oprun_launch_job을 제공합니다. 이 op는 Launch 작업을 실행합니다. Launch 작업은 실행을 위해 큐에 할당됩니다. 새 큐를 만들거나 기본 큐를 사용할 수 있습니다. 해당 큐를 감시하는 활성 에이전트가 있는지 확인하세요. Dagster 인스턴스 내부에서 에이전트를 실행할 수도 있고, Kubernetes에서 배포형 에이전트를 사용하는 것도 고려할 수 있습니다. Launch 페이지를 참조하세요. 또한 Launchpad에서 모든 속성에 대한 유용한 설명을 확인할 수 있습니다.
Dagster 통합을 위한 작업 구성 옵션과 설명이 포함된 W&B Launchpad 인터페이스
간단한 예
# config.yaml에 추가하세요
# 또는 Dagit의 Launchpad나 JobDefinition.execute_in_process에서 config를 설정할 수 있습니다
# Reference: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # 여기에 W&B 엔터티를 입력하세요
     project: my_project # 여기에 W&B 프로젝트를 입력하세요
ops:
 my_launched_job:
   config:
     entry_point:
       - python
       - train.py
     queue: my_dagster_queue
     uri: https://github.com/wandb/example-dagster-integration-with-launch


from dagster_wandb.launch.ops import run_launch_job
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource


@job(resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
   },
)
def run_launch_job_example():
   run_launch_job.alias("my_launched_job")() # alias를 사용하여 job 이름을 변경합니다

모범 사례

  1. IO Manager를 사용해 아티팩트를 읽고 쓰십시오. Artifact.download() 또는 Run.log_artifact()를 직접 사용하는 것은 피하십시오. 이러한 메서드는 통합 기능에서 처리합니다. 대신, 아티팩트에 저장하려는 데이터를 반환하고 나머지는 통합 기능에 맡기십시오. 이 방식이 아티팩트의 lineage(계보)를 더 잘 제공합니다.
  2. 복잡한 사용 사례에만 직접 Artifact 객체를 생성하십시오. Python 객체와 W&B 객체는 op/asset에서 그대로 반환해야 합니다. 통합 기능이 아티팩트를 번들링합니다. 복잡한 사용 사례에서는 Dagster job 안에서 아티팩트를 직접 생성할 수 있습니다. 소스 통합 이름과 버전, 사용된 Python 버전, pickle 프로토콜 버전 등 메타데이터를 보강하기 위해 Artifact 객체를 통합 기능으로 전달할 것을 권장합니다.
  3. 파일, 디렉터리, 외부 참조를 메타데이터를 통해 아티팩트에 추가하십시오. 통합 기능의 wandb_artifact_configuration 객체를 사용해 임의의 파일, 디렉터리 또는 외부 참조(Amazon S3, GCS, HTTP 등)를 추가하십시오. 자세한 내용은 Artifact configuration 섹션의 고급 예제를 참조하십시오.
  4. 아티팩트가 생성될 때는 @op 대신 @asset을 사용하십시오. 아티팩트는 asset입니다. Dagster가 해당 asset을 관리하는 경우 asset을 사용하는 것을 권장합니다. 이렇게 하면 Dagit Asset Catalog에서 관측 가능성이 향상됩니다.
  5. Dagster 외부에서 생성된 아티팩트를 사용하려면 SourceAsset을 사용하십시오. 이렇게 하면 통합 기능을 활용해 외부에서 생성된 아티팩트를 읽을 수 있습니다. 그렇지 않으면 통합 기능으로 생성된 아티팩트만 사용할 수 있습니다.
  6. 대규모 모델을 위해 전용 컴퓨트에서 학습을 오케스트레이션하려면 W&B Launch를 사용하십시오. 작은 모델은 Dagster 클러스터 내부에서 학습할 수 있고, GPU 노드가 있는 Kubernetes 클러스터에서 Dagster를 실행할 수도 있습니다. 대규모 모델 학습에는 W&B Launch 사용을 권장합니다. 이렇게 하면 인스턴스 과부하를 방지하고 더 적절한 컴퓨트 리소스에 접근할 수 있습니다.
  7. Dagster 내에서 실험 추적을 할 때, W&B 실행 ID를 Dagster 실행 ID 값으로 설정하십시오. 실행을 재개 가능하게 만들고, W&B 실행 ID를 Dagster 실행 ID 또는 선택한 문자열로 설정할 것을 모두 권장합니다. 이 권장사항을 따르면 Dagster 내부에서 모델을 학습할 때 W&B 지표와 W&B 아티팩트가 동일한 W&B 실행에 저장되도록 할 수 있습니다.
W&B 실행 ID를 Dagster 실행 ID로 설정하십시오.
wandb.init(
    id=context.run_id,
    resume="allow",
    ...
)
또는 사용할 W&B 실행 ID를 직접 지정하고 이를 IO Manager 구성에 전달합니다.
wandb.init(
    id="my_resumable_run_id",
    resume="allow",
    ...
)

@job(
   resource_defs={
       "io_manager": wandb_artifacts_io_manager.configured(
           {"wandb_run_id": "my_resumable_run_id"}
       ),
   }
)
  1. 대용량 W&B 아티팩트의 경우 get 또는 get_path를 사용해 필요한 데이터만 가져오십시오. 기본적으로 이 통합은 전체 아티팩트를 다운로드합니다. 매우 큰 아티팩트를 사용하는 경우 필요한 특정 파일이나 객체만 가져오는 것이 좋습니다. 이렇게 하면 속도와 리소스 사용 효율이 향상됩니다.
  2. Python 객체의 경우 사용 사례에 맞게 pickling 모듈을 조정하십시오. 기본적으로 W&B 통합은 표준 pickle 모듈을 사용합니다. 하지만 일부 객체는 이 모듈과 호환되지 않습니다. 예를 들어, yield를 포함한 함수는 pickle로 직렬화하려고 하면 오류가 발생합니다. W&B는 다른 Pickle 기반 직렬화 모듈(dill, cloudpickle, joblib)도 지원합니다.
직렬화된 문자열을 반환하거나 아티팩트를 직접 생성하여 ONNX 또는 PMML과 같은 더 고급 직렬화를 사용할 수도 있습니다. 올바른 선택은 사용 사례에 따라 달라지며, 이 주제에 대한 관련 문헌을 참고하십시오.