diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index 040cf4dd0..87193c6a3 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -122,7 +122,7 @@ gateway_insta(put, #{body := GwConf, with_gateway(Name0, fun(GwName, _) -> case emqx_gateway_conf:update_gateway(GwName, GwConf) of ok -> - {200}; + {204}; {error, Reason} -> return_http_error(500, Reason) end diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 53f3ad3ba..9a28e5e0d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -73,8 +73,8 @@ fields(stomp_frame) -> fields(mqttsn) -> [ {gateway_id, sc(integer())} - , {broadcast, sc(boolean())} - , {enable_qos3, sc(boolean())} + , {broadcast, sc(boolean(), false)} + , {enable_qos3, sc(boolean(), true)} , {predefined, hoconsc:array(ref(mqttsn_predefined))} , {listeners, sc(ref(udp_listeners))} ] ++ gateway_common_options(); @@ -98,6 +98,7 @@ fields(lwm2m) -> , {lifetime_min, sc(duration(), "1s")} , {lifetime_max, sc(duration(), "86400s")} , {qmode_time_window, sc(duration_s(), "22s")} + %% TODO: Support config resource path , {auto_observe, sc(boolean(), false)} , {update_msg_publish_condition, sc(hoconsc:union([always, contains_object_list]))} , {translators, sc_meta(ref(translators), #{nullable => false})} diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl index 13edc785b..7f2dbdd79 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl @@ -51,21 +51,21 @@ on_gateway_load(_Gateway = #{ name := GwName, config := Config }, Ctx) -> %% Xml registry - {ok, _} = emqx_lwm2m_xml_object_db:start_link(maps:get(xml_dir, Config)), + {ok, RegPid} = emqx_lwm2m_xml_object_db:start_link(maps:get(xml_dir, Config)), Listeners = emqx_gateway_utils:normalize_config(Config), ListenerPids = lists:map(fun(Lis) -> start_listener(GwName, Ctx, Lis) end, Listeners), - {ok, ListenerPids, _GwState = #{ctx => Ctx}}. + {ok, ListenerPids, _GwState = #{ctx => Ctx, registry => RegPid}}. -on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> - GwName = maps:get(name, NewGateway), +on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> + GwName = maps:get(name, Gateway), try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old instance ??? - on_gateway_unload(OldGateway, GwState), - on_gateway_load(NewGateway, Ctx) + on_gateway_unload(Gateway, GwState), + on_gateway_load(Gateway#{config => Config}, Ctx) catch Class : Reason : Stk -> logger:error("Failed to update ~ts; " @@ -76,7 +76,8 @@ on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> on_gateway_unload(_Gateway = #{ name := GwName, config := Config - }, _GwState) -> + }, _GwState = #{registry := RegPid}) -> + exit(RegPid, kill), Listeners = emqx_gateway_utils:normalize_config(Config), lists:foreach(fun(Lis) -> stop_listener(GwName, Lis) diff --git a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl new file mode 100644 index 000000000..9e00ef43a --- /dev/null +++ b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl @@ -0,0 +1,280 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-import(emqx_gateway_test_utils, + [ assert_confs/2 + , request/2 + , request/3 + ]). + +-include_lib("eunit/include/eunit.hrl"). + +%%-------------------------------------------------------------------- +%% Setup +%%-------------------------------------------------------------------- + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Conf) -> + %% FIXME: Magic line. for saving gateway schema name for emqx_config + emqx_config:init_load(emqx_gateway_schema, <<"gateway {}">>), + emqx_mgmt_api_test_util:init_suite([emqx_gateway]), + %% Start emqx-authn separately, due to emqx_authn_schema + %% not implementing the roots/0 method, it cannot be started with + %% emqx-ct-helpers at the moment. + application:ensure_all_started([emqx_authn]), + Conf. + +end_per_suite(Conf) -> + application:stop([emqx_authn]), + emqx_mgmt_api_test_util:end_suite([emqx_gateway]), + Conf. + +%%-------------------------------------------------------------------- +%% Cases +%%-------------------------------------------------------------------- + +t_gateway(_) -> + {200, Gateways}= request(get, "/gateway"), + lists:foreach(fun assert_gw_unloaded/1, Gateways), + {400, BadReq} = request(get, "/gateway/uname_gateway"), + assert_bad_request(BadReq), + {204, _} = request(post, "/gateway", #{name => <<"stomp">>}), + {200, StompGw1} = request(get, "/gateway/stomp"), + assert_feilds_apperence([name, status, enable, created_at, started_at], + StompGw1), + {204, _} = request(delete, "/gateway/stomp"), + {200, StompGw2} = request(get, "/gateway/stomp"), + assert_gw_unloaded(StompGw2), + ok. + +t_gateway_stomp(_) -> + {200, Gw} = request(get, "/gateway/stomp"), + assert_gw_unloaded(Gw), + %% post + GwConf = #{name => <<"stomp">>, + frame => #{max_headers => 5, + max_headers_length => 100, + max_body_length => 100 + }, + listeners => [ + #{name => <<"def">>, type => <<"tcp">>, bind => <<"61613">>} + ] + }, + {204, _} = request(post, "/gateway", GwConf), + {200, ConfResp} = request(get, "/gateway/stomp"), + assert_confs(GwConf, ConfResp), + %% put + GwConf2 = emqx_map_lib:deep_merge(GwConf, #{frame => #{max_headers => 10}}), + {204, _} = request(put, "/gateway/stomp", maps:without([name], GwConf2)), + {200, ConfResp2} = request(get, "/gateway/stomp"), + assert_confs(GwConf2, ConfResp2), + {204, _} = request(delete, "/gateway/stomp"). + +t_gateway_mqttsn(_) -> + {200, Gw} = request(get, "/gateway/mqttsn"), + assert_gw_unloaded(Gw), + %% post + GwConf = #{name => <<"mqttsn">>, + gateway_id => 1, + broadcast => true, + predefined => [#{id => 1, topic => <<"t/a">>}], + enable_qos3 => true, + listeners => [ + #{name => <<"def">>, type => <<"udp">>, bind => <<"1884">>} + ] + }, + {204, _} = request(post, "/gateway", GwConf), + {200, ConfResp} = request(get, "/gateway/mqttsn"), + assert_confs(GwConf, ConfResp), + %% put + GwConf2 = emqx_map_lib:deep_merge(GwConf, #{predefined => []}), + {204, _} = request(put, "/gateway/mqttsn", maps:without([name], GwConf2)), + {200, ConfResp2} = request(get, "/gateway/mqttsn"), + assert_confs(GwConf2, ConfResp2), + {204, _} = request(delete, "/gateway/mqttsn"). + +t_gateway_coap(_) -> + {200, Gw} = request(get, "/gateway/coap"), + assert_gw_unloaded(Gw), + %% post + GwConf = #{name => <<"coap">>, + heartbeat => <<"60s">>, + connection_required => true, + listeners => [ + #{name => <<"def">>, type => <<"udp">>, bind => <<"5683">>} + ] + }, + {204, _} = request(post, "/gateway", GwConf), + {200, ConfResp} = request(get, "/gateway/coap"), + assert_confs(GwConf, ConfResp), + %% put + GwConf2 = emqx_map_lib:deep_merge(GwConf, #{heartbeat => <<"10s">>}), + {204, _} = request(put, "/gateway/coap", maps:without([name], GwConf2)), + {200, ConfResp2} = request(get, "/gateway/coap"), + assert_confs(GwConf2, ConfResp2), + {204, _} = request(delete, "/gateway/coap"). + +t_gateway_lwm2m(_) -> + {200, Gw} = request(get, "/gateway/lwm2m"), + assert_gw_unloaded(Gw), + %% post + GwConf = #{name => <<"lwm2m">>, + xml_dir => <<"../../lib/emqx_gateway/src/lwm2m/lwm2m_xml">>, + lifetime_min => <<"1s">>, + lifetime_max => <<"1000s">>, + qmode_time_window => <<"30s">>, + auto_observe => true, + translators => #{ + command => #{ topic => <<"dn/#">>}, + response => #{ topic => <<"up/resp">>}, + notify => #{ topic => <<"up/resp">>}, + register => #{ topic => <<"up/resp">>}, + update => #{ topic => <<"up/resp">>} + }, + listeners => [ + #{name => <<"def">>, type => <<"udp">>, bind => <<"5783">>} + ] + }, + {204, _} = request(post, "/gateway", GwConf), + {200, ConfResp} = request(get, "/gateway/lwm2m"), + assert_confs(GwConf, ConfResp), + %% put + GwConf2 = emqx_map_lib:deep_merge(GwConf, #{qmode_time_window => <<"10s">>}), + {204, _} = request(put, "/gateway/lwm2m", maps:without([name], GwConf2)), + {200, ConfResp2} = request(get, "/gateway/lwm2m"), + assert_confs(GwConf2, ConfResp2), + {204, _} = request(delete, "/gateway/lwm2m"). + +t_gateway_exproto(_) -> + {200, Gw} = request(get, "/gateway/exproto"), + assert_gw_unloaded(Gw), + %% post + GwConf = #{name => <<"exproto">>, + server => #{bind => <<"9100">>}, + handler => #{address => <<"127.0.0.1:9001">>}, + listeners => [ + #{name => <<"def">>, type => <<"tcp">>, bind => <<"7993">>} + ] + }, + {204, _} = request(post, "/gateway", GwConf), + {200, ConfResp} = request(get, "/gateway/exproto"), + assert_confs(GwConf, ConfResp), + %% put + GwConf2 = emqx_map_lib:deep_merge(GwConf, #{server => #{bind => <<"9200">>}}), + {204, _} = request(put, "/gateway/exproto", maps:without([name], GwConf2)), + {200, ConfResp2} = request(get, "/gateway/exproto"), + assert_confs(GwConf2, ConfResp2), + {204, _} = request(delete, "/gateway/exproto"). + +t_authn(_) -> + GwConf = #{name => <<"stomp">>}, + {204, _} = request(post, "/gateway", GwConf), + {404, _} = request(get, "/gateway/stomp/authentication"), + + AuthConf = #{mechanism => <<"password-based">>, + backend => <<"built-in-database">>, + user_id_type => <<"clientid">> + }, + {204, _} = request(post, "/gateway/stomp/authentication", AuthConf), + {200, ConfResp} = request(get, "/gateway/stomp/authentication"), + assert_confs(AuthConf, ConfResp), + + AuthConf2 = maps:merge(AuthConf, #{user_id_type => <<"username">>}), + {204, _} = request(put, "/gateway/stomp/authentication", AuthConf2), + + {200, ConfResp2} = request(get, "/gateway/stomp/authentication"), + assert_confs(AuthConf2, ConfResp2), + + {204, _} = request(delete, "/gateway/stomp/authentication"), + {404, _} = request(get, "/gateway/stomp/authentication"), + {204, _} = request(delete, "/gateway/stomp"). + +t_listeners(_) -> + GwConf = #{name => <<"stomp">>}, + {204, _} = request(post, "/gateway", GwConf), + {404, _} = request(get, "/gateway/stomp/listeners"), + LisConf = #{name => <<"def">>, + type => <<"tcp">>, + bind => <<"61613">> + }, + {204, _} = request(post, "/gateway/stomp/listeners", LisConf), + {200, ConfResp} = request(get, "/gateway/stomp/listeners"), + assert_confs([LisConf], ConfResp), + {200, ConfResp1} = request(get, "/gateway/stomp/listeners/stomp:tcp:def"), + assert_confs(LisConf, ConfResp1), + + LisConf2 = maps:merge(LisConf, #{bind => <<"61614">>}), + {204, _} = request( + put, + "/gateway/stomp/listeners/stomp:tcp:def", + LisConf2 + ), + + {200, ConfResp2} = request(get, "/gateway/stomp/listeners/stomp:tcp:def"), + assert_confs(LisConf2, ConfResp2), + + {204, _} = request(delete, "/gateway/stomp/listeners/stomp:tcp:def"), + {404, _} = request(get, "/gateway/stomp/listeners/stomp:tcp:def"), + {204, _} = request(delete, "/gateway/stomp"). + +t_listeners_authn(_) -> + GwConf = #{name => <<"stomp">>, + listeners => [ + #{name => <<"def">>, + type => <<"tcp">>, + bind => <<"61613">> + }]}, + {204, _} = request(post, "/gateway", GwConf), + {200, ConfResp} = request(get, "/gateway/stomp"), + assert_confs(GwConf, ConfResp), + + AuthConf = #{mechanism => <<"password-based">>, + backend => <<"built-in-database">>, + user_id_type => <<"clientid">> + }, + Path = "/gateway/stomp/listeners/stomp:tcp:def/authentication", + {204, _} = request(post, Path, AuthConf), + {200, ConfResp2} = request(get, Path), + assert_confs(AuthConf, ConfResp2), + + AuthConf2 = maps:merge(AuthConf, #{user_id_type => <<"username">>}), + {204, _} = request(put, Path, AuthConf2), + + {200, ConfResp3} = request(get, Path), + assert_confs(AuthConf2, ConfResp3), + {204, _} = request(delete, "/gateway/stomp"). + +%%-------------------------------------------------------------------- +%% Asserts + +assert_gw_unloaded(Gateway) -> + ?assertEqual(<<"unloaded">>, maps:get(status, Gateway)). + +assert_bad_request(BadReq) -> + ?assertEqual(<<"BAD_REQUEST">>, maps:get(code, BadReq)). + +assert_feilds_apperence(Ks, Map) -> + lists:foreach(fun(K) -> + _ = maps:get(K, Map) + end, Ks). + + diff --git a/apps/emqx_gateway/test/emqx_gateway_conf_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_conf_SUITE.erl index c752150ac..6b6164f9a 100644 --- a/apps/emqx_gateway/test/emqx_gateway_conf_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_conf_SUITE.erl @@ -19,6 +19,11 @@ -compile(export_all). -compile(nowarn_export_all). +-import(emqx_gateway_test_utils, + [ assert_confs/2 + , maybe_unconvert_listeners/1 + ]). + -include_lib("eunit/include/eunit.hrl"). %%-------------------------------------------------------------------- @@ -228,33 +233,3 @@ compose_listener_authn(Basic, Listener, Authn) -> listener(L) -> #{<<"listeners">> => [L#{<<"type">> => <<"tcp">>, <<"name">> => <<"default">>}]}. - -assert_confs(Expected0, Effected) -> - Expected = maybe_unconvert_listeners(Expected0), - case do_assert_confs(Expected, Effected) of - false -> - io:format(standard_error, "Expected config: ~p,\n" - "Effected config: ~p", - [Expected, Effected]), - exit(conf_not_match); - true -> - ok - end. - -do_assert_confs(Expected, Effected) when is_map(Expected), - is_map(Effected) -> - Ks1 = maps:keys(Expected), - lists:all(fun(K) -> - do_assert_confs(maps:get(K, Expected), - maps:get(K, Effected, undefined)) - end, Ks1); -do_assert_confs(Expected, Effected) -> - Expected =:= Effected. - -maybe_unconvert_listeners(Conf) -> - case maps:take(<<"listeners">>, Conf) of - error -> Conf; - {Ls, Conf1} -> - Conf1#{<<"listeners">> => - emqx_gateway_conf:unconvert_listeners(Ls)} - end. diff --git a/apps/emqx_gateway/test/emqx_gateway_test_utils.erl b/apps/emqx_gateway/test/emqx_gateway_test_utils.erl new file mode 100644 index 000000000..3dafab38f --- /dev/null +++ b/apps/emqx_gateway/test/emqx_gateway_test_utils.erl @@ -0,0 +1,107 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_test_utils). + +-compile(export_all). +-compile(nowarn_export_all). + +assert_confs(Expected0, Effected) -> + Expected = maybe_unconvert_listeners(Expected0), + case do_assert_confs(Expected, Effected) of + false -> + io:format(standard_error, "Expected config: ~p,\n" + "Effected config: ~p", + [Expected, Effected]), + exit(conf_not_match); + true -> + ok + end. + +do_assert_confs(Expected, Effected) when is_map(Expected), + is_map(Effected) -> + Ks1 = maps:keys(Expected), + lists:all(fun(K) -> + do_assert_confs(maps:get(K, Expected), + maps:get(K, Effected, undefined)) + end, Ks1); + +do_assert_confs([Expected|More1], [Effected|More2]) -> + do_assert_confs(Expected, Effected) andalso do_assert_confs(More1, More2); +do_assert_confs([], []) -> + true; +do_assert_confs(Expected, Effected) -> + Res = Expected =:= Effected, + Res == false andalso + ct:pal("Errors: conf not match, " + "expected: ~p, got: ~p~n", [Expected, Effected]), + Res. + +maybe_unconvert_listeners(Conf) when is_map(Conf) -> + case maps:take(<<"listeners">>, Conf) of + error -> Conf; + {Ls, Conf1} -> + Conf1#{<<"listeners">> => + emqx_gateway_conf:unconvert_listeners(Ls)} + end; +maybe_unconvert_listeners(Conf) -> + Conf. + +%%-------------------------------------------------------------------- +%% http + +-define(http_api_host, "http://127.0.0.1:18083/api/v5"). +-define(default_user, "admin"). +-define(default_pass, "public"). + +request(delete = Mth, Path) -> + do_request(Mth, req(Path, [])); +request(get = Mth, Path) -> + do_request(Mth, req(Path, [])). + +request(get = Mth, Path, Qs) -> + do_request(Mth, req(Path, Qs)); +request(put = Mth, Path, Body) -> + do_request(Mth, req(Path, [], Body)); +request(post = Mth, Path, Body) -> + do_request(Mth, req(Path, [], Body)). + +do_request(Mth, Req) -> + case httpc:request(Mth, Req, [], [{body_format, binary}]) of + {ok, {{_Vsn, Code, _Text}, _, Resp}} -> + NResp = case Resp of + <<>> -> #{}; + _ -> + emqx_map_lib:unsafe_atom_key_map( + emqx_json:decode(Resp, [return_maps])) + end, + {Code, NResp}; + {error, Reason} -> + error({failed_to_request, Reason}) + end. + +req(Path, Qs) -> + {url(Path, Qs), auth([])}. + +req(Path, Qs, Body) -> + {url(Path, Qs), auth([]), "application/json", emqx_json:encode(Body)}. + +url(Path, Qs) -> + lists:concat([?http_api_host, Path, "?", binary_to_list(cow_qs:qs(Qs))]). + +auth(Headers) -> + Token = base64:encode(?default_user ++ ":" ++ ?default_pass), + [{"Authorization", "Basic " ++ binary_to_list(Token)}] ++ Headers.