Este proyecto implementa un sistema de scraping distribuido para procesar grandes volúmenes de datos de empresas (100,000+) utilizando múltiples workers que pueden ejecutarse en diferentes máquinas. La solución utiliza Supabase como base de datos centralizada y Redis Cloud para coordinar el trabajo entre los diferentes nodos.
Arquitectura Distribuida del Sistema de Scraping Supabase Base de Datos Centralizada Almacena datos de empresas y resultados Redis Cloud Sistema de Colas Coordina tareas entre workers distribuidos Máquina 1 Worker 1 Worker 2 Máquina 2 Worker 3 Worker 4 Máquina 3 Worker 5 Worker 6 Dashboard Monitoreo Seguimiento en tiempo real Leyenda: Flujo de datos Consultas a BDEl sistema está compuesto por los siguientes componentes:
- Base de Datos Centralizada (Supabase): Almacena todos los datos de empresas y resultados de scraping.
- Sistema de Colas (Redis): Maneja la distribución y seguimiento de tareas.
- Worker Nodes: Nodos de procesamiento que ejecutan el scraping en paralelo.
- Dashboard de Monitoreo: Interfaz para seguir el progreso y rendimiento.
- Python 3.8+
- Cuenta en Supabase (Plan gratuito o de pago)
- Cuenta en Redis Cloud (Plan gratuito o de pago)
- Paquetes Python (ver
requirements.txt)
- Regístrate en Supabase
- Crea un nuevo proyecto
- Anota la URL, la clave API y los detalles de conexión PostgreSQL
- Regístrate en Redis Cloud
- Crea una base de datos (el plan gratuito es suficiente para empezar)
- Anota los detalles de conexión (host, puerto, contraseña)
Crea un archivo .env en la raíz del proyecto:
# Supabase
SUPABASE_URL=https://your-project-id.supabase.co
SUPABASE_KEY=your-anon-key
SUPABASE_SERVICE_KEY=your-service-key
SUPABASE_DB_HOST=your-project-id.supabase.co
SUPABASE_DB_PORT=5432
SUPABASE_DB_USER=postgres
SUPABASE_DB_PASSWORD=your-db-password
SUPABASE_DB_NAME=postgres
# Redis Cloud
REDIS_HOST=your-redis-host.redislabs.com
REDIS_PORT=15678
REDIS_PASSWORD=your-redis-password
REDIS_USERNAME=default
# Configuración general
MAX_WORKERS_PER_NODE=4
SCRAPING_RATE_LIMIT=60
Ejecuta el siguiente SQL en la consola SQL de Supabase:
CREATE TABLE sociedades (
id SERIAL PRIMARY KEY,
cod_infotel INTEGER NOT NULL,
nif VARCHAR(11),
razon_social VARCHAR(255),
domicilio VARCHAR(255),
cod_postal VARCHAR(5),
nom_poblacion VARCHAR(100),
nom_provincia VARCHAR(100),
url VARCHAR(255),
url_valida VARCHAR(255),
url_exists BOOLEAN DEFAULT FALSE NOT NULL,
url_limpia VARCHAR(255),
url_status INTEGER,
url_status_mensaje VARCHAR(255),
telefono_1 VARCHAR(16),
telefono_2 VARCHAR(16),
telefono_3 VARCHAR(16),
facebook VARCHAR(255),
twitter VARCHAR(255),
linkedin VARCHAR(255),
instagram VARCHAR(255),
youtube VARCHAR(255),
e_commerce BOOLEAN DEFAULT FALSE NOT NULL,
processed BOOLEAN DEFAULT FALSE NOT NULL,
worker_id VARCHAR(50),
fecha_actualizacion TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
deleted BOOLEAN DEFAULT FALSE
);
-- Crear índice único
CREATE UNIQUE INDEX idx_sociedades_cod_infotel
ON sociedades(cod_infotel);pip install -r requirements.txtHay dos opciones para cargar y encolar tareas:
Para cargar un archivo Excel o CSV y encolar las empresas para procesamiento:
python load_and_enqueue.py datos_empresas.xlsx --batch-size 1000Opciones:
--batch-size: Tamaño de lote para encolar (por defecto: 1000)--reset: Reiniciar todas las colas antes de cargar
Para encolar empresas directamente desde la base de datos:
python distributed_scraping.py enqueue --limit 1000Opciones:
--limit: Número máximo de empresas a encolar
Hay dos opciones para ejecutar workers:
Cada colaborador puede ejecutar uno o más workers en su máquina:
python worker.py --max-tasks 5000Opciones:
--max-tasks: Número máximo de tareas a procesar por worker (por defecto: sin límite)--idle-timeout: Tiempo máximo de espera cuando no hay tareas (segundos, por defecto: 60)
Para ejecutar múltiples workers en una misma máquina:
# En terminales/consolas separadas
python worker.py
python worker.py
python worker.py
python worker.pyPara ejecutar un worker con el script consolidado:
python distributed_scraping.py workerOpciones:
--worker-id: ID personalizado para el worker (por defecto: se genera automáticamente)--max-tasks: Número máximo de tareas a procesar (por defecto: sin límite)--idle-timeout: Tiempo máximo de espera en segundos (por defecto: 60)
Ejemplo con ID personalizado:
python distributed_scraping.py worker --worker-id "maquina1_angel"Modo consola (rico):
python monitor.py --refresh-rate 3Dashboard web con Streamlit:
streamlit run dashboard.py| Comando | Descripción |
|---|---|
python load_and_enqueue.py <archivo> |
Carga datos desde un archivo y encola tareas |
python distributed_scraping.py enqueue --limit N |
Encola N empresas desde la base de datos |
python distributed_scraping.py worker |
Ejecuta un worker con el script consolidado |
python monitor.py |
Monitorea el progreso en consola |
streamlit run dashboard.py |
Lanza el dashboard web |
-
Distribución Recomendada:
- 4-8 workers por máquina (dependiendo de los recursos)
- Cada worker puede procesar aproximadamente 2-5 empresas por minuto
- Con 4 personas ejecutando 4 workers cada una: ~16 workers = ~32-80 empresas/minuto
- Tiempo estimado: 21-52 horas para 100,000 empresas
-
Optimización de Rate Limiting:
- Ajustar el parámetro
SCRAPING_RATE_LIMITen.envsegún la capacidad de tu red - Monitorizarlo para evitar bloqueos de IP
- Ajustar el parámetro
-
Tolerancia a Fallos:
- El sistema está diseñado para manejar caídas de workers
- Las tareas se recuperarán automáticamente si un worker falla
├── redis_config.py # Configuración de Redis
├──supabase_config.py # Configuración de Supabase
├──config.py # Configuración de timeouts
├── database_supabase.py # Gestor de base de datos Supabase
├── db_validator.py # Validación de datos
├── dashboard.py # Dashboard web con Streamlit
├── distributed_scraping.py # Script consolidado (enqueue y worker)
├── load_and_enqueue.py # Script para cargar datos y encolar
├── monitor.py # Monitor en consola
├── scraping_flow.py # Lógica de scraping
├── task.py # Definición de tareas
├── task_manager.py # Gestor de colas de tareas
├── worker.py # Worker para procesamiento distribuido
├── requirements.txt # Dependencias del proyecto
└── .env # Variables de entorno (no incluir en git)
# Clonar el repositorio si aún no lo has hecho
git clone <url-del-repositorio>
cd <directorio-del-repositorio>
# Crear y cambiar a nueva rama
git checkout -b feature/distributed-architecture
# Añadir archivos nuevos
git add .
# Commit de cambios
git commit -m "Implementación de arquitectura distribuida con Supabase y Redis"
# Subir la rama al repositorio remoto
git push -u origin feature/distributed-architecture- Verifica que las credenciales en
.envsean correctas - Asegúrate de que tu IP esté permitida en la configuración de Redis Cloud
- Verifica que las credenciales en
.envsean correctas - Asegúrate de que la política de PostgreSQL permita conexiones externas
- Ejecuta
python monitor.pypara verificar el estado de las colas - Asegúrate de que haya tareas en la cola
scraper:pending
- Verifica la configuración de rate limiting
- Monitoriza el uso de recursos (CPU, memoria, red) en los nodos de worker
- Asegúrate de que la clase
TaskManagerinicializa correctamente el atributoworker_id - Verifica que estás pasando el
worker_idcorrectamente al crear elTaskManager
- Implementar autenticación en el dashboard
- Añadir funcionalidad de re-intentar tareas fallidas
- Desarrollar sistema de notificaciones (email, Slack, etc.)
- Implementar mecanismos avanzados de detección de proxy para evitar bloqueos