The most comprehensive Python SDK for e-commerce product data and pricing intelligence.
Access real-time product information, pricing data, and historical trends across thousands of retailers and millions of products with the official ShopSavvy Data API.
pip install shopsavvy-sdk
- π Visit shopsavvy.com/data
- π Sign up for a free account
- π³ Choose a subscription plan
- π Get your API key from the dashboard
from shopsavvy import create_client
# Initialize the client
api = create_client("ss_live_your_api_key_here")
# Look up any product by barcode, ASIN, or URL
product = api.get_product_details("012345678901")
print(f"π¦ {product.data.name} by {product.data.brand}")
# Get current prices from all retailers
offers = api.get_current_offers("012345678901")
cheapest = min(offers.data, key=lambda x: x.price)
print(f"π° Best price: ${cheapest.price} at {cheapest.retailer}")
# Set up price monitoring
api.schedule_product_monitoring("012345678901", "daily")
print("π Price alerts activated!")
Feature | Description | Use Cases |
---|---|---|
π Universal Product Lookup | Search by barcode, ASIN, URL, model number | Product catalogs, inventory management |
π² Real-Time Pricing | Current prices across major retailers | Price comparison, competitive analysis |
π Historical Data | Price trends and availability over time | Market research, pricing strategy |
π Smart Monitoring | Automated price tracking and alerts | Price drops, stock notifications |
πͺ Multi-Retailer Support | Amazon, Walmart, Target, Best Buy + more | Comprehensive market coverage |
β‘ Batch Operations | Process multiple products efficiently | Bulk analysis, data processing |
π‘οΈ Type Safety | Full Pydantic models with validation | Reliable data structures |
π Multiple Formats | JSON and CSV response options | Easy data integration |
pip install shopsavvy-sdk
git clone https://github.com/shopsavvy/sdk-python
cd sdk-python
pip install -e ".[dev]"
# Optional: Store your API key securely
export SHOPSAVVY_API_KEY="ss_live_your_api_key_here"
from shopsavvy import create_client
# Basic setup
api = create_client("ss_live_your_api_key_here")
# With custom timeout and base URL
api = create_client(
api_key="ss_live_your_api_key_here",
timeout=60.0,
base_url="https://api.shopsavvy.com/v1"
)
from shopsavvy import ShopSavvyDataAPI, ShopSavvyConfig
config = ShopSavvyConfig(
api_key="ss_live_your_api_key_here",
timeout=45.0
)
api = ShopSavvyDataAPI(config)
# Automatically closes connections when done
with create_client("ss_live_your_api_key_here") as api:
product = api.get_product_details("012345678901")
print(product.data.name)
# Connection automatically closed here
# Search by barcode (UPC/EAN)
product = api.get_product_details("012345678901")
# Search by Amazon ASIN
amazon_product = api.get_product_details("B08N5WRWNW")
# Search by product URL
url_product = api.get_product_details("https://www.amazon.com/dp/B08N5WRWNW")
# Search by model number
model_product = api.get_product_details("iPhone-14-Pro")
# Access product information
print(f"π¦ Product: {product.data.name}")
print(f"π·οΈ Brand: {product.data.brand}")
print(f"π Category: {product.data.category}")
print(f"π’ Product ID: {product.data.product_id}")
print(f"π· Image: {product.data.image_url}")
# Look up multiple products at once
identifiers = [
"012345678901", # Barcode
"B08N5WRWNW", # Amazon ASIN
"https://www.target.com/p/example", # URL
"MODEL-ABC123" # Model number
]
products = api.get_product_details_batch(identifiers)
for product in products.data:
print(f"β
Found: {product.name} by {product.brand}")
print(f" ID: {product.product_id}")
print(f" Category: {product.category}")
print("---")
# Get product data in CSV format for easy processing
product_csv = api.get_product_details("012345678901", format="csv")
# Process with pandas
import pandas as pd
import io
df = pd.read_csv(io.StringIO(product_csv.data))
print(df.head())
# Get prices from all retailers
offers = api.get_current_offers("012345678901")
print(f"Found {len(offers.data)} offers:")
for offer in offers.data:
print(f"πͺ {offer.retailer}: ${offer.price}")
print(f" π¦ Condition: {offer.condition}")
print(f" β
Available: {offer.availability}")
print(f" π Buy: {offer.url}")
if offer.shipping:
print(f" π Shipping: ${offer.shipping}")
print("---")
# Get offers from specific retailers
amazon_offers = api.get_current_offers("012345678901", retailer="amazon")
walmart_offers = api.get_current_offers("012345678901", retailer="walmart")
target_offers = api.get_current_offers("012345678901", retailer="target")
print("Amazon prices:")
for offer in amazon_offers.data:
print(f" ${offer.price} - {offer.condition}")
# Get current offers for multiple products
products = ["012345678901", "B08N5WRWNW", "045496596439"]
batch_offers = api.get_current_offers_batch(products)
for identifier, offers in batch_offers.data.items():
best_price = min(offers, key=lambda x: x.price) if offers else None
if best_price:
print(f"{identifier}: Best price ${best_price.price} at {best_price.retailer}")
else:
print(f"{identifier}: No offers found")
from datetime import datetime, timedelta
# Get 30 days of price history
end_date = datetime.now().strftime("%Y-%m-%d")
start_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
history = api.get_price_history("012345678901", start_date, end_date)
for offer in history.data:
print(f"πͺ {offer.retailer}:")
print(f" π° Current price: ${offer.price}")
print(f" π Historical points: {len(offer.price_history)}")
if offer.price_history:
prices = [point.price for point in offer.price_history]
print(f" π Lowest: ${min(prices)}")
print(f" π Highest: ${max(prices)}")
print(f" π Average: ${sum(prices) / len(prices):.2f}")
print("---")
# Get price history from Amazon only
amazon_history = api.get_price_history(
"012345678901",
"2024-01-01",
"2024-01-31",
retailer="amazon"
)
for offer in amazon_history.data:
print(f"Amazon price trends for {offer.retailer}:")
for point in offer.price_history[-10:]: # Last 10 data points
print(f" {point.date}: ${point.price} ({point.availability})")
# Monitor daily across all retailers
result = api.schedule_product_monitoring("012345678901", "daily")
if result.data.get("scheduled"):
print("β
Daily monitoring activated!")
# Monitor hourly at specific retailer
result = api.schedule_product_monitoring(
"012345678901",
"hourly",
retailer="amazon"
)
print(f"Amazon monitoring: {result.data}")
# Schedule multiple products for monitoring
products_to_monitor = [
"012345678901",
"B08N5WRWNW",
"045496596439"
]
batch_result = api.schedule_product_monitoring_batch(products_to_monitor, "daily")
for item in batch_result.data:
if item.get('scheduled'):
print(f"β
Monitoring activated for {item['identifier']}")
else:
print(f"β Failed to monitor {item['identifier']}")
# View all monitored products
scheduled = api.get_scheduled_products()
print(f"π Currently monitoring {len(scheduled.data)} products:")
for product in scheduled.data:
print(f"π {product.identifier}")
print(f" π
Frequency: {product.frequency}")
print(f" πͺ Retailer: {product.retailer or 'All retailers'}")
print(f" π
Created: {product.created_at}")
if product.last_refreshed:
print(f" π Last refresh: {product.last_refreshed}")
print("---")
# Remove products from monitoring
api.remove_product_from_schedule("012345678901")
print("ποΈ Removed from monitoring")
# Remove multiple products
api.remove_products_from_schedule(["012345678901", "B08N5WRWNW"])
print("ποΈ Batch removal complete")
# Check your API usage
usage = api.get_usage()
print("π API Usage Summary:")
print(f"π³ Plan: {usage.data.plan_name}")
print(f"β
Credits used: {usage.data.credits_used:,}")
print(f"π Credits remaining: {usage.data.credits_remaining:,}")
print(f"π Total credits: {usage.data.credits_total:,}")
print(f"π
Billing period: {usage.data.billing_period_start} to {usage.data.billing_period_end}")
# Calculate usage percentage
usage_percent = (usage.data.credits_used / usage.data.credits_total) * 100
print(f"π Usage: {usage_percent:.1f}%")
def find_best_deals(identifier: str, max_results: int = 5):
"""Find the best deals for a product across all retailers"""
try:
# Get product info
product = api.get_product_details(identifier)
print(f"π Searching deals for: {product.data.name}")
print(f"π¦ Brand: {product.data.brand}")
print("=" * 50)
# Get all current offers
offers = api.get_current_offers(identifier)
if not offers.data:
print("β No offers found")
return
# Filter and sort offers
available_offers = [
offer for offer in offers.data
if offer.availability == "in_stock"
]
if not available_offers:
print("β No in-stock offers found")
return
# Sort by total cost (price + shipping)
def total_cost(offer):
return offer.price + (offer.shipping or 0)
sorted_offers = sorted(available_offers, key=total_cost)[:max_results]
print(f"π Top {len(sorted_offers)} Deals:")
for i, offer in enumerate(sorted_offers, 1):
total = total_cost(offer)
print(f"{i}. πͺ {offer.retailer}")
print(f" π° Price: ${offer.price}")
if offer.shipping:
print(f" π Shipping: ${offer.shipping}")
print(f" π³ Total: ${total}")
print(f" π¦ Condition: {offer.condition}")
print(f" π Buy now: {offer.url}")
print("---")
# Calculate savings
if len(sorted_offers) > 1:
cheapest = total_cost(sorted_offers[0])
most_expensive = total_cost(sorted_offers[-1])
savings = most_expensive - cheapest
print(f"π° Potential savings: ${savings:.2f}")
except Exception as e:
print(f"β Error: {e}")
# Usage
find_best_deals("012345678901")
import time
from datetime import datetime
class PriceAlertBot:
def __init__(self, api_client):
self.api = api_client
self.alerts = {} # identifier -> target_price
def add_alert(self, identifier: str, target_price: float):
"""Add a price alert for a product"""
self.alerts[identifier] = target_price
# Schedule monitoring
self.api.schedule_product_monitoring(identifier, "daily")
print(f"π Alert set: {identifier} @ ${target_price}")
def check_alerts(self):
"""Check all price alerts"""
print(f"π Checking {len(self.alerts)} price alerts...")
for identifier, target_price in self.alerts.items():
try:
offers = self.api.get_current_offers(identifier)
if not offers.data:
continue
# Find best available offer
best_offer = min(
[o for o in offers.data if o.availability == "in_stock"],
key=lambda x: x.price,
default=None
)
if best_offer and best_offer.price <= target_price:
self.trigger_alert(identifier, best_offer, target_price)
except Exception as e:
print(f"β Error checking {identifier}: {e}")
def trigger_alert(self, identifier: str, offer, target_price: float):
"""Trigger price alert notification"""
product = self.api.get_product_details(identifier)
print("π¨" * 10)
print("π° PRICE ALERT TRIGGERED!")
print(f"π¦ Product: {product.data.name}")
print(f"π― Target: ${target_price}")
print(f"πΈ Current: ${offer.price} at {offer.retailer}")
print(f"β
Savings: ${target_price - offer.price:.2f}")
print(f"π Buy now: {offer.url}")
print("π¨" * 10)
# Remove alert after triggering
del self.alerts[identifier]
# Usage
alert_bot = PriceAlertBot(api)
alert_bot.add_alert("012345678901", 199.99)
alert_bot.add_alert("B08N5WRWNW", 299.99)
# Run periodic checks
alert_bot.check_alerts()
import statistics
from collections import defaultdict
def analyze_market_trends(identifiers: list, days: int = 30):
"""Comprehensive market analysis for multiple products"""
from datetime import datetime, timedelta
end_date = datetime.now().strftime("%Y-%m-%d")
start_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
print(f"π Market Analysis Report ({days} days)")
print("=" * 50)
for identifier in identifiers:
try:
# Get product info
product = api.get_product_details(identifier)
print(f"\\nπ¦ {product.data.name}")
print(f"π·οΈ {product.data.brand} | {product.data.category}")
print("-" * 40)
# Get price history
history = api.get_price_history(identifier, start_date, end_date)
retailer_stats = {}
for offer in history.data:
if not offer.price_history:
continue
prices = [point.price for point in offer.price_history]
retailer_stats[offer.retailer] = {
'current_price': offer.price,
'avg_price': statistics.mean(prices),
'min_price': min(prices),
'max_price': max(prices),
'volatility': statistics.stdev(prices) if len(prices) > 1 else 0,
'data_points': len(prices),
'trend': calculate_trend(prices)
}
# Display results
if retailer_stats:
print("πͺ Retailer Analysis:")
for retailer, stats in sorted(retailer_stats.items()):
print(f" {retailer}:")
print(f" π° Current: ${stats['current_price']}")
print(f" π Average: ${stats['avg_price']:.2f}")
print(f" π Min: ${stats['min_price']} | π Max: ${stats['max_price']}")
print(f" π Trend: {stats['trend']}")
print(f" π Data points: {stats['data_points']}")
# Find best value
best_retailer = min(retailer_stats.items(), key=lambda x: x[1]['current_price'])
print(f"\\nπ Best Price: {best_retailer[0]} @ ${best_retailer[1]['current_price']}")
else:
print("β No price history available")
except Exception as e:
print(f"β Error analyzing {identifier}: {e}")
def calculate_trend(prices: list) -> str:
"""Calculate price trend direction"""
if len(prices) < 2:
return "Unknown"
recent = prices[-7:] # Last week
older = prices[:-7] # Everything else
if not older:
return "New"
recent_avg = statistics.mean(recent)
older_avg = statistics.mean(older)
if recent_avg > older_avg * 1.05: # 5% threshold
return "π Rising"
elif recent_avg < older_avg * 0.95:
return "π Falling"
else:
return "β‘οΈ Stable"
# Usage
products_to_analyze = [
"012345678901",
"B08N5WRWNW",
"045496596439"
]
analyze_market_trends(products_to_analyze, days=60)
def bulk_product_manager(csv_file_path: str):
"""Manage products from CSV file"""
import csv
print("π Loading products from CSV...")
products = []
with open(csv_file_path, 'r') as file:
reader = csv.DictReader(file)
for row in reader:
products.append({
'identifier': row['identifier'],
'target_price': float(row.get('target_price', 0)),
'monitor': row.get('monitor', 'true').lower() == 'true'
})
print(f"π Processing {len(products)} products...")
# Batch lookup
identifiers = [p['identifier'] for p in products]
try:
product_details = api.get_product_details_batch(identifiers)
current_offers = api.get_current_offers_batch(identifiers)
results = []
for product, details in zip(products, product_details.data):
offers = current_offers.data.get(product['identifier'], [])
best_price = min([o.price for o in offers if o.availability == "in_stock"], default=None)
result = {
'identifier': product['identifier'],
'name': details.name,
'brand': details.brand,
'target_price': product['target_price'],
'current_best_price': best_price,
'price_alert': best_price <= product['target_price'] if best_price else False,
'offers_count': len(offers)
}
results.append(result)
# Setup monitoring if requested
if product['monitor']:
api.schedule_product_monitoring(product['identifier'], "daily")
# Generate report
print("\\nπ Bulk Analysis Report:")
print("=" * 80)
for result in results:
status = "π¨ ALERT" if result['price_alert'] else "π TRACKING"
print(f"{status} | {result['name']} by {result['brand']}")
print(f" π― Target: ${result['target_price']} | π° Current: ${result['current_best_price'] or 'N/A'}")
print(f" πͺ Offers: {result['offers_count']}")
print()
except Exception as e:
print(f"β Error processing bulk products: {e}")
# Usage
# bulk_product_manager("my_products.csv")
def export_product_data(identifiers: list, format: str = "json"):
"""Export product data in various formats"""
import json
import csv
from datetime import datetime
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
if format.lower() == "json":
# Export as JSON
products = api.get_product_details_batch(identifiers)
offers = api.get_current_offers_batch(identifiers)
export_data = {
"exported_at": datetime.now().isoformat(),
"products": []
}
for product in products.data:
product_offers = offers.data.get(product.product_id, [])
export_data["products"].append({
"product": product.dict(),
"offers": [offer.dict() for offer in product_offers]
})
filename = f"shopsavvy_export_{timestamp}.json"
with open(filename, 'w') as f:
json.dump(export_data, f, indent=2)
print(f"β
Exported {len(products.data)} products to {filename}")
elif format.lower() == "csv":
# Export as CSV
filename = f"shopsavvy_export_{timestamp}.csv"
with open(filename, 'w', newline='') as csvfile:
fieldnames = ['product_id', 'name', 'brand', 'category', 'barcode',
'retailer', 'price', 'availability', 'condition', 'url']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for identifier in identifiers:
try:
product = api.get_product_details(identifier)
offers = api.get_current_offers(identifier)
for offer in offers.data:
writer.writerow({
'product_id': product.data.product_id,
'name': product.data.name,
'brand': product.data.brand,
'category': product.data.category,
'barcode': product.data.barcode,
'retailer': offer.retailer,
'price': offer.price,
'availability': offer.availability,
'condition': offer.condition,
'url': offer.url
})
except Exception as e:
print(f"β Error exporting {identifier}: {e}")
print(f"β
Exported data to {filename}")
# Usage
export_product_data(["012345678901", "B08N5WRWNW"], format="json")
export_product_data(["012345678901", "B08N5WRWNW"], format="csv")
from shopsavvy import (
APIError,
AuthenticationError,
RateLimitError,
NotFoundError,
ValidationError,
TimeoutError,
NetworkError
)
def robust_product_lookup(identifier: str):
"""Example of robust error handling"""
try:
product = api.get_product_details(identifier)
offers = api.get_current_offers(identifier)
print(f"β
Success: {product.data.name}")
print(f"π° Found {len(offers.data)} offers")
return product, offers
except AuthenticationError:
print("β Authentication failed - check your API key")
print("π Get your key at: https://shopsavvy.com/data/dashboard")
except NotFoundError:
print(f"β Product not found: {identifier}")
print("π‘ Try a different identifier (barcode, ASIN, URL)")
except RateLimitError:
print("β³ Rate limit exceeded - please slow down")
print("π‘ Consider upgrading your plan for higher limits")
time.sleep(60) # Wait before retrying
except ValidationError as e:
print(f"β Invalid request: {e}")
print("π‘ Check your parameters and try again")
except TimeoutError:
print("β±οΈ Request timeout - API might be slow")
print("π‘ Try increasing timeout or retry later")
except NetworkError as e:
print(f"π Network error: {e}")
print("π‘ Check your internet connection")
except APIError as e:
print(f"π¨ API Error: {e}")
print("π‘ This might be a temporary issue")
return None, None
# Usage with retry logic
def lookup_with_retry(identifier: str, max_retries: int = 3):
"""Lookup with automatic retry on failures"""
for attempt in range(max_retries):
try:
return api.get_product_details(identifier)
except (TimeoutError, NetworkError) as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff
print(f"β³ Retry {attempt + 1}/{max_retries} in {wait_time}s...")
time.sleep(wait_time)
else:
raise e
import time
from functools import wraps
def rate_limit(calls_per_second: float = 10):
"""Decorator to rate limit function calls"""
min_interval = 1.0 / calls_per_second
last_called = [0.0]
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_called[0]
left_to_wait = min_interval - elapsed
if left_to_wait > 0:
time.sleep(left_to_wait)
ret = func(*args, **kwargs)
last_called[0] = time.time()
return ret
return wrapper
return decorator
# Rate-limited API calls
@rate_limit(calls_per_second=5) # Max 5 calls per second
def safe_get_offers(identifier: str):
return api.get_current_offers(identifier)
# Batch processing with rate limiting
def process_products_safely(identifiers: list):
"""Process products with automatic rate limiting"""
results = []
total = len(identifiers)
for i, identifier in enumerate(identifiers, 1):
print(f"π Processing {i}/{total}: {identifier}")
try:
offers = safe_get_offers(identifier)
results.append((identifier, offers))
print(f" β
Found {len(offers.data)} offers")
except Exception as e:
print(f" β Error: {e}")
results.append((identifier, None))
return results
import os
from dataclasses import dataclass
@dataclass
class ShopSavvySettings:
"""Application settings"""
api_key: str
timeout: float = 30.0
max_retries: int = 3
rate_limit: float = 10.0 # calls per second
@classmethod
def from_env(cls):
"""Load settings from environment variables"""
api_key = os.getenv("SHOPSAVVY_API_KEY")
if not api_key:
raise ValueError("SHOPSAVVY_API_KEY environment variable required")
return cls(
api_key=api_key,
timeout=float(os.getenv("SHOPSAVVY_TIMEOUT", "30.0")),
max_retries=int(os.getenv("SHOPSAVVY_MAX_RETRIES", "3")),
rate_limit=float(os.getenv("SHOPSAVVY_RATE_LIMIT", "10.0"))
)
# Usage
settings = ShopSavvySettings.from_env()
api = create_client(settings.api_key, timeout=settings.timeout)
# Install development dependencies
pip install -e ".[dev]"
# Run all tests
pytest
# Run with coverage
pytest --cov=shopsavvy
# Run specific test file
pytest tests/test_client.py
# Run with verbose output
pytest -v
# Format code
black src tests
# Sort imports
isort src tests
# Type checking
mypy src
# Linting
flake8 src tests
import pytest
from unittest.mock import Mock, patch
from shopsavvy import create_client, AuthenticationError
def test_client_creation():
"""Test client creation with valid API key"""
api = create_client("ss_test_valid_key")
assert api is not None
def test_invalid_api_key():
"""Test invalid API key handling"""
with pytest.raises(ValueError):
create_client("invalid_key_format")
@patch('shopsavvy.client.httpx.Client.request')
def test_product_lookup(mock_request):
"""Test product lookup with mocked response"""
# Mock successful response
mock_response = Mock()
mock_response.status_code = 200
mock_response.is_success = True
mock_response.json.return_value = {
"success": True,
"data": {
"product_id": "12345",
"name": "Test Product",
"brand": "Test Brand"
}
}
mock_request.return_value = mock_response
# Test the client
api = create_client("ss_test_valid_key")
product = api.get_product_details("012345678901")
assert product.success is True
assert product.data.name == "Test Product"
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy application
COPY . .
# Set environment variables
ENV SHOPSAVVY_API_KEY="your_api_key_here"
ENV SHOPSAVVY_TIMEOUT="30.0"
# Run application
CMD ["python", "app.py"]
import json
import os
from shopsavvy import create_client
# Initialize client outside handler for connection reuse
api = create_client(os.environ['SHOPSAVVY_API_KEY'])
def lambda_handler(event, context):
"""AWS Lambda handler for product lookup"""
try:
identifier = event.get('identifier')
if not identifier:
return {
'statusCode': 400,
'body': json.dumps({'error': 'identifier required'})
}
# Get product data
product = api.get_product_details(identifier)
offers = api.get_current_offers(identifier)
# Find best price
best_offer = min(offers.data, key=lambda x: x.price) if offers.data else None
response = {
'product': {
'name': product.data.name,
'brand': product.data.brand,
'category': product.data.category
},
'best_price': {
'price': best_offer.price,
'retailer': best_offer.retailer,
'url': best_offer.url
} if best_offer else None,
'total_offers': len(offers.data)
}
return {
'statusCode': 200,
'body': json.dumps(response)
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
# Required
export SHOPSAVVY_API_KEY="ss_live_your_api_key_here"
# Optional
export SHOPSAVVY_TIMEOUT="30.0"
export SHOPSAVVY_BASE_URL="https://api.shopsavvy.com/v1"
export SHOPSAVVY_MAX_RETRIES="3"
class EcommerceIntegration:
"""Integration with e-commerce platforms"""
def __init__(self, api_key: str):
self.api = create_client(api_key)
def enrich_product_catalog(self, product_skus: list):
"""Enrich existing product catalog with market data"""
enriched_products = []
for sku in product_skus:
try:
# Get competitive pricing
offers = self.api.get_current_offers(sku)
competitor_prices = [
offer.price for offer in offers.data
if offer.retailer != "your-store"
]
enrichment = {
'sku': sku,
'competitor_count': len(competitor_prices),
'min_competitor_price': min(competitor_prices) if competitor_prices else None,
'avg_competitor_price': sum(competitor_prices) / len(competitor_prices) if competitor_prices else None,
'price_position': self.calculate_price_position(sku, competitor_prices)
}
enriched_products.append(enrichment)
except Exception as e:
print(f"β Error enriching {sku}: {e}")
return enriched_products
def calculate_price_position(self, sku: str, competitor_prices: list) -> str:
"""Calculate where your price stands vs competitors"""
if not competitor_prices:
return "no_competition"
your_price = self.get_your_price(sku) # Your implementation
if not your_price:
return "unknown"
cheaper_count = sum(1 for price in competitor_prices if price < your_price)
total_competitors = len(competitor_prices)
if cheaper_count == 0:
return "most_expensive"
elif cheaper_count == total_competitors:
return "cheapest"
elif cheaper_count < total_competitors / 3:
return "premium"
elif cheaper_count > total_competitors * 2/3:
return "budget"
else:
return "competitive"
class BusinessIntelligenceDashboard:
"""BI dashboard for retail insights"""
def __init__(self, api_key: str):
self.api = create_client(api_key)
def generate_market_report(self, category: str, time_period: int = 30):
"""Generate comprehensive market report"""
from datetime import datetime, timedelta
# Get category products (you'd have your own product database)
category_products = self.get_category_products(category)
report = {
'category': category,
'analysis_date': datetime.now().isoformat(),
'time_period_days': time_period,
'products_analyzed': len(category_products),
'insights': {}
}
# Analyze each product
price_trends = []
retailer_coverage = defaultdict(int)
availability_stats = defaultdict(int)
for product_id in category_products:
try:
# Get current market state
offers = self.api.get_current_offers(product_id)
for offer in offers.data:
retailer_coverage[offer.retailer] += 1
availability_stats[offer.availability] += 1
# Get price trends
start_date = (datetime.now() - timedelta(days=time_period)).strftime("%Y-%m-%d")
end_date = datetime.now().strftime("%Y-%m-%d")
history = self.api.get_price_history(product_id, start_date, end_date)
for offer_history in history.data:
if offer_history.price_history:
prices = [p.price for p in offer_history.price_history]
trend = self.calculate_trend_percentage(prices)
price_trends.append(trend)
except Exception as e:
print(f"β Error analyzing {product_id}: {e}")
# Compile insights
report['insights'] = {
'avg_price_trend': sum(price_trends) / len(price_trends) if price_trends else 0,
'top_retailers': dict(sorted(retailer_coverage.items(), key=lambda x: x[1], reverse=True)[:10]),
'availability_breakdown': dict(availability_stats),
'market_volatility': statistics.stdev(price_trends) if len(price_trends) > 1 else 0
}
return report
from flask import Flask, jsonify, request
from shopsavvy import create_client
app = Flask(__name__)
api = create_client(os.environ['SHOPSAVVY_API_KEY'])
@app.route('/api/product/scan', methods=['POST'])
def scan_product():
"""Handle barcode scans from mobile app"""
data = request.get_json()
barcode = data.get('barcode')
if not barcode:
return jsonify({'error': 'Barcode required'}), 400
try:
# Get product details
product = api.get_product_details(barcode)
# Get current offers
offers = api.get_current_offers(barcode)
# Find best deals
available_offers = [o for o in offers.data if o.availability == "in_stock"]
best_offer = min(available_offers, key=lambda x: x.price) if available_offers else None
response = {
'product': {
'name': product.data.name,
'brand': product.data.brand,
'image_url': product.data.image_url,
'category': product.data.category
},
'pricing': {
'best_price': best_offer.price if best_offer else None,
'best_retailer': best_offer.retailer if best_offer else None,
'buy_url': best_offer.url if best_offer else None,
'total_offers': len(available_offers),
'all_offers': [
{
'retailer': offer.retailer,
'price': offer.price,
'availability': offer.availability,
'condition': offer.condition,
'url': offer.url
}
for offer in available_offers[:5] # Top 5 offers
]
}
}
return jsonify(response)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/product/alerts', methods=['POST'])
def create_price_alert():
"""Create price alert for mobile users"""
data = request.get_json()
product_id = data.get('product_id')
target_price = data.get('target_price')
user_id = data.get('user_id') # Your user system
try:
# Schedule monitoring
result = api.schedule_product_monitoring(product_id, "daily")
# Store alert in your database
# store_price_alert(user_id, product_id, target_price)
return jsonify({
'success': True,
'message': 'Price alert created successfully',
'monitoring_active': result.data.get('scheduled', False)
})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(debug=True)
We welcome contributions! See CONTRIBUTING.md for guidelines.
- Fork the repository
- Create a feature branch:
git checkout -b feature/amazing-feature
- Make your changes with tests
- Run the test suite:
pytest
- Submit a pull request
Resource | Link | Description |
---|---|---|
π API Documentation | shopsavvy.com/data/documentation | Complete API reference |
π Dashboard | shopsavvy.com/data/dashboard | Manage your API keys and usage |
π¬ Support | [email protected] | Get help from our team |
π Issues | GitHub Issues | Report bugs and request features |
π¦ PyPI | pypi.org/project/shopsavvy-sdk | Python package repository |
π Changelog | GitHub Releases | Version history and updates |
This project is licensed under the MIT License - see the LICENSE file for details.
ShopSavvy has been helping shoppers save money since 2008. With over 40 million downloads and millions of active users, we're the most trusted name in price comparison and shopping intelligence.
Our Data API provides the same powerful product data and pricing intelligence that powers our consumer app, now available to developers and businesses worldwide.
- β 13+ Years of e-commerce data expertise
- β Millions of Products across thousands of retailers
- β Real-Time Data updated continuously
- β Enterprise Scale trusted by major brands
- β Developer Friendly with comprehensive tools and support
π Ready to get started? Get your API key and start building amazing e-commerce applications today!
π¬ Need help? Contact us at [email protected] or visit shopsavvy.com/data for more information.