Pagination with Google Cloud Client Libraries

2021-12-15

This section covers how to manually page through results.

Many services make extensive use of pagination where the result set could be fairly large. For example, with BigQuery you may need to iterate over many rows (see BigQuery: Paging Results).

Fortunately, the Google API clients do this for you automatically, so you do not normally have to inspect and iterate over pages on your own.

For example, if you want to list buckets with GCS, simply call list_buckets() and iterate over the result directly:

# Iterate over the listing directly; no explicit page handling appears in this loop.
for bucket in client.list_buckets():
    print(bucket)

The client library will do the heavy lifting for you.

However, in some cases you may want to iterate over and inspect each page yourself (e.g., to stop iteration early). To do that, you need to take some extra steps that acquire each page and then loop over the items on that page.

This section covers how to construct the manual iteration objects in various languages.

For background reference, see AIP-158 Pagination

TODO: figure out how to page for the PubSub client, since it's automatically wrapped and done for you

references

For Cloud Logging and Monitoring see salrashid123/gcpsamples.iterators

project = 'your_project'

from typing import NewType
from google.cloud import storage

client = storage.Client(project=project)

# Automatic paging: iterate over the listing and print every bucket name.
for bucket in client.list_buckets():
    print(bucket.name)

# Manual paging: walk the iterator's `pages` and report per-page details
# before printing the bucket names found on that page.
# https://googleapis.dev/python/google-api-core/latest/page_iterator.html
bucket_iterator = client.list_buckets(page_size=5)
for bucket_page in bucket_iterator.pages:
    print('    Page number: %d' % (bucket_iterator.page_number,))
    print('  Items in page: %d' % (bucket_page.num_items,))
    print('Items remaining: %d' % (bucket_page.remaining,))
    print('Next page token: %s' % (bucket_iterator.next_page_token,))
    print('----------------------------')
    for bucket in bucket_page:
        print(bucket.name)


# pubsub returns google.pubsub_v1.services.publisher.pagers.ListTopicsPager
# https://github.com/googleapis/google-cloud-python/issues/895
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
project_path = f"projects/{project}"

# Build the request once, then iterate over whatever list_topics returns
# and print each topic name.
list_request = {"project": project_path, "page_size": 5}
topic_pager = publisher.list_topics(request=list_request)
for topic in topic_pager:
    print(topic.name)

TODO

Not sure how to inject the maxResults parameter, since the Buckets and Topics calls seem to do all of this for you.


//cloud.google.com/go/storage@v1.18.2/bucket.go

// Buckets builds a BucketIterator bound to ctx, the client c and projectID,
// then attaches the paging state returned by iterator.NewPageInfo.
// The signature takes only a context and a project ID; it has no page-size
// argument.
func (c *Client) Buckets(ctx context.Context, projectID string) *BucketIterator {
	it := &BucketIterator{
		ctx:       ctx,
		client:    c,
		projectID: projectID,
	}
	// NewPageInfo receives three callbacks: the iterator's fetch method, a
	// function reporting how many buckets are currently buffered, and a
	// function that hands back the buffered buckets and clears the buffer.
	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
		it.fetch,
		func() int { return len(it.buckets) },
		func() interface{} { b := it.buckets; it.buckets = nil; return b })

	return it
}


// cloud.google.com/go/pubsub@v1.17.1/topic.go
// Topics wraps the iterator returned by c.pubc.ListTopics in a TopicIterator.
// The ListTopicsRequest built here sets only the Project field, and the
// signature takes no page-size argument.
func (c *Client) Topics(ctx context.Context) *TopicIterator {
	it := c.pubc.ListTopics(ctx, &pb.ListTopicsRequest{Project: c.fullyQualifiedProjectName()})
	return &TopicIterator{
		c:  c,
		it: it,
		// next advances the wrapped iterator and yields the topic's Name,
		// passing through any error from it.Next unchanged.
		next: func() (string, error) {
			topic, err := it.Next()
			if err != nil {
				return "", err
			}
			return topic.Name, nil
		},
	}
}

ref

package main

// https://pkg.go.dev/google.golang.org/api/iterator

import (
	"fmt"

	pubsub "cloud.google.com/go/pubsub"
	storage "cloud.google.com/go/storage"
	"golang.org/x/net/context"
	"google.golang.org/api/iterator"
)

const (
	projectID = "your_project_id"
)

// main lists every Cloud Storage bucket in projectID and then every Pub/Sub
// topic, printing one name or ID per line. Both listings use the iterators'
// Next method and stop when it reports iterator.Done; any other error is
// printed and ends the program.
func main() {

	ctx := context.Background()

	storageClient, err := storage.NewClient(ctx)
	if err != nil {
		fmt.Printf("storage.NewClient: %v\n", err)
		return
	}
	defer storageClient.Close()

	it := storageClient.Buckets(ctx, projectID)
	for {
		battrs, err := it.Next()
		// iterator.Done is the normal end-of-listing signal, not a failure.
		if err == iterator.Done {
			break
		}
		if err != nil {
			fmt.Printf("storage.Iterating error: %v\n", err)
			return
		}
		fmt.Printf("Bucket Name: %s\n", battrs.Name)
	}

	// *******************************************
	pubsubClient, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		fmt.Printf("pubsub.NewClient: %v\n", err)
		return
	}
	defer pubsubClient.Close()

	pit := pubsubClient.Topics(ctx)
	for {
		topic, err := pit.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			fmt.Printf("pubsub.Iterating error: %v\n", err)
			return
		}
		fmt.Printf("Topic Name: %s\n", topic.ID())
	}
}

Some services may also surface a Pages interface. To use that:

package main

import (
	"fmt"
	"io/ioutil"

	"context"

	"golang.org/x/oauth2/google"
	"google.golang.org/api/cloudidentity/v1"
	"google.golang.org/api/option"
)

// main searches the transitive groups of a fixed member key through the
// Cloud Identity service and prints each membership's group key ID and
// display name. Results are consumed page by page through the call's Pages
// method; any error is printed and ends the program.
func main() {
	ctx := context.Background()

	tokenSource, err := google.DefaultTokenSource(ctx)
	if err != nil {
		fmt.Printf("%v", err)
		return
	}

	identityService, err := cloudidentity.NewService(ctx, option.WithTokenSource(tokenSource))
	if err != nil {
		fmt.Printf("%v", err)
		return
	}

	const membershipQuery = "member_key_id=='alice@domain.com' && 'cloudidentity.googleapis.com/groups.discussion_forum' in labels"

	// printPage is invoked by Pages with each response; returning nil lets
	// the paging continue.
	printPage := func(resp *cloudidentity.SearchTransitiveGroupsResponse) error {
		for _, membership := range resp.Memberships {
			fmt.Printf("%s (%s)\n", membership.GroupKey.Id, membership.DisplayName)
		}
		return nil
	}

	searchCall := identityService.Groups.Memberships.SearchTransitiveGroups("groups/-").Query(membershipQuery)
	if err := searchCall.Pages(ctx, printPage); err != nil {
		fmt.Printf("%v", err)
		return
	}
}

TODO:

package com.test;

import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient.ListTopicsPage;
import com.google.cloud.pubsub.v1.TopicAdminClient.ListTopicsPagedResponse;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.pubsub.v1.ListTopicsRequest;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Topic;
import com.google.api.gax.paging.Page;
import java.util.Iterator;

import com.google.cloud.storage.Storage.BucketGetOption;

/**
 * Sample that lists Cloud Storage buckets and Pub/Sub topics one page at a
 * time (page size 5), printing each name.
 *
 * All work happens in the constructor; any exception is caught and printed.
 */
public class TestApp {
	public static void main(String[] args) {
		new TestApp();
	}

	/**
	 * Pages through the buckets visible to the default Storage service and then
	 * through the topics of the hard-coded project, printing every entry.
	 */
	public TestApp() {
		try {

			String projectID = "your_project_id";

			Storage storage_service = StorageOptions.newBuilder().build().getService();
			// https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.Storage.BucketListOption
			Storage.BucketListOption opt = Storage.BucketListOption.pageSize(5);

			// https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.paging.Page#com_google_api_gax_paging_Page_hasNextPage__
			// Print the current page BEFORE advancing; advancing first would
			// silently skip the first page of results.
			Page<Bucket> pages = storage_service.list(opt);
			while (true) {
				for (Bucket b : pages.getValues()) {
					System.out.println(b.getName());
				}
				if (!pages.hasNextPage()) {
					break;
				}
				System.out.println("Getting Next Storage page");
				pages = pages.getNextPage();
			}

			// Alternative: let the library page for you.
			// Page<Bucket> buckets = storage_service.list(opt);
			// Iterator<Bucket> bucketIterator = buckets.iterateAll().iterator();
			// while (bucketIterator.hasNext()) {
			// Bucket b = bucketIterator.next();
			// System.out.println(b.getName());
			// }

			// for (Bucket b : storage_service.list(opt).iterateAll()) {
			// System.out.println(b);
			// }

			// try-with-resources so the Pub/Sub client is closed when done.
			try (TopicAdminClient topicClient = TopicAdminClient.create(TopicAdminSettings.newBuilder().build())) {

				ListTopicsRequest listTopicsRequest = ListTopicsRequest.newBuilder()
						.setProject(ProjectName.format(projectID)).setPageSize(5)
						.build();

				ListTopicsPagedResponse response = topicClient.listTopics(listTopicsRequest);
				// Same pattern as above: consume the current page, then advance.
				Page<Topic> ppages = response.getPage();
				while (true) {
					for (Topic t : ppages.getValues()) {
						System.out.println(t.getName());
					}
					if (!ppages.hasNextPage()) {
						break;
					}
					System.out.println("Getting Next Pubsub page");
					ppages = ppages.getNextPage();
				}

				// Alternative: let the library page for you.
				// ListTopicsRequest listTopicsRequest = ListTopicsRequest.newBuilder()
				// .setProject(ProjectName.format(projectID))
				// .build();
				// ListTopicsPagedResponse response = topicClient.listTopics(listTopicsRequest);
				// Iterable<Topic> topics = response.iterateAll();
				// for (Topic topic : topics)
				// System.out.println(topic);
			}

		} catch (Exception ex) {
			System.out.println("Error: " + ex);
		}
	}

}

TODO.

var log4js = require("log4js");
var logger = log4js.getLogger();

const {PubSub} = require('@google-cloud/pubsub');
const {Storage} = require('@google-cloud/storage');

// https://cloud.google.com/storage/docs/samples/storage-list-files-paginated

const gcs = new Storage();

/**
 * Lists Cloud Storage buckets one page at a time and logs each bucket name.
 *
 * Uses the promise form of `getBuckets` with `autoPaginate: false`, so each
 * call is awaited and a failed request rejects the returned promise instead
 * of being ignored inside a callback.
 *
 * @returns {Promise<void>} Resolves once no further page query is returned.
 */
async function listBucketsPaginated() {
  let query = {autoPaginate: false, maxResults: 5};
  while (query) {
    // With autoPaginate disabled the second element is the query for the
    // next page, or a falsy value when there are no more pages.
    const [buckets, nextQuery] = await gcs.getBuckets(query);
    console.log("Getting Next page");
    for (const bucket of buckets) {
      console.log(`name: ${bucket.name}`);
    }
    query = nextQuery;
  }
}

listBucketsPaginated().catch(console.error);
  
// **********************************


const pubsub = new PubSub({
  projectId: 'your_project_id',
});

/**
 * Lists Pub/Sub topics one page at a time and logs each topic name.
 *
 * Uses the promise form of `getTopics` with `autoPaginate: false`, so each
 * call is awaited and a failed request rejects the returned promise instead
 * of being ignored inside a callback.
 *
 * @returns {Promise<void>} Resolves once no further page query is returned.
 */
async function listTopicsPaginated() {
  // NOTE(review): `maxResults` is kept from the original sample; confirm
  // whether this client expects `pageSize` for the per-page limit instead.
  let query = {autoPaginate: false, maxResults: 5};
  while (query) {
    // The second element is the query for the next page, or a falsy value
    // when there are no more pages.
    const [topics, nextQuery] = await pubsub.getTopics(query);
    console.log("Getting Next page");
    for (const topic of topics) {
      console.log(`name: ${topic.name}`);
    }
    query = nextQuery;
  }
}

listTopicsPaginated().catch(console.error);

TODO

This site supports webmentions. Send me a mention via this form.