活动介绍
file-type

公共数据分析入门:基于Jupyter Notebook的实践指南

ZIP文件

下载需积分: 5 | 10.95MB | 更新于2025-08-11 | 24 浏览量 | 0 下载量 举报 收藏
download 立即下载
### 公共数据分析基础课程实践代码知识点 #### 标题解析 课程标题为“open-data-analysis-basic:Infron-这是开始使用公共数据分析数据的课程的实践代码。”,从标题中可以提取出以下关键知识点: - **公共数据分析(Open Data Analysis)**:这是指利用公开可用的数据资源进行分析的过程。该领域的学习者会接触到如何搜集、处理、分析公共数据,以及如何将这些数据转化为有用的信息和知识。 - **基础(Basic)**:表明本课程内容面向初学者,着重于公共数据分析领域最基础、最核心的知识和技能。 - **Infron**:这个名字可能是本课程的特定名称或者是创建者的笔名,它在这里不具备特别的含义,仅仅是用来标识课程。 - **实践代码(Practical Code)**:强调在学习公共数据分析时,会通过实际的编程代码来操作和练习,说明课程会涉及一些具体编程语言或工具的使用。 #### 描述解析 从描述中可以提炼出以下几点重要知识点: - **数据和源代码下载**:这提示学习者课程会提供数据集和相应的源代码,学习者可以下载并运行这些代码,以便更好地理解和掌握数据分析的过程。 - **四个数据分析实践项目**: - **01新私人公寓销售价格分析**:这可能涉及到房价预测、市场分析、回归分析等数据科学领域中的问题。学习者可能需要处理房屋属性、销售时间等数据,从而得出价格走势或预测。 - **02通过购物中心信息学习技术统计的基础**:该练习可能与描述商业区特征、分析客流量、运营效率等商业统计学相关。技术统计学在这里指的是运用统计方法来提取信息、进行预测或分类。 - **03特许经营进入分析**:这可能包含市场分析、潜在的经营风险评估等,特许经营进入分析可能需要对特定地区或行业进行详细的市场调研和数据分析。 - **04星巴克Idiya商店位置比较**:这个项目可能重点在于比较不同品牌或店铺的地理位置分布、顾客分布特征,甚至是通过位置分析来预测销售量等。 每个练习项目都提供了相应的练习文件和结果文件。练习文件允许学习者自行尝试输入结果,而结果文件则包含了课程推荐的或正确的分析结果,这有利于学习者进行自我检查和对比。 #### 标签解析 - **Jupyter Notebook**:这是课程的一个重要标签,表明课程的实践部分将主要使用Jupyter Notebook这个工具。Jupyter Notebook是一个开源Web应用程序,允许用户创建和分享包含实时代码、方程、可视化和解释性文本的文档。它非常适合数据分析、机器学习等应用场景,因为可以将代码、结果和说明文本整合在同一个文档中,便于教学和学习。 #### 压缩包子文件的文件名称解析 - **open-data-analysis-basic-master**:这是课程的源代码仓库名称,它告诉学习者,课程的所有相关文件和练习材料都储存在这个压缩包中。从文件名的"master"部分可以推测,这是课程的主仓库或者主版本,通常含有课程最新的和完整的资源。 综上所述,本课程涵盖了公共数据分析的基础概念、四个实际的数据分析项目、Jupyter Notebook工具的使用以及如何获取和利用课程资源。对于初学者来说,这是一门全面的入门课程,它不仅提供了理论知识,更重要的是提供了丰富的实践操作机会,使得学习者可以亲手实践并深入理解数据分析的实际应用。通过这门课程,学习者可以为未来的数据分析、数据科学乃至机器学习等领域的深入学习打下坚实的基础。

相关推荐

filetype

#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import requests import time import chardet # 用于自动检测文件编码 from dataclasses import dataclass import concurrent.futures from typing import List @dataclass class AnalysisResult: filepath: str issues: dict model_used: str elapsed: float class OllamaAnalyzer: def __init__(self, server_url, analysis_level="standard"): self.base_url = server_url.rstrip('/') self.session = requests.Session() self.session.timeout = 30 self.analysis_level = analysis_level self.rules = { "basic": ["security"], "standard": ["security", "code_smell", "io"], "strict": ["security", "code_smell", "io", "performance", "complexity"] } def detect_encoding(self, filepath: str) -> str: """自动检测文件编码""" with open(filepath, 'rb') as f: rawdata = f.read(1024) # 读取前1KB用于检测 result = chardet.detect(rawdata) return result['encoding'] or 'utf-8' def analyze(self, filepath: str) -> AnalysisResult: """执行代码分析并打印原始响应""" start_time = time.time() try: encoding = self.detect_encoding(filepath) with open(filepath, 'r', encoding=encoding) as f: code = f.read() response = self._send_analysis_request(code) # 打印原始响应 print(f"\n🔍 原始分析响应 ({filepath}):") print(json.dumps(response, indent=2, ensure_ascii=False)) print("-" * 50) return AnalysisResult( filepath=filepath, issues=self._parse_response(response), model_used="codellama:7b", elapsed=time.time() - start_time ) except Exception as e: return AnalysisResult( filepath=filepath, issues={"ERROR": [f"分析失败: {str(e)}"]}, model_used="N/A", elapsed=time.time() - start_time ) def _send_analysis_request(self, code: str) -> dict: """发送分析请求到Ollama""" payload = { "model": "codellama:7b", "prompt": f"Analyze this code for issues:\n```csharp\n{code[:5000]}\n```", "stream": False, "options": {"temperature": 0.2} } try: resp = self.session.post( f"{self.base_url}/api/generate", json=payload, timeout=45 ) resp.raise_for_status() return resp.json() except requests.exceptions.RequestException as e: return {"response": f"API请求失败: {str(e)}"} def _parse_response(self, response: dict) -> dict: """增强版响应解析器""" text = response.get('response', '').lower() issues = {} # 安全漏洞检测 if self.analysis_level in ["standard", "strict", "basic"]: security_terms = [ 'sql injection', 'xss', 'csrf', 'path traversal', 'insecure deserialization', 'buffer overflow', 'command injection' ] if any(term in text for term in security_terms): issues["SECURITY"] = ["发现潜在安全漏洞"] # 代码异味检测 if self.analysis_level in ["standard", "strict"]: code_smell_terms = [ 'unused variable', 'magic number', 'long method', 'duplicate code', 'god class', 'feature envy' ] if any(term in text for term in code_smell_terms): issues["CODE_SMELL"] = ["发现代码异味"] # 性能问题检测 if self.analysis_level == "strict": perf_terms = [ 'n+1 query', 'memory leak', 'inefficient loop', 'unbounded collection', 'excessive allocation' ] if any(term in text for term in perf_terms): issues["PERFORMANCE"] = ["发现潜在性能问题"] return issues if issues else {"INFO": ["未发现严重问题"]} def perform_analysis(server_url: str, filepaths: List[str]) -> None: """执行全自动分析流程""" class EnhancedAnalyzer(OllamaAnalyzer): def __init__(self, server_url): super().__init__(server_url, analysis_level="strict") self.success_count = 0 def analyze(self, filepath): result = super().analyze(filepath) if not any("ERROR" in k for k in result.issues): self.success_count += 1 return result print(f"\n🔧 正在初始化分析器 (服务器: {server_url})...") analyzer = EnhancedAnalyzer(server_url) with concurrent.futures.ThreadPoolExecutor() as executor: results = list(executor.map(analyzer.analyze, filepaths)) print("\n📊 分析完成!") print(f"✅ 成功分析文件数: {analyzer.success_count}/{len(filepaths)}") for result in results: print(f"\n📄 文件: {result.filepath}") for category, items in result.issues.items(): print(f" ⚠️ {category}: {', '.join(items)}") print(f" ⏱️ 耗时: {result.elapsed:.2f}秒") def create_test_files(): """创建包含多种编码的测试文件""" test_files = [] # UTF-8编码测试文件 utf8_file = "utf8_test.cs" with open(utf8_file, 'w', encoding='utf-8') as f: f.write(''' // UTF-8 测试文件 using System; class Program { static void Main() { Console.WriteLine("你好,世界!"); // 中文测试 var x = 100; // 魔法数字 } }''') test_files.append(utf8_file) # GBK编码测试文件 gbk_file = "gbk_test.cs" with open(gbk_file, 'w', encoding='gbk') as f: f.write(''' // GBK 测试文件 using System.Data.SqlClient; class Program { static void Main(string input) { var cmd = "SELECT * FROM Users WHERE Name='" + input + "'"; new SqlCommand(cmd).ExecuteReader(); // SQL注入测试 } }''') test_files.append(gbk_file) return test_files def main(): # 创建测试文件 test_files = create_test_files() # 使用扫描到的服务器 selected_server = "http://44.251.176.235:4711" perform_analysis(selected_server, test_files) # 清理测试文件 for filename in test_files: try: os.remove(filename) except Exception as e: print(f"⚠️ 清理文件 {filename} 失败: {str(e)}") if __name__ == "__main__": # 确保chardet包已安装 try: import chardet except ImportError: print("正在安装所需依赖...") os.system("pip install chardet requests") main(),返回python3 code_analyzer.py 🔧 正在初始化分析器 (服务器: http://44.251.176.235:4711)... 🔍 原始分析响应 (gbk_test.cs): 🔍 原始分析响应 (utf8_test.cs): 📊 分析完成! ✅ 成功分析文件数: 0/2 📄 文件: utf8_test.cs ⚠️ ERROR: 分析失败: name 'json' is not defined ⏱️ 耗时: 0.57秒 📄 文件: gbk_test.cs ⚠️ ERROR: 分析失败: name 'json' is not defined ⏱️ 耗时: 0.57秒,给出优化后的完整脚本

filetype

from jqdata import * from jqfactor import get_factor_values import datetime import math from scipy.optimize import minimize import pandas as pd # 初始化函数,设定基准等等 def initialize(context): # 设定沪深300作为基准 set_benchmark("399303.XSHE") # 打开防未来函数 set_option("avoid_future_data", True) # 开启动态复权模式(真实价格) set_option("use_real_price", True) # 输出内容到日志 log.info() log.info("初始函数开始运行") # 过滤掉order系列API产生的比error级别低的log log.set_level("order", "error") # 固定滑点设置ETF 0.001(即交易对手方一档价) set_slippage(FixedSlippage(0.002), type="fund") # 股票交易总成本0.3%(含固定滑点0.02) set_order_cost( OrderCost( open_tax=0, close_tax=0.001, open_commission=0.0003, close_commission=0.0003, close_today_commission=0, min_commission=5, ), type="stock", ) g.hold_list = [] # 记录策略的持仓股票 g.positions = {} # 记录策略的持仓股票 # 持仓股票数 g.stock_sum = 6 # 判断买卖点的行业数量 g.num = 1 # 空仓的月份 g.pass_months = [] # 策略执行计划 run_weekly(adjust, 1, "9:31") run_daily(check, "14:50") # 获取昨日涨停票并卖出 def check(context): # 获取已持有列表 g.hold_list = list(g.positions.keys()) banner_stocks = [] # 获取昨日涨停列表 if g.hold_list != []: df = get_price( g.hold_list, end_date=context.previous_date, frequency="daily", fields=["close", "high_limit"], count=1, panel=False, fill_paused=False, ) df = df[df["close"] == df["high_limit"]] banner_stocks = list(df.code) for stock in banner_stocks: order_target_value_(context, stock, 0) # 获取昨日跌停列表 if g.hold_list != []: df = get_price( g.hold_list, end_date=context.previous_date, frequency="daily", fields=["close", "low_limit"], count=1, panel=False, fill_paused=False, ) df = df[df["close"] == df["low_limit"]] banner_stocks = list(df.code) for stock in banner_stocks: order_target_value_(context, stock, 0) # 获取策略当前持仓市值 def get_total_value(context): return sum(context.portfolio.positions[key].price * value for key, value in g.positions.items()) # 调仓 def adjust(context): target = select(context) # 获取前stock_sum个标的 target = target[:min(len(target), g.stock_sum)] # 获取已持有列表 g.hold_list = list(g.positions.keys()) portfolio = context.portfolio # 调仓卖出 for stock in g.hold_list: if stock not in target: order_target_value_(context, stock, 0) # 调仓买入 count = len(set(target) - set(g.hold_list)) if count == 0: return # 目标市值 target_value = portfolio.total_value # 当前市值 position_value = get_total_value(context) # 可用现金:当前现金 available_cash = portfolio.available_cash # 买入股票的总市值 value = max(0, min(target_value - position_value, available_cash)) # 等价值买入每一个未买入的标的 for security in target: if security not in g.hold_list: order_target_value_(context, security, value / count) # 择时 def select(context): I = get_market_breadth(context) industries = {"银行I", "煤炭I", "采掘I", "钢铁I"} if not industries.intersection(I) and not is_empty_month(context): return filter(context) return [] # 获取市场 def get_market_breadth(context): # 指定日期防止未来数据 yesterday = context.previous_date # 获取初始列表 中证全指(000985.XSHG) stocks = get_index_stocks("000985.XSHG") count = 1 h = get_price( stocks, end_date=yesterday, frequency="1d", fields=["close"], count=count + 20, panel=False, ) h["date"] = pd.DatetimeIndex(h.time).date # 将长表格转换为宽表格,方便按日期分析股票价格。 df_close = h.pivot(index="code", columns="date", values="close").dropna(axis=0) # 计算20日均线 df_ma20 = df_close.rolling(window=20, axis=1).mean().iloc[:, -count:] # 计算偏离程度 df_bias = df_close.iloc[:, -count:] > df_ma20 df_bias["industry_name"] = getStockIndustry(stocks) # 计算行业偏离比例 df_ratio = ((df_bias.groupby("industry_name").sum() * 100.0) / df_bias.groupby("industry_name").count()).round() # 获取偏离程度最高的行业 top_values = df_ratio.loc[:, yesterday].nlargest(g.num) I = top_values.index.tolist() return I # 基础过滤(过滤科创北交、ST、停牌、次新股) def filter_basic_stock(context, stock_list): # 30开头的是深交所的创业板, # 68开头的是上交所的科创板, # 8开头的股票可能指的是北交所的, # 新三板北交所的股票代码通常以43、83、87等开头 # 4开头的股票可能属于退市板块 current_data = get_current_data() return [ stock for stock in stock_list if not current_data[stock].paused and not current_data[stock].is_st and "ST" not in current_data[stock].name and "*" not in current_data[stock].name and "退" not in current_data[stock].name and not (stock[0] == "4" or stock[0] == "8" or stock[:2] == "68") and not context.previous_date - get_security_info(stock).start_date < datetime.timedelta(375) ] # 过滤当前时间涨跌停的股票 def filter_limitup_limitdown_stock(stock_list): current_data = get_current_data() return [ stock for stock in stock_list if current_data[stock].last_price < current_data[stock].high_limit and current_data[stock].last_price > current_data[stock].low_limit ] # 判断今天是在空仓月 def is_empty_month(context): month = context.current_dt.month return month in g.pass_months def getStockIndustry(stocks): # 第一步:获取原始行业数据(假设stocks是股票代码列表) industry = get_industry(stocks) # 第二步:提取申万一级行业名称 return pd.Series({stock: info["sw_l1"]["industry_name"] for stock, info in industry.items() if "sw_l1" in info}) # 过滤股票 def filter(context): stocks = get_index_stocks("399303.XSHE") # 这里的有问题,需要由399303.XSHE代替 stocks = filter_basic_stock(context, stocks) stocks = ( get_fundamentals( query( valuation.code, ) .filter( valuation.code.in_(stocks), # 从现有股票池中筛选 indicator.adjusted_profit > 0, # 要求调整后净利润>0 ) .order_by(valuation.market_cap.asc()) # 按市值升序排列(从小市值开始) ) .head(20) # 取前20只股票 .code # 提取股票代码 ) stocks = filter_limitup_limitdown_stock(stocks) return stocks # 自定义下单(涨跌停不交易) def order_target_value_(context, security, value): current_data = get_current_data() # 检查标的是否停牌、涨停、跌停 if current_data[security].paused: log.info(f"{security}: 今日停牌") return False # 检查是否涨停 if current_data[security].last_price == current_data[security].high_limit: log.info(f"{security}: 当前涨停") return False # 检查是否跌停 if current_data[security].last_price == current_data[security].low_limit: log.info(f"{security}: 当前跌停") return False # 获取当前标的的价格 price = current_data[security].last_price # 获取当前策略的持仓数量 current_position = g.positions.get(security, 0) # 计算目标持仓数量 target_position = (int(value / price) // 100) * 100 if price != 0 else 0 # 计算需要调整的数量 adjustment = target_position - current_position # 检查是否当天买入卖出 closeable_amount = context.portfolio.positions[security].closeable_amount if security in context.portfolio.positions else 0 if adjustment < 0 and closeable_amount == 0: log.info(f"{security}: 当天买入不可卖出") return False # 下单并更新持仓 if adjustment != 0: o = order(security, adjustment) if o: # 更新持仓数量 amount = o.amount if o.is_buy else -o.amount g.positions[security] = amount + current_position # 如果目标持仓为零,移除该证券 if target_position == 0: g.positions.pop(security, None) # 更新持有列表 g.hold_list = list(g.positions.keys()) return True return False (把这个聚宽的代码迁移到backtrade

JinTommy
  • 粉丝: 49
上传资源 快速赚钱