部署连接器

Cloud Search 教程的这一页介绍了如何设置数据源 以及用于将数据编入索引的内容连接器。 要从头开始学习本教程,请参阅 Cloud Search 使用入门教程

构建连接器

将工作目录更改为 cloud-search-samples/end-to-end/connector 然后运行以下命令:

mvn package -DskipTests

该命令会下载构建 内容连接器,并编译代码。

创建服务账号凭据

连接器需要使用服务账号凭据才能调用 Cloud Search API。如需创建凭据,请执行以下操作:

  1. 返回 Google Cloud 控制台
  2. 在左侧导航栏中,点击凭据。“凭据”页面。
  3. 点击 + 创建凭据下拉列表,然后选择 服务账号。“创建服务账号”页面。
  4. 服务账号名称字段中,输入“教程”。
  5. 记下服务账号 ID 值(紧跟在服务账号名称后面)。 此值稍后使用。
  6. 点击创建。“服务账号权限(可选)”对话框。
  7. 点击继续。“授权用户访问此服务账号” (可选)”对话框。
  8. 点击完成。“凭据”部分屏幕。
  9. 在“服务账号”下,点击服务账号电子邮件。“服务 账号详细信息”页面。
  10. 在“键”下,点击添加键下拉列表,然后选择 创建新密钥。“创建私钥”对话框。
  11. 点击创建
  12. (可选)如果系统显示“您是否要允许 console.cloud.google.com?”对话框中,点击允许
  13. 私钥文件会保存到您的计算机中。记录位置 文件副本。此文件用于配置内容连接器, 它可以在调用 Google Cloud Search API 时验证自己的身份。

初始化第三方支持

您必须先初始化第三方,然后才能调用任何其他 Cloud Search API 。

要初始化对 Cloud Search 的第三方支持,请执行以下操作:

  1. 您的 Cloud Search 平台项目包含服务账号凭据。 不过,为了初始化第三方支持, 应用凭据有关如何创建 Web 应用的说明 请参阅 创建凭据。 完成此步骤后,您应该会获得客户端 ID 和客户端密钥文件。

  2. 使用 Google 的 OAuth 2 Playground 来获取访问令牌:

    1. 点击“设置”,然后选中 User your own auth credentials(使用您自己的身份验证凭据)。
    2. 输入第 1 步中的客户端 ID 和客户端密钥。
    3. 点击关闭
    4. 在“范围”字段中,输入 https://www.googleapis.com/auth/cloud_search.settings 然后点击授权。OAuth 2 Playground 会返回授权代码。
    5. 点击交换令牌的授权代码。系统会返回一个令牌。
  3. 要初始化对 Cloud Search 的第三方支持,请使用以下 curl 命令。请务必将 [YOUR_ACCESS_TOKEN] 替换为在 第 2 步:

    curl --request POST \
    'https://cloudsearch.googleapis.com/v1:initializeCustomer' \
      --header 'Authorization: Bearer [YOUR_ACCESS_TOKEN]' \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '{}' \
      --compressed
    

    如果成功,响应正文将包含一个 operation。例如:

    {
    name: "operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY"
    }
    

    如果操作失败,请与 Cloud Search 支持团队联系。

  4. 使用 operations.get 验证 则会初始化第三方支持:

    curl \
    'https://cloudsearch.googleapis.com/v1/operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY?key=
    [YOUR_API_KEY]' \
    --header 'Authorization: Bearer [YOUR_ACCESS_TOKEN]' \
    --header 'Accept: application/json' \
    --compressed
    

    第三方初始化完成后,它会包含 字段 done 设置为 true。例如:

    {
    name: "operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY"
    done: true
    }
    

创建数据源

接下来,在管理控制台中创建数据源。数据源 提供了一个命名空间,以便使用连接器将内容编入索引。

  1. 打开 Google 管理控制台
  2. 点击“应用”图标。“Google Apps 管理”页面。
  3. 点击 Google Workspace。“Google Workspace 中的 Apps 管理”页面页面。
  4. 向下滚动,然后点击 Cloud Search。“Google Workspace 设置”信息页 。
  5. 点击第三方数据源。“数据源”部分页面。
  6. 点击黄色圆形 +。“添加新数据源”页面对话框。
  7. 显示名称字段中,输入“tutorials”。
  8. 服务账号电子邮件地址字段中,输入 您在上一部分中创建的服务账号如果您不知道 电子邮件地址,请在 该 服务账号 页面。
  9. 点击添加。“已成功创建数据源”对话框。
  10. 点击 *OK。记下新创建的数据源的来源 ID。通过 来源 ID 用于配置内容连接器。

为 GitHub API 生成个人访问令牌

连接器需要以经过身份验证的方式访问 GitHub API,才能执行 以便拥有足够的配额为简单起见,该连接器利用了 访问令牌而不是 OAuth个人令牌允许使用以下身份进行身份验证: 用户具有与 OAuth 类似的一组有限权限。

  1. 登录 GitHub。
  2. 点击右上角的个人资料照片。系统随即会显示一个下拉菜单。
  3. 点击设置
  4. 点击开发者设置
  5. 点击 Personal access token(个人访问令牌)。
  6. 点击 Generate Personal access token(生成个人访问令牌)。
  7. 备注字段中,输入“Cloud Search 教程”。
  8. 检查 public_repo 范围。
  9. 点击 生成令牌
  10. 记下生成的令牌。连接器使用它来调用 GitHub API 并提供用于执行索引编制的 API 配额。

配置连接器

创建凭据和数据源后,更新连接器 配置中包含以下值:

  1. 在命令行中,将目录更改为 cloud-search-samples/end-to-end/connector/
  2. 使用文本编辑器打开 sample-config.properties 文件。
  3. api.serviceAccountPrivateKeyFile 参数设置为 凭据。
  4. api.sourceId 参数设置为您要创建的数据源的 ID 凭据。
  5. github.user 参数设置为您的 GitHub 用户名。
  6. github.token 参数设置为您之前创建的访问令牌。
  7. 保存文件。

更新架构

连接器会将结构化和非结构化内容编入索引。编入索引之前 数据,您必须更新数据源的架构。运行以下命令 以更新架构:

mvn exec:java -Dexec.mainClass=com.google.cloudsearch.tutorial.SchemaTool \
    -Dexec.args="-Dconfig=sample-config.properties"

运行该连接器

如需运行连接器并开始编制索引,请运行以下命令:

mvn exec:java -Dexec.mainClass=com.google.cloudsearch.tutorial.GithubConnector \
    -Dexec.args="-Dconfig=sample-config.properties"

连接器的默认配置是将单个代码库编入索引 在 googleworkspace 组织中。将代码库编入索引大约需要 1 分钟。 首次编入索引后,该连接器会继续轮询 需要反映在 Cloud Search 索引中的存储库。

查看代码

其余部分将介绍如何构建连接器。

启动应用

连接器的入口点是 GithubConnector 类。通过 main 方法会实例化 SDK 的 IndexingApplication 然后启动它

GithubConnector.java
/**
 * Main entry point for the connector. Creates and starts an indexing
 * application using the {@code ListingConnector} template and the sample's
 * custom {@code Repository} implementation.
 *
 * @param args program command line arguments
 * @throws InterruptedException thrown if an abort is issued during initialization
 */
public static void main(String[] args) throws InterruptedException {
  Repository repository = new GithubRepository();
  IndexingConnector connector = new ListingConnector(repository);
  IndexingApplication application = new IndexingApplication.Builder(connector, args)
      .build();
  application.start();
}

ListingConnector 实施遍历策略 (利用 Cloud Search 队列) 用于跟踪索引中项的状态。它会委托给 GithubRepository, 由示例连接器实现,用于访问 GitHub 中的内容。

遍历 GitHub 代码库

在完全遍历期间,getIds() 方法将可能需要编入索引的项推送到队列中。

连接器可以将多个代码库或组织编入索引。为了尽可能减少 一次遍历一个 GitHub 代码库。检查点 将返回包含遍历列表的 在后续调用 getIds() 时要编入索引的代码库。如果出现错误 发生索引时,索引会在当前仓库中恢复,而不是开始 从头开始。

GithubRepository.java
/**
 * Gets all of the existing item IDs from the data repository. While
 * multiple repositories are supported, only one repository is traversed
 * per call. The remaining repositories are saved in the checkpoint
 * are traversed on subsequent calls. This minimizes the amount of
 * data that needs to be reindex in the event of an error.
 *
 * <p>This method is called by {@link ListingConnector#traverse()} during
 * <em>full traversals</em>. Every document ID and metadata hash value in
 * the <em>repository</em> is pushed to the Cloud Search queue. Each pushed
 * document is later polled and processed in the {@link #getDoc(Item)} method.
 * <p>
 * The metadata hash values are pushed to aid document change detection. The
 * queue sets the document status depending on the hash comparison. If the
 * pushed ID doesn't yet exist in Cloud Search, the document's status is
 * set to <em>new</em>. If the ID exists but has a mismatched hash value,
 * its status is set to <em>modified</em>. If the ID exists and matches
 * the hash value, its status is unchanged.
 *
 * <p>In every case, the pushed content hash value is only used for
 * comparison. The hash value is only set in the queue during an
 * update (see {@link #getDoc(Item)}).
 *
 * @param checkpoint value defined and maintained by this connector
 * @return this is typically a {@link PushItems} instance
 */
@Override
public CheckpointCloseableIterable<ApiOperation> getIds(byte[] checkpoint)
    throws RepositoryException {
  List<String> repositories;
  // Decode the checkpoint if present to get the list of remaining
  // repositories to index.
  if (checkpoint != null) {
    try {
      FullTraversalCheckpoint decodedCheckpoint = FullTraversalCheckpoint
          .fromBytes(checkpoint);
      repositories = decodedCheckpoint.getRemainingRepositories();
    } catch (IOException e) {
      throw new RepositoryException.Builder()
          .setErrorMessage("Unable to deserialize checkpoint")
          .setCause(e)
          .build();
    }
  } else {
    // No previous checkpoint, scan for repositories to index
    // based on the connector configuration.
    try {
      repositories = scanRepositories();
    } catch (IOException e) {
      throw toRepositoryError(e, Optional.of("Unable to scan repositories"));
    }
  }

  if (repositories.isEmpty()) {
    // Nothing left to index. Reset the checkpoint to null so the
    // next full traversal starts from the beginning
    Collection<ApiOperation> empty = Collections.emptyList();
    return new CheckpointCloseableIterableImpl.Builder<>(empty)
        .setCheckpoint((byte[]) null)
        .setHasMore(false)
        .build();
  }

  // Still have more repositories to index. Pop the next repository to
  // index off the list. The remaining repositories make up the next
  // checkpoint.
  String repositoryToIndex = repositories.get(0);
  repositories = repositories.subList(1, repositories.size());

  try {
    log.info(() -> String.format("Traversing repository %s", repositoryToIndex));
    Collection<ApiOperation> items = collectRepositoryItems(repositoryToIndex);
    FullTraversalCheckpoint newCheckpoint = new FullTraversalCheckpoint(repositories);
    return new CheckpointCloseableIterableImpl.Builder<>(items)
        .setHasMore(true)
        .setCheckpoint(newCheckpoint.toBytes())
        .build();
  } catch (IOException e) {
    String errorMessage = String.format("Unable to traverse repo: %s",
        repositoryToIndex);
    throw toRepositoryError(e, Optional.of(errorMessage));
  }
}

collectRepositoryItems() 方法可处理单个元素的遍历 GitHub 代码库。此方法会返回 ApiOperations 的集合。 表示要推送到队列中的项。项目作为 资源名称和表示商品当前状态的哈希值。

该哈希值用于后续的 GitHub 遍历 代码库该值用于进行轻量级检查,以确定内容 已更改。连接器盲目 将所有项加入队列。如果是新项或哈希值已更改,则会 可在队列中进行轮询否则,该项目会被视为未经修改。

GithubRepository.java
/**
 * Fetch IDs to  push in to the queue for all items in the repository.
 * Currently captures issues & content in the master branch.
 *
 * @param name Name of repository to index
 * @return Items to push into the queue for later indexing
 * @throws IOException if error reading issues
 */
private Collection<ApiOperation> collectRepositoryItems(String name)
    throws IOException {
  List<ApiOperation> operations = new ArrayList<>();
  GHRepository repo = github.getRepository(name);

  // Add the repository as an item to be indexed
  String metadataHash = repo.getUpdatedAt().toString();
  String resourceName = repo.getHtmlUrl().getPath();
  PushItem repositoryPushItem = new PushItem()
      .setMetadataHash(metadataHash);
  PushItems items = new PushItems.Builder()
      .addPushItem(resourceName, repositoryPushItem)
      .build();

  operations.add(items);
  // Add issues/pull requests & files
  operations.add(collectIssues(repo));
  operations.add(collectContent(repo));
  return operations;
}

处理队列

完全遍历完成后,连接器会开始轮询 需要编入索引的项的队列。getDoc() 方法。该方法会读取 从 GitHub 提取内容,并将其转换为适当的表示法 以便编入索引。

由于连接器要针对可能随时更改的实时数据运行 getDoc() 还会验证队列中的项是否仍然有效 并删除索引中不再存在的所有项。

GithubRepository.java
/**
 * Gets a single data repository item and indexes it if required.
 *
 * <p>This method is called by the {@link ListingConnector} during a poll
 * of the Cloud Search queue. Each queued item is processed
 * individually depending on its state in the data repository.
 *
 * @param item the data repository item to retrieve
 * @return the item's state determines which type of
 * {@link ApiOperation} is returned:
 * {@link RepositoryDoc}, {@link DeleteItem}, or {@link PushItem}
 */
@Override
public ApiOperation getDoc(Item item) throws RepositoryException {
  log.info(() -> String.format("Processing item: %s ", item.getName()));
  Object githubObject;
  try {
    // Retrieve the item from GitHub
    githubObject = getGithubObject(item.getName());
    if (githubObject instanceof GHRepository) {
      return indexItem((GHRepository) githubObject, item);
    } else if (githubObject instanceof GHPullRequest) {
      return indexItem((GHPullRequest) githubObject, item);
    } else if (githubObject instanceof GHIssue) {
      return indexItem((GHIssue) githubObject, item);
    } else if (githubObject instanceof GHContent) {
      return indexItem((GHContent) githubObject, item);
    } else {
      String errorMessage = String.format("Unexpected item received: %s",
          item.getName());
      throw new RepositoryException.Builder()
          .setErrorMessage(errorMessage)
          .setErrorType(RepositoryException.ErrorType.UNKNOWN)
          .build();
    }
  } catch (FileNotFoundException e) {
    log.info(() -> String.format("Deleting item: %s ", item.getName()));
    return ApiOperations.deleteItem(item.getName());
  } catch (IOException e) {
    String errorMessage = String.format("Unable to retrieve item: %s",
        item.getName());
    throw toRepositoryError(e, Optional.of(errorMessage));
  }
}

对于连接器索引的每个 GitHub 对象,对应的 indexItem() 方法负责为以下项构建商品表示法 Cloud Search。例如,如需为内容项构建表示法,请执行以下操作:

GithubRepository.java
/**
 * Build the ApiOperation to index a content item (file).
 *
 * @param content      Content item to index
 * @param previousItem Previous item state in the index
 * @return ApiOperation (RepositoryDoc if indexing,  PushItem if not modified)
 * @throws IOException if unable to create operation
 */
private ApiOperation indexItem(GHContent content, Item previousItem)
    throws IOException {
  String metadataHash = content.getSha();

  // If previously indexed and unchanged, just requeue as unmodified
  if (canSkipIndexing(previousItem, metadataHash)) {
    return notModified(previousItem.getName());
  }

  String resourceName = new URL(content.getHtmlUrl()).getPath();
  FieldOrValue<String> title = FieldOrValue.withValue(content.getName());
  FieldOrValue<String> url = FieldOrValue.withValue(content.getHtmlUrl());

  String containerName = content.getOwner().getHtmlUrl().getPath();
  String programmingLanguage = FileExtensions.getLanguageForFile(content.getName());

  // Structured data based on the schema
  Multimap<String, Object> structuredData = ArrayListMultimap.create();
  structuredData.put("organization", content.getOwner().getOwnerName());
  structuredData.put("repository", content.getOwner().getName());
  structuredData.put("path", content.getPath());
  structuredData.put("language", programmingLanguage);

  Item item = IndexingItemBuilder.fromConfiguration(resourceName)
      .setTitle(title)
      .setContainerName(containerName)
      .setSourceRepositoryUrl(url)
      .setItemType(IndexingItemBuilder.ItemType.CONTAINER_ITEM)
      .setObjectType("file")
      .setValues(structuredData)
      .setVersion(Longs.toByteArray(System.currentTimeMillis()))
      .setHash(content.getSha())
      .build();

  // Index the file content too
  String mimeType = FileTypeMap.getDefaultFileTypeMap()
      .getContentType(content.getName());
  AbstractInputStreamContent fileContent = new InputStreamContent(
      mimeType, content.read())
      .setLength(content.getSize())
      .setCloseInputStream(true);
  return new RepositoryDoc.Builder()
      .setItem(item)
      .setContent(fileContent, IndexingService.ContentFormat.RAW)
      .setRequestMode(IndexingService.RequestMode.SYNCHRONOUS)
      .build();
}

接下来,部署搜索界面。

上一个 下一个