Skip to content

Building dynamic routing forwarding based on pingora

Published: at 18:16

生命周期简介

# 使用httpbin 验证转发 以及header修改
podman run -p 8080:80 docker.io/kong/httpbin:latest

RUST_LOG=INFO cargo run

# 该请求会转发到8080 并体现header修改
curl localhost:18081/get -v
#[async_trait]
impl ProxyHttp for SimpleGateway{
    type CTX = ();
    fn new_ctx(&self) -> Self::CTX {}

    ///
    /// 转发
    async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> pingora_error::Result<Box<HttpPeer>> {
        log_summary(session,"upstream_peer");

        let peer = Box::new(HttpPeer::new(("127.0.0.1", 8080), false, "1.1.1.1".to_string()));
        Ok(peer)
    }

    ///
    /// 首先处理请求
    /// 解析、验证、速率限制
    /// 注意 返回true意味着请求退出 false 继续
    async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> pingora_error::Result<bool> where Self::CTX: Send + Sync {
        log_summary(_session,"request_filter");
        Ok(false)
    }

    ///
    /// 决定是否继续请求
    async fn proxy_upstream_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> pingora_error::Result<bool> where Self::CTX: Send + Sync {
        log_summary(_session,"proxy_upstream_filter");
        Ok(true)
    }

    ///
    /// 请求发到上游前修改请求
    async fn upstream_request_filter(&self, _session: &mut Session, _upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX) -> pingora_error::Result<()> where Self::CTX: Send + Sync {
        log_summary(_session,"upstream_request_filter");
        _upstream_request.insert_header("service-name",&self.name)
    }

    ///
    /// 在返回下游前修改响应
    async fn response_filter(&self, _session: &mut Session, _upstream_response: &mut ResponseHeader, _ctx: &mut Self::CTX) -> pingora_error::Result<()> where Self::CTX: Send + Sync {
        log_summary(_session,"response_filter");
        _upstream_response.insert_header("server-resp",&self.name)
    }

    ///
    /// 针对响应正文
    fn response_body_filter(&self, _session: &mut Session, _body: &mut Option<Bytes>, _end_of_stream: bool, _ctx: &mut Self::CTX) -> pingora_error::Result<Option<Duration>> where Self::CTX: Send + Sync {
        log_summary(_session,"response_body_filter");
        Ok(Some(Duration::from_secs(0)))
    }


    ///
    /// session日志
    async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX) where Self::CTX: Send + Sync {
        let res_code = _session.response_written()
            .map_or(0,|resp|resp.status.as_u16());
        info!("{} response code: {res_code}",self.request_summary(_session,_ctx));
    }

    ///
    /// 刚刚连接
    async fn connected_to_upstream(&self, _session: &mut Session, _reused: bool, _peer: &HttpPeer, _fd: RawFd, _digest: Option<&Digest>, _ctx: &mut Self::CTX) -> pingora_error::Result<()> where Self::CTX: Send + Sync {
        log_summary(_session,"connected_to_upstream");
        Ok(())
    }
}

/// Writes one info-level log line containing the name of the calling hook
/// (`func_name`) and the session's request summary, both in `Debug` format.
fn log_summary(session: &mut Session,func_name:&str) {
    info!("==>{:?} {:?}", func_name, session.request_summary())
}

静态路由代理


#[async_trait]
impl ProxyHttp for SimpleProxy{
    type CTX = CurrentRoute;

    fn new_ctx(&self) -> Self::CTX {
        CurrentRoute{
            // 一级路由用于判断转发到何处
            inner_route:"".to_string(),
            // 去掉一级路由后的部分
            goto_route:"".to_string(),
        }
    }

    async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> pingora_error::Result<bool> where Self::CTX: Send + Sync {
        log_summary(_session,"request_filter");
        // 获取一级路由
        let path = _session.req_header().uri.path_and_query().unwrap().to_string();
        let path_arr = path.strip_prefix("/").unwrap().split_once("/").unwrap();
        // 如果没有就返回不再继续
        if !self.where_go_to.contains_key(path_arr.0) {
            return Ok(true)
        }
        // 如果又就存入上下文
        _ctx.inner_route = path_arr.0.to_string();
        _ctx.goto_route = path_arr.1.to_string();
        Ok(false)
    }

    async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> pingora_error::Result<Box<HttpPeer>> {
        log_summary(session,"upstream_peer");
        // 从map中获取转发目标
        let peer = self.where_go_to.get(&ctx.inner_route).unwrap();
        let peer_addr = &peer.0;
        let peer_port = peer.1;
        // 构建peer
        let bp = Box::new(HttpPeer::new((String::from(peer_addr),peer_port),false,"1.1.1.1".to_string()));
        Ok(bp)
    }

    async fn upstream_request_filter(&self, _session: &mut Session, _upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX) -> pingora_error::Result<()> where Self::CTX: Send + Sync {

        // 把转发路径改为去掉一级路由的部分
        let path = &_ctx.goto_route;
        let v = PathAndQuery::from_str(&path).unwrap();
        _upstream_request.set_uri(Uri::from(v));
        Ok(())

    }

    async fn response_filter(&self, _session: &mut Session, _upstream_response: &mut ResponseHeader, _ctx: &mut Self::CTX) -> pingora_error::Result<()> where Self::CTX: Send + Sync {
        Ok(())
    }

    async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX) where Self::CTX: Send + Sync {
        log_summary(_session,"logging");
        // 如果为空就是从request_filter 404跳过来的
        if _ctx.goto_route.is_empty() {
            // 准备ResponseHeader
            let mut req_h = ResponseHeader::build(404,Some(6)).unwrap();
            // 返回数据
            let b = Bytes::from("can not find proxy path");
            req_h.append_header("Content-Length",b.len()).unwrap();
            // 一定要write_response_header 写准备ResponseHeader 才能把后面的body写入响应
            _session.write_response_header(Box::new(req_h)).await.unwrap();

            _session.write_response_body(b).await.unwrap();
            _session.set_keepalive(None);

            info!("==>is write{:?}",_session.response_written());

            _session.finish_body().await.unwrap();
        }

    }
}

动态路由转发

# 实现效果
# 1. check whether the route exists
curl --request POST --url 'http://127.0.0.1:18081/a/post?a=3' --header 'Accept: *' --header 'Content-Type: *'
# 2. add route
 curl --request GET \
   --url http://127.0.0.1:8989/ \
   --header 'Content-Type: application/json' \
   --header 'User-Agent: insomnia/2023.5.8' \
   --data '{
 	"route":"a",
 	"addr":"127.0.0.1",
 	"port":8080
 }'
# 3. check again
curl --request POST --url 'http://127.0.0.1:18081/a/post?a=3' --header 'Accept: *' --header 'Content-Type: *'

/// Proxy state holding a routing table keyed by `String` with `PeerAddr`
/// values.
pub struct MutRouteProxy {
    // A moka sync cache is used here because of ownership and concurrency concerns.
    where_go_to:Cache<String,PeerAddr>,
}

// http proxy 部分没有改变

// 添加了一个admin app 用于添加路由


#[async_trait]
impl ServeHttp for AdminApp {
    async fn response(&self, http_session: &mut ServerSession) -> Response<Vec<u8>> {
        let read_timeout = 2000;
        let body = match timeout(
            Duration::from_millis(read_timeout),
            http_session.read_request_body(),
        )
            .await
        {
            Ok(res) => res.unwrap().unwrap_or_else(|| Bytes::from("no body!")),
            Err(_) => {
                panic!("Timed out after {:?}ms", read_timeout);
            }
        };

        let v:NewRoute= serde_json::from_slice(body.to_vec().as_slice()).unwrap();
        let route = v.route;
        let addr = v.addr;
        let port = v.port;
        info!("new route ==={route} > {addr}:{port}");
        self.map.insert(route,PeerAddr(addr,port));

        let ret = Bytes::from("ok");
        Response::builder()
            .status(StatusCode::OK)
            .header(http::header::CONTENT_TYPE, "text/html")
            .header(http::header::CONTENT_LENGTH, ret.len())
            .body(ret.to_vec())
            .unwrap()
    }
}

代码仓库

GITHUBpo-hproxy

ATOMpo-hproxy