From 836ca38c2b83101f4bffc22e8d3ec10f2e859428 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 24 Aug 2021 17:59:20 +0800 Subject: [PATCH] chore(gw): append the protoname into listener name --- apps/emqx_gateway/src/coap/emqx_coap_impl.erl | 36 +++++++++---------- apps/emqx_gateway/src/emqx_gateway_utils.erl | 5 +-- .../src/exproto/emqx_exproto_impl.erl | 36 +++++++++---------- .../src/lwm2m/emqx_lwm2m_impl.erl | 36 +++++++++---------- apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl | 36 +++++++++---------- .../src/stomp/emqx_stomp_impl.erl | 36 +++++++++---------- 6 files changed, 93 insertions(+), 92 deletions(-) diff --git a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl index b3f7b640a..cf181f07e 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl @@ -85,21 +85,21 @@ on_gateway_unload(_Gateway = #{ name := GwName, %% Internal funcs %%-------------------------------------------------------------------- -start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> - ?ULOG("Start ~s:~s listener on ~s successfully.~n", - [GwName, Type, ListenOnStr]), + ?ULOG("Start ~s:~s:~s listener on ~s successfully.~n", + [GwName, Type, LisName, ListenOnStr]), Pid; {error, Reason} -> - ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", - [GwName, Type, ListenOnStr, Reason]), + ?ELOG("Failed to start ~s:~s:~s listener on ~s: ~0p~n", + [GwName, Type, LisName, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(GwName, Type), +start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> + Name = name(GwName, LisName, Type), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_coap_frame, @@ -114,21 +114,21 @@ do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) -> do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) -> esockd:open_dtls(Name, ListenOn, SocketOpts, MFA). -name(GwName, Type) -> - list_to_atom(lists:concat([GwName, ":", Type])). +name(GwName, LisName, Type) -> + list_to_atom(lists:concat([GwName, ":", LisName, ":", Type])). -stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of - ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", - [GwName, Type, ListenOnStr]); + ok -> ?ULOG("Stop ~s:~s:~s listener on ~s successfully.~n", + [GwName, Type, LisName, ListenOnStr]); {error, Reason} -> - ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", - [GwName, Type, ListenOnStr, Reason]) + ?ELOG("Failed to stop ~s:~s:~s listener on ~s: ~0p~n", + [GwName, Type, LisName, ListenOnStr, Reason]) end, StopRet. -stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwName, Type), +stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwName, LisName, Type), esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index d8c2b9be7..593729d67 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -109,6 +109,7 @@ format_listenon({Addr, Port}) when is_tuple(Addr) -> -spec normalize_rawconf(rawconf()) -> list({ Type :: udp | tcp | ssl | dtls + , Name :: atom() , ListenOn :: esockd:listen_on() , SocketOpts :: esockd:option() , Cfg :: map() @@ -118,14 +119,14 @@ normalize_rawconf(RawConf) -> Cfg0 = maps:without([listeners], RawConf), lists:append(maps:fold(fun(Type, Liss, AccIn1) -> Listeners = - maps:fold(fun(_Name, Confs, AccIn2) -> + maps:fold(fun(Name, Confs, AccIn2) -> ListenOn = maps:get(bind, Confs), SocketOpts = esockd:parse_opt(maps:to_list(Confs)), RemainCfgs = maps:without( [bind] ++ proplists:get_keys(SocketOpts), Confs), Cfg = maps:merge(Cfg0, RemainCfgs), - [{Type, ListenOn, SocketOpts, Cfg}|AccIn2] + [{Type, Name, ListenOn, SocketOpts, Cfg}|AccIn2] end, [], Liss), [Listeners|AccIn1] end, [], LisMap)). diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index 8c500be82..c17a6e766 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -139,21 +139,21 @@ pool_name(GwName) -> %% Internal funcs %%-------------------------------------------------------------------- -start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> - ?ULOG("Start ~s:~s listener on ~s successfully.~n", - [GwName, Type, ListenOnStr]), + ?ULOG("Start ~s:~s:~s listener on ~s successfully.~n", + [GwName, Type, LisName, ListenOnStr]), Pid; {error, Reason} -> - ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", - [GwName, Type, ListenOnStr, Reason]), + ?ELOG("Failed to start ~s:~s:~s listener on ~s: ~0p~n", + [GwName, Type, LisName, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(GwName, Type), +start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> + Name = name(GwName, LisName, Type), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_exproto_frame, @@ -172,8 +172,8 @@ do_start_listener(udp, Name, ListenOn, Opts, MFA) -> do_start_listener(dtls, Name, ListenOn, Opts, MFA) -> esockd:open_dtls(Name, ListenOn, Opts, MFA). -name(GwName, Type) -> - list_to_atom(lists:concat([GwName, ":", Type])). +name(GwName, LisName, Type) -> + list_to_atom(lists:concat([GwName, ":", LisName, ":", Type])). merge_default_by_type(Type, Options) when Type =:= tcp; Type =:= ssl -> @@ -196,18 +196,18 @@ merge_default_by_type(Type, Options) when Type =:= udp; [{udp_options, Default} | Options] end. -stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of - ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", - [GwName, Type, ListenOnStr]); + ok -> ?ULOG("Stop ~s:~s:~s listener on ~s successfully.~n", + [GwName, Type, LisName, ListenOnStr]); {error, Reason} -> - ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", - [GwName, Type, ListenOnStr, Reason]) + ?ELOG("Failed to stop ~s:~s:~s listener on ~s: ~0p~n", + [GwName, Type, LisName, ListenOnStr, Reason]) end, StopRet. -stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwName, Type), +stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwName, LisName, Type), esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl index d5fb0d06e..d1afbc6b2 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl @@ -103,21 +103,21 @@ on_gateway_unload(_Gateway = #{ name := GwName, %% Internal funcs %%-------------------------------------------------------------------- -start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> - ?ULOG("Start ~s:~s listener on ~s successfully.~n", - [GwName, Type, ListenOnStr]), + ?ULOG("Start ~s:~s:~s listener on ~s successfully.~n", + [GwName, Type, LisName, ListenOnStr]), Pid; {error, Reason} -> - ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", - [GwName, Type, ListenOnStr, Reason]), + ?ELOG("Failed to start ~s:~s:~s listener on ~s: ~0p~n", + [GwName, Type, LisName, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(GwName, udp), +start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> + Name = name(GwName, LisName, udp), NCfg = Cfg#{ctx => Ctx}, NSocketOpts = merge_default(SocketOpts), Options = [{config, NCfg}|NSocketOpts], @@ -128,8 +128,8 @@ start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> lwm2m_coap_server:start_dtls(Name, ListenOn, Options) end. -name(GwName, Type) -> - list_to_atom(lists:concat([GwName, ":", Type])). +name(GwName, LisName, Type) -> + list_to_atom(lists:concat([GwName, ":", LisName, ":", Type])). merge_default(Options) -> Default = emqx_gateway_utils:default_udp_options(), @@ -141,20 +141,20 @@ merge_default(Options) -> [{udp_options, Default} | Options] end. -stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of - ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", - [GwName, Type, ListenOnStr]); + ok -> ?ULOG("Stop ~s:~s:~s listener on ~s successfully.~n", + [GwName, Type, LisName, ListenOnStr]); {error, Reason} -> - ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", - [GwName, Type, ListenOnStr, Reason]) + ?ELOG("Failed to stop ~s:~s:~s listener on ~s: ~0p~n", + [GwName, Type, LisName, ListenOnStr, Reason]) end, StopRet. -stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwName, Type), +stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwName, LisName, Type), case Type of udp -> lwm2m_coap_server:stop_udp(Name, ListenOn); diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl index d35228e1f..9ea3ca3ae 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -104,21 +104,21 @@ on_gateway_unload(_Insta = #{ name := GwName, %% Internal funcs %%-------------------------------------------------------------------- -start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> - ?ULOG("Start ~s:~s listener on ~s successfully.~n", - [GwName, Type, ListenOnStr]), + ?ULOG("Start ~s:~s:~s listener on ~s successfully.~n", + [GwName, Type, LisName, ListenOnStr]), Pid; {error, Reason} -> - ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", - [GwName, Type, ListenOnStr, Reason]), + ?ELOG("Failed to start ~s:~s:~s listener on ~s: ~0p~n", + [GwName, Type, LisName, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(GwName, Type), +start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> + Name = name(GwName, LisName, Type), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_sn_frame, @@ -127,8 +127,8 @@ start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> esockd:open_udp(Name, ListenOn, merge_default(SocketOpts), {emqx_gateway_conn, start_link, [NCfg]}). -name(GwName, Type) -> - list_to_atom(lists:concat([GwName, ":", Type])). +name(GwName, LisName, Type) -> + list_to_atom(lists:concat([GwName, ":", LisName, ":", Type])). merge_default(Options) -> Default = emqx_gateway_utils:default_udp_options(), @@ -140,18 +140,18 @@ merge_default(Options) -> [{udp_options, Default} | Options] end. -stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwName, LisName, Type, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of - ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", - [GwName, Type, ListenOnStr]); + ok -> ?ULOG("Stop ~s:~s:~s listener on ~s successfully.~n", + [GwName, Type, LisName, ListenOnStr]); {error, Reason} -> - ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", - [GwName, Type, ListenOnStr, Reason]) + ?ELOG("Failed to stop ~s:~s:~s listener on ~s: ~0p~n", + [GwName, Type, LisName, ListenOnStr, Reason]) end, StopRet. -stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwName, Type), +stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwName, LisName, Type), esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl index 19bfc16ab..d8b4bfcff 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl @@ -89,21 +89,21 @@ on_gateway_unload(_Gateway = #{ name := GwName, %% Internal funcs %%-------------------------------------------------------------------- -start_listener(GwName, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - case start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) of + case start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> - ?ULOG("Start ~s:~s listener on ~s successfully.~n", - [GwName, Type, ListenOnStr]), + ?ULOG("Start ~s:~s:~s listener on ~s successfully.~n", + [GwName, Type, LisName, ListenOnStr]), Pid; {error, Reason} -> - ?ELOG("Failed to start ~s:~s listener on ~s: ~0p~n", - [GwName, Type, ListenOnStr, Reason]), + ?ELOG("Failed to start ~s:~s:~s listener on ~s: ~0p~n", + [GwName, Type, LisName, ListenOnStr, Reason]), throw({badconf, Reason}) end. -start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> - Name = name(GwName, Type), +start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> + Name = name(GwName, LisName, Type), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_stomp_frame, @@ -112,8 +112,8 @@ start_listener(GwName, Ctx, Type, ListenOn, SocketOpts, Cfg) -> esockd:open(Name, ListenOn, merge_default(SocketOpts), {emqx_gateway_conn, start_link, [NCfg]}). -name(GwName, Type) -> - list_to_atom(lists:concat([GwName, ":", Type])). +name(GwName, LisName, Type) -> + list_to_atom(lists:concat([GwName, ":", LisName, ":", Type])). merge_default(Options) -> Default = emqx_gateway_utils:default_tcp_options(), @@ -125,18 +125,18 @@ merge_default(Options) -> [{tcp_options, Default} | Options] end. -stop_listener(GwName, {Type, ListenOn, SocketOpts, Cfg}) -> - StopRet = stop_listener(GwName, Type, ListenOn, SocketOpts, Cfg), +stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> + StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), case StopRet of - ok -> ?ULOG("Stop ~s:~s listener on ~s successfully.~n", - [GwName, Type, ListenOnStr]); + ok -> ?ULOG("Stop ~s:~s:~s listener on ~s successfully.~n", + [GwName, Type, LisName, ListenOnStr]); {error, Reason} -> - ?ELOG("Failed to stop ~s:~s listener on ~s: ~0p~n", - [GwName, Type, ListenOnStr, Reason]) + ?ELOG("Failed to stop ~s:~s:~s listener on ~s: ~0p~n", + [GwName, Type, LisName, ListenOnStr, Reason]) end, StopRet. -stop_listener(GwName, Type, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwName, Type), +stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> + Name = name(GwName, LisName, Type), esockd:close(Name, ListenOn).