123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
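# Description:
# 1. Recursively read every .txt file under the given input folder(s).
# 2. Scan each file line by line and flag lines that match the filter rules.
# 3. When fewer than 2% of a file's lines are flagged as numeric noise,
#    write the cleaned file to <output>/success_files.
# 4. Copy files that were not cleaned (or are too large) to <output>/failed_files.
# 5. Print progress and append a summary log to <output>/filter_lines.txt.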
-
- import os
- import argparse
- from datetime import datetime
- import re
- from pathlib import Path
- import shutil
def list_files(dir_path):
    """Recursively collect the paths of all files below *dir_path*.

    Args:
        dir_path: Root directory handed to ``os.walk``.

    Returns:
        A list of ``os.path.join(root, name)`` paths, in the order in which
        ``os.walk`` yields them. Directories themselves are not included.
    """
    return [
        os.path.join(current_dir, name)
        for current_dir, _subdirs, names in os.walk(dir_path)
        for name in names
    ]
def get_files_in_directory(dir_path):
    """Return the names of the regular files directly inside *dir_path*.

    The listing is not recursive, and the returned entries are bare names
    (not joined with *dir_path*).

    Args:
        dir_path: Directory to list.

    Returns:
        A list of file names, in ``os.listdir`` order.
    """
    file_names = []
    for entry in os.listdir(dir_path):
        # Sub-directories and other non-file entries are skipped.
        if os.path.isfile(os.path.join(dir_path, entry)):
            file_names.append(entry)
    return file_names
def read_file_by_line(file_path):
    """Read a text file and return its lines.

    Undecodable bytes are silently dropped (``errors='ignore'``). Line
    terminators are kept on each returned line.

    Args:
        file_path: Path of the file to read.

    Returns:
        The list of lines. When the file does not exist a message is printed
        and an empty list is returned, so callers can always iterate over
        the result.
    """
    try:
        with open(file_path, 'r', errors='ignore') as file:
            return file.readlines()
    except FileNotFoundError:
        print(f"文件 '{file_path}' 不存在")
        # Previously fell through and returned None, which made callers that
        # iterate over the result crash with a TypeError.
        return []
# Running totals shared (via ``global``) by the functions in this module.
summary_line_count = catched_line_count = 0
handled_line_count = handled_file_count = 0
# Line indices caught in the file currently being filtered. Two distinct
# lists: "necessary" (keyword matches) and "optional" (numeric noise).
lines_nessary_catched, lines_optional_catched = [], []
# Status flags describing the outcome for the most recently filtered file.
is_missed = is_empty = False
def find_non_empty_line(lines, index, direction):
    """Return the nearest non-blank line before or after *index*.

    Args:
        lines: Sequence of line strings.
        index: Position to start from; the line at *index* itself is not
            examined.
        direction: ``'next'`` to search forwards, ``'previous'`` to search
            backwards.

    Returns:
        The first line whose stripped content is non-empty, or ``None`` when
        no such line exists in that direction or *direction* is not one of
        the two accepted values.
    """
    stride = 1 if direction == 'next' else -1 if direction == 'previous' else 0
    if stride == 0:
        return None

    cursor = index + stride
    while 0 <= cursor < len(lines):
        candidate = lines[cursor]
        if candidate.strip() != '':
            return candidate
        cursor += stride
    return None
def filter_lines(file_path, output_success_files):
    """Flag the removable lines of one file and, if safe, write a cleaned copy.

    Two kinds of lines are flagged, by index:

    * "necessary": the line contains one of the keyword tags, or its text
      (ignoring trailing whitespace) ends with one of the tail tags, and it
      contains none of the exception signs.
    * "optional": the line consists only of a number (optionally wrapped in
      ``, . % -`` characters) and its nearest non-blank neighbour, before or
      after, is such a number too.

    Side effects: updates the module globals ``summary_line_count``,
    ``lines_nessary_catched``, ``lines_optional_catched``, ``is_missed`` and
    ``is_empty``. When the file is non-empty and fewer than 2% of its lines
    are "optional", ``delete_catched_lines`` is called to write the cleaned
    file; otherwise ``is_missed`` is set.

    Args:
        file_path: Path of the text file to analyse.
        output_success_files: Root directory for cleaned files, passed
            through to ``delete_catched_lines``.

    Returns:
        The set of all flagged line indices (necessary and optional).
    """
    global summary_line_count
    global lines_nessary_catched
    global lines_optional_catched
    global is_missed
    global is_empty

    tags = ("图", "来源", "请参阅")
    tail_tags = ("走势", "趋势")
    # Lines containing these are left alone, to avoid dropping long
    # paragraphs and table-of-contents entries.
    except_signs = ('。', '.....')
    numeric_pattern = re.compile(r'^[,\.\%-]*\d+[,\.\%-]*$')

    def is_numeric(text):
        """Tell whether *text* (may be None) is a bare number line."""
        return text is not None and numeric_pattern.match(text.strip()) is not None

    lines_nessary_catched = []
    lines_optional_catched = []
    # Tolerate a reader that reports a missing file as None.
    lines = read_file_by_line(file_path) or []
    summary_line_count += len(lines)

    # enumerate() yields the true position of each line; looking the index up
    # with lines.index(line) returned the FIRST equal line, which is wrong
    # for duplicated lines and quadratic in the file length.
    for index, line in enumerate(lines):
        if not any(sign in line for sign in except_signs):
            # Lines from readlines() keep their newline, so the tail tags
            # must be tested against the right-stripped text.
            if any(tag in line for tag in tags) or line.rstrip().endswith(tail_tags):
                append_to_necessary_array(index)
                continue

        if not is_numeric(line):
            continue
        # find_non_empty_line returns None at either end of the file, so the
        # first and last lines need no special casing.
        if (is_numeric(find_non_empty_line(lines, index, 'next'))
                or is_numeric(find_non_empty_line(lines, index, 'previous'))):
            append_to_optional_array(index)

    if lines:
        is_empty = False
        is_missed = len(lines_optional_catched) / len(lines) >= 0.02
        if not is_missed:
            delete_catched_lines(file_path, output_success_files)
    else:
        is_empty = True
        is_missed = True
    return set(lines_nessary_catched) | set(lines_optional_catched)
def remove_output_next_level(file_path):
    """Drop the path component that directly follows the first ``output``.

    For example ``a/output/batch1/x.txt`` becomes ``a/output/x.txt``.

    Args:
        file_path: Path to rewrite.

    Returns:
        The rewritten path as a string. A path with no ``output`` component,
        or with nothing after it, is returned unchanged (normalised through
        ``pathlib.Path``).
    """
    parts = Path(file_path).parts
    # Without this guard parts.index() raises ValueError for any path that
    # does not go through a directory literally named "output".
    if 'output' not in parts:
        return str(Path(file_path))

    output_index = parts.index('output')
    # Keep everything up to and including "output", skip one level, keep the rest.
    kept_parts = parts[:output_index + 1] + parts[output_index + 2:]
    return str(Path(*kept_parts))
def delete_catched_lines(file_path, output_success_files):
    """Write a copy of *file_path* without the currently flagged lines.

    The indices to drop are read from the module globals
    ``lines_nessary_catched`` and ``lines_optional_catched``. The copy is
    written below *output_success_files*, at the location computed by
    ``convert_with_relative_path(remove_output_next_level(file_path), ...)``.

    Side effects: increments ``handled_file_count``, adds the number of
    "necessary" lines to ``handled_line_count``, and sets ``is_missed`` to
    True when the source has no lines or the cleaned copy ends up empty (in
    which case the empty copy is deleted).

    Args:
        file_path: Source file to clean.
        output_success_files: Root directory for cleaned files.
    """
    global handled_line_count
    global handled_file_count
    global is_missed

    handled_file_count += 1
    handled_line_count += len(lines_nessary_catched)
    with open(file_path, 'r', errors='ignore') as file:
        read_lines = file.readlines()
    if not read_lines:
        is_missed = True
        return

    output_success_file = convert_with_relative_path(
        remove_output_next_level(file_path), output_success_files, False)
    dropped = set(lines_nessary_catched) | set(lines_optional_catched)
    with open(output_success_file, 'w', errors='ignore') as file:
        # Filter by real position: read_lines.index(line) always returned the
        # first equal line, so duplicated lines were dropped or kept wrongly.
        file.writelines(
            line for index, line in enumerate(read_lines) if index not in dropped
        )
    # Do not keep a file that is empty after processing.
    if os.path.getsize(output_success_file) == 0:
        os.remove(output_success_file)
        is_missed = True
        print(f"文件 {file_path} 处理后为空,已删除\n")
def convert_with_relative_path(path, output_path, is_dir=True):
    """Map *path* into *output_path*, creating the directories it needs.

    The position of *path* is taken relative to the parent directory of the
    module global ``input_dir_path`` and re-rooted under *output_path*.

    Args:
        path: Source file or directory path.
        output_path: Root directory to map into.
        is_dir: When True the mapped path itself is created as a directory;
            when False only its parent directory is created.

    Returns:
        The mapped path. Nothing is created when it already exists.
    """
    global input_dir_path
    input_parent = os.path.dirname(input_dir_path)
    target = os.path.join(output_path, os.path.relpath(path, input_parent))

    if os.path.exists(target):
        return target

    if is_dir:
        os.makedirs(target)
    else:
        os.makedirs(os.path.dirname(target), exist_ok=True)
    return target
def append_to_necessary_array(line):
    """Record *line* in the "necessary" list and count it as caught.

    Args:
        line: Value to store in the module global ``lines_nessary_catched``
            (``filter_lines`` passes a line index).
    """
    global catched_line_count
    global lines_nessary_catched
    lines_nessary_catched.append(line)
    catched_line_count += 1
def append_to_optional_array(line):
    """Record *line* in the "optional" list and count it as caught.

    Args:
        line: Value to store in the module global ``lines_optional_catched``
            (``filter_lines`` passes a line index).
    """
    global catched_line_count
    global lines_optional_catched
    lines_optional_catched.append(line)
    catched_line_count += 1
# Input root of the run in progress; read by convert_with_relative_path.
input_dir_path = ""


def process(dir_path, output_dir_path):
    """Filter every .txt file below *dir_path* and log a summary.

    Cleaned files are written below ``<output_dir_path>/success_files`` (by
    ``filter_lines``); files that are larger than 2 MB or that end up flagged
    as missed are copied below ``<output_dir_path>/failed_files``. Progress
    is printed, and a summary is appended to
    ``<output_dir_path>/filter_lines.txt``.

    Side effects: sets the module global ``input_dir_path`` and resets the
    module-level counters so the summary describes this call only.

    Args:
        dir_path: Input root directory, walked recursively.
        output_dir_path: Output root directory; created when missing.
    """
    global is_missed
    global handled_file_count
    global input_dir_path
    global summary_line_count
    global catched_line_count
    global handled_line_count

    input_dir_path = dir_path
    output_success_files = os.path.join(output_dir_path, "success_files")
    output_failed_files = os.path.join(output_dir_path, "failed_files")
    os.makedirs(output_dir_path, exist_ok=True)
    os.makedirs(output_failed_files, exist_ok=True)
    files = list_files(dir_path)
    output_path = os.path.join(output_dir_path, "filter_lines.txt")

    # Files larger than this are not filtered: they take too long to process.
    max_file_size = 2 * 1024 * 1024

    summary_file_count = 0
    catched_file_count = 0
    collected_file_count = 0
    empty_file_count = 0
    # Reset the line counters together with the file counter so that file and
    # line statistics in the summary cover the same set of files.
    handled_file_count = 0
    summary_line_count = 0
    catched_line_count = 0
    handled_line_count = 0

    def print_progress():
        """Print the current file and line statistics."""
        print(f"已扫描txt文件 {summary_file_count} 个,其中有问题的文件 {catched_file_count} 个,已处理 {handled_file_count} 个,已收集 {collected_file_count} 个问题文件。\n")
        print(f"已扫描行数 {summary_line_count} 行,其中有问题的行数 {catched_line_count} 行,已处理 {handled_line_count} 行\n")

    def copy_to_failed(source):
        """Copy *source* to its mirrored location below failed_files."""
        output_failed_file = convert_with_relative_path(
            remove_output_next_level(source), output_failed_files, False)
        shutil.copy(source, output_failed_file)

    # The log lives in the output directory created above. Joining it onto
    # dir_path pointed at a non-existent directory for relative output paths.
    with open(output_path, "a") as output_txt:
        output_txt.write(f"{dir_path}\n")
        for file in files:
            # Skip anything that is not a .txt file, and skip this tool's own
            # filter_lines log files.
            if not file.endswith(".txt") or os.path.basename(file).startswith("filter_lines"):
                continue
            summary_file_count += 1

            # Check the size BEFORE filtering, so oversized files are never
            # run through the slow line scan.
            if os.path.getsize(file) > max_file_size:
                copy_to_failed(file)
                print(f"文件 {file} 大于2M,不处理\n")
                continue

            lines_catched = filter_lines(file, output_success_files)
            if is_empty:
                empty_file_count += 1
                print(f"文件 {file} 为空,当前 {empty_file_count} 个空文件\n")
                continue
            if lines_catched:
                catched_file_count += 1
            if is_missed:
                copy_to_failed(file)
                collected_file_count += 1
            # Print a progress report every 1000 scanned files.
            if summary_file_count % 1000 == 0:
                print_progress()
        print_progress()

        # Append the summary to the log.
        output_txt.write("\n===============================================\n \n")
        output_txt.write(f"已扫描文件 {summary_file_count} 个,其中有问题的文件 {catched_file_count} 个,已处理 {handled_file_count} 个,已收集 {collected_file_count} 个问题文件, 空文件 {empty_file_count} 个。\n \n")
        output_txt.write(f"已扫描行数 {summary_line_count} 行,其中有问题的行数 {catched_line_count} 行,已处理 {handled_line_count} 行\n \n")
        # Processing rates: handled / caught, for files and for lines.
        file_process_rate = 0
        line_process_rate = 0
        if catched_file_count > 0:
            file_process_rate = handled_file_count / catched_file_count
        if catched_line_count > 0:
            line_process_rate = handled_line_count / catched_line_count
        output_txt.write(f"文件处理率 {file_process_rate} 行处理率 {line_process_rate}\n \n")
if __name__ == "__main__":
    # Command-line entry point: run process() once per --input directory.
    parser = argparse.ArgumentParser(description='批量转档程序')
    parser.add_argument("--input", type=str, nargs='+', help='输入文件夹根路径')
    parser.add_argument("--output", type=str, help='输出文件夹根路径')
    args = parser.parse_args()
    if not args.input or not args.output:
        # Both options are needed. Stop here instead of continuing and
        # crashing on a None value further down.
        parser.print_help()
        raise SystemExit(1)

    for input_dir in args.input:
        process(input_dir, args.output)
|