Exponential Backoff and Retry for Google Cloud Client Libraries

2021-12-15

This section describes how to manually configure the backoff-retry scheme used within Google Cloud Client libraries.

GCP client libraries come with built-in backoff-retry values at the per-method level that already account for what the API does, the type of API response, and what the client should do next.

For example, different APIs handle API responses in different ways, as documented separately for GCS and Pub/Sub.

How does the client library handle these at the per-method level? GCP generates many of these libraries from configuration files with metadata that describes how the API should behave.

Consider the Pub/Sub ListTopics API call, whose signature lets you define your own retry or use the default gapic_v1.method.DEFAULT:

    def list_topics(
        self,
        request: Union[pubsub.ListTopicsRequest, dict] = None,
        *,
        project: str = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: TimeoutType = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> pagers.ListTopicsPager:
        r"""Lists matching topics.

Where is the default coming from? These clients are generally generated and carry extensive scaffolding that describes many common built-in capabilities like backoff-retry, endpoints, and so on.

For more information, see Using Google's Client Library Generation system.

For now, here is the method's entry in the service config file from which the library was generated:

  "methodConfig": [
    {
      "name": [
        {
          "service": "google.pubsub.v1.Publisher",
          "method": "GetTopic"
        },
        {
          "service": "google.pubsub.v1.Publisher",
          "method": "ListTopics"
        },
        {
          "service": "google.pubsub.v1.Publisher",
          "method": "ListTopicSubscriptions"
        },
        {
          "service": "google.pubsub.v1.Publisher",
          "method": "ListTopicSnapshots"
        },
        {
          "service": "google.pubsub.v1.Publisher",
          "method": "GetIamPolicy"
        }
      ],
      "timeout": "60s",
      "retryPolicy": {
        "maxAttempts": 5,
        "initialBackoff": "0.100s",
        "maxBackoff": "60s",
        "backoffMultiplier": 1.3,
        "retryableStatusCodes": [
          "UNKNOWN",
          "ABORTED",
          "UNAVAILABLE"
        ]
      }
    },

Notice that it describes the default retry policy for a set of methods. The GAPIC generator translates this configuration into code in pubsub_v1/services/publisher/transports/base.py:

            self.list_topics: gapic_v1.method.wrap_method(
                self.list_topics,
                default_retry=retries.Retry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(
                        core_exceptions.Aborted,
                        core_exceptions.ServiceUnavailable,
                        core_exceptions.Unknown,
                    ),
                    deadline=600.0,
                ),
                default_timeout=60.0,
                client_info=client_info,
            ),
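
The wrapper produced by gapic_v1.method.wrap_method is what makes retry=gapic_v1.method.DEFAULT in the signature shown earlier resolve to this generated policy. Conceptually it behaves like the simplified sketch below (this is an illustration, not the library's actual code; the real logic lives in google.api_core.gapic_v1.method):

    # Simplified illustration of the DEFAULT-sentinel substitution performed by wrap_method.
    # `default_retry`, `default_timeout`, and `transport_list_topics` are hypothetical names
    # standing in for the values and transport call wired up in base.py above.
    def wrapped_list_topics(request, *, retry=gapic_v1.method.DEFAULT,
                            timeout=gapic_v1.method.DEFAULT, metadata=()):
        if retry is gapic_v1.method.DEFAULT:
            retry = default_retry      # the retries.Retry(...) built above
        if timeout is gapic_v1.method.DEFAULT:
            timeout = default_timeout  # 60.0, from the service config
        return transport_list_topics(request, retry=retry, timeout=timeout, metadata=metadata)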

Developers can override the retry behavior at the client level or at the method level, as shown in the Python snippets below.

Be careful when overriding at the client level, since that uniformly applies the same policy even where it does not make sense for the underlying API method being called.
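
As a minimal sketch of a method-level override (assuming the google-cloud-storage Python client, default credentials, and placeholder bucket/object names), only the single call picks up the custom policy while everything else keeps the library defaults:

    from google.api_core.retry import Retry
    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("my-bucket")  # hypothetical bucket name

    # Only this call uses the custom policy; other methods keep their defaults.
    blob = bucket.get_blob(
        "my-object",  # hypothetical object name
        retry=Retry(initial=0.5, maximum=30.0, multiplier=2.0),
    )

    # Passing retry=None disables retries for this call entirely.
    blob_no_retry = bucket.get_blob("my-object", retry=None)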

For background information, also see AIP-194: Automatic retry configuration.

Finally, some services document their retry settings directly in the Cloud client library or at the API level.

You can enable verbose logging for the Pub/Sub gRPC traffic by setting some of the gRPC environment variables:

export GRPC_VERBOSITY=DEBUG 
export GRPC_TRACE=all
# export GRPC_GO_LOG_VERBOSITY_LEVEL=99 
# export GRPC_GO_LOG_SEVERITY_LEVEL=info
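
The full Python example below lists GCS buckets with a custom retry and lists Pub/Sub topics with a custom timeout plus a per-call retry: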

project = 'project_id'

# https://googleapis.dev/python/storage/latest/retry_timeout.html
from google.api_core import exceptions
from google.api_core import retry, timeout  # ensures api_core.retry / api_core.timeout submodules are loaded
from google.api_core.retry import Retry
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY

from google import api_core

custom_retry = api_core.retry.Retry(
    initial=0.250,  # seconds (default: 0.1)
    maximum=90.0,  # seconds (default: 60.0)
    multiplier=1.45,  # default: 1.3
    deadline=300.0,  # seconds (default: 60.0)
    predicate=api_core.retry.if_exception_type(
        api_core.exceptions.Aborted,
        api_core.exceptions.DeadlineExceeded,
        api_core.exceptions.InternalServerError,
        api_core.exceptions.ResourceExhausted,
        api_core.exceptions.ServiceUnavailable,
        api_core.exceptions.Unknown,
        api_core.exceptions.Cancelled,
    ),
)

from google.cloud import storage

client = storage.Client(project=project)
for b in client.list_buckets(timeout=_DEFAULT_TIMEOUT, retry=custom_retry):
    print(b.name)

## ********************

# see https://github.com/googleapis/python-pubsub/blob/main/google/pubsub_v1/services/publisher/transports/base.py#L194
# self.list_topics: gapic_v1.method.wrap_method(
#     self.list_topics,
#     default_retry=retries.Retry(
#         initial=0.1,
#         maximum=60.0,
#         multiplier=1.3,
#         predicate=retries.if_exception_type(
#             core_exceptions.Aborted,
#             core_exceptions.ServiceUnavailable,
#             core_exceptions.Unknown,
#         ),
#         deadline=600.0,
#     ),
#     default_timeout=60.0,
#     client_info=client_info,
# ),

from google.cloud import pubsub_v1

custom_timeout = api_core.timeout.ExponentialTimeout(
    initial=1.0,     # first RPC timeout, in seconds
    maximum=10.0,    # cap on any single RPC timeout, in seconds
    multiplier=1.0,  # growth factor applied between attempts
    deadline=300.0,  # overall deadline, in seconds
)
publisher = pubsub_v1.PublisherClient(
    # Optional: client-level options (retry/timeout used when publishing messages)
    publisher_options=pubsub_v1.types.PublisherOptions(
        timeout=custom_timeout,
        # retry=custom_retry,
    ),
)
project_path = f"projects/{project}"
# Pass retry as a keyword argument to the method; it is not a field of ListTopicsRequest.
for topic in publisher.list_topics(request={"project": project_path}, retry=custom_retry):
    print(topic.name)
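
The same overrides in Go: the GCS client takes storage.RetryOption values built around a gax.Backoff, while the Pub/Sub client sets the gRPC connection backoff through a dial option.
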
package main

import (
	"fmt"
	"time"

	pubsub "cloud.google.com/go/pubsub"
	storage "cloud.google.com/go/storage"
	"github.com/googleapis/gax-go/v2"

	"golang.org/x/net/context"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"
	"google.golang.org/grpc"
	"google.golang.org/grpc/backoff"
)

const (
	projectID = "your_project_id"
)

func main() {

	ctx := context.Background()

	storageClient, err := storage.NewClient(ctx)
	if err != nil {
		fmt.Printf("storage.NewClient: %v", err)
		return
	}
	defer storageClient.Close()

	customRetry := []storage.RetryOption{
		storage.WithBackoff(gax.Backoff{
			Initial:    2 * time.Second,
			Max:        30 * time.Second,
			Multiplier: 3,
		}),
		// Retry all operations, including ones that are not idempotent.
		storage.WithPolicy(storage.RetryAlways),
		// WithErrorFunc decides whether an error is retryable; always returning
		// false effectively disables retries, so adjust this to your needs.
		storage.WithErrorFunc(func(err error) bool { return false }),
	}
	storageClient.SetRetry(customRetry...)

	it := storageClient.Buckets(ctx, projectID)
	for {
		battrs, err := it.Next()
		if err == iterator.Done {
			break
		}

		if err != nil {
			fmt.Printf("storage.Iterating error: %v", err)
			return
		}
		fmt.Printf("Bucket Name: %s\n", battrs.Name)
	}

	// *******************************************
	pubsubClient, err := pubsub.NewClient(ctx, projectID, option.WithGRPCDialOption(grpc.WithConnectParams(grpc.ConnectParams{
		Backoff: backoff.Config{
			BaseDelay:  1.0 * time.Second,
			Multiplier: 1.6,
			Jitter:     0.2,
			MaxDelay:   120 * time.Second,
		}, //DefaultConfig,
	})))
	if err != nil {
		fmt.Printf("pubsub.NewClient: %v", err)
		return
	}
	defer pubsubClient.Close()

	pit := pubsubClient.Topics(ctx)
	for {
		topic, err := pit.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			fmt.Printf("pubssub.Iterating error: %v", err)
			return
		}
		fmt.Printf("Topic Name: %s\n", topic.ID())
	}
}
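
The Java equivalent applies gax RetrySettings to the GCS client through StorageOptions, and to Pub/Sub through TopicAdminSettings and the Publisher builder.
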
package com.test;

import java.util.concurrent.TimeUnit;

import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient.ListTopicsPagedResponse;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ListTopicsRequest;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Topic;

import org.threeten.bp.Duration;

public class TestApp {
	public static void main(String[] args) {
		TestApp tc = new TestApp();
	}

	public TestApp() {
		try {

			String projectName = "your_project_id";
			String topicName = "testing";

			// uses org.threeten.bp.Duration
			// https://github.com/googleapis/gax-java/blob/main/gax/src/main/java/com/google/api/gax/retrying/RetrySettings.java#L36
			RetrySettings gcsRetrySettings = RetrySettings
					.newBuilder()
					.setMaxAttempts(50)
					.setInitialRetryDelay(org.threeten.bp.Duration.ofSeconds(10))
					.setRetryDelayMultiplier(1.5)
					.setMaxRetryDelay(org.threeten.bp.Duration.ofSeconds(20))
					.build();
			Storage storageService = StorageOptions.newBuilder().setRetrySettings(gcsRetrySettings).build()
					.getService();
			for (Bucket b : storageService.list().iterateAll()) {
				System.out.println(b);
			}

			// https://cloud.google.com/pubsub/docs/samples/pubsub-publisher-retry-settings

			Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
			double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
			Duration maxRetryDelay = Duration.ofSeconds(60); // default : 60 seconds
			Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
			double rpcTimeoutMultiplier = 1.0; // default: 1.0
			Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 600 seconds
			Duration totalTimeout = Duration.ofSeconds(600); // default: 600 seconds

			RetrySettings pubsubRetrySettings = RetrySettings.newBuilder()
					.setInitialRetryDelay(initialRetryDelay)
					.setRetryDelayMultiplier(retryDelayMultiplier)
					.setMaxRetryDelay(maxRetryDelay)
					.setInitialRpcTimeout(initialRpcTimeout)
					.setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
					.setMaxRpcTimeout(maxRpcTimeout)
					.setTotalTimeout(totalTimeout)
					.build();

			TopicAdminSettings.Builder topicAdminSettingsBuilder = TopicAdminSettings.newBuilder();
			// Per-method override: apply the custom retry settings to the listTopics call used below.
			topicAdminSettingsBuilder.listTopicsSettings().setRetrySettings(pubsubRetrySettings);
			TopicAdminClient topicClient = TopicAdminClient.create(topicAdminSettingsBuilder.build());

			ProjectTopicName projectTopicName = ProjectTopicName.of(projectName, topicName);
			System.out.println(projectTopicName);

			Publisher publisher = Publisher.newBuilder(projectTopicName)
					.setRetrySettings(pubsubRetrySettings)
					.build();
			System.out.println(publisher.getTopicName());
			ByteString data = ByteString.copyFromUtf8("some message");
			PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
			publisher.publish(pubsubMessage);
			publisher.shutdown();
			publisher.awaitTermination(1, TimeUnit.MINUTES);

			ListTopicsRequest listTopicsRequest = ListTopicsRequest.newBuilder()
					.setProject("projects/" + projectName)
					.build();

			ListTopicsPagedResponse response = topicClient.listTopics(listTopicsRequest);
			Iterable<Topic> topics = response.iterateAll();
			for (Topic topic : topics) {
				System.out.println(topic);
			}

		} catch (Exception ex) {
			System.out.println("Error: " + ex);
		}
	}

}

TODO: the Node.js sample below follows the documented retry options, but I can't confirm it works.

var log4js = require("log4js");
var logger = log4js.getLogger();

const { PubSub } = require('@google-cloud/pubsub');
const { Storage } = require('@google-cloud/storage');

// export GRPC_VERBOSITY=DEBUG GRPC_TRACE=all

// https://cloud.google.com/nodejs/docs/reference/storage/latest/storage/retryoptions
// https://googleapis.github.io/gax-nodejs/interfaces/CallOptions.html
// https://googleapis.github.io/gax-nodejs/classes/RetryOptions.html

var projectId = "your_project_id";
var gcs = new Storage({
	projectId: projectId,
	retryOptions: {
		retryDelayMultiplier: 1.3,
		totalTimeout: 600,
		maxRetryDelay: 64,
		autoRetry: true,
		maxRetries: 3,
	}

});
gcs.getBuckets(function (err, buckets) {
	if (!err) {
		buckets.forEach(function (value) {
			logger.info(value.id);
		});
	}
});

const retryAndTimeoutSettings = {
	retryCodes: [
		10, // 'ABORTED'
		1, // 'CANCELLED',
		4, // 'DEADLINE_EXCEEDED'
		13, // 'INTERNAL'
		8, // 'RESOURCE_EXHAUSTED'
		14, // 'UNAVAILABLE'
		2, // 'UNKNOWN'
	],
	backoffSettings: {
		maxRetries: 1,
		initialRetryDelayMillis: 100,
		retryDelayMultiplier: 1.3,
		maxRetryDelayMillis: 60000,
		initialRpcTimeoutMillis: 5000,
		rpcTimeoutMultiplier: 1.0,
		maxRpcTimeoutMillis: 600000,
		totalTimeoutMillis: 600000,
	}
};


const callOptions = {
	timeout: 2000, // I've set it to 2 seconds to be able to reproduce the deadline exceeded error easily
	retry: retryAndTimeoutSettings,
};

// https://cloud.google.com/nodejs/docs/reference/pubsub/latest/pubsub/pageoptions
const pageOptions = {
	gaxOpts: callOptions
}
const pubsub = new PubSub({
	projectId: projectId,
});

// https://googleapis.dev/nodejs/pubsub/latest/PubSub.html#getTopics
pubsub.getTopics(pageOptions, (err, topics) => {
	if (err) {
		logger.error(err);
		return;
	}
	topics.forEach(function (entry) {
		logger.info(entry.name);
	});
});
