import rasterio
import numpy as np
import matplotlib.pyplot as plt
from rasterio.plot import show
from sklearn.cluster import KMeans
import geopandas as gpd
from shapely.geometry import Point
import folium
from folium.plugins import HeatMap
class SpatialAnalysisPipeline:
"""
Advanced spatial analysis pipeline with multimodal data processing.
Handles raster, vector, and tabular data with comprehensive visualization.
"""
def __init__(self, raster_path=None, vector_path=None, tabular_data=None):
self.raster_data = None
self.vector_data = None
self.tabular_data = tabular_data
self.analysis_results = {}
if raster_path:
self.load_raster_data(raster_path)
if vector_path:
self.load_vector_data(vector_path)
def load_raster_data(self, raster_path):
"""Load and preprocess raster data with error handling."""
try:
with rasterio.open(raster_path) as src:
self.raster_data = {
'data': src.read(1), # Read first band
'transform': src.transform,
'crs': src.crs,
'bounds': src.bounds,
'meta': src.meta
}
print(f"Loaded raster: {src.width}x{src.height} pixels, CRS: {src.crs}")
except Exception as e:
print(f"Error loading raster data: {e}")
raise
def load_vector_data(self, vector_path):
"""Load vector data with spatial indexing."""
try:
self.vector_data = gpd.read_file(vector_path)
self.vector_data = self.vector_data.to_crs(epsg=4326) # Standardize CRS
print(f"Loaded vector data: {len(self.vector_data)} features")
except Exception as e:
print(f"Error loading vector data: {e}")
raise
def perform_clustering_analysis(self, n_clusters=5, method='kmeans'):
"""
Perform spatial clustering analysis on raster data.
Args:
n_clusters (int): Number of clusters
method (str): Clustering method ('kmeans', 'dbscan', 'hierarchical')
"""
if self.raster_data is None:
raise ValueError("No raster data loaded")
# Prepare data for clustering
valid_pixels = self.raster_data['data'][~np.isnan(self.raster_data['data'])]
valid_pixels = valid_pixels.reshape(-1, 1)
if method == 'kmeans':
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
clusters = kmeans.fit_predict(valid_pixels)
# Create clustered raster
clustered_raster = np.full_like(self.raster_data['data'], np.nan)
valid_indices = ~np.isnan(self.raster_data['data'])
clustered_raster[valid_indices] = clusters
self.analysis_results['clustering'] = {
'method': method,
'n_clusters': n_clusters,
'clustered_raster': clustered_raster,
'cluster_centers': kmeans.cluster_centers_,
'inertia': kmeans.inertia_
}
return self.analysis_results['clustering']
def create_interactive_visualization(self, output_path='spatial_analysis.html'):
"""
Create comprehensive interactive visualization combining all data types.
"""
if not self.analysis_results:
raise ValueError("No analysis results available")
# Create base map
center_lat = (self.raster_data['bounds'].top + self.raster_data['bounds'].bottom) / 2
center_lon = (self.raster_data['bounds'].left + self.raster_data['bounds'].right) / 2
m = folium.Map(
location=[center_lat, center_lon],
zoom_start=10,
tiles='cartodbpositron'
)
# Add vector data if available
if self.vector_data is not None:
folium.GeoJson(
self.vector_data,
name='Vector Features',
style_function=lambda x: {
'fillColor': '#3186cc',
'color': '#000000',
'weight': 2,
'fillOpacity': 0.7
},
popup=folium.GeoJsonPopup(fields=['name', 'type'])
).add_to(m)
# Add clustering results if available
if 'clustering' in self.analysis_results:
clustered_data = self.analysis_results['clustering']['clustered_raster']
# Convert raster to points for heatmap
rows, cols = np.where(~np.isnan(clustered_data))
heatmap_data = []
for row, col in zip(rows, cols):
# Convert pixel coordinates to geographic coordinates
lon, lat = rasterio.transform.xy(
self.raster_data['transform'], row, col
)
cluster_value = clustered_data[row, col]
heatmap_data.append([lat, lon, cluster_value])
HeatMap(
heatmap_data,
name='Clustering Results',
radius=15,
blur=10,
max_zoom=13
).add_to(m)
# Add layer control
folium.LayerControl().add_to(m)
# Save the map
m.save(output_path)
print(f"Interactive visualization saved to: {output_path}")
return m
def generate_analysis_report(self):
"""Generate comprehensive analysis report with visualizations."""
report = {
'summary': {
'raster_info': {
'dimensions': self.raster_data['data'].shape if self.raster_data else None,
'crs': self.raster_data['crs'] if self.raster_data else None,
'bounds': self.raster_data['bounds'] if self.raster_data else None
},
'vector_info': {
'feature_count': len(self.vector_data) if self.vector_data is not None else 0,
'geometry_types': list(self.vector_data.geom_type.unique()) if self.vector_data is not None else []
},
'analysis_performed': list(self.analysis_results.keys())
},
'results': self.analysis_results
}
return report
# Usage example
if __name__ == "__main__":
# Initialize pipeline
pipeline = SpatialAnalysisPipeline(
raster_path='satellite_image.tif',
vector_path='boundaries.geojson'
)
# Perform analysis
clustering_result = pipeline.perform_clustering_analysis(n_clusters=6)
# Create visualization
map_obj = pipeline.create_interactive_visualization()
# Generate report
report = pipeline.generate_analysis_report()
print("Analysis completed successfully!")