From 837dfeb46f777bd08947a6b24b83fbf643d1d688 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 23 May 2023 16:24:15 +0800 Subject: [PATCH] feat(exproto): add raw_publish function --- .../priv/protos/exproto.proto | 11 ++++++ .../src/emqx_exproto_channel.erl | 32 ++++++++++++----- .../src/emqx_exproto_gsvr.erl | 14 ++++++++ .../test/emqx_exproto_SUITE.erl | 36 +++++++++++++++++++ .../test/emqx_exproto_echo_svr.erl | 26 +++++++++++++- .../test/emqx_exproto_unary_echo_svr.erl | 1 + 6 files changed, 110 insertions(+), 10 deletions(-) diff --git a/apps/emqx_gateway_exproto/priv/protos/exproto.proto b/apps/emqx_gateway_exproto/priv/protos/exproto.proto index 8446307e9..b1aa97f26 100644 --- a/apps/emqx_gateway_exproto/priv/protos/exproto.proto +++ b/apps/emqx_gateway_exproto/priv/protos/exproto.proto @@ -41,6 +41,8 @@ service ConnectionAdapter { rpc Subscribe(SubscribeRequest) returns (CodeResponse) {}; rpc Unsubscribe(UnsubscribeRequest) returns (CodeResponse) {}; + + rpc RawPublish(RawPublishRequest) returns (CodeResponse) {}; } // Deprecated service. @@ -165,6 +167,15 @@ message PublishRequest { bytes payload = 4; } +message RawPublishRequest { + + string topic = 1; + + uint32 qos = 2; + + bytes payload = 3; +} + message SubscribeRequest { string conn = 1; diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl index 2e5306f6b..648943d56 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl @@ -297,7 +297,9 @@ handle_timeout( NChannel = remove_timer_ref(alive_timer, Channel), %% close connection if keepalive timeout Replies = [{event, disconnected}, {close, keepalive_timeout}], - NChannel1 = dispatch(on_timer_timeout, Req, NChannel#channel{closed_reason = keepalive_timeout}), + NChannel1 = dispatch(on_timer_timeout, Req, NChannel#channel{ + closed_reason = keepalive_timeout + }), {ok, Replies, NChannel1} end; handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> @@ -474,10 +476,21 @@ handle_call(kick, _From, Channel) -> {reply, ok, [{event, disconnected}, {close, kicked}], Channel}; handle_call(discard, _From, Channel) -> {shutdown, discarded, ok, Channel}; -handle_call(Req, _From, Channel) -> +handle_call( + Req, + _From, + Channel = #channel{ + conn_state = ConnState, + clientinfo = ClientInfo, + closed_reason = ClosedReason + } +) -> ?SLOG(warning, #{ msg => "unexpected_call", - call => Req + call => Req, + conn_state => ConnState, + clientid => maps:get(clientid, ClientInfo, undefined), + closed_reason => ClosedReason }), {reply, {error, unexpected_call}, Channel}. @@ -505,12 +518,13 @@ handle_info( {shutdown, Reason, Channel1}; _ -> %% delayed close process for flushing all callback funcs to gRPC server - Channel1 = case ClosedReason of - undefined -> - Channel#channel{closed_reason = Reason}; - _ -> - Channel - end, + Channel1 = + case ClosedReason of + undefined -> + Channel#channel{closed_reason = Reason}; + _ -> + Channel + end, Channel2 = ensure_timer(force_timer, Channel1), {ok, ensure_disconnected(Reason, Channel2)} end; diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_gsvr.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_gsvr.erl index e048fd73e..043c910da 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_gsvr.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_gsvr.erl @@ -33,6 +33,7 @@ authenticate/2, start_timer/2, publish/2, + raw_publish/2, subscribe/2, unsubscribe/2 ]). @@ -129,6 +130,19 @@ publish(Req, Md) -> }), {ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}. +-spec raw_publish(emqx_exproto_pb:raw_publish_request(), grpc:metadata()) -> + {ok, emqx_exproto_pb:code_response(), grpc:metadata()} + | {error, grpc_stream:error_response()}. +raw_publish(Req = #{topic := Topic, qos := Qos, payload := Payload}, Md) -> + ?SLOG(debug, #{ + msg => "recv_grpc_function_call", + function => ?FUNCTION_NAME, + request => Req + }), + Msg = emqx_message:make(exproto, Qos, Topic, Payload), + _ = emqx_broker:safe_publish(Msg), + {ok, response(ok), Md}. + -spec subscribe(emqx_exproto_pb:subscribe_request(), grpc:metadata()) -> {ok, emqx_exproto_pb:code_response(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. diff --git a/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl index cc9226658..91481cb91 100644 --- a/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl @@ -31,6 +31,7 @@ frame_connect/2, frame_connack/1, frame_publish/3, + frame_raw_publish/3, frame_puback/1, frame_subscribe/2, frame_suback/1, @@ -86,6 +87,7 @@ groups() -> MainCases = [ t_keepalive_timeout, t_mountpoint_echo, + t_raw_publish, t_auth_deny, t_acl_deny, t_hook_connected_disconnected, @@ -229,6 +231,40 @@ t_mountpoint_echo(Cfg) -> end, close(Sock). +t_raw_publish(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{ + proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">>, + mountpoint => <<"ct/">> + }, + Password = <<"123456">>, + + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), + + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), + + PubBin2 = frame_raw_publish(<<"t/up">>, 0, <<"echo">>), + PubAckBin = frame_puback(0), + + %% mountpoint is not used in raw publish + emqx:subscribe(<<"t/up">>), + + send(Sock, PubBin2), + {ok, PubAckBin} = recv(Sock, 5000), + + receive + {deliver, _, _} -> ok + after 1000 -> + error(echo_not_running) + end, + close(Sock). + t_auth_deny(Cfg) -> SockType = proplists:get_value(listener_type, Cfg), Sock = open(SockType), diff --git a/apps/emqx_gateway_exproto/test/emqx_exproto_echo_svr.erl b/apps/emqx_gateway_exproto/test/emqx_exproto_echo_svr.erl index ac01c8964..0c1b0def7 100644 --- a/apps/emqx_gateway_exproto/test/emqx_exproto_echo_svr.erl +++ b/apps/emqx_gateway_exproto/test/emqx_exproto_echo_svr.erl @@ -27,6 +27,7 @@ frame_connect/2, frame_connack/1, frame_publish/3, + frame_raw_publish/3, frame_puback/1, frame_subscribe/2, frame_suback/1, @@ -55,6 +56,7 @@ -define(authenticate(Req), ?CLIENT:authenticate(Req, #{channel => ct_test_channel})). -define(start_timer(Req), ?CLIENT:start_timer(Req, #{channel => ct_test_channel})). -define(publish(Req), ?CLIENT:publish(Req, #{channel => ct_test_channel})). +-define(raw_publish(Req), ?CLIENT:raw_publish(Req, #{channel => ct_test_channel})). -define(subscribe(Req), ?CLIENT:subscribe(Req, #{channel => ct_test_channel})). -define(unsubscribe(Req), ?CLIENT:unsubscribe(Req, #{channel => ct_test_channel})). @@ -67,6 +69,7 @@ -define(TYPE_UNSUBSCRIBE, 7). -define(TYPE_UNSUBACK, 8). -define(TYPE_DISCONNECT, 9). +-define(TYPE_RAW_PUBLISH, 10). -define(loop_recv_and_reply_empty_success(Stream), ?loop_recv_and_reply_empty_success(Stream, fun(_) -> ok end) @@ -267,6 +270,17 @@ handle_in(Conn, ?TYPE_PUBLISH, #{ _ -> handle_out(Conn, ?TYPE_PUBACK, 1) end; +handle_in(Conn, ?TYPE_RAW_PUBLISH, #{ + <<"topic">> := Topic, + <<"qos">> := Qos, + <<"payload">> := Payload +}) -> + case ?raw_publish(#{topic => Topic, qos => Qos, payload => Payload}) of + {ok, #{code := 'SUCCESS'}, _} -> + handle_out(Conn, ?TYPE_PUBACK, 0); + _ -> + handle_out(Conn, ?TYPE_PUBACK, 1) + end; handle_in(Conn, ?TYPE_SUBSCRIBE, #{<<"qos">> := Qos, <<"topic">> := Topic}) -> case ?subscribe(#{conn => Conn, topic => Topic, qos => Qos}) of {ok, #{code := 'SUCCESS'}, _} -> @@ -293,7 +307,9 @@ handle_out(Conn, ?TYPE_SUBACK, Code) -> handle_out(Conn, ?TYPE_UNSUBACK, Code) -> ?send(#{conn => Conn, bytes => frame_unsuback(Code)}); handle_out(Conn, ?TYPE_PUBLISH, #{qos := Qos, topic := Topic, payload := Payload}) -> - ?send(#{conn => Conn, bytes => frame_publish(Topic, Qos, Payload)}). + ?send(#{conn => Conn, bytes => frame_publish(Topic, Qos, Payload)}); +handle_out(Conn, ?TYPE_RAW_PUBLISH, #{qos := Qos, topic := Topic, payload := Payload}) -> + ?send(#{conn => Conn, bytes => frame_raw_publish(Topic, Qos, Payload)}). handle_out(Conn, ?TYPE_DISCONNECT) -> ?send(#{conn => Conn, bytes => frame_disconnect()}). @@ -318,6 +334,14 @@ frame_publish(Topic, Qos, Payload) -> payload => Payload }). +frame_raw_publish(Topic, Qos, Payload) -> + emqx_utils_json:encode(#{ + type => ?TYPE_RAW_PUBLISH, + topic => Topic, + qos => Qos, + payload => Payload + }). + frame_puback(Code) -> emqx_utils_json:encode(#{type => ?TYPE_PUBACK, code => Code}). diff --git a/apps/emqx_gateway_exproto/test/emqx_exproto_unary_echo_svr.erl b/apps/emqx_gateway_exproto/test/emqx_exproto_unary_echo_svr.erl index 8d498c3ed..19bbdb6c6 100644 --- a/apps/emqx_gateway_exproto/test/emqx_exproto_unary_echo_svr.erl +++ b/apps/emqx_gateway_exproto/test/emqx_exproto_unary_echo_svr.erl @@ -50,6 +50,7 @@ -define(TYPE_UNSUBSCRIBE, 7). -define(TYPE_UNSUBACK, 8). -define(TYPE_DISCONNECT, 9). +-define(TYPE_RAW_PUBLISH, 10). %%-------------------------------------------------------------------- %% callbacks