Google Cloud Workflowsとは?サーバーレスで実現する業務自動化の完全ガイド
はじめに
現代のクラウドアプリケーション開発において、複数のサービスやAPIを連携させて業務を自動化することは不可欠です。しかし、複雑な処理フローを管理し、エラーハンドリングやリトライ処理を実装するのは容易ではありません。
Google Cloud Workflowsは、こうした課題を解決するフルマネージドのサーバーレスオーケストレーションサービスです。本記事では、Google Cloud Workflowsの基本から実践的な活用方法まで、徹底的に解説します。
Google Cloud Workflowsとは
概要
Google Cloud Workflows(以下、Workflows)は、Google Cloudが提供するサーバーレスなワークフローオーケストレーションサービスです。複数のCloud FunctionsやCloud Run、BigQueryなどのGoogle Cloudサービス、さらには外部APIを定義した順序で実行できます。
主な特徴
Workflowsには以下のような特徴があります:
- フルマネージド: インフラの管理が不要で、運用負荷を大幅に削減
- サーバーレス: 自動スケーリングにより、トラフィックに応じて柔軟に対応
- 低コスト: 従量課金制で、無料枠も充実(内部ステップ5,000回/月、外部ステップ2,000回/月)
- シンプルな定義: YAMLまたはJSON形式で直感的にワークフローを記述
- 豊富な機能: 条件分岐、繰り返し、並列実行、エラーハンドリングをサポート
なぜWorkflowsが必要なのか
従来の課題
従来、複数のサービスを連携させる際には以下のような課題がありました:
- 複雑なコード: 各サービスの呼び出しロジックをアプリケーションコードに埋め込む必要がある
- エラーハンドリング: リトライ処理や例外処理を個別に実装する必要がある
- スケーラビリティ: トラフィック増加時のスケーリング対応が必要
- 可視性の欠如: 処理フローの全体像が把握しづらい
Workflowsによる解決
Workflowsを使用することで、これらの課題を以下のように解決できます:
- 宣言的な定義: YAMLで処理フローを明示的に定義
- 組み込みのエラーハンドリング: リトライポリシーを簡単に設定
- 自動スケーリング: サーバーレスアーキテクチャによる自動対応
- 実行履歴: すべての実行ログを自動的に記録し、可視化
主なユースケース
Workflowsは、さまざまなシーンで活用できます。
1. データパイプライン
最も一般的なユースケースの一つが、データパイプラインの構築です。
具体例:
- 業務システムからデータを抽出(Extract)
- Cloud Storageにデータを一時保存
- BigQueryにデータをロード(Load)
- SQLクエリでデータを変換(Transform)
- 変換結果を別のテーブルに保存
このようなETL/ELTプロセスを、Workflowsで一連のフローとして定義できます。
2. バッチ処理
定期的に実行する必要があるバッチジョブの自動化にも最適です。
具体例:
- 日次でのレポート生成
- 定期的なデータバックアップ
- 機械学習モデルの再トレーニング
- ログファイルのアーカイブ処理
Cloud Schedulerと組み合わせることで、cron形式で定期実行を設定できます。
3. イベントドリブン処理
特定のイベントをトリガーとした自動処理も実現できます。
具体例:
- Cloud Storageへのファイルアップロード時の自動処理
- Pub/Subメッセージ受信時の処理実行
- HTTPリクエスト受信時のワークフロー起動
Eventarcと連携することで、さまざまなイベントソースからワークフローを起動できます。
4. マイクロサービスオーケストレーション
複数のマイクロサービスを連携させる際のオーケストレーターとしても機能します。
具体例:
- 注文処理フロー(在庫確認→決済→配送手配→通知)
- ユーザー登録フロー(検証→DB登録→メール送信→ウェルカムメッセージ)
- 承認ワークフロー(申請→承認待ち→承認/却下→通知)
Workflowsの基本構成
ワークフロー定義
Workflowsは、YAMLまたはJSON形式で定義します。以下は基本的な例です:
main:
steps:
- step1:
call: http.get
args:
url: https://api.example.com/data
result: apiResponse
- step2:
call: sys.log
args:
text: ${"Received: " + apiResponse.body}
主要な構成要素
1. ステップ(Steps)
ワークフローの個別の処理単位です。各ステップでは以下のような操作が可能です:
- 変数の定義と割り当て
- サービスの呼び出し(HTTP API、Cloud Functions、BigQuery等)
- 条件分岐
- 繰り返し処理
2. 条件分岐(Switch)
switch文を使用して、条件に応じた処理を実行できます:
- checkStatus:
switch:
- condition: ${status == "success"}
next: handleSuccess
- condition: ${status == "error"}
next: handleError
3. 繰り返し(For Loop)
forループを使用して、配列要素の反復処理が可能です:
- processItems:
for:
value: item
in: ${items}
steps:
- processItem:
call: processFunction
args:
data: ${item}
4. 並列実行(Parallel)
複数のステップを並列に実行することで、処理時間を短縮できます:
- parallelProcessing:
parallel:
branches:
- branch1:
steps:
- callService1:
call: http.get
args:
url: https://api1.example.com
- branch2:
steps:
- callService2:
call: http.get
args:
url: https://api2.example.com
実践例: データパイプラインの構築
ここでは、実際のデータパイプラインを構築する例を紹介します。
シナリオ
以下のような処理フローを実装します:
- Cloud Storageから新しいCSVファイルを検出
- ファイルをBigQueryにロード
- SQLクエリでデータを変換
- 変換結果を別のテーブルに保存
- 完了通知をSlackに送信
ワークフロー定義
main:
params: [event]
steps:
- init:
assign:
- projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- datasetId: "my_dataset"
- sourceTable: "raw_data"
- targetTable: "processed_data"
- bucketName: ${event.bucket}
- fileName: ${event.name}
- loadToBigQuery:
call: googleapis.bigquery.v2.jobs.insert
args:
projectId: ${projectId}
body:
configuration:
load:
sourceUris: [${"gs://" + bucketName + "/" + fileName}]
destinationTable:
projectId: ${projectId}
datasetId: ${datasetId}
tableId: ${sourceTable}
autodetect: true
writeDisposition: "WRITE_APPEND"
result: loadJob
- waitForLoad:
call: googleapis.bigquery.v2.jobs.get
args:
projectId: ${projectId}
jobId: ${loadJob.jobReference.jobId}
result: loadStatus
- checkLoadStatus:
switch:
- condition: ${loadStatus.status.state != "DONE"}
next: waitForLoad
- transformData:
call: googleapis.bigquery.v2.jobs.query
args:
projectId: ${projectId}
body:
query: |
INSERT INTO `${projectId}.${datasetId}.${targetTable}`
SELECT
id,
name,
TIMESTAMP_TRUNC(created_at, DAY) as date,
amount
FROM `${projectId}.${datasetId}.${sourceTable}`
WHERE DATE(created_at) = CURRENT_DATE()
useLegacySql: false
result: transformJob
- notifySlack:
call: http.post
args:
url: https://hooks.slack.com/services/YOUR/WEBHOOK/URL
body:
text: ${"Data pipeline completed: " + fileName}
ポイント解説
- イベントパラメータ: Cloud Storageのイベントからファイル情報を取得
- BigQueryコネクタ:
googleapis.bigquery.v2を使用してBigQueryを操作 - ジョブの待機: ロードジョブの完了を待機してから次のステップへ
- SQLクエリ: データ変換ロジックをSQLで記述
- 外部通知: Slack WebhookでHTTP POSTリクエストを送信
エラーハンドリングとリトライ
Workflowsには、強力なエラーハンドリング機能が組み込まれています。
Try-Catch構文
- tryStep:
try:
steps:
- callAPI:
call: http.get
args:
url: https://api.example.com/data
except:
as: e
steps:
- logError:
call: sys.log
args:
severity: ERROR
text: ${"Error occurred: " + e.message}
- handleError:
call: sendErrorNotification
args:
error: ${e}
リトライポリシー
HTTPリクエストには、自動的にリトライポリシーを設定できます:
- callAPIWithRetry:
call: http.get
args:
url: https://api.example.com/data
timeout: 30
retry:
predicate: ${http.default_retry_predicate}
max_retries: 5
backoff:
initial_delay: 1
max_delay: 60
multiplier: 2
料金体系
Workflowsは、実行されたステップ数に応じた従量課金制です。
料金の詳細
- 内部ステップ: 1,000ステップあたり$0.01
- 外部ステップ: 1,000ステップあたり$0.025
無料枠
- 内部ステップ: 5,000回/月
- 外部ステップ: 2,000回/月
内部ステップと外部ステップの違い
- 内部ステップ:
*.googleapis.comへのリクエスト、Cloud FunctionsやCloud Runの呼び出し、変数操作など - 外部ステップ: Google Cloud外部へのHTTPリクエスト、カスタムドメインへのリクエスト、コールバック待機
多くの一般的なユースケースでは、月額数ドル以下で運用可能です。
実行方法
Workflowsには、複数の実行方法があります。
1. Cloud Schedulerによる定期実行
cron形式で定期実行を設定できます:
gcloud scheduler jobs create http my-workflow-job \
--schedule="0 2 * * *" \
--uri="https://workflowexecutions.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/workflows/WORKFLOW_NAME/executions" \
--message-body='{"argument":"value"}' \
--oauth-service-account-email=SERVICE_ACCOUNT_EMAIL
2. Eventarcによるイベントドリブン実行
Cloud Storageへのファイルアップロードなど、イベントをトリガーに実行:
gcloud eventarc triggers create storage-trigger \
--destination-workflow=WORKFLOW_NAME \
--destination-workflow-location=LOCATION \
--event-filters="type=google.cloud.storage.object.v1.finalized" \
--event-filters="bucket=BUCKET_NAME"
3. API経由のオンデマンド実行
REST APIまたはgcloudコマンドで手動実行:
gcloud workflows execute WORKFLOW_NAME \
--data='{"key":"value"}'
ベストプラクティス
Workflowsを効果的に活用するためのベストプラクティスを紹介します。
1. サブワークフローの活用
再利用可能なロジックは、サブワークフローとして分離しましょう:
main:
steps:
- callSubworkflow:
call: validateInput
args:
data: ${inputData}
result: validationResult
validateInput:
params: [data]
steps:
- validate:
switch:
- condition: ${data == null}
raise: "Input data is null"
- return:
return: true
2. 適切なエラーハンドリング
すべての外部呼び出しには、エラーハンドリングを実装しましょう。
3. ログの活用
sys.logを使用して、デバッグ情報を記録しましょう:
- logStep:
call: sys.log
args:
severity: INFO
json:
message: "Processing item"
itemId: ${item.id}
4. タイムアウトの設定
長時間実行される可能性のあるステップには、タイムアウトを設定しましょう:
- callWithTimeout:
call: http.get
args:
url: https://api.example.com/data
timeout: 300 # 5分
5. 並列実行の活用
独立した処理は並列実行することで、全体の実行時間を短縮できます。
まとめ
Google Cloud Workflowsは、サーバーレスで低コストなワークフローオーケストレーションサービスです。以下のような特徴があります:
- フルマネージド: インフラ管理不要
- シンプルな定義: YAMLで直感的に記述
- 豊富な機能: 条件分岐、繰り返し、並列実行、エラーハンドリング
- 低コスト: 無料枠が充実し、従量課金制
- 高い拡張性: 自動スケーリングで大規模処理にも対応
データパイプライン、バッチ処理、イベントドリブン処理など、さまざまなユースケースで活用できます。ぜひ、あなたのプロジェクトでもWorkflowsを活用して、業務自動化を実現してください。
参考リンク
ArcHackでは、Google Cloudを活用したシステム開発やDX推進を支援しています。Workflowsを使った業務自動化やデータパイプラインの構築にご興味がある方は、お気軽にお問い合わせください。
