parse-number/main.py

419 lines
16 KiB
Python
Raw Normal View History

2026-03-10 15:59:20 +03:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
📞 DuckDuckGo/Yandex Phone Number Scraper v2.2
Extracts links, follows redirects (including yabs), parses phone numbers, saves to Excel.
+ Fixes: PoolTimeout handling, exponential back-off, adaptive limits, randomization
"""
import re
import sys
import asyncio
import random
import httpx
from pathlib import Path
from urllib.parse import urlparse
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill
from datetime import datetime
import argparse
import logging
# === Специфичные исключения httpx ===
from httpx import PoolTimeout, ConnectTimeout, ReadTimeout, HTTPStatusError, RequestError
from config import CONFIG
from link_collector import collect_links
# Suppress httpx info logs
logging.getLogger("httpx").setLevel(logging.WARNING)
# Pre-compile the phone regexes once at import time (they are used in a hot loop).
TEL_PATTERNS = [re.compile(p, re.IGNORECASE) for p in CONFIG["phone"]["patterns"]]
# Pool of desktop User-Agent strings, rotated per request to look less bot-like.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
]
def normalize_domain(url: str) -> str:
    """Normalize a URL's domain for duplicate checks.

    Returns the lowercase hostname with a leading ``www.`` prefix removed,
    or '' when the URL has no parseable hostname.
    """
    try:
        host = urlparse(url.strip()).hostname or ''
    except Exception:
        return ''
    # Bug fix: the old ``replace('www.', '', 1)`` stripped the first
    # occurrence of "www." ANYWHERE in the host (e.g. "mywww.site.ru" ->
    # "mysite.ru"); only a leading prefix should be removed.
    return host.lower().removeprefix('www.')
def is_excluded(domain: str) -> bool:
    """Report whether *domain* is on the configured exclusion list.

    Matching is EXACT — subdomains of excluded domains are intentionally
    not treated as excluded.
    """
    excluded_set = CONFIG["excluded_domains"]
    return domain in excluded_set
def normalize_phone(phone: str) -> str | None:
"""Нормализация телефона к формату: +7 (XXX) XXX-XX-XX"""
digits = re.sub(r"[^\d+]", "", phone.strip())
if digits.startswith('+7'):
digits = digits[2:]
elif digits.startswith('8') and len(digits) == 11:
digits = digits[1:]
elif digits.startswith('7') and len(digits) == 11:
digits = digits[1:]
if len(digits) != 10:
return None
return f"+7 ({digits[:3]}) {digits[3:6]}-{digits[6:8]}-{digits[8:10]}"
def extract_phone_from_html(html: str) -> str | None:
    """Search *html* against every configured phone pattern.

    Returns the first match that normalizes successfully (see
    normalize_phone), or None when no pattern yields a valid number.
    """
    for pattern in TEL_PATTERNS:
        # Bug fix: the old code used pattern.search(), so when the FIRST
        # match failed normalization, every later match of that pattern was
        # silently skipped.  Iterate all matches instead.
        for match in pattern.finditer(html):
            # Patterns are expected to capture the number in group 1; fall
            # back to the whole match for group-less patterns instead of
            # raising IndexError.
            raw = (match.group(1) if match.lastindex else match.group(0)).strip()
            normalized = normalize_phone(raw)
            if normalized:
                return normalized
    return None
def check_content_filters(html: str) -> bool:
    """Apply keyword filters to a page body (case-insensitive).

    The page passes when it contains at least one of the configured
    ``required_keywords`` (if any are set) and none of the
    ``stop_keywords``.
    """
    haystack = html.lower()
    required = CONFIG["required_keywords"]
    if required and not any(kw.lower() in haystack for kw in required):
        return False
    stops = CONFIG["stop_keywords"]
    if stops and any(kw.lower() in haystack for kw in stops):
        return False
    return True
def analyze_redirect_chain(url: str, final_url: str) -> tuple[str, bool]:
    """Analyze a redirect chain.

    Returns ``(final_domain, is_promo)`` where *final_domain* is the
    normalized hostname of *final_url* and *is_promo* flags requests that
    originated from the Yandex ad redirector (yabs.yandex.ru).
    On any parse failure, falls back to the original URL's domain with
    ``is_promo = False``.
    """
    try:
        original_host = urlparse(url.strip()).hostname or ''
        is_promo = (original_host == 'yabs.yandex.ru')
        final_host = urlparse(final_url.strip()).hostname or ''
        # Bug fix: strip only a LEADING "www." prefix; the old
        # replace('www.', '', 1) mangled hosts like "mywww.site.ru".
        return final_host.lower().removeprefix('www.'), is_promo
    except Exception:
        return normalize_domain(url), False
def _get_client_config(url: str) -> dict:
    """Pick HTTP client settings for the target domain.

    Yandex hosts get a gentler profile (larger pool, longer pool timeout,
    slower back-off, fewer retries) to reduce the chance of throttling;
    everything else uses the defaults from CONFIG.
    """
    yandex_markers = ('yandex.ru', 'yabs.yandex.ru', 'ya.ru')
    target = url.lower()
    if any(marker in target for marker in yandex_markers):
        return {
            "limits": httpx.Limits(max_connections=30, max_keepalive_connections=20),
            "timeout": httpx.Timeout(connect=10.0, read=30.0, write=10.0, pool=30.0),
            "retry_base_delay": 2.0,
            "max_retries": 2,
        }
    return {
        "limits": httpx.Limits(max_connections=20, max_keepalive_connections=10),
        "timeout": httpx.Timeout(connect=10.0, read=30.0, write=10.0, pool=10.0),
        "retry_base_delay": 1.0,
        "max_retries": CONFIG["http"]["retry_attempts"],
    }
async def fetch_with_retry(client: httpx.AsyncClient, url: str, retries: int = 0,
                           base_delay: float = 1.0, max_retries: int = 3) -> tuple[str, str | None, str | None, bool]:
    """Fetch *url* (following redirects) and extract a phone number.

    Returns ``(original_url, phone_or_None, final_domain, is_promo)``.
    Transient failures are retried with exponential back-off plus jitter;
    this function never raises — every failure collapses into a
    ``(url, None, domain, False)`` result so the batch keeps going.
    """

    def _give_up() -> tuple[str, str | None, str | None, bool]:
        # Uniform "failed" result.
        return url, None, normalize_domain(url), False

    async def _backoff_retry() -> tuple[str, str | None, str | None, bool]:
        # Exponential back-off with jitter, then recurse with retries + 1.
        await asyncio.sleep(base_delay * (2 ** retries) + random.uniform(0.5, 1.5))
        return await fetch_with_retry(client, url, retries + 1, base_delay, max_retries)

    try:
        # Rotate the User-Agent on every request.
        headers = {
            "User-Agent": random.choice(USER_AGENTS),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Cache-Control": "max-age=0",
        }
        async with client.stream("GET", url, headers=headers, follow_redirects=True) as response:
            if response.status_code >= 400:
                raise HTTPStatusError(
                    f"Status {response.status_code}",
                    request=response.request,
                    response=response
                )
            final_url = str(response.url)
            buffer = []
            async for chunk in response.aiter_text(chunk_size=8192):
                buffer.append(chunk)
            full_html = ''.join(buffer)
            final_domain, is_promo = analyze_redirect_chain(url, final_url)
            if not check_content_filters(full_html):
                return url, None, final_domain, is_promo
            phone = extract_phone_from_html(full_html)
            return url, phone, final_domain, is_promo
    except PoolTimeout:
        # Pool exhaustion: cap at 2 retries regardless of max_retries.
        if retries < min(2, max_retries):
            return await _backoff_retry()
        return _give_up()
    except ConnectTimeout:
        if retries < max_retries:
            return await _backoff_retry()
        return _give_up()
    except ReadTimeout:
        # The server accepted the connection but is too slow; not worth retrying.
        return _give_up()
    except HTTPStatusError as e:
        # Bug fix: 429 retries are now bounded by max_retries (the old code
        # recursed unconditionally — unbounded recursion on a persistent
        # 429) and a non-numeric Retry-After no longer raises ValueError.
        if e.response.status_code == 429 and retries < max_retries:
            try:
                wait = int(e.response.headers.get('Retry-After', '5'))
            except ValueError:
                wait = 5  # Retry-After may also be an HTTP-date; use a flat delay
            await asyncio.sleep(wait + random.randint(1, 3))
            return await fetch_with_retry(client, url, retries + 1, base_delay, max_retries)
        return _give_up()
    except RequestError:
        if retries < max_retries:
            return await _backoff_retry()
        return _give_up()
    except Exception:
        # Catch-all keeps the whole batch alive; same bounded retry policy.
        if retries < max_retries:
            return await _backoff_retry()
        return _give_up()
async def process_batch(urls: list[str], batch_size: int = 50, progress_callback=None, unique_phones: set | None = None):
    """Process URLs in batches with bounded concurrency and adaptive settings.

    A fresh AsyncClient is created per batch, configured from the batch's
    FIRST URL (gentler limits for Yandex hosts — see _get_client_config).

    :param urls: URLs to fetch.
    :param batch_size: maximum number of concurrent requests per batch.
    :param progress_callback: async callable ``(done: int, total: int)``
        invoked after each processed URL (and once more at the very end).
    :param unique_phones: optional set that collects unique phone numbers
        incrementally as they are found.
    :return: list of ``(original_url, phone, final_domain, is_promo)``
        tuples for the URLs where a phone number was found.
    """
    results = []
    total_urls = len(urls)
    for i in range(0, len(urls), batch_size):
        batch = urls[i:i + batch_size]
        sample_url = batch[0] if batch else ""
        client_config = _get_client_config(sample_url)
        async with httpx.AsyncClient(
            headers={"User-Agent": random.choice(USER_AGENTS)},
            timeout=client_config["timeout"],
            follow_redirects=True,
            limits=client_config["limits"]
        ) as client:
            tasks = [
                fetch_with_retry(
                    client, url,
                    base_delay=client_config["retry_base_delay"],
                    max_retries=client_config["max_retries"]
                )
                for url in batch
            ]
            # return_exceptions=True: one failed task must not kill the batch.
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            for j, result in enumerate(batch_results):
                current_idx = i + j + 1
                if isinstance(result, Exception):
                    # Failed URL: still report progress, then skip it.
                    if progress_callback and callable(progress_callback):
                        await progress_callback(current_idx, total_urls)
                    continue
                original_url, phone, final_domain, is_promo = result
                print(f"{current_idx}: {final_domain} - {phone if phone else 'нет'}")
                if phone:
                    results.append((original_url, phone, final_domain, is_promo))
                    if unique_phones is not None:
                        unique_phones.add(phone)  # incrementally record the unique number
                if progress_callback and callable(progress_callback):
                    await progress_callback(current_idx, total_urls)
        if i + batch_size < len(urls):
            # Random pause between batches to look less bot-like.
            delay = random.uniform(1.0, 2.0)
            await asyncio.sleep(delay)
    if progress_callback and callable(progress_callback):
        await progress_callback(total_urls, total_urls)
    return results
def save_to_excel(results: list[tuple], filepath: str):
    """Save results to a styled Excel workbook.

    Accepts 4-tuples ``(url, phone, final_domain, is_promo)`` — the shape
    process_batch actually produces — or 5-tuples with a trailing rating.
    Promo rows (yabs.yandex.ru origin) are highlighted green.  Parent
    directories of *filepath* are created as needed.
    """
    wb = Workbook()
    ws = wb.active
    ws.title = "Phone Numbers"
    headers = ["Original URL", "Phone", "Final Domain", "Promo", "Processed At", "Rating"]
    ws.append(headers)
    header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    header_font = Font(bold=True, color="FFFFFF")
    for cell in ws[1]:
        cell.fill = header_fill
        cell.font = header_font
        cell.alignment = Alignment(horizontal="center")
    promo_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
    for row in results:
        # Bug fix: process_batch yields 4-tuples, but the old code unpacked
        # five values and crashed with ValueError at save time.  Accept both
        # shapes and default the rating to "".
        original_url, phone, final_domain, is_promo = row[:4]
        rating = row[4] if len(row) > 4 else ""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        promo_mark = "YES" if is_promo else "no"
        ws.append([original_url, phone, final_domain, promo_mark, timestamp, rating])
        if is_promo:
            for cell in ws[ws.max_row]:
                cell.fill = promo_fill
    # Auto-size columns (capped at 60 chars) for readability.
    for column in ws.columns:
        max_len = 0
        for cell in column:
            if cell.value:
                max_len = max(max_len, len(str(cell.value)))
        ws.column_dimensions[column[0].column_letter].width = min(max_len + 2, 60)
    ws.freeze_panes = 'A2'
    Path(filepath).parent.mkdir(parents=True, exist_ok=True)
    wb.save(filepath)
def load_urls(additional_urls: list[str]) -> list[str]:
    """Load, clean, and de-duplicate the target URLs.

    Sources, in order: CONFIG["urls"], CONFIG["input_file"] (one URL per
    line, best-effort), then *additional_urls*.  URLs are de-duplicated by
    normalized domain, except yabs.yandex.ru redirector links which are
    always kept (each may resolve to a different final site).  Excluded
    domains are dropped.
    """
    urls: list[str] = list(CONFIG["urls"])
    input_file = CONFIG["input_file"]
    if input_file and Path(input_file).exists():
        try:
            with open(input_file, 'r', encoding='utf-8') as f:
                for line in f:
                    # Bug fix: strip the trailing comma BEFORE the quotes,
                    # so a line like  "http://x",  loses both.  The old
                    # order left a dangling quote on the URL.
                    line = line.strip().rstrip(',').strip('"\'')
                    if line and line.startswith('http'):
                        urls.append(line.split()[0])
        except Exception:
            pass  # best-effort: a broken input file must not abort the run
    urls.extend(additional_urls)
    seen_domains: set[str] = set()
    cleaned: list[str] = []
    for url in urls:
        url = url.strip()
        if not url or not url.startswith('http'):
            continue
        domain = normalize_domain(url)
        if not domain or is_excluded(domain):
            continue
        if domain == 'yabs.yandex.ru':
            # Never de-duplicate ad-redirector links.
            cleaned.append(url)
            continue
        if domain in seen_domains:
            continue
        seen_domains.add(domain)
        cleaned.append(url)
    return cleaned
async def main():
    """CLI entry point: collect URLs, scrape phones, and write the report."""
    parser = argparse.ArgumentParser(description="Phone Scraper")
    parser.add_argument('--promo-only', action='store_true', help="Сохранять только promo-записи (yabs.yandex.ru)")
    parser.add_argument('urls', nargs='*', help="Дополнительные URL для обработки")
    args = parser.parse_args()

    # Link collection is best-effort: failures only shrink the URL set.
    try:
        collected_links = collect_links()
    except Exception as e:
        print(f"Ошибка в collect_links: {e}. Продолжаем без собранных ссылок.")
        collected_links = []

    urls = load_urls(collected_links + args.urls)
    if not urls:
        print("\n💡 Использование:")
        print(" python script.py [--promo-only] 'https://site1.ru' 'https://site2.ru'")
        return

    raw_results = await process_batch(urls)

    # Keep only the first result per final domain.
    seen_final_domains = set()
    results = []
    for item in raw_results:
        final_domain = item[2]
        if final_domain in seen_final_domains:
            continue
        seen_final_domains.add(final_domain)
        results.append(item)

    if args.promo_only:
        results = [r for r in results if r[3]]

    if results:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = CONFIG["output_file"].format(timestamp=timestamp)
        save_to_excel(results, output_path)
        promo_count = sum(1 for r in results if r[3])
        print(f"\n📊 ИТОГИ:")
        print(f" 🔍 Обработано: {len(urls)}")
        print(f" 📞 Найдено телефонов: {len(results)}")
        print(f" 🎯 Из promo (yabs): {promo_count}")
        print(f" 📁 Файл: {output_path}")
    print("\n✅ Готово!")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Graceful Ctrl-C: report and exit without a traceback.
        print("\n⚠ Прервано пользователем")
    # Dead code removed: the old ``except Exception as e: raise`` re-raised
    # the same exception unchanged, which is what happens with no handler.