start_index = -1 for i, line in enumerate(lines): if 'Ini' in line: start_index = i + 1 if start_index == -1: print('Error: "Initialization " was not found in the log files.') exit()
时间: 2024-03-10 18:50:29 浏览: 111
这段代码的作用是在一个日志文件中查找包含字符串 `'Ini'` 的行,并将其后一行的行号赋值给变量 `start_index`。如果找不到这个字符串,则输出错误信息并退出程序。但是这段代码也存在一些问题。其中一个问题是:`if` 语句缺少冒号。应该在 `if start_index == -1` 后面加上冒号。另外,如果找不到字符串 `'Ini'`,应该输出错误信息并返回 `None` 而不是退出程序。下面是修改后的代码:
```
start_index = -1
for i, line in enumerate(lines):
if 'Ini' in line:
start_index = i + 1
break
if start_index == -1:
print('Error: "Ini" was not found in the log files.')
return None
```
相关问题
import configparser import re, os import openpyxl,csv import glob from openpyxl.styles import PatternFill STR_SPACE1 = " " DATA_BYTE1 = 8 wb = None ws = None byteRow = "" memoRow = "" config = {} def init_config(): print("init_config call") global config con = configparser.ConfigParser() con.read("FunctionTest.ini") for sec in con.sections(): tmp = {} key: object for (key, val) in con.items(sec): tmp[key] = val config[sec] = tmp def _reduce(xls_arr): new_xls = [] for line in xls_arr: if len(new_xls) == 0: new_xls.append(line) else: find = 0 for item in new_xls: if item["DataPos"] == line["DataPos"] and item["DataLength"] == line["DataLength"] and item["CANID"] == line["CANID"]: find = 1 break if find == 0: new_xls.append(line) return new_xls # 1.1 open xlsx def s1_generate_xls_array(xls, sheet, MsgLabel, CANID, DataPos, DataLength, debug): xls_data = [] last_msg_label = "" last_can_id = "" global wb, ws wb = openpyxl.load_workbook(xls) ws = wb[sheet] steps = {} stepstart = 9 stepend = 0 if sheet == "1Trip動作確認(HV)" or sheet == "1Trip動作確認" or sheet == "1Trip動作確認(ISG)" or sheet == "CAN出力データ_HV_CSA" or sheet == "CAN出力データ_ISG_CSA": stepend = 38 elif sheet == "CAN出力データ_HV_ICS" or sheet == "CAN出力データ_ISG_ICS": stepend = 23 elif sheet == "MUTE_クリソナ検知表示": stepend = 56 elif sheet == "ソナー音量のカスタマイズ対応" or sheet == "MUTE_RCDブザー吹鳴あり": stepend = 23 for row_num, row in enumerate(ws.iter_rows(), start=1): print("%s %s %s" % (xls, sheet, row_num)) data_pos = "" data_len = "" msg_label = "" can_id = "" step1expectval = "" for (col_num, cell) in enumerate(row, start=1): idx = str(col_num) print ( str(row_num) +" "+ str(col_num) +" " + str( cell.value)) if (MsgLabel == idx): msg_label = str(cell.value) elif (CANID == idx): can_id = str(cell.value) elif (DataPos == idx): data_pos = str(cell.value) elif (DataLength == idx): data_len = str(cell.value) elif (int(idx) >= int(stepstart) and int(idx) <= int(stepend) ): if ("12" == idx): step1expectval = str(cell.value) ddd=str(cell.value) if -1 != str(cell.value).find("step"): steps[str(cell.value)] = ws.cell(row_num + 2, col_num ).value #row_num + 1 ########################### #temp = ws.cell(row_num + 1, col_num ).fill.start_color.rgb #print( "66 14 = " + str(ws.cell(66, 14 ).fill.start_color.rgb)) ########################### print("---") if msg_label and msg_label.upper() != "NONE": last_msg_label = msg_label if can_id and can_id.upper() != "NONE": last_can_id = can_id if can_id == 119: print() if data_pos and data_len: line = _check_xls_data(last_msg_label, last_can_id, msg_label, can_id, data_pos, data_len, step1expectval) if line: line["row"] = row_num xls_data.append(line) xls_data = _reduce(xls_data) if int(debug) > 0: # print("write %s %s" % (xls, sheet)) f = openpyxl.Workbook() table = f.create_sheet(sheet, 0) table.cell(row=1, column=1, value="msg_label") table.cell(row=1, column=2, value="can_id") table.cell(row=1, column=3, value="data_pos") table.cell(row=1, column=4, value="data_length") for (line_num, lines) in enumerate(xls_data, start=2): # print("write line %s" % str(line_num)) table.cell(row=line_num, column=1, value=lines["MsgLabel"]) table.cell(row=line_num, column=2, value=lines["CANID"]) table.cell(row=line_num, column=3, value=lines["DataPos"]) table.cell(row=line_num, column=4, value=lines["DataLength"]) # f.save(os.path.splitext(xls)[0] + "_lines.xlsx") f.save(sheet + "_lines.xlsx") return xls_data,steps def _check_xls_data(last_msg_label, last_can_id, msg_label, can_id, data_pos, data_len, step1expectval): lines = {} if not data_pos: return "" if not data_len: return "" if "-" == step1expectval or "ー" == step1expectval or "‐" == step1expectval or "-" == step1expectval or "―" == step1expectval: return "" if re.match("[1-3]?[0-9]\.?[0-8]?$", data_pos) and re.match("[\d][\d]?$", data_len): lines["DataPos"] = data_pos lines["DataLength"] = data_len if msg_label.upper() == "NONE": msg_label = last_msg_label if can_id.upper() == "NONE": can_id = last_can_id lines["MsgLabel"] = msg_label.upper().strip() lines["CANID"] = can_id.upper().strip() return lines else: return "" # 2.1 open log file <LOOP> # 2.2 search need line from log # 2.3 generate log array def s2_generate_log_array(): file_array = {} for sec, items in config.items(): if not re.match("log", sec, re.IGNORECASE): continue debug = items.get("debug", "") # sheet = items.get("sheetname", "") for fname in glob.glob("**/*.asc", recursive=True): farr = [] print("1/3 analyse %s" % fname) with open(fname, 'r', encoding="utf-8", errors="ignore") as f: while True: line = f.readline() if not line: break else: match1 = re.match( "\s*([\d]+\.[\d]{6})\sCANFD\s+[\d]+\s+[a-zA-Z]{2}\s+([a-zA-Z0-9]+)\s+[a-zA-Z0-9]+\s+[\d]\s[\d]\s[a-zA-Z0-9]\s+[\d]{1,2}((\s[a-zA-Z0-9]{2})+)\s", line) match2 = re.match( "\s*([\d]+\.[\d]{6})\s+[\d]+\s+([\w]+)\s+[\w]+\s+d\s[\d]\s([0-9A-F\s]+)\s", line) if match1: farr.append([item.upper().strip() for item in list(match1.groups())]) if match2: farr.append([item.upper().strip() for item in list(match2.groups())]) file_array[fname] = farr if int(debug) > 0: f = openpyxl.Workbook() count = 0 for (fname, arr) in file_array.items(): print("1/3 write " + fname) table = f.create_sheet("log_sheet" + str(count)) table.cell(row=1, column=1, value=fname) for (line_num, line) in enumerate(arr, start=2): # print("write " + str(line_num)) for (col_num, cell) in enumerate(line, start=1): table.cell(row=line_num, column=col_num, value=cell) count = count + 1 f.save("_log.xlsx") return file_array def jionLogMsgDataBin(msgdata, bytestart, bitpos, bitlength): msgdatabin = bin(int("1" + str(msgdata).replace(STR_SPACE1, ""), 16))[3:] bit_end = bytestart * DATA_BYTE1 - bitpos bit_start = bit_end - bitlength log_data_value = int(msgdatabin[bit_start:bit_end], 2) return log_data_value def _append_array(array, item): arraysize = len(array) if 0 == arraysize: array.append(item) else: #jionLogMsgDataBin(array[arraysize - 1]["data"]) if not ( array[arraysize - 1]["CANID"] == item["CANID"] and array[arraysize - 1]["dataval"] == item["dataval"] and array[arraysize - 1]["DataPos"] == item["DataPos"] and array[arraysize - 1]["DataLength"] == item["DataLength"]): array.append(item) #print(array) # 3.3 get whole data 0 0 8 8 08 00 00 00 00 c0 00 47 def s3_get_whole_data(xls_data,steps, file_array, debug): # 3.1 get line from line array allstepsdata = {} _len = len(xls_data) for (cn, xls_lines) in enumerate(xls_data): print("%d/%d anaylse" % (cn, _len)) msg_label = xls_lines["MsgLabel"] can_id = xls_lines["CANID"] # 3.2 match log from log array stepdata = {} for (fname, arr) in file_array.items(): whole_data = {} for (line_num, line) in enumerate(arr): # if len(line) < 5: # continue if line[1].zfill(3) == can_id: # print("data " + line[4]) temp = xls_lines.get("data", "") if not xls_lines.get("data", ""): xls_lines["data"] = "" if not xls_lines.get("data", ""): xls_lines["timestamp"] = line[0] xls_lines["data"] = line[2] # new_xls.append(xls_lines) xls_lines = s3_get_bytes(xls_lines) dataval = jionLogMsgDataBin(xls_lines["data"], int(xls_lines["DataPos"].split(".")[0]), int(xls_lines["DataPos"].split(".")[1]), int(xls_lines["DataLength"])) xls_lines["dataval"] = dataval if whole_data.get(msg_label, ""): whole_data[msg_label].append(xls_lines) else: whole_data[msg_label] = [] #open(msg_label + ".csv", "w") whole_data[msg_label].append(xls_lines) else: #if xls_lines.get("dataval", "") != dataval: new_lines = {} new_lines["MsgLabel"] = msg_label new_lines["CANID"] = can_id new_lines["data"] = line[2] new_lines["timestamp"] = line[0] new_lines["DataPos"] = xls_lines["DataPos"] new_lines["DataLength"] = xls_lines["DataLength"] # new_xls.append(new_lines) new_lines = s3_get_bytes(new_lines) dataval = jionLogMsgDataBin( new_lines["data"], int(new_lines["DataPos"].split(".")[0]), int(new_lines["DataPos"].split(".")[1]), int(new_lines["DataLength"])) new_lines["dataval"] = dataval if whole_data.get(msg_label, ""): _append_array(whole_data[msg_label], new_lines) # whole_data[msg_label].append(new_lines) else: whole_data[msg_label] = [] # whole_data[msg_label].append(new_lines) _append_array(whole_data[msg_label], new_lines) print("") stepdata[fname] = whole_data allstepsdata[xls_lines["MsgLabel"]+"("+xls_lines["DataPos"]+")"] = stepdata return allstepsdata ''' def _write_log(whole_data): for (label, arr) in whole_data.items(): with open(label + ".csv", "a+") as f: for new_lines in arr: f.write("%s,%s,%s,%s,%s,%s,%s\n" % (new_lines["timestamp"], #new_lines["MsgLabel"], new_lines["CANID"], new_lines["CANID"], new_lines["DataPos"], new_lines["DataLength"], new_lines.get("data", ""), new_lines.get("l8", ""), str(new_lines.get("bytes", "")))) ''' def _expand_str(data): new = "0x" + data.replace(" ", "") size = int(len(new)/2 - 1)#2->3 s16 = int(new, 16) s64 = str(bin(s16))[2:].zfill(size*8) l8 = [] for i in range(size): l8.append(s64[0 + i * 8:8 + i * 8]) return l8 def _analyse_pos(pos): match = re.match("([\d]+)\.([\d]+)", pos) if (match): return (int(match.group(1)), int(match.group(2))) else: return (int(pos), -1) def _get_bytes(l8, begin, end, leng): front = [] if 1 < begin: [front.append(l8[idx]) for idx in range(begin - 1)] else: front = [] if end == -1: front.append(l8[begin - 1][:8]) else: front.append(l8[begin - 1][:8 - end]) whole = "".join(front) return whole[len(whole) - leng:] # 3.4 get bytes 08 00 00 00 00 c0 00 47 # 3.5 put to line array def s3_get_bytes(xls_lines): msg_label = xls_lines["MsgLabel"] can_id = xls_lines["CANID"] pos = xls_lines["DataPos"] data_length = xls_lines["DataLength"] data = xls_lines.get("data", "") if not data: xls_lines["bytes"] = "" else: l8 = _expand_str(data) xls_lines["l8"] = "-".join(l8) (begin, end) = _analyse_pos(pos) strTmp = _get_bytes(l8, begin, end, int(data_length)) xls_lines["bytes"] = str(int(strTmp,2))# print("bytes " + xls_lines["bytes"]) return xls_lines # 4.1 check result # 4.2 output result def s4_check_output(sheet, steps, xls_data, data,xls): lenval = len(xls_data) steps1=steps.keys() #stepm=[] for i in steps1: #stepm.append(i) for ii in memoRow: del memoRow[0] for (cn, xls_lines) in enumerate(xls_data): print("%d/%d output" % (cn, lenval)) msg_label = xls_lines["MsgLabel"]+"("+xls_lines["DataPos"]+")" msg_label1 = xls_lines["MsgLabel"] #读取msg can_id = xls_lines["CANID"] pos = xls_lines["DataPos"] data_length = xls_lines["DataLength"] msgdict = data.get(msg_label, []) msgdict1 = msgdict.get(i,[]) # msgdict2 = msgdict1.get(msg_label1,[])# mbytes = [] if len(msgdict): for new_lines in msgdict2: if ( # new_lines["MsgLabel"] == msg_label and new_lines["CANID"] == can_id and new_lines["DataPos"] == pos and new_lines["DataLength"] == data_length): #if not new_lines.get("bytes", "") in mbytes: mbytes.append(new_lines.get("bytes", "")) if len(mbytes): #ws[str(byteRow) + str(xls_lines.get("row"))] = mbytes[0] ws[str(ii) + str(xls_lines.get("row"))] = "→".join(mbytes[0:]) break wb.save(sheet + "_result.xlsx") wb.save(xls) def getbasicsteplines(actstr, startlineinde,linesdata): stepdata = [] nextstartindex = 0 nextstartindex1=0 index = startlineinde global SELECT1 for sec, items in config.items(): if not re.match("OUT", sec, re.IGNORECASE): continue SELECT1 = items.get("select", "") shiftno = int(actstr.split(",")[0]) actflg = actstr.split(",")[1] #actno = int(actstr.split(",")[2]) while index < len(linesdata): tempstr = linesdata[index] #PTCURSFTcan,PTCURSFTDPX,PTCURSFTDPY,PTCURSFTDL if tempstr[1].strip().zfill(3).upper() == PTCURSFTcan: tempshift = jionLogMsgDataBin(tempstr[2], PTCURSFTDPX, PTCURSFTDPY, PTCURSFTDL) if(SELECT1=="PTCU" or SELECT1=="BPRND"): if tempshift == 0: print(1) else: if shiftno != tempshift: break if(SELECT1=="SFTP" or SELECT1=="XPRND" or SELECT1=="JICSXPRND" or SELECT1=="JCSAXPRND"): if shiftno != tempshift: break if actflg == "125" : if tempstr[1].strip().zfill(3).upper() == "1A0": tempshift = jionLogMsgDataBin(tempstr[2], 3, 2, 11) if tempshift == 125: index = nextstartindex break else: nextstartindex = index stepdata.append(tempstr) index = index + 1 return stepdata ,index def getmixsteplines(actstr, startlineinde,linesdata): stepdata = [] index = startlineinde nextstartindex = 0 csr_mutestr = "" global SELECT1 for sec, items in config.items(): if not re.match("OUT", sec, re.IGNORECASE): continue SELECT1 = items.get("select", "") shiftno = int(actstr.split(",")[0]) actflg = actstr.split(",")[1] actno = int(actstr.split(",")[2]) #startFlag=False #endFlag= False while index < len(linesdata): tempstr = linesdata[index] if tempstr[1].strip().zfill(3).upper() == PTCURSFTcan: tempshift = jionLogMsgDataBin(tempstr[2], PTCURSFTDPX, PTCURSFTDPY, PTCURSFTDL) if(SELECT1=="XPRND" or SELECT1=="BPRND"): if tempshift == 0: print(1) else: if shiftno != tempshift: break if(SELECT1=="SFTP" or SELECT1=="XPRND" or SELECT1=="JICSXPRND" or SELECT1=="JCSAXPRND"): if shiftno != tempshift: break if actflg == "1" and actno == 0: if tempstr[1].strip().zfill(3).upper() == "51D": tempshift = jionLogMsgDataBin(tempstr[2], 1, 7, 1) if tempshift == 1: index = nextstartindex break else: nextstartindex = index if actflg == "1" and actno != 0: if tempstr[1].strip().zfill(3).upper() == "51D": tempshift = jionLogMsgDataBin(tempstr[2],1, 7, 1) if tempshift == 1: print("1") if len(csr_mutestr) == 0: csr_mutestr = csr_mutestr + str(tempshift) else: if int (csr_mutestr[len(csr_mutestr) - 1]) != tempshift: csr_mutestr = csr_mutestr + str(tempshift) else: print("0") nextstartindex = index if len(csr_mutestr) == 0: csr_mutestr = csr_mutestr + str(tempshift) else: if int (csr_mutestr[len(csr_mutestr) - 1]) != tempshift: csr_mutestr = csr_mutestr + str(tempshift) if "0101" == csr_mutestr: index = nextstartindex break stepdata.append(tempstr) index = index + 1 return stepdata ,index def getmixsteplines1(actstr, startlineinde,linesdata): stepdata = [] index = startlineinde nextstartindex = 0 csr_mutestr = "" global SELECT1 for sec, items in config.items(): if not re.match("OUT", sec, re.IGNORECASE): continue SELECT1 = items.get("select", "") shiftno = int(actstr.split(",")[0]) actflg = actstr.split(",")[1] actno = int(actstr.split(",")[2]) #startFlag=False #endFlag= False while index < len(linesdata): tempstr = linesdata[index] if tempstr[1].strip().zfill(3).upper() == PTCURSFTcan: tempshift = jionLogMsgDataBin(tempstr[2], PTCURSFTDPX, PTCURSFTDPY, PTCURSFTDL) if(SELECT1=="XPRND" or SELECT1=="BPRND"): if tempshift == 0: print(1) else: if shiftno != tempshift: break if(SELECT1=="SFTP" or SELECT1=="XPRND" or SELECT1=="JICSXPRND" or SELECT1=="JCSAXPRND"): if shiftno != tempshift: break if actflg == "0" and actno == 1: if tempstr[1].strip().zfill(3).upper() == "51D": tempshift = jionLogMsgDataBin(tempstr[2], 2, 3, 12) if tempshift == 193: index = nextstartindex break else: nextstartindex = index if actflg == "193" and actno == 1: if tempstr[1].strip().zfill(3).upper() == "390": tempshift = jionLogMsgDataBin(tempstr[2],1, 4, 4) if tempshift == 0: index = nextstartindex break else: nextstartindex = index stepdata.append(tempstr) index = index + 1 return stepdata ,index def file_array_steps(file_array, steps): allsteps_filearray = {} stepssize = len(steps) #farrallsteps = {} startlineinde = 0 #stepdata = [] for stepindex in steps: stepdata = [] if 10 == stepssize: print("1Trip動作確認(HV) or 1Trip動作確認(ISG)") print("1Trip動作確認") #linecnt = len(file_array.values) for linekey in file_array: linesdata = file_array[linekey] if stepindex == "step1" or stepindex == "step2" : if stepindex == "step1" : stepdata, cur_lineindex =getbasicsteplines(ONETRIP_BASIC_ACT[stepindex], startlineinde, linesdata) allsteps_filearray[stepindex] = stepdata startlineinde = cur_lineindex else: stepdata= allsteps_filearray["step1"] allsteps_filearray[stepindex] = stepdata startlineinde =startlineinde else: stepdata, cur_lineindex =getbasicsteplines(ONETRIP_BASIC_ACT[stepindex], startlineinde, linesdata) allsteps_filearray[stepindex] = stepdata startlineinde =cur_lineindex elif 16 == stepssize: print("MUTE_クリソナ検知表示") for linekey in file_array: linesdata = file_array[linekey] stepdata, cur_lineindex =getmixsteplines(ONETRIP_MUTE_CRISINA[stepindex], startlineinde, linesdata) allsteps_filearray[stepindex] = stepdata startlineinde =cur_lineindex elif 5 == stepssize: print("ソナー音量のカスタマイズ対応") for linekey in file_array: linesdata = file_array[linekey] if stepindex == "step1" or stepindex == "step2" : if stepindex == "step1" : stepdata, cur_lineindex =getmixsteplines1(ONETRIP_BASIC_ACT[stepindex], startlineinde, linesdata) allsteps_filearray[stepindex] = stepdata startlineinde = cur_lineindex else: stepdata= allsteps_filearray["step1"] allsteps_filearray[stepindex] = stepdata startlineinde =startlineinde else: stepdata, cur_lineindex =getmixsteplines1(ONETRIP_BASIC_ACT[stepindex], startlineinde, linesdata) allsteps_filearray[stepindex] = stepdata startlineinde =cur_lineindex allsteps_filearray[stepindex] = stepdata return allsteps_filearray def main(): xls = "" sheet = "" global byteRow, memoRow,PTCURSFTcan,PTCURSFTDPX,DISselect,PTCURSFTDPY,PTCURSFTDL,SELECT,SELECT1,ONETRIP_BASIC_ACT,ONETRIP_MUTE_CRISINA,ONETRIP_MUTE_CRISINA1,ONETRIP_MUTE_RCD print("1/3 analyse log") file_array = s2_generate_log_array() for sec, items in config.items(): if not re.match("OUT", sec, re.IGNORECASE): continue xls = items.get("xls", "") sheet = items.get("sheetname", "") msgLabel = items.get("msglabel", "") canid = items.get("canid", "") dataPos = items.get("datapos", "") dataLength = items.get("datalength", "") debug = items.get("debug", "") byteRow = items.get("bytes", "") memoRow1 = items.get("memo", "") memoRow = memoRow1.split() PTCURSFTcan = items.get("ptcursftc", "") PTCURSFTDPX1 = items.get("ptcursftdpxx", "") PTCURSFTDPX = int(PTCURSFTDPX1) PTCURSFTDPY1 = items.get("ptcursftdpyy", "") PTCURSFTDPY = int(PTCURSFTDPY1) #Disselect = items.get("disselect", "") #DISselect = int(Disselect) PTCURSFTDL1 = items.get("ptcursftdll", "") PTCURSFTDL = int(PTCURSFTDL1) SELECT1 = items.get("select", "") if SELECT1=="XPRND" : ONETRIP_BASIC_ACT = {"step1":"0,0,0","step2":"0,0,0","step3":"8,125,0","step4":"8,0,0","step5":"4,0,0", "step6":"2,0,0","step7":"4,0,0","step8":"8,0,0","step9":"0,0,0","step10":"0,0,0"} ONETRIP_MUTE_CRISINA = {"step1":"0,0,0", "step2":"8,1,0", "step3":"8,1,1", "step4":"8,1,2", "step5":"8,1,3", "step6":"8,1,4", "step7":"4,1,0", "step8":"4,1,1", "step9":"2,1,0", "step10":"2,1,1", "step11":"4,1,0", "step12":"4,1,1", "step13":"8,1,0", "step14":"8,1,1", "step15":"0,0,0", "step16":"0,0,0"} ONETRIP_MUTE_CRISINA1 = {"step1":"2,0,1", "step2":"2,0,1", "step3":"2,193,1", "step4":"2,193,0", "step5":"2,0,0"} xls_data = [] steps = {} if not sheet: continue # 1.2 search need line from xlsx # 1.3 generate line array if xls and sheet and msgLabel and canid and dataPos and dataLength: print("2/3 analyse %s sheet %s" % (xls, sheet)) xls_data, steps = s1_generate_xls_array(xls, sheet, msgLabel, canid, dataPos, dataLength, debug) print("3/3 get data from log") allsteps_filearray = file_array_steps(file_array, steps) #data = s3_get_whole_data(xls_data, steps, file_array, debug) data = s3_get_whole_data(xls_data, steps, allsteps_filearray, debug) #_write_log(data) # s4_arr = s3_get_bytes(s3_arr) s4_check_output(sheet, steps , xls_data, data,xls) else: print("config file error") if __name__ == '__main__': init_config() main()
<think>我们面对的是一个复杂的CAN数据分析脚本,主要用于处理Excel测试用例和日志文件,提取CAN数据并生成报告。整个脚本分为几个主要部分:配置初始化、Excel解析、日志解析、数据匹配和结果输出。下面我将逐一解释关键部分。##1.全局变量和配置初始化- `wb`,`ws`:用于操作Excel工作簿和工作表。- `byteRow`, `memoRow`:配置中定义的输出列。
-`config`:存储配置文件内容。- `init_config()`:读取配置文件"FunctionTest.ini"。##2.Excel处理函数###`s1_generate_xls_array(xls,sheet,...)`
-**功能**:解析Excel文件,提取测试用例信息(MsgLabel,CANID, DataPos,DataLength)和步骤信息(step1~stepN)。
-**关键步骤**:1.打开Excel文件,选择指定工作表。2.遍历每一行,提取所需列的信息。3.通过`_check_xls_data`验证数据有效性。4.使用`_reduce`去重(保留第一个出现的测试项)。5.如果开启调试,将解析结果写入新Excel文件。
###`_check_xls_data(last_msg_label, ...)`-验证数据位置(DataPos)和长度(DataLength)的格式,并构建测试项字典。##3.日志处理函数### `s2_generate_log_array()`- **功能**:读取所有.asc日志文件,解析每一行,提取时间戳、CAN ID和数据字段。
-**关键步骤**:1.使用`glob`遍历所有.asc文件。2.使用正则表达式匹配两种格式的日志行。3.将解析结果存入字典`file_array`,键为文件名,值为日志行列表。##4.数据匹配与处理### `s3_get_whole_data(xls_data,steps,file_array, debug)`- **功能**:将Excel测试项与日志数据进行匹配,提取指定位置的数据值。
-**关键步骤**:1.遍历每个测试项(xls_data)。2.遍历每个日志文件(file_array)。3.在日志中匹配相同CANID的帧。4.使用`jionLogMsgDataBin`提取数据值(根据DataPos和DataLength)。5.使用`_append_array`避免重复添加连续相同的数据。6.结果存储在`allstepsdata`中,结构为:`{"MsgLabel(DataPos)": {文件名:{ "MsgLabel": [匹配的数据行 ]} }}`### `jionLogMsgDataBin(msgdata, bytestart, bitpos,bitlength)`-将CAN数据(十六进制字符串)转换为二进制字符串,然后根据指定的字节起始、位偏移和长度提取整数值。##5.结果输出###`s4_check_output(sheet,steps,xls_data,data,xls)`- **功能**:将匹配到的数据值写入Excel的对应步骤列中。
-**关键步骤**:1.遍历每个步骤(steps)。2.遍历`memoRow`(配置中定义的输出列,如A、B等)。3.遍历每个测试项,从`data`中获取匹配的数据值(多个值用"→"连接)。4.写入Excel并保存。##6.步骤划分函数### `file_array_steps(file_array, steps)`- **功能**:根据配置的步骤定义,将日志数据按步骤划分。
-根据不同的测试类型(如1Trip動作確認、MUTE_クリソナ検知表示等)使用不同的函数(`getbasicsteplines`,`getmixsteplines`)划分步骤。
-划分依据:根据换挡信息(PTCURSFTcan)或其他特定CAN信号的值变化。
##7.主函数`main()`-流程:1.初始化配置。2.解析日志文件(s2_generate_log_array)。3.遍历配置中的OUT部分,处理每个Excel工作表。4.解析Excel测试用例(s1_generate_xls_array)。5.划分日志步骤(file_array_steps)。6.匹配数据(s3_get_whole_data)。7.输出结果(s4_check_output)。##8.辅助函数- `_expand_str(data)`:将十六进制字符串转换为二进制字符串列表(按字节分割)。
-`_analyse_pos(pos)`:解析数据位置字符串(如"1.0")为(字节,位)。- `_get_bytes(l8,begin,end,leng)`:从二进制字符串列表中提取指定位。##优化建议1. **全局变量**:尽量减少全局变量的使用,通过参数传递。2.**函数拆分**:部分函数过长(如`s1_generate_xls_array`),可进一步拆分。3.**错误处理**:增加异常处理,确保文件操作安全。
4. **性能**:对于大日志文件,考虑更高效的数据结构(如使用字典索引CAN ID)。
##示例:数据提取过程假设测试项:
-MsgLabel:"MotorSpeed"- CANID:"51D"- DataPos:"1.0"- DataLength:16在日志中匹配到CAN ID为51D的帧数据:"0000000000 C00047",则:1.转换为二进制字符串(按字节):['00000000','00000000', ...]
2.根据DataPos "1.0":起始字节1,位0,提取16位(即第一个字节的全部8位和第二个字节的前8位)。
3.得到二进制字符串"0000000000000000",转换为整数0。
##§§相关问题§§:
1.在函数`jionLogMsgDataBin`中,为什么使用`bin(int("1"+msgdata,16))[3:]`?这样做有什么目的?
2.在步骤划分函数`getbasicsteplines`中,如何根据换挡信号(PTCURSFTcan)划分步骤?请举例说明。
3.函数`_append_array`是如何避免添加重复数据的?它判断重复的依据是什么?
4.在`s4_check_output`函数中,为什么使用`memoRow`列表来管理输出列?如何确保每个步骤写入正确的列?</think># CAN数据分析工具全面解析
这个脚本是一个完整的CAN数据分析工具,用于处理Excel测试用例和.asc日志文件,提取CAN数据并生成分析报告。下面我将详细解释整个系统的结构和关键功能。
## 整体架构
```mermaid
graph TD
A[主程序] --> B[初始化配置]
A --> C[解析日志文件]
A --> D[处理Excel测试用例]
D --> E[数据匹配]
E --> F[结果输出]
C --> G[步骤划分]
G --> E
```
## 关键功能模块详解
### 1. 配置初始化 (`init_config()`)
```python
def init_config():
global config
con = configparser.ConfigParser()
con.read("FunctionTest.ini")
for sec in con.sections():
tmp = {}
for (key, val) in con.items(sec):
tmp[key] = val
config[sec] = tmp
```
- **功能**:从INI配置文件读取设置
- **关键参数**:
- `xls`: Excel文件路径
- `sheetname`: 工作表名称
- `msglabel`, `canid`: CAN消息标识
- `datapos`, `datalength`: 数据位置和长度
- `bytes`, `memo`: 结果输出位置
### 2. Excel测试用例处理 (`s1_generate_xls_array()`)
```python
def s1_generate_xls_array(xls, sheet, MsgLabel, CANID, DataPos, DataLength, debug):
global wb, ws
wb = openpyxl.load_workbook(xls)
ws = wb[sheet]
steps = {}
xls_data = []
# 遍历Excel行
for row_num, row in enumerate(ws.iter_rows(), start=1):
for (col_num, cell) in enumerate(row, start=1):
# 提取关键字段
if MsgLabel == str(col_num):
msg_label = str(cell.value)
# ...其他字段提取...
# 验证并存储有效数据
if data_pos and data_len:
line = _check_xls_data(...)
if line:
line["row"] = row_num
xls_data.append(line)
# 去重处理
xls_data = _reduce(xls_data)
return xls_data, steps
```
- **功能**:解析Excel中的测试用例
- **关键步骤**:
1. 打开Excel工作簿
2. 遍历每一行提取测试参数
3. 验证数据有效性 (`_check_xls_data`)
4. 去重处理 (`_reduce`)
5. 返回测试用例列表和步骤定义
### 3. 日志文件处理 (`s2_generate_log_array()`)
```python
def s2_generate_log_array():
file_array = {}
for sec, items in config.items():
if not re.match("log", sec, re.IGNORECASE):
continue
# 遍历所有.asc文件
for fname in glob.glob("**/*.asc", recursive=True):
farr = []
with open(fname, 'r', encoding="utf-8", errors="ignore") as f:
while True:
line = f.readline()
# 正则匹配两种日志格式
match1 = re.match(...)
match2 = re.match(...)
if match1 or match2:
farr.append([item.upper().strip() for item in list(match.groups())])
file_array[fname] = farr
return file_array
```
- **功能**:解析所有.asc日志文件
- **关键点**:
- 使用正则表达式匹配两种CAN日志格式
- 递归搜索所有子目录中的.asc文件
- 返回结构:`{文件名: [[时间戳, CAN ID, 数据], ...]}`
### 4. 数据提取与处理 (`s3_get_whole_data()`)
```python
def s3_get_whole_data(xls_data, steps, file_array, debug):
allstepsdata = {}
for (cn, xls_lines) in enumerate(xls_data):
stepdata = {}
for (fname, arr) in file_array.items():
whole_data = {}
for (line_num, line) in enumerate(arr):
if line[1].zfill(3) == xls_lines["CANID"]:
# 提取数据值
dataval = jionLogMsgDataBin(
line[2],
int(xls_lines["DataPos"].split(".")[0]),
int(xls_lines["DataPos"].split(".")[1]),
int(xls_lines["DataLength"])
)
# 存储结果
if whole_data.get(xls_lines["MsgLabel"], ""):
_append_array(whole_data[xls_lines["MsgLabel"]], new_lines)
else:
whole_data[xls_lines["MsgLabel"]] = [new_lines]
stepdata[fname] = whole_data
key = f"{xls_lines['MsgLabel']}({xls_lines['DataPos']})"
allstepsdata[key] = stepdata
return allstepsdata
```
- **功能**:匹配测试用例与日志数据
- **核心技术**:
- `jionLogMsgDataBin()`:根据字节/位偏移提取数据值
- `_append_array()`:避免重复添加相同数据
- 多层嵌套结构存储结果
### 5. 数据值提取算法 (`jionLogMsgDataBin()`)
```python
def jionLogMsgDataBin(msgdata, bytestart, bitpos, bitlength):
# 移除空格并转换为二进制字符串
hex_str = str(msgdata).replace(STR_SPACE1, "")
bin_str = bin(int("1" + hex_str, 16))[3:] # 添加前导1避免前导零丢失
# 计算位范围
bit_end = bytestart * DATA_BYTE1 - bitpos
bit_start = bit_end - bitlength
# 提取并转换值
return int(bin_str[bit_start:bit_end], 2)
```
- **输入**:原始数据、字节起始位置、位偏移、位长度
- **输出**:提取的整数值
- **示例**:
- 输入:`msgdata="08 00"`, `bytestart=1`, `bitpos=0`, `bitlength=8`
- 处理:`08 00` → `0000100000000000`
- 提取:字节1的8位 → `00001000` (8)
### 6. 结果输出 (`s4_check_output()`)
```python
def s4_check_output(sheet, steps, xls_data, data, xls):
steps1 = steps.keys()
for i in steps1: # 遍历所有步骤
for ii in memoRow: # 遍历输出列
del memoRow[0] # 移除已使用的列
for (cn, xls_lines) in enumerate(xls_data): # 遍历测试项
# 获取匹配的数据值
mbytes = []
msg_label = f"{xls_lines['MsgLabel']}({xls_lines['DataPos']})"
msg_data = data.get(msg_label, {}).get(i, {}).get(xls_lines['MsgLabel'], [])
for new_lines in msg_data:
if (new_lines["CANID"] == xls_lines["CANID"] and
new_lines["DataPos"] == xls_lines["DataPos"] and
new_lines["DataLength"] == xls_lines["DataLength"]):
mbytes.append(new_lines.get("bytes", ""))
# 写入Excel
if mbytes:
cell = f"{ii}{xls_lines.get('row')}"
ws[cell] = "→".join(mbytes)
# 保存结果
wb.save(sheet + "_result.xlsx")
wb.save(xls)
```
- **功能**:将分析结果写入Excel
- **特点**:
- 按步骤和测试项组织输出
- 使用箭头连接多个值(如"01→02→03")
- 保存原始文件和结果文件
### 7. 步骤划分逻辑 (`file_array_steps()`)
```python
def file_array_steps(file_array, steps):
allsteps_filearray = {}
startlineindex = 0
for stepindex in steps:
stepdata = []
# 根据测试类型选择划分策略
if "1Trip" in sheet: # 基本步骤划分
stepdata, startlineindex = getbasicsteplines(
ONETRIP_BASIC_ACT[stepindex],
startlineindex,
linesdata
)
elif "MUTE" in sheet: # 复杂步骤划分
stepdata, startlineindex = getmixsteplines(
ONETRIP_MUTE_CRISINA[stepindex],
startlineindex,
linesdata
)
allsteps_filearray[stepindex] = stepdata
return allsteps_filearray
```
- **功能**:将日志数据按测试步骤划分
- **划分依据**:
- 换挡信号值 (`PTCURSFTcan`)
- 特定CAN信号的变化(如ID 51D的第7位)
- 配置文件中定义的步骤序列
## 系统运行流程
1. **初始化**:读取配置文件(`FunctionTest.ini`)
2. **日志处理**:解析所有.asc日志文件
3. **测试用例处理**:解析Excel中的测试项
4. **步骤划分**:根据配置将日志数据分段
5. **数据匹配**:提取每个测试项对应的数据值
6. **结果输出**:将提取的值写入Excel报告
## 典型应用场景
1. **汽车电子测试**:验证CAN总线信号是否符合预期
2. **故障诊断**:分析特定条件下的信号变化
3. **自动化测试**:批量处理多个测试用例
4. **数据报告生成**:创建可追溯的测试报告
##
def parse_model_config(path): """Parses the yolo-v3 layer configuration file and returns module definitions""" file = open(path, 'r') lines = file.read().split('\n') lines = [x for x in lines if x and not x.startswith('#')] lines = [x.rstrip().lstrip() for x in lines] # get rid of fringe whitespaces module_defs = [] for line in lines: if line.startswith('['): # This marks the start of a new block module_defs.append({}) module_defs[-1]['type'] = line[1:-1].rstrip() if module_defs[-1]['type'] == 'convolutional': module_defs[-1]['batch_normalize'] = 0 else: key, value = line.split("=") value = value.strip() module_defs[-1][key.rstrip()] = value.strip() return module_defs
<think>好的,我现在需要仔细分析用户提供的`parse_model_config`函数,并按照之前的格式结构进行解析。用户之前询问了数据配置相关的代码,现在转向模型配置解析,所以我需要确保回答结构清晰,符合他们的要求。
首先,理解这个函数的功能。根据注释,这个函数用于解析YOLO-v3的层配置文件,返回模块定义。用户给出的代码中,函数读取配置文件,处理每一行,提取模块类型和参数,最终生成一个字典列表。我需要拆解每个步骤的作用,并解释其应用场景。
接下来,按照之前的模板结构,我需要分几个部分:核心功能、关键组件解析、应用场景、对比传统写法、常见问题及解决方案、最佳实践和扩展技巧。需要确保每个部分都详细且易于理解。
首先,核心功能部分,要简要说明函数的目的。这个函数主要是解析模型配置文件,生成结构化的模块定义列表,供模型构建使用。比如,YOLO的配置文件通常包含卷积层、上采样层、路由层等,每个模块有不同的参数。
关键组件解析部分,需要逐行分析代码。例如,打开文件、处理行、去除注释和空白、遍历行来构建模块定义。需要注意如何处理以'['开头的行作为新模块,以及如何分割键值对。特别要提到,对于卷积层,默认设置batch_normalize为0,这可能影响模型结构。
应用场景部分,需要说明这个函数在模型构建中的用途,比如在加载YOLO模型时,根据配置文件动态创建层。还可以举出具体的例子,如Darknet-53的主干网络构建,或YOLO检测头的配置。
与传统写法的对比,可能需要对比硬编码模型结构与使用配置文件的灵活性。比如,修改网络结构只需改配置文件而不需要改代码,提升可维护性。
常见问题部分,需要考虑可能的错误,比如文件不存在、键错误、参数类型错误等,给出相应的解决方案。例如,文件路径错误导致无法打开,或者配置文件格式不正确导致解析失败。
最佳实践建议可能包括使用YAML等更结构化的格式,增加异常处理,参数校验等。例如,在读取文件时添加try-except块,确保文件存在,或者校验必要参数是否存在。
扩展应用技巧方面,可以提到如何支持多模型配置,动态调整超参数,或者添加自定义层类型。例如,根据不同的任务选择不同的配置文件,或者在解析时添加额外的参数处理逻辑。
需要注意的是,用户可能对YOLO的配置文件结构不熟悉,所以需要简要说明配置文件的常见结构,比如每个模块用方括号定义类型,下面是键值对参数。例如:
[convolutional]
batch_normalize=1
filters=32
size=3
这样的例子有助于用户理解解析后的数据结构。
另外,要确保解释清楚module_defs的结构,每个元素是一个字典,包含'type'和对应的参数。比如,第一个模块可能是卷积层,包含batch_normalize、filters等参数。
在编写过程中,要使用中文,避免Markdown格式,但按照用户之前的示例,他们可能希望保留一定的结构,比如分节和代码块。需要确认用户是否接受这样的格式,但根据历史记录,之前的回答使用了分节和代码块,所以可以继续沿用。
最后,需要检查是否有遗漏的关键点,比如函数中的细节处理,如去除行首行尾空格,处理注释行等。这些细节对于确保解析正确很重要,需在解析中说明。
总结整个思考过程,我需要逐步拆解函数,解释每部分代码的作用,结合实际应用场景,对比优缺点,提供问题解决方法和优化建议,确保用户能够全面理解该函数的功能和实现方式。</think>---
### 一、**代码核心功能**
该函数的 **核心目标** 是解析YOLO模型的配置文件(如`yolov3.cfg`),将其转换为结构化的模块定义列表。每个模块定义描述神经网络层的类型(如卷积层、上采样层)及其参数(如过滤器数量、步长等),供后续模型构建使用。
```python
def parse_model_config(path):
"""解析YOLO模型配置文件,返回模块定义列表"""
# 步骤拆解:
# 1. 读取配置文件 → 2. 清洗数据 → 3. 逐行解析 → 4. 构建结构化字典
file = open(path, 'r')
lines = file.read().split('\n') # 按行分割文本
lines = [x for x in lines if x and not x.startswith('#')] # 过滤空行和注释
lines = [x.rstrip().lstrip() for x in lines] # 去除首尾空格
module_defs = [] # 存储模块定义的列表
for line in lines:
if line.startswith('['): # 检测到新模块的开始
module_defs.append({}) # 添加空字典表示新模块
module_defs[-1]['type'] = line[1:-1].rstrip() # 提取模块类型(如'convolutional')
# 特殊处理:为卷积层设置默认参数
if module_defs[-1]['type'] == 'convolutional':
module_defs[-1]['batch_normalize'] = 0 # 默认禁用批量归一化
else: # 处理键值对参数
key, value = line.split("=") # 分割参数名和值
module_defs[-1][key.rstrip()] = value.strip() # 存储到当前模块字典
return module_defs
```
---
### 二、**关键组件解析**
#### 1. **文件读取与清洗**
- **`file.read().split('\n')`**
将配置文件按行分割为列表,例如:
```ini
[convolutional]
filters=32
size=3
```
转换为:`['[convolutional]', 'filters=32', 'size=3']`
- **过滤注释与空行**
`lines = [x for x in lines if x and not x.startswith('#')]` 移除以`#`开头的注释行和空行。
#### 2. **模块类型识别**
- **方括号标记模块头**
当检测到以`[`开头的行(如`[convolutional]`),表示新模块开始,创建空字典并记录模块类型。
#### 3. **参数解析逻辑**
- **键值对分割**
使用`split("=")`处理非模块头的行,例如将`filters=32`分割为`key="filters"`, `value="32"`。
- **特殊默认值处理**
对`convolutional`类型模块,默认设置`batch_normalize=0`(需注意实际配置可能覆盖此值)。
---
### 三、**输入输出示例**
#### 输入配置文件片段 (`yolov3.cfg`)
```ini
[convolutional]
batch_normalize=1
filters=64
size=3
stride=2
[upsample]
stride=2
```
#### 输出结构化数据
```python
[
{
'type': 'convolutional',
'batch_normalize': '1', # 注意:此处值被覆盖为字符串
'filters': '64',
'size': '3',
'stride': '2'
},
{
'type': 'upsample',
'stride': '2'
}
]
```
---
### 四、**典型应用场景**
#### 场景1:动态构建YOLO模型
```python
# 根据解析结果创建PyTorch模块
from models.yolo import Darknet
model = Darknet("yolov3.cfg") # 内部调用parse_model_config解析配置
model.load_state_dict(torch.load("weights.pth"))
```
#### 场景2:模型结构可视化
```python
# 打印模块定义列表
config = parse_model_config("yolov3.cfg")
for i, layer in enumerate(config):
print(f"Layer {i}: {layer['type']} | Params: {layer}")
```
#### 场景3:参数覆盖调试
```python
# 手动修改特定层的参数
config = parse_model_config("yolov3.cfg")
config[3]['filters'] = '128' # 修改第4层的过滤器数量
```
---
### 五、**与传统写法的对比**
#### ❌ 硬编码模型结构(不推荐)
```python
# 直接在代码中定义网络结构(灵活性差)
class YOLO(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(3, 64, kernel_size=3)
# 结构变更需修改代码
```
#### ✅ 配置文件驱动(推荐)
```python
# 通过修改配置文件调整网络结构
# 代码无需改动即可适配不同模型变体
```
---
### 六、**常见问题与解决方案**
| 问题现象 | 原因分析 | 解决方案 |
|-----------------------------|-------------------------|----------------------------|
| `KeyError` | 参数未正确分割 | 检查行中是否包含合法的`=`符号 |
| 数值参数被误存为字符串 | 未进行类型转换 | 添加`int(value)`/`float(value)`转换 |
| `IndexError` | 配置文件格式错误(如未以`[`开头) | 验证配置文件是否符合标准格式 |
| 缺少必要参数 | 配置文件中遗漏关键字段 | 添加默认值或严格校验配置 |
---
### 七、**最佳实践建议**
#### 1. 增强类型转换
```python
# 自动转换数值型参数
key = key.rstrip()
value = value.strip()
if value.isdigit():
module_defs[-1][key] = int(value)
elif value.replace('.', '', 1).isdigit(): # 简单浮点数检测
module_defs[-1][key] = float(value)
else:
module_defs[-1][key] = value
```
#### 2. 添加必要校验
```python
# 检查必要参数是否存在
required_params = {
'convolutional': ['filters', 'size', 'stride'],
'route': ['layers']
}
for layer in module_defs:
layer_type = layer['type']
if layer_type in required_params:
for param in required_params[layer_type]:
assert param in layer, f"{layer_type}层缺少必要参数: {param}"
```
#### 3. 支持多配置文件
```python
# 合并多个配置文件(如主干网络+检测头)
def parse_model_config(paths):
all_lines = []
for path in paths:
with open(path, 'r') as f:
all_lines += f.read().split('\n')
# 后续解析逻辑不变...
```
---
### 八、**扩展应用技巧**
#### 1. 动态超参数调整
```python
# 根据设备条件自动调整参数
if torch.cuda.is_available():
config = parse_model_config("yolov3-gpu.cfg")
else:
config = parse_model_config("yolov3-cpu.cfg")
```
#### 2. 自定义层支持
```python
# 添加对新层类型的处理
if module_defs[-1]['type'] == 'my_custom_layer':
module_defs[-1]['custom_param'] = 'default_value'
```
是否需要进一步了解如何将解析后的配置转换为实际的神经网络层实现? 🚀
阅读全文
相关推荐
















