Merge pull request #6549 from HJianBo/gw-review-r3
- Add endpoint_name, like_endpoint_name, gte_lifetime, lte_lifetime query parameters for Swagger docs - Optimize the atomicity of the gateway creation procedure and reduce the side effects if creation failed
This commit is contained in:
commit
e8acec7f56
|
@ -16,9 +16,16 @@
|
||||||
|
|
||||||
-module(emqx_coap_impl).
|
-module(emqx_coap_impl).
|
||||||
|
|
||||||
|
-behaviour(emqx_gateway_impl).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
|
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
|
||||||
|
|
||||||
-behaviour(emqx_gateway_impl).
|
-import(emqx_gateway_utils,
|
||||||
|
[ normalize_config/1
|
||||||
|
, start_listeners/4
|
||||||
|
, stop_listeners/2
|
||||||
|
]).
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([ reg/0
|
-export([ reg/0
|
||||||
|
@ -30,8 +37,6 @@
|
||||||
, on_gateway_unload/2
|
, on_gateway_unload/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -51,12 +56,20 @@ unreg() ->
|
||||||
on_gateway_load(_Gateway = #{name := GwName,
|
on_gateway_load(_Gateway = #{name := GwName,
|
||||||
config := Config
|
config := Config
|
||||||
}, Ctx) ->
|
}, Ctx) ->
|
||||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
Listeners = normalize_config(Config),
|
||||||
ListenerPids = lists:map(fun(Lis) ->
|
ModCfg = #{frame_mod => emqx_coap_frame,
|
||||||
start_listener(GwName, Ctx, Lis)
|
chann_mod => emqx_coap_channel
|
||||||
end, Listeners),
|
},
|
||||||
|
case start_listeners(
|
||||||
{ok, ListenerPids, #{ctx => Ctx}}.
|
Listeners, GwName, Ctx, ModCfg) of
|
||||||
|
{ok, ListenerPids} ->
|
||||||
|
{ok, ListenerPids, #{ctx => Ctx}};
|
||||||
|
{error, {Reason, Listener}} ->
|
||||||
|
throw({badconf, #{ key => listeners
|
||||||
|
, vallue => Listener
|
||||||
|
, reason => Reason
|
||||||
|
}})
|
||||||
|
end.
|
||||||
|
|
||||||
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
GwName = maps:get(name, Gateway),
|
GwName = maps:get(name, Gateway),
|
||||||
|
@ -76,63 +89,5 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
on_gateway_unload(_Gateway = #{ name := GwName,
|
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||||
config := Config
|
config := Config
|
||||||
}, _GwState) ->
|
}, _GwState) ->
|
||||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
Listeners = normalize_config(Config),
|
||||||
lists:foreach(fun(Lis) ->
|
stop_listeners(GwName, Listeners).
|
||||||
stop_listener(GwName, Lis)
|
|
||||||
end, Listeners).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internal funcs
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
|
||||||
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
|
||||||
case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
console_print("Gateway ~ts:~ts:~ts on ~ts started.~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr]),
|
|
||||||
Pid;
|
|
||||||
{error, Reason} ->
|
|
||||||
?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr, Reason]),
|
|
||||||
throw({badconf, Reason})
|
|
||||||
end.
|
|
||||||
|
|
||||||
start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
|
|
||||||
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
|
||||||
NCfg = Cfg#{ctx => Ctx,
|
|
||||||
listener => {GwName, Type, LisName},
|
|
||||||
frame_mod => emqx_coap_frame,
|
|
||||||
chann_mod => emqx_coap_channel
|
|
||||||
},
|
|
||||||
MFA = {emqx_gateway_conn, start_link, [NCfg]},
|
|
||||||
do_start_listener(Type, Name, ListenOn, SocketOpts, MFA).
|
|
||||||
|
|
||||||
do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) ->
|
|
||||||
esockd:open_udp(Name, ListenOn, SocketOpts, MFA);
|
|
||||||
|
|
||||||
do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) ->
|
|
||||||
esockd:open_dtls(Name, ListenOn, SocketOpts, MFA).
|
|
||||||
|
|
||||||
stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
|
||||||
StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
|
|
||||||
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
|
||||||
case StopRet of
|
|
||||||
ok ->
|
|
||||||
console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr]);
|
|
||||||
{error, Reason} ->
|
|
||||||
?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr, Reason])
|
|
||||||
end,
|
|
||||||
StopRet.
|
|
||||||
|
|
||||||
stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
|
|
||||||
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
|
||||||
esockd:close(Name, ListenOn).
|
|
||||||
|
|
||||||
-ifndef(TEST).
|
|
||||||
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
|
|
||||||
-else.
|
|
||||||
console_print(_Fmt, _Args) -> ok.
|
|
||||||
-endif.
|
|
||||||
|
|
|
@ -532,7 +532,21 @@ params_client_searching_in_qs() ->
|
||||||
, {lte_connected_at,
|
, {lte_connected_at,
|
||||||
mk(binary(),
|
mk(binary(),
|
||||||
M#{desc => <<"Match the client socket connected datatime less than "
|
M#{desc => <<"Match the client socket connected datatime less than "
|
||||||
" a certain value">>})}
|
"a certain value">>})}
|
||||||
|
, {endpoint_name,
|
||||||
|
mk(binary(),
|
||||||
|
M#{desc => <<"Match the lwm2m client's endpoint name">>})}
|
||||||
|
, {like_endpoint_name,
|
||||||
|
mk(binary(),
|
||||||
|
M#{desc => <<"Use sub-string to match lwm2m client's endpoint name">>})}
|
||||||
|
, {gte_lifetime,
|
||||||
|
mk(binary(),
|
||||||
|
M#{desc => <<"Match the lwm2m client registered lifetime greater "
|
||||||
|
"than a certain value">>})}
|
||||||
|
, {lte_lifetime,
|
||||||
|
mk(binary(),
|
||||||
|
M#{desc => <<"Match the lwm2m client registered lifetime less than "
|
||||||
|
"a certain value">>})}
|
||||||
].
|
].
|
||||||
|
|
||||||
params_paging() ->
|
params_paging() ->
|
||||||
|
|
|
@ -580,7 +580,7 @@ common_listener_opts() ->
|
||||||
#{ nullable => {true, recursively}
|
#{ nullable => {true, recursively}
|
||||||
, desc => <<"The authenticatior for this listener">>
|
, desc => <<"The authenticatior for this listener">>
|
||||||
})}
|
})}
|
||||||
].
|
] ++ emqx_gateway_schema:proxy_protocol_opts().
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% examples
|
%% examples
|
||||||
|
|
|
@ -28,6 +28,8 @@
|
||||||
%, 'gateway-banned'/1
|
%, 'gateway-banned'/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-elvis([{elvis_style, function_naming_convention, disable}]).
|
||||||
|
|
||||||
-spec load() -> ok.
|
-spec load() -> ok.
|
||||||
load() ->
|
load() ->
|
||||||
Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
|
Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
|
||||||
|
@ -50,18 +52,24 @@ is_cmd(Fun) ->
|
||||||
%% Cmds
|
%% Cmds
|
||||||
|
|
||||||
gateway(["list"]) ->
|
gateway(["list"]) ->
|
||||||
lists:foreach(fun(#{name := Name} = Gateway) ->
|
lists:foreach(
|
||||||
%% TODO: More infos: listeners?, connected?
|
fun (#{name := Name, status := unloaded}) ->
|
||||||
Status = maps:get(status, Gateway, stopped),
|
print("Gateway(name=~ts, status=unloaded)\n", [Name]);
|
||||||
print("Gateway(name=~ts, status=~ts)~n", [Name, Status])
|
(#{name := Name, status := stopped, stopped_at := StoppedAt}) ->
|
||||||
end, emqx_gateway:list());
|
print("Gateway(name=~ts, status=stopped, stopped_at=~ts)\n",
|
||||||
|
[Name, StoppedAt]);
|
||||||
|
(#{name := Name, status := running, current_connections := ConnCnt,
|
||||||
|
started_at := StartedAt}) ->
|
||||||
|
print("Gateway(name=~ts, status=running, clients=~w, started_at=~ts)\n",
|
||||||
|
[Name, ConnCnt, StartedAt])
|
||||||
|
end, emqx_gateway_http:gateways(all));
|
||||||
|
|
||||||
gateway(["lookup", Name]) ->
|
gateway(["lookup", Name]) ->
|
||||||
case emqx_gateway:lookup(atom(Name)) of
|
case emqx_gateway:lookup(atom(Name)) of
|
||||||
undefined ->
|
undefined ->
|
||||||
print("undefined~n");
|
print("undefined\n");
|
||||||
Info ->
|
Info ->
|
||||||
print("~p~n", [Info])
|
print("~p\n", [Info])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
gateway(["load", Name, Conf]) ->
|
gateway(["load", Name, Conf]) ->
|
||||||
|
@ -70,17 +78,17 @@ gateway(["load", Name, Conf]) ->
|
||||||
emqx_json:decode(Conf, [return_maps])
|
emqx_json:decode(Conf, [return_maps])
|
||||||
) of
|
) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
print("ok~n");
|
print("ok\n");
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
print("Error: ~p~n", [Reason])
|
print("Error: ~p\n", [Reason])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
gateway(["unload", Name]) ->
|
gateway(["unload", Name]) ->
|
||||||
case emqx_gateway_conf:unload_gateway(bin(Name)) of
|
case emqx_gateway_conf:unload_gateway(bin(Name)) of
|
||||||
ok ->
|
ok ->
|
||||||
print("ok~n");
|
print("ok\n");
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
print("Error: ~p~n", [Reason])
|
print("Error: ~p\n", [Reason])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
gateway(["stop", Name]) ->
|
gateway(["stop", Name]) ->
|
||||||
|
@ -89,9 +97,9 @@ gateway(["stop", Name]) ->
|
||||||
#{<<"enable">> => <<"false">>}
|
#{<<"enable">> => <<"false">>}
|
||||||
) of
|
) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
print("ok~n");
|
print("ok\n");
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
print("Error: ~p~n", [Reason])
|
print("Error: ~p\n", [Reason])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
gateway(["start", Name]) ->
|
gateway(["start", Name]) ->
|
||||||
|
@ -100,9 +108,9 @@ gateway(["start", Name]) ->
|
||||||
#{<<"enable">> => <<"true">>}
|
#{<<"enable">> => <<"true">>}
|
||||||
) of
|
) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
print("ok~n");
|
print("ok\n");
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
print("Error: ~p~n", [Reason])
|
print("Error: ~p\n", [Reason])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
gateway(_) ->
|
gateway(_) ->
|
||||||
|
@ -123,7 +131,7 @@ gateway(_) ->
|
||||||
'gateway-registry'(["list"]) ->
|
'gateway-registry'(["list"]) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun({Name, #{cbkmod := CbMod}}) ->
|
fun({Name, #{cbkmod := CbMod}}) ->
|
||||||
print("Registered Name: ~ts, Callback Module: ~ts~n", [Name, CbMod])
|
print("Registered Name: ~ts, Callback Module: ~ts\n", [Name, CbMod])
|
||||||
end,
|
end,
|
||||||
emqx_gateway_registry:list());
|
emqx_gateway_registry:list());
|
||||||
|
|
||||||
|
@ -137,7 +145,7 @@ gateway(_) ->
|
||||||
InfoTab = emqx_gateway_cm:tabname(info, Name),
|
InfoTab = emqx_gateway_cm:tabname(info, Name),
|
||||||
case ets:info(InfoTab) of
|
case ets:info(InfoTab) of
|
||||||
undefined ->
|
undefined ->
|
||||||
print("Bad Gateway Name.~n");
|
print("Bad Gateway Name.\n");
|
||||||
_ ->
|
_ ->
|
||||||
dump(InfoTab, client)
|
dump(InfoTab, client)
|
||||||
end;
|
end;
|
||||||
|
@ -145,7 +153,7 @@ gateway(_) ->
|
||||||
'gateway-clients'(["lookup", Name, ClientId]) ->
|
'gateway-clients'(["lookup", Name, ClientId]) ->
|
||||||
ChanTab = emqx_gateway_cm:tabname(chan, Name),
|
ChanTab = emqx_gateway_cm:tabname(chan, Name),
|
||||||
case ets:lookup(ChanTab, bin(ClientId)) of
|
case ets:lookup(ChanTab, bin(ClientId)) of
|
||||||
[] -> print("Not Found.~n");
|
[] -> print("Not Found.\n");
|
||||||
[Chann] ->
|
[Chann] ->
|
||||||
InfoTab = emqx_gateway_cm:tabname(info, Name),
|
InfoTab = emqx_gateway_cm:tabname(info, Name),
|
||||||
[ChannInfo] = ets:lookup(InfoTab, Chann),
|
[ChannInfo] = ets:lookup(InfoTab, Chann),
|
||||||
|
@ -154,8 +162,8 @@ gateway(_) ->
|
||||||
|
|
||||||
'gateway-clients'(["kick", Name, ClientId]) ->
|
'gateway-clients'(["kick", Name, ClientId]) ->
|
||||||
case emqx_gateway_cm:kick_session(Name, bin(ClientId)) of
|
case emqx_gateway_cm:kick_session(Name, bin(ClientId)) of
|
||||||
ok -> print("ok~n");
|
ok -> print("ok\n");
|
||||||
_ -> print("Not Found.~n")
|
_ -> print("Not Found.\n")
|
||||||
end;
|
end;
|
||||||
|
|
||||||
'gateway-clients'(_) ->
|
'gateway-clients'(_) ->
|
||||||
|
@ -171,11 +179,11 @@ gateway(_) ->
|
||||||
Tab = emqx_gateway_metrics:tabname(Name),
|
Tab = emqx_gateway_metrics:tabname(Name),
|
||||||
case ets:info(Tab) of
|
case ets:info(Tab) of
|
||||||
undefined ->
|
undefined ->
|
||||||
print("Bad Gateway Name.~n");
|
print("Bad Gateway Name.\n");
|
||||||
_ ->
|
_ ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun({K, V}) ->
|
fun({K, V}) ->
|
||||||
print("~-30s: ~w~n", [K, V])
|
print("~-30s: ~w\n", [K, V])
|
||||||
end, lists:sort(ets:tab2list(Tab)))
|
end, lists:sort(ets:tab2list(Tab)))
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -232,7 +240,7 @@ print_record({client, {_, Infos, Stats}}) ->
|
||||||
print("Client(~ts, username=~ts, peername=~ts, "
|
print("Client(~ts, username=~ts, peername=~ts, "
|
||||||
"clean_start=~ts, keepalive=~w, "
|
"clean_start=~ts, keepalive=~w, "
|
||||||
"subscriptions=~w, delivered_msgs=~w, "
|
"subscriptions=~w, delivered_msgs=~w, "
|
||||||
"connected=~ts, created_at=~w, connected_at=~w)~n",
|
"connected=~ts, created_at=~w, connected_at=~w)\n",
|
||||||
[format(K, maps:get(K, Info)) || K <- InfoKeys]).
|
[format(K, maps:get(K, Info)) || K <- InfoKeys]).
|
||||||
|
|
||||||
print(S) -> emqx_ctl:print(S).
|
print(S) -> emqx_ctl:print(S).
|
||||||
|
|
|
@ -50,6 +50,8 @@
|
||||||
|
|
||||||
-export([namespace/0, roots/0 , fields/1]).
|
-export([namespace/0, roots/0 , fields/1]).
|
||||||
|
|
||||||
|
-export([proxy_protocol_opts/0]).
|
||||||
|
|
||||||
namespace() -> gateway.
|
namespace() -> gateway.
|
||||||
|
|
||||||
roots() -> [gateway].
|
roots() -> [gateway].
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
-module(emqx_gateway_utils).
|
-module(emqx_gateway_utils).
|
||||||
|
|
||||||
-include("emqx_gateway.hrl").
|
-include("emqx_gateway.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
-export([ childspec/2
|
-export([ childspec/2
|
||||||
, childspec/3
|
, childspec/3
|
||||||
|
@ -26,6 +27,12 @@
|
||||||
, find_sup_child/2
|
, find_sup_child/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([ start_listeners/4
|
||||||
|
, start_listener/4
|
||||||
|
, stop_listeners/2
|
||||||
|
, stop_listener/2
|
||||||
|
]).
|
||||||
|
|
||||||
-export([ apply/2
|
-export([ apply/2
|
||||||
, format_listenon/1
|
, format_listenon/1
|
||||||
, parse_listenon/1
|
, parse_listenon/1
|
||||||
|
@ -89,9 +96,15 @@ childspec(Id, Type, Mod, Args) ->
|
||||||
-spec supervisor_ret(supervisor:startchild_ret())
|
-spec supervisor_ret(supervisor:startchild_ret())
|
||||||
-> {ok, pid()}
|
-> {ok, pid()}
|
||||||
| {error, supervisor:startchild_err()}.
|
| {error, supervisor:startchild_err()}.
|
||||||
supervisor_ret({ok, Pid, _Info}) -> {ok, Pid};
|
supervisor_ret({ok, Pid, _Info}) ->
|
||||||
supervisor_ret({error, {Reason, _Child}}) -> {error, Reason};
|
{ok, Pid};
|
||||||
supervisor_ret(Ret) -> Ret.
|
supervisor_ret({error, {Reason, Child}}) ->
|
||||||
|
case element(1, Child) == child of
|
||||||
|
true -> {error, Reason};
|
||||||
|
_ -> {error, {Reason, Child}}
|
||||||
|
end;
|
||||||
|
supervisor_ret(Ret) ->
|
||||||
|
Ret.
|
||||||
|
|
||||||
-spec find_sup_child(Sup :: pid() | atom(), ChildId :: supervisor:child_id())
|
-spec find_sup_child(Sup :: pid() | atom(), ChildId :: supervisor:child_id())
|
||||||
-> false
|
-> false
|
||||||
|
@ -102,6 +115,120 @@ find_sup_child(Sup, ChildId) ->
|
||||||
{_Id, Pid, _Type, _Mods} -> {ok, Pid}
|
{_Id, Pid, _Type, _Mods} -> {ok, Pid}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @doc start listeners. close all listeners if someone failed
|
||||||
|
-spec start_listeners(Listeners :: list(),
|
||||||
|
GwName :: atom(),
|
||||||
|
Ctx :: map(),
|
||||||
|
ModCfg)
|
||||||
|
-> {ok, [pid()]}
|
||||||
|
| {error, term()}
|
||||||
|
when ModCfg :: #{frame_mod := atom(), chann_mod := atom()}.
|
||||||
|
start_listeners(Listeners, GwName, Ctx, ModCfg) ->
|
||||||
|
start_listeners(Listeners, GwName, Ctx, ModCfg, []).
|
||||||
|
|
||||||
|
start_listeners([], _, _, _, Acc) ->
|
||||||
|
{ok, lists:map(fun({listener, {_, Pid}}) -> Pid end, Acc)};
|
||||||
|
start_listeners([L | Ls], GwName, Ctx, ModCfg, Acc) ->
|
||||||
|
case start_listener(GwName, Ctx, L, ModCfg) of
|
||||||
|
{ok, {ListenerId, ListenOn, Pid}} ->
|
||||||
|
NAcc = Acc ++ [{listener, {{ListenerId, ListenOn}, Pid}}],
|
||||||
|
start_listeners(Ls, GwName, Ctx, ModCfg, NAcc);
|
||||||
|
{error, Reason} ->
|
||||||
|
lists:foreach(fun({listener, {{ListenerId, ListenOn}, _}}) ->
|
||||||
|
esockd:close({ListenerId, ListenOn})
|
||||||
|
end, Acc),
|
||||||
|
{error, {Reason, L}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec start_listener(GwName :: atom(),
|
||||||
|
Ctx :: emqx_gateway_ctx:context(),
|
||||||
|
Listener :: tuple(),
|
||||||
|
ModCfg :: map())
|
||||||
|
-> {ok, {ListenerId :: atom(), esockd:listen_on(), pid()}}
|
||||||
|
| {error, term()}.
|
||||||
|
start_listener(GwName, Ctx,
|
||||||
|
{Type, LisName, ListenOn, SocketOpts, Cfg}, ModCfg) ->
|
||||||
|
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
||||||
|
ListenerId = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
||||||
|
|
||||||
|
NCfg = maps:merge(Cfg, ModCfg),
|
||||||
|
case start_listener(GwName, Ctx, Type,
|
||||||
|
LisName, ListenOn, SocketOpts, NCfg) of
|
||||||
|
{ok, Pid} ->
|
||||||
|
console_print("Gateway ~ts:~ts:~ts on ~ts started.~n",
|
||||||
|
[GwName, Type, LisName, ListenOnStr]),
|
||||||
|
{ok, {ListenerId, ListenOn, Pid}};
|
||||||
|
{error, Reason} ->
|
||||||
|
?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n",
|
||||||
|
[GwName, Type, LisName, ListenOnStr, Reason]),
|
||||||
|
emqx_gateway_utils:supervisor_ret({error, Reason})
|
||||||
|
end.
|
||||||
|
|
||||||
|
start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
|
||||||
|
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
||||||
|
NCfg = Cfg#{ ctx => Ctx
|
||||||
|
, listener => {GwName, Type, LisName}
|
||||||
|
},
|
||||||
|
NSocketOpts = merge_default(Type, SocketOpts),
|
||||||
|
MFA = {emqx_gateway_conn, start_link, [NCfg]},
|
||||||
|
do_start_listener(Type, Name, ListenOn, NSocketOpts, MFA).
|
||||||
|
|
||||||
|
merge_default(Udp, Options) ->
|
||||||
|
{Key, Default} = case Udp of
|
||||||
|
udp ->
|
||||||
|
{udp_options, default_udp_options()};
|
||||||
|
dtls ->
|
||||||
|
{udp_options, default_udp_options()};
|
||||||
|
tcp ->
|
||||||
|
{tcp_options, default_tcp_options()};
|
||||||
|
ssl ->
|
||||||
|
{tcp_options, default_tcp_options()}
|
||||||
|
end,
|
||||||
|
case lists:keytake(Key, 1, Options) of
|
||||||
|
{value, {Key, TcpOpts}, Options1} ->
|
||||||
|
[{Key, emqx_misc:merge_opts(Default, TcpOpts)}
|
||||||
|
| Options1];
|
||||||
|
false ->
|
||||||
|
[{Key, Default} | Options]
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_start_listener(Type, Name, ListenOn, SocketOpts, MFA)
|
||||||
|
when Type == tcp;
|
||||||
|
Type == ssl ->
|
||||||
|
esockd:open(Name, ListenOn, SocketOpts, MFA);
|
||||||
|
do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) ->
|
||||||
|
esockd:open_udp(Name, ListenOn, SocketOpts, MFA);
|
||||||
|
do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) ->
|
||||||
|
esockd:open_dtls(Name, ListenOn, SocketOpts, MFA).
|
||||||
|
|
||||||
|
-spec stop_listeners(GwName :: atom(), Listeners :: list()) -> ok.
|
||||||
|
stop_listeners(GwName, Listeners) ->
|
||||||
|
lists:foreach(fun(L) -> stop_listener(GwName, L) end, Listeners).
|
||||||
|
|
||||||
|
-spec stop_listener(GwName :: atom(), Listener :: tuple()) -> ok.
|
||||||
|
stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
||||||
|
StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
|
||||||
|
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
||||||
|
case StopRet of
|
||||||
|
ok ->
|
||||||
|
console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n",
|
||||||
|
[GwName, Type, LisName, ListenOnStr]);
|
||||||
|
{error, Reason} ->
|
||||||
|
?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n",
|
||||||
|
[GwName, Type, LisName, ListenOnStr, Reason])
|
||||||
|
end,
|
||||||
|
StopRet.
|
||||||
|
|
||||||
|
stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
|
||||||
|
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
||||||
|
esockd:close(Name, ListenOn).
|
||||||
|
|
||||||
|
-ifndef(TEST).
|
||||||
|
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
|
||||||
|
-else.
|
||||||
|
console_print(_Fmt, _Args) -> ok.
|
||||||
|
-endif.
|
||||||
|
|
||||||
apply({M, F, A}, A2) when is_atom(M),
|
apply({M, F, A}, A2) when is_atom(M),
|
||||||
is_atom(M),
|
is_atom(M),
|
||||||
is_list(A),
|
is_list(A),
|
||||||
|
|
|
@ -19,6 +19,14 @@
|
||||||
|
|
||||||
-behaviour(emqx_gateway_impl).
|
-behaviour(emqx_gateway_impl).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
-import(emqx_gateway_utils,
|
||||||
|
[ normalize_config/1
|
||||||
|
, start_listeners/4
|
||||||
|
, stop_listeners/2
|
||||||
|
]).
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([ reg/0
|
-export([ reg/0
|
||||||
, unreg/0
|
, unreg/0
|
||||||
|
@ -29,8 +37,6 @@
|
||||||
, on_gateway_unload/2
|
, on_gateway_unload/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -47,6 +53,73 @@ unreg() ->
|
||||||
%% emqx_gateway_registry callbacks
|
%% emqx_gateway_registry callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
on_gateway_load(_Gateway = #{ name := GwName,
|
||||||
|
config := Config
|
||||||
|
}, Ctx) ->
|
||||||
|
%% XXX: How to monitor it ?
|
||||||
|
%% Start grpc client pool & client channel
|
||||||
|
PoolName = pool_name(GwName),
|
||||||
|
PoolSize = emqx_vm:schedulers() * 2,
|
||||||
|
{ok, PoolSup} = emqx_pool_sup:start_link(
|
||||||
|
PoolName, hash, PoolSize,
|
||||||
|
{emqx_exproto_gcli, start_link, []}),
|
||||||
|
_ = start_grpc_client_channel(GwName,
|
||||||
|
maps:get(handler, Config, undefined)
|
||||||
|
),
|
||||||
|
%% XXX: How to monitor it ?
|
||||||
|
_ = start_grpc_server(GwName, maps:get(server, Config, undefined)),
|
||||||
|
|
||||||
|
NConfig = maps:without(
|
||||||
|
[server, handler],
|
||||||
|
Config#{pool_name => PoolName}
|
||||||
|
),
|
||||||
|
Listeners = emqx_gateway_utils:normalize_config(
|
||||||
|
NConfig#{handler => GwName}
|
||||||
|
),
|
||||||
|
|
||||||
|
ModCfg = #{frame_mod => emqx_exproto_frame,
|
||||||
|
chann_mod => emqx_exproto_channel
|
||||||
|
},
|
||||||
|
case start_listeners(
|
||||||
|
Listeners, GwName, Ctx, ModCfg) of
|
||||||
|
{ok, ListenerPids} ->
|
||||||
|
{ok, ListenerPids, _GwState = #{ctx => Ctx, pool => PoolSup}};
|
||||||
|
{error, {Reason, Listener}} ->
|
||||||
|
throw({badconf, #{ key => listeners
|
||||||
|
, vallue => Listener
|
||||||
|
, reason => Reason
|
||||||
|
}})
|
||||||
|
end.
|
||||||
|
|
||||||
|
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
|
GwName = maps:get(name, Gateway),
|
||||||
|
try
|
||||||
|
%% XXX: 1. How hot-upgrade the changes ???
|
||||||
|
%% XXX: 2. Check the New confs first before destroy old instance ???
|
||||||
|
on_gateway_unload(Gateway, GwState),
|
||||||
|
on_gateway_load(Gateway#{config => Config}, Ctx)
|
||||||
|
catch
|
||||||
|
Class : Reason : Stk ->
|
||||||
|
logger:error("Failed to update ~ts; "
|
||||||
|
"reason: {~0p, ~0p} stacktrace: ~0p",
|
||||||
|
[GwName, Class, Reason, Stk]),
|
||||||
|
{error, {Class, Reason}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||||
|
config := Config
|
||||||
|
}, _GwState = #{pool := PoolSup}) ->
|
||||||
|
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||||
|
%% Stop funcs???
|
||||||
|
exit(PoolSup, kill),
|
||||||
|
stop_grpc_server(GwName),
|
||||||
|
stop_grpc_client_channel(GwName),
|
||||||
|
stop_listeners(GwName, Listeners).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal funcs
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
start_grpc_server(_GwName, undefined) ->
|
start_grpc_server(_GwName, undefined) ->
|
||||||
undefined;
|
undefined;
|
||||||
start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
|
start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
|
||||||
|
@ -103,140 +176,9 @@ stop_grpc_client_channel(GwName) ->
|
||||||
_ = grpc_client_sup:stop_channel_pool(GwName),
|
_ = grpc_client_sup:stop_channel_pool(GwName),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
on_gateway_load(_Gateway = #{ name := GwName,
|
|
||||||
config := Config
|
|
||||||
}, Ctx) ->
|
|
||||||
%% XXX: How to monitor it ?
|
|
||||||
%% Start grpc client pool & client channel
|
|
||||||
PoolName = pool_name(GwName),
|
|
||||||
PoolSize = emqx_vm:schedulers() * 2,
|
|
||||||
{ok, PoolSup} = emqx_pool_sup:start_link(
|
|
||||||
PoolName, hash, PoolSize,
|
|
||||||
{emqx_exproto_gcli, start_link, []}),
|
|
||||||
_ = start_grpc_client_channel(GwName,
|
|
||||||
maps:get(handler, Config, undefined)
|
|
||||||
),
|
|
||||||
%% XXX: How to monitor it ?
|
|
||||||
_ = start_grpc_server(GwName, maps:get(server, Config, undefined)),
|
|
||||||
|
|
||||||
NConfig = maps:without(
|
|
||||||
[server, handler],
|
|
||||||
Config#{pool_name => PoolName}
|
|
||||||
),
|
|
||||||
Listeners = emqx_gateway_utils:normalize_config(
|
|
||||||
NConfig#{handler => GwName}
|
|
||||||
),
|
|
||||||
ListenerPids = lists:map(fun(Lis) ->
|
|
||||||
start_listener(GwName, Ctx, Lis)
|
|
||||||
end, Listeners),
|
|
||||||
{ok, ListenerPids, _GwState = #{ctx => Ctx, pool => PoolSup}}.
|
|
||||||
|
|
||||||
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
|
||||||
GwName = maps:get(name, Gateway),
|
|
||||||
try
|
|
||||||
%% XXX: 1. How hot-upgrade the changes ???
|
|
||||||
%% XXX: 2. Check the New confs first before destroy old instance ???
|
|
||||||
on_gateway_unload(Gateway, GwState),
|
|
||||||
on_gateway_load(Gateway#{config => Config}, Ctx)
|
|
||||||
catch
|
|
||||||
Class : Reason : Stk ->
|
|
||||||
logger:error("Failed to update ~ts; "
|
|
||||||
"reason: {~0p, ~0p} stacktrace: ~0p",
|
|
||||||
[GwName, Class, Reason, Stk]),
|
|
||||||
{error, {Class, Reason}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
on_gateway_unload(_Gateway = #{ name := GwName,
|
|
||||||
config := Config
|
|
||||||
}, _GwState = #{pool := PoolSup}) ->
|
|
||||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
|
||||||
%% Stop funcs???
|
|
||||||
exit(PoolSup, kill),
|
|
||||||
stop_grpc_server(GwName),
|
|
||||||
stop_grpc_client_channel(GwName),
|
|
||||||
lists:foreach(fun(Lis) ->
|
|
||||||
stop_listener(GwName, Lis)
|
|
||||||
end, Listeners).
|
|
||||||
|
|
||||||
pool_name(GwName) ->
|
pool_name(GwName) ->
|
||||||
list_to_atom(lists:concat([GwName, "_gcli_pool"])).
|
list_to_atom(lists:concat([GwName, "_gcli_pool"])).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internal funcs
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
|
||||||
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
|
||||||
case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
console_print("Gateway ~ts:~ts:~ts on ~ts started.~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr]),
|
|
||||||
Pid;
|
|
||||||
{error, Reason} ->
|
|
||||||
?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr, Reason]),
|
|
||||||
throw({badconf, Reason})
|
|
||||||
end.
|
|
||||||
|
|
||||||
start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
|
|
||||||
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
|
||||||
NCfg = Cfg#{
|
|
||||||
ctx => Ctx,
|
|
||||||
listener => {GwName, Type, LisName},
|
|
||||||
frame_mod => emqx_exproto_frame,
|
|
||||||
chann_mod => emqx_exproto_channel
|
|
||||||
},
|
|
||||||
MFA = {emqx_gateway_conn, start_link, [NCfg]},
|
|
||||||
NSockOpts = merge_default_by_type(Type, SocketOpts),
|
|
||||||
do_start_listener(Type, Name, ListenOn, NSockOpts, MFA).
|
|
||||||
|
|
||||||
do_start_listener(Type, Name, ListenOn, Opts, MFA)
|
|
||||||
when Type == tcp;
|
|
||||||
Type == ssl ->
|
|
||||||
esockd:open(Name, ListenOn, Opts, MFA);
|
|
||||||
do_start_listener(udp, Name, ListenOn, Opts, MFA) ->
|
|
||||||
esockd:open_udp(Name, ListenOn, Opts, MFA);
|
|
||||||
do_start_listener(dtls, Name, ListenOn, Opts, MFA) ->
|
|
||||||
esockd:open_dtls(Name, ListenOn, Opts, MFA).
|
|
||||||
|
|
||||||
merge_default_by_type(Type, Options) when Type =:= tcp;
|
|
||||||
Type =:= ssl ->
|
|
||||||
Default = emqx_gateway_utils:default_tcp_options(),
|
|
||||||
case lists:keytake(tcp_options, 1, Options) of
|
|
||||||
{value, {tcp_options, TcpOpts}, Options1} ->
|
|
||||||
[{tcp_options, emqx_misc:merge_opts(Default, TcpOpts)}
|
|
||||||
| Options1];
|
|
||||||
false ->
|
|
||||||
[{tcp_options, Default} | Options]
|
|
||||||
end;
|
|
||||||
merge_default_by_type(Type, Options) when Type =:= udp;
|
|
||||||
Type =:= dtls ->
|
|
||||||
Default = emqx_gateway_utils:default_udp_options(),
|
|
||||||
case lists:keytake(udp_options, 1, Options) of
|
|
||||||
{value, {udp_options, TcpOpts}, Options1} ->
|
|
||||||
[{udp_options, emqx_misc:merge_opts(Default, TcpOpts)}
|
|
||||||
| Options1];
|
|
||||||
false ->
|
|
||||||
[{udp_options, Default} | Options]
|
|
||||||
end.
|
|
||||||
|
|
||||||
stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
|
||||||
StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
|
|
||||||
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
|
||||||
case StopRet of
|
|
||||||
ok ->
|
|
||||||
console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr]);
|
|
||||||
{error, Reason} ->
|
|
||||||
?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr, Reason])
|
|
||||||
end,
|
|
||||||
StopRet.
|
|
||||||
|
|
||||||
stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
|
|
||||||
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
|
||||||
esockd:close(Name, ListenOn).
|
|
||||||
|
|
||||||
-ifndef(TEST).
|
-ifndef(TEST).
|
||||||
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
|
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
|
||||||
-else.
|
-else.
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
|
|
||||||
-behaviour(emqx_gateway_impl).
|
-behaviour(emqx_gateway_impl).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([ reg/0
|
-export([ reg/0
|
||||||
, unreg/0
|
, unreg/0
|
||||||
|
@ -29,8 +31,6 @@
|
||||||
, on_gateway_unload/2
|
, on_gateway_unload/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -54,10 +54,20 @@ on_gateway_load(_Gateway = #{ name := GwName,
|
||||||
case emqx_lwm2m_xml_object_db:start_link(XmlDir) of
|
case emqx_lwm2m_xml_object_db:start_link(XmlDir) of
|
||||||
{ok, RegPid} ->
|
{ok, RegPid} ->
|
||||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||||
ListenerPids = lists:map(fun(Lis) ->
|
ModCfg = #{frame_mod => emqx_coap_frame,
|
||||||
start_listener(GwName, Ctx, Lis)
|
chann_mod => emqx_lwm2m_channel
|
||||||
end, Listeners),
|
},
|
||||||
{ok, ListenerPids, _GwState = #{ctx => Ctx, registry => RegPid}};
|
case emqx_gateway_utils:start_listeners(
|
||||||
|
Listeners, GwName, Ctx, ModCfg) of
|
||||||
|
{ok, ListenerPids} ->
|
||||||
|
{ok, ListenerPids, #{ctx => Ctx, registry => RegPid}};
|
||||||
|
{error, {Reason, Listener}} ->
|
||||||
|
_ = emqx_lwm2m_xml_object_db:stop(),
|
||||||
|
throw({badconf, #{ key => listeners
|
||||||
|
, vallue => Listener
|
||||||
|
, reason => Reason
|
||||||
|
}})
|
||||||
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
throw({badconf, #{ key => xml_dir
|
throw({badconf, #{ key => xml_dir
|
||||||
, value => XmlDir
|
, value => XmlDir
|
||||||
|
@ -85,73 +95,4 @@ on_gateway_unload(_Gateway = #{ name := GwName,
|
||||||
}, _GwState = #{registry := RegPid}) ->
|
}, _GwState = #{registry := RegPid}) ->
|
||||||
exit(RegPid, kill),
|
exit(RegPid, kill),
|
||||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
Listeners = emqx_gateway_utils:normalize_config(Config),
|
||||||
lists:foreach(fun(Lis) ->
|
emqx_gateway_utils:stop_listeners(GwName, Listeners).
|
||||||
stop_listener(GwName, Lis)
|
|
||||||
end, Listeners).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internal funcs
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
|
||||||
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
|
||||||
case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
console_print("Gateway ~ts:~ts:~ts on ~ts started.~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr]),
|
|
||||||
Pid;
|
|
||||||
{error, Reason} ->
|
|
||||||
?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr, Reason]),
|
|
||||||
throw({badconf, Reason})
|
|
||||||
end.
|
|
||||||
|
|
||||||
start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
|
|
||||||
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
|
||||||
NCfg = Cfg#{ ctx => Ctx
|
|
||||||
, listener => {GwName, Type, LisName}
|
|
||||||
, frame_mod => emqx_coap_frame
|
|
||||||
, chann_mod => emqx_lwm2m_channel
|
|
||||||
},
|
|
||||||
NSocketOpts = merge_default(SocketOpts),
|
|
||||||
MFA = {emqx_gateway_conn, start_link, [NCfg]},
|
|
||||||
do_start_listener(Type, Name, ListenOn, NSocketOpts, MFA).
|
|
||||||
|
|
||||||
merge_default(Options) ->
|
|
||||||
Default = emqx_gateway_utils:default_udp_options(),
|
|
||||||
case lists:keytake(udp_options, 1, Options) of
|
|
||||||
{value, {udp_options, TcpOpts}, Options1} ->
|
|
||||||
[{udp_options, emqx_misc:merge_opts(Default, TcpOpts)}
|
|
||||||
| Options1];
|
|
||||||
false ->
|
|
||||||
[{udp_options, Default} | Options]
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) ->
|
|
||||||
esockd:open_udp(Name, ListenOn, SocketOpts, MFA);
|
|
||||||
|
|
||||||
do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) ->
|
|
||||||
esockd:open_dtls(Name, ListenOn, SocketOpts, MFA).
|
|
||||||
|
|
||||||
stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
|
||||||
StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
|
|
||||||
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
|
||||||
case StopRet of
|
|
||||||
ok ->
|
|
||||||
console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr]);
|
|
||||||
{error, Reason} ->
|
|
||||||
?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr, Reason])
|
|
||||||
end,
|
|
||||||
StopRet.
|
|
||||||
|
|
||||||
stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
|
|
||||||
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
|
||||||
esockd:close(Name, ListenOn).
|
|
||||||
|
|
||||||
-ifndef(TEST).
|
|
||||||
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
|
|
||||||
-else.
|
|
||||||
console_print(_Fmt, _Args) -> ok.
|
|
||||||
-endif.
|
|
||||||
|
|
|
@ -19,6 +19,14 @@
|
||||||
|
|
||||||
-behaviour(emqx_gateway_impl).
|
-behaviour(emqx_gateway_impl).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
-import(emqx_gateway_utils,
|
||||||
|
[ normalize_config/1
|
||||||
|
, start_listeners/4
|
||||||
|
, stop_listeners/2
|
||||||
|
]).
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([ reg/0
|
-export([ reg/0
|
||||||
, unreg/0
|
, unreg/0
|
||||||
|
@ -29,8 +37,6 @@
|
||||||
, on_gateway_unload/2
|
, on_gateway_unload/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -70,12 +76,23 @@ on_gateway_load(_Gateway = #{ name := GwName,
|
||||||
[broadcast, predefined],
|
[broadcast, predefined],
|
||||||
Config#{registry => emqx_sn_registry:lookup_name(RegistrySvr)}
|
Config#{registry => emqx_sn_registry:lookup_name(RegistrySvr)}
|
||||||
),
|
),
|
||||||
|
|
||||||
Listeners = emqx_gateway_utils:normalize_config(NConfig),
|
Listeners = emqx_gateway_utils:normalize_config(NConfig),
|
||||||
|
|
||||||
ListenerPids = lists:map(fun(Lis) ->
|
ModCfg = #{frame_mod => emqx_sn_frame,
|
||||||
start_listener(GwName, Ctx, Lis)
|
chann_mod => emqx_sn_channel
|
||||||
end, Listeners),
|
},
|
||||||
{ok, ListenerPids, _InstaState = #{ctx => Ctx}}.
|
|
||||||
|
case start_listeners(
|
||||||
|
Listeners, GwName, Ctx, ModCfg) of
|
||||||
|
{ok, ListenerPids} ->
|
||||||
|
{ok, ListenerPids, _GwState = #{ctx => Ctx}};
|
||||||
|
{error, {Reason, Listener}} ->
|
||||||
|
throw({badconf, #{ key => listeners
|
||||||
|
, vallue => Listener
|
||||||
|
, reason => Reason
|
||||||
|
}})
|
||||||
|
end.
|
||||||
|
|
||||||
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
GwName = maps:get(name, Gateway),
|
GwName = maps:get(name, Gateway),
|
||||||
|
@ -95,68 +112,5 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
on_gateway_unload(_Gateway = #{ name := GwName,
|
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||||
config := Config
|
config := Config
|
||||||
}, _GwState) ->
|
}, _GwState) ->
|
||||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
Listeners = normalize_config(Config),
|
||||||
lists:foreach(fun(Lis) ->
|
stop_listeners(GwName, Listeners).
|
||||||
stop_listener(GwName, Lis)
|
|
||||||
end, Listeners).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internal funcs
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
|
||||||
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
|
||||||
case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
console_print("Gateway ~ts:~ts:~ts on ~ts started.~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr]),
|
|
||||||
Pid;
|
|
||||||
{error, Reason} ->
|
|
||||||
?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr, Reason]),
|
|
||||||
throw({badconf, Reason})
|
|
||||||
end.
|
|
||||||
|
|
||||||
start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
|
|
||||||
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
|
||||||
NCfg = Cfg#{
|
|
||||||
ctx => Ctx,
|
|
||||||
listene => {GwName, Type, LisName},
|
|
||||||
frame_mod => emqx_sn_frame,
|
|
||||||
chann_mod => emqx_sn_channel
|
|
||||||
},
|
|
||||||
esockd:open_udp(Name, ListenOn, merge_default(SocketOpts),
|
|
||||||
{emqx_gateway_conn, start_link, [NCfg]}).
|
|
||||||
|
|
||||||
merge_default(Options) ->
|
|
||||||
Default = emqx_gateway_utils:default_udp_options(),
|
|
||||||
case lists:keytake(udp_options, 1, Options) of
|
|
||||||
{value, {udp_options, TcpOpts}, Options1} ->
|
|
||||||
[{udp_options, emqx_misc:merge_opts(Default, TcpOpts)}
|
|
||||||
| Options1];
|
|
||||||
false ->
|
|
||||||
[{udp_options, Default} | Options]
|
|
||||||
end.
|
|
||||||
|
|
||||||
stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
|
||||||
StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
|
|
||||||
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
|
||||||
case StopRet of
|
|
||||||
ok ->
|
|
||||||
console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr]);
|
|
||||||
{error, Reason} ->
|
|
||||||
?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr, Reason])
|
|
||||||
end,
|
|
||||||
StopRet.
|
|
||||||
|
|
||||||
stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
|
|
||||||
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
|
||||||
esockd:close(Name, ListenOn).
|
|
||||||
|
|
||||||
-ifndef(TEST).
|
|
||||||
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
|
|
||||||
-else.
|
|
||||||
console_print(_Fmt, _Args) -> ok.
|
|
||||||
-endif.
|
|
||||||
|
|
|
@ -18,6 +18,15 @@
|
||||||
|
|
||||||
-behaviour(emqx_gateway_impl).
|
-behaviour(emqx_gateway_impl).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
|
||||||
|
|
||||||
|
-import(emqx_gateway_utils,
|
||||||
|
[ normalize_config/1
|
||||||
|
, start_listeners/4
|
||||||
|
, stop_listeners/2
|
||||||
|
]).
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([ reg/0
|
-export([ reg/0
|
||||||
, unreg/0
|
, unreg/0
|
||||||
|
@ -28,9 +37,6 @@
|
||||||
, on_gateway_unload/2
|
, on_gateway_unload/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -52,15 +58,22 @@ unreg() ->
|
||||||
on_gateway_load(_Gateway = #{ name := GwName,
|
on_gateway_load(_Gateway = #{ name := GwName,
|
||||||
config := Config
|
config := Config
|
||||||
}, Ctx) ->
|
}, Ctx) ->
|
||||||
%% Step1. Fold the config to listeners
|
Listeners = normalize_config(Config),
|
||||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
ModCfg = #{frame_mod => emqx_stomp_frame,
|
||||||
%% Step2. Start listeners or escokd:specs
|
chann_mod => emqx_stomp_channel
|
||||||
ListenerPids = lists:map(fun(Lis) ->
|
},
|
||||||
start_listener(GwName, Ctx, Lis)
|
case start_listeners(
|
||||||
end, Listeners),
|
Listeners, GwName, Ctx, ModCfg) of
|
||||||
|
{ok, ListenerPids} ->
|
||||||
%% FIXME: How to throw an exception to interrupt the restart logic ?
|
%% FIXME: How to throw an exception to interrupt the restart logic ?
|
||||||
%% FIXME: Assign ctx to GwState
|
%% FIXME: Assign ctx to GwState
|
||||||
{ok, ListenerPids, _GwState = #{ctx => Ctx}}.
|
{ok, ListenerPids, _GwState = #{ctx => Ctx}};
|
||||||
|
{error, {Reason, Listener}} ->
|
||||||
|
throw({badconf, #{ key => listeners
|
||||||
|
, vallue => Listener
|
||||||
|
, reason => Reason
|
||||||
|
}})
|
||||||
|
end.
|
||||||
|
|
||||||
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
GwName = maps:get(name, Gateway),
|
GwName = maps:get(name, Gateway),
|
||||||
|
@ -80,68 +93,5 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
on_gateway_unload(_Gateway = #{ name := GwName,
|
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||||
config := Config
|
config := Config
|
||||||
}, _GwState) ->
|
}, _GwState) ->
|
||||||
Listeners = emqx_gateway_utils:normalize_config(Config),
|
Listeners = normalize_config(Config),
|
||||||
lists:foreach(fun(Lis) ->
|
stop_listeners(GwName, Listeners).
|
||||||
stop_listener(GwName, Lis)
|
|
||||||
end, Listeners).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internal funcs
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
|
||||||
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
|
||||||
case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
console_print("Gateway ~ts:~ts:~ts on ~ts started.~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr]),
|
|
||||||
Pid;
|
|
||||||
{error, Reason} ->
|
|
||||||
?ELOG("Failed to start gateway ~ts:~ts:~ts on ~ts: ~0p~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr, Reason]),
|
|
||||||
throw({badconf, Reason})
|
|
||||||
end.
|
|
||||||
|
|
||||||
start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
|
|
||||||
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
|
||||||
NCfg = Cfg#{
|
|
||||||
ctx => Ctx,
|
|
||||||
listener => {GwName, Type, LisName}, %% Used for authn
|
|
||||||
frame_mod => emqx_stomp_frame,
|
|
||||||
chann_mod => emqx_stomp_channel
|
|
||||||
},
|
|
||||||
esockd:open(Name, ListenOn, merge_default(SocketOpts),
|
|
||||||
{emqx_gateway_conn, start_link, [NCfg]}).
|
|
||||||
|
|
||||||
merge_default(Options) ->
|
|
||||||
Default = emqx_gateway_utils:default_tcp_options(),
|
|
||||||
case lists:keytake(tcp_options, 1, Options) of
|
|
||||||
{value, {tcp_options, TcpOpts}, Options1} ->
|
|
||||||
[{tcp_options, emqx_misc:merge_opts(Default, TcpOpts)}
|
|
||||||
| Options1];
|
|
||||||
false ->
|
|
||||||
[{tcp_options, Default} | Options]
|
|
||||||
end.
|
|
||||||
|
|
||||||
stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
|
|
||||||
StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
|
|
||||||
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
|
|
||||||
case StopRet of
|
|
||||||
ok ->
|
|
||||||
console_print("Gateway ~ts:~ts:~ts on ~ts stopped.~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr]);
|
|
||||||
{error, Reason} ->
|
|
||||||
?ELOG("Failed to stop gateway ~ts:~ts:~ts on ~ts: ~0p~n",
|
|
||||||
[GwName, Type, LisName, ListenOnStr, Reason])
|
|
||||||
end,
|
|
||||||
StopRet.
|
|
||||||
|
|
||||||
stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
|
|
||||||
Name = emqx_gateway_utils:listener_id(GwName, Type, LisName),
|
|
||||||
esockd:close(Name, ListenOn).
|
|
||||||
|
|
||||||
-ifndef(TEST).
|
|
||||||
console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
|
|
||||||
-else.
|
|
||||||
console_print(_Fmt, _Args) -> ok.
|
|
||||||
-endif.
|
|
||||||
|
|
|
@ -0,0 +1,150 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_gateway_cli_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-define(GP(S), begin S, receive {fmt, P} -> P; O -> O end end).
|
||||||
|
|
||||||
|
%% this parses to #{}, will not cause config cleanup
|
||||||
|
%% so we will need call emqx_config:erase
|
||||||
|
-define(CONF_DEFAULT, <<"
|
||||||
|
gateway {}
|
||||||
|
">>).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Setup
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Conf) ->
|
||||||
|
emqx_config:erase(gateway),
|
||||||
|
emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
|
||||||
|
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authn, emqx_gateway]),
|
||||||
|
Conf.
|
||||||
|
|
||||||
|
end_per_suite(Conf) ->
|
||||||
|
emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_authn, emqx_conf]),
|
||||||
|
Conf.
|
||||||
|
|
||||||
|
init_per_testcase(_, Conf) ->
|
||||||
|
Self = self(),
|
||||||
|
ok = meck:new(emqx_ctl, [passthrough, no_history, no_link]),
|
||||||
|
ok = meck:expect(emqx_ctl, usage,
|
||||||
|
fun(L) -> emqx_ctl:format_usage(L) end),
|
||||||
|
ok = meck:expect(emqx_ctl, print,
|
||||||
|
fun(Fmt) ->
|
||||||
|
Self ! {fmt, emqx_ctl:format(Fmt)}
|
||||||
|
end),
|
||||||
|
ok = meck:expect(emqx_ctl, print,
|
||||||
|
fun(Fmt, Args) ->
|
||||||
|
Self ! {fmt, emqx_ctl:format(Fmt, Args)}
|
||||||
|
end),
|
||||||
|
Conf.
|
||||||
|
|
||||||
|
end_per_testcase(_, _) ->
|
||||||
|
meck:unload([emqx_ctl]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Cases
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% TODO:
|
||||||
|
|
||||||
|
t_load_unload(_) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_gateway_registry_usage(_) ->
|
||||||
|
?assertEqual(
|
||||||
|
["gateway-registry list # List all registered gateways\n"],
|
||||||
|
emqx_gateway_cli:'gateway-registry'(usage)).
|
||||||
|
|
||||||
|
t_gateway_registry_list(_) ->
|
||||||
|
emqx_gateway_cli:'gateway-registry'(["list"]),
|
||||||
|
?assertEqual(
|
||||||
|
"Registered Name: coap, Callback Module: emqx_coap_impl\n"
|
||||||
|
"Registered Name: exproto, Callback Module: emqx_exproto_impl\n"
|
||||||
|
"Registered Name: lwm2m, Callback Module: emqx_lwm2m_impl\n"
|
||||||
|
"Registered Name: mqttsn, Callback Module: emqx_sn_impl\n"
|
||||||
|
"Registered Name: stomp, Callback Module: emqx_stomp_impl\n"
|
||||||
|
, acc_print()).
|
||||||
|
|
||||||
|
t_gateway_usage(_) ->
|
||||||
|
?assertEqual(
|
||||||
|
["gateway list # List all gateway\n",
|
||||||
|
"gateway lookup <Name> # Lookup a gateway detailed informations\n",
|
||||||
|
"gateway load <Name> <JsonConf> # Load a gateway with config\n",
|
||||||
|
"gateway unload <Name> # Unload the gateway\n",
|
||||||
|
"gateway stop <Name> # Stop the gateway\n",
|
||||||
|
"gateway start <Name> # Start the gateway\n"],
|
||||||
|
emqx_gateway_cli:gateway(usage)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_gateway_list(_) ->
|
||||||
|
emqx_gateway_cli:gateway(["list"]),
|
||||||
|
?assertEqual(
|
||||||
|
"Gateway(name=coap, status=unloaded)\n"
|
||||||
|
"Gateway(name=exproto, status=unloaded)\n"
|
||||||
|
"Gateway(name=lwm2m, status=unloaded)\n"
|
||||||
|
"Gateway(name=mqttsn, status=unloaded)\n"
|
||||||
|
"Gateway(name=stomp, status=unloaded)\n"
|
||||||
|
, acc_print()).
|
||||||
|
|
||||||
|
t_gateway_load(_) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_gateway_unload(_) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_gateway_start(_) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_gateway_stop(_) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_gateway_clients_usage(_) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_gateway_clients_list(_) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_gateway_clients_lookup(_) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_gateway_clients_kick(_) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_gateway_metrcis_usage(_) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_gateway_metrcis(_) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
acc_print() ->
|
||||||
|
lists:concat(lists:reverse(acc_print([]))).
|
||||||
|
|
||||||
|
acc_print(Acc) ->
|
||||||
|
receive
|
||||||
|
{fmt, S} -> acc_print([S|Acc])
|
||||||
|
after 200 ->
|
||||||
|
Acc
|
||||||
|
end.
|
Loading…
Reference in New Issue