feat(plugin): http api
This commit is contained in:
parent
81ffa87354
commit
3414e0b601
|
@ -0,0 +1,396 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_mgmt_api_plugin).
|
||||||
|
|
||||||
|
-behaviour(minirest_api).
|
||||||
|
|
||||||
|
-include_lib("kernel/include/file.hrl").
|
||||||
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("emqx_plugins/include/emqx_plugins.hrl").
|
||||||
|
|
||||||
|
-export([ api_spec/0
|
||||||
|
, fields/1
|
||||||
|
, paths/0
|
||||||
|
, schema/1
|
||||||
|
, namespace/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ list_plugins/2
|
||||||
|
, upload_install/2
|
||||||
|
, plugin/2
|
||||||
|
, update_plugin/2
|
||||||
|
, update_boot_order/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ validate_name/1
|
||||||
|
, install_package/2
|
||||||
|
, delete_package/1
|
||||||
|
, describe_package/1
|
||||||
|
, ensure_action/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-define(NAME_RE, "^[A-Za-z]+[A-Za-z0-9-_.]*$").
|
||||||
|
|
||||||
|
namespace() -> "plugins".
|
||||||
|
|
||||||
|
api_spec() ->
|
||||||
|
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
||||||
|
|
||||||
|
%% Don't change the path's order
|
||||||
|
paths() ->
|
||||||
|
[
|
||||||
|
"/plugins",
|
||||||
|
"/plugins/:name",
|
||||||
|
"/plugins/install",
|
||||||
|
"/plugins/:name/:action",
|
||||||
|
"/plugins/:name/move"
|
||||||
|
].
|
||||||
|
|
||||||
|
schema("/plugins") ->
|
||||||
|
#{
|
||||||
|
'operationId' => list_plugins,
|
||||||
|
get => #{
|
||||||
|
description => "List all install plugins.<br>"
|
||||||
|
"Plugins starts in the order of the list from the top to the bottom. <br>"
|
||||||
|
"Using `POST /plugins/{name}/move` to change the boot order.",
|
||||||
|
responses => #{
|
||||||
|
200 => hoconsc:array(hoconsc:ref(plugin))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/plugins/install") ->
|
||||||
|
#{
|
||||||
|
'operationId' => upload_install,
|
||||||
|
post => #{
|
||||||
|
description => "Install a plugin(plugin-vsn.tar.gz)."
|
||||||
|
"Follow [emqx-plugin-template](https://github.com/emqx/emqx-plugin-template) to develop plugin.",
|
||||||
|
'requestBody' => #{
|
||||||
|
content => #{
|
||||||
|
'multipart/form-data' => #{
|
||||||
|
schema => #{
|
||||||
|
type => object,
|
||||||
|
properties => #{
|
||||||
|
plugin => #{type => string, format => binary}}},
|
||||||
|
encoding => #{plugin => #{'contentType' => 'application/gzip'}}}}},
|
||||||
|
responses => #{200 => <<"OK">>}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/plugins/:name") ->
|
||||||
|
#{
|
||||||
|
'operationId' => plugin,
|
||||||
|
get => #{
|
||||||
|
description => "Describe a plugin according `release.json` and `README.md`.",
|
||||||
|
parameters => [hoconsc:ref(name)],
|
||||||
|
responses => #{
|
||||||
|
200 => hoconsc:ref(plugin),
|
||||||
|
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], "Plugin Not Found")
|
||||||
|
}
|
||||||
|
},
|
||||||
|
delete => #{
|
||||||
|
description => "Uninstall a plugin package.",
|
||||||
|
parameters => [hoconsc:ref(name)],
|
||||||
|
responses => #{
|
||||||
|
204 => <<"Uninstall successfully">>,
|
||||||
|
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], "Plugin Not Found")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/plugins/:name/:action") ->
|
||||||
|
#{
|
||||||
|
'operationId' => update_plugin,
|
||||||
|
put => #{
|
||||||
|
description => "start/stop a installed plugin.<br>"
|
||||||
|
"- **start**: start the plugin.<br>"
|
||||||
|
"- **stop**: stop the plugin.<br>",
|
||||||
|
parameters => [
|
||||||
|
hoconsc:ref(name),
|
||||||
|
{action, hoconsc:mk(hoconsc:enum([start, stop]), #{desc => "Action", in => path})}],
|
||||||
|
responses => #{
|
||||||
|
200 => <<"OK">>,
|
||||||
|
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], "Plugin Not Found")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/plugins/:name/move") ->
|
||||||
|
#{
|
||||||
|
'operationId' => update_boot_order,
|
||||||
|
post => #{
|
||||||
|
description => "Setting the boot order of plugins.",
|
||||||
|
parameters => [hoconsc:ref(name)],
|
||||||
|
'requestBody' => move_request_body(),
|
||||||
|
responses => #{200 => <<"OK">>}
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
|
||||||
|
fields(plugin) ->
|
||||||
|
[
|
||||||
|
{name, hoconsc:mk(binary(),
|
||||||
|
#{
|
||||||
|
desc => "Name-Vsn: without .tar.gz",
|
||||||
|
validator => fun ?MODULE:validate_name/1,
|
||||||
|
nullable => false,
|
||||||
|
example => "emqx_plugin_template-5.0-rc.1"})
|
||||||
|
},
|
||||||
|
{author, hoconsc:mk(list(string()), #{example => [<<"EMQ X Team">>]})},
|
||||||
|
{builder, hoconsc:ref(?MODULE, builder)},
|
||||||
|
{built_on_otp_release, hoconsc:mk(string(), #{example => "24"})},
|
||||||
|
{compatibility, hoconsc:mk(map(), #{example => #{<<"emqx">> => <<"~>5.0">>}})},
|
||||||
|
{git_commit_or_build_date, hoconsc:mk(string(), #{
|
||||||
|
example => "2021-12-25",
|
||||||
|
desc => "Last git commit date by `git log -1 --pretty=format:'%cd' --date=format:'%Y-%m-%d`."
|
||||||
|
" If the last commit date is not available, the build date will be presented."
|
||||||
|
})},
|
||||||
|
{functionality, hoconsc:mk(hoconsc:array(string()), #{example => [<<"Demo">>]})},
|
||||||
|
{git_ref, hoconsc:mk(string(), #{example => "ddab50fafeed6b1faea70fc9ffd8c700d7e26ec1"})},
|
||||||
|
{metadata_vsn, hoconsc:mk(string(), #{example => "0.1.0"})},
|
||||||
|
{rel_vsn, hoconsc:mk(binary(),
|
||||||
|
#{desc => "Plugins release version",
|
||||||
|
nullable => false,
|
||||||
|
example => <<"5.0-rc.1">>})
|
||||||
|
},
|
||||||
|
{rel_apps, hoconsc:mk(hoconsc:array(binary()),
|
||||||
|
#{desc => "Aplications in plugin.",
|
||||||
|
nullable => false,
|
||||||
|
example => [<<"emqx_plugin_template-5.0.0">>, <<"map_sets-1.1.0">>]})
|
||||||
|
},
|
||||||
|
{repo, hoconsc:mk(string(), #{example => "https://github.com/emqx/emqx-plugin-template"})},
|
||||||
|
{description, hoconsc:mk(binary(),
|
||||||
|
#{desc => "Plugin description.",
|
||||||
|
nullable => false,
|
||||||
|
example => "This is an demo plugin description"})
|
||||||
|
},
|
||||||
|
{running_status, hoconsc:mk(hoconsc:array(hoconsc:ref(running_status)), #{nullable => false})},
|
||||||
|
{readme, hoconsc:mk(binary(), #{
|
||||||
|
example => "This is an demo plugin.",
|
||||||
|
desc => "only return when `GET /plugins/{name}`.",
|
||||||
|
nullable => true})}
|
||||||
|
];
|
||||||
|
fields(name) ->
|
||||||
|
[{name, hoconsc:mk(binary(),
|
||||||
|
#{
|
||||||
|
desc => list_to_binary(?NAME_RE),
|
||||||
|
example => "emqx_plugin_template-5.0-rc.1",
|
||||||
|
in => path,
|
||||||
|
validator => fun ?MODULE:validate_name/1
|
||||||
|
})}
|
||||||
|
];
|
||||||
|
fields(builder) ->
|
||||||
|
[
|
||||||
|
{contact, hoconsc:mk(string(), #{example => "emqx-support@emqx.io"})},
|
||||||
|
{name, hoconsc:mk(string(), #{example => "EMQ X Team"})},
|
||||||
|
{website, hoconsc:mk(string(), #{example => "www.emqx.com"})}
|
||||||
|
];
|
||||||
|
fields(position) ->
|
||||||
|
[{position, hoconsc:mk(hoconsc:union([top, bottom, binary()]),
|
||||||
|
#{
|
||||||
|
desc => """
|
||||||
|
Enable auto-boot at position in the boot list, where Position could be
|
||||||
|
'top', 'bottom', or 'before:other-vsn' to specify a relative position.
|
||||||
|
""",
|
||||||
|
nullable => true
|
||||||
|
})}];
|
||||||
|
fields(running_status) ->
|
||||||
|
[
|
||||||
|
{node, hoconsc:mk(string(), #{example => "emqx@127.0.0.1"})},
|
||||||
|
{status, hoconsc:mk(hoconsc:enum([running, stopped]), #{
|
||||||
|
desc => "Install plugin status at runtime</br>"
|
||||||
|
"1. running: plugin is running.<br>"
|
||||||
|
"2. stopped: plugin is stopped.<br>"
|
||||||
|
})}
|
||||||
|
].
|
||||||
|
|
||||||
|
move_request_body() ->
|
||||||
|
emqx_dashboard_swagger:schema_with_examples(hoconsc:ref(?MODULE, position),
|
||||||
|
#{
|
||||||
|
move_to_top => #{
|
||||||
|
summary => <<"move plugin on the top">>,
|
||||||
|
value => #{position => <<"top">>}
|
||||||
|
},
|
||||||
|
move_to_bottom => #{
|
||||||
|
summary => <<"move plugin on the bottom">>,
|
||||||
|
value => #{position => <<"bottom">>}
|
||||||
|
},
|
||||||
|
move_to_before => #{
|
||||||
|
summary => <<"move plugin before other plugins">>,
|
||||||
|
value => #{position => <<"before:emqx_plugin_demo-5.1-rc.2">>}
|
||||||
|
}
|
||||||
|
}).
|
||||||
|
|
||||||
|
validate_name(Name) ->
|
||||||
|
NameLen = byte_size(Name),
|
||||||
|
case NameLen > 0 andalso NameLen =< 256 of
|
||||||
|
true ->
|
||||||
|
case re:run(Name, ?NAME_RE) of
|
||||||
|
nomatch -> {error, "Name should be " ?NAME_RE};
|
||||||
|
_ -> ok
|
||||||
|
end;
|
||||||
|
false -> {error, "Name Length must =< 256"}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% API CallBack Begin
|
||||||
|
|
||||||
|
list_plugins(get, _) ->
|
||||||
|
Plugins = cluster_call(emqx_plugins_monitor, get_plugins, [], 15000),
|
||||||
|
{200, format_plugins(Plugins)}.
|
||||||
|
|
||||||
|
upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) ->
|
||||||
|
[{FileName, Bin}] = maps:to_list(maps:without([type], Plugin)),
|
||||||
|
%% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall
|
||||||
|
%% TODO what happened when a new node join in?
|
||||||
|
%% emqx_plugins_monitor should copy plugins from other core node when boot-up.
|
||||||
|
Res = cluster_call(?MODULE, install_package, [FileName, Bin], 25000),
|
||||||
|
case lists:filter(fun(R) -> R =/= ok end, Res) of
|
||||||
|
[] -> {200};
|
||||||
|
[{error, Reason} | _] ->
|
||||||
|
{400, #{code => 'UNEXPECTED_ERROR',
|
||||||
|
message => iolist_to_binary(io_lib:format("~p", [Reason]))}}
|
||||||
|
end;
|
||||||
|
upload_install(post, #{}) ->
|
||||||
|
{400, #{code => 'BAD_FORM_DATA',
|
||||||
|
message => <<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>}
|
||||||
|
}.
|
||||||
|
|
||||||
|
plugin(get, #{bindings := #{name := Name}}) ->
|
||||||
|
Plugins = cluster_call(?MODULE, describe_package, [Name], 10000),
|
||||||
|
case format_plugins(Plugins) of
|
||||||
|
[Plugin] -> {200, Plugin};
|
||||||
|
[] -> {404, #{code => 'NOT_FOUND', message => Name}}
|
||||||
|
end;
|
||||||
|
|
||||||
|
plugin(delete, #{bindings := #{name := Name}}) ->
|
||||||
|
return(204, cluster_rpc(?MODULE, delete_package, [Name])).
|
||||||
|
|
||||||
|
update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
|
||||||
|
return(200, cluster_rpc(?MODULE, ensure_action, [Name, Action])).
|
||||||
|
|
||||||
|
update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
|
||||||
|
case parse_position(Body, Name) of
|
||||||
|
{error, Reason} -> {400, #{code => 'BAD_POSITION', message => Reason}};
|
||||||
|
Position ->
|
||||||
|
case emqx_plugins:ensure_enabled(Name, Position) of
|
||||||
|
ok -> {200};
|
||||||
|
{error, Reason} ->
|
||||||
|
{400, #{code => 'MOVE_FAILED',
|
||||||
|
message => iolist_to_binary(io_lib:format("~p", [Reason]))}}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% API CallBack End
|
||||||
|
|
||||||
|
%% For RPC upload_install/2
|
||||||
|
install_package(FileName, Bin) ->
|
||||||
|
File = filename:join(emqx_plugins:install_dir(), FileName),
|
||||||
|
ok = file:write_file(File, Bin),
|
||||||
|
PackageName = string:trim(FileName, trailing, ".tar.gz"),
|
||||||
|
emqx_plugins:ensure_installed(PackageName).
|
||||||
|
|
||||||
|
%% For RPC plugin get
|
||||||
|
describe_package(Name) ->
|
||||||
|
Node = node(),
|
||||||
|
case emqx_plugins:describe(Name) of
|
||||||
|
{ok, Plugin} -> {Node, [Plugin]};
|
||||||
|
_ -> {Node, []}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% For RPC plugin delete
|
||||||
|
delete_package(Name) ->
|
||||||
|
case emqx_plugins:ensure_stopped(Name) of
|
||||||
|
ok ->
|
||||||
|
emqx_plugins:ensure_disabled(Name),
|
||||||
|
emqx_plugins:delete_package(Name);
|
||||||
|
Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% for RPC plugin update
|
||||||
|
ensure_action(Name, start) ->
|
||||||
|
emqx_plugins:ensure_enabled(Name),
|
||||||
|
emqx_plugins:ensure_started(Name);
|
||||||
|
ensure_action(Name, stop) ->
|
||||||
|
emqx_plugins:ensure_stopped(Name),
|
||||||
|
emqx_plugins:ensure_disabled(Name);
|
||||||
|
ensure_action(Name, restart) ->
|
||||||
|
emqx_plugins:ensure_enabled(Name),
|
||||||
|
emqx_plugins:restart(Name).
|
||||||
|
|
||||||
|
cluster_call(Mod, Fun, Args, Timeout) ->
|
||||||
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
|
{GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout),
|
||||||
|
BadNodes =/= [] andalso
|
||||||
|
?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes, mfa => {Mod, Fun, length(Args)}}),
|
||||||
|
GoodRes.
|
||||||
|
|
||||||
|
cluster_rpc(Mod, Fun, Args) ->
|
||||||
|
case emqx_cluster_rpc:multicall(Mod, Fun, Args, all, 30000) of
|
||||||
|
{ok, _TnxId, Res} -> Res;
|
||||||
|
{retry, TnxId, Res, Node} ->
|
||||||
|
?SLOG(error, #{msg => "failed_to_update_plugin_in_cluster", nodes => Node,
|
||||||
|
tnx_id => TnxId, mfa => {Mod, Fun, Args}}),
|
||||||
|
Res;
|
||||||
|
{error, Error} -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
return(Code, ok) -> {Code};
|
||||||
|
return(Code, {ok, Result}) -> {Code, Result};
|
||||||
|
return(_, {error, #{error := "bad_info_file", return := {enoent, _}, path := Path}}) ->
|
||||||
|
{404, #{code => 'NOT_FOUND', message => Path}};
|
||||||
|
return(_, {error, Reason}) ->
|
||||||
|
{400, #{code => 'PARAM_ERROR', message => iolist_to_binary(io_lib:format("~p", [Reason]))}}.
|
||||||
|
|
||||||
|
parse_position(#{<<"position">> := <<"top">>}, _) -> front;
|
||||||
|
parse_position(#{<<"position">> := <<"bottom">>}, _) -> rear;
|
||||||
|
parse_position(#{<<"position">> := <<"before:", Name/binary>>}, Name) -> {error, <<"Can't before:self">>};
|
||||||
|
parse_position(#{<<"position">> := <<"before:", Before/binary>>}, _Name) -> {before, binary_to_list(Before)};
|
||||||
|
parse_position(Position, _) -> {error, iolist_to_binary(io_lib:format("~p", [Position]))}.
|
||||||
|
|
||||||
|
format_plugins(List) ->
|
||||||
|
StatusList = merge_running_status(List, #{}),
|
||||||
|
{Plugins, _} =
|
||||||
|
lists:foldr(fun({_Node, Plugins}, {Acc, StatusAcc}) ->
|
||||||
|
format_plugins_in_order(Plugins, Acc, StatusAcc)
|
||||||
|
end, {[], StatusList}, List),
|
||||||
|
Plugins.
|
||||||
|
|
||||||
|
format_plugins_in_order(Plugins, Acc0, StatusAcc0) ->
|
||||||
|
lists:foldr(fun(Plugin0, {Acc, StatusAcc}) ->
|
||||||
|
#{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin0,
|
||||||
|
case maps:find({Name, Vsn}, StatusAcc) of
|
||||||
|
{ok, Status} ->
|
||||||
|
Plugin1 = maps:without([running_status, config_status], Plugin0),
|
||||||
|
Plugins2 = Plugin1#{running_status => Status},
|
||||||
|
{
|
||||||
|
[Plugins2 | Acc],
|
||||||
|
maps:remove({Name, Vsn}, StatusAcc)
|
||||||
|
};
|
||||||
|
error -> {Acc, StatusAcc}
|
||||||
|
end
|
||||||
|
end, {Acc0, StatusAcc0}, Plugins).
|
||||||
|
|
||||||
|
merge_running_status([], Acc) -> Acc;
|
||||||
|
merge_running_status([{Node, Plugins} | List], Acc) ->
|
||||||
|
NewAcc =
|
||||||
|
lists:foldl(fun(Plugin, SubAcc) ->
|
||||||
|
#{<<"name">> := Name, <<"rel_vsn">> := Vsn} = Plugin,
|
||||||
|
Key = {Name, Vsn},
|
||||||
|
Value = #{node => Node, status => plugin_status(Plugin)},
|
||||||
|
SubAcc#{Key => [Value | maps:get(Key, Acc, [])]}
|
||||||
|
end, Acc, Plugins),
|
||||||
|
merge_running_status(List, NewAcc).
|
||||||
|
|
||||||
|
%% running_status: running loaded, stopped
|
||||||
|
%% config_status: not_configured disable enable
|
||||||
|
plugin_status(#{running_status := running}) -> running;
|
||||||
|
plugin_status(_) -> stopped.
|
|
@ -43,6 +43,9 @@
|
||||||
%% internal
|
%% internal
|
||||||
-export([ do_ensure_started/1
|
-export([ do_ensure_started/1
|
||||||
]).
|
]).
|
||||||
|
-export([
|
||||||
|
install_dir/0
|
||||||
|
]).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
@ -63,12 +66,12 @@
|
||||||
|
|
||||||
%% @doc Describe a plugin.
|
%% @doc Describe a plugin.
|
||||||
-spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}.
|
-spec describe(name_vsn()) -> {ok, plugin()} | {error, any()}.
|
||||||
describe(NameVsn) -> read_plugin(NameVsn).
|
describe(NameVsn) -> read_plugin(NameVsn, #{fill_readme => true}).
|
||||||
|
|
||||||
%% @doc Install a .tar.gz package placed in install_dir.
|
%% @doc Install a .tar.gz package placed in install_dir.
|
||||||
-spec ensure_installed(name_vsn()) -> ok | {error, any()}.
|
-spec ensure_installed(name_vsn()) -> ok | {error, any()}.
|
||||||
ensure_installed(NameVsn) ->
|
ensure_installed(NameVsn) ->
|
||||||
case read_plugin(NameVsn) of
|
case read_plugin(NameVsn, #{}) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
ok;
|
ok;
|
||||||
{error, _} ->
|
{error, _} ->
|
||||||
|
@ -80,7 +83,7 @@ do_ensure_installed(NameVsn) ->
|
||||||
TarGz = pkg_file(NameVsn),
|
TarGz = pkg_file(NameVsn),
|
||||||
case erl_tar:extract(TarGz, [{cwd, install_dir()}, compressed]) of
|
case erl_tar:extract(TarGz, [{cwd, install_dir()}, compressed]) of
|
||||||
ok ->
|
ok ->
|
||||||
case read_plugin(NameVsn) of
|
case read_plugin(NameVsn, #{}) of
|
||||||
{ok, _} -> ok;
|
{ok, _} -> ok;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
|
?SLOG(warning, Reason#{msg => "failed_to_read_after_install"}),
|
||||||
|
@ -103,7 +106,7 @@ do_ensure_installed(NameVsn) ->
|
||||||
%% If a plugin is running, or enabled, error is returned.
|
%% If a plugin is running, or enabled, error is returned.
|
||||||
-spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
|
-spec ensure_uninstalled(name_vsn()) -> ok | {error, any()}.
|
||||||
ensure_uninstalled(NameVsn) ->
|
ensure_uninstalled(NameVsn) ->
|
||||||
case read_plugin(NameVsn) of
|
case read_plugin(NameVsn, #{}) of
|
||||||
{ok, #{running_status := RunningSt}} when RunningSt =/= stopped ->
|
{ok, #{running_status := RunningSt}} when RunningSt =/= stopped ->
|
||||||
{error, #{reason => "bad_plugin_running_status",
|
{error, #{reason => "bad_plugin_running_status",
|
||||||
hint => "stop_the_plugin_first"
|
hint => "stop_the_plugin_first"
|
||||||
|
@ -134,7 +137,7 @@ ensure_disabled(NameVsn) ->
|
||||||
ensure_state(NameVsn, Position, State) when is_binary(NameVsn) ->
|
ensure_state(NameVsn, Position, State) when is_binary(NameVsn) ->
|
||||||
ensure_state(binary_to_list(NameVsn), Position, State);
|
ensure_state(binary_to_list(NameVsn), Position, State);
|
||||||
ensure_state(NameVsn, Position, State) ->
|
ensure_state(NameVsn, Position, State) ->
|
||||||
case read_plugin(NameVsn) of
|
case read_plugin(NameVsn, #{}) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
Item = #{ name_vsn => NameVsn
|
Item = #{ name_vsn => NameVsn
|
||||||
, enable => State
|
, enable => State
|
||||||
|
@ -259,7 +262,7 @@ list() ->
|
||||||
Pattern = filename:join([install_dir(), "*", "release.json"]),
|
Pattern = filename:join([install_dir(), "*", "release.json"]),
|
||||||
All = lists:filtermap(
|
All = lists:filtermap(
|
||||||
fun(JsonFile) ->
|
fun(JsonFile) ->
|
||||||
case read_plugin({file, JsonFile}) of
|
case read_plugin({file, JsonFile}, #{}) of
|
||||||
{ok, Info} ->
|
{ok, Info} ->
|
||||||
{true, Info};
|
{true, Info};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -314,26 +317,36 @@ tryit(WhichOp, F) ->
|
||||||
|
|
||||||
%% read plugin info from the JSON file
|
%% read plugin info from the JSON file
|
||||||
%% returns {ok, Info} or {error, Reason}
|
%% returns {ok, Info} or {error, Reason}
|
||||||
read_plugin(NameVsn) ->
|
read_plugin(NameVsn, Options) ->
|
||||||
tryit("read_plugin_info",
|
tryit("read_plugin_info",
|
||||||
fun() -> {ok, do_read_plugin(NameVsn)} end).
|
fun() -> {ok, do_read_plugin(NameVsn, Options)} end).
|
||||||
|
|
||||||
do_read_plugin({file, InfoFile}) ->
|
do_read_plugin(Plugin) -> do_read_plugin(Plugin, #{}).
|
||||||
|
|
||||||
|
do_read_plugin({file, InfoFile}, Options) ->
|
||||||
[_, NameVsn | _] = lists:reverse(filename:split(InfoFile)),
|
[_, NameVsn | _] = lists:reverse(filename:split(InfoFile)),
|
||||||
case hocon:load(InfoFile, #{format => richmap}) of
|
case hocon:load(InfoFile, #{format => richmap}) of
|
||||||
{ok, RichMap} ->
|
{ok, RichMap} ->
|
||||||
Info = check_plugin(hocon_maps:ensure_plain(RichMap), NameVsn, InfoFile),
|
Info0 = check_plugin(hocon_maps:ensure_plain(RichMap), NameVsn, InfoFile),
|
||||||
maps:merge(Info, plugin_status(NameVsn));
|
Info1 = plugins_readme(NameVsn, Options, Info0),
|
||||||
|
plugin_status(NameVsn, Info1);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
throw(#{error => "bad_info_file",
|
throw(#{error => "bad_info_file",
|
||||||
path => InfoFile,
|
path => InfoFile,
|
||||||
return => Reason
|
return => Reason
|
||||||
})
|
})
|
||||||
end;
|
end;
|
||||||
do_read_plugin(NameVsn) ->
|
do_read_plugin(NameVsn, Options) ->
|
||||||
do_read_plugin({file, info_file(NameVsn)}).
|
do_read_plugin({file, info_file(NameVsn)}, Options).
|
||||||
|
|
||||||
plugin_status(NameVsn) ->
|
plugins_readme(NameVsn, #{fill_readme := true}, Info) ->
|
||||||
|
case file:read_file(readme_file(NameVsn)) of
|
||||||
|
{ok, Bin} -> Info#{readme => Bin};
|
||||||
|
_ -> Info#{readme => <<>>}
|
||||||
|
end;
|
||||||
|
plugins_readme(_NameVsn, _Options, Info) -> Info.
|
||||||
|
|
||||||
|
plugin_status(NameVsn, Info) ->
|
||||||
{AppName, _AppVsn} = parse_name_vsn(NameVsn),
|
{AppName, _AppVsn} = parse_name_vsn(NameVsn),
|
||||||
RunningSt =
|
RunningSt =
|
||||||
case application:get_key(AppName, vsn) of
|
case application:get_key(AppName, vsn) of
|
||||||
|
@ -357,7 +370,7 @@ plugin_status(NameVsn) ->
|
||||||
[true] -> enabled;
|
[true] -> enabled;
|
||||||
[false] -> disabled
|
[false] -> disabled
|
||||||
end,
|
end,
|
||||||
#{ running_status => RunningSt
|
Info#{ running_status => RunningSt
|
||||||
, config_status => ConfSt
|
, config_status => ConfSt
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
@ -592,6 +605,9 @@ dir(NameVsn) ->
|
||||||
info_file(NameVsn) ->
|
info_file(NameVsn) ->
|
||||||
filename:join([dir(NameVsn), "release.json"]).
|
filename:join([dir(NameVsn), "release.json"]).
|
||||||
|
|
||||||
|
readme_file(NameVsn) ->
|
||||||
|
filename:join([dir(NameVsn), "README.md"]).
|
||||||
|
|
||||||
running_apps() ->
|
running_apps() ->
|
||||||
lists:map(fun({N, _, V}) ->
|
lists:map(fun({N, _, V}) ->
|
||||||
{N, V}
|
{N, V}
|
||||||
|
|
|
@ -23,8 +23,8 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
start(_Type, _Args) ->
|
start(_Type, _Args) ->
|
||||||
{ok, Sup} = emqx_plugins_sup:start_link(),
|
|
||||||
ok = emqx_plugins:ensure_started(), %% load all pre-configured
|
ok = emqx_plugins:ensure_started(), %% load all pre-configured
|
||||||
|
{ok, Sup} = emqx_plugins_sup:start_link(),
|
||||||
{ok, Sup}.
|
{ok, Sup}.
|
||||||
|
|
||||||
stop(_State) ->
|
stop(_State) ->
|
||||||
|
|
|
@ -0,0 +1,83 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_plugins_monitor).
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
-export([ get_plugins/0
|
||||||
|
, start_link/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
start_link() ->
|
||||||
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
{ok, #{ref => next_check_time(), failed => 0}}.
|
||||||
|
|
||||||
|
handle_call(Req, _From, State) ->
|
||||||
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||||
|
{reply, ignored, State}.
|
||||||
|
|
||||||
|
handle_cast(Msg, State) ->
|
||||||
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info({timeout, Ref, check}, State = #{failed := Failed}) ->
|
||||||
|
erlang:cancel_timer(Ref),
|
||||||
|
NewFailed = maybe_alarm(check(), Failed),
|
||||||
|
{noreply, State#{ref => next_check_time(), failed => NewFailed}};
|
||||||
|
handle_info(Info, State) ->
|
||||||
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
terminate(_Reason, _State) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
next_check_time() ->
|
||||||
|
Check = emqx_plugins:get_config(check_interval, 5000),
|
||||||
|
emqx_misc:start_timer(Check, check).
|
||||||
|
|
||||||
|
check() ->
|
||||||
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
|
case rpc:multicall(Nodes, ?MODULE, get_plugins_list, [], 15000) of
|
||||||
|
{Plugins, []} -> check_plugins(Plugins);
|
||||||
|
{_ , BadNodes} -> {error, io_lib:format("~p rpc to ~p failed", [node(), BadNodes])}
|
||||||
|
end.
|
||||||
|
|
||||||
|
get_plugins() ->
|
||||||
|
{node(), emqx_plugins:list()}.
|
||||||
|
|
||||||
|
check_plugins(Plugins) ->
|
||||||
|
check_status(Plugins),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
check_status(_Plugins) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% alarm when failed 3 time.
|
||||||
|
maybe_alarm({error, _Reason}, Failed) when Failed >= 2 ->
|
||||||
|
%alarm(Reason),
|
||||||
|
0;
|
||||||
|
maybe_alarm({error, _Reason}, Failed) -> Failed + 1;
|
||||||
|
maybe_alarm(ok, _Failed) -> 0.
|
|
@ -66,6 +66,7 @@ state_fields() ->
|
||||||
root_fields() ->
|
root_fields() ->
|
||||||
[ {states, fun states/1}
|
[ {states, fun states/1}
|
||||||
, {install_dir, fun install_dir/1}
|
, {install_dir, fun install_dir/1}
|
||||||
|
, {check_interval, fun check_interval/1}
|
||||||
].
|
].
|
||||||
|
|
||||||
states(type) -> hoconsc:array(hoconsc:ref(?MODULE, state));
|
states(type) -> hoconsc:array(hoconsc:ref(?MODULE, state));
|
||||||
|
@ -87,3 +88,11 @@ the subdirectory named as <code>emqx_foo_bar-0.1.0</code>.
|
||||||
NOTE: For security reasons, this directory should **NOT** be writable
|
NOTE: For security reasons, this directory should **NOT** be writable
|
||||||
by anyone except <code>emqx</code> (or any user which runs EMQX).
|
by anyone except <code>emqx</code> (or any user which runs EMQX).
|
||||||
""".
|
""".
|
||||||
|
|
||||||
|
check_interval(type) -> emqx_schema:duration();
|
||||||
|
check_interval(default) -> "5s";
|
||||||
|
check_interval(T) when T =/= desc -> undefined;
|
||||||
|
check_interval(desc) -> """
|
||||||
|
Check interval: check if the status of the plugins in the cluster is consistent, <br>
|
||||||
|
if the results of 3 consecutive checks are not consistent, then alarm.
|
||||||
|
""".
|
||||||
|
|
|
@ -26,5 +26,20 @@ start_link() ->
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
Children = [],
|
Monitor = emqx_plugins_monitor,
|
||||||
{ok, {{one_for_one, 10, 10}, Children}}.
|
Children = [
|
||||||
|
#{id => Monitor,
|
||||||
|
start => {Monitor, start_link, []},
|
||||||
|
restart => permanent,
|
||||||
|
shutdown => brutal_kill,
|
||||||
|
type => worker,
|
||||||
|
modules => [Monitor]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
SupFlags =
|
||||||
|
#{
|
||||||
|
strategy => one_for_one,
|
||||||
|
intensity => 100,
|
||||||
|
period => 10
|
||||||
|
},
|
||||||
|
{ok, {SupFlags, Children}}.
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Subproject commit ddab50fafeed6b1faea70fc9ffd8c700d7e26ec1
|
|
@ -0,0 +1,21 @@
|
||||||
|
# emqx-plugin-template
|
||||||
|
|
||||||
|
This is a template plugin for EMQ X >= 5.0.
|
||||||
|
|
||||||
|
For EMQ X >= 4.3, please see branch emqx-v4
|
||||||
|
|
||||||
|
For older EMQ X versions, plugin development is no longer maintained.
|
||||||
|
|
||||||
|
## Release
|
||||||
|
|
||||||
|
A EMQ X plugin release is a zip package including
|
||||||
|
|
||||||
|
1. A JSON format metadata file
|
||||||
|
2. A tar file with plugin's apps packed
|
||||||
|
|
||||||
|
Execute `make rel` to have the package created like:
|
||||||
|
|
||||||
|
```
|
||||||
|
_build/default/emqx_plugrel/emqx_plugin_template-<vsn>.tar.gz
|
||||||
|
```
|
||||||
|
See EMQ X documents for details on how to deploy the plugin.
|
|
@ -0,0 +1,4 @@
|
||||||
|
## This is a demo config in HOCON format
|
||||||
|
## The same format used by EMQ X since 5.0
|
||||||
|
|
||||||
|
magic_n = 42
|
|
@ -0,0 +1,26 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_cli_demo).
|
||||||
|
|
||||||
|
-export([cmd/1]).
|
||||||
|
|
||||||
|
cmd(["arg1", "arg2"]) ->
|
||||||
|
emqx_ctl:print("ok");
|
||||||
|
|
||||||
|
cmd(_) ->
|
||||||
|
emqx_ctl:usage([{"cmd arg1 arg2", "cmd demo"}]).
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
{application, emqx_plugin_template,
|
||||||
|
[{description, "EMQ X Plugin Template"},
|
||||||
|
{vsn, "5.0.0"},
|
||||||
|
{modules, []},
|
||||||
|
{registered, [emqx_plugin_template_sup]},
|
||||||
|
{applications, [kernel,stdlib,map_sets]},
|
||||||
|
{mod, {emqx_plugin_template_app,[]}},
|
||||||
|
{env, []},
|
||||||
|
{licenses, ["Apache-2.0"]},
|
||||||
|
{maintainers, ["EMQ X Team <contact@emqx.io>"]},
|
||||||
|
{links, [{"Homepage", "https://emqx.io/"},
|
||||||
|
{"Github", "https://github.com/emqx/emqx-plugin-template"}
|
||||||
|
]}
|
||||||
|
]}.
|
|
@ -0,0 +1,198 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_plugin_template).
|
||||||
|
|
||||||
|
%% for #message{} record
|
||||||
|
%% no need for this include if we call emqx_message:to_map/1 to convert it to a map
|
||||||
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
|
||||||
|
%% for logging
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
-export([ load/1
|
||||||
|
, unload/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Client Lifecircle Hooks
|
||||||
|
-export([ on_client_connect/3
|
||||||
|
, on_client_connack/4
|
||||||
|
, on_client_connected/3
|
||||||
|
, on_client_disconnected/4
|
||||||
|
, on_client_authenticate/3
|
||||||
|
, on_client_check_acl/5
|
||||||
|
, on_client_subscribe/4
|
||||||
|
, on_client_unsubscribe/4
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Session Lifecircle Hooks
|
||||||
|
-export([ on_session_created/3
|
||||||
|
, on_session_subscribed/4
|
||||||
|
, on_session_unsubscribed/4
|
||||||
|
, on_session_resumed/3
|
||||||
|
, on_session_discarded/3
|
||||||
|
, on_session_takeovered/3
|
||||||
|
, on_session_terminated/4
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Message Pubsub Hooks
|
||||||
|
-export([ on_message_publish/2
|
||||||
|
, on_message_delivered/3
|
||||||
|
, on_message_acked/3
|
||||||
|
, on_message_dropped/4
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% Called when the plugin application start
|
||||||
|
load(Env) ->
|
||||||
|
emqx:hook('client.connect', {?MODULE, on_client_connect, [Env]}),
|
||||||
|
emqx:hook('client.connack', {?MODULE, on_client_connack, [Env]}),
|
||||||
|
emqx:hook('client.connected', {?MODULE, on_client_connected, [Env]}),
|
||||||
|
emqx:hook('client.disconnected', {?MODULE, on_client_disconnected, [Env]}),
|
||||||
|
emqx:hook('client.authenticate', {?MODULE, on_client_authenticate, [Env]}),
|
||||||
|
emqx:hook('client.check_acl', {?MODULE, on_client_check_acl, [Env]}),
|
||||||
|
emqx:hook('client.subscribe', {?MODULE, on_client_subscribe, [Env]}),
|
||||||
|
emqx:hook('client.unsubscribe', {?MODULE, on_client_unsubscribe, [Env]}),
|
||||||
|
emqx:hook('session.created', {?MODULE, on_session_created, [Env]}),
|
||||||
|
emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Env]}),
|
||||||
|
emqx:hook('session.unsubscribed',{?MODULE, on_session_unsubscribed, [Env]}),
|
||||||
|
emqx:hook('session.resumed', {?MODULE, on_session_resumed, [Env]}),
|
||||||
|
emqx:hook('session.discarded', {?MODULE, on_session_discarded, [Env]}),
|
||||||
|
emqx:hook('session.takeovered', {?MODULE, on_session_takeovered, [Env]}),
|
||||||
|
emqx:hook('session.terminated', {?MODULE, on_session_terminated, [Env]}),
|
||||||
|
emqx:hook('message.publish', {?MODULE, on_message_publish, [Env]}),
|
||||||
|
emqx:hook('message.delivered', {?MODULE, on_message_delivered, [Env]}),
|
||||||
|
emqx:hook('message.acked', {?MODULE, on_message_acked, [Env]}),
|
||||||
|
emqx:hook('message.dropped', {?MODULE, on_message_dropped, [Env]}).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Client Lifecircle Hooks
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
on_client_connect(ConnInfo, Props, _Env) ->
|
||||||
|
%% this is to demo the usage of EMQ X's structured-logging macro
|
||||||
|
%% * Recommended to always have a `msg` field,
|
||||||
|
%% * Use underscore instead of space to help log indexers,
|
||||||
|
%% * Try to use static fields
|
||||||
|
?SLOG(debug, #{msg => "demo_log_msg_on_client_connect",
|
||||||
|
conninfo => ConnInfo,
|
||||||
|
props => Props}),
|
||||||
|
{ok, Props}.
|
||||||
|
|
||||||
|
on_client_connack(ConnInfo = #{clientid := ClientId}, Rc, Props, _Env) ->
|
||||||
|
io:format("Client(~s) connack, ConnInfo: ~p, Rc: ~p, Props: ~p~n",
|
||||||
|
[ClientId, ConnInfo, Rc, Props]),
|
||||||
|
{ok, Props}.
|
||||||
|
|
||||||
|
on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, _Env) ->
|
||||||
|
io:format("Client(~s) connected, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
|
||||||
|
[ClientId, ClientInfo, ConnInfo]).
|
||||||
|
|
||||||
|
on_client_disconnected(ClientInfo = #{clientid := ClientId}, ReasonCode, ConnInfo, _Env) ->
|
||||||
|
io:format("Client(~s) disconnected due to ~p, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
|
||||||
|
[ClientId, ReasonCode, ClientInfo, ConnInfo]).
|
||||||
|
|
||||||
|
on_client_authenticate(_ClientInfo = #{clientid := ClientId}, Result, _Env) ->
|
||||||
|
io:format("Client(~s) authenticate, Result:~n~p~n", [ClientId, Result]),
|
||||||
|
{ok, Result}.
|
||||||
|
|
||||||
|
on_client_check_acl(_ClientInfo = #{clientid := ClientId}, Topic, PubSub, Result, _Env) ->
|
||||||
|
io:format("Client(~s) check_acl, PubSub:~p, Topic:~p, Result:~p~n",
|
||||||
|
[ClientId, PubSub, Topic, Result]),
|
||||||
|
{ok, Result}.
|
||||||
|
|
||||||
|
on_client_subscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
|
||||||
|
io:format("Client(~s) will subscribe: ~p~n", [ClientId, TopicFilters]),
|
||||||
|
{ok, TopicFilters}.
|
||||||
|
|
||||||
|
on_client_unsubscribe(#{clientid := ClientId}, _Properties, TopicFilters, _Env) ->
|
||||||
|
io:format("Client(~s) will unsubscribe ~p~n", [ClientId, TopicFilters]),
|
||||||
|
{ok, TopicFilters}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Session Lifecircle Hooks
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
on_session_created(#{clientid := ClientId}, SessInfo, _Env) ->
|
||||||
|
io:format("Session(~s) created, Session Info:~n~p~n", [ClientId, SessInfo]).
|
||||||
|
|
||||||
|
on_session_subscribed(#{clientid := ClientId}, Topic, SubOpts, _Env) ->
|
||||||
|
io:format("Session(~s) subscribed ~s with subopts: ~p~n", [ClientId, Topic, SubOpts]).
|
||||||
|
|
||||||
|
on_session_unsubscribed(#{clientid := ClientId}, Topic, Opts, _Env) ->
|
||||||
|
io:format("Session(~s) unsubscribed ~s with opts: ~p~n", [ClientId, Topic, Opts]).
|
||||||
|
|
||||||
|
on_session_resumed(#{clientid := ClientId}, SessInfo, _Env) ->
|
||||||
|
io:format("Session(~s) resumed, Session Info:~n~p~n", [ClientId, SessInfo]).
|
||||||
|
|
||||||
|
on_session_discarded(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) ->
|
||||||
|
io:format("Session(~s) is discarded. Session Info: ~p~n", [ClientId, SessInfo]).
|
||||||
|
|
||||||
|
on_session_takeovered(_ClientInfo = #{clientid := ClientId}, SessInfo, _Env) ->
|
||||||
|
io:format("Session(~s) is takeovered. Session Info: ~p~n", [ClientId, SessInfo]).
|
||||||
|
|
||||||
|
on_session_terminated(_ClientInfo = #{clientid := ClientId}, Reason, SessInfo, _Env) ->
|
||||||
|
io:format("Session(~s) is terminated due to ~p~nSession Info: ~p~n",
|
||||||
|
[ClientId, Reason, SessInfo]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Message PubSub Hooks
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% Transform message and return
|
||||||
|
on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
|
||||||
|
{ok, Message};
|
||||||
|
|
||||||
|
on_message_publish(Message, _Env) ->
|
||||||
|
io:format("Publish ~s~n", [emqx_message:to_map(Message)]),
|
||||||
|
{ok, Message}.
|
||||||
|
|
||||||
|
on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason, _Env) ->
|
||||||
|
ok;
|
||||||
|
on_message_dropped(Message, _By = #{node := Node}, Reason, _Env) ->
|
||||||
|
io:format("Message dropped by node ~s due to ~s: ~p~n",
|
||||||
|
[Node, Reason, emqx_message:to_map(Message)]).
|
||||||
|
|
||||||
|
on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
||||||
|
io:format("Message delivered to client(~s): ~p~n",
|
||||||
|
[ClientId, emqx_message:to_map(Message)]),
|
||||||
|
{ok, Message}.
|
||||||
|
|
||||||
|
on_message_acked(_ClientInfo = #{clientid := ClientId}, Message, _Env) ->
|
||||||
|
io:format("Message acked by client(~s): ~p~n",
|
||||||
|
[ClientId, emqx_message:to_map(Message)]).
|
||||||
|
|
||||||
|
%% Called when the plugin application stop
|
||||||
|
unload() ->
|
||||||
|
emqx:unhook('client.connect', {?MODULE, on_client_connect}),
|
||||||
|
emqx:unhook('client.connack', {?MODULE, on_client_connack}),
|
||||||
|
emqx:unhook('client.connected', {?MODULE, on_client_connected}),
|
||||||
|
emqx:unhook('client.disconnected', {?MODULE, on_client_disconnected}),
|
||||||
|
emqx:unhook('client.authenticate', {?MODULE, on_client_authenticate}),
|
||||||
|
emqx:unhook('client.check_acl', {?MODULE, on_client_check_acl}),
|
||||||
|
emqx:unhook('client.subscribe', {?MODULE, on_client_subscribe}),
|
||||||
|
emqx:unhook('client.unsubscribe', {?MODULE, on_client_unsubscribe}),
|
||||||
|
emqx:unhook('session.created', {?MODULE, on_session_created}),
|
||||||
|
emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}),
|
||||||
|
emqx:unhook('session.unsubscribed',{?MODULE, on_session_unsubscribed}),
|
||||||
|
emqx:unhook('session.resumed', {?MODULE, on_session_resumed}),
|
||||||
|
emqx:unhook('session.discarded', {?MODULE, on_session_discarded}),
|
||||||
|
emqx:unhook('session.takeovered', {?MODULE, on_session_takeovered}),
|
||||||
|
emqx:unhook('session.terminated', {?MODULE, on_session_terminated}),
|
||||||
|
emqx:unhook('message.publish', {?MODULE, on_message_publish}),
|
||||||
|
emqx:unhook('message.delivered', {?MODULE, on_message_delivered}),
|
||||||
|
emqx:unhook('message.acked', {?MODULE, on_message_acked}),
|
||||||
|
emqx:unhook('message.dropped', {?MODULE, on_message_dropped}).
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_plugin_template_app).
|
||||||
|
|
||||||
|
-behaviour(application).
|
||||||
|
|
||||||
|
-emqx_plugin(?MODULE).
|
||||||
|
|
||||||
|
-export([ start/2
|
||||||
|
, stop/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
start(_StartType, _StartArgs) ->
|
||||||
|
{ok, Sup} = emqx_plugin_template_sup:start_link(),
|
||||||
|
emqx_plugin_template:load(application:get_all_env()),
|
||||||
|
{ok, Sup}.
|
||||||
|
|
||||||
|
stop(_State) ->
|
||||||
|
emqx_plugin_template:unload().
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_plugin_template_sup).
|
||||||
|
|
||||||
|
-behaviour(supervisor).
|
||||||
|
|
||||||
|
-export([start_link/0]).
|
||||||
|
|
||||||
|
-export([init/1]).
|
||||||
|
|
||||||
|
start_link() ->
|
||||||
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
{ok, { {one_for_all, 0, 1}, []} }.
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
{application,map_sets,
|
||||||
|
[{description,"sets-like wrapper based on maps"},
|
||||||
|
{vsn,"1.1.0"},
|
||||||
|
{registered,[]},
|
||||||
|
{applications,[kernel,stdlib]},
|
||||||
|
{env,[]},
|
||||||
|
{modules,[]},
|
||||||
|
{links,[{"Github","https://github.com/k32/map_sets"}]},
|
||||||
|
{licenses,["public domain"]}]}.
|
|
@ -0,0 +1,179 @@
|
||||||
|
%% s/sets/map_sets/g
|
||||||
|
%% Why? Because spead (This module piggybacks on `maps' module's BIFs)
|
||||||
|
-module(map_sets).
|
||||||
|
|
||||||
|
-export([ new/0
|
||||||
|
, is_set/1
|
||||||
|
, size/1
|
||||||
|
, to_list/1
|
||||||
|
, from_list/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ is_element/2
|
||||||
|
, add_element/2
|
||||||
|
, del_element/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ union/2
|
||||||
|
, union/1
|
||||||
|
, intersection/2
|
||||||
|
, intersection/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ is_disjoint/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ subtract/2
|
||||||
|
, is_subset/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ fold/3
|
||||||
|
, filter/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export_type([set/1, set/0]).
|
||||||
|
|
||||||
|
-type set(Key) :: #{Key => term()}.
|
||||||
|
-type set() :: set(term()).
|
||||||
|
|
||||||
|
-define(UNUSED, []).
|
||||||
|
|
||||||
|
-ifdef(OTP_RELEASE). %% OTP21+ supports map iterators
|
||||||
|
|
||||||
|
-define(iterable(A), maps:iterator(A)).
|
||||||
|
|
||||||
|
-define(iterate(I, Last, K, Next, Cons),
|
||||||
|
case maps:next(I) of
|
||||||
|
none -> Last;
|
||||||
|
{K, _, Next} -> Cons
|
||||||
|
end).
|
||||||
|
|
||||||
|
-else.
|
||||||
|
|
||||||
|
-define(iterable(A), maps:keys(A)).
|
||||||
|
|
||||||
|
-define(iterate(I, Last, K, Next, Cons),
|
||||||
|
case I of
|
||||||
|
[] -> Last;
|
||||||
|
[K|Next] -> Cons
|
||||||
|
end).
|
||||||
|
|
||||||
|
-endif.
|
||||||
|
|
||||||
|
-spec new() -> set().
|
||||||
|
new() ->
|
||||||
|
#{}.
|
||||||
|
|
||||||
|
-spec is_set(term()) -> boolean().
|
||||||
|
is_set(A) ->
|
||||||
|
is_map(A).
|
||||||
|
|
||||||
|
-spec size(set()) -> non_neg_integer().
|
||||||
|
size(A) ->
|
||||||
|
maps:size(A).
|
||||||
|
|
||||||
|
-spec fold(Function, Acc, Set) -> Acc when
|
||||||
|
Function :: fun((Element, Acc) -> Acc),
|
||||||
|
Set :: set(Element),
|
||||||
|
Acc :: term().
|
||||||
|
fold(Fun, A, B) ->
|
||||||
|
maps:fold( fun(K, _, Acc) -> Fun(K, Acc) end
|
||||||
|
, A
|
||||||
|
, B).
|
||||||
|
|
||||||
|
-spec filter(Predicate, Set) -> Set when
|
||||||
|
Predicate :: fun((Element) -> boolean()),
|
||||||
|
Set :: set(Element).
|
||||||
|
filter(P, A) ->
|
||||||
|
maps:filter( fun(K, _) -> P(K) end
|
||||||
|
, A).
|
||||||
|
|
||||||
|
-spec to_list(set(Elem)) -> [Elem].
|
||||||
|
to_list(A) ->
|
||||||
|
maps:keys(A).
|
||||||
|
|
||||||
|
-spec from_list([Elem]) -> set(Elem).
|
||||||
|
from_list(L) ->
|
||||||
|
maps:from_list([{I, ?UNUSED} || I <- L]).
|
||||||
|
|
||||||
|
-spec is_element(Elem, set(Elem)) -> boolean().
|
||||||
|
is_element(Elem, Set) ->
|
||||||
|
maps:is_key(Elem, Set).
|
||||||
|
|
||||||
|
-spec add_element(Elem, set(Elem)) -> set(Elem).
|
||||||
|
add_element(Elem, Set) ->
|
||||||
|
Set#{Elem => ?UNUSED}.
|
||||||
|
|
||||||
|
-spec del_element(Elem, set(Elem)) -> set(Elem).
|
||||||
|
del_element(Elem, Set) ->
|
||||||
|
maps:remove(Elem, Set).
|
||||||
|
|
||||||
|
-spec is_subset(set(Elem), set(Elem)) -> boolean().
|
||||||
|
is_subset(S1, S2) ->
|
||||||
|
is_subset_(?iterable(S1), S2).
|
||||||
|
|
||||||
|
is_subset_(Iter, S2) ->
|
||||||
|
?iterate(Iter,
|
||||||
|
true,
|
||||||
|
K, Next,
|
||||||
|
case maps:is_key(K, S2) of
|
||||||
|
true ->
|
||||||
|
is_subset_(Next, S2);
|
||||||
|
false ->
|
||||||
|
false
|
||||||
|
end).
|
||||||
|
|
||||||
|
-spec subtract(set(Elem), set(Elem)) -> set(Elem).
|
||||||
|
subtract(S1, S2) ->
|
||||||
|
maps:without(maps:keys(S2), S1).
|
||||||
|
|
||||||
|
-spec union(set(Elem), set(Elem)) -> set(Elem).
|
||||||
|
union(S1, S2) ->
|
||||||
|
maps:merge(S1, S2).
|
||||||
|
|
||||||
|
-spec union([set(Elem)]) -> set(Elem).
|
||||||
|
union(L) ->
|
||||||
|
lists:foldl(fun maps:merge/2, #{}, L).
|
||||||
|
|
||||||
|
-spec intersection(set(Elem), set(Elem)) -> set(Elem).
|
||||||
|
intersection(S1, S2) ->
|
||||||
|
case maps:size(S1) > maps:size(S2) of
|
||||||
|
true ->
|
||||||
|
intersection_(S1, S2);
|
||||||
|
false ->
|
||||||
|
intersection_(S2, S1)
|
||||||
|
end.
|
||||||
|
intersection_(Large, Small) ->
|
||||||
|
maps:fold( fun(E, _, Acc) ->
|
||||||
|
case maps:is_key(E, Large) of
|
||||||
|
true ->
|
||||||
|
Acc #{E => ?UNUSED};
|
||||||
|
_ ->
|
||||||
|
Acc
|
||||||
|
end
|
||||||
|
end
|
||||||
|
, #{}
|
||||||
|
, Small).
|
||||||
|
|
||||||
|
-spec intersection(nonempty_list(set(Elem))) -> set(Elem).
|
||||||
|
intersection([H|T]) ->
|
||||||
|
lists:foldl(fun intersection/2, H, T).
|
||||||
|
|
||||||
|
-spec is_disjoint(set(Elem), set(Elem)) -> boolean().
|
||||||
|
is_disjoint(S1, S2) ->
|
||||||
|
case maps:size(S1) > maps:size(S2) of
|
||||||
|
true ->
|
||||||
|
is_disjoint_(S1, ?iterable(S2));
|
||||||
|
false ->
|
||||||
|
is_disjoint_(S2, ?iterable(S1))
|
||||||
|
end.
|
||||||
|
is_disjoint_(Large, Small) ->
|
||||||
|
?iterate(Small,
|
||||||
|
true,
|
||||||
|
K, Next,
|
||||||
|
case maps:is_key(K, Large) of
|
||||||
|
true ->
|
||||||
|
false;
|
||||||
|
false ->
|
||||||
|
is_disjoint_(Large, Next)
|
||||||
|
end).
|
|
@ -0,0 +1,28 @@
|
||||||
|
{
|
||||||
|
"authors": [
|
||||||
|
"EMQ X Team"
|
||||||
|
],
|
||||||
|
"builder": {
|
||||||
|
"contact": "emqx-support@emqx.io",
|
||||||
|
"name": "EMQ X Team",
|
||||||
|
"website": "www.emqx.com"
|
||||||
|
},
|
||||||
|
"built_on_otp_release": "24",
|
||||||
|
"compatibility": {
|
||||||
|
"emqx": "~> 5.0"
|
||||||
|
},
|
||||||
|
"date": "2021-12-16",
|
||||||
|
"description": "This is a demo plugin",
|
||||||
|
"functionality": [
|
||||||
|
"Demo"
|
||||||
|
],
|
||||||
|
"git_ref": "ddab50fafeed6b1faea70fc9ffd8c700d7e26ec1",
|
||||||
|
"metadata_vsn": "0.1.0",
|
||||||
|
"name": "emqx_plugin_template",
|
||||||
|
"rel_apps": [
|
||||||
|
"emqx_plugin_template-5.0.0",
|
||||||
|
"map_sets-1.1.0"
|
||||||
|
],
|
||||||
|
"rel_vsn": "5.0-rc.1",
|
||||||
|
"repo": "https://github.com/emqx/emqx-plugin-template"
|
||||||
|
}
|
|
@ -41,7 +41,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Sync resource instances and files
|
%% Sync resource instances and files
|
||||||
%% provisional solution: rpc:multical to all the nodes for creating/updating/removing
|
%% provisional solution: rpc:multicall to all the nodes for creating/updating/removing
|
||||||
%% todo: replicate operations
|
%% todo: replicate operations
|
||||||
-export([ create/4 %% store the config and start the instance
|
-export([ create/4 %% store the config and start the instance
|
||||||
, create/5
|
, create/5
|
||||||
|
|
Loading…
Reference in New Issue