[Post images: six mobile wallpaper samples]
Image Host Crawler

Not much to say: I went back to scraping the 举个栗子 API (t.alcy.cc) and wrote a new script. It reads the category list from the index page, collects the image URLs on each category page, and downloads everything concurrently with a thread pool.

import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup

class ImageDownloader:
    def __init__(self, max_workers=10):
        self.base_url = "https://t.alcy.cc/img/"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }
        self.max_workers = max_workers
        self.downloaded_count = 0
        self.total_count = 0
        self.lock = threading.Lock()

    def get_categories(self):
        """Fetch all categories linked from the index page."""
        print(f"Fetching index page: {self.base_url}")
        try:
            response = requests.get(self.base_url, headers=self.headers)
            response.raise_for_status()
        except Exception as e:
            print(f"Failed to fetch index page: {e}")
            return []
        soup = BeautifulSoup(response.text, 'html.parser')
        categories = []
        for a in soup.find_all('a', href=True):
            # Category links look like "name (…)"; keep the part before the parenthesis
            if '(' in a.text and ')' in a.text:
                category_name = a.text.split('(')[0].strip()
                category_url = urljoin(self.base_url, a['href'])
                categories.append({
                    'name': category_name,
                    'url': category_url
                })
        return categories

    def get_image_urls(self, category_url):
        """Collect all image URLs on a category page."""
        try:
            response = requests.get(category_url, headers=self.headers)
            response.raise_for_status()
        except Exception as e:
            print(f"Failed to fetch category page: {e}")
            return []
        soup = BeautifulSoup(response.text, 'html.parser')
        img_tags = soup.find_all('img')
        img_urls = []
        for img in img_tags:
            # Lazy-loaded images keep their real URL in data-src
            src = img.get('data-src') or img.get('src')
            if src and not src.startswith('data:'):
                # Resolve relative paths against the category page, not the index
                img_urls.append(urljoin(category_url, src))
        # Deduplicate and return
        return list(set(img_urls))

    def download_single_image(self, img_url, save_path):
        """Download a single image to save_path."""
        # Skip files already on disk before hitting the network
        if os.path.exists(save_path):
            with self.lock:
                self.downloaded_count += 1
            return True, "exists"
        try:
            response = requests.get(img_url, headers=self.headers, timeout=15)
            response.raise_for_status()
            # Save the image
            with open(save_path, 'wb') as f:
                f.write(response.content)
            with self.lock:
                self.downloaded_count += 1
            return True, "ok"
        except Exception as e:
            return False, str(e)

    def download_category_images(self, category, img_urls):
        """Download all images of one category with a thread pool."""
        category_name = category['name']
        # One directory per category
        if not os.path.exists(category_name):
            os.makedirs(category_name)
            print(f"\nCreated directory: {category_name}")
        print(f"Category [{category_name}] has {len(img_urls)} images")
        # Download with a ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for i, img_url in enumerate(img_urls):
                # Derive a file name from the URL path
                parsed_url = urlparse(img_url)
                filename = os.path.basename(parsed_url.path)
                if not filename:
                    filename = f"image_{i}.jpg"
                save_path = os.path.join(category_name, filename)
                # Submit the download task
                future = executor.submit(self.download_single_image, img_url, save_path)
                futures.append((future, filename, i + 1))
            # Report progress as the tasks finish
            completed = 0
            for future, filename, idx in futures:
                try:
                    success, message = future.result(timeout=20)
                    completed += 1
                    if success and message == "ok":
                        status = "✓"
                    elif success and message == "exists":
                        status = "↻"
                    else:
                        status = "✗"
                    progress = f"[{idx}/{len(img_urls)}] {status} {filename}"
                    print(f"\rCategory [{category_name}] progress: {completed}/{len(img_urls)} | current: {progress[:50]}", end="")
                except Exception:
                    completed += 1
                    print(f"\rCategory [{category_name}] progress: {completed}/{len(img_urls)} | failed: {filename[:30]}...", end="")
        print(f"\nCategory [{category_name}] done")

    def run(self):
        """Main entry point."""
        print("=" * 60)
        print("Fetching image categories...")
        print("=" * 60)
        # Discover all categories
        categories = self.get_categories()
        if not categories:
            print("No categories found.")
            return
        print(f"Found {len(categories)} categories: {[c['name'] for c in categories]}")
        # Collect the image URLs of every category
        all_img_urls = []
        category_data = []
        for cat in categories:
            print(f"Fetching image list for category [{cat['name']}]...", end="")
            img_urls = self.get_image_urls(cat['url'])
            all_img_urls.extend(img_urls)
            category_data.append({
                'category': cat,
                'img_urls': img_urls
            })
            print(f" found {len(img_urls)} images")
        self.total_count = len(all_img_urls)
        print(f"\nDiscovered {self.total_count} images in total")
        print("=" * 60)
        # Download category by category
        start_time = time.time()
        for data in category_data:
            if data['img_urls']:
                self.download_category_images(data['category'], data['img_urls'])
        # Summary
        total_time = time.time() - start_time
        print("\n" + "=" * 60)
        print("All downloads finished!")
        print("=" * 60)
        print(f"Total images found: {self.total_count}")
        print(f"Downloaded or already present: {self.downloaded_count}")
        print(f"Elapsed: {total_time:.2f}s")
        if total_time > 0:
            print(f"Average speed: {self.downloaded_count / total_time:.2f} images/s")
        print("=" * 60)


def main():
    # Thread count; tune it to your network and machine
    max_workers = 20
    downloader = ImageDownloader(max_workers=max_workers)
    downloader.run()


if __name__ == "__main__":
    main()
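
The class can also be driven directly instead of going through main(), for example to mirror just one category as a quick test. A minimal sketch, assuming the script above is saved as downloader.py in the working directory (the file name is my choice, not anything the script requires); it only needs requests and beautifulsoup4 installed, same as the full script:

# fetch_one.py - download only the first category as a smoke test
from downloader import ImageDownloader

dl = ImageDownloader(max_workers=5)  # fewer threads is gentler on the server
categories = dl.get_categories()     # [{'name': ..., 'url': ...}, ...]
if categories:
    first = categories[0]
    img_urls = dl.get_image_urls(first['url'])
    dl.download_category_images(first, img_urls)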