fix: rpc call use emqx_bpapi behavior

This commit is contained in:
Zhongwen Deng 2022-03-11 20:08:52 +08:00
parent fd7f91b5a6
commit f195808691
2 changed files with 64 additions and 24 deletions

View File

@ -20,7 +20,7 @@
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx_plugins/include/emqx_plugins.hrl"). %%-include_lib("emqx_plugins/include/emqx_plugins.hrl").
-export([ api_spec/0 -export([ api_spec/0
, fields/1 , fields/1
@ -37,6 +37,7 @@
]). ]).
-export([ validate_name/1 -export([ validate_name/1
, get_plugins/0
, install_package/2 , install_package/2
, delete_package/1 , delete_package/1
, describe_package/1 , describe_package/1
@ -251,15 +252,18 @@ validate_name(Name) ->
%% API CallBack Begin %% API CallBack Begin
list_plugins(get, _) -> list_plugins(get, _) ->
Plugins = cluster_call(emqx_plugins_monitor, get_plugins, [], 15000), {Plugins, []} = emqx_mgmt_api_plugins_proto_v1:get_plugins(),
{200, format_plugins(Plugins)}. {200, format_plugins(Plugins)}.
get_plugins() ->
{node(), emqx_plugins:list()}.
upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) -> upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) ->
[{FileName, Bin}] = maps:to_list(maps:without([type], Plugin)), [{FileName, Bin}] = maps:to_list(maps:without([type], Plugin)),
%% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall %% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall
%% TODO what happened when a new node join in? %% TODO what happened when a new node join in?
%% emqx_plugins_monitor should copy plugins from other core node when boot-up. %% emqx_plugins_monitor should copy plugins from other core node when boot-up.
Res = cluster_call(?MODULE, install_package, [FileName, Bin], 25000), {Res, _} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin),
case lists:filter(fun(R) -> R =/= ok end, Res) of case lists:filter(fun(R) -> R =/= ok end, Res) of
[] -> {200}; [] -> {200};
[{error, Reason} | _] -> [{error, Reason} | _] ->
@ -274,17 +278,19 @@ upload_install(post, #{} = Body) ->
}. }.
plugin(get, #{bindings := #{name := Name}}) -> plugin(get, #{bindings := #{name := Name}}) ->
Plugins = cluster_call(?MODULE, describe_package, [Name], 10000), {Plugins, _} = emqx_mgmt_api_plugins_proto_v1:describe_package(Name),
case format_plugins(Plugins) of case format_plugins(Plugins) of
[Plugin] -> {200, Plugin}; [Plugin] -> {200, Plugin};
[] -> {404, #{code => 'NOT_FOUND', message => Name}} [] -> {404, #{code => 'NOT_FOUND', message => Name}}
end; end;
plugin(delete, #{bindings := #{name := Name}}) -> plugin(delete, #{bindings := #{name := Name}}) ->
return(204, cluster_rpc(?MODULE, delete_package, [Name])). {ok, _TnxId, Res} = emqx_mgmt_api_plugins_proto_v1:delete_package(Name),
return(204, Res).
update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
return(204, cluster_rpc(?MODULE, ensure_action, [Name, Action])). {ok, _TnxId, Res} = emqx_mgmt_api_plugins_proto_v1:ensure_action(Name, Action),
return(204, Res).
update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
case parse_position(Body, Name) of case parse_position(Body, Name) of
@ -337,24 +343,6 @@ ensure_action(Name, restart) ->
_ = emqx_plugins:ensure_enabled(Name), _ = emqx_plugins:ensure_enabled(Name),
_ = emqx_plugins:restart(Name). _ = emqx_plugins:restart(Name).
cluster_call(Mod, Fun, Args, Timeout) ->
Nodes = mria_mnesia:running_nodes(),
{GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout),
BadNodes =/= [] andalso
?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes,
mfa => {Mod, Fun, length(Args)}}),
GoodRes.
cluster_rpc(Mod, Fun, Args) ->
case emqx_cluster_rpc:multicall(Mod, Fun, Args, all, 30000) of
{ok, _TnxId, Res} -> Res;
{retry, TnxId, Res, Node} ->
?SLOG(error, #{msg => "failed_to_update_plugin_in_cluster", nodes => Node,
tnx_id => TnxId, mfa => {Mod, Fun, Args}}),
Res;
{error, Error} -> Error
end.
return(Code, ok) -> {Code}; return(Code, ok) -> {Code};
return(Code, {ok, Result}) -> {Code, Result}; return(Code, {ok, Result}) -> {Code, Result};
return(_, {error, #{error := "bad_info_file", return := {enoent, _}, path := Path}}) -> return(_, {error, #{error := "bad_info_file", return := {enoent, _}, path := Path}}) ->

View File

@ -0,0 +1,52 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_plugins_proto_v1).
-behaviour(emqx_bpapi).
-export([ introduced_in/0
, get_plugins/0
, install_package/2
, describe_package/1
, delete_package/1
, ensure_action/2
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.0.0".
-spec get_plugins() -> emqx_rpc:multicall_result().
get_plugins() ->
rpc:multicall(emqx_mgmt_api_plugins, get_plugins, [], 15000).
-spec install_package(binary() | string(), binary()) -> emqx_rpc:multicall_result().
install_package(Filename, Bin) ->
rpc:multicall(emqx_mgmt_api_plugins, install_package, [Filename, Bin], 25000).
-spec describe_package(binary() | string()) -> emqx_rpc:multicall_result().
describe_package(Name) ->
rpc:multicall(emqx_mgmt_api_plugins, describe_package, [Name], 10000).
-spec delete_package(binary() | string()) -> emqx_cluster_rpc:multicall_return().
delete_package(Name) ->
emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, delete_package, [Name], all, 10000).
-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') ->
emqx_cluster_rpc:multicall_return().
ensure_action(Name, Action) ->
emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, ensure_action, [Name, Action], all, 10000).