diff --git a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl index 8bdf1bb3c..49cc3322c 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl @@ -16,9 +16,16 @@ -module(emqx_coap_impl). +-behaviour(emqx_gateway_impl). + +-include_lib("emqx/include/logger.hrl"). -include_lib("emqx_gateway/include/emqx_gateway.hrl"). --behaviour(emqx_gateway_impl). +-import(emqx_gateway_utils, + [ normalize_config/1 + , start_listeners/4 + , stop_listeners/2 + ]). %% APIs -export([ reg/0 @@ -30,8 +37,6 @@ , on_gateway_unload/2 ]). --include_lib("emqx/include/logger.hrl"). - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -51,12 +56,20 @@ unreg() -> on_gateway_load(_Gateway = #{name := GwName, config := Config }, Ctx) -> - Listeners = emqx_gateway_utils:normalize_config(Config), - ListenerPids = lists:map(fun(Lis) -> - start_listener(GwName, Ctx, Lis) - end, Listeners), - - {ok, ListenerPids, #{ctx => Ctx}}. + Listeners = normalize_config(Config), + ModCfg = #{frame_mod => emqx_coap_frame, + chann_mod => emqx_coap_channel + }, + case start_listeners( + Listeners, GwName, Ctx, ModCfg) of + {ok, ListenerPids} -> + {ok, ListenerPids, #{ctx => Ctx}}; + {error, {Reason, Listener}} -> + throw({badconf, #{ key => listeners + , vallue => Listener + , reason => Reason + }}) + end. on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> GwName = maps:get(name, Gateway), @@ -76,63 +89,5 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> on_gateway_unload(_Gateway = #{ name := GwName, config := Config }, _GwState) -> - Listeners = emqx_gateway_utils:normalize_config(Config), - lists:foreach(fun(Lis) -> - stop_listener(GwName, Lis) - end, Listeners). - -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- - -start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> - ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of - {ok, Pid} -> - console_print("Gateway ~ts:~ts:~ts on ~ts started.~n", - [GwName, Type, LisName, ListenOnStr]), - Pid; - {error, Reason} -> - ?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n", - [GwName, Type, LisName, ListenOnStr, Reason]), - throw({badconf, Reason}) - end. - -start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> - Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), - NCfg = Cfg#{ctx => Ctx, - listener => {GwName, Type, LisName}, - frame_mod => emqx_coap_frame, - chann_mod => emqx_coap_channel - }, - MFA = {emqx_gateway_conn, start_link, [NCfg]}, - do_start_listener(Type, Name, ListenOn, SocketOpts, MFA). - -do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) -> - esockd:open_udp(Name, ListenOn, SocketOpts, MFA); - -do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) -> - esockd:open_dtls(Name, ListenOn, SocketOpts, MFA). - -stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), - ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case StopRet of - ok -> - console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n", - [GwName, Type, LisName, ListenOnStr]); - {error, Reason} -> - ?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n", - [GwName, Type, LisName, ListenOnStr, Reason]) - end, - StopRet. - -stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> - Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), - esockd:close(Name, ListenOn). - --ifndef(TEST). -console_print(Fmt, Args) -> ?ULOG(Fmt, Args). --else. -console_print(_Fmt, _Args) -> ok. --endif. + Listeners = normalize_config(Config), + stop_listeners(GwName, Listeners). diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 697bccc1d..3c902ac8d 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -532,7 +532,21 @@ params_client_searching_in_qs() -> , {lte_connected_at, mk(binary(), M#{desc => <<"Match the client socket connected datatime less than " - " a certain value">>})} + "a certain value">>})} + , {endpoint_name, + mk(binary(), + M#{desc => <<"Match the lwm2m client's endpoint name">>})} + , {like_endpoint_name, + mk(binary(), + M#{desc => <<"Use sub-string to match lwm2m client's endpoint name">>})} + , {gte_lifetime, + mk(binary(), + M#{desc => <<"Match the lwm2m client registered lifetime greater " + "than a certain value">>})} + , {lte_lifetime, + mk(binary(), + M#{desc => <<"Match the lwm2m client registered lifetime less than " + "a certain value">>})} ]. params_paging() -> diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index ad381ce44..34187224e 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -580,7 +580,7 @@ common_listener_opts() -> #{ nullable => {true, recursively} , desc => <<"The authenticatior for this listener">> })} - ]. + ] ++ emqx_gateway_schema:proxy_protocol_opts(). %%-------------------------------------------------------------------- %% examples diff --git a/apps/emqx_gateway/src/emqx_gateway_cli.erl b/apps/emqx_gateway/src/emqx_gateway_cli.erl index 03d55e27e..cc0e09a40 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cli.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cli.erl @@ -28,6 +28,8 @@ %, 'gateway-banned'/1 ]). +-elvis([{elvis_style, function_naming_convention, disable}]). + -spec load() -> ok. load() -> Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)], @@ -50,18 +52,24 @@ is_cmd(Fun) -> %% Cmds gateway(["list"]) -> - lists:foreach(fun(#{name := Name} = Gateway) -> - %% TODO: More infos: listeners?, connected? - Status = maps:get(status, Gateway, stopped), - print("Gateway(name=~ts, status=~ts)~n", [Name, Status]) - end, emqx_gateway:list()); + lists:foreach( + fun (#{name := Name, status := unloaded}) -> + print("Gateway(name=~ts, status=unloaded)\n", [Name]); + (#{name := Name, status := stopped, stopped_at := StoppedAt}) -> + print("Gateway(name=~ts, status=stopped, stopped_at=~ts)\n", + [Name, StoppedAt]); + (#{name := Name, status := running, current_connections := ConnCnt, + started_at := StartedAt}) -> + print("Gateway(name=~ts, status=running, clients=~w, started_at=~ts)\n", + [Name, ConnCnt, StartedAt]) + end, emqx_gateway_http:gateways(all)); gateway(["lookup", Name]) -> case emqx_gateway:lookup(atom(Name)) of undefined -> - print("undefined~n"); + print("undefined\n"); Info -> - print("~p~n", [Info]) + print("~p\n", [Info]) end; gateway(["load", Name, Conf]) -> @@ -70,17 +78,17 @@ gateway(["load", Name, Conf]) -> emqx_json:decode(Conf, [return_maps]) ) of {ok, _} -> - print("ok~n"); + print("ok\n"); {error, Reason} -> - print("Error: ~p~n", [Reason]) + print("Error: ~p\n", [Reason]) end; gateway(["unload", Name]) -> case emqx_gateway_conf:unload_gateway(bin(Name)) of ok -> - print("ok~n"); + print("ok\n"); {error, Reason} -> - print("Error: ~p~n", [Reason]) + print("Error: ~p\n", [Reason]) end; gateway(["stop", Name]) -> @@ -89,9 +97,9 @@ gateway(["stop", Name]) -> #{<<"enable">> => <<"false">>} ) of {ok, _} -> - print("ok~n"); + print("ok\n"); {error, Reason} -> - print("Error: ~p~n", [Reason]) + print("Error: ~p\n", [Reason]) end; gateway(["start", Name]) -> @@ -100,9 +108,9 @@ gateway(["start", Name]) -> #{<<"enable">> => <<"true">>} ) of {ok, _} -> - print("ok~n"); + print("ok\n"); {error, Reason} -> - print("Error: ~p~n", [Reason]) + print("Error: ~p\n", [Reason]) end; gateway(_) -> @@ -123,7 +131,7 @@ gateway(_) -> 'gateway-registry'(["list"]) -> lists:foreach( fun({Name, #{cbkmod := CbMod}}) -> - print("Registered Name: ~ts, Callback Module: ~ts~n", [Name, CbMod]) + print("Registered Name: ~ts, Callback Module: ~ts\n", [Name, CbMod]) end, emqx_gateway_registry:list()); @@ -137,15 +145,15 @@ gateway(_) -> InfoTab = emqx_gateway_cm:tabname(info, Name), case ets:info(InfoTab) of undefined -> - print("Bad Gateway Name.~n"); + print("Bad Gateway Name.\n"); _ -> - dump(InfoTab, client) + dump(InfoTab, client) end; 'gateway-clients'(["lookup", Name, ClientId]) -> ChanTab = emqx_gateway_cm:tabname(chan, Name), case ets:lookup(ChanTab, bin(ClientId)) of - [] -> print("Not Found.~n"); + [] -> print("Not Found.\n"); [Chann] -> InfoTab = emqx_gateway_cm:tabname(info, Name), [ChannInfo] = ets:lookup(InfoTab, Chann), @@ -154,8 +162,8 @@ gateway(_) -> 'gateway-clients'(["kick", Name, ClientId]) -> case emqx_gateway_cm:kick_session(Name, bin(ClientId)) of - ok -> print("ok~n"); - _ -> print("Not Found.~n") + ok -> print("ok\n"); + _ -> print("Not Found.\n") end; 'gateway-clients'(_) -> @@ -171,11 +179,11 @@ gateway(_) -> Tab = emqx_gateway_metrics:tabname(Name), case ets:info(Tab) of undefined -> - print("Bad Gateway Name.~n"); + print("Bad Gateway Name.\n"); _ -> lists:foreach( fun({K, V}) -> - print("~-30s: ~w~n", [K, V]) + print("~-30s: ~w\n", [K, V]) end, lists:sort(ets:tab2list(Tab))) end; @@ -232,7 +240,7 @@ print_record({client, {_, Infos, Stats}}) -> print("Client(~ts, username=~ts, peername=~ts, " "clean_start=~ts, keepalive=~w, " "subscriptions=~w, delivered_msgs=~w, " - "connected=~ts, created_at=~w, connected_at=~w)~n", + "connected=~ts, created_at=~w, connected_at=~w)\n", [format(K, maps:get(K, Info)) || K <- InfoKeys]). print(S) -> emqx_ctl:print(S). diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index cc14eaa33..294e32375 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -50,6 +50,8 @@ -export([namespace/0, roots/0 , fields/1]). +-export([proxy_protocol_opts/0]). + namespace() -> gateway. roots() -> [gateway]. diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 8a81584d6..95720ff13 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -18,6 +18,7 @@ -module(emqx_gateway_utils). -include("emqx_gateway.hrl"). +-include_lib("emqx/include/logger.hrl"). -export([ childspec/2 , childspec/3 @@ -26,6 +27,12 @@ , find_sup_child/2 ]). +-export([ start_listeners/4 + , start_listener/4 + , stop_listeners/2 + , stop_listener/2 + ]). + -export([ apply/2 , format_listenon/1 , parse_listenon/1 @@ -89,9 +96,15 @@ childspec(Id, Type, Mod, Args) -> -spec supervisor_ret(supervisor:startchild_ret()) -> {ok, pid()} | {error, supervisor:startchild_err()}. -supervisor_ret({ok, Pid, _Info}) -> {ok, Pid}; -supervisor_ret({error, {Reason, _Child}}) -> {error, Reason}; -supervisor_ret(Ret) -> Ret. +supervisor_ret({ok, Pid, _Info}) -> + {ok, Pid}; +supervisor_ret({error, {Reason, Child}}) -> + case element(1, Child) == child of + true -> {error, Reason}; + _ -> {error, {Reason, Child}} + end; +supervisor_ret(Ret) -> + Ret. -spec find_sup_child(Sup :: pid() | atom(), ChildId :: supervisor:child_id()) -> false @@ -102,6 +115,120 @@ find_sup_child(Sup, ChildId) -> {_Id, Pid, _Type, _Mods} -> {ok, Pid} end. +%% @doc start listeners. close all listeners if someone failed +-spec start_listeners(Listeners :: list(), + GwName :: atom(), + Ctx :: map(), + ModCfg) + -> {ok, [pid()]} + | {error, term()} + when ModCfg :: #{frame_mod := atom(), chann_mod := atom()}. +start_listeners(Listeners, GwName, Ctx, ModCfg) -> + start_listeners(Listeners, GwName, Ctx, ModCfg, []). + +start_listeners([], _, _, _, Acc) -> + {ok, lists:map(fun({listener, {_, Pid}}) -> Pid end, Acc)}; +start_listeners([L | Ls], GwName, Ctx, ModCfg, Acc) -> + case start_listener(GwName, Ctx, L, ModCfg) of + {ok, {ListenerId, ListenOn, Pid}} -> + NAcc = Acc ++ [{listener, {{ListenerId, ListenOn}, Pid}}], + start_listeners(Ls, GwName, Ctx, ModCfg, NAcc); + {error, Reason} -> + lists:foreach(fun({listener, {{ListenerId, ListenOn}, _}}) -> + esockd:close({ListenerId, ListenOn}) + end, Acc), + {error, {Reason, L}} + end. + +-spec start_listener(GwName :: atom(), + Ctx :: emqx_gateway_ctx:context(), + Listener :: tuple(), + ModCfg :: map()) + -> {ok, {ListenerId :: atom(), esockd:listen_on(), pid()}} + | {error, term()}. +start_listener(GwName, Ctx, + {Type, LisName, ListenOn, SocketOpts, Cfg}, ModCfg) -> + ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), + ListenerId = emqx_gateway_utils:listener_id(GwName, Type, LisName), + + NCfg = maps:merge(Cfg, ModCfg), + case start_listener(GwName, Ctx, Type, + LisName, ListenOn, SocketOpts, NCfg) of + {ok, Pid} -> + console_print("Gateway ~ts:~ts:~ts on ~ts started.~n", + [GwName, Type, LisName, ListenOnStr]), + {ok, {ListenerId, ListenOn, Pid}}; + {error, Reason} -> + ?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n", + [GwName, Type, LisName, ListenOnStr, Reason]), + emqx_gateway_utils:supervisor_ret({error, Reason}) + end. + +start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> + Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), + NCfg = Cfg#{ ctx => Ctx + , listener => {GwName, Type, LisName} + }, + NSocketOpts = merge_default(Type, SocketOpts), + MFA = {emqx_gateway_conn, start_link, [NCfg]}, + do_start_listener(Type, Name, ListenOn, NSocketOpts, MFA). + +merge_default(Udp, Options) -> + {Key, Default} = case Udp of + udp -> + {udp_options, default_udp_options()}; + dtls -> + {udp_options, default_udp_options()}; + tcp -> + {tcp_options, default_tcp_options()}; + ssl -> + {tcp_options, default_tcp_options()} + end, + case lists:keytake(Key, 1, Options) of + {value, {Key, TcpOpts}, Options1} -> + [{Key, emqx_misc:merge_opts(Default, TcpOpts)} + | Options1]; + false -> + [{Key, Default} | Options] + end. + +do_start_listener(Type, Name, ListenOn, SocketOpts, MFA) + when Type == tcp; + Type == ssl -> + esockd:open(Name, ListenOn, SocketOpts, MFA); +do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) -> + esockd:open_udp(Name, ListenOn, SocketOpts, MFA); +do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) -> + esockd:open_dtls(Name, ListenOn, SocketOpts, MFA). + +-spec stop_listeners(GwName :: atom(), Listeners :: list()) -> ok. +stop_listeners(GwName, Listeners) -> + lists:foreach(fun(L) -> stop_listener(GwName, L) end, Listeners). + +-spec stop_listener(GwName :: atom(), Listener :: tuple()) -> ok. +stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), + ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), + case StopRet of + ok -> + console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n", + [GwName, Type, LisName, ListenOnStr]); + {error, Reason} -> + ?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n", + [GwName, Type, LisName, ListenOnStr, Reason]) + end, + StopRet. + +stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> + Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), + esockd:close(Name, ListenOn). + +-ifndef(TEST). +console_print(Fmt, Args) -> ?ULOG(Fmt, Args). +-else. +console_print(_Fmt, _Args) -> ok. +-endif. + apply({M, F, A}, A2) when is_atom(M), is_atom(M), is_list(A), diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index 46e3a1628..48dca4324 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -19,6 +19,14 @@ -behaviour(emqx_gateway_impl). +-include_lib("emqx/include/logger.hrl"). + +-import(emqx_gateway_utils, + [ normalize_config/1 + , start_listeners/4 + , stop_listeners/2 + ]). + %% APIs -export([ reg/0 , unreg/0 @@ -29,8 +37,6 @@ , on_gateway_unload/2 ]). --include_lib("emqx/include/logger.hrl"). - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -47,6 +53,73 @@ unreg() -> %% emqx_gateway_registry callbacks %%-------------------------------------------------------------------- +on_gateway_load(_Gateway = #{ name := GwName, + config := Config + }, Ctx) -> + %% XXX: How to monitor it ? + %% Start grpc client pool & client channel + PoolName = pool_name(GwName), + PoolSize = emqx_vm:schedulers() * 2, + {ok, PoolSup} = emqx_pool_sup:start_link( + PoolName, hash, PoolSize, + {emqx_exproto_gcli, start_link, []}), + _ = start_grpc_client_channel(GwName, + maps:get(handler, Config, undefined) + ), + %% XXX: How to monitor it ? + _ = start_grpc_server(GwName, maps:get(server, Config, undefined)), + + NConfig = maps:without( + [server, handler], + Config#{pool_name => PoolName} + ), + Listeners = emqx_gateway_utils:normalize_config( + NConfig#{handler => GwName} + ), + + ModCfg = #{frame_mod => emqx_exproto_frame, + chann_mod => emqx_exproto_channel + }, + case start_listeners( + Listeners, GwName, Ctx, ModCfg) of + {ok, ListenerPids} -> + {ok, ListenerPids, _GwState = #{ctx => Ctx, pool => PoolSup}}; + {error, {Reason, Listener}} -> + throw({badconf, #{ key => listeners + , vallue => Listener + , reason => Reason + }}) + end. + +on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> + GwName = maps:get(name, Gateway), + try + %% XXX: 1. How hot-upgrade the changes ??? + %% XXX: 2. Check the New confs first before destroy old instance ??? + on_gateway_unload(Gateway, GwState), + on_gateway_load(Gateway#{config => Config}, Ctx) + catch + Class : Reason : Stk -> + logger:error("Failed to update ~ts; " + "reason: {~0p, ~0p} stacktrace: ~0p", + [GwName, Class, Reason, Stk]), + {error, {Class, Reason}} + end. + +on_gateway_unload(_Gateway = #{ name := GwName, + config := Config + }, _GwState = #{pool := PoolSup}) -> + Listeners = emqx_gateway_utils:normalize_config(Config), + %% Stop funcs??? + exit(PoolSup, kill), + stop_grpc_server(GwName), + stop_grpc_client_channel(GwName), + stop_listeners(GwName, Listeners). + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + start_grpc_server(_GwName, undefined) -> undefined; start_grpc_server(GwName, Options = #{bind := ListenOn}) -> @@ -103,140 +176,9 @@ stop_grpc_client_channel(GwName) -> _ = grpc_client_sup:stop_channel_pool(GwName), ok. -on_gateway_load(_Gateway = #{ name := GwName, - config := Config - }, Ctx) -> - %% XXX: How to monitor it ? - %% Start grpc client pool & client channel - PoolName = pool_name(GwName), - PoolSize = emqx_vm:schedulers() * 2, - {ok, PoolSup} = emqx_pool_sup:start_link( - PoolName, hash, PoolSize, - {emqx_exproto_gcli, start_link, []}), - _ = start_grpc_client_channel(GwName, - maps:get(handler, Config, undefined) - ), - %% XXX: How to monitor it ? - _ = start_grpc_server(GwName, maps:get(server, Config, undefined)), - - NConfig = maps:without( - [server, handler], - Config#{pool_name => PoolName} - ), - Listeners = emqx_gateway_utils:normalize_config( - NConfig#{handler => GwName} - ), - ListenerPids = lists:map(fun(Lis) -> - start_listener(GwName, Ctx, Lis) - end, Listeners), - {ok, ListenerPids, _GwState = #{ctx => Ctx, pool => PoolSup}}. - -on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> - GwName = maps:get(name, Gateway), - try - %% XXX: 1. How hot-upgrade the changes ??? - %% XXX: 2. Check the New confs first before destroy old instance ??? - on_gateway_unload(Gateway, GwState), - on_gateway_load(Gateway#{config => Config}, Ctx) - catch - Class : Reason : Stk -> - logger:error("Failed to update ~ts; " - "reason: {~0p, ~0p} stacktrace: ~0p", - [GwName, Class, Reason, Stk]), - {error, {Class, Reason}} - end. - -on_gateway_unload(_Gateway = #{ name := GwName, - config := Config - }, _GwState = #{pool := PoolSup}) -> - Listeners = emqx_gateway_utils:normalize_config(Config), - %% Stop funcs??? - exit(PoolSup, kill), - stop_grpc_server(GwName), - stop_grpc_client_channel(GwName), - lists:foreach(fun(Lis) -> - stop_listener(GwName, Lis) - end, Listeners). - pool_name(GwName) -> list_to_atom(lists:concat([GwName, "_gcli_pool"])). -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- - -start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> - ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of - {ok, Pid} -> - console_print("Gateway ~ts:~ts:~ts on ~ts started.~n", - [GwName, Type, LisName, ListenOnStr]), - Pid; - {error, Reason} -> - ?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n", - [GwName, Type, LisName, ListenOnStr, Reason]), - throw({badconf, Reason}) - end. - -start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> - Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), - NCfg = Cfg#{ - ctx => Ctx, - listener => {GwName, Type, LisName}, - frame_mod => emqx_exproto_frame, - chann_mod => emqx_exproto_channel - }, - MFA = {emqx_gateway_conn, start_link, [NCfg]}, - NSockOpts = merge_default_by_type(Type, SocketOpts), - do_start_listener(Type, Name, ListenOn, NSockOpts, MFA). - -do_start_listener(Type, Name, ListenOn, Opts, MFA) - when Type == tcp; - Type == ssl -> - esockd:open(Name, ListenOn, Opts, MFA); -do_start_listener(udp, Name, ListenOn, Opts, MFA) -> - esockd:open_udp(Name, ListenOn, Opts, MFA); -do_start_listener(dtls, Name, ListenOn, Opts, MFA) -> - esockd:open_dtls(Name, ListenOn, Opts, MFA). - -merge_default_by_type(Type, Options) when Type =:= tcp; - Type =:= ssl -> - Default = emqx_gateway_utils:default_tcp_options(), - case lists:keytake(tcp_options, 1, Options) of - {value, {tcp_options, TcpOpts}, Options1} -> - [{tcp_options, emqx_misc:merge_opts(Default, TcpOpts)} - | Options1]; - false -> - [{tcp_options, Default} | Options] - end; -merge_default_by_type(Type, Options) when Type =:= udp; - Type =:= dtls -> - Default = emqx_gateway_utils:default_udp_options(), - case lists:keytake(udp_options, 1, Options) of - {value, {udp_options, TcpOpts}, Options1} -> - [{udp_options, emqx_misc:merge_opts(Default, TcpOpts)} - | Options1]; - false -> - [{udp_options, Default} | Options] - end. - -stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), - ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case StopRet of - ok -> - console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n", - [GwName, Type, LisName, ListenOnStr]); - {error, Reason} -> - ?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n", - [GwName, Type, LisName, ListenOnStr, Reason]) - end, - StopRet. - -stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> - Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), - esockd:close(Name, ListenOn). - -ifndef(TEST). console_print(Fmt, Args) -> ?ULOG(Fmt, Args). -else. diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl index ee27d89b1..47ed722b1 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl @@ -19,6 +19,8 @@ -behaviour(emqx_gateway_impl). +-include_lib("emqx/include/logger.hrl"). + %% APIs -export([ reg/0 , unreg/0 @@ -29,8 +31,6 @@ , on_gateway_unload/2 ]). --include_lib("emqx/include/logger.hrl"). - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -54,10 +54,20 @@ on_gateway_load(_Gateway = #{ name := GwName, case emqx_lwm2m_xml_object_db:start_link(XmlDir) of {ok, RegPid} -> Listeners = emqx_gateway_utils:normalize_config(Config), - ListenerPids = lists:map(fun(Lis) -> - start_listener(GwName, Ctx, Lis) - end, Listeners), - {ok, ListenerPids, _GwState = #{ctx => Ctx, registry => RegPid}}; + ModCfg = #{frame_mod => emqx_coap_frame, + chann_mod => emqx_lwm2m_channel + }, + case emqx_gateway_utils:start_listeners( + Listeners, GwName, Ctx, ModCfg) of + {ok, ListenerPids} -> + {ok, ListenerPids, #{ctx => Ctx, registry => RegPid}}; + {error, {Reason, Listener}} -> + _ = emqx_lwm2m_xml_object_db:stop(), + throw({badconf, #{ key => listeners + , vallue => Listener + , reason => Reason + }}) + end; {error, Reason} -> throw({badconf, #{ key => xml_dir , value => XmlDir @@ -85,73 +95,4 @@ on_gateway_unload(_Gateway = #{ name := GwName, }, _GwState = #{registry := RegPid}) -> exit(RegPid, kill), Listeners = emqx_gateway_utils:normalize_config(Config), - lists:foreach(fun(Lis) -> - stop_listener(GwName, Lis) - end, Listeners). - -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- - -start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> - ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of - {ok, Pid} -> - console_print("Gateway ~ts:~ts:~ts on ~ts started.~n", - [GwName, Type, LisName, ListenOnStr]), - Pid; - {error, Reason} -> - ?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n", - [GwName, Type, LisName, ListenOnStr, Reason]), - throw({badconf, Reason}) - end. - -start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> - Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), - NCfg = Cfg#{ ctx => Ctx - , listener => {GwName, Type, LisName} - , frame_mod => emqx_coap_frame - , chann_mod => emqx_lwm2m_channel - }, - NSocketOpts = merge_default(SocketOpts), - MFA = {emqx_gateway_conn, start_link, [NCfg]}, - do_start_listener(Type, Name, ListenOn, NSocketOpts, MFA). - -merge_default(Options) -> - Default = emqx_gateway_utils:default_udp_options(), - case lists:keytake(udp_options, 1, Options) of - {value, {udp_options, TcpOpts}, Options1} -> - [{udp_options, emqx_misc:merge_opts(Default, TcpOpts)} - | Options1]; - false -> - [{udp_options, Default} | Options] - end. - -do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) -> - esockd:open_udp(Name, ListenOn, SocketOpts, MFA); - -do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) -> - esockd:open_dtls(Name, ListenOn, SocketOpts, MFA). - -stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), - ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case StopRet of - ok -> - console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n", - [GwName, Type, LisName, ListenOnStr]); - {error, Reason} -> - ?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n", - [GwName, Type, LisName, ListenOnStr, Reason]) - end, - StopRet. - -stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> - Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), - esockd:close(Name, ListenOn). - --ifndef(TEST). -console_print(Fmt, Args) -> ?ULOG(Fmt, Args). --else. -console_print(_Fmt, _Args) -> ok. --endif. + emqx_gateway_utils:stop_listeners(GwName, Listeners). diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl index 377c4f6d6..4284af626 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -19,6 +19,14 @@ -behaviour(emqx_gateway_impl). +-include_lib("emqx/include/logger.hrl"). + +-import(emqx_gateway_utils, + [ normalize_config/1 + , start_listeners/4 + , stop_listeners/2 + ]). + %% APIs -export([ reg/0 , unreg/0 @@ -29,8 +37,6 @@ , on_gateway_unload/2 ]). --include_lib("emqx/include/logger.hrl"). - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -70,12 +76,23 @@ on_gateway_load(_Gateway = #{ name := GwName, [broadcast, predefined], Config#{registry => emqx_sn_registry:lookup_name(RegistrySvr)} ), + Listeners = emqx_gateway_utils:normalize_config(NConfig), - ListenerPids = lists:map(fun(Lis) -> - start_listener(GwName, Ctx, Lis) - end, Listeners), - {ok, ListenerPids, _InstaState = #{ctx => Ctx}}. + ModCfg = #{frame_mod => emqx_sn_frame, + chann_mod => emqx_sn_channel + }, + + case start_listeners( + Listeners, GwName, Ctx, ModCfg) of + {ok, ListenerPids} -> + {ok, ListenerPids, _GwState = #{ctx => Ctx}}; + {error, {Reason, Listener}} -> + throw({badconf, #{ key => listeners + , vallue => Listener + , reason => Reason + }}) + end. on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> GwName = maps:get(name, Gateway), @@ -95,68 +112,5 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> on_gateway_unload(_Gateway = #{ name := GwName, config := Config }, _GwState) -> - Listeners = emqx_gateway_utils:normalize_config(Config), - lists:foreach(fun(Lis) -> - stop_listener(GwName, Lis) - end, Listeners). - -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- - -start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> - ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of - {ok, Pid} -> - console_print("Gateway ~ts:~ts:~ts on ~ts started.~n", - [GwName, Type, LisName, ListenOnStr]), - Pid; - {error, Reason} -> - ?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n", - [GwName, Type, LisName, ListenOnStr, Reason]), - throw({badconf, Reason}) - end. - -start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> - Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), - NCfg = Cfg#{ - ctx => Ctx, - listene => {GwName, Type, LisName}, - frame_mod => emqx_sn_frame, - chann_mod => emqx_sn_channel - }, - esockd:open_udp(Name, ListenOn, merge_default(SocketOpts), - {emqx_gateway_conn, start_link, [NCfg]}). - -merge_default(Options) -> - Default = emqx_gateway_utils:default_udp_options(), - case lists:keytake(udp_options, 1, Options) of - {value, {udp_options, TcpOpts}, Options1} -> - [{udp_options, emqx_misc:merge_opts(Default, TcpOpts)} - | Options1]; - false -> - [{udp_options, Default} | Options] - end. - -stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), - ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case StopRet of - ok -> - console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n", - [GwName, Type, LisName, ListenOnStr]); - {error, Reason} -> - ?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n", - [GwName, Type, LisName, ListenOnStr, Reason]) - end, - StopRet. - -stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> - Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), - esockd:close(Name, ListenOn). - --ifndef(TEST). -console_print(Fmt, Args) -> ?ULOG(Fmt, Args). --else. -console_print(_Fmt, _Args) -> ok. --endif. + Listeners = normalize_config(Config), + stop_listeners(GwName, Listeners). diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl index 41df189bc..4e490e181 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl @@ -18,6 +18,15 @@ -behaviour(emqx_gateway_impl). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_gateway/include/emqx_gateway.hrl"). + +-import(emqx_gateway_utils, + [ normalize_config/1 + , start_listeners/4 + , stop_listeners/2 + ]). + %% APIs -export([ reg/0 , unreg/0 @@ -28,9 +37,6 @@ , on_gateway_unload/2 ]). --include_lib("emqx_gateway/include/emqx_gateway.hrl"). --include_lib("emqx/include/logger.hrl"). - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -52,15 +58,22 @@ unreg() -> on_gateway_load(_Gateway = #{ name := GwName, config := Config }, Ctx) -> - %% Step1. Fold the config to listeners - Listeners = emqx_gateway_utils:normalize_config(Config), - %% Step2. Start listeners or escokd:specs - ListenerPids = lists:map(fun(Lis) -> - start_listener(GwName, Ctx, Lis) - end, Listeners), - %% FIXME: How to throw an exception to interrupt the restart logic ? - %% FIXME: Assign ctx to GwState - {ok, ListenerPids, _GwState = #{ctx => Ctx}}. + Listeners = normalize_config(Config), + ModCfg = #{frame_mod => emqx_stomp_frame, + chann_mod => emqx_stomp_channel + }, + case start_listeners( + Listeners, GwName, Ctx, ModCfg) of + {ok, ListenerPids} -> + %% FIXME: How to throw an exception to interrupt the restart logic ? + %% FIXME: Assign ctx to GwState + {ok, ListenerPids, _GwState = #{ctx => Ctx}}; + {error, {Reason, Listener}} -> + throw({badconf, #{ key => listeners + , vallue => Listener + , reason => Reason + }}) + end. on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> GwName = maps:get(name, Gateway), @@ -80,68 +93,5 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> on_gateway_unload(_Gateway = #{ name := GwName, config := Config }, _GwState) -> - Listeners = emqx_gateway_utils:normalize_config(Config), - lists:foreach(fun(Lis) -> - stop_listener(GwName, Lis) - end, Listeners). - -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- - -start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> - ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of - {ok, Pid} -> - console_print("Gateway ~ts:~ts:~ts on ~ts started.~n", - [GwName, Type, LisName, ListenOnStr]), - Pid; - {error, Reason} -> - ?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n", - [GwName, Type, LisName, ListenOnStr, Reason]), - throw({badconf, Reason}) - end. - -start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> - Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), - NCfg = Cfg#{ - ctx => Ctx, - listener => {GwName, Type, LisName}, %% Used for authn - frame_mod => emqx_stomp_frame, - chann_mod => emqx_stomp_channel - }, - esockd:open(Name, ListenOn, merge_default(SocketOpts), - {emqx_gateway_conn, start_link, [NCfg]}). - -merge_default(Options) -> - Default = emqx_gateway_utils:default_tcp_options(), - case lists:keytake(tcp_options, 1, Options) of - {value, {tcp_options, TcpOpts}, Options1} -> - [{tcp_options, emqx_misc:merge_opts(Default, TcpOpts)} - | Options1]; - false -> - [{tcp_options, Default} | Options] - end. - -stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), - ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case StopRet of - ok -> - console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n", - [GwName, Type, LisName, ListenOnStr]); - {error, Reason} -> - ?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n", - [GwName, Type, LisName, ListenOnStr, Reason]) - end, - StopRet. - -stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> - Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), - esockd:close(Name, ListenOn). - --ifndef(TEST). -console_print(Fmt, Args) -> ?ULOG(Fmt, Args). --else. -console_print(_Fmt, _Args) -> ok. --endif. + Listeners = normalize_config(Config), + stop_listeners(GwName, Listeners). diff --git a/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl new file mode 100644 index 000000000..a2338ea26 --- /dev/null +++ b/apps/emqx_gateway/test/emqx_gateway_cli_SUITE.erl @@ -0,0 +1,150 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_gateway_cli_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-define(GP(S), begin S, receive {fmt, P} -> P; O -> O end end). + +%% this parses to #{}, will not cause config cleanup +%% so we will need call emqx_config:erase +-define(CONF_DEFAULT, <<" +gateway {} +">>). + +%%-------------------------------------------------------------------- +%% Setup +%%-------------------------------------------------------------------- + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Conf) -> + emqx_config:erase(gateway), + emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT), + emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authn, emqx_gateway]), + Conf. + +end_per_suite(Conf) -> + emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_authn, emqx_conf]), + Conf. + +init_per_testcase(_, Conf) -> + Self = self(), + ok = meck:new(emqx_ctl, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_ctl, usage, + fun(L) -> emqx_ctl:format_usage(L) end), + ok = meck:expect(emqx_ctl, print, + fun(Fmt) -> + Self ! {fmt, emqx_ctl:format(Fmt)} + end), + ok = meck:expect(emqx_ctl, print, + fun(Fmt, Args) -> + Self ! {fmt, emqx_ctl:format(Fmt, Args)} + end), + Conf. + +end_per_testcase(_, _) -> + meck:unload([emqx_ctl]), + ok. + +%%-------------------------------------------------------------------- +%% Cases +%%-------------------------------------------------------------------- + +%% TODO: + +t_load_unload(_) -> + ok. + +t_gateway_registry_usage(_) -> + ?assertEqual( + ["gateway-registry list # List all registered gateways\n"], + emqx_gateway_cli:'gateway-registry'(usage)). + +t_gateway_registry_list(_) -> + emqx_gateway_cli:'gateway-registry'(["list"]), + ?assertEqual( + "Registered Name: coap, Callback Module: emqx_coap_impl\n" + "Registered Name: exproto, Callback Module: emqx_exproto_impl\n" + "Registered Name: lwm2m, Callback Module: emqx_lwm2m_impl\n" + "Registered Name: mqttsn, Callback Module: emqx_sn_impl\n" + "Registered Name: stomp, Callback Module: emqx_stomp_impl\n" + , acc_print()). + +t_gateway_usage(_) -> + ?assertEqual( + ["gateway list # List all gateway\n", + "gateway lookup # Lookup a gateway detailed informations\n", + "gateway load # Load a gateway with config\n", + "gateway unload # Unload the gateway\n", + "gateway stop # Stop the gateway\n", + "gateway start # Start the gateway\n"], + emqx_gateway_cli:gateway(usage) + ). + +t_gateway_list(_) -> + emqx_gateway_cli:gateway(["list"]), + ?assertEqual( + "Gateway(name=coap, status=unloaded)\n" + "Gateway(name=exproto, status=unloaded)\n" + "Gateway(name=lwm2m, status=unloaded)\n" + "Gateway(name=mqttsn, status=unloaded)\n" + "Gateway(name=stomp, status=unloaded)\n" + , acc_print()). + +t_gateway_load(_) -> + ok. + +t_gateway_unload(_) -> + ok. + +t_gateway_start(_) -> + ok. + +t_gateway_stop(_) -> + ok. + +t_gateway_clients_usage(_) -> + ok. + +t_gateway_clients_list(_) -> + ok. + +t_gateway_clients_lookup(_) -> + ok. + +t_gateway_clients_kick(_) -> + ok. + +t_gateway_metrcis_usage(_) -> + ok. + +t_gateway_metrcis(_) -> + ok. + +acc_print() -> + lists:concat(lists:reverse(acc_print([]))). + +acc_print(Acc) -> + receive + {fmt, S} -> acc_print([S|Acc]) + after 200 -> + Acc + end.