部署连接器

Cloud Search 教程的这一页面介绍了如何设置数据源和内容连接器,以便将数据编入索引。要从头开始学习本教程,请参阅 Cloud Search 使用入门教程

构建连接器

将您的工作目录更改为 cloud-search-samples/end-to-end/connector 目录,然后运行以下命令:

mvn package -DskipTests

该命令会下载构建内容连接器所需的依赖项并编译代码。

创建服务账号凭据

连接器需要服务帐号凭据才能调用 Cloud Search API。如需创建凭据,请执行以下操作:

  1. 返回 Google Cloud 控制台
  2. 在左侧导航栏中,点击凭据。系统会显示“凭据”页面。
  3. 点击 + 创建凭据下拉列表,然后选择服务帐号。随即会出现“创建服务帐号”页面。
  4. 服务帐号名称字段中,输入“tutorial”。
  5. 记下服务帐号 ID 值(紧跟在服务帐号名称后面)。稍后会使用此值。
  6. 点击创建。此时将显示“服务帐号权限(可选)”对话框。
  7. 点击继续。此时将显示“向用户授予访问此服务帐号的权限(可选)”对话框。
  8. 点击完成。系统会显示“凭据”屏幕。
  9. 在“服务帐号”下方,点击相应服务帐号电子邮件地址。“服务帐号详情”页面显示。
  10. 在“密钥”下,点击添加密钥下拉列表,然后选择创建新密钥。此时会显示“创建私钥”对话框。
  11. 点击创建
  12. (可选)如果出现“要允许通过 console.cloud.google.com 下载吗?”对话框,请点击允许
  13. 私钥文件已保存到您的计算机。记下下载文件的位置。此文件用于配置内容连接器,以便在调用 Google Cloud Search API 时进行自我身份验证。

初始化第三方支持

您必须先初始化对 Google Cloud Search 的第三方支持,然后才能调用任何其他 Cloud Search API。

如需初始化对 Cloud Search 的第三方支持,请执行以下操作:

  1. 您的 Cloud Search 平台项目包含服务帐号凭据。 但是,为了初始化第三方支持,您必须创建 Web 应用凭据。如需了解如何创建 Web 应用凭据,请参阅创建凭据。完成此步骤后,您应该得到一个客户端 ID 和客户端密钥文件。

  2. 使用 Google 的 OAuth 2 Playground 获取访问令牌:

    1. 点击“Settings”(设置),然后选中 User your own auth credentials(使用您自己的身份验证凭据)。
    2. 输入第 1 步中的客户端 ID 和客户端密钥。
    3. 点击关闭
    4. 在“范围”字段中,输入 https://www.googleapis.com/auth/cloud_search.settings,然后点击授权。OAuth 2 Playground 会返回授权代码。
    5. 点击交换令牌的授权代码。返回令牌。
  3. 如需初始化对 Cloud Search 的第三方支持,请使用以下 curl 命令。请务必将 [YOUR_ACCESS_TOKEN] 替换为第 2 步中获得的令牌。

    curl --request POST \
    'https://cloudsearch.googleapis.com/v1:initializeCustomer' \
      --header 'Authorization: Bearer [YOUR_ACCESS_TOKEN]' \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '{}' \
      --compressed
    

    如果成功,响应正文将包含一个 operation 实例。例如:

    {
    name: "operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY"
    }
    

    如果操作失败,请与 Cloud Search 支持团队联系。

  4. 使用 operations.get 验证第三方支持是否已初始化:

    curl \
    'https://cloudsearch.googleapis.com/v1/operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY?key=
    [YOUR_API_KEY]' \
    --header 'Authorization: Bearer [YOUR_ACCESS_TOKEN]' \
    --header 'Accept: application/json' \
    --compressed
    

    第三方初始化完成后,它会包含设置为 true 的字段 done。例如:

    {
    name: "operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY"
    done: true
    }
    

创建数据源

接下来,在管理控制台中创建数据源。数据源提供了一个命名空间,以便使用连接器将内容编入索引。

  1. 打开 Google 管理控制台
  2. 点击“应用”图标。系统会显示“Google Apps 管理”页面。
  3. 点击 Google Workspace。系统随即会显示“Google Workspace 管理应用”页面。
  4. 向下滚动,然后点击 Cloud Search。系统会显示“Google Workspace 设置”页面。
  5. 点击第三方数据源。系统会显示“数据源”页面。
  6. 点击黄色圆角 +。系统随即会显示“添加新数据源”对话框。
  7. 显示名称字段中,输入“tutorial”。
  8. 服务帐号电子邮件地址字段中,输入您在上一部分中创建的服务帐号的电子邮件地址。如果您不知道服务帐号的电子邮件地址,请在服务帐号页面中查找该值。
  9. 点击添加。此时将显示“已成功创建数据源”对话框。
  10. 点击 *OK。记下新创建的数据源的来源 ID。来源 ID 用于配置内容连接器。

为 GitHub API 生成个人访问令牌

连接器需要以经过身份验证的方式访问 GitHub API,以便获得足够的配额。为简单起见,该连接器使用个人访问令牌(而不是 OAuth)。个人令牌允许以用户身份进行身份验证,且具有类似于 OAuth 的一组有限权限。

  1. 登录 GitHub。
  2. 点击右上角的个人资料照片。系统随即会显示一个下拉菜单。
  3. 点击设置
  4. 点击开发者设置
  5. 点击个人访问令牌
  6. 点击 Generate personal access token
  7. 注意字段中,输入“Cloud Search tutorial”。
  8. 检查 public_repo 范围。
  9. 点击 生成令牌
  10. 记下生成的令牌。连接器使用它来调用 GitHub API 并提供 API 配额来执行索引编制。

配置连接器

创建凭据和数据源后,请更新连接器配置以包含这些值:

  1. 在命令行中,将目录更改为 cloud-search-samples/end-to-end/connector/
  2. 使用文本编辑器打开 sample-config.properties 文件。
  3. api.serviceAccountPrivateKeyFile 参数设置为您之前下载的服务凭据的文件路径。
  4. api.sourceId 参数设置为您之前创建的数据源的 ID。
  5. github.user 参数设置为您的 GitHub 用户名。
  6. github.token 参数设置为您之前创建的访问令牌。
  7. 保存文件。

更新架构

连接器会将结构化和非结构化内容编入索引。在将数据编入索引之前,您必须先更新数据源的架构。运行以下命令以更新架构:

mvn exec:java -Dexec.mainClass=com.google.cloudsearch.tutorial.SchemaTool \
    -Dexec.args="-Dconfig=sample-config.properties"

运行该连接器

要运行连接器并开始编制索引,请运行以下命令:

mvn exec:java -Dexec.mainClass=com.google.cloudsearch.tutorial.GithubConnector \
    -Dexec.args="-Dconfig=sample-config.properties"

连接器的默认配置是将 googleworkspace 组织中的单个代码库编入索引。将代码库编入索引大约需要 1 分钟。完成初始索引后,连接器会继续轮询代码库,以确定需要反映在 Cloud Search 索引中的更改。

查看代码

其余部分将介绍连接器的构建方式。

启动应用

连接器的入口点是 GithubConnector 类。main 方法会实例化 SDK 的 IndexingApplication 并启动它。

GithubConnector.java
/**
 * Main entry point for the connector. Creates and starts an indexing
 * application using the {@code ListingConnector} template and the sample's
 * custom {@code Repository} implementation.
 *
 * @param args program command line arguments
 * @throws InterruptedException thrown if an abort is issued during initialization
 */
public static void main(String[] args) throws InterruptedException {
  Repository repository = new GithubRepository();
  IndexingConnector connector = new ListingConnector(repository);
  IndexingApplication application = new IndexingApplication.Builder(connector, args)
      .build();
  application.start();
}

SDK 提供的 ListingConnector 实现了遍历策略,该策略利用 Cloud Search 队列跟踪索引中项的状态。它会委托由示例连接器实现的 GithubRepository,以访问 GitHub 中的内容。

遍历 GitHub 代码库

在完全遍历期间,系统会调用 getIds() 方法,将可能需要编入索引的项推送到队列中。

连接器可以将多个代码库或组织编入索引。为了尽量减少故障的影响,一次仅遍历一个 GitHub 代码库。系统会返回检查点以及遍历结果,其中包含在后续调用 getIds() 中要编入索引的代码库列表。如果出现错误,索引会在当前仓库中恢复,而不是从头开始。

GithubRepository.java
/**
 * Gets all of the existing item IDs from the data repository. While
 * multiple repositories are supported, only one repository is traversed
 * per call. The remaining repositories are saved in the checkpoint
 * are traversed on subsequent calls. This minimizes the amount of
 * data that needs to be reindex in the event of an error.
 *
 * <p>This method is called by {@link ListingConnector#traverse()} during
 * <em>full traversals</em>. Every document ID and metadata hash value in
 * the <em>repository</em> is pushed to the Cloud Search queue. Each pushed
 * document is later polled and processed in the {@link #getDoc(Item)} method.
 * <p>
 * The metadata hash values are pushed to aid document change detection. The
 * queue sets the document status depending on the hash comparison. If the
 * pushed ID doesn't yet exist in Cloud Search, the document's status is
 * set to <em>new</em>. If the ID exists but has a mismatched hash value,
 * its status is set to <em>modified</em>. If the ID exists and matches
 * the hash value, its status is unchanged.
 *
 * <p>In every case, the pushed content hash value is only used for
 * comparison. The hash value is only set in the queue during an
 * update (see {@link #getDoc(Item)}).
 *
 * @param checkpoint value defined and maintained by this connector
 * @return this is typically a {@link PushItems} instance
 */
@Override
public CheckpointCloseableIterable<ApiOperation> getIds(byte[] checkpoint)
    throws RepositoryException {
  List<String> repositories;
  // Decode the checkpoint if present to get the list of remaining
  // repositories to index.
  if (checkpoint != null) {
    try {
      FullTraversalCheckpoint decodedCheckpoint = FullTraversalCheckpoint
          .fromBytes(checkpoint);
      repositories = decodedCheckpoint.getRemainingRepositories();
    } catch (IOException e) {
      throw new RepositoryException.Builder()
          .setErrorMessage("Unable to deserialize checkpoint")
          .setCause(e)
          .build();
    }
  } else {
    // No previous checkpoint, scan for repositories to index
    // based on the connector configuration.
    try {
      repositories = scanRepositories();
    } catch (IOException e) {
      throw toRepositoryError(e, Optional.of("Unable to scan repositories"));
    }
  }

  if (repositories.isEmpty()) {
    // Nothing left to index. Reset the checkpoint to null so the
    // next full traversal starts from the beginning
    Collection<ApiOperation> empty = Collections.emptyList();
    return new CheckpointCloseableIterableImpl.Builder<>(empty)
        .setCheckpoint((byte[]) null)
        .setHasMore(false)
        .build();
  }

  // Still have more repositories to index. Pop the next repository to
  // index off the list. The remaining repositories make up the next
  // checkpoint.
  String repositoryToIndex = repositories.get(0);
  repositories = repositories.subList(1, repositories.size());

  try {
    log.info(() -> String.format("Traversing repository %s", repositoryToIndex));
    Collection<ApiOperation> items = collectRepositoryItems(repositoryToIndex);
    FullTraversalCheckpoint newCheckpoint = new FullTraversalCheckpoint(repositories);
    return new CheckpointCloseableIterableImpl.Builder<>(items)
        .setHasMore(true)
        .setCheckpoint(newCheckpoint.toBytes())
        .build();
  } catch (IOException e) {
    String errorMessage = String.format("Unable to traverse repo: %s",
        repositoryToIndex);
    throw toRepositoryError(e, Optional.of(errorMessage));
  }
}

collectRepositoryItems() 方法可处理单个 GitHub 代码库的遍历。此方法会返回 ApiOperations 的集合,表示要推送到队列的项。系统会以资源名称和表示项当前状态的哈希值的形式推送项。

该哈希值用于 GitHub 代码库的后续遍历。此值提供了一种轻量级检查,用于确定内容是否已更改,而无需上传其他内容。连接器会盲目将所有项排入队列。如果该项是新项或哈希值已更改,就可以在队列中进行轮询。否则,系统会认为该内容未经修改。

GithubRepository.java
/**
 * Fetch IDs to  push in to the queue for all items in the repository.
 * Currently captures issues & content in the master branch.
 *
 * @param name Name of repository to index
 * @return Items to push into the queue for later indexing
 * @throws IOException if error reading issues
 */
private Collection<ApiOperation> collectRepositoryItems(String name)
    throws IOException {
  List<ApiOperation> operations = new ArrayList<>();
  GHRepository repo = github.getRepository(name);

  // Add the repository as an item to be indexed
  String metadataHash = repo.getUpdatedAt().toString();
  String resourceName = repo.getHtmlUrl().getPath();
  PushItem repositoryPushItem = new PushItem()
      .setMetadataHash(metadataHash);
  PushItems items = new PushItems.Builder()
      .addPushItem(resourceName, repositoryPushItem)
      .build();

  operations.add(items);
  // Add issues/pull requests & files
  operations.add(collectIssues(repo));
  operations.add(collectContent(repo));
  return operations;
}

处理队列

完全遍历完成后,连接器开始轮询队列以获取需要编入索引的项。系统会针对从队列中拉取的每项内容调用 getDoc() 方法。该方法会从 GitHub 中读取项并将其转换为适当的表示法,以便编入索引。

当连接器针对可能随时更改的实时数据运行时,getDoc() 还会验证队列中的项是否仍然有效,并从索引中删除不再存在的项。

GithubRepository.java
/**
 * Gets a single data repository item and indexes it if required.
 *
 * <p>This method is called by the {@link ListingConnector} during a poll
 * of the Cloud Search queue. Each queued item is processed
 * individually depending on its state in the data repository.
 *
 * @param item the data repository item to retrieve
 * @return the item's state determines which type of
 * {@link ApiOperation} is returned:
 * {@link RepositoryDoc}, {@link DeleteItem}, or {@link PushItem}
 */
@Override
public ApiOperation getDoc(Item item) throws RepositoryException {
  log.info(() -> String.format("Processing item: %s ", item.getName()));
  Object githubObject;
  try {
    // Retrieve the item from GitHub
    githubObject = getGithubObject(item.getName());
    if (githubObject instanceof GHRepository) {
      return indexItem((GHRepository) githubObject, item);
    } else if (githubObject instanceof GHPullRequest) {
      return indexItem((GHPullRequest) githubObject, item);
    } else if (githubObject instanceof GHIssue) {
      return indexItem((GHIssue) githubObject, item);
    } else if (githubObject instanceof GHContent) {
      return indexItem((GHContent) githubObject, item);
    } else {
      String errorMessage = String.format("Unexpected item received: %s",
          item.getName());
      throw new RepositoryException.Builder()
          .setErrorMessage(errorMessage)
          .setErrorType(RepositoryException.ErrorType.UNKNOWN)
          .build();
    }
  } catch (FileNotFoundException e) {
    log.info(() -> String.format("Deleting item: %s ", item.getName()));
    return ApiOperations.deleteItem(item.getName());
  } catch (IOException e) {
    String errorMessage = String.format("Unable to retrieve item: %s",
        item.getName());
    throw toRepositoryError(e, Optional.of(errorMessage));
  }
}

对于连接器编入索引的每个 GitHub 对象,相应的 indexItem() 方法会处理为 Cloud Search 构建项表示法。例如,如需为内容项构建表示形式,请使用以下代码:

GithubRepository.java
/**
 * Build the ApiOperation to index a content item (file).
 *
 * @param content      Content item to index
 * @param previousItem Previous item state in the index
 * @return ApiOperation (RepositoryDoc if indexing,  PushItem if not modified)
 * @throws IOException if unable to create operation
 */
private ApiOperation indexItem(GHContent content, Item previousItem)
    throws IOException {
  String metadataHash = content.getSha();

  // If previously indexed and unchanged, just requeue as unmodified
  if (canSkipIndexing(previousItem, metadataHash)) {
    return notModified(previousItem.getName());
  }

  String resourceName = new URL(content.getHtmlUrl()).getPath();
  FieldOrValue<String> title = FieldOrValue.withValue(content.getName());
  FieldOrValue<String> url = FieldOrValue.withValue(content.getHtmlUrl());

  String containerName = content.getOwner().getHtmlUrl().getPath();
  String programmingLanguage = FileExtensions.getLanguageForFile(content.getName());

  // Structured data based on the schema
  Multimap<String, Object> structuredData = ArrayListMultimap.create();
  structuredData.put("organization", content.getOwner().getOwnerName());
  structuredData.put("repository", content.getOwner().getName());
  structuredData.put("path", content.getPath());
  structuredData.put("language", programmingLanguage);

  Item item = IndexingItemBuilder.fromConfiguration(resourceName)
      .setTitle(title)
      .setContainerName(containerName)
      .setSourceRepositoryUrl(url)
      .setItemType(IndexingItemBuilder.ItemType.CONTAINER_ITEM)
      .setObjectType("file")
      .setValues(structuredData)
      .setVersion(Longs.toByteArray(System.currentTimeMillis()))
      .setHash(content.getSha())
      .build();

  // Index the file content too
  String mimeType = FileTypeMap.getDefaultFileTypeMap()
      .getContentType(content.getName());
  AbstractInputStreamContent fileContent = new InputStreamContent(
      mimeType, content.read())
      .setLength(content.getSize())
      .setCloseInputStream(true);
  return new RepositoryDoc.Builder()
      .setItem(item)
      .setContent(fileContent, IndexingService.ContentFormat.RAW)
      .setRequestMode(IndexingService.RequestMode.SYNCHRONOUS)
      .build();
}

接下来,部署搜索界面。

上一个 下一个