test(gw): fix bad test cases

This commit is contained in:
JianBo He 2021-08-16 18:25:34 +08:00
parent cac0ea4b19
commit 630e0fb8cb
10 changed files with 33 additions and 31 deletions

View File

@ -254,7 +254,7 @@ cb_gateway_unload(State = #state{gw = Gateway = #{type := GwType},
gw_state = GwState}) -> gw_state = GwState}) ->
try try
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType), #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwType),
CbMod:on_gateway_unload(Gateway, GwState, GwState), CbMod:on_gateway_unload(Gateway, GwState),
{ok, State#state{child_pids = [], {ok, State#state{child_pids = [],
gw_state = undefined, gw_state = undefined,
status = stopped}} status = stopped}}
@ -316,6 +316,7 @@ cb_gateway_update(NewGateway,
{error, {Class, Reason1, Stk}} {error, {Class, Reason1, Stk}}
end. end.
start_child_process([]) -> [];
start_child_process([Indictor|_] = ChildPidOrSpecs) -> start_child_process([Indictor|_] = ChildPidOrSpecs) ->
case erlang:is_pid(Indictor) of case erlang:is_pid(Indictor) of
true -> true ->

View File

@ -60,6 +60,7 @@ fields(mqttsn_structs) ->
, {idle_timeout, t(duration())} , {idle_timeout, t(duration())}
, {predefined, hoconsc:array(ref(mqttsn_predefined))} , {predefined, hoconsc:array(ref(mqttsn_predefined))}
, {clientinfo_override, t(ref(clientinfo_override))} , {clientinfo_override, t(ref(clientinfo_override))}
, {authentication, t(ref(authentication))}
, {listener, t(ref(udp_listener_group))} , {listener, t(ref(udp_listener_group))}
]; ];
@ -78,6 +79,7 @@ fields(lwm2m_structs) ->
, {mountpoint, t(string())} , {mountpoint, t(string())}
, {update_msg_publish_condition, t(union([always, contains_object_list]))} , {update_msg_publish_condition, t(union([always, contains_object_list]))}
, {translators, t(ref(translators))} , {translators, t(ref(translators))}
, {authentication, t(ref(authentication))}
, {listener, t(ref(udp_listener_group))} , {listener, t(ref(udp_listener_group))}
]; ];
@ -201,11 +203,11 @@ fields(coap) ->
fields(coap_structs) -> fields(coap_structs) ->
[ {enable_stats, t(boolean(), undefined, true)} [ {enable_stats, t(boolean(), undefined, true)}
, {authentication, t(ref(authentication))}
, {heartbeat, t(duration(), undefined, "30s")} , {heartbeat, t(duration(), undefined, "30s")}
, {notify_type, t(union([non, con, qos]), undefined, qos)} , {notify_type, t(union([non, con, qos]), undefined, qos)}
, {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)} , {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)} , {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {authentication, t(ref(authentication))}
, {listener, t(ref(udp_listener_group))} , {listener, t(ref(udp_listener_group))}
]; ];

View File

@ -120,7 +120,8 @@ format_listenon({Addr, Port}) when is_tuple(Addr) ->
, SocketOpts :: esockd:option() , SocketOpts :: esockd:option()
, Cfg :: map() , Cfg :: map()
}). }).
normalize_rawconf(RawConf = #{listener := LisMap}) -> normalize_rawconf(RawConf) ->
LisMap = maps:get(listener, RawConf, #{}),
Cfg0 = maps:without([listener], RawConf), Cfg0 = maps:without([listener], RawConf),
lists:append(maps:fold(fun(Type, Liss, AccIn1) -> lists:append(maps:fold(fun(Type, Liss, AccIn1) ->
Listeners = Listeners =

View File

@ -47,6 +47,8 @@ unreg() ->
%% emqx_gateway_registry callbacks %% emqx_gateway_registry callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_grpc_server(_GwType, undefined) ->
undefined;
start_grpc_server(GwType, Options = #{bind := ListenOn}) -> start_grpc_server(GwType, Options = #{bind := ListenOn}) ->
Services = #{protos => [emqx_exproto_pb], Services = #{protos => [emqx_exproto_pb],
services => #{ services => #{
@ -60,6 +62,8 @@ start_grpc_server(GwType, Options = #{bind := ListenOn}) ->
_ = grpc:start_server(GwType, ListenOn, Services, SvrOptions), _ = grpc:start_server(GwType, ListenOn, Services, SvrOptions),
?ULOG("Start ~s gRPC server on ~p successfully.~n", [GwType, ListenOn]). ?ULOG("Start ~s gRPC server on ~p successfully.~n", [GwType, ListenOn]).
start_grpc_client_channel(_GwType, undefined) ->
undefined;
start_grpc_client_channel(GwType, Options = #{address := UriStr}) -> start_grpc_client_channel(GwType, Options = #{address := UriStr}) ->
UriMap = uri_string:parse(UriStr), UriMap = uri_string:parse(UriStr),
Scheme = maps:get(scheme, UriMap), Scheme = maps:get(scheme, UriMap),
@ -88,10 +92,10 @@ on_gateway_load(_Gateway = #{ type := GwType,
PoolSize = emqx_vm:schedulers() * 2, PoolSize = emqx_vm:schedulers() * 2,
{ok, _} = emqx_pool_sup:start_link(PoolName, hash, PoolSize, {ok, _} = emqx_pool_sup:start_link(PoolName, hash, PoolSize,
{emqx_exproto_gcli, start_link, []}), {emqx_exproto_gcli, start_link, []}),
_ = start_grpc_client_channel(GwType, maps:get(handler, RawConf)), _ = start_grpc_client_channel(GwType, maps:get(handler, RawConf, undefined)),
%% XXX: How to monitor it ? %% XXX: How to monitor it ?
_ = start_grpc_server(GwType, maps:get(server, RawConf)), _ = start_grpc_server(GwType, maps:get(server, RawConf, undefined)),
NRawConf = maps:without( NRawConf = maps:without(
[server, handler], [server, handler],

View File

@ -53,17 +53,17 @@ on_gateway_load(_Gateway = #{ type := GwType,
%% We Also need to start `emqx_sn_broadcast` & %% We Also need to start `emqx_sn_broadcast` &
%% `emqx_sn_registry` process %% `emqx_sn_registry` process
SnGwId = maps:get(gateway_id, RawConf), case maps:get(broadcast, RawConf, false) of
case maps:get(broadcast, RawConf) of
false -> false ->
ok; ok;
true -> true ->
%% FIXME: %% FIXME:
Port = 1884, Port = 1884,
SnGwId = maps:get(gateway_id, RawConf, undefined),
_ = emqx_sn_broadcast:start_link(SnGwId, Port), ok _ = emqx_sn_broadcast:start_link(SnGwId, Port), ok
end, end,
PredefTopics = maps:get(predefined, RawConf), PredefTopics = maps:get(predefined, RawConf, []),
{ok, RegistrySvr} = emqx_sn_registry:start_link(GwType, PredefTopics), {ok, RegistrySvr} = emqx_sn_registry:start_link(GwType, PredefTopics),
NRawConf = maps:without( NRawConf = maps:without(

View File

@ -55,19 +55,19 @@ metrics() ->
init_per_group(GrpName, Cfg) -> init_per_group(GrpName, Cfg) ->
put(grpname, GrpName), put(grpname, GrpName),
Svrs = emqx_exproto_echo_svr:start(), Svrs = emqx_exproto_echo_svr:start(),
emqx_ct_helpers:start_apps([emqx_gateway], fun set_special_cfg/1), emqx_ct_helpers:start_apps([emqx_authn, emqx_gateway], fun set_special_cfg/1),
emqx_logger:set_log_level(debug), emqx_logger:set_log_level(debug),
[{servers, Svrs}, {listener_type, GrpName} | Cfg]. [{servers, Svrs}, {listener_type, GrpName} | Cfg].
end_per_group(_, Cfg) -> end_per_group(_, Cfg) ->
emqx_ct_helpers:stop_apps([emqx_gateway]), emqx_ct_helpers:stop_apps([emqx_gateway, emqx_authn]),
emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)). emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)).
set_special_cfg(emqx_gateway) -> set_special_cfg(emqx_gateway) ->
LisType = get(grpname), LisType = get(grpname),
emqx_config:put( emqx_config:put(
[gateway, exproto], [gateway, exproto],
#{authenticator => allow_anonymous, #{authentication => #{enable => false},
server => #{bind => 9100}, server => #{bind => 9100},
handler => #{address => "http://127.0.0.1:9001"}, handler => #{address => "http://127.0.0.1:9001"},
listener => listener_confs(LisType) listener => listener_confs(LisType)
@ -77,7 +77,7 @@ set_special_cfg(_App) ->
listener_confs(Type) -> listener_confs(Type) ->
Default = #{bind => 7993, acceptors => 8}, Default = #{bind => 7993, acceptors => 8},
#{Type => maps:merge(Default, maps:from_list(socketopts(Type)))}. #{Type => #{'1' => maps:merge(Default, maps:from_list(socketopts(Type)))}}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Tests cases %% Tests cases

View File

@ -35,11 +35,11 @@ all() -> emqx_ct:all(?MODULE).
init_per_suite(Cfg) -> init_per_suite(Cfg) ->
ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_ct_helpers:start_apps([emqx_gateway]), emqx_ct_helpers:start_apps([emqx_authn, emqx_gateway]),
Cfg. Cfg.
end_per_suite(_Cfg) -> end_per_suite(_Cfg) ->
emqx_ct_helpers:stop_apps([emqx_gateway]), emqx_ct_helpers:stop_apps([emqx_authn, emqx_gateway]),
ok. ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -49,21 +49,15 @@ end_per_suite(_Cfg) ->
t_load_unload(_) -> t_load_unload(_) ->
OldCnt = length(emqx_gateway_registry:list()), OldCnt = length(emqx_gateway_registry:list()),
RgOpts = [{cbkmod, ?MODULE}], RgOpts = [{cbkmod, ?MODULE}],
GwOpts = [paramsin], ok = emqx_gateway_registry:reg(test, RgOpts),
ok = emqx_gateway_registry:load(test, RgOpts, GwOpts),
?assertEqual(OldCnt+1, length(emqx_gateway_registry:list())), ?assertEqual(OldCnt+1, length(emqx_gateway_registry:list())),
#{cbkmod := ?MODULE, #{cbkmod := ?MODULE,
rgopts := RgOpts, rgopts := RgOpts} = emqx_gateway_registry:lookup(test),
gwopts := GwOpts,
state := #{gwstate := 1}} = emqx_gateway_registry:lookup(test),
{error, already_existed} = emqx_gateway_registry:load(test, [{cbkmod, ?MODULE}], GwOpts), {error, already_existed} = emqx_gateway_registry:reg(test, [{cbkmod, ?MODULE}]),
ok = emqx_gateway_registry:unload(test), ok = emqx_gateway_registry:unreg(test),
undefined = emqx_gateway_registry:lookup(test), undefined = emqx_gateway_registry:lookup(test),
OldCnt = length(emqx_gateway_registry:list()), OldCnt = length(emqx_gateway_registry:list()),
ok. ok.
init([paramsin]) ->
{ok, _GwState = #{gwstate => 1}}.

View File

@ -134,12 +134,12 @@ groups() ->
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx]), emqx_ct_helpers:start_apps([emqx_authn]),
Config. Config.
end_per_suite(Config) -> end_per_suite(Config) ->
timer:sleep(300), timer:sleep(300),
emqx_ct_helpers:stop_apps([emqx]), emqx_ct_helpers:stop_apps([emqx_authn]),
Config. Config.
init_per_testcase(_AllTestCase, Config) -> init_per_testcase(_AllTestCase, Config) ->

View File

@ -84,11 +84,11 @@ all() ->
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_ct_helpers:start_apps([emqx_gateway]), emqx_ct_helpers:start_apps([emqx_authn, emqx_gateway]),
Config. Config.
end_per_suite(_) -> end_per_suite(_) ->
emqx_ct_helpers:stop_apps([emqx_gateway]). emqx_ct_helpers:stop_apps([emqx_gateway, emqx_authn]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases %% Test cases
@ -98,7 +98,7 @@ end_per_suite(_) ->
%% Connect %% Connect
t_connect(_) -> t_connect(_) ->
SockName = {'mqttsn#1:udp', 1884}, SockName = {'mqttsn:udp', 1884},
?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())), ?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())),
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),

View File

@ -45,11 +45,11 @@ all() -> emqx_ct:all(?MODULE).
init_per_suite(Cfg) -> init_per_suite(Cfg) ->
ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_ct_helpers:start_apps([emqx_gateway]), emqx_ct_helpers:start_apps([emqx_authn, emqx_gateway]),
Cfg. Cfg.
end_per_suite(_Cfg) -> end_per_suite(_Cfg) ->
emqx_ct_helpers:stop_apps([emqx_gateway]), emqx_ct_helpers:stop_apps([emqx_gateway, emqx_authn]),
ok. ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------