部署连接器

Cloud Search 教程的这一页介绍了如何设置数据源和内容连接器,以便将数据编入索引。要从头开始学习本教程,请参阅 Cloud Search 入门教程

构建连接器

将您的工作目录更改为 cloud-search-samples/end-to-end/connector 目录并运行以下命令:

mvn package -DskipTests

该命令会下载构建内容连接器所需的依赖项并编译代码。

创建服务账号凭据

连接器需要使用服务帐号凭据才能调用 Cloud Search API。如需创建凭据,请执行以下操作:

  1. 返回 Google Cloud 控制台
  2. 在左侧导航栏中,点击凭据。系统随即会显示“凭据”页面。
  3. 点击 + 创建凭据下拉列表,然后选择服务帐号。系统随即会显示“创建服务帐号”页面。
  4. 服务帐号名称字段中,输入“教程”。
  5. 记下服务帐号 ID 值(紧跟在服务帐号名称后面)。 此值稍后使用。
  6. 点击创建。此时将显示“服务帐号权限(可选)”对话框。
  7. 点击继续。此时将显示“向用户授予访问此服务帐号的权限(可选)”对话框。
  8. 点击完成。此时会显示“凭据”屏幕。
  9. 在“服务帐号”下,点击服务帐号电子邮件。系统随即会显示“服务账号详情”页面。
  10. 在“密钥”下,点击添加密钥下拉列表,然后选择创建新密钥。此时会显示“创建私钥”对话框。
  11. 点击创建
  12. (可选)如果出现“要允许通过 console.cloud.google.com 下载吗?”对话框,请点击允许
  13. 私钥文件会保存到您的计算机中。请记下所下载文件的位置。此文件用于配置内容连接器,以便在调用 Google Cloud Search API 时对自身进行身份验证。

初始化第三方支持

您必须先初始化对 Google Cloud Search 的第三方支持,然后才能调用任何其他 Cloud Search API。

要初始化对 Cloud Search 的第三方支持,请执行以下操作:

  1. 您的 Cloud Search 平台项目包含服务帐号凭据。但是,为了初始化第三方支持,您必须创建 Web 应用凭据。如需了解如何创建 Web 应用凭据,请参阅创建凭据。完成此步骤后,您应该会获得客户端 ID 和客户端密钥文件。

  2. 使用 Google 的 OAuth 2 Playground 获取访问令牌:

    1. 点击“设置”,然后选中 User your own auth credentials(使用您自己的身份验证凭据)。
    2. 输入第 1 步中的客户端 ID 和客户端密钥。
    3. 点击关闭
    4. 在“范围”字段中,输入 https://www.googleapis.com/auth/cloud_search.settings,然后点击授权。OAuth 2 Playground 会返回授权代码。
    5. 点击交换令牌的授权代码。系统会返回一个令牌。
  3. 要初始化第三方对 Cloud Search 的支持,请使用以下 curl 命令。请务必将 [YOUR_ACCESS_TOKEN] 替换为在第 2 步中获得的令牌。

    curl --request POST \
    'https://cloudsearch.googleapis.com/v1:initializeCustomer' \
      --header 'Authorization: Bearer [YOUR_ACCESS_TOKEN]' \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '{}' \
      --compressed
    

    如果成功,响应正文将包含一个 operation 实例。例如:

    {
    name: "operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY"
    }
    

    如果操作失败,请与 Cloud Search 支持团队联系。

  4. 使用 operations.get 可验证第三方支持是否已初始化:

    curl \
    'https://cloudsearch.googleapis.com/v1/operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY?key=
    [YOUR_API_KEY]' \
    --header 'Authorization: Bearer [YOUR_ACCESS_TOKEN]' \
    --header 'Accept: application/json' \
    --compressed
    

    第三方初始化完成后,它会包含设置为 true 的字段 done。例如:

    {
    name: "operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY"
    done: true
    }
    

创建数据源

接下来,在管理控制台中创建数据源。数据源会提供一个命名空间,以便使用连接器将内容编入索引。

  1. 打开 Google 管理控制台
  2. 点击“应用”图标。系统会显示“应用管理”页面。
  3. 点击 Google Workspace。系统会显示“Google Workspace 应用管理”页面。
  4. 向下滚动,然后点击 Cloud Search。系统随即会显示“Google Workspace 设置”页面。
  5. 点击第三方数据源。系统随即会显示“数据源”页面。
  6. 点击黄色圆形 +,随即会出现“添加新数据源”对话框。
  7. 显示名称字段中,输入“tutorials”。
  8. 服务帐号电子邮件地址字段中,输入您在上一部分中创建的服务帐号的电子邮件地址。如果您不知道服务帐号的电子邮件地址,请在服务帐号页面中查找该值。
  9. 点击添加。系统会显示“已成功创建数据源”对话框。
  10. 点击 *OK。记下新创建的数据源的来源 ID。来源 ID 用于配置内容连接器。

为 GitHub API 生成个人访问令牌

连接器需要以经过身份验证的方式访问 GitHub API 才能获得足够的配额。为简单起见,该连接器使用个人访问令牌而非 OAuth。个人令牌允许使用类似于 OAuth 的一组有限权限以用户身份进行身份验证。

  1. 登录 GitHub。
  2. 点击右上角的个人资料照片。系统随即会显示一个下拉菜单。
  3. 点击设置
  4. 点击开发者设置
  5. 点击 Personal access token(个人访问令牌)。
  6. 点击 Generate Personal access token(生成个人访问令牌)。
  7. 备注字段中,输入“Cloud Search 教程”。
  8. 检查 public_repo 范围。
  9. 点击 生成令牌
  10. 记下生成的令牌。连接器使用该 API 调用 GitHub API,并提供用于执行索引编制的 API 配额。

配置连接器

创建凭据和数据源后,请更新连接器配置以包含这些值:

  1. 在命令行中,将目录更改为 cloud-search-samples/end-to-end/connector/
  2. 使用文本编辑器打开 sample-config.properties 文件。
  3. api.serviceAccountPrivateKeyFile 参数设置为您之前下载的服务凭据的文件路径。
  4. api.sourceId 参数设置为您之前创建的数据源的 ID。
  5. github.user 参数设置为您的 GitHub 用户名。
  6. github.token 参数设置为您之前创建的访问令牌。
  7. 保存文件。

更新架构

连接器会将结构化和非结构化内容编入索引。在将数据编入索引之前,您必须先更新数据源的架构。运行以下命令以更新架构:

mvn exec:java -Dexec.mainClass=com.google.cloudsearch.tutorial.SchemaTool \
    -Dexec.args="-Dconfig=sample-config.properties"

运行该连接器

如需运行连接器并开始编制索引,请运行以下命令:

mvn exec:java -Dexec.mainClass=com.google.cloudsearch.tutorial.GithubConnector \
    -Dexec.args="-Dconfig=sample-config.properties"

连接器的默认配置是将 googleworkspace 组织中的单个代码库编入索引。将代码库编入索引大约需要 1 分钟。 初始索引编制后,连接器会继续轮询需要反映在 Cloud Search 索引中的代码库更改。

查看代码

其余部分将介绍如何构建连接器。

启动应用

连接器的入口点是 GithubConnector 类。main 方法会实例化 SDK 的 IndexingApplication 并启动它。

GithubConnector.java
/**
 * Main entry point for the connector. Creates and starts an indexing
 * application using the {@code ListingConnector} template and the sample's
 * custom {@code Repository} implementation.
 *
 * @param args program command line arguments
 * @throws InterruptedException thrown if an abort is issued during initialization
 */
public static void main(String[] args) throws InterruptedException {
  Repository repository = new GithubRepository();
  IndexingConnector connector = new ListingConnector(repository);
  IndexingApplication application = new IndexingApplication.Builder(connector, args)
      .build();
  application.start();
}

SDK 提供的 ListingConnector 实现了一种遍历策略,该策略利用 Cloud Search 队列跟踪索引中的项的状态。它会委托给示例连接器实现的 GithubRepository,以访问 GitHub 中的内容。

遍历 GitHub 代码库

在完全遍历期间,系统会调用 getIds() 方法,将可能需要编入索引的项推送到队列中。

连接器可以将多个代码库或组织编入索引。为了尽量减少故障造成的影响,系统一次会遍历一个 GitHub 代码库。系统会返回一个检查点以及遍历结果,其中包含要在后续调用 getIds() 中编入索引的代码库的列表。如果发生错误,系统会在当前代码库中恢复索引,而不是从头开始。

GithubRepository.java
/**
 * Gets all of the existing item IDs from the data repository. While
 * multiple repositories are supported, only one repository is traversed
 * per call. The remaining repositories are saved in the checkpoint
 * are traversed on subsequent calls. This minimizes the amount of
 * data that needs to be reindex in the event of an error.
 *
 * <p>This method is called by {@link ListingConnector#traverse()} during
 * <em>full traversals</em>. Every document ID and metadata hash value in
 * the <em>repository</em> is pushed to the Cloud Search queue. Each pushed
 * document is later polled and processed in the {@link #getDoc(Item)} method.
 * <p>
 * The metadata hash values are pushed to aid document change detection. The
 * queue sets the document status depending on the hash comparison. If the
 * pushed ID doesn't yet exist in Cloud Search, the document's status is
 * set to <em>new</em>. If the ID exists but has a mismatched hash value,
 * its status is set to <em>modified</em>. If the ID exists and matches
 * the hash value, its status is unchanged.
 *
 * <p>In every case, the pushed content hash value is only used for
 * comparison. The hash value is only set in the queue during an
 * update (see {@link #getDoc(Item)}).
 *
 * @param checkpoint value defined and maintained by this connector
 * @return this is typically a {@link PushItems} instance
 */
@Override
public CheckpointCloseableIterable<ApiOperation> getIds(byte[] checkpoint)
    throws RepositoryException {
  List<String> repositories;
  // Decode the checkpoint if present to get the list of remaining
  // repositories to index.
  if (checkpoint != null) {
    try {
      FullTraversalCheckpoint decodedCheckpoint = FullTraversalCheckpoint
          .fromBytes(checkpoint);
      repositories = decodedCheckpoint.getRemainingRepositories();
    } catch (IOException e) {
      throw new RepositoryException.Builder()
          .setErrorMessage("Unable to deserialize checkpoint")
          .setCause(e)
          .build();
    }
  } else {
    // No previous checkpoint, scan for repositories to index
    // based on the connector configuration.
    try {
      repositories = scanRepositories();
    } catch (IOException e) {
      throw toRepositoryError(e, Optional.of("Unable to scan repositories"));
    }
  }

  if (repositories.isEmpty()) {
    // Nothing left to index. Reset the checkpoint to null so the
    // next full traversal starts from the beginning
    Collection<ApiOperation> empty = Collections.emptyList();
    return new CheckpointCloseableIterableImpl.Builder<>(empty)
        .setCheckpoint((byte[]) null)
        .setHasMore(false)
        .build();
  }

  // Still have more repositories to index. Pop the next repository to
  // index off the list. The remaining repositories make up the next
  // checkpoint.
  String repositoryToIndex = repositories.get(0);
  repositories = repositories.subList(1, repositories.size());

  try {
    log.info(() -> String.format("Traversing repository %s", repositoryToIndex));
    Collection<ApiOperation> items = collectRepositoryItems(repositoryToIndex);
    FullTraversalCheckpoint newCheckpoint = new FullTraversalCheckpoint(repositories);
    return new CheckpointCloseableIterableImpl.Builder<>(items)
        .setHasMore(true)
        .setCheckpoint(newCheckpoint.toBytes())
        .build();
  } catch (IOException e) {
    String errorMessage = String.format("Unable to traverse repo: %s",
        repositoryToIndex);
    throw toRepositoryError(e, Optional.of(errorMessage));
  }
}

collectRepositoryItems() 方法可处理单个 GitHub 代码库的遍历。此方法会返回 ApiOperations 的集合,表示要推送到队列中的项。这些项作为资源名称和表示项的当前状态的哈希值被推送。

哈希值将在 GitHub 代码库的后续遍历中使用。此值可以进行简单检查,轻松确定内容是否发生了变化,而无需上传其他内容。连接器盲目地将所有项排入队列。如果项是新的或哈希值已更改,则可用于在队列中进行轮询。否则,该项目会被视为未经修改。

GithubRepository.java
/**
 * Fetch IDs to  push in to the queue for all items in the repository.
 * Currently captures issues & content in the master branch.
 *
 * @param name Name of repository to index
 * @return Items to push into the queue for later indexing
 * @throws IOException if error reading issues
 */
private Collection<ApiOperation> collectRepositoryItems(String name)
    throws IOException {
  List<ApiOperation> operations = new ArrayList<>();
  GHRepository repo = github.getRepository(name);

  // Add the repository as an item to be indexed
  String metadataHash = repo.getUpdatedAt().toString();
  String resourceName = repo.getHtmlUrl().getPath();
  PushItem repositoryPushItem = new PushItem()
      .setMetadataHash(metadataHash);
  PushItems items = new PushItems.Builder()
      .addPushItem(resourceName, repositoryPushItem)
      .build();

  operations.add(items);
  // Add issues/pull requests & files
  operations.add(collectIssues(repo));
  operations.add(collectContent(repo));
  return operations;
}

处理队列

完全遍历完成后,连接器开始轮询队列以获取需要编入索引的项。系统会针对从队列中拉取的每项内容调用 getDoc() 方法。该方法会从 GitHub 读取项并将其转换为适当的表示法,以便编入索引。

当连接器针对可能随时更改的实时数据运行时,getDoc() 还会验证队列中的项是否仍然有效,并从索引中删除任何不再存在的项。

GithubRepository.java
/**
 * Gets a single data repository item and indexes it if required.
 *
 * <p>This method is called by the {@link ListingConnector} during a poll
 * of the Cloud Search queue. Each queued item is processed
 * individually depending on its state in the data repository.
 *
 * @param item the data repository item to retrieve
 * @return the item's state determines which type of
 * {@link ApiOperation} is returned:
 * {@link RepositoryDoc}, {@link DeleteItem}, or {@link PushItem}
 */
@Override
public ApiOperation getDoc(Item item) throws RepositoryException {
  log.info(() -> String.format("Processing item: %s ", item.getName()));
  Object githubObject;
  try {
    // Retrieve the item from GitHub
    githubObject = getGithubObject(item.getName());
    if (githubObject instanceof GHRepository) {
      return indexItem((GHRepository) githubObject, item);
    } else if (githubObject instanceof GHPullRequest) {
      return indexItem((GHPullRequest) githubObject, item);
    } else if (githubObject instanceof GHIssue) {
      return indexItem((GHIssue) githubObject, item);
    } else if (githubObject instanceof GHContent) {
      return indexItem((GHContent) githubObject, item);
    } else {
      String errorMessage = String.format("Unexpected item received: %s",
          item.getName());
      throw new RepositoryException.Builder()
          .setErrorMessage(errorMessage)
          .setErrorType(RepositoryException.ErrorType.UNKNOWN)
          .build();
    }
  } catch (FileNotFoundException e) {
    log.info(() -> String.format("Deleting item: %s ", item.getName()));
    return ApiOperations.deleteItem(item.getName());
  } catch (IOException e) {
    String errorMessage = String.format("Unable to retrieve item: %s",
        item.getName());
    throw toRepositoryError(e, Optional.of(errorMessage));
  }
}

对于连接器编入索引的每个 GitHub 对象,相应的 indexItem() 方法会负责为 Cloud Search 构建项表示法。例如,如需为内容项构建表示法,请执行以下操作:

GithubRepository.java
/**
 * Build the ApiOperation to index a content item (file).
 *
 * @param content      Content item to index
 * @param previousItem Previous item state in the index
 * @return ApiOperation (RepositoryDoc if indexing,  PushItem if not modified)
 * @throws IOException if unable to create operation
 */
private ApiOperation indexItem(GHContent content, Item previousItem)
    throws IOException {
  String metadataHash = content.getSha();

  // If previously indexed and unchanged, just requeue as unmodified
  if (canSkipIndexing(previousItem, metadataHash)) {
    return notModified(previousItem.getName());
  }

  String resourceName = new URL(content.getHtmlUrl()).getPath();
  FieldOrValue<String> title = FieldOrValue.withValue(content.getName());
  FieldOrValue<String> url = FieldOrValue.withValue(content.getHtmlUrl());

  String containerName = content.getOwner().getHtmlUrl().getPath();
  String programmingLanguage = FileExtensions.getLanguageForFile(content.getName());

  // Structured data based on the schema
  Multimap<String, Object> structuredData = ArrayListMultimap.create();
  structuredData.put("organization", content.getOwner().getOwnerName());
  structuredData.put("repository", content.getOwner().getName());
  structuredData.put("path", content.getPath());
  structuredData.put("language", programmingLanguage);

  Item item = IndexingItemBuilder.fromConfiguration(resourceName)
      .setTitle(title)
      .setContainerName(containerName)
      .setSourceRepositoryUrl(url)
      .setItemType(IndexingItemBuilder.ItemType.CONTAINER_ITEM)
      .setObjectType("file")
      .setValues(structuredData)
      .setVersion(Longs.toByteArray(System.currentTimeMillis()))
      .setHash(content.getSha())
      .build();

  // Index the file content too
  String mimeType = FileTypeMap.getDefaultFileTypeMap()
      .getContentType(content.getName());
  AbstractInputStreamContent fileContent = new InputStreamContent(
      mimeType, content.read())
      .setLength(content.getSize())
      .setCloseInputStream(true);
  return new RepositoryDoc.Builder()
      .setItem(item)
      .setContent(fileContent, IndexingService.ContentFormat.RAW)
      .setRequestMode(IndexingService.RequestMode.SYNCHRONOUS)
      .build();
}

接下来,部署搜索界面。

上一个 下一个