diff --git a/apps/emqx_gateway/etc/emqx_gateway.conf b/apps/emqx_gateway/etc/emqx_gateway.conf index ba1e8168b..591f2523d 100644 --- a/apps/emqx_gateway/etc/emqx_gateway.conf +++ b/apps/emqx_gateway/etc/emqx_gateway.conf @@ -5,6 +5,7 @@ ## TODO: emqx_gateway: { + stomp.1: { frame: { max_headers: 10 diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl index 3085afe89..1e5c5d8cd 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -68,7 +68,7 @@ on_insta_create(_Insta = #{ id := InstaId, true -> %% FIXME: Port = 1884, - _ = emqx_sn_broadcast:start_link(SnGwId, Port) + _ = emqx_sn_broadcast:start_link(SnGwId, Port), ok end, PredefTopics = maps:get(predefined, RawConf), diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index 30583c443..0fac56ae0 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -201,8 +201,13 @@ handle_call({register, ClientId, TopicName}, _From, end; handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) -> - Registry = mnesia:dirty_match_object({Tab, {ClientId, '_'}, '_'}), - lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(Tab, R) end, Registry), + Registry = mnesia:dirty_match_object( + Tab, + {emqx_sn_registry, {ClientId, '_'}, '_'} + ), + lists:foreach(fun(R) -> + ekka_mnesia:dirty_delete_object(Tab, R) + end, Registry), {reply, ok, State}; handle_call(name, _From, State = #state{tabname = Tab}) -> diff --git a/apps/emqx_gateway/test/emqx_sn_frame_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_frame_SUITE.erl index 85042a4be..8b68a6145 100644 --- a/apps/emqx_gateway/test/emqx_sn_frame_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_frame_SUITE.erl @@ -19,7 +19,7 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("emqx_sn/include/emqx_sn.hrl"). +-include_lib("emqx_gateway/src/mqttsn/include/emqx_sn.hrl"). -include_lib("eunit/include/eunit.hrl"). -import(emqx_sn_frame, [ parse/1 diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index 0947bdaca..a63e248ec 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -19,7 +19,7 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("emqx_sn/include/emqx_sn.hrl"). +-include_lib("emqx_gateway/src/mqttsn/include/emqx_sn.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -59,23 +59,41 @@ all() -> init_per_suite(Config) -> logger:set_module_level(emqx_sn_gateway, debug), - emqx_ct_helpers:start_apps([emqx_sn], fun set_special_confs/1), + emqx_ct_helpers:start_apps([emqx_gateway], fun set_special_confs/1), Config. end_per_suite(_) -> - emqx_ct_helpers:stop_apps([emqx_sn]). + emqx_ct_helpers:stop_apps([emqx_gateway]). set_special_confs(emqx) -> application:set_env(emqx, plugins_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")); -set_special_confs(emqx_sn) -> - application:set_env(emqx_sn, enable_qos3, ?ENABLE_QOS3), - application:set_env(emqx_sn, enable_stats, true), - application:set_env(emqx_sn, username, <<"user1">>), - application:set_env(emqx_sn, password, <<"pw123">>), - application:set_env(emqx_sn, predefined, - [{?PREDEF_TOPIC_ID1, ?PREDEF_TOPIC_NAME1}, - {?PREDEF_TOPIC_ID2, ?PREDEF_TOPIC_NAME2}]); +set_special_confs(emqx_gateway) -> + emqx_config:put( + [emqx_gateway], + #{ mqttsn => + #{'1' => + #{broadcast => true, + clientinfo_override => + #{password => "pw123", + username => "user1" + }, + enable_qos3 => true, + enable_stats => true, + gateway_id => 1, + idle_timeout => 30000, + listener => + #{udp => + #{'1' => + #{acceptors => 8,active_n => 100,backlog => 1024,bind => 1884, + high_watermark => 1048576,max_conn_rate => 1000, + max_connections => 10240000,send_timeout => 15000, + send_timeout_close => true}}}, + predefined => + [#{id => ?PREDEF_TOPIC_ID1, topic => ?PREDEF_TOPIC_NAME1}, + #{id => ?PREDEF_TOPIC_ID2, topic => ?PREDEF_TOPIC_NAME2}]}} + }); + set_special_confs(_App) -> ok. @@ -87,7 +105,7 @@ set_special_confs(_App) -> %% Connect t_connect(_) -> - SockName = {'mqttsn:udp', {{0,0,0,0}, 1884}}, + SockName = {'mqttsn#1:udp', 1884}, ?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())), {ok, Socket} = gen_udp:open(0, [binary]), @@ -170,7 +188,6 @@ t_subscribe_case02(_) -> ReturnCode = 0, {ok, Socket} = gen_udp:open(0, [binary]), - ClientId = ?CLIENTID, send_connect_msg(Socket, ?CLIENTID), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), diff --git a/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl index 58a458ecc..6161687f2 100644 --- a/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_registry_SUITE.erl @@ -23,8 +23,10 @@ -define(REGISTRY, emqx_sn_registry). -define(MAX_PREDEF_ID, 2). --define(PREDEF_TOPICS, [{1, <<"/predefined/topic/name/hello">>}, - {2, <<"/predefined/topic/name/nice">>}]). +-define(PREDEF_TOPICS, [#{id => 1, topic => <<"/predefined/topic/name/hello">>}, + #{id => 2, topic => <<"/predefined/topic/name/nice">>}]). + +-define(INSTA_ID, 'mqttsn#1'). %%-------------------------------------------------------------------- %% Setups @@ -34,88 +36,92 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - _ = application:set_env(emqx_sn, predefined, ?PREDEF_TOPICS), + application:ensure_all_started(ekka), + ekka_mnesia:start(), Config. end_per_suite(_Config) -> + application:stop(ekka), ok. init_per_testcase(_TestCase, Config) -> - application:set_env(ekka, strict_mode, true), - ekka_mnesia:start(), - emqx_sn_registry:mnesia(boot), - ekka_mnesia:clear_table(emqx_sn_registry), - PredefTopics = application:get_env(emqx_sn, predefined, []), - {ok, _Pid} = ?REGISTRY:start_link(PredefTopics), - Config. + {ok, Pid} = ?REGISTRY:start_link(?INSTA_ID, ?PREDEF_TOPICS), + {Tab, Pid} = ?REGISTRY:lookup_name(Pid), + [{reg, {Tab, Pid}} | Config]. end_per_testcase(_TestCase, Config) -> - ?REGISTRY:stop(), + {Tab, _Pid} = proplists:get_value(reg, Config), + ekka_mnesia:clear_table(Tab), Config. %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- -t_register(_Config) -> - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic2">>)), - ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)), - emqx_sn_registry:unregister_topic(<<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)). +t_register(Config) -> + Reg = proplists:get_value(reg, Config), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic2">>)), + ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)), + emqx_sn_registry:unregister_topic(Reg, <<"ClientId">>), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)). -t_register_case2(_Config) -> - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic2">>)), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"Topic1">>)), - ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic3">>)), - ?REGISTRY:unregister_topic(<<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic1">>)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, <<"Topic2">>)). +t_register_case2(Config) -> + Reg = proplists:get_value(reg, Config), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic2">>)), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), + ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic3">>)), + ?REGISTRY:unregister_topic(Reg, <<"ClientId">>), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)). -t_reach_maximum(_Config) -> - register_a_lot(?MAX_PREDEF_ID+1, 16#ffff), - ?assertEqual({error, too_large}, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicABC">>)), +t_reach_maximum(Config) -> + Reg = proplists:get_value(reg, Config), + register_a_lot(?MAX_PREDEF_ID+1, 16#ffff, Reg), + ?assertEqual({error, too_large}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicABC">>)), Topic1 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID+1])), Topic2 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID+2])), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic1)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic2)), - ?REGISTRY:unregister_topic(<<"ClientId">>), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic(<<"ClientId">>, ?MAX_PREDEF_ID+2)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic1)), - ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"ClientId">>, Topic2)). + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic1)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic2)), + ?REGISTRY:unregister_topic(Reg, <<"ClientId">>), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID+2)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic1)), + ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic2)). -t_register_case4(_Config) -> - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicA">>)), - ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicB">>)), - ?assertEqual(?MAX_PREDEF_ID+3, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicC">>)), - ?REGISTRY:unregister_topic(<<"ClientId">>), - ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(<<"ClientId">>, <<"TopicD">>)). +t_register_case4(Config) -> + Reg = proplists:get_value(reg, Config), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicA">>)), + ?assertEqual(?MAX_PREDEF_ID+2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicB">>)), + ?assertEqual(?MAX_PREDEF_ID+3, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicC">>)), + ?REGISTRY:unregister_topic(Reg, <<"ClientId">>), + ?assertEqual(?MAX_PREDEF_ID+1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicD">>)). -t_deny_wildcard_topic(_Config) -> - ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/TopicA/#">>)), - ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/+/TopicB">>)). +t_deny_wildcard_topic(Config) -> + Reg = proplists:get_value(reg, Config), + ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"/TopicA/#">>)), + ?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"/+/TopicB">>)). %%-------------------------------------------------------------------- %% Helper funcs %%-------------------------------------------------------------------- -register_a_lot(Max, Max) -> +register_a_lot(Max, Max, _Reg) -> ok; -register_a_lot(N, Max) when N < Max -> +register_a_lot(N, Max, Reg) when N < Max -> Topic = iolist_to_binary(["Topic", integer_to_list(N)]), - ?assertEqual(N, ?REGISTRY:register_topic(<<"ClientId">>, Topic)), - register_a_lot(N+1, Max). + ?assertEqual(N, ?REGISTRY:register_topic(Reg, <<"ClientId">>, Topic)), + register_a_lot(N+1, Max, Reg). diff --git a/apps/emqx_gateway/test/props/emqx_sn_proper_types.erl b/apps/emqx_gateway/test/props/emqx_sn_proper_types.erl index 8d4dae357..96318788d 100644 --- a/apps/emqx_gateway/test/props/emqx_sn_proper_types.erl +++ b/apps/emqx_gateway/test/props/emqx_sn_proper_types.erl @@ -16,7 +16,7 @@ -module(emqx_sn_proper_types). --include_lib("emqx_sn/include/emqx_sn.hrl"). +-include_lib("emqx_gateway/src/mqttsn/include/emqx_sn.hrl"). -include_lib("proper/include/proper.hrl"). -compile({no_auto_import, [register/1]}). diff --git a/apps/emqx_gateway/test/props/prop_emqx_sn_frame.erl b/apps/emqx_gateway/test/props/prop_emqx_sn_frame.erl index 0135ebac7..645d24bed 100644 --- a/apps/emqx_gateway/test/props/prop_emqx_sn_frame.erl +++ b/apps/emqx_gateway/test/props/prop_emqx_sn_frame.erl @@ -16,7 +16,7 @@ -module(prop_emqx_sn_frame). --include_lib("emqx_sn/include/emqx_sn.hrl"). +-include_lib("src/mqttsn/include/emqx_sn.hrl"). -include_lib("proper/include/proper.hrl"). -compile({no_auto_import, [register/1]}).