Learn how to efficiently process multiple areas or time periods in batch operations.
Batch processing allows you to efficiently analyze multiple areas of interest or time periods without having to manually process each one individually. This approach saves time and ensures consistent analysis across your dataset.
[Image: Diagram showing batch processing workflow]
Batch processing workflow: from multiple inputs to parallel processing to aggregated results.
When you need to analyze multiple geographic areas with the same parameters:
import requests
import json
import pandas as pd
import concurrent.futures
import matplotlib.pyplot as plt
from tqdm import tqdm
api_key = "your_api_key_here"

# Fetch every saved AOI (area of interest) for this account.
url = "https://observearth.com/api/geometry/"
headers = {
    "X-API-Key": api_key
}

# raise_for_status() fails fast with a clear HTTPError instead of crashing
# later on a missing "results" key; the timeout avoids hanging indefinitely
# if the API is unreachable.
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
aois = response.json()["results"]

# Parallel lists of AOI ids and human-readable names, used by the batch run below.
aoi_ids = [aoi["id"] for aoi in aois]
aoi_names = [aoi["name"] for aoi in aois]

print(f"Found {len(aoi_ids)} areas of interest:")
# Named aoi_uuid (not "id") so we don't shadow the builtin id().
for name, aoi_uuid in zip(aoi_names, aoi_ids):
    print(f"- {name}: {aoi_uuid}")
def process_aoi(aoi_id, aoi_name):
    """Fetch 2023 NDVI statistics for one AOI and summarize them.

    Parameters
    ----------
    aoi_id : str
        Geometry id of the area of interest.
    aoi_name : str
        Human-readable name, echoed back in the result for reporting.

    Returns
    -------
    dict
        On success: aoi id/name, avg/max/min NDVI, image count, and the raw
        per-date series. On failure: aoi id/name plus an "error" message.
        Never raises, so one bad AOI cannot abort a whole batch run.
    """
    try:
        stats_url = "https://observearth.com/api/s2/stats/"
        payload = {
            "geometry_id": aoi_id,
            "start_date": "2023-01-01",
            "end_date": "2023-12-31",
            "cloud_cover": 20,
            "index": "ndvi",
        }
        # json= serializes the payload AND sets Content-Type: application/json
        # (data=json.dumps(...) sent no content type); the timeout keeps one
        # stuck request from hanging a worker thread forever.
        response = requests.post(stats_url, headers=headers, json=payload, timeout=60)
        if response.status_code != 200:
            return {
                "aoi_id": aoi_id,
                "aoi_name": aoi_name,
                "error": f"API error: {response.status_code}",
            }

        stats_results = response.json()["results"]
        dates = [result["date"] for result in stats_results]
        ndvi_means = [result["mean_value"] for result in stats_results]
        if not ndvi_means:
            return {
                "aoi_id": aoi_id,
                "aoi_name": aoi_name,
                "error": "No data found",
            }

        return {
            "aoi_id": aoi_id,
            "aoi_name": aoi_name,
            "avg_ndvi": sum(ndvi_means) / len(ndvi_means),
            "max_ndvi": max(ndvi_means),
            "min_ndvi": min(ndvi_means),
            "image_count": len(ndvi_means),
            "dates": dates,
            "ndvi_values": ndvi_means,
        }
    except Exception as e:
        # Deliberate catch-all: batch callers collect error dicts, not tracebacks.
        return {"aoi_id": aoi_id, "aoi_name": aoi_name, "error": str(e)}
# Fan the AOIs out to a small worker pool; results are collected in
# completion order, not submission order.
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
    # Map each in-flight future back to its AOI name for error reporting.
    pending = {}
    for an_id, an_name in zip(aoi_ids, aoi_names):
        pending[pool.submit(process_aoi, an_id, an_name)] = an_name

    for done in tqdm(concurrent.futures.as_completed(pending), total=len(aoi_ids)):
        aoi_name = pending[done]
        try:
            results.append(done.result())
        except Exception as e:
            print(f"Error processing {aoi_name}: {e}")

# Keep only the AOIs that produced data and tabulate them.
successful_results = [r for r in results if "error" not in r]
results_df = pd.DataFrame(successful_results)
print(f"Successfully processed {len(successful_results)} out of {len(aoi_ids)} AOIs")
When you need to analyze the same area across different time periods:
import pandas as pd
from datetime import datetime, timedelta

# Analyze one AOI across a series of monthly windows.
aoi_id = "123e4567-e89b-12d3-a456-426614174000"

# Overall analysis window: calendar year 2023.
start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 12, 31)

# Build one {start, end, label} dict per calendar month in the window.
periods = []
current_date = start_date
while current_date < end_date:
    # First day of the following month (adding 32 days always lands in the
    # next month), then step back one day to get the last day of the current
    # month — correct for 28/29/30/31-day months alike.
    next_month_start = (current_date.replace(day=1) + timedelta(days=32)).replace(day=1)
    month_end = next_month_start - timedelta(days=1)
    if month_end > end_date:
        month_end = end_date  # clamp the final partial month
    periods.append({
        "start": current_date.strftime("%Y-%m-%d"),
        "end": month_end.strftime("%Y-%m-%d"),
        "label": current_date.strftime("%b %Y"),
    })
    current_date = month_end + timedelta(days=1)

print(f"Created {len(periods)} time periods:")
def process_time_period(aoi_id, period):
    """Download one cloud-filtered NDVI PNG for *aoi_id* within *period*.

    Parameters
    ----------
    aoi_id : str
        Geometry id of the area of interest.
    period : dict
        Must contain "start", "end" (YYYY-MM-DD) and "label" (e.g. "Jan 2023").

    Returns
    -------
    dict
        {"success": True, "period", "filename", "item_id"} when an image was
        saved, otherwise {"success": False, "period", "error"}. Never raises,
        so one bad period cannot abort the batch.
    """
    try:
        image_url = f"https://observearth.com/api/s2/image/{aoi_id}/"

        # Step 1: find imagery within the period (guard clauses keep the
        # happy path flat instead of four levels of nesting).
        search_url = "https://observearth.com/api/s2/search/"
        search_payload = {
            "geometry_id": aoi_id,
            "start_date": period["start"],
            "end_date": period["end"],
            "cloud_cover": 20,
        }
        # json= sets Content-Type: application/json (data=json.dumps(...) did
        # not); timeouts keep a stuck request from hanging a worker thread.
        search_response = requests.post(
            search_url, headers=headers, json=search_payload, timeout=60
        )
        if search_response.status_code != 200:
            return {
                "period": period["label"],
                "error": f"Search API error: {search_response.status_code}",
                "success": False,
            }

        search_data = search_response.json()
        if search_data["count"] == 0:
            return {
                "period": period["label"],
                "error": "No images found in this period",
                "success": False,
            }

        # Step 2: render and download the NDVI image for the first match.
        item_id = search_data["results"][0]["id"]
        params = {
            "item_id": item_id,
            "image_type": "png",
            "index": "ndvi",
            "colormap": "RdYlGn",
        }
        image_response = requests.get(
            image_url, headers=headers, params=params, timeout=120
        )
        if image_response.status_code != 200:
            return {
                "period": period["label"],
                "error": f"Image API error: {image_response.status_code}",
                "success": False,
            }

        # Step 3: persist to disk, e.g. "ndvi_Jan_2023.png".
        filename = f"ndvi_{period['label'].replace(' ', '_')}.png"
        with open(filename, "wb") as f:
            f.write(image_response.content)
        return {
            "period": period["label"],
            "filename": filename,
            "item_id": item_id,
            "success": True,
        }
    except Exception as e:
        # Deliberate catch-all: batch callers collect error dicts, not tracebacks.
        return {"period": period["label"], "error": str(e), "success": False}
# Fan the monthly windows out to a small pool; three workers keeps the
# image downloads concurrent without hammering the API.
period_results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
    in_flight = {
        pool.submit(process_time_period, aoi_id, p): p["label"] for p in periods
    }
    for finished in tqdm(concurrent.futures.as_completed(in_flight), total=len(periods)):
        period_label = in_flight[finished]
        try:
            period_results.append(finished.result())
        except Exception as e:
            print(f"Error processing {period_label}: {e}")

# Tally how many windows actually produced an image.
successful_periods = [r for r in period_results if r["success"]]
print(f"Successfully processed {len(successful_periods)} out of {len(periods)} time periods")
[Image: January 2023 NDVI]
January 2023
[Image: June 2023 NDVI]
June 2023
[Image: December 2023 NDVI]
December 2023
After batch processing, you'll want to aggregate and visualize the results:
# Bar chart: one bar per AOI, height = mean NDVI over the year.
fig = plt.figure(figsize=(12, 6))
ax = fig.gca()
ax.bar(results_df['aoi_name'], results_df['avg_ndvi'])
ax.set_xlabel('Area of Interest')
ax.set_ylabel('Average NDVI')
ax.set_title('Average NDVI Comparison Across Areas')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

# Compact per-AOI summary table printed without the index column.
summary_columns = ['aoi_name', 'avg_ndvi', 'min_ndvi', 'max_ndvi', 'image_count']
summary_table = results_df[summary_columns]
print(summary_table.to_string(index=False))
# Create an animation from the monthly images
import imageio
import glob

# Month-name filenames ("ndvi_Jan_2023.png") do NOT sort chronologically as
# plain strings (lexical order gives Apr, Aug, Dec, Feb, ...), so order the
# frames by parsing the embedded month/year instead of sorted()'s default.
# Assumes every ndvi_*.png in the directory came from the batch step above
# and follows that naming pattern.
image_files = sorted(
    glob.glob('ndvi_*.png'),
    key=lambda path: datetime.strptime(path, 'ndvi_%b_%Y.png'),
)

# NOTE(review): imageio v3 deprecates the v2 imread/get_writer API and
# interprets GIF `duration` in milliseconds rather than seconds — confirm the
# installed imageio major version if frame timing looks wrong.
with imageio.get_writer('ndvi_animation.gif', mode='I', duration=0.5) as writer:
    for filename in image_files:
        writer.append_data(imageio.imread(filename))

print(f"Created animation with {len(image_files)} frames: ndvi_animation.gif")
Now that you understand batch processing, you can: