使用批处理的步骤如下:
创建新的批量作业
您需要通过调用 MutateBatchJob
创建 BatchJob
资源。
Java
private String createBatchJob(BatchJobServiceClient batchJobServiceClient, long customerId) { BatchJobOperation operation = BatchJobOperation.newBuilder().setCreate(BatchJob.newBuilder().build()).build(); String batchJobResourceName = batchJobServiceClient .mutateBatchJob(Long.toString(customerId), operation) .getResult() .getResourceName(); System.out.printf("Created a mutate job with resource name: '%s'.%n", batchJobResourceName); return batchJobResourceName; }
C#
private static string CreateBatchJob(BatchJobServiceClient batchJobService, long customerId) { BatchJobOperation operation = new BatchJobOperation() { Create = new BatchJob() { } }; string batchJobResourceName = batchJobService.MutateBatchJob(customerId.ToString(), operation) .Result.ResourceName; Console.WriteLine($"Created a batch job with resource name: " + $"'{batchJobResourceName}'."); return batchJobResourceName; }
PHP
private static function createBatchJob( BatchJobServiceClient $batchJobServiceClient, int $customerId ): string { // Creates a batch job operation to create a new batch job. $batchJobOperation = new BatchJobOperation(); $batchJobOperation->setCreate(new BatchJob()); // Issues a request to the API and get the batch job's resource name. $batchJobResourceName = $batchJobServiceClient->mutateBatchJob( MutateBatchJobRequest::build($customerId, $batchJobOperation) )->getResult()->getResourceName(); printf( "Created a batch job with resource name: '%s'.%s", $batchJobResourceName, PHP_EOL ); return $batchJobResourceName; }
Python
def create_batch_job(batch_job_service, customer_id, batch_job_operation): """Creates a batch job for the specified customer ID. Args: batch_job_service: an instance of the BatchJobService message class. customer_id: a str of a customer ID. batch_job_operation: a BatchJobOperation instance set to "create" Returns: a str of a resource name for a batch job. """ try: response = batch_job_service.mutate_batch_job( customer_id=customer_id, operation=batch_job_operation ) resource_name = response.result.resource_name print(f'Created a batch job with resource name "{resource_name}"') return resource_name except GoogleAdsException as exception: handle_googleads_exception(exception)
Ruby
def create_batch_job(client, batch_job_service, customer_id) # Creates a batch job operation to create a new batch job. operation = client.operation.create_resource.batch_job # Issues a request to the API and get the batch job's resource name. response = batch_job_service.mutate_batch_job( customer_id: customer_id, operation: operation ) batch_job_resource_name = response.result.resource_name puts "Created a batch job with resource name: '#{batch_job_resource_name}'" batch_job_resource_name end
Perl
sub create_batch_job { my ($batch_job_service, $customer_id) = @_; # Create a batch job operation. my $batch_job_operation = Google::Ads::GoogleAds::V18::Services::BatchJobService::BatchJobOperation-> new({create => Google::Ads::GoogleAds::V18::Resources::BatchJob->new({})}); my $batch_job_resource_name = $batch_job_service->mutate({ customerId => $customer_id, operation => $batch_job_operation })->{result}{resourceName}; printf "Created a batch job with resource name: '%s'.\n", $batch_job_resource_name; return $batch_job_resource_name; }
在此流程的此阶段,作业的 status
为 PENDING
。
向批处理作业添加一个或多个更改操作
通过调用 AddBatchJobOperations
,将一个或多个 MutateOperation
添加到上一步中创建的批处理作业。然后,响应将包含以下内容:
- 为此作业到目前为止添加的操作总数
- 调用此方法添加更多操作时要使用的序列令牌
再次调用 AddBatchJobOperations
以添加更多操作时,请务必在请求的 sequence_token
字段中指定之前获取的序列令牌。如果您使用之前获取的序列令牌以外的任何序列令牌调用该方法,则会导致错误。
sequence_token
还可作为 BatchJob
的 next_add_sequence_token
使用,您稍后可以检索该值。
如果您要创建依赖对象(例如由新广告系列以及相应的广告组、广告和关键字组成的完整广告系列),则可以使用临时 ID 指定资源名称。
Java
private void addAllBatchJobOperations( BatchJobServiceClient batchJobServiceClient, long customerId, String batchJobResourceName) { AddBatchJobOperationsResponse response = batchJobServiceClient.addBatchJobOperations( AddBatchJobOperationsRequest.newBuilder() .setResourceName(batchJobResourceName) .addAllMutateOperations(buildAllOperations(customerId)) .build()); System.out.printf( "%d mutate operations have been added so far.%n", response.getTotalOperations()); // You can use this next sequence token for calling addBatchJobOperations() next time. System.out.printf( "Next sequence token for adding next operations is '%s'.%n", response.getNextSequenceToken()); }
C#
private static void AddAllBatchJobOperations(BatchJobServiceClient batchJobService, long customerId, string batchJobResourceName) { AddBatchJobOperationsResponse response = batchJobService.AddBatchJobOperations( new AddBatchJobOperationsRequest() { ResourceName = batchJobResourceName, MutateOperations = { BuildAllOperations(customerId) } }); Console.WriteLine($"{response.TotalOperations} mutate operations have been added" + $" so far."); // You can use this next sequence token for calling AddBatchJobOperations() next time. Console.WriteLine($"Next sequence token for adding next operations is " + $"'{response.NextSequenceToken}'."); }
PHP
private static function addAllBatchJobOperations( BatchJobServiceClient $batchJobServiceClient, int $customerId, string $batchJobResourceName ): void { $response = $batchJobServiceClient->addBatchJobOperations( AddBatchJobOperationsRequest::build( $batchJobResourceName, '', self::buildAllOperations($customerId) ) ); printf( "%d mutate operations have been added so far.%s", $response->getTotalOperations(), PHP_EOL ); // You can use this next sequence token for calling addBatchJobOperations() next time. printf( "Next sequence token for adding next operations is '%s'.%s", $response->getNextSequenceToken(), PHP_EOL ); }
Python
def add_all_batch_job_operations(batch_job_service, operations, resource_name): """Adds all mutate operations to the batch job. As this is the first time for this batch job, we pass null as a sequence token. The response will contain the next sequence token that we can use to upload more operations in the future. Args: batch_job_service: an instance of the BatchJobService message class. operations: a list of a mutate operations. resource_name: a str of a resource name for a batch job. """ try: response = batch_job_service.add_batch_job_operations( resource_name=resource_name, sequence_token=None, mutate_operations=operations, ) print( f"{response.total_operations} mutate operations have been " "added so far." ) # You can use this next sequence token for calling # add_batch_job_operations() next time. print( "Next sequence token for adding next operations is " f"{response.next_sequence_token}" ) except GoogleAdsException as exception: handle_googleads_exception(exception)
Ruby
def add_all_batch_job_operations( client, batch_job_service, customer_id, batch_job_resource_name) response = batch_job_service.add_batch_job_operations( resource_name: batch_job_resource_name, mutate_operations: build_all_operations(client, customer_id), ) puts "#{response.total_operations} mutate operations have been added so far." # You can use this next sequence token for calling # add_all_batch_job_operations() next time puts "Next sequence token for adding next operations is " \ "'#{response.next_sequence_token}'" end
Perl
sub add_all_batch_job_operations { my ($batch_job_service, $customer_id, $batch_job_resource_name) = @_; my $add_batch_job_operations_response = $batch_job_service->add_operations({ resourceName => $batch_job_resource_name, sequenceToken => undef, mutateOperations => build_all_operations($customer_id)}); printf "%d batch operations have been added so far.\n", $add_batch_job_operations_response->{totalOperations}; # You can use this next sequence token for calling add_operations() next time. printf "Next sequence token for adding next operations is '%s'.\n", $add_batch_job_operations_response->{nextSequenceToken}; }
点击以下链接,在 GitHub 中查看客户端库的 build 操作函数的内容:
Java
buildAllOperations()
C#
BuildAllOperations()
PHP
buildAllOperations()
Python
build_all_operations()
Ruby
build_all_operations()
Perl
build_all_operations
运行批量作业
添加所有操作后,您可以对上传的操作调用 RunBatchJob
,请求 Google Ads API 运行批处理作业。
Java
private OperationFuture runBatchJob( BatchJobServiceClient batchJobServiceClient, String batchJobResourceName) { OperationFuture operationResponse = batchJobServiceClient.runBatchJobAsync(batchJobResourceName); // BEWARE! The above call returns an OperationFuture. The execution of that future depends on // the thread pool which is owned by batchJobServiceClient. If you use this future, you *must* // keep the service client in scope too. // See https://developers.google.com/google-ads/api/docs/client-libs/java/lro for more detail. System.out.printf( "Mutate job with resource name '%s' has been executed.%n", batchJobResourceName); return operationResponse; }
C#
private Operation<Empty, BatchJobMetadata> RunBatchJob( BatchJobServiceClient batchJobService, string batchJobResourceName) { Operation<Empty, BatchJobMetadata> operationResponse = batchJobService.RunBatchJob(batchJobResourceName); Console.WriteLine($"Batch job with resource name '{batchJobResourceName}' has been " + $"executed."); return operationResponse; }
PHP
private static function runBatchJob( BatchJobServiceClient $batchJobServiceClient, string $batchJobResourceName ): OperationResponse { $operationResponse = $batchJobServiceClient->runBatchJob(RunBatchJobRequest::build($batchJobResourceName)); printf( "Batch job with resource name '%s' has been executed.%s", $batchJobResourceName, PHP_EOL ); return $operationResponse; }
Python
def run_batch_job(batch_job_service, resource_name): """Runs the batch job for executing all uploaded mutate operations. Args: batch_job_service: an instance of the BatchJobService message class. resource_name: a str of a resource name for a batch job. Returns: a google.api_core.operation.Operation instance. """ try: response = batch_job_service.run_batch_job(resource_name=resource_name) print( f'Batch job with resource name "{resource_name}" has been ' "executed." ) return response except GoogleAdsException as exception: handle_googleads_exception(exception)
Ruby
def run_batch_job(batch_job_service, batch_job_resource_name) operation_response = batch_job_service.run_batch_job( resource_name: batch_job_resource_name, ) puts "Batch job with resource name '#{batch_job_resource_name}' " \ "has been executed." operation_response end
Perl
sub run_batch_job { my ($batch_job_service, $batch_job_resource_name) = @_; my $batch_job_lro = $batch_job_service->run({resourceName => $batch_job_resource_name}); printf "Batch job with resource name '%s' has been executed.\n", $batch_job_resource_name; return $batch_job_lro; }
返回的响应是长时间运行的 Operation
(LRO) 的对象。LRO 包含批量作业的元数据以及作业状态的相关信息。
轮询批量作业的状态,直到其完成
下一步是使用 LRO 的 GetOperation
轮询批量作业的状态,直到 LRO 的 done
为 true
。
Java
private void pollBatchJob(OperationFuture operationResponse) { try { operationResponse.get(MAX_TOTAL_POLL_INTERVAL_SECONDS, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { System.err.printf("Failed polling the mutate job. Exception: %s%n", e); System.exit(1); } }
C#
private static void PollBatchJob(Operation<Empty, BatchJobMetadata> operationResponse) { PollSettings pollSettings = new PollSettings( Expiration.FromTimeout(TimeSpan.FromSeconds(MAX_TOTAL_POLL_INTERVAL_SECONDS)), TimeSpan.FromSeconds(1)); operationResponse.PollUntilCompleted(pollSettings); }
PHP
private static function pollBatchJob(OperationResponse $operationResponse): void { $operationResponse->pollUntilComplete([ 'initialPollDelayMillis' => self::POLL_FREQUENCY_SECONDS * 1000, 'totalPollTimeoutMillis' => self::MAX_TOTAL_POLL_INTERVAL_SECONDS * 1000 ]); }
Python
def poll_batch_job(operations_response, event): """Polls the server until the batch job execution finishes. Sets the initial poll delay time and the total time to wait before time-out. Args: operations_response: a google.api_core.operation.Operation instance. event: an instance of asyncio.Event to invoke once the operations have completed, alerting the awaiting calling code that it can proceed. """ loop = asyncio.get_event_loop() def done_callback(future): # The operations_response object will call callbacks from a daemon # thread so we must use a threadsafe method of setting the event here # otherwise it will not trigger the awaiting code. loop.call_soon_threadsafe(event.set) # operations_response represents a Long-Running Operation or LRO. The class # provides an interface for polling the API to check when the operation is # complete. Below we use the asynchronous interface, but there's also a # synchronous interface that uses the Operation.result method. # See: https://googleapis.dev/python/google-api-core/latest/operation.html operations_response.add_done_callback(done_callback)
Ruby
def poll_batch_job(operation_response) operation_response.wait_until_done! end
Perl
sub poll_batch_job { my ($operation_service, $batch_job_lro) = @_; $operation_service->poll_until_done({ name => $batch_job_lro->{name}, pollFrequencySeconds => POLL_FREQUENCY_SECONDS, pollTimeoutSeconds => POLL_TIMEOUT_SECONDS }); }
列出所有批处理作业结果
所有批处理作业都完成后,您可以使用 ListBatchJobResults
列出其结果,以便输出其状态和响应:
Java
private void fetchAndPrintResults( BatchJobServiceClient batchJobServiceClient, String batchJobResourceName) { System.out.printf( "Mutate job with resource name '%s' has finished. Now, printing its results...%n", batchJobResourceName); // Gets all the results from running mutate job and prints their information. ListBatchJobResultsPagedResponse batchJobResults = batchJobServiceClient.listBatchJobResults( ListBatchJobResultsRequest.newBuilder() .setResourceName(batchJobResourceName) .setPageSize(PAGE_SIZE) .build()); for (BatchJobResult batchJobResult : batchJobResults.iterateAll()) { System.out.printf( "Mutate job #%d has a status '%s' and response of type '%s'.%n", batchJobResult.getOperationIndex(), batchJobResult.getStatus().getMessage().isEmpty() ? "N/A" : batchJobResult.getStatus().getMessage(), batchJobResult .getMutateOperationResponse() .getResponseCase() .equals(ResponseCase.RESPONSE_NOT_SET) ? "N/A" : batchJobResult.getMutateOperationResponse().getResponseCase()); } }
C#
private static void FetchAndPrintResults(BatchJobServiceClient batchJobService, string batchJobResourceName) { Console.WriteLine($"batch job with resource name '{batchJobResourceName}' has " + $"finished. Now, printing its results..."); ListBatchJobResultsRequest request = new ListBatchJobResultsRequest() { ResourceName = batchJobResourceName, PageSize = PAGE_SIZE, }; ListBatchJobResultsResponse resp = new ListBatchJobResultsResponse(); // Gets all the results from running batch job and prints their information. foreach (BatchJobResult batchJobResult in batchJobService.ListBatchJobResults(request)) { if (!batchJobResult.IsFailed) { Console.WriteLine($"batch job result #{batchJobResult.OperationIndex} is " + $"successful and response is of type " + $"'{batchJobResult.MutateOperationResponse.ResponseCase}'."); } else { Console.WriteLine($"batch job result #{batchJobResult.OperationIndex} " + $"failed with error message {batchJobResult.Status.Message}."); foreach (GoogleAdsError error in batchJobResult.Failure.Errors) { Console.WriteLine($"Error found: {error}."); } } } }
PHP
private static function fetchAndPrintResults( BatchJobServiceClient $batchJobServiceClient, string $batchJobResourceName ): void { printf( "Batch job with resource name '%s' has finished. Now, printing its results...%s", $batchJobResourceName, PHP_EOL ); // Gets all the results from running batch job and print their information. $batchJobResults = $batchJobServiceClient->listBatchJobResults( ListBatchJobResultsRequest::build($batchJobResourceName)->setPageSize(self::PAGE_SIZE) ); foreach ($batchJobResults->iterateAllElements() as $batchJobResult) { /** @var BatchJobResult $batchJobResult */ printf( "Batch job #%d has a status '%s' and response of type '%s'.%s", $batchJobResult->getOperationIndex(), $batchJobResult->getStatus() ? $batchJobResult->getStatus()->getMessage() : 'N/A', $batchJobResult->getMutateOperationResponse() ? $batchJobResult->getMutateOperationResponse()->getResponse() : 'N/A', PHP_EOL ); } }
Python
def fetch_and_print_results(client, batch_job_service, resource_name): """Prints all the results from running the batch job. Args: client: an initialized GoogleAdsClient instance. batch_job_service: an instance of the BatchJobService message class. resource_name: a str of a resource name for a batch job. """ print( f'Batch job with resource name "{resource_name}" has finished. ' "Now, printing its results..." ) list_results_request = client.get_type("ListBatchJobResultsRequest") list_results_request.resource_name = resource_name list_results_request.page_size = 1000 # Gets all the results from running batch job and prints their information. batch_job_results = batch_job_service.list_batch_job_results( request=list_results_request ) for batch_job_result in batch_job_results: status = batch_job_result.status.message status = status if status else "N/A" result = batch_job_result.mutate_operation_response result = result or "N/A" print( f"Batch job #{batch_job_result.operation_index} " f'has a status "{status}" and response type "{result}"' )
Ruby
def fetch_and_print_results(batch_job_service, batch_job_resource_name) puts "Batch job with resource name '#{batch_job_resource_name}' has " \ "finished. Now, printing its results..." \ # Gets all the results from running batch job and print their information. batch_job_results = batch_job_service.list_batch_job_results( resource_name: batch_job_resource_name, page_size: PAGE_SIZE, ) batch_job_results.each do |result| puts "Batch job ##{result.operation_index} has a status " \ "#{result.status ? result.status.message : 'N/A'} and response of type " \ "#{result.mutate_operation_response ? result.mutate_operation_response.response : 'N/A'}" end end
Perl
sub fetch_and_print_results { my ($batch_job_service, $batch_job_resource_name) = @_; printf "Batch job with resource name '%s' has finished. " . "Now, printing its results...\n", $batch_job_resource_name; # Get all the results from running batch job and print their information. my $list_batch_job_results_response = $batch_job_service->list_results({ resourceName => $batch_job_resource_name, pageSize => PAGE_SIZE }); foreach my $batch_job_result (@{$list_batch_job_results_response->{results}}) { printf "Batch job #%d has a status '%s' and response of type '%s'.\n", $batch_job_result->{operationIndex}, $batch_job_result->{status} ? $batch_job_result->{status}{message} : "N/A", $batch_job_result->{mutateOperationResponse} ? [keys %{$batch_job_result->{mutateOperationResponse}}]->[0] : "N/A"; } }
如果相应操作成功,mutate_operation_response
的 response
将包含其 resource_name
的结果。此外,如果 response_content_type
设置为 MUTABLE_RESOURCE
,则结果的资源包含已修改的资源,其中填充了其所有可变字段。
如果相应操作产生了错误且无法完成,则 mutate_operation_response
字段为 null
。
BatchJobResult
的 status
字段包含每个失败操作的错误详情。
错误处理
BatchJobService
会自动重试因暂时性错误而失败的操作,但无法避免所有失败场景。您可以修正因验证错误而失败的操作,然后在新的批处理作业中重新提交。您可以将已取消的操作添加到新的批处理作业中,以便重试这些操作。