import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, LineString, Polygon
from scipy.spatial import cKDTree
from sklearn.ensemble import RandomForestRegressor
import folium
from folium.plugins import MarkerCluster, HeatMap
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
class AdvancedSpatialIntelligence:
    """
    Advanced spatial intelligence system with creative problem-solving capabilities.

    Combines proximity, density, pattern, predictive, and visualization
    analyses into one pipeline, then derives human-readable insights and
    actionable recommendations from the combined results.
    """

    def __init__(self, data_sources=None):
        """
        Args:
            data_sources: Optional mapping of named external data sources.
        """
        # `or {}` gives every instance its own dict (no shared mutable default).
        self.data_sources = data_sources or {}
        self.analysis_cache = {}  # reserved for memoizing expensive analyses
        self.insights = []        # insights accumulated across runs

    def creative_spatial_analysis(self, points_data, polygons_data, analysis_type='comprehensive'):
        """
        Perform creative spatial analysis combining multiple approaches.

        Args:
            points_data: Point features (records with a 'geometry' Point plus
                optional attributes).
            polygons_data: Polygon features providing spatial context.
            analysis_type: Type of analysis to perform (currently unused; every
                sub-analysis is always run).

        Returns:
            dict with 'results' (per-analysis outputs), 'insights' (list of
            findings) and 'recommendations' (actionable advice).
        """
        # Multi-approach analysis pipeline; each sub-analysis degrades
        # gracefully to {} on insufficient input.
        results = {
            'proximity_analysis': self._proximity_analysis(points_data, polygons_data),
            'density_analysis': self._density_analysis(points_data),
            'pattern_analysis': self._pattern_recognition(points_data),
            'predictive_analysis': self._predictive_modeling(points_data, polygons_data),
            'interactive_visualization': self._create_advanced_visualization(points_data, polygons_data)
        }
        # Generate insights using creative reasoning
        insights = self._generate_creative_insights(results)
        return {
            'results': results,
            'insights': insights,
            'recommendations': self._generate_recommendations(insights)
        }

    def _proximity_analysis(self, points, polygons):
        """Nearest-polygon distances plus KD-tree neighborhood clustering.

        Returns:
            dict with 'nearest_polygon', 'distances', 'avg_distance',
            'distance_std' and (for >1 point) 'clustering'; {} when either
            input is empty.
        """
        if len(points) == 0 or len(polygons) == 0:
            return {}
        points_gdf = gpd.GeoDataFrame(points)
        polygons_gdf = gpd.GeoDataFrame(polygons)
        proximity_metrics = {}
        # Nearest polygon for each point. GeoSeries.distance computes all
        # distances for one point at once, replacing the former O(n*m)
        # Python double loop; idxmin() returns the first minimum's index
        # label, matching the naive scan's tie-breaking.
        nearest_polygons = []
        distances = []
        for _, point in points_gdf.iterrows():
            dists = polygons_gdf.geometry.distance(point.geometry)
            nearest_polygons.append(dists.idxmin())
            distances.append(float(dists.min()))
        proximity_metrics['nearest_polygon'] = nearest_polygons
        proximity_metrics['distances'] = distances
        proximity_metrics['avg_distance'] = np.mean(distances)
        proximity_metrics['distance_std'] = np.std(distances)
        # Spatial clustering via KD-tree ball queries at several radii.
        if len(points) > 1:
            coords = np.array([[p.x, p.y] for p in points_gdf.geometry])
            tree = cKDTree(coords)
            # NOTE(review): radii are labelled meters, but if the geometries
            # are in lon/lat degrees these thresholds are not metric — confirm
            # the CRS of the input data.
            radii = [100, 500, 1000, 5000]  # meters
            clustering_metrics = {}
            for radius in radii:
                neighbors = tree.query_ball_tree(tree, radius)
                # Each point counts itself, so a size of 1 means "isolated".
                cluster_sizes = [len(n) for n in neighbors]
                clustering_metrics[f'radius_{radius}m'] = {
                    'avg_cluster_size': np.mean(cluster_sizes),
                    'max_cluster_size': np.max(cluster_sizes),
                    'clusters_count': len([c for c in cluster_sizes if c > 1])
                }
            proximity_metrics['clustering'] = clustering_metrics
        return proximity_metrics

    def _density_analysis(self, points):
        """Point-count density on an adaptive grid sized from the point count.

        Grid resolution grows with sqrt(n/2), clamped to [10, 50] cells per
        axis. Returns {} for empty input.
        """
        if len(points) == 0:
            return {}
        points_gdf = gpd.GeoDataFrame(points)
        bounds = points_gdf.total_bounds  # (minx, miny, maxx, maxy)
        x_range = bounds[2] - bounds[0]
        y_range = bounds[3] - bounds[1]
        # Calculate optimal grid size based on point density.
        optimal_cells = min(50, max(10, int(np.sqrt(len(points) / 2))))
        # Guard degenerate extents (all points sharing one x or y value): the
        # original produced a zero cell size and raised ZeroDivisionError when
        # binning. A 1.0-wide fallback cell keeps the grid well-formed.
        cell_size_x = x_range / optimal_cells if x_range > 0 else 1.0
        cell_size_y = y_range / optimal_cells if y_range > 0 else 1.0
        # Create grid edges; guarantee at least one cell per axis.
        x_coords = np.arange(bounds[0], bounds[2] + cell_size_x, cell_size_x)
        y_coords = np.arange(bounds[1], bounds[3] + cell_size_y, cell_size_y)
        if len(x_coords) < 2:
            x_coords = np.array([bounds[0], bounds[0] + cell_size_x])
        if len(y_coords) < 2:
            y_coords = np.array([bounds[1], bounds[1] + cell_size_y])
        # Count points per cell (row = y bin, col = x bin).
        density_grid = np.zeros((len(y_coords) - 1, len(x_coords) - 1))
        for point in points_gdf.geometry:
            x_idx = int((point.x - bounds[0]) / cell_size_x)
            y_idx = int((point.y - bounds[1]) / cell_size_y)
            if 0 <= x_idx < len(x_coords) - 1 and 0 <= y_idx < len(y_coords) - 1:
                density_grid[y_idx, x_idx] += 1
        return {
            'density_grid': density_grid,
            'cell_size': (cell_size_x, cell_size_y),
            'bounds': bounds,
            'max_density': np.max(density_grid),
            'avg_density': np.mean(density_grid),
            'density_std': np.std(density_grid),
            'hotspots': self._identify_hotspots(density_grid)
        }

    def _identify_hotspots(self, density_grid, threshold_factor=1.5):
        """Identify unusually dense grid cells.

        This method was referenced by _density_analysis but never defined in
        the original, so any density analysis raised AttributeError.

        A cell is a hotspot when its count is non-zero and exceeds
        mean + threshold_factor * std of the whole grid.

        Args:
            density_grid: 2-D array of per-cell point counts.
            threshold_factor: Standard deviations above the mean a cell must
                reach to qualify.

        Returns:
            List of {'row', 'col', 'count'} dicts, one per hotspot cell.
        """
        grid = np.asarray(density_grid)
        if grid.size == 0:
            return []
        threshold = grid.mean() + threshold_factor * grid.std()
        rows, cols = np.where((grid > threshold) & (grid > 0))
        return [
            {'row': int(r), 'col': int(c), 'count': float(grid[r, c])}
            for r, c in zip(rows, cols)
        ]

    def _pattern_recognition(self, points):
        """Detect directional trends and pairwise-distance structure.

        Returns {} for fewer than 3 points.
        """
        if len(points) < 3:
            return {}
        points_gdf = gpd.GeoDataFrame(points)
        coords = np.array([[p.x, p.y] for p in points_gdf.geometry])
        patterns = {}
        # Directional pattern from consecutive point-to-point vectors —
        # meaningful mainly when the input order represents a trajectory.
        if len(coords) > 1:
            vectors = np.diff(coords, axis=0)
            angles = np.arctan2(vectors[:, 1], vectors[:, 0])
            # Circular statistics: mean resultant direction, and 1 - R where
            # R is the mean resultant length (small variance => aligned headings).
            mean_angle = np.arctan2(np.mean(np.sin(angles)), np.mean(np.cos(angles)))
            angular_variance = 1 - np.sqrt(np.mean(np.cos(angles))**2 + np.mean(np.sin(angles))**2)
            patterns['directional'] = {
                'mean_angle_rad': mean_angle,
                'mean_angle_deg': np.degrees(mean_angle),
                'angular_variance': angular_variance,
                'directional_trend': 'strong' if angular_variance < 0.3 else 'weak'
            }
        # Pairwise-distance statistics as a rough clustering indicator.
        if len(coords) > 2:
            # Vectorized all-pairs distances; triu_indices yields the same
            # i<j ordering as the former O(n^2) Python double loop.
            diff = coords[:, None, :] - coords[None, :, :]
            dist_matrix = np.sqrt((diff ** 2).sum(axis=2))
            iu = np.triu_indices(len(coords), k=1)
            distances = dist_matrix[iu]
            patterns['spatial_autocorrelation'] = {
                'avg_distance': np.mean(distances),
                'distance_std': np.std(distances),
                'clustering_tendency': 'high' if np.std(distances) < np.mean(distances) * 0.5 else 'low'
            }
        return patterns

    def _predictive_modeling(self, points, polygons):
        """Fit a random forest predicting each point's 'target_value'.

        Features are the point coordinates plus the distance to every polygon.
        Returns {} when there are fewer than 10 points or the targets are
        constant (nothing to learn).
        """
        if len(points) < 10:
            return {}
        points_gdf = gpd.GeoDataFrame(points)
        polygons_gdf = gpd.GeoDataFrame(polygons)
        features = []
        targets = []
        for idx, point in points_gdf.iterrows():
            point_features = [
                point.geometry.x,
                point.geometry.y,
            ]
            # Distance to each polygon as spatial-context features.
            for poly_idx, polygon in polygons_gdf.iterrows():
                point_features.append(point.geometry.distance(polygon.geometry))
            features.append(point_features)
            # Target defaults to 0 when a record lacks 'target_value'.
            targets.append(point.get('target_value', 0))
        # Train only when there is signal to learn from.
        if len(features) > 0 and len(set(targets)) > 1:
            model = RandomForestRegressor(n_estimators=100, random_state=42)
            model.fit(features, targets)
            return {
                'model': model,
                'feature_importance': model.feature_importances_,
                # NOTE(review): this is R^2 on the training set, which is
                # optimistic — use a held-out split for a real estimate.
                'prediction_accuracy': model.score(features, targets),
                'feature_names': ['x', 'y'] + [f'polygon_{i}_distance' for i in range(len(polygons_gdf))]
            }
        return {}

    def _create_advanced_visualization(self, points, polygons):
        """Build an interactive folium map of the inputs.

        Polygons are drawn as a styled GeoJson layer; points go into a marker
        cluster with their non-geometry attributes shown in popups.
        """
        points_gdf = gpd.GeoDataFrame(points)
        polygons_gdf = gpd.GeoDataFrame(polygons)
        # Map center from geometry centroids. The original read g.x/g.y
        # directly, which raises AttributeError for Polygons (only Points
        # expose .x/.y); a Point's centroid is the point itself.
        all_geoms = list(points_gdf.geometry) + list(polygons_gdf.geometry)
        center_lat = np.mean([g.centroid.y for g in all_geoms])
        center_lon = np.mean([g.centroid.x for g in all_geoms])
        # Create base map
        m = folium.Map(
            location=[center_lat, center_lon],
            zoom_start=12,
            tiles='cartodbpositron'
        )
        # Polygon layer with a fixed blue fill.
        if len(polygons_gdf) > 0:
            folium.GeoJson(
                polygons_gdf,
                name='Polygons',
                style_function=lambda x: {
                    'fillColor': '#3186cc',
                    'color': '#000000',
                    'weight': 2,
                    'fillOpacity': 0.3
                },
                popup=folium.GeoJsonPopup(fields=list(polygons_gdf.columns))
            ).add_to(m)
        # Clustered point markers with attribute popups.
        if len(points_gdf) > 0:
            marker_cluster = MarkerCluster(name='Points').add_to(m)
            for idx, point in points_gdf.iterrows():
                folium.Marker(
                    location=[point.geometry.y, point.geometry.x],
                    popup=folium.Popup(
                        f"<b>Point {idx}</b><br>" +
                        "<br>".join([f"{k}: {v}" for k, v in point.items() if k != 'geometry']),
                        parse_html=True
                    ),
                    icon=folium.Icon(color='red', icon='info-sign')
                ).add_to(marker_cluster)
        # Add layer control
        folium.LayerControl().add_to(m)
        return m

    def _generate_creative_insights(self, results):
        """Translate analysis metrics into human-readable insight sentences.

        Each insight phrase is matched verbatim by _generate_recommendations,
        so the wording here must stay in sync with that method.
        """
        insights = []
        # Proximity insights: thresholds assume distance units where 1000 is
        # "near" and 5000 is "far" (meters in the intended use).
        if 'proximity_analysis' in results:
            prox = results['proximity_analysis']
            if 'avg_distance' in prox:
                if prox['avg_distance'] < 1000:
                    insights.append("Points show high spatial clustering, suggesting concentrated activity patterns.")
                elif prox['avg_distance'] > 5000:
                    insights.append("Points are widely dispersed, indicating distributed or sparse activity.")
        # Density insights
        if 'density_analysis' in results:
            density = results['density_analysis']
            if 'hotspots' in density and len(density['hotspots']) > 0:
                insights.append(f"Identified {len(density['hotspots'])} high-density hotspots that may represent activity centers.")
        # Pattern insights
        if 'pattern_analysis' in results:
            pattern = results['pattern_analysis']
            if 'directional' in pattern:
                if pattern['directional']['directional_trend'] == 'strong':
                    insights.append("Strong directional pattern detected, suggesting linear or corridor-based activity.")
        # Predictive insights
        if 'predictive_analysis' in results:
            pred = results['predictive_analysis']
            if 'prediction_accuracy' in pred and pred['prediction_accuracy'] > 0.7:
                insights.append("High prediction accuracy suggests strong spatial relationships in the data.")
        return insights

    def _generate_recommendations(self, insights):
        """Map each insight sentence to an actionable recommendation.

        Matching is by substring against the phrases emitted by
        _generate_creative_insights; unknown insights yield nothing.
        """
        recommendations = []
        for insight in insights:
            if "high spatial clustering" in insight:
                recommendations.append("Consider implementing targeted interventions in clustered areas.")
            elif "widely dispersed" in insight:
                recommendations.append("Explore strategies for connecting dispersed activities or services.")
            elif "hotspots" in insight:
                recommendations.append("Focus resources on identified hotspots for maximum impact.")
            elif "directional pattern" in insight:
                recommendations.append("Investigate linear infrastructure or transportation corridors.")
            elif "high prediction accuracy" in insight:
                recommendations.append("Leverage predictive models for planning and resource allocation.")
        return recommendations
# Usage example
if __name__ == "__main__":
    # Build a small demo dataset: three points along a diagonal, each with a
    # 'value' attribute, and two adjacent unit-square zones.
    demo_points = [
        {'geometry': Point(x, y), 'value': v}
        for x, y, v in ((0, 0, 10), (1, 1, 20), (2, 2, 15))
        # Add more points...
    ]
    demo_polygons = [
        {
            'geometry': Polygon([(x0, y0), (x0 + 1, y0), (x0 + 1, y0 + 1), (x0, y0 + 1)]),
            'type': zone,
        }
        for (x0, y0), zone in (((0, 0), 'zone_a'), ((1, 1), 'zone_b'))
        # Add more polygons...
    ]
    # Run the full analysis pipeline and report the findings.
    engine = AdvancedSpatialIntelligence()
    analysis = engine.creative_spatial_analysis(demo_points, demo_polygons)
    print("Analysis completed!")
    print("Insights:", analysis['insights'])
    print("Recommendations:", analysis['recommendations'])