diff --git a/apps/emqx_gateway/include/emqx_gateway.hrl b/apps/emqx_gateway/include/emqx_gateway.hrl index 8a89d237c..1ad201d7a 100644 --- a/apps/emqx_gateway/include/emqx_gateway.hrl +++ b/apps/emqx_gateway/include/emqx_gateway.hrl @@ -22,6 +22,7 @@ %% @doc The Gateway defination -type gateway() :: #{ name := gateway_name() + %% Description , descr => binary() | undefined %% Appears only in getting gateway info , status => stopped | running | unloaded @@ -29,6 +30,8 @@ , created_at => integer() %% Timestamp in millisecond , started_at => integer() + %% Timestamp in millisecond + , stopped_at => integer() %% Appears only in getting gateway info , config => emqx_config:config() }. diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 8a63b28d8..8e0c3d7ba 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -649,7 +649,7 @@ parse_incoming(Data, Packets, , reason => Reason , stacktrace => Stk }), - {[{frame_error, Reason}|Packets], State} + {[{frame_error, Reason} | Packets], State} end. next_incoming_msgs([Packet]) -> @@ -720,20 +720,29 @@ serialize_and_inc_stats_fun(#state{ channel = Channel}) -> Ctx = ChannMod:info(ctx, Channel), fun(Packet) -> - case FrameMod:serialize_pkt(Packet, Serialize) of - <<>> -> + try + Data = FrameMod:serialize_pkt(Packet, Serialize), + ?SLOG(debug, #{ msg => "SEND_packet" + %% XXX: optimize it, less cpu comsuption? + , packet => FrameMod:format(Packet) + }), + ok = inc_outgoing_stats(Ctx, FrameMod, Packet), + Data + catch + _ : too_large -> ?SLOG(warning, #{ msg => "packet_too_large_discarded" , packet => FrameMod:format(Packet) }), ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped.too_large'), ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'), <<>>; - Data -> - ?SLOG(debug, #{ msg => "SEND_packet" - , packet => FrameMod:format(Packet) - }), - ok = inc_outgoing_stats(Ctx, FrameMod, Packet), - Data + _ : Reason -> + ?SLOG(warning, #{ msg => "packet_serialize_failure" + , reason => Reason + , packet => FrameMod:format(Packet) + }), + ok = emqx_gateway_ctx:metrics_inc(Ctx, 'delivery.dropped'), + <<>> end end. diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index fcc240ca5..b4f9e30f0 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -247,6 +247,7 @@ handle_call({subscribe, Topic, SubOpts}, _From, %% modifty session state SubReq = {Topic, Token}, TempMsg = #coap_message{type = non}, + %% FIXME: The subopts is not used for emqx_coap_session Result = emqx_coap_session:process_subscribe( SubReq, TempMsg, #{}, Session), NSession = maps:get(session, Result), diff --git a/apps/emqx_gateway/src/coap/emqx_coap_session.erl b/apps/emqx_gateway/src/coap/emqx_coap_session.erl index 52921c16a..108b4c55b 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_session.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_session.erl @@ -92,7 +92,9 @@ info(Keys, Session) when is_list(Keys) -> [{Key, info(Key, Session)} || Key <- Keys]; info(subscriptions, #session{observe_manager = OM}) -> Topics = emqx_coap_observe_res:subscriptions(OM), - lists:foldl(fun(T, Acc) -> Acc#{T => ?DEFAULT_SUBOPTS} end, #{}, Topics); + lists:foldl( + fun(T, Acc) -> Acc#{T => emqx_gateway_utils:default_subopts()} end, + #{}, Topics); info(subscriptions_cnt, #session{observe_manager = OM}) -> erlang:length(emqx_coap_observe_res:subscriptions(OM)); info(subscriptions_max, _) -> diff --git a/apps/emqx_gateway/src/emqx_gateway.erl b/apps/emqx_gateway/src/emqx_gateway.erl index 9421229f7..74a8244d4 100644 --- a/apps/emqx_gateway/src/emqx_gateway.erl +++ b/apps/emqx_gateway/src/emqx_gateway.erl @@ -41,6 +41,7 @@ registered_gateway() -> %%-------------------------------------------------------------------- %% Gateway APIs +%% @doc List the load gateways -spec list() -> [gateway()]. list() -> emqx_gateway_sup:list_gateway_insta(). @@ -65,6 +66,8 @@ lookup(Name) -> -spec update(gateway_name(), emqx_config:config()) -> ok | {error, any()}. %% @doc This function only supports full configuration updates +%% +%% Note: If the `enable` option is missing, it will be set to true by default update(Name, Config) -> emqx_gateway_sup:update_gateway(Name, Config). diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 5be4adccf..86dbb69af 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -73,7 +73,7 @@ paths() -> , {<<"ip_address">>, ip} , {<<"conn_state">>, atom} , {<<"clean_start">>, atom} - , {<<"proto_ver">>, integer} + , {<<"proto_ver">>, binary} , {<<"like_clientid">>, binary} , {<<"like_username">>, binary} , {<<"gte_created_at">>, timestamp} @@ -83,15 +83,16 @@ paths() -> %% special keys for lwm2m protocol , {<<"endpoint_name">>, binary} , {<<"like_endpoint_name">>, binary} - , {<<"gte_lifetime">>, timestamp} - , {<<"lte_lifetime">>, timestamp} + , {<<"gte_lifetime">>, integer} + , {<<"lte_lifetime">>, integer} ]). -define(QUERY_FUN, {?MODULE, query}). clients(get, #{ bindings := #{name := Name0} - , query_string := Params + , query_string := Params0 }) -> + Params = emqx_mgmt_api:ensure_timestamp_format(Params0, time_keys()), with_gateway(Name0, fun(GwName, _) -> TabName = emqx_gateway_cm:tabname(info, GwName), case maps:get(<<"node">>, Params, undefined) of @@ -147,10 +148,6 @@ subscriptions(get, #{ bindings := #{name := Name0, ClientId = emqx_mgmt_util:urldecode(ClientId0), with_gateway(Name0, fun(GwName, _) -> case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of - {error, nosupport} -> - return_http_error(405, <<"Not support to list subscriptions">>); - {error, noimpl} -> - return_http_error(501, <<"Not implemented now">>); {error, Reason} -> return_http_error(500, Reason); {ok, Subs} -> @@ -171,14 +168,6 @@ subscriptions(post, #{ bindings := #{name := Name0, {Topic, SubOpts} -> case emqx_gateway_http:client_subscribe( GwName, ClientId, Topic, SubOpts) of - {error, nosupport} -> - return_http_error( - 405, - <<"Not support to add a subscription">>); - {error, noimpl} -> - return_http_error( - 501, - <<"Not implemented now">>); {error, Reason} -> return_http_error(404, Reason); {ok, {NTopic, NSubOpts}}-> @@ -221,6 +210,16 @@ extra_sub_props(Props) -> #{subid => maps:get(<<"subid">>, Props, undefined)} ). +%%-------------------------------------------------------------------- +%% QueryString data-fomrat convert +%% (try rfc3339 to timestamp or keep timestamp) + +time_keys() -> + [ <<"gte_created_at">> + , <<"lte_created_at">> + , <<"gte_connected_at">> + , <<"lte_connected_at">>]. + %%-------------------------------------------------------------------- %% query funcs @@ -264,10 +263,8 @@ ms(clientid, X) -> #{clientinfo => #{clientid => X}}; ms(username, X) -> #{clientinfo => #{username => X}}; -ms(zone, X) -> - #{clientinfo => #{zone => X}}; ms(ip_address, X) -> - #{clientinfo => #{peername => {X, '_'}}}; + #{clientinfo => #{peerhost => X}}; ms(conn_state, X) -> #{conn_state => X}; ms(clean_start, X) -> @@ -616,9 +613,6 @@ roots() -> , subscription ]. -fields(test) -> - [{key, mk(binary(), #{ desc => <<"Desc">>})}]; - fields(stomp_client) -> common_client_props(); fields(mqttsn_client) -> @@ -707,10 +701,6 @@ common_client_props() -> %, {will_msg, % mk(binary(), % #{ desc => <<"Client will message">>})} - %, {zone, - % mk(binary(), - % #{ desc => <<"Indicate the configuration group used by the " - % "client">>})} , {keepalive, mk(integer(), #{ desc => <<"keepalive time, with the unit of second">>})} diff --git a/apps/emqx_gateway/src/emqx_gateway_cli.erl b/apps/emqx_gateway/src/emqx_gateway_cli.erl index f8f4c5821..24bf559c1 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cli.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cli.erl @@ -53,23 +53,16 @@ is_cmd(Fun) -> gateway(["list"]) -> lists:foreach( - fun (#{name := Name, status := unloaded}) -> - print("Gateway(name=~ts, status=unloaded)\n", [Name]); - (#{name := Name, status := stopped, stopped_at := StoppedAt}) -> - print("Gateway(name=~ts, status=stopped, stopped_at=~ts)\n", - [Name, StoppedAt]); - (#{name := Name, status := running, current_connections := ConnCnt, - started_at := StartedAt}) -> - print("Gateway(name=~ts, status=running, clients=~w, started_at=~ts)\n", - [Name, ConnCnt, StartedAt]) + fun (GwSummary) -> + print(format_gw_summary(GwSummary)) end, emqx_gateway_http:gateways(all)); gateway(["lookup", Name]) -> case emqx_gateway:lookup(atom(Name)) of undefined -> print("undefined\n"); - Info -> - print("~p\n", [Info]) + Gateway -> + print(format_gateway(Gateway)) end; gateway(["load", Name, Conf]) -> @@ -80,7 +73,7 @@ gateway(["load", Name, Conf]) -> {ok, _} -> print("ok\n"); {error, Reason} -> - print("Error: ~p\n", [Reason]) + print("Error: ~ts\n", [format_error(Reason)]) end; gateway(["unload", Name]) -> @@ -88,7 +81,7 @@ gateway(["unload", Name]) -> ok -> print("ok\n"); {error, Reason} -> - print("Error: ~p\n", [Reason]) + print("Error: ~ts\n", [format_error(Reason)]) end; gateway(["stop", Name]) -> @@ -99,7 +92,7 @@ gateway(["stop", Name]) -> {ok, _} -> print("ok\n"); {error, Reason} -> - print("Error: ~p\n", [Reason]) + print("Error: ~ts\n", [format_error(Reason)]) end; gateway(["start", Name]) -> @@ -110,23 +103,24 @@ gateway(["start", Name]) -> {ok, _} -> print("ok\n"); {error, Reason} -> - print("Error: ~p\n", [Reason]) + print("Error: ~ts\n", [format_error(Reason)]) end; gateway(_) -> - emqx_ctl:usage([ {"gateway list", - "List all gateway"} - , {"gateway lookup ", - "Lookup a gateway detailed informations"} - , {"gateway load ", - "Load a gateway with config"} - , {"gateway unload ", - "Unload the gateway"} - , {"gateway stop ", - "Stop the gateway"} - , {"gateway start ", - "Start the gateway"} - ]). + emqx_ctl:usage( + [ {"gateway list", + "List all gateway"} + , {"gateway lookup ", + "Lookup a gateway detailed informations"} + , {"gateway load ", + "Load a gateway with config"} + , {"gateway unload ", + "Unload the gateway"} + , {"gateway stop ", + "Stop the gateway"} + , {"gateway start ", + "Start the gateway"} + ]). 'gateway-registry'(["list"]) -> lists:foreach( @@ -141,7 +135,7 @@ gateway(_) -> ]). 'gateway-clients'(["list", Name]) -> - %% FIXME: page me? + %% XXX: page me? InfoTab = emqx_gateway_cm:tabname(info, Name), case ets:info(InfoTab) of undefined -> @@ -152,12 +146,17 @@ gateway(_) -> 'gateway-clients'(["lookup", Name, ClientId]) -> ChanTab = emqx_gateway_cm:tabname(chan, Name), - case ets:lookup(ChanTab, bin(ClientId)) of - [] -> print("Not Found.\n"); - [Chann] -> - InfoTab = emqx_gateway_cm:tabname(info, Name), - [ChannInfo] = ets:lookup(InfoTab, Chann), - print_record({client, ChannInfo}) + case ets:info(ChanTab) of + undefined -> + print("Bad Gateway Name.\n"); + _ -> + case ets:lookup(ChanTab, bin(ClientId)) of + [] -> print("Not Found.\n"); + [Chann] -> + InfoTab = emqx_gateway_cm:tabname(info, Name), + [ChannInfo] = ets:lookup(InfoTab, Chann), + print_record({client, ChannInfo}) + end end; 'gateway-clients'(["kick", Name, ClientId]) -> @@ -176,15 +175,13 @@ gateway(_) -> ]). 'gateway-metrics'([Name]) -> - Tab = emqx_gateway_metrics:tabname(Name), - case ets:info(Tab) of + case emqx_gateway_metrics:lookup(atom(Name)) of undefined -> print("Bad Gateway Name.\n"); - _ -> + Metrics -> lists:foreach( - fun({K, V}) -> - print("~-30s: ~w\n", [K, V]) - end, lists:sort(ets:tab2list(Tab))) + fun({K, V}) -> print("~-30s: ~w\n", [K, V]) end, + Metrics) end; 'gateway-metrics'(_) -> @@ -255,3 +252,50 @@ format(peername, {IPAddr, Port}) -> format(_, Val) -> Val. + +format_gw_summary(#{name := Name, status := unloaded}) -> + io_lib:format("Gateway(name=~ts, status=unloaded)\n", [Name]); + +format_gw_summary(#{name := Name, status := stopped, + stopped_at := StoppedAt}) -> + io_lib:format("Gateway(name=~ts, status=stopped, stopped_at=~ts)\n", + [Name, StoppedAt]); +format_gw_summary(#{name := Name, status := running, + current_connections := ConnCnt, + started_at := StartedAt}) -> + io_lib:format("Gateway(name=~ts, status=running, clients=~w, " + "started_at=~ts)\n", [Name, ConnCnt, StartedAt]). + +format_gateway(#{name := Name, + status := unloaded}) -> + io_lib:format( + "name: ~ts\n" + "status: unloaded\n", [Name]); + +format_gateway(Gw = + #{name := Name, + status := Status, + created_at := CreatedAt, + config := Config + }) -> + {StopOrStart, Timestamp} = + case Status of + stopped -> {stopped_at, maps:get(stopped_at, Gw)}; + running -> {started_at, maps:get(started_at, Gw)} + end, + io_lib:format( + "name: ~ts\n" + "status: ~ts\n" + "created_at: ~ts\n" + "~ts: ~ts\n" + "config: ~p\n", + [Name, Status, + emqx_gateway_utils:unix_ts_to_rfc3339(CreatedAt), + StopOrStart, emqx_gateway_utils:unix_ts_to_rfc3339(Timestamp), + Config]). + +format_error(Reason) -> + case emqx_gateway_http:reason2msg(Reason) of + error -> io_lib:format("~p", [Reason]); + Msg -> Msg + end. diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index ee1ce19ef..647d3a6a7 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc The Gateway Connection-Manager +%% @doc The Gateway Channel Manager %% %% For a certain type of protocol, this is a single instance of the manager. %% It means that no matter how many instances of the stomp gateway are created, @@ -26,7 +26,6 @@ -include("include/emqx_gateway.hrl"). -include_lib("emqx/include/logger.hrl"). - %% APIs -export([start_link/1]). @@ -74,6 +73,8 @@ -type option() :: {gwname, gateway_name()}. -type options() :: list(option()). +-define(T_KICK, 5000). +-define(T_GET_INFO, 5000). -define(T_TAKEOVER, 15000). -define(DEFAULT_BATCH_SIZE, 10000). @@ -94,9 +95,9 @@ procname(GwName) -> ConnTab :: atom(), ChannInfoTab :: atom()}. cmtabs(GwName) -> - { tabname(chan, GwName) %% Client Tabname; Record: {ClientId, Pid} - , tabname(conn, GwName) %% Client ConnMod; Recrod: {{ClientId, Pid}, ConnMod} - , tabname(info, GwName) %% ClientInfo Tabname; Record: {{ClientId, Pid}, ClientInfo, ClientStats} + { tabname(chan, GwName) %% Record: {ClientId, Pid} + , tabname(conn, GwName) %% Recrod: {{ClientId, Pid}, ConnMod} + , tabname(info, GwName) %% Record: {{ClientId, Pid}, Info, Stats} }. tabname(chan, GwName) -> @@ -134,7 +135,6 @@ unregister_channel(GwName, ClientId) when is_binary(ClientId) -> insert_channel_info(GwName, ClientId, Info, Stats) -> Chan = {ClientId, self()}, true = ets:insert(tabname(info, GwName), {Chan, Info, Stats}), - %%?tp(debug, insert_channel_info, #{client_id => ClientId}), ok. %% @doc Get info of a channel. @@ -207,7 +207,8 @@ set_chan_stats(GwName, ClientId, Stats) -> emqx_types:clientid(), pid(), emqx_types:stats()) -> boolean(). -set_chan_stats(GwName, ClientId, ChanPid, Stats) when node(ChanPid) == node() -> +set_chan_stats(GwName, ClientId, ChanPid, Stats) + when node(ChanPid) == node() -> Chan = {ClientId, self()}, try ets:update_element(tabname(info, GwName), Chan, {3, Stats}) catch @@ -232,7 +233,7 @@ connection_closed(GwName, ClientId) -> -> {ok, #{session := Session, present := boolean(), pendings => list() - }} + }} | {error, any()}. open_session(GwName, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) -> @@ -256,7 +257,7 @@ open_session(GwName, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun, open_session(_Type, false = _CleanStart, _ClientInfo, _ConnInfo, _CreateSessionFun, _SessionMod) -> - %% TODO: + %% TODO: The session takeover logic will be implemented on 0.9? {error, not_supported_now}. %% @private @@ -305,17 +306,12 @@ do_discard_session(GwName, ClientId, Pid) -> discard_session(GwName, ClientId, Pid) catch _ : noproc -> % emqx_ws_connection: call - %?tp(debug, "session_already_gone", #{pid => Pid}), ok; _ : {noproc, _} -> % emqx_connection: gen_server:call - %?tp(debug, "session_already_gone", #{pid => Pid}), ok; _ : {{shutdown, _}, _} -> - %?tp(debug, "session_already_shutdown", #{pid => Pid}), ok; _ : _Error : _St -> - %?tp(error, "failed_to_discard_session", - % #{pid => Pid, reason => Error, stacktrace=>St}) ok end. @@ -464,7 +460,9 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> +terminate(_Reason, #state{registry = Registry, locker = Locker}) -> + _ = gen_server:stop(Registry), + _ = ekka_locker:stop(Locker), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl index 914cf1cae..845ad7b7e 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl @@ -17,6 +17,8 @@ %% @doc The gateway connection registry -module(emqx_gateway_cm_registry). +-include("include/emqx_gateway.hrl"). + -behaviour(gen_server). -export([start_link/1]). @@ -27,6 +29,8 @@ -export([lookup_channels/2]). +-export([tabname/1]). + %% gen_server callbacks -export([ init/1 , handle_call/3 @@ -41,39 +45,43 @@ -record(channel, {chid, pid}). -%% @doc Start the global channel registry. --spec(start_link(atom()) -> gen_server:startlink_ret()). -start_link(Type) -> - gen_server:start_link(?MODULE, [Type], []). +%% @doc Start the global channel registry for the gived gateway name. +-spec(start_link(gateway_name()) -> gen_server:startlink_ret()). +start_link(Name) -> + gen_server:start_link(?MODULE, [Name], []). --spec tabname(atom()) -> atom(). -tabname(Type) -> - list_to_atom(lists:concat([emqx_gateway_, Type, '_channel_registry'])). +-spec tabname(gateway_name()) -> atom(). +tabname(Name) -> + %% XXX: unsafe ?? + list_to_atom(lists:concat([emqx_gateway_, Name, '_channel_registry'])). %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- %% @doc Register a global channel. --spec register_channel(atom(), binary() | {binary(), pid()}) -> ok. -register_channel(Type, ClientId) when is_binary(ClientId) -> - register_channel(Type, {ClientId, self()}); +-spec register_channel(gateway_name(), binary() | {binary(), pid()}) -> ok. +register_channel(Name, ClientId) when is_binary(ClientId) -> + register_channel(Name, {ClientId, self()}); -register_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> - mria:dirty_write(tabname(Type), record(ClientId, ChanPid)). +register_channel(Name, {ClientId, ChanPid}) + when is_binary(ClientId), is_pid(ChanPid) -> + mria:dirty_write(tabname(Name), record(ClientId, ChanPid)). %% @doc Unregister a global channel. --spec unregister_channel(atom(), binary() | {binary(), pid()}) -> ok. -unregister_channel(Type, ClientId) when is_binary(ClientId) -> - unregister_channel(Type, {ClientId, self()}); +-spec unregister_channel(gateway_name(), binary() | {binary(), pid()}) -> ok. +unregister_channel(Name, ClientId) when is_binary(ClientId) -> + unregister_channel(Name, {ClientId, self()}); -unregister_channel(Type, {ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> - mria:dirty_delete_object(tabname(Type), record(ClientId, ChanPid)). +unregister_channel(Name, {ClientId, ChanPid}) + when is_binary(ClientId), is_pid(ChanPid) -> + mria:dirty_delete_object(tabname(Name), record(ClientId, ChanPid)). %% @doc Lookup the global channels. --spec lookup_channels(atom(), binary()) -> list(pid()). -lookup_channels(Type, ClientId) -> - [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(tabname(Type), ClientId)]. +-spec lookup_channels(gateway_name(), binary()) -> list(pid()). +lookup_channels(Name, ClientId) -> + [ChanPid + || #channel{pid = ChanPid} <- mnesia:dirty_read(tabname(Name), ClientId)]. record(ClientId, ChanPid) -> #channel{chid = ClientId, pid = ChanPid}. @@ -82,8 +90,8 @@ record(ClientId, ChanPid) -> %% gen_server callbacks %%-------------------------------------------------------------------- -init([Type]) -> - Tab = tabname(Type), +init([Name]) -> + Tab = tabname(Name), ok = mria:create_table(Tab, [ {type, bag}, {rlog_shard, ?CM_SHARD}, @@ -94,7 +102,7 @@ init([Type]) -> {write_concurrency, true}]}]}]), ok = mria:wait_for_tables([Tab]), ok = ekka:monitor(membership), - {ok, #{type => Type}}. + {ok, #{name => Name}}. handle_call(Req, _From, State) -> logger:error("Unexpected call: ~p", [Req]), @@ -104,12 +112,13 @@ handle_cast(Msg, State) -> logger:error("Unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({membership, {mnesia, down, Node}}, State = #{type := Type}) -> - Tab = tabname(Type), - global:trans({?LOCK, self()}, - fun() -> - mria:transaction(?CM_SHARD, fun cleanup_channels/2, [Node, Tab]) - end), +handle_info({membership, {mnesia, down, Node}}, State = #{name := Name}) -> + Tab = tabname(Name), + global:trans( + {?LOCK, self()}, + fun() -> + mria:transaction(?CM_SHARD, fun cleanup_channels/2, [Node, Tab]) + end), {noreply, State}; handle_info({membership, _Event}, State) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index 0b7b3f099..52aa204a3 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -93,6 +93,7 @@ load_gateway(GwName, Conf) -> %% @doc convert listener array to map unconvert_listeners(Ls) when is_list(Ls) -> lists:foldl(fun(Lis, Acc) -> + %% FIXME: params apperence guard? {[Type, Name], Lis1} = maps_key_take([<<"type">>, <<"name">>], Lis), NLis1 = maps:without([<<"id">>], Lis1), emqx_map_lib:deep_merge(Acc, #{Type => #{Name => NLis1}}) diff --git a/apps/emqx_gateway/src/emqx_gateway_ctx.erl b/apps/emqx_gateway/src/emqx_gateway_ctx.erl index ca170814f..ab81c1ddb 100644 --- a/apps/emqx_gateway/src/emqx_gateway_ctx.erl +++ b/apps/emqx_gateway/src/emqx_gateway_ctx.erl @@ -30,7 +30,7 @@ #{ %% Gateway Name gwname := gateway_name() %% Authentication chains - , auth := [emqx_authentication:chain_name()] | undefined + , auth := [emqx_authentication:chain_name()] %% The ConnectionManager PID , cm := pid() }. @@ -64,18 +64,15 @@ -spec authenticate(context(), emqx_types:clientinfo()) -> {ok, emqx_types:clientinfo()} | {error, any()}. -authenticate(_Ctx = #{auth := undefined}, ClientInfo) -> - {ok, mountpoint(ClientInfo)}; -authenticate(_Ctx = #{auth := _ChainName}, ClientInfo0) -> +authenticate(_Ctx = #{auth := _ChainNames}, ClientInfo0) + when is_list(_ChainNames) -> ClientInfo = ClientInfo0#{zone => default}, case emqx_access_control:authenticate(ClientInfo) of {ok, _} -> {ok, mountpoint(ClientInfo)}; {error, Reason} -> {error, Reason} - end; -authenticate(_Ctx, ClientInfo) -> - {ok, mountpoint(ClientInfo)}. + end. %% @doc Register the session to the cluster. %% @@ -95,11 +92,6 @@ open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) -> open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, emqx_session). -open_session(Ctx, false, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> - logger:warning("clean_start=false is not supported now, " - "fallback to clean_start mode"), - open_session(Ctx, true, ClientInfo, ConnInfo, CreateSessionFun, SessionMod); - open_session(_Ctx = #{gwname := GwName}, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> emqx_gateway_cm:open_session(GwName, CleanStart, diff --git a/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl b/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl index fa55ceb8d..9ca806213 100644 --- a/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl @@ -51,7 +51,6 @@ create_insta(Sup, Gateway = #{name := Name}, GwDscrptr) -> {ok, _GwInstaPid} -> {error, alredy_existed}; false -> Ctx = ctx(Sup, Name), - %% ChildSpec = emqx_gateway_utils:childspec( Name, worker, diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 1560a2126..641f29932 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -62,12 +62,15 @@ , with_listener_authn/3 , checks/2 , reason2resp/1 + , reason2msg/1 ]). -type gateway_summary() :: #{ name := binary() , status := running | stopped | unloaded + , created_at => binary() , started_at => binary() + , stopped_at => binary() , max_connections => integer() , current_connections => integer() , listeners => [] @@ -76,7 +79,6 @@ -elvis([{elvis_style, god_modules, disable}]). -elvis([{elvis_style, no_nested_try_catch, disable}]). - -define(DEFAULT_CALL_TIMEOUT, 15000). %%-------------------------------------------------------------------- @@ -317,57 +319,13 @@ with_channel(GwName, ClientId, Fun) -> %%-------------------------------------------------------------------- -spec reason2resp({atom(), map()} | any()) -> binary() | any(). -reason2resp({badconf, #{key := Key, value := Value, reason := Reason}}) -> - fmt400err("Bad config value '~s' for '~s', reason: ~s", - [Value, Key, Reason]); -reason2resp({badres, #{resource := gateway, - gateway := GwName, - reason := not_found}}) -> - fmt400err("The ~s gateway is unloaded", [GwName]); - -reason2resp({badres, #{resource := gateway, - gateway := GwName, - reason := already_exist}}) -> - fmt400err("The ~s gateway has loaded", [GwName]); - -reason2resp({badres, #{resource := listener, - listener := {GwName, LType, LName}, - reason := not_found}}) -> - fmt400err("Listener ~s not found", - [listener_id(GwName, LType, LName)]); - -reason2resp({badres, #{resource := listener, - listener := {GwName, LType, LName}, - reason := already_exist}}) -> - fmt400err("The listener ~s of ~s already exist", - [listener_id(GwName, LType, LName), GwName]); - -reason2resp({badres, #{resource := authn, - gateway := GwName, - reason := not_found}}) -> - fmt400err("The authentication not found on ~s", [GwName]); - -reason2resp({badres, #{resource := authn, - gateway := GwName, - reason := already_exist}}) -> - fmt400err("The authentication already exist on ~s", [GwName]); - -reason2resp({badres, #{resource := listener_authn, - listener := {GwName, LType, LName}, - reason := not_found}}) -> - fmt400err("The authentication not found on ~s", - [listener_id(GwName, LType, LName)]); - -reason2resp({badres, #{resource := listener_authn, - listener := {GwName, LType, LName}, - reason := already_exist}}) -> - fmt400err("The authentication already exist on ~s", - [listener_id(GwName, LType, LName)]); - -reason2resp(R) -> return_http_error(500, R). - -fmt400err(Fmt, Args) -> - return_http_error(400, io_lib:format(Fmt, Args)). +reason2resp(R) -> + case reason2msg(R) of + error -> + return_http_error(500, R); + Msg -> + return_http_error(400, Msg) + end. -spec return_http_error(integer(), any()) -> {integer(), binary()}. return_http_error(Code, Msg) -> @@ -377,6 +335,54 @@ return_http_error(Code, Msg) -> }) }. +-spec reason2msg({atom(), map()} | any()) -> error | string(). +reason2msg({badconf, #{key := Key, value := Value, reason := Reason}}) -> + fmtstr("Bad config value '~s' for '~s', reason: ~s", [Value, Key, Reason]); +reason2msg({badres, #{resource := gateway, + gateway := GwName, + reason := not_found}}) -> + fmtstr("The ~s gateway is unloaded", [GwName]); + +reason2msg({badres, #{resource := gateway, + gateway := GwName, + reason := already_exist}}) -> + fmtstr("The ~s gateway already loaded", [GwName]); + +reason2msg({badres, #{resource := listener, + listener := {GwName, LType, LName}, + reason := not_found}}) -> + fmtstr("Listener ~s not found", [listener_id(GwName, LType, LName)]); + +reason2msg({badres, #{resource := listener, + listener := {GwName, LType, LName}, + reason := already_exist}}) -> + fmtstr("The listener ~s of ~s already exist", + [listener_id(GwName, LType, LName), GwName]); + +reason2msg({badres, #{resource := authn, + gateway := GwName, + reason := not_found}}) -> + fmtstr("The authentication not found on ~s", [GwName]); + +reason2msg({badres, #{resource := authn, + gateway := GwName, + reason := already_exist}}) -> + fmtstr("The authentication already exist on ~s", [GwName]); + +reason2msg({badres, #{resource := listener_authn, + listener := {GwName, LType, LName}, + reason := not_found}}) -> + fmtstr("The authentication not found on ~s", + [listener_id(GwName, LType, LName)]); + +reason2msg({badres, #{resource := listener_authn, + listener := {GwName, LType, LName}, + reason := already_exist}}) -> + fmtstr("The authentication already exist on ~s", + [listener_id(GwName, LType, LName)]); +reason2msg(_) -> + error. + codestr(400) -> 'BAD_REQUEST'; codestr(401) -> 'NOT_SUPPORTED_NOW'; codestr(404) -> 'RESOURCE_NOT_FOUND'; @@ -384,6 +390,9 @@ codestr(405) -> 'METHOD_NOT_ALLOWED'; codestr(500) -> 'UNKNOW_ERROR'; codestr(501) -> 'NOT_IMPLEMENTED'. +fmtstr(Fmt, Args) -> + lists:flatten(io_lib:format(Fmt, Args)). + -spec with_authn(binary(), function()) -> any(). with_authn(GwName0, Fun) -> with_gateway(GwName0, fun(GwName, _GwConf) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index 30cebb3cc..ddeb3620d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -122,7 +122,6 @@ handle_call(info, _From, State) -> {reply, detailed_gateway_info(State), State}; handle_call(disable, _From, State = #state{status = Status}) -> - %% XXX: The `disable` opertaion is not persist to config database case Status of running -> case cb_gateway_unload(State) of @@ -308,8 +307,7 @@ do_update_one_by_one(NCfg, State = #state{ name = GwName, config = OCfg, status = Status}) -> - OEnable = maps:get(enable, OCfg, true), - NEnable = maps:get(enable, NCfg, OEnable), + NEnable = maps:get(enable, NCfg, true), OAuths = authns(GwName, OCfg), NAuths = authns(GwName, NCfg), @@ -329,7 +327,7 @@ do_update_one_by_one(NCfg, State = #state{ AuthnNames = init_authn(State#state.name, NCfg), State#state{authns = AuthnNames} end, - %% XXX: minimum impact update ??? + %% TODO: minimum impact update ??? cb_gateway_update(NCfg, NState); {running, false} -> case cb_gateway_unload(State) of @@ -414,7 +412,6 @@ cb_gateway_update(Config, case CbMod:on_gateway_update(Config, detailed_gateway_info(State), GwState) of {error, Reason} -> {error, Reason}; {ok, ChildPidOrSpecs, NGwState} -> - %% XXX: Hot-upgrade ??? ChildPids = start_child_process(ChildPidOrSpecs), {ok, State#state{ config = Config, diff --git a/apps/emqx_gateway/src/emqx_gateway_metrics.erl b/apps/emqx_gateway/src/emqx_gateway_metrics.erl index 9f1258e6d..d2a9d7442 100644 --- a/apps/emqx_gateway/src/emqx_gateway_metrics.erl +++ b/apps/emqx_gateway/src/emqx_gateway_metrics.erl @@ -20,7 +20,6 @@ -include_lib("emqx_gateway/include/emqx_gateway.hrl"). - %% APIs -export([start_link/1]). @@ -30,6 +29,8 @@ , dec/3 ]). +-export([lookup/1]). + %% gen_server callbacks -export([ init/1 , handle_call/3 @@ -67,6 +68,16 @@ dec(GwName, Name) -> dec(GwName, Name, Oct) -> inc(GwName, Name, -Oct). +-spec lookup(gateway_name()) + -> undefined + | [{Name :: atom(), integer()}]. +lookup(GwName) -> + Tab = emqx_gateway_metrics:tabname(GwName), + case ets:info(Tab) of + undefined -> undefined; + _ -> lists:sort(ets:tab2list(Tab)) + end. + tabname(GwName) -> list_to_atom(lists:concat([emqx_gateway_, GwName, '_metrics'])). diff --git a/apps/emqx_gateway/src/emqx_gateway_sup.erl b/apps/emqx_gateway/src/emqx_gateway_sup.erl index 9a969a5ce..499132d35 100644 --- a/apps/emqx_gateway/src/emqx_gateway_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_sup.erl @@ -59,7 +59,8 @@ load_gateway(Gateway = #{name := GwName}) -> unload_gateway(GwName) -> case lists:keyfind(GwName, 1, supervisor:which_children(?MODULE)) of false -> {error, not_found}; - _ -> + {_Id, Pid, _Type, _Mods} -> + _ = emqx_gateway_gw_sup:remove_insta(Pid, GwName), _ = supervisor:terminate_child(?MODULE, GwName), _ = supervisor:delete_child(?MODULE, GwName), ok diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 1def6ebdb..e1c29c289 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -291,9 +291,8 @@ is_running(ListenerId, #{<<"bind">> := ListenOn0}) -> end. %% same with emqx_authentication:global_chain/1 -global_chain(mqtt) -> - 'mqtt:global'; -global_chain('mqtt-sn') -> +-spec global_chain(GatewayName :: atom()) -> atom(). +global_chain('mqttsn') -> 'mqtt-sn:global'; global_chain(coap) -> 'coap:global'; diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index 41b795b7b..843cf39d5 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -451,7 +451,7 @@ do_subscribe(TopicFilter, SubOpts, Channel = subscriptions = Subs}) -> %% Mountpoint first NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter), - NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts), + NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts), SubId = maps:get(clientid, ClientInfo, undefined), %% XXX: is_new? IsNew = not maps:is_key(NTopicFilter, Subs), diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 58cc24f9d..aee4189a5 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -931,7 +931,7 @@ do_subscribe({TopicId, TopicName, SubOpts}, clientinfo = ClientInfo = #{mountpoint := Mountpoint}}) -> NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName), - NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts), + NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts), case emqx_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of {ok, NSession} -> {ok, {TopicId, NTopicName, NSubOpts}, diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl index bdd8c6e58..a2ffa1988 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl @@ -134,8 +134,10 @@ g(Key, Opts, Val) -> parse(<<>>, Parser) -> {more, Parser}; -parse(Bytes, #{phase := body, len := Len, state := State}) -> +parse(Bytes, #{phase := body, length := Len, state := State}) -> parse(body, Bytes, State, Len); +parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none -> + parse(Phase, Bytes, State); parse(Bytes, Parser = #{pre := Pre}) -> parse(<
>, maps:without([pre], Parser));
@@ -162,6 +164,8 @@ parse(command, <>, State = #parser_state{acc = Acc}) ->
     parse(headers, Rest, State#parser_state{cmd = Acc, acc = <<>>});
 parse(command, <>, State) ->
     parse(command, Rest, acc(Ch, State));
+parse(command, <<>>, State) ->
+    {more, #{phase => command, state => State}};
 
 parse(headers, <>, State) ->
     parse(body, Rest, State, content_len(State#parser_state{acc = <<>>}));
@@ -174,6 +178,8 @@ parse(hdname, <>, State = #parser_state{acc = Acc}) ->
     parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
 parse(hdname, <>, State) ->
     parse(hdname, Rest, acc(Ch, State));
+parse(hdname, <<>>, State) ->
+    {more, #{phase => hdname, state => State}};
 
 parse(hdvalue, <>,
       State = #parser_state{headers = Headers, hdname = Name, acc = Acc}) ->
@@ -183,7 +189,9 @@ parse(hdvalue, <>,
                                },
     parse(headers, Rest, NState);
 parse(hdvalue, <>, State) ->
-    parse(hdvalue, Rest, acc(Ch, State)).
+    parse(hdvalue, Rest, acc(Ch, State));
+parse(hdvalue, <<>>, State) ->
+    {more, #{phase => hdvalue, state => State}}.
 
 %% @private
 parse(body, <<>>, State, Length) ->
diff --git a/apps/emqx_gateway/test/emqx_coap_SUITE.erl b/apps/emqx_gateway/test/emqx_coap_SUITE.erl
index 8b336252b..e35c6e2db 100644
--- a/apps/emqx_gateway/test/emqx_coap_SUITE.erl
+++ b/apps/emqx_gateway/test/emqx_coap_SUITE.erl
@@ -19,6 +19,11 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-import(emqx_gateway_test_utils,
+        [ request/2
+        , request/3
+        ]).
+
 -include_lib("er_coap_client/include/coap.hrl").
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
@@ -48,114 +53,115 @@ gateway.coap
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    emqx_common_test_helpers:start_apps([emqx_gateway], fun set_special_cfg/1),
+    ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+    emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
     Config.
 
-set_special_cfg(emqx_gateway) ->
-    ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT);
-
-set_special_cfg(_) ->
-    ok.
-
-end_per_suite(Config) ->
+end_per_suite(_) ->
     {ok, _} = emqx:remove_config([<<"gateway">>,<<"coap">>]),
-    emqx_common_test_helpers:stop_apps([emqx_gateway]),
-    Config.
+    emqx_mgmt_api_test_util:end_suite([emqx_gateway]).
 
 %%--------------------------------------------------------------------
 %% Test Cases
 %%--------------------------------------------------------------------
-t_connection(_Config) ->
+
+t_connection(_) ->
     Action = fun(Channel) ->
-                     %% connection
-                     Token = connection(Channel),
+        %% connection
+        Token = connection(Channel),
 
-                     timer:sleep(100),
-                     ?assertNotEqual([], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)),
+        timer:sleep(100),
+        ?assertNotEqual(
+           [],
+           emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)),
 
-                     %% heartbeat
-                     HeartURI = ?MQTT_PREFIX ++ "/connection?clientid=client1&token=" ++ Token,
-                     ?LOGT("send heartbeat request:~ts~n", [HeartURI]),
-                     {ok, changed, _} = er_coap_client:request(put, HeartURI),
+        %% heartbeat
+        HeartURI = ?MQTT_PREFIX ++
+                   "/connection?clientid=client1&token=" ++
+                   Token,
 
-                     disconnection(Channel, Token),
+        ?LOGT("send heartbeat request:~ts~n", [HeartURI]),
+        {ok, changed, _} = er_coap_client:request(put, HeartURI),
 
-                     timer:sleep(100),
-                     ?assertEqual([], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>))
-             end,
+        disconnection(Channel, Token),
+
+        timer:sleep(100),
+        ?assertEqual(
+           [],
+           emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>))
+    end,
     do(Action).
 
-
-t_publish(_Config) ->
+t_publish(_) ->
     Action = fun(Channel, Token) ->
-                     Topic = <<"/abc">>,
-                     Payload = <<"123">>,
+        Topic = <<"/abc">>,
+        Payload = <<"123">>,
 
-                     TopicStr = binary_to_list(Topic),
-                     URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        TopicStr = binary_to_list(Topic),
+        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
 
-                     %% Sub topic first
-                     emqx:subscribe(Topic),
+        %% Sub topic first
+        emqx:subscribe(Topic),
 
-                     Req = make_req(post, Payload),
-                     {ok, changed, _} = do_request(Channel, URI, Req),
-
-                     receive
-                         {deliver, Topic, Msg} ->
-                             ?assertEqual(Topic, Msg#message.topic),
-                             ?assertEqual(Payload, Msg#message.payload)
-                     after
-                         500 ->
-                             ?assert(false)
-                     end
-             end,
+        Req = make_req(post, Payload),
+        {ok, changed, _} = do_request(Channel, URI, Req),
 
+        receive
+            {deliver, Topic, Msg} ->
+                ?assertEqual(Topic, Msg#message.topic),
+                ?assertEqual(Payload, Msg#message.payload)
+        after
+            500 ->
+                ?assert(false)
+        end
+    end,
     with_connection(Action).
 
-
-%t_publish_authz_deny(_Config) ->
+%t_publish_authz_deny(_) ->
 %    Action = fun(Channel, Token) ->
-%                     Topic = <<"/abc">>,
-%                     Payload = <<"123">>,
-%                     InvalidToken = lists:reverse(Token),
+%        Topic = <<"/abc">>,
+%        Payload = <<"123">>,
+%        InvalidToken = lists:reverse(Token),
 %
-%                     TopicStr = binary_to_list(Topic),
-%                     URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ InvalidToken,
+%        TopicStr = binary_to_list(Topic),
+%        URI = ?PS_PREFIX ++
+%              TopicStr ++
+%              "?clientid=client1&token=" ++ InvalidToken,
 %
-%                     %% Sub topic first
-%                     emqx:subscribe(Topic),
+%        %% Sub topic first
+%        emqx:subscribe(Topic),
 %
-%                     Req = make_req(post, Payload),
-%                     Result = do_request(Channel, URI, Req),
-%                     ?assertEqual({error, reset}, Result)
-%             end,
+%        Req = make_req(post, Payload),
+%        Result = do_request(Channel, URI, Req),
+%        ?assertEqual({error, reset}, Result)
+%    end,
 %
 %    with_connection(Action).
 
-t_subscribe(_Config) ->
+t_subscribe(_) ->
     Topic = <<"/abc">>,
     Fun = fun(Channel, Token) ->
-                  TopicStr = binary_to_list(Topic),
-                  Payload = <<"123">>,
+        TopicStr = binary_to_list(Topic),
+        Payload = <<"123">>,
 
-                  URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
-                  Req = make_req(get, Payload, [{observe, 0}]),
-                  {ok, content, _} = do_request(Channel, URI, Req),
-                  ?LOGT("observer topic:~ts~n", [Topic]),
+        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        Req = make_req(get, Payload, [{observe, 0}]),
+        {ok, content, _} = do_request(Channel, URI, Req),
+        ?LOGT("observer topic:~ts~n", [Topic]),
 
-                  timer:sleep(100),
-                  [SubPid] = emqx:subscribers(Topic),
-                  ?assert(is_pid(SubPid)),
+        timer:sleep(100),
+        [SubPid] = emqx:subscribers(Topic),
+        ?assert(is_pid(SubPid)),
 
-                  %% Publish a message
-                  emqx:publish(emqx_message:make(Topic, Payload)),
-                  {ok, content, Notify} = with_response(Channel),
-                  ?LOGT("observer get Notif=~p", [Notify]),
+        %% Publish a message
+        emqx:publish(emqx_message:make(Topic, Payload)),
+        {ok, content, Notify} = with_response(Channel),
+        ?LOGT("observer get Notif=~p", [Notify]),
 
-                  #coap_content{payload = PayloadRecv} = Notify,
+        #coap_content{payload = PayloadRecv} = Notify,
 
-                  ?assertEqual(Payload, PayloadRecv)
-          end,
+        ?assertEqual(Payload, PayloadRecv)
+    end,
 
     with_connection(Fun),
     timer:sleep(100),
@@ -163,63 +169,117 @@ t_subscribe(_Config) ->
     ?assertEqual([], emqx:subscribers(Topic)).
 
 
-t_un_subscribe(_Config) ->
+t_un_subscribe(_) ->
     Topic = <<"/abc">>,
     Fun = fun(Channel, Token) ->
-                  TopicStr = binary_to_list(Topic),
-                  Payload = <<"123">>,
+        TopicStr = binary_to_list(Topic),
+        Payload = <<"123">>,
 
-                  URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
 
-                  Req = make_req(get, Payload, [{observe, 0}]),
-                  {ok, content, _} = do_request(Channel, URI, Req),
-                  ?LOGT("observer topic:~ts~n", [Topic]),
+        Req = make_req(get, Payload, [{observe, 0}]),
+        {ok, content, _} = do_request(Channel, URI, Req),
+        ?LOGT("observer topic:~ts~n", [Topic]),
 
-                  timer:sleep(100),
-                  [SubPid] = emqx:subscribers(Topic),
-                  ?assert(is_pid(SubPid)),
+        timer:sleep(100),
+        [SubPid] = emqx:subscribers(Topic),
+        ?assert(is_pid(SubPid)),
 
-                  UnReq = make_req(get, Payload, [{observe, 1}]),
-                  {ok, nocontent, _} = do_request(Channel, URI, UnReq),
-                  ?LOGT("un observer topic:~ts~n", [Topic]),
-                  timer:sleep(100),
-                  ?assertEqual([], emqx:subscribers(Topic))
-          end,
+        UnReq = make_req(get, Payload, [{observe, 1}]),
+        {ok, nocontent, _} = do_request(Channel, URI, UnReq),
+        ?LOGT("un observer topic:~ts~n", [Topic]),
+        timer:sleep(100),
+        ?assertEqual([], emqx:subscribers(Topic))
+    end,
 
     with_connection(Fun).
 
-t_observe_wildcard(_Config) ->
+t_observe_wildcard(_) ->
     Fun = fun(Channel, Token) ->
-                  %% resolve_url can't process wildcard with #
-                  Topic = <<"/abc/+">>,
-                  TopicStr = binary_to_list(Topic),
-                  Payload = <<"123">>,
+        %% resolve_url can't process wildcard with #
+        Topic = <<"/abc/+">>,
+        TopicStr = binary_to_list(Topic),
+        Payload = <<"123">>,
 
-                  URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
-                  Req = make_req(get, Payload, [{observe, 0}]),
-                  {ok, content, _} = do_request(Channel, URI, Req),
-                  ?LOGT("observer topic:~ts~n", [Topic]),
+        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        Req = make_req(get, Payload, [{observe, 0}]),
+        {ok, content, _} = do_request(Channel, URI, Req),
+        ?LOGT("observer topic:~ts~n", [Topic]),
 
-                  timer:sleep(100),
-                  [SubPid] = emqx:subscribers(Topic),
-                  ?assert(is_pid(SubPid)),
+        timer:sleep(100),
+        [SubPid] = emqx:subscribers(Topic),
+        ?assert(is_pid(SubPid)),
 
-                  %% Publish a message
-                  PubTopic = <<"/abc/def">>,
-                  emqx:publish(emqx_message:make(PubTopic, Payload)),
-                  {ok, content, Notify} = with_response(Channel),
+        %% Publish a message
+        PubTopic = <<"/abc/def">>,
+        emqx:publish(emqx_message:make(PubTopic, Payload)),
+        {ok, content, Notify} = with_response(Channel),
 
-                  ?LOGT("observer get Notif=~p", [Notify]),
+        ?LOGT("observer get Notif=~p", [Notify]),
 
-                  #coap_content{payload = PayloadRecv} = Notify,
+        #coap_content{payload = PayloadRecv} = Notify,
 
-                  ?assertEqual(Payload, PayloadRecv)
-          end,
+        ?assertEqual(Payload, PayloadRecv)
+    end,
 
     with_connection(Fun).
 
+t_clients_api(_) ->
+    Fun = fun(_Channel, _Token) ->
+        ClientId = <<"client1">>,
+        %% list
+        {200, #{data := [Client1]}} = request(get, "/gateway/coap/clients"),
+        #{clientid := ClientId} = Client1,
+        %% searching
+        {200, #{data := [Client2]}} =
+            request(get, "/gateway/coap/clients",
+                    [{<<"clientid">>, ClientId}]),
+        {200, #{data := [Client3]}} =
+            request(get, "/gateway/coap/clients",
+                    [{<<"like_clientid">>, <<"cli">>}]),
+        %% lookup
+        {200, Client4} =
+            request(get, "/gateway/coap/clients/client1"),
+        %% assert
+        Client1 = Client2 = Client3 = Client4,
+        %% kickout
+        {204, _} =
+            request(delete, "/gateway/coap/clients/client1"),
+        {200, #{data := []}} = request(get, "/gateway/coap/clients")
+    end,
+    with_connection(Fun).
+
+t_clients_subscription_api(_) ->
+    Fun = fun(_Channel, _Token) ->
+        Path = "/gateway/coap/clients/client1/subscriptions",
+        %% list
+        {200, []} = request(get, Path),
+        %% create
+        SubReq = #{ topic => <<"tx">>
+                  , qos => 0
+                  , nl => 0
+                  , rap => 0
+                  , rh => 0
+                  },
+
+        {201, SubsResp} = request(post, Path, SubReq),
+        {200, [SubsResp2]} = request(get, Path),
+        ?assertEqual(
+           maps:get(topic, SubsResp),
+           maps:get(topic, SubsResp2)),
+
+        {204, _} = request(delete, Path ++ "/tx"),
+
+        {200, []} = request(get, Path)
+    end,
+    with_connection(Fun).
+
+%%--------------------------------------------------------------------
+%% helpers
+
 connection(Channel) ->
-    URI = ?MQTT_PREFIX ++ "/connection?clientid=client1&username=admin&password=public",
+    URI = ?MQTT_PREFIX ++
+          "/connection?clientid=client1&username=admin&password=public",
     Req = make_req(post),
     {ok, created, Data} = do_request(Channel, URI, Req),
     #coap_content{payload = BinToken} = Data,
@@ -252,7 +312,8 @@ do_request(Channel, URI, #coap_message{options = Opts} = Req) ->
 
 with_response(Channel) ->
     receive
-        {coap_response, _ChId, Channel, _Ref, Message=#coap_message{method=Code}} ->
+        {coap_response, _ChId, Channel,
+         _Ref, Message=#coap_message{method=Code}} ->
             return_response(Code, Message);
         {coap_error, _ChId, Channel, _Ref, reset} ->
             {error, reset}
@@ -280,10 +341,10 @@ do(Fun) ->
 
 with_connection(Action) ->
     Fun = fun(Channel) ->
-                  Token = connection(Channel),
-                  timer:sleep(100),
-                  Action(Channel, Token),
-                  disconnection(Channel, Token),
-                  timer:sleep(100)
-          end,
+        Token = connection(Channel),
+        timer:sleep(100),
+        Action(Channel, Token),
+        disconnection(Channel, Token),
+        timer:sleep(100)
+    end,
     do(Fun).
diff --git a/apps/emqx_gateway/test/emqx_gateway_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_SUITE.erl
new file mode 100644
index 000000000..d6da58df3
--- /dev/null
+++ b/apps/emqx_gateway/test/emqx_gateway_SUITE.erl
@@ -0,0 +1,100 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-define(GWNAME, mqttsn).
+-define(CONF_DEFAULT, <<"gateway {}">>).
+
+%%--------------------------------------------------------------------
+%% setups
+%%--------------------------------------------------------------------
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Conf) ->
+    emqx_config:erase(gateway),
+    emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+    emqx_common_test_helpers:start_apps([emqx_gateway]),
+    Conf.
+
+end_per_suite(_Conf) ->
+    emqx_common_test_helpers:stop_apps([emqx_gateway]).
+
+%%--------------------------------------------------------------------
+%% cases
+%%--------------------------------------------------------------------
+
+t_registered_gateway(_) ->
+    [{coap, #{cbkmod := emqx_coap_impl}},
+     {exproto, #{cbkmod := emqx_exproto_impl}},
+     {lwm2m, #{cbkmod := emqx_lwm2m_impl}},
+     {mqttsn, #{cbkmod := emqx_sn_impl}},
+     {stomp, #{cbkmod := emqx_stomp_impl}}] =  emqx_gateway:registered_gateway().
+
+t_load_unload_list_lookup(_) ->
+    {ok, _} = emqx_gateway:load(?GWNAME, #{idle_timeout => 1000}),
+    ?assertEqual(
+       {error, alredy_existed},
+       emqx_gateway:load(?GWNAME, #{})),
+    ?assertEqual(
+       {error, {unknown_gateway_name, bad_gw_name}},
+       emqx_gateway:load(bad_gw_name, #{})),
+
+    ?assertEqual(1, length(emqx_gateway:list())),
+    ?assertEqual(
+       emqx_gateway:lookup(?GWNAME),
+       lists:nth(1, emqx_gateway:list())),
+
+    ?assertEqual(ok, emqx_gateway:unload(?GWNAME)),
+    ?assertEqual({error, not_found}, emqx_gateway:unload(?GWNAME)).
+
+t_start_stop_update(_) ->
+    {ok, _} = emqx_gateway:load(?GWNAME, #{idle_timeout => 1000}),
+
+    #{status := running} = emqx_gateway:lookup(?GWNAME),
+
+    ok = emqx_gateway:stop(?GWNAME),
+    {error, already_stopped} = emqx_gateway:stop(?GWNAME),
+
+    #{status := stopped} = emqx_gateway:lookup(?GWNAME),
+
+    ok = emqx_gateway:update(
+           ?GWNAME, #{enable => false, idle_timeout => 2000}),
+    #{status := stopped,
+      config := #{idle_timeout := 2000}} = emqx_gateway:lookup(?GWNAME),
+
+    ok = emqx_gateway:update(
+           ?GWNAME, #{enable => true, idle_timeout => 3000}),
+    #{status := running,
+      config := #{idle_timeout := 3000}} = emqx_gateway:lookup(?GWNAME),
+
+    ok = emqx_gateway:update(
+           ?GWNAME, #{enable => false, idle_timeout => 4000}),
+    #{status := stopped,
+      config := #{idle_timeout := 4000}} = emqx_gateway:lookup(?GWNAME),
+
+    ok = emqx_gateway:start(?GWNAME),
+    #{status := running,
+      config := #{idle_timeout := 4000}} = emqx_gateway:lookup(?GWNAME),
+
+    {error, already_started} = emqx_gateway:start(?GWNAME),
+    ok.
diff --git a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl
index e2375decb..12b0ef7b5 100644
--- a/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl
+++ b/apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl
@@ -30,9 +30,7 @@
 
 %% this parses to #{}, will not cause config cleanup
 %% so we will need call emqx_config:erase
--define(CONF_DEFAULT, <<"
-gateway {}
-">>).
+-define(CONF_DEFAULT, <<"gateway {}">>).
 
 %%--------------------------------------------------------------------
 %% Setup
@@ -307,6 +305,10 @@ t_listeners_authn(_) ->
 
     {200, ConfResp3} = request(get, Path),
     assert_confs(AuthConf2, ConfResp3),
+
+    {204, _} = request(delete, Path),
+    %% FIXME: 204?
+    {204, _} = request(get, Path),
     {204, _} = request(delete, "/gateway/stomp").
 
 t_listeners_authn_data_mgmt(_) ->
@@ -340,32 +342,32 @@ t_listeners_authn_data_mgmt(_) ->
     {200,
      #{data := [UserRespd1]} } = request(
                                    get,
-                                   "/gateway/stomp/listeners/stomp:tcp:def/authentication/users"),
+                                   Path ++ "/users"),
     assert_confs(UserRespd1, User1),
 
     {200, UserRespd2} = request(
                           get,
-                          "/gateway/stomp/listeners/stomp:tcp:def/authentication/users/test"),
+                          Path ++ "/users/test"),
     assert_confs(UserRespd2, User1),
 
     {200, UserRespd3} = request(
                           put,
-                          "/gateway/stomp/listeners/stomp:tcp:def/authentication/users/test",
+                          Path ++ "/users/test",
                           #{password => <<"654321">>, is_superuser => true}),
     assert_confs(UserRespd3, User1#{is_superuser => true}),
 
     {200, UserRespd4} = request(
                           get,
-                          "/gateway/stomp/listeners/stomp:tcp:def/authentication/users/test"),
+                          Path ++ "/users/test"),
     assert_confs(UserRespd4, User1#{is_superuser => true}),
 
     {204, _} = request(
                  delete,
-                 "/gateway/stomp/listeners/stomp:tcp:def/authentication/users/test"),
+                 Path ++ "/users/test"),
 
     {200, #{data := []}} = request(
                              get,
-                             "/gateway/stomp/listeners/stomp:tcp:def/authentication/users"),
+                             Path ++ "/users"),
     {204, _} = request(delete, "/gateway/stomp").
 
 %%--------------------------------------------------------------------
diff --git a/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl
index cf27dc027..24dacf750 100644
--- a/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl
+++ b/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl
@@ -29,6 +29,23 @@
 gateway {}
 ">>).
 
+%% The config with json format for mqtt-sn gateway
+-define(CONF_MQTTSN, "
+{\"idle_timeout\": \"30s\",
+ \"enable_stats\": true,
+ \"mountpoint\": \"mqttsn/\",
+ \"gateway_id\": 1,
+ \"broadcast\": true,
+ \"enable_qos3\": true,
+ \"predefined\": [{\"id\": 1001, \"topic\": \"pred/a\"}],
+ \"listeners\":
+    [{\"type\": \"udp\",
+      \"name\": \"ct\",
+      \"bind\": \"1884\"
+    }]
+}
+").
+
 %%--------------------------------------------------------------------
 %% Setup
 %%--------------------------------------------------------------------
@@ -107,34 +124,144 @@ t_gateway_list(_) ->
       "Gateway(name=lwm2m, status=unloaded)\n"
       "Gateway(name=mqttsn, status=unloaded)\n"
       "Gateway(name=stomp, status=unloaded)\n"
-      , acc_print()).
+      , acc_print()),
 
-t_gateway_load(_) ->
-    ok.
+    emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]),
+    ?assertEqual("ok\n", acc_print()),
 
-t_gateway_unload(_) ->
-    ok.
+    emqx_gateway_cli:gateway(["list"]),
+    %% TODO: assert it.
+    _ = acc_print(),
 
-t_gateway_start(_) ->
-    ok.
+    emqx_gateway_cli:gateway(["unload", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()).
 
-t_gateway_stop(_) ->
-    ok.
+t_gateway_load_unload_lookup(_) ->
+    emqx_gateway_cli:gateway(["lookup", "mqttsn"]),
+    ?assertEqual("undefined\n", acc_print()),
+
+    emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]),
+    ?assertEqual("ok\n", acc_print()),
+
+    %% TODO: bad config name, format???
+
+    emqx_gateway_cli:gateway(["lookup", "mqttsn"]),
+    %% TODO: assert it. for example:
+    %% name: mqttsn
+    %% status: running
+    %% created_at: 2022-01-05T14:40:20.039+08:00
+    %% started_at: 2022-01-05T14:42:37.894+08:00
+    %% config: #{broadcast => false,enable => true,enable_qos3 => true,
+    %%           enable_stats => true,gateway_id => 1,idle_timeout => 30000,
+    %%           mountpoint => <<>>,predefined => []}
+    _ = acc_print(),
+
+    emqx_gateway_cli:gateway(["load", "mqttsn", "{}"]),
+    ?assertEqual(
+        "Error: The mqttsn gateway already loaded\n"
+        , acc_print()),
+
+    emqx_gateway_cli:gateway(["load", "bad-gw-name", "{}"]),
+    %% TODO: assert it. for example:
+    %% Error: Illegal gateway name
+    _ = acc_print(),
+
+    emqx_gateway_cli:gateway(["unload", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()),
+    %% Always return ok, even the gateway has unloaded
+    emqx_gateway_cli:gateway(["unload", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()),
+
+    emqx_gateway_cli:gateway(["lookup", "mqttsn"]),
+    ?assertEqual("undefined\n", acc_print()).
+
+t_gateway_start_stop(_) ->
+    emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]),
+    ?assertEqual("ok\n", acc_print()),
+
+    emqx_gateway_cli:gateway(["stop", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()),
+    %% dupliacted stop gateway, return ok
+    emqx_gateway_cli:gateway(["stop", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()),
+
+    emqx_gateway_cli:gateway(["start", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()),
+    %% dupliacted start gateway, return ok
+    emqx_gateway_cli:gateway(["start", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()),
+
+    emqx_gateway_cli:gateway(["unload", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()).
 
 t_gateway_clients_usage(_) ->
-    ok.
+    ?assertEqual(
+       ["gateway-clients list               "
+            "# List all clients for a gateway\n",
+        "gateway-clients lookup   "
+            "# Lookup the Client Info for specified client\n",
+        "gateway-clients kick     "
+            "# Kick out a client\n"],
+       emqx_gateway_cli:'gateway-clients'(usage)
+     ).
 
-t_gateway_clients_list(_) ->
-    ok.
+t_gateway_clients(_) ->
+    emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]),
+    ?assertEqual("ok\n", acc_print()),
 
-t_gateway_clients_lookup(_) ->
-    ok.
+    Socket = sn_client_connect(<<"client1">>),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["list", "mqttsn"]),
+    ClientDesc1 = acc_print(),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["lookup", "mqttsn", "client1"]),
+    ClientDesc2 = acc_print(),
+    ?assertEqual(ClientDesc1, ClientDesc2),
+
+    sn_client_disconnect(Socket),
+    timer:sleep(500),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["lookup", "mqttsn", "bad-client"]),
+    ?assertEqual("Not Found.\n", acc_print()),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["lookup", "bad-gw", "bad-client"]),
+    ?assertEqual("Bad Gateway Name.\n", acc_print()),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["list", "mqttsn"]),
+    %% no print for empty client list
+
+    _ = emqx_gateway_cli:'gateway-clients'(["list", "bad-gw"]),
+    ?assertEqual("Bad Gateway Name.\n", acc_print()),
+
+    emqx_gateway_cli:gateway(["unload", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()).
 
 t_gateway_clients_kick(_) ->
-    ok.
+    emqx_gateway_cli:gateway(["load", "mqttsn", ?CONF_MQTTSN]),
+    ?assertEqual("ok\n", acc_print()),
+
+    Socket = sn_client_connect(<<"client1">>),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["list", "mqttsn"]),
+    _ = acc_print(),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["kick", "mqttsn", "bad-client"]),
+    ?assertEqual("Not Found.\n", acc_print()),
+
+    _ = emqx_gateway_cli:'gateway-clients'(["kick", "mqttsn", "client1"]),
+    ?assertEqual("ok\n", acc_print()),
+
+    sn_client_disconnect(Socket),
+
+    emqx_gateway_cli:gateway(["unload", "mqttsn"]),
+    ?assertEqual("ok\n", acc_print()).
 
 t_gateway_metrcis_usage(_) ->
-    ok.
+    ?assertEqual(
+       [ "gateway-metrics  "
+            "# List all metrics for a gateway\n"],
+       emqx_gateway_cli:'gateway-metrics'(usage)
+     ).
 
 t_gateway_metrcis(_) ->
     ok.
@@ -148,3 +275,14 @@ acc_print(Acc) ->
     after 200 ->
         Acc
     end.
+
+sn_client_connect(ClientId) ->
+    {ok, Socket} = gen_udp:open(0, [binary]),
+    _ = emqx_sn_protocol_SUITE:send_connect_msg(Socket, ClientId),
+    ?assertEqual(<<3, 16#05, 0>>,
+                 emqx_sn_protocol_SUITE:receive_response(Socket)),
+    Socket.
+
+sn_client_disconnect(Socket) ->
+    _ = emqx_sn_protocol_SUITE:send_disconnect_msg(Socket, undefined),
+    gen_udp:close(Socket), ok.
diff --git a/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl
new file mode 100644
index 000000000..82d97a166
--- /dev/null
+++ b/apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl
@@ -0,0 +1,248 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_cm_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-define(GWNAME, mqttsn).
+-define(CLIENTID, <<"client1">>).
+
+-define(CONF_DEFAULT, <<"gateway {}">>).
+
+%%--------------------------------------------------------------------
+%% setups
+%%--------------------------------------------------------------------
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Conf) ->
+    emqx_config:erase(gateway),
+    emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+    emqx_common_test_helpers:start_apps([]),
+
+    ok = meck:new(emqx_gateway_metrics, [passthrough, no_history, no_link]),
+    ok = meck:expect(emqx_gateway_metrics, inc, fun(_, _) -> ok end),
+    Conf.
+
+end_per_suite(_Conf) ->
+    meck:unload(emqx_gateway_metrics),
+    emqx_common_test_helpers:stop_apps([]).
+
+init_per_testcase(_TestCase, Conf) ->
+    process_flag(trap_exit, true),
+    {ok, CMPid} = emqx_gateway_cm:start_link([{gwname, ?GWNAME}]),
+    [{cm, CMPid} | Conf].
+
+end_per_testcase(_TestCase, Conf) ->
+    CMPid = proplists:get_value(cm, Conf),
+    gen_server:stop(CMPid),
+    process_flag(trap_exit, false),
+    Conf.
+
+%%--------------------------------------------------------------------
+%% cases
+%%--------------------------------------------------------------------
+
+t_open_session(_) ->
+    {error, not_supported_now} = emqx_gateway_cm:open_session(
+                                 ?GWNAME, false, clientinfo(), conninfo(),
+                                 fun(_, _) -> #{} end),
+
+    {ok, SessionRes} = emqx_gateway_cm:open_session(
+                         ?GWNAME, true, clientinfo(), conninfo(),
+                         fun(_, _) -> #{no => 1} end),
+    ?assertEqual(#{present => false,
+                   session => #{no => 1}}, SessionRes),
+
+    %% assert1. check channel infos in ets table
+    Chann = {?CLIENTID, self()},
+    ?assertEqual(
+       [Chann],
+       ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))),
+    ?assertEqual(
+       [{Chann, ?MODULE}],
+       ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))),
+
+    %% assert2. discard the presented session
+
+    {ok, SessionRes2} = emqx_gateway_cm:open_session(
+                          ?GWNAME, true, clientinfo(), conninfo(),
+                          fun(_, _) -> #{no => 2} end),
+    ?assertEqual(#{present => false,
+                   session => #{no => 2}}, SessionRes2),
+
+    emqx_gateway_cm:insert_channel_info(
+      ?GWNAME, ?CLIENTID,
+      #{clientinfo => clientinfo(), conninfo => conninfo()}, []),
+    ?assertEqual(
+       1,
+       ets:info(emqx_gateway_cm:tabname(info, ?GWNAME), size)),
+
+    receive
+        discard ->
+            emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID),
+            emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID)
+    after 100 ->
+        ?assert(false, "waiting discard msg timeout")
+    end,
+
+    %% assert3. no channel infos in ets table
+    ?assertEqual(
+       [],
+       ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))),
+    ?assertEqual(
+       [],
+       ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))),
+    ?assertEqual(
+       [],
+       ets:tab2list(emqx_gateway_cm:tabname(info, ?GWNAME))).
+
+t_get_set_chan_info_stats(_) ->
+    {ok, SessionRes} = emqx_gateway_cm:open_session(
+                         ?GWNAME, true, clientinfo(), conninfo(),
+                         fun(_, _) -> #{no => 1} end),
+    ?assertEqual(#{present => false,
+                   session => #{no => 1}}, SessionRes),
+    emqx_gateway_cm:insert_channel_info(
+      ?GWNAME, ?CLIENTID,
+      #{clientinfo => clientinfo(), conninfo => conninfo()}, []),
+
+    %% Info: get/set
+    NInfo = #{newinfo => true},
+    emqx_gateway_cm:set_chan_info(?GWNAME, ?CLIENTID, NInfo),
+    ?assertEqual(
+       NInfo,
+       emqx_gateway_cm:get_chan_info(?GWNAME, ?CLIENTID)),
+    ?assertEqual(
+       NInfo,
+       emqx_gateway_cm:get_chan_info(?GWNAME, ?CLIENTID, self())),
+    %% Stats: get/set
+    NStats = [{newstats, true}],
+    emqx_gateway_cm:set_chan_stats(?GWNAME, ?CLIENTID, NStats),
+    ?assertEqual(
+       NStats,
+       emqx_gateway_cm:get_chan_stats(?GWNAME, ?CLIENTID)),
+    ?assertEqual(
+       NStats,
+       emqx_gateway_cm:get_chan_stats(?GWNAME, ?CLIENTID, self())),
+
+    emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID),
+    emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID).
+
+t_handle_process_down(Conf) ->
+    Pid = proplists:get_value(cm, Conf),
+
+    {ok, SessionRes} = emqx_gateway_cm:open_session(
+                         ?GWNAME, true, clientinfo(), conninfo(),
+                         fun(_, _) -> #{no => 1} end),
+    ?assertEqual(#{present => false,
+                   session => #{no => 1}}, SessionRes),
+    emqx_gateway_cm:insert_channel_info(
+      ?GWNAME, ?CLIENTID,
+      #{clientinfo => clientinfo(), conninfo => conninfo()}, []),
+
+    _ = Pid ! {'DOWN', mref, process, self(), normal},
+
+    timer:sleep(200), %% wait the asycn clear task
+    ?assertEqual(
+       [],
+       ets:tab2list(emqx_gateway_cm:tabname(chan, ?GWNAME))),
+    ?assertEqual(
+       [],
+       ets:tab2list(emqx_gateway_cm:tabname(conn, ?GWNAME))),
+    ?assertEqual(
+       [],
+       ets:tab2list(emqx_gateway_cm:tabname(info, ?GWNAME))).
+
+t_kick_session(_) ->
+    %% session1
+    {ok, _} = emqx_gateway_cm:open_session(
+                ?GWNAME, true, clientinfo(), conninfo(),
+                fun(_, _) -> #{no => 1} end),
+    emqx_gateway_cm:insert_channel_info(
+      ?GWNAME, ?CLIENTID,
+      #{clientinfo => clientinfo(), conninfo => conninfo()}, []),
+
+    %% meck `lookup_channels`
+    Self = self(),
+    ok = meck:new(emqx_gateway_cm_registry,
+                  [passthrough, no_history, no_link]),
+    ok = meck:expect(emqx_gateway_cm_registry, lookup_channels,
+                     fun(_, ?CLIENTID) -> [Self, Self] end),
+
+    ok = emqx_gateway_cm:kick_session(?GWNAME, ?CLIENTID),
+
+    receive discard -> ok
+    after 100 -> ?assert(false, "waiting discard msg timeout")
+    end,
+    receive
+        kick ->
+            emqx_gateway_cm:connection_closed(?GWNAME, ?CLIENTID),
+            emqx_gateway_cm:unregister_channel(?GWNAME, ?CLIENTID)
+    after
+        100 ->
+            ?assert(false, "waiting kick msg timeout")
+    end,
+    meck:unload(emqx_gateway_cm_registry).
+
+t_unexpected_handle(Conf) ->
+    Pid = proplists:get_value(cm, Conf),
+    _ = Pid ! unexpected_info,
+    ok = gen_server:call(Pid, unexpected_call),
+    ok = gen_server:cast(Pid, unexpected_cast).
+
+%%--------------------------------------------------------------------
+%% helpers
+
+clientinfo() ->
+    #{ clientid => ?CLIENTID
+     , is_bridge => false
+     , is_superuser => false
+     , listener => 'mqttsn:udp:default'
+     , mountpoint => <<"mqttsn/">>
+     , peerhost => {127, 0, 0, 1}
+     , protocol => 'mqtt-sn'
+     , sockport => 1884
+     , username => undefined
+     , zone => default
+     }.
+
+conninfo() ->
+    #{ clean_start => true
+     , clientid => ?CLIENTID
+     , conn_mod => ?MODULE
+     , connected_at => 1641805544652
+     , expiry_interval => 0
+     , keepalive => 10
+     , peercert => nossl
+     , peername => {{127, 0, 0, 1}, 64810}
+     , proto_name => <<"MQTT-SN">>
+     , proto_ver => <<"1.2">>
+     , sockname => {{0, 0, 0, 0}, 1884}
+     , socktype => udp
+     }.
+
+%%--------------------------------------------------------------------
+%% connection module mock
+
+call(ConnPid, discard, _) ->
+    ConnPid ! discard, ok;
+call(ConnPid, kick, _) ->
+    ConnPid ! kick, ok.
diff --git a/apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl
new file mode 100644
index 000000000..3cca034a6
--- /dev/null
+++ b/apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl
@@ -0,0 +1,97 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_cm_registry_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-define(GWNAME, mqttsn).
+-define(CLIENTID, <<"client1">>).
+
+-define(CONF_DEFAULT, <<"gateway {}">>).
+
+%%--------------------------------------------------------------------
+%% setups
+%%--------------------------------------------------------------------
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Conf) ->
+    emqx_config:erase(gateway),
+    emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+    emqx_common_test_helpers:start_apps([]),
+    Conf.
+
+end_per_suite(_Conf) ->
+    emqx_common_test_helpers:stop_apps([]).
+
+init_per_testcase(_TestCase, Conf) ->
+    {ok, Pid} = emqx_gateway_cm_registry:start_link(?GWNAME),
+    [{registry, Pid} | Conf].
+
+end_per_testcase(_TestCase, Conf) ->
+    Pid = proplists:get_value(registry, Conf),
+    gen_server:stop(Pid),
+    Conf.
+
+%%--------------------------------------------------------------------
+%% cases
+%%--------------------------------------------------------------------
+
+t_tabname(_) ->
+    ?assertEqual(
+       emqx_gateway_gw_name_channel_registry,
+       emqx_gateway_cm_registry:tabname(gw_name)).
+
+t_register_unregister_channel(_) ->
+    ok = emqx_gateway_cm_registry:register_channel(?GWNAME, ?CLIENTID),
+    ?assertEqual(
+       [{channel, ?CLIENTID, self()}],
+       ets:tab2list(emqx_gateway_cm_registry:tabname(?GWNAME))),
+
+    ?assertEqual(
+       [self()],
+       emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)),
+
+    ok = emqx_gateway_cm_registry:unregister_channel(?GWNAME, ?CLIENTID),
+
+    ?assertEqual(
+       [], 
+       ets:tab2list(emqx_gateway_cm_registry:tabname(?GWNAME))),
+    ?assertEqual(
+       [],
+       emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)).
+
+t_cleanup_channels(Conf) ->
+    Pid = proplists:get_value(registry, Conf),
+    emqx_gateway_cm_registry:register_channel(?GWNAME, ?CLIENTID),
+    ?assertEqual(
+       [self()],
+       emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)),
+    Pid ! {membership, {mnesia, down, node()}},
+    ct:sleep(100),
+    ?assertEqual(
+       [],
+       emqx_gateway_cm_registry:lookup_channels(?GWNAME, ?CLIENTID)).
+
+t_handle_unexpected_msg(Conf) ->
+    Pid = proplists:get_value(registry, Conf),
+    _ = Pid ! unexpected_info,
+    ok = gen_server:cast(Pid, unexpected_cast),
+    ignored = gen_server:call(Pid, unexpected_call).
diff --git a/apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl
new file mode 100644
index 000000000..e09e5bc1b
--- /dev/null
+++ b/apps/emqx_gateway/test/emqx_gateway_ctx_SUITE.erl
@@ -0,0 +1,67 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_ctx_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+%%--------------------------------------------------------------------
+%% setups
+%%--------------------------------------------------------------------
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Conf) ->
+    ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
+    ok = meck:expect(emqx_access_control, authenticate,
+                     fun(#{clientid := bad_client}) ->
+                             {error, bad_username_or_password};
+                        (ClientInfo) -> {ok, ClientInfo}
+                     end),
+    Conf.
+
+end_per_suite(_Conf) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% cases
+%%--------------------------------------------------------------------
+
+t_authenticate(_) ->
+    Ctx = #{gwname => mqttsn, auth => [], cm => self()},
+    Info1 = #{ mountpoint => undefined
+             , clientid => <<"user1">>
+             },
+    NInfo1 = zone(Info1),
+    ?assertEqual({ok, NInfo1}, emqx_gateway_ctx:authenticate(Ctx, Info1)),
+
+    Info2 = #{ mountpoint => <<"mqttsn/${clientid}/">> 
+             , clientid => <<"user1">>
+             },
+    NInfo2 = zone(Info2#{mountpoint => <<"mqttsn/user1/">>}),
+    ?assertEqual({ok, NInfo2}, emqx_gateway_ctx:authenticate(Ctx, Info2)),
+
+    Info3 = #{ mountpoint => <<"mqttsn/${clientid}/">> 
+             , clientid => bad_client
+             },
+    {error, bad_username_or_password}
+        = emqx_gateway_ctx:authenticate(Ctx, Info3),
+    ok.
+
+zone(Info) -> Info#{zone => default}.
diff --git a/apps/emqx_gateway/test/emqx_gateway_metrics_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_metrics_SUITE.erl
new file mode 100644
index 000000000..b55de1738
--- /dev/null
+++ b/apps/emqx_gateway/test/emqx_gateway_metrics_SUITE.erl
@@ -0,0 +1,76 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_metrics_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-define(GWNAME, mqttsn).
+-define(METRIC, 'ct.test.metrics_name').
+-define(CONF_DEFAULT, <<"gateway {}">>).
+
+%%--------------------------------------------------------------------
+%% setups
+%%--------------------------------------------------------------------
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Conf) ->
+    emqx_config:erase(gateway),
+    emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+    emqx_common_test_helpers:start_apps([]),
+    Conf.
+
+end_per_suite(_Conf) ->
+    emqx_common_test_helpers:stop_apps([]).
+
+init_per_testcase(_TestCase, Conf) ->
+    {ok, Pid} = emqx_gateway_metrics:start_link(?GWNAME),
+    [{metrics, Pid} | Conf].
+
+end_per_testcase(_TestCase, Conf) ->
+    Pid = proplists:get_value(metrics, Conf),
+    gen_server:stop(Pid),
+    Conf.
+
+%%--------------------------------------------------------------------
+%% cases
+%%--------------------------------------------------------------------
+
+t_inc_dec(_) ->
+    ok = emqx_gateway_metrics:inc(?GWNAME, ?METRIC),
+    ok = emqx_gateway_metrics:inc(?GWNAME, ?METRIC),
+
+    ?assertEqual(
+      [{?METRIC, 2}],
+      emqx_gateway_metrics:lookup(?GWNAME)),
+
+    ok = emqx_gateway_metrics:dec(?GWNAME, ?METRIC),
+    ok = emqx_gateway_metrics:dec(?GWNAME, ?METRIC),
+
+    ?assertEqual(
+      [{?METRIC, 0}],
+      emqx_gateway_metrics:lookup(?GWNAME)).
+
+t_handle_unexpected_msg(Conf) ->
+    Pid = proplists:get_value(metrics, Conf),
+    _ = Pid ! unexpected_info,
+    ok = gen_server:cast(Pid, unexpected_cast),
+    ok = gen_server:call(Pid, unexpected_call),
+    ok.
diff --git a/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl
index 993da10bb..63378f875 100644
--- a/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl
+++ b/apps/emqx_gateway/test/emqx_gateway_registry_SUITE.erl
@@ -57,7 +57,13 @@ t_load_unload(_) ->
 
     {error, already_existed} = emqx_gateway_registry:reg(test, [{cbkmod, ?MODULE}]),
 
+    ok = emqx_gateway_registry:unreg(test),
     ok = emqx_gateway_registry:unreg(test),
     undefined = emqx_gateway_registry:lookup(test),
     OldCnt = length(emqx_gateway_registry:list()),
     ok.
+
+t_handle_unexpected_msg(_) ->
+    _ = emqx_gateway_registry ! unexpected_info,
+    ok = gen_server:cast(emqx_gateway_registry, unexpected_cast),
+    ok = gen_server:call(emqx_gateway_registry, unexpected_call).
diff --git a/apps/emqx_gateway/test/emqx_gateway_test_utils.erl b/apps/emqx_gateway/test/emqx_gateway_test_utils.erl
index d2daf48b4..c96dee651 100644
--- a/apps/emqx_gateway/test/emqx_gateway_test_utils.erl
+++ b/apps/emqx_gateway/test/emqx_gateway_test_utils.erl
@@ -117,6 +117,8 @@ req(Path, Qs) ->
 req(Path, Qs, Body) ->
     {url(Path, Qs), auth([]), "application/json", emqx_json:encode(Body)}.
 
+url(Path, []) ->
+    lists:concat([?http_api_host, Path]);
 url(Path, Qs) ->
     lists:concat([?http_api_host, Path, "?", binary_to_list(cow_qs:qs(Qs))]).
 
diff --git a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl
index b6c1b837f..30e910f2b 100644
--- a/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl
+++ b/apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl
@@ -19,6 +19,11 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-import(emqx_gateway_test_utils,
+        [ request/2
+        , request/3
+        ]).
+
 -define(PORT, 5783).
 
 -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
@@ -66,9 +71,9 @@ all() ->
     , {group, test_grp_4_discover}
     , {group, test_grp_5_write_attr}
     , {group, test_grp_6_observe}
-
       %% {group, test_grp_8_object_19}
     , {group, test_grp_9_psm_queue_mode}
+    , {group, test_grp_10_rest_api}
     ].
 
 suite() -> [{timetrap, {seconds, 90}}].
@@ -147,21 +152,29 @@ groups() ->
       [
        case90_psm_mode,
        case90_queue_mode
+      ]},
+     {test_grp_10_rest_api, [RepeatOpt],
+      [
+       case100_clients_api,
+       case100_subscription_api
       ]}
     ].
 
 init_per_suite(Config) ->
-    emqx_common_test_helpers:start_apps([emqx_conf]),
+    %% load application first for minirest api searching
+    application:load(emqx_gateway),
+    emqx_mgmt_api_test_util:init_suite([emqx_conf]),
     Config.
 
 end_per_suite(Config) ->
     timer:sleep(300),
     {ok, _} = emqx_conf:remove([<<"gateway">>,<<"lwm2m">>], #{}),
-    emqx_common_test_helpers:stop_apps([emqx_conf]),
+    emqx_mgmt_api_test_util:end_suite([emqx_conf]),
     Config.
 
 init_per_testcase(_AllTestCase, Config) ->
     ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
+
     {ok, _} = application:ensure_all_started(emqx_gateway),
     {ok, ClientUdpSock} = gen_udp:open(0, [binary, {active, false}]),
 
@@ -1887,6 +1900,68 @@ server_cache_mode(Config, RegOption) ->
     verify_read_response_1(2, UdpSock),
     verify_read_response_1(3, UdpSock).
 
+case100_clients_api(Config) ->
+    Epn = "urn:oma:lwm2m:oma:3",
+    MsgId1 = 15,
+    UdpSock = ?config(sock, Config),
+    ObjectList = <<", , , , ">>,
+    RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"),
+    std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic),
+
+    %% list
+    {200, #{data := [Client1]}} = request(get, "/gateway/lwm2m/clients"),
+    %% searching
+    {200, #{data := [Client2]}} =
+        request(get, "/gateway/lwm2m/clients",
+                [{<<"endpoint_name">>, list_to_binary(Epn)}]),
+    {200, #{data := [Client3]}} =
+        request(get, "/gateway/lwm2m/clients",
+                [{<<"like_endpoint_name">>, list_to_binary(Epn)},
+                 {<<"gte_lifetime">>, <<"1">>}
+                ]),
+    %% lookup
+    ClientId = maps:get(clientid, Client1),
+    {200, Client4} =
+        request(get, "/gateway/lwm2m/clients/" ++ binary_to_list(ClientId)),
+    %% assert
+    Client1 = Client2 = Client3 = Client4,
+    %% kickout
+    {204, _} =
+        request(delete, "/gateway/lwm2m/clients/" ++ binary_to_list(ClientId)),
+    {200, #{data := []}} = request(get, "/gateway/lwm2m/clients").
+
+case100_subscription_api(Config) ->
+    Epn = "urn:oma:lwm2m:oma:3",
+    MsgId1 = 15,
+    UdpSock = ?config(sock, Config),
+    ObjectList = <<", , , , ">>,
+    RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"),
+    std_register(UdpSock, Epn, ObjectList, MsgId1, RespTopic),
+
+    {200, #{data := [Client1]}} = request(get, "/gateway/lwm2m/clients"),
+    ClientId = maps:get(clientid, Client1),
+    Path = "/gateway/lwm2m/clients/" ++
+            binary_to_list(ClientId) ++
+            "/subscriptions",
+
+    %% list
+    {200, [InitSub]} = request(get, Path),
+    ?assertEqual(
+       <<"lwm2m/", (list_to_binary(Epn))/binary, "/dn/#">>,
+       maps:get(topic, InitSub)),
+
+    %% create
+    SubReq = #{ topic => <<"tx">>
+              , qos => 1
+              , nl => 0
+              , rap => 0
+              , rh => 0
+              },
+    {201, _} = request(post, Path, SubReq),
+    {200, _} = request(get, Path),
+    {204, _} = request(delete, Path ++ "/tx"),
+    {200, [InitSub]} = request(get, Path).
+
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 %%% Internal Functions
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl
index 933776d51..ab333cf87 100644
--- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl
+++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl
@@ -14,11 +14,16 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module (emqx_sn_protocol_SUITE).
+-module(emqx_sn_protocol_SUITE).
 
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-import(emqx_gateway_test_utils,
+        [ request/2
+        , request/3
+        ]).
+
 -include("src/mqttsn/include/emqx_sn.hrl").
 
 -include_lib("eunit/include/eunit.hrl").
@@ -27,7 +32,6 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 
-
 -define(HOST, {127,0,0,1}).
 -define(PORT, 1884).
 
@@ -85,12 +89,12 @@ all() ->
 
 init_per_suite(Config) ->
     ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
-    emqx_common_test_helpers:start_apps([emqx_gateway]),
+    emqx_mgmt_api_test_util:init_suite([emqx_gateway]),
     Config.
 
 end_per_suite(_) ->
     {ok, _} = emqx:remove_config([gateway, mqttsn]),
-    emqx_common_test_helpers:stop_apps([emqx_gateway]).
+    emqx_mgmt_api_test_util:end_suite([emqx_gateway]).
 
 %%--------------------------------------------------------------------
 %% Test cases
@@ -644,7 +648,6 @@ t_publish_qos0_case05(_) ->
 
     gen_udp:close(Socket).
 
-
 t_publish_qos0_case06(_) ->
     Dup = 0,
     QoS = 0,
@@ -1762,6 +1765,72 @@ t_broadcast_test1(_) ->
     timer:sleep(600),
     gen_udp:close(Socket).
 
+t_socket_passvice(_) ->
+    %% TODO: test this gateway enter the passvie event
+    ok.
+
+t_clients_api(_) ->
+    TsNow = emqx_gateway_utils:unix_ts_to_rfc3339(
+              erlang:system_time(millisecond)),
+    ClientId = <<"client_id_test1">>,
+    {ok, Socket} = gen_udp:open(0, [binary]),
+    send_connect_msg(Socket, ClientId),
+    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
+    %% list
+    {200, #{data := [Client1]}} = request(get, "/gateway/mqttsn/clients"),
+    #{clientid := ClientId} = Client1,
+    %% searching
+    {200, #{data := [Client2]}} =
+        request(get, "/gateway/mqttsn/clients", [{<<"clientid">>, ClientId}]),
+    {200, #{data := [Client3]}} =
+        request(get, "/gateway/mqttsn/clients",
+                [{<<"like_clientid">>, <<"test1">>},
+                 {<<"proto_ver">>, <<"1.2">>},
+                 {<<"ip_address">>, <<"127.0.0.1">>},
+                 {<<"conn_state">>, <<"connected">>},
+                 {<<"clean_start">>, <<"true">>},
+                 {<<"gte_connected_at">>, TsNow}
+                ]),
+    %% lookup
+    {200, Client4} =
+        request(get, "/gateway/mqttsn/clients/client_id_test1"),
+    %% assert
+    Client1 = Client2 = Client3 = Client4,
+    %% kickout
+    {204, _} =
+        request(delete, "/gateway/mqttsn/clients/client_id_test1"),
+    {200, #{data := []}} = request(get, "/gateway/mqttsn/clients"),
+
+    send_disconnect_msg(Socket, undefined),
+    gen_udp:close(Socket).
+
+t_clients_subscription_api(_) ->
+    ClientId = <<"client_id_test1">>,
+    Path = "/gateway/mqttsn/clients/client_id_test1/subscriptions",
+    {ok, Socket} = gen_udp:open(0, [binary]),
+    send_connect_msg(Socket, ClientId),
+    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
+    %% list
+    {200, []} = request(get, Path),
+    %% create
+    SubReq = #{ topic => <<"tx">>
+              , qos => 1
+              , nl => 0
+              , rap => 0
+              , rh => 0
+              },
+    {201, SubsResp} = request(post, Path, SubReq),
+
+    {200, [SubsResp]} = request(get, Path),
+
+    {204, _} = request(delete, Path ++ "/tx"),
+
+    {200, []} = request(get, Path),
+
+    send_disconnect_msg(Socket, undefined),
+    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
+    gen_udp:close(Socket).
+
 %%--------------------------------------------------------------------
 %% Helper funcs
 %%--------------------------------------------------------------------
diff --git a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl
index f2ed76114..53aae7562 100644
--- a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl
+++ b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl
@@ -351,6 +351,40 @@ t_ack(_) ->
                                   body    = _}, _, _} = parse(Data4)
     end).
 
+t_1000_msg_send(_) ->
+    with_connection(fun(Sock) ->
+        gen_tcp:send(Sock, serialize(<<"CONNECT">>,
+                                     [{<<"accept-version">>, ?STOMP_VER},
+                                      {<<"host">>, <<"127.0.0.1:61613">>},
+                                      {<<"login">>, <<"guest">>},
+                                      {<<"passcode">>, <<"guest">>},
+                                      {<<"heart-beat">>, <<"0,0">>}])),
+        {ok, Data} = gen_tcp:recv(Sock, 0),
+        {ok, #stomp_frame{command = <<"CONNECTED">>,
+                          headers = _,
+                          body    = _}, _, _} = parse(Data),
+
+        Topic = <<"/queue/foo">>,
+        SendFun = fun() ->
+            gen_tcp:send(Sock, serialize(<<"SEND">>,
+                                        [{<<"destination">>, Topic}],
+                                        <<"msgtest">>))
+        end,
+
+        RecvFun = fun() ->
+            receive
+                {deliver, Topic, _Msg}->
+                    ok
+            after 100 ->
+                      ?assert(false, "waiting message timeout")
+            end
+        end,
+
+        emqx:subscribe(Topic),
+        lists:foreach(fun(_) -> SendFun() end, lists:seq(1, 1000)),
+        lists:foreach(fun(_) -> RecvFun() end, lists:seq(1, 1000))
+    end).
+
 t_rest_clienit_info(_) ->
     with_connection(fun(Sock) ->
         gen_tcp:send(Sock, serialize(<<"CONNECT">>,
diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl
index 2ff730a6a..6136cab3a 100644
--- a/apps/emqx_management/src/emqx_mgmt_api.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api.erl
@@ -35,6 +35,15 @@
 
 -export([do_query/6]).
 
+-export([ ensure_timestamp_format/2
+        ]).
+
+-export([ unix_ts_to_rfc3339_bin/1
+        , unix_ts_to_rfc3339_bin/2
+        , time_string_to_unix_ts_int/1
+        , time_string_to_unix_ts_int/2
+        ]).
+
 paginate(Tables, Params, {Module, FormatFun}) ->
     Qh = query_handle(Tables),
     Count = count(Tables),
@@ -401,6 +410,7 @@ to_integer(B) when is_binary(B) ->
 to_timestamp(I) when is_integer(I) ->
     I;
 to_timestamp(B) when is_binary(B) ->
+
     binary_to_integer(B).
 
 aton(B) when is_binary(B) ->
@@ -412,6 +422,41 @@ to_ip_port(IPAddress) ->
     Port = list_to_integer(Port0),
     {IP, Port}.
 
+%%--------------------------------------------------------------------
+%% time format funcs
+
+ensure_timestamp_format(Qs, TimeKeys)
+  when is_map(Qs);
+       is_list(TimeKeys) ->
+    Fun = fun (Key, NQs) ->
+        case NQs of
+            %% TimeString likes "2021-01-01T00:00:00.000+08:00" (in rfc3339)
+            %% or "1609430400000" (in millisecond)
+            #{Key := TimeString} ->
+                NQs#{Key => time_string_to_unix_ts_int(TimeString)};
+            #{} -> NQs
+        end
+    end,
+    lists:foldl(Fun, Qs, TimeKeys).
+
+unix_ts_to_rfc3339_bin(TimeStamp) ->
+    unix_ts_to_rfc3339_bin(TimeStamp, millisecond).
+
+unix_ts_to_rfc3339_bin(TimeStamp, Unit) when is_integer(TimeStamp) ->
+    list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}])).
+
+time_string_to_unix_ts_int(DateTime) ->
+    time_string_to_unix_ts_int(DateTime, millisecond).
+
+time_string_to_unix_ts_int(DateTime, Unit) when is_binary(DateTime) ->
+    try binary_to_integer(DateTime) of
+        TimeStamp when is_integer(TimeStamp) -> TimeStamp
+    catch
+        error:badarg ->
+            calendar:rfc3339_to_system_time(
+              binary_to_list(DateTime), [{unit, Unit}])
+    end.
+
 %%--------------------------------------------------------------------
 %% EUnits
 %%--------------------------------------------------------------------
diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl
index 007916d1b..be38e401d 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl
@@ -44,14 +44,6 @@
 %% for batch operation
 -export([do_subscribe/3]).
 
-%% for test suite
--export([ unix_ts_to_rfc3339_bin/1
-        , unix_ts_to_rfc3339_bin/2
-        , time_string_to_unix_ts_int/1
-        , time_string_to_unix_ts_int/2
-        ]).
-
-
 -define(CLIENT_QS_SCHEMA, {emqx_channel_info,
     [ {<<"node">>, atom}
     , {<<"username">>, binary}
@@ -463,7 +455,7 @@ keepalive_api() ->
 %%%==============================================================================================
 %% parameters trans
 clients(get, #{query_string := Qs}) ->
-    list(generate_qs(Qs)).
+    list(emqx_mgmt_api:ensure_timestamp_format(Qs, time_keys())).
 
 client(get, #{bindings := Bindings}) ->
     lookup(Bindings);
@@ -625,7 +617,8 @@ do_unsubscribe(ClientID, Topic) ->
     end.
 
 %%--------------------------------------------------------------------
-%% QueryString Generation (try rfc3339 to timestamp or keep timestamp)
+%% QueryString data-fomrat convert
+%%  (try rfc3339 to timestamp or keep timestamp)
 
 time_keys() ->
     [ <<"gte_created_at">>
@@ -633,18 +626,6 @@ time_keys() ->
     , <<"gte_connected_at">>
     , <<"lte_connected_at">>].
 
-generate_qs(Qs) ->
-    Fun =
-        fun (Key, NQs) ->
-                case NQs of
-                    %% TimeString likes "2021-01-01T00:00:00.000+08:00" (in rfc3339)
-                    %% or "1609430400000" (in millisecond)
-                    #{Key := TimeString} -> NQs#{Key => time_string_to_unix_ts_int(TimeString)};
-                    #{}                  -> NQs
-                end
-        end,
-    lists:foldl(Fun, Qs, time_keys()).
-
 %%--------------------------------------------------------------------
 %% Query Functions
 
@@ -778,8 +759,11 @@ take_maps_from_inner(Key, Value, Current) ->
 
 result_format_time_fun(Key, NClientInfoMap) ->
     case NClientInfoMap of
-        #{Key := TimeStamp} -> NClientInfoMap#{Key => unix_ts_to_rfc3339_bin(TimeStamp)};
-        #{}                 -> NClientInfoMap
+        #{Key := TimeStamp} ->
+            NClientInfoMap#{
+              Key => emqx_mgmt_api:unix_ts_to_rfc3339_bin(TimeStamp)};
+        #{} ->
+            NClientInfoMap
     end.
 
 -spec(peername_dispart(emqx_types:peername()) -> {binary(), inet:port_number()}).
@@ -795,22 +779,3 @@ format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) ->
        updated_time => Timestamp
      }.
 
-%%--------------------------------------------------------------------
-%% time format funcs
-
-unix_ts_to_rfc3339_bin(TimeStamp) ->
-    unix_ts_to_rfc3339_bin(TimeStamp, millisecond).
-
-unix_ts_to_rfc3339_bin(TimeStamp, Unit) when is_integer(TimeStamp) ->
-    list_to_binary(calendar:system_time_to_rfc3339(TimeStamp, [{unit, Unit}])).
-
-time_string_to_unix_ts_int(DateTime) ->
-    time_string_to_unix_ts_int(DateTime, millisecond).
-
-time_string_to_unix_ts_int(DateTime, Unit) when is_binary(DateTime) ->
-    try binary_to_integer(DateTime) of
-        TimeStamp when is_integer(TimeStamp) -> TimeStamp
-    catch
-        error:badarg ->
-            calendar:rfc3339_to_system_time(binary_to_list(DateTime), [{unit, Unit}])
-    end.
diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl
index a80d862f7..3c4864ab7 100644
--- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl
+++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl
@@ -129,7 +129,7 @@ t_query_clients_with_time(_) ->
     NowTimeStampInt = erlang:system_time(millisecond),
     %% Do not uri_encode `=` to `%3D`
     Rfc3339String   = emqx_http_lib:uri_encode(binary:bin_to_list(
-        emqx_mgmt_api_clients:unix_ts_to_rfc3339_bin(NowTimeStampInt))),
+        emqx_mgmt_api:unix_ts_to_rfc3339_bin(NowTimeStampInt))),
     TimeStampString = emqx_http_lib:uri_encode(integer_to_list(NowTimeStampInt)),
 
     LteKeys         = ["lte_created_at=", "lte_connected_at="],
@@ -147,10 +147,10 @@ t_query_clients_with_time(_) ->
                        || {ok, Response} <- RequestResults],
     {LteResponseDecodeds, GteResponseDecodeds} = lists:split(4, DecodedResults),
     %% EachData :: list()
-    [?assert( emqx_mgmt_api_clients:time_string_to_unix_ts_int(CreatedAt) < NowTimeStampInt)
+    [?assert( emqx_mgmt_api:time_string_to_unix_ts_int(CreatedAt) < NowTimeStampInt)
      || #{<<"data">> := EachData} <- LteResponseDecodeds,
         #{<<"created_at">> := CreatedAt}     <- EachData],
-    [?assert(emqx_mgmt_api_clients:time_string_to_unix_ts_int(ConnectedAt) < NowTimeStampInt)
+    [?assert(emqx_mgmt_api:time_string_to_unix_ts_int(ConnectedAt) < NowTimeStampInt)
      || #{<<"data">> := EachData} <- LteResponseDecodeds,
         #{<<"connected_at">> := ConnectedAt} <- EachData],
     [?assertEqual(EachData, [])