生命周期简介
# 使用httpbin 验证转发 以及header修改
podman run -p 8080:80 docker.io/kong/httpbin:latest
RUST_LOG=INFO cargo run
# 该请求会转发到8080 并体系header修改
curl localhost:18081/get -v
#[async_trait]
impl ProxyHttp for SimpleGateway{
type CTX = ();
fn new_ctx(&self) -> Self::CTX {}
///
/// 转发
async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> pingora_error::Result<Box<HttpPeer>> {
log_summary(session,"upstream_peer");
let peer = Box::new(HttpPeer::new(("127.0.0.1", 8080), false, "1.1.1.1".to_string()));
Ok(peer)
}
///
/// 首先处理请求
/// 解析、验证、速率限制
/// 注意 返回true意味着请求退出 false 继续
async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> pingora_error::Result<bool> where Self::CTX: Send + Sync {
log_summary(_session,"request_filter");
Ok(false)
}
///
/// 决定是否继续请求
async fn proxy_upstream_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> pingora_error::Result<bool> where Self::CTX: Send + Sync {
log_summary(_session,"proxy_upstream_filter");
Ok(true)
}
///
/// 请求发到上游前修改请求
async fn upstream_request_filter(&self, _session: &mut Session, _upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX) -> pingora_error::Result<()> where Self::CTX: Send + Sync {
log_summary(_session,"upstream_request_filter");
_upstream_request.insert_header("service-name",&self.name)
}
///
/// 在返回下游前修改响应
async fn response_filter(&self, _session: &mut Session, _upstream_response: &mut ResponseHeader, _ctx: &mut Self::CTX) -> pingora_error::Result<()> where Self::CTX: Send + Sync {
log_summary(_session,"response_filter");
_upstream_response.insert_header("server-resp",&self.name)
}
///
/// 针对响应正文
fn response_body_filter(&self, _session: &mut Session, _body: &mut Option<Bytes>, _end_of_stream: bool, _ctx: &mut Self::CTX) -> pingora_error::Result<Option<Duration>> where Self::CTX: Send + Sync {
log_summary(_session,"response_body_filter");
Ok(Some(Duration::from_secs(0)))
}
///
/// session日志
async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX) where Self::CTX: Send + Sync {
let res_code = _session.response_written()
.map_or(0,|resp|resp.status.as_u16());
info!("{} response code: {res_code}",self.request_summary(_session,_ctx));
}
///
/// 刚刚连接
async fn connected_to_upstream(&self, _session: &mut Session, _reused: bool, _peer: &HttpPeer, _fd: RawFd, _digest: Option<&Digest>, _ctx: &mut Self::CTX) -> pingora_error::Result<()> where Self::CTX: Send + Sync {
log_summary(_session,"connected_to_upstream");
Ok(())
}
}
fn log_summary(session: &mut Session,func_name:&str) {
let summary = session.request_summary();
info!("==>{func_name:?} {summary:?}")
}
静态路由代理
#[async_trait]
impl ProxyHttp for SimpleProxy{
type CTX = CurrentRoute;
fn new_ctx(&self) -> Self::CTX {
CurrentRoute{
// 一级路由用于判断转发到何处
inner_route:"".to_string(),
// 去掉一级路由后的部分
goto_route:"".to_string(),
}
}
async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> pingora_error::Result<bool> where Self::CTX: Send + Sync {
log_summary(_session,"request_filter");
// 获取一级路由
let path = _session.req_header().uri.path_and_query().unwrap().to_string();
let path_arr = path.strip_prefix("/").unwrap().split_once("/").unwrap();
// 如果没有就返回不再继续
if !self.where_go_to.contains_key(path_arr.0) {
return Ok(true)
}
// 如果又就存入上下文
_ctx.inner_route = path_arr.0.to_string();
_ctx.goto_route = path_arr.1.to_string();
Ok(false)
}
async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> pingora_error::Result<Box<HttpPeer>> {
log_summary(session,"upstream_peer");
// 从map中获取转发目标
let peer = self.where_go_to.get(&ctx.inner_route).unwrap();
let peer_addr = &peer.0;
let peer_port = peer.1;
// 构建peer
let bp = Box::new(HttpPeer::new((String::from(peer_addr),peer_port),false,"1.1.1.1".to_string()));
Ok(bp)
}
async fn upstream_request_filter(&self, _session: &mut Session, _upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX) -> pingora_error::Result<()> where Self::CTX: Send + Sync {
// 把转发路径改为去掉一级路由的部分
let path = &_ctx.goto_route;
let v = PathAndQuery::from_str(&path).unwrap();
_upstream_request.set_uri(Uri::from(v));
Ok(())
}
async fn response_filter(&self, _session: &mut Session, _upstream_response: &mut ResponseHeader, _ctx: &mut Self::CTX) -> pingora_error::Result<()> where Self::CTX: Send + Sync {
Ok(())
}
async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX) where Self::CTX: Send + Sync {
log_summary(_session,"logging");
// 如果为空就是从request_filter 404跳过来的
if _ctx.goto_route.is_empty() {
// 准备ResponseHeader
let mut req_h = ResponseHeader::build(404,Some(6)).unwrap();
// 返回数据
let b = Bytes::from("can not find proxy path");
req_h.append_header("Content-Length",b.len()).unwrap();
// 一定要write_response_header 写准备ResponseHeader 才能把后面的body写入响应
_session.write_response_header(Box::new(req_h)).await.unwrap();
_session.write_response_body(b).await.unwrap();
_session.set_keepalive(None);
info!("==>is write{:?}",_session.response_written());
_session.finish_body().await.unwrap();
}
}
}
动态路由转发
# 实现效果
# 1.check route exits ?
curl --request POST --url 'http://127.0.0.1:18081/a/post?a=3' --header 'Accept: *' --header 'Content-Type: *'
# 2. add route
curl --request GET \
--url http://127.0.0.1:8989/ \
--header 'Content-Type: application/json' \
--header 'User-Agent: insomnia/2023.5.8' \
--data '{
"route":"a",
"addr":"127.0.0.1",
"port":8080
}'
# 3. check again
curl --request POST --url 'http://127.0.0.1:18081/a/post?a=3' --header 'Accept: *' --header 'Content-Type: *'
pub struct MutRouteProxy {
// 因为所有权问题以及并发等问题 使用了moka的sync cache
where_go_to:Cache<String,PeerAddr>,
}
// http proxy 部分没有改变
// 添加了一个admin app 用于添加路由
#[async_trait]
impl ServeHttp for AdminApp {
async fn response(&self, http_session: &mut ServerSession) -> Response<Vec<u8>> {
let read_timeout = 2000;
let body = match timeout(
Duration::from_millis(read_timeout),
http_session.read_request_body(),
)
.await
{
Ok(res) => res.unwrap().unwrap_or_else(|| Bytes::from("no body!")),
Err(_) => {
panic!("Timed out after {:?}ms", read_timeout);
}
};
let v:NewRoute= serde_json::from_slice(body.to_vec().as_slice()).unwrap();
let route = v.route;
let addr = v.addr;
let port = v.port;
info!("new route ==={route} > {addr}:{port}");
self.map.insert(route,PeerAddr(addr,port));
let ret = Bytes::from("ok");
Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "text/html")
.header(http::header::CONTENT_LENGTH, ret.len())
.body(ret.to_vec())
.unwrap()
}
}
代码仓库
GITHUBpo-hproxy
ATOMpo-hproxy