From 79a653e2b417ef991736cfd47984e1461fad883b Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 5 Jan 2022 16:32:16 +0800 Subject: [PATCH 01/11] refactor(gw): more readable CLI print --- apps/emqx_gateway/include/emqx_gateway.hrl | 1 + apps/emqx_gateway/src/emqx_gateway_cli.erl | 97 ++++++++++----- apps/emqx_gateway/src/emqx_gateway_conf.erl | 1 + apps/emqx_gateway/src/emqx_gateway_http.erl | 111 ++++++++++-------- .../test/emqx_gateway_cli_SUITE.erl | 74 +++++++++++- 5 files changed, 199 insertions(+), 85 deletions(-) diff --git a/apps/emqx_gateway/include/emqx_gateway.hrl b/apps/emqx_gateway/include/emqx_gateway.hrl index 8a89d237c..b6b176429 100644 --- a/apps/emqx_gateway/include/emqx_gateway.hrl +++ b/apps/emqx_gateway/include/emqx_gateway.hrl @@ -22,6 +22,7 @@ %% @doc The Gateway defination -type gateway() :: #{ name := gateway_name() + %% Description , descr => binary() | undefined %% Appears only in getting gateway info , status => stopped | running | unloaded diff --git a/apps/emqx_gateway/src/emqx_gateway_cli.erl b/apps/emqx_gateway/src/emqx_gateway_cli.erl index f8f4c5821..c9ed51d92 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cli.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cli.erl @@ -53,23 +53,16 @@ is_cmd(Fun) -> gateway(["list"]) -> lists:foreach( - fun (#{name := Name, status := unloaded}) -> - print("Gateway(name=~ts, status=unloaded)\n", [Name]); - (#{name := Name, status := stopped, stopped_at := StoppedAt}) -> - print("Gateway(name=~ts, status=stopped, stopped_at=~ts)\n", - [Name, StoppedAt]); - (#{name := Name, status := running, current_connections := ConnCnt, - started_at := StartedAt}) -> - print("Gateway(name=~ts, status=running, clients=~w, started_at=~ts)\n", - [Name, ConnCnt, StartedAt]) + fun (GwSummary) -> + print(format_gw_summary(GwSummary)) end, emqx_gateway_http:gateways(all)); gateway(["lookup", Name]) -> case emqx_gateway:lookup(atom(Name)) of undefined -> print("undefined\n"); - Info -> - print("~p\n", [Info]) + Gateway -> + print(format_gateway(Gateway)) end; gateway(["load", Name, Conf]) -> @@ -80,7 +73,7 @@ gateway(["load", Name, Conf]) -> {ok, _} -> print("ok\n"); {error, Reason} -> - print("Error: ~p\n", [Reason]) + print("Error: ~ts\n", [format_error(Reason)]) end; gateway(["unload", Name]) -> @@ -88,7 +81,7 @@ gateway(["unload", Name]) -> ok -> print("ok\n"); {error, Reason} -> - print("Error: ~p\n", [Reason]) + print("Error: ~ts\n", [format_error(Reason)]) end; gateway(["stop", Name]) -> @@ -99,7 +92,7 @@ gateway(["stop", Name]) -> {ok, _} -> print("ok\n"); {error, Reason} -> - print("Error: ~p\n", [Reason]) + print("Error: ~ts\n", [format_error(Reason)]) end; gateway(["start", Name]) -> @@ -110,23 +103,24 @@ gateway(["start", Name]) -> {ok, _} -> print("ok\n"); {error, Reason} -> - print("Error: ~p\n", [Reason]) + print("Error: ~ts\n", [format_error(Reason)]) end; gateway(_) -> - emqx_ctl:usage([ {"gateway list", - "List all gateway"} - , {"gateway lookup ", - "Lookup a gateway detailed informations"} - , {"gateway load ", - "Load a gateway with config"} - , {"gateway unload ", - "Unload the gateway"} - , {"gateway stop ", - "Stop the gateway"} - , {"gateway start ", - "Start the gateway"} - ]). + emqx_ctl:usage( + [ {"gateway list", + "List all gateway"} + , {"gateway lookup ", + "Lookup a gateway detailed informations"} + , {"gateway load ", + "Load a gateway with config"} + , {"gateway unload ", + "Unload the gateway"} + , {"gateway stop ", + "Stop the gateway"} + , {"gateway start ", + "Start the gateway"} + ]). 'gateway-registry'(["list"]) -> lists:foreach( @@ -255,3 +249,50 @@ format(peername, {IPAddr, Port}) -> format(_, Val) -> Val. + +format_gw_summary(#{name := Name, status := unloaded}) -> + io_lib:format("Gateway(name=~ts, status=unloaded)\n", [Name]); + +format_gw_summary(#{name := Name, status := stopped, + stopped_at := StoppedAt}) -> + io_lib:format("Gateway(name=~ts, status=stopped, stopped_at=~ts)\n", + [Name, StoppedAt]); +format_gw_summary(#{name := Name, status := running, + current_connections := ConnCnt, + started_at := StartedAt}) -> + io_lib:format("Gateway(name=~ts, status=running, clients=~w, " + "started_at=~ts)\n", [Name, ConnCnt, StartedAt]). + +format_gateway(#{name := Name, + status := unloaded}) -> + io_lib:format( + "name: ~ts\n" + "status: unloaded\n", [Name]); + +format_gateway(Gw = + #{name := Name, + status := Status, + created_at := CreatedAt, + config := Config + }) -> + {StopOrStart, Timestamp} = + case Status of + stopped -> {stopped_at, maps:get(stopped_at, Gw)}; + running -> {started_at, maps:get(started_at, Gw)} + end, + io_lib:format( + "name: ~ts\n" + "status: ~ts\n" + "created_at: ~ts\n" + "~ts: ~ts\n" + "config: ~p\n", + [Name, Status, + emqx_gateway_utils:unix_ts_to_rfc3339(CreatedAt), + StopOrStart, emqx_gateway_utils:unix_ts_to_rfc3339(Timestamp), + Config]). + +format_error(Reason) -> + case emqx_gateway_http:reason2msg(Reason) of + error -> io_lib:format("~p", [Reason]); + Msg -> Msg + end. diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index 0b7b3f099..52aa204a3 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -93,6 +93,7 @@ load_gateway(GwName, Conf) -> %% @doc convert listener array to map unconvert_listeners(Ls) when is_list(Ls) -> lists:foldl(fun(Lis, Acc) -> + %% FIXME: params apperence guard? {[Type, Name], Lis1} = maps_key_take([<<"type">>, <<"name">>], Lis), NLis1 = maps:without([<<"id">>], Lis1), emqx_map_lib:deep_merge(Acc, #{Type => #{Name => NLis1}}) diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 1560a2126..7a1ac519d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -62,12 +62,15 @@ , with_listener_authn/3 , checks/2 , reason2resp/1 + , reason2msg/1 ]). -type gateway_summary() :: #{ name := binary() , status := running | stopped | unloaded + , created_at => binary() , started_at => binary() + , stopped_at => binary() , max_connections => integer() , current_connections => integer() , listeners => [] @@ -317,57 +320,13 @@ with_channel(GwName, ClientId, Fun) -> %%-------------------------------------------------------------------- -spec reason2resp({atom(), map()} | any()) -> binary() | any(). -reason2resp({badconf, #{key := Key, value := Value, reason := Reason}}) -> - fmt400err("Bad config value '~s' for '~s', reason: ~s", - [Value, Key, Reason]); -reason2resp({badres, #{resource := gateway, - gateway := GwName, - reason := not_found}}) -> - fmt400err("The ~s gateway is unloaded", [GwName]); - -reason2resp({badres, #{resource := gateway, - gateway := GwName, - reason := already_exist}}) -> - fmt400err("The ~s gateway has loaded", [GwName]); - -reason2resp({badres, #{resource := listener, - listener := {GwName, LType, LName}, - reason := not_found}}) -> - fmt400err("Listener ~s not found", - [listener_id(GwName, LType, LName)]); - -reason2resp({badres, #{resource := listener, - listener := {GwName, LType, LName}, - reason := already_exist}}) -> - fmt400err("The listener ~s of ~s already exist", - [listener_id(GwName, LType, LName), GwName]); - -reason2resp({badres, #{resource := authn, - gateway := GwName, - reason := not_found}}) -> - fmt400err("The authentication not found on ~s", [GwName]); - -reason2resp({badres, #{resource := authn, - gateway := GwName, - reason := already_exist}}) -> - fmt400err("The authentication already exist on ~s", [GwName]); - -reason2resp({badres, #{resource := listener_authn, - listener := {GwName, LType, LName}, - reason := not_found}}) -> - fmt400err("The authentication not found on ~s", - [listener_id(GwName, LType, LName)]); - -reason2resp({badres, #{resource := listener_authn, - listener := {GwName, LType, LName}, - reason := already_exist}}) -> - fmt400err("The authentication already exist on ~s", - [listener_id(GwName, LType, LName)]); - -reason2resp(R) -> return_http_error(500, R). - -fmt400err(Fmt, Args) -> - return_http_error(400, io_lib:format(Fmt, Args)). +reason2resp(R) -> + case reason2msg(R) of + error -> + return_http_error(500, R); + Msg -> + return_http_error(400, Msg) + end. -spec return_http_error(integer(), any()) -> {integer(), binary()}. return_http_error(Code, Msg) -> @@ -377,6 +336,56 @@ return_http_error(Code, Msg) -> }) }. +-spec reason2msg({atom(), map()} | any()) -> error | io_lib:chars(). +reason2msg({badconf, #{key := Key, value := Value, reason := Reason}}) -> + io_lib:format("Bad config value '~s' for '~s', reason: ~s", + [Value, Key, Reason]); +reason2msg({badres, #{resource := gateway, + gateway := GwName, + reason := not_found}}) -> + io_lib:format("The ~s gateway is unloaded", [GwName]); + +reason2msg({badres, #{resource := gateway, + gateway := GwName, + reason := already_exist}}) -> + io_lib:format("The ~s gateway already loaded", [GwName]); + +reason2msg({badres, #{resource := listener, + listener := {GwName, LType, LName}, + reason := not_found}}) -> + io_lib:format("Listener ~s not found", + [listener_id(GwName, LType, LName)]); + +reason2msg({badres, #{resource := listener, + listener := {GwName, LType, LName}, + reason := already_exist}}) -> + io_lib:format("The listener ~s of ~s already exist", + [listener_id(GwName, LType, LName), GwName]); + +reason2msg({badres, #{resource := authn, + gateway := GwName, + reason := not_found}}) -> + io_lib:format("The authentication not found on ~s", [GwName]); + +reason2msg({badres, #{resource := authn, + gateway := GwName, + reason := already_exist}}) -> + io_lib:format("The authentication already exist on ~s", [GwName]); + +reason2msg({badres, #{resource := listener_authn, + listener := {GwName, LType, LName}, + reason := not_found}}) -> + io_lib:format("The authentication not found on ~s", + [listener_id(GwName, LType, LName)]); + +reason2msg({badres, #{resource := listener_authn, + listener := {GwName, LType, LName}, + reason := already_exist}}) -> + io_lib:format("The authentication already exist on ~s", + [listener_id(GwName, LType, LName)]); +reason2msg(_) -> + error. + codestr(400) -> 'BAD_REQUEST'; codestr(401) -> 'NOT_SUPPORTED_NOW'; codestr(404) -> 'RESOURCE_NOT_FOUND'; diff --git a/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl index cf27dc027..618908f92 100644 --- a/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl @@ -29,6 +29,23 @@ gateway {} ">>). +%% The config with json format for mqtt-sn gateway +-define(CONF_MQTTSN, " +{\"idle_timeout\": \"30s\", + \"enable_stats\": true, + \"mountpoint\": \"mqttsn/\", + \"gateway_id\": 1, + \"broadcast\": true, + \"enable_qos3\": true, + \"predefined\": [{\"id\": 1001, \"topic\": \"pred/a\"}], + \"listeners\": + [{\"type\": \"udp\", + \"name\": \"ct\", + \"bind\": \"2885\" + }] +} +"). + %%-------------------------------------------------------------------- %% Setup %%-------------------------------------------------------------------- @@ -109,16 +126,61 @@ t_gateway_list(_) -> "Gateway(name=stomp, status=unloaded)\n" , acc_print()). -t_gateway_load(_) -> +t_gateway_load_unload_lookup(_) -> + emqx_gateway_cli:gateway(["lookup", "mqttsn"]), + ?assertEqual("undefined\n", acc_print()), + + emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]), + ?assertEqual("ok\n", acc_print()), + + %% TODO: bad config name, format??? + + emqx_gateway_cli:gateway(["lookup", "mqttsn"]), + %% TODO: assert it. for example: + %% name: mqttsn + %% status: running + %% created_at: 2022-01-05T14:40:20.039+08:00 + %% started_at: 2022-01-05T14:42:37.894+08:00 + %% config: #{broadcast => false,enable => true,enable_qos3 => true, + %% enable_stats => true,gateway_id => 1,idle_timeout => 30000, + %% mountpoint => <<>>,predefined => []} + _ = acc_print(), + + emqx_gateway_cli:gateway(["load", "mqttsn", "{}"]), + ?assertEqual( + "Error: The mqttsn gateway already loaded\n" + , acc_print()), + + emqx_gateway_cli:gateway(["load", "bad-gw-name", "{}"]), + %% TODO: assert it. for example: + %% Error: Illegal gateway name + _ = acc_print(), + + emqx_gateway_cli:gateway(["unload", "mqttsn"]), + ?assertEqual("ok\n", acc_print()), + %% Always return ok, even the gateway has unloaded + emqx_gateway_cli:gateway(["unload", "mqttsn"]), + ?assertEqual("ok\n", acc_print()), + + emqx_gateway_cli:gateway(["lookup", "mqttsn"]), + ?assertEqual("undefined\n", acc_print()), ok. -t_gateway_unload(_) -> - ok. +t_gateway_start_stop(_) -> + emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]), + ?assertEqual("ok\n", acc_print()), -t_gateway_start(_) -> - ok. + emqx_gateway_cli:gateway(["stop", "mqttsn"]), + ?assertEqual("ok\n", acc_print()), + %% dupliacted stop gateway, return ok + emqx_gateway_cli:gateway(["stop", "mqttsn"]), + ?assertEqual("ok\n", acc_print()), -t_gateway_stop(_) -> + emqx_gateway_cli:gateway(["start", "mqttsn"]), + ?assertEqual("ok\n", acc_print()), + %% dupliacted start gateway, return ok + emqx_gateway_cli:gateway(["start", "mqttsn"]), + ?assertEqual("ok\n", acc_print()), ok. t_gateway_clients_usage(_) -> From 0e011ec4b8577ab3d252d41f13fd9eb4be8ec3f4 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 6 Jan 2022 11:05:20 +0800 Subject: [PATCH 02/11] test(gw): more testcases for emqx_gateway_cli module --- apps/emqx_gateway/src/emqx_gateway_cli.erl | 19 ++-- .../test/emqx_gateway_cli_SUITE.erl | 88 ++++++++++++++++--- .../test/emqx_sn_protocol_SUITE.erl | 2 +- 3 files changed, 90 insertions(+), 19 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_cli.erl b/apps/emqx_gateway/src/emqx_gateway_cli.erl index c9ed51d92..5aaede248 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cli.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cli.erl @@ -135,7 +135,7 @@ gateway(_) -> ]). 'gateway-clients'(["list", Name]) -> - %% FIXME: page me? + %% XXX: page me? InfoTab = emqx_gateway_cm:tabname(info, Name), case ets:info(InfoTab) of undefined -> @@ -146,12 +146,17 @@ gateway(_) -> 'gateway-clients'(["lookup", Name, ClientId]) -> ChanTab = emqx_gateway_cm:tabname(chan, Name), - case ets:lookup(ChanTab, bin(ClientId)) of - [] -> print("Not Found.\n"); - [Chann] -> - InfoTab = emqx_gateway_cm:tabname(info, Name), - [ChannInfo] = ets:lookup(InfoTab, Chann), - print_record({client, ChannInfo}) + case ets:info(ChanTab) of + undefined -> + print("Bad Gateway Name.\n"); + _ -> + case ets:lookup(ChanTab, bin(ClientId)) of + [] -> print("Not Found.\n"); + [Chann] -> + InfoTab = emqx_gateway_cm:tabname(info, Name), + [ChannInfo] = ets:lookup(InfoTab, Chann), + print_record({client, ChannInfo}) + end end; 'gateway-clients'(["kick", Name, ClientId]) -> diff --git a/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl index 618908f92..501d9b44d 100644 --- a/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl @@ -41,7 +41,7 @@ gateway {} \"listeners\": [{\"type\": \"udp\", \"name\": \"ct\", - \"bind\": \"2885\" + \"bind\": \"1884\" }] } "). @@ -163,8 +163,7 @@ t_gateway_load_unload_lookup(_) -> ?assertEqual("ok\n", acc_print()), emqx_gateway_cli:gateway(["lookup", "mqttsn"]), - ?assertEqual("undefined\n", acc_print()), - ok. + ?assertEqual("undefined\n", acc_print()). t_gateway_start_stop(_) -> emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]), @@ -181,22 +180,78 @@ t_gateway_start_stop(_) -> %% dupliacted start gateway, return ok emqx_gateway_cli:gateway(["start", "mqttsn"]), ?assertEqual("ok\n", acc_print()), - ok. + + emqx_gateway_cli:gateway(["unload", "mqttsn"]), + ?assertEqual("ok\n", acc_print()). t_gateway_clients_usage(_) -> - ok. + ?assertEqual( + ["gateway-clients list " + "# List all clients for a gateway\n", + "gateway-clients lookup " + "# Lookup the Client Info for specified client\n", + "gateway-clients kick " + "# Kick out a client\n"], + emqx_gateway_cli:'gateway-clients'(usage) + ). -t_gateway_clients_list(_) -> - ok. +t_gateway_clients(_) -> + emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]), + ?assertEqual("ok\n", acc_print()), -t_gateway_clients_lookup(_) -> - ok. + Socket = sn_client_connect(<<"client1">>), + + _ = emqx_gateway_cli:'gateway-clients'(["list", "mqttsn"]), + ClientDesc1 = acc_print(), + + _ = emqx_gateway_cli:'gateway-clients'(["lookup", "mqttsn", "client1"]), + ClientDesc2 = acc_print(), + ?assertEqual(ClientDesc1, ClientDesc2), + + sn_client_disconnect(Socket), + timer:sleep(500), + + _ = emqx_gateway_cli:'gateway-clients'(["lookup", "mqttsn", "bad-client"]), + ?assertEqual("Not Found.\n", acc_print()), + + _ = emqx_gateway_cli:'gateway-clients'(["lookup", "bad-gw", "bad-client"]), + ?assertEqual("Bad Gateway Name.\n", acc_print()), + + _ = emqx_gateway_cli:'gateway-clients'(["list", "mqttsn"]), + %% no print for empty client list + + _ = emqx_gateway_cli:'gateway-clients'(["list", "bad-gw"]), + ?assertEqual("Bad Gateway Name.\n", acc_print()), + + emqx_gateway_cli:gateway(["unload", "mqttsn"]), + ?assertEqual("ok\n", acc_print()). t_gateway_clients_kick(_) -> - ok. + emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]), + ?assertEqual("ok\n", acc_print()), + + Socket = sn_client_connect(<<"client1">>), + + _ = emqx_gateway_cli:'gateway-clients'(["list", "mqttsn"]), + _ = acc_print(), + + _ = emqx_gateway_cli:'gateway-clients'(["kick", "mqttsn", "bad-client"]), + ?assertEqual("Not Found.\n", acc_print()), + + _ = emqx_gateway_cli:'gateway-clients'(["kick", "mqttsn", "client1"]), + ?assertEqual("ok\n", acc_print()), + + sn_client_disconnect(Socket), + + emqx_gateway_cli:gateway(["unload", "mqttsn"]), + ?assertEqual("ok\n", acc_print()). t_gateway_metrcis_usage(_) -> - ok. + ?assertEqual( + [ "gateway-metrics " + "# List all metrics for a gateway\n"], + emqx_gateway_cli:'gateway-metrics'(usage) + ). t_gateway_metrcis(_) -> ok. @@ -210,3 +265,14 @@ acc_print(Acc) -> after 200 -> Acc end. + +sn_client_connect(ClientId) -> + {ok, Socket} = gen_udp:open(0, [binary]), + _ = emqx_sn_protocol_SUITE:send_connect_msg(Socket, ClientId), + ?assertEqual(<<3, 16#05, 0>>, + emqx_sn_protocol_SUITE:receive_response(Socket)), + Socket. + +sn_client_disconnect(Socket) -> + _ = emqx_sn_protocol_SUITE:send_disconnect_msg(Socket, undefined), + gen_udp:close(Socket), ok. diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index 933776d51..fff7fb8e2 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module (emqx_sn_protocol_SUITE). +-module(emqx_sn_protocol_SUITE). -compile(export_all). -compile(nowarn_export_all). From 056e284bc2966d6b355b4a8575968b9c2fb48741 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 6 Jan 2022 14:14:05 +0800 Subject: [PATCH 03/11] test(gw): more testcases for emqx_gateway_cm_registry --- .../src/emqx_gateway_cm_registry.erl | 55 ++++++----- .../test/emqx_gateway_cm_registry_SUITE.erl | 97 +++++++++++++++++++ 2 files changed, 127 insertions(+), 25 deletions(-) create mode 100644 apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl diff --git a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl index 914cf1cae..108db3216 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl @@ -17,6 +17,8 @@ %% @doc The gateway connection registry -module(emqx_gateway_cm_registry). +-include("include/emqx_gateway.hrl"). + -behaviour(gen_server). -export([start_link/1]). @@ -27,6 +29,8 @@ -export([lookup_channels/2]). +-export([tabname/1]). + %% gen_server callbacks -export([ init/1 , handle_call/3 @@ -41,39 +45,40 @@ -record(channel, {chid, pid}). -%% @doc Start the global channel registry. --spec(start_link(atom()) -> gen_server:startlink_ret()). -start_link(Type) -> - gen_server:start_link(?MODULE, [Type], []). +%% @doc Start the global channel registry for the gived gateway name. +-spec(start_link(gateway_name()) -> gen_server:startlink_ret()). +start_link(Name) -> + gen_server:start_link(?MODULE, [Name], []). --spec tabname(atom()) -> atom(). -tabname(Type) -> - list_to_atom(lists:concat([emqx_gateway_, Type, '_channel_registry'])). +-spec tabname(gateway_name()) -> atom(). +tabname(Name) -> + %% XXX: unsafe ?? + list_to_atom(lists:concat([emqx_gateway_, Name, '_channel_registry'])). %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- %% @doc Register a global channel. --spec register_channel(atom(), binary() | {binary(), pid()}) -> ok. -register_channel(Type, ClientId) when is_binary(ClientId) -> - register_channel(Type, {ClientId, self()}); +-spec register_channel(gateway_name(), binary() | {binary(), pid()}) -> ok. +register_channel(Name, ClientId) when is_binary(ClientId) -> + register_channel(Name, {ClientId, self()}); -register_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> - mria:dirty_write(tabname(Type), record(ClientId, ChanPid)). +register_channel(Name, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> + mria:dirty_write(tabname(Name), record(ClientId, ChanPid)). %% @doc Unregister a global channel. --spec unregister_channel(atom(), binary() | {binary(), pid()}) -> ok. -unregister_channel(Type, ClientId) when is_binary(ClientId) -> - unregister_channel(Type, {ClientId, self()}); +-spec unregister_channel(gateway_name(), binary() | {binary(), pid()}) -> ok. +unregister_channel(Name, ClientId) when is_binary(ClientId) -> + unregister_channel(Name, {ClientId, self()}); -unregister_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> - mria:dirty_delete_object(tabname(Type), record(ClientId, ChanPid)). +unregister_channel(Name, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> + mria:dirty_delete_object(tabname(Name), record(ClientId, ChanPid)). %% @doc Lookup the global channels. --spec lookup_channels(atom(), binary()) -> list(pid()). -lookup_channels(Type, ClientId) -> - [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(tabname(Type), ClientId)]. +-spec lookup_channels(gateway_name(), binary()) -> list(pid()). +lookup_channels(Name, ClientId) -> + [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(tabname(Name), ClientId)]. record(ClientId, ChanPid) -> #channel{chid = ClientId, pid = ChanPid}. @@ -82,8 +87,8 @@ record(ClientId, ChanPid) -> %% gen_server callbacks %%-------------------------------------------------------------------- -init([Type]) -> - Tab = tabname(Type), +init([Name]) -> + Tab = tabname(Name), ok = mria:create_table(Tab, [ {type, bag}, {rlog_shard, ?CM_SHARD}, @@ -94,7 +99,7 @@ init([Type]) -> {write_concurrency, true}]}]}]), ok = mria:wait_for_tables([Tab]), ok = ekka:monitor(membership), - {ok, #{type => Type}}. + {ok, #{name => Name}}. handle_call(Req, _From, State) -> logger:error("Unexpected call: ~p", [Req]), @@ -104,8 +109,8 @@ handle_cast(Msg, State) -> logger:error("Unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({membership, {mnesia, down, Node}}, State = #{type := Type}) -> - Tab = tabname(Type), +handle_info({membership, {mnesia, down, Node}}, State = #{name := Name}) -> + Tab = tabname(Name), global:trans({?LOCK, self()}, fun() -> mria:transaction(?CM_SHARD, fun cleanup_channels/2, [Node, Tab]) diff --git a/apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl new file mode 100644 index 000000000..1541ed281 --- /dev/null +++ b/apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl @@ -0,0 +1,97 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_cm_registry_SUITE). + +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). +-compile(nowarn_export_all). + +-define(GWNAME, mqttsn). +-define(CLIENTID, <<"client1">>). + +-define(CONF_DEFAULT, <<"gateway {}">>). + +%%-------------------------------------------------------------------- +%% setups +%%-------------------------------------------------------------------- + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Conf) -> + emqx_config:erase(gateway), + emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), + emqx_common_test_helpers:start_apps([]), + Conf. + +end_per_suite(_Conf) -> + emqx_common_test_helpers:stop_apps([]). + +init_per_testcase(_TestCase, Conf) -> + {ok, Pid} = emqx_gateway_cm_registry:start_link(?GWNAME), + [{registry, Pid} | Conf]. + +end_per_testcase(_TestCase, Conf) -> + Pid = proplists:get_value(registry, Conf), + gen_server:stop(Pid), + Conf. + +%%-------------------------------------------------------------------- +%% cases +%%-------------------------------------------------------------------- + +t_tabname(_) -> + ?assertEqual( + emqx_gateway_gw_name_channel_registry, + emqx_gateway_cm_registry:tabname(gw_name)). + +t_register_unregister_channel(_) -> + ok = emqx_gateway_cm_registry:register_channel(?GWNAME, ?CLIENTID), + ?assertEqual( + [{channel, ?CLIENTID, self()}], + ets:tab2list(emqx_gateway_cm_registry:tabname(?GWNAME))), + + ?assertEqual( + [self()], + emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)), + + ok = emqx_gateway_cm_registry:unregister_channel(?GWNAME, ?CLIENTID), + + ?assertEqual( + [], + ets:tab2list(emqx_gateway_cm_registry:tabname(?GWNAME))), + ?assertEqual( + [], + emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)). + +t_cleanup_channels(Conf) -> + Pid = proplists:get_value(registry, Conf), + emqx_gateway_cm_registry:register_channel(?GWNAME, ?CLIENTID), + ?assertEqual( + [self()], + emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)), + Pid ! {membership, {mnesia, down, node()}}, + ct:sleep(100), + ?assertEqual( + [], + emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)). + +t_unexpected_msg_handling(Conf) -> + Pid = proplists:get_value(registry, Conf), + _ = Pid ! unexpected_info, + ok = gen_server:cast(Pid, unexpected_cast), + ignored = gen_server:call(Pid, unexpected_call). From a829b0b9d0e08d60a88171314b2b91fd6c7fc3b8 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 6 Jan 2022 15:51:44 +0800 Subject: [PATCH 04/11] test(gw): more testcases for emqx_gateway module --- apps/emqx_gateway/src/emqx_gateway.erl | 3 + apps/emqx_gateway/src/emqx_gateway_gw_sup.erl | 1 - .../src/emqx_gateway_insta_sup.erl | 3 +- apps/emqx_gateway/src/emqx_gateway_sup.erl | 3 +- apps/emqx_gateway/test/emqx_gateway_SUITE.erl | 100 ++++++++++++++++++ .../test/emqx_gateway_cm_registry_SUITE.erl | 2 +- .../test/emqx_gateway_registry_SUITE.erl | 6 ++ 7 files changed, 113 insertions(+), 5 deletions(-) create mode 100644 apps/emqx_gateway/test/emqx_gateway_SUITE.erl diff --git a/apps/emqx_gateway/src/emqx_gateway.erl b/apps/emqx_gateway/src/emqx_gateway.erl index 9421229f7..74a8244d4 100644 --- a/apps/emqx_gateway/src/emqx_gateway.erl +++ b/apps/emqx_gateway/src/emqx_gateway.erl @@ -41,6 +41,7 @@ registered_gateway() -> %%-------------------------------------------------------------------- %% Gateway APIs +%% @doc List the load gateways -spec list() -> [gateway()]. list() -> emqx_gateway_sup:list_gateway_insta(). @@ -65,6 +66,8 @@ lookup(Name) -> -spec update(gateway_name(), emqx_config:config()) -> ok | {error, any()}. %% @doc This function only supports full configuration updates +%% +%% Note: If the `enable` option is missing, it will be set to true by default update(Name, Config) -> emqx_gateway_sup:update_gateway(Name, Config). diff --git a/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl b/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl index fa55ceb8d..9ca806213 100644 --- a/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl @@ -51,7 +51,6 @@ create_insta(Sup, Gateway = #{name := Name}, GwDscrptr) -> {ok, _GwInstaPid} -> {error, alredy_existed}; false -> Ctx = ctx(Sup, Name), - %% ChildSpec = emqx_gateway_utils:childspec( Name, worker, diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index 30cebb3cc..096ab1f81 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -308,8 +308,7 @@ do_update_one_by_one(NCfg, State = #state{ name = GwName, config = OCfg, status = Status}) -> - OEnable = maps:get(enable, OCfg, true), - NEnable = maps:get(enable, NCfg, OEnable), + NEnable = maps:get(enable, NCfg, true), OAuths = authns(GwName, OCfg), NAuths = authns(GwName, NCfg), diff --git a/apps/emqx_gateway/src/emqx_gateway_sup.erl b/apps/emqx_gateway/src/emqx_gateway_sup.erl index 9a969a5ce..499132d35 100644 --- a/apps/emqx_gateway/src/emqx_gateway_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_sup.erl @@ -59,7 +59,8 @@ load_gateway(Gateway = #{name := GwName}) -> unload_gateway(GwName) -> case lists:keyfind(GwName, 1, supervisor:which_children(?MODULE)) of false -> {error, not_found}; - _ -> + {_Id, Pid, _Type, _Mods} -> + _ = emqx_gateway_gw_sup:remove_insta(Pid, GwName), _ = supervisor:terminate_child(?MODULE, GwName), _ = supervisor:delete_child(?MODULE, GwName), ok diff --git a/apps/emqx_gateway/test/emqx_gateway_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_SUITE.erl new file mode 100644 index 000000000..dc6ab3a9c --- /dev/null +++ b/apps/emqx_gateway/test/emqx_gateway_SUITE.erl @@ -0,0 +1,100 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_SUITE). + +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). +-compile(nowarn_export_all). + +-define(GWNAME, mqttsn). +-define(CONF_DEFAULT, <<"gateway {}">>). + +%%-------------------------------------------------------------------- +%% setups +%%-------------------------------------------------------------------- + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Conf) -> + emqx_config:erase(gateway), + emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), + emqx_common_test_helpers:start_apps([emqx_gateway]), + Conf. + +end_per_suite(_Conf) -> + emqx_common_test_helpers:stop_apps([emqx_gateway]). + +%%-------------------------------------------------------------------- +%% cases +%%-------------------------------------------------------------------- + +t_registered_gateway(_) -> + [{coap, #{cbkmod := emqx_coap_impl}}, + {exproto, #{cbkmod := emqx_exproto_impl}}, + {lwm2m, #{cbkmod := emqx_lwm2m_impl}}, + {mqttsn, #{cbkmod := emqx_sn_impl}}, + {stomp, #{cbkmod := emqx_stomp_impl}}] = emqx_gateway:registered_gateway(). + +t_load_unload_list_lookup(_) -> + {ok, _} = emqx_gateway:load(?GWNAME, #{idle_timeout => 1000}), + ?assertEqual( + {error, alredy_existed}, + emqx_gateway:load(?GWNAME, #{})), + ?assertEqual( + {error, {unknown_gateway_name, bad_gw_name}}, + emqx_gateway:load(bad_gw_name, #{})), + + ?assertEqual(1, length(emqx_gateway:list())), + ?assertEqual( + emqx_gateway:lookup(?GWNAME), + lists:nth(1, emqx_gateway:list())), + + ?assertEqual(ok, emqx_gateway:unload(?GWNAME)), + ?assertEqual({error, not_found}, emqx_gateway:unload(?GWNAME)). + +t_start_stop_update(_) -> + {ok, _} = emqx_gateway:load(?GWNAME, #{idle_timeout => 1000}), + + #{status := running} = emqx_gateway:lookup(?GWNAME), + + ok = emqx_gateway:stop(?GWNAME), + {error, already_stopped} = emqx_gateway:stop(?GWNAME), + + #{status := stopped} = emqx_gateway:lookup(?GWNAME), + + ok = emqx_gateway:update( + ?GWNAME, #{enable => false, idle_timeout => 2000}), + #{status := stopped, + config := #{idle_timeout := 2000}} = emqx_gateway:lookup(?GWNAME), + + ok = emqx_gateway:update( + ?GWNAME, #{enable => true, idle_timeout => 3000}), + #{status := running, + config := #{idle_timeout := 3000}} = emqx_gateway:lookup(?GWNAME), + + ok = emqx_gateway:update( + ?GWNAME, #{enable => false, idle_timeout => 4000}), + #{status := stopped, + config := #{idle_timeout := 4000}} = emqx_gateway:lookup(?GWNAME), + + ok = emqx_gateway:start(?GWNAME), + #{status := running, + config := #{idle_timeout := 4000}} = emqx_gateway:lookup(?GWNAME), + + {error, already_started} = emqx_gateway:start(?GWNAME), + ok. diff --git a/apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl index 1541ed281..3cca034a6 100644 --- a/apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl @@ -90,7 +90,7 @@ t_cleanup_channels(Conf) -> [], emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)). -t_unexpected_msg_handling(Conf) -> +t_handle_unexpected_msg(Conf) -> Pid = proplists:get_value(registry, Conf), _ = Pid ! unexpected_info, ok = gen_server:cast(Pid, unexpected_cast), diff --git a/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl index 993da10bb..63378f875 100644 --- a/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl @@ -57,7 +57,13 @@ t_load_unload(_) -> {error, already_existed} = emqx_gateway_registry:reg(test, [{cbkmod, ?MODULE}]), + ok = emqx_gateway_registry:unreg(test), ok = emqx_gateway_registry:unreg(test), undefined = emqx_gateway_registry:lookup(test), OldCnt = length(emqx_gateway_registry:list()), ok. + +t_handle_unexpected_msg(_) -> + _ = emqx_gateway_registry ! unexpected_info, + ok = gen_server:cast(emqx_gateway_registry, unexpected_cast), + ok = gen_server:call(emqx_gateway_registry, unexpected_call). From 3caf0822c4d30e3ae332a437eaad8fe855d5df37 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 6 Jan 2022 18:26:02 +0800 Subject: [PATCH 05/11] test(gw): more testcases for emqx_gateway_metrics --- apps/emqx_gateway/src/emqx_gateway_cli.erl | 10 +-- .../emqx_gateway/src/emqx_gateway_metrics.erl | 13 +++- apps/emqx_gateway/src/emqx_gateway_utils.erl | 5 +- apps/emqx_gateway/test/emqx_gateway_SUITE.erl | 2 +- .../test/emqx_gateway_api_SUITE.erl | 20 ++--- .../test/emqx_gateway_cli_SUITE.erl | 12 ++- .../test/emqx_gateway_metrics_SUITE.erl | 76 +++++++++++++++++++ 7 files changed, 117 insertions(+), 21 deletions(-) create mode 100644 apps/emqx_gateway/test/emqx_gateway_metrics_SUITE.erl diff --git a/apps/emqx_gateway/src/emqx_gateway_cli.erl b/apps/emqx_gateway/src/emqx_gateway_cli.erl index 5aaede248..24bf559c1 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cli.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cli.erl @@ -175,15 +175,13 @@ gateway(_) -> ]). 'gateway-metrics'([Name]) -> - Tab = emqx_gateway_metrics:tabname(Name), - case ets:info(Tab) of + case emqx_gateway_metrics:lookup(atom(Name)) of undefined -> print("Bad Gateway Name.\n"); - _ -> + Metrics -> lists:foreach( - fun({K, V}) -> - print("~-30s: ~w\n", [K, V]) - end, lists:sort(ets:tab2list(Tab))) + fun({K, V}) -> print("~-30s: ~w\n", [K, V]) end, + Metrics) end; 'gateway-metrics'(_) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_metrics.erl b/apps/emqx_gateway/src/emqx_gateway_metrics.erl index 9f1258e6d..d2a9d7442 100644 --- a/apps/emqx_gateway/src/emqx_gateway_metrics.erl +++ b/apps/emqx_gateway/src/emqx_gateway_metrics.erl @@ -20,7 +20,6 @@ -include_lib("emqx_gateway/include/emqx_gateway.hrl"). - %% APIs -export([start_link/1]). @@ -30,6 +29,8 @@ , dec/3 ]). +-export([lookup/1]). + %% gen_server callbacks -export([ init/1 , handle_call/3 @@ -67,6 +68,16 @@ dec(GwName, Name) -> dec(GwName, Name, Oct) -> inc(GwName, Name, -Oct). +-spec lookup(gateway_name()) + -> undefined + | [{Name :: atom(), integer()}]. +lookup(GwName) -> + Tab = emqx_gateway_metrics:tabname(GwName), + case ets:info(Tab) of + undefined -> undefined; + _ -> lists:sort(ets:tab2list(Tab)) + end. + tabname(GwName) -> list_to_atom(lists:concat([emqx_gateway_, GwName, '_metrics'])). diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 1def6ebdb..e1c29c289 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -291,9 +291,8 @@ is_running(ListenerId, #{<<"bind">> := ListenOn0}) -> end. %% same with emqx_authentication:global_chain/1 -global_chain(mqtt) -> - 'mqtt:global'; -global_chain('mqtt-sn') -> +-spec global_chain(GatewayName :: atom()) -> atom(). +global_chain('mqttsn') -> 'mqtt-sn:global'; global_chain(coap) -> 'coap:global'; diff --git a/apps/emqx_gateway/test/emqx_gateway_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_SUITE.erl index dc6ab3a9c..d6da58df3 100644 --- a/apps/emqx_gateway/test/emqx_gateway_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_SUITE.erl @@ -81,7 +81,7 @@ t_start_stop_update(_) -> ?GWNAME, #{enable => false, idle_timeout => 2000}), #{status := stopped, config := #{idle_timeout := 2000}} = emqx_gateway:lookup(?GWNAME), - + ok = emqx_gateway:update( ?GWNAME, #{enable => true, idle_timeout => 3000}), #{status := running, diff --git a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl index e2375decb..12b0ef7b5 100644 --- a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl @@ -30,9 +30,7 @@ %% this parses to #{}, will not cause config cleanup %% so we will need call emqx_config:erase --define(CONF_DEFAULT, <<" -gateway {} -">>). +-define(CONF_DEFAULT, <<"gateway {}">>). %%-------------------------------------------------------------------- %% Setup @@ -307,6 +305,10 @@ t_listeners_authn(_) -> {200, ConfResp3} = request(get, Path), assert_confs(AuthConf2, ConfResp3), + + {204, _} = request(delete, Path), + %% FIXME: 204? + {204, _} = request(get, Path), {204, _} = request(delete, "/gateway/stomp"). t_listeners_authn_data_mgmt(_) -> @@ -340,32 +342,32 @@ t_listeners_authn_data_mgmt(_) -> {200, #{data := [UserRespd1]} } = request( get, - "/gateway/stomp/listeners/stomp:tcp:def/authentication/users"), + Path ++ "/users"), assert_confs(UserRespd1, User1), {200, UserRespd2} = request( get, - "/gateway/stomp/listeners/stomp:tcp:def/authentication/users/test"), + Path ++ "/users/test"), assert_confs(UserRespd2, User1), {200, UserRespd3} = request( put, - "/gateway/stomp/listeners/stomp:tcp:def/authentication/users/test", + Path ++ "/users/test", #{password => <<"654321">>, is_superuser => true}), assert_confs(UserRespd3, User1#{is_superuser => true}), {200, UserRespd4} = request( get, - "/gateway/stomp/listeners/stomp:tcp:def/authentication/users/test"), + Path ++ "/users/test"), assert_confs(UserRespd4, User1#{is_superuser => true}), {204, _} = request( delete, - "/gateway/stomp/listeners/stomp:tcp:def/authentication/users/test"), + Path ++ "/users/test"), {200, #{data := []}} = request( get, - "/gateway/stomp/listeners/stomp:tcp:def/authentication/users"), + Path ++ "/users"), {204, _} = request(delete, "/gateway/stomp"). %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl index 501d9b44d..24dacf750 100644 --- a/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl @@ -124,7 +124,17 @@ t_gateway_list(_) -> "Gateway(name=lwm2m, status=unloaded)\n" "Gateway(name=mqttsn, status=unloaded)\n" "Gateway(name=stomp, status=unloaded)\n" - , acc_print()). + , acc_print()), + + emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]), + ?assertEqual("ok\n", acc_print()), + + emqx_gateway_cli:gateway(["list"]), + %% TODO: assert it. + _ = acc_print(), + + emqx_gateway_cli:gateway(["unload", "mqttsn"]), + ?assertEqual("ok\n", acc_print()). t_gateway_load_unload_lookup(_) -> emqx_gateway_cli:gateway(["lookup", "mqttsn"]), diff --git a/apps/emqx_gateway/test/emqx_gateway_metrics_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_metrics_SUITE.erl new file mode 100644 index 000000000..b55de1738 --- /dev/null +++ b/apps/emqx_gateway/test/emqx_gateway_metrics_SUITE.erl @@ -0,0 +1,76 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_metrics_SUITE). + +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). +-compile(nowarn_export_all). + +-define(GWNAME, mqttsn). +-define(METRIC, 'ct.test.metrics_name'). +-define(CONF_DEFAULT, <<"gateway {}">>). + +%%-------------------------------------------------------------------- +%% setups +%%-------------------------------------------------------------------- + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Conf) -> + emqx_config:erase(gateway), + emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), + emqx_common_test_helpers:start_apps([]), + Conf. + +end_per_suite(_Conf) -> + emqx_common_test_helpers:stop_apps([]). + +init_per_testcase(_TestCase, Conf) -> + {ok, Pid} = emqx_gateway_metrics:start_link(?GWNAME), + [{metrics, Pid} | Conf]. + +end_per_testcase(_TestCase, Conf) -> + Pid = proplists:get_value(metrics, Conf), + gen_server:stop(Pid), + Conf. + +%%-------------------------------------------------------------------- +%% cases +%%-------------------------------------------------------------------- + +t_inc_dec(_) -> + ok = emqx_gateway_metrics:inc(?GWNAME, ?METRIC), + ok = emqx_gateway_metrics:inc(?GWNAME, ?METRIC), + + ?assertEqual( + [{?METRIC, 2}], + emqx_gateway_metrics:lookup(?GWNAME)), + + ok = emqx_gateway_metrics:dec(?GWNAME, ?METRIC), + ok = emqx_gateway_metrics:dec(?GWNAME, ?METRIC), + + ?assertEqual( + [{?METRIC, 0}], + emqx_gateway_metrics:lookup(?GWNAME)). + +t_handle_unexpected_msg(Conf) -> + Pid = proplists:get_value(metrics, Conf), + _ = Pid ! unexpected_info, + ok = gen_server:cast(Pid, unexpected_cast), + ok = gen_server:call(Pid, unexpected_call), + ok. From e9e559ccd0db278f667bf00132f66897ed7d2769 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 10 Jan 2022 16:33:19 +0800 Subject: [PATCH 06/11] test: more code coverage for emqx_gateway_api_clients --- .../src/coap/emqx_coap_channel.erl | 1 + .../src/coap/emqx_coap_session.erl | 4 +- .../src/emqx_gateway_api_clients.erl | 42 +-- .../src/exproto/emqx_exproto_channel.erl | 2 +- .../src/mqttsn/emqx_sn_channel.erl | 2 +- apps/emqx_gateway/test/emqx_coap_SUITE.erl | 295 +++++++++++------- .../test/emqx_gateway_test_utils.erl | 2 + apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl | 81 ++++- .../test/emqx_sn_protocol_SUITE.erl | 72 ++++- apps/emqx_management/src/emqx_mgmt_api.erl | 45 +++ .../src/emqx_mgmt_api_clients.erl | 51 +-- .../test/emqx_mgmt_api_clients_SUITE.erl | 6 +- 12 files changed, 405 insertions(+), 198 deletions(-) diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index fcc240ca5..b4f9e30f0 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -247,6 +247,7 @@ handle_call({subscribe, Topic, SubOpts}, _From, %% modifty session state SubReq = {Topic, Token}, TempMsg = #coap_message{type = non}, + %% FIXME: The subopts is not used for emqx_coap_session Result = emqx_coap_session:process_subscribe( SubReq, TempMsg, #{}, Session), NSession = maps:get(session, Result), diff --git a/apps/emqx_gateway/src/coap/emqx_coap_session.erl b/apps/emqx_gateway/src/coap/emqx_coap_session.erl index 52921c16a..108b4c55b 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_session.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_session.erl @@ -92,7 +92,9 @@ info(Keys, Session) when is_list(Keys) -> [{Key, info(Key, Session)} || Key <- Keys]; info(subscriptions, #session{observe_manager = OM}) -> Topics = emqx_coap_observe_res:subscriptions(OM), - lists:foldl(fun(T, Acc) -> Acc#{T => ?DEFAULT_SUBOPTS} end, #{}, Topics); + lists:foldl( + fun(T, Acc) -> Acc#{T => emqx_gateway_utils:default_subopts()} end, + #{}, Topics); info(subscriptions_cnt, #session{observe_manager = OM}) -> erlang:length(emqx_coap_observe_res:subscriptions(OM)); info(subscriptions_max, _) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 5be4adccf..86dbb69af 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -73,7 +73,7 @@ paths() -> , {<<"ip_address">>, ip} , {<<"conn_state">>, atom} , {<<"clean_start">>, atom} - , {<<"proto_ver">>, integer} + , {<<"proto_ver">>, binary} , {<<"like_clientid">>, binary} , {<<"like_username">>, binary} , {<<"gte_created_at">>, timestamp} @@ -83,15 +83,16 @@ paths() -> %% special keys for lwm2m protocol , {<<"endpoint_name">>, binary} , {<<"like_endpoint_name">>, binary} - , {<<"gte_lifetime">>, timestamp} - , {<<"lte_lifetime">>, timestamp} + , {<<"gte_lifetime">>, integer} + , {<<"lte_lifetime">>, integer} ]). -define(QUERY_FUN, {?MODULE, query}). clients(get, #{ bindings := #{name := Name0} - , query_string := Params + , query_string := Params0 }) -> + Params = emqx_mgmt_api:ensure_timestamp_format(Params0, time_keys()), with_gateway(Name0, fun(GwName, _) -> TabName = emqx_gateway_cm:tabname(info, GwName), case maps:get(<<"node">>, Params, undefined) of @@ -147,10 +148,6 @@ subscriptions(get, #{ bindings := #{name := Name0, ClientId = emqx_mgmt_util:urldecode(ClientId0), with_gateway(Name0, fun(GwName, _) -> case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of - {error, nosupport} -> - return_http_error(405, <<"Not support to list subscriptions">>); - {error, noimpl} -> - return_http_error(501, <<"Not implemented now">>); {error, Reason} -> return_http_error(500, Reason); {ok, Subs} -> @@ -171,14 +168,6 @@ subscriptions(post, #{ bindings := #{name := Name0, {Topic, SubOpts} -> case emqx_gateway_http:client_subscribe( GwName, ClientId, Topic, SubOpts) of - {error, nosupport} -> - return_http_error( - 405, - <<"Not support to add a subscription">>); - {error, noimpl} -> - return_http_error( - 501, - <<"Not implemented now">>); {error, Reason} -> return_http_error(404, Reason); {ok, {NTopic, NSubOpts}}-> @@ -221,6 +210,16 @@ extra_sub_props(Props) -> #{subid => maps:get(<<"subid">>, Props, undefined)} ). +%%-------------------------------------------------------------------- +%% QueryString data-fomrat convert +%% (try rfc3339 to timestamp or keep timestamp) + +time_keys() -> + [ <<"gte_created_at">> + , <<"lte_created_at">> + , <<"gte_connected_at">> + , <<"lte_connected_at">>]. + %%-------------------------------------------------------------------- %% query funcs @@ -264,10 +263,8 @@ ms(clientid, X) -> #{clientinfo => #{clientid => X}}; ms(username, X) -> #{clientinfo => #{username => X}}; -ms(zone, X) -> - #{clientinfo => #{zone => X}}; ms(ip_address, X) -> - #{clientinfo => #{peername => {X, '_'}}}; + #{clientinfo => #{peerhost => X}}; ms(conn_state, X) -> #{conn_state => X}; ms(clean_start, X) -> @@ -616,9 +613,6 @@ roots() -> , subscription ]. -fields(test) -> - [{key, mk(binary(), #{ desc => <<"Desc">>})}]; - fields(stomp_client) -> common_client_props(); fields(mqttsn_client) -> @@ -707,10 +701,6 @@ common_client_props() -> %, {will_msg, % mk(binary(), % #{ desc => <<"Client will message">>})} - %, {zone, - % mk(binary(), - % #{ desc => <<"Indicate the configuration group used by the " - % "client">>})} , {keepalive, mk(integer(), #{ desc => <<"keepalive time, with the unit of second">>})} diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index 41b795b7b..843cf39d5 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -451,7 +451,7 @@ do_subscribe(TopicFilter, SubOpts, Channel = subscriptions = Subs}) -> %% Mountpoint first NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter), - NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts), + NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts), SubId = maps:get(clientid, ClientInfo, undefined), %% XXX: is_new? IsNew = not maps:is_key(NTopicFilter, Subs), diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 58cc24f9d..aee4189a5 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -931,7 +931,7 @@ do_subscribe({TopicId, TopicName, SubOpts}, clientinfo = ClientInfo = #{mountpoint := Mountpoint}}) -> NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName), - NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts), + NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts), case emqx_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of {ok, NSession} -> {ok, {TopicId, NTopicName, NSubOpts}, diff --git a/apps/emqx_gateway/test/emqx_coap_SUITE.erl b/apps/emqx_gateway/test/emqx_coap_SUITE.erl index 8b336252b..e35c6e2db 100644 --- a/apps/emqx_gateway/test/emqx_coap_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_coap_SUITE.erl @@ -19,6 +19,11 @@ -compile(export_all). -compile(nowarn_export_all). +-import(emqx_gateway_test_utils, + [ request/2 + , request/3 + ]). + -include_lib("er_coap_client/include/coap.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -48,114 +53,115 @@ gateway.coap all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([emqx_gateway], fun set_special_cfg/1), + ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), + emqx_mgmt_api_test_util:init_suite([emqx_gateway]), Config. -set_special_cfg(emqx_gateway) -> - ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT); - -set_special_cfg(_) -> - ok. - -end_per_suite(Config) -> +end_per_suite(_) -> {ok, _} = emqx:remove_config([<<"gateway">>,<<"coap">>]), - emqx_common_test_helpers:stop_apps([emqx_gateway]), - Config. + emqx_mgmt_api_test_util:end_suite([emqx_gateway]). %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- -t_connection(_Config) -> + +t_connection(_) -> Action = fun(Channel) -> - %% connection - Token = connection(Channel), + %% connection + Token = connection(Channel), - timer:sleep(100), - ?assertNotEqual([], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)), + timer:sleep(100), + ?assertNotEqual( + [], + emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)), - %% heartbeat - HeartURI = ?MQTT_PREFIX ++ "/connection?clientid=client1&token=" ++ Token, - ?LOGT("send heartbeat request:~ts~n", [HeartURI]), - {ok, changed, _} = er_coap_client:request(put, HeartURI), + %% heartbeat + HeartURI = ?MQTT_PREFIX ++ + "/connection?clientid=client1&token=" ++ + Token, - disconnection(Channel, Token), + ?LOGT("send heartbeat request:~ts~n", [HeartURI]), + {ok, changed, _} = er_coap_client:request(put, HeartURI), - timer:sleep(100), - ?assertEqual([], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)) - end, + disconnection(Channel, Token), + + timer:sleep(100), + ?assertEqual( + [], + emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)) + end, do(Action). - -t_publish(_Config) -> +t_publish(_) -> Action = fun(Channel, Token) -> - Topic = <<"/abc">>, - Payload = <<"123">>, + Topic = <<"/abc">>, + Payload = <<"123">>, - TopicStr = binary_to_list(Topic), - URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + TopicStr = binary_to_list(Topic), + URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, - %% Sub topic first - emqx:subscribe(Topic), + %% Sub topic first + emqx:subscribe(Topic), - Req = make_req(post, Payload), - {ok, changed, _} = do_request(Channel, URI, Req), - - receive - {deliver, Topic, Msg} -> - ?assertEqual(Topic, Msg#message.topic), - ?assertEqual(Payload, Msg#message.payload) - after - 500 -> - ?assert(false) - end - end, + Req = make_req(post, Payload), + {ok, changed, _} = do_request(Channel, URI, Req), + receive + {deliver, Topic, Msg} -> + ?assertEqual(Topic, Msg#message.topic), + ?assertEqual(Payload, Msg#message.payload) + after + 500 -> + ?assert(false) + end + end, with_connection(Action). - -%t_publish_authz_deny(_Config) -> +%t_publish_authz_deny(_) -> % Action = fun(Channel, Token) -> -% Topic = <<"/abc">>, -% Payload = <<"123">>, -% InvalidToken = lists:reverse(Token), +% Topic = <<"/abc">>, +% Payload = <<"123">>, +% InvalidToken = lists:reverse(Token), % -% TopicStr = binary_to_list(Topic), -% URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ InvalidToken, +% TopicStr = binary_to_list(Topic), +% URI = ?PS_PREFIX ++ +% TopicStr ++ +% "?clientid=client1&token=" ++ InvalidToken, % -% %% Sub topic first -% emqx:subscribe(Topic), +% %% Sub topic first +% emqx:subscribe(Topic), % -% Req = make_req(post, Payload), -% Result = do_request(Channel, URI, Req), -% ?assertEqual({error, reset}, Result) -% end, +% Req = make_req(post, Payload), +% Result = do_request(Channel, URI, Req), +% ?assertEqual({error, reset}, Result) +% end, % % with_connection(Action). -t_subscribe(_Config) -> +t_subscribe(_) -> Topic = <<"/abc">>, Fun = fun(Channel, Token) -> - TopicStr = binary_to_list(Topic), - Payload = <<"123">>, + TopicStr = binary_to_list(Topic), + Payload = <<"123">>, - URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, - Req = make_req(get, Payload, [{observe, 0}]), - {ok, content, _} = do_request(Channel, URI, Req), - ?LOGT("observer topic:~ts~n", [Topic]), + URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + Req = make_req(get, Payload, [{observe, 0}]), + {ok, content, _} = do_request(Channel, URI, Req), + ?LOGT("observer topic:~ts~n", [Topic]), - timer:sleep(100), - [SubPid] = emqx:subscribers(Topic), - ?assert(is_pid(SubPid)), + timer:sleep(100), + [SubPid] = emqx:subscribers(Topic), + ?assert(is_pid(SubPid)), - %% Publish a message - emqx:publish(emqx_message:make(Topic, Payload)), - {ok, content, Notify} = with_response(Channel), - ?LOGT("observer get Notif=~p", [Notify]), + %% Publish a message + emqx:publish(emqx_message:make(Topic, Payload)), + {ok, content, Notify} = with_response(Channel), + ?LOGT("observer get Notif=~p", [Notify]), - #coap_content{payload = PayloadRecv} = Notify, + #coap_content{payload = PayloadRecv} = Notify, - ?assertEqual(Payload, PayloadRecv) - end, + ?assertEqual(Payload, PayloadRecv) + end, with_connection(Fun), timer:sleep(100), @@ -163,63 +169,117 @@ t_subscribe(_Config) -> ?assertEqual([], emqx:subscribers(Topic)). -t_un_subscribe(_Config) -> +t_un_subscribe(_) -> Topic = <<"/abc">>, Fun = fun(Channel, Token) -> - TopicStr = binary_to_list(Topic), - Payload = <<"123">>, + TopicStr = binary_to_list(Topic), + Payload = <<"123">>, - URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, - Req = make_req(get, Payload, [{observe, 0}]), - {ok, content, _} = do_request(Channel, URI, Req), - ?LOGT("observer topic:~ts~n", [Topic]), + Req = make_req(get, Payload, [{observe, 0}]), + {ok, content, _} = do_request(Channel, URI, Req), + ?LOGT("observer topic:~ts~n", [Topic]), - timer:sleep(100), - [SubPid] = emqx:subscribers(Topic), - ?assert(is_pid(SubPid)), + timer:sleep(100), + [SubPid] = emqx:subscribers(Topic), + ?assert(is_pid(SubPid)), - UnReq = make_req(get, Payload, [{observe, 1}]), - {ok, nocontent, _} = do_request(Channel, URI, UnReq), - ?LOGT("un observer topic:~ts~n", [Topic]), - timer:sleep(100), - ?assertEqual([], emqx:subscribers(Topic)) - end, + UnReq = make_req(get, Payload, [{observe, 1}]), + {ok, nocontent, _} = do_request(Channel, URI, UnReq), + ?LOGT("un observer topic:~ts~n", [Topic]), + timer:sleep(100), + ?assertEqual([], emqx:subscribers(Topic)) + end, with_connection(Fun). -t_observe_wildcard(_Config) -> +t_observe_wildcard(_) -> Fun = fun(Channel, Token) -> - %% resolve_url can't process wildcard with # - Topic = <<"/abc/+">>, - TopicStr = binary_to_list(Topic), - Payload = <<"123">>, + %% resolve_url can't process wildcard with # + Topic = <<"/abc/+">>, + TopicStr = binary_to_list(Topic), + Payload = <<"123">>, - URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, - Req = make_req(get, Payload, [{observe, 0}]), - {ok, content, _} = do_request(Channel, URI, Req), - ?LOGT("observer topic:~ts~n", [Topic]), + URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token, + Req = make_req(get, Payload, [{observe, 0}]), + {ok, content, _} = do_request(Channel, URI, Req), + ?LOGT("observer topic:~ts~n", [Topic]), - timer:sleep(100), - [SubPid] = emqx:subscribers(Topic), - ?assert(is_pid(SubPid)), + timer:sleep(100), + [SubPid] = emqx:subscribers(Topic), + ?assert(is_pid(SubPid)), - %% Publish a message - PubTopic = <<"/abc/def">>, - emqx:publish(emqx_message:make(PubTopic, Payload)), - {ok, content, Notify} = with_response(Channel), + %% Publish a message + PubTopic = <<"/abc/def">>, + emqx:publish(emqx_message:make(PubTopic, Payload)), + {ok, content, Notify} = with_response(Channel), - ?LOGT("observer get Notif=~p", [Notify]), + ?LOGT("observer get Notif=~p", [Notify]), - #coap_content{payload = PayloadRecv} = Notify, + #coap_content{payload = PayloadRecv} = Notify, - ?assertEqual(Payload, PayloadRecv) - end, + ?assertEqual(Payload, PayloadRecv) + end, with_connection(Fun). +t_clients_api(_) -> + Fun = fun(_Channel, _Token) -> + ClientId = <<"client1">>, + %% list + {200, #{data := [Client1]}} = request(get, "/gateway/coap/clients"), + #{clientid := ClientId} = Client1, + %% searching + {200, #{data := [Client2]}} = + request(get, "/gateway/coap/clients", + [{<<"clientid">>, ClientId}]), + {200, #{data := [Client3]}} = + request(get, "/gateway/coap/clients", + [{<<"like_clientid">>, <<"cli">>}]), + %% lookup + {200, Client4} = + request(get, "/gateway/coap/clients/client1"), + %% assert + Client1 = Client2 = Client3 = Client4, + %% kickout + {204, _} = + request(delete, "/gateway/coap/clients/client1"), + {200, #{data := []}} = request(get, "/gateway/coap/clients") + end, + with_connection(Fun). + +t_clients_subscription_api(_) -> + Fun = fun(_Channel, _Token) -> + Path = "/gateway/coap/clients/client1/subscriptions", + %% list + {200, []} = request(get, Path), + %% create + SubReq = #{ topic => <<"tx">> + , qos => 0 + , nl => 0 + , rap => 0 + , rh => 0 + }, + + {201, SubsResp} = request(post, Path, SubReq), + {200, [SubsResp2]} = request(get, Path), + ?assertEqual( + maps:get(topic, SubsResp), + maps:get(topic, SubsResp2)), + + {204, _} = request(delete, Path ++ "/tx"), + + {200, []} = request(get, Path) + end, + with_connection(Fun). + +%%-------------------------------------------------------------------- +%% helpers + connection(Channel) -> - URI = ?MQTT_PREFIX ++ "/connection?clientid=client1&username=admin&password=public", + URI = ?MQTT_PREFIX ++ + "/connection?clientid=client1&username=admin&password=public", Req = make_req(post), {ok, created, Data} = do_request(Channel, URI, Req), #coap_content{payload = BinToken} = Data, @@ -252,7 +312,8 @@ do_request(Channel, URI, #coap_message{options = Opts} = Req) -> with_response(Channel) -> receive - {coap_response, _ChId, Channel, _Ref, Message=#coap_message{method=Code}} -> + {coap_response, _ChId, Channel, + _Ref, Message=#coap_message{method=Code}} -> return_response(Code, Message); {coap_error, _ChId, Channel, _Ref, reset} -> {error, reset} @@ -280,10 +341,10 @@ do(Fun) -> with_connection(Action) -> Fun = fun(Channel) -> - Token = connection(Channel), - timer:sleep(100), - Action(Channel, Token), - disconnection(Channel, Token), - timer:sleep(100) - end, + Token = connection(Channel), + timer:sleep(100), + Action(Channel, Token), + disconnection(Channel, Token), + timer:sleep(100) + end, do(Fun). diff --git a/apps/emqx_gateway/test/emqx_gateway_test_utils.erl b/apps/emqx_gateway/test/emqx_gateway_test_utils.erl index d2daf48b4..c96dee651 100644 --- a/apps/emqx_gateway/test/emqx_gateway_test_utils.erl +++ b/apps/emqx_gateway/test/emqx_gateway_test_utils.erl @@ -117,6 +117,8 @@ req(Path, Qs) -> req(Path, Qs, Body) -> {url(Path, Qs), auth([]), "application/json", emqx_json:encode(Body)}. +url(Path, []) -> + lists:concat([?http_api_host, Path]); url(Path, Qs) -> lists:concat([?http_api_host, Path, "?", binary_to_list(cow_qs:qs(Qs))]). diff --git a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl index b6c1b837f..30e910f2b 100644 --- a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl @@ -19,6 +19,11 @@ -compile(export_all). -compile(nowarn_export_all). +-import(emqx_gateway_test_utils, + [ request/2 + , request/3 + ]). + -define(PORT, 5783). -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)). @@ -66,9 +71,9 @@ all() -> , {group, test_grp_4_discover} , {group, test_grp_5_write_attr} , {group, test_grp_6_observe} - %% {group, test_grp_8_object_19} , {group, test_grp_9_psm_queue_mode} + , {group, test_grp_10_rest_api} ]. suite() -> [{timetrap, {seconds, 90}}]. @@ -147,21 +152,29 @@ groups() -> [ case90_psm_mode, case90_queue_mode + ]}, + {test_grp_10_rest_api, [RepeatOpt], + [ + case100_clients_api, + case100_subscription_api ]} ]. init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([emqx_conf]), + %% load application first for minirest api searching + application:load(emqx_gateway), + emqx_mgmt_api_test_util:init_suite([emqx_conf]), Config. end_per_suite(Config) -> timer:sleep(300), {ok, _} = emqx_conf:remove([<<"gateway">>,<<"lwm2m">>], #{}), - emqx_common_test_helpers:stop_apps([emqx_conf]), + emqx_mgmt_api_test_util:end_suite([emqx_conf]), Config. init_per_testcase(_AllTestCase, Config) -> ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), + {ok, _} = application:ensure_all_started(emqx_gateway), {ok, ClientUdpSock} = gen_udp:open(0, [binary, {active, false}]), @@ -1887,6 +1900,68 @@ server_cache_mode(Config, RegOption) -> verify_read_response_1(2, UdpSock), verify_read_response_1(3, UdpSock). +case100_clients_api(Config) -> + Epn = "urn:oma:lwm2m:oma:3", + MsgId1 = 15, + UdpSock = ?config(sock, Config), + ObjectList = <<", , , , ">>, + RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"), + std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic), + + %% list + {200, #{data := [Client1]}} = request(get, "/gateway/lwm2m/clients"), + %% searching + {200, #{data := [Client2]}} = + request(get, "/gateway/lwm2m/clients", + [{<<"endpoint_name">>, list_to_binary(Epn)}]), + {200, #{data := [Client3]}} = + request(get, "/gateway/lwm2m/clients", + [{<<"like_endpoint_name">>, list_to_binary(Epn)}, + {<<"gte_lifetime">>, <<"1">>} + ]), + %% lookup + ClientId = maps:get(clientid, Client1), + {200, Client4} = + request(get, "/gateway/lwm2m/clients/" ++ binary_to_list(ClientId)), + %% assert + Client1 = Client2 = Client3 = Client4, + %% kickout + {204, _} = + request(delete, "/gateway/lwm2m/clients/" ++ binary_to_list(ClientId)), + {200, #{data := []}} = request(get, "/gateway/lwm2m/clients"). + +case100_subscription_api(Config) -> + Epn = "urn:oma:lwm2m:oma:3", + MsgId1 = 15, + UdpSock = ?config(sock, Config), + ObjectList = <<", , , , ">>, + RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"), + std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic), + + {200, #{data := [Client1]}} = request(get, "/gateway/lwm2m/clients"), + ClientId = maps:get(clientid, Client1), + Path = "/gateway/lwm2m/clients/" ++ + binary_to_list(ClientId) ++ + "/subscriptions", + + %% list + {200, [InitSub]} = request(get, Path), + ?assertEqual( + <<"lwm2m/", (list_to_binary(Epn))/binary, "/dn/#">>, + maps:get(topic, InitSub)), + + %% create + SubReq = #{ topic => <<"tx">> + , qos => 1 + , nl => 0 + , rap => 0 + , rh => 0 + }, + {201, _} = request(post, Path, SubReq), + {200, _} = request(get, Path), + {204, _} = request(delete, Path ++ "/tx"), + {200, [InitSub]} = request(get, Path). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Internal Functions %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index fff7fb8e2..d3cae8596 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -19,6 +19,11 @@ -compile(export_all). -compile(nowarn_export_all). +-import(emqx_gateway_test_utils, + [ request/2 + , request/3 + ]). + -include("src/mqttsn/include/emqx_sn.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -27,7 +32,6 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). - -define(HOST, {127,0,0,1}). -define(PORT, 1884). @@ -85,12 +89,12 @@ all() -> init_per_suite(Config) -> ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), - emqx_common_test_helpers:start_apps([emqx_gateway]), + emqx_mgmt_api_test_util:init_suite([emqx_gateway]), Config. end_per_suite(_) -> {ok, _} = emqx:remove_config([gateway, mqttsn]), - emqx_common_test_helpers:stop_apps([emqx_gateway]). + emqx_mgmt_api_test_util:end_suite([emqx_gateway]). %%-------------------------------------------------------------------- %% Test cases @@ -1762,6 +1766,68 @@ t_broadcast_test1(_) -> timer:sleep(600), gen_udp:close(Socket). +t_clients_api(_) -> + TsNow = emqx_gateway_utils:unix_ts_to_rfc3339( + erlang:system_time(millisecond)), + ClientId = <<"client_id_test1">>, + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, ClientId), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + %% list + {200, #{data := [Client1]}} = request(get, "/gateway/mqttsn/clients"), + #{clientid := ClientId} = Client1, + %% searching + {200, #{data := [Client2]}} = + request(get, "/gateway/mqttsn/clients", [{<<"clientid">>, ClientId}]), + {200, #{data := [Client3]}} = + request(get, "/gateway/mqttsn/clients", + [{<<"like_clientid">>, <<"test1">>}, + {<<"proto_ver">>, <<"1.2">>}, + {<<"ip_address">>, <<"127.0.0.1">>}, + {<<"conn_state">>, <<"connected">>}, + {<<"clean_start">>, <<"true">>}, + {<<"gte_connected_at">>, TsNow} + ]), + %% lookup + {200, Client4} = + request(get, "/gateway/mqttsn/clients/client_id_test1"), + %% assert + Client1 = Client2 = Client3 = Client4, + %% kickout + {204, _} = + request(delete, "/gateway/mqttsn/clients/client_id_test1"), + {200, #{data := []}} = request(get, "/gateway/mqttsn/clients"), + + send_disconnect_msg(Socket, undefined), + gen_udp:close(Socket). + +t_clients_subscription_api(_) -> + ClientId = <<"client_id_test1">>, + Path = "/gateway/mqttsn/clients/client_id_test1/subscriptions", + {ok, Socket} = gen_udp:open(0, [binary]), + send_connect_msg(Socket, ClientId), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + %% list + {200, []} = request(get, Path), + %% create + SubReq = #{ topic => <<"tx">> + , qos => 1 + , nl => 0 + , rap => 0 + , rh => 0 + }, + {201, SubsResp} = request(post, Path, SubReq), + + {200, [SubsResp]} = request(get, Path), + + {204, _} = request(delete, Path ++ "/tx"), + + {200, []} = request(get, Path), + + send_disconnect_msg(Socket, undefined), + ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), + gen_udp:close(Socket). + %%-------------------------------------------------------------------- %% Helper funcs %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 2ff730a6a..6136cab3a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -35,6 +35,15 @@ -export([do_query/6]). +-export([ ensure_timestamp_format/2 + ]). + +-export([ unix_ts_to_rfc3339_bin/1 + , unix_ts_to_rfc3339_bin/2 + , time_string_to_unix_ts_int/1 + , time_string_to_unix_ts_int/2 + ]). + paginate(Tables, Params, {Module, FormatFun}) -> Qh = query_handle(Tables), Count = count(Tables), @@ -401,6 +410,7 @@ to_integer(B) when is_binary(B) -> to_timestamp(I) when is_integer(I) -> I; to_timestamp(B) when is_binary(B) -> + binary_to_integer(B). aton(B) when is_binary(B) -> @@ -412,6 +422,41 @@ to_ip_port(IPAddress) -> Port = list_to_integer(Port0), {IP, Port}. +%%-------------------------------------------------------------------- +%% time format funcs + +ensure_timestamp_format(Qs, TimeKeys) + when is_map(Qs); + is_list(TimeKeys) -> + Fun = fun (Key, NQs) -> + case NQs of + %% TimeString likes "2021-01-01T00:00:00.000+08:00" (in rfc3339) + %% or "1609430400000" (in millisecond) + #{Key := TimeString} -> + NQs#{Key => time_string_to_unix_ts_int(TimeString)}; + #{} -> NQs + end + end, + lists:foldl(Fun, Qs, TimeKeys). + +unix_ts_to_rfc3339_bin(TimeStamp) -> + unix_ts_to_rfc3339_bin(TimeStamp, millisecond). + +unix_ts_to_rfc3339_bin(TimeStamp, Unit) when is_integer(TimeStamp) -> + list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}])). + +time_string_to_unix_ts_int(DateTime) -> + time_string_to_unix_ts_int(DateTime, millisecond). + +time_string_to_unix_ts_int(DateTime, Unit) when is_binary(DateTime) -> + try binary_to_integer(DateTime) of + TimeStamp when is_integer(TimeStamp) -> TimeStamp + catch + error:badarg -> + calendar:rfc3339_to_system_time( + binary_to_list(DateTime), [{unit, Unit}]) + end. + %%-------------------------------------------------------------------- %% EUnits %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 007916d1b..be38e401d 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -44,14 +44,6 @@ %% for batch operation -export([do_subscribe/3]). -%% for test suite --export([ unix_ts_to_rfc3339_bin/1 - , unix_ts_to_rfc3339_bin/2 - , time_string_to_unix_ts_int/1 - , time_string_to_unix_ts_int/2 - ]). - - -define(CLIENT_QS_SCHEMA, {emqx_channel_info, [ {<<"node">>, atom} , {<<"username">>, binary} @@ -463,7 +455,7 @@ keepalive_api() -> %%%============================================================================================== %% parameters trans clients(get, #{query_string := Qs}) -> - list(generate_qs(Qs)). + list(emqx_mgmt_api:ensure_timestamp_format(Qs, time_keys())). client(get, #{bindings := Bindings}) -> lookup(Bindings); @@ -625,7 +617,8 @@ do_unsubscribe(ClientID, Topic) -> end. %%-------------------------------------------------------------------- -%% QueryString Generation (try rfc3339 to timestamp or keep timestamp) +%% QueryString data-fomrat convert +%% (try rfc3339 to timestamp or keep timestamp) time_keys() -> [ <<"gte_created_at">> @@ -633,18 +626,6 @@ time_keys() -> , <<"gte_connected_at">> , <<"lte_connected_at">>]. -generate_qs(Qs) -> - Fun = - fun (Key, NQs) -> - case NQs of - %% TimeString likes "2021-01-01T00:00:00.000+08:00" (in rfc3339) - %% or "1609430400000" (in millisecond) - #{Key := TimeString} -> NQs#{Key => time_string_to_unix_ts_int(TimeString)}; - #{} -> NQs - end - end, - lists:foldl(Fun, Qs, time_keys()). - %%-------------------------------------------------------------------- %% Query Functions @@ -778,8 +759,11 @@ take_maps_from_inner(Key, Value, Current) -> result_format_time_fun(Key, NClientInfoMap) -> case NClientInfoMap of - #{Key := TimeStamp} -> NClientInfoMap#{Key => unix_ts_to_rfc3339_bin(TimeStamp)}; - #{} -> NClientInfoMap + #{Key := TimeStamp} -> + NClientInfoMap#{ + Key => emqx_mgmt_api:unix_ts_to_rfc3339_bin(TimeStamp)}; + #{} -> + NClientInfoMap end. -spec(peername_dispart(emqx_types:peername()) -> {binary(), inet:port_number()}). @@ -795,22 +779,3 @@ format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) -> updated_time => Timestamp }. -%%-------------------------------------------------------------------- -%% time format funcs - -unix_ts_to_rfc3339_bin(TimeStamp) -> - unix_ts_to_rfc3339_bin(TimeStamp, millisecond). - -unix_ts_to_rfc3339_bin(TimeStamp, Unit) when is_integer(TimeStamp) -> - list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}])). - -time_string_to_unix_ts_int(DateTime) -> - time_string_to_unix_ts_int(DateTime, millisecond). - -time_string_to_unix_ts_int(DateTime, Unit) when is_binary(DateTime) -> - try binary_to_integer(DateTime) of - TimeStamp when is_integer(TimeStamp) -> TimeStamp - catch - error:badarg -> - calendar:rfc3339_to_system_time(binary_to_list(DateTime), [{unit, Unit}]) - end. diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index a80d862f7..3c4864ab7 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -129,7 +129,7 @@ t_query_clients_with_time(_) -> NowTimeStampInt = erlang:system_time(millisecond), %% Do not uri_encode `=` to `%3D` Rfc3339String = emqx_http_lib:uri_encode(binary:bin_to_list( - emqx_mgmt_api_clients:unix_ts_to_rfc3339_bin(NowTimeStampInt))), + emqx_mgmt_api:unix_ts_to_rfc3339_bin(NowTimeStampInt))), TimeStampString = emqx_http_lib:uri_encode(integer_to_list(NowTimeStampInt)), LteKeys = ["lte_created_at=", "lte_connected_at="], @@ -147,10 +147,10 @@ t_query_clients_with_time(_) -> || {ok, Response} <- RequestResults], {LteResponseDecodeds, GteResponseDecodeds} = lists:split(4, DecodedResults), %% EachData :: list() - [?assert( emqx_mgmt_api_clients:time_string_to_unix_ts_int(CreatedAt) < NowTimeStampInt) + [?assert( emqx_mgmt_api:time_string_to_unix_ts_int(CreatedAt) < NowTimeStampInt) || #{<<"data">> := EachData} <- LteResponseDecodeds, #{<<"created_at">> := CreatedAt} <- EachData], - [?assert(emqx_mgmt_api_clients:time_string_to_unix_ts_int(ConnectedAt) < NowTimeStampInt) + [?assert(emqx_mgmt_api:time_string_to_unix_ts_int(ConnectedAt) < NowTimeStampInt) || #{<<"data">> := EachData} <- LteResponseDecodeds, #{<<"connected_at">> := ConnectedAt} <- EachData], [?assertEqual(EachData, []) From 4ce11fec6e420eb6e0e67fda688317e58b239369 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 11 Jan 2022 15:04:32 +0800 Subject: [PATCH 07/11] fix(stomp): fix parsing rear frame split byte crash --- apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl index bdd8c6e58..e079e5fe6 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl @@ -134,7 +134,7 @@ g(Key, Opts, Val) -> parse(<<>>, Parser) -> {more, Parser}; -parse(Bytes, #{phase := body, len := Len, state := State}) -> +parse(Bytes, #{phase := body, length := Len, state := State}) -> parse(body, Bytes, State, Len); parse(Bytes, Parser = #{pre := Pre}) -> From 44ea85305911650761215b325261862e44bb8031 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 12 Jan 2022 18:09:28 +0800 Subject: [PATCH 08/11] test(gw): more coverage for emqx_gateway_cm --- apps/emqx_gateway/src/emqx_gateway_cm.erl | 28 +- .../src/emqx_gateway_cm_registry.erl | 18 +- .../test/emqx_gateway_cm_SUITE.erl | 248 ++++++++++++++++++ .../test/emqx_sn_protocol_SUITE.erl | 4 + 4 files changed, 276 insertions(+), 22 deletions(-) create mode 100644 apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index ee1ce19ef..647d3a6a7 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc The Gateway Connection-Manager +%% @doc The Gateway Channel Manager %% %% For a certain type of protocol, this is a single instance of the manager. %% It means that no matter how many instances of the stomp gateway are created, @@ -26,7 +26,6 @@ -include("include/emqx_gateway.hrl"). -include_lib("emqx/include/logger.hrl"). - %% APIs -export([start_link/1]). @@ -74,6 +73,8 @@ -type option() :: {gwname, gateway_name()}. -type options() :: list(option()). +-define(T_KICK, 5000). +-define(T_GET_INFO, 5000). -define(T_TAKEOVER, 15000). -define(DEFAULT_BATCH_SIZE, 10000). @@ -94,9 +95,9 @@ procname(GwName) -> ConnTab :: atom(), ChannInfoTab :: atom()}. cmtabs(GwName) -> - { tabname(chan, GwName) %% Client Tabname; Record: {ClientId, Pid} - , tabname(conn, GwName) %% Client ConnMod; Recrod: {{ClientId, Pid}, ConnMod} - , tabname(info, GwName) %% ClientInfo Tabname; Record: {{ClientId, Pid}, ClientInfo, ClientStats} + { tabname(chan, GwName) %% Record: {ClientId, Pid} + , tabname(conn, GwName) %% Recrod: {{ClientId, Pid}, ConnMod} + , tabname(info, GwName) %% Record: {{ClientId, Pid}, Info, Stats} }. tabname(chan, GwName) -> @@ -134,7 +135,6 @@ unregister_channel(GwName, ClientId) when is_binary(ClientId) -> insert_channel_info(GwName, ClientId, Info, Stats) -> Chan = {ClientId, self()}, true = ets:insert(tabname(info, GwName), {Chan, Info, Stats}), - %%?tp(debug, insert_channel_info, #{client_id => ClientId}), ok. %% @doc Get info of a channel. @@ -207,7 +207,8 @@ set_chan_stats(GwName, ClientId, Stats) -> emqx_types:clientid(), pid(), emqx_types:stats()) -> boolean(). -set_chan_stats(GwName, ClientId, ChanPid, Stats) when node(ChanPid) == node() -> +set_chan_stats(GwName, ClientId, ChanPid, Stats) + when node(ChanPid) == node() -> Chan = {ClientId, self()}, try ets:update_element(tabname(info, GwName), Chan, {3, Stats}) catch @@ -232,7 +233,7 @@ connection_closed(GwName, ClientId) -> -> {ok, #{session := Session, present := boolean(), pendings => list() - }} + }} | {error, any()}. open_session(GwName, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) -> @@ -256,7 +257,7 @@ open_session(GwName, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun, open_session(_Type, false = _CleanStart, _ClientInfo, _ConnInfo, _CreateSessionFun, _SessionMod) -> - %% TODO: + %% TODO: The session takeover logic will be implemented on 0.9? {error, not_supported_now}. %% @private @@ -305,17 +306,12 @@ do_discard_session(GwName, ClientId, Pid) -> discard_session(GwName, ClientId, Pid) catch _ : noproc -> % emqx_ws_connection: call - %?tp(debug, "session_already_gone", #{pid => Pid}), ok; _ : {noproc, _} -> % emqx_connection: gen_server:call - %?tp(debug, "session_already_gone", #{pid => Pid}), ok; _ : {{shutdown, _}, _} -> - %?tp(debug, "session_already_shutdown", #{pid => Pid}), ok; _ : _Error : _St -> - %?tp(error, "failed_to_discard_session", - % #{pid => Pid, reason => Error, stacktrace=>St}) ok end. @@ -464,7 +460,9 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> +terminate(_Reason, #state{registry = Registry, locker = Locker}) -> + _ = gen_server:stop(Registry), + _ = ekka_locker:stop(Locker), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl index 108db3216..845ad7b7e 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl @@ -64,7 +64,8 @@ tabname(Name) -> register_channel(Name, ClientId) when is_binary(ClientId) -> register_channel(Name, {ClientId, self()}); -register_channel(Name, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> +register_channel(Name, {ClientId, ChanPid}) + when is_binary(ClientId), is_pid(ChanPid) -> mria:dirty_write(tabname(Name), record(ClientId, ChanPid)). %% @doc Unregister a global channel. @@ -72,13 +73,15 @@ register_channel(Name, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(Cha unregister_channel(Name, ClientId) when is_binary(ClientId) -> unregister_channel(Name, {ClientId, self()}); -unregister_channel(Name, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> +unregister_channel(Name, {ClientId, ChanPid}) + when is_binary(ClientId), is_pid(ChanPid) -> mria:dirty_delete_object(tabname(Name), record(ClientId, ChanPid)). %% @doc Lookup the global channels. -spec lookup_channels(gateway_name(), binary()) -> list(pid()). lookup_channels(Name, ClientId) -> - [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(tabname(Name), ClientId)]. + [ChanPid + || #channel{pid = ChanPid} <- mnesia:dirty_read(tabname(Name), ClientId)]. record(ClientId, ChanPid) -> #channel{chid = ClientId, pid = ChanPid}. @@ -111,10 +114,11 @@ handle_cast(Msg, State) -> handle_info({membership, {mnesia, down, Node}}, State = #{name := Name}) -> Tab = tabname(Name), - global:trans({?LOCK, self()}, - fun() -> - mria:transaction(?CM_SHARD, fun cleanup_channels/2, [Node, Tab]) - end), + global:trans( + {?LOCK, self()}, + fun() -> + mria:transaction(?CM_SHARD, fun cleanup_channels/2, [Node, Tab]) + end), {noreply, State}; handle_info({membership, _Event}, State) -> diff --git a/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl new file mode 100644 index 000000000..82d97a166 --- /dev/null +++ b/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl @@ -0,0 +1,248 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_cm_SUITE). + +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). +-compile(nowarn_export_all). + +-define(GWNAME, mqttsn). +-define(CLIENTID, <<"client1">>). + +-define(CONF_DEFAULT, <<"gateway {}">>). + +%%-------------------------------------------------------------------- +%% setups +%%-------------------------------------------------------------------- + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Conf) -> + emqx_config:erase(gateway), + emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), + emqx_common_test_helpers:start_apps([]), + + ok = meck:new(emqx_gateway_metrics, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_gateway_metrics, inc, fun(_, _) -> ok end), + Conf. + +end_per_suite(_Conf) -> + meck:unload(emqx_gateway_metrics), + emqx_common_test_helpers:stop_apps([]). + +init_per_testcase(_TestCase, Conf) -> + process_flag(trap_exit, true), + {ok, CMPid} = emqx_gateway_cm:start_link([{gwname, ?GWNAME}]), + [{cm, CMPid} | Conf]. + +end_per_testcase(_TestCase, Conf) -> + CMPid = proplists:get_value(cm, Conf), + gen_server:stop(CMPid), + process_flag(trap_exit, false), + Conf. + +%%-------------------------------------------------------------------- +%% cases +%%-------------------------------------------------------------------- + +t_open_session(_) -> + {error, not_supported_now} = emqx_gateway_cm:open_session( + ?GWNAME, false, clientinfo(), conninfo(), + fun(_, _) -> #{} end), + + {ok, SessionRes} = emqx_gateway_cm:open_session( + ?GWNAME, true, clientinfo(), conninfo(), + fun(_, _) -> #{no => 1} end), + ?assertEqual(#{present => false, + session => #{no => 1}}, SessionRes), + + %% assert1. check channel infos in ets table + Chann = {?CLIENTID, self()}, + ?assertEqual( + [Chann], + ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))), + ?assertEqual( + [{Chann, ?MODULE}], + ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))), + + %% assert2. discard the presented session + + {ok, SessionRes2} = emqx_gateway_cm:open_session( + ?GWNAME, true, clientinfo(), conninfo(), + fun(_, _) -> #{no => 2} end), + ?assertEqual(#{present => false, + session => #{no => 2}}, SessionRes2), + + emqx_gateway_cm:insert_channel_info( + ?GWNAME, ?CLIENTID, + #{clientinfo => clientinfo(), conninfo => conninfo()}, []), + ?assertEqual( + 1, + ets:info(emqx_gateway_cm:tabname(info, ?GWNAME), size)), + + receive + discard -> + emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID), + emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID) + after 100 -> + ?assert(false, "waiting discard msg timeout") + end, + + %% assert3. no channel infos in ets table + ?assertEqual( + [], + ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))), + ?assertEqual( + [], + ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))), + ?assertEqual( + [], + ets:tab2list(emqx_gateway_cm:tabname(info, ?GWNAME))). + +t_get_set_chan_info_stats(_) -> + {ok, SessionRes} = emqx_gateway_cm:open_session( + ?GWNAME, true, clientinfo(), conninfo(), + fun(_, _) -> #{no => 1} end), + ?assertEqual(#{present => false, + session => #{no => 1}}, SessionRes), + emqx_gateway_cm:insert_channel_info( + ?GWNAME, ?CLIENTID, + #{clientinfo => clientinfo(), conninfo => conninfo()}, []), + + %% Info: get/set + NInfo = #{newinfo => true}, + emqx_gateway_cm:set_chan_info(?GWNAME, ?CLIENTID, NInfo), + ?assertEqual( + NInfo, + emqx_gateway_cm:get_chan_info(?GWNAME, ?CLIENTID)), + ?assertEqual( + NInfo, + emqx_gateway_cm:get_chan_info(?GWNAME, ?CLIENTID, self())), + %% Stats: get/set + NStats = [{newstats, true}], + emqx_gateway_cm:set_chan_stats(?GWNAME, ?CLIENTID, NStats), + ?assertEqual( + NStats, + emqx_gateway_cm:get_chan_stats(?GWNAME, ?CLIENTID)), + ?assertEqual( + NStats, + emqx_gateway_cm:get_chan_stats(?GWNAME, ?CLIENTID, self())), + + emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID), + emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID). + +t_handle_process_down(Conf) -> + Pid = proplists:get_value(cm, Conf), + + {ok, SessionRes} = emqx_gateway_cm:open_session( + ?GWNAME, true, clientinfo(), conninfo(), + fun(_, _) -> #{no => 1} end), + ?assertEqual(#{present => false, + session => #{no => 1}}, SessionRes), + emqx_gateway_cm:insert_channel_info( + ?GWNAME, ?CLIENTID, + #{clientinfo => clientinfo(), conninfo => conninfo()}, []), + + _ = Pid ! {'DOWN', mref, process, self(), normal}, + + timer:sleep(200), %% wait the asycn clear task + ?assertEqual( + [], + ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))), + ?assertEqual( + [], + ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))), + ?assertEqual( + [], + ets:tab2list(emqx_gateway_cm:tabname(info, ?GWNAME))). + +t_kick_session(_) -> + %% session1 + {ok, _} = emqx_gateway_cm:open_session( + ?GWNAME, true, clientinfo(), conninfo(), + fun(_, _) -> #{no => 1} end), + emqx_gateway_cm:insert_channel_info( + ?GWNAME, ?CLIENTID, + #{clientinfo => clientinfo(), conninfo => conninfo()}, []), + + %% meck `lookup_channels` + Self = self(), + ok = meck:new(emqx_gateway_cm_registry, + [passthrough, no_history, no_link]), + ok = meck:expect(emqx_gateway_cm_registry, lookup_channels, + fun(_, ?CLIENTID) -> [Self, Self] end), + + ok = emqx_gateway_cm:kick_session(?GWNAME, ?CLIENTID), + + receive discard -> ok + after 100 -> ?assert(false, "waiting discard msg timeout") + end, + receive + kick -> + emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID), + emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID) + after + 100 -> + ?assert(false, "waiting kick msg timeout") + end, + meck:unload(emqx_gateway_cm_registry). + +t_unexpected_handle(Conf) -> + Pid = proplists:get_value(cm, Conf), + _ = Pid ! unexpected_info, + ok = gen_server:call(Pid, unexpected_call), + ok = gen_server:cast(Pid, unexpected_cast). + +%%-------------------------------------------------------------------- +%% helpers + +clientinfo() -> + #{ clientid => ?CLIENTID + , is_bridge => false + , is_superuser => false + , listener => 'mqttsn:udp:default' + , mountpoint => <<"mqttsn/">> + , peerhost => {127, 0, 0, 1} + , protocol => 'mqtt-sn' + , sockport => 1884 + , username => undefined + , zone => default + }. + +conninfo() -> + #{ clean_start => true + , clientid => ?CLIENTID + , conn_mod => ?MODULE + , connected_at => 1641805544652 + , expiry_interval => 0 + , keepalive => 10 + , peercert => nossl + , peername => {{127, 0, 0, 1}, 64810} + , proto_name => <<"MQTT-SN">> + , proto_ver => <<"1.2">> + , sockname => {{0, 0, 0, 0}, 1884} + , socktype => udp + }. + +%%-------------------------------------------------------------------- +%% connection module mock + +call(ConnPid, discard, _) -> + ConnPid ! discard, ok; +call(ConnPid, kick, _) -> + ConnPid ! kick, ok. diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index d3cae8596..114a5ceed 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -1766,6 +1766,10 @@ t_broadcast_test1(_) -> timer:sleep(600), gen_udp:close(Socket). +t_socket_passvice(_) -> + %% TODO: test this gateway enter the passvie event + ok. + t_clients_api(_) -> TsNow = emqx_gateway_utils:unix_ts_to_rfc3339( erlang:system_time(millisecond)), From cce0b1ca340f0673c677586e786e7c977f1d4646 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 13 Jan 2022 14:33:11 +0800 Subject: [PATCH 09/11] fix(stomp): fix the sticky tcp stream parsing --- apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl | 2 +- apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 8a63b28d8..51653d3c0 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -649,7 +649,7 @@ parse_incoming(Data, Packets, , reason => Reason , stacktrace => Stk }), - {[{frame_error, Reason}|Packets], State} + {[{frame_error, Reason} | Packets], State} end. next_incoming_msgs([Packet]) -> diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl index e079e5fe6..a2ffa1988 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl @@ -136,6 +136,8 @@ parse(<<>>, Parser) -> parse(Bytes, #{phase := body, length := Len, state := State}) -> parse(body, Bytes, State, Len); +parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none -> + parse(Phase, Bytes, State); parse(Bytes, Parser = #{pre := Pre}) -> parse(<
>, maps:without([pre], Parser));
@@ -162,6 +164,8 @@ parse(command, <>, State = #parser_state{acc = Acc}) ->
     parse(headers, Rest, State#parser_state{cmd = Acc, acc = <<>>});
 parse(command, <>, State) ->
     parse(command, Rest, acc(Ch, State));
+parse(command, <<>>, State) ->
+    {more, #{phase => command, state => State}};
 
 parse(headers, <>, State) ->
     parse(body, Rest, State, content_len(State#parser_state{acc = <<>>}));
@@ -174,6 +178,8 @@ parse(hdname, <>, State = #parser_state{acc = Acc}) ->
     parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
 parse(hdname, <>, State) ->
     parse(hdname, Rest, acc(Ch, State));
+parse(hdname, <<>>, State) ->
+    {more, #{phase => hdname, state => State}};
 
 parse(hdvalue, <>,
       State = #parser_state{headers = Headers, hdname = Name, acc = Acc}) ->
@@ -183,7 +189,9 @@ parse(hdvalue, <>,
                                },
     parse(headers, Rest, NState);
 parse(hdvalue, <>, State) ->
-    parse(hdvalue, Rest, acc(Ch, State)).
+    parse(hdvalue, Rest, acc(Ch, State));
+parse(hdvalue, <<>>, State) ->
+    {more, #{phase => hdvalue, state => State}}.
 
 %% @private
 parse(body, <<>>, State, Length) ->

From 43284768d046aab0cfe207f8d75f6d587ca11d5c Mon Sep 17 00:00:00 2001
From: JianBo He 
Date: Thu, 13 Jan 2022 15:56:23 +0800
Subject: [PATCH 10/11] chore(gw): more code coverage for emqx_gateway_conn
 module

---
 .../src/bhvrs/emqx_gateway_conn.erl           | 25 ++++---
 apps/emqx_gateway/src/emqx_gateway_ctx.erl    | 16 ++---
 apps/emqx_gateway/src/emqx_gateway_http.erl   | 32 ++++-----
 .../src/emqx_gateway_insta_sup.erl            |  4 +-
 .../test/emqx_gateway_ctx_SUITE.erl           | 67 +++++++++++++++++++
 .../test/emqx_sn_protocol_SUITE.erl           |  1 -
 apps/emqx_gateway/test/emqx_stomp_SUITE.erl   | 34 ++++++++++
 7 files changed, 139 insertions(+), 40 deletions(-)
 create mode 100644 apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl

diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl
index 51653d3c0..8e0c3d7ba 100644
--- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl
+++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl
@@ -720,20 +720,29 @@ serialize_and_inc_stats_fun(#state{
                                channel = Channel}) ->
     Ctx = ChannMod:info(ctx, Channel),
     fun(Packet) ->
-        case FrameMod:serialize_pkt(Packet, Serialize) of
-            <<>> ->
+        try
+            Data = FrameMod:serialize_pkt(Packet, Serialize),
+            ?SLOG(debug, #{ msg => "SEND_packet"
+                          %% XXX: optimize it, less cpu comsuption?
+                          , packet => FrameMod:format(Packet)
+                          }),
+            ok = inc_outgoing_stats(Ctx, FrameMod, Packet),
+            Data
+        catch
+            _ : too_large ->
                 ?SLOG(warning, #{ msg => "packet_too_large_discarded"
                                 , packet => FrameMod:format(Packet)
                                 }),
                  ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'),
                  ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
                  <<>>;
-            Data ->
-                ?SLOG(debug, #{ msg => "SEND_packet"
-                              , packet => FrameMod:format(Packet)
-                              }),
-                ok = inc_outgoing_stats(Ctx, FrameMod, Packet),
-                Data
+            _ : Reason ->
+                ?SLOG(warning, #{ msg => "packet_serialize_failure"
+                                , reason => Reason
+                                , packet => FrameMod:format(Packet)
+                                }),
+                 ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'),
+                 <<>>
         end
     end.
 
diff --git a/apps/emqx_gateway/src/emqx_gateway_ctx.erl b/apps/emqx_gateway/src/emqx_gateway_ctx.erl
index ca170814f..ab81c1ddb 100644
--- a/apps/emqx_gateway/src/emqx_gateway_ctx.erl
+++ b/apps/emqx_gateway/src/emqx_gateway_ctx.erl
@@ -30,7 +30,7 @@
         #{ %% Gateway Name
            gwname := gateway_name()
            %% Authentication chains
-         , auth   := [emqx_authentication:chain_name()] | undefined
+         , auth   := [emqx_authentication:chain_name()]
            %% The ConnectionManager PID
          , cm     := pid()
          }.
@@ -64,18 +64,15 @@
 -spec authenticate(context(), emqx_types:clientinfo())
     -> {ok, emqx_types:clientinfo()}
      | {error, any()}.
-authenticate(_Ctx = #{auth := undefined}, ClientInfo) ->
-    {ok, mountpoint(ClientInfo)};
-authenticate(_Ctx = #{auth := _ChainName}, ClientInfo0) ->
+authenticate(_Ctx = #{auth := _ChainNames}, ClientInfo0)
+  when is_list(_ChainNames) ->
     ClientInfo = ClientInfo0#{zone => default},
     case emqx_access_control:authenticate(ClientInfo) of
         {ok, _} ->
             {ok, mountpoint(ClientInfo)};
         {error, Reason} ->
             {error, Reason}
-    end;
-authenticate(_Ctx, ClientInfo) ->
-    {ok, mountpoint(ClientInfo)}.
+    end.
 
 %% @doc Register the session to the cluster.
 %%
@@ -95,11 +92,6 @@ open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
     open_session(Ctx, CleanStart, ClientInfo, ConnInfo,
                  CreateSessionFun, emqx_session).
 
-open_session(Ctx, false, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
-    logger:warning("clean_start=false is not supported now, "
-                   "fallback to clean_start mode"),
-    open_session(Ctx, true, ClientInfo, ConnInfo, CreateSessionFun, SessionMod);
-
 open_session(_Ctx = #{gwname := GwName},
              CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
     emqx_gateway_cm:open_session(GwName, CleanStart,
diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl
index 7a1ac519d..641f29932 100644
--- a/apps/emqx_gateway/src/emqx_gateway_http.erl
+++ b/apps/emqx_gateway/src/emqx_gateway_http.erl
@@ -79,7 +79,6 @@
 -elvis([{elvis_style, god_modules, disable}]).
 -elvis([{elvis_style, no_nested_try_catch, disable}]).
 
-
 -define(DEFAULT_CALL_TIMEOUT, 15000).
 
 %%--------------------------------------------------------------------
@@ -336,53 +335,51 @@ return_http_error(Code, Msg) ->
               })
     }.
 
--spec reason2msg({atom(), map()} | any()) -> error | io_lib:chars().
+-spec reason2msg({atom(), map()} | any()) -> error | string().
 reason2msg({badconf, #{key := Key, value := Value, reason := Reason}}) ->
-    io_lib:format("Bad config value '~s' for '~s', reason: ~s",
-                  [Value, Key, Reason]);
+    fmtstr("Bad config value '~s' for '~s', reason: ~s", [Value, Key, Reason]);
 reason2msg({badres, #{resource := gateway,
                       gateway := GwName,
                       reason := not_found}}) ->
-    io_lib:format("The ~s gateway is unloaded", [GwName]);
+    fmtstr("The ~s gateway is unloaded", [GwName]);
 
 reason2msg({badres, #{resource := gateway,
                       gateway := GwName,
                       reason := already_exist}}) ->
-    io_lib:format("The ~s gateway already loaded", [GwName]);
+    fmtstr("The ~s gateway already loaded", [GwName]);
 
 reason2msg({badres, #{resource := listener,
                       listener := {GwName, LType, LName},
                       reason := not_found}}) ->
-    io_lib:format("Listener ~s not found",
-                  [listener_id(GwName, LType, LName)]);
+    fmtstr("Listener ~s not found", [listener_id(GwName, LType, LName)]);
 
 reason2msg({badres, #{resource := listener,
                       listener := {GwName, LType, LName},
                       reason := already_exist}}) ->
-    io_lib:format("The listener ~s of ~s already exist",
-                  [listener_id(GwName, LType, LName), GwName]);
+    fmtstr("The listener ~s of ~s already exist",
+           [listener_id(GwName, LType, LName), GwName]);
 
 reason2msg({badres, #{resource := authn,
                       gateway := GwName,
                       reason := not_found}}) ->
-    io_lib:format("The authentication not found on ~s", [GwName]);
+    fmtstr("The authentication not found on ~s", [GwName]);
 
 reason2msg({badres, #{resource := authn,
                       gateway := GwName,
                       reason := already_exist}}) ->
-    io_lib:format("The authentication already exist on ~s", [GwName]);
+    fmtstr("The authentication already exist on ~s", [GwName]);
 
 reason2msg({badres, #{resource := listener_authn,
                       listener := {GwName, LType, LName},
                       reason := not_found}}) ->
-    io_lib:format("The authentication not found on ~s",
-                  [listener_id(GwName, LType, LName)]);
+    fmtstr("The authentication not found on ~s",
+           [listener_id(GwName, LType, LName)]);
 
 reason2msg({badres, #{resource := listener_authn,
                       listener := {GwName, LType, LName},
                       reason := already_exist}}) ->
-    io_lib:format("The authentication already exist on ~s",
-                  [listener_id(GwName, LType, LName)]);
+    fmtstr("The authentication already exist on ~s",
+           [listener_id(GwName, LType, LName)]);
 reason2msg(_) ->
     error.
 
@@ -393,6 +390,9 @@ codestr(405) -> 'METHOD_NOT_ALLOWED';
 codestr(500) -> 'UNKNOW_ERROR';
 codestr(501) -> 'NOT_IMPLEMENTED'.
 
+fmtstr(Fmt, Args) ->
+    lists:flatten(io_lib:format(Fmt, Args)).
+
 -spec with_authn(binary(), function()) -> any().
 with_authn(GwName0, Fun) ->
     with_gateway(GwName0, fun(GwName, _GwConf) ->
diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl
index 096ab1f81..ddeb3620d 100644
--- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl
+++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl
@@ -122,7 +122,6 @@ handle_call(info, _From, State) ->
     {reply, detailed_gateway_info(State), State};
 
 handle_call(disable, _From, State = #state{status = Status}) ->
-    %% XXX: The `disable` opertaion is not persist to config database
     case Status of
         running ->
             case cb_gateway_unload(State) of
@@ -328,7 +327,7 @@ do_update_one_by_one(NCfg, State = #state{
                              AuthnNames = init_authn(State#state.name, NCfg),
                              State#state{authns = AuthnNames}
                      end,
-            %% XXX: minimum impact update ???
+            %% TODO: minimum impact update ???
             cb_gateway_update(NCfg, NState);
         {running, false} ->
             case cb_gateway_unload(State) of
@@ -413,7 +412,6 @@ cb_gateway_update(Config,
         case CbMod:on_gateway_update(Config, detailed_gateway_info(State), GwState) of
             {error, Reason} -> {error, Reason};
             {ok, ChildPidOrSpecs, NGwState} ->
-                %% XXX: Hot-upgrade ???
                 ChildPids = start_child_process(ChildPidOrSpecs),
                 {ok, State#state{
                        config = Config,
diff --git a/apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl
new file mode 100644
index 000000000..e09e5bc1b
--- /dev/null
+++ b/apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl
@@ -0,0 +1,67 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_ctx_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+%%--------------------------------------------------------------------
+%% setups
+%%--------------------------------------------------------------------
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Conf) ->
+    ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
+    ok = meck:expect(emqx_access_control, authenticate,
+                     fun(#{clientid := bad_client}) ->
+                             {error, bad_username_or_password};
+                        (ClientInfo) -> {ok, ClientInfo}
+                     end),
+    Conf.
+
+end_per_suite(_Conf) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% cases
+%%--------------------------------------------------------------------
+
+t_authenticate(_) ->
+    Ctx = #{gwname => mqttsn, auth => [], cm => self()},
+    Info1 = #{ mountpoint => undefined
+             , clientid => <<"user1">>
+             },
+    NInfo1 = zone(Info1),
+    ?assertEqual({ok, NInfo1}, emqx_gateway_ctx:authenticate(Ctx, Info1)),
+
+    Info2 = #{ mountpoint => <<"mqttsn/${clientid}/">> 
+             , clientid => <<"user1">>
+             },
+    NInfo2 = zone(Info2#{mountpoint => <<"mqttsn/user1/">>}),
+    ?assertEqual({ok, NInfo2}, emqx_gateway_ctx:authenticate(Ctx, Info2)),
+
+    Info3 = #{ mountpoint => <<"mqttsn/${clientid}/">> 
+             , clientid => bad_client
+             },
+    {error, bad_username_or_password}
+        = emqx_gateway_ctx:authenticate(Ctx, Info3),
+    ok.
+
+zone(Info) -> Info#{zone => default}.
diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl
index 114a5ceed..ab333cf87 100644
--- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl
+++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl
@@ -648,7 +648,6 @@ t_publish_qos0_case05(_) ->
 
     gen_udp:close(Socket).
 
-
 t_publish_qos0_case06(_) ->
     Dup = 0,
     QoS = 0,
diff --git a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl
index f2ed76114..53aae7562 100644
--- a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl
+++ b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl
@@ -351,6 +351,40 @@ t_ack(_) ->
                                   body    = _}, _, _} = parse(Data4)
     end).
 
+t_1000_msg_send(_) ->
+    with_connection(fun(Sock) ->
+        gen_tcp:send(Sock, serialize(<<"CONNECT">>,
+                                     [{<<"accept-version">>, ?STOMP_VER},
+                                      {<<"host">>, <<"127.0.0.1:61613">>},
+                                      {<<"login">>, <<"guest">>},
+                                      {<<"passcode">>, <<"guest">>},
+                                      {<<"heart-beat">>, <<"0,0">>}])),
+        {ok, Data} = gen_tcp:recv(Sock, 0),
+        {ok, #stomp_frame{command = <<"CONNECTED">>,
+                          headers = _,
+                          body    = _}, _, _} = parse(Data),
+
+        Topic = <<"/queue/foo">>,
+        SendFun = fun() ->
+            gen_tcp:send(Sock, serialize(<<"SEND">>,
+                                        [{<<"destination">>, Topic}],
+                                        <<"msgtest">>))
+        end,
+
+        RecvFun = fun() ->
+            receive
+                {deliver, Topic, _Msg}->
+                    ok
+            after 100 ->
+                      ?assert(false, "waiting message timeout")
+            end
+        end,
+
+        emqx:subscribe(Topic),
+        lists:foreach(fun(_) -> SendFun() end, lists:seq(1, 1000)),
+        lists:foreach(fun(_) -> RecvFun() end, lists:seq(1, 1000))
+    end).
+
 t_rest_clienit_info(_) ->
     with_connection(fun(Sock) ->
         gen_tcp:send(Sock, serialize(<<"CONNECT">>,

From c8088f18d9008067af309afb30b7a92f42f16dd5 Mon Sep 17 00:00:00 2001
From: JianBo He 
Date: Fri, 14 Jan 2022 10:56:19 +0800
Subject: [PATCH 11/11] chore(gw): fix dialyzer warnings

---
 apps/emqx_gateway/include/emqx_gateway.hrl | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/apps/emqx_gateway/include/emqx_gateway.hrl b/apps/emqx_gateway/include/emqx_gateway.hrl
index b6b176429..1ad201d7a 100644
--- a/apps/emqx_gateway/include/emqx_gateway.hrl
+++ b/apps/emqx_gateway/include/emqx_gateway.hrl
@@ -30,6 +30,8 @@
          , created_at => integer()
          %% Timestamp in millisecond
          , started_at => integer()
+         %% Timestamp in millisecond
+         , stopped_at => integer()
          %% Appears only in getting gateway info
          , config => emqx_config:config()
          }.