refactor(exproto): support unary handler

This commit is contained in:
JianBo He 2023-05-04 18:36:10 +08:00
parent 0f080cda66
commit b6bc3cd921
11 changed files with 513 additions and 267 deletions

View File

@ -22,3 +22,5 @@ src/emqx_exproto_v_1_connection_adapter_bhvr.erl
src/emqx_exproto_v_1_connection_adapter_client.erl
src/emqx_exproto_v_1_connection_handler_bhvr.erl
src/emqx_exproto_v_1_connection_handler_client.erl
src/emqx_exproto_v_1_connection_unary_handler_bhvr.erl
src/emqx_exproto_v_1_connection_unary_handler_client.erl

View File

@ -43,6 +43,8 @@ service ConnectionAdapter {
rpc Unsubscribe(UnsubscribeRequest) returns (CodeResponse) {};
}
// Deprecated service.
// Please using `ConnectionUnaryHandler` to replace it
service ConnectionHandler {
// -- socket layer
@ -60,6 +62,32 @@ service ConnectionHandler {
rpc OnReceivedMessages(stream ReceivedMessagesRequest) returns (EmptySuccess) {};
}
// This service is an optimization of `ConnectionHandler`.
// In the initial version, we expected to use streams to improve the efficiency
// of requests. But unfortunately, events between different streams are out of
// order. it causes the `OnSocketCreated` event to may arrive later than `OnReceivedBytes`.
//
// So we added the `ConnectionUnaryHandler` service since v4.3.21/v4.4.10 and forced
// the use of Unary in it to avoid ordering problems.
//
// Recommend using `ConnectionUnaryHandler` to replace `ConnectionHandler`
service ConnectionUnaryHandler {
// -- socket layer
rpc OnSocketCreated(SocketCreatedRequest) returns (EmptySuccess) {};
rpc OnSocketClosed(SocketClosedRequest) returns (EmptySuccess) {};
rpc OnReceivedBytes(ReceivedBytesRequest) returns (EmptySuccess) {};
// -- pub/sub layer
rpc OnTimerTimeout(TimerTimeoutRequest) returns (EmptySuccess) {};
rpc OnReceivedMessages(ReceivedMessagesRequest) returns (EmptySuccess) {};
}
message EmptySuccess { }
enum ResultCode {

View File

@ -45,7 +45,7 @@
%% Context
ctx :: emqx_gateway_ctx:context(),
%% gRPC channel options
gcli :: map(),
gcli :: emqx_exproto_gcli:grpc_client_state(),
%% Conn info
conninfo :: emqx_types:conninfo(),
%% Client info from `register` function
@ -54,10 +54,6 @@
conn_state :: conn_state(),
%% Subscription
subscriptions = #{},
%% Request queue
rqueue = queue:new(),
%% Inflight function name
inflight = undefined,
%% Keepalive
keepalive :: maybe(emqx_keepalive:keepalive()),
%% Timers
@ -79,7 +75,6 @@
-define(TIMER_TABLE, #{
alive_timer => keepalive,
force_timer => force_close,
idle_timer => force_close_idle
}).
@ -150,9 +145,11 @@ init(
},
Options
) ->
GRpcChann = maps:get(grpc_client_channel, Options),
ServiceName = maps:get(grpc_client_service_name, Options),
GRpcClient = emqx_exproto_gcli:init(ServiceName, #{channel => GRpcChann}),
Ctx = maps:get(ctx, Options),
GRpcChann = maps:get(handler, Options),
PoolName = maps:get(pool_name, Options),
IdleTimeout = emqx_gateway_utils:idle_timeout(Options),
NConnInfo = default_conninfo(ConnInfo#{idle_timeout => IdleTimeout}),
@ -170,7 +167,7 @@ init(
},
Channel = #channel{
ctx = Ctx,
gcli = #{channel => GRpcChann, pool_name => PoolName},
gcli = GRpcClient,
conninfo = NConnInfo,
clientinfo = ClientInfo,
conn_state = connecting,
@ -188,9 +185,7 @@ init(
}
)
},
start_idle_checking_timer(
try_dispatch(on_socket_created, wrap(Req), Channel)
).
dispatch(on_socket_created, Req, start_idle_checking_timer(Channel)).
%% @private
peercert(NoSsl, ConnInfo) when
@ -239,7 +234,7 @@ start_idle_checking_timer(Channel) ->
| {shutdown, Reason :: term(), channel()}.
handle_in(Data, Channel) ->
Req = #{bytes => Data},
{ok, try_dispatch(on_received_bytes, wrap(Req), Channel)}.
{ok, dispatch(on_received_bytes, Req, Channel)}.
-spec handle_deliver(list(emqx_types:deliver()), channel()) ->
{ok, channel()}
@ -276,7 +271,7 @@ handle_deliver(
Delivers
),
Req = #{messages => Msgs},
{ok, try_dispatch(on_received_messages, wrap(Req), Channel)}.
{ok, dispatch(on_received_messages, Req, Channel)}.
-spec handle_timeout(reference(), Msg :: term(), channel()) ->
{ok, channel()}
@ -301,7 +296,8 @@ handle_timeout(
NChannel = remove_timer_ref(alive_timer, Channel),
%% close connection if keepalive timeout
Replies = [{event, disconnected}, {close, keepalive_timeout}],
{ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)}
NChannel1 = dispatch(on_timer_timeout, Req, NChannel),
{ok, Replies, NChannel1}
end;
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
{shutdown, {error, {force_close, Reason}}, Channel};
@ -490,32 +486,29 @@ handle_cast(Req, Channel) ->
| {shutdown, Reason :: term(), channel()}.
handle_info(
{sock_closed, Reason},
Channel = #channel{rqueue = Queue, inflight = Inflight}
Channel
) ->
case
queue:len(Queue) =:= 0 andalso
Inflight =:= undefined
of
true ->
Channel1 = ensure_disconnected({sock_closed, Reason}, Channel),
{shutdown, Reason, Channel1};
_ ->
%% delayed close process for flushing all callback funcs to gRPC server
Channel1 = Channel#channel{closed_reason = {sock_closed, Reason}},
Channel2 = ensure_timer(force_timer, Channel1),
{ok, ensure_disconnected({sock_closed, Reason}, Channel2)}
end;
handle_info({hreply, on_socket_created, ok}, Channel) ->
dispatch_or_close_process(Channel#channel{inflight = undefined});
handle_info({hreply, FunName, ok}, Channel) when
FunName == on_socket_closed;
FunName == on_received_bytes;
FunName == on_received_messages;
FunName == on_timer_timeout
Channel1 = ensure_disconnected({sock_closed, Reason}, Channel),
{shutdown, Reason, Channel1};
handle_info(
{hreply, FunName, Result},
Channel = #channel{gcli = GClient}
) when
FunName =:= on_socket_created;
FunName =:= on_socket_closed;
FunName =:= on_received_bytes;
FunName =:= on_received_messages;
FunName =:= on_timer_timeout
->
dispatch_or_close_process(Channel#channel{inflight = undefined});
handle_info({hreply, FunName, {error, Reason}}, Channel) ->
{shutdown, {error, {FunName, Reason}}, Channel};
case Result of
ok ->
GClient1 = emqx_exproto_gcli:maybe_shoot(
emqx_exproto_gcli:ack(FunName, GClient)
),
{ok, Channel#channel{gcli = GClient1}};
{error, Reason} ->
{shutdown, {error, {FunName, Reason}}, Channel}
end;
handle_info({subscribe, _}, Channel) ->
{ok, Channel};
handle_info(Info, Channel) ->
@ -528,7 +521,8 @@ handle_info(Info, Channel) ->
-spec terminate(any(), channel()) -> channel().
terminate(Reason, Channel) ->
Req = #{reason => stringfy(Reason)},
try_dispatch(on_socket_closed, wrap(Req), Channel).
%% XXX: close streams?
dispatch(on_socket_closed, Req, Channel).
%%--------------------------------------------------------------------
%% Sub/UnSub
@ -696,8 +690,6 @@ remove_timer_ref(Name, Channel = #channel{timers = Timers}) ->
interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) ->
IdleTimeout;
interval(force_timer, _) ->
15000;
interval(alive_timer, #channel{keepalive = Keepalive}) ->
emqx_keepalive:info(interval, Keepalive).
@ -705,34 +697,10 @@ interval(alive_timer, #channel{keepalive = Keepalive}) ->
%% Dispatch
%%--------------------------------------------------------------------
wrap(Req) ->
Req#{conn => base64:encode(term_to_binary(self()))}.
dispatch_or_close_process(
Channel = #channel{
rqueue = Queue,
inflight = undefined,
gcli = GClient
}
) ->
case queue:out(Queue) of
{empty, _} ->
case Channel#channel.conn_state of
disconnected ->
{shutdown, Channel#channel.closed_reason, Channel};
_ ->
{ok, Channel}
end;
{{value, {FunName, Req}}, NQueue} ->
emqx_exproto_gcli:async_call(FunName, Req, GClient),
{ok, Channel#channel{inflight = FunName, rqueue = NQueue}}
end.
try_dispatch(FunName, Req, Channel = #channel{inflight = undefined, gcli = GClient}) ->
emqx_exproto_gcli:async_call(FunName, Req, GClient),
Channel#channel{inflight = FunName};
try_dispatch(FunName, Req, Channel = #channel{rqueue = Queue}) ->
Channel#channel{rqueue = queue:in({FunName, Req}, Queue)}.
dispatch(FunName, Req, Channel = #channel{gcli = GClient}) ->
Req1 = Req#{conn => base64:encode(term_to_binary(self()))},
NGClient = emqx_exproto_gcli:maybe_shoot(FunName, Req1, GClient),
Channel#channel{gcli = NGClient}.
%%--------------------------------------------------------------------
%% Format

View File

@ -14,161 +14,197 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% the gRPC client worker for ConnectionHandler service
%% the gRPC client agent for ConnectionHandler service
-module(emqx_exproto_gcli).
-behaviour(gen_server).
-include_lib("emqx/include/logger.hrl").
%% APIs
-export([async_call/3]).
-logger_header("[ExProtoGCli]").
-export([start_link/2]).
%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
init/2,
maybe_shoot/1,
maybe_shoot/3,
ack/2,
is_empty/1
]).
-record(state, {
pool,
id,
streams
}).
-define(CONN_HANDLER_MOD, emqx_exproto_v_1_connection_handler_client).
-define(CONN_UNARY_HANDLER_MOD, emqx_exproto_v_1_connection_unary_handler_client).
-define(CONN_ADAPTER_MOD, emqx_exproto_v_1_connection_handler_client).
-type service_name() :: 'ConnectionUnaryHandler' | 'ConnectionHandler'.
-type grpc_client_state() ::
#{
owner := pid(),
service_name := service_name(),
client_opts := options(),
queue := queue:queue(),
inflight := atom() | undefined,
streams => map(),
middleman => pid() | undefined
}.
-type options() ::
#{channel := term()}.
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
start_link(Pool, Id) ->
gen_server:start_link(
{local, emqx_utils:proc_name(?MODULE, Id)},
?MODULE,
[Pool, Id],
[]
).
-spec init(service_name(), options()) -> grpc_client_state().
init(ServiceName, Options) ->
#{
owner => self(),
service_name => ServiceName,
client_opts => Options,
queue => queue:new(),
inflight => undefined
}.
-spec async_call(atom(), map(), map()) -> ok.
async_call(
FunName,
Req = #{conn := Conn},
Options = #{pool_name := PoolName}
) ->
case pick(PoolName, Conn) of
-spec maybe_shoot(atom(), map(), grpc_client_state()) -> grpc_client_state().
maybe_shoot(FunName, Req, GState = #{inflight := undefined}) ->
shoot(FunName, Req, GState);
maybe_shoot(FunName, Req, GState) ->
enqueue(FunName, Req, GState).
-spec maybe_shoot(grpc_client_state()) -> grpc_client_state().
maybe_shoot(GState = #{inflight := undefined, queue := Q}) ->
case queue:is_empty(Q) of
true ->
GState;
false ->
reply(self(), FunName, {error, no_available_grpc_client});
Pid when is_pid(Pid) ->
cast(Pid, {rpc, FunName, Req, Options, self()})
end,
ok.
%%--------------------------------------------------------------------
%% cast, pick
%%--------------------------------------------------------------------
-compile({inline, [cast/2, pick/2]}).
cast(Deliver, Msg) ->
gen_server:cast(Deliver, Msg).
-spec pick(term(), term()) -> pid() | false.
pick(PoolName, Conn) ->
gproc_pool:pick_worker(PoolName, Conn).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Pool, Id]) ->
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #state{pool = Pool, id = Id, streams = #{}}}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast({rpc, Fun, Req, Options, From}, State = #state{streams = Streams}) ->
case ensure_stream_opened(Fun, Options, Streams) of
{error, Reason} ->
?SLOG(error, #{
msg => "request_grpc_server_failed",
function => {?CONN_ADAPTER_MOD, Fun, Options},
reason => Reason
}),
reply(From, Fun, {error, Reason}),
{noreply, State#state{streams = Streams#{Fun => undefined}}};
{ok, Stream} ->
case catch grpc_client:send(Stream, Req) of
ok ->
?SLOG(debug, #{
msg => "send_grpc_request_succeed",
function => {?CONN_ADAPTER_MOD, Fun},
request => Req
}),
reply(From, Fun, ok),
{noreply, State#state{streams = Streams#{Fun => Stream}}};
{'EXIT', {not_found, _Stk}} ->
%% Not found the stream, reopen it
?SLOG(info, #{
msg => "cannt_find_old_stream_ref",
function => {?CONN_ADAPTER_MOD, Fun}
}),
handle_cast(
{rpc, Fun, Req, Options, From},
State#state{streams = maps:remove(Fun, Streams)}
);
{'EXIT', {timeout, _Stk}} ->
?SLOG(error, #{
msg => "send_grpc_request_timeout",
function => {?CONN_ADAPTER_MOD, Fun},
request => Req
}),
reply(From, Fun, {error, timeout}),
{noreply, State#state{streams = Streams#{Fun => Stream}}};
{'EXIT', {Reason1, Stk}} ->
?SLOG(error, #{
msg => "send_grpc_request_failed",
function => {?CONN_ADAPTER_MOD, Fun},
request => Req,
error => Reason1,
stacktrace => Stk
}),
reply(From, Fun, {error, Reason1}),
{noreply, State#state{streams = Streams#{Fun => undefined}}}
end
{{value, {FunName, Req}}, Q1} = queue:out(Q),
shoot(FunName, Req, GState#{queue => Q1})
end.
handle_info(_Info, State) ->
{noreply, State}.
-spec ack(atom(), grpc_client_state()) -> grpc_client_state().
ack(FunName, GState = #{inflight := FunName}) ->
GState#{inflight => undefined, middleman => undefined};
ack(_, _) ->
error(badarg).
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-spec is_empty(grpc_client_state()) -> boolean().
is_empty(#{queue := Q, inflight := Inflight}) ->
Inflight == undefined andalso queue:is_empty(Q).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
reply(Pid, Fun, Result) ->
Pid ! {hreply, Fun, Result},
ok.
enqueue(FunName, Req, GState = #{queue := Q}) ->
GState#{queue => queue:in({FunName, Req}, Q)}.
ensure_stream_opened(Fun, Options, Streams) ->
case maps:get(Fun, Streams, undefined) of
shoot(FunName, Req, GState) ->
ServiceName = maps:get(service_name, GState),
shoot(ServiceName, FunName, Req, GState).
shoot(
'ConnectionUnaryHandler',
FunName,
Req,
GState = #{owner := Owner, client_opts := Options}
) ->
Pid =
spawn(
fun() ->
try
Result = request(FunName, Req, Options),
hreply(Owner, Result, FunName)
catch
T:R:Stk ->
hreply(Owner, {error, {{T, R}, Stk}}, FunName)
end
end
),
GState#{inflight => FunName, middleman => Pid};
shoot(
'ConnectionHandler',
FunName,
Req,
GState
) ->
GState1 = streaming(FunName, Req, GState),
GState1#{inflight => FunName}.
%%--------------------------------------------------------------------
%% streaming
streaming(
FunName,
Req,
GState = #{owner := Owner, client_opts := Options}
) ->
Streams = maps:get(streams, GState, #{}),
case ensure_stream_opened(FunName, Options, Streams) of
{error, Reason} ->
?SLOG(error, #{
msg => "request_grpc_server_failed",
function => {FunName, Options},
reason => Reason
}),
hreply(Owner, {error, Reason}, FunName),
{ok, GState};
{ok, Stream} ->
case catch grpc_client:send(Stream, Req) of
ok ->
?SLOG(debug, #{
msg => "send_grpc_request_succeed",
function => FunName,
request => Req
}),
hreply(Owner, ok, FunName),
GState#{streams => Streams#{FunName => Stream}};
{'EXIT', {not_found, _Stk}} ->
%% Not found the stream, reopen it
?SLOG(info, #{
msg => "cannt_find_old_stream_ref",
function => FunName
}),
streaming(FunName, Req, GState#{streams => maps:remove(FunName, Streams)});
{'EXIT', {timeout, _Stk}} ->
?SLOG(error, #{
msg => "send_grpc_request_timeout",
function => FunName,
request => Req
}),
hreply(Owner, {error, timeout}, FunName),
GState;
{'EXIT', {Reason1, Stk}} ->
?SLOG(error, #{
msg => "send_grpc_request_failed",
function => FunName,
request => Req,
error => Reason1,
stacktrace => Stk
}),
hreply(Owner, {error, Reason1}, FunName),
GState
end
end.
ensure_stream_opened(FunName, Options, Streams) ->
case maps:get(FunName, Streams, undefined) of
undefined ->
case apply(?CONN_ADAPTER_MOD, Fun, [Options]) of
case apply(?CONN_HANDLER_MOD, FunName, [Options]) of
{ok, Stream} -> {ok, Stream};
{error, Reason} -> {error, Reason}
end;
Stream ->
{ok, Stream}
end.
%%--------------------------------------------------------------------
%% unary
request(FunName, Req, Options) ->
case apply(?CONN_UNARY_HANDLER_MOD, FunName, [Req, Options]) of
{ok, _EmptySucc, _Md} ->
ok;
{error, Reason} ->
{error, Reason}
end.
hreply(Owner, Result, FunName) ->
Owner ! {hreply, FunName, Result},
ok.

View File

@ -74,6 +74,15 @@ fields(exproto_grpc_server) ->
fields(exproto_grpc_handler) ->
[
{address, sc(binary(), #{required => true, desc => ?DESC(exproto_grpc_handler_address)})},
{service_name,
sc(
hoconsc:union(['ConnectionHandler', 'ConnectionUnaryHandler']),
#{
required => true,
default => 'ConnectionUnaryHandler',
desc => ?DESC(exproto_grpc_handler_service_name)
}
)},
{ssl_options,
sc(
ref(emqx_schema, "ssl_client_opts"),

View File

@ -1,6 +1,6 @@
{application, emqx_gateway_exproto, [
{description, "ExProto Gateway"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [kernel, stdlib, grpc, emqx, emqx_gateway]},
{env, []},

View File

@ -62,22 +62,16 @@ on_gateway_load(
GwName,
maps:get(handler, Config, undefined)
),
ServiceName = ensure_service_name(Config),
%% XXX: How to monitor it ?
_ = start_grpc_server(GwName, maps:get(server, Config, undefined)),
%% XXX: How to monitor it ?
PoolName = pool_name(GwName),
PoolSize = emqx_vm:schedulers() * 2,
{ok, PoolSup} = emqx_pool_sup:start_link(
PoolName,
hash,
PoolSize,
{emqx_exproto_gcli, start_link, []}
),
NConfig = maps:without(
[server, handler],
Config#{pool_name => PoolName}
Config#{
grpc_client_channel => GwName,
grpc_client_service_name => ServiceName
}
),
Listeners = emqx_gateway_utils:normalize_config(
NConfig#{handler => GwName}
@ -93,7 +87,7 @@ on_gateway_load(
)
of
{ok, ListenerPids} ->
{ok, ListenerPids, _GwState = #{ctx => Ctx, pool => PoolSup}};
{ok, ListenerPids, _GwState = #{ctx => Ctx}};
{error, {Reason, Listener}} ->
throw(
{badconf, #{
@ -126,11 +120,9 @@ on_gateway_unload(
name := GwName,
config := Config
},
_GwState = #{pool := PoolSup}
_GwState
) ->
Listeners = emqx_gateway_utils:normalize_config(Config),
%% Stop funcs???
exit(PoolSup, kill),
stop_grpc_server(GwName),
stop_grpc_client_channel(GwName),
stop_listeners(GwName, Listeners).
@ -139,8 +131,6 @@ on_gateway_unload(
%% Internal funcs
%%--------------------------------------------------------------------
start_grpc_server(_GwName, undefined) ->
undefined;
start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
Services = #{
protos => [emqx_exproto_pb],
@ -179,15 +169,24 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
reason => illegal_grpc_server_confs
}}
)
end.
end;
start_grpc_server(_GwName, Options) ->
throw(
{badconf, #{
key => server,
value => Options,
reason => illegal_grpc_server_confs
}}
).
stop_grpc_server(GwName) ->
_ = grpc:stop_server(GwName),
console_print("Stop ~s gRPC server successfully.~n", [GwName]).
start_grpc_client_channel(_GwName, undefined) ->
undefined;
start_grpc_client_channel(GwName, Options = #{address := Address}) ->
start_grpc_client_channel(
GwName,
Options = #{address := Address}
) ->
#{host := Host, port := Port} =
case emqx_http_lib:uri_parse(Address) of
{ok, URIMap0} ->
@ -201,7 +200,7 @@ start_grpc_client_channel(GwName, Options = #{address := Address}) ->
}}
)
end,
case emqx_utils_maps:deep_get([ssl, enable], Options, false) of
case emqx_utils_maps:deep_get([ssl_options, enable], Options, false) of
false ->
SvrAddr = compose_http_uri(http, Host, Port),
grpc_client_sup:create_channel_pool(GwName, SvrAddr, #{});
@ -217,7 +216,15 @@ start_grpc_client_channel(GwName, Options = #{address := Address}) ->
SvrAddr = compose_http_uri(https, Host, Port),
grpc_client_sup:create_channel_pool(GwName, SvrAddr, ClientOpts)
end.
end;
start_grpc_client_channel(_GwName, Options) ->
throw(
{badconf, #{
key => handler,
value => Options,
reason => ililegal_grpc_client_confs
}}
).
compose_http_uri(Scheme, Host, Port) ->
lists:flatten(
@ -230,8 +237,8 @@ stop_grpc_client_channel(GwName) ->
_ = grpc_client_sup:stop_channel_pool(GwName),
ok.
pool_name(GwName) ->
list_to_atom(lists:concat([GwName, "_gcli_pool"])).
ensure_service_name(Config) ->
emqx_utils_maps:deep_get([handler, service_name], Config, 'ConnectionUnaryHandler').
-ifndef(TEST).
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).

View File

@ -19,8 +19,11 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-import(
emqx_exproto_echo_svr,
@ -37,19 +40,24 @@
]
).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(TCPOPTS, [binary, {active, false}]).
-define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]).
-define(PORT, 7993).
-define(DEFAULT_CLIENT, #{
proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>
}).
%%--------------------------------------------------------------------
-define(CONF_DEFAULT, <<
"\n"
"gateway.exproto {\n"
" server.bind = 9100,\n"
" handler.address = \"http://127.0.0.1:9001\"\n"
" handler.service_name = \"ConnectionHandler\"\n"
" listeners.tcp.default {\n"
" bind = 7993,\n"
" acceptors = 8\n"
@ -62,43 +70,109 @@
%%--------------------------------------------------------------------
all() ->
[{group, Name} || Name <- metrics()].
[
{group, tcp_listener},
{group, ssl_listener},
{group, udp_listener},
{group, dtls_listener},
{group, https_grpc_server},
{group, streaming_connection_handler}
].
suite() ->
[{timetrap, {seconds, 30}}].
groups() ->
Cases = emqx_common_test_helpers:all(?MODULE),
[{Name, Cases} || Name <- metrics()].
MainCases = [
t_keepalive_timeout,
t_mountpoint_echo,
t_auth_deny,
t_acl_deny,
t_hook_connected_disconnected,
t_hook_session_subscribed_unsubscribed,
t_hook_message_delivered
],
[
{tcp_listener, [sequence], MainCases},
{ssl_listener, [sequence], MainCases},
{udp_listener, [sequence], MainCases},
{dtls_listener, [sequence], MainCases},
{streaming_connection_handler, [sequence], MainCases},
{https_grpc_server, [sequence], MainCases}
].
%% @private
metrics() ->
[tcp, ssl, udp, dtls].
init_per_group(GrpName, Cfg) when
GrpName == tcp_listener;
GrpName == ssl_listener;
GrpName == udp_listener;
GrpName == dtls_listener
->
LisType =
case GrpName of
tcp_listener -> tcp;
ssl_listener -> ssl;
udp_listener -> udp;
dtls_listener -> dtls
end,
init_per_group(LisType, 'ConnectionUnaryHandler', http, Cfg);
init_per_group(https_grpc_server, Cfg) ->
init_per_group(tcp, 'ConnectionUnaryHandler', https, Cfg);
init_per_group(streaming_connection_handler, Cfg) ->
init_per_group(tcp, 'ConnectionHandler', http, Cfg);
init_per_group(_, Cfg) ->
init_per_group(tcp, 'ConnectionUnaryHandler', http, Cfg).
init_per_group(GrpName, Cfg) ->
init_per_group(LisType, ServiceName, Scheme, Cfg) ->
Svrs = emqx_exproto_echo_svr:start(Scheme),
application:load(emqx_gateway_exproto),
put(grpname, GrpName),
Svrs = emqx_exproto_echo_svr:start(),
emqx_common_test_helpers:start_apps([emqx_authn, emqx_gateway], fun set_special_cfg/1),
[{servers, Svrs}, {listener_type, GrpName} | Cfg].
emqx_common_test_helpers:start_apps(
[emqx_authn, emqx_gateway],
fun(App) ->
set_special_cfg(App, LisType, ServiceName, Scheme)
end
),
[
{servers, Svrs},
{listener_type, LisType},
{service_name, ServiceName},
{grpc_client_scheme, Scheme}
| Cfg
].
end_per_group(_, Cfg) ->
emqx_config:erase(gateway),
emqx_common_test_helpers:stop_apps([emqx_gateway, emqx_authn]),
emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)).
set_special_cfg(emqx_gateway) ->
LisType = get(grpname),
init_per_testcase(TestCase, Cfg) when
TestCase == t_enter_passive_mode
->
case proplists:get_value(listener_type, Cfg) of
udp -> {skip, ignore};
_ -> Cfg
end;
init_per_testcase(_TestCase, Cfg) ->
Cfg.
end_per_testcase(_TestCase, _Cfg) ->
ok.
set_special_cfg(emqx_gateway, LisType, ServiceName, Scheme) ->
Addrs = lists:flatten(io_lib:format("~s://127.0.0.1:9001", [Scheme])),
emqx_config:put(
[gateway, exproto],
#{
server => #{bind => 9100},
idle_timeout => 5000,
handler => #{address => "http://127.0.0.1:9001"},
handler => #{
address => Addrs,
service_name => ServiceName,
ssl_options => #{enable => Scheme == https}
},
listeners => listener_confs(LisType)
}
);
set_special_cfg(_App) ->
set_special_cfg(_, _, _, _) ->
ok.
listener_confs(Type) ->
@ -112,9 +186,6 @@ default_config() ->
%% Tests cases
%%--------------------------------------------------------------------
t_start_stop(_) ->
ok.
t_mountpoint_echo(Cfg) ->
SockType = proplists:get_value(listener_type, Cfg),
Sock = open(SockType),
@ -264,7 +335,7 @@ t_keepalive_timeout(Cfg) ->
?assertMatch(
{ok, #{
clientid := ClientId1,
reason := {shutdown, {sock_closed, keepalive_timeout}}
reason := {shutdown, keepalive_timeout}
}},
?block_until(#{?snk_kind := conn_process_terminated}, 8000)
);
@ -272,7 +343,7 @@ t_keepalive_timeout(Cfg) ->
?assertMatch(
{ok, #{
clientid := ClientId1,
reason := {shutdown, {sock_closed, keepalive_timeout}}
reason := {shutdown, keepalive_timeout}
}},
?block_until(#{?snk_kind := conn_process_terminated}, 12000)
),

View File

@ -19,7 +19,7 @@
-behaviour(emqx_exproto_v_1_connection_handler_bhvr).
-export([
start/0,
start/1,
stop/1
]).
@ -32,7 +32,10 @@
frame_suback/1,
frame_unsubscribe/1,
frame_unsuback/1,
frame_disconnect/0
frame_disconnect/0,
handle_in/3,
handle_out/2,
handle_out/3
]).
-export([
@ -45,19 +48,6 @@
-define(LOG(Fmt, Args), ct:pal(Fmt, Args)).
-define(HTTP, #{
grpc_opts => #{
service_protos => [emqx_exproto_pb],
services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE}
},
listen_opts => #{
port => 9001,
socket_options => []
},
pool_opts => #{size => 8},
transport_opts => #{ssl => false}
}).
-define(CLIENT, emqx_exproto_v_1_connection_adapter_client).
-define(send(Req), ?CLIENT:send(Req, #{channel => ct_test_channel})).
@ -104,9 +94,9 @@ end).
%% APIs
%%--------------------------------------------------------------------
start() ->
start(Scheme) ->
application:ensure_all_started(grpc),
[ensure_channel(), start_server()].
[ensure_channel(), start_server(Scheme)].
ensure_channel() ->
case grpc_client_sup:create_channel_pool(ct_test_channel, "http://127.0.0.1:9100", #{}) of
@ -114,12 +104,40 @@ ensure_channel() ->
{ok, Pid} -> {ok, Pid}
end.
start_server() ->
start_server(Scheme) when Scheme == http; Scheme == https ->
Services = #{
protos => [emqx_exproto_pb],
services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE}
services => #{
'emqx.exproto.v1.ConnectionHandler' => ?MODULE,
'emqx.exproto.v1.ConnectionUnaryHandler' => emqx_exproto_unary_echo_svr
}
},
Options = [],
CertsDir = filename:join(
[
emqx_common_test_helpers:proj_root(),
"apps",
"emqx",
"etc",
"certs"
]
),
Options =
case Scheme of
https ->
CAfile = filename:join([CertsDir, "cacert.pem"]),
Keyfile = filename:join([CertsDir, "key.pem"]),
Certfile = filename:join([CertsDir, "cert.pem"]),
[
{ssl_options, [
{cacertfile, CAfile},
{certfile, Certfile},
{keyfile, Keyfile}
]}
];
_ ->
[]
end,
grpc:start_server(?MODULE, 9001, Services, Options).
stop([_ChannPid, _SvrPid]) ->

View File

@ -0,0 +1,97 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exproto_unary_echo_svr).
-behavior(emqx_exproto_v_1_connection_unary_handler_bhvr).
-import(
emqx_exproto_echo_svr,
[
handle_in/3,
handle_out/2,
handle_out/3
]
).
-export([
on_socket_created/2,
on_received_bytes/2,
on_socket_closed/2,
on_timer_timeout/2,
on_received_messages/2
]).
-define(LOG(Fmt, Args), ct:pal(Fmt, Args)).
-define(CLIENT, emqx_exproto_v_1_connection_adapter_client).
-define(close(Req), ?CLIENT:close(Req, #{channel => ct_test_channel})).
-define(TYPE_CONNECT, 1).
-define(TYPE_CONNACK, 2).
-define(TYPE_PUBLISH, 3).
-define(TYPE_PUBACK, 4).
-define(TYPE_SUBSCRIBE, 5).
-define(TYPE_SUBACK, 6).
-define(TYPE_UNSUBSCRIBE, 7).
-define(TYPE_UNSUBACK, 8).
-define(TYPE_DISCONNECT, 9).
%%--------------------------------------------------------------------
%% callbacks
%%--------------------------------------------------------------------
-spec on_socket_created(emqx_exproto_pb:socket_created_request(), grpc:metadata()) ->
{ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
on_socket_created(_Req, _Md) ->
{ok, #{}, _Md}.
-spec on_socket_closed(emqx_exproto_pb:socket_closed_request(), grpc:metadata()) ->
{ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
on_socket_closed(_Req, _Md) ->
{ok, #{}, _Md}.
-spec on_received_bytes(emqx_exproto_pb:received_bytes_request(), grpc:metadata()) ->
{ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
on_received_bytes(#{conn := Conn, bytes := Bytes}, _Md) ->
#{<<"type">> := Type} = Params = emqx_utils_json:decode(Bytes, [return_maps]),
_ = handle_in(Conn, Type, Params),
{ok, #{}, _Md}.
-spec on_timer_timeout(emqx_exproto_pb:timer_timeout_request(), grpc:metadata()) ->
{ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
on_timer_timeout(#{conn := Conn, type := 'KEEPALIVE'}, _Md) ->
?LOG("Close this connection ~p due to keepalive timeout", [Conn]),
handle_out(Conn, ?TYPE_DISCONNECT),
?close(#{conn => Conn}),
{ok, #{}, _Md}.
-spec on_received_messages(emqx_exproto_pb:received_messages_request(), grpc:metadata()) ->
{ok, emqx_exproto_pb:empty_success(), grpc:metadata()}
| {error, grpc_stream:error_response()}.
on_received_messages(#{conn := Conn, messages := Messages}, _Md) ->
lists:foreach(
fun(Message) ->
handle_out(Conn, ?TYPE_PUBLISH, Message)
end,
Messages
),
{ok, #{}, _Md}.

View File

@ -6,6 +6,16 @@ exproto.desc:
exproto_grpc_handler_address.desc:
"""gRPC server address."""
exproto_grpc_handler_service_name.desc:
"""The service name to handle the connection events.
In the initial version, we expected to use streams to improve the efficiency
of requests in `ConnectionHandler`. But unfortunately, events between different
streams are out of order. It causes the `OnSocketCreated` event to may arrive
later than `OnReceivedBytes`.
So we added the `ConnectionUnaryHandler` service since v5.0.25 and forced
the use of Unary in it to avoid ordering problems.
"""
exproto_grpc_handler_ssl.desc:
"""SSL configuration for the gRPC client."""