Merge remote-tracking branch 'origin/dev/v4.3.0' into dev/v5.0

This commit is contained in:
Zaiming Shi 2021-02-19 21:13:33 +01:00
commit c2cd2fd231
150 changed files with 622 additions and 303 deletions

View File

@ -106,7 +106,7 @@ load_hooks() ->
ok = emqx_auth_http:register_metrics(), ok = emqx_auth_http:register_metrics(),
PoolOpts = proplists:get_value(pool_opts, AuthReq), PoolOpts = proplists:get_value(pool_opts, AuthReq),
PoolName = proplists:get_value(pool_name, AuthReq), PoolName = proplists:get_value(pool_name, AuthReq),
ehttpc_sup:start_pool(PoolName, PoolOpts), {ok, _} = ehttpc_sup:start_pool(PoolName, PoolOpts),
case application:get_env(?APP, super_req) of case application:get_env(?APP, super_req) of
undefined -> undefined ->
emqx:hook('client.authenticate', {emqx_auth_http, check, [#{auth => maps:from_list(AuthReq), emqx:hook('client.authenticate', {emqx_auth_http, check, [#{auth => maps:from_list(AuthReq),
@ -114,7 +114,7 @@ load_hooks() ->
{ok, SuperReq} -> {ok, SuperReq} ->
PoolOpts1 = proplists:get_value(pool_opts, SuperReq), PoolOpts1 = proplists:get_value(pool_opts, SuperReq),
PoolName1 = proplists:get_value(pool_name, SuperReq), PoolName1 = proplists:get_value(pool_name, SuperReq),
ehttpc_sup:start_pool(PoolName1, PoolOpts1), {ok, _} = ehttpc_sup:start_pool(PoolName1, PoolOpts1),
emqx:hook('client.authenticate', {emqx_auth_http, check, [#{auth => maps:from_list(AuthReq), emqx:hook('client.authenticate', {emqx_auth_http, check, [#{auth => maps:from_list(AuthReq),
super => maps:from_list(SuperReq)}]}) super => maps:from_list(SuperReq)}]})
end end
@ -125,7 +125,7 @@ load_hooks() ->
ok = emqx_acl_http:register_metrics(), ok = emqx_acl_http:register_metrics(),
PoolOpts2 = proplists:get_value(pool_opts, ACLReq), PoolOpts2 = proplists:get_value(pool_opts, ACLReq),
PoolName2 = proplists:get_value(pool_name, ACLReq), PoolName2 = proplists:get_value(pool_name, ACLReq),
ehttpc_sup:start_pool(PoolName2, PoolOpts2), {ok, _} = ehttpc_sup:start_pool(PoolName2, PoolOpts2),
emqx:hook('client.check_acl', {emqx_acl_http, check_acl, [#{acl => maps:from_list(ACLReq)}]}) emqx:hook('client.check_acl', {emqx_acl_http, check_acl, [#{acl => maps:from_list(ACLReq)}]})
end, end,
ok. ok.
@ -133,9 +133,9 @@ load_hooks() ->
unload_hooks() -> unload_hooks() ->
emqx:unhook('client.authenticate', {emqx_auth_http, check}), emqx:unhook('client.authenticate', {emqx_auth_http, check}),
emqx:unhook('client.check_acl', {emqx_acl_http, check_acl}), emqx:unhook('client.check_acl', {emqx_acl_http, check_acl}),
ehttpc_sup:stop_pool('emqx_auth_http/auth_req'), _ = ehttpc_sup:stop_pool('emqx_auth_http/auth_req'),
ehttpc_sup:stop_pool('emqx_auth_http/super_req'), _ = ehttpc_sup:stop_pool('emqx_auth_http/super_req'),
ehttpc_sup:stop_pool('emqx_auth_http/acl_req'), _ = ehttpc_sup:stop_pool('emqx_auth_http/acl_req'),
ok. ok.
parse_host(Host) -> parse_host(Host) ->

View File

@ -129,7 +129,7 @@ add(_Bindings, Params) ->
case Re of case Re of
#{result := ok} -> return({ok, Re}); #{result := ok} -> return({ok, Re});
#{result := <<"ok">>} -> return({ok, Re}); #{result := <<"ok">>} -> return({ok, Re});
_ -> return({error, Re}) _ -> return({error, {add, Re}})
end end
end. end.

View File

@ -59,12 +59,12 @@ insert_user(User = #emqx_user{login = Login}) ->
%% @doc Update User %% @doc Update User
-spec(update_user(tuple(), binary()) -> ok | {error, any()}). -spec(update_user(tuple(), binary()) -> ok | {error, any()}).
update_user(Login, NewPassword) -> update_user(Login, NewPassword) ->
User = #emqx_user{login = Login, password = encrypted_data(NewPassword)}, ret(mnesia:transaction(fun do_update_user/2, [Login, encrypted_data(NewPassword)])).
ret(mnesia:transaction(fun do_update_user/1, [User])).
do_update_user(User = #emqx_user{login = Login}) -> do_update_user(Login, NewPassword) ->
case mnesia:read(?TABLE, Login) of case mnesia:read(?TABLE, Login) of
[{?TABLE, Login, _, CreateAt}] -> mnesia:write(User#emqx_user{created_at = CreateAt}); [#emqx_user{} = User] ->
mnesia:write(User#emqx_user{password = NewPassword});
[] -> mnesia:abort(noexisted) [] -> mnesia:abort(noexisted)
end. end.
@ -119,7 +119,7 @@ hash(Password, SaltBin, HashType) ->
emqx_passwd:hash(HashType, <<SaltBin/binary, Password/binary>>). emqx_passwd:hash(HashType, <<SaltBin/binary, Password/binary>>).
salt() -> salt() ->
rand:seed(exsplus, erlang:timestamp()), {_AlgHandler, _AlgState} = rand:seed(exsplus, erlang:timestamp()),
Salt = rand:uniform(16#ffffffff), <<Salt:32>>. Salt = rand:uniform(16#ffffffff), <<Salt:32>>.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -55,7 +55,7 @@ start_listener({Proto, ListenOn, Opts}) ->
io:format("Start coap:~s listener on ~s successfully.~n", io:format("Start coap:~s listener on ~s successfully.~n",
[Proto, format(ListenOn)]); [Proto, format(ListenOn)]);
{error, Reason} -> {error, Reason} ->
io:format(standard_error, "Failed to start coap:~s listener on ~s - ~0p~n!", io:format(standard_error, "Failed to start coap:~s listener on ~s: ~0p~n",
[Proto, format(ListenOn), Reason]), [Proto, format(ListenOn), Reason]),
error(Reason) error(Reason)
end. end.
@ -71,7 +71,7 @@ stop_listener({Proto, ListenOn, _Opts}) ->
ok -> io:format("Stop coap:~s listener on ~s successfully.~n", ok -> io:format("Stop coap:~s listener on ~s successfully.~n",
[Proto, format(ListenOn)]); [Proto, format(ListenOn)]);
{error, Reason} -> {error, Reason} ->
io:format(standard_error, "Failed to stop coap:~s listener on ~s - ~p~n.", io:format(standard_error, "Failed to stop coap:~s listener on ~s: ~0p~n.",
[Proto, format(ListenOn), Reason]) [Proto, format(ListenOn), Reason])
end, end,
Ret. Ret.

View File

@ -69,7 +69,7 @@ start_connection_handler_instance({_Proto, _LisType, _ListenOn, Opts}) ->
{ok, _ClientChannelPid} -> {ok, _ClientChannelPid} ->
{_Proto, _LisType, _ListenOn, [{handler, Name} | LisOpts]}; {_Proto, _LisType, _ListenOn, [{handler, Name} | LisOpts]};
{error, Reason} -> {error, Reason} ->
io:format(standard_error, "Failed to start ~s's connection handler - ~0p~n!", io:format(standard_error, "Failed to start ~s's connection handler: ~0p~n",
[Name, Reason]), [Name, Reason]),
error(Reason) error(Reason)
end. end.
@ -85,7 +85,7 @@ start_server({Name, Port, SSLOptions}) ->
io:format("Start ~s gRPC server on ~w successfully.~n", io:format("Start ~s gRPC server on ~w successfully.~n",
[Name, Port]); [Name, Port]);
{error, Reason} -> {error, Reason} ->
io:format(standard_error, "Failed to start ~s gRPC server on ~w - ~0p~n!", io:format(standard_error, "Failed to start ~s gRPC server on ~w: ~0p~n",
[Name, Port, Reason]), [Name, Port, Reason]),
error({failed_start_server, Reason}) error({failed_start_server, Reason})
end. end.
@ -101,7 +101,7 @@ start_listener({Proto, LisType, ListenOn, Opts}) ->
io:format("Start ~s listener on ~s successfully.~n", io:format("Start ~s listener on ~s successfully.~n",
[Name, format(ListenOn)]); [Name, format(ListenOn)]);
{error, Reason} -> {error, Reason} ->
io:format(standard_error, "Failed to start ~s listener on ~s - ~0p~n!", io:format(standard_error, "Failed to start ~s listener on ~s: ~0p~n",
[Name, format(ListenOn), Reason]), [Name, format(ListenOn), Reason]),
error(Reason) error(Reason)
end. end.
@ -132,7 +132,7 @@ stop_listener({Proto, LisType, ListenOn, Opts}) ->
io:format("Stop ~s listener on ~s successfully.~n", io:format("Stop ~s listener on ~s successfully.~n",
[Name, format(ListenOn)]); [Name, format(ListenOn)]);
{error, Reason} -> {error, Reason} ->
io:format(standard_error, "Failed to stop ~s listener on ~s - ~p~n.", io:format(standard_error, "Failed to stop ~s listener on ~s: ~0p~n",
[Name, format(ListenOn), Reason]) [Name, format(ListenOn), Reason])
end, end,
StopRet. StopRet.

View File

@ -47,7 +47,7 @@ start_listener({Proto, ListenOn, Opts}) ->
io:format("Start lwm2m:~s listener on ~s successfully.~n", io:format("Start lwm2m:~s listener on ~s successfully.~n",
[Proto, format(ListenOn)]); [Proto, format(ListenOn)]);
{error, Reason} -> {error, Reason} ->
io:format(standard_error, "Failed to start lwm2m:~s listener on ~s - ~0p~n!", io:format(standard_error, "Failed to start lwm2m:~s listener on ~s: ~0p~n",
[Proto, format(ListenOn), Reason]), [Proto, format(ListenOn), Reason]),
error(Reason) error(Reason)
end. end.
@ -63,7 +63,7 @@ stop_listener({Proto, ListenOn, _Opts}) ->
ok -> io:format("Stop lwm2m:~s listener on ~s successfully.~n", ok -> io:format("Stop lwm2m:~s listener on ~s successfully.~n",
[Proto, format(ListenOn)]); [Proto, format(ListenOn)]);
{error, Reason} -> {error, Reason} ->
io:format(standard_error, "Failed to stop lwm2m:~s listener on ~s - ~p~n.", io:format(standard_error, "Failed to stop lwm2m:~s listener on ~s: ~0p~n",
[Proto, format(ListenOn), Reason]) [Proto, format(ListenOn), Reason])
end, end,
Ret. Ret.

View File

@ -71,7 +71,7 @@ start_listener({Proto, ListenOn, Options}) ->
{ok, _} -> io:format("Start mqttsn:~s listener on ~s successfully.~n", {ok, _} -> io:format("Start mqttsn:~s listener on ~s successfully.~n",
[Proto, format(ListenOn)]); [Proto, format(ListenOn)]);
{error, Reason} -> {error, Reason} ->
io:format(standard_error, "Failed to start mqttsn:~s listener on ~s - ~0p~n!", io:format(standard_error, "Failed to start mqttsn:~s listener on ~s: ~0p~n",
[Proto, format(ListenOn), Reason]), [Proto, format(ListenOn), Reason]),
error(Reason) error(Reason)
end. end.
@ -101,7 +101,7 @@ stop_listener({Proto, ListenOn, Opts}) ->
ok -> io:format("Stop mqttsn:~s listener on ~s successfully.~n", ok -> io:format("Stop mqttsn:~s listener on ~s successfully.~n",
[Proto, format(ListenOn)]); [Proto, format(ListenOn)]);
{error, Reason} -> {error, Reason} ->
io:format(standard_error, "Failed to stop mqttsn:~s listener on ~s - ~p~n.", io:format(standard_error, "Failed to stop mqttsn:~s listener on ~s: ~0p~n",
[Proto, format(ListenOn), Reason]) [Proto, format(ListenOn), Reason])
end, end,
StopRet. StopRet.

View File

@ -5,4 +5,4 @@
{platform_etc_dir, "etc"}. {platform_etc_dir, "etc"}.
{platform_lib_dir, "lib"}. {platform_lib_dir, "lib"}.
{platform_log_dir, "log"}. {platform_log_dir, "log"}.
{platform_plugins_dir, "plugins"}. {platform_plugins_dir, "plugins"}.

View File

@ -73,7 +73,7 @@ start_listener({Proto, ListenOn, Options}) ->
{ok, _} -> io:format("Start stomp:~s listener on ~s successfully.~n", {ok, _} -> io:format("Start stomp:~s listener on ~s successfully.~n",
[Proto, format(ListenOn)]); [Proto, format(ListenOn)]);
{error, Reason} -> {error, Reason} ->
io:format(standard_error, "Failed to start stomp:~s listener on ~s - ~0p~n!", io:format(standard_error, "Failed to start stomp:~s listener on ~s: ~0p~n",
[Proto, format(ListenOn), Reason]), [Proto, format(ListenOn), Reason]),
error(Reason) error(Reason)
end. end.
@ -102,7 +102,7 @@ stop_listener({Proto, ListenOn, Opts}) ->
ok -> io:format("Stop stomp:~s listener on ~s successfully.~n", ok -> io:format("Stop stomp:~s listener on ~s successfully.~n",
[Proto, format(ListenOn)]); [Proto, format(ListenOn)]);
{error, Reason} -> {error, Reason} ->
io:format(standard_error, "Failed to stop stomp:~s listener on ~s - ~p~n.", io:format(standard_error, "Failed to stop stomp:~s listener on ~s: ~0p~n",
[Proto, format(ListenOn), Reason]) [Proto, format(ListenOn), Reason])
end, end,
StopRet. StopRet.

View File

@ -47,7 +47,7 @@ main(Args) ->
{true, pong} -> {true, pong} ->
ok; ok;
{false, pong} -> {false, pong} ->
io:format(standard_error, "Failed to connect to node ~p .\n", [TargetNode]), io:format(standard_error, "Failed to connect to node ~p\n", [TargetNode]),
halt(1); halt(1);
{_, pang} -> {_, pang} ->
io:format(standard_error, "Node ~p not responding to pings.\n", [TargetNode]), io:format(standard_error, "Node ~p not responding to pings.\n", [TargetNode]),

View File

@ -5,7 +5,7 @@
[ [
{config, {config,
[ [
#{dirs => ["src", "apps/**/src", "lib-opensource/**/src"], #{dirs => ["src", "apps/**/src", "lib-ce/**/src"],
filter => "*.erl", filter => "*.erl",
ruleset => erl_files, ruleset => erl_files,
rules => [ rules => [
@ -16,7 +16,7 @@
]}} ]}}
] ]
}, },
#{dirs => ["test", "apps/**/test", "lib-opensource/**/src"], #{dirs => ["test", "apps/**/test", "lib-ce/**/src"],
filter => "*.erl", filter => "*.erl",
rules => [ rules => [
{elvis_text_style, line_length, #{ limit => 100 {elvis_text_style, line_length, #{ limit => 100

View File

@ -10,9 +10,9 @@ cd -P -- "$(dirname -- "$0")"
DOWNLOAD_URL='https://github.com/emqx/emqx-dashboard-frontend/releases/download' DOWNLOAD_URL='https://github.com/emqx/emqx-dashboard-frontend/releases/download'
if [ "$EMQX_ENTERPRISE" = 'true' ] || [ "$EMQX_ENTERPRISE" == '1' ]; then if [ "$EMQX_ENTERPRISE" = 'true' ] || [ "$EMQX_ENTERPRISE" == '1' ]; then
DASHBOARD_PATH='lib-enterprise/emqx_dashboard/priv' DASHBOARD_PATH='lib-ee/emqx_dashboard/priv'
else else
DASHBOARD_PATH='lib-opensource/emqx_dashboard/priv' DASHBOARD_PATH='lib-ce/emqx_dashboard/priv'
fi fi
case $(uname) in case $(uname) in

View File

@ -102,6 +102,7 @@
%% Listeners %% Listeners
-export([ list_listeners/0 -export([ list_listeners/0
, list_listeners/1 , list_listeners/1
, restart_listener/2
]). ]).
%% Alarms %% Alarms
@ -541,6 +542,7 @@ list_listeners(Node) when Node =:= node() ->
Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) -> Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) ->
#{protocol => Protocol, #{protocol => Protocol,
listen_on => ListenOn, listen_on => ListenOn,
identifier => emqx_listeners:find_id_by_listen_on(ListenOn),
acceptors => esockd:get_acceptors({Protocol, ListenOn}), acceptors => esockd:get_acceptors({Protocol, ListenOn}),
max_conns => esockd:get_max_connections({Protocol, ListenOn}), max_conns => esockd:get_max_connections({Protocol, ListenOn}),
current_conns => esockd:get_current_connections({Protocol, ListenOn}), current_conns => esockd:get_current_connections({Protocol, ListenOn}),
@ -559,6 +561,12 @@ list_listeners(Node) when Node =:= node() ->
list_listeners(Node) -> list_listeners(Node) ->
rpc_call(Node, list_listeners, [Node]). rpc_call(Node, list_listeners, [Node]).
restart_listener(Node, Identifier) when Node =:= node() ->
emqx_listeners:restart_listener(Identifier);
restart_listener(Node, Identifier) ->
rpc_call(Node, restart_listener, [Node, Identifier]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Get Alarms %% Get Alarms
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -977,3 +985,4 @@ action_to_prop_list({action_instance, ActionInstId, Name, FallbackActions, Args}
{name, Name}, {name, Name},
{fallbacks, actions_to_prop_list(FallbackActions)}, {fallbacks, actions_to_prop_list(FallbackActions)},
{args, Args}]. {args, Args}].

View File

@ -30,7 +30,19 @@
func => list, func => list,
descr => "A list of listeners on the node"}). descr => "A list of listeners on the node"}).
-export([list/2]). -rest_api(#{name => restart_listener,
method => 'PUT',
path => "/listeners/:bin:identifier/restart",
func => restart,
descr => "Restart a listener in the cluster"}).
-rest_api(#{name => restart_node_listener,
method => 'PUT',
path => "/nodes/:atom:node/listeners/:bin:identifier/restart",
func => restart,
descr => "Restart a listener on a node"}).
-export([list/2, restart/2]).
%% List listeners on a node. %% List listeners on a node.
list(#{node := Node}, _Params) -> list(#{node := Node}, _Params) ->
@ -41,6 +53,21 @@ list(_Binding, _Params) ->
return({ok, [#{node => Node, listeners => format(Listeners)} return({ok, [#{node => Node, listeners => format(Listeners)}
|| {Node, Listeners} <- emqx_mgmt:list_listeners()]}). || {Node, Listeners} <- emqx_mgmt:list_listeners()]}).
%% Restart listeners on a node.
restart(#{node := Node, identifier := Identifier}, _Params) ->
case emqx_mgmt:restart_listener(Node, Identifier) of
ok -> return({ok, "Listener restarted."});
{error, Error} -> return({error, Error})
end;
%% Restart listeners in the cluster.
restart(#{identifier := Identifier}, _Params) ->
Results = [{Node, emqx_mgmt:restart_listener(Node, Identifier)} || {Node, _Info} <- emqx_mgmt:list_nodes()],
case lists:filter(fun({_, Result}) -> Result =/= ok end, Results) of
[] -> return(ok);
Errors -> return({error, Errors})
end.
format(Listeners) when is_list(Listeners) -> format(Listeners) when is_list(Listeners) ->
[ Info#{listen_on => list_to_binary(esockd:to_string(ListenOn))} [ Info#{listen_on => list_to_binary(esockd:to_string(ListenOn))}
|| Info = #{listen_on := ListenOn} <- Listeners ]; || Info = #{listen_on := ListenOn} <- Listeners ];

View File

@ -157,7 +157,7 @@ cluster(["join", SNode]) ->
ignore -> ignore ->
emqx_ctl:print("Ignore.~n"); emqx_ctl:print("Ignore.~n");
{error, Error} -> {error, Error} ->
emqx_ctl:print("Failed to join the cluster: ~p~n", [Error]) emqx_ctl:print("Failed to join the cluster: ~0p~n", [Error])
end; end;
cluster(["leave"]) -> cluster(["leave"]) ->
@ -166,7 +166,7 @@ cluster(["leave"]) ->
emqx_ctl:print("Leave the cluster successfully.~n"), emqx_ctl:print("Leave the cluster successfully.~n"),
cluster(["status"]); cluster(["status"]);
{error, Error} -> {error, Error} ->
emqx_ctl:print("Failed to leave the cluster: ~p~n", [Error]) emqx_ctl:print("Failed to leave the cluster: ~0p~n", [Error])
end; end;
cluster(["force-leave", SNode]) -> cluster(["force-leave", SNode]) ->
@ -177,7 +177,7 @@ cluster(["force-leave", SNode]) ->
ignore -> ignore ->
emqx_ctl:print("Ignore.~n"); emqx_ctl:print("Ignore.~n");
{error, Error} -> {error, Error} ->
emqx_ctl:print("Failed to remove the node from cluster: ~p~n", [Error]) emqx_ctl:print("Failed to remove the node from cluster: ~0p~n", [Error])
end; end;
cluster(["status"]) -> cluster(["status"]) ->
@ -510,49 +510,73 @@ trace_off(Who, Name) ->
listeners([]) -> listeners([]) ->
foreach(fun({{Protocol, ListenOn}, _Pid}) -> foreach(fun({{Protocol, ListenOn}, _Pid}) ->
Info = [{acceptors, esockd:get_acceptors({Protocol, ListenOn})}, Info = [{listen_on, {string, emqx_listeners:format_listen_on(ListenOn)}},
{acceptors, esockd:get_acceptors({Protocol, ListenOn})},
{max_conns, esockd:get_max_connections({Protocol, ListenOn})}, {max_conns, esockd:get_max_connections({Protocol, ListenOn})},
{current_conn, esockd:get_current_connections({Protocol, ListenOn})}, {current_conn, esockd:get_current_connections({Protocol, ListenOn})},
{shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})}], {shutdown_count, esockd:get_shutdown_count({Protocol, ListenOn})}
emqx_ctl:print("listener on ~s:~s~n", [Protocol, esockd:to_string(ListenOn)]), ],
foreach(fun({Key, Val}) -> emqx_ctl:print("~s~n", [listener_identifier(Protocol, ListenOn)]),
emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]) foreach(fun indent_print/1, Info)
end, Info)
end, esockd:listeners()), end, esockd:listeners()),
foreach(fun({Protocol, Opts}) -> foreach(fun({Protocol, Opts}) ->
Info = [{acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)}, Port = proplists:get_value(port, Opts),
Info = [{listen_on, {string, emqx_listeners:format_listen_on(Port)}},
{acceptors, maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0)},
{max_conns, proplists:get_value(max_connections, Opts)}, {max_conns, proplists:get_value(max_connections, Opts)},
{current_conn, proplists:get_value(all_connections, Opts)}, {current_conn, proplists:get_value(all_connections, Opts)},
{shutdown_count, []}], {shutdown_count, []}],
emqx_ctl:print("listener on ~s:~p~n", [Protocol, proplists:get_value(port, Opts)]), emqx_ctl:print("~s~n", [listener_identifier(Protocol, Port)]),
foreach(fun({Key, Val}) -> foreach(fun indent_print/1, Info)
emqx_ctl:print(" ~-16s: ~w~n", [Key, Val])
end, Info)
end, ranch:info()); end, ranch:info());
listeners(["stop", Name = "http" ++ _N, ListenOn]) -> listeners(["stop", Name = "http" ++ _N | _MaybePort]) ->
%% _MaybePort is to be backward compatible, to stop http listener, there is no need for the port number
case minirest:stop_http(list_to_atom(Name)) of case minirest:stop_http(list_to_atom(Name)) of
ok -> ok ->
emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [Name, ListenOn]); emqx_ctl:print("Stop ~s listener successfully.~n", [Name]);
{error, Error} -> {error, Error} ->
emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [Name, ListenOn, Error]) emqx_ctl:print("Failed to stop ~s listener: ~0p~n", [Name, Error])
end; end;
listeners(["stop", Proto, ListenOn]) -> listeners(["stop", "mqtt:" ++ _ = Identifier]) ->
stop_listener(emqx_listeners:find_by_id(Identifier), Identifier);
listeners(["stop", _Proto, ListenOn]) ->
%% this clause is kept to be backward compatible
ListenOn1 = case string:tokens(ListenOn, ":") of ListenOn1 = case string:tokens(ListenOn, ":") of
[Port] -> list_to_integer(Port); [Port] -> list_to_integer(Port);
[IP, Port] -> {IP, list_to_integer(Port)} [IP, Port] -> {IP, list_to_integer(Port)}
end, end,
case emqx_listeners:stop_listener({list_to_atom(Proto), ListenOn1, []}) of stop_listener(emqx_listeners:find_by_listen_on(ListenOn1), ListenOn1);
listeners(["restart", Identifier]) ->
case emqx_listeners:restart_listener(Identifier) of
ok -> ok ->
emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [Proto, ListenOn]); emqx_ctl:print("Restarted ~s listener successfully.~n", [Identifier]);
{error, Error} -> {error, Error} ->
emqx_ctl:print("Failed to stop ~s listener on ~s, error:~p~n", [Proto, ListenOn, Error]) emqx_ctl:print("Failed to restart ~s listener: ~0p~n", [Identifier, Error])
end; end;
listeners(_) -> listeners(_) ->
emqx_ctl:usage([{"listeners", "List listeners"}, emqx_ctl:usage([{"listeners", "List listeners"},
{"listeners stop <Proto> <Port>", "Stop a listener"}]). {"listeners stop <Identifier>", "Stop a listener"},
{"listeners stop <Proto> <Port>", "Stop a listener"},
{"listeners restart <Identifier>", "Restart a listener"}
]).
stop_listener(false, Input) ->
emqx_ctl:print("No such listener ~p~n", [Input]);
stop_listener(#{listen_on := ListenOn} = Listener, _Input) ->
ID = emqx_listeners:identifier(Listener),
ListenOnStr = emqx_listeners:format_listen_on(ListenOn),
case emqx_listeners:stop_listener(Listener) of
ok ->
emqx_ctl:print("Stop ~s listener on ~s successfully.~n", [ID, ListenOnStr]);
{error, Reason} ->
emqx_ctl:print("Failed to stop ~s listener on ~s: ~0p~n",
[ID, ListenOnStr, Reason])
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc data Command %% @doc data Command
@ -707,3 +731,16 @@ format(_, Val) ->
Val. Val.
bin(S) -> iolist_to_binary(S). bin(S) -> iolist_to_binary(S).
indent_print({Key, {string, Val}}) ->
emqx_ctl:print(" ~-16s: ~s~n", [Key, Val]);
indent_print({Key, Val}) ->
emqx_ctl:print(" ~-16s: ~w~n", [Key, Val]).
listener_identifier(Protocol, ListenOn) ->
case emqx_listeners:find_id_by_listen_on(ListenOn) of
false ->
"http" ++ _ = atom_to_list(Protocol); %% assert
ID ->
ID
end.

View File

@ -49,7 +49,8 @@ groups() ->
t_broker_cmd, t_broker_cmd,
t_router_cmd, t_router_cmd,
t_subscriptions_cmd, t_subscriptions_cmd,
t_listeners_cmd t_listeners_cmd_old,
t_listeners_cmd_new
]}]. ]}].
apps() -> apps() ->
@ -275,12 +276,35 @@ t_subscriptions_cmd(_) ->
?assertEqual(emqx_mgmt_cli:subscriptions(["del", "client", "b/b/c"]), "ok~n"), ?assertEqual(emqx_mgmt_cli:subscriptions(["del", "client", "b/b/c"]), "ok~n"),
unmock_print(). unmock_print().
t_listeners_cmd(_) -> t_listeners_cmd_old(_) ->
ok = emqx_listeners:ensure_all_started(),
mock_print(), mock_print(),
?assertEqual(emqx_mgmt_cli:listeners([]), ok), ?assertEqual(emqx_mgmt_cli:listeners([]), ok),
?assertEqual( ?assertEqual(
emqx_mgmt_cli:listeners(["stop", "wss", "8084"]), "Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n",
"Stop wss listener on 8084 successfully.\n" emqx_mgmt_cli:listeners(["stop", "wss", "8084"])
),
unmock_print().
t_listeners_cmd_new(_) ->
ok = emqx_listeners:ensure_all_started(),
mock_print(),
?assertEqual(emqx_mgmt_cli:listeners([]), ok),
?assertEqual(
"Stop mqtt:wss:external listener on 0.0.0.0:8084 successfully.\n",
emqx_mgmt_cli:listeners(["stop", "mqtt:wss:external"])
),
?assertEqual(
emqx_mgmt_cli:listeners(["restart", "mqtt:tcp:external"]),
"Restarted mqtt:tcp:external listener successfully.\n"
),
?assertEqual(
emqx_mgmt_cli:listeners(["restart", "mqtt:ssl:external"]),
"Restarted mqtt:ssl:external listener successfully.\n"
),
?assertEqual(
emqx_mgmt_cli:listeners(["restart", "bad:listener:identifier"]),
"Failed to restart bad:listener:identifier listener: {no_such_listener,\"bad:listener:identifier\"}\n"
), ),
unmock_print(). unmock_print().

Some files were not shown because too many files have changed in this diff Show More