Merge pull request #4190 from zmstone/refactor-assing-names-to-listeners
feat(listeners): Add identifier to listeners
This commit is contained in:
commit
98b319a0c0
|
@ -55,7 +55,7 @@ start_listener({Proto, ListenOn, Opts}) ->
|
|||
io:format("Start coap:~s listener on ~s successfully.~n",
|
||||
[Proto, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to start coap:~s listener on ~s - ~0p~n!",
|
||||
io:format(standard_error, "Failed to start coap:~s listener on ~s: ~0p~n",
|
||||
[Proto, format(ListenOn), Reason]),
|
||||
error(Reason)
|
||||
end.
|
||||
|
@ -71,7 +71,7 @@ stop_listener({Proto, ListenOn, _Opts}) ->
|
|||
ok -> io:format("Stop coap:~s listener on ~s successfully.~n",
|
||||
[Proto, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to stop coap:~s listener on ~s - ~p~n.",
|
||||
io:format(standard_error, "Failed to stop coap:~s listener on ~s: ~0p~n.",
|
||||
[Proto, format(ListenOn), Reason])
|
||||
end,
|
||||
Ret.
|
||||
|
|
|
@ -69,7 +69,7 @@ start_connection_handler_instance({_Proto, _LisType, _ListenOn, Opts}) ->
|
|||
{ok, _ClientChannelPid} ->
|
||||
{_Proto, _LisType, _ListenOn, [{handler, Name} | LisOpts]};
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to start ~s's connection handler - ~0p~n!",
|
||||
io:format(standard_error, "Failed to start ~s's connection handler: ~0p~n",
|
||||
[Name, Reason]),
|
||||
error(Reason)
|
||||
end.
|
||||
|
@ -85,7 +85,7 @@ start_server({Name, Port, SSLOptions}) ->
|
|||
io:format("Start ~s gRPC server on ~w successfully.~n",
|
||||
[Name, Port]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to start ~s gRPC server on ~w - ~0p~n!",
|
||||
io:format(standard_error, "Failed to start ~s gRPC server on ~w: ~0p~n",
|
||||
[Name, Port, Reason]),
|
||||
error({failed_start_server, Reason})
|
||||
end.
|
||||
|
@ -101,7 +101,7 @@ start_listener({Proto, LisType, ListenOn, Opts}) ->
|
|||
io:format("Start ~s listener on ~s successfully.~n",
|
||||
[Name, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to start ~s listener on ~s - ~0p~n!",
|
||||
io:format(standard_error, "Failed to start ~s listener on ~s: ~0p~n",
|
||||
[Name, format(ListenOn), Reason]),
|
||||
error(Reason)
|
||||
end.
|
||||
|
@ -132,7 +132,7 @@ stop_listener({Proto, LisType, ListenOn, Opts}) ->
|
|||
io:format("Stop ~s listener on ~s successfully.~n",
|
||||
[Name, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to stop ~s listener on ~s - ~p~n.",
|
||||
io:format(standard_error, "Failed to stop ~s listener on ~s: ~0p~n",
|
||||
[Name, format(ListenOn), Reason])
|
||||
end,
|
||||
StopRet.
|
||||
|
|
|
@ -47,7 +47,7 @@ start_listener({Proto, ListenOn, Opts}) ->
|
|||
io:format("Start lwm2m:~s listener on ~s successfully.~n",
|
||||
[Proto, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to start lwm2m:~s listener on ~s - ~0p~n!",
|
||||
io:format(standard_error, "Failed to start lwm2m:~s listener on ~s: ~0p~n",
|
||||
[Proto, format(ListenOn), Reason]),
|
||||
error(Reason)
|
||||
end.
|
||||
|
@ -63,7 +63,7 @@ stop_listener({Proto, ListenOn, _Opts}) ->
|
|||
ok -> io:format("Stop lwm2m:~s listener on ~s successfully.~n",
|
||||
[Proto, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to stop lwm2m:~s listener on ~s - ~p~n.",
|
||||
io:format(standard_error, "Failed to stop lwm2m:~s listener on ~s: ~0p~n",
|
||||
[Proto, format(ListenOn), Reason])
|
||||
end,
|
||||
Ret.
|
||||
|
|
|
@ -71,7 +71,7 @@ start_listener({Proto, ListenOn, Options}) ->
|
|||
{ok, _} -> io:format("Start mqttsn:~s listener on ~s successfully.~n",
|
||||
[Proto, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to start mqttsn:~s listener on ~s - ~0p~n!",
|
||||
io:format(standard_error, "Failed to start mqttsn:~s listener on ~s: ~0p~n",
|
||||
[Proto, format(ListenOn), Reason]),
|
||||
error(Reason)
|
||||
end.
|
||||
|
@ -101,7 +101,7 @@ stop_listener({Proto, ListenOn, Opts}) ->
|
|||
ok -> io:format("Stop mqttsn:~s listener on ~s successfully.~n",
|
||||
[Proto, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to stop mqttsn:~s listener on ~s - ~p~n.",
|
||||
io:format(standard_error, "Failed to stop mqttsn:~s listener on ~s: ~0p~n",
|
||||
[Proto, format(ListenOn), Reason])
|
||||
end,
|
||||
StopRet.
|
||||
|
|
|
@ -73,7 +73,7 @@ start_listener({Proto, ListenOn, Options}) ->
|
|||
{ok, _} -> io:format("Start stomp:~s listener on ~s successfully.~n",
|
||||
[Proto, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to start stomp:~s listener on ~s - ~0p~n!",
|
||||
io:format(standard_error, "Failed to start stomp:~s listener on ~s: ~0p~n",
|
||||
[Proto, format(ListenOn), Reason]),
|
||||
error(Reason)
|
||||
end.
|
||||
|
@ -102,7 +102,7 @@ stop_listener({Proto, ListenOn, Opts}) ->
|
|||
ok -> io:format("Stop stomp:~s listener on ~s successfully.~n",
|
||||
[Proto, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to stop stomp:~s listener on ~s - ~p~n.",
|
||||
io:format(standard_error, "Failed to stop stomp:~s listener on ~s: ~0p~n",
|
||||
[Proto, format(ListenOn), Reason])
|
||||
end,
|
||||
StopRet.
|
||||
|
|
|
@ -47,7 +47,7 @@ main(Args) ->
|
|||
{true, pong} ->
|
||||
ok;
|
||||
{false, pong} ->
|
||||
io:format(standard_error, "Failed to connect to node ~p .\n", [TargetNode]),
|
||||
io:format(standard_error, "Failed to connect to node ~p\n", [TargetNode]),
|
||||
halt(1);
|
||||
{_, pang} ->
|
||||
io:format(standard_error, "Node ~p not responding to pings.\n", [TargetNode]),
|
||||
|
|
|
@ -541,6 +541,7 @@ list_listeners(Node) when Node =:= node() ->
|
|||
Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) ->
|
||||
#{protocol => Protocol,
|
||||
listen_on => ListenOn,
|
||||
identifier => emqx_listeners:find_id_by_listen_on(ListenOn),
|
||||
acceptors => esockd:get_acceptors({Protocol, ListenOn}),
|
||||
max_conns => esockd:get_max_connections({Protocol, ListenOn}),
|
||||
current_conns => esockd:get_current_connections({Protocol, ListenOn}),
|
||||
|
|
|
@ -157,7 +157,7 @@ cluster(["join", SNode]) ->
|
|||
ignore ->
|
||||
emqx_ctl:print("Ignore.~n");
|
||||
{error, Error} ->
|
||||
emqx_ctl:print("Failed to join the cluster: ~p~n", [Error])
|
||||
emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error])
|
||||
end;
|
||||
|
||||
cluster(["leave"]) ->
|
||||
|
@ -166,7 +166,7 @@ cluster(["leave"]) ->
|
|||
emqx_ctl:print("Leave the cluster successfully.~n"),
|
||||
cluster(["status"]);
|
||||
{error, Error} ->
|
||||
emqx_ctl:print("Failed to leave the cluster: ~p~n", [Error])
|
||||
emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error])
|
||||
end;
|
||||
|
||||
cluster(["force-leave", SNode]) ->
|
||||
|
@ -177,7 +177,7 @@ cluster(["force-leave", SNode]) ->
|
|||
ignore ->
|
||||
emqx_ctl:print("Ignore.~n");
|
||||
{error, Error} ->
|
||||
emqx_ctl:print("Failed to remove the node from cluster: ~p~n", [Error])
|
||||
emqx_ctl:print("Failed to remove the node from cluster: ~0p~n", [Error])
|
||||
end;
|
||||
|
||||
cluster(["status"]) ->
|
||||
|
@ -510,49 +510,64 @@ trace_off(Who, Name) ->
|
|||
|
||||
listeners([]) ->
|
||||
foreach(fun({{Protocol, ListenOn}, _Pid}) ->
|
||||
Info = [{acceptors, esockd:get_acceptors({Protocol, ListenOn})},
|
||||
Info = [{listen_on, {string, emqx_listeners:format_listen_on(ListenOn)}},
|
||||
{acceptors, esockd:get_acceptors({Protocol, ListenOn})},
|
||||
{max_conns, esockd:get_max_connections({Protocol, ListenOn})},
|
||||
{current_conn, esockd:get_current_connections({Protocol, ListenOn})},
|
||||
{shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})}],
|
||||
emqx_ctl:print("listener on ~s:~s~n", [Protocol, esockd:to_string(ListenOn)]),
|
||||
foreach(fun({Key, Val}) ->
|
||||
emqx_ctl:print(" ~-16s: ~w~n", [Key, Val])
|
||||
end, Info)
|
||||
{shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})}
|
||||
],
|
||||
emqx_ctl:print("~s~n", [listener_identifier(Protocol, ListenOn)]),
|
||||
foreach(fun indent_print/1, Info)
|
||||
end, esockd:listeners()),
|
||||
foreach(fun({Protocol, Opts}) ->
|
||||
Info = [{acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)},
|
||||
Port = proplists:get_value(port, Opts),
|
||||
Info = [{listen_on, {string, emqx_listeners:format_listen_on(Port)}},
|
||||
{acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)},
|
||||
{max_conns, proplists:get_value(max_connections, Opts)},
|
||||
{current_conn, proplists:get_value(all_connections, Opts)},
|
||||
{shutdown_count, []}],
|
||||
emqx_ctl:print("listener on ~s:~p~n", [Protocol, proplists:get_value(port, Opts)]),
|
||||
foreach(fun({Key, Val}) ->
|
||||
emqx_ctl:print(" ~-16s: ~w~n", [Key, Val])
|
||||
end, Info)
|
||||
emqx_ctl:print("~s~n", [listener_identifier(Protocol, Port)]),
|
||||
foreach(fun indent_print/1, Info)
|
||||
end, ranch:info());
|
||||
|
||||
listeners(["stop", Name = "http" ++ _N, ListenOn]) ->
|
||||
listeners(["stop", Name = "http" ++ _N | _MaybePort]) ->
|
||||
%% _MaybePort is to be backward compatible, to stop http listener, there is no need for the port number
|
||||
case minirest:stop_http(list_to_atom(Name)) of
|
||||
ok ->
|
||||
emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [Name, ListenOn]);
|
||||
emqx_ctl:print("Stop ~s listener successfully.~n", [Name]);
|
||||
{error, Error} ->
|
||||
emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [Name, ListenOn, Error])
|
||||
emqx_ctl:print("Failed to stop ~s listener: ~0p~n", [Name, Error])
|
||||
end;
|
||||
|
||||
listeners(["stop", Proto, ListenOn]) ->
|
||||
listeners(["stop", "mqtt:" ++ _ = Identifier]) ->
|
||||
stop_listener(emqx_listeners:find_by_id(Identifier), Identifier);
|
||||
|
||||
listeners(["stop", _Proto, ListenOn]) ->
|
||||
%% this clause is kept to be backward compatible
|
||||
ListenOn1 = case string:tokens(ListenOn, ":") of
|
||||
[Port] -> list_to_integer(Port);
|
||||
[IP, Port] -> {IP, list_to_integer(Port)}
|
||||
end,
|
||||
case emqx_listeners:stop_listener({list_to_atom(Proto), ListenOn1, []}) of
|
||||
ok ->
|
||||
emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [Proto, ListenOn]);
|
||||
{error, Error} ->
|
||||
emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error])
|
||||
end;
|
||||
stop_listener(emqx_listeners:find_by_listen_on(ListenOn1), ListenOn1);
|
||||
|
||||
listeners(_) ->
|
||||
emqx_ctl:usage([{"listeners", "List listeners"},
|
||||
{"listeners stop <Proto> <Port>", "Stop a listener"}]).
|
||||
{"listeners stop <Identifier>", "Stop a listener"},
|
||||
{"listeners stop <Proto> <Port>", "Stop a listener"}
|
||||
]).
|
||||
|
||||
stop_listener(false, Input) ->
|
||||
emqx_ctl:print("No such listener ~p~n", [Input]);
|
||||
stop_listener(#{listen_on := ListenOn} = Listener, _Input) ->
|
||||
ID = emqx_listeners:identifier(Listener),
|
||||
ListenOnStr = emqx_listeners:format_listen_on(ListenOn),
|
||||
case emqx_listeners:stop_listener(Listener) of
|
||||
ok ->
|
||||
emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [ID, ListenOnStr]);
|
||||
{error, Reason} ->
|
||||
emqx_ctl:print("Failed to stop ~s listener on ~s: ~0p~n",
|
||||
[ID, ListenOnStr, Reason])
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc data Command
|
||||
|
@ -707,3 +722,16 @@ format(_, Val) ->
|
|||
Val.
|
||||
|
||||
bin(S) -> iolist_to_binary(S).
|
||||
|
||||
indent_print({Key, {string, Val}}) ->
|
||||
emqx_ctl:print(" ~-16s: ~s~n", [Key, Val]);
|
||||
indent_print({Key, Val}) ->
|
||||
emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]).
|
||||
|
||||
listener_identifier(Protocol, ListenOn) ->
|
||||
case emqx_listeners:find_id_by_listen_on(ListenOn) of
|
||||
false ->
|
||||
"http" ++ _ = atom_to_list(Protocol); %% assert
|
||||
ID ->
|
||||
ID
|
||||
end.
|
||||
|
|
|
@ -49,7 +49,8 @@ groups() ->
|
|||
t_broker_cmd,
|
||||
t_router_cmd,
|
||||
t_subscriptions_cmd,
|
||||
t_listeners_cmd
|
||||
t_listeners_cmd_old,
|
||||
t_listeners_cmd_new
|
||||
]}].
|
||||
|
||||
apps() ->
|
||||
|
@ -275,12 +276,23 @@ t_subscriptions_cmd(_) ->
|
|||
?assertEqual(emqx_mgmt_cli:subscriptions(["del", "client", "b/b/c"]), "ok~n"),
|
||||
unmock_print().
|
||||
|
||||
t_listeners_cmd(_) ->
|
||||
t_listeners_cmd_old(_) ->
|
||||
ok = emqx_listeners:ensure_all_started(),
|
||||
mock_print(),
|
||||
?assertEqual(emqx_mgmt_cli:listeners([]), ok),
|
||||
?assertEqual(
|
||||
emqx_mgmt_cli:listeners(["stop", "wss", "8084"]),
|
||||
"Stop wss listener on 8084 successfully.\n"
|
||||
"Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n",
|
||||
emqx_mgmt_cli:listeners(["stop", "wss", "8084"])
|
||||
),
|
||||
unmock_print().
|
||||
|
||||
t_listeners_cmd_new(_) ->
|
||||
ok = emqx_listeners:ensure_all_started(),
|
||||
mock_print(),
|
||||
?assertEqual(emqx_mgmt_cli:listeners([]), ok),
|
||||
?assertEqual(
|
||||
"Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n",
|
||||
emqx_mgmt_cli:listeners(["stop", "mqtt:wss:external"])
|
||||
),
|
||||
unmock_print().
|
||||
|
||||
|
|
|
@ -2010,8 +2010,15 @@ end}.
|
|||
Other -> Other
|
||||
end
|
||||
end,
|
||||
[{Atom(Type), ListenOnN, [{deflate_options, DeflateOpts(Prefix)},
|
||||
{tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}]
|
||||
[#{ proto => Atom(Type)
|
||||
, name => Name
|
||||
, listen_on => ListenOnN
|
||||
, opts => [ {deflate_options, DeflateOpts(Prefix)}
|
||||
, {tcp_options, TcpOpts(Prefix)}
|
||||
| LisOpts(Prefix)
|
||||
]
|
||||
}
|
||||
]
|
||||
end,
|
||||
SslListeners = fun(Type, Name) ->
|
||||
Prefix = string:join(["listener", Type, Name], "."),
|
||||
|
@ -2019,9 +2026,16 @@ end}.
|
|||
undefined ->
|
||||
[];
|
||||
ListenOn ->
|
||||
[{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)},
|
||||
{tcp_options, TcpOpts(Prefix)},
|
||||
{ssl_options, SslOpts(Prefix)} | LisOpts(Prefix)]}]
|
||||
[#{ proto => Atom(Type)
|
||||
, name => Name
|
||||
, listen_on => ListenOn
|
||||
, opts => [ {deflate_options, DeflateOpts(Prefix)}
|
||||
, {tcp_options, TcpOpts(Prefix)}
|
||||
, {ssl_options, SslOpts(Prefix)}
|
||||
| LisOpts(Prefix)
|
||||
]
|
||||
}
|
||||
]
|
||||
end
|
||||
end,
|
||||
|
||||
|
|
|
@ -47,8 +47,8 @@ test_plugins() ->
|
|||
|
||||
test_deps() ->
|
||||
[ {bbmustache, "1.10.0"}
|
||||
, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.5"}}}
|
||||
, meck
|
||||
, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.6"}}}
|
||||
, meck
|
||||
].
|
||||
|
||||
default_compile_opts() ->
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
|
||||
%% APIs
|
||||
-export([ start/0
|
||||
, ensure_all_started/0
|
||||
, restart/0
|
||||
, stop/0
|
||||
]).
|
||||
|
@ -28,30 +29,88 @@
|
|||
-export([ start_listener/1
|
||||
, start_listener/3
|
||||
, stop_listener/1
|
||||
, stop_listener/3
|
||||
, restart_listener/1
|
||||
, restart_listener/3
|
||||
]).
|
||||
|
||||
-type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}).
|
||||
-export([ find_id_by_listen_on/1
|
||||
, find_by_listen_on/1
|
||||
, find_by_id/1
|
||||
, identifier/1
|
||||
, format_listen_on/1
|
||||
]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%--------------------------------------------------------------------
|
||||
-type(listener() :: #{ name := binary()
|
||||
, proto := esockd:proto()
|
||||
, listen_on := esockd:listen_on()
|
||||
, opts := [esockd:option()]
|
||||
}).
|
||||
|
||||
%% @doc Find listener identifier by listen-on.
|
||||
%% Return empty string (binary) if listener is not found in config.
|
||||
-spec(find_id_by_listen_on(esockd:listen_on()) -> binary() | false).
|
||||
find_id_by_listen_on(ListenOn) ->
|
||||
case find_by_listen_on(ListenOn) of
|
||||
false -> false;
|
||||
L -> identifier(L)
|
||||
end.
|
||||
|
||||
%% @doc Find listener by listen-on.
|
||||
%% Return 'false' if not found.
|
||||
-spec(find_by_listen_on(esockd:listen_on()) -> listener() | false).
|
||||
find_by_listen_on(ListenOn) ->
|
||||
find_by_listen_on(ListenOn, emqx:get_env(listeners, [])).
|
||||
|
||||
%% @doc Find listener by identifier.
|
||||
%% Return 'false' if not found.
|
||||
-spec(find_by_id(string() | binary()) -> listener() | false).
|
||||
find_by_id(Id) ->
|
||||
find_by_id(iolist_to_binary(Id), emqx:get_env(listeners, [])).
|
||||
|
||||
%% @doc Return the ID of the given listener.
|
||||
-spec identifier(listener()) -> binary().
|
||||
identifier(#{proto := Proto, name := Name}) ->
|
||||
identifier(Proto, Name).
|
||||
|
||||
%% @doc Start all listeners.
|
||||
-spec(start() -> ok).
|
||||
start() ->
|
||||
lists:foreach(fun start_listener/1, emqx:get_env(listeners, [])).
|
||||
|
||||
%% @doc Ensure all configured listeners are started.
|
||||
%% Raise exception if any of them failed to start.
|
||||
-spec(ensure_all_started() -> ok).
|
||||
ensure_all_started() ->
|
||||
ensure_all_started(emqx:get_env(listeners, []), []).
|
||||
|
||||
ensure_all_started([], []) -> ok;
|
||||
ensure_all_started([], Failed) -> error(Failed);
|
||||
ensure_all_started([L | Rest], Results) ->
|
||||
#{proto := Proto, listen_on := ListenOn, opts := Options} = L,
|
||||
NewResults =
|
||||
case start_listener(Proto, ListenOn, Options) of
|
||||
{ok, _Pid} ->
|
||||
Results;
|
||||
{error, {already_started, _Pid}} ->
|
||||
Results;
|
||||
{error, Reason} ->
|
||||
[{identifier(L), Reason} | Results]
|
||||
end,
|
||||
ensure_all_started(Rest, NewResults).
|
||||
|
||||
%% @doc Format address:port for logging.
|
||||
-spec(format_listen_on(esockd:listen_on()) -> binary()).
|
||||
format_listen_on(ListenOn) -> format(ListenOn).
|
||||
|
||||
-spec(start_listener(listener()) -> ok).
|
||||
start_listener({Proto, ListenOn, Options}) ->
|
||||
start_listener(#{proto := Proto, name := Name, listen_on := ListenOn, opts := Options}) ->
|
||||
ID = identifier(Proto, Name),
|
||||
case start_listener(Proto, ListenOn, Options) of
|
||||
{ok, _} -> io:format("Start mqtt:~s listener on ~s successfully.~n",
|
||||
[Proto, format(ListenOn)]);
|
||||
{ok, _} -> io:format("Start ~s listener on ~s successfully.~n",
|
||||
[ID, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~0p~n!",
|
||||
[Proto, format(ListenOn), Reason]),
|
||||
io:format(standard_error, "Failed to start mqtt listener ~s on ~s: ~0p~n",
|
||||
[ID, format(ListenOn), Reason]),
|
||||
error(Reason)
|
||||
end.
|
||||
|
||||
|
@ -115,7 +174,7 @@ restart() ->
|
|||
lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])).
|
||||
|
||||
-spec(restart_listener(listener()) -> any()).
|
||||
restart_listener({Proto, ListenOn, Options}) ->
|
||||
restart_listener(#{proto := Proto, listen_on := ListenOn, opts := Options}) ->
|
||||
restart_listener(Proto, ListenOn, Options).
|
||||
|
||||
-spec(restart_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) -> any()).
|
||||
|
@ -138,16 +197,8 @@ stop() ->
|
|||
lists:foreach(fun stop_listener/1, emqx:get_env(listeners, [])).
|
||||
|
||||
-spec(stop_listener(listener()) -> ok | {error, term()}).
|
||||
stop_listener({Proto, ListenOn, Opts}) ->
|
||||
StopRet = stop_listener(Proto, ListenOn, Opts),
|
||||
case StopRet of
|
||||
ok -> io:format("Stop mqtt:~s listener on ~s successfully.~n",
|
||||
[Proto, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p~n.",
|
||||
[Proto, format(ListenOn), Reason])
|
||||
end,
|
||||
StopRet.
|
||||
stop_listener(#{proto := Proto, listen_on := ListenOn, opts := Opts}) ->
|
||||
stop_listener(Proto, ListenOn, Opts).
|
||||
|
||||
-spec(stop_listener(esockd:proto(), esockd:listen_on(), [esockd:option()])
|
||||
-> ok | {error, term()}).
|
||||
|
@ -181,3 +232,19 @@ ws_name(Name, {_Addr, Port}) ->
|
|||
ws_name(Name, Port);
|
||||
ws_name(Name, Port) ->
|
||||
list_to_atom(lists:concat([Name, ":", Port])).
|
||||
|
||||
identifier(Proto, Name) when is_atom(Proto) ->
|
||||
identifier(atom_to_list(Proto), Name);
|
||||
identifier(Proto, Name) ->
|
||||
iolist_to_binary(["mqtt", ":", Proto, ":", Name]).
|
||||
|
||||
find_by_listen_on(_ListenOn, []) -> false;
|
||||
find_by_listen_on(ListenOn, [#{listen_on := ListenOn} = L | _]) -> L;
|
||||
find_by_listen_on(ListenOn, [_ | Rest]) -> find_by_listen_on(ListenOn, Rest).
|
||||
|
||||
find_by_id(_Id, []) -> false;
|
||||
find_by_id(Id, [L | Rest]) ->
|
||||
case identifier(L) =:= Id of
|
||||
true -> L;
|
||||
false -> find_by_id(Id, Rest)
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue