%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. -module(emqx_plugins). -include("emqx.hrl"). -include("logger.hrl"). -logger_header("[Plugins]"). -export([init/0]). -export([ load/0 , load/1 , unload/0 , unload/1 , list/0 , find_plugin/1 , load_expand_plugin/1 ]). %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ %% @doc Init plugins' config -spec(init() -> ok). init() -> case emqx_config:get_env(plugins_etc_dir) of undefined -> ok; PluginsEtc -> CfgFiles = [filename:join(PluginsEtc, File) || File <- filelib:wildcard("*.config", PluginsEtc)], lists:foreach(fun init_config/1, CfgFiles) end. init_config(CfgFile) -> {ok, [AppsEnv]} = file:consult(CfgFile), lists:foreach(fun({AppName, Envs}) -> [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs] end, AppsEnv). %% @doc Load all plugins when the broker started. -spec(load() -> list() | {error, term()}). load() -> load_expand_plugins(), case emqx_config:get_env(plugins_loaded_file) of undefined -> %% No plugins available ignore; File -> ensure_file(File), with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end) end. load_expand_plugins() -> case emqx_config:get_env(expand_plugins_dir) of undefined -> ok; ExpandPluginsDir -> Plugins = filelib:wildcard("*", ExpandPluginsDir), lists:foreach(fun(Plugin) -> PluginDir = filename:join(ExpandPluginsDir, Plugin), case filelib:is_dir(PluginDir) of true -> load_expand_plugin(PluginDir); false -> ok end end, Plugins) end. load_expand_plugin(PluginDir) -> init_expand_plugin_config(PluginDir), Ebin = PluginDir ++ "/ebin", code:add_patha(Ebin), Modules = filelib:wildcard(Ebin ++ "/*.beam"), lists:foreach(fun(Mod) -> Module = list_to_atom(filename:basename(Mod, ".beam")), code:load_file(Module) end, Modules), case filelib:wildcard(Ebin ++ "/*.app") of [App|_] -> application:load(list_to_atom(filename:basename(App, ".app"))); _ -> ?LOG(alert, "Plugin not found."), {error, load_app_fail} end. init_expand_plugin_config(PluginDir) -> Priv = PluginDir ++ "/priv", Etc = PluginDir ++ "/etc", Schema = filelib:wildcard(Priv ++ "/*.schema"), Conf = case filelib:wildcard(Etc ++ "/*.conf") of [] -> []; [Conf1] -> cuttlefish_conf:file(Conf1) end, AppsEnv = cuttlefish_generator:map(cuttlefish_schema:files(Schema), Conf), lists:foreach(fun({AppName, Envs}) -> [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs] end, AppsEnv). ensure_file(File) -> case filelib:is_file(File) of false -> write_loaded([]); true -> ok end. with_loaded_file(File, SuccFun) -> case read_loaded(File) of {ok, Names0} -> Names = filter_plugins(Names0), SuccFun(Names); {error, Error} -> ?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error]), {error, Error} end. filter_plugins(Names) -> lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1}; ({Name1, true}) -> {true, Name1}; ({_Name1, false}) -> false end, Names). load_plugins(Names, Persistent) -> Plugins = list(), NotFound = Names -- names(Plugins), case NotFound of [] -> ok; NotFound -> ?LOG(alert, "Cannot find plugins: ~p", [NotFound]) end, NeedToLoad = Names -- NotFound -- names(started_app), [load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad]. %% @doc Unload all plugins before broker stopped. -spec(unload() -> list() | {error, term()}). unload() -> case emqx_config:get_env(plugins_loaded_file) of undefined -> ignore; File -> with_loaded_file(File, fun stop_plugins/1) end. %% Stop plugins stop_plugins(Names) -> [stop_app(App) || App <- Names]. %% @doc List all available plugins -spec(list() -> [emqx_types:plugin()]). list() -> StartedApps = names(started_app), lists:map(fun({Name, _, [Type| _]}) -> Plugin = plugin(Name, Type), case lists:member(Name, StartedApps) of true -> Plugin#plugin{active = true}; false -> Plugin end end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))). plugin(AppName, Type) -> case application:get_all_key(AppName) of {ok, Attrs} -> Ver = proplists:get_value(vsn, Attrs, "0"), Descr = proplists:get_value(description, Attrs, ""), #plugin{name = AppName, version = Ver, descr = Descr, type = plugin_type(Type)}; undefined -> error({plugin_not_found, AppName}) end. %% @doc Load a Plugin -spec(load(atom()) -> ok | {error, term()}). load(PluginName) when is_atom(PluginName) -> case lists:member(PluginName, names(started_app)) of true -> ?LOG(notice, "Plugin ~s is already started", [PluginName]), {error, already_started}; false -> case find_plugin(PluginName) of false -> ?LOG(alert, "Plugin ~s not found", [PluginName]), {error, not_found}; Plugin -> load_plugin(Plugin, true) end end. load_plugin(#plugin{name = Name}, Persistent) -> case load_app(Name) of ok -> start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end); {error, Error} -> {error, Error} end. load_app(App) -> case application:load(App) of ok -> ok; {error, {already_loaded, App}} -> ok; {error, Error} -> {error, Error} end. start_app(App, SuccFun) -> case application:ensure_all_started(App) of {ok, Started} -> ?LOG(info, "Started plugins: ~p", [Started]), ?LOG(info, "Load plugin ~s successfully", [App]), SuccFun(App), {ok, Started}; {error, {ErrApp, Reason}} -> ?LOG(error, "Load plugin ~s failed, cannot start plugin ~s for ~p", [App, ErrApp, Reason]), {error, {ErrApp, Reason}} end. find_plugin(Name) -> find_plugin(Name, list()). find_plugin(Name, Plugins) -> lists:keyfind(Name, 2, Plugins). %% @doc UnLoad a Plugin -spec(unload(atom()) -> ok | {error, term()}). unload(PluginName) when is_atom(PluginName) -> case {lists:member(PluginName, names(started_app)), lists:member(PluginName, names(plugin))} of {true, true} -> unload_plugin(PluginName, true); {false, _} -> ?LOG(error, "Plugin ~s is not started", [PluginName]), {error, not_started}; {true, false} -> ?LOG(error, "~s is not a plugin, cannot unload it", [PluginName]), {error, not_found} end. unload_plugin(App, Persistent) -> case stop_app(App) of ok -> plugin_unloaded(App, Persistent), ok; {error, Reason} -> {error, Reason} end. stop_app(App) -> case application:stop(App) of ok -> ?LOG(info, "Stop plugin ~s successfully", [App]), ok; {error, {not_started, App}} -> ?LOG(error, "Plugin ~s is not started", [App]), ok; {error, Reason} -> ?LOG(error, "Stop plugin ~s error: ~p", [App]), {error, Reason} end. %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- names(plugin) -> names(list()); names(started_app) -> [Name || {Name, _Descr, _Ver} <- application:which_applications()]; names(Plugins) -> [Name || #plugin{name = Name} <- Plugins]. plugin_loaded(_Name, false) -> ok; plugin_loaded(Name, true) -> case read_loaded() of {ok, Names} -> case lists:member(Name, Names) of false -> %% write file if plugin is loaded write_loaded(lists:append(Names, [{Name, true}])); true -> ignore end; {error, Error} -> ?LOG(error, "Cannot read loaded plugins: ~p", [Error]) end. plugin_unloaded(_Name, false) -> ok; plugin_unloaded(Name, true) -> case read_loaded() of {ok, Names0} -> Names = filter_plugins(Names0), case lists:member(Name, Names) of true -> write_loaded(lists:delete(Name, Names)); false -> ?LOG(error, "Cannot find ~s in loaded_file", [Name]) end; {error, Error} -> ?LOG(error, "Cannot read loaded_plugins: ~p", [Error]) end. read_loaded() -> case emqx_config:get_env(plugins_loaded_file) of undefined -> {error, not_found}; File -> read_loaded(File) end. read_loaded(File) -> file:consult(File). write_loaded(AppNames) -> File = emqx_config:get_env(plugins_loaded_file), case file:open(File, [binary, write]) of {ok, Fd} -> lists:foreach(fun(Name) -> file:write(Fd, iolist_to_binary(io_lib:format("~p.~n", [Name]))) end, AppNames); {error, Error} -> ?LOG(error, "Open File ~p Error: ~p", [File, Error]), {error, Error} end. plugin_type(auth) -> auth; plugin_type(protocol) -> protocol; plugin_type(backend) -> backend; plugin_type(bridge) -> bridge; plugin_type(_) -> feature.