【Cloud SQL】GolangでDBインスタンスへのオペレーション完了まで待つ

Code

はじまり

リサちゃん
リサちゃん

あれっ、あの関数って今日動いてなくないか・・・?

135ml
135ml

ありゃりゃ?

以前に作ったCloud SQLを自動で制御するツール

以前にCloud SQLにDBインスタンスを自動で起動もしくは停止するCloud Run Functionsを作成して、自動実行させるようにしました。

このCloud SQL上のDBインスタンスを自動で起動もしくは停止する際に、別のオペレーションがそのDBインスタンスに対して動いていると、ちゃんと処理できなくなるんですよね。このようなエラーメッセージと共に。

Error 409: Operation failed because another operation was already in progress. Try your request after the current operation is complete., operationInProgress

ということで、今回はこのエラーメッセージに対する対策を施していきたいと思います。それではやっていきましょう。

対策の仕方

上記のエラーメッセージは、Cloud SQL上のDBインスタンスに対して何かしらの処理が動いていると表示されるものです。そしてそれは、直前に自分がDBに対して操作を実行していなくても発生することがあります。(もしかすると、DBのバックアップ処理とバッティングしてしまったりしているのかもしれません。)

今回のようなエラーが発生した時の対処法は、Google Cloud公式ページでも「一般的なベスト プラクティス」として掲示されていました。

一般的なベスト プラクティス  |  Cloud SQL for MySQL  |  Google Cloud

Cloud SQL インスタンスは、前のオペレーションが完了するまで、新しいオペレーション リクエストを受け付けません。準備が整う前に新しいオペレーションを開始しようとすると、オペレーション リクエストは失敗します。こうしたオペレーションには、インスタンスの再起動も含まれます。

Google Cloud Console のインスタンス ステータスには、オペレーションが実行されているかどうかは反映されません。緑色のチェックマークは、インスタンスが RUNNABLE 状態にあることのみを示します。オペレーションが実行中かどうかを確認するには、[オペレーション] タブに移動して、最新のオペレーションのステータスをチェックします。

なので今回は、自分が意図していない処理が作動している時でも、その処理が終わるまで自分が実行したい処理の開始を待機させる処理を作って、今回のエラーを解消していきたいと思います。

オペレーション終了まで待機するための処理

上記の、処理が終わるまで自分が実行したい処理の開始を待機させる処理のコードです。Cloud SchedulerからCloud Pub/Sub経由でメッセージを受け取って、ProcessPubSubエンドポイントにリクエストさせるCloud Run Functionsを想定しています。

function.go

package mypkg

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"golang.org/x/oauth2/google"
	"google.golang.org/api/option"
	sqladmin "google.golang.org/api/sqladmin/v1beta4"

	_ "github.com/GoogleCloudPlatform/functions-framework-go/funcframework"
)

type PubSubMessage struct {
	Data []byte `json:"data"`
}

type MessagePayload struct {
	Instance          string
	Project           string
	Action            string
	DiscordWebhookUrl string
	CloudSqlIconUrl   string
}

func WaitOperation(ctx context.Context, svc *sqladmin.Service, prj string, instance string) error {
	// Use a backoff strategy for retries
	retryInterval := 10 * time.Second
	maxRetries := 60

	for i := 0; i < maxRetries; i++ {
		// See more details at: https://pkg.go.dev/google.golang.org/api/sqladmin/v1beta4#OperationsListCall
		ops, err := svc.Operations.List(prj).Instance(instance).Context(ctx).Do()
		if err != nil {
			return fmt.Errorf("failed to list operations: %w", err)
		}

		if len(ops.Items) > 0 {
			op := ops.Items[0] // Assuming we're interested in the most recent operation for the instance

			switch op.Status {
			case "DONE":
				log.Printf("Operation %s completed with status: %s", op.Name, op.Status)
				return nil // Operation completed successfully
			case "RUNNING":
				// Wait before retrying
				log.Printf("Operation %s is still running. Retrying in %v...", op.Name, retryInterval)
				time.Sleep(retryInterval)
				continue
			default: // Includes "FAILED" and other possible states
				return fmt.Errorf("operation %s failed with status: %s, Error: %v", op.Name, op.Status, op.Error)
			}
		} else {
			log.Printf("No operations found for instance %s. Retrying in %v...", instance, retryInterval)
			// time.Sleep(retryInterval)
			return nil // Operation completed successfully
		}
	}

	return fmt.Errorf("operation timed out after %v", time.Duration(maxRetries)*retryInterval)
}

// ProcessPubSub consumes and processes a Pub/Sub message.
func ProcessPubSub(ctx context.Context, m PubSubMessage) error {
	var psData MessagePayload
	log.Printf("Receiving a request with the following payload: %+v\n", psData)

	err := json.Unmarshal(m.Data, &psData)
	if err != nil {
		log.Println(err)
	}
	log.Printf("Received a request with the following payload: %+v\n", psData)
	log.Printf("Request received for Cloud SQL instance %s action: %s, %s", psData.Action, psData.Instance, psData.Project)

	// Create an http.Client that uses Application Default Credentials.
	hc, err := google.DefaultClient(ctx, sqladmin.CloudPlatformScope)
	if err != nil {
		return err
	}

	// Create the Google Cloud SQL service.
	service, err := sqladmin.NewService(ctx, option.WithHTTPClient(hc))
	if err != nil {
		return err
	}

	// Wait until running operations are completed.
	err = WaitOperation(ctx, service, psData.Project, psData.Instance)
	if err != nil {
		fmt.Printf("Error: %+v\n", err)
	}

	// my DB operations...

	return nil
}

ChatGPTからはコッチのGoパッケージ(genproto)で実装するように言われたんですけど、既に「google.golang.org/api/sqladmin/v1beta4」のGoパッケージで処理を作っていたのでソッチで進めます。

sqladmin package - google.golang.org/api/sqladmin/v1beta4 - Go Packages
Package sqladmin provides access to the Cloud SQL Admin API.

既にDB起動用に作成したsqladmin.NewServiceOperations.List(prj).Instance(instance).Context(ctx).Do()で、目的のDBに対するオペレーションの一覧を取得して、最近のオペレーションが終了しているかどうかをfor文で確認するという流れです。

待機処理の実行結果

それでは実装した待機処理を実行してみます。DBインスタンスを起動した直後にDBインスタンスを停止する処理を実行したら、裏で何かしらのプロセスが実行されている間は、Go関数内のfor文がちゃんと回っています。(「Default STARTUP TCP probe succeeded after 1 attempt for container “worker” on port 8080.」というメッセージが表示されました。DBを起動するためのプロセスですかね。DB起動処理実行後、4分ぐらい常駐していました。)

この待機処理の後に、DB起動処理、およびDiscordのWebhookを叩く処理があるので、それらの後続の処理も実行されていることが確認できました。

ちなみに、シェルから実行する場合の参考。

Golangではなくシェルから実行したい場合は、ここにgcloud sql operations listというコマンドの公式リファレンスを参考できます。

gcloud sql operations list  |  Google Cloud CLI Documentation

gcloudであれば、gcloud sql operations waitでDBに対する現在のオペレーションが終わるまで待つためのコマンドも用意されているみたいです。

gcloud sql operations wait  |  Google Cloud CLI Documentation

まとめ

今回は、Go言語を使って、Cloud SQL上に作成したDBインスタンスに対する処理が完了するまで待つようにする処理を実装しました。

全く想定していなかった不具合だったので面食らいましたが、実装は容易なものでした。他のCloud SQLの「一般的なベスト プラクティス」も、DB周りをデプロイする際に参考にできそうですね。

Cloud SQL関連の記事

おしまい

リサちゃん
リサちゃん

よし、今日も実行されたな

135ml
135ml

DB待機!

以上になります!

コメント

タイトルとURLをコピーしました