diff --git a/apps/emqx_gateway/etc/emqx_gateway.conf b/apps/emqx_gateway/etc/emqx_gateway.conf index 01fb9f316..9f558b761 100644 --- a/apps/emqx_gateway/etc/emqx_gateway.conf +++ b/apps/emqx_gateway/etc/emqx_gateway.conf @@ -43,6 +43,10 @@ gateway.stomp { max_connections = 1024000 max_conn_rate = 1000 + access_rules = [ + "allow all" + ] + ## TCP options ## See ${example_common_tcp_options} for more information tcp.active_n = 100 @@ -68,6 +72,16 @@ gateway.stomp { ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem" ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem" ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" + #ssl.verify = verify_none + #ssl.fail_if_no_peer_cert = false + #ssl.server_name_indication = disable + #ssl.secure_renegotiate = false + #ssl.reuse_sessions = false + #ssl.honor_cipher_order = false + #ssl.handshake_timeout = 15s + #ssl.depth = 10 + #ssl.password = foo + #ssl.dhfile = path-to-your-file } } @@ -116,6 +130,9 @@ gateway.coap { ## DTLS Options ## See #{example_common_dtls_options} for more information dtls.versions = ["dtlsv1"] + dtls.keyfile = "{{ platform_etc_dir }}/certs/key.pem" + dtls.certfile = "{{ platform_etc_dir }}/certs/cert.pem" + dtls.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" } } diff --git a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl index b0714f5e9..69015788a 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl @@ -99,7 +99,7 @@ start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> end. start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> - Name = name(GwName, LisName, Type), + Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_coap_frame, @@ -114,9 +114,6 @@ do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) -> do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) -> esockd:open_dtls(Name, ListenOn, SocketOpts, MFA). -name(GwName, LisName, Type) -> - list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])). - stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), @@ -130,5 +127,5 @@ stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> StopRet. stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwName, LisName, Type), + Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index 9c9398945..f264339a4 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -66,18 +66,21 @@ gateway_insta(get, #{bindings := #{name := Name0}}) -> Name = binary_to_existing_atom(Name0), case emqx_gateway:lookup(Name) of #{config := _Config} -> - %% FIXME: Got the parsed config, but we should return rawconfig to - %% frontend - RawConf = emqx_config:fill_defaults( - emqx_config:get_root_raw([<<"gateway">>]) - ), - {200, emqx_map_lib:deep_get([<<"gateway">>, Name0], RawConf)}; + GwCfs = filled_raw_confs([<<"gateway">>, Name0]), + NGwCfs = GwCfs#{<<"listeners">> => + emqx_gateway_http:mapping_listener_m2l( + Name0, maps:get(<<"listeners">>, GwCfs, #{}) + ) + }, + {200, NGwCfs}; undefined -> return_http_error(404, <<"Gateway not found">>) end; -gateway_insta(put, #{body := RawConfsIn, +gateway_insta(put, #{body := RawConfsIn0, bindings := #{name := Name} }) -> + RawConfsIn = maps:without([<<"authentication">>, + <<"listeners">>], RawConfsIn0), %% FIXME: Cluster Consistence ?? case emqx_gateway:update_rawconf(Name, RawConfsIn) of ok -> @@ -91,6 +94,12 @@ gateway_insta(put, #{body := RawConfsIn, gateway_insta_stats(get, _Req) -> return_http_error(401, <<"Implement it later (maybe 5.1)">>). +filled_raw_confs(Path) -> + RawConf = emqx_config:fill_defaults( + emqx_config:get_root_raw(Path) + ), + Confs = emqx_map_lib:deep_get(Path, RawConf), + emqx_map_lib:jsonable_map(Confs). %%-------------------------------------------------------------------- %% Swagger defines @@ -199,8 +208,13 @@ schema_gateway_overview_list() -> <<"enable">> => true, <<"enable_stats">> => true,<<"heartbeat">> => <<"30s">>, <<"idle_timeout">> => <<"30s">>, - <<"listeners">> => - #{<<"udp">> => #{<<"default">> => #{<<"bind">> => 5683}}}, + <<"listeners">> => [ + #{<<"id">> => <<"coap:udp:default">>, + <<"type">> => <<"udp">>, + <<"running">> => true, + <<"acceptors">> => 8,<<"bind">> => 5683, + <<"max_conn_rate">> => 1000, + <<"max_connections">> => 10240}], <<"mountpoint">> => <<>>,<<"notify_type">> => <<"qos">>, <<"publish_qos">> => <<"qos1">>, <<"subscribe_qos">> => <<"qos0">>} @@ -212,12 +226,13 @@ schema_gateway_overview_list() -> <<"handler">> => #{<<"address">> => <<"http://127.0.0.1:9001">>}, <<"idle_timeout">> => <<"30s">>, - <<"listeners">> => - #{<<"tcp">> => - #{<<"default">> => - #{<<"acceptors">> => 8,<<"bind">> => 7993, - <<"max_conn_rate">> => 1000, - <<"max_connections">> => 10240}}}, + <<"listeners">> => [ + #{<<"id">> => <<"exproto:tcp:default">>, + <<"type">> => <<"tcp">>, + <<"running">> => true, + <<"acceptors">> => 8,<<"bind">> => 7993, + <<"max_conn_rate">> => 1000, + <<"max_connections">> => 10240}], <<"mountpoint">> => <<>>, <<"server">> => #{<<"bind">> => 9100}} ). @@ -229,8 +244,11 @@ schema_gateway_overview_list() -> <<"idle_timeout">> => <<"30s">>, <<"lifetime_max">> => <<"86400s">>, <<"lifetime_min">> => <<"1s">>, - <<"listeners">> => - #{<<"udp">> => #{<<"default">> => #{<<"bind">> => 5783}}}, + <<"listeners">> => [ + #{<<"id">> => <<"lwm2m:udp:default">>, + <<"type">> => <<"udp">>, + <<"running">> => true, + <<"bind">> => 5783}], <<"mountpoint">> => <<"lwm2m/%e/">>, <<"qmode_time_windonw">> => 22, <<"translators">> => @@ -251,11 +269,12 @@ schema_gateway_overview_list() -> <<"enable">> => true, <<"enable_qos3">> => true,<<"enable_stats">> => true, <<"gateway_id">> => 1,<<"idle_timeout">> => <<"30s">>, - <<"listeners">> => - #{<<"udp">> => - #{<<"default">> => - #{<<"bind">> => 1884,<<"max_conn_rate">> => 1000, - <<"max_connections">> => 10240000}}}, + <<"listeners">> => [ + #{<<"id">> => <<"mqttsn:udp:default">>, + <<"type">> => <<"udp">>, + <<"running">> => true, + <<"bind">> => 1884,<<"max_conn_rate">> => 1000, + <<"max_connections">> => 10240000}], <<"mountpoint">> => <<>>, <<"predefined">> => [#{<<"id">> => 1, @@ -279,12 +298,13 @@ schema_gateway_overview_list() -> #{<<"max_body_length">> => 8192,<<"max_headers">> => 10, <<"max_headers_length">> => 1024}, <<"idle_timeout">> => <<"30s">>, - <<"listeners">> => - #{<<"tcp">> => - #{<<"default">> => - #{<<"acceptors">> => 16,<<"active_n">> => 100, - <<"bind">> => 61613,<<"max_conn_rate">> => 1000, - <<"max_connections">> => 1024000}}}, + <<"listeners">> => [ + #{<<"id">> => <<"stomp:tcp:default">>, + <<"type">> => <<"tcp">>, + <<"running">> => true, + <<"acceptors">> => 16,<<"active_n">> => 100, + <<"bind">> => 61613,<<"max_conn_rate">> => 1000, + <<"max_connections">> => 1024000}], <<"mountpoint">> => <<>>} ). @@ -312,10 +332,12 @@ schema_gateway_stats() -> properties_gateway_overview() -> ListenerProps = - [ {name, string, - <<"Listener Name">>} - , {status, string, - <<"Listener Status">>, [<<"activing">>, <<"inactived">>]} + [ {id, string, + <<"Listener ID">>} + , {running, boolean, + <<"Listener Running status">>} + , {type, string, + <<"Listener Type">>, [<<"tcp">>, <<"ssl">>, <<"udp">>, <<"dtls">>]} ], emqx_mgmt_util:properties( [ {name, string, @@ -323,9 +345,13 @@ properties_gateway_overview() -> , {status, string, <<"Gateway Status">>, [<<"running">>, <<"stopped">>, <<"unloaded">>]} + , {created_at, string, + <<>>} , {started_at, string, <<>>} - , {max_connection, integer, <<>>} - , {current_connection, integer, <<>>} + , {stopped_at, string, + <<>>} + , {max_connections, integer, <<>>} + , {current_connections, integer, <<>>} , {listeners, {array, object}, ListenerProps} ]). diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 2aa6b4b3d..f233a6151 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -24,6 +24,12 @@ -export([ gateways/1 ]). +%% Mgmt APIs - listeners +-export([ listeners/1 + , listener/2 + , mapping_listener_m2l/2 + ]). + %% Mgmt APIs - clients -export([ lookup_client/3 , lookup_client/4 @@ -42,8 +48,8 @@ #{ name := binary() , status := running | stopped | unloaded , started_at => binary() - , max_connection => integer() - , current_connect => integer() + , max_connections => integer() + , current_connections => integer() , listeners => [] }. @@ -68,8 +74,10 @@ gateways(Status) -> created_at, started_at, stopped_at], GwInfo0), - GwInfo1#{listeners => get_listeners_status(GwName, Config)} - + GwInfo1#{ + max_connections => max_connections_count(Config), + current_connections => current_connections_count(GwName), + listeners => get_listeners_status(GwName, Config)} end end, emqx_gateway_registry:list()), case Status of @@ -78,24 +86,78 @@ gateways(Status) -> [Gw || Gw = #{status := S} <- Gateways, S == Status] end. +%% @private +max_connections_count(Config) -> + Listeners = emqx_gateway_utils:normalize_config(Config), + lists:foldl(fun({_, _, _, SocketOpts, _}, Acc) -> + Acc + proplists:get_value(max_connections, SocketOpts, 0) + end, 0, Listeners). + +%% @private +current_connections_count(GwName) -> + try + InfoTab = emqx_gateway_cm:tabname(info, GwName), + ets:info(InfoTab, size) + catch _ : _ -> + 0 + end. + %% @private get_listeners_status(GwName, Config) -> Listeners = emqx_gateway_utils:normalize_config(Config), lists:map(fun({Type, LisName, ListenOn, _, _}) -> - Name0 = listener_name(GwName, Type, LisName), + Name0 = emqx_gateway_utils:listener_id(GwName, Type, LisName), Name = {Name0, ListenOn}, + LisO = #{id => Name0, type => Type}, case catch esockd:listener(Name) of _Pid when is_pid(_Pid) -> - #{Name0 => <<"activing">>}; + LisO#{running => true}; _ -> - #{Name0 => <<"inactived">>} - + LisO#{running => false} end end, Listeners). -%% @private -listener_name(GwName, Type, LisName) -> - list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])). +%%-------------------------------------------------------------------- +%% Mgmt APIs - listeners +%%-------------------------------------------------------------------- + +listeners(GwName) when is_atom (GwName) -> + listeners(atom_to_binary(GwName)); +listeners(GwName) -> + RawConf = emqx_config:fill_defaults( + emqx_config:get_root_raw([<<"gateway">>])), + Listeners = emqx_map_lib:jsonable_map( + emqx_map_lib:deep_get( + [<<"gateway">>, GwName, <<"listeners">>], RawConf)), + mapping_listener_m2l(GwName, Listeners). + +listener(_GwName, _ListenerId) -> + ok. + +mapping_listener_m2l(GwName, Listeners0) -> + Listeners = maps:to_list(Listeners0), + lists:append([listener(GwName, Type, maps:to_list(Conf)) + || {Type, Conf} <- Listeners]). + +listener(GwName, Type, Conf) -> + [begin + ListenerId = emqx_gateway_utils:listener_id(GwName, Type, LName), + Running = is_running(ListenerId, LConf), + LConf#{ + id => ListenerId, + type => Type, + running => Running + } + end || {LName, LConf} <- Conf, is_map(LConf)]. + +is_running(ListenerId, #{<<"bind">> := ListenOn0}) -> + ListenOn = emqx_gateway_utils:parse_listenon(ListenOn0), + try esockd:listener({ListenerId, ListenOn}) of + Pid when is_pid(Pid)-> + true + catch _:_ -> + false + end. %%-------------------------------------------------------------------- %% Mgmt APIs - clients @@ -145,7 +207,7 @@ list_client_subscriptions(GwName, ClientId) -> with_channel(GwName, ClientId, fun(Pid) -> Subs = emqx_gateway_conn:call( - Pid, + Pid, subscriptions, ?DEFAULT_CALL_TIMEOUT), {ok, lists:map(fun({Topic, SubOpts}) -> SubOpts#{topic => Topic} diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 8c3663358..41840c7f7 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -204,7 +204,6 @@ fields(udp_opts) -> fields(dtls_listener_ssl_opts) -> Base = emqx_schema:fields("listener_ssl_opts"), - %% XXX: ciphers ??? DtlsVers = hoconsc:mk( typerefl:alias("string", list(atom())), #{ default => default_dtls_vsns(), @@ -212,12 +211,41 @@ fields(dtls_listener_ssl_opts) -> [dtls_vsn(iolist_to_binary(V)) || V <- Vsns] end }), - lists:keyreplace("versions", 1, Base, {"versions", DtlsVers}); + Ciphers = sc(hoconsc:array(string()), default_ciphers()), + lists:keydelete( + "handshake_timeout", 1, + lists:keyreplace( + "ciphers", 1, + lists:keyreplace("versions", 1, Base, {"versions", DtlsVers}), + {"ciphers", Ciphers} + ) + ); fields(ExtraField) -> Mod = list_to_atom(ExtraField++"_schema"), Mod:fields(ExtraField). +default_ciphers() -> + ["ECDHE-ECDSA-AES256-GCM-SHA384", + "ECDHE-RSA-AES256-GCM-SHA384", "ECDHE-ECDSA-AES256-SHA384", "ECDHE-RSA-AES256-SHA384", + "ECDHE-ECDSA-DES-CBC3-SHA", "ECDH-ECDSA-AES256-GCM-SHA384", "ECDH-RSA-AES256-GCM-SHA384", + "ECDH-ECDSA-AES256-SHA384", "ECDH-RSA-AES256-SHA384", "DHE-DSS-AES256-GCM-SHA384", + "DHE-DSS-AES256-SHA256", "AES256-GCM-SHA384", "AES256-SHA256", + "ECDHE-ECDSA-AES128-GCM-SHA256", "ECDHE-RSA-AES128-GCM-SHA256", + "ECDHE-ECDSA-AES128-SHA256", "ECDHE-RSA-AES128-SHA256", "ECDH-ECDSA-AES128-GCM-SHA256", + "ECDH-RSA-AES128-GCM-SHA256", "ECDH-ECDSA-AES128-SHA256", "ECDH-RSA-AES128-SHA256", + "DHE-DSS-AES128-GCM-SHA256", "DHE-DSS-AES128-SHA256", "AES128-GCM-SHA256", "AES128-SHA256", + "ECDHE-ECDSA-AES256-SHA", "ECDHE-RSA-AES256-SHA", "DHE-DSS-AES256-SHA", + "ECDH-ECDSA-AES256-SHA", "ECDH-RSA-AES256-SHA", "AES256-SHA", "ECDHE-ECDSA-AES128-SHA", + "ECDHE-RSA-AES128-SHA", "DHE-DSS-AES128-SHA", "ECDH-ECDSA-AES128-SHA", + "ECDH-RSA-AES128-SHA", "AES128-SHA" + ] ++ psk_ciphers(). + +psk_ciphers() -> + ["PSK-AES128-CBC-SHA", "PSK-AES256-CBC-SHA", + "PSK-3DES-EDE-CBC-SHA", "PSK-RC4-SHA" + ]. + % authentication() -> % hoconsc:union( % [ undefined @@ -242,7 +270,7 @@ gateway_common_options() -> [ {enable, sc(boolean(), true)} , {enable_stats, sc(boolean(), true)} , {idle_timeout, sc(duration(), <<"30s">>)} - , {mountpoint, sc(binary())} + , {mountpoint, sc(binary(), undefined)} , {clientinfo_override, sc(ref(clientinfo_override))} , {authentication, sc(hoconsc:lazy(map()))} ]. @@ -254,6 +282,7 @@ common_listener_opts() -> , {max_connections, sc(integer(), 1024)} , {max_conn_rate, sc(integer())} %, {rate_limit, sc(comma_separated_list())} + , {mountpoint, sc(binary(), undefined)} , {access_rules, sc(hoconsc:array(string()), [])} ]. diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 27e64bed3..4f19db23b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -28,8 +28,11 @@ -export([ apply/2 , format_listenon/1 + , parse_listenon/1 , unix_ts_to_rfc3339/1 , unix_ts_to_rfc3339/2 + , listener_id/3 + , parse_listener_id/1 ]). -export([ stringfy/1 @@ -112,6 +115,38 @@ format_listenon({Addr, Port}) when is_list(Addr) -> format_listenon({Addr, Port}) when is_tuple(Addr) -> io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). +parse_listenon(Port) when is_integer(Port) -> + Port; +parse_listenon(Str) when is_binary(Str) -> + parse_listenon(binary_to_list(Str)); +parse_listenon(Str) when is_list(Str) -> + case emqx_schema:to_ip_port(Str) of + {ok, R} -> R; + {error, _} -> + error({invalid_listenon_name, Str}) + end. + +listener_id(GwName, Type, LisName) -> + binary_to_atom( + <<(bin(GwName))/binary, ":", + (bin(Type))/binary, ":", + (bin(LisName))/binary + >>). + +parse_listener_id(Id) -> + try + [GwName, Type, Name] = binary:split(bin(Id), <<":">>, [global]), + {binary_to_existing_atom(GwName), binary_to_existing_atom(Type), + binary_to_atom(Name)} + catch + _ : _ -> error({invalid_listener_id, Id}) + end. + +bin(A) when is_atom(A) -> + atom_to_binary(A); +bin(L) when is_list(L); is_binary(L) -> + iolist_to_binary(L). + unix_ts_to_rfc3339(Keys, Map) when is_list(Keys) -> lists:foldl(fun(K, Acc) -> unix_ts_to_rfc3339(K, Acc) end, Map, Keys); unix_ts_to_rfc3339(Key, Map) -> diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index 3b62ecd20..a9c0ee5fd 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -153,7 +153,7 @@ start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> end. start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> - Name = name(GwName, LisName, Type), + Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_exproto_frame, @@ -172,9 +172,6 @@ do_start_listener(udp, Name, ListenOn, Opts, MFA) -> do_start_listener(dtls, Name, ListenOn, Opts, MFA) -> esockd:open_dtls(Name, ListenOn, Opts, MFA). -name(GwName, LisName, Type) -> - list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])). - merge_default_by_type(Type, Options) when Type =:= tcp; Type =:= ssl -> Default = emqx_gateway_utils:default_tcp_options(), @@ -209,5 +206,5 @@ stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> StopRet. stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwName, LisName, Type), + Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl index e6720905b..5e1e9a70d 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_impl.erl @@ -100,7 +100,7 @@ start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> end. start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> - Name = name(GwName, LisName, udp), + Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), NCfg = Cfg#{ ctx => Ctx , frame_mod => emqx_coap_frame , chann_mod => emqx_lwm2m_channel @@ -119,16 +119,12 @@ merge_default(Options) -> [{udp_options, Default} | Options] end. -name(GwName, LisName, Type) -> - list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])). - do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) -> esockd:open_udp(Name, ListenOn, SocketOpts, MFA); do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) -> esockd:open_dtls(Name, ListenOn, SocketOpts, MFA). - stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> StopRet = stop_listener(GwName, Type, LisName, ListenOn, SocketOpts, Cfg), ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), @@ -142,5 +138,5 @@ stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> StopRet. stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwName, LisName, Type), + Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl index f510afdf9..8e64a41d1 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -118,7 +118,7 @@ start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> end. start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> - Name = name(GwName, LisName, Type), + Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_sn_frame, @@ -127,9 +127,6 @@ start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> esockd:open_udp(Name, ListenOn, merge_default(SocketOpts), {emqx_gateway_conn, start_link, [NCfg]}). -name(GwName, LisName, Type) -> - list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])). - merge_default(Options) -> Default = emqx_gateway_utils:default_udp_options(), case lists:keytake(udp_options, 1, Options) of @@ -153,5 +150,5 @@ stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> StopRet. stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwName, LisName, Type), + Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), esockd:close(Name, ListenOn). diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl index 2175b767b..96c7b7a3b 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl @@ -103,7 +103,7 @@ start_listener(GwName, Ctx, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> end. start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> - Name = name(GwName, LisName, Type), + Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), NCfg = Cfg#{ ctx => Ctx, frame_mod => emqx_stomp_frame, @@ -112,9 +112,6 @@ start_listener(GwName, Ctx, Type, LisName, ListenOn, SocketOpts, Cfg) -> esockd:open(Name, ListenOn, merge_default(SocketOpts), {emqx_gateway_conn, start_link, [NCfg]}). -name(GwName, LisName, Type) -> - list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])). - merge_default(Options) -> Default = emqx_gateway_utils:default_tcp_options(), case lists:keytake(tcp_options, 1, Options) of @@ -138,5 +135,5 @@ stop_listener(GwName, {Type, LisName, ListenOn, SocketOpts, Cfg}) -> StopRet. stop_listener(GwName, Type, LisName, ListenOn, _SocketOpts, _Cfg) -> - Name = name(GwName, LisName, Type), + Name = emqx_gateway_utils:listener_id(GwName, Type, LisName), esockd:close(Name, ListenOn).