444 lines
15 KiB
Erlang
444 lines
15 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2017-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_plugins).
|
|
|
|
-include("emqx.hrl").
|
|
-include("logger.hrl").
|
|
|
|
-logger_header("[Plugins]").
|
|
|
|
-export([init/0]).
|
|
|
|
-export([ load/0
|
|
, load/1
|
|
, unload/0
|
|
, unload/1
|
|
, reload/1
|
|
, list/0
|
|
, find_plugin/1
|
|
, generate_configs/1
|
|
, apply_configs/1
|
|
]).
|
|
|
|
-export([funlog/2]).
|
|
|
|
-ifdef(TEST).
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
-endif.
|
|
|
|
-dialyzer({no_match, [ plugin_loaded/2
|
|
, plugin_unloaded/2
|
|
]}).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% APIs
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Init plugins' config
|
|
-spec(init() -> ok).
|
|
init() ->
|
|
case emqx:get_env(plugins_etc_dir) of
|
|
undefined -> ok;
|
|
PluginsEtc ->
|
|
CfgFiles = [filename:join(PluginsEtc, File) ||
|
|
File <- filelib:wildcard("*.config", PluginsEtc)],
|
|
lists:foreach(fun init_config/1, CfgFiles)
|
|
end.
|
|
|
|
%% @doc Load all plugins when the broker started.
|
|
-spec(load() -> ok | ignore | {error, term()}).
|
|
load() ->
|
|
ok = load_ext_plugins(emqx:get_env(expand_plugins_dir)),
|
|
case emqx:get_env(plugins_loaded_file) of
|
|
undefined -> ignore; %% No plugins available
|
|
File ->
|
|
_ = ensure_file(File),
|
|
with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end)
|
|
end.
|
|
|
|
%% @doc Load a Plugin
|
|
-spec(load(atom()) -> ok | {error, term()}).
|
|
load(PluginName) when is_atom(PluginName) ->
|
|
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
|
|
{false, _} ->
|
|
?LOG(alert, "Plugin ~s not found, cannot load it", [PluginName]),
|
|
{error, not_found};
|
|
{_, true} ->
|
|
?LOG(notice, "Plugin ~s is already started", [PluginName]),
|
|
{error, already_started};
|
|
{_, false} ->
|
|
load_plugin(PluginName, true)
|
|
end.
|
|
|
|
%% @doc Unload all plugins before broker stopped.
|
|
-spec(unload() -> list() | {error, term()}).
|
|
unload() ->
|
|
case emqx:get_env(plugins_loaded_file) of
|
|
undefined -> ignore;
|
|
File ->
|
|
with_loaded_file(File, fun stop_plugins/1)
|
|
end.
|
|
|
|
%% @doc UnLoad a Plugin
|
|
-spec(unload(atom()) -> ok | {error, term()}).
|
|
unload(PluginName) when is_atom(PluginName) ->
|
|
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
|
|
{false, _} ->
|
|
?LOG(error, "Plugin ~s is not found, cannot unload it", [PluginName]),
|
|
{error, not_found};
|
|
{_, false} ->
|
|
?LOG(error, "Plugin ~s is not started", [PluginName]),
|
|
{error, not_started};
|
|
{_, _} ->
|
|
unload_plugin(PluginName, true)
|
|
end.
|
|
|
|
reload(PluginName) when is_atom(PluginName)->
|
|
case {lists:member(PluginName, names(plugin)), lists:member(PluginName, names(started_app))} of
|
|
{false, _} ->
|
|
?LOG(error, "Plugin ~s is not found, cannot reload it", [PluginName]),
|
|
{error, not_found};
|
|
{_, false} ->
|
|
load(PluginName);
|
|
{_, true} ->
|
|
case unload(PluginName) of
|
|
ok -> load(PluginName);
|
|
{error, Reason} -> {error, Reason}
|
|
end
|
|
end.
|
|
|
|
%% @doc List all available plugins
|
|
-spec(list() -> [emqx_types:plugin()]).
|
|
list() ->
|
|
StartedApps = names(started_app),
|
|
lists:map(fun({Name, _, [Type| _]}) ->
|
|
Plugin = plugin(Name, Type),
|
|
case lists:member(Name, StartedApps) of
|
|
true -> Plugin#plugin{active = true};
|
|
false -> Plugin
|
|
end
|
|
end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))).
|
|
|
|
find_plugin(Name) ->
|
|
find_plugin(Name, list()).
|
|
|
|
find_plugin(Name, Plugins) ->
|
|
lists:keyfind(Name, 2, Plugins).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
init_config(CfgFile) ->
|
|
{ok, [AppsEnv]} = file:consult(CfgFile),
|
|
lists:foreach(fun({App, Envs}) ->
|
|
[application:set_env(App, Par, Val) || {Par, Val} <- Envs]
|
|
end, AppsEnv).
|
|
|
|
%% load external plugins which are placed in etc/plugins dir
|
|
load_ext_plugins(undefined) -> ok;
|
|
load_ext_plugins(Dir) ->
|
|
lists:foreach(
|
|
fun(Plugin) ->
|
|
PluginDir = filename:join(Dir, Plugin),
|
|
case filelib:is_dir(PluginDir) of
|
|
true -> load_ext_plugin(PluginDir);
|
|
false -> ok
|
|
end
|
|
end, filelib:wildcard("*", Dir)).
|
|
|
|
load_ext_plugin(PluginDir) ->
|
|
?LOG(debug, "loading_extra_plugin: ~s", [PluginDir]),
|
|
Ebin = filename:join([PluginDir, "ebin"]),
|
|
AppFile = filename:join([Ebin, "*.app"]),
|
|
AppName = case filelib:wildcard(AppFile) of
|
|
[App] ->
|
|
list_to_atom(filename:basename(App, ".app"));
|
|
[] ->
|
|
?LOG(alert, "plugin_app_file_not_found: ~s", [AppFile]),
|
|
error({plugin_app_file_not_found, AppFile})
|
|
end,
|
|
ok = load_plugin_app(AppName, Ebin),
|
|
try
|
|
ok = load_plugin_conf(AppName, PluginDir)
|
|
catch
|
|
throw : {conf_file_not_found, ConfFile} ->
|
|
%% this is maybe a dependency of an external plugin
|
|
?LOG(debug, "config_load_error_ignored for app=~p, path=~s", [AppName, ConfFile]),
|
|
ok
|
|
end.
|
|
|
|
load_plugin_app(AppName, Ebin) ->
|
|
_ = code:add_patha(Ebin),
|
|
Modules = filelib:wildcard(filename:join([Ebin, "*.beam"])),
|
|
lists:foreach(
|
|
fun(BeamFile) ->
|
|
Module = list_to_atom(filename:basename(BeamFile, ".beam")),
|
|
case code:load_file(Module) of
|
|
{module, _} -> ok;
|
|
{error, Reason} -> error({failed_to_load_plugin_beam, BeamFile, Reason})
|
|
end
|
|
end, Modules),
|
|
case application:load(AppName) of
|
|
ok -> ok;
|
|
{error, {already_loaded, _}} -> ok
|
|
end.
|
|
|
|
load_plugin_conf(AppName, PluginDir) ->
|
|
Priv = filename:join([PluginDir, "priv"]),
|
|
Etc = filename:join([PluginDir, "etc"]),
|
|
ConfFile = filename:join([Etc, atom_to_list(AppName) ++ ".conf"]),
|
|
Conf = case filelib:is_file(ConfFile) of
|
|
true -> cuttlefish_conf:file(ConfFile);
|
|
false -> throw({conf_file_not_found, ConfFile})
|
|
end,
|
|
Schema = filelib:wildcard(filename:join([Priv, "*.schema"])),
|
|
?LOG(debug, "loading_extra_plugin_config conf=~s, schema=~s", [ConfFile, Schema]),
|
|
AppsEnv = cuttlefish_generator:map(cuttlefish_schema:files(Schema), Conf),
|
|
lists:foreach(fun({AppName1, Envs}) ->
|
|
[application:set_env(AppName1, Par, Val) || {Par, Val} <- Envs]
|
|
end, AppsEnv).
|
|
|
|
ensure_file(File) ->
|
|
case filelib:is_file(File) of
|
|
false ->
|
|
DefaultPlugins = [ {emqx_management, true}
|
|
, {emqx_dashboard, true}
|
|
, {emqx_modules, false}
|
|
, {emqx_recon, true}
|
|
, {emqx_retainer, true}
|
|
, {emqx_telemetry, true}
|
|
, {emqx_rule_engine, true}
|
|
, {emqx_bridge_mqtt, false}
|
|
],
|
|
write_loaded(DefaultPlugins);
|
|
true ->
|
|
ok
|
|
end.
|
|
|
|
with_loaded_file(File, SuccFun) ->
|
|
case read_loaded(File) of
|
|
{ok, Names0} ->
|
|
Names = filter_plugins(Names0),
|
|
SuccFun(Names);
|
|
{error, Error} ->
|
|
?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error]),
|
|
{error, Error}
|
|
end.
|
|
|
|
filter_plugins(Names) ->
|
|
filter_plugins(Names, []).
|
|
|
|
filter_plugins([], Plugins) ->
|
|
lists:reverse(Plugins);
|
|
filter_plugins([{Name, Load} | Names], Plugins) ->
|
|
case {Load, lists:member(Name, Plugins)} of
|
|
{true, false} ->
|
|
filter_plugins(Names, [Name | Plugins]);
|
|
{false, true} ->
|
|
filter_plugins(Names, Plugins -- [Name]);
|
|
_ ->
|
|
filter_plugins(Names, Plugins)
|
|
end;
|
|
filter_plugins([Name | Names], Plugins) when is_atom(Name) ->
|
|
filter_plugins([{Name, true} | Names], Plugins).
|
|
|
|
load_plugins(Names, Persistent) ->
|
|
Plugins = list(),
|
|
NotFound = Names -- names(Plugins),
|
|
case NotFound of
|
|
[] -> ok;
|
|
NotFound -> ?LOG(alert, "cannot_find_plugins: ~p", [NotFound])
|
|
end,
|
|
NeedToLoad = Names -- NotFound -- names(started_app),
|
|
lists:foreach(fun(Name) ->
|
|
Plugin = find_plugin(Name, Plugins),
|
|
load_plugin(Plugin#plugin.name, Persistent)
|
|
end, NeedToLoad).
|
|
|
|
generate_configs(App) ->
|
|
ConfigFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".config",
|
|
case filelib:is_file(ConfigFile) of
|
|
true ->
|
|
{ok, [Configs]} = file:consult(ConfigFile),
|
|
Configs;
|
|
false ->
|
|
do_generate_configs(App)
|
|
end.
|
|
|
|
do_generate_configs(App) ->
|
|
Name1 = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".conf",
|
|
Name2 = filename:join([code:lib_dir(App), "etc", App]) ++ ".conf",
|
|
ConfFile = case {filelib:is_file(Name1), filelib:is_file(Name2)} of
|
|
{true, _} -> Name1;
|
|
{false, true} -> Name2;
|
|
{false, false} -> error({config_not_found, [Name1, Name2]})
|
|
end,
|
|
SchemaFile = filename:join([code:priv_dir(App), App]) ++ ".schema",
|
|
case filelib:is_file(SchemaFile) of
|
|
true ->
|
|
Schema = cuttlefish_schema:files([SchemaFile]),
|
|
Conf = cuttlefish_conf:file(ConfFile),
|
|
cuttlefish_generator:map(Schema, Conf, undefined, fun ?MODULE:funlog/2);
|
|
false ->
|
|
error({schema_not_found, SchemaFile})
|
|
end.
|
|
|
|
apply_configs([]) ->
|
|
ok;
|
|
apply_configs([{App, Config} | More]) ->
|
|
lists:foreach(fun({Key, _}) -> application:unset_env(App, Key) end, application:get_all_env(App)),
|
|
lists:foreach(fun({Key, Val}) -> application:set_env(App, Key, Val) end, Config),
|
|
apply_configs(More).
|
|
|
|
%% Stop plugins
|
|
stop_plugins(Names) ->
|
|
_ = [stop_app(App) || App <- Names],
|
|
ok.
|
|
|
|
plugin(AppName, Type) ->
|
|
case application:get_all_key(AppName) of
|
|
{ok, Attrs} ->
|
|
Descr = proplists:get_value(description, Attrs, ""),
|
|
#plugin{name = AppName, descr = Descr, type = plugin_type(Type)};
|
|
undefined -> error({plugin_not_found, AppName})
|
|
end.
|
|
|
|
load_plugin(Name, Persistent) ->
|
|
try
|
|
Configs = ?MODULE:generate_configs(Name),
|
|
?MODULE:apply_configs(Configs),
|
|
case load_app(Name) of
|
|
ok ->
|
|
start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end);
|
|
{error, Error0} ->
|
|
{error, Error0}
|
|
end
|
|
catch _ : Error : Stacktrace ->
|
|
?LOG(alert, "Plugin ~s load failed with ~p", [Name, {Error, Stacktrace}]),
|
|
{error, parse_config_file_failed}
|
|
end.
|
|
|
|
load_app(App) ->
|
|
case application:load(App) of
|
|
ok ->
|
|
ok;
|
|
{error, {already_loaded, App}} ->
|
|
ok;
|
|
{error, Error} ->
|
|
{error, Error}
|
|
end.
|
|
|
|
start_app(App, SuccFun) ->
|
|
case application:ensure_all_started(App) of
|
|
{ok, Started} ->
|
|
?LOG(info, "Started plugins: ~p", [Started]),
|
|
?LOG(info, "Load plugin ~s successfully", [App]),
|
|
_ = SuccFun(App),
|
|
ok;
|
|
{error, {ErrApp, Reason}} ->
|
|
?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~0p", [App, ErrApp, Reason]),
|
|
{error, {ErrApp, Reason}}
|
|
end.
|
|
|
|
unload_plugin(App, Persistent) ->
|
|
case stop_app(App) of
|
|
ok ->
|
|
_ = plugin_unloaded(App, Persistent), ok;
|
|
{error, Reason} ->
|
|
{error, Reason}
|
|
end.
|
|
|
|
stop_app(App) ->
|
|
case application:stop(App) of
|
|
ok ->
|
|
?LOG(info, "Stop plugin ~s successfully", [App]), ok;
|
|
{error, {not_started, App}} ->
|
|
?LOG(error, "Plugin ~s is not started", [App]), ok;
|
|
{error, Reason} ->
|
|
?LOG(error, "Stop plugin ~s error: ~p", [App]), {error, Reason}
|
|
end.
|
|
|
|
names(plugin) ->
|
|
names(list());
|
|
|
|
names(started_app) ->
|
|
[Name || {Name, _Descr, _Ver} <- application:which_applications()];
|
|
|
|
names(Plugins) ->
|
|
[Name || #plugin{name = Name} <- Plugins].
|
|
|
|
plugin_loaded(_Name, false) ->
|
|
ok;
|
|
plugin_loaded(Name, true) ->
|
|
case read_loaded() of
|
|
{ok, Names} ->
|
|
case lists:member(Name, Names) of
|
|
false ->
|
|
%% write file if plugin is loaded
|
|
write_loaded(lists:append(Names, [{Name, true}]));
|
|
true ->
|
|
ignore
|
|
end;
|
|
{error, Error} ->
|
|
?LOG(error, "Cannot read loaded plugins: ~p", [Error])
|
|
end.
|
|
|
|
plugin_unloaded(_Name, false) ->
|
|
ok;
|
|
plugin_unloaded(Name, true) ->
|
|
case read_loaded() of
|
|
{ok, Names0} ->
|
|
Names = filter_plugins(Names0),
|
|
case lists:member(Name, Names) of
|
|
true ->
|
|
write_loaded(lists:delete(Name, Names));
|
|
false ->
|
|
?LOG(error, "Cannot find ~s in loaded_file", [Name])
|
|
end;
|
|
{error, Error} ->
|
|
?LOG(error, "Cannot read loaded_plugins: ~p", [Error])
|
|
end.
|
|
|
|
read_loaded() ->
|
|
case emqx:get_env(plugins_loaded_file) of
|
|
undefined -> {error, not_found};
|
|
File -> read_loaded(File)
|
|
end.
|
|
|
|
read_loaded(File) -> file:consult(File).
|
|
|
|
write_loaded(AppNames) ->
|
|
FilePath = emqx:get_env(plugins_loaded_file),
|
|
case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- AppNames]) of
|
|
ok -> ok;
|
|
{error, Error} ->
|
|
?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]),
|
|
{error, Error}
|
|
end.
|
|
|
|
plugin_type(auth) -> auth;
|
|
plugin_type(protocol) -> protocol;
|
|
plugin_type(backend) -> backend;
|
|
plugin_type(bridge) -> bridge;
|
|
plugin_type(_) -> feature.
|
|
|
|
|
|
funlog(Key, Value) ->
|
|
?LOG(info, "~s = ~p", [string:join(Key, "."), Value]).
|