chore(gw): append the protoname into listener name

This commit is contained in:
JianBo He 2021-08-24 17:59:20 +08:00 committed by turtleDeng
parent 914c375d9e
commit 836ca38c2b
6 changed files with 93 additions and 92 deletions

View File

@ -85,21 +85,21 @@ on_gateway_unload(_Gateway = #{ name := GwName,
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
{ok, Pid} -> {ok, Pid} ->
?ULOG("Start ~s:~s listener on ~s successfully.~n", ?ULOG("Start ~s:~s:~s listener on ~s successfully.~n",
[GwName, Type, ListenOnStr]), [GwName, Type, LisName, ListenOnStr]),
Pid; Pid;
{error, Reason} -> {error, Reason} ->
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", ?ELOG("Failed to start ~s:~s:~s listener on ~s: ~0p~n",
[GwName, Type, ListenOnStr, Reason]), [GwName, Type, LisName, ListenOnStr, Reason]),
throw({badconf, Reason}) throw({badconf, Reason})
end. end.
start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, Type), Name = name(GwName, LisName, Type),
NCfg = Cfg#{ NCfg = Cfg#{
ctx => Ctx, ctx => Ctx,
frame_mod => emqx_coap_frame, frame_mod => emqx_coap_frame,
@ -114,21 +114,21 @@ do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) ->
do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) -> do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) ->
esockd:open_dtls(Name, ListenOn, SocketOpts, MFA). esockd:open_dtls(Name, ListenOn, SocketOpts, MFA).
name(GwName, Type) -> name(GwName, LisName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type])). list_to_atom(lists:concat([GwName, ":", LisName, ":", Type])).
stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of case StopRet of
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", ok -> ?ULOG("Stop ~s:~s:~s listener on ~s successfully.~n",
[GwName, Type, ListenOnStr]); [GwName, Type, LisName, ListenOnStr]);
{error, Reason} -> {error, Reason} ->
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", ?ELOG("Failed to stop ~s:~s:~s listener on ~s: ~0p~n",
[GwName, Type, ListenOnStr, Reason]) [GwName, Type, LisName, ListenOnStr, Reason])
end, end,
StopRet. StopRet.
stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, Type), Name = name(GwName, LisName, Type),
esockd:close(Name, ListenOn). esockd:close(Name, ListenOn).

View File

@ -109,6 +109,7 @@ format_listenon({Addr, Port}) when is_tuple(Addr) ->
-spec normalize_rawconf(rawconf()) -spec normalize_rawconf(rawconf())
-> list({ Type :: udp | tcp | ssl | dtls -> list({ Type :: udp | tcp | ssl | dtls
, Name :: atom()
, ListenOn :: esockd:listen_on() , ListenOn :: esockd:listen_on()
, SocketOpts :: esockd:option() , SocketOpts :: esockd:option()
, Cfg :: map() , Cfg :: map()
@ -118,14 +119,14 @@ normalize_rawconf(RawConf) ->
Cfg0 = maps:without([listeners], RawConf), Cfg0 = maps:without([listeners], RawConf),
lists:append(maps:fold(fun(Type, Liss, AccIn1) -> lists:append(maps:fold(fun(Type, Liss, AccIn1) ->
Listeners = Listeners =
maps:fold(fun(_Name, Confs, AccIn2) -> maps:fold(fun(Name, Confs, AccIn2) ->
ListenOn = maps:get(bind, Confs), ListenOn = maps:get(bind, Confs),
SocketOpts = esockd:parse_opt(maps:to_list(Confs)), SocketOpts = esockd:parse_opt(maps:to_list(Confs)),
RemainCfgs = maps:without( RemainCfgs = maps:without(
[bind] ++ proplists:get_keys(SocketOpts), [bind] ++ proplists:get_keys(SocketOpts),
Confs), Confs),
Cfg = maps:merge(Cfg0, RemainCfgs), Cfg = maps:merge(Cfg0, RemainCfgs),
[{Type, ListenOn, SocketOpts, Cfg}|AccIn2] [{Type, Name, ListenOn, SocketOpts, Cfg}|AccIn2]
end, [], Liss), end, [], Liss),
[Listeners|AccIn1] [Listeners|AccIn1]
end, [], LisMap)). end, [], LisMap)).

View File

@ -139,21 +139,21 @@ pool_name(GwName) ->
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
{ok, Pid} -> {ok, Pid} ->
?ULOG("Start ~s:~s listener on ~s successfully.~n", ?ULOG("Start ~s:~s:~s listener on ~s successfully.~n",
[GwName, Type, ListenOnStr]), [GwName, Type, LisName, ListenOnStr]),
Pid; Pid;
{error, Reason} -> {error, Reason} ->
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", ?ELOG("Failed to start ~s:~s:~s listener on ~s: ~0p~n",
[GwName, Type, ListenOnStr, Reason]), [GwName, Type, LisName, ListenOnStr, Reason]),
throw({badconf, Reason}) throw({badconf, Reason})
end. end.
start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, Type), Name = name(GwName, LisName, Type),
NCfg = Cfg#{ NCfg = Cfg#{
ctx => Ctx, ctx => Ctx,
frame_mod => emqx_exproto_frame, frame_mod => emqx_exproto_frame,
@ -172,8 +172,8 @@ do_start_listener(udp, Name, ListenOn, Opts, MFA) ->
do_start_listener(dtls, Name, ListenOn, Opts, MFA) -> do_start_listener(dtls, Name, ListenOn, Opts, MFA) ->
esockd:open_dtls(Name, ListenOn, Opts, MFA). esockd:open_dtls(Name, ListenOn, Opts, MFA).
name(GwName, Type) -> name(GwName, LisName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type])). list_to_atom(lists:concat([GwName, ":", LisName, ":", Type])).
merge_default_by_type(Type, Options) when Type =:= tcp; merge_default_by_type(Type, Options) when Type =:= tcp;
Type =:= ssl -> Type =:= ssl ->
@ -196,18 +196,18 @@ merge_default_by_type(Type, Options) when Type =:= udp;
[{udp_options, Default} | Options] [{udp_options, Default} | Options]
end. end.
stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of case StopRet of
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", ok -> ?ULOG("Stop ~s:~s:~s listener on ~s successfully.~n",
[GwName, Type, ListenOnStr]); [GwName, Type, LisName, ListenOnStr]);
{error, Reason} -> {error, Reason} ->
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", ?ELOG("Failed to stop ~s:~s:~s listener on ~s: ~0p~n",
[GwName, Type, ListenOnStr, Reason]) [GwName, Type, LisName, ListenOnStr, Reason])
end, end,
StopRet. StopRet.
stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, Type), Name = name(GwName, LisName, Type),
esockd:close(Name, ListenOn). esockd:close(Name, ListenOn).

View File

@ -103,21 +103,21 @@ on_gateway_unload(_Gateway = #{ name := GwName,
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
{ok, Pid} -> {ok, Pid} ->
?ULOG("Start ~s:~s listener on ~s successfully.~n", ?ULOG("Start ~s:~s:~s listener on ~s successfully.~n",
[GwName, Type, ListenOnStr]), [GwName, Type, LisName, ListenOnStr]),
Pid; Pid;
{error, Reason} -> {error, Reason} ->
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", ?ELOG("Failed to start ~s:~s:~s listener on ~s: ~0p~n",
[GwName, Type, ListenOnStr, Reason]), [GwName, Type, LisName, ListenOnStr, Reason]),
throw({badconf, Reason}) throw({badconf, Reason})
end. end.
start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, udp), Name = name(GwName, LisName, udp),
NCfg = Cfg#{ctx => Ctx}, NCfg = Cfg#{ctx => Ctx},
NSocketOpts = merge_default(SocketOpts), NSocketOpts = merge_default(SocketOpts),
Options = [{config, NCfg}|NSocketOpts], Options = [{config, NCfg}|NSocketOpts],
@ -128,8 +128,8 @@ start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
lwm2m_coap_server:start_dtls(Name, ListenOn, Options) lwm2m_coap_server:start_dtls(Name, ListenOn, Options)
end. end.
name(GwName, Type) -> name(GwName, LisName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type])). list_to_atom(lists:concat([GwName, ":", LisName, ":", Type])).
merge_default(Options) -> merge_default(Options) ->
Default = emqx_gateway_utils:default_udp_options(), Default = emqx_gateway_utils:default_udp_options(),
@ -141,20 +141,20 @@ merge_default(Options) ->
[{udp_options, Default} | Options] [{udp_options, Default} | Options]
end. end.
stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of case StopRet of
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", ok -> ?ULOG("Stop ~s:~s:~s listener on ~s successfully.~n",
[GwName, Type, ListenOnStr]); [GwName, Type, LisName, ListenOnStr]);
{error, Reason} -> {error, Reason} ->
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", ?ELOG("Failed to stop ~s:~s:~s listener on ~s: ~0p~n",
[GwName, Type, ListenOnStr, Reason]) [GwName, Type, LisName, ListenOnStr, Reason])
end, end,
StopRet. StopRet.
stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, Type), Name = name(GwName, LisName, Type),
case Type of case Type of
udp -> udp ->
lwm2m_coap_server:stop_udp(Name, ListenOn); lwm2m_coap_server:stop_udp(Name, ListenOn);

View File

@ -104,21 +104,21 @@ on_gateway_unload(_Insta = #{ name := GwName,
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
{ok, Pid} -> {ok, Pid} ->
?ULOG("Start ~s:~s listener on ~s successfully.~n", ?ULOG("Start ~s:~s:~s listener on ~s successfully.~n",
[GwName, Type, ListenOnStr]), [GwName, Type, LisName, ListenOnStr]),
Pid; Pid;
{error, Reason} -> {error, Reason} ->
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", ?ELOG("Failed to start ~s:~s:~s listener on ~s: ~0p~n",
[GwName, Type, ListenOnStr, Reason]), [GwName, Type, LisName, ListenOnStr, Reason]),
throw({badconf, Reason}) throw({badconf, Reason})
end. end.
start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, Type), Name = name(GwName, LisName, Type),
NCfg = Cfg#{ NCfg = Cfg#{
ctx => Ctx, ctx => Ctx,
frame_mod => emqx_sn_frame, frame_mod => emqx_sn_frame,
@ -127,8 +127,8 @@ start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
esockd:open_udp(Name, ListenOn, merge_default(SocketOpts), esockd:open_udp(Name, ListenOn, merge_default(SocketOpts),
{emqx_gateway_conn, start_link, [NCfg]}). {emqx_gateway_conn, start_link, [NCfg]}).
name(GwName, Type) -> name(GwName, LisName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type])). list_to_atom(lists:concat([GwName, ":", LisName, ":", Type])).
merge_default(Options) -> merge_default(Options) ->
Default = emqx_gateway_utils:default_udp_options(), Default = emqx_gateway_utils:default_udp_options(),
@ -140,18 +140,18 @@ merge_default(Options) ->
[{udp_options, Default} | Options] [{udp_options, Default} | Options]
end. end.
stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), StopRet = stop_listener(GwName, LisName, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of case StopRet of
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", ok -> ?ULOG("Stop ~s:~s:~s listener on ~s successfully.~n",
[GwName, Type, ListenOnStr]); [GwName, Type, LisName, ListenOnStr]);
{error, Reason} -> {error, Reason} ->
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", ?ELOG("Failed to stop ~s:~s:~s listener on ~s: ~0p~n",
[GwName, Type, ListenOnStr, Reason]) [GwName, Type, LisName, ListenOnStr, Reason])
end, end,
StopRet. StopRet.
stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, Type), Name = name(GwName, LisName, Type),
esockd:close(Name, ListenOn). esockd:close(Name, ListenOn).

View File

@ -89,21 +89,21 @@ on_gateway_unload(_Gateway = #{ name := GwName,
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of
{ok, Pid} -> {ok, Pid} ->
?ULOG("Start ~s:~s listener on ~s successfully.~n", ?ULOG("Start ~s:~s:~s listener on ~s successfully.~n",
[GwName, Type, ListenOnStr]), [GwName, Type, LisName, ListenOnStr]),
Pid; Pid;
{error, Reason} -> {error, Reason} ->
?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", ?ELOG("Failed to start ~s:~s:~s listener on ~s: ~0p~n",
[GwName, Type, ListenOnStr, Reason]), [GwName, Type, LisName, ListenOnStr, Reason]),
throw({badconf, Reason}) throw({badconf, Reason})
end. end.
start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) ->
Name = name(GwName, Type), Name = name(GwName, LisName, Type),
NCfg = Cfg#{ NCfg = Cfg#{
ctx => Ctx, ctx => Ctx,
frame_mod => emqx_stomp_frame, frame_mod => emqx_stomp_frame,
@ -112,8 +112,8 @@ start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
esockd:open(Name, ListenOn, merge_default(SocketOpts), esockd:open(Name, ListenOn, merge_default(SocketOpts),
{emqx_gateway_conn, start_link, [NCfg]}). {emqx_gateway_conn, start_link, [NCfg]}).
name(GwName, Type) -> name(GwName, LisName, Type) ->
list_to_atom(lists:concat([GwName, ":", Type])). list_to_atom(lists:concat([GwName, ":", LisName, ":", Type])).
merge_default(Options) -> merge_default(Options) ->
Default = emqx_gateway_utils:default_tcp_options(), Default = emqx_gateway_utils:default_tcp_options(),
@ -125,18 +125,18 @@ merge_default(Options) ->
[{tcp_options, Default} | Options] [{tcp_options, Default} | Options]
end. end.
stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of case StopRet of
ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", ok -> ?ULOG("Stop ~s:~s:~s listener on ~s successfully.~n",
[GwName, Type, ListenOnStr]); [GwName, Type, LisName, ListenOnStr]);
{error, Reason} -> {error, Reason} ->
?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", ?ELOG("Failed to stop ~s:~s:~s listener on ~s: ~0p~n",
[GwName, Type, ListenOnStr, Reason]) [GwName, Type, LisName, ListenOnStr, Reason])
end, end,
StopRet. StopRet.
stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) ->
Name = name(GwName, Type), Name = name(GwName, LisName, Type),
esockd:close(Name, ListenOn). esockd:close(Name, ListenOn).