From 4e9330a37e086c887d3059e5686bf18612b00411 Mon Sep 17 00:00:00 2001 From: zhanghongtong Date: Thu, 25 Feb 2021 09:43:37 +0800 Subject: [PATCH] chore(emqx_management): merge enterprise to opensource --- .../src/emqx_mgmt_api_pubsub.erl | 23 +++-- lib-ce/emqx_management/src/emqx_mgmt_http.erl | 4 + .../src/emqx_mod_api_topic_metrics.erl} | 85 +++++++++++++++++-- lib-ce/emqx_modules/src/emqx_modules.erl | 40 +++++++++ .../src/emqx_modules_api.erl} | 52 +++++++++--- lib-ce/emqx_modules/src/emqx_modules_app.erl | 1 + 6 files changed, 183 insertions(+), 22 deletions(-) rename lib-ce/{emqx_management/src/emqx_mgmt_api_topic_metrics.erl => emqx_modules/src/emqx_mod_api_topic_metrics.erl} (56%) rename lib-ce/{emqx_management/src/emqx_mgmt_api_modules.erl => emqx_modules/src/emqx_modules_api.erl} (70%) diff --git a/lib-ce/emqx_management/src/emqx_mgmt_api_pubsub.erl b/lib-ce/emqx_management/src/emqx_mgmt_api_pubsub.erl index 956f2a45f..f49aecb0e 100644 --- a/lib-ce/emqx_management/src/emqx_mgmt_api_pubsub.erl +++ b/lib-ce/emqx_management/src/emqx_mgmt_api_pubsub.erl @@ -78,7 +78,19 @@ subscribe(_Bindings, Params) -> publish(_Bindings, Params) -> logger:debug("API publish Params:~p", [Params]), {ClientId, Topic, Qos, Retain, Payload} = parse_publish_params(Params), - return(do_publish(ClientId, Topic, Qos, Retain, Payload)). + case do_publish(ClientId, Topic, Qos, Retain, Payload) of + {ok, MsgIds} -> + case get_value(<<"return">>, Params, undefined) of + undefined -> return(ok); + _Val -> + case get_value(<<"topics">>, Params, undefined) of + undefined -> return({ok, #{msgid => lists:last(MsgIds)}}); + _ -> return({ok, #{msgids => MsgIds}}) + end + end; + Result -> + return(Result) + end. unsubscribe(_Bindings, Params) -> logger:debug("API unsubscribe Params:~p", [Params]), @@ -119,7 +131,7 @@ loop_publish([], Result) -> loop_publish([Params | ParamsN], Acc) -> {ClientId, Topic, Qos, Retain, Payload} = parse_publish_params(Params), Code = case do_publish(ClientId, Topic, Qos, Retain, Payload) of - ok -> 0; + {ok, _} -> 0; {_, Code0, _} -> Code0 end, Result = #{topic => resp_topic(get_value(<<"topic">>, Params), get_value(<<"topics">>, Params, <<"">>)), @@ -153,11 +165,12 @@ do_subscribe(ClientId, Topics, QoS) -> do_publish(_ClientId, [], _Qos, _Retain, _Payload) -> {ok, ?ERROR15, bad_topic}; do_publish(ClientId, Topics, Qos, Retain, Payload) -> - lists:foreach(fun(Topic) -> + MsgIds = lists:map(fun(Topic) -> Msg = emqx_message:make(ClientId, Qos, Topic, Payload), - emqx_mgmt:publish(Msg#message{flags = #{retain => Retain}}) + emqx_mgmt:publish(Msg#message{flags = #{retain => Retain}}), + emqx_guid:to_hexstr(Msg#message.id) end, Topics), - ok. + {ok, MsgIds}. do_unsubscribe(ClientId, Topic) -> case validate_by_filter(Topic) of diff --git a/lib-ce/emqx_management/src/emqx_mgmt_http.erl b/lib-ce/emqx_management/src/emqx_mgmt_http.erl index d6fcf9ab3..8fb4f74ad 100644 --- a/lib-ce/emqx_management/src/emqx_mgmt_http.erl +++ b/lib-ce/emqx_management/src/emqx_mgmt_http.erl @@ -119,11 +119,15 @@ authorize_appid(Req) -> _ -> false end. +-ifdef(EMQX_ENTERPRISE). +filter(_) -> true. +-else. filter(#{app := App}) -> case emqx_plugins:find_plugin(App) of false -> false; Plugin -> Plugin#plugin.active end. +-endif. format(Port) when is_integer(Port) -> io_lib:format("0.0.0.0:~w", [Port]); diff --git a/lib-ce/emqx_management/src/emqx_mgmt_api_topic_metrics.erl b/lib-ce/emqx_modules/src/emqx_mod_api_topic_metrics.erl similarity index 56% rename from lib-ce/emqx_management/src/emqx_mgmt_api_topic_metrics.erl rename to lib-ce/emqx_modules/src/emqx_mod_api_topic_metrics.erl index aac7845b7..20416da7f 100644 --- a/lib-ce/emqx_management/src/emqx_mgmt_api_topic_metrics.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_api_topic_metrics.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mgmt_api_topic_metrics). +-module(emqx_mod_api_topic_metrics). -import(minirest, [return/1]). @@ -58,7 +58,7 @@ list(#{topic := Topic0}, _Params) -> Topic = emqx_mgmt_util:urldecode(Topic0), case safe_validate(Topic) of true -> - case emqx_mgmt:get_topic_metrics(Topic) of + case get_topic_metrics(Topic) of {error, Reason} -> return({error, Reason}); Metrics -> return({ok, maps:from_list(Metrics)}) end; @@ -69,7 +69,7 @@ list(#{topic := Topic0}, _Params) -> list(_Bindings, _Params) -> execute_when_enabled(fun() -> - case emqx_mgmt:get_all_topic_metrics() of + case get_all_topic_metrics() of {error, Reason} -> return({error, Reason}); Metrics -> return({ok, Metrics}) end @@ -83,7 +83,7 @@ register(_Bindings, Params) -> Topic -> case safe_validate(Topic) of true -> - emqx_mgmt:register_topic_metrics(Topic), + register_topic_metrics(Topic), return(ok); false -> return({error, invalid_topic_name}) @@ -93,7 +93,7 @@ register(_Bindings, Params) -> unregister(Bindings, _Params) when map_size(Bindings) =:= 0 -> execute_when_enabled(fun() -> - emqx_mgmt:unregister_all_topic_metrics(), + unregister_all_topic_metrics(), return(ok) end); @@ -102,7 +102,7 @@ unregister(#{topic := Topic0}, _Params) -> Topic = emqx_mgmt_util:urldecode(Topic0), case safe_validate(Topic) of true -> - emqx_mgmt:unregister_topic_metrics(Topic), + unregister_topic_metrics(Topic), return(ok); false -> return({error, invalid_topic_name}) @@ -128,3 +128,76 @@ safe_validate(Topic) -> error:_Error -> false end. + +get_all_topic_metrics() -> + lists:foldl(fun(Topic, Acc) -> + case get_topic_metrics(Topic) of + {error, _Reason} -> + Acc; + Metrics -> + [#{topic => Topic, metrics => Metrics} | Acc] + end + end, [], emqx_mod_topic_metrics:all_registered_topics()). + +get_topic_metrics(Topic) -> + lists:foldl(fun(Node, Acc) -> + case get_topic_metrics(Node, Topic) of + {error, _Reason} -> + Acc; + Metrics -> + case Acc of + [] -> Metrics; + _ -> + lists:foldl(fun({K, V}, Acc0) -> + [{K, V + proplists:get_value(K, Metrics, 0)} | Acc0] + end, [], Acc) + end + end + end, [], ekka_mnesia:running_nodes()). + +get_topic_metrics(Node, Topic) when Node =:= node() -> + emqx_mod_topic_metrics:metrics(Topic); +get_topic_metrics(Node, Topic) -> + rpc_call(Node, get_topic_metrics, [Node, Topic]). + +register_topic_metrics(Topic) -> + Results = [register_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()], + case lists:any(fun(Item) -> Item =:= ok end, Results) of + true -> ok; + false -> lists:last(Results) + end. + +register_topic_metrics(Node, Topic) when Node =:= node() -> + emqx_mod_topic_metrics:register(Topic); +register_topic_metrics(Node, Topic) -> + rpc_call(Node, register_topic_metrics, [Node, Topic]). + +unregister_topic_metrics(Topic) -> + Results = [unregister_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()], + case lists:any(fun(Item) -> Item =:= ok end, Results) of + true -> ok; + false -> lists:last(Results) + end. + +unregister_topic_metrics(Node, Topic) when Node =:= node() -> + emqx_mod_topic_metrics:unregister(Topic); +unregister_topic_metrics(Node, Topic) -> + rpc_call(Node, unregister_topic_metrics, [Node, Topic]). + +unregister_all_topic_metrics() -> + Results = [unregister_all_topic_metrics(Node) || Node <- ekka_mnesia:running_nodes()], + case lists:any(fun(Item) -> Item =:= ok end, Results) of + true -> ok; + false -> lists:last(Results) + end. + +unregister_all_topic_metrics(Node) when Node =:= node() -> + emqx_mod_topic_metrics:unregister_all(); +unregister_all_topic_metrics(Node) -> + rpc_call(Node, unregister_topic_metrics, [Node]). + +rpc_call(Node, Fun, Args) -> + case rpc:call(Node, ?MODULE, Fun, Args) of + {badrpc, Reason} -> {error, Reason}; + Res -> Res + end. diff --git a/lib-ce/emqx_modules/src/emqx_modules.erl b/lib-ce/emqx_modules/src/emqx_modules.erl index ffcc456f0..38cf2ac1b 100644 --- a/lib-ce/emqx_modules/src/emqx_modules.erl +++ b/lib-ce/emqx_modules/src/emqx_modules.erl @@ -30,6 +30,8 @@ , load_module/2 ]). +-export([cli/1]). + %% @doc List all available plugins -spec(list() -> [{atom(), boolean()}]). list() -> @@ -170,3 +172,41 @@ write_loaded(true) -> ok end; write_loaded(false) -> ok. + +%%-------------------------------------------------------------------- +%% @doc Modules Command +cli(["list"]) -> + foreach(fun(Module) -> print({module, Module}) end, emqx_modules:list()); + +cli(["load", Name]) -> + case emqx_modules:load(list_to_atom(Name)) of + ok -> + emqx_ctl:print("Module ~s loaded successfully.~n", [Name]); + {error, Reason} -> + emqx_ctl:print("Load module ~s error: ~p.~n", [Name, Reason]) + end; + +cli(["unload", Name]) -> + case emqx_modules:unload(list_to_atom(Name)) of + ok -> + emqx_ctl:print("Module ~s unloaded successfully.~n", [Name]); + {error, Reason} -> + emqx_ctl:print("Unload module ~s error: ~p.~n", [Name, Reason]) + end; + +cli(["reload", "emqx_mod_acl_internal" = Name]) -> + case emqx_modules:reload(list_to_atom(Name)) of + ok -> + emqx_ctl:print("Module ~s reloaded successfully.~n", [Name]); + {error, Reason} -> + emqx_ctl:print("Reload module ~s error: ~p.~n", [Name, Reason]) + end; +cli(["reload", Name]) -> + emqx_ctl:print("Module: ~p does not need to be reloaded.~n", [Name]); + +cli(_) -> + emqx_ctl:usage([{"modules list", "Show loaded modules"}, + {"modules load ", "Load module"}, + {"modules unload ", "Unload module"}, + {"modules reload ", "Reload module"} + ]). diff --git a/lib-ce/emqx_management/src/emqx_mgmt_api_modules.erl b/lib-ce/emqx_modules/src/emqx_modules_api.erl similarity index 70% rename from lib-ce/emqx_management/src/emqx_mgmt_api_modules.erl rename to lib-ce/emqx_modules/src/emqx_modules_api.erl index 0e2821ec6..fae6d0e5e 100644 --- a/lib-ce/emqx_management/src/emqx_mgmt_api_modules.erl +++ b/lib-ce/emqx_modules/src/emqx_modules_api.erl @@ -14,9 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mgmt_api_modules). - --include("emqx_mgmt.hrl"). +-module(emqx_modules_api). -import(minirest, [return/1]). @@ -75,16 +73,16 @@ ]). list(#{node := Node}, _Params) -> - return({ok, [format(Module) || Module <- emqx_mgmt:list_modules(Node)]}); + return({ok, [format(Module) || Module <- list_modules(Node)]}); list(_Bindings, _Params) -> - return({ok, [format(Node, Modules) || {Node, Modules} <- emqx_mgmt:list_modules()]}). + return({ok, [format(Node, Modules) || {Node, Modules} <- list_modules()]}). load(#{node := Node, module := Module}, _Params) -> - return(emqx_mgmt:load_module(Node, Module)); + return(load_module(Node, Module)); load(#{module := Module}, _Params) -> - Results = [emqx_mgmt:load_module(Node, Module) || {Node, _Info} <- emqx_mgmt:list_nodes()], + Results = [load_module(Node, Module) || {Node, _Info} <- list_nodes()], case lists:filter(fun(Item) -> Item =/= ok end, Results) of [] -> return(ok); @@ -93,10 +91,10 @@ load(#{module := Module}, _Params) -> end. unload(#{node := Node, module := Module}, _Params) -> - return(emqx_mgmt:unload_module(Node, Module)); + return(unload_module(Node, Module)); unload(#{module := Module}, _Params) -> - Results = [emqx_mgmt:unload_module(Node, Module) || {Node, _Info} <- emqx_mgmt:list_nodes()], + Results = [unload_module(Node, Module) || {Node, _Info} <- list_nodes()], case lists:filter(fun(Item) -> Item =/= ok end, Results) of [] -> return(ok); @@ -105,13 +103,13 @@ unload(#{module := Module}, _Params) -> end. reload(#{node := Node, module := Module}, _Params) -> - case emqx_mgmt:reload_module(Node, Module) of + case reload_module(Node, Module) of ignore -> return(ok); Result -> return(Result) end; reload(#{module := Module}, _Params) -> - Results = [emqx_mgmt:reload_module(Node, Module) || {Node, _Info} <- emqx_mgmt:list_nodes()], + Results = [reload_module(Node, Module) || {Node, _Info} <- list_nodes()], case lists:filter(fun(Item) -> Item =/= ok end, Results) of [] -> return(ok); @@ -119,6 +117,10 @@ reload(#{module := Module}, _Params) -> return(lists:last(Errors)) end. +%%------------------------------------------------------------------------------ +%% Internal Functions +%%------------------------------------------------------------------------------ + format(Node, Modules) -> #{node => Node, modules => [format(Module) || Module <- Modules]}. @@ -127,3 +129,31 @@ format({Name, Active}) -> description => iolist_to_binary(Name:description()), active => Active}. +list_modules() -> + [{Node, list_modules(Node)} || Node <- ekka_mnesia:running_nodes()]. + +list_modules(Node) when Node =:= node() -> + emqx_modules:list(); +list_modules(Node) -> + rpc_call(Node, list_modules, [Node]). + +load_module(Node, Module) when Node =:= node() -> + emqx_modules:load(Module); +load_module(Node, Module) -> + rpc_call(Node, load_module, [Node, Module]). + +unload_module(Node, Module) when Node =:= node() -> + emqx_modules:unload(Module); +unload_module(Node, Module) -> + rpc_call(Node, unload_module, [Node, Module]). + +reload_module(Node, Module) when Node =:= node() -> + emqx_modules:reload(Module); +reload_module(Node, Module) -> + rpc_call(Node, reload_module, [Node, Module]). + +rpc_call(Node, Fun, Args) -> + case rpc:call(Node, ?MODULE, Fun, Args) of + {badrpc, Reason} -> {error, Reason}; + Res -> Res + end. diff --git a/lib-ce/emqx_modules/src/emqx_modules_app.erl b/lib-ce/emqx_modules/src/emqx_modules_app.erl index 832a39c8e..34ff51267 100644 --- a/lib-ce/emqx_modules/src/emqx_modules_app.erl +++ b/lib-ce/emqx_modules/src/emqx_modules_app.erl @@ -30,6 +30,7 @@ start(_Type, _Args) -> application:load(emqx), {ok, Pid} = emqx_mod_sup:start_link(), ok = emqx_modules:load(), + emqx_ctl:register_command(modules, {emqx_modules, cli}, []), {ok, Pid}. stop(_State) ->