部署连接器

Cloud Search 教程的这一页介绍了如何设置数据源和内容连接器,以便将数据编入索引。要从头开始学习本教程,请参阅 Cloud Search 入门教程

构建连接器

将您的工作目录更改为 cloud-search-samples/end-to-end/connector 目录并运行以下命令:

mvn package -DskipTests

该命令会下载构建内容连接器所需的依赖项,并编译代码。

创建服务账号凭据

连接器需要服务账号凭据才能调用 Cloud Search API。如需创建凭据,请执行以下操作:

  1. 返回 Google Cloud 控制台
  2. 在左侧导航栏中,点击凭据。系统随即会显示“凭据”页面。
  3. 点击 + CREATE CREDENTIALS(创建凭据)下拉列表,然后选择服务账号。系统随即会显示“创建服务账号”页面。
  4. 服务账号名称字段中,输入“tutorial”。
  5. 记下服务账号 ID 值(位于服务账号名称的右侧)。 稍后会用到此值。
  6. 点击创建。系统随即会显示“服务账号权限(可选)”对话框。
  7. 点击继续。系统随即会显示“向用户授予访问此服务账号的权限(可选)”对话框。
  8. 点击完成。系统随即会显示“凭据”屏幕。
  9. 在“服务账号”下,点击相应服务账号的电子邮件地址。系统随即会显示“服务账号详情”页面。
  10. 在“密钥”下,点击添加密钥下拉列表,然后选择创建新密钥。系统随即会显示“创建私钥”对话框。
  11. 点击创建
  12. (可选)如果出现“要允许通过 console.cloud.google.com 下载吗?”对话框,请点击允许
  13. 私钥文件会保存到您的计算机。记下下载文件的位置。此文件用于配置内容连接器,以便其在调用 Google Cloud Search API 时进行自我身份验证。

初始化第三方支持

您必须先初始化对 Google Cloud Search 的第三方支持,然后才能调用任何其他 Cloud Search API。

如需为 Cloud Search 初始化第三方支持,请执行以下操作:

  1. 您的 Cloud Search 平台项目包含服务账号凭据。不过,为了初始化第三方支持,您必须创建 Web 应用凭据。如需了解如何创建 Web 应用凭据,请参阅创建凭据。完成此步骤后,您应该会获得一个客户端 ID 和客户端密钥文件。

  2. 使用 Google 的 OAuth 2 Playground 获取访问令牌:

    1. 点击“设置”,然后选中使用您自己的身份验证凭据
    2. 输入第 1 步中的客户端 ID 和客户端密钥。
    3. 点击关闭
    4. 在“范围”字段中,输入 https://www.googleapis.com/auth/cloud_search.settings,然后点击授权。OAuth 2 Playground 会返回一个授权代码。
    5. 点击 Exchange authorization code for tokens。系统会返回一个令牌。
  3. 如需初始化对 Cloud Search 的第三方支持,请使用以下 curl 命令。请务必将 [YOUR_ACCESS_TOKEN] 替换为第 2 步中获取的令牌。

    curl --request POST \
    'https://cloudsearch.googleapis.com/v1:initializeCustomer' \
      --header 'Authorization: Bearer [YOUR_ACCESS_TOKEN]' \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '{}' \
      --compressed
    

    如果成功,响应正文将包含一个 operation 实例。例如:

    {
    name: "operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY"
    }
    

    如果未能成功,请与 Cloud Search 支持团队联系。

  4. 使用 operations.get 验证第三方支持是否已初始化:

    curl \
    'https://cloudsearch.googleapis.com/v1/operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY?key=
    [YOUR_API_KEY]' \
    --header 'Authorization: Bearer [YOUR_ACCESS_TOKEN]' \
    --header 'Accept: application/json' \
    --compressed
    

    第三方初始化完成后,它将包含设置为 true 的字段 done。例如:

    {
    name: "operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY"
    done: true
    }
    

创建数据源

接下来,在管理控制台中创建数据源。数据源提供一个命名空间,以便使用连接器为内容编入索引。

  1. 打开 Google 管理控制台
  2. 点击“应用”图标。系统随即会显示“应用管理”页面。
  3. 点击 Google Workspace。系统随即会显示“Google Workspace 应用管理”页面。
  4. 向下滚动,然后点击 Cloud Search。系统随即会显示“Google Workspace 设置”页面。
  5. 点击第三方数据源。系统随即会显示“数据源”页面。
  6. 点击圆形黄色 +。系统随即会显示“添加新数据源”对话框。
  7. 显示名称字段中,输入“tutorial”。
  8. 服务账号电子邮件地址字段中,输入您在上一部分中创建的服务账号的电子邮件地址。如果您不知道服务账号的电子邮件地址,请在服务账号页面中查找该值。
  9. 点击添加。系统会显示“已成功创建数据源”对话框。
  10. 点击 *OK。记下新创建的数据源的来源 ID。来源 ID 用于配置内容连接器。

为 GitHub API 生成个人访问令牌

连接器需要经过身份验证才能访问 GitHub API,以便获得足够的配额。为简单起见,该连接器使用个人访问令牌,而不是 OAuth。个人令牌允许以具有一组有限权限(类似于 OAuth)的用户身份进行身份验证。

  1. 登录 GitHub。
  2. 点击右上角的个人资料照片。系统随即会显示一个下拉菜单。
  3. 点击设置
  4. 点击开发者设置
  5. 点击 Personal access token(个人访问令牌)。
  6. 点击 Generate Personal access token(生成个人访问令牌)。
  7. 备注字段中,输入“Cloud Search 教程”。
  8. 勾选 public_repo 范围。
  9. 点击 生成令牌
  10. 记下生成的令牌。连接器使用此令牌调用 GitHub API,并提供 API 配额以执行索引编制。

配置连接器

创建凭据和数据源后,请更新连接器配置以添加以下值:

  1. 在命令行中,将目录更改为 cloud-search-samples/end-to-end/connector/
  2. 使用文本编辑器打开 sample-config.properties 文件。
  3. api.serviceAccountPrivateKeyFile 参数设置为您之前下载的服务凭据的文件路径。
  4. api.sourceId 参数设置为您之前创建的数据源的 ID。
  5. github.user 参数设置为您的 GitHub 用户名。
  6. github.token 参数设置为您之前创建的访问令牌。
  7. 保存文件。

更新架构

该连接器会为结构化和非结构化内容编制索引。在为数据编制索引之前,您必须更新数据源的架构。运行以下命令以更新架构:

mvn exec:java -Dexec.mainClass=com.google.cloudsearch.tutorial.SchemaTool \
    -Dexec.args="-Dconfig=sample-config.properties"

运行该连接器

如需运行连接器并开始编制索引,请运行以下命令:

mvn exec:java -Dexec.mainClass=com.google.cloudsearch.tutorial.GithubConnector \
    -Dexec.args="-Dconfig=sample-config.properties"

该连接器的默认配置是将 googleworkspace 组织中的单个代码库编入索引。对代码库编制索引大约需要 1 分钟。 初始编入索引后,连接器会继续轮询存储库中需要反映在 Cloud Search 索引中的更改。

查看代码

其余部分将介绍如何构建连接器。

启动应用

连接器的入口点是 GithubConnector 类。main 方法会实例化 SDK 的 IndexingApplication 并启动它。

GithubConnector.java
/**
 * Main entry point for the connector. Creates and starts an indexing
 * application using the {@code ListingConnector} template and the sample's
 * custom {@code Repository} implementation.
 *
 * @param args program command line arguments
 * @throws InterruptedException thrown if an abort is issued during initialization
 */
public static void main(String[] args) throws InterruptedException {
  Repository repository = new GithubRepository();
  IndexingConnector connector = new ListingConnector(repository);
  IndexingApplication application = new IndexingApplication.Builder(connector, args)
      .build();
  application.start();
}

SDK 提供的 ListingConnector 会实现一种遍历策略,该策略利用 Cloud Search 队列跟踪索引中项的状态。它会委托由示例连接器实现的 GithubRepository 来访问 GitHub 中的内容。

遍历 GitHub 代码库

在完整遍历期间,系统会调用 getIds() 方法,以将可能需要编入索引的项推送到队列中。

该连接器可以为多个代码库或组织编制索引。为最大限度地减少失败的影响,系统会一次遍历一个 GitHub 代码库。系统会返回一个检查点,其中包含遍历结果,其中包含在后续调用 getIds() 时要编入索引的代码库列表。如果发生错误,编入索引会从当前代码库继续,而不是从开头开始。

GithubRepository.java
/**
 * Gets all of the existing item IDs from the data repository. While
 * multiple repositories are supported, only one repository is traversed
 * per call. The remaining repositories are saved in the checkpoint
 * are traversed on subsequent calls. This minimizes the amount of
 * data that needs to be reindex in the event of an error.
 *
 * <p>This method is called by {@link ListingConnector#traverse()} during
 * <em>full traversals</em>. Every document ID and metadata hash value in
 * the <em>repository</em> is pushed to the Cloud Search queue. Each pushed
 * document is later polled and processed in the {@link #getDoc(Item)} method.
 * <p>
 * The metadata hash values are pushed to aid document change detection. The
 * queue sets the document status depending on the hash comparison. If the
 * pushed ID doesn't yet exist in Cloud Search, the document's status is
 * set to <em>new</em>. If the ID exists but has a mismatched hash value,
 * its status is set to <em>modified</em>. If the ID exists and matches
 * the hash value, its status is unchanged.
 *
 * <p>In every case, the pushed content hash value is only used for
 * comparison. The hash value is only set in the queue during an
 * update (see {@link #getDoc(Item)}).
 *
 * @param checkpoint value defined and maintained by this connector
 * @return this is typically a {@link PushItems} instance
 */
@Override
public CheckpointCloseableIterable<ApiOperation> getIds(byte[] checkpoint)
    throws RepositoryException {
  List<String> repositories;
  // Decode the checkpoint if present to get the list of remaining
  // repositories to index.
  if (checkpoint != null) {
    try {
      FullTraversalCheckpoint decodedCheckpoint = FullTraversalCheckpoint
          .fromBytes(checkpoint);
      repositories = decodedCheckpoint.getRemainingRepositories();
    } catch (IOException e) {
      throw new RepositoryException.Builder()
          .setErrorMessage("Unable to deserialize checkpoint")
          .setCause(e)
          .build();
    }
  } else {
    // No previous checkpoint, scan for repositories to index
    // based on the connector configuration.
    try {
      repositories = scanRepositories();
    } catch (IOException e) {
      throw toRepositoryError(e, Optional.of("Unable to scan repositories"));
    }
  }

  if (repositories.isEmpty()) {
    // Nothing left to index. Reset the checkpoint to null so the
    // next full traversal starts from the beginning
    Collection<ApiOperation> empty = Collections.emptyList();
    return new CheckpointCloseableIterableImpl.Builder<>(empty)
        .setCheckpoint((byte[]) null)
        .setHasMore(false)
        .build();
  }

  // Still have more repositories to index. Pop the next repository to
  // index off the list. The remaining repositories make up the next
  // checkpoint.
  String repositoryToIndex = repositories.get(0);
  repositories = repositories.subList(1, repositories.size());

  try {
    log.info(() -> String.format("Traversing repository %s", repositoryToIndex));
    Collection<ApiOperation> items = collectRepositoryItems(repositoryToIndex);
    FullTraversalCheckpoint newCheckpoint = new FullTraversalCheckpoint(repositories);
    return new CheckpointCloseableIterableImpl.Builder<>(items)
        .setHasMore(true)
        .setCheckpoint(newCheckpoint.toBytes())
        .build();
  } catch (IOException e) {
    String errorMessage = String.format("Unable to traverse repo: %s",
        repositoryToIndex);
    throw toRepositoryError(e, Optional.of(errorMessage));
  }
}

collectRepositoryItems() 方法可处理单个 GitHub 代码库的遍历。此方法会返回一个 ApiOperations 集合,表示要推送到队列中的项。项以资源名称和表示项当前状态的哈希值的形式推送。

该哈希值会在后续遍历 GitHub 代码库时使用。此值可提供轻量级检查,以确定内容是否已更改,而无需上传其他内容。连接器会盲目地将所有项加入队列。如果项是新的或哈希值已更改,则可在队列中进行轮询。否则,系统会将该项视为未修改。

GithubRepository.java
/**
 * Fetch IDs to  push in to the queue for all items in the repository.
 * Currently captures issues & content in the master branch.
 *
 * @param name Name of repository to index
 * @return Items to push into the queue for later indexing
 * @throws IOException if error reading issues
 */
private Collection<ApiOperation> collectRepositoryItems(String name)
    throws IOException {
  List<ApiOperation> operations = new ArrayList<>();
  GHRepository repo = github.getRepository(name);

  // Add the repository as an item to be indexed
  String metadataHash = repo.getUpdatedAt().toString();
  String resourceName = repo.getHtmlUrl().getPath();
  PushItem repositoryPushItem = new PushItem()
      .setMetadataHash(metadataHash);
  PushItems items = new PushItems.Builder()
      .addPushItem(resourceName, repositoryPushItem)
      .build();

  operations.add(items);
  // Add issues/pull requests & files
  operations.add(collectIssues(repo));
  operations.add(collectContent(repo));
  return operations;
}

处理队列

完全遍历完成后,连接器会开始轮询队列,以查找需要编入索引的项。系统会针对从队列中拉取的每项内容调用 getDoc() 方法。该方法会从 GitHub 读取项并将其转换为适当的表示法,以便编入索引。

由于连接器针对随时可能发生更改的实时数据运行,因此 getDoc() 还会验证队列中的项是否仍然有效,并从索引中删除不再存在的任何项。

GithubRepository.java
/**
 * Gets a single data repository item and indexes it if required.
 *
 * <p>This method is called by the {@link ListingConnector} during a poll
 * of the Cloud Search queue. Each queued item is processed
 * individually depending on its state in the data repository.
 *
 * @param item the data repository item to retrieve
 * @return the item's state determines which type of
 * {@link ApiOperation} is returned:
 * {@link RepositoryDoc}, {@link DeleteItem}, or {@link PushItem}
 */
@Override
public ApiOperation getDoc(Item item) throws RepositoryException {
  log.info(() -> String.format("Processing item: %s ", item.getName()));
  Object githubObject;
  try {
    // Retrieve the item from GitHub
    githubObject = getGithubObject(item.getName());
    if (githubObject instanceof GHRepository) {
      return indexItem((GHRepository) githubObject, item);
    } else if (githubObject instanceof GHPullRequest) {
      return indexItem((GHPullRequest) githubObject, item);
    } else if (githubObject instanceof GHIssue) {
      return indexItem((GHIssue) githubObject, item);
    } else if (githubObject instanceof GHContent) {
      return indexItem((GHContent) githubObject, item);
    } else {
      String errorMessage = String.format("Unexpected item received: %s",
          item.getName());
      throw new RepositoryException.Builder()
          .setErrorMessage(errorMessage)
          .setErrorType(RepositoryException.ErrorType.UNKNOWN)
          .build();
    }
  } catch (FileNotFoundException e) {
    log.info(() -> String.format("Deleting item: %s ", item.getName()));
    return ApiOperations.deleteItem(item.getName());
  } catch (IOException e) {
    String errorMessage = String.format("Unable to retrieve item: %s",
        item.getName());
    throw toRepositoryError(e, Optional.of(errorMessage));
  }
}

对于连接器编入索引的每个 GitHub 对象,相应的 indexItem() 方法都会负责为 Cloud Search 构建项表示法。例如,如需构建内容项的表示法,请执行以下操作:

GithubRepository.java
/**
 * Build the ApiOperation to index a content item (file).
 *
 * @param content      Content item to index
 * @param previousItem Previous item state in the index
 * @return ApiOperation (RepositoryDoc if indexing,  PushItem if not modified)
 * @throws IOException if unable to create operation
 */
private ApiOperation indexItem(GHContent content, Item previousItem)
    throws IOException {
  String metadataHash = content.getSha();

  // If previously indexed and unchanged, just requeue as unmodified
  if (canSkipIndexing(previousItem, metadataHash)) {
    return notModified(previousItem.getName());
  }

  String resourceName = new URL(content.getHtmlUrl()).getPath();
  FieldOrValue<String> title = FieldOrValue.withValue(content.getName());
  FieldOrValue<String> url = FieldOrValue.withValue(content.getHtmlUrl());

  String containerName = content.getOwner().getHtmlUrl().getPath();
  String programmingLanguage = FileExtensions.getLanguageForFile(content.getName());

  // Structured data based on the schema
  Multimap<String, Object> structuredData = ArrayListMultimap.create();
  structuredData.put("organization", content.getOwner().getOwnerName());
  structuredData.put("repository", content.getOwner().getName());
  structuredData.put("path", content.getPath());
  structuredData.put("language", programmingLanguage);

  Item item = IndexingItemBuilder.fromConfiguration(resourceName)
      .setTitle(title)
      .setContainerName(containerName)
      .setSourceRepositoryUrl(url)
      .setItemType(IndexingItemBuilder.ItemType.CONTAINER_ITEM)
      .setObjectType("file")
      .setValues(structuredData)
      .setVersion(Longs.toByteArray(System.currentTimeMillis()))
      .setHash(content.getSha())
      .build();

  // Index the file content too
  String mimeType = FileTypeMap.getDefaultFileTypeMap()
      .getContentType(content.getName());
  AbstractInputStreamContent fileContent = new InputStreamContent(
      mimeType, content.read())
      .setLength(content.getSize())
      .setCloseInputStream(true);
  return new RepositoryDoc.Builder()
      .setItem(item)
      .setContent(fileContent, IndexingService.ContentFormat.RAW)
      .setRequestMode(IndexingService.RequestMode.SYNCHRONOUS)
      .build();
}

接下来,部署搜索界面。

上一个 下一个