chore: fix dialyzer warning.
This commit is contained in:
parent
0f681f6a08
commit
fd7f91b5a6
|
@ -77,7 +77,8 @@ schema("/plugins/install") ->
|
||||||
'operationId' => upload_install,
|
'operationId' => upload_install,
|
||||||
post => #{
|
post => #{
|
||||||
description => "Install a plugin(plugin-vsn.tar.gz)."
|
description => "Install a plugin(plugin-vsn.tar.gz)."
|
||||||
"Follow [emqx-plugin-template](https://github.com/emqx/emqx-plugin-template) to develop plugin.",
|
"Follow [emqx-plugin-template](https://github.com/emqx/emqx-plugin-template) "
|
||||||
|
"to develop plugin.",
|
||||||
'requestBody' => #{
|
'requestBody' => #{
|
||||||
content => #{
|
content => #{
|
||||||
'multipart/form-data' => #{
|
'multipart/form-data' => #{
|
||||||
|
@ -97,7 +98,7 @@ schema("/plugins/:name") ->
|
||||||
parameters => [hoconsc:ref(name)],
|
parameters => [hoconsc:ref(name)],
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => hoconsc:ref(plugin),
|
200 => hoconsc:ref(plugin),
|
||||||
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], "Plugin Not Found")
|
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
delete => #{
|
delete => #{
|
||||||
|
@ -105,7 +106,7 @@ schema("/plugins/:name") ->
|
||||||
parameters => [hoconsc:ref(name)],
|
parameters => [hoconsc:ref(name)],
|
||||||
responses => #{
|
responses => #{
|
||||||
204 => <<"Uninstall successfully">>,
|
204 => <<"Uninstall successfully">>,
|
||||||
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], "Plugin Not Found")
|
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -121,7 +122,7 @@ schema("/plugins/:name/:action") ->
|
||||||
{action, hoconsc:mk(hoconsc:enum([start, stop]), #{desc => "Action", in => path})}],
|
{action, hoconsc:mk(hoconsc:enum([start, stop]), #{desc => "Action", in => path})}],
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => <<"OK">>,
|
200 => <<"OK">>,
|
||||||
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], "Plugin Not Found")
|
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Plugin Not Found">>)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -142,7 +143,7 @@ fields(plugin) ->
|
||||||
#{
|
#{
|
||||||
desc => "Name-Vsn: without .tar.gz",
|
desc => "Name-Vsn: without .tar.gz",
|
||||||
validator => fun ?MODULE:validate_name/1,
|
validator => fun ?MODULE:validate_name/1,
|
||||||
nullable => false,
|
required => true,
|
||||||
example => "emqx_plugin_template-5.0-rc.1"})
|
example => "emqx_plugin_template-5.0-rc.1"})
|
||||||
},
|
},
|
||||||
{author, hoconsc:mk(list(string()), #{example => [<<"EMQ X Team">>]})},
|
{author, hoconsc:mk(list(string()), #{example => [<<"EMQ X Team">>]})},
|
||||||
|
@ -151,7 +152,8 @@ fields(plugin) ->
|
||||||
{compatibility, hoconsc:mk(map(), #{example => #{<<"emqx">> => <<"~>5.0">>}})},
|
{compatibility, hoconsc:mk(map(), #{example => #{<<"emqx">> => <<"~>5.0">>}})},
|
||||||
{git_commit_or_build_date, hoconsc:mk(string(), #{
|
{git_commit_or_build_date, hoconsc:mk(string(), #{
|
||||||
example => "2021-12-25",
|
example => "2021-12-25",
|
||||||
desc => "Last git commit date by `git log -1 --pretty=format:'%cd' --date=format:'%Y-%m-%d`."
|
desc => "Last git commit date by `git log -1 --pretty=format:'%cd' "
|
||||||
|
"--date=format:'%Y-%m-%d`.\n"
|
||||||
" If the last commit date is not available, the build date will be presented."
|
" If the last commit date is not available, the build date will be presented."
|
||||||
})},
|
})},
|
||||||
{functionality, hoconsc:mk(hoconsc:array(string()), #{example => [<<"Demo">>]})},
|
{functionality, hoconsc:mk(hoconsc:array(string()), #{example => [<<"Demo">>]})},
|
||||||
|
@ -159,25 +161,26 @@ fields(plugin) ->
|
||||||
{metadata_vsn, hoconsc:mk(string(), #{example => "0.1.0"})},
|
{metadata_vsn, hoconsc:mk(string(), #{example => "0.1.0"})},
|
||||||
{rel_vsn, hoconsc:mk(binary(),
|
{rel_vsn, hoconsc:mk(binary(),
|
||||||
#{desc => "Plugins release version",
|
#{desc => "Plugins release version",
|
||||||
nullable => false,
|
required => true,
|
||||||
example => <<"5.0-rc.1">>})
|
example => <<"5.0-rc.1">>})
|
||||||
},
|
},
|
||||||
{rel_apps, hoconsc:mk(hoconsc:array(binary()),
|
{rel_apps, hoconsc:mk(hoconsc:array(binary()),
|
||||||
#{desc => "Aplications in plugin.",
|
#{desc => "Aplications in plugin.",
|
||||||
nullable => false,
|
required => true,
|
||||||
example => [<<"emqx_plugin_template-5.0.0">>, <<"map_sets-1.1.0">>]})
|
example => [<<"emqx_plugin_template-5.0.0">>, <<"map_sets-1.1.0">>]})
|
||||||
},
|
},
|
||||||
{repo, hoconsc:mk(string(), #{example => "https://github.com/emqx/emqx-plugin-template"})},
|
{repo, hoconsc:mk(string(), #{example => "https://github.com/emqx/emqx-plugin-template"})},
|
||||||
{description, hoconsc:mk(binary(),
|
{description, hoconsc:mk(binary(),
|
||||||
#{desc => "Plugin description.",
|
#{desc => "Plugin description.",
|
||||||
nullable => false,
|
required => true,
|
||||||
example => "This is an demo plugin description"})
|
example => "This is an demo plugin description"})
|
||||||
},
|
},
|
||||||
{running_status, hoconsc:mk(hoconsc:array(hoconsc:ref(running_status)), #{nullable => false})},
|
{running_status, hoconsc:mk(hoconsc:array(hoconsc:ref(running_status)),
|
||||||
|
#{required => true})},
|
||||||
{readme, hoconsc:mk(binary(), #{
|
{readme, hoconsc:mk(binary(), #{
|
||||||
example => "This is an demo plugin.",
|
example => "This is an demo plugin.",
|
||||||
desc => "only return when `GET /plugins/{name}`.",
|
desc => "only return when `GET /plugins/{name}`.",
|
||||||
nullable => true})}
|
required => false})}
|
||||||
];
|
];
|
||||||
fields(name) ->
|
fields(name) ->
|
||||||
[{name, hoconsc:mk(binary(),
|
[{name, hoconsc:mk(binary(),
|
||||||
|
@ -199,9 +202,10 @@ fields(position) ->
|
||||||
#{
|
#{
|
||||||
desc => """
|
desc => """
|
||||||
Enable auto-boot at position in the boot list, where Position could be
|
Enable auto-boot at position in the boot list, where Position could be
|
||||||
'top', 'bottom', or 'before:other-vsn', 'after:other-vsn' to specify a relative position.
|
'top', 'bottom', or 'before:other-vsn', 'after:other-vsn'
|
||||||
|
to specify a relative position.
|
||||||
""",
|
""",
|
||||||
nullable => true
|
required => false
|
||||||
})}];
|
})}];
|
||||||
fields(running_status) ->
|
fields(running_status) ->
|
||||||
[
|
[
|
||||||
|
@ -265,7 +269,8 @@ upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) -
|
||||||
upload_install(post, #{} = Body) ->
|
upload_install(post, #{} = Body) ->
|
||||||
io:format("~p~n", [Body]),
|
io:format("~p~n", [Body]),
|
||||||
{400, #{code => 'BAD_FORM_DATA',
|
{400, #{code => 'BAD_FORM_DATA',
|
||||||
message => <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>}
|
message =>
|
||||||
|
<<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
plugin(get, #{bindings := #{name := Name}}) ->
|
plugin(get, #{bindings := #{name := Name}}) ->
|
||||||
|
@ -315,28 +320,29 @@ describe_package(Name) ->
|
||||||
delete_package(Name) ->
|
delete_package(Name) ->
|
||||||
case emqx_plugins:ensure_stopped(Name) of
|
case emqx_plugins:ensure_stopped(Name) of
|
||||||
ok ->
|
ok ->
|
||||||
emqx_plugins:ensure_disabled(Name),
|
_ = emqx_plugins:ensure_disabled(Name),
|
||||||
emqx_plugins:purge(Name),
|
_ = emqx_plugins:purge(Name),
|
||||||
emqx_plugins:delete_package(Name);
|
_ = emqx_plugins:delete_package(Name);
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% for RPC plugin update
|
%% for RPC plugin update
|
||||||
ensure_action(Name, start) ->
|
ensure_action(Name, start) ->
|
||||||
emqx_plugins:ensure_enabled(Name),
|
_ = emqx_plugins:ensure_enabled(Name),
|
||||||
emqx_plugins:ensure_started(Name);
|
_ = emqx_plugins:ensure_started(Name);
|
||||||
ensure_action(Name, stop) ->
|
ensure_action(Name, stop) ->
|
||||||
emqx_plugins:ensure_stopped(Name),
|
_ = emqx_plugins:ensure_stopped(Name),
|
||||||
emqx_plugins:ensure_disabled(Name);
|
_ = emqx_plugins:ensure_disabled(Name);
|
||||||
ensure_action(Name, restart) ->
|
ensure_action(Name, restart) ->
|
||||||
emqx_plugins:ensure_enabled(Name),
|
_ = emqx_plugins:ensure_enabled(Name),
|
||||||
emqx_plugins:restart(Name).
|
_ = emqx_plugins:restart(Name).
|
||||||
|
|
||||||
cluster_call(Mod, Fun, Args, Timeout) ->
|
cluster_call(Mod, Fun, Args, Timeout) ->
|
||||||
Nodes = mria_mnesia:running_nodes(),
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
{GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout),
|
{GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout),
|
||||||
BadNodes =/= [] andalso
|
BadNodes =/= [] andalso
|
||||||
?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes, mfa => {Mod, Fun, length(Args)}}),
|
?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes,
|
||||||
|
mfa => {Mod, Fun, length(Args)}}),
|
||||||
GoodRes.
|
GoodRes.
|
||||||
|
|
||||||
cluster_rpc(Mod, Fun, Args) ->
|
cluster_rpc(Mod, Fun, Args) ->
|
||||||
|
@ -358,14 +364,18 @@ return(_, {error, Reason}) ->
|
||||||
|
|
||||||
parse_position(#{<<"position">> := <<"top">>}, _) -> front;
|
parse_position(#{<<"position">> := <<"top">>}, _) -> front;
|
||||||
parse_position(#{<<"position">> := <<"bottom">>}, _) -> rear;
|
parse_position(#{<<"position">> := <<"bottom">>}, _) -> rear;
|
||||||
parse_position(#{<<"position">> := <<"before:", Name/binary>>}, Name) -> {error, <<"Can't before:self">>};
|
parse_position(#{<<"position">> := <<"before:", Name/binary>>}, Name) ->
|
||||||
parse_position(#{<<"position">> := <<"after:", Name/binary>>}, Name) -> {error, <<"Can't after:self">>};
|
{error, <<"Can't before:self">>};
|
||||||
parse_position(#{<<"position">> := <<"before:", Before/binary>>}, _Name) -> {before, binary_to_list(Before)};
|
parse_position(#{<<"position">> := <<"after:", Name/binary>>}, Name) ->
|
||||||
parse_position(#{<<"position">> := <<"after:", After/binary>>}, _Name) -> {behind, binary_to_list(After)};
|
{error, <<"Can't after:self">>};
|
||||||
|
parse_position(#{<<"position">> := <<"before:", Before/binary>>}, _Name) ->
|
||||||
|
{before, binary_to_list(Before)};
|
||||||
|
parse_position(#{<<"position">> := <<"after:", After/binary>>}, _Name) ->
|
||||||
|
{behind, binary_to_list(After)};
|
||||||
parse_position(Position, _) -> {error, iolist_to_binary(io_lib:format("~p", [Position]))}.
|
parse_position(Position, _) -> {error, iolist_to_binary(io_lib:format("~p", [Position]))}.
|
||||||
|
|
||||||
format_plugins(List) ->
|
format_plugins(List) ->
|
||||||
StatusMap = emqx_plugins_monitor:aggregate_status(List),
|
StatusMap = aggregate_status(List),
|
||||||
SortFun = fun({_N1, P1}, {_N2, P2}) -> length(P1) > length(P2) end,
|
SortFun = fun({_N1, P1}, {_N2, P2}) -> length(P1) > length(P2) end,
|
||||||
SortList = lists:sort(SortFun, List),
|
SortList = lists:sort(SortFun, List),
|
||||||
pack_status_in_order(SortList, StatusMap).
|
pack_status_in_order(SortList, StatusMap).
|
||||||
|
@ -390,3 +400,21 @@ pack_plugin_in_order([Plugin0 | Plugins], Acc, StatusAcc) ->
|
||||||
error ->
|
error ->
|
||||||
pack_plugin_in_order(Plugins, Acc, StatusAcc)
|
pack_plugin_in_order(Plugins, Acc, StatusAcc)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
aggregate_status(List) -> aggregate_status(List, #{}).
|
||||||
|
|
||||||
|
aggregate_status([], Acc) -> Acc;
|
||||||
|
aggregate_status([{Node, Plugins} | List], Acc) ->
|
||||||
|
NewAcc =
|
||||||
|
lists:foldl(fun(Plugin, SubAcc) ->
|
||||||
|
#{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin,
|
||||||
|
Key = {Name, Vsn},
|
||||||
|
Value = #{node => Node, status => plugin_status(Plugin)},
|
||||||
|
SubAcc#{Key => [Value | maps:get(Key, Acc, [])]}
|
||||||
|
end, Acc, Plugins),
|
||||||
|
aggregate_status(List, NewAcc).
|
||||||
|
|
||||||
|
% running_status: running loaded, stopped
|
||||||
|
%% config_status: not_configured disable enable
|
||||||
|
plugin_status(#{running_status := running}) -> running;
|
||||||
|
plugin_status(_) -> stopped.
|
||||||
|
|
|
@ -104,7 +104,8 @@ api_path(Parts)->
|
||||||
?SERVER ++ filename:join([?BASE_PATH | Parts]).
|
?SERVER ++ filename:join([?BASE_PATH | Parts]).
|
||||||
|
|
||||||
%% Usage:
|
%% Usage:
|
||||||
%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>, <<"upload">>, <<"image/png">>, [], <<"some-token">>)
|
%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>,
|
||||||
|
%% <<"upload">>, <<"image/png">>, [], <<"some-token">>)
|
||||||
%%
|
%%
|
||||||
%% Usage with RequestData:
|
%% Usage with RequestData:
|
||||||
%% Payload = [{upload_type, <<"user_picture">>}],
|
%% Payload = [{upload_type, <<"user_picture">>}],
|
||||||
|
@ -112,8 +113,10 @@ api_path(Parts)->
|
||||||
%% RequestData = [
|
%% RequestData = [
|
||||||
%% {<<"payload">>, PayloadContent}
|
%% {<<"payload">>, PayloadContent}
|
||||||
%% ]
|
%% ]
|
||||||
%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>, <<"upload">>, <<"image/png">>, RequestData, <<"some-token">>)
|
%% upload_request(<<"site.com/api/upload">>, <<"path/to/file.png">>,
|
||||||
-spec upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) -> {ok, binary()} | {error, list()} when
|
%% <<"upload">>, <<"image/png">>, RequestData, <<"some-token">>)
|
||||||
|
-spec upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) ->
|
||||||
|
{ok, binary()} | {error, list()} when
|
||||||
URL:: binary(),
|
URL:: binary(),
|
||||||
FilePath:: binary(),
|
FilePath:: binary(),
|
||||||
Name:: binary(),
|
Name:: binary(),
|
||||||
|
@ -125,7 +128,8 @@ upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) -
|
||||||
Filename = filename:basename(FilePath),
|
Filename = filename:basename(FilePath),
|
||||||
{ok, Data} = file:read_file(FilePath),
|
{ok, Data} = file:read_file(FilePath),
|
||||||
Boundary = emqx_guid:to_base62(emqx_guid:gen()),
|
Boundary = emqx_guid:to_base62(emqx_guid:gen()),
|
||||||
RequestBody = format_multipart_formdata(Data, RequestData, Name, [Filename], MimeType, Boundary),
|
RequestBody = format_multipart_formdata(Data, RequestData, Name,
|
||||||
|
[Filename], MimeType, Boundary),
|
||||||
ContentType = "multipart/form-data; boundary=" ++ binary_to_list(Boundary),
|
ContentType = "multipart/form-data; boundary=" ++ binary_to_list(Boundary),
|
||||||
ContentLength = integer_to_list(length(binary_to_list(RequestBody))),
|
ContentLength = integer_to_list(length(binary_to_list(RequestBody))),
|
||||||
Headers = [
|
Headers = [
|
||||||
|
@ -140,7 +144,8 @@ upload_request(URL, FilePath, Name, MimeType, RequestData, AuthorizationToken) -
|
||||||
inets:start(),
|
inets:start(),
|
||||||
httpc:request(Method, {URL, Headers, ContentType, RequestBody}, HTTPOptions, Options).
|
httpc:request(Method, {URL, Headers, ContentType, RequestBody}, HTTPOptions, Options).
|
||||||
|
|
||||||
-spec format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) -> binary() when
|
-spec format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) ->
|
||||||
|
binary() when
|
||||||
Data:: binary(),
|
Data:: binary(),
|
||||||
Params:: list(),
|
Params:: list(),
|
||||||
Name:: binary(),
|
Name:: binary(),
|
||||||
|
@ -154,7 +159,8 @@ format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) ->
|
||||||
erlang:iolist_to_binary([
|
erlang:iolist_to_binary([
|
||||||
Acc,
|
Acc,
|
||||||
StartBoundary, LineSeparator,
|
StartBoundary, LineSeparator,
|
||||||
<<"Content-Disposition: form-data; name=\"">>, Key, <<"\"">>, LineSeparator, LineSeparator,
|
<<"Content-Disposition: form-data; name=\"">>, Key, <<"\"">>,
|
||||||
|
LineSeparator, LineSeparator,
|
||||||
Value, LineSeparator
|
Value, LineSeparator
|
||||||
])
|
])
|
||||||
end, <<"">>, Params),
|
end, <<"">>, Params),
|
||||||
|
@ -162,7 +168,8 @@ format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) ->
|
||||||
erlang:iolist_to_binary([
|
erlang:iolist_to_binary([
|
||||||
Acc,
|
Acc,
|
||||||
StartBoundary, LineSeparator,
|
StartBoundary, LineSeparator,
|
||||||
<<"Content-Disposition: form-data; name=\"">>, Name, <<"\"; filename=\"">>, FileName, <<"\"">>, LineSeparator,
|
<<"Content-Disposition: form-data; name=\"">>, Name, <<"\"; filename=\"">>,
|
||||||
|
FileName, <<"\"">>, LineSeparator,
|
||||||
<<"Content-Type: ">>, MimeType, LineSeparator, LineSeparator,
|
<<"Content-Type: ">>, MimeType, LineSeparator, LineSeparator,
|
||||||
Data,
|
Data,
|
||||||
LineSeparator
|
LineSeparator
|
||||||
|
|
|
@ -1,158 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
-module(emqx_plugins_monitor).
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
|
||||||
|
|
||||||
-export([ get_plugins/0
|
|
||||||
, start_link/0
|
|
||||||
, aggregate_status/1
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([lock/1, unlock/1, lock/0, unlock/0]).
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
|
||||||
|
|
||||||
-spec aggregate_status(list(tuple())) ->
|
|
||||||
[#{{Name :: binary(), Vsn :: binary()} => [#{node => node(), status => running | stopped}]}].
|
|
||||||
aggregate_status(List) -> aggregate_status(List, #{}).
|
|
||||||
|
|
||||||
aggregate_status([], Acc) -> Acc;
|
|
||||||
aggregate_status([{Node, Plugins} | List], Acc) ->
|
|
||||||
NewAcc =
|
|
||||||
lists:foldl(fun(Plugin, SubAcc) ->
|
|
||||||
#{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin,
|
|
||||||
Key = {Name, Vsn},
|
|
||||||
Value = #{node => Node, status => plugin_status(Plugin)},
|
|
||||||
SubAcc#{Key => [Value | maps:get(Key, Acc, [])]}
|
|
||||||
end, Acc, Plugins),
|
|
||||||
aggregate_status(List, NewAcc).
|
|
||||||
|
|
||||||
start_link() ->
|
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
|
||||||
|
|
||||||
lock(Owner) ->
|
|
||||||
gen_server:call(?MODULE, {lock, Owner}).
|
|
||||||
|
|
||||||
unlock(Owner) ->
|
|
||||||
gen_server:call(?MODULE, {unlock, Owner}).
|
|
||||||
|
|
||||||
init([]) ->
|
|
||||||
{ok, #{ref => schedule_next_check(), failed => 0, lock => undefined}}.
|
|
||||||
|
|
||||||
handle_call({lock, Owner}, _From, #{lock := undefined} = State) ->
|
|
||||||
{reply, true, State#{lock => Owner}};
|
|
||||||
handle_call({lock, Owner}, _From, #{lock := Locker} = State) ->
|
|
||||||
case is_locker_alive(Locker) of
|
|
||||||
true -> {reply, false, State};
|
|
||||||
false -> {reply, true, State#{lock => Owner}}
|
|
||||||
end;
|
|
||||||
handle_call({unlock, Owner}, _From, #{lock := Owner} = State) ->
|
|
||||||
{reply, true, State#{lock => undefined}};
|
|
||||||
handle_call({unlock, _Owner}, _From, #{lock := Locker} = State) ->
|
|
||||||
case is_locker_alive(Locker) of
|
|
||||||
true -> {reply, false, State};
|
|
||||||
false -> {reply, true, State#{lock => undefined}}
|
|
||||||
end;
|
|
||||||
handle_call(Req, _From, State) ->
|
|
||||||
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
|
||||||
{reply, ignored, State}.
|
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
|
||||||
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
handle_info({timeout, _Ref, check}, State = #{failed := Failed}) ->
|
|
||||||
NewFailed = maybe_alarm(check(), Failed),
|
|
||||||
{noreply, State#{ref => schedule_next_check(), failed => NewFailed}};
|
|
||||||
handle_info(Info, State) ->
|
|
||||||
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
schedule_next_check() ->
|
|
||||||
Check = emqx_plugins:get_config(check_interval, 5000),
|
|
||||||
emqx_misc:start_timer(Check, check).
|
|
||||||
|
|
||||||
is_locker_alive({Node, Pid}) ->
|
|
||||||
case rpc:call(Node, erlang, is_process_alive, [Pid]) of
|
|
||||||
{badrpc, _} -> false;
|
|
||||||
Boolean -> Boolean
|
|
||||||
end.
|
|
||||||
|
|
||||||
check() ->
|
|
||||||
Nodes = mria_mnesia:running_nodes(),
|
|
||||||
case rpc:multicall(Nodes, ?MODULE, get_plugins, [], 15000) of
|
|
||||||
{Plugins, []} -> check_plugins(Plugins);
|
|
||||||
{_ , BadNodes} -> {error, lists:flatten(io_lib:format("~p rpc to ~p failed", [node(), BadNodes]))}
|
|
||||||
end.
|
|
||||||
|
|
||||||
get_plugins() ->
|
|
||||||
{node(), emqx_plugins:list()}.
|
|
||||||
|
|
||||||
check_plugins(Plugins) ->
|
|
||||||
StatusMap = aggregate_status(Plugins),
|
|
||||||
case ensure_install(StatusMap) of
|
|
||||||
{error, Bad} ->
|
|
||||||
?SLOG(warning, #{msg => "plugin_not_install_on_some_node", broken_plugins => Bad});
|
|
||||||
{ok, Good} ->
|
|
||||||
check_status(Good)
|
|
||||||
end.
|
|
||||||
|
|
||||||
ensure_install(Status) ->
|
|
||||||
Size = length(mria_mnesia:running_nodes()),
|
|
||||||
{Good, Bad} =
|
|
||||||
maps:fold(fun({Key, Value}, {GoodAcc, BadAcc}) ->
|
|
||||||
case length(Value) =:= Size of
|
|
||||||
true -> {GoodAcc#{Key => Value}, BadAcc};
|
|
||||||
false -> {GoodAcc, BadAcc#{Key => Value}}
|
|
||||||
end
|
|
||||||
end, {#{}, #{}}, Status),
|
|
||||||
case map_size(Bad) =:= 0 of
|
|
||||||
true -> {ok, Good};
|
|
||||||
false -> {error, Bad}
|
|
||||||
end.
|
|
||||||
|
|
||||||
check_status(_Plugins) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%% alarm when failed 3 time.
|
|
||||||
maybe_alarm({error, _Reason}, Failed) when Failed >= 2 ->
|
|
||||||
%alarm(Reason),
|
|
||||||
0;
|
|
||||||
maybe_alarm({error, _Reason}, Failed) -> Failed + 1;
|
|
||||||
maybe_alarm(ok, _Failed) -> 0.
|
|
||||||
|
|
||||||
%% running_status: running loaded, stopped
|
|
||||||
%% config_status: not_configured disable enable
|
|
||||||
plugin_status(#{running_status := running}) -> running;
|
|
||||||
plugin_status(_) -> stopped.
|
|
||||||
|
|
||||||
-define(RESOURCE, plugins_lock).
|
|
||||||
|
|
||||||
lock() ->
|
|
||||||
ekka_locker:acquire(?MODULE, ?RESOURCE, all, undefined).
|
|
||||||
|
|
||||||
unlock() ->
|
|
||||||
ekka_locker:release(?MODULE, ?RESOURCE, all).
|
|
|
@ -26,8 +26,9 @@ start_link() ->
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
|
%% TODO: Add monitor plugins change.
|
||||||
Monitor = emqx_plugins_monitor,
|
Monitor = emqx_plugins_monitor,
|
||||||
Children = [
|
_Children = [
|
||||||
#{id => Monitor,
|
#{id => Monitor,
|
||||||
start => {Monitor, start_link, []},
|
start => {Monitor, start_link, []},
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
|
@ -42,4 +43,4 @@ init([]) ->
|
||||||
intensity => 100,
|
intensity => 100,
|
||||||
period => 10
|
period => 10
|
||||||
},
|
},
|
||||||
{ok, {SupFlags, Children}}.
|
{ok, {SupFlags, []}}.
|
||||||
|
|
Loading…
Reference in New Issue