はじまり
あれっ、あの関数って今日動いてなくないか・・・?
ありゃりゃ?
以前に作ったCloud SQLを自動で制御するツール
以前にCloud SQLにDBインスタンスを自動で起動もしくは停止するCloud Run Functionsを作成して、自動実行させるようにしました。
このCloud SQL上のDBインスタンスを自動で起動もしくは停止する際に、別のオペレーションがそのDBインスタンスに対して動いていると、ちゃんと処理できなくなるんですよね。このようなエラーメッセージと共に。
Error 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete., operationInProgress
ということで、今回はこのエラーメッセージに対する対策を施していきたいと思います。それではやっていきましょう。
対策の仕方
上記のエラーメッセージは、Cloud SQL上のDBインスタンスに対して何かしらの処理が動いていると表示されるものです。そしてそれは、直前に自分がDBに対して操作を実行していなくても発生することがあります。(もしかすると、DBのバックアップ処理とバッティングしてしまったりしているのかもしれません。)
今回のようなエラーが発生した時の対処法は、Google Cloud公式ページでも「一般的なベスト プラクティス」として掲示されていました。
Cloud SQL インスタンスは、前のオペレーションが完了するまで、新しいオペレーション リクエストを受け付けません。準備が整う前に新しいオペレーションを開始しようとすると、オペレーション リクエストは失敗します。こうしたオペレーションには、インスタンスの再起動も含まれます。
Google Cloud Console のインスタンス ステータスには、オペレーションが実行されているかどうかは反映されません。緑色のチェックマークは、インスタンスが RUNNABLE 状態にあることのみを示します。オペレーションが実行中かどうかを確認するには、[オペレーション] タブに移動して、最新のオペレーションのステータスをチェックします。
なので今回は、自分が意図していない処理が作動している時でも、その処理が終わるまで自分が実行したい処理の開始を待機させる処理を作って、今回のエラーを解消していきたいと思います。
オペレーション終了まで待機するための処理
上記の、処理が終わるまで自分が実行したい処理の開始を待機させる処理のコードです。Cloud SchedulerからCloud Pub/Sub経由でメッセージを受け取って、ProcessPubSub
エンドポイントにリクエストさせるCloud Run Functionsを想定しています。
function.go
package mypkg
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
sqladmin "google.golang.org/api/sqladmin/v1beta4"
_ "github.com/GoogleCloudPlatform/functions-framework-go/funcframework"
)
type PubSubMessage struct {
Data []byte `json:"data"`
}
type MessagePayload struct {
Instance string
Project string
Action string
DiscordWebhookUrl string
CloudSqlIconUrl string
}
func WaitOperation(ctx context.Context, svc *sqladmin.Service, prj string, instance string) error {
// Use a backoff strategy for retries
retryInterval := 10 * time.Second
maxRetries := 60
for i := 0; i < maxRetries; i++ {
// See more details at: https://pkg.go.dev/google.golang.org/api/sqladmin/v1beta4#OperationsListCall
ops, err := svc.Operations.List(prj).Instance(instance).Context(ctx).Do()
if err != nil {
return fmt.Errorf("failed to list operations: %w", err)
}
if len(ops.Items) > 0 {
op := ops.Items[0] // Assuming we're interested in the most recent operation for the instance
switch op.Status {
case "DONE":
log.Printf("Operation %s completed with status: %s", op.Name, op.Status)
return nil // Operation completed successfully
case "RUNNING":
// Wait before retrying
log.Printf("Operation %s is still running. Retrying in %v...", op.Name, retryInterval)
time.Sleep(retryInterval)
continue
default: // Includes "FAILED" and other possible states
return fmt.Errorf("operation %s failed with status: %s, Error: %v", op.Name, op.Status, op.Error)
}
} else {
log.Printf("No operations found for instance %s. Retrying in %v...", instance, retryInterval)
// time.Sleep(retryInterval)
return nil // Operation completed successfully
}
}
return fmt.Errorf("operation timed out after %v", time.Duration(maxRetries)*retryInterval)
}
// ProcessPubSub consumes and processes a Pub/Sub message.
func ProcessPubSub(ctx context.Context, m PubSubMessage) error {
var psData MessagePayload
log.Printf("Receiving a request with the following payload: %+v\n", psData)
err := json.Unmarshal(m.Data, &psData)
if err != nil {
log.Println(err)
}
log.Printf("Received a request with the following payload: %+v\n", psData)
log.Printf("Request received for Cloud SQL instance %s action: %s, %s", psData.Action, psData.Instance, psData.Project)
// Create an http.Client that uses Application Default Credentials.
hc, err := google.DefaultClient(ctx, sqladmin.CloudPlatformScope)
if err != nil {
return err
}
// Create the Google Cloud SQL service.
service, err := sqladmin.NewService(ctx, option.WithHTTPClient(hc))
if err != nil {
return err
}
// Wait until running operations are completed.
err = WaitOperation(ctx, service, psData.Project, psData.Instance)
if err != nil {
fmt.Printf("Error: %+v\n", err)
}
// my DB operations...
return nil
}
ChatGPTからはコッチのGoパッケージ(genproto)で実装するように言われたんですけど、既に「google.golang.org/api/sqladmin/v1beta4」のGoパッケージで処理を作っていたのでソッチで進めます。
既にDB起動用に作成したsqladmin.NewService
のOperations.List(prj).Instance(instance).Context(ctx).Do()
で、目的のDBに対するオペレーションの一覧を取得して、最近のオペレーションが終了しているかどうかをfor文で確認するという流れです。
待機処理の実行結果
それでは実装した待機処理を実行してみます。DBインスタンスを起動した直後にDBインスタンスを停止する処理を実行したら、裏で何かしらのプロセスが実行されている間は、Go関数内のfor文がちゃんと回っています。(「Default STARTUP TCP probe succeeded after 1 attempt for container “worker” on port 8080.」というメッセージが表示されました。DBを起動するためのプロセスですかね。DB起動処理実行後、4分ぐらい常駐していました。)
この待機処理の後に、DB起動処理、およびDiscordのWebhookを叩く処理があるので、それらの後続の処理も実行されていることが確認できました。
ちなみに、シェルから実行する場合の参考。
Golangではなくシェルから実行したい場合は、ここにgcloud sql operations list
というコマンドの公式リファレンスを参考できます。
gcloudであれば、gcloud sql operations wait
でDBに対する現在のオペレーションが終わるまで待つためのコマンドも用意されているみたいです。
まとめ
今回は、Go言語を使って、Cloud SQL上に作成したDBインスタンスに対する処理が完了するまで待つようにする処理を実装しました。
全く想定していなかった不具合だったので面食らいましたが、実装は容易なものでした。他のCloud SQLの「一般的なベスト プラクティス」も、DB周りをデプロイする際に参考にできそうですね。
Cloud SQL関連の記事
おしまい
よし、今日も実行されたな
DB待機!
以上になります!
コメント