Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Question]In the streaming mode of grpc, docking with jwt failed #3405

Closed
zzwl0219 opened this issue Aug 23, 2024 · 7 comments
Closed

[Question]In the streaming mode of grpc, docking with jwt failed #3405

zzwl0219 opened this issue Aug 23, 2024 · 7 comments
Labels
question Further information is requested

Comments

@zzwl0219
Copy link

在grpc的流模式下,发送jwt,服务端无法从context中获取对应的metadata

客户端代码
1、启动grpc连接
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithMiddleware(
metadata.Client(),
),
grpc.WithEndpoint(net.JoinHostPort(global.BootConfig.Node.ServerHost, global.BootConfig.Node.ServerPort)),
//流式grpc增加中间处理流程,普通grpc无效
grpc.WithStreamInterceptor(JwtStreamClientInterceptor(global.BootConfig.Node.NodeId, global.BootConfig.Node.JwtSecret)),
)
if err != nil {
global.LogUtil.Error(global.LogErrCode, err)
return err
}
c.client = serverPB.NewNodeServerClient(conn)
stream, err := c.client.MessageChannel(ctx)

2、过滤器开发
func JwtStreamClientInterceptor(nodeId, tokenSecret string) grpc.StreamClientInterceptor {
return func(ctx context.Context,
desc *grpc.StreamDesc,
cc *grpc.ClientConn,
method string,
streamer grpc.Streamer,
opts ...grpc.CallOption) (grpc.ClientStream, error) {
// 将 JWT 添加到元数据中
ips, _ := utils.GetLocalIPs()
macs, _ := utils.GetMacAddresses()
agentInfo := dto.AgentInfo{
LocalIp: strings.Join(ips, ""),
Mac: strings.Join(macs, ","),
Id: nodeId,
}
token, err := tokenEncode(agentInfo, tokenSecret)
if err != nil {
global.LogUtil.Error(global.LogErrCode, err)
return nil, err
}
// 使用生成的token替换硬编码的字符串
md := metadata.Pairs("x-md-global-authorization", fmt.Sprintf("Bearer %s", token))

	// 打印md以确认是否包含正确的授权头
	fmt.Printf("Metadata: %v\n", md)
	ctxNew := metadata.NewOutgoingContext(ctx, md)
	return streamer(ctxNew, desc, cc, method, opts...)
}

}

3、服务端启动服务
func NewGRPCServer(c *conf.Bootstrap, nodeService *service.NodeService, logger log.Logger) *grpc.Server {
var opts = []grpc.ServerOption{
grpc.Middleware(
//middlewares.JwtMiddleware(c.Node.JwtSecret),
metadata.Server(),
recovery.Recovery(),
),
grpc.StreamInterceptor(middlewares.JwtStreamInterceptor(c.Node.JwtSecret)),
}
if c.Server.Grpc.Network != "" {
opts = append(opts, grpc.Network(c.Server.Grpc.Network))
}
if c.Server.Grpc.Addr != "" {
opts = append(opts, grpc.Address(c.Server.Grpc.Addr))
}
if c.Server.Grpc.Timeout != nil {
opts = append(opts, grpc.Timeout(c.Server.Grpc.Timeout.AsDuration()))
}
//opts = append(opts, grpc.Middleware())
srv := grpc.NewServer(opts...)
v1.RegisterNodeServerServer(srv, nodeService)
return srv
}
4、服务端过滤器
func JwtStreamInterceptor(jwtSecret string) grpc.StreamServerInterceptor {
return func(
srv interface{},
stream grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
ctx := stream.Context()
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return status.Errorf(codes.InvalidArgument, "missing metadata")
}
j, _ := json.MarshalIndent(md, "", " ")
fmt.Println(string(j))

	return handler(srv, &wrappedServerStream{stream, ctxNew})
}

}
运行起来服务端打印的metadata 是
{
":authority": [
"127.0.0.1:9091"
],
"content-type": [
"application/grpc"
],
"user-agent": [
"grpc-go/1.65.0"
]
}
没有我在客户端添加的数据,如果常规的grpc,是可以成功的,请问这是什么原因

@zzwl0219 zzwl0219 added the question Further information is requested label Aug 23, 2024
@kratos-ci-bot kratos-ci-bot changed the title [Question]grpc的流模式下,对接jwt失败 [Question]In the streaming mode of grpc, docking with jwt failed Aug 23, 2024
@shenqidebaozi
Copy link
Member

Have you ever tried testing directly with gRPC instead of Kratos gRPC.

@zzwl0219
Copy link
Author

Have you ever tried testing directly with gRPC instead of Kratos gRPC.

I didn't test grpc, just use kratos grpc.

@zzwl0219
Copy link
Author

我用原生grpc去实现流模式下的jwt传递参数,服务端可以通过metadata接受到jwt信息
{
":authority": [
"localhost:9021"
],
"content-type": [
"application/grpc"
],
"user-agent": [
"grpc-go/1.65.0"
],
"x-md-global-authorization": [
"Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJkYXRhIjoiTW1adGVMVHIwZmlqLyt6Mzdpd3BGbnNFUWV6cTdJV3MrZXYwOS9hMiIsImV4cCI6MTcyNDgxMTY2OX0.Su9FpFMu6kPFi1D9ItCWQil-96PvPjYWxxhn2eraDgs"
]
}
所以我现在不确定kratos为什么接受不到

@zzwl0219
Copy link
Author

这是我这边的代码,服务端接受不到metadata,代码中包含server和client
server.zip

@kratos-ci-bot
Copy link
Collaborator

Bot detected the issue body's language is not English, translate it automatically. 👯👭🏻🧑‍🤝‍🧑👫🧑🏿‍🤝‍🧑🏻👩🏾‍🤝‍👨🏿👬🏿


This is my code. The server cannot accept metadata. The code contains server and client.
server.zip

@clement2026
Copy link

@zzwl0219
This should fix the issue:

func isHealthCheck(method string) bool {
	return strings.HasPrefix(method, "/grpc.health.v1.Health/")
}

func JwtStreamInterceptor(jwtSecret string) grpc.StreamServerInterceptor {
	return func(
		srv interface{},
		stream grpc.ServerStream,
		info *grpc.StreamServerInfo,
		handler grpc.StreamHandler,
	) error {
                 // HealthCheck requests don't include the "x-md-global-authorization" header
		if isHealthCheck(info.FullMethod) {
			return handler(srv, stream)
		}
		ctx := stream.Context()
		ctxNew, err := JwtParse(ctx, jwtSecret)
		if err != nil {
			return err
		}
		return handler(srv, &wrappedServerStream{stream, ctxNew})
	}
}

Explanation

Kratos sets up a healthcheck server when it starts:

if !srv.customHealth {
grpc_health_v1.RegisterHealthServer(srv.Server, srv.health)
}

When a grpc.ClientConn is created, it initiates a watch stream like any other stream. This stream goes through your JwtStreamInterceptor without the "x-md-global-authorization" header, causing an error that closes the grpc.ClientConn. This happens before the cc.SayHello(context.Background()) request is handled by the server.

image

Copy link

dosubot bot commented Dec 31, 2024

Hi, @zzwl0219. I'm Dosu, and I'm helping the Kratos team manage their backlog. I'm marking this issue as stale.

Issue Summary:

  • The issue involved gRPC's streaming mode where the server couldn't retrieve JWT metadata from the context using Kratos gRPC.
  • You confirmed that the problem did not occur with native gRPC.
  • @clement2026 provided a solution with a JwtStreamInterceptor function to handle JWT metadata correctly.
  • The solution included bypassing health check requests that do not include the JWT header.

Next Steps:

  • Please confirm if this issue is still relevant to the latest version of the Kratos repository by commenting on this issue.
  • If there is no response, the issue will be automatically closed in 7 days.

Thank you for your understanding and contribution!

@dosubot dosubot bot added the stale Issue has not had recent activity or appears to be solved. Stale issues will be automatically closed label Dec 31, 2024
@dosubot dosubot bot closed this as not planned Won't fix, can't repro, duplicate, stale Jan 7, 2025
@dosubot dosubot bot removed the stale Issue has not had recent activity or appears to be solved. Stale issues will be automatically closed label Jan 7, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

4 participants