# data_filter.py
# Description:
# 1. Read all .txt files under the given folder
# 2. Read each file line by line and filter lines according to a set of rules
# 3. Write the filtered content of a file to a new file (when the heuristic is reliable)
# 4. Move the other, unprocessed files to the "failed" output folder
# 5. Emit a processing log
  7. import os
  8. import argparse
  9. from datetime import datetime
  10. import re
  11. from pathlib import Path
  12. import shutil
  13. def list_files(dir_path):
  14. file_list = []
  15. for root, dirs, files in os.walk(dir_path):
  16. for file in files:
  17. file_list.append(os.path.join(root, file))
  18. return file_list
  19. def get_files_in_directory(dir_path):
  20. files_and_folders = os.listdir(dir_path)
  21. files = [file for file in files_and_folders if os.path.isfile(os.path.join(dir_path, file))]
  22. return files
  23. def read_file_by_line(file_path):
  24. try:
  25. with open(file_path, 'r', errors='ignore') as file:
  26. lines = file.readlines()
  27. return lines
  28. except FileNotFoundError:
  29. print(f"文件 '{file_path}' 不存在")
# Module-level counters and per-file state, shared across the processing
# functions below (each mutator declares them `global`).
summary_line_count = 0       # total lines scanned across all files
catched_line_count = 0       # total lines matched by any filter rule
handled_line_count = 0       # lines removed from files that were rewritten
handled_file_count = 0       # files that were rewritten into the success tree
lines_nessary_catched = []   # per-file: line indices that must be removed
lines_optional_catched = []  # per-file: line indices of probable numeric noise
is_missed = False            # per-file: True when the file is treated as failed
is_empty = False             # per-file: True when the file contained no lines
  38. def find_non_empty_line(lines, index, direction):
  39. if direction == 'next':
  40. step = 1
  41. elif direction == 'previous':
  42. step = -1
  43. else:
  44. return None
  45. new_index = index + step
  46. while new_index >= 0 and new_index < len(lines):
  47. if lines[new_index].strip() != '':
  48. return lines[new_index]
  49. new_index += step
  50. return None
  51. def filter_lines(file_path, output_success_files):
  52. global summary_line_count
  53. global lines_nessary_catched
  54. global lines_optional_catched
  55. global is_missed
  56. global is_empty
  57. if True:
  58. tags = ["图", "来源", "请参阅"]
  59. tail_tags = ["走势", "趋势"]
  60. except_signs = ['。','.....']
  61. else:
  62. tags = ["图", "来源"]
  63. tail_tags = []
  64. except_signs = []
  65. lines_nessary_catched = []
  66. lines_optional_catched = []
  67. lines = read_file_by_line(file_path)
  68. for line in lines:
  69. summary_line_count += 1
  70. # 如果行中包含关键字
  71. for tag in tags:
  72. found = False
  73. if tag in line:
  74. # except_signs 不在其中,避免遗漏长文段内容与目录
  75. if not any(except_tag in line for except_tag in except_signs):
  76. found = True
  77. append_to_necessary_array(lines.index(line))
  78. break
  79. if found:
  80. continue
  81. for tail_tag in tail_tags:
  82. if line.endswith(tail_tag):
  83. # except_signs 不在其中,避免遗漏长文段内容与目录
  84. if not any(except_tag in line for except_tag in except_signs):
  85. found = True
  86. append_to_necessary_array(lines.index(line))
  87. break
  88. if found:
  89. continue
  90. # 如果行是纯数字、带逗号的数字、带小数点的数字,或者是百分比数字,且上一个内容行或下一个内容行也满足条件,大概率是无意义内容,如果上下行是空格,则再往前或往后查询一行
  91. opt_regix = r'^[,\.\%-]*\d+[,\.\%-]*$'
  92. if re.match(opt_regix, line.strip()):
  93. index = lines.index(line)
  94. if index == 0:
  95. next_line = find_non_empty_line(lines, index, 'next')
  96. if next_line and re.match(opt_regix, next_line.strip()):
  97. append_to_optional_array(index)
  98. continue
  99. elif index == len(lines) - 1:
  100. previous_line = find_non_empty_line(lines, index, 'previous')
  101. if previous_line and re.match(opt_regix, previous_line.strip()):
  102. append_to_optional_array(index)
  103. continue
  104. else:
  105. next_line = find_non_empty_line(lines, index, 'next')
  106. previous_line = find_non_empty_line(lines, index, 'previous')
  107. if (next_line and re.match(opt_regix, next_line.strip())) or (previous_line and re.match(opt_regix, previous_line.strip())):
  108. append_to_optional_array(index)
  109. continue
  110. # 对 lines_optional_catched 和 lines_nessary_catched 进行去重
  111. lines_optional_catched = list(set(lines_optional_catched))
  112. lines_nessary_catched = list(set(lines_nessary_catched))
  113. if len(lines) > 0:
  114. is_empty = False
  115. if(len(lines_optional_catched)/( len(lines)) >= 0.02):
  116. is_missed = True
  117. else:
  118. is_missed = False
  119. delete_catched_lines(file_path, output_success_files)
  120. else:
  121. is_empty = True
  122. is_missed = True
  123. return set(lines_nessary_catched) | set(lines_optional_catched)
  124. def remove_output_next_level(file_path):
  125. # 将文件路径转换为Path对象
  126. path = Path(file_path)
  127. # 找到"output"在路径中的位置
  128. output_index = path.parts.index('output')
  129. # 删除"output"后的一级结构
  130. new_parts = path.parts[:output_index+1] + path.parts[output_index+2:]
  131. # 将新的路径部分连接起来
  132. new_path = Path(os.path.join(*new_parts))
  133. return str(new_path)
  134. def delete_catched_lines(file_path, output_success_files):
  135. global lines_nessary_catched
  136. global handled_line_count
  137. global handled_file_count
  138. global is_missed
  139. handled_file_count += 1
  140. handled_line_count += len(lines_nessary_catched)
  141. with open(file_path, 'r', errors='ignore') as file:
  142. read_lines = file.readlines()
  143. if len(read_lines) == 0:
  144. is_missed = True
  145. return
  146. output_success_file = convert_with_relative_path(remove_output_next_level(file_path), output_success_files, False)
  147. lines = set(lines_nessary_catched) | set(lines_optional_catched)
  148. with open(output_success_file, 'w', errors='ignore') as file:
  149. for line in read_lines:
  150. if read_lines.index(line) not in lines:
  151. file.write(line)
  152. # 如果文件处理后为空,不保存这个文件
  153. if os.path.getsize(output_success_file) == 0:
  154. os.remove(output_success_file)
  155. is_missed = True
  156. print(f"文件 {file_path} 处理后为空,已删除\n")
  157. def convert_with_relative_path(path, output_path, is_dir=True):
  158. global input_dir_path
  159. # Get the relative path of the file
  160. relative_path = os.path.relpath(path, os.path.dirname(input_dir_path))
  161. output_path = os.path.join(output_path, relative_path)
  162. # Create the directory if it does not exist
  163. if not os.path.exists(output_path):
  164. if is_dir:
  165. os.makedirs(output_path)
  166. else :
  167. os.makedirs(os.path.dirname(output_path), exist_ok=True)
  168. return output_path
  169. # shutil.copy(path, output_path)
  170. def append_to_necessary_array(line):
  171. global catched_line_count
  172. global lines_nessary_catched
  173. catched_line_count += 1
  174. lines_nessary_catched.append(line)
  175. def append_to_optional_array(line):
  176. global catched_line_count
  177. global lines_optional_catched
  178. catched_line_count += 1
  179. lines_optional_catched.append(line)
  180. input_dir_path = ""
  181. def process(dir_path, output_dir_path):
  182. global is_missed
  183. global handled_file_count
  184. global input_dir_path
  185. input_dir_path = dir_path
  186. output_success_files = os.path.join(output_dir_path, "success_files")
  187. output_failed_files = os.path.join(output_dir_path, "failed_files")
  188. os.makedirs(output_dir_path, exist_ok=True)
  189. os.makedirs(output_failed_files, exist_ok=True)
  190. files = list_files(dir_path)
  191. output_name = "filter_lines.txt"
  192. summary_file_count = 0
  193. catched_file_count = 0
  194. handled_file_count = 0
  195. too_big_file_count = 0
  196. global catched_line_count
  197. collected_file_count = 0
  198. empty_file_count = 0
  199. output_path = os.path.join(output_dir_path, output_name)
  200. with open(os.path.join(dir_path, output_path) , "a") as output_txt:
  201. output_txt.write(f"{dir_path}\n")
  202. for file in files:
  203. if not file.endswith(".txt") and not os.path.basename(file).startswith("filter_lines"):
  204. continue
  205. lines_catched = filter_lines(file, output_success_files)
  206. summary_file_count += 1
  207. if is_empty:
  208. empty_file_count += 1
  209. print(f"文件 {file} 为空,当前 {empty_file_count} 个空文件\n")
  210. continue
  211. # 如果文件大于2M,直接处理为失败,过大的文件会引起超长时间的处理
  212. if os.path.getsize(file) > 2 * 1024 * 1024:
  213. output_failed_file = convert_with_relative_path(remove_output_next_level(file), output_failed_files, False)
  214. # 将file复制到output_failed_files
  215. shutil.copy(file, output_failed_file)
  216. summary_file_count += 1
  217. too_big_file_count += 1
  218. print(f"文件 {file} 大于2M,不处理\n")
  219. continue
  220. if lines_catched:
  221. catched_file_count += 1
  222. if is_missed:
  223. output_failed_file = convert_with_relative_path(remove_output_next_level(file), output_failed_files, False)
  224. # 将file复制到output_failed_files
  225. shutil.copy(file, output_failed_file)
  226. collected_file_count += 1
  227. # 每循环1000次,打印一次日志
  228. if summary_file_count % 1000 == 0:
  229. print(f"已扫描txt文件 {summary_file_count} 个,其中有问题的文件 {catched_file_count} 个,已处理 {handled_file_count} 个,已收集 {collected_file_count} 个问题文件。\n")
  230. print(f"已扫描行数 {summary_line_count} 行,其中有问题的行数 {catched_line_count} 行,已处理 {handled_line_count} 行\n")
  231. print(f"已扫描txt文件 {summary_file_count} 个,其中有问题的文件 {catched_file_count} 个,已处理 {handled_file_count} 个,已收集 {collected_file_count} 个问题文件。\n")
  232. print(f"已扫描行数 {summary_line_count} 行,其中有问题的行数 {catched_line_count} 行,已处理 {handled_line_count} 行\n")
  233. # 写入日志
  234. output_txt.write(f"\n===============================================\n \n")
  235. output_txt.write(f"已扫描文件 {summary_file_count} 个,其中有问题的文件 {catched_file_count} 个,已处理 {handled_file_count} 个,已收集 {collected_file_count} 个问题文件, 空文件 {empty_file_count} 个。\n \n")
  236. output_txt.write(f"已扫描行数 {summary_line_count} 行,其中有问题的行数 {catched_line_count} 行,已处理 {handled_line_count} 行\n \n")
  237. # 写入文件和行的处理率(已处理x / 有问题y)
  238. file_process_rate = 0
  239. line_process_rate = 0
  240. if catched_file_count > 0:
  241. file_process_rate = handled_file_count / catched_file_count
  242. if catched_line_count > 0:
  243. line_process_rate = handled_line_count / catched_line_count
  244. output_txt.write(f"文件处理率 {file_process_rate} 行处理率 {line_process_rate}\n \n")
  245. if __name__ == "__main__":
  246. parser = argparse.ArgumentParser(description='批量转档程序')
  247. parser.add_argument("--input", type=str, nargs='+', help='输入文件夹根路径')
  248. parser.add_argument("--output", type=str, help='输出文件夹根路径')
  249. args = parser.parse_args()
  250. if not args.input:
  251. parser.print_help()
  252. input_dir_paths = args.input
  253. output_dir_path = args.output
  254. for input_dir_path in input_dir_paths:
  255. process(input_dir_path, output_dir_path)