import requests from bs4 import BeautifulSoup import os import re from urllib.parse import urljoin # 定义关键词列表 KEYWORDS = [""] def sanitize_text(text): """增强型文本清洗""" text = re.sub(r'<[^>]+>', '', text) text = re.sub(r'https?://\S+', '', text) replacements = { '&nbsp;': ' ', '&amp;': '&', '&quot;': '"', '&lt;': '<', '&gt;': '>' } for k, v in replacements.items(): text = text.replace(k, v) text = re.sub(r'[■◆▼©®™●【】]', '', text) text = re.sub(r'\s+', ' ', text).strip() return text def save_content(url, save_folder): """安全获取并保存网页内容""" try: headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'} response = requests.get(url, headers=headers, timeout=10) response.encoding = 'utf-8' if response.status_code == 200: soup = BeautifulSoup(response.text, 'html.parser') main_content = soup.find(['article', 'div'], class_=re.compile(r'content|main')) clean_text = sanitize_text(main_content.get_text() if main_content else soup.get_text()) # 生成安全文件名 filename = re.sub(r'[\\/*?:"<>|]', '', url.split('/')[-1])[:50] + '.txt' filepath = os.path.join(save_folder, filename) with open(filepath, 'w', encoding='utf-8') as f: f.write(clean_text) print(f'成功保存至: {filepath}') else: print(f'访问失败: {url} 状态码: {response.status_code}') except Exception as e: print(f'处理{url}时出错: {str(e)}') def main(): """主程序""" # 设置保存路径 desktop = os.path.join(os.path.expanduser('~'), 'Desktop') folder_name = "ScrapedData" # 自定义文件夹名称 save_path = os.path.join(desktop, folder_name) # 创建保存目录(如果不存在) os.makedirs(save_path, exist_ok=True) # 模拟种子页面(需合法授权后替换实际目标网站) seed_url = "http://www.81.cn/" # 示例地址 try: res = requests.get(seed_url, timeout=10) soup = BeautifulSoup(res.text, 'html.parser') # 提取包含关键词的链接 links = [] for a in soup.find_all('a', href=True): text = a.get_text().strip() if any(keyword in text for keyword in KEYWORDS): absolute_url = urljoin(seed_url, a['href']) links.append(absolute_url) # 去重处理 unique_links = list(set(links)) # 保存内容(建议控制频率) for link in unique_links[:5]: # 示例仅处理前5条 save_content(link, save_path) except Exception as e: print(f'程序终止: {str(e)}') if __name__ == "__main__": main() 在上述代码基础上将输出txt中没有内容的删除掉,请给出完整代码

时间: 2025-06-21 16:23:15 AIGC 浏览: 75
<think>嗯,用户给了一个Python爬虫的代码,然后希望在现有基础上修改,使得输出的txt文件中没有内容的被删除掉。我需要先理解现有的代码逻辑,然后找到哪里需要修改。 首先,现有的save_content函数负责保存网页内容到txt文件。它会尝试提取主要内容,清洗文本,然后保存。如果main_content存在,就获取其文本,否则用soup的全文。但可能存在的情况是,即使没有内容,也会生成一个空文件。用户希望过滤掉这些空文件。 那问题出在保存的时候没有检查内容是否为空。所以应该在保存之前检查clean_text是否为空或者只有空白字符。如果是的话,就不保存,或者保存后删除空文件。 那应该在保存文件之后,检查文件内容是否为空。或者,在写入之前,先判断clean_text是否有有效内容。如果clean_text.strip()为空,就不写入文件,或者写入后删除该文件。 在代码中,当clean_text被处理后,可能得到空字符串。例如,如果网页没有内容,或者清洗后全部被去除了。这时候,即使写入文件,文件也是空的。 所以修改点应该在保存文件的时候,先检查clean_text是否有内容。如果有,才保存;否则跳过,或者删除文件。 看当前的save_content函数,在写入文件后,打印成功消息。如果clean_text是空的,那么写入的文件会是空的。所以需要修改逻辑:在写入之前检查clean_text是否为空,如果是的话,不保存,并提示。或者保存之后检查文件是否为空,然后删除。 哪种方式更好呢?在写入之前检查更高效,因为避免了创建空文件。所以应该在写入前检查clean_text是否有效。 所以修改save_content函数中的逻辑:在获取clean_text之后,判断它是否为空或者只有空格。如果是,就不保存,并提示内容为空。否则才保存。 例如: if clean_text.strip() == '': print(f'内容为空,跳过保存: {url}') return else: 写入文件 这样修改后,空内容的文件就不会被保存了。但用户原来的代码中,保存的文件名是根据URL生成的,可能会有重复的情况,但这不是当前问题要考虑的。 另外,用户可能希望不仅处理空内容,还要处理只有空白符的情况。所以用strip()后的长度是否为0来判断。 那么修改代码中的save_content函数,在获取clean_text之后,判断是否为空,如果是,则跳过保存。同时,原代码中的print语句可能需要调整,比如成功保存的消息应该只在有内容时显示。 所以,在原来的代码中: if response.status_code == 200: ...处理得到clean_text if clean_text.strip(): ...保存文件并打印成功 else: print(f'内容为空,不保存: {url}') else: ...处理其他状态码 这样修改后,就能避免保存空文件了。 现在需要检查用户原来的代码结构。原save_content函数中的逻辑是: 当response是200时,提取内容,清洗,然后保存。不管内容是否为空,都生成文件。所以需要在这个条件下添加判断clean_text是否非空。 修改后的代码逻辑应该是: 当提取到clean_text后,检查其是否非空。如果是,才保存,否则不保存,并提示。 所以,在save_content函数中的相应部分修改: 原代码: clean_text = sanitize_text(...) 生成文件名 写入文件 打印成功 修改后: clean_text = sanitize_text(...) if clean_text.strip() == '': print(f'内容为空,跳过保存: {url}') return 生成文件名,写入文件,打印成功 或者,在写入之前判断,如果有内容才写入,否则不写。 这样就不会生成空文件了。 所以,修改save_content函数中的部分: 原: with open(filepath, 'w', encoding='utf-8') as f: f.write(clean_text) print(f'成功保存至: {filepath}') 改为: if clean_text.strip(): with open(filepath, 'w', encoding='utf-8') as f: f.write(clean_text) print(f'成功保存至: {filepath}') else: print(f'内容为空,不保存: {url}') 这样,当clean_text为空时,不写入文件,避免生成空文件。 那用户的问题就解决了。此外,还需要确保其他部分没有错误。例如,当main_content不存在时,soup.get_text()可能得到的文本是否为空? sanitize_text函数会进一步处理,可能得到空字符串。所以判断clean_text.strip()是否为空即可。 综上,修改save_content函数中的写入逻辑,添加对clean_text是否为空的判断,只有非空时才保存文件。这样就能删除输出中无内容的txt文件了。 现在,需要将代码中的相应部分修改,并给出完整代码。用户希望得到完整的修改后的代码,所以需要将原来的代码中save_content部分进行调整。 因此,最终的修改是在保存文件之前检查clean_text是否为空,如果是则跳过保存。这样,输出的txt文件中就不会有空文件了。</think>以下是在原有代码基础上增加空文件过滤功能的完整实现(修改部分已标注): ```python import requests from bs4 import BeautifulSoup import os import re from urllib.parse import urljoin # 定义关键词列表 KEYWORDS = [""] def sanitize_text(text): """增强型文本清洗""" text = re.sub(r'<[^>]+>', '', text) text = re.sub(r'https?://\S+', '', text) replacements = { '&nbsp;': ' ', '&amp;': '&', '&quot;': '"', '&lt;': '<', '&gt;': '>' } for k, v in replacements.items(): text = text.replace(k, v) text = re.sub(r'[■◆▼©®™●【】]', '', text) text = re.sub(r'\s+', ' ', text).strip() return text def save_content(url, save_folder): """安全获取并保存网页内容(增加空内容过滤)""" try: headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'} response = requests.get(url, headers=headers, timeout=10) response.encoding = 'utf-8' if response.status_code == 200: soup = BeautifulSoup(response.text, 'html.parser') main_content = soup.find(['article', 'div'], class_=re.compile(r'content|main')) clean_text = sanitize_text(main_content.get_text() if main_content else soup.get_text()) # 新增空内容过滤判断 if not clean_text.strip(): print(f'跳过空内容: {url}') return # 生成安全文件名 filename = re.sub(r'[\\/*?:"<>|]', '', url.split('/')[-1])[:50] + '.txt' filepath = os.path.join(save_folder, filename) with open(filepath, 'w', encoding='utf-8') as f: f.write(clean_text) print(f'成功保存至: {filepath}') else: print(f'访问失败: {url} 状态码: {response.status_code}') except Exception as e: print(f'处理{url}时出错: {str(e)}') def main(): """主程序""" # 设置保存路径 desktop = os.path.join(os.path.expanduser('~'), 'Desktop') folder_name = "ScrapedData" save_path = os.path.join(desktop, folder_name) # 创建保存目录(如果不存在) os.makedirs(save_path, exist_ok=True) # 模拟种子页面(需合法授权后替换实际目标网站) seed_url = "http://www.81.cn/" try: res = requests.get(seed_url, timeout=10) soup = BeautifulSoup(res.text, 'html.parser') # 提取包含关键词的链接 links = [] for a in soup.find_all('a', href=True): text = a.get_text().strip() if any(keyword in text for keyword in KEYWORDS): absolute_url = urljoin(seed_url, a['href']) links.append(absolute_url) # 去重处理 unique_links = list(set(links)) # 保存内容(建议控制频率) for link in unique_links[:5]: # 示例仅处理前5条 save_content(link, save_path) except Exception as e: print(f'程序终止: {str(e)}') if __name__ == "__main__": main() ``` 主要改进点说明(使用中文数字编号): 一、核心过滤逻辑 1. 在`save_content`函数中增加空内容判断: ```python if not clean_text.strip(): print(f'跳过空内容: {url}') return ``` 2. 在写入文件前进行内容有效性验证,确保只有包含有效文本的内容才会被保存 二、增强健壮性 1. 使用`strip()`方法过滤纯空白字符内容 2. 空内容过滤发生在文本清洗之后,确保判断准确性 3. 添加明确的跳过提示,方便追踪爬取过程 三、注意事项 1. 建议在`KEYWORDS`列表中添加实际需要的关键词 2. 正式使用前请确保遵守目标网站的robots.txt协议 3. 可根据需要调整`unique_links[:5]`的限制数量 4. 建议添加请求间隔时间(如`time.sleep(1)`)避免被封禁 这个改进版本能有效过滤空内容文件,同时保持原有功能完整,建议在合法合规的前提下使用。
阅读全文

相关推荐

优化代码:#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import re import sys import time import requests from tqdm import tqdm from bs4 import BeautifulSoup from urllib.parse import urljoin def download_vr_images( target_url, save_dir='/Users/sunhui/Downloads/tu', max_retry=3, timeout=30, concurrency=5 ): """ 全景图片下载核心函数 参数: - target_url: 目标网页URL - save_dir: 存储路径(默认:用户下载目录) - max_retry: 最大重试次数 - timeout: 请求超时时间(秒) - concurrency: 并发下载数 """ # ==================== 路径验证 ==================== if not os.path.isabs(save_dir): raise ValueError(f"路径必须为绝对路径:{save_dir}") illegal_chars = re.findall(r'[<>:"|?*]', save_dir) if illegal_chars: raise ValueError(f"路径包含非法字符:{''.join(set(illegal_chars))}") try: os.makedirs(save_dir, exist_ok=True) test_file = os.path.join(save_dir, '__perm_test.tmp') with open(test_file, 'w') as f: f.write('permission_test') os.remove(test_file) except Exception as e: raise RuntimeError(f"路径初始化失败: {str(e)}") # ==================== 网络请求 ==================== headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Referer': target_url } try: response = requests.get(target_url, headers=headers, timeout=timeout) response.raise_for_status() except requests.exceptions.RequestException as e: raise ConnectionError(f"网页请求失败: {str(e)}") # ==================== 内容解析 ==================== soup = BeautifulSoup(response.text, 'html.parser') # 两种图片链接提取策略 img_links = [] # 策略1:通过CSS类名匹配 vr_images = soup.select('div.vr-container img.full-res') # 策略2:通过正则表达式匹配 pattern = re.compile(r'(https?://\S+?\.(jpg|png|webp))', re.IGNORECASE) fallback_links = pattern.findall(response.text) if vr_images: for img in vr_images: img_url = img.get('data-src') or img.get('src') if img_url: img_links.append(urljoin(target_url, img_url)) elif fallback_links: img_links = [urljoin(target_url, link[0]) for link in fallback_links] else: raise ValueError("未检测到有效图片链接") # ==================== 下载逻辑 ==================== success_count = 0 for idx, img_url in enumerate(tqdm(img_links, desc="下载进度", unit="file"), 1): file_name = f"vr_image_{time.strftime('%Y%m%d%H%M%S')}_{idx}.{img_url.split('.')[-1]}" save_path = os.path.join(save_dir, file_name) for retry in range(max_retry + 1): try: with requests.get(img_url, stream=True, headers=headers, timeout=timeout) as r: r.raise_for_status() total_size = int(r.headers.get('content-length', 0)) with open(save_path, 'wb') as f: for chunk in r.iter_content(chunk_size=8192): if chunk: f.write(chunk) success_count += 1 break except Exception as e: if retry == max_retry: print(f"\n⚠️ 文件下载失败:{img_url} | 错误:{str(e)}") else: time.sleep(2 ** retry) # ==================== 结果报告 ==================== print(f"\n✅ 下载完成 | 成功:{success_count}/{len(img_links)}") print(f"📁 存储路径:{save_dir}") if __name__ == "__main__": # 使用示例 try: download_vr_images( target_url="https://vr.justeasy.cn/view/1709b5704a0u64f4-1711545753.html", save_dir="/Users/sunhui/Downloads/tu" ) except Exception as e: print(f"❌ 运行错误:{str(e)}") sys.exit(1)

import os import logging from urllib.parse import urljoin from concurrent.futures import ThreadPoolExecutor import time import requests from bs4 import BeautifulSoup # 配置日志记录 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler(), logging.FileHandler('crawler.log')] ) logger = logging.getLogger(__name__) # 图片保存路径 IMAGE_DIR = 'downloaded_images' if not os.path.exists(IMAGE_DIR): os.makedirs(IMAGE_DIR) # 已爬取 URL 集合(用于去重) visited_urls = set() # Markdown 文件保存路径 MD_FILE = 'crawled_content.md' def fetch_page(url, retries=3): """抓取网页内容""" try: response = requests.get(url, timeout=10) response.raise_for_status() # 检查 HTTP 响应状态码 return response.text except requests.RequestException as e: logger.error(f" 请求 {url} 失败: {e}") if retries > 0: logger.info(f" 重试 {url}, 剩余尝试次数: {retries - 1}") time.sleep(2) # 等待 2 秒后重试 return fetch_page(url, retries - 1) else: logger.error(f" 放弃请求 {url}") return None def parse_page(html): """解析网页内容""" soup = BeautifulSoup(html, 'html.parser') # 提取图片链接 img_tags = soup.find_all('img') img_urls = [ urljoin(soup.base['href'], img['src']) if 'src' in img and img['src'] else '' for img in img_tags ] # 提取文本内容 text_content = soup.get_text(strip=True) return { 'text': text_content, 'images': img_urls } def download_image(img_url): """下载图片""" try: response = requests.get(img_url, stream=True, timeout=10) response.raise_for_status() # 提取文件名 img_name = img_url.split('/')[-1] img_path = os.path.join(IMAGE_DIR, img_name) with open(img_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) logger.info(f" 图片已下载: {img_url}") return img_name # 返回图片文件名 except requests.RequestException as e: logger.error(f" 下载图片失败: {img_url}, 错误信息: {e}") return None def crawl_page(url): """爬取单个页面""" if url in visited_urls: logger.info(f" 跳过已爬取的 URL: {url}") return logger.info(f" 正在爬取: {url}") html = fetch_page(url) if not html: return data = parse_page(html) visited_urls.add(url) # 下载图片并生成Markdown图片链接 markdown_images = [] with ThreadPoolExecutor(max_workers=5) as executor: futures = [executor.submit(download_image, img_url) for img_url in data['images']] for future in futures: img_name = future.result() if img_name: markdown_images.append(f"![{img_name}]({os.path.join(IMAGE_DIR, img_name)})") # 将内容保存到Markdown文件 save_to_markdown(data['text'], markdown_images) return data['text'] def save_to_markdown(text, images): """将文本和图片保存到Markdown文件""" with open(MD_FILE, 'a', encoding='utf-8') as f: f.write(f"# 页面内容\n\n") f.write(f"{text}\n\n") if images: f.write(f"## 图片内容\n\n") for img in images: f.write(f"{img}\n\n") logger.info(" 内容已保存到Markdown文件") def crawl_pages(base_url, page_param='page', start_page=1, max_pages=10): """分页爬取""" all_data = [] current_page = start_page while current_page <= max_pages: url = f"{base_url}?{page_param}={current_page}" logger.info(f" 开始爬取第 {current_page} 页: {url}") # 爬取当前页面 page_content = crawl_page(url) if not page_content: logger.warning(f" 第 {current_page} 页没有有效内容") break # 如果当前页没有内容,停止爬取 # 将当前页的内容添加到总数据中 all_data.append(page_content) # 增加页码,继续下一页 current_page += 1 logger.info(" 分页爬取完成") return all_data def main(): base_url = "https://www.21ic.com/article/906177.html" # 替换为目标网站 page_param = "page" start_page = 1 max_pages = 10 try: all_data = crawl_pages(base_url, page_param, start_page, max_pages) logger.info(" 所有页面爬取完成") except Exception as e: logger.error(f" 爬取过程中发生错误: {e}") if __name__ == '__main__': main()优化我的爬虫代码,并且给我完整的修改之后的代码

import os import re import logging import hashlib import json from urllib.parse import urljoin, urlparse from concurrent.futures import ThreadPoolExecutor, as_completed import time import requests from bs4 import BeautifulSoup from tqdm import tqdm # 配置项(可扩展) CONFIG = { 'REQUEST': { 'headers': { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept-Encoding': 'gzip, deflate', 'Referer': 'https://www.google.com/' }, 'timeout': 20, 'max_retries': 5, 'delay': 1.5, 'proxy': None # 格式: {'http': 'http://10.10.1.10:3128'} }, 'SAVING': { 'image_dir': 'downloads/images', 'text_dir': 'downloads/text', 'max_filename_length': 100, 'image_types': ['.jpg', '.png', '.webp'] }, 'PERFORMANCE': { 'max_workers': 10, 'batch_size': 50 }, 'ANTI_CRAWL': { 'rotate_user_agents': False, 'use_proxy_pool': False } } # 初始化目录 os.makedirs(CONFIG['SAVING']['image_dir'], exist_ok=True) os.makedirs(CONFIG['SAVING']['text_dir'], exist_ok=True) # 日志配置 logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.FileHandler('crawler.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class CrawlerSession: """爬虫会话管理""" def __init__(self): self.session = requests.Session() self.proxy = CONFIG['REQUEST']['proxy'] self.retry_count = 0 self.last_request_time = 0 def get(self, url): """带速率限制的请求""" self._rate_limit() try: response = self.session.get( url, headers=CONFIG['REQUEST']['headers'], proxies=self.proxy, timeout=CONFIG['REQUEST']['timeout'] ) response.raise_for_status() return response except requests.RequestException as e: logger.warning(f"请求失败: {url} - {str(e)}") return None def _rate_limit(self): """请求速率控制""" elapsed = time.time() - self.last_request_time if elapsed < CONFIG['REQUEST']['delay']: time.sleep(CONFIG['REQUEST']['delay'] - elapsed) self.last_request_time = time.time() class ContentParser: """内容解析器""" @staticmethod def safe_parse(html, parser=None): """安全解析HTML(自动选择可用解析器)""" try: # 尝试按优先级选择解析器 for p in ['lxml', 'html.parser', 'html5lib']: try: return BeautifulSoup(html, p) except Exception: continue return BeautifulSoup(html, 'html.parser') # 最终回退 except Exception as e: logger.error(f"解析失败: {str(e)}") return None @staticmethod def extract_links(soup, base_url): """提取页面链接""" links = [] for a in soup.find_all('a', href=True): link = urljoin(base_url, a['href']) links.append(link) return links @staticmethod def extract_images(soup, base_url): """提取图片链接""" img_urls = [] for img in soup.find_all('img'): sources = [img.get('src'), img.get('data-src'), img.get('data-original')] for src in sources: if src and src.strip(): img_url = urljoin(base_url, src.strip()) img_urls.append(img_url) break return img_urls class FileManager: """文件管理""" @staticmethod def sanitize_filename(filename): """生成安全文件名""" filename = re.sub(r'[\\/*?:"<>|]', "", filename)[:CONFIG['SAVING']['max_filename_length']] if not filename: return hashlib.md5(str(time.time()).encode()).hexdigest() return filename @staticmethod def save_image(content, extension): """保存图片文件""" filename = f"{hashlib.md5(content).hexdigest()}{extension}" path = os.path.join(CONFIG['SAVING']['image_dir'], filename) with open(path, 'wb') as f: f.write(content) return path @staticmethod def save_text(content, url): """保存文本文件""" filename = FileManager.sanitize_filename(urlparse(url).path) + '.txt' path = os.path.join(CONFIG['SAVING']['text_dir'], filename) with open(path, 'w', encoding='utf-8') as f: f.write(content) return path class CrawlerEngine: """爬虫引擎""" def __init__(self): self.visited_urls = set() self.session = CrawlerSession() self.parser = ContentParser() self.file_manager = FileManager() def crawl(self, start_url, max_depth=2, max_pages=100): """启动爬虫""" queue = [(start_url, 1)] results = [] with tqdm(total=max_pages, desc="爬取进度") as pbar: while queue and len(results) < max_pages: url, depth = queue.pop(0) if depth > max_depth: continue if url in self.visited_urls: continue self.visited_urls.add(url) # 处理当前页面 content = self._process_page(url) if content: results.append(content) pbar.update(1) # 提取新链接 soup = self.parser.safe_parse(content['html']) if soup: new_links = self.parser.extract_links(soup, url) queue.extend([(link, depth + 1) for link in new_links]) return results class CrawlerEngine: def _process_page(self, url): """增强版页面处理方法""" try: # 网络请求增强处理 try: response = self.session.get(url) response.raise_for_status() # 检查HTTP状态码 except requests.exceptions.RequestException as e: logger.error(f"请求失败: {url} - {str(e)}") return None except Exception as e: logger.error(f"未知请求错误: {url} - {str(e)}") return None # 响应内容验证 if not response or not response.text: logger.warning(f"空响应内容: {url}") return None # 自动检测编码 try: detected_encoding = response.apparent_encoding decoded_content = response.content.decode(detected_encoding) except UnicodeDecodeError: logger.warning(f"编码检测失败,使用备用解码: {url}") decoded_content = response.text # 回退到requests的自动解码 content = { 'url': url, 'html': decoded_content, 'images': [], 'text': '' } # 解析增强处理 try: soup = self.parser.safe_parse(content['html']) if not soup or not soup.find(): logger.warning(f"无效的HTML结构: {url}") return None except Exception as e: logger.error(f"解析异常: {url} - {str(e)}") return None # 文本提取增强 try: text = soup.get_text(strip=True) # 清理非法字符 text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text) content['text'] = text except AttributeError: logger.warning(f"文本提取失败: {url}") content['text'] = '' except UnicodeEncodeError: logger.warning(f"编码标准化失败: {url}") content['text'] = text.encode('utf-8', 'replace').decode('utf-8') # 图片下载增强 try: img_urls = self.parser.extract_images(soup, url) with ThreadPoolExecutor(max_workers=CONFIG['PERFORMANCE']['max_workers']) as executor: futures = {executor.submit(self._download_image, img_url): img_url for img_url in img_urls} for future in as_completed(futures): img_url = futures[future] try: if image := future.result(): content['images'].append(image) except Exception as e: logger.warning(f"图片下载失败: {img_url} - {str(e)}") except Exception as e: logger.error(f"图片处理异常: {url} - {str(e)}") return content except Exception as e: logger.critical(f"页面处理严重错误: {url} - {str(e)}", exc_info=True) return None def _download_image(self, img_url): """下载单张图片""" response = self.session.get(img_url) if response and response.headers.get('Content-Type', '').startswith('image/'): ext = os.path.splitext(img_url)[1] if ext.lower() in CONFIG['SAVING']['image_types']: return self.file_manager.save_image(response.content, ext) return None def _save_content(self, content): """保存内容""" # 保存文本 self.file_manager.save_text(content['text'], content['url']) # 生成报告 report = { 'url': content['url'], 'images': len(content['images']), 'text_length': len(content['text']) } logger.info(json.dumps(report, ensure_ascii=False)) if __name__ == '__main__': engine = CrawlerEngine() start_url = "https://www.21ic.com/article/906177.html" try: results = engine.crawl(start_url, max_depth=2, max_pages=50) logger.info(f"爬取完成,共获取{len(results)}个页面") except KeyboardInterrupt: logger.info("用户中断爬取") except Exception as e: logger.error(f"爬取失败: {str(e)}")修改我代码,检查出错误并且修改正确,最后给出完整的代码

import os import re import requests import threading from queue import Queue from datetime import datetime from urllib.parse import unquote class VulnerabilityScanner: def __init__(self, base_url, output_file="vulnerability_report.html", threads=5): self.base_url = base_url.rstrip('/') self.output_file = output_file self.threads = threads self.vulnerabilities = [] self.lock = threading.Lock() self.task_queue = Queue() # 定义漏洞检测规则 (基于引用[1]的扫描器类型) self.scan_rules = { "XSS": [ {"type": "regex", "pattern": r"<script>alert$.+$</script>", "part": "body"}, {"type": "word", "words": ["onerror=", "javascript:"], "part": "body"} ], "SQL Injection": [ {"type": "regex", "pattern": r"SQL syntax error|mysql_fetch", "part": "body"}, {"type": "status", "status": [500]}, {"type": "word", "words": ["You have an error in your SQL syntax"], "part": "body"} ], "LFI": [ {"type": "regex", "pattern": r"(root:/root|etc/passwd|boot.ini|win.ini)", "part": "body"}, {"type": "word", "words": ["[boot loader]", "[operating systems]"], "part": "body"} ], "CRLF Injection": [ {"type": "word", "words": ["\r\nSet-Cookie:", "\r\nLocation:"], "part": "header"} ] } def sanitize_path(self, path): """安全处理路径输入 (防御路径遍历攻击)""" # 解码URL编码 (防御%2e%2e/绕过) decoded_path = unquote(path) # 标准化路径并过滤非法字符 (引用[1]) normalized = os.path.normpath(decoded_path) if re.search(r"(\.\./|\.\.\\|%2e%2e|@|\|)", normalized, re.IGNORECASE): raise ValueError(f"检测到非法路径字符: {normalized}") # 确保路径以/开头 return '/' + normalized.lstrip('/') def send_request(self, method, path, payload=None): """发送HTTP请求并返回响应""" try: safe_path = self.sanitize_path(path) url = self.base_url + safe_path headers = {"User-Agent": "VulnerabilityScanner/1.0"} params = payload if method == "GET" and payload else None data = payload if method in ["POST", "PUT"] and payload else None return requests.request( method, url, headers=headers, params=params, data=data, timeout=10, verify=False ) except Exception as e: print(f"请求失败: {str(e)}") return None def match_response(self, response, rules): """检查响应是否匹配漏洞规则""" for rule in rules: try: # 状态码匹配 if rule["type"] == "status": if response.status_code in rule["status"]: return True # 关键字匹配 elif rule["type"] == "word": content = response.headers if rule["part"] == "header" else response.text for word in rule["words"]: if word in content: return True # 正则匹配 elif rule["type"] == "regex": content = response.headers if rule["part"] == "header" else response.text if re.search(rule["pattern"], content, re.IGNORECASE): return True except: continue return False def test_endpoint(self, method, path, payload=None): """测试单个端点是否存在漏洞""" response = self.send_request(method, path, payload) if not response: return None results = [] for vuln_type, rules in self.scan_rules.items(): if self.match_response(response, rules): results.append({ "type": vuln_type, "url": response.url, "payload": payload, "status": response.status_code }) return results def worker(self): """多线程工作函数""" while not self.task_queue.empty(): task = self.task_queue.get() method, path, payload = task if results := self.test_endpoint(method, path, payload): with self.lock: self.vulnerabilities.extend(results) self.task_queue.task_done() def generate_report(self): """生成HTML格式漏洞报告 (引用[1])""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") html = f"""<!DOCTYPE html> <html> <head> <title>漏洞扫描报告</title> <style> body {{ font-family: Arial, sans-serif; margin: 20px; }} h1 {{ color: #2c3e50; }} table {{ border-collapse: collapse; width: 100%; }} th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }} tr:nth-child(even) {{ background-color: #f2f2f2; }} .critical {{ color: #e74c3c; font-weight: bold; }} </style> </head> <body> 漏洞扫描报告 - {timestamp} 目标URL: {self.base_url} 检测结果 漏洞类型 URL 状态码 Payload """ for vuln in self.vulnerabilities: html += f""" {vuln['type']} {vuln['url']} {vuln['status']} {vuln['payload'] or 'N/A'} """ html += """ </body> </html>""" with open(self.output_file, 'w', encoding='utf-8') as f: f.write(html) print(f"报告已生成: {self.output_file}") def scan(self, endpoints): """执行漏洞扫描""" # 准备扫描任务 (引用[2]的效率优化) for method, path, payload in endpoints: self.task_queue.put((method, path, payload)) # 启动多线程扫描 (引用[1]) for _ in range(self.threads): threading.Thread(target=self.worker, daemon=True).start() self.task_queue.join() # 生成报告 if self.vulnerabilities: self.generate_report() return True return False if __name__ == "__main__": # 配置扫描参数 TARGET_URL = "https://example.com" # 替换为目标URL ENDPOINTS_TO_SCAN = [ ("GET", "/", None), ("GET", "/search?q=<script>alert(1)</script>", None), # XSS测试 ("GET", "/profile?id=1' OR '1'='1", None), # SQL注入测试 ("GET", "/../../etc/passwd", None), # LFI测试 ("GET", "/redirect?url=https://evil.com%0d%0aSet-Cookie:sessionid=123", None) # CRLF测试 ] # 执行扫描 scanner = VulnerabilityScanner(TARGET_URL) if scanner.scan(ENDPOINTS_TO_SCAN): print("发现漏洞! 请查看生成的报告") else: print("未发现漏洞") 请再次完善以上代码,以便直接使用

zip
系统支持前后端分离架构,涵盖微信、支付宝、百度、头条等主流平台的小程序、APP及公众号,内置多种常见支付方式,具备完善的订单处理机制,界面设计美观,是一款功能完备的商城开源平台。毕业设计是高校教育中的一项关键实践性任务,用于评估学生在专业领域内的知识掌握程度、实践能力和创新思维。该任务通常要求学生结合所学理论,针对某一具体问题提出可行的解决方案或开展一项具有实际价值的研究项目。 在选题阶段,学生需根据个人兴趣、专业方向及现实需求进行选择,并在导师指导下明确研究目标与核心问题,制定研究计划与实施方案。整个过程通常包含资料查阅、需求分析、系统设计、开发实现及测试优化等多个环节,确保研究的完整性与科学性。 在研究过程中,学生需具备较强的自主分析与问题解决能力,可能通过实验、调研、案例研究等方式收集数据并验证假设,从而提升专业技能与实际操作能力。撰写毕业设计报告是核心环节之一,需详细记录研究过程、方法、结果及结论,以全面展示研究成果。同时,这一过程也有助于提升学生的学术表达能力与逻辑思维水平。 最终,毕业设计成果将由导师及相关专家进行评审,评价标准涵盖创新性、应用价值、研究方法的合理性及论文撰写质量等方面。毕业设计的成绩将作为学生学业评估的重要依据,直接影响其毕业资格与学位授予。 总体而言,毕业设计是高校教学体系中的重要组成部分,不仅有助于学生深化专业知识,还能锻炼其独立研究与实践能力,为未来职业发展奠定良好基础。 资源来源于网络分享,仅用于学习交流使用,请勿用于商业,如有侵权请联系我删除!

最新推荐

recommend-type

MATLAB_四旋翼matlab模型.zip

MATLAB_四旋翼matlab模型.zip
recommend-type

二维码(31).zip

二维码(31).zip
recommend-type

生成二维码(8).zip

生成二维码(8).zip
recommend-type

二维码生成扫描demo.zip

二维码生成扫描demo.zip
recommend-type

数字图像处理研讨会——基于拉斐尔·冈萨雷斯和理查德·伍兹的《数字图像处理》一书_“Digital Image Proce

数字图像处理研讨会——基于拉斐尔·冈萨雷斯和理查德·伍兹的《数字图像处理》一书_“Digital Image Processing” Workshop - Based on Digital Image Processing Book by Rafael C. Gonzalez and Richard E. Woods.zip
recommend-type

Hyperledger Fabric v2与Accord Project Cicero智能合约开发指南

标题和描述中提到的“hlf-cicero-contract:Accord Project Cicero与Hyperledger Fabric v2签约”以及“半西约合同”暗示了与智能合约和区块链技术相关的知识点。下面详细说明这些知识点: ### 智能合约与区块链技术 智能合约是一套运行在区块链上的程序,当合约条款被触发时,合约会自动执行相应的操作。这种自动执行的特点使得智能合约特别适合于执行多方之间的可信交易,它能减少或消除中介服务的需要,从而降低交易成本并提高效率。 区块链技术是一种分布式账本技术,通过加密算法和共识机制保证了交易数据的不可篡改性和透明性。区块链上的每一笔交易都会被网络中的多个节点验证并记录,确保了交易记录的安全性。 ### Hyperledger Fabric v2 Hyperledger Fabric 是由Linux基金会托管的一个开源项目,它是企业级区块链框架,旨在为商业应用提供安全、模块化、可扩展的区块链平台。Hyperledger Fabric v2.2是该框架的一个版本。 Hyperledger Fabric v2支持链码(Chaincode)概念,链码是部署在Hyperledger Fabric网络上的应用程序,它可以被用来实现各种智能合约逻辑。链码在运行时与网络中的背书节点和排序服务交互,负责验证、执行交易以及维护账本状态。 ### Accord Project Cicero Accord Project Cicero 是一个开源的智能合同模板和执行引擎,它允许开发者使用自然语言来定义合同条款,并将这些合同转换为可以在区块链上执行的智能合约。CiceroMark是基于Markdown格式的一种扩展,它允许在文档中嵌入智能合约逻辑。 通过Accord Project Cicero,可以创建出易于理解、可执行的智能合约。这些合同可以与Hyperledger Fabric集成,利用其提供的安全、透明的区块链网络环境,从而使得合同条款的执行更加可靠。 ### 智能合约的安装与部署 描述中提到了“安装”和“启动”的步骤,这意味着为了使用HLF v2.2和Accord Project Cicero,需要先进行一系列的配置和安装工作。这通常包括设置环境变量(例如HLF_INSTALL_DIR)、安装区块链网络(Test-Net)以及安装其他必需的软件工具(如jq)。 jq是一个轻量级且灵活的命令行JSON处理器,常用于处理JSON数据。在区块链项目中,jq可以帮助开发者处理链码或智能合约的数据,特别是在与网络节点交互时。 ### JavaScript 标签 标签“JavaScript”表明本项目或相关文档中会涉及到JavaScript编程语言。Hyperledger Fabric v2支持多种智能合约语言,其中JavaScript是一个广泛使用的选项。JavaScript在编写链码时提供了灵活的语法和强大的库支持,是进行区块链开发的一个流行选择。 ### 文件结构 文件名称列表“hlf-cicero-contract-master”暗示这是一个包含所有相关文件和资源的项目源代码目录。这个名称通常表明开发者可以从该目录开始探索、安装和配置项目的所有组件。 ### 综合知识点 1. 智能合约与区块链技术可以自动化执行多方面可信交易,降低交易成本并提高效率。 2. Hyperledger Fabric v2为企业提供一个安全、模块化、可扩展的区块链平台。 3. Accord Project Cicero结合Markdown格式的CiceroMark,将自然语言定义的合同条款转换为可执行的智能合约。 4. 智能合约的安装和部署涉及到一系列配置步骤,包括设置环境变量、安装必要的软件工具等。 5. JavaScript是编写智能合约的流行语言之一,为区块链开发提供灵活性和强大的库支持。 6. 项目文件结构表明从一个主目录可以探索和配置所有项目相关文件和资源。 以上是根据文件标题、描述、标签以及文件名称列表所提取的相关知识点。这些知识点对于理解Hyperledger Fabric v2和Accord Project Cicero在智能合约中的应用尤为重要。
recommend-type

深度神经网络优化技巧全解析

# 深度神经网络优化技巧全解析 ## 1 超越 Sigmoid 函数 在神经网络中,Sigmoid 函数曾被广泛使用,但它存在一些问题,如梯度消失。研究人员一直在寻找更好的激活函数,且不同的激活函数在不同场景下表现各异。 ### 1.1 ReLU 函数登场 如今,修正线性单元(ReLU)成为了 Sigmoid 的常用替代函数。与 Sigmoid 相比,ReLU 非常简单,以下是其 Python 实现: ```python def relu(z): if z <= 0: return 0 else: return z ``` ReLU 由两段直
recommend-type

什么是噪声功率密度

<think> 嗯,用户这次问的是噪声功率密度的定义,结合之前的系统指令和站内引用,需要给出清晰准确的中文解释。用户可能是电子工程或通信专业的学生,正在学习噪声相关的理论基础,也可能是工程师需要实际应用概念。从引用来[^1]看,用户对相位噪声的定义有基础认知,但需要系统梳理核心概念。 噪声功率密度最本质的定义应该是单位带宽内的噪声功率。引用[^2]提到电阻热噪声的例子很典型,50Ω电阻在300K环境下功率谱密度是定值(0.9nV/√Hz)²,这说明: 1. 它描述的是噪声功率随频率分布的密度 2. 与带宽强相关(带宽加倍则总功率加倍) 3. 单位通常用W/Hz或V²/Hz 维纳-辛钦定理(
recommend-type

Libshare: Salesforce的高效可重用模块集合

Salesforce是一个云基础的CRM平台,它允许用户构建定制应用程序来满足特定的业务需求。Apex是Salesforce平台上的一个强类型编程语言,用于开发复杂的业务逻辑,通过触发器、类和组件等实现。这些组件使得开发者可以更高效地构建应用程序和扩展Salesforce的功能。 在提到的"libshare:经过测试的Salesforce可重用模块"文件中,首先介绍了一个名为Libshare的工具包。这个工具包包含了一系列已经过测试的可重用模块,旨在简化和加速Salesforce应用程序的开发。 Libshare的各个组成部分的知识点如下: 1. 设置模块:在Salesforce应用程序中,应用程序设置的管理是必不可少的一部分。设置模块提供了一种简便的方式存储应用程序的设置,并提供了一个易用的API来与之交互。这样,开发者可以轻松地为不同的环境配置相同的设置,并且可以快速地访问和修改这些配置。 2. Fluent断言模块:断言是单元测试中的关键组成部分,它们用于验证代码在特定条件下是否表现预期。Fluent断言模块受到Java世界中Assertj的启发,提供了一种更流畅的方式来编写断言。通过这种断言方式,可以编写更易于阅读和维护的测试代码,提高开发效率和测试质量。 3. 秒表模块:在性能调优和效率测试中,记录方法的执行时间是常见的需求。秒表模块为开发者提供了一种方便的方式来记录总时间,并跟踪每种方法所花费的时间。这使得开发者能够识别瓶颈并优化代码性能。 4. JsonMapper模块:随着Web API的广泛应用,JSON数据格式在应用程序开发中扮演了重要角色。JsonMapper模块为开发者提供了一个更高级别的抽象,用于读取和创建JSON内容。这能够大幅简化与JSON数据交互的代码,并提高开发效率。 5. utils模块:在软件开发过程中,经常会遇到需要重复实现一些功能的情况,这些功能可能是通用的,例如日期处理、字符串操作等。utils模块提供了一系列已经编写好的实用工具函数,可以用于节省时间,避免重复劳动,提高开发效率。 6. 记录器模块:记录器通常用于记录应用程序的运行日志,以便于问题诊断和性能监控。系统提供的System.debug功能虽然强大,但在大型应用中,统一的记录器包装器可以使得日志管理更加高效。记录器模块支持记录器名称,并且可以对日志进行适当的封装。 7. App Logger模块:App Logger模块扩展了记录器模块的功能,它允许开发者将日志语句保存到一个精心设计的App Log对象中。此外,App Logger模块支持存储长达56k字符的日志内容,这对于复杂应用的监控和调试非常有用。 8. 应用程序任务模块:在处理异步作业时,例如批量数据处理或定时任务,需要有一个框架来管理和跟踪这些任务。应用程序任务模块提供了一个框架,用于处理可排队的作业,并能够跟踪这些任务的执行情况。 通过Libshare提供的这些模块,Salesforce的开发者能够减少开发工作量,加快开发速度,并提高代码质量。这些模块能够帮助开发者避免重复的“造轮子”工作,专注于核心业务逻辑的实现。同时,由于Libshare作为托管程序包发布,开发者无需担心代码的维护和管理,只需将其添加到自己的Salesforce组织中即可使用。 Libshare的发布也强调了可重用性的重要性,这是软件工程领域中长期提倡的一个原则。通过使用可重用的组件,开发者能够遵循DRY(Don't Repeat Yourself)原则,从而减少代码的冗余,提高生产效率,同时降低因重复编写相同代码而导致错误的风险。 总之,Libshare是一个有价值的资源,对于那些希望在Salesforce平台上快速构建高效、可靠应用程序的开发者来说,这些预置的、经过测试的模块无疑是一个强大的助手。
recommend-type

机器学习技术要点与应用解析

# 机器学习技术要点与应用解析 ## 1. 机器学习基础概念 ### 1.1 数据类型与表示 在编程中,数据类型起着关键作用。Python 具有动态类型特性,允许变量在运行时改变类型。常见的数据类型转换函数包括 `bool()`、`int()`、`str()` 等。例如,`bool()` 函数可将值转换为布尔类型,`int()` 用于将值转换为整数类型。数据类型还包括列表(`lists`)、字典(`dictionaries`)、元组(`tuples`)等集合类型,其中列表使用方括号 `[]` 表示,字典使用花括号 `{}` 表示,元组使用圆括号 `()` 表示。 ### 1.2 变量与命名