diff --git a/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl index 149e6acd6..bb687ae58 100644 --- a/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl @@ -249,7 +249,7 @@ t_case_stomp(_) -> t_case_exproto(_) -> Mod = emqx_exproto_SUITE, SvrMod = emqx_exproto_echo_svr, - Svrs = SvrMod:start(), + Svrs = SvrMod:start(http), Login = fun(Username, Password, Expect) -> with_resource( ?FUNCTOR(Mod:open(tcp)), diff --git a/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl index c62e840df..1934ce4ad 100644 --- a/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl @@ -332,7 +332,7 @@ t_case_sn_subscribe(_) -> t_case_exproto_publish(_) -> Mod = emqx_exproto_SUITE, SvrMod = emqx_exproto_echo_svr, - Svrs = SvrMod:start(), + Svrs = SvrMod:start(http), Payload = <<"publish with authz">>, Publish = fun(Topic, Checker) -> with_resource( @@ -369,7 +369,7 @@ t_case_exproto_publish(_) -> t_case_exproto_subscribe(_) -> Mod = emqx_exproto_SUITE, SvrMod = emqx_exproto_echo_svr, - Svrs = SvrMod:start(), + Svrs = SvrMod:start(http), WaitTime = 5000, Sub = fun(Topic, ErrorCode) -> with_resource( diff --git a/apps/emqx_gateway_exproto/.gitignore b/apps/emqx_gateway_exproto/.gitignore index 922b0f989..e0c871ab7 100644 --- a/apps/emqx_gateway_exproto/.gitignore +++ b/apps/emqx_gateway_exproto/.gitignore @@ -22,3 +22,5 @@ src/emqx_exproto_v_1_connection_adapter_bhvr.erl src/emqx_exproto_v_1_connection_adapter_client.erl src/emqx_exproto_v_1_connection_handler_bhvr.erl src/emqx_exproto_v_1_connection_handler_client.erl +src/emqx_exproto_v_1_connection_unary_handler_bhvr.erl +src/emqx_exproto_v_1_connection_unary_handler_client.erl diff --git a/apps/emqx_gateway_exproto/priv/protos/exproto.proto b/apps/emqx_gateway_exproto/priv/protos/exproto.proto index ccdebff43..b1aa97f26 100644 --- a/apps/emqx_gateway_exproto/priv/protos/exproto.proto +++ b/apps/emqx_gateway_exproto/priv/protos/exproto.proto @@ -41,8 +41,12 @@ service ConnectionAdapter { rpc Subscribe(SubscribeRequest) returns (CodeResponse) {}; rpc Unsubscribe(UnsubscribeRequest) returns (CodeResponse) {}; + + rpc RawPublish(RawPublishRequest) returns (CodeResponse) {}; } +// Deprecated service. +// Please using `ConnectionUnaryHandler` to replace it service ConnectionHandler { // -- socket layer @@ -60,6 +64,32 @@ service ConnectionHandler { rpc OnReceivedMessages(stream ReceivedMessagesRequest) returns (EmptySuccess) {}; } +// This service is an optimization of `ConnectionHandler`. +// In the initial version, we expected to use streams to improve the efficiency +// of requests. But unfortunately, events between different streams are out of +// order. it causes the `OnSocketCreated` event to may arrive later than `OnReceivedBytes`. +// +// So we added the `ConnectionUnaryHandler` service since v4.3.21/v4.4.10 and forced +// the use of Unary in it to avoid ordering problems. +// +// Recommend using `ConnectionUnaryHandler` to replace `ConnectionHandler` +service ConnectionUnaryHandler { + + // -- socket layer + + rpc OnSocketCreated(SocketCreatedRequest) returns (EmptySuccess) {}; + + rpc OnSocketClosed(SocketClosedRequest) returns (EmptySuccess) {}; + + rpc OnReceivedBytes(ReceivedBytesRequest) returns (EmptySuccess) {}; + + // -- pub/sub layer + + rpc OnTimerTimeout(TimerTimeoutRequest) returns (EmptySuccess) {}; + + rpc OnReceivedMessages(ReceivedMessagesRequest) returns (EmptySuccess) {}; +} + message EmptySuccess { } enum ResultCode { @@ -137,6 +167,15 @@ message PublishRequest { bytes payload = 4; } +message RawPublishRequest { + + string topic = 1; + + uint32 qos = 2; + + bytes payload = 3; +} + message SubscribeRequest { string conn = 1; diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl index 3b2c8d73b..648943d56 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl @@ -45,7 +45,7 @@ %% Context ctx :: emqx_gateway_ctx:context(), %% gRPC channel options - gcli :: map(), + gcli :: emqx_exproto_gcli:grpc_client_state(), %% Conn info conninfo :: emqx_types:conninfo(), %% Client info from `register` function @@ -54,10 +54,6 @@ conn_state :: conn_state(), %% Subscription subscriptions = #{}, - %% Request queue - rqueue = queue:new(), - %% Inflight function name - inflight = undefined, %% Keepalive keepalive :: maybe(emqx_keepalive:keepalive()), %% Timers @@ -150,9 +146,11 @@ init( }, Options ) -> + GRpcChann = maps:get(grpc_client_channel, Options), + ServiceName = maps:get(grpc_client_service_name, Options), + GRpcClient = emqx_exproto_gcli:init(ServiceName, #{channel => GRpcChann}), + Ctx = maps:get(ctx, Options), - GRpcChann = maps:get(handler, Options), - PoolName = maps:get(pool_name, Options), IdleTimeout = emqx_gateway_utils:idle_timeout(Options), NConnInfo = default_conninfo(ConnInfo#{idle_timeout => IdleTimeout}), @@ -170,7 +168,7 @@ init( }, Channel = #channel{ ctx = Ctx, - gcli = #{channel => GRpcChann, pool_name => PoolName}, + gcli = GRpcClient, conninfo = NConnInfo, clientinfo = ClientInfo, conn_state = connecting, @@ -188,9 +186,7 @@ init( } ) }, - start_idle_checking_timer( - try_dispatch(on_socket_created, wrap(Req), Channel) - ). + dispatch(on_socket_created, Req, start_idle_checking_timer(Channel)). %% @private peercert(NoSsl, ConnInfo) when @@ -239,7 +235,7 @@ start_idle_checking_timer(Channel) -> | {shutdown, Reason :: term(), channel()}. handle_in(Data, Channel) -> Req = #{bytes => Data}, - {ok, try_dispatch(on_received_bytes, wrap(Req), Channel)}. + {ok, dispatch(on_received_bytes, Req, Channel)}. -spec handle_deliver(list(emqx_types:deliver()), channel()) -> {ok, channel()} @@ -276,7 +272,7 @@ handle_deliver( Delivers ), Req = #{messages => Msgs}, - {ok, try_dispatch(on_received_messages, wrap(Req), Channel)}. + {ok, dispatch(on_received_messages, Req, Channel)}. -spec handle_timeout(reference(), Msg :: term(), channel()) -> {ok, channel()} @@ -301,10 +297,13 @@ handle_timeout( NChannel = remove_timer_ref(alive_timer, Channel), %% close connection if keepalive timeout Replies = [{event, disconnected}, {close, keepalive_timeout}], - {ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)} + NChannel1 = dispatch(on_timer_timeout, Req, NChannel#channel{ + closed_reason = keepalive_timeout + }), + {ok, Replies, NChannel1} end; handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> - {shutdown, {error, {force_close, Reason}}, Channel}; + {shutdown, Reason, Channel}; handle_timeout(_TRef, force_close_idle, Channel) -> {shutdown, idle_timeout, Channel}; handle_timeout(_TRef, Msg, Channel) -> @@ -331,10 +330,20 @@ handle_call( Channel = #channel{conn_state = connected} ) -> ?SLOG(warning, #{ - msg => "ingore_duplicated_authorized_command", + msg => "ingore_duplicated_authenticate_command", request_clientinfo => ClientInfo }), {reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel}; +handle_call( + {auth, ClientInfo, _Password}, + _From, + Channel = #channel{conn_state = disconnected} +) -> + ?SLOG(warning, #{ + msg => "authenticate_command_after_socket_disconnected", + request_clientinfo => ClientInfo + }), + {reply, {error, ?RESP_PERMISSION_DENY, <<"Client socket disconnected">>}, Channel}; handle_call( {auth, ClientInfo0, Password}, _From, @@ -467,10 +476,21 @@ handle_call(kick, _From, Channel) -> {reply, ok, [{event, disconnected}, {close, kicked}], Channel}; handle_call(discard, _From, Channel) -> {shutdown, discarded, ok, Channel}; -handle_call(Req, _From, Channel) -> +handle_call( + Req, + _From, + Channel = #channel{ + conn_state = ConnState, + clientinfo = ClientInfo, + closed_reason = ClosedReason + } +) -> ?SLOG(warning, #{ msg => "unexpected_call", - call => Req + call => Req, + conn_state => ConnState, + clientid => maps:get(clientid, ClientInfo, undefined), + closed_reason => ClosedReason }), {reply, {error, unexpected_call}, Channel}. @@ -490,32 +510,50 @@ handle_cast(Req, Channel) -> | {shutdown, Reason :: term(), channel()}. handle_info( {sock_closed, Reason}, - Channel = #channel{rqueue = Queue, inflight = Inflight} + Channel = #channel{gcli = GClient, closed_reason = ClosedReason} ) -> - case - queue:len(Queue) =:= 0 andalso - Inflight =:= undefined - of + case emqx_exproto_gcli:is_empty(GClient) of true -> - Channel1 = ensure_disconnected({sock_closed, Reason}, Channel), + Channel1 = ensure_disconnected(Reason, Channel), {shutdown, Reason, Channel1}; _ -> %% delayed close process for flushing all callback funcs to gRPC server - Channel1 = Channel#channel{closed_reason = {sock_closed, Reason}}, + Channel1 = + case ClosedReason of + undefined -> + Channel#channel{closed_reason = Reason}; + _ -> + Channel + end, Channel2 = ensure_timer(force_timer, Channel1), - {ok, ensure_disconnected({sock_closed, Reason}, Channel2)} + {ok, ensure_disconnected(Reason, Channel2)} end; -handle_info({hreply, on_socket_created, ok}, Channel) -> - dispatch_or_close_process(Channel#channel{inflight = undefined}); -handle_info({hreply, FunName, ok}, Channel) when - FunName == on_socket_closed; - FunName == on_received_bytes; - FunName == on_received_messages; - FunName == on_timer_timeout +handle_info( + {hreply, FunName, Result}, + Channel0 = #channel{gcli = GClient0, timers = Timers} +) when + FunName =:= on_socket_created; + FunName =:= on_socket_closed; + FunName =:= on_received_bytes; + FunName =:= on_received_messages; + FunName =:= on_timer_timeout -> - dispatch_or_close_process(Channel#channel{inflight = undefined}); -handle_info({hreply, FunName, {error, Reason}}, Channel) -> - {shutdown, {error, {FunName, Reason}}, Channel}; + GClient = emqx_exproto_gcli:ack(FunName, GClient0), + Channel = Channel0#channel{gcli = GClient}, + + ShutdownNow = + emqx_exproto_gcli:is_empty(GClient) andalso + maps:get(force_timer, Timers, undefined) =/= undefined, + case Result of + ok when not ShutdownNow -> + GClient1 = emqx_exproto_gcli:maybe_shoot(GClient), + {ok, Channel#channel{gcli = GClient1}}; + ok when ShutdownNow -> + Channel1 = cancel_timer(force_timer, Channel), + {shutdown, Channel1#channel.closed_reason, Channel1}; + {error, Reason} -> + {shutdown, {error, {FunName, Reason}}, Channel} + end; handle_info({subscribe, _}, Channel) -> {ok, Channel}; handle_info(Info, Channel) -> @@ -528,7 +566,8 @@ handle_info(Info, Channel) -> -spec terminate(any(), channel()) -> channel(). terminate(Reason, Channel) -> Req = #{reason => stringfy(Reason)}, - try_dispatch(on_socket_closed, wrap(Req), Channel). + %% XXX: close streams? + dispatch(on_socket_closed, Req, Channel). %%-------------------------------------------------------------------- %% Sub/UnSub @@ -705,34 +744,10 @@ interval(alive_timer, #channel{keepalive = Keepalive}) -> %% Dispatch %%-------------------------------------------------------------------- -wrap(Req) -> - Req#{conn => base64:encode(term_to_binary(self()))}. - -dispatch_or_close_process( - Channel = #channel{ - rqueue = Queue, - inflight = undefined, - gcli = GClient - } -) -> - case queue:out(Queue) of - {empty, _} -> - case Channel#channel.conn_state of - disconnected -> - {shutdown, Channel#channel.closed_reason, Channel}; - _ -> - {ok, Channel} - end; - {{value, {FunName, Req}}, NQueue} -> - emqx_exproto_gcli:async_call(FunName, Req, GClient), - {ok, Channel#channel{inflight = FunName, rqueue = NQueue}} - end. - -try_dispatch(FunName, Req, Channel = #channel{inflight = undefined, gcli = GClient}) -> - emqx_exproto_gcli:async_call(FunName, Req, GClient), - Channel#channel{inflight = FunName}; -try_dispatch(FunName, Req, Channel = #channel{rqueue = Queue}) -> - Channel#channel{rqueue = queue:in({FunName, Req}, Queue)}. +dispatch(FunName, Req, Channel = #channel{gcli = GClient}) -> + Req1 = Req#{conn => base64:encode(term_to_binary(self()))}, + NGClient = emqx_exproto_gcli:maybe_shoot(FunName, Req1, GClient), + Channel#channel{gcli = NGClient}. %%-------------------------------------------------------------------- %% Format diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_gcli.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_gcli.erl index 34883cdce..639e3bfe9 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_gcli.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_gcli.erl @@ -14,161 +14,197 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% the gRPC client worker for ConnectionHandler service +%% the gRPC client agent for ConnectionHandler service -module(emqx_exproto_gcli). --behaviour(gen_server). - -include_lib("emqx/include/logger.hrl"). -%% APIs --export([async_call/3]). +-logger_header("[ExProtoGCli]"). --export([start_link/2]). - -%% gen_server callbacks -export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 + init/2, + maybe_shoot/1, + maybe_shoot/3, + ack/2, + is_empty/1 ]). --record(state, { - pool, - id, - streams -}). +-define(CONN_HANDLER_MOD, emqx_exproto_v_1_connection_handler_client). +-define(CONN_UNARY_HANDLER_MOD, emqx_exproto_v_1_connection_unary_handler_client). --define(CONN_ADAPTER_MOD, emqx_exproto_v_1_connection_handler_client). +-type service_name() :: 'ConnectionUnaryHandler' | 'ConnectionHandler'. + +-type grpc_client_state() :: + #{ + owner := pid(), + service_name := service_name(), + client_opts := options(), + queue := queue:queue(), + inflight := atom() | undefined, + streams => map(), + middleman => pid() | undefined + }. + +-type options() :: + #{channel := term()}. %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- -start_link(Pool, Id) -> - gen_server:start_link( - {local, emqx_utils:proc_name(?MODULE, Id)}, - ?MODULE, - [Pool, Id], - [] - ). +-spec init(service_name(), options()) -> grpc_client_state(). +init(ServiceName, Options) -> + #{ + owner => self(), + service_name => ServiceName, + client_opts => Options, + queue => queue:new(), + inflight => undefined + }. --spec async_call(atom(), map(), map()) -> ok. -async_call( - FunName, - Req = #{conn := Conn}, - Options = #{pool_name := PoolName} -) -> - case pick(PoolName, Conn) of +-spec maybe_shoot(atom(), map(), grpc_client_state()) -> grpc_client_state(). +maybe_shoot(FunName, Req, GState = #{inflight := undefined}) -> + shoot(FunName, Req, GState); +maybe_shoot(FunName, Req, GState) -> + enqueue(FunName, Req, GState). + +-spec maybe_shoot(grpc_client_state()) -> grpc_client_state(). +maybe_shoot(GState = #{inflight := undefined, queue := Q}) -> + case queue:is_empty(Q) of + true -> + GState; false -> - reply(self(), FunName, {error, no_available_grpc_client}); - Pid when is_pid(Pid) -> - cast(Pid, {rpc, FunName, Req, Options, self()}) - end, - ok. - -%%-------------------------------------------------------------------- -%% cast, pick -%%-------------------------------------------------------------------- - --compile({inline, [cast/2, pick/2]}). - -cast(Deliver, Msg) -> - gen_server:cast(Deliver, Msg). - --spec pick(term(), term()) -> pid() | false. -pick(PoolName, Conn) -> - gproc_pool:pick_worker(PoolName, Conn). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init([Pool, Id]) -> - true = gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #state{pool = Pool, id = Id, streams = #{}}}. - -handle_call(_Request, _From, State) -> - {reply, ok, State}. - -handle_cast({rpc, Fun, Req, Options, From}, State = #state{streams = Streams}) -> - case ensure_stream_opened(Fun, Options, Streams) of - {error, Reason} -> - ?SLOG(error, #{ - msg => "request_grpc_server_failed", - function => {?CONN_ADAPTER_MOD, Fun, Options}, - reason => Reason - }), - reply(From, Fun, {error, Reason}), - {noreply, State#state{streams = Streams#{Fun => undefined}}}; - {ok, Stream} -> - case catch grpc_client:send(Stream, Req) of - ok -> - ?SLOG(debug, #{ - msg => "send_grpc_request_succeed", - function => {?CONN_ADAPTER_MOD, Fun}, - request => Req - }), - reply(From, Fun, ok), - {noreply, State#state{streams = Streams#{Fun => Stream}}}; - {'EXIT', {not_found, _Stk}} -> - %% Not found the stream, reopen it - ?SLOG(info, #{ - msg => "cannt_find_old_stream_ref", - function => {?CONN_ADAPTER_MOD, Fun} - }), - handle_cast( - {rpc, Fun, Req, Options, From}, - State#state{streams = maps:remove(Fun, Streams)} - ); - {'EXIT', {timeout, _Stk}} -> - ?SLOG(error, #{ - msg => "send_grpc_request_timeout", - function => {?CONN_ADAPTER_MOD, Fun}, - request => Req - }), - reply(From, Fun, {error, timeout}), - {noreply, State#state{streams = Streams#{Fun => Stream}}}; - {'EXIT', {Reason1, Stk}} -> - ?SLOG(error, #{ - msg => "send_grpc_request_failed", - function => {?CONN_ADAPTER_MOD, Fun}, - request => Req, - error => Reason1, - stacktrace => Stk - }), - reply(From, Fun, {error, Reason1}), - {noreply, State#state{streams = Streams#{Fun => undefined}}} - end + {{value, {FunName, Req}}, Q1} = queue:out(Q), + shoot(FunName, Req, GState#{queue => Q1}) end. -handle_info(_Info, State) -> - {noreply, State}. +-spec ack(atom(), grpc_client_state()) -> grpc_client_state(). +ack(FunName, GState = #{inflight := FunName}) -> + GState#{inflight => undefined, middleman => undefined}; +ack(_, _) -> + error(badarg). -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +-spec is_empty(grpc_client_state()) -> boolean(). +is_empty(#{queue := Q, inflight := Inflight}) -> + Inflight == undefined andalso queue:is_empty(Q). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- -reply(Pid, Fun, Result) -> - Pid ! {hreply, Fun, Result}, - ok. +enqueue(FunName, Req, GState = #{queue := Q}) -> + GState#{queue => queue:in({FunName, Req}, Q)}. -ensure_stream_opened(Fun, Options, Streams) -> - case maps:get(Fun, Streams, undefined) of +shoot(FunName, Req, GState) -> + ServiceName = maps:get(service_name, GState), + shoot(ServiceName, FunName, Req, GState). + +shoot( + 'ConnectionUnaryHandler', + FunName, + Req, + GState = #{owner := Owner, client_opts := Options} +) -> + Pid = + spawn( + fun() -> + try + Result = request(FunName, Req, Options), + hreply(Owner, Result, FunName) + catch + T:R:Stk -> + hreply(Owner, {error, {{T, R}, Stk}}, FunName) + end + end + ), + GState#{inflight => FunName, middleman => Pid}; +shoot( + 'ConnectionHandler', + FunName, + Req, + GState +) -> + GState1 = streaming(FunName, Req, GState), + GState1#{inflight => FunName}. + +%%-------------------------------------------------------------------- +%% streaming + +streaming( + FunName, + Req, + GState = #{owner := Owner, client_opts := Options} +) -> + Streams = maps:get(streams, GState, #{}), + case ensure_stream_opened(FunName, Options, Streams) of + {error, Reason} -> + ?SLOG(error, #{ + msg => "request_grpc_server_failed", + function => {FunName, Options}, + reason => Reason + }), + hreply(Owner, {error, Reason}, FunName), + {ok, GState}; + {ok, Stream} -> + case catch grpc_client:send(Stream, Req) of + ok -> + ?SLOG(debug, #{ + msg => "send_grpc_request_succeed", + function => FunName, + request => Req + }), + hreply(Owner, ok, FunName), + GState#{streams => Streams#{FunName => Stream}}; + {'EXIT', {not_found, _Stk}} -> + %% Not found the stream, reopen it + ?SLOG(info, #{ + msg => "cannt_find_old_stream_ref", + function => FunName + }), + streaming(FunName, Req, GState#{streams => maps:remove(FunName, Streams)}); + {'EXIT', {timeout, _Stk}} -> + ?SLOG(error, #{ + msg => "send_grpc_request_timeout", + function => FunName, + request => Req + }), + hreply(Owner, {error, timeout}, FunName), + GState; + {'EXIT', {Reason1, Stk}} -> + ?SLOG(error, #{ + msg => "send_grpc_request_failed", + function => FunName, + request => Req, + error => Reason1, + stacktrace => Stk + }), + hreply(Owner, {error, Reason1}, FunName), + GState + end + end. + +ensure_stream_opened(FunName, Options, Streams) -> + case maps:get(FunName, Streams, undefined) of undefined -> - case apply(?CONN_ADAPTER_MOD, Fun, [Options]) of + case apply(?CONN_HANDLER_MOD, FunName, [Options]) of {ok, Stream} -> {ok, Stream}; {error, Reason} -> {error, Reason} end; Stream -> {ok, Stream} end. + +%%-------------------------------------------------------------------- +%% unary + +request(FunName, Req, Options) -> + case apply(?CONN_UNARY_HANDLER_MOD, FunName, [Req, Options]) of + {ok, _EmptySucc, _Md} -> + ok; + {error, Reason} -> + {error, Reason} + end. + +hreply(Owner, Result, FunName) -> + Owner ! {hreply, FunName, Result}, + ok. diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_gsvr.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_gsvr.erl index 5bbe7bf37..043c910da 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_gsvr.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_gsvr.erl @@ -33,6 +33,7 @@ authenticate/2, start_timer/2, publish/2, + raw_publish/2, subscribe/2, unsubscribe/2 ]). @@ -129,6 +130,19 @@ publish(Req, Md) -> }), {ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Md}. +-spec raw_publish(emqx_exproto_pb:raw_publish_request(), grpc:metadata()) -> + {ok, emqx_exproto_pb:code_response(), grpc:metadata()} + | {error, grpc_stream:error_response()}. +raw_publish(Req = #{topic := Topic, qos := Qos, payload := Payload}, Md) -> + ?SLOG(debug, #{ + msg => "recv_grpc_function_call", + function => ?FUNCTION_NAME, + request => Req + }), + Msg = emqx_message:make(exproto, Qos, Topic, Payload), + _ = emqx_broker:safe_publish(Msg), + {ok, response(ok), Md}. + -spec subscribe(emqx_exproto_pb:subscribe_request(), grpc:metadata()) -> {ok, emqx_exproto_pb:code_response(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. @@ -176,6 +190,8 @@ call(ConnStr, Req) -> {error, ?RESP_PARAMS_TYPE_ERROR, <<"The conn type error">>}; exit:noproc -> {error, ?RESP_CONN_PROCESS_NOT_ALIVE, <<"Connection process is not alive">>}; + exit:{noproc, _} -> + {error, ?RESP_CONN_PROCESS_NOT_ALIVE, <<"Connection process is not alive">>}; exit:timeout -> {error, ?RESP_UNKNOWN, <<"Connection is not answered">>}; Class:Reason:Stk -> diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_schema.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_schema.erl index eb44c030b..7e1f6f49c 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_schema.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_schema.erl @@ -74,6 +74,15 @@ fields(exproto_grpc_server) -> fields(exproto_grpc_handler) -> [ {address, sc(binary(), #{required => true, desc => ?DESC(exproto_grpc_handler_address)})}, + {service_name, + sc( + hoconsc:union(['ConnectionHandler', 'ConnectionUnaryHandler']), + #{ + required => true, + default => 'ConnectionUnaryHandler', + desc => ?DESC(exproto_grpc_handler_service_name) + } + )}, {ssl_options, sc( ref(emqx_schema, "ssl_client_opts"), diff --git a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src index 09cf58338..96b40f30e 100644 --- a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src +++ b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src @@ -1,6 +1,6 @@ {application, emqx_gateway_exproto, [ {description, "ExProto Gateway"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.erl b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.erl index ff105b931..ec2540ec2 100644 --- a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.erl +++ b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.erl @@ -62,22 +62,16 @@ on_gateway_load( GwName, maps:get(handler, Config, undefined) ), + ServiceName = ensure_service_name(Config), %% XXX: How to monitor it ? _ = start_grpc_server(GwName, maps:get(server, Config, undefined)), - %% XXX: How to monitor it ? - PoolName = pool_name(GwName), - PoolSize = emqx_vm:schedulers() * 2, - {ok, PoolSup} = emqx_pool_sup:start_link( - PoolName, - hash, - PoolSize, - {emqx_exproto_gcli, start_link, []} - ), - NConfig = maps:without( [server, handler], - Config#{pool_name => PoolName} + Config#{ + grpc_client_channel => GwName, + grpc_client_service_name => ServiceName + } ), Listeners = emqx_gateway_utils:normalize_config( NConfig#{handler => GwName} @@ -93,7 +87,7 @@ on_gateway_load( ) of {ok, ListenerPids} -> - {ok, ListenerPids, _GwState = #{ctx => Ctx, pool => PoolSup}}; + {ok, ListenerPids, _GwState = #{ctx => Ctx}}; {error, {Reason, Listener}} -> throw( {badconf, #{ @@ -126,11 +120,9 @@ on_gateway_unload( name := GwName, config := Config }, - _GwState = #{pool := PoolSup} + _GwState ) -> Listeners = emqx_gateway_utils:normalize_config(Config), - %% Stop funcs??? - exit(PoolSup, kill), stop_grpc_server(GwName), stop_grpc_client_channel(GwName), stop_listeners(GwName, Listeners). @@ -139,8 +131,6 @@ on_gateway_unload( %% Internal funcs %%-------------------------------------------------------------------- -start_grpc_server(_GwName, undefined) -> - undefined; start_grpc_server(GwName, Options = #{bind := ListenOn}) -> Services = #{ protos => [emqx_exproto_pb], @@ -179,15 +169,24 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) -> reason => illegal_grpc_server_confs }} ) - end. + end; +start_grpc_server(_GwName, Options) -> + throw( + {badconf, #{ + key => server, + value => Options, + reason => illegal_grpc_server_confs + }} + ). stop_grpc_server(GwName) -> _ = grpc:stop_server(GwName), console_print("Stop ~s gRPC server successfully.~n", [GwName]). -start_grpc_client_channel(_GwName, undefined) -> - undefined; -start_grpc_client_channel(GwName, Options = #{address := Address}) -> +start_grpc_client_channel( + GwName, + Options = #{address := Address} +) -> #{host := Host, port := Port} = case emqx_http_lib:uri_parse(Address) of {ok, URIMap0} -> @@ -201,7 +200,7 @@ start_grpc_client_channel(GwName, Options = #{address := Address}) -> }} ) end, - case emqx_utils_maps:deep_get([ssl, enable], Options, false) of + case emqx_utils_maps:deep_get([ssl_options, enable], Options, false) of false -> SvrAddr = compose_http_uri(http, Host, Port), grpc_client_sup:create_channel_pool(GwName, SvrAddr, #{}); @@ -217,7 +216,15 @@ start_grpc_client_channel(GwName, Options = #{address := Address}) -> SvrAddr = compose_http_uri(https, Host, Port), grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts) - end. + end; +start_grpc_client_channel(_GwName, Options) -> + throw( + {badconf, #{ + key => handler, + value => Options, + reason => ililegal_grpc_client_confs + }} + ). compose_http_uri(Scheme, Host, Port) -> lists:flatten( @@ -230,8 +237,8 @@ stop_grpc_client_channel(GwName) -> _ = grpc_client_sup:stop_channel_pool(GwName), ok. -pool_name(GwName) -> - list_to_atom(lists:concat([GwName, "_gcli_pool"])). +ensure_service_name(Config) -> + emqx_utils_maps:deep_get([handler, service_name], Config, 'ConnectionUnaryHandler'). -ifndef(TEST). console_print(Fmt, Args) -> ?ULOG(Fmt, Args). diff --git a/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl index 264f6af95..91481cb91 100644 --- a/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl @@ -19,8 +19,11 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -import( emqx_exproto_echo_svr, @@ -28,6 +31,7 @@ frame_connect/2, frame_connack/1, frame_publish/3, + frame_raw_publish/3, frame_puback/1, frame_subscribe/2, frame_suback/1, @@ -37,19 +41,24 @@ ] ). --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). - -define(TCPOPTS, [binary, {active, false}]). -define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]). +-define(PORT, 7993). + +-define(DEFAULT_CLIENT, #{ + proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">> +}). + %%-------------------------------------------------------------------- -define(CONF_DEFAULT, << "\n" "gateway.exproto {\n" " server.bind = 9100,\n" " handler.address = \"http://127.0.0.1:9001\"\n" + " handler.service_name = \"ConnectionHandler\"\n" " listeners.tcp.default {\n" " bind = 7993,\n" " acceptors = 8\n" @@ -62,43 +71,110 @@ %%-------------------------------------------------------------------- all() -> - [{group, Name} || Name <- metrics()]. + [ + {group, tcp_listener}, + {group, ssl_listener}, + {group, udp_listener}, + {group, dtls_listener}, + {group, https_grpc_server}, + {group, streaming_connection_handler} + ]. suite() -> [{timetrap, {seconds, 30}}]. groups() -> - Cases = emqx_common_test_helpers:all(?MODULE), - [{Name, Cases} || Name <- metrics()]. + MainCases = [ + t_keepalive_timeout, + t_mountpoint_echo, + t_raw_publish, + t_auth_deny, + t_acl_deny, + t_hook_connected_disconnected, + t_hook_session_subscribed_unsubscribed, + t_hook_message_delivered + ], + [ + {tcp_listener, [sequence], MainCases}, + {ssl_listener, [sequence], MainCases}, + {udp_listener, [sequence], MainCases}, + {dtls_listener, [sequence], MainCases}, + {streaming_connection_handler, [sequence], MainCases}, + {https_grpc_server, [sequence], MainCases} + ]. -%% @private -metrics() -> - [tcp, ssl, udp, dtls]. +init_per_group(GrpName, Cfg) when + GrpName == tcp_listener; + GrpName == ssl_listener; + GrpName == udp_listener; + GrpName == dtls_listener +-> + LisType = + case GrpName of + tcp_listener -> tcp; + ssl_listener -> ssl; + udp_listener -> udp; + dtls_listener -> dtls + end, + init_per_group(LisType, 'ConnectionUnaryHandler', http, Cfg); +init_per_group(https_grpc_server, Cfg) -> + init_per_group(tcp, 'ConnectionUnaryHandler', https, Cfg); +init_per_group(streaming_connection_handler, Cfg) -> + init_per_group(tcp, 'ConnectionHandler', http, Cfg); +init_per_group(_, Cfg) -> + init_per_group(tcp, 'ConnectionUnaryHandler', http, Cfg). -init_per_group(GrpName, Cfg) -> +init_per_group(LisType, ServiceName, Scheme, Cfg) -> + Svrs = emqx_exproto_echo_svr:start(Scheme), application:load(emqx_gateway_exproto), - put(grpname, GrpName), - Svrs = emqx_exproto_echo_svr:start(), - emqx_common_test_helpers:start_apps([emqx_authn, emqx_gateway], fun set_special_cfg/1), - [{servers, Svrs}, {listener_type, GrpName} | Cfg]. + emqx_common_test_helpers:start_apps( + [emqx_authn, emqx_gateway], + fun(App) -> + set_special_cfg(App, LisType, ServiceName, Scheme) + end + ), + [ + {servers, Svrs}, + {listener_type, LisType}, + {service_name, ServiceName}, + {grpc_client_scheme, Scheme} + | Cfg + ]. end_per_group(_, Cfg) -> emqx_config:erase(gateway), emqx_common_test_helpers:stop_apps([emqx_gateway, emqx_authn]), emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)). -set_special_cfg(emqx_gateway) -> - LisType = get(grpname), +init_per_testcase(TestCase, Cfg) when + TestCase == t_enter_passive_mode +-> + case proplists:get_value(listener_type, Cfg) of + udp -> {skip, ignore}; + _ -> Cfg + end; +init_per_testcase(_TestCase, Cfg) -> + Cfg. + +end_per_testcase(_TestCase, _Cfg) -> + ok. + +set_special_cfg(emqx_gateway, LisType, ServiceName, Scheme) -> + Addrs = lists:flatten(io_lib:format("~s://127.0.0.1:9001", [Scheme])), emqx_config:put( [gateway, exproto], #{ server => #{bind => 9100}, idle_timeout => 5000, - handler => #{address => "http://127.0.0.1:9001"}, + handler => #{ + address => Addrs, + service_name => ServiceName, + ssl_options => #{enable => Scheme == https} + }, listeners => listener_confs(LisType) } ); -set_special_cfg(_App) -> +set_special_cfg(_, _, _, _) -> ok. listener_confs(Type) -> @@ -112,9 +188,6 @@ default_config() -> %% Tests cases %%-------------------------------------------------------------------- -t_start_stop(_) -> - ok. - t_mountpoint_echo(Cfg) -> SockType = proplists:get_value(listener_type, Cfg), Sock = open(SockType), @@ -158,6 +231,40 @@ t_mountpoint_echo(Cfg) -> end, close(Sock). +t_raw_publish(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{ + proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">>, + mountpoint => <<"ct/">> + }, + Password = <<"123456">>, + + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), + + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), + + PubBin2 = frame_raw_publish(<<"t/up">>, 0, <<"echo">>), + PubAckBin = frame_puback(0), + + %% mountpoint is not used in raw publish + emqx:subscribe(<<"t/up">>), + + send(Sock, PubBin2), + {ok, PubAckBin} = recv(Sock, 5000), + + receive + {deliver, _, _} -> ok + after 1000 -> + error(echo_not_running) + end, + close(Sock). + t_auth_deny(Cfg) -> SockType = proplists:get_value(listener_type, Cfg), Sock = open(SockType), @@ -264,7 +371,7 @@ t_keepalive_timeout(Cfg) -> ?assertMatch( {ok, #{ clientid := ClientId1, - reason := {shutdown, {sock_closed, keepalive_timeout}} + reason := {shutdown, keepalive_timeout} }}, ?block_until(#{?snk_kind := conn_process_terminated}, 8000) ); @@ -272,7 +379,7 @@ t_keepalive_timeout(Cfg) -> ?assertMatch( {ok, #{ clientid := ClientId1, - reason := {shutdown, {sock_closed, keepalive_timeout}} + reason := {shutdown, keepalive_timeout} }}, ?block_until(#{?snk_kind := conn_process_terminated}, 12000) ), diff --git a/apps/emqx_gateway_exproto/test/emqx_exproto_echo_svr.erl b/apps/emqx_gateway_exproto/test/emqx_exproto_echo_svr.erl index e04990f5f..0c1b0def7 100644 --- a/apps/emqx_gateway_exproto/test/emqx_exproto_echo_svr.erl +++ b/apps/emqx_gateway_exproto/test/emqx_exproto_echo_svr.erl @@ -19,7 +19,7 @@ -behaviour(emqx_exproto_v_1_connection_handler_bhvr). -export([ - start/0, + start/1, stop/1 ]). @@ -27,12 +27,16 @@ frame_connect/2, frame_connack/1, frame_publish/3, + frame_raw_publish/3, frame_puback/1, frame_subscribe/2, frame_suback/1, frame_unsubscribe/1, frame_unsuback/1, - frame_disconnect/0 + frame_disconnect/0, + handle_in/3, + handle_out/2, + handle_out/3 ]). -export([ @@ -45,19 +49,6 @@ -define(LOG(Fmt, Args), ct:pal(Fmt, Args)). --define(HTTP, #{ - grpc_opts => #{ - service_protos => [emqx_exproto_pb], - services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE} - }, - listen_opts => #{ - port => 9001, - socket_options => [] - }, - pool_opts => #{size => 8}, - transport_opts => #{ssl => false} -}). - -define(CLIENT, emqx_exproto_v_1_connection_adapter_client). -define(send(Req), ?CLIENT:send(Req, #{channel => ct_test_channel})). @@ -65,6 +56,7 @@ -define(authenticate(Req), ?CLIENT:authenticate(Req, #{channel => ct_test_channel})). -define(start_timer(Req), ?CLIENT:start_timer(Req, #{channel => ct_test_channel})). -define(publish(Req), ?CLIENT:publish(Req, #{channel => ct_test_channel})). +-define(raw_publish(Req), ?CLIENT:raw_publish(Req, #{channel => ct_test_channel})). -define(subscribe(Req), ?CLIENT:subscribe(Req, #{channel => ct_test_channel})). -define(unsubscribe(Req), ?CLIENT:unsubscribe(Req, #{channel => ct_test_channel})). @@ -77,6 +69,7 @@ -define(TYPE_UNSUBSCRIBE, 7). -define(TYPE_UNSUBACK, 8). -define(TYPE_DISCONNECT, 9). +-define(TYPE_RAW_PUBLISH, 10). -define(loop_recv_and_reply_empty_success(Stream), ?loop_recv_and_reply_empty_success(Stream, fun(_) -> ok end) @@ -104,9 +97,9 @@ end). %% APIs %%-------------------------------------------------------------------- -start() -> +start(Scheme) -> application:ensure_all_started(grpc), - [ensure_channel(), start_server()]. + [ensure_channel(), start_server(Scheme)]. ensure_channel() -> case grpc_client_sup:create_channel_pool(ct_test_channel, "http://127.0.0.1:9100", #{}) of @@ -114,12 +107,40 @@ ensure_channel() -> {ok, Pid} -> {ok, Pid} end. -start_server() -> +start_server(Scheme) when Scheme == http; Scheme == https -> Services = #{ protos => [emqx_exproto_pb], - services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE} + services => #{ + 'emqx.exproto.v1.ConnectionHandler' => ?MODULE, + 'emqx.exproto.v1.ConnectionUnaryHandler' => emqx_exproto_unary_echo_svr + } }, - Options = [], + CertsDir = filename:join( + [ + emqx_common_test_helpers:proj_root(), + "apps", + "emqx", + "etc", + "certs" + ] + ), + + Options = + case Scheme of + https -> + CAfile = filename:join([CertsDir, "cacert.pem"]), + Keyfile = filename:join([CertsDir, "key.pem"]), + Certfile = filename:join([CertsDir, "cert.pem"]), + [ + {ssl_options, [ + {cacertfile, CAfile}, + {certfile, Certfile}, + {keyfile, Keyfile} + ]} + ]; + _ -> + [] + end, grpc:start_server(?MODULE, 9001, Services, Options). stop([_ChannPid, _SvrPid]) -> @@ -249,6 +270,17 @@ handle_in(Conn, ?TYPE_PUBLISH, #{ _ -> handle_out(Conn, ?TYPE_PUBACK, 1) end; +handle_in(Conn, ?TYPE_RAW_PUBLISH, #{ + <<"topic">> := Topic, + <<"qos">> := Qos, + <<"payload">> := Payload +}) -> + case ?raw_publish(#{topic => Topic, qos => Qos, payload => Payload}) of + {ok, #{code := 'SUCCESS'}, _} -> + handle_out(Conn, ?TYPE_PUBACK, 0); + _ -> + handle_out(Conn, ?TYPE_PUBACK, 1) + end; handle_in(Conn, ?TYPE_SUBSCRIBE, #{<<"qos">> := Qos, <<"topic">> := Topic}) -> case ?subscribe(#{conn => Conn, topic => Topic, qos => Qos}) of {ok, #{code := 'SUCCESS'}, _} -> @@ -275,7 +307,9 @@ handle_out(Conn, ?TYPE_SUBACK, Code) -> handle_out(Conn, ?TYPE_UNSUBACK, Code) -> ?send(#{conn => Conn, bytes => frame_unsuback(Code)}); handle_out(Conn, ?TYPE_PUBLISH, #{qos := Qos, topic := Topic, payload := Payload}) -> - ?send(#{conn => Conn, bytes => frame_publish(Topic, Qos, Payload)}). + ?send(#{conn => Conn, bytes => frame_publish(Topic, Qos, Payload)}); +handle_out(Conn, ?TYPE_RAW_PUBLISH, #{qos := Qos, topic := Topic, payload := Payload}) -> + ?send(#{conn => Conn, bytes => frame_raw_publish(Topic, Qos, Payload)}). handle_out(Conn, ?TYPE_DISCONNECT) -> ?send(#{conn => Conn, bytes => frame_disconnect()}). @@ -300,6 +334,14 @@ frame_publish(Topic, Qos, Payload) -> payload => Payload }). +frame_raw_publish(Topic, Qos, Payload) -> + emqx_utils_json:encode(#{ + type => ?TYPE_RAW_PUBLISH, + topic => Topic, + qos => Qos, + payload => Payload + }). + frame_puback(Code) -> emqx_utils_json:encode(#{type => ?TYPE_PUBACK, code => Code}). diff --git a/apps/emqx_gateway_exproto/test/emqx_exproto_unary_echo_svr.erl b/apps/emqx_gateway_exproto/test/emqx_exproto_unary_echo_svr.erl new file mode 100644 index 000000000..19bbdb6c6 --- /dev/null +++ b/apps/emqx_gateway_exproto/test/emqx_exproto_unary_echo_svr.erl @@ -0,0 +1,98 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exproto_unary_echo_svr). + +-behavior(emqx_exproto_v_1_connection_unary_handler_bhvr). + +-import( + emqx_exproto_echo_svr, + [ + handle_in/3, + handle_out/2, + handle_out/3 + ] +). + +-export([ + on_socket_created/2, + on_received_bytes/2, + on_socket_closed/2, + on_timer_timeout/2, + on_received_messages/2 +]). + +-define(LOG(Fmt, Args), ct:pal(Fmt, Args)). + +-define(CLIENT, emqx_exproto_v_1_connection_adapter_client). + +-define(close(Req), ?CLIENT:close(Req, #{channel => ct_test_channel})). + +-define(TYPE_CONNECT, 1). +-define(TYPE_CONNACK, 2). +-define(TYPE_PUBLISH, 3). +-define(TYPE_PUBACK, 4). +-define(TYPE_SUBSCRIBE, 5). +-define(TYPE_SUBACK, 6). +-define(TYPE_UNSUBSCRIBE, 7). +-define(TYPE_UNSUBACK, 8). +-define(TYPE_DISCONNECT, 9). +-define(TYPE_RAW_PUBLISH, 10). + +%%-------------------------------------------------------------------- +%% callbacks +%%-------------------------------------------------------------------- + +-spec on_socket_created(emqx_exproto_pb:socket_created_request(), grpc:metadata()) -> + {ok, emqx_exproto_pb:empty_success(), grpc:metadata()} + | {error, grpc_stream:error_response()}. +on_socket_created(_Req, _Md) -> + {ok, #{}, _Md}. + +-spec on_socket_closed(emqx_exproto_pb:socket_closed_request(), grpc:metadata()) -> + {ok, emqx_exproto_pb:empty_success(), grpc:metadata()} + | {error, grpc_stream:error_response()}. +on_socket_closed(_Req, _Md) -> + {ok, #{}, _Md}. + +-spec on_received_bytes(emqx_exproto_pb:received_bytes_request(), grpc:metadata()) -> + {ok, emqx_exproto_pb:empty_success(), grpc:metadata()} + | {error, grpc_stream:error_response()}. +on_received_bytes(#{conn := Conn, bytes := Bytes}, _Md) -> + #{<<"type">> := Type} = Params = emqx_utils_json:decode(Bytes, [return_maps]), + _ = handle_in(Conn, Type, Params), + {ok, #{}, _Md}. + +-spec on_timer_timeout(emqx_exproto_pb:timer_timeout_request(), grpc:metadata()) -> + {ok, emqx_exproto_pb:empty_success(), grpc:metadata()} + | {error, grpc_stream:error_response()}. +on_timer_timeout(#{conn := Conn, type := 'KEEPALIVE'}, _Md) -> + ?LOG("Close this connection ~p due to keepalive timeout", [Conn]), + handle_out(Conn, ?TYPE_DISCONNECT), + ?close(#{conn => Conn}), + {ok, #{}, _Md}. + +-spec on_received_messages(emqx_exproto_pb:received_messages_request(), grpc:metadata()) -> + {ok, emqx_exproto_pb:empty_success(), grpc:metadata()} + | {error, grpc_stream:error_response()}. +on_received_messages(#{conn := Conn, messages := Messages}, _Md) -> + lists:foreach( + fun(Message) -> + handle_out(Conn, ?TYPE_PUBLISH, Message) + end, + Messages + ), + {ok, #{}, _Md}. diff --git a/changes/ce/feat-10598.en.md b/changes/ce/feat-10598.en.md new file mode 100644 index 000000000..d08233392 --- /dev/null +++ b/changes/ce/feat-10598.en.md @@ -0,0 +1 @@ +Provide a callback method of Unary type in ExProto to avoid possible message disorder issues. diff --git a/mix.exs b/mix.exs index dab11693c..fbd88e61d 100644 --- a/mix.exs +++ b/mix.exs @@ -57,7 +57,7 @@ defmodule EMQXUmbrella.MixProject do {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-11", override: true}, {:ekka, github: "emqx/ekka", tag: "0.15.2", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, - {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true}, + {:grpc, github: "emqx/grpc-erl", tag: "0.6.8", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.10", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.4", override: true}, {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true}, diff --git a/rebar.config b/rebar.config index 6a44b8074..8bbba5b96 100644 --- a/rebar.config +++ b/rebar.config @@ -64,7 +64,7 @@ , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-11"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.2"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} - , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} + , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.8"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.10"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}} , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}} diff --git a/rel/i18n/emqx_exproto_schema.hocon b/rel/i18n/emqx_exproto_schema.hocon index eed450208..db9dd2480 100644 --- a/rel/i18n/emqx_exproto_schema.hocon +++ b/rel/i18n/emqx_exproto_schema.hocon @@ -6,6 +6,15 @@ exproto.desc: exproto_grpc_handler_address.desc: """gRPC server address.""" +exproto_grpc_handler_service_name.desc: +"""The service name to handle the connection events. +In the initial version, we expected to use streams to improve the efficiency +of requests in `ConnectionHandler`. But unfortunately, events between different +streams are out of order. It causes the `OnSocketCreated` event to may arrive +later than `OnReceivedBytes`. +So we added the `ConnectionUnaryHandler` service since v5.0.25 and forced +the use of Unary in it to avoid ordering problems.""" + exproto_grpc_handler_ssl.desc: """SSL configuration for the gRPC client."""