Bläddra i källkod

[dataextract] 数据提取txt文件后处理脚本

yanxin 7 månader sedan
förälder
incheckning
1cbaf638a2
1 ändrade filer med 308 tillägg och 0 borttagningar
  1. 308 0
      FastExtract/scripts/data_filter.py

+ 308 - 0
FastExtract/scripts/data_filter.py

@@ -0,0 +1,308 @@
+# Description: 
+# 1. 读取指定文件夹下的所有txt文件
+# 2. 逐行读取文件内容,根据一定规则过滤行
+# 3. 一定概率将过滤后的文件写入新文件
+# 4. 将其他未处理的文件移动到out文件夹
+# 5. 输出日志
+ 
+import os
+import argparse
+from datetime import datetime 
+import re 
+from pathlib import Path
+import shutil
+
+def list_files(dir_path):
+    file_list = []
+    for root, dirs, files in os.walk(dir_path):
+        for file in files:
+            file_list.append(os.path.join(root, file))
+    return file_list
+
+
+def get_files_in_directory(dir_path): 
+    files_and_folders = os.listdir(dir_path)
+ 
+    files = [file for file in files_and_folders if os.path.isfile(os.path.join(dir_path, file))]
+
+    return files
+
+
+def read_file_by_line(file_path):
+    try:
+        with open(file_path, 'r', errors='ignore') as file:
+            lines = file.readlines()
+            return lines
+    except FileNotFoundError:
+        print(f"文件 '{file_path}' 不存在")
+
+summary_line_count = 0
+catched_line_count = 0
+handled_line_count = 0
+handled_file_count = 0
+
+lines_nessary_catched = []  
+lines_optional_catched = []
+
+is_missed = False
+is_empty = False
+
+def find_non_empty_line(lines, index, direction):
+    if direction == 'next':
+        step = 1
+    elif direction == 'previous':
+        step = -1
+    else:
+        return None
+
+    new_index = index + step
+    while new_index >= 0 and new_index < len(lines):
+        if lines[new_index].strip() != '':
+            return lines[new_index]
+        new_index += step
+
+    return None
+
+def filter_lines(file_path, output_success_files):
+    global summary_line_count 
+    global lines_nessary_catched 
+    global lines_optional_catched 
+    global is_missed
+    global is_empty
+
+    if True:
+        tags = ["图", "来源", "请参阅"]
+        tail_tags = ["走势", "趋势"]
+        except_signs = ['。','.....']
+    else:
+        tags = ["图", "来源"]
+        tail_tags = []
+
+        except_signs = []
+    
+    lines_nessary_catched = []
+    lines_optional_catched = []
+
+    lines = read_file_by_line(file_path)
+    for line in lines:
+        summary_line_count += 1 
+        # 如果行中包含关键字
+        for tag in tags:
+           found = False
+           if tag in line:
+            # except_signs 不在其中,避免遗漏长文段内容与目录
+            if not any(except_tag in line for except_tag in except_signs):
+                found = True
+                append_to_necessary_array(lines.index(line))
+                break 
+            if found:
+                continue 
+        for tail_tag in tail_tags:
+            if line.endswith(tail_tag):
+            # except_signs 不在其中,避免遗漏长文段内容与目录
+                if not any(except_tag in line for except_tag in except_signs):
+                    found = True 
+                    append_to_necessary_array(lines.index(line)) 
+                    break 
+            if found:
+                continue
+        # 如果行是纯数字、带逗号的数字、带小数点的数字,或者是百分比数字,且上一个内容行或下一个内容行也满足条件,大概率是无意义内容,如果上下行是空格,则再往前或往后查询一行
+        opt_regix = r'^[,\.\%-]*\d+[,\.\%-]*$'
+        if re.match(opt_regix, line.strip()):
+            index = lines.index(line)
+            if index == 0:
+                next_line = find_non_empty_line(lines, index, 'next')
+                if next_line and re.match(opt_regix, next_line.strip()):
+                    append_to_optional_array(index) 
+                    continue 
+            elif index == len(lines) - 1:
+                previous_line = find_non_empty_line(lines, index, 'previous')
+                if previous_line and re.match(opt_regix, previous_line.strip()):
+                    append_to_optional_array(index) 
+                    continue
+            else:
+                next_line = find_non_empty_line(lines, index, 'next')
+                previous_line = find_non_empty_line(lines, index, 'previous')
+                if (next_line and re.match(opt_regix, next_line.strip())) or (previous_line and re.match(opt_regix, previous_line.strip())):
+                    append_to_optional_array(index) 
+                    continue
+
+    # 对 lines_optional_catched 和 lines_nessary_catched 进行去重
+    lines_optional_catched = list(set(lines_optional_catched))
+    lines_nessary_catched = list(set(lines_nessary_catched))
+
+    if len(lines) > 0: 
+        is_empty = False
+        if(len(lines_optional_catched)/( len(lines)) >= 0.02):
+            is_missed = True 
+        else:
+            is_missed = False
+            delete_catched_lines(file_path, output_success_files)
+    else:
+        is_empty = True
+        is_missed = True
+    return set(lines_nessary_catched) | set(lines_optional_catched)
+
+def remove_output_next_level(file_path):
+    # 将文件路径转换为Path对象
+    path = Path(file_path) 
+    # 找到"output"在路径中的位置
+    output_index = path.parts.index('output') 
+    # 删除"output"后的一级结构
+    new_parts = path.parts[:output_index+1] + path.parts[output_index+2:] 
+    # 将新的路径部分连接起来
+    new_path = Path(os.path.join(*new_parts)) 
+    return str(new_path)
+
+def delete_catched_lines(file_path, output_success_files):
+    global lines_nessary_catched
+    global handled_line_count
+    global handled_file_count 
+    global is_missed
+    
+    handled_file_count += 1
+    handled_line_count += len(lines_nessary_catched)
+
+    with open(file_path, 'r', errors='ignore') as file:
+        read_lines = file.readlines()
+        if len(read_lines) == 0:
+            is_missed = True
+            return  
+        
+    output_success_file = convert_with_relative_path(remove_output_next_level(file_path), output_success_files, False)
+    lines = set(lines_nessary_catched) | set(lines_optional_catched)
+    with open(output_success_file, 'w', errors='ignore') as file:
+        for line in read_lines:
+            if read_lines.index(line) not in lines:
+                    file.write(line) 
+
+    # 如果文件处理后为空,不保存这个文件
+    if os.path.getsize(output_success_file) == 0:
+        os.remove(output_success_file)
+        is_missed = True
+        print(f"文件 {file_path} 处理后为空,已删除\n")
+
+def convert_with_relative_path(path, output_path, is_dir=True):
+    global input_dir_path
+
+    # Get the relative path of the file
+    relative_path = os.path.relpath(path, os.path.dirname(input_dir_path))
+    output_path = os.path.join(output_path, relative_path)
+
+    # Create the directory if it does not exist
+    if not os.path.exists(output_path):
+        if is_dir:
+            os.makedirs(output_path)
+        else :
+            os.makedirs(os.path.dirname(output_path), exist_ok=True)
+
+    return output_path
+    # shutil.copy(path, output_path)
+
+def append_to_necessary_array(line):
+    global catched_line_count
+    global lines_nessary_catched 
+    catched_line_count += 1
+    lines_nessary_catched.append(line) 
+
+def append_to_optional_array(line):
+    global catched_line_count
+    global lines_optional_catched
+    catched_line_count += 1
+    lines_optional_catched.append(line)
+
+input_dir_path = ""
+
+def process(dir_path, output_dir_path): 
+    global is_missed
+    global handled_file_count
+    global input_dir_path
+
+    input_dir_path = dir_path
+    output_success_files = os.path.join(output_dir_path, "success_files")
+    output_failed_files = os.path.join(output_dir_path, "failed_files")
+
+    os.makedirs(output_dir_path, exist_ok=True)
+    os.makedirs(output_failed_files, exist_ok=True)
+
+    files = list_files(dir_path)  
+    output_name = "filter_lines.txt"
+     
+    summary_file_count = 0
+    catched_file_count = 0
+    handled_file_count = 0    
+    too_big_file_count = 0
+    
+    global catched_line_count
+
+    collected_file_count = 0
+    empty_file_count = 0
+
+    output_path = os.path.join(output_dir_path, output_name)
+    
+    with open(os.path.join(dir_path, output_path) , "a") as output_txt:
+        output_txt.write(f"{dir_path}\n")
+        for file in files:
+            if not file.endswith(".txt") and not os.path.basename(file).startswith("filter_lines"):
+                continue    
+            
+            lines_catched = filter_lines(file, output_success_files) 
+            summary_file_count += 1
+            if is_empty:
+                empty_file_count += 1
+                print(f"文件 {file} 为空,当前 {empty_file_count} 个空文件\n")
+                continue
+            # 如果文件大于2M,直接处理为失败,过大的文件会引起超长时间的处理
+            if os.path.getsize(file) > 2 * 1024 * 1024: 
+                output_failed_file = convert_with_relative_path(remove_output_next_level(file), output_failed_files, False)
+                # 将file复制到output_failed_files 
+                shutil.copy(file, output_failed_file)
+                summary_file_count += 1 
+                too_big_file_count += 1
+                print(f"文件 {file} 大于2M,不处理\n")
+                continue   
+            if lines_catched:
+                catched_file_count += 1
+
+                if is_missed:
+                    output_failed_file = convert_with_relative_path(remove_output_next_level(file), output_failed_files, False)
+                    # 将file复制到output_failed_files 
+                    shutil.copy(file, output_failed_file)
+                    
+                    collected_file_count += 1
+            # 每循环1000次,打印一次日志
+            if summary_file_count % 1000 == 0:
+                print(f"已扫描txt文件 {summary_file_count} 个,其中有问题的文件 {catched_file_count} 个,已处理 {handled_file_count} 个,已收集 {collected_file_count} 个问题文件。\n")
+                print(f"已扫描行数 {summary_line_count} 行,其中有问题的行数 {catched_line_count} 行,已处理 {handled_line_count} 行\n")
+
+        print(f"已扫描txt文件 {summary_file_count} 个,其中有问题的文件 {catched_file_count} 个,已处理 {handled_file_count} 个,已收集 {collected_file_count} 个问题文件。\n")
+        print(f"已扫描行数 {summary_line_count} 行,其中有问题的行数 {catched_line_count} 行,已处理 {handled_line_count} 行\n")
+        # 写入日志
+        output_txt.write(f"\n===============================================\n \n")
+        output_txt.write(f"已扫描文件 {summary_file_count} 个,其中有问题的文件 {catched_file_count} 个,已处理 {handled_file_count} 个,已收集 {collected_file_count} 个问题文件, 空文件 {empty_file_count} 个。\n \n")
+        output_txt.write(f"已扫描行数 {summary_line_count} 行,其中有问题的行数 {catched_line_count} 行,已处理 {handled_line_count} 行\n \n")
+        # 写入文件和行的处理率(已处理x / 有问题y)
+        file_process_rate = 0
+        line_process_rate = 0
+        if catched_file_count > 0:
+            file_process_rate = handled_file_count / catched_file_count
+        if catched_line_count > 0:
+            line_process_rate = handled_line_count / catched_line_count
+        output_txt.write(f"文件处理率 {file_process_rate} 行处理率 {line_process_rate}\n \n")
+
+
+if __name__ == "__main__":
+    
+    parser = argparse.ArgumentParser(description='批量转档程序')
+    parser.add_argument("--input", type=str, nargs='+', help='输入文件夹根路径')
+    parser.add_argument("--output", type=str, help='输出文件夹根路径')  
+
+    args = parser.parse_args()
+
+    if not args.input:
+        parser.print_help()
+     
+    input_dir_paths = args.input
+    output_dir_path = args.output
+    for input_dir_path in input_dir_paths:
+        process(input_dir_path, output_dir_path)