批处理

大多数服务都提供同步 API,要求您发出请求然后等待响应,但 BatchJobService 允许您对多项服务执行批量操作,而无需等待操作完成。

与各服务的特定 mutate 操作不同,BatchJobService 中的单个作业可以处理由广告系列、广告组、广告、条件、标签和 Feed 项组成的混合集合。已提交的作业会并行运行,且 BatchJobService 会自动重试由于 RateExceededError 等瞬态错误而失败的操作。

此外,BatchJobService 允许您在请求中使用临时 ID,以便可以在单个作业中提交非独立操作。

支持的操作

BatchJobService 支持以下操作:

操作 对应的同步服务
AdGroupAdOperation
AdGroupAdLabelOperation
AdGroupAdService
AdGroupBidModifierOperation AdGroupBidModifierService
AdGroupCriterionOperation
AdGroupCriterionLabelOperation
AdGroupCriterionService
AdGroupExtensionSettingOperation AdGroupExtensionSettingService
AdGroupOperation
AdGroupLabelOperation
AdGroupService
BudgetOperation BudgetService
CampaignCriterionOperation CampaignCriterionService
CampaignExtensionSettingOperation CampaignExtensionSettingService
CampaignOperation
CampaignLabelOperation
CampaignService
CustomerExtensionSettingOperation CustomerExtensionSettingService
FeedItemOperation FeedItemService

架构

虽然每个客户端库都包含相应的实用工具,用于处理已上传操作的 XML 序列化和已下载结果的 XML 反序列化,但以下链接提供了关于批量作业请求与响应的完整架构:

https://adwords.google.com/api/adwords/cm/v201702/BatchJobOpsService?wsdl

批量作业流程

使用批量作业的步骤如下:

  1. 创建 BatchJob,然后从 mutate() 响应中获取 uploadUrl
  2. 将您想要执行的操作列表上传到 uploadUrl
  3. 定期轮询批量作业的 status,直到状态变为 CANCELEDDONE
  4. 从作业的 downloadUrl 下载作业结果,并检查是否存在 processingErrors

此外,您可以取消状态为 AWAITING_FILEACTIVEBatchJob,方法是将其 status 设为 CANCELING

创建批量作业

创建批量作业的方法是发送 ADD 操作,其中包含新的 BatchJob 对象。

// Create a BatchJob.
BatchJobOperation addOp = new BatchJobOperation();
addOp.setOperator(Operator.ADD);
addOp.setOperand(new BatchJob());

BatchJob batchJob = batchJobService.mutate(new BatchJobOperation[] {addOp}).getValue(0);

// Get the upload URL from the new job.
String uploadUrl = batchJob.getUploadUrl().getUrl();

System.out.printf("Created BatchJob with ID %d, status '%s' and upload URL %s.%n",
    batchJob.getId(), batchJob.getStatus(), uploadUrl);

此时,作业的 status 会是 AWAITING_FILE

为批量作业创建操作

在这一步,您为批量作业创建操作的方式与您使用同步 API 服务的方式相同。例如,以下代码段创建了 CampaignOperation 对象,用于添加新的广告系列。

List<CampaignOperation> operations = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_CAMPAIGNS_TO_ADD; i++) {
  Campaign campaign = new Campaign();
  campaign.setName(String.format("Batch Campaign %s.%s", namePrefix, i));

  // Recommendation: Set the campaign to PAUSED when creating it to prevent
  // the ads from immediately serving. Set to ENABLED once you've added
  // targeting and the ads are ready to serve.
  campaign.setStatus(CampaignStatus.PAUSED);

  campaign.setId(tempIdGenerator.next());
  campaign.setAdvertisingChannelType(AdvertisingChannelType.SEARCH);
  Budget budget = new Budget();
  budget.setBudgetId(budgetId);
  campaign.setBudget(budget);
  BiddingStrategyConfiguration biddingStrategyConfiguration =
      new BiddingStrategyConfiguration();
  biddingStrategyConfiguration.setBiddingStrategyType(BiddingStrategyType.MANUAL_CPC);

  // You can optionally provide a bidding scheme in place of the type.
  ManualCpcBiddingScheme cpcBiddingScheme = new ManualCpcBiddingScheme();
  cpcBiddingScheme.setEnhancedCpcEnabled(false);
  biddingStrategyConfiguration.setBiddingScheme(cpcBiddingScheme);

  campaign.setBiddingStrategyConfiguration(biddingStrategyConfiguration);

  CampaignOperation operation = new CampaignOperation();
  operation.setOperand(campaign);
  operation.setOperator(Operator.ADD);
  operations.add(operation);
}
return operations;

如果您要创建非独立对象,例如一个包含新广告系列和相应广告组、广告及关键字的完整广告系列,那么您可以在 ADD 操作中使用临时 ID

// Create a temporary ID generator that will produce a sequence of descending negative numbers.
Iterator<Long> tempIdGenerator =
    new AbstractSequentialIterator<Long>(-1L) {
      @Override
      protected Long computeNext(Long previous) {
        return Long.MIN_VALUE == previous.longValue() ? null : previous - 1;
      }
    };

// Use a random UUID name prefix to avoid name collisions.
String namePrefix = UUID.randomUUID().toString();

// Create the mutate request that will be sent to the upload URL.
List<Operation> operations = new ArrayList<>();

// Create and add an operation to create a new budget.
BudgetOperation budgetOperation = buildBudgetOperation(tempIdGenerator, namePrefix);
operations.add(budgetOperation);

// Create and add operations to create new campaigns.
List<CampaignOperation> campaignOperations =
    buildCampaignOperations(tempIdGenerator, namePrefix, budgetOperation);
operations.addAll(campaignOperations);

// Create and add operations to create new negative keyword criteria for each campaign.
operations.addAll(buildCampaignCriterionOperations(campaignOperations));

// Create and add operations to create new ad groups.
List<AdGroupOperation> adGroupOperations =
    new ArrayList<>(buildAdGroupOperations(tempIdGenerator, namePrefix, campaignOperations));
operations.addAll(adGroupOperations);

// Create and add operations to create new ad group criteria (keywords).
operations.addAll(buildAdGroupCriterionOperations(adGroupOperations));

// Create and add operations to create new ad group ads (text ads).
operations.addAll(buildAdGroupAdOperations(adGroupOperations));

将操作上传到上传网址

在您收集完作业的操作组合后,下一步就是将它们发送到上传网址。

如果您使用某个客户端库中的实用工具,则无需操心所有的底层细节。实用工具会为您构建并发送请求,并且为以下两种选择都提供了方法:

  1. 一次性上传所有操作。
  2. 通过多次调用实用工具来上传操作。

选择 1:一次性上传所有操作

以下示例使用 Java 客户端库中的 BatchJobHelper 实用工具一次性上传所有操作。

// Use a BatchJobHelper to upload all operations.
BatchJobHelper batchJobHelper = adWordsServices.getUtility(session, BatchJobHelper.class);

batchJobHelper.uploadBatchJobOperations(operations, uploadUrl);

选择 2:通过多次调用实用工具来上传操作

以下示例使用 Java 客户端库中的 BatchJobHelper 实用工具,通过多次调用该实用工具的 uploadIncrementalBatchJobOperations() 方法循序渐进地上传操作。

// Use a BatchJobUploadHelper to upload all operations.
BatchJobHelper batchJobUploadHelper = new BatchJobHelper(session);
BatchJobUploadStatus startingUploadStatus =
    new BatchJobUploadStatus(0, URI.create(batchJob.getUploadUrl().getUrl()));
BatchJobUploadResponse uploadResponse;

// Create and upload the first operation to create a new budget.
BudgetOperation budgetOperation = buildBudgetOperation(tempIdGenerator, namePrefix);
uploadResponse = batchJobUploadHelper.uploadIncrementalBatchJobOperations(
    Lists.newArrayList(budgetOperation),
    false, /* pass isLastRequest = false */
    startingUploadStatus);
System.out.printf("First upload response: %s%n", uploadResponse);

// Create and upload intermediate operations to create new campaigns.
List<CampaignOperation> campaignOperations =
    buildCampaignOperations(budgetOperation, tempIdGenerator, namePrefix);
uploadResponse = batchJobUploadHelper.uploadIncrementalBatchJobOperations(
    campaignOperations,
    false, /* pass isLastRequest = false */
    uploadResponse.getBatchJobUploadStatus());
System.out.printf("Intermediate upload response: %s%n", uploadResponse);

// Upload more intermediate requests...

// Create and upload operations to create new ad group ads (text ads).
// This is the final upload request for the BatchJob.
uploadResponse = batchJobUploadHelper.uploadIncrementalBatchJobOperations(
    buildAdGroupAdOperations(adGroupOperations),
    true, /* pass isLastRequest = true */
    uploadResponse.getBatchJobUploadStatus());
System.out.printf("Last upload response: %s%n", uploadResponse);

轮询批量作业状态

在您上传完操作后,系统会将批量作业提交到作业队列,因此您需要定期检查作业状态,直到状态变为 CANCELEDDONE。可使用指数退避策略来避免过于频繁的轮询。以下代码段将在首次尝试后等待 30 秒,在第二次尝试后等待 60 秒,在第三次尝试后等待 120 秒,以此类推。

int pollAttempts = 0;
boolean isPending = true;
Selector selector =
    new SelectorBuilder()
        .fields(BatchJobField.Id, BatchJobField.Status, BatchJobField.DownloadUrl,
            BatchJobField.ProcessingErrors, BatchJobField.ProgressStats)
        .equalsId(batchJob.getId())
        .build();
do {
  long sleepSeconds = (long) Math.scalb(30, pollAttempts);
  System.out.printf("Sleeping %d seconds...%n", sleepSeconds);
  Thread.sleep(sleepSeconds * 1000);

  batchJob = batchJobService.get(selector).getEntries(0);
  System.out.printf(
      "Batch job ID %d has status '%s'.%n", batchJob.getId(), batchJob.getStatus());

  pollAttempts++;
  isPending = PENDING_STATUSES.contains(batchJob.getStatus());
} while (isPending && pollAttempts < MAX_POLL_ATTEMPTS);

下载批量作业结果和检查是否存在错误

在这个阶段,您的作业应该处于以下两种状态之一。

状态 说明 要采取的行动
DONE BatchJobService 已成功解析并尝试了每一项上传的操作。
  • 从批量作业的 downloadUrl 下载每项操作的结果。
CANCELED 在 BatchJobService 尝试解析上传的操作时发生了错误。
  • 检查批量作业的 processingErrors 列表。
  • 从批量作业的 downloadUrl 下载已成功解析的任何操作的结果(如果有)。

对于每个已上传的操作,下载网址将返回一个 mutateResult 元素。每个结果都具有在 BatchJobOpsService.wsdl 中定义的以下属性:

属性 类型 说明
result Operand 如果操作成功,则必定且只能有以下子元素之一
  • Ad
  • AdGroup
  • AdGroupAd
  • AdGroupAdLabel
  • AdGroupBidModifier
  • AdGroupCriterion
  • AdGroupCriterionLabel
  • AdGroupExtensionSetting
  • AdGroupLabel
  • Budget
  • Campaign
  • CampaignCriterion
  • CampaignExtensionSetting
  • CampaignLabel
  • CustomerExtensionSetting
  • FeedItem
返回的元素和对象与用于 index 的操作类型相对应。例如,如果该操作是一个成功的 CampaignOperation,则会在此处返回一个 Campaign 元素。
errorList ErrorList 如果操作失败,则包含一个或多个 errors 元素,每个元素都是 ApiError 或它的某个子类的实例。
index long 从 0 开始的操作编号。此编号用来将结果与您上传的相应操作进行关联。

以下代码展示了处理从下载网址提取而来的结果的一种方式。

if (batchJob.getDownloadUrl() != null && batchJob.getDownloadUrl().getUrl() != null) {
  BatchJobMutateResponse mutateResponse =
      batchJobHelper.downloadBatchJobMutateResponse(batchJob.getDownloadUrl().getUrl());
  System.out.printf("Downloaded results from %s:%n", batchJob.getDownloadUrl().getUrl());
  for (MutateResult mutateResult : mutateResponse.getMutateResults()) {
    String outcome = mutateResult.getErrorList() == null ? "SUCCESS" : "FAILURE";
    System.out.printf("  Operation [%d] - %s%n", mutateResult.getIndex(), outcome);
  }
} else {
  System.out.println("No results available for download.");
}

批量作业的 processingErrors 将包含在预处理您上传的操作时发生的所有错误,比如输入文件损坏错误。以下代码展示了处理此类错误的一种方式。

if (batchJob.getProcessingErrors() != null) {
  int i = 0;
  for (BatchJobProcessingError processingError : batchJob.getProcessingErrors()) {
    System.out.printf(
        "  Processing error [%d]: errorType=%s, trigger=%s, errorString=%s, fieldPath=%s"
        + ", reason=%s%n",
        i++, processingError.getApiErrorType(), processingError.getTrigger(),
        processingError.getErrorString(), processingError.getFieldPath(),
        processingError.getReason());
  }
} else {
  System.out.println("No processing errors found.");
}

使用临时 ID

BatchJobService 的一个强大功能是它支持使用临时 ID。临时 ID 是一个负数 (long),让批量作业中的操作可以引用同一批量作业中的上一个操作的 ADD 操作结果。只需在 ADD 操作中指定父对象的负数 ID,然后就可以在后续的 mutate() 操作中将该 ID 重复用于同一批量作业中的其他非独立对象。

临时 ID 的一种常见使用情形是在单个批量作业中制作完整广告系列。例如,您可以创建一个包含 ADD 操作的作业,并为每个 operand 指定以下 ID:

顺序如下:

  • 添加一个(临时)ID 为 -1 的广告系列
    • 为广告系列 -1 添加一个临时 ID 为 -2 的广告组
      • 为广告组 -2 添加一个广告组广告
      • 为广告组 -2 添加多个广告组条件(关键字)
  • 向广告系列 -1 应用一个标签
  • 为广告系列 -1 添加一个广告系列否定条件(关键字)

取消批量作业

如果 BatchJobstatusAWAITING_FILEACTIVE,则可以取消。只需发出 BatchJobService.mutate() 请求并传递具有以下属性的 BatchJobOperation 即可:

  • operator = SET
  • operand = 某个 BatchJob,其:
    • id = 该批量作业的 ID
    • status = CANCELING

如果 BatchJobstatus 在上述请求发出时既不是 AWAITING_FILE,也不是 ACTIVE,请求将会失败,且返回一个 BatchJobError.INVALID_STATE_CHANGE 错误。

取消作业是一个异步操作,因此在您发出 mutate() 请求后,请轮询批量作业状态,直到状态变为 CANCELEDDONE。并且,请您务必下载结果和检查错误,因为在作业被取消之前,系统可能已经尝试执行作业中的部分操作。

上传要求

非增量上传

每个客户端库实用工具都提供了一步完成上传操作的便捷方法。但是,如果您未使用客户端库,请注意,非增量上传将不受支持

增量上传

使用增量上传,您可以将多个上传请求发送给批量作业 uploadUrl。该作业只会在您上传完最后一组操作后开始执行。

将 BatchJob uploadURL 更换为断点续传网址

上传过程遵循使用 XML API 实现断点续传的 Google 云端存储准则

您的 BatchJobuploadUrl 必须换成支持断点续传的上传网址。要将您的 uploadUrl 换成支持断点续传的上传网址,请向 uploadUrl 发送符合以下规范的请求:

请求属性
请求方法 POST
网址 BatchJobService.mutate 返回的上传网址
Content-Type HTTP 标头 application/xml
Content-Length HTTP 标头 0
x-goog-resumable HTTP 标头 start
请求正文 不需要请求正文

如果请求成功,则返回的响应状态为 201 Created,并且包含一个 Location 标头,其中的值就是断点续传网址。

将操作上传到断点续传网址

有了断点续传网址后,您就可以开始上传操作了。发送到断点续传网址的每个请求都必须符合以下规范。

请求属性
请求方法 PUT
网址 在上述初始化步骤中获得的断点续传网址。
Content-Type HTTP 标头 application/xml
Content-Length HTTP 标头 当前请求内容的字节数。
Content-Range HTTP 标头
请求的字节数范围,后跟总字节数。对于第一个请求和中间请求,总字节数为“*”,但是在发送最后一个请求时,应该将总字节数设置为最终的总字节数。
示例:
bytes 0-262143/*
bytes 262144-524287/*
bytes 524288-786431/786432
请求正文
BatchJobOpsService.wsdl 中指定的 XML 格式的操作。

断点续传网址的请求正文

BatchJobService 最终会将所有已上传的 XML 都连接到 uploadUrl,并将其作为单个请求来解析,因此您必须要注意只在第一个请求和最后一个请求分别包含开始和结束的 mutate 元素。

请求 开始的 mutate 元素 结束的 mutate 元素
第一个 复选标记 圆圈斜杠符号
中间 圆圈斜杠符号 圆圈斜杠符号
最后一个 圆圈斜杠符号 复选标记

此外,由于 BatchJobService 会将已上传的 XML 作为单个文档来解析,因此单个请求的请求正文无需包含完整的 XML 文档。例如,如果您上传 524305 字节 (256K + 256K + 17),您的请求可能如下所示:

请求 1

<?xml version="1.0" encoding="UTF-8"?>
<ns1:mutate xmlns:ns1="https://adwords.google.com/api/adwords/cm/v201702">
<operations xsi:type="ns1:CampaignOperation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <operator xsi:type="ns1:Operator">ADD</operator>
  <operand xsi:type="ns1:Campaign">
  ...
</operations>
<operations>
  ...
</operat

内容长度为 262144,最后一行中的“t”是第 262144 个字节。

请求 2

ions>
<operations xsi:type="ns1:AdGroupOperation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <operator xsi:type="ns1:Operator">ADD</operator>
  <operand xsi:type="ns1:AdGroup">
  ...
</operations>
<operations>
  ...
</ope

内容长度为 262144,最后一行中的“e”是第 262144 个字节。

请求 3

rations></mutate>
... (padded to 262144 bytes)

没有填充的内容长度为 17 字节,</mutate> 上的收尾 > 为第 17 个字节。填充后的内容总长度为 262144 字节。

最佳做法

在使用 BatchJobService 时请参考以下准则:

  • 要提高总处理能力,执行少量的大作业比执行大量的小作业要好。
  • 在提交同一 clientCustomerId 的多个并行作业时,尽量降低针对相同对象同时执行多个作业的可能性,同时保持大型作业的规模。如果针对同一组对象执行 mutate 操作的未完成作业(状态为 ACTIVECANCELING)过多,可能会出现死锁的情况,导致系统速度严重降低,甚至造成作业失败。
  • 避免在同一作业中有多个操作对同一对象执行 mutate 操作。
  • 为了提高吞吐量,请按操作类型对上传操作进行排序。例如,如果您的作业包含添加广告系列、广告组和广告组条件的操作,请在上传中对操作进行排序,使所有 CampaignOperations 排在最前,后跟所有 AdGroupOperations,最后是所有 AdGroupCriterionOperations
  • 不要过于频繁地轮询作业状态,否则可能发生速率限制错误

使用购物广告系列

使用 BatchJobService 更新产品划分树时,应遵循以下限制:

  1. 如果对产品划分树执行的操作列表会导致产品划分树结构无效(例如,在没有创建其他节点的情况下对节点进行了细分),则对该产品划分树执行的整个操作列表将失败。

  2. 不会对产品划分树进行结构性更改(例如对现有节点进行出价更改)的操作是独立执行的。

  3. 移除产品划分节点时,请将 AdGroupCriterion 对象的 criterion 字段设置为 ProductPartition 实例。将此字段设置为 Criterion 实例将导致操作失败,并出现 AdGroupCriterionError.CONCRETE_TYPE_REQUIRED 错误。

限制

  • 在任意给定时间,一个 AdWords 帐号中所有未完成的批量作业中的已上传操作最多不能超过 1 GB。一旦您的帐号达到此限制,当您试图添加新的批量作业时,系统将提示原因为 DISK_QUOTA_EXCEEDEDBatchJobError。如果出现这个错误,请耐心等待,直到待完成的已上传操作规模下降到限额以下,然后再创建新的作业。

代码示例

以下客户端库包含可演示上述所有功能的完整代码示例。

发送以下问题的反馈:

此网页
AdWords API
AdWords API
需要帮助?请访问我们的支持页面