%%--------------------------------------------------------------------
%% Copyright (c) 2017-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_plugins).

-include("emqx.hrl").
-include("logger.hrl").

-logger_header("[Plugins]").

-export([init/0]).

-export([ load/0
        , load/1
        , unload/0
        , unload/1
        , reload/1
        , list/0
        , find_plugin/1
        , generate_configs/1
        , apply_configs/1
        ]).

-export([funlog/2]).

-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.

-dialyzer({no_match, [ plugin_loaded/2
                     , plugin_unloaded/2
                     ]}).

%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------

%% @doc Initialise plugin configuration.
%% Every `*.config' file found directly in the directory named by the
%% `plugins_etc_dir' environment value is handed to init_config/1.
%% Nothing is done when that value is `undefined'.
-spec(init() -> ok).
init() ->
    case emqx:get_env(plugins_etc_dir) of
        undefined ->
            ok;
        EtcDir ->
            lists:foreach(
              fun(Basename) ->
                      init_config(filename:join(EtcDir, Basename))
              end,
              filelib:wildcard("*.config", EtcDir))
    end.

%% @doc Load all plugins when the broker is started.
%% External plugins under `expand_plugins_dir' are loaded first; then the
%% plugins enabled in the `plugins_loaded_file' are started without
%% rewriting that file. Returns `ignore' when no loaded-file is configured.
-spec(load() -> ok | ignore | {error, term()}).
load() ->
    ok = load_ext_plugins(emqx:get_env(expand_plugins_dir)),
    LoadEnabled = fun(Names) -> load_plugins(Names, false) end,
    case emqx:get_env(plugins_loaded_file) of
        undefined ->
            %% No plugins available
            ignore;
        LoadedFile ->
            %% Create the file with the default plugin set if it is missing.
            _ = ensure_file(LoadedFile),
            with_loaded_file(LoadedFile, LoadEnabled)
    end.
%% @doc Load a Plugin
%% Returns `{error, not_found}' when the name is not a known plugin and
%% `{error, already_started}' when its application is already running.
%% On success the plugin is also recorded in the loaded-plugins file.
-spec(load(atom()) -> ok | {error, term()}).
load(PluginName) when is_atom(PluginName) ->
    case plugin_status(PluginName) of
        not_found ->
            ?LOG(alert, "Plugin ~s not found, cannot load it", [PluginName]),
            {error, not_found};
        started ->
            ?LOG(notice, "Plugin ~s is already started", [PluginName]),
            {error, already_started};
        stopped ->
            load_plugin(PluginName, true)
    end.

%% @doc Unload all plugins before broker stopped.
%% Returns `ignore' when no loaded-plugins file is configured, `ok' once
%% the listed plugins have been asked to stop, or `{error, Reason}' when
%% the file cannot be read.
-spec(unload() -> ok | ignore | {error, term()}).
unload() ->
    case emqx:get_env(plugins_loaded_file) of
        undefined -> ignore;
        File -> with_loaded_file(File, fun stop_plugins/1)
    end.

%% @doc UnLoad a Plugin
%% Returns `{error, not_found}' when the name is not a known plugin and
%% `{error, not_started}' when its application is not running.
-spec(unload(atom()) -> ok | {error, term()}).
unload(PluginName) when is_atom(PluginName) ->
    case plugin_status(PluginName) of
        not_found ->
            ?LOG(error, "Plugin ~s is not found, cannot unload it", [PluginName]),
            {error, not_found};
        stopped ->
            ?LOG(error, "Plugin ~s is not started", [PluginName]),
            {error, not_started};
        started ->
            unload_plugin(PluginName, true)
    end.

%% @doc Reload a Plugin
%% A plugin that is not running is simply loaded; a running plugin is
%% unloaded first and loaded again only if the unload succeeded.
-spec(reload(atom()) -> ok | {error, term()}).
reload(PluginName) when is_atom(PluginName) ->
    case plugin_status(PluginName) of
        not_found ->
            ?LOG(error, "Plugin ~s is not found, cannot reload it", [PluginName]),
            {error, not_found};
        stopped ->
            load(PluginName);
        started ->
            case unload(PluginName) of
                ok -> load(PluginName);
                {error, Reason} -> {error, Reason}
            end
    end.

%% @doc List all available plugins
%% Plugins are discovered through the `emqx_plugin' module attribute and
%% returned sorted; those whose application is currently running have
%% `active' set to `true'.
-spec(list() -> [emqx_types:plugin()]).
list() ->
    StartedApps = names(started_app),
    lists:map(fun({Name, _, [Type| _]}) ->
                  Plugin = plugin(Name, Type),
                  case lists:member(Name, StartedApps) of
                      true  -> Plugin#plugin{active = true};
                      false -> Plugin
                  end
              end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))).

%% @doc Look up a plugin among all available plugins.
%% Returns the matching element of list/0, or `false' when none matches.
find_plugin(Name) ->
    find_plugin(Name, list()).

%% Look up `Name' in an already computed plugin list, comparing against
%% element 2 of each entry (presumably the `name' field of #plugin{};
%% the record definition lives in emqx.hrl).
find_plugin(Name, Plugins) ->
    lists:keyfind(Name, 2, Plugins).

%% Classify a plugin name as `not_found' (not among the available
%% plugins), `started' (its application is running) or `stopped'.
%% Both lookups are always performed, in this order.
plugin_status(PluginName) ->
    Known = lists:member(PluginName, names(plugin)),
    Started = lists:member(PluginName, names(started_app)),
    case {Known, Started} of
        {false, _}    -> not_found;
        {true, true}  -> started;
        {true, false} -> stopped
    end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

%% Consult a `.config' file that must contain exactly one term, a list of
%% `{App, Envs}' pairs, and store every `{Par, Val}' of each pair in the
%% environment of the corresponding application.
init_config(CfgFile) ->
    {ok, [AppsEnv]} = file:consult(CfgFile),
    ApplyAppEnv =
        fun({App, Envs}) ->
                [application:set_env(App, Par, Val) || {Par, Val} <- Envs]
        end,
    lists:foreach(ApplyAppEnv, AppsEnv).

%% load external plugins which are placed in etc/plugins dir
%% Each sub-directory of `Dir' is treated as one plugin; plain files are
%% skipped. Nothing is done when `Dir' is `undefined'.
load_ext_plugins(undefined) ->
    ok;
load_ext_plugins(Dir) ->
    LoadIfDir =
        fun(Entry) ->
                Candidate = filename:join(Dir, Entry),
                case filelib:is_dir(Candidate) of
                    false -> ok;
                    true -> load_ext_plugin(Candidate)
                end
        end,
    lists:foreach(LoadIfDir, filelib:wildcard("*", Dir)).

%% Load one external plugin directory: derive the application name from
%% the single `ebin/*.app' file, load its code and application spec, then
%% load its configuration. Raises `{plugin_app_file_not_found, Pattern}'
%% when no `.app' file exists.
load_ext_plugin(PluginDir) ->
    ?LOG(debug, "loading_extra_plugin: ~s", [PluginDir]),
    Ebin = filename:join([PluginDir, "ebin"]),
    AppFile = filename:join([Ebin, "*.app"]),
    AppName =
        case filelib:wildcard(AppFile) of
            [] ->
                ?LOG(alert, "plugin_app_file_not_found: ~s", [AppFile]),
                error({plugin_app_file_not_found, AppFile});
            [App] ->
                list_to_atom(filename:basename(App, ".app"))
        end,
    ok = load_plugin_app(AppName, Ebin),
    try
        ok = load_plugin_conf(AppName, PluginDir)
    catch
        throw : {conf_file_not_found, ConfFile} ->
            %% this is maybe a dependency of an external plugin
            ?LOG(debug, "config_load_error_ignored for app=~p, path=~s",
                 [AppName, ConfFile]),
            ok
    end.

%% Prepend `Ebin' to the code path, load every `.beam' file found in it
%% and finally load the application specification. A module that fails
%% to load raises `{failed_to_load_plugin_beam, BeamFile, Reason}'; an
%% already loaded application is accepted.
load_plugin_app(AppName, Ebin) ->
    _ = code:add_patha(Ebin),
    BeamFiles = filelib:wildcard(filename:join([Ebin, "*.beam"])),
    LoadBeam =
        fun(BeamFile) ->
                Module = list_to_atom(filename:basename(BeamFile, ".beam")),
                case code:load_file(Module) of
                    {error, Reason} ->
                        error({failed_to_load_plugin_beam, BeamFile, Reason});
                    {module, _} ->
                        ok
                end
        end,
    lists:foreach(LoadBeam, BeamFiles),
    case application:load(AppName) of
        {error, {already_loaded, _}} -> ok;
        ok -> ok
    end.
%% Load the configuration of an external plugin: parse
%% `<PluginDir>/etc/<AppName>.conf' with cuttlefish, map it through every
%% `<PluginDir>/priv/*.schema' file and store the resulting values in the
%% application environments. Throws `{conf_file_not_found, ConfFile}'
%% when the `.conf' file does not exist.
load_plugin_conf(AppName, PluginDir) ->
    Priv = filename:join([PluginDir, "priv"]),
    Etc = filename:join([PluginDir, "etc"]),
    ConfFile = filename:join([Etc, atom_to_list(AppName) ++ ".conf"]),
    filelib:is_file(ConfFile) orelse throw({conf_file_not_found, ConfFile}),
    Conf = cuttlefish_conf:file(ConfFile),
    Schema = filelib:wildcard(filename:join([Priv, "*.schema"])),
    ?LOG(debug, "loading_extra_plugin_config conf=~s, schema=~s", [ConfFile, Schema]),
    AppsEnv = cuttlefish_generator:map(cuttlefish_schema:files(Schema), Conf),
    StoreEnvs =
        fun({AppName1, Envs}) ->
                [application:set_env(AppName1, Par, Val) || {Par, Val} <- Envs]
        end,
    lists:foreach(StoreEnvs, AppsEnv).

%% Make sure the loaded-plugins file exists; when it does not, write the
%% default plugin set through write_loaded/1.
ensure_file(File) ->
    case filelib:is_file(File) of
        true ->
            ok;
        false ->
            write_loaded([ {emqx_management, true}
                         , {emqx_dashboard, true}
                         , {emqx_modules, false}
                         , {emqx_recon, true}
                         , {emqx_retainer, true}
                         , {emqx_telemetry, true}
                         , {emqx_rule_engine, true}
                         , {emqx_bridge_mqtt, false}
                         ])
    end.

%% Read the loaded-plugins file, reduce it to the list of enabled plugin
%% names and pass that list to `SuccFun'. A read failure is logged and
%% returned as `{error, Error}' without calling `SuccFun'.
with_loaded_file(File, SuccFun) ->
    case read_loaded(File) of
        {error, Error} ->
            ?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error]),
            {error, Error};
        {ok, Names0} ->
            SuccFun(filter_plugins(Names0))
    end.

%% Normalise the terms of the loaded-plugins file into the ordered list
%% of enabled plugin names. Entries are `{Name, Load}' pairs or bare
%% atoms (treated as `{Name, true}'); a later `{Name, false}' removes a
%% name enabled earlier, and repeated enables are kept once.
filter_plugins(Names) ->
    filter_plugins(Names, []).

%% `Acc' holds the enabled names in reverse order of first appearance.
filter_plugins([], Acc) ->
    lists:reverse(Acc);
filter_plugins([Name | Rest], Acc) when is_atom(Name) ->
    filter_plugins([{Name, true} | Rest], Acc);
filter_plugins([{Name, Load} | Rest], Acc) ->
    Listed = lists:member(Name, Acc),
    NewAcc =
        if
            Load =:= true, not Listed -> [Name | Acc];
            Load =:= false, Listed -> Acc -- [Name];
            true -> Acc
        end,
    filter_plugins(Rest, NewAcc).
%% Load every plugin in `Names' that is available and whose application
%% is not running yet. Names that match no available plugin are reported
%% in a single alert log and skipped. `Persistent' is passed on to
%% load_plugin/2.
load_plugins(Names, Persistent) ->
    Plugins = list(),
    NotFound = Names -- names(Plugins),
    case NotFound of
        [] -> ok;
        _ -> ?LOG(alert, "cannot_find_plugins: ~p", [NotFound])
    end,
    %% `--' is right-associative: without the parentheses this would be
    %% `Names -- (NotFound -- Started)', which never removes the plugins
    %% that are already started.
    NeedToLoad = (Names -- NotFound) -- names(started_app),
    lists:foreach(fun(Name) ->
                      Plugin = find_plugin(Name, Plugins),
                      load_plugin(Plugin#plugin.name, Persistent)
                  end, NeedToLoad).

%% @doc Produce the configuration terms for plugin `App'.
%% When `<plugins_etc_dir>/<App>.config' exists it is consulted and its
%% single term returned; otherwise the configuration is generated from
%% the plugin's `.conf' and `.schema' files.
generate_configs(App) ->
    ConfigFile = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".config",
    case filelib:is_file(ConfigFile) of
        true ->
            {ok, [Configs]} = file:consult(ConfigFile),
            Configs;
        false ->
            do_generate_configs(App)
    end.

%% Generate the configuration of `App' with cuttlefish. The `.conf' file
%% is taken from `plugins_etc_dir' if present there, else from the `etc'
%% directory of the application; the schema is `<priv_dir>/<App>.schema'.
%% Raises `{config_not_found, [Name1, Name2]}' or
%% `{schema_not_found, SchemaFile}' when the respective file is missing.
do_generate_configs(App) ->
    Name1 = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".conf",
    Name2 = filename:join([code:lib_dir(App), "etc", App]) ++ ".conf",
    ConfFile = case {filelib:is_file(Name1), filelib:is_file(Name2)} of
                   {true, _} -> Name1;
                   {false, true} -> Name2;
                   {false, false} -> error({config_not_found, [Name1, Name2]})
               end,
    SchemaFile = filename:join([code:priv_dir(App), App]) ++ ".schema",
    case filelib:is_file(SchemaFile) of
        true ->
            Schema = cuttlefish_schema:files([SchemaFile]),
            Conf = cuttlefish_conf:file(ConfFile),
            cuttlefish_generator:map(Schema, Conf, undefined, fun ?MODULE:funlog/2);
        false ->
            error({schema_not_found, SchemaFile})
    end.

%% @doc Replace application environments with the given configuration.
%% For each `{App, Config}' pair every existing environment key of `App'
%% is unset before the `{Key, Val}' pairs of `Config' are set.
apply_configs([]) ->
    ok;
apply_configs([{App, Config} | More]) ->
    lists:foreach(fun({Key, _}) -> application:unset_env(App, Key) end,
                  application:get_all_env(App)),
    lists:foreach(fun({Key, Val}) -> application:set_env(App, Key, Val) end, Config),
    apply_configs(More).

%% Stop plugins
%% Individual stop results are deliberately discarded; always returns `ok'.
stop_plugins(Names) ->
    _ = [stop_app(App) || App <- Names],
    ok.

%% Build the #plugin{} record of `AppName' from the application's
%% description key. Raises `{plugin_not_found, AppName}' when the
%% application keys are not available.
plugin(AppName, Type) ->
    case application:get_all_key(AppName) of
        {ok, Attrs} ->
            Descr = proplists:get_value(description, Attrs, ""),
            #plugin{name = AppName, descr = Descr, type = plugin_type(Type)};
        undefined -> error({plugin_not_found, AppName})
    end.
%% Generate and apply the configuration of plugin `Name', then load and
%% start its application. When `Persistent' is `true' a successful start
%% is recorded in the loaded-plugins file.
%% NOTE: any exception raised inside the `try' (not only configuration
%% parsing) is logged and reported as `{error, parse_config_file_failed}'.
load_plugin(Name, Persistent) ->
    try
        Configs = ?MODULE:generate_configs(Name),
        ?MODULE:apply_configs(Configs),
        case load_app(Name) of
            ok ->
                start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end);
            {error, Error0} ->
                {error, Error0}
        end
    catch _ : Error : Stacktrace ->
        ?LOG(alert, "Plugin ~s load failed with ~p", [Name, {Error, Stacktrace}]),
        {error, parse_config_file_failed}
    end.

%% Load the application specification; an already loaded application
%% counts as success.
load_app(App) ->
    case application:load(App) of
        ok ->
            ok;
        {error, {already_loaded, App}} ->
            ok;
        {error, Error} ->
            {error, Error}
    end.

%% Start `App' together with its dependencies. On success `SuccFun' is
%% called with `App' (its result is ignored) and `ok' is returned; on
%% failure the `{error, {ErrApp, Reason}}' tuple is logged and returned.
start_app(App, SuccFun) ->
    case application:ensure_all_started(App) of
        {ok, Started} ->
            ?LOG(info, "Started plugins: ~p", [Started]),
            ?LOG(info, "Load plugin ~s successfully", [App]),
            _ = SuccFun(App),
            ok;
        {error, {ErrApp, Reason}} ->
            ?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~0p",
                 [App, ErrApp, Reason]),
            {error, {ErrApp, Reason}}
    end.

%% Stop the plugin application and, when `Persistent' is `true', remove
%% it from the loaded-plugins file.
unload_plugin(App, Persistent) ->
    case stop_app(App) of
        ok ->
            _ = plugin_unloaded(App, Persistent), ok;
        {error, Reason} ->
            {error, Reason}
    end.

%% Stop application `App'. An application that was not running is logged
%% but still reported as `ok'; any other failure is logged together with
%% its reason and returned.
stop_app(App) ->
    case application:stop(App) of
        ok ->
            ?LOG(info, "Stop plugin ~s successfully", [App]), ok;
        {error, {not_started, App}} ->
            ?LOG(error, "Plugin ~s is not started", [App]), ok;
        {error, Reason} ->
            %% Both placeholders need an argument; the reason used to be
            %% missing from the argument list.
            ?LOG(error, "Stop plugin ~s error: ~p", [App, Reason]),
            {error, Reason}
    end.

%% names(plugin)      -> names of all available plugins
%% names(started_app) -> names of all running applications
%% names(Plugins)     -> names taken from a list of #plugin{} records
names(plugin) ->
    names(list());

names(started_app) ->
    [Name || {Name, _Descr, _Ver} <- application:which_applications()];

names(Plugins) ->
    [Name || #plugin{name = Name} <- Plugins].

%% Record a freshly loaded plugin in the loaded-plugins file. Nothing is
%% written for non-persistent loads or when the plugin is already enabled
%% there.
plugin_loaded(_Name, false) ->
    ok;
plugin_loaded(Name, true) ->
    case read_loaded() of
        {ok, Names} ->
            %% Entries may be bare atoms or `{Name, Bool}' pairs, so test
            %% membership on the normalised list of enabled names;
            %% otherwise an existing `{Name, true}' entry is not seen and
            %% a duplicate is appended.
            case lists:member(Name, filter_plugins(Names)) of
                false ->
                    %% write file if plugin is loaded
                    write_loaded(lists:append(Names, [{Name, true}]));
                true ->
                    ignore
            end;
        {error, Error} ->
            ?LOG(error, "Cannot read loaded plugins: ~p", [Error])
    end.
%% Remove an unloaded plugin from the loaded-plugins file. Nothing is
%% written for non-persistent unloads. The file is rewritten as the
%% normalised list of enabled names without `Name'; a missing entry or an
%% unreadable file is only logged.
plugin_unloaded(_Name, false) ->
    ok;
plugin_unloaded(Name, true) ->
    case read_loaded() of
        {error, Error} ->
            ?LOG(error, "Cannot read loaded_plugins: ~p", [Error]);
        {ok, Names0} ->
            Enabled = filter_plugins(Names0),
            case lists:member(Name, Enabled) of
                false ->
                    ?LOG(error, "Cannot find ~s in loaded_file", [Name]);
                true ->
                    write_loaded(lists:delete(Name, Enabled))
            end
    end.

%% Read the terms of the configured loaded-plugins file, or return
%% `{error, not_found}' when no such file is configured.
read_loaded() ->
    case emqx:get_env(plugins_loaded_file) of
        undefined -> {error, not_found};
        LoadedFile -> read_loaded(LoadedFile)
    end.

%% Read the terms stored in `File'.
read_loaded(File) ->
    file:consult(File).

%% Overwrite the configured loaded-plugins file with one `Term.' line per
%% element of `AppNames'. A write failure is logged and returned.
write_loaded(AppNames) ->
    FilePath = emqx:get_env(plugins_loaded_file),
    Lines = [io_lib:format("~p.~n", [Entry]) || Entry <- AppNames],
    case file:write_file(FilePath, Lines) of
        {error, Error} ->
            ?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]),
            {error, Error};
        ok ->
            ok
    end.

%% Map a declared plugin type onto the supported set; anything other
%% than auth, protocol, backend or bridge is classified as `feature'.
plugin_type(Type) ->
    case lists:member(Type, [auth, protocol, backend, bridge]) of
        true -> Type;
        false -> feature
    end.

%% Logging callback handed to cuttlefish_generator:map/4: logs one
%% setting, with the key segments joined by dots.
funlog(Key, Value) ->
    ?LOG(info, "~s = ~p",
         [string:join(Key, "."), Value]).