本指南提供了一个完整的端到端工作流,用于使用 Google Cloud 的 Vertex AI 平台和 Gemini 2.5 Flash 训练模型并对图像素材资源进行分类。您将学习如何在 Python Colab 环境中集成 BigQuery 以进行数据检索、集成 Cloud Storage 以进行资产管理,以及集成 Vertex AI 以进行机器学习推理。
配置
在运行代码示例之前,请设置以下特定于项目的变量:
PROJECT_ID = "PROJECT_ID"
REGION = "REGION " # e.g., "us-central1"
LOCATION = "LOCATION " # e.g., "us"
CUSTOMER_ID = "CUSTOMER_ID" # required to subscribe to the dataset
环境设置
安装必需的依赖项并配置身份验证以访问 Google Cloud 服务:
# Install Google Cloud SDK dependencies for AI Platform integration
!pip install google-cloud-aiplatform google-cloud-storage google-cloud-bigquery google-cloud-bigquery-data-exchange -q
# Import core libraries for cloud services and machine learning operations
import json
import os
from google.cloud import bigquery
import vertexai
from vertexai.generative_models import GenerativeModel, Part
# Configure authentication for Google Cloud service access
# Initiates OAuth flow in new browser tab if authentication required
from google.colab import auth
if os.environ.get("VERTEX_PRODUCT") != "COLAB_ENTERPRISE":
from google.colab import auth
auth.authenticate_user(project_id=PROJECT_ID)
# Initialize Vertex AI client with project configuration
vertexai.init(project=PROJECT_ID, location=REGION)
print(f"Vertex AI initialized for project: {PROJECT_ID} in region: {REGION}")
订阅 Analytics Hub 数据集
您还必须订阅 Analytics Hub 数据集。
from google.cloud import bigquery_data_exchange_v1beta1
ah_client = bigquery_data_exchange_v1beta1.AnalyticsHubServiceClient()
HUB_PROJECT_ID = 'maps-platform-analytics-hub'
DATA_EXCHANGE_ID = f"imagery_insights_exchange_{LOCATION}"
LINKED_DATASET_NAME = f"imagery_insights___preview___{LOCATION}"
# subscribe to the listing (create a linked dataset in your consumer project)
destination_dataset = bigquery_data_exchange_v1beta1.DestinationDataset()
destination_dataset.dataset_reference.dataset_id = LINKED_DATASET_NAME
destination_dataset.dataset_reference.project_id = PROJECT_ID
destination_dataset.location = LOCATION
LISTING_ID=f"imagery_insights_{CUSTOMER_ID.replace('-', '_')}__{LOCATION}"
published_listing = f"projects/{HUB_PROJECT_ID}/locations/{LOCATION}/dataExchanges/{DATA_EXCHANGE_ID}/listings/{LISTING_ID}"
request = bigquery_data_exchange_v1beta1.SubscribeListingRequest(
destination_dataset=destination_dataset,
name=published_listing,
)
# request the subscription
ah_client.subscribe_listing(request=request)
使用 BigQuery 提取数据
执行 BigQuery 查询,从 latest_observations
表中提取 Google Cloud Storage URI。这些 URI 将直接传递给 Vertex AI 模型以进行分类。
# Initialize BigQuery client
bigquery_client = bigquery.Client(project=PROJECT_ID)
# Define SQL query to retrieve observation records from imagery dataset
query = f"""
SELECT
*
FROM
`{PROJECT_ID}.imagery_insights___preview___{LOCATION}.latest_observations`
LIMIT 10;
"""
print(f"Executing BigQuery query:\n{query}")
# Submit query job to BigQuery service and await completion
query_job = bigquery_client.query(query)
# Transform query results into structured data format for downstream processing
# Convert BigQuery Row objects to dictionary representations for enhanced accessibility
query_response_data = []
for row in query_job:
query_response_data.append(dict(row))
# Extract Cloud Storage URIs from result set, filtering null values
gcs_uris = [item.get("gcs_uri") for item in query_response_data if item.get("gcs_uri")]
print(f"BigQuery query returned {len(query_response_data)} records.")
print(f"Extracted {len(gcs_uris)} GCS URIs:")
for uri in gcs_uris:
print(uri)
图片分类函数
此辅助函数使用 Vertex AI 的 Gemini 2.5 Flash 模型处理图片分类:
def classify_image_with_gemini(gcs_uri: str, prompt: str = "What is in this image?") -> str:
"""
Performs multimodal image classification using Vertex AI's Gemini 2.5 Flash model.
Leverages direct Cloud Storage integration to process image assets without local
download requirements, enabling scalable batch processing workflows.
Args:
gcs_uri (str): Fully qualified Google Cloud Storage URI
(format: gs://bucket-name/path/to/image.jpg)
prompt (str): Natural language instruction for classification task execution
Returns:
str: Generated textual description from the generative model, or error message
if classification pipeline fails
Raises:
Exception: Captures service-level errors and returns structured failure response
"""
try:
# Instantiate Gemini 2.5 Flash model for inference operations
model = GenerativeModel("gemini-2.5-flash")
# Construct multimodal Part object from Cloud Storage reference
# Note: MIME type may need dynamic inference for mixed image formats
image_part = Part.from_uri(uri=gcs_uri, mime_type="image/jpeg")
# Execute multimodal inference request with combined visual and textual inputs
responses = model.generate_content([image_part, prompt])
return responses.text
except Exception as e:
print(f"Error classifying image from URI {gcs_uri}: {e}")
return "Classification failed."
批量图片分类
处理所有提取的 URI 并生成分类:
classification_results = []
# Execute batch classification pipeline across all extracted GCS URIs
for uri in gcs_uris:
print(f"\nProcessing: {uri}")
# Define comprehensive classification prompt for detailed feature extraction
classification_prompt = "Describe this image in detail, focusing on any objects, signs, or features visible."
# Invoke Gemini model for multimodal inference on current asset
result = classify_image_with_gemini(uri, classification_prompt)
# Aggregate structured results for downstream analytics and reporting
classification_results.append({"gcs_uri": uri, "classification": result})
print(f"Classification for {uri}:\n{result}")
后续步骤
对图片进行分类后,您可以考虑以下高级工作流程:
- 模型微调:使用分类结果训练自定义模型。
- 自动处理:设置 Cloud Functions 以自动对新图片进行分类。
- 数据分析:对分类模式执行统计分析。
- 集成:将结果连接到下游应用。
问题排查
常见问题和解决方案:
- 身份验证错误:确保 IAM 角色和 API 启用情况正确无误。
- 速率限制:针对大型批次实现指数退避。
- 内存限制:对于大型数据集,以较小的批次处理图片。
- URI 格式错误:验证 GCS URI 是否采用
gs://bucket-name/path/to/image
格式。
如需更多支持,请参阅 Vertex AI 文档和 BigQuery 文档。