Batch Processing

Learn how to efficiently process multiple areas or time periods in batch operations.

Prerequisites: Before starting this tutorial, make sure you have:
  • An ObservEarth API key
  • Created multiple Areas of Interest (AOIs)
  • Basic understanding of Python programming
If you haven't done these steps yet, check out our Authentication and Areas of Interest tutorials first.

Introduction to Batch Processing

Batch processing lets you analyze multiple areas of interest or time periods without processing each one manually. This approach saves time and ensures consistent analysis across your dataset.

Key Benefits:
  • Efficiency: Process multiple areas or time periods in a single operation
  • Consistency: Apply the same analysis parameters across all items
  • Automation: Set up workflows that can run with minimal supervision
  • Scalability: Handle large datasets that would be impractical to process manually
  • Reproducibility: Document and repeat analyses with the same parameters

[Image: Diagram showing batch processing workflow]

Batch processing workflow: from multiple inputs to parallel processing to aggregated results.

Common Batch Processing Scenarios:
  • Multiple AOIs: Process the same analysis across different geographic areas
  • Time series: Process the same area across multiple time periods
  • Parameter sweeps: Run the same analysis with different parameter settings (see the sketch after this list)
  • Multi-sensor analysis: Process data from different satellite sensors for the same area
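
As a hedged sketch of the parameter-sweep scenario (the index names and thresholds below are assumptions; use whichever values your account supports), the pattern is to enumerate the parameter combinations up front and feed each one through the same processing function:

# Hypothetical parameter sweep: every combination of index and cloud-cover
# threshold becomes one job for the batch (the values are illustrative)
indices = ["ndvi", "evi"]
cloud_covers = [10, 20, 40]

jobs = [
    {"index": idx, "cloud_cover": cc}
    for idx in indices
    for cc in cloud_covers
]
print(f"{len(jobs)} parameter combinations queued")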

Processing Multiple Areas of Interest

Analyzing Multiple Geographic Areas

When you need to analyze multiple geographic areas with the same parameters:

Step 1: List Your Areas of Interest
import requests
import json
import pandas as pd
import concurrent.futures
import matplotlib.pyplot as plt
from tqdm import tqdm

api_key = "your_api_key_here"

# Get all your AOIs
url = "https://observearth.com/api/geometry/"

headers = {
    "X-API-Key": api_key
}

response = requests.get(url, headers=headers)
response.raise_for_status()  # Fail fast if the key is wrong or the API is down
aois = response.json()["results"]

# Create a list of AOI IDs and names
aoi_ids = [aoi["id"] for aoi in aois]
aoi_names = [aoi["name"] for aoi in aois]

print(f"Found {len(aoi_ids)} areas of interest:")
for name, aoi_id in zip(aoi_names, aoi_ids):
    print(f"- {name}: {aoi_id}")
Step 2: Define a Processing Function
def process_aoi(aoi_id, aoi_name):
    """Process a single AOI and return results"""
    try:
        # Get NDVI statistics for this AOI
        stats_url = "https://observearth.com/api/s2/stats/"
        
        payload = {
            "geometry_id": aoi_id,
            "start_date": "2023-01-01",
            "end_date": "2023-12-31",
            "cloud_cover": 20,
            "index": "ndvi"
        }
        
        # json= serializes the payload and sets the Content-Type header
        response = requests.post(stats_url, headers=headers, json=payload)
        
        if response.status_code == 200:
            stats_data = response.json()
            
            # Extract mean NDVI values
            dates = [result["date"] for result in stats_data["results"]]
            ndvi_means = [result["mean_value"] for result in stats_data["results"]]
            
            # Calculate summary statistics
            if ndvi_means:
                avg_ndvi = sum(ndvi_means) / len(ndvi_means)
                max_ndvi = max(ndvi_means)
                min_ndvi = min(ndvi_means)
                
                return {
                    "aoi_id": aoi_id,
                    "aoi_name": aoi_name,
                    "avg_ndvi": avg_ndvi,
                    "max_ndvi": max_ndvi,
                    "min_ndvi": min_ndvi,
                    "image_count": len(ndvi_means),
                    "dates": dates,
                    "ndvi_values": ndvi_means
                }
            else:
                return {
                    "aoi_id": aoi_id,
                    "aoi_name": aoi_name,
                    "error": "No data found"
                }
        else:
            return {
                "aoi_id": aoi_id,
                "aoi_name": aoi_name,
                "error": f"API error: {response.status_code}"
            }
    except Exception as e:
        return {
            "aoi_id": aoi_id,
            "aoi_name": aoi_name,
            "error": str(e)
        }
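
Before launching the full batch, it can help to smoke-test the function on a single AOI:

# Quick sanity check on the first AOI before processing everything
test_result = process_aoi(aoi_ids[0], aoi_names[0])
if "error" in test_result:
    print(f"Test failed: {test_result['error']}")
else:
    print(f"Test OK: {test_result['image_count']} images found")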
Step 3: Process in Parallel
# Process all AOIs in parallel
results = []

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Create a dictionary of futures to AOI names
    future_to_aoi = {executor.submit(process_aoi, aoi_id, aoi_name): aoi_name 
                     for aoi_id, aoi_name in zip(aoi_ids, aoi_names)}
    
    # Process as they complete
    for future in tqdm(concurrent.futures.as_completed(future_to_aoi), total=len(aoi_ids)):
        aoi_name = future_to_aoi[future]
        try:
            result = future.result()
            results.append(result)
        except Exception as e:
            print(f"Error processing {aoi_name}: {e}")

# Create a DataFrame from results
successful_results = [r for r in results if "error" not in r]
results_df = pd.DataFrame(successful_results)

print(f"Successfully processed {len(successful_results)} out of {len(aoi_ids)} AOIs")

Processing Multiple Time Periods

Analyzing Temporal Patterns

When you need to analyze the same area across different time periods:

Step 1: Define Time Periods
from datetime import datetime, timedelta

# Define a single AOI
aoi_id = "123e4567-e89b-12d3-a456-426614174000"

# Define time periods (e.g., monthly for a year)
start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 12, 31)

# Create monthly periods
periods = []
current_date = start_date

while current_date < end_date:
    # Find the last day of the current month: jump safely past day 28,
    # snap to the 1st of the next month, then step back one day
    month_end = (current_date.replace(day=28) + timedelta(days=4)).replace(day=1) - timedelta(days=1)
    if month_end > end_date:
        month_end = end_date
        
    periods.append({
        "start": current_date.strftime("%Y-%m-%d"),
        "end": month_end.strftime("%Y-%m-%d"),
        "label": current_date.strftime("%b %Y")
    })
    
    # Move to next month
    current_date = (month_end + timedelta(days=1))

print(f"Created {len(periods)} time periods:")
Step 2: Process Each Time Period
def process_time_period(aoi_id, period):
    """Process a single time period for an AOI"""
    try:
        # Endpoint that renders an NDVI image for this AOI
        image_url = f"https://observearth.com/api/s2/image/{aoi_id}/"
        
        # First search for images in this period
        search_url = "https://observearth.com/api/s2/search/"
        search_payload = {
            "geometry_id": aoi_id,
            "start_date": period["start"],
            "end_date": period["end"],
            "cloud_cover": 20
        }
        
        search_response = requests.post(search_url, headers=headers, json=search_payload)
        
        if search_response.status_code == 200:
            search_data = search_response.json()
            
            if search_data["count"] > 0:
                # Get the first image
                item_id = search_data["results"][0]["id"]
                
                # Get NDVI image
                params = {
                    "item_id": item_id,
                    "image_type": "png",
                    "index": "ndvi",
                    "colormap": "RdYlGn"
                }
                
                image_response = requests.get(image_url, headers=headers, params=params)
                
                if image_response.status_code == 200:
                    # Save the image
                    filename = f"ndvi_{period['label'].replace(' ', '_')}.png"
                    with open(filename, "wb") as f:
                        f.write(image_response.content)
                    
                    return {
                        "period": period["label"],
                        "filename": filename,
                        "item_id": item_id,
                        "success": True
                    }
                else:
                    return {
                        "period": period["label"],
                        "error": f"Image API error: {image_response.status_code}",
                        "success": False
                    }
            else:
                return {
                    "period": period["label"],
                    "error": "No images found in this period",
                    "success": False
                }
        else:
            return {
                "period": period["label"],
                "error": f"Search API error: {search_response.status_code}",
                "success": False
            }
    except Exception as e:
        return {
            "period": period["label"],
            "error": str(e),
            "success": False
        }

# Process all time periods
period_results = []

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future_to_period = {executor.submit(process_time_period, aoi_id, period): period["label"] 
                        for period in periods}
    
    for future in tqdm(concurrent.futures.as_completed(future_to_period), total=len(periods)):
        period_label = future_to_period[future]
        try:
            result = future.result()
            period_results.append(result)
        except Exception as e:
            print(f"Error processing {period_label}: {e}")

# Count successful periods
successful_periods = [r for r in period_results if r["success"]]
print(f"Successfully processed {len(successful_periods)} out of {len(periods)} time periods")

[Image: January 2023 NDVI]

January 2023

[Image: June 2023 NDVI]

June 2023

[Image: December 2023 NDVI]

December 2023

Aggregating and Visualizing Results

After batch processing, you'll want to aggregate and visualize the results:

Comparing Multiple AOIs
# Create a bar chart comparing average NDVI across AOIs
plt.figure(figsize=(12, 6))
plt.bar(results_df['aoi_name'], results_df['avg_ndvi'])
plt.xlabel('Area of Interest')
plt.ylabel('Average NDVI')
plt.title('Average NDVI Comparison Across Areas')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

# Create a summary table
summary_table = results_df[['aoi_name', 'avg_ndvi', 'min_ndvi', 'max_ndvi', 'image_count']]
print(summary_table.to_string(index=False))
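
Because each successful result also carries the per-image dates and NDVI values, you can go beyond the summary bar chart and plot the full time series for every AOI:

# Plot the NDVI time series of every successfully processed AOI
plt.figure(figsize=(12, 6))
for r in successful_results:
    plt.plot(pd.to_datetime(r["dates"]), r["ndvi_values"],
             marker="o", label=r["aoi_name"])
plt.xlabel("Date")
plt.ylabel("Mean NDVI")
plt.title("NDVI Time Series by Area")
plt.legend()
plt.tight_layout()
plt.show()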
Creating a Time Series Animation
# Create an animation from the monthly images
import glob
from datetime import datetime

import imageio.v2 as imageio  # use the v2 API explicitly to avoid the imread deprecation warning in imageio 3

# Collect the generated images in chronological order; a plain alphabetical
# sort would put "Apr" before "Jan", so parse the month/year from each name
image_files = sorted(
    glob.glob('ndvi_*.png'),
    key=lambda f: datetime.strptime(f, 'ndvi_%b_%Y.png')
)

# Create a GIF animation with 0.5 seconds per frame
with imageio.get_writer('ndvi_animation.gif', mode='I', duration=0.5) as writer:
    for filename in image_files:
        image = imageio.imread(filename)
        writer.append_data(image)

print(f"Created animation with {len(image_files)} frames: ndvi_animation.gif")

Best Practices for Batch Processing

Error Handling
  • Always implement robust error handling
  • Log errors for later investigation
  • Continue processing even if some items fail
  • Implement retries for transient failures, as sketched below
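
A minimal retry helper, assuming transient failures show up as HTTP 429 or 5xx responses, might look like this:

import time

def post_with_retries(url, payload, max_retries=3, backoff=2.0):
    """Retry POSTs that fail with 429 or 5xx, backing off exponentially."""
    for attempt in range(max_retries):
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code != 429 and response.status_code < 500:
            return response
        time.sleep(backoff ** attempt)
    return response  # Give up and return the last response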
Performance Optimization
  • Use parallel processing but don't overwhelm the API
  • Implement rate limiting to avoid API throttling (see the throttle sketch below)
  • Process in batches of reasonable size
  • Monitor memory usage for large datasets
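
One lightweight way to rate-limit across threads (the half-second interval below is an assumption; tune it to your plan's limits) is a shared lock around a minimum request interval:

import threading
import time

_throttle_lock = threading.Lock()
_last_request_time = [0.0]
MIN_INTERVAL = 0.5  # assumed minimum seconds between requests; adjust as needed

def throttled_post(url, payload):
    """Block until MIN_INTERVAL has passed since the last request, then POST."""
    with _throttle_lock:
        wait = MIN_INTERVAL - (time.monotonic() - _last_request_time[0])
        if wait > 0:
            time.sleep(wait)
        _last_request_time[0] = time.monotonic()
    return requests.post(url, headers=headers, json=payload)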
Data Management
  • Save intermediate results to avoid reprocessing
  • Use consistent naming conventions
  • Implement checkpointing for long-running processes, as sketched below
  • Clean up temporary files when done
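
For the monthly workflow above, a checkpoint can be as simple as skipping periods whose output file already exists (this reuses the filename convention from the time-series example):

import os

# Only process periods that don't already have an output image on disk
pending = [p for p in periods
           if not os.path.exists(f"ndvi_{p['label'].replace(' ', '_')}.png")]
print(f"{len(pending)} of {len(periods)} periods still to process")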
Reproducibility
  • Document all processing parameters
  • Use version control for your scripts
  • Record API versions and dependencies
  • Save processing metadata with results (see the sketch below)
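
A sketch of recording run metadata (the fields here are illustrative, not a fixed schema):

from datetime import datetime

# Record what was run, when, and with which parameters
run_metadata = {
    "run_timestamp": datetime.now().isoformat(),
    "parameters": {
        "start_date": "2023-01-01",
        "end_date": "2023-12-31",
        "cloud_cover": 20,
        "index": "ndvi",
    },
    "aoi_ids": aoi_ids,
}
with open("run_metadata.json", "w") as f:
    json.dump(run_metadata, f, indent=2)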

Next Steps