[dataextract] 提交初版数据提取工程

yanxin 11 months ago

+cmake_minimum_required(VERSION 3.10)
+        FastExtract
+        VERSION 0.0.1
+set(CMAKE_BUILD_TYPE Release CACHE STRING "build type")
+    set(CMAKE_CXX_FLAGS "-Wno-format -g0 -O3")
+    if(NEED_ABI0)
+        add_definitions(-D_GLIBCXX_USE_CXX11_ABI=0)
+    else()
+        add_definitions(-D_GLIBCXX_USE_CXX11_ABI=1)
+    endif()
+endif(NOT MSVC)
+option(ENABLE_GPU "" ON)
+option(BUILD_CLIENT "" OFF)
+option(BUILD_SERVER "" OFF)
+set(Srpc_DIR "" CACHE PATH "Srpc_DIR")
+if (WIN32)
+    set(CMAKE_C_FLAGS   "${CMAKE_C_FLAGS}   /MP /wd4200")
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /MP /wd4200 /std:c++17")
+else ()
+    set(CMAKE_C_FLAGS   "${CMAKE_C_FLAGS}   -Wall -fPIC -pipe -std=gnu99")
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fPIC -pipe -std=c++17 -Wno-invalid-offsetof")
+endif ()
+    find_package(srpc REQUIRED CONFIG HINTS ${Srpc_DIR})
+    find_package(OpenSSL REQUIRED)
+    find_package(Protobuf)
+    if ("x${Protobuf_DIR}" STREQUAL "xProtobuf_DIR-NOTFOUND")
+        if (APPLE)
+            message (FATAL_ERROR ${Protobuf_ERROR_MSG_MACOS})
+        else ()
+            message (FATAL_ERROR ${Protobuf_ERROR_MSG_LINUX})
+        endif ()
+    endif ()
+    get_filename_component(Protobuf_LIB_DIR ${Protobuf_LIBRARY} DIRECTORY)
+    if (NOT EXISTS "${Srpc_DIR}/third_party/lz4/lib/lz4.h")
+        set(LZ4_LIB lz4)
+    endif ()
+    if (NOT EXISTS "${Srpc_DIR}/third_party/snappy/cmake")
+        set(SNAPPY_LIB snappy)
+    endif ()
+    find_package(ZLIB REQUIRED)
+    find_package(Workflow REQUIRED CONFIG HINTS ${Workflow_DIR})
+    # Prefer to static link first
+    find_library(Workflow_LIB workflow HINTS ${Workflow_DIR}/_lib)
+    find_library(Srpc_LIB srpc HINTS ${Srpc_DIR}/_lib)

@@ -0,0 +1,34 @@
+  "conversion":
+  {
+    "pdf_root_path": "/media/kdan/0d2e102d-a28c-4d29-a512-f1409a936b44/Data/sbp1/",
+    "conversion_resource_path": "/home/kdan/repo/conversion_repos/pdfium_tools/",
+    "conversion_instance": "conversion_fastdeploy",
+    "conversion_type": "2txt",
+    "output_path": "/media/kdan/0d2e102d-a28c-4d29-a512-f1409a936b44/Data/神笔派/output/",
+    "model_path": "model_path/home/kdan/下载/ppyoloe_plus_crn_m_80e_coco_DET_TA_F_20230511",
+    "outer_batch_size": 10000000,
+    "inner_batch_size": 100000
+  },
+  "client":
+  {
+    "remote_host": "",
+    "remote_port": 8080,
+    "redirect_max": 2,
+    "retry_max": 2
+  },
+  "global":
+  {
+    "poller_threads": 30,
+    "handler_threads": 60,
+    "endpoint_params":
+    {
+      "max_connections": 20000,
+      "connect_timeout": 1000000,
+      "response_timeout": 1000000
+    }
+  }

@@ -0,0 +1,26 @@
+  "server":
+  {
+    "port": 1412
+  },
+  "global":
+  {
+    "poller_threads": 30,
+    "handler_threads": 60,
+    "endpoint_params":
+    {
+      "max_connections": 20000,
+      "connect_timeout": 1000000,
+      "response_timeout": 1000000
+    }
+  },
+  "model":
+  {
+    "det_model_path": "/home/kdan/repo/conversion_fastdeploy/ppyoloe_plus_crn_m_80e_coco_DET_TA_F_20230511/model.pdmodel",
+    "det_params_path": "/home/kdan/repo/conversion_fastdeploy/ppyoloe_plus_crn_m_80e_coco_DET_TA_F_20230511/model.pdiparams",
+    "det_cfg_path": "/home/kdan/repo/conversion_fastdeploy/ppyoloe_plus_crn_m_80e_coco_DET_TA_F_20230511/infer_cfg.yml",
+    "model_count": 60
+  }

@@ -0,0 +1,15 @@
+message EchoRequest {
+	required string path = 1;
+message EchoResponse {
+	required string json = 1;
+	optional int32 state = 2;
+	optional int32 error = 3;
+service gpu_server {
+	rpc Echo(EchoRequest) returns (EchoResponse);

@@ -0,0 +1,18 @@
+cd ..
+if [ -d "build" ]
+    cd build
+    mkdir build
+    cd build
+cmake .. -DWorkflow_DIR=/home/kdan/repo/srpc/workflow \
+            -DTRT_DRECTORY=/usr/local/TensorRT-   \
+            -DFASTDEPLOY_DRECTORY=/home/kdan/repo/FastDeploy/build_trt/compiled_fastdeploy_sdk    \
+            -DBUILD_CLIENT=ON \
+            -DRAPIDJSON_DRECTORY=/home/kdan/repo/rapidjson    \
+            -DSrpc_DIR=/home/kdan/repo/srpc/
+make -j16

@@ -0,0 +1,11 @@
+cmake_minimum_required(VERSION 3.10)
+    add_subdirectory(client)
+    add_subdirectory(server)

@@ -0,0 +1,26 @@
+cmake_minimum_required(VERSION 3.10)
+project(fastextract_client LANGUAGES CXX)
+set(CMAKE_BUILD_TYPE Release)
+# Set all the libraries here
+set(LIB ${Srpc_LIB} ${Workflow_LIB} pthread OpenSSL::SSL OpenSSL::Crypto
+        protobuf z ${SNAPPY_LIB} ${LZ4_LIB} ${THIRDPARTY_PATH}/conversion/lib/
+# Add header directories and library directories here
+link_directories(${OPENSSL_LINK_DIR} ${WORKFLOW_LIB_DIR})
+# Build executable outputs
+add_executable(${PROJECT_NAME} ${CLIENT_SRC} $<TARGET_OBJECTS:util>)
+target_link_libraries(${PROJECT_NAME} ${LIB})

@@ -0,0 +1,260 @@
+#include <stdio.h>
+#include <signal.h>
+#include <unistd.h>
+#include <sys/wait.h>
+#include <thread>
+#include <chrono>
+#include <ctime>
+#include "workflow/HttpMessage.h"
+#include "workflow/WFTaskFactory.h"
+#include "workflow/WFFacilities.h"
+#include "workflow/Workflow.h"
+#include "client_config/config.h"
+static srpc::ClientRPCConfig config;
+struct MMInput
+    int id;
+	std::string file_path;
+struct MMOutput
+    int error;
+	double time;
+using MMFactory = WFThreadTaskFactory<MMInput, MMOutput>;
+using MMTask = WFThreadTask<MMInput, MMOutput>;
+void put_callback(WFHttpTask *task)
+    int state = task->get_state();
+    int error = task->get_error();
+    if (state == WFT_STATE_SUCCESS) // print server response body
+    {
+        const void *body;
+        size_t body_len;
+        task->get_resp()->get_parsed_body(&body, &body_len);
+        std::string res((char *)(body), body_len);
+        if (res.empty())
+        {
+            return;
+        }
+        auto json_res = wfrest::Json::parse(res);
+        config.add_http_put_count();
+    }
+void convert_callback(MMTask *task)
+	auto *input = task->get_input();
+	auto *output = task->get_output();
+    std::string url = std::string("http://") + config.client_host() +
+                      std::string(":") + std::to_string(config.client_port()) + std::string("/UpdateTaskStatus");
+    std::stringstream query_params;
+    query_params << "?id=" << input->id << "&status=";
+    if (output->error == 0) {
+        query_params << 2;
+    } else {
+        query_params << 3;
+    }
+    url.append(query_params.str());
+	assert(task->get_state() == WFT_STATE_SUCCESS);
+    WFHttpTask *http_task = WFTaskFactory::create_http_task(url,
+                                                            config.redirect_max(),
+                                                            config.retry_max(),
+                                                            put_callback);
+    http_task->start();
+void convert_pdf(const MMInput *in, MMOutput *out)
+    out->error = -1;
+    if (!in->file_path.empty())
+    {
+        char* new_conv_args[7] {nullptr};
+        for (int i = 0; i < 6; ++i) {
+            switch (i)
+            {
+            case 0:
+            {
+                new_conv_args[i] = new char[strlen(config.get_conversion_instance()) + 1]{0};
+                strcpy(new_conv_args[i], config.get_conversion_instance());
+                break;
+            }
+            case 1:
+            {
+                new_conv_args[i] = new char[strlen(config.get_conversion_type()) + 1]{0};
+                strcpy(new_conv_args[i], config.get_conversion_type());
+                break;
+            }
+            case 2:
+            {
+                new_conv_args[i] = new char[strlen(config.get_conversion_resource_path()) + 1]{0};
+                strcpy(new_conv_args[i], config.get_conversion_resource_path());
+                break;
+            }
+            case 3:
+            {
+                new_conv_args[i] = new char[in->file_path.length() + 1]{0};
+                strcpy(new_conv_args[i], in->file_path.c_str());
+                break;
+            }
+            case 4:
+            {
+                std::string output_path = config.get_output_path();
+                output_path.append(config.get_time_stamp());
+                output_path.append("/");
+                std::string tmp_str = in->file_path;
+                tmp_str.erase(0, strlen(config.get_root_path()));
+                auto position = tmp_str.find_last_of('/');
+                tmp_str.erase(position, tmp_str.length() - position);
+                output_path.append(tmp_str);
+                output_path.append("/");
+                new_conv_args[i] = new char[output_path.length() + 1]{0};
+                strcpy(new_conv_args[i], output_path.c_str());
+                break;
+            }
+            case 5:
+            {
+                new_conv_args[i] = new char[strlen(config.get_model_path()) + 1]{0};
+                strcpy(new_conv_args[i], config.get_model_path());
+                break;
+            }
+            default:
+                break;
+            }
+        }
+        out->error = 0;
+        pid_t pid = fork();
+        if (pid == -1) { // fork失败
+            std::cerr << "Failed to fork()" << std::endl;
+            return;
+        } else if (pid == 0) { // 这里是子进程
+            execvp(new_conv_args[0], new_conv_args);
+            out->error = -1;
+            // 此处不应该被执行,除非execvp失败
+            _exit(EXIT_FAILURE); // 如果execvp失败,退出子进程
+        } else { // 这里是父进程
+            int status;
+            // 等待子进程结束,并获取子进程的退出状态
+            if (waitpid(pid, &status, 0) == -1) {
+                perror("waitpid");
+                out->error = -1;
+                return;
+            }
+            // 检查子进程的退出状态
+            if (WIFEXITED(status)) {
+                out->error = WEXITSTATUS(status);
+            } else if (WIFSIGNALED(status)) {
+                std::cerr << "Child process terminated by signal." << std::endl;
+                out->error = -1; // 或者设置一个特定的错误码
+            }
+        }
+        for (int i = 0; i < 7; ++i)
+            delete[] new_conv_args[i];
+    }
+void sig_handler(int signo)
+void init(const char* path)
+    if (config.load(path) == false)
+    {
+        perror("Load config failed");
+        exit(1);
+    }
+    signal(SIGINT, sig_handler);
+    signal(SIGTERM, sig_handler);
+void callback(WFHttpTask *task)
+    int state = task->get_state();
+    int error = task->get_error();
+    if (state == WFT_STATE_SUCCESS) // print server response body
+    {
+        const void *body;
+        size_t body_len;
+        task->get_resp()->get_parsed_body(&body, &body_len);
+        std::string res((char *)(body), body_len);
+        if (res.empty())
+        {
+            return;
+        }
+        auto json_res = wfrest::Json::parse(res);
+        if (json_res.has("path"))
+        {
+            std::string file_path = config.get_pdf_root_path() + json_res["path"].get<std::string>();
+            MMTask *task = MMFactory::create_thread_task("convert_task", convert_pdf, convert_callback);
+            auto *input = task->get_input();
+            input->file_path = std::move(file_path);
+            input->id = json_res["id"].get<int>();
+            Workflow::start_series_work(task, nullptr);
+            config.add_http_get_count();
+        }
+    }
+int main(int argc, char* argv[])
+    if (argc < 2)
+    {
+        std::cout << "please input client config path." << std::endl;
+        return -1;
+    }
+    init(argv[1]);
+    std::string url = std::string("http://") + config.client_host() +
+                      std::string(":") + std::to_string(config.client_port()) + std::string("/GetNewTask");
+    auto outer_size = config.get_outer_batch_size();
+    auto inner_size = config.get_inner_batch_size();
+    for (int i = 0; i < outer_size; i++)
+    {
+        auto begin = std::chrono::high_resolution_clock::now();
+        ParallelWork *pwork = Workflow::create_parallel_work(nullptr);
+        for (int j = 0; j < inner_size; j++)
+        {
+            WFHttpTask *task = WFTaskFactory::create_http_task(url,
+                                                            config.redirect_max(),
+                                                            config.retry_max(),
+                                                            callback);
+            SeriesWork *series = Workflow::create_series_work(task, nullptr);
+            pwork->add_series(series);
+        }
+        Workflow::start_series_work(pwork, nullptr);
+        auto end = std::chrono::high_resolution_clock::now();
+        auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin);
+        std::cout << "转换任务数: " << config.get_http_get_count() << ", 上传任务数: " << config.get_http_put_count() << std::endl;
+    }
+    return 0;

@@ -0,0 +1,44 @@
+cmake_minimum_required(VERSION 3.10)
+project(fastextract_server LANGUAGES CXX)
+set(CMAKE_BUILD_TYPE Release)
+# Generate idl code: xx.srpc.h xx.pb.h xx.thrift.h
+set(IDL_FILE ${PROJECT_ROOT_PATH}/config/server.proto)
+set(SRPC_GEN_PROGRAM ${SRPC_BIN_DIR}/srpc_generator) 
+find_program(PROTOC "protoc")
+    message(FATAL_ERROR "Protobuf compiler is missing!")
+endif ()
+protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${IDL_FILE})
+add_custom_target(SRPC_GEN ALL
+    COMMENT "srpc generator..."
+# Set all the libraries here
+set(LIB ${Srpc_LIB} ${Workflow_LIB} pthread OpenSSL::SSL OpenSSL::Crypto
+    protobuf z ${SNAPPY_LIB} ${LZ4_LIB})
+# Add all the common code here
+# Add header directories and library directories here
+include_directories(${OPENSSL_INCLUDE_DIR} ${Protobuf_INCLUDE_DIR}
+                    ${WORKFLOW_INCLUDE_DIR} ${SRPC_INCLUDE_DIR}
+link_directories(${OPENSSL_LINK_DIR} ${Protobuf_LIB_DIR}
+                 ${WORKFLOW_LIB_DIR} ${SRPC_LIB_DIR})
+# Build executable outputs
+add_executable(${PROJECT_NAME} ${COMMON_CODE} $<TARGET_OBJECTS:util>)
+target_link_libraries(${PROJECT_NAME} ${LIB} ${FASTDEPLOY_LIBS})

@@ -0,0 +1,211 @@
+#include <stdio.h>
+#include <signal.h>
+#include <mutex>
+#include <thread>
+#include "workflow/WFFacilities.h"
+#include "srpc/rpc_types.h"
+#include "fastdeploy/vision.h"
+#include "rapidjson/document.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/stringbuffer.h"
+#include "server_config/config.h"
+#include "server.srpc.h"
+using namespace srpc;
+static int gpu_id = 0;
+static std::vector< fastdeploy::vision::detection::PPYOLOER* > det_models;
+static std::vector< char > model_used;
+static std::mutex model_mtx;
+static WFFacilities::WaitGroup wait_group(100);
+static srpc::ServerRPCConfig config;
+std::string DetectionResultToJson(const fastdeploy::vision::DetectionResult& res)
+    rapidjson::StringBuffer ss;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(ss);
+    auto label_map = [](int label) -> std::string {
+        switch (label)
+        {
+            case 3:
+                return "Figure";
+            case 6:
+                return "Table_0";
+            case 7:
+                return "Table_std";
+            default:
+                return "";
+        }
+    };
+    writer.StartObject();
+    writer.Key("objects");
+    writer.StartArray();
+    for (int i = 0; i < res.scores.size(); i++) {
+        auto label_str = label_map(res.label_ids[i]);
+        if (label_str.empty() || res.scores[i] < 0.5)
+            continue;
+        writer.StartObject();
+        writer.Key("text");
+        writer.String(label_str.c_str());
+        writer.Key("score");
+        writer.Double(res.scores[i]);
+        writer.Key("position");
+        writer.StartArray();
+        writer.Int(res.boxes[i][0]);
+        writer.Int(res.boxes[i][1]);
+        writer.Int(res.boxes[i][2]);
+        writer.Int(res.boxes[i][1]);
+        writer.Int(res.boxes[i][2]);
+        writer.Int(res.boxes[i][3]);
+        writer.Int(res.boxes[i][0]);
+        writer.Int(res.boxes[i][3]);
+        writer.EndArray();
+        writer.EndObject();
+    }
+    writer.EndArray();
+    writer.EndObject();
+    return ss.GetString();
+void sig_handler(int signo)
+    wait_group.done();
+void init(const char* path)
+    if (config.load(path) == false)
+    {
+        perror("Load config failed");
+        exit(1);
+    }
+    fastdeploy::SetLogger(false, false);
+    auto option = fastdeploy::RuntimeOption();
+    option.UseGpu(gpu_id);
+    int model_count = config.get_model_count();
+    for(size_t i = 0; i < model_count; ++i) {
+        if (i > model_count / 2 && gpu_id == 0)
+        {
+            gpu_id = 1;
+            option.UseGpu(gpu_id);
+            auto* model =  new fastdeploy::vision::detection::PPYOLOER(config.get_det_model_path(), config.get_det_params_path(), config.get_det_cfg_path(), option);
+            model->Initialized();
+            det_models.emplace_back(model);
+            continue;
+        }
+        if (det_models.empty()) {
+            auto* model =  new fastdeploy::vision::detection::PPYOLOER(config.get_det_model_path(), config.get_det_params_path(), config.get_det_cfg_path(), option);
+            model->Initialized();
+            det_models.emplace_back(model);
+        } else {
+            auto tmodel = std::move(det_models.back()->Clone());
+            det_models.emplace_back(reinterpret_cast<fastdeploy::vision::detection::PPYOLOER*>(tmodel.release()));
+        }
+    }
+    std::cout << "成功初始化 " << model_count << " 个模型!" << std::endl;
+    model_used.resize(model_count, 0);
+    signal(SIGINT, sig_handler);
+    signal(SIGTERM, sig_handler);
+class ServiceImpl : public gpu_server::Service
+    void Echo(EchoRequest *req, EchoResponse *resp, RPCContext *ctx) override
+    {
+        try {
+            auto get_valid_model = []() -> int {
+                while (true) {
+                    for (int i = 0; i < det_models.size(); ++i) {
+                        if (!model_used[i]) {
+                            std::unique_lock<std::mutex> lk(model_mtx);
+                            if (!model_used[i])
+                            {
+                                model_used[i] = 1;
+                                std::cout << "将使用第 " << i + 1 << " 个模型进行推理!" << std::endl;
+                                return i;
+                            }                            
+                        }
+                    }
+                }
+            };
+            const cv::Mat im = cv::imread(req->path());
+            if (!im.empty())
+            {
+                auto begin = std::chrono::high_resolution_clock::now();
+                auto model_index = get_valid_model();
+                fastdeploy::vision::DetectionResult res;
+                if (!det_models[model_index]->Predict(im, &res)) {
+                    std::cerr << "Failed to predict." << std::endl;
+                    return;
+                }
+                // 4. delete the following codes and fill your logic
+                {
+                    std::lock_guard<std::mutex> gd(model_mtx);
+                    model_used[model_index] = 0;
+                }
+                auto json_str = DetectionResultToJson(res);
+                res.Free();
+                resp->set_json(json_str.c_str());
+                auto end = std::chrono::high_resolution_clock::now();
+                auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin);
+                // std::cout << "推理耗时: << " << elapsed.count() * 1e-9 << " seconds.\n";
+            } else {
+                resp->set_json("{}");
+            }
+        } catch (const std::exception& e){
+            std::cout << "failed: " << e.what() << std::endl;
+        }
+    }
+int main(int argc, char* argv[])
+    if (argc < 2)
+    {
+        std::cout << "please input client config path." << std::endl;
+        return -1;
+    }
+    // 1. load config
+    init(argv[1]);
+    // 2. start server
+    SRPCServer server;
+    ServiceImpl impl;
+    server.add_service(&impl);
+    config.load_filter(server);
+    if (server.start(config.server_port()) == 0)
+    {
+        // 3. success and wait
+        fprintf(stderr, "gpu_server SRPC server started, port %u\n", config.server_port());
+        wait_group.wait();
+        server.stop();
+    }
+    else
+        perror("server start");
+    return 0;

@@ -0,0 +1,18 @@
+cmake_minimum_required(VERSION 3.10)
+	json/
+	server_config/
+	client_config/
+set(LIB ${Srpc_LIB} ${Workflow_LIB} pthread OpenSSL::SSL OpenSSL::Crypto
+    protobuf z ${SNAPPY_LIB} ${LZ4_LIB})
+include_directories(${OPENSSL_INCLUDE_DIR} ${Protobuf_INCLUDE_DIR}
+                    ${WORKFLOW_INCLUDE_DIR} ${SRPC_INCLUDE_DIR}
+add_library(${PROJECT_NAME} OBJECT ${SRC})
+target_link_libraries(${PROJECT_NAME} ${LIB})

@@ -0,0 +1,342 @@
+#include <fstream>
+#include "config.h"
+#include "workflow/WFGlobal.h"
+#include "workflow/UpstreamManager.h"
+#include "workflow/UpstreamPolicies.h"
+using namespace srpc;
+// default upstream_route_t
+static unsigned int default_consistent_hash(const char *path, const char *query,
+                                              const char *fragment)
+    return 0;
+static unsigned int default_select_route(const char *path, const char *query,
+                                         const char *fragment)
+    return 0;
+static void set_endpoint_params(const wfrest::Json& data,
+                                struct EndpointParams *params)
+    for (const auto& it : data)
+    {
+        if (it.key() == "max_connections")
+            params->max_connections = data["max_connections"];
+        else if (it.key() == "connect_timeout")
+            params->connect_timeout = data["connect_timeout"];
+        else if (it.key() == "response_timeout")
+            params->response_timeout = data["response_timeout"];
+        else if (it.key() == "ssl_connect_timeout")
+            params->ssl_connect_timeout = data["ssl_connect_timeout"];
+        else if (it.key() == "use_tls_sni")
+            params->use_tls_sni = data["use_tls_sni"];
+        else
+        {
+            printf("[INFO][set_endpoint_params] Unknown key : %s\n",
+                   it.key().c_str());
+        }
+    }
+static void load_global(const wfrest::Json& data)
+    struct WFGlobalSettings settings = GLOBAL_SETTINGS_DEFAULT;
+    std::string resolv_conf_path;
+    std::string hosts_path;
+    for (const auto& it : data)
+    {
+        if (it.key() == "endpoint_params")
+        {
+            set_endpoint_params(data["endpoint_params"],
+                                &settings.endpoint_params);
+        }
+        else if (it.key() == "dns_server_params")
+        {
+            set_endpoint_params(data["dns_server_params"],
+                                &settings.dns_server_params);
+        }
+        else if (it.key() == "dns_ttl_default")
+            settings.dns_ttl_default = data["dns_ttl_default"];
+        else if (it.key() == "dns_ttl_min")
+            settings.dns_ttl_min = data["dns_ttl_min"];
+        else if (it.key() == "dns_threads")
+            settings.dns_threads = data["dns_threads"];
+        else if (it.key() == "poller_threads")
+            settings.poller_threads = data["poller_threads"];
+        else if (it.key() == "handler_threads")
+            settings.handler_threads = data["handler_threads"];
+        else if (it.key() == "compute_threads")
+            settings.compute_threads = data["compute_threads"];
+        else if (it.key() == "resolv_conf_path")
+        {
+            resolv_conf_path = data["resolv_conf_path"].get<std::string>();
+            settings.resolv_conf_path = resolv_conf_path.c_str();
+        }
+        else if (it.key() == "hosts_path")
+        {
+            hosts_path = data["hosts_path"].get<std::string>();
+            settings.hosts_path = hosts_path.c_str();
+        }
+        else
+            printf("[INFO][load_global] Unknown key : %s\n", it.key().c_str());
+    }
+    WORKFLOW_library_init(&settings);
+static bool load_upstream_server(const wfrest::Json& data,
+                                 std::vector<std::string>& hosts,
+                                 std::vector<AddressParams>& params)
+    AddressParams param;
+    hosts.clear();
+    params.clear();
+    for (const auto& server : data)
+    {
+        if (server.has("host") == false)
+        {
+            printf("[ERROR][load_upstream] Invalid upstream server\n");
+            continue;
+        }
+        param = ADDRESS_PARAMS_DEFAULT;
+        if (server.has("params"))
+        {
+            for (const auto& p : server["params"])
+            {
+                if (p.key() == "endpoint_params")
+                    set_endpoint_params(p.value(), &param.endpoint_params);
+                else if (p.key() == "weight")
+                    param.weight = p.value().get<unsigned short>();
+                else if (p.key() == "max_fails")
+                    param.max_fails = p.value().get<unsigned int>();
+                else if (p.key() == "dns_ttl_default")
+                    param.dns_ttl_default = p.value().get<unsigned int>();
+                else if (p.key() == "dns_ttl_min")
+                    param.dns_ttl_min = p.value().get<unsigned int>();
+                else if (p.key() == "server_type")
+                    param.server_type = p.value().get<int>();
+                else if (p.key() == "group_id")
+                    param.group_id = p.value().get<int>();
+                else
+                    printf("[ERROR][load_upstream] Invalid params: %s\n",
+                           p.key().c_str());
+            }
+        }
+        hosts.push_back(server["host"]);
+        params.push_back(param);
+    }
+    if (hosts.size() == 0)
+        return false;
+    else
+        return true;
+static void load_upstream(const wfrest::Json& data)
+    std::string name;
+    std::string type;
+    bool try_another;
+    std::vector<std::string> hosts;
+    std::vector<AddressParams> params;
+    for (const auto& it : data)
+    {
+        if (it.has("name") == false ||
+            it.has("type") == false ||
+            it.has("server") == false || 
+            load_upstream_server(it["server"], hosts, params) == false)
+        {
+            printf("[ERROR][load_upstream] Invalid upstream\n");
+            continue;
+        }
+        name = it["name"].get<std::string>();
+        type = it["type"].get<std::string>();
+        if (it.has("try_another"))
+            try_another = it["try_another"];
+        else
+            try_another = false;
+        if (type == "weighted_random")
+        {
+            UpstreamManager::upstream_create_weighted_random(name, try_another);
+        }
+        else if (type == "consistent_hash")
+        {
+            UpstreamManager::upstream_create_consistent_hash(name,
+                                                    default_consistent_hash);
+        }
+        else if (type == "round_robin")
+        {
+            UpstreamManager::upstream_create_round_robin(name, try_another);
+        }
+        else if (type == "manual")
+        {
+            UpstreamManager::upstream_create_manual(name,
+                                                    default_select_route,
+                                                    try_another,
+                                                    default_consistent_hash);
+        }
+        else if (type == "vnswrr")
+        {
+            UpstreamManager::upstream_create_vnswrr(name);
+        }
+        else
+        {
+            printf("[INFO][load_upstream] Unknown type : %s\n", type.c_str());
+            continue;
+        }
+        for (size_t i = 0; i < hosts.size(); i++)
+            UpstreamManager::upstream_add_server(name, hosts[i], &params[i]);
+    }
+void ClientRPCConfig::load_server()
+    if (this->data["server"].has("port"))
+        this->s_port = this->data["server"]["port"];
+    if (this->data["server"].has("root"))
+        this->root_path = this->data["server"]["root"].get<std::string>();
+    if (this->data["server"].has("cert_file"))
+        this->s_cert_file = this->data["server"]["cert_file"].get<std::string>();
+    if (this->data["server"].has("file_key"))
+        this->s_file_key = this->data["server"]["file_key"].get<std::string>();
+    if (this->data["server"].has("error_page"))
+    {
+        for (const auto& it : this->data["server"]["error_page"])
+        {
+            std::string page;
+            if (it.has("error") == true && it.has("error") == true)
+            {
+                page = it["page"].get<std::string>();
+                for (const auto& e : it["error"])
+                    this->error_page.insert(std::make_pair(e.get<int>(), page));
+            }
+            else
+            {
+                printf("[ERROR][load_file_service] Invalid error_page\n");
+                continue;
+            }
+        }
+    }
+void ClientRPCConfig::load_client()
+    if (this->data["client"].has("remote_host"))
+        this->c_host = this->data["client"]["remote_host"].get<std::string>();
+    if (this->data["client"].has("remote_port"))
+        this->c_port = this->data["client"]["remote_port"];
+    if (this->data["client"].has("redirect_max"))
+        this->c_redirect_max = this->data["client"]["redirect_max"];
+    if (this->data["client"].has("retry_max"))
+        this->c_retry_max = this->data["client"]["retry_max"];
+    if (this->data["client"].has("user_name"))
+        this->c_user_name = this->data["client"]["user_name"].get<std::string>();
+    if (this->data["client"].has("password"))
+        this->c_password = this->data["client"]["password"].get<std::string>();
+void ClientRPCConfig::load_conversion()
+    if (this->data["conversion"].has("pdf_root_path"))
+        this->pdf_root_path = this->data["conversion"]["pdf_root_path"].get<std::string>();
+    if (this->data["conversion"].has("conversion_resource_path"))
+        this->conversion_resource_path = this->data["conversion"]["conversion_resource_path"].get<std::string>();
+    if (this->data["conversion"].has("conversion_instance"))
+        this->conversion_instance = this->data["conversion"]["conversion_instance"].get<std::string>();
+    if (this->data["conversion"].has("conversion_type"))
+        this->conversion_type = this->data["conversion"]["conversion_type"].get<std::string>();
+    if (this->data["conversion"].has("output_path"))
+        this->output_path = this->data["conversion"]["output_path"].get<std::string>();
+    if (this->data["conversion"].has("model_path"))
+        this->model_path = this->data["conversion"]["model_path"].get<std::string>();
+    if (this->data["conversion"].has("outer_batch_size"))
+        this->outer_batch_size = this->data["conversion"]["outer_batch_size"];
+    if (this->data["conversion"].has("inner_batch_size"))
+        this->inner_batch_size = this->data["conversion"]["inner_batch_size"];
+bool ClientRPCConfig::load(const char *file)
+    FILE *fp = fopen(file, "r");
+    if (!fp)
+        return false;
+    this->data = wfrest::Json::parse(fp);
+    fclose(fp);
+    if (this->data.is_valid() == false)
+        return false;
+    for (const auto& it : this->data)
+    {
+        if (it.key() == "server")
+            this->load_server();
+        else if (it.key() == "client")
+            this->load_client();
+        else if (it.key() == "conversion")
+            this->load_conversion();
+        else if (it.key() == "global")
+            load_global(it.value());
+        else if (it.key() == "upstream")
+            load_upstream(it.value());
+        else
+            printf("[INFO][ClientRPCConfig::load] Unknown key: %s\n", it.key().c_str());
+    }
+    // 获取当前时间点
+    auto now = std::chrono::system_clock::now();
+    // 转换为时间戳
+    auto now_c = std::chrono::system_clock::to_time_t(now);
+    // 创建tm结构体实例
+    std::tm local_tm = *std::localtime(&now_c);
+    // 缓冲区用于存储格式化后的字符串
+    char buffer[20];
+    // 使用asctime_r函数按特定格式填充缓冲区,但这个函数不支持我们所需的格式,需要进一步处理
+    std::strftime(buffer, sizeof(buffer), "%Y%m%d_%H_%M_%S", &local_tm);
+    time_stamp = buffer;
+    time_stamp.erase(0, 2);
+    time_stamp.push_back('c');
+    return true;

@@ -0,0 +1,83 @@
+#ifndef _RPC_CONFIG_H_
+#define _RPC_CONFIG_H_
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include "json/Json.h"
+namespace srpc
+class ClientRPCConfig
+    using ErrorPageMap = std::unordered_map<int, std::string>;
+    bool load(const char *file);
+    unsigned short server_port() const { return this->s_port; }
+    const char *server_cert_file() const { return this->s_cert_file.c_str(); }
+    const char *server_file_key() const { return this->s_file_key.c_str(); }
+    unsigned short client_port() const { return this->c_port; }
+    const char *client_host() const { return this->c_host.c_str(); }
+    int redirect_max() const { return this->c_redirect_max; }
+    int retry_max() const { return this->c_retry_max; }
+    const char *client_user_name() const { return this->c_user_name.c_str(); }
+    const char *client_password() const { return this->c_password.c_str(); }
+    const char *get_root_path() const { return this->root_path.c_str(); }
+    const ErrorPageMap& get_error_page() const { return this->error_page; }
+    const char* get_conversion_instance() { return conversion_instance.c_str(); }
+    const char* get_conversion_type() { return conversion_type.c_str(); }
+    const char* get_conversion_resource_path() { return conversion_resource_path.c_str(); }
+    const char* get_output_path() { return output_path.c_str(); }
+    const char* get_model_path() { return model_path.c_str(); }
+    const char* get_pdf_root_path() { return pdf_root_path.c_str(); }
+    const char* get_time_stamp() { return time_stamp.c_str(); }
+    void add_http_put_count() { http_put_count++; }
+    void add_http_get_count() { http_get_count++; }
+    int get_http_put_count() { return http_put_count; }
+    int get_http_get_count() { return http_get_count; }
+    int get_outer_batch_size() { return outer_batch_size; }
+    int get_inner_batch_size() { return inner_batch_size; }
+    ClientRPCConfig() : s_port(0), c_port(0), c_redirect_max(0), c_retry_max(0) { }
+    ~ClientRPCConfig();
+    void load_server();
+    void load_client();
+    void load_conversion();
+    wfrest::Json data;
+    unsigned short s_port;
+    std::string s_cert_file;
+    std::string s_file_key;
+    std::string c_host;
+    unsigned short c_port;
+    int c_redirect_max;
+    int c_retry_max;
+    std::string c_user_name;
+    std::string c_password;
+    std::string root_path;
+    ErrorPageMap error_page;
+    int http_get_count = 0;
+    int http_put_count = 0;
+    int outer_batch_size;
+    int inner_batch_size;
+    std::string pdf_root_path;
+    std::string conversion_resource_path;
+    std::string conversion_instance;
+    std::string conversion_type;
+    std::string output_path;
+    std::string model_path;
+    std::string time_stamp;

@@ -0,0 +1,35 @@
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include "workflow/WFTaskFactory.h"
+template <class TASK>
+void print_peer_address(TASK *server_task)
+    char addrstr[128];
+    struct sockaddr_storage addr;
+    socklen_t l = sizeof addr;
+    unsigned short port = 0;
+    long long seq = server_task->get_task_seq();
+    server_task->get_peer_addr((struct sockaddr *)&addr, &l);
+    if (addr.ss_family == AF_INET)
+    {
+        struct sockaddr_in *sin = (struct sockaddr_in *)&addr;
+        inet_ntop(AF_INET, &sin->sin_addr, addrstr, 128);
+        port = ntohs(sin->sin_port);
+    }
+    else if (addr.ss_family == AF_INET6)
+    {
+        struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&addr;
+        inet_ntop(AF_INET6, &sin6->sin6_addr, addrstr, 128);
+        port = ntohs(sin6->sin6_port);
+    }
+    else
+        strcpy(addrstr, "Unknown");
+    fprintf(stderr, "peer address: %s:%d, seq: %lld.\n", addrstr, port, seq);

@@ -0,0 +1,557 @@
+#include <fstream>
+#include "config.h"
+#include "workflow/WFGlobal.h"
+#include "workflow/UpstreamManager.h"
+#include "workflow/UpstreamPolicies.h"
+#include "srpc/rpc_metrics_filter.h"
+#include "srpc/rpc_trace_filter.h"
+using namespace srpc;
+// default upstream_route_t
+static unsigned int default_consistent_hash(const char *path, const char *query,
+                                              const char *fragment)
+    return 0;
+static unsigned int default_select_route(const char *path, const char *query,
+                                         const char *fragment)
+    return 0;
+static void set_endpoint_params(const wfrest::Json& data,
+                                struct EndpointParams *params)
+    for (const auto& it : data)
+    {
+        if (it.key() == "max_connections")
+            params->max_connections = data["max_connections"];
+        else if (it.key() == "connect_timeout")
+            params->connect_timeout = data["connect_timeout"];
+        else if (it.key() == "response_timeout")
+            params->response_timeout = data["response_timeout"];
+        else if (it.key() == "ssl_connect_timeout")
+            params->ssl_connect_timeout = data["ssl_connect_timeout"];
+        else if (it.key() == "use_tls_sni")
+            params->use_tls_sni = data["use_tls_sni"];
+        else
+        {
+            printf("[INFO][set_endpoint_params] Unknown key : %s\n",
+                   it.key().c_str());
+        }
+    }
+static void load_global(const wfrest::Json& data)
+    struct WFGlobalSettings settings = GLOBAL_SETTINGS_DEFAULT;
+    std::string resolv_conf_path;
+    std::string hosts_path;
+    for (const auto& it : data)
+    {
+        if (it.key() == "endpoint_params")
+        {
+            set_endpoint_params(data["endpoint_params"],
+                                &settings.endpoint_params);
+        }
+        else if (it.key() == "dns_server_params")
+        {
+            set_endpoint_params(data["dns_server_params"],
+                                &settings.dns_server_params);
+        }
+        else if (it.key() == "dns_ttl_default")
+            settings.dns_ttl_default = data["dns_ttl_default"];
+        else if (it.key() == "dns_ttl_min")
+            settings.dns_ttl_min = data["dns_ttl_min"];
+        else if (it.key() == "dns_threads")
+            settings.dns_threads = data["dns_threads"];
+        else if (it.key() == "poller_threads")
+            settings.poller_threads = data["poller_threads"];
+        else if (it.key() == "handler_threads")
+            settings.handler_threads = data["handler_threads"];
+        else if (it.key() == "compute_threads")
+            settings.compute_threads = data["compute_threads"];
+        else if (it.key() == "resolv_conf_path")
+        {
+            resolv_conf_path = data["resolv_conf_path"].get<std::string>();
+            settings.resolv_conf_path = resolv_conf_path.c_str();
+        }
+        else if (it.key() == "hosts_path")
+        {
+            hosts_path = data["hosts_path"].get<std::string>();
+            settings.hosts_path = hosts_path.c_str();
+        }
+        else
+            printf("[INFO][load_global] Unknown key : %s\n", it.key().c_str());
+    }
+    WORKFLOW_library_init(&settings);
+static bool load_upstream_server(const wfrest::Json& data,
+                                 std::vector<std::string>& hosts,
+                                 std::vector<AddressParams>& params)
+    AddressParams param;
+    hosts.clear();
+    params.clear();
+    for (const auto& server : data)
+    {
+        if (server.has("host") == false)
+        {
+            printf("[ERROR][load_upstream] Invalid upstream server\n");
+            continue;
+        }
+        param = ADDRESS_PARAMS_DEFAULT;
+        if (server.has("params"))
+        {
+            for (const auto& p : server["params"])
+            {
+                if (p.key() == "endpoint_params")
+                    set_endpoint_params(p.value(), &param.endpoint_params);
+                else if (p.key() == "weight")
+                    param.weight = p.value().get<unsigned short>();
+                else if (p.key() == "max_fails")
+                    param.max_fails = p.value().get<unsigned int>();
+                else if (p.key() == "dns_ttl_default")
+                    param.dns_ttl_default = p.value().get<unsigned int>();
+                else if (p.key() == "dns_ttl_min")
+                    param.dns_ttl_min = p.value().get<unsigned int>();
+                else if (p.key() == "server_type")
+                    param.server_type = p.value().get<int>();
+                else if (p.key() == "group_id")
+                    param.group_id = p.value().get<int>();
+                else
+                    printf("[ERROR][load_upstream] Invalid params: %s\n",
+                           p.key().c_str());
+            }
+        }
+        hosts.push_back(server["host"]);
+        params.push_back(param);
+    }
+    if (hosts.size() == 0)
+        return false;
+    else
+        return true;
+static void load_upstream(const wfrest::Json& data)
+    std::string name;
+    std::string type;
+    bool try_another;
+    std::vector<std::string> hosts;
+    std::vector<AddressParams> params;
+    for (const auto& it : data)
+    {
+        if (it.has("name") == false ||
+            it.has("type") == false ||
+            it.has("server") == false || 
+            load_upstream_server(it["server"], hosts, params) == false)
+        {
+            printf("[ERROR][load_upstream] Invalid upstream\n");
+            continue;
+        }
+        name = it["name"].get<std::string>();
+        type = it["type"].get<std::string>();
+        if (it.has("try_another"))
+            try_another = it["try_another"];
+        else
+            try_another = false;
+        if (type == "weighted_random")
+        {
+            UpstreamManager::upstream_create_weighted_random(name, try_another);
+        }
+        else if (type == "consistent_hash")
+        {
+            UpstreamManager::upstream_create_consistent_hash(name,
+                                                    default_consistent_hash);
+        }
+        else if (type == "round_robin")
+        {
+            UpstreamManager::upstream_create_round_robin(name, try_another);
+        }
+        else if (type == "manual")
+        {
+            UpstreamManager::upstream_create_manual(name,
+                                                    default_select_route,
+                                                    try_another,
+                                                    default_consistent_hash);
+        }
+        else if (type == "vnswrr")
+        {
+            UpstreamManager::upstream_create_vnswrr(name);
+        }
+        else
+        {
+            printf("[INFO][load_upstream] Unknown type : %s\n", type.c_str());
+            continue;
+        }
+        for (size_t i = 0; i < hosts.size(); i++)
+            UpstreamManager::upstream_add_server(name, hosts[i], &params[i]);
+    }
+void ServerRPCConfig::load_server()
+    if (this->data["server"].has("port"))
+        this->s_port = this->data["server"]["port"];
+    if (this->data["server"].has("root"))
+        this->root_path = this->data["server"]["root"].get<std::string>();
+    if (this->data["server"].has("cert_file"))
+        this->s_cert_file = this->data["server"]["cert_file"].get<std::string>();
+    if (this->data["server"].has("file_key"))
+        this->s_file_key = this->data["server"]["file_key"].get<std::string>();
+    if (this->data["server"].has("error_page"))
+    {
+        for (const auto& it : this->data["server"]["error_page"])
+        {
+            std::string page;
+            if (it.has("error") == true && it.has("error") == true)
+            {
+                page = it["page"].get<std::string>();
+                for (const auto& e : it["error"])
+                    this->error_page.insert(std::make_pair(e.get<int>(), page));
+            }
+            else
+            {
+                printf("[ERROR][load_file_service] Invalid error_page\n");
+                continue;
+            }
+        }
+    }
+void ServerRPCConfig::load_client()
+    if (this->data["client"].has("transport_type"))
+    {
+        std::string type = this->data["client"]["transport_type"].get<std::string>();
+        if (type == "TT_SCTP")
+            this->c_transport_type = TT_SCTP;
+        else if (type == "TT_UDP")
+            this->c_transport_type = TT_UDP;
+    }
+    if (this->data["client"].has("remote_host"))
+        this->c_host = this->data["client"]["remote_host"].get<std::string>();
+    if (this->data["client"].has("remote_port"))
+        this->c_port = this->data["client"]["remote_port"];
+    if (this->data["client"].has("is_ssl"))
+        this->c_is_ssl = this->data["client"]["is_ssl"];
+    if (this->data["client"].has("callee_timeout"))
+        this->c_callee_timeout = this->data["client"]["callee_timeout"];
+    if (this->data["client"].has("redirect_max"))
+        this->c_redirect_max = this->data["client"]["redirect_max"];
+    if (this->data["client"].has("retry_max"))
+        this->c_retry_max = this->data["client"]["retry_max"];
+    if (this->data["client"].has("user_name"))
+        this->c_user_name = this->data["client"]["user_name"].get<std::string>();
+    if (this->data["client"].has("password"))
+        this->c_password = this->data["client"]["password"].get<std::string>();
+bool ServerRPCConfig::load(const char *file)
+    FILE *fp = fopen(file, "r");
+    if (!fp)
+        return false;
+    this->data = wfrest::Json::parse(fp);
+    fclose(fp);
+    if (this->data.is_valid() == false)
+        return false;
+    for (const auto& it : this->data)
+    {
+        if (it.key() == "server")
+            this->load_server();
+        else if (it.key() == "client")
+            this->load_client();
+        else if (it.key() == "global")
+            load_global(it.value());
+        else if (it.key() == "upstream")
+            load_upstream(it.value());
+        else if (it.key() == "metrics")
+            this->load_metrics();
+        else if (it.key() == "trace")
+            this->load_trace();
+        else if (it.key() == "model")
+            this->load_model();
+        else
+            printf("[INFO][ServerRPCConfig::load] Unknown key: %s\n", it.key().c_str());
+    }
+    return true;
+void ServerRPCConfig::load_metrics()
+    for (const auto& it : this->data["metrics"])
+    {
+        if (it.has("filter") == false)
+            continue;
+        std::string filter_name = it["filter"];
+        if ("prometheus") == 0)
+        {
+            if (it.has("port") == false)
+                continue;
+            RPCMetricsPull *filter = new RPCMetricsPull();
+            unsigned short port = it["port"];
+            filter->init(port);
+            this->filters.push_back(filter);
+        }
+        else if ("opentelemetry") == 0)
+        {
+            if (it.has("address") == false)
+                continue;
+            std::string name = it["filter_name"];
+            std::string url = it["address"];
+            unsigned int redirect_max = OTLP_HTTP_REDIRECT_MAX;
+            unsigned int retry_max = OTLP_HTTP_RETRY_MAX;
+            size_t report_threshold = RPC_REPORT_THREHOLD_DEFAULT;
+            size_t report_interval = RPC_REPORT_INTERVAL_DEFAULT;
+            if (it.has("redirect_max"))
+                redirect_max = it["redirect_max"];
+            if (it.has("retry_max"))
+                retry_max = it["retry_max"];
+            if (it.has("report_threshold"))
+                report_threshold = it["report_threshold"];
+            if (it.has("report_interval_ms"))
+                report_interval = it["report_interval_ms"];
+            RPCMetricsOTel *filter = new RPCMetricsOTel(name,
+                                                        url,
+                                                        redirect_max,
+                                                        retry_max,
+                                                        report_threshold,
+                                                        report_interval);
+            if (it.has("attributes"))
+            {
+                for (const auto& kv : it["attributes"])
+                {
+                    if (kv.has("key") == false || kv.has("value") == false)
+                        continue;
+                    filter->add_attributes(kv["key"], kv["value"]);
+                }
+            }
+            this->filters.push_back(filter);
+        }
+        else
+        {
+            printf("[ERROR][ServerRPCConfig::load_metrics] Unknown metrics: %s\n",
+                   filter_name.c_str());
+        }
+    }
+void ServerRPCConfig::load_trace()
+    for (const auto& it : this->data["trace"])
+    {
+        if (it.has("filter") == false)
+            continue;
+        std::string filter_name = it["filter"];
+        size_t spans_per_second = SPANS_PER_SECOND_DEFAULT;
+        if ("default") == 0)
+        {
+            if (it.has("spans_per_second"))
+                spans_per_second = it["spans_per_second"];
+            auto *filter = new RPCTraceDefault(spans_per_second);
+            this->filters.push_back(filter);
+        }
+        else if ("opentelemetry") == 0)
+        {
+            if (it.has("address") == false)
+                continue;
+            std::string url = it["address"];
+            unsigned int redirect_max = OTLP_HTTP_REDIRECT_MAX;
+            unsigned int retry_max = OTLP_HTTP_RETRY_MAX;
+            size_t report_threshold = RPC_REPORT_THREHOLD_DEFAULT;
+            size_t report_interval = RPC_REPORT_INTERVAL_DEFAULT;
+            if (it.has("redirect_max"))
+                redirect_max = it["redirect_max"];
+            if (it.has("retry_max"))
+                retry_max = it["retry_max"];
+            if (it.has("report_threshold"))
+                report_threshold = it["report_threshold"];
+            if (it.has("report_interval_ms"))
+                report_interval = it["report_interval_ms"];
+            auto *filter = new RPCTraceOpenTelemetry(url,
+                                                     OTLP_TRACES_PATH,
+                                                     redirect_max,
+                                                     retry_max,
+                                                     spans_per_second,
+                                                     report_threshold,
+                                                     report_interval);
+            if (it.has("attributes"))
+            {
+                for (const auto& kv : it["attributes"])
+                {
+                    if (kv.has("key") == false || kv.has("value") == false)
+                        continue;
+                    filter->add_attributes(kv["key"], kv["value"]);
+                }
+            }
+            this->filters.push_back(filter);
+        }
+        else
+        {
+            printf("[ERROR][ServerRPCConfig::load_metrics] Unknown metrics: %s\n",
+                   filter_name.c_str());
+        }
+    }
+void ServerRPCConfig::load_model()
+    if (this->data["model"].has("model_count"))
+        this->model_count = this->data["model"]["model_count"];
+    if (this->data["model"].has("det_model_path"))
+        this->det_model_path = this->data["model"]["det_model_path"].get<std::string>();
+    if (this->data["model"].has("det_params_path"))
+        this->det_params_path = this->data["model"]["det_params_path"].get<std::string>();
+    if (this->data["model"].has("det_cfg_path"))
+        this->det_cfg_path = this->data["model"]["det_cfg_path"].get<std::string>();
+void ServerRPCConfig::load_filter(SRPCServer& server)
+    for (auto *filter : this->filters)
+        server.add_filter(filter);
+void ServerRPCConfig::load_filter(SRPCClient& client)
+    for (auto *filter : this->filters)
+        client.add_filter(filter);
+void ServerRPCConfig::load_filter(SRPCHttpServer& server)
+    for (auto *filter : this->filters)
+        server.add_filter(filter);
+void ServerRPCConfig::load_filter(SRPCHttpClient& client)
+    for (auto *filter : this->filters)
+        client.add_filter(filter);
+void ServerRPCConfig::load_filter(BRPCServer& server)
+    for (auto *filter : this->filters)
+        server.add_filter(filter);
+void ServerRPCConfig::load_filter(BRPCClient& client)
+    for (auto *filter : this->filters)
+        client.add_filter(filter);
+void ServerRPCConfig::load_filter(ThriftServer& server)
+    for (auto *filter : this->filters)
+        server.add_filter(filter);
+void ServerRPCConfig::load_filter(ThriftClient& client)
+    for (auto *filter : this->filters)
+        client.add_filter(filter);
+void ServerRPCConfig::load_filter(ThriftHttpServer& server)
+    for (auto *filter : this->filters)
+        server.add_filter(filter);
+void ServerRPCConfig::load_filter(ThriftHttpClient& client)
+    for (auto *filter : this->filters)
+        client.add_filter(filter);
+void ServerRPCConfig::load_filter(TRPCServer& server)
+    for (auto *filter : this->filters)
+        server.add_filter(filter);
+void ServerRPCConfig::load_filter(TRPCClient& client)
+    for (auto *filter : this->filters)
+        client.add_filter(filter);
+void ServerRPCConfig::load_filter(TRPCHttpServer& server)
+    for (auto *filter : this->filters)
+        server.add_filter(filter);
+void ServerRPCConfig::load_filter(TRPCHttpClient& client)
+    for (auto *filter : this->filters)
+        client.add_filter(filter);
+    for (size_t i = 0; i < this->filters.size(); i++)
+        delete this->filters[i];

@@ -0,0 +1,104 @@
+#ifndef _RPC_CONFIG_H_
+#define _RPC_CONFIG_H_
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include "json/Json.h"
+#include "srpc/rpc_types.h"
+#include "srpc/rpc_define.h"
+#include "srpc/rpc_filter.h"
+namespace srpc
+class ServerRPCConfig
+    using ErrorPageMap = std::unordered_map<int, std::string>;
+    bool load(const char *file);
+    void load_filter(SRPCServer& server);
+    void load_filter(SRPCClient& client);
+    void load_filter(SRPCHttpServer& server);
+    void load_filter(SRPCHttpClient& client);
+    void load_filter(BRPCServer& server);
+    void load_filter(BRPCClient& client);
+    void load_filter(ThriftServer& server);
+    void load_filter(ThriftClient& client);
+    void load_filter(ThriftHttpServer& server);
+    void load_filter(ThriftHttpClient& client);
+    void load_filter(TRPCServer& server);
+    void load_filter(TRPCClient& client);
+    void load_filter(TRPCHttpServer& server);
+    void load_filter(TRPCHttpClient& client);
+    unsigned short server_port() const { return this->s_port; }
+    const char *server_cert_file() const { return this->s_cert_file.c_str(); }
+    const char *server_file_key() const { return this->s_file_key.c_str(); }
+    enum TransportType client_transport_type() const { return this->c_transport_type; }
+    const char *client_host() const { return this->c_host.c_str(); }
+    unsigned short client_port() const { return this->c_port; }
+    bool client_is_ssl() const { return this->c_is_ssl; }
+    const char *client_url() const { return this->c_url.c_str(); }
+    int client_callee_timeout() const { return this->c_callee_timeout; }
+    const char *client_caller() const { return this->c_caller.c_str(); }
+    int redirect_max() const { return this->c_redirect_max; }
+    int retry_max() const { return this->c_retry_max; }
+    const char *client_user_name() const { return this->c_user_name.c_str(); }
+    const char *client_password() const { return this->c_password.c_str(); }
+    const char *get_root_path() const { return this->root_path.c_str(); }
+    const ErrorPageMap& get_error_page() const { return this->error_page; }
+    int get_model_count() { return model_count; }
+    const char* get_det_model_path() { return det_model_path.c_str(); }
+    const char* get_det_params_path() { return det_params_path.c_str(); }
+    const char* get_det_cfg_path() { return det_cfg_path.c_str(); }
+    ServerRPCConfig() :
+        s_port(0), c_port(0), c_is_ssl(false), c_redirect_max(0), c_retry_max(0), model_count(30)
+    { }
+    ~ServerRPCConfig();
+    void load_server();
+    void load_client();
+    void load_metrics();
+    void load_trace();
+    void load_model();
+    wfrest::Json data;
+    std::vector<RPCFilter *> filters;
+    unsigned short s_port;
+    std::string s_cert_file;
+    std::string s_file_key;
+    enum TransportType c_transport_type;
+    std::string c_host;
+    unsigned short c_port;
+    bool c_is_ssl;
+    std::string c_url;
+    int c_callee_timeout;
+    std::string c_caller;
+    int c_redirect_max;
+    int c_retry_max;
+    std::string c_user_name;
+    std::string c_password;
+    std::string root_path;
+    ErrorPageMap error_page;
+    int model_count;
+    std::string det_model_path;
+    std::string det_params_path;
+    std::string det_cfg_path;

