diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugin.erl b/apps/emqx_management/src/emqx_mgmt_api_plugin.erl new file mode 100644 index 000000000..a722c2149 --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_api_plugin.erl @@ -0,0 +1,396 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_api_plugin). + +-behaviour(minirest_api). + +-include_lib("kernel/include/file.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_plugins/include/emqx_plugins.hrl"). + +-export([ api_spec/0 + , fields/1 + , paths/0 + , schema/1 + , namespace/0 + ]). + +-export([ list_plugins/2 + , upload_install/2 + , plugin/2 + , update_plugin/2 + , update_boot_order/2 + ]). + +-export([ validate_name/1 + , install_package/2 + , delete_package/1 + , describe_package/1 + , ensure_action/2 + ]). + +-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_.]*$"). + +namespace() -> "plugins". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). + +%% Don't change the path's order +paths() -> + [ + "/plugins", + "/plugins/:name", + "/plugins/install", + "/plugins/:name/:action", + "/plugins/:name/move" + ]. + +schema("/plugins") -> + #{ + 'operationId' => list_plugins, + get => #{ + description => "List all install plugins.
" + "Plugins starts in the order of the list from the top to the bottom.
" + "Using `POST /plugins/{name}/move` to change the boot order.", + responses => #{ + 200 => hoconsc:array(hoconsc:ref(plugin)) + } + } + }; +schema("/plugins/install") -> + #{ + 'operationId' => upload_install, + post => #{ + description => "Install a plugin(plugin-vsn.tar.gz)." + "Follow [emqx-plugin-template](https://github.com/emqx/emqx-plugin-template) to develop plugin.", + 'requestBody' => #{ + content => #{ + 'multipart/form-data' => #{ + schema => #{ + type => object, + properties => #{ + plugin => #{type => string, format => binary}}}, + encoding => #{plugin => #{'contentType' => 'application/gzip'}}}}}, + responses => #{200 => <<"OK">>} + } + }; +schema("/plugins/:name") -> + #{ + 'operationId' => plugin, + get => #{ + description => "Describe a plugin according `release.json` and `README.md`.", + parameters => [hoconsc:ref(name)], + responses => #{ + 200 => hoconsc:ref(plugin), + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], "Plugin Not Found") + } + }, + delete => #{ + description => "Uninstall a plugin package.", + parameters => [hoconsc:ref(name)], + responses => #{ + 204 => <<"Uninstall successfully">>, + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], "Plugin Not Found") + } + } + }; +schema("/plugins/:name/:action") -> + #{ + 'operationId' => update_plugin, + put => #{ + description => "start/stop a installed plugin.
" + "- **start**: start the plugin.
" + "- **stop**: stop the plugin.
", + parameters => [ + hoconsc:ref(name), + {action, hoconsc:mk(hoconsc:enum([start, stop]), #{desc => "Action", in => path})}], + responses => #{ + 200 => <<"OK">>, + 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], "Plugin Not Found") + } + } + }; +schema("/plugins/:name/move") -> + #{ + 'operationId' => update_boot_order, + post => #{ + description => "Setting the boot order of plugins.", + parameters => [hoconsc:ref(name)], + 'requestBody' => move_request_body(), + responses => #{200 => <<"OK">>} + } + }. + +fields(plugin) -> + [ + {name, hoconsc:mk(binary(), + #{ + desc => "Name-Vsn: without .tar.gz", + validator => fun ?MODULE:validate_name/1, + nullable => false, + example => "emqx_plugin_template-5.0-rc.1"}) + }, + {author, hoconsc:mk(list(string()), #{example => [<<"EMQ X Team">>]})}, + {builder, hoconsc:ref(?MODULE, builder)}, + {built_on_otp_release, hoconsc:mk(string(), #{example => "24"})}, + {compatibility, hoconsc:mk(map(), #{example => #{<<"emqx">> => <<"~>5.0">>}})}, + {git_commit_or_build_date, hoconsc:mk(string(), #{ + example => "2021-12-25", + desc => "Last git commit date by `git log -1 --pretty=format:'%cd' --date=format:'%Y-%m-%d`." + " If the last commit date is not available, the build date will be presented." + })}, + {functionality, hoconsc:mk(hoconsc:array(string()), #{example => [<<"Demo">>]})}, + {git_ref, hoconsc:mk(string(), #{example => "ddab50fafeed6b1faea70fc9ffd8c700d7e26ec1"})}, + {metadata_vsn, hoconsc:mk(string(), #{example => "0.1.0"})}, + {rel_vsn, hoconsc:mk(binary(), + #{desc => "Plugins release version", + nullable => false, + example => <<"5.0-rc.1">>}) + }, + {rel_apps, hoconsc:mk(hoconsc:array(binary()), + #{desc => "Aplications in plugin.", + nullable => false, + example => [<<"emqx_plugin_template-5.0.0">>, <<"map_sets-1.1.0">>]}) + }, + {repo, hoconsc:mk(string(), #{example => "https://github.com/emqx/emqx-plugin-template"})}, + {description, hoconsc:mk(binary(), + #{desc => "Plugin description.", + nullable => false, + example => "This is an demo plugin description"}) + }, + {running_status, hoconsc:mk(hoconsc:array(hoconsc:ref(running_status)), #{nullable => false})}, + {readme, hoconsc:mk(binary(), #{ + example => "This is an demo plugin.", + desc => "only return when `GET /plugins/{name}`.", + nullable => true})} + ]; +fields(name) -> + [{name, hoconsc:mk(binary(), + #{ + desc => list_to_binary(?NAME_RE), + example => "emqx_plugin_template-5.0-rc.1", + in => path, + validator => fun ?MODULE:validate_name/1 + })} + ]; +fields(builder) -> + [ + {contact, hoconsc:mk(string(), #{example => "emqx-support@emqx.io"})}, + {name, hoconsc:mk(string(), #{example => "EMQ X Team"})}, + {website, hoconsc:mk(string(), #{example => "www.emqx.com"})} + ]; +fields(position) -> + [{position, hoconsc:mk(hoconsc:union([top, bottom, binary()]), + #{ + desc => """ + Enable auto-boot at position in the boot list, where Position could be + 'top', 'bottom', or 'before:other-vsn' to specify a relative position. + """, + nullable => true + })}]; +fields(running_status) -> + [ + {node, hoconsc:mk(string(), #{example => "emqx@127.0.0.1"})}, + {status, hoconsc:mk(hoconsc:enum([running, stopped]), #{ + desc => "Install plugin status at runtime
" + "1. running: plugin is running.
" + "2. stopped: plugin is stopped.
" + })} + ]. + +move_request_body() -> + emqx_dashboard_swagger:schema_with_examples(hoconsc:ref(?MODULE, position), + #{ + move_to_top => #{ + summary => <<"move plugin on the top">>, + value => #{position => <<"top">>} + }, + move_to_bottom => #{ + summary => <<"move plugin on the bottom">>, + value => #{position => <<"bottom">>} + }, + move_to_before => #{ + summary => <<"move plugin before other plugins">>, + value => #{position => <<"before:emqx_plugin_demo-5.1-rc.2">>} + } + }). + +validate_name(Name) -> + NameLen = byte_size(Name), + case NameLen > 0 andalso NameLen =< 256 of + true -> + case re:run(Name, ?NAME_RE) of + nomatch -> {error, "Name should be " ?NAME_RE}; + _ -> ok + end; + false -> {error, "Name Length must =< 256"} + end. + +%% API CallBack Begin + +list_plugins(get, _) -> + Plugins = cluster_call(emqx_plugins_monitor, get_plugins, [], 15000), + {200, format_plugins(Plugins)}. + +upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) -> + [{FileName, Bin}] = maps:to_list(maps:without([type], Plugin)), + %% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall + %% TODO what happened when a new node join in? + %% emqx_plugins_monitor should copy plugins from other core node when boot-up. + Res = cluster_call(?MODULE, install_package, [FileName, Bin], 25000), + case lists:filter(fun(R) -> R =/= ok end, Res) of + [] -> {200}; + [{error, Reason} | _] -> + {400, #{code => 'UNEXPECTED_ERROR', + message => iolist_to_binary(io_lib:format("~p", [Reason]))}} + end; +upload_install(post, #{}) -> + {400, #{code => 'BAD_FORM_DATA', + message => <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>} + }. + +plugin(get, #{bindings := #{name := Name}}) -> + Plugins = cluster_call(?MODULE, describe_package, [Name], 10000), + case format_plugins(Plugins) of + [Plugin] -> {200, Plugin}; + [] -> {404, #{code => 'NOT_FOUND', message => Name}} + end; + +plugin(delete, #{bindings := #{name := Name}}) -> + return(204, cluster_rpc(?MODULE, delete_package, [Name])). + +update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> + return(200, cluster_rpc(?MODULE, ensure_action, [Name, Action])). + +update_boot_order(post, #{bindings := #{name := Name}, body := Body}) -> + case parse_position(Body, Name) of + {error, Reason} -> {400, #{code => 'BAD_POSITION', message => Reason}}; + Position -> + case emqx_plugins:ensure_enabled(Name, Position) of + ok -> {200}; + {error, Reason} -> + {400, #{code => 'MOVE_FAILED', + message => iolist_to_binary(io_lib:format("~p", [Reason]))}} + end + end. + +%% API CallBack End + +%% For RPC upload_install/2 +install_package(FileName, Bin) -> + File = filename:join(emqx_plugins:install_dir(), FileName), + ok = file:write_file(File, Bin), + PackageName = string:trim(FileName, trailing, ".tar.gz"), + emqx_plugins:ensure_installed(PackageName). + +%% For RPC plugin get +describe_package(Name) -> + Node = node(), + case emqx_plugins:describe(Name) of + {ok, Plugin} -> {Node, [Plugin]}; + _ -> {Node, []} + end. + +%% For RPC plugin delete +delete_package(Name) -> + case emqx_plugins:ensure_stopped(Name) of + ok -> + emqx_plugins:ensure_disabled(Name), + emqx_plugins:delete_package(Name); + Error -> Error + end. + +%% for RPC plugin update +ensure_action(Name, start) -> + emqx_plugins:ensure_enabled(Name), + emqx_plugins:ensure_started(Name); +ensure_action(Name, stop) -> + emqx_plugins:ensure_stopped(Name), + emqx_plugins:ensure_disabled(Name); +ensure_action(Name, restart) -> + emqx_plugins:ensure_enabled(Name), + emqx_plugins:restart(Name). + +cluster_call(Mod, Fun, Args, Timeout) -> + Nodes = mria_mnesia:running_nodes(), + {GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout), + BadNodes =/= [] andalso + ?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes, mfa => {Mod, Fun, length(Args)}}), + GoodRes. + +cluster_rpc(Mod, Fun, Args) -> + case emqx_cluster_rpc:multicall(Mod, Fun, Args, all, 30000) of + {ok, _TnxId, Res} -> Res; + {retry, TnxId, Res, Node} -> + ?SLOG(error, #{msg => "failed_to_update_plugin_in_cluster", nodes => Node, + tnx_id => TnxId, mfa => {Mod, Fun, Args}}), + Res; + {error, Error} -> Error + end. + +return(Code, ok) -> {Code}; +return(Code, {ok, Result}) -> {Code, Result}; +return(_, {error, #{error := "bad_info_file", return := {enoent, _}, path := Path}}) -> + {404, #{code => 'NOT_FOUND', message => Path}}; +return(_, {error, Reason}) -> + {400, #{code => 'PARAM_ERROR', message => iolist_to_binary(io_lib:format("~p", [Reason]))}}. + +parse_position(#{<<"position">> := <<"top">>}, _) -> front; +parse_position(#{<<"position">> := <<"bottom">>}, _) -> rear; +parse_position(#{<<"position">> := <<"before:", Name/binary>>}, Name) -> {error, <<"Can't before:self">>}; +parse_position(#{<<"position">> := <<"before:", Before/binary>>}, _Name) -> {before, binary_to_list(Before)}; +parse_position(Position, _) -> {error, iolist_to_binary(io_lib:format("~p", [Position]))}. + +format_plugins(List) -> + StatusList = merge_running_status(List, #{}), + {Plugins, _} = + lists:foldr(fun({_Node, Plugins}, {Acc, StatusAcc}) -> + format_plugins_in_order(Plugins, Acc, StatusAcc) + end, {[], StatusList}, List), + Plugins. + +format_plugins_in_order(Plugins, Acc0, StatusAcc0) -> + lists:foldr(fun(Plugin0, {Acc, StatusAcc}) -> + #{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin0, + case maps:find({Name, Vsn}, StatusAcc) of + {ok, Status} -> + Plugin1 = maps:without([running_status, config_status], Plugin0), + Plugins2 = Plugin1#{running_status => Status}, + { + [Plugins2 | Acc], + maps:remove({Name, Vsn}, StatusAcc) + }; + error -> {Acc, StatusAcc} + end + end, {Acc0, StatusAcc0}, Plugins). + +merge_running_status([], Acc) -> Acc; +merge_running_status([{Node, Plugins} | List], Acc) -> + NewAcc = + lists:foldl(fun(Plugin, SubAcc) -> + #{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin, + Key = {Name, Vsn}, + Value = #{node => Node, status => plugin_status(Plugin)}, + SubAcc#{Key => [Value | maps:get(Key, Acc, [])]} + end, Acc, Plugins), + merge_running_status(List, NewAcc). + +%% running_status: running loaded, stopped +%% config_status: not_configured disable enable +plugin_status(#{running_status := running}) -> running; +plugin_status(_) -> stopped. diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index bcfa1f0d9..87312d087 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -43,6 +43,9 @@ %% internal -export([ do_ensure_started/1 ]). +-export([ + install_dir/0 + ]). -ifdef(TEST). -compile(export_all). @@ -63,12 +66,12 @@ %% @doc Describe a plugin. -spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}. -describe(NameVsn) -> read_plugin(NameVsn). +describe(NameVsn) -> read_plugin(NameVsn, #{fill_readme => true}). %% @doc Install a .tar.gz package placed in install_dir. -spec ensure_installed(name_vsn()) -> ok | {error, any()}. ensure_installed(NameVsn) -> - case read_plugin(NameVsn) of + case read_plugin(NameVsn, #{}) of {ok, _} -> ok; {error, _} -> @@ -80,7 +83,7 @@ do_ensure_installed(NameVsn) -> TarGz = pkg_file(NameVsn), case erl_tar:extract(TarGz, [{cwd, install_dir()}, compressed]) of ok -> - case read_plugin(NameVsn) of + case read_plugin(NameVsn, #{}) of {ok, _} -> ok; {error, Reason} -> ?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}), @@ -103,7 +106,7 @@ do_ensure_installed(NameVsn) -> %% If a plugin is running, or enabled, error is returned. -spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}. ensure_uninstalled(NameVsn) -> - case read_plugin(NameVsn) of + case read_plugin(NameVsn, #{}) of {ok, #{running_status := RunningSt}} when RunningSt =/= stopped -> {error, #{reason => "bad_plugin_running_status", hint => "stop_the_plugin_first" @@ -134,7 +137,7 @@ ensure_disabled(NameVsn) -> ensure_state(NameVsn, Position, State) when is_binary(NameVsn) -> ensure_state(binary_to_list(NameVsn), Position, State); ensure_state(NameVsn, Position, State) -> - case read_plugin(NameVsn) of + case read_plugin(NameVsn, #{}) of {ok, _} -> Item = #{ name_vsn => NameVsn , enable => State @@ -259,7 +262,7 @@ list() -> Pattern = filename:join([install_dir(), "*", "release.json"]), All = lists:filtermap( fun(JsonFile) -> - case read_plugin({file, JsonFile}) of + case read_plugin({file, JsonFile}, #{}) of {ok, Info} -> {true, Info}; {error, Reason} -> @@ -314,26 +317,36 @@ tryit(WhichOp, F) -> %% read plugin info from the JSON file %% returns {ok, Info} or {error, Reason} -read_plugin(NameVsn) -> +read_plugin(NameVsn, Options) -> tryit("read_plugin_info", - fun() -> {ok, do_read_plugin(NameVsn)} end). + fun() -> {ok, do_read_plugin(NameVsn, Options)} end). -do_read_plugin({file, InfoFile}) -> +do_read_plugin(Plugin) -> do_read_plugin(Plugin, #{}). + +do_read_plugin({file, InfoFile}, Options) -> [_, NameVsn | _] = lists:reverse(filename:split(InfoFile)), case hocon:load(InfoFile, #{format => richmap}) of {ok, RichMap} -> - Info = check_plugin(hocon_maps:ensure_plain(RichMap), NameVsn, InfoFile), - maps:merge(Info, plugin_status(NameVsn)); + Info0 = check_plugin(hocon_maps:ensure_plain(RichMap), NameVsn, InfoFile), + Info1 = plugins_readme(NameVsn, Options, Info0), + plugin_status(NameVsn, Info1); {error, Reason} -> throw(#{error => "bad_info_file", path => InfoFile, return => Reason }) end; -do_read_plugin(NameVsn) -> - do_read_plugin({file, info_file(NameVsn)}). +do_read_plugin(NameVsn, Options) -> + do_read_plugin({file, info_file(NameVsn)}, Options). -plugin_status(NameVsn) -> +plugins_readme(NameVsn, #{fill_readme := true}, Info) -> + case file:read_file(readme_file(NameVsn)) of + {ok, Bin} -> Info#{readme => Bin}; + _ -> Info#{readme => <<>>} + end; +plugins_readme(_NameVsn, _Options, Info) -> Info. + +plugin_status(NameVsn, Info) -> {AppName, _AppVsn} = parse_name_vsn(NameVsn), RunningSt = case application:get_key(AppName, vsn) of @@ -357,9 +370,9 @@ plugin_status(NameVsn) -> [true] -> enabled; [false] -> disabled end, - #{ running_status => RunningSt - , config_status => ConfSt - }. + Info#{ running_status => RunningSt + , config_status => ConfSt + }. bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8); @@ -592,6 +605,9 @@ dir(NameVsn) -> info_file(NameVsn) -> filename:join([dir(NameVsn), "release.json"]). +readme_file(NameVsn) -> + filename:join([dir(NameVsn), "README.md"]). + running_apps() -> lists:map(fun({N, _, V}) -> {N, V} diff --git a/apps/emqx_plugins/src/emqx_plugins_app.erl b/apps/emqx_plugins/src/emqx_plugins_app.erl index 6b53438bb..70fab549f 100644 --- a/apps/emqx_plugins/src/emqx_plugins_app.erl +++ b/apps/emqx_plugins/src/emqx_plugins_app.erl @@ -23,8 +23,8 @@ ]). start(_Type, _Args) -> - {ok, Sup} = emqx_plugins_sup:start_link(), ok = emqx_plugins:ensure_started(), %% load all pre-configured + {ok, Sup} = emqx_plugins_sup:start_link(), {ok, Sup}. stop(_State) -> diff --git a/apps/emqx_plugins/src/emqx_plugins_monitor.erl b/apps/emqx_plugins/src/emqx_plugins_monitor.erl new file mode 100644 index 000000000..ee06f1bcf --- /dev/null +++ b/apps/emqx_plugins/src/emqx_plugins_monitor.erl @@ -0,0 +1,83 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_plugins_monitor). +-behaviour(gen_server). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-export([ get_plugins/0 + , start_link/0 + ]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + {ok, #{ref => next_check_time(), failed => 0}}. + +handle_call(Req, _From, State) -> + ?SLOG(error, #{msg => "unexpected_call", call => Req}), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), + {noreply, State}. + +handle_info({timeout, Ref, check}, State = #{failed := Failed}) -> + erlang:cancel_timer(Ref), + NewFailed = maybe_alarm(check(), Failed), + {noreply, State#{ref => next_check_time(), failed => NewFailed}}; +handle_info(Info, State) -> + ?SLOG(error, #{msg => "unexpected_info", info => Info}), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +next_check_time() -> + Check = emqx_plugins:get_config(check_interval, 5000), + emqx_misc:start_timer(Check, check). + +check() -> + Nodes = mria_mnesia:running_nodes(), + case rpc:multicall(Nodes, ?MODULE, get_plugins_list, [], 15000) of + {Plugins, []} -> check_plugins(Plugins); + {_ , BadNodes} -> {error, io_lib:format("~p rpc to ~p failed", [node(), BadNodes])} + end. + +get_plugins() -> + {node(), emqx_plugins:list()}. + +check_plugins(Plugins) -> + check_status(Plugins), + ok. + +check_status(_Plugins) -> + ok. + +%% alarm when failed 3 time. +maybe_alarm({error, _Reason}, Failed) when Failed >= 2 -> + %alarm(Reason), + 0; +maybe_alarm({error, _Reason}, Failed) -> Failed + 1; +maybe_alarm(ok, _Failed) -> 0. diff --git a/apps/emqx_plugins/src/emqx_plugins_schema.erl b/apps/emqx_plugins/src/emqx_plugins_schema.erl index 2de8430a1..b7498d2ee 100644 --- a/apps/emqx_plugins/src/emqx_plugins_schema.erl +++ b/apps/emqx_plugins/src/emqx_plugins_schema.erl @@ -66,6 +66,7 @@ state_fields() -> root_fields() -> [ {states, fun states/1} , {install_dir, fun install_dir/1} + , {check_interval, fun check_interval/1} ]. states(type) -> hoconsc:array(hoconsc:ref(?MODULE, state)); @@ -87,3 +88,11 @@ the subdirectory named as emqx_foo_bar-0.1.0. NOTE: For security reasons, this directory should **NOT** be writable by anyone except emqx (or any user which runs EMQX). """. + +check_interval(type) -> emqx_schema:duration(); +check_interval(default) -> "5s"; +check_interval(T) when T =/= desc -> undefined; +check_interval(desc) -> """ +Check interval: check if the status of the plugins in the cluster is consistent,
+if the results of 3 consecutive checks are not consistent, then alarm. +""". diff --git a/apps/emqx_plugins/src/emqx_plugins_sup.erl b/apps/emqx_plugins/src/emqx_plugins_sup.erl index 45d9b8ca2..5326e525d 100644 --- a/apps/emqx_plugins/src/emqx_plugins_sup.erl +++ b/apps/emqx_plugins/src/emqx_plugins_sup.erl @@ -26,5 +26,20 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Children = [], - {ok, {{one_for_one, 10, 10}, Children}}. + Monitor = emqx_plugins_monitor, + Children = [ + #{id => Monitor, + start => {Monitor, start_link, []}, + restart => permanent, + shutdown => brutal_kill, + type => worker, + modules => [Monitor] + } + ], + SupFlags = + #{ + strategy => one_for_one, + intensity => 100, + period => 10 + }, + {ok, {SupFlags, Children}}. diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE_data/demo_src b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/demo_src new file mode 160000 index 000000000..ddab50faf --- /dev/null +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/demo_src @@ -0,0 +1 @@ +Subproject commit ddab50fafeed6b1faea70fc9ffd8c700d7e26ec1 diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/README.md b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/README.md new file mode 100644 index 000000000..3c87fd974 --- /dev/null +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/README.md @@ -0,0 +1,21 @@ +# emqx-plugin-template + +This is a template plugin for EMQ X >= 5.0. + +For EMQ X >= 4.3, please see branch emqx-v4 + +For older EMQ X versions, plugin development is no longer maintained. + +## Release + +A EMQ X plugin release is a zip package including + +1. A JSON format metadata file +2. A tar file with plugin's apps packed + +Execute `make rel` to have the package created like: + +``` +_build/default/emqx_plugrel/emqx_plugin_template-.tar.gz +``` +See EMQ X documents for details on how to deploy the plugin. diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/priv/config.hocon b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/priv/config.hocon new file mode 100644 index 000000000..87844e8aa --- /dev/null +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/priv/config.hocon @@ -0,0 +1,4 @@ +## This is a demo config in HOCON format +## The same format used by EMQ X since 5.0 + +magic_n = 42 diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_cli_demo.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_cli_demo.erl new file mode 100644 index 000000000..6186f3aa7 --- /dev/null +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_cli_demo.erl @@ -0,0 +1,26 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cli_demo). + +-export([cmd/1]). + +cmd(["arg1", "arg2"]) -> + emqx_ctl:print("ok"); + +cmd(_) -> + emqx_ctl:usage([{"cmd arg1 arg2", "cmd demo"}]). + diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_plugin_template.app.src b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_plugin_template.app.src new file mode 100644 index 000000000..609da62fd --- /dev/null +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_plugin_template.app.src @@ -0,0 +1,14 @@ +{application, emqx_plugin_template, + [{description, "EMQ X Plugin Template"}, + {vsn, "5.0.0"}, + {modules, []}, + {registered, [emqx_plugin_template_sup]}, + {applications, [kernel,stdlib,map_sets]}, + {mod, {emqx_plugin_template_app,[]}}, + {env, []}, + {licenses, ["Apache-2.0"]}, + {maintainers, ["EMQ X Team "]}, + {links, [{"Homepage", "https://emqx.io/"}, + {"Github", "https://github.com/emqx/emqx-plugin-template"} + ]} + ]}. diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_plugin_template.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_plugin_template.erl new file mode 100644 index 000000000..9e37120f3 --- /dev/null +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_plugin_template.erl @@ -0,0 +1,198 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugin_template). + +%% for #message{} record +%% no need for this include if we call emqx_message:to_map/1 to convert it to a map +-include_lib("emqx/include/emqx.hrl"). + +%% for logging +-include_lib("emqx/include/logger.hrl"). + +-export([ load/1 + , unload/0 + ]). + +%% Client Lifecircle Hooks +-export([ on_client_connect/3 + , on_client_connack/4 + , on_client_connected/3 + , on_client_disconnected/4 + , on_client_authenticate/3 + , on_client_check_acl/5 + , on_client_subscribe/4 + , on_client_unsubscribe/4 + ]). + +%% Session Lifecircle Hooks +-export([ on_session_created/3 + , on_session_subscribed/4 + , on_session_unsubscribed/4 + , on_session_resumed/3 + , on_session_discarded/3 + , on_session_takeovered/3 + , on_session_terminated/4 + ]). + +%% Message Pubsub Hooks +-export([ on_message_publish/2 + , on_message_delivered/3 + , on_message_acked/3 + , on_message_dropped/4 + ]). + +%% Called when the plugin application start +load(Env) -> + emqx:hook('client.connect', {?MODULE, on_client_connect, [Env]}), + emqx:hook('client.connack', {?MODULE, on_client_connack, [Env]}), + emqx:hook('client.connected', {?MODULE, on_client_connected, [Env]}), + emqx:hook('client.disconnected', {?MODULE, on_client_disconnected, [Env]}), + emqx:hook('client.authenticate', {?MODULE, on_client_authenticate, [Env]}), + emqx:hook('client.check_acl', {?MODULE, on_client_check_acl, [Env]}), + emqx:hook('client.subscribe', {?MODULE, on_client_subscribe, [Env]}), + emqx:hook('client.unsubscribe', {?MODULE, on_client_unsubscribe, [Env]}), + emqx:hook('session.created', {?MODULE, on_session_created, [Env]}), + emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Env]}), + emqx:hook('session.unsubscribed',{?MODULE, on_session_unsubscribed, [Env]}), + emqx:hook('session.resumed', {?MODULE, on_session_resumed, [Env]}), + emqx:hook('session.discarded', {?MODULE, on_session_discarded, [Env]}), + emqx:hook('session.takeovered', {?MODULE, on_session_takeovered, [Env]}), + emqx:hook('session.terminated', {?MODULE, on_session_terminated, [Env]}), + emqx:hook('message.publish', {?MODULE, on_message_publish, [Env]}), + emqx:hook('message.delivered', {?MODULE, on_message_delivered, [Env]}), + emqx:hook('message.acked', {?MODULE, on_message_acked, [Env]}), + emqx:hook('message.dropped', {?MODULE, on_message_dropped, [Env]}). + +%%-------------------------------------------------------------------- +%% Client Lifecircle Hooks +%%-------------------------------------------------------------------- + +on_client_connect(ConnInfo, Props, _Env) -> + %% this is to demo the usage of EMQ X's structured-logging macro + %% * Recommended to always have a `msg` field, + %% * Use underscore instead of space to help log indexers, + %% * Try to use static fields + ?SLOG(debug, #{msg => "demo_log_msg_on_client_connect", + conninfo => ConnInfo, + props => Props}), + {ok, Props}. + +on_client_connack(ConnInfo = #{clientid := ClientId}, Rc, Props, _Env) -> + io:format("Client(~s) connack, ConnInfo: ~p, Rc: ~p, Props: ~p~n", + [ClientId, ConnInfo, Rc, Props]), + {ok, Props}. + +on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, _Env) -> + io:format("Client(~s) connected, ClientInfo:~n~p~n, ConnInfo:~n~p~n", + [ClientId, ClientInfo, ConnInfo]). + +on_client_disconnected(ClientInfo = #{clientid := ClientId}, ReasonCode, ConnInfo, _Env) -> + io:format("Client(~s) disconnected due to ~p, ClientInfo:~n~p~n, ConnInfo:~n~p~n", + [ClientId, ReasonCode, ClientInfo, ConnInfo]). + +on_client_authenticate(_ClientInfo = #{clientid := ClientId}, Result, _Env) -> + io:format("Client(~s) authenticate, Result:~n~p~n", [ClientId, Result]), + {ok, Result}. + +on_client_check_acl(_ClientInfo = #{clientid := ClientId}, Topic, PubSub, Result, _Env) -> + io:format("Client(~s) check_acl, PubSub:~p, Topic:~p, Result:~p~n", + [ClientId, PubSub, Topic, Result]), + {ok, Result}. + +on_client_subscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) -> + io:format("Client(~s) will subscribe: ~p~n", [ClientId, TopicFilters]), + {ok, TopicFilters}. + +on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) -> + io:format("Client(~s) will unsubscribe ~p~n", [ClientId, TopicFilters]), + {ok, TopicFilters}. + +%%-------------------------------------------------------------------- +%% Session Lifecircle Hooks +%%-------------------------------------------------------------------- + +on_session_created(#{clientid := ClientId}, SessInfo, _Env) -> + io:format("Session(~s) created, Session Info:~n~p~n", [ClientId, SessInfo]). + +on_session_subscribed(#{clientid := ClientId}, Topic, SubOpts, _Env) -> + io:format("Session(~s) subscribed ~s with subopts: ~p~n", [ClientId, Topic, SubOpts]). + +on_session_unsubscribed(#{clientid := ClientId}, Topic, Opts, _Env) -> + io:format("Session(~s) unsubscribed ~s with opts: ~p~n", [ClientId, Topic, Opts]). + +on_session_resumed(#{clientid := ClientId}, SessInfo, _Env) -> + io:format("Session(~s) resumed, Session Info:~n~p~n", [ClientId, SessInfo]). + +on_session_discarded(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) -> + io:format("Session(~s) is discarded. Session Info: ~p~n", [ClientId, SessInfo]). + +on_session_takeovered(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) -> + io:format("Session(~s) is takeovered. Session Info: ~p~n", [ClientId, SessInfo]). + +on_session_terminated(_ClientInfo = #{clientid := ClientId}, Reason, SessInfo, _Env) -> + io:format("Session(~s) is terminated due to ~p~nSession Info: ~p~n", + [ClientId, Reason, SessInfo]). + +%%-------------------------------------------------------------------- +%% Message PubSub Hooks +%%-------------------------------------------------------------------- + +%% Transform message and return +on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) -> + {ok, Message}; + +on_message_publish(Message, _Env) -> + io:format("Publish ~s~n", [emqx_message:to_map(Message)]), + {ok, Message}. + +on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason, _Env) -> + ok; +on_message_dropped(Message, _By = #{node := Node}, Reason, _Env) -> + io:format("Message dropped by node ~s due to ~s: ~p~n", + [Node, Reason, emqx_message:to_map(Message)]). + +on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) -> + io:format("Message delivered to client(~s): ~p~n", + [ClientId, emqx_message:to_map(Message)]), + {ok, Message}. + +on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) -> + io:format("Message acked by client(~s): ~p~n", + [ClientId, emqx_message:to_map(Message)]). + +%% Called when the plugin application stop +unload() -> + emqx:unhook('client.connect', {?MODULE, on_client_connect}), + emqx:unhook('client.connack', {?MODULE, on_client_connack}), + emqx:unhook('client.connected', {?MODULE, on_client_connected}), + emqx:unhook('client.disconnected', {?MODULE, on_client_disconnected}), + emqx:unhook('client.authenticate', {?MODULE, on_client_authenticate}), + emqx:unhook('client.check_acl', {?MODULE, on_client_check_acl}), + emqx:unhook('client.subscribe', {?MODULE, on_client_subscribe}), + emqx:unhook('client.unsubscribe', {?MODULE, on_client_unsubscribe}), + emqx:unhook('session.created', {?MODULE, on_session_created}), + emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}), + emqx:unhook('session.unsubscribed',{?MODULE, on_session_unsubscribed}), + emqx:unhook('session.resumed', {?MODULE, on_session_resumed}), + emqx:unhook('session.discarded', {?MODULE, on_session_discarded}), + emqx:unhook('session.takeovered', {?MODULE, on_session_takeovered}), + emqx:unhook('session.terminated', {?MODULE, on_session_terminated}), + emqx:unhook('message.publish', {?MODULE, on_message_publish}), + emqx:unhook('message.delivered', {?MODULE, on_message_delivered}), + emqx:unhook('message.acked', {?MODULE, on_message_acked}), + emqx:unhook('message.dropped', {?MODULE, on_message_dropped}). + diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_plugin_template_app.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_plugin_template_app.erl new file mode 100644 index 000000000..5a4cabce8 --- /dev/null +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_plugin_template_app.erl @@ -0,0 +1,34 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugin_template_app). + +-behaviour(application). + +-emqx_plugin(?MODULE). + +-export([ start/2 + , stop/1 + ]). + +start(_StartType, _StartArgs) -> + {ok, Sup} = emqx_plugin_template_sup:start_link(), + emqx_plugin_template:load(application:get_all_env()), + {ok, Sup}. + +stop(_State) -> + emqx_plugin_template:unload(). + diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_plugin_template_sup.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_plugin_template_sup.erl new file mode 100644 index 000000000..bd9961f3a --- /dev/null +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/emqx_plugin_template-5.0.0/src/emqx_plugin_template_sup.erl @@ -0,0 +1,30 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugin_template_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + {ok, { {one_for_all, 0, 1}, []} }. + diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/map_sets-1.1.0/src/map_sets.app.src b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/map_sets-1.1.0/src/map_sets.app.src new file mode 100644 index 000000000..0025a8a7b --- /dev/null +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/map_sets-1.1.0/src/map_sets.app.src @@ -0,0 +1,9 @@ +{application,map_sets, + [{description,"sets-like wrapper based on maps"}, + {vsn,"1.1.0"}, + {registered,[]}, + {applications,[kernel,stdlib]}, + {env,[]}, + {modules,[]}, + {links,[{"Github","https://github.com/k32/map_sets"}]}, + {licenses,["public domain"]}]}. diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/map_sets-1.1.0/src/map_sets.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/map_sets-1.1.0/src/map_sets.erl new file mode 100644 index 000000000..8a001e9e4 --- /dev/null +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/map_sets-1.1.0/src/map_sets.erl @@ -0,0 +1,179 @@ +%% s/sets/map_sets/g +%% Why? Because spead (This module piggybacks on `maps' module's BIFs) +-module(map_sets). + +-export([ new/0 + , is_set/1 + , size/1 + , to_list/1 + , from_list/1 + ]). + +-export([ is_element/2 + , add_element/2 + , del_element/2 + ]). + +-export([ union/2 + , union/1 + , intersection/2 + , intersection/1 + ]). + +-export([ is_disjoint/2 + ]). + +-export([ subtract/2 + , is_subset/2 + ]). + +-export([ fold/3 + , filter/2 + ]). + +-export_type([set/1, set/0]). + +-type set(Key) :: #{Key => term()}. +-type set() :: set(term()). + +-define(UNUSED, []). + +-ifdef(OTP_RELEASE). %% OTP21+ supports map iterators + +-define(iterable(A), maps:iterator(A)). + +-define(iterate(I, Last, K, Next, Cons), + case maps:next(I) of + none -> Last; + {K, _, Next} -> Cons + end). + +-else. + +-define(iterable(A), maps:keys(A)). + +-define(iterate(I, Last, K, Next, Cons), + case I of + [] -> Last; + [K|Next] -> Cons + end). + +-endif. + +-spec new() -> set(). +new() -> + #{}. + +-spec is_set(term()) -> boolean(). +is_set(A) -> + is_map(A). + +-spec size(set()) -> non_neg_integer(). +size(A) -> + maps:size(A). + +-spec fold(Function, Acc, Set) -> Acc when + Function :: fun((Element, Acc) -> Acc), + Set :: set(Element), + Acc :: term(). +fold(Fun, A, B) -> + maps:fold( fun(K, _, Acc) -> Fun(K, Acc) end + , A + , B). + +-spec filter(Predicate, Set) -> Set when + Predicate :: fun((Element) -> boolean()), + Set :: set(Element). +filter(P, A) -> + maps:filter( fun(K, _) -> P(K) end + , A). + +-spec to_list(set(Elem)) -> [Elem]. +to_list(A) -> + maps:keys(A). + +-spec from_list([Elem]) -> set(Elem). +from_list(L) -> + maps:from_list([{I, ?UNUSED} || I <- L]). + +-spec is_element(Elem, set(Elem)) -> boolean(). +is_element(Elem, Set) -> + maps:is_key(Elem, Set). + +-spec add_element(Elem, set(Elem)) -> set(Elem). +add_element(Elem, Set) -> + Set#{Elem => ?UNUSED}. + +-spec del_element(Elem, set(Elem)) -> set(Elem). +del_element(Elem, Set) -> + maps:remove(Elem, Set). + +-spec is_subset(set(Elem), set(Elem)) -> boolean(). +is_subset(S1, S2) -> + is_subset_(?iterable(S1), S2). + +is_subset_(Iter, S2) -> + ?iterate(Iter, + true, + K, Next, + case maps:is_key(K, S2) of + true -> + is_subset_(Next, S2); + false -> + false + end). + +-spec subtract(set(Elem), set(Elem)) -> set(Elem). +subtract(S1, S2) -> + maps:without(maps:keys(S2), S1). + +-spec union(set(Elem), set(Elem)) -> set(Elem). +union(S1, S2) -> + maps:merge(S1, S2). + +-spec union([set(Elem)]) -> set(Elem). +union(L) -> + lists:foldl(fun maps:merge/2, #{}, L). + +-spec intersection(set(Elem), set(Elem)) -> set(Elem). +intersection(S1, S2) -> + case maps:size(S1) > maps:size(S2) of + true -> + intersection_(S1, S2); + false -> + intersection_(S2, S1) + end. +intersection_(Large, Small) -> + maps:fold( fun(E, _, Acc) -> + case maps:is_key(E, Large) of + true -> + Acc #{E => ?UNUSED}; + _ -> + Acc + end + end + , #{} + , Small). + +-spec intersection(nonempty_list(set(Elem))) -> set(Elem). +intersection([H|T]) -> + lists:foldl(fun intersection/2, H, T). + +-spec is_disjoint(set(Elem), set(Elem)) -> boolean(). +is_disjoint(S1, S2) -> + case maps:size(S1) > maps:size(S2) of + true -> + is_disjoint_(S1, ?iterable(S2)); + false -> + is_disjoint_(S2, ?iterable(S1)) + end. +is_disjoint_(Large, Small) -> + ?iterate(Small, + true, + K, Next, + case maps:is_key(K, Large) of + true -> + false; + false -> + is_disjoint_(Large, Next) + end). diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/release.json b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/release.json new file mode 100644 index 000000000..569602669 --- /dev/null +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE_data/emqx_plugin_template-5.0-rc.1/release.json @@ -0,0 +1,28 @@ +{ + "authors": [ + "EMQ X Team" + ], + "builder": { + "contact": "emqx-support@emqx.io", + "name": "EMQ X Team", + "website": "www.emqx.com" + }, + "built_on_otp_release": "24", + "compatibility": { + "emqx": "~> 5.0" + }, + "date": "2021-12-16", + "description": "This is a demo plugin", + "functionality": [ + "Demo" + ], + "git_ref": "ddab50fafeed6b1faea70fc9ffd8c700d7e26ec1", + "metadata_vsn": "0.1.0", + "name": "emqx_plugin_template", + "rel_apps": [ + "emqx_plugin_template-5.0.0", + "map_sets-1.1.0" + ], + "rel_vsn": "5.0-rc.1", + "repo": "https://github.com/emqx/emqx-plugin-template" +} \ No newline at end of file diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index d59835123..bbafcf43d 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -41,7 +41,7 @@ ]). %% Sync resource instances and files -%% provisional solution: rpc:multical to all the nodes for creating/updating/removing +%% provisional solution: rpc:multicall to all the nodes for creating/updating/removing %% todo: replicate operations -export([ create/4 %% store the config and start the instance , create/5