chore(gw): more code coverage for emqx_gateway_conn module

This commit is contained in:
JianBo He 2022-01-13 15:56:23 +08:00
parent cce0b1ca34
commit 43284768d0
7 changed files with 139 additions and 40 deletions

View File

@ -720,20 +720,29 @@ serialize_and_inc_stats_fun(#state{
channel = Channel}) -> channel = Channel}) ->
Ctx = ChannMod:info(ctx, Channel), Ctx = ChannMod:info(ctx, Channel),
fun(Packet) -> fun(Packet) ->
case FrameMod:serialize_pkt(Packet, Serialize) of try
<<>> -> Data = FrameMod:serialize_pkt(Packet, Serialize),
?SLOG(debug, #{ msg => "SEND_packet"
%% XXX: optimize it, less cpu comsuption?
, packet => FrameMod:format(Packet)
}),
ok = inc_outgoing_stats(Ctx, FrameMod, Packet),
Data
catch
_ : too_large ->
?SLOG(warning, #{ msg => "packet_too_large_discarded" ?SLOG(warning, #{ msg => "packet_too_large_discarded"
, packet => FrameMod:format(Packet) , packet => FrameMod:format(Packet)
}), }),
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'), ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'),
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'), ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
<<>>; <<>>;
Data -> _ : Reason ->
?SLOG(debug, #{ msg => "SEND_packet" ?SLOG(warning, #{ msg => "packet_serialize_failure"
, packet => FrameMod:format(Packet) , reason => Reason
}), , packet => FrameMod:format(Packet)
ok = inc_outgoing_stats(Ctx, FrameMod, Packet), }),
Data ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
<<>>
end end
end. end.

View File

@ -30,7 +30,7 @@
#{ %% Gateway Name #{ %% Gateway Name
gwname := gateway_name() gwname := gateway_name()
%% Authentication chains %% Authentication chains
, auth := [emqx_authentication:chain_name()] | undefined , auth := [emqx_authentication:chain_name()]
%% The ConnectionManager PID %% The ConnectionManager PID
, cm := pid() , cm := pid()
}. }.
@ -64,18 +64,15 @@
-spec authenticate(context(), emqx_types:clientinfo()) -spec authenticate(context(), emqx_types:clientinfo())
-> {ok, emqx_types:clientinfo()} -> {ok, emqx_types:clientinfo()}
| {error, any()}. | {error, any()}.
authenticate(_Ctx = #{auth := undefined}, ClientInfo) -> authenticate(_Ctx = #{auth := _ChainNames}, ClientInfo0)
{ok, mountpoint(ClientInfo)}; when is_list(_ChainNames) ->
authenticate(_Ctx = #{auth := _ChainName}, ClientInfo0) ->
ClientInfo = ClientInfo0#{zone => default}, ClientInfo = ClientInfo0#{zone => default},
case emqx_access_control:authenticate(ClientInfo) of case emqx_access_control:authenticate(ClientInfo) of
{ok, _} -> {ok, _} ->
{ok, mountpoint(ClientInfo)}; {ok, mountpoint(ClientInfo)};
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
end; end.
authenticate(_Ctx, ClientInfo) ->
{ok, mountpoint(ClientInfo)}.
%% @doc Register the session to the cluster. %% @doc Register the session to the cluster.
%% %%
@ -95,11 +92,6 @@ open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
open_session(Ctx, CleanStart, ClientInfo, ConnInfo, open_session(Ctx, CleanStart, ClientInfo, ConnInfo,
CreateSessionFun, emqx_session). CreateSessionFun, emqx_session).
open_session(Ctx, false, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
logger:warning("clean_start=false is not supported now, "
"fallback to clean_start mode"),
open_session(Ctx, true, ClientInfo, ConnInfo, CreateSessionFun, SessionMod);
open_session(_Ctx = #{gwname := GwName}, open_session(_Ctx = #{gwname := GwName},
CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
emqx_gateway_cm:open_session(GwName, CleanStart, emqx_gateway_cm:open_session(GwName, CleanStart,

View File

@ -79,7 +79,6 @@
-elvis([{elvis_style, god_modules, disable}]). -elvis([{elvis_style, god_modules, disable}]).
-elvis([{elvis_style, no_nested_try_catch, disable}]). -elvis([{elvis_style, no_nested_try_catch, disable}]).
-define(DEFAULT_CALL_TIMEOUT, 15000). -define(DEFAULT_CALL_TIMEOUT, 15000).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -336,53 +335,51 @@ return_http_error(Code, Msg) ->
}) })
}. }.
-spec reason2msg({atom(), map()} | any()) -> error | io_lib:chars(). -spec reason2msg({atom(), map()} | any()) -> error | string().
reason2msg({badconf, #{key := Key, value := Value, reason := Reason}}) -> reason2msg({badconf, #{key := Key, value := Value, reason := Reason}}) ->
io_lib:format("Bad config value '~s' for '~s', reason: ~s", fmtstr("Bad config value '~s' for '~s', reason: ~s", [Value, Key, Reason]);
[Value, Key, Reason]);
reason2msg({badres, #{resource := gateway, reason2msg({badres, #{resource := gateway,
gateway := GwName, gateway := GwName,
reason := not_found}}) -> reason := not_found}}) ->
io_lib:format("The ~s gateway is unloaded", [GwName]); fmtstr("The ~s gateway is unloaded", [GwName]);
reason2msg({badres, #{resource := gateway, reason2msg({badres, #{resource := gateway,
gateway := GwName, gateway := GwName,
reason := already_exist}}) -> reason := already_exist}}) ->
io_lib:format("The ~s gateway already loaded", [GwName]); fmtstr("The ~s gateway already loaded", [GwName]);
reason2msg({badres, #{resource := listener, reason2msg({badres, #{resource := listener,
listener := {GwName, LType, LName}, listener := {GwName, LType, LName},
reason := not_found}}) -> reason := not_found}}) ->
io_lib:format("Listener ~s not found", fmtstr("Listener ~s not found", [listener_id(GwName, LType, LName)]);
[listener_id(GwName, LType, LName)]);
reason2msg({badres, #{resource := listener, reason2msg({badres, #{resource := listener,
listener := {GwName, LType, LName}, listener := {GwName, LType, LName},
reason := already_exist}}) -> reason := already_exist}}) ->
io_lib:format("The listener ~s of ~s already exist", fmtstr("The listener ~s of ~s already exist",
[listener_id(GwName, LType, LName), GwName]); [listener_id(GwName, LType, LName), GwName]);
reason2msg({badres, #{resource := authn, reason2msg({badres, #{resource := authn,
gateway := GwName, gateway := GwName,
reason := not_found}}) -> reason := not_found}}) ->
io_lib:format("The authentication not found on ~s", [GwName]); fmtstr("The authentication not found on ~s", [GwName]);
reason2msg({badres, #{resource := authn, reason2msg({badres, #{resource := authn,
gateway := GwName, gateway := GwName,
reason := already_exist}}) -> reason := already_exist}}) ->
io_lib:format("The authentication already exist on ~s", [GwName]); fmtstr("The authentication already exist on ~s", [GwName]);
reason2msg({badres, #{resource := listener_authn, reason2msg({badres, #{resource := listener_authn,
listener := {GwName, LType, LName}, listener := {GwName, LType, LName},
reason := not_found}}) -> reason := not_found}}) ->
io_lib:format("The authentication not found on ~s", fmtstr("The authentication not found on ~s",
[listener_id(GwName, LType, LName)]); [listener_id(GwName, LType, LName)]);
reason2msg({badres, #{resource := listener_authn, reason2msg({badres, #{resource := listener_authn,
listener := {GwName, LType, LName}, listener := {GwName, LType, LName},
reason := already_exist}}) -> reason := already_exist}}) ->
io_lib:format("The authentication already exist on ~s", fmtstr("The authentication already exist on ~s",
[listener_id(GwName, LType, LName)]); [listener_id(GwName, LType, LName)]);
reason2msg(_) -> reason2msg(_) ->
error. error.
@ -393,6 +390,9 @@ codestr(405) -> 'METHOD_NOT_ALLOWED';
codestr(500) -> 'UNKNOW_ERROR'; codestr(500) -> 'UNKNOW_ERROR';
codestr(501) -> 'NOT_IMPLEMENTED'. codestr(501) -> 'NOT_IMPLEMENTED'.
fmtstr(Fmt, Args) ->
lists:flatten(io_lib:format(Fmt, Args)).
-spec with_authn(binary(), function()) -> any(). -spec with_authn(binary(), function()) -> any().
with_authn(GwName0, Fun) -> with_authn(GwName0, Fun) ->
with_gateway(GwName0, fun(GwName, _GwConf) -> with_gateway(GwName0, fun(GwName, _GwConf) ->

View File

@ -122,7 +122,6 @@ handle_call(info, _From, State) ->
{reply, detailed_gateway_info(State), State}; {reply, detailed_gateway_info(State), State};
handle_call(disable, _From, State = #state{status = Status}) -> handle_call(disable, _From, State = #state{status = Status}) ->
%% XXX: The `disable` opertaion is not persist to config database
case Status of case Status of
running -> running ->
case cb_gateway_unload(State) of case cb_gateway_unload(State) of
@ -328,7 +327,7 @@ do_update_one_by_one(NCfg, State = #state{
AuthnNames = init_authn(State#state.name, NCfg), AuthnNames = init_authn(State#state.name, NCfg),
State#state{authns = AuthnNames} State#state{authns = AuthnNames}
end, end,
%% XXX: minimum impact update ??? %% TODO: minimum impact update ???
cb_gateway_update(NCfg, NState); cb_gateway_update(NCfg, NState);
{running, false} -> {running, false} ->
case cb_gateway_unload(State) of case cb_gateway_unload(State) of
@ -413,7 +412,6 @@ cb_gateway_update(Config,
case CbMod:on_gateway_update(Config, detailed_gateway_info(State), GwState) of case CbMod:on_gateway_update(Config, detailed_gateway_info(State), GwState) of
{error, Reason} -> {error, Reason}; {error, Reason} -> {error, Reason};
{ok, ChildPidOrSpecs, NGwState} -> {ok, ChildPidOrSpecs, NGwState} ->
%% XXX: Hot-upgrade ???
ChildPids = start_child_process(ChildPidOrSpecs), ChildPids = start_child_process(ChildPidOrSpecs),
{ok, State#state{ {ok, State#state{
config = Config, config = Config,

View File

@ -0,0 +1,67 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_gateway_ctx_SUITE).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-compile(nowarn_export_all).
%%--------------------------------------------------------------------
%% setups
%%--------------------------------------------------------------------
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) ->
ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
ok = meck:expect(emqx_access_control, authenticate,
fun(#{clientid := bad_client}) ->
{error, bad_username_or_password};
(ClientInfo) -> {ok, ClientInfo}
end),
Conf.
end_per_suite(_Conf) ->
ok.
%%--------------------------------------------------------------------
%% cases
%%--------------------------------------------------------------------
t_authenticate(_) ->
Ctx = #{gwname => mqttsn, auth => [], cm => self()},
Info1 = #{ mountpoint => undefined
, clientid => <<"user1">>
},
NInfo1 = zone(Info1),
?assertEqual({ok, NInfo1}, emqx_gateway_ctx:authenticate(Ctx, Info1)),
Info2 = #{ mountpoint => <<"mqttsn/${clientid}/">>
, clientid => <<"user1">>
},
NInfo2 = zone(Info2#{mountpoint => <<"mqttsn/user1/">>}),
?assertEqual({ok, NInfo2}, emqx_gateway_ctx:authenticate(Ctx, Info2)),
Info3 = #{ mountpoint => <<"mqttsn/${clientid}/">>
, clientid => bad_client
},
{error, bad_username_or_password}
= emqx_gateway_ctx:authenticate(Ctx, Info3),
ok.
zone(Info) -> Info#{zone => default}.

View File

@ -648,7 +648,6 @@ t_publish_qos0_case05(_) ->
gen_udp:close(Socket). gen_udp:close(Socket).
t_publish_qos0_case06(_) -> t_publish_qos0_case06(_) ->
Dup = 0, Dup = 0,
QoS = 0, QoS = 0,

View File

@ -351,6 +351,40 @@ t_ack(_) ->
body = _}, _, _} = parse(Data4) body = _}, _, _} = parse(Data4)
end). end).
t_1000_msg_send(_) ->
with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
[{<<"accept-version">>, ?STOMP_VER},
{<<"host">>, <<"127.0.0.1:61613">>},
{<<"login">>, <<"guest">>},
{<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"0,0">>}])),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, #stomp_frame{command = <<"CONNECTED">>,
headers = _,
body = _}, _, _} = parse(Data),
Topic = <<"/queue/foo">>,
SendFun = fun() ->
gen_tcp:send(Sock, serialize(<<"SEND">>,
[{<<"destination">>, Topic}],
<<"msgtest">>))
end,
RecvFun = fun() ->
receive
{deliver, Topic, _Msg}->
ok
after 100 ->
?assert(false, "waiting message timeout")
end
end,
emqx:subscribe(Topic),
lists:foreach(fun(_) -> SendFun() end, lists:seq(1, 1000)),
lists:foreach(fun(_) -> RecvFun() end, lists:seq(1, 1000))
end).
t_rest_clienit_info(_) -> t_rest_clienit_info(_) ->
with_connection(fun(Sock) -> with_connection(fun(Sock) ->
gen_tcp:send(Sock, serialize(<<"CONNECT">>, gen_tcp:send(Sock, serialize(<<"CONNECT">>,