# Description:
# 1. Read every .txt file under the given folder (recursively).
# 2. Read each file line by line and flag lines according to a set of rules.
# 3. Write the filtered content of a file to a new file when it qualifies.
# 4. Copy the files that were not handled to the failed-files folder.
# 5. Write a summary log.
import os
import argparse
from datetime import datetime
import re
from pathlib import Path
import shutil

# A line containing any of these markers is always removed ...
_TAGS = ("图", "来源", "请参阅")
# ... as is a line whose text ends with one of these markers ...
_TAIL_TAGS = ("走势", "趋势")
# ... unless it also contains one of these signs, which indicate running
# prose or a table-of-contents entry that must not be dropped.
_EXCEPT_SIGNS = ("。", ".....")
# Optional run of , . % - characters, one run of digits, optional run of the
# same punctuation (e.g. "123", "45%", "-7.").
_NUMERIC_LINE_RE = re.compile(r'^[,\.\%-]*\d+[,\.\%-]*$')
# A file whose share of "optional" (numeric) lines reaches this ratio is not
# rewritten; it is reported as missed instead.
_OPTIONAL_RATIO_LIMIT = 0.02
# Files larger than this many bytes are never filtered.
_MAX_FILE_SIZE = 2 * 1024 * 1024


def list_files(dir_path):
    """Return the paths of all files below *dir_path*, walking recursively."""
    return [
        os.path.join(root, name)
        for root, _dirs, names in os.walk(dir_path)
        for name in names
    ]


def get_files_in_directory(dir_path):
    """Return the names of the regular files directly inside *dir_path*."""
    return [
        name
        for name in os.listdir(dir_path)
        if os.path.isfile(os.path.join(dir_path, name))
    ]


def read_file_by_line(file_path):
    """Return the lines of *file_path* (line endings kept).

    Undecodable bytes are ignored.  When the file does not exist a message is
    printed and ``None`` is returned.
    """
    try:
        with open(file_path, 'r', errors='ignore') as file:
            return file.readlines()
    except FileNotFoundError:
        print(f"文件 '{file_path}' 不存在")
        return None


# Module-level state shared by the functions below.
summary_line_count = 0      # lines scanned so far
catched_line_count = 0      # lines flagged so far (necessary + optional)
handled_line_count = 0      # necessary lines of files that were rewritten
handled_file_count = 0      # files passed to delete_catched_lines
lines_nessary_catched = []  # indexes of always-removed lines, current file
lines_optional_catched = []  # indexes of numeric lines, current file
is_missed = False           # current file could not be handled
is_empty = False            # current file has no lines


def find_non_empty_line(lines, index, direction):
    """Return the nearest non-blank line next to ``lines[index]``.

    *direction* is ``'next'`` or ``'previous'``; any other value yields
    ``None``.  ``None`` is also returned when only blank lines (or nothing)
    lie in that direction.
    """
    if direction == 'next':
        step = 1
    elif direction == 'previous':
        step = -1
    else:
        return None

    position = index + step
    while 0 <= position < len(lines):
        if lines[position].strip() != '':
            return lines[position]
        position += step
    return None


def _is_numeric_line(line):
    """Tell whether *line*, stripped, matches the numeric-line pattern."""
    return bool(_NUMERIC_LINE_RE.match(line.strip()))


def _has_numeric_neighbor(lines, index):
    """Tell whether the closest non-blank line before or after is numeric."""
    for direction in ('next', 'previous'):
        neighbor = find_non_empty_line(lines, index, direction)
        if neighbor and _is_numeric_line(neighbor):
            return True
    return False


def filter_lines(file_path, output_success_files):
    """Flag the removable lines of *file_path* and rewrite it when possible.

    A line is "necessary" (always removed) when it contains one of the tag
    markers or its text ends with one of the tail markers, and contains none
    of the exception signs.  A line is "optional" when it matches the numeric
    pattern and its nearest non-blank neighbour (before or after) does too.

    Lines are identified by their position in the file, so repeated text is
    judged in its own context.  Tail markers are compared after stripping
    trailing whitespace, because lines read from the file still carry their
    line ending.

    Side effects: resets and fills ``lines_nessary_catched`` and
    ``lines_optional_catched``, updates the line counters and sets
    ``is_empty`` / ``is_missed``.  When the file is non-empty and fewer than
    2% of its lines are optional, ``delete_catched_lines`` writes the
    filtered copy below *output_success_files*; otherwise the file is marked
    as missed.  A file that does not exist is treated as empty.

    Returns the set of flagged line indexes.
    """
    global summary_line_count
    global lines_nessary_catched
    global lines_optional_catched
    global is_missed
    global is_empty

    lines_nessary_catched = []
    lines_optional_catched = []
    # read_file_by_line returns None for a missing file.
    lines = read_file_by_line(file_path) or []

    for index, line in enumerate(lines):
        summary_line_count += 1

        # The exception signs protect long prose and table-of-contents lines.
        if not any(sign in line for sign in _EXCEPT_SIGNS):
            tagged = any(tag in line for tag in _TAGS)
            if tagged or line.rstrip().endswith(_TAIL_TAGS):
                append_to_necessary_array(index)
                continue

        # A number next to another number is most likely table residue.
        if _is_numeric_line(line) and _has_numeric_neighbor(lines, index):
            append_to_optional_array(index)

    if lines:
        is_empty = False
        ratio = len(lines_optional_catched) / len(lines)
        is_missed = ratio >= _OPTIONAL_RATIO_LIMIT
        if not is_missed:
            # May set is_missed again when the result turns out empty.
            delete_catched_lines(file_path, output_success_files)
    else:
        is_empty = True
        is_missed = True

    return set(lines_nessary_catched) | set(lines_optional_catched)


def remove_output_next_level(file_path):
    """Return *file_path* without the component that follows ``output``.

    ``a/output/b/c.txt`` becomes ``a/output/c.txt``.  A path that has no
    ``output`` component is returned unchanged.
    """
    path = Path(file_path)
    parts = path.parts
    if 'output' not in parts:
        return str(path)
    output_index = parts.index('output')
    new_parts = parts[:output_index + 1] + parts[output_index + 2:]
    return str(Path(os.path.join(*new_parts)))


def delete_catched_lines(file_path, output_success_files):
    """Write *file_path* without its flagged lines below *output_success_files*.

    The lines to drop are the indexes currently stored in
    ``lines_nessary_catched`` and ``lines_optional_catched``.  Updates the
    handled counters.  When the source has no lines, or nothing is left after
    filtering, no output file is kept and ``is_missed`` is set.
    """
    global handled_line_count
    global handled_file_count
    global is_missed

    handled_file_count += 1
    handled_line_count += len(lines_nessary_catched)

    with open(file_path, 'r', errors='ignore') as file:
        read_lines = file.readlines()
    if not read_lines:
        is_missed = True
        return

    output_success_file = convert_with_relative_path(
        remove_output_next_level(file_path), output_success_files, False)
    dropped = set(lines_nessary_catched) | set(lines_optional_catched)
    kept = [
        line for index, line in enumerate(read_lines) if index not in dropped
    ]
    with open(output_success_file, 'w', errors='ignore') as file:
        file.writelines(kept)

    # Do not keep a file that ended up empty.
    if os.path.getsize(output_success_file) == 0:
        os.remove(output_success_file)
        is_missed = True
        print(f"文件 {file_path} 处理后为空,已删除\n")


def convert_with_relative_path(path, output_path, is_dir=True):
    """Map *path* into *output_path* and make sure the target folder exists.

    The result is *output_path* joined with *path* taken relative to the
    parent directory of the global ``input_dir_path``.  When the result does
    not exist yet, it is created as a directory if *is_dir* is true;
    otherwise only its parent directory is created.
    """
    relative_path = os.path.relpath(path, os.path.dirname(input_dir_path))
    output_path = os.path.join(output_path, relative_path)
    if not os.path.exists(output_path):
        if is_dir:
            os.makedirs(output_path)
        else:
            os.makedirs(os.path.dirname(output_path), exist_ok=True)
    return output_path


def append_to_necessary_array(line):
    """Record line index *line* as always-removed and count it."""
    global catched_line_count
    catched_line_count += 1
    lines_nessary_catched.append(line)


def append_to_optional_array(line):
    """Record line index *line* as an optional (numeric) hit and count it."""
    global catched_line_count
    catched_line_count += 1
    lines_optional_catched.append(line)


input_dir_path = ""


def process(dir_path, output_dir_path):
    """Filter every .txt file below *dir_path* and write results and a log.

    Filtered files go to ``<output_dir_path>/success_files``; files that are
    too large (over 2 MB) or were marked as missed are copied unchanged to
    ``<output_dir_path>/failed_files``.  A summary is appended to
    ``filter_lines.txt``, located at *output_dir_path* joined onto
    *dir_path*.  Files that are not ``.txt`` files, and files whose name
    starts with ``filter_lines`` (the log), are skipped.

    Sets the global ``input_dir_path`` and resets ``handled_file_count``; the
    line counters keep accumulating across calls.
    """
    global handled_file_count
    global input_dir_path

    input_dir_path = dir_path
    output_success_files = os.path.join(output_dir_path, "success_files")
    output_failed_files = os.path.join(output_dir_path, "failed_files")
    os.makedirs(output_dir_path, exist_ok=True)
    os.makedirs(output_failed_files, exist_ok=True)

    files = list_files(dir_path)
    summary_file_count = 0
    catched_file_count = 0
    handled_file_count = 0
    collected_file_count = 0
    empty_file_count = 0

    def copy_to_failed(source):
        # Keep the untouched original next to the other failed files.
        target = convert_with_relative_path(
            remove_output_next_level(source), output_failed_files, False)
        shutil.copy(source, target)

    def print_progress():
        print(f"已扫描txt文件 {summary_file_count} 个,其中有问题的文件 {catched_file_count} 个,已处理 {handled_file_count} 个,已收集 {collected_file_count} 个问题文件。\n")
        print(f"已扫描行数 {summary_line_count} 行,其中有问题的行数 {catched_line_count} 行,已处理 {handled_line_count} 行\n")

    log_path = os.path.join(dir_path, os.path.join(output_dir_path, "filter_lines.txt"))
    # With a relative output path the log lives below dir_path; make sure the
    # folder is there before opening the file.
    os.makedirs(os.path.dirname(log_path) or ".", exist_ok=True)

    with open(log_path, "a") as output_txt:
        output_txt.write(f"{dir_path}\n")

        for file in files:
            if not file.endswith(".txt") or os.path.basename(file).startswith("filter_lines"):
                continue

            # Oversized files are treated as failed without being filtered.
            if os.path.getsize(file) > _MAX_FILE_SIZE:
                copy_to_failed(file)
                summary_file_count += 1
                print(f"文件 {file} 大于2M,不处理\n")
                continue

            lines_catched = filter_lines(file, output_success_files)
            summary_file_count += 1

            if is_empty:
                empty_file_count += 1
                print(f"文件 {file} 为空,当前 {empty_file_count} 个空文件\n")
                continue

            if lines_catched:
                catched_file_count += 1

            if is_missed:
                copy_to_failed(file)
                collected_file_count += 1

            # Report progress every 1000 scanned files.
            if summary_file_count % 1000 == 0:
                print_progress()

        print_progress()

        # Write the summary to the log.
        output_txt.write(f"\n===============================================\n \n")
        output_txt.write(f"已扫描文件 {summary_file_count} 个,其中有问题的文件 {catched_file_count} 个,已处理 {handled_file_count} 个,已收集 {collected_file_count} 个问题文件, 空文件 {empty_file_count} 个。\n \n")
        output_txt.write(f"已扫描行数 {summary_line_count} 行,其中有问题的行数 {catched_line_count} 行,已处理 {handled_line_count} 行\n \n")

        # Handling rates: handled / flagged, for files and for lines.
        file_process_rate = 0
        line_process_rate = 0
        if catched_file_count > 0:
            file_process_rate = handled_file_count / catched_file_count
        if catched_line_count > 0:
            line_process_rate = handled_line_count / catched_line_count
        output_txt.write(f"文件处理率 {file_process_rate} 行处理率 {line_process_rate}\n \n")


def main():
    """Parse the command line and process every input folder in turn."""
    parser = argparse.ArgumentParser(description='批量转档程序')
    parser.add_argument("--input", type=str, nargs='+', help='输入文件夹根路径')
    parser.add_argument("--output", type=str, help='输出文件夹根路径')
    args = parser.parse_args()

    # Both paths are needed; show the usage instead of failing later.
    if not args.input or not args.output:
        parser.print_help()
        return

    for dir_path in args.input:
        process(dir_path, args.output)


if __name__ == "__main__":
    main()