1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
| import xml.etree.ElementTree as ET import csv import os import traceback
# Child elements copied from a <Diag>/<SubDiag> node into its CSV row,
# in the order they appear as CSV columns.
_DIAG_TEXT_FIELDS = (
    "BaseName", "FileName", "FilePath", "Line", "Column", "MsgText",
    "RuleId", "RuleNum", "MsgNum", "Severity", "HelpPath",
    "RuleCategories", "RuleGroupName", "Producer",
)

# Full CSV header: identification columns followed by the per-node fields.
_CSV_COLUMNS = [
    "Diag_ID", "Diag_Type", "Primary_File_Path",
    "Primary_Line", "Primary_Column",
    *_DIAG_TEXT_FIELDS,
]


def _build_diag_row(node, row_id, row_type, primary_file_path, primary_node):
    """Build one CSV row dict for a diagnostic or sub-diagnostic element.

    ``node`` supplies the per-row fields; ``primary_node`` (the enclosing
    main diagnostic, or ``node`` itself for a main row) supplies the
    ``Primary_Line`` / ``Primary_Column`` values. Missing child elements
    are reported as ``"N/A"``.
    """
    row = {
        "Diag_ID": row_id,
        "Diag_Type": row_type,
        "Primary_File_Path": primary_file_path,
        "Primary_Line": primary_node.findtext("Line", default="N/A"),
        "Primary_Column": primary_node.findtext("Column", default="N/A"),
    }
    for field in _DIAG_TEXT_FIELDS:
        row[field] = node.findtext(field, default="N/A")
    # Message text is the only field whose surrounding whitespace is trimmed.
    row["MsgText"] = row["MsgText"].strip()
    return row


def format_helix_qac_xml_to_csv(xml_file_path, csv_file_path):
    """Convert a Helix QAC XML output file to CSV, one diagnostic per row.

    Every ``Diag`` element found under a ``File`` element produces a
    "Main Diag" row, and every ``SubDiag`` element below it produces a
    "Sub Diag" row that repeats the parent's file path, line and column in
    the ``Primary_*`` columns.

    Args:
        xml_file_path (str): Path of the XML file to read.
        csv_file_path (str): Path of the CSV file to write. Missing parent
            directories are created; a bare file name (no directory part)
            is written to the current working directory.

    Returns:
        bool: True if processing succeeded (including the case where the
        XML contains no diagnostics and no CSV is written), False if the
        XML could not be read/parsed or the CSV could not be written.
    """
    try:
        tree = ET.parse(xml_file_path)
        root = tree.getroot()
    except FileNotFoundError:
        print(f"错误: XML文件 '{xml_file_path}' 未找到。请确保文件存在于指定的路径。")
        return False
    except ET.ParseError as e:
        print(f"错误: 解析XML文件 '{xml_file_path}' 时发生错误: {e}")
        return False
    except Exception as e:
        # Top-level boundary: report anything unexpected with a traceback
        # instead of aborting a batch run.
        print(f"读取或解析XML文件 '{xml_file_path}' 时发生未知错误: {e}")
        traceback.print_exc()
        return False

    extracted_data = []
    for file_node in root.findall(".//File"):
        primary_file_path = file_node.findtext("Name", default="N/A")

        for diag_node in file_node.findall(".//Diag"):
            diag_id = diag_node.get("id")
            extracted_data.append(
                _build_diag_row(
                    diag_node,
                    f"Main-{diag_id}",
                    "Main Diag",
                    primary_file_path,
                    diag_node,
                )
            )

            for sub_diag_node in diag_node.findall(".//SubDiag"):
                sub_diag_id = sub_diag_node.get("id")
                extracted_data.append(
                    _build_diag_row(
                        sub_diag_node,
                        f"Sub-{diag_id}-{sub_diag_id}",
                        "Sub Diag",
                        primary_file_path,
                        diag_node,
                    )
                )

    if not extracted_data:
        print(f"文件 '{xml_file_path}' 中没有可写入CSV的诊断数据。")
        return True

    output_dir = os.path.dirname(csv_file_path)
    try:
        # os.makedirs("") raises FileNotFoundError, so only create the parent
        # directory when the target path actually has one.
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(csv_file_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=_CSV_COLUMNS)
            writer.writeheader()
            writer.writerows(extracted_data)
    except IOError as e:
        print(f"写入CSV文件 '{csv_file_path}' 时发生IO错误: {e}")
        return False

    print(f"成功将格式化数据写入到 '{csv_file_path}'")
    return True
def main():
    """Convert every ``.xml`` file under the script directory to CSV.

    The directory containing this script (or the current working directory
    when ``__file__`` is unavailable) is walked recursively, skipping the
    output folder and common tooling folders. Each XML file is converted
    with ``format_helix_qac_xml_to_csv`` and written to the ``report``
    sub-folder; a summary of successes and failures is printed at the end.
    """
    import sys  # local import: only the script entry point needs it

    try:
        script_directory = os.path.dirname(os.path.abspath(__file__))
    except NameError:
        # __file__ is undefined in some interactive/embedded contexts.
        script_directory = os.getcwd()

    report_directory_name = "report"
    report_directory = os.path.join(script_directory, report_directory_name)

    if not os.path.exists(report_directory):
        try:
            os.makedirs(report_directory)
            print(f"已创建文件夹: '{report_directory}'")
        except OSError as e:
            print(f"错误: 创建文件夹 '{report_directory}' 失败: {e}")
            # sys.exit (not the site-provided exit()) with a non-zero status
            # so callers can detect the failure.
            sys.exit(1)

    print(f"将在 '{script_directory}' (及其子文件夹, 不包括 '{report_directory_name}') 中查找XML文件...")
    print(f"转换后的CSV文件将保存在 '{report_directory}' 中。")

    excluded_dirs = {
        report_directory_name, '.git', '.vscode', '.idea',
        '__pycache__', 'venv', '.venv',
    }
    xml_files_to_process = []
    for current_dir, dirs, files in os.walk(script_directory):
        # Prune in place so os.walk does not descend into excluded folders;
        # sorting makes the processing order deterministic.
        dirs[:] = sorted(d for d in dirs if d not in excluded_dirs)

        for filename in sorted(files):
            if filename.endswith(".xml"):
                xml_files_to_process.append(os.path.join(current_dir, filename))

    if not xml_files_to_process:
        print(f"在 '{script_directory}' (及其子文件夹, 不包括 '{report_directory_name}') 下未找到任何 .xml 文件。")
        return

    print(f"找到 {len(xml_files_to_process)} 个XML文件进行处理。")
    success_count = 0
    failure_count = 0
    # Output stems already handed out (lower-cased for case-insensitive
    # filesystems), so same-named XML files from different folders do not
    # overwrite each other's CSV.
    used_stems = set()
    for xml_file_path in xml_files_to_process:
        print(f"\n--- 开始处理文件: {xml_file_path} ---")
        base_name = os.path.splitext(os.path.basename(xml_file_path))[0]

        unique_name = base_name
        suffix = 2
        while unique_name.lower() in used_stems:
            unique_name = f"{base_name}_{suffix}"
            suffix += 1
        used_stems.add(unique_name.lower())

        output_csv_file = os.path.join(report_directory, f"{unique_name}.csv")
        if format_helix_qac_xml_to_csv(xml_file_path, output_csv_file):
            success_count += 1
        else:
            failure_count += 1

    print("\n--- 所有XML文件处理完毕 ---")
    print(f"总计处理文件数: {len(xml_files_to_process)}")
    print(f"✅ 成功转换: {success_count} 个文件")
    if failure_count > 0:
        print(f"❌ 转换失败: {failure_count} 个文件")


if __name__ == "__main__":
    main()
|