341 lines
11 KiB
Erlang
341 lines
11 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx).
|
|
|
|
-include("emqx.hrl").
|
|
-include("logger.hrl").
|
|
-include("types.hrl").
|
|
|
|
-logger_header("[EMQX]").
|
|
|
|
%% Start/Stop the application
|
|
-export([ start/0
|
|
, restart/1
|
|
, is_running/1
|
|
, stop/0
|
|
]).
|
|
|
|
-export([ get_env/1
|
|
, get_env/2
|
|
]).
|
|
|
|
%% PubSub API
|
|
-export([ subscribe/1
|
|
, subscribe/2
|
|
, subscribe/3
|
|
, publish/1
|
|
, unsubscribe/1
|
|
]).
|
|
|
|
%% PubSub management API
|
|
-export([ topics/0
|
|
, subscriptions/1
|
|
, subscribers/1
|
|
, subscribed/2
|
|
]).
|
|
|
|
%% Hooks API
|
|
-export([ hook/2
|
|
, hook/3
|
|
, hook/4
|
|
, unhook/2
|
|
, run_hook/2
|
|
, run_fold_hook/3
|
|
]).
|
|
|
|
%% Shutdown and reboot
|
|
-export([ shutdown/0
|
|
, shutdown/1
|
|
, reboot/0
|
|
]).
|
|
|
|
%% Troubleshooting
|
|
-export([ set_debug_secret/1
|
|
, default_started_applications/0
|
|
, expand_apps/1
|
|
]).
|
|
|
|
-define(APP, ?MODULE).
|
|
|
|
%% @hidden Install the debug_info decryption key read from a file.
%% `PathToSecretFile' names a file whose (whitespace-trimmed) content is the
%% debug_info encryption secret. Evaluate this function when encrypted
%% debug_info has to be accessed.
%% Raises `error(Reason)' when the file cannot be read and
%% `error({badfile, Path})' when its content cannot be trimmed as a string.
%% NOTE: Do not change the API to accept the secret text itself, because
%% function arguments may end up in logs.
set_debug_secret(PathToSecretFile) ->
    Key = read_debug_secret(PathToSecretFile),
    %% beam_lib invokes the fun with `init', `clear' and `{debug_info, ...}';
    %% only the last one has to hand out the key.
    KeyFun = fun(init) -> ok;
                (clear) -> ok;
                ({debug_info, _Mode, _Module, _Filename}) -> Key
             end,
    %% Drop any previously registered fun before installing the new one.
    _ = beam_lib:clear_crypto_key_fun(),
    ok = beam_lib:crypto_key_fun(KeyFun).

%% Read the secret file and return its content as a trimmed string.
read_debug_secret(Path) ->
    case file:read_file(Path) of
        {error, Reason} ->
            io:format("Failed to read debug_info encryption key file ~s: ~p~n",
                      [Path, Reason]),
            error(Reason);
        {ok, Content} ->
            try
                string:trim(binary_to_list(Content))
            catch
                _ : _ -> error({badfile, Path})
            end
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Bootstrap, is_running...
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Start the emqx application and every application it depends on.
%% The result is passed through unchanged from
%% `application:ensure_all_started/1'.
-spec(start() -> {ok, list(atom())} | {error, term()}).
start() ->
    %% Placeholders carried over from the original code; none of these
    %% checks is implemented here:
    %%   - Check OS
    %%   - Check VM
    %%   - Check Mnesia
    application:ensure_all_started(?APP).
|
|
|
|
%% @doc Reload the application environment from `ConfFile' and restart the
%% broker: shut down, bounce mnesia, then reboot.
-spec(restart(string()) -> ok).
restart(ConfFile) ->
    %% 1. push the new configuration into the application environment
    reload_config(ConfFile),
    %% 2. stop the broker applications
    shutdown(),
    %% 3. bounce mnesia; stopping must succeed, the start result is ignored
    ok = application:stop(mnesia),
    _ = application:start(mnesia),
    %% 4. bring the broker applications back up
    reboot().
|
|
|
|
%% @doc Stop the emqx application.
%% Returns the result of `application:stop/1' unchanged.
-spec(stop() -> ok | {error, term()}).
stop() ->
    application:stop(?APP).
|
|
|
|
%% @doc Is emqx running on `Node'?
%% Asks `Node' over rpc whether a process is registered under the
%% application name. A failed rpc call counts as "not running".
-spec(is_running(node()) -> boolean()).
is_running(Node) ->
    Registered = rpc:call(Node, erlang, whereis, [?APP]),
    case Registered of
        Pid when is_pid(Pid) -> true;
        undefined -> false;
        {badrpc, _Reason} -> false
    end.
|
|
|
|
%% @doc Look up `Key' in the emqx application environment.
%% Yields `undefined' when the key is not set.
-spec(get_env(Key :: atom()) -> maybe(term())).
get_env(Key) ->
    get_env(Key, undefined).
|
|
|
|
%% @doc Look up `Key' in the emqx application environment, falling back to
%% `Default' when the key is not set.
-spec(get_env(Key :: atom(), Default :: term()) -> term()).
get_env(Key, Default) ->
    application:get_env(?APP, Key, Default).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% PubSub API
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Subscribe to `Topic'.
%% The topic may be given as a binary or as a string/iolist; it is converted
%% to a binary before being handed to `emqx_broker:subscribe/1'.
-spec(subscribe(emqx_topic:topic() | string()) -> ok).
subscribe(Topic) ->
    TopicBin = iolist_to_binary(Topic),
    emqx_broker:subscribe(TopicBin).
|
|
|
|
%% @doc Subscribe to `Topic' with either a subscriber id (atom or binary)
%% or a map of subscription options as second argument.
%% The topic is converted to a binary and the call is forwarded to
%% `emqx_broker:subscribe/2'. Any other second argument fails with
%% `function_clause'.
-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | emqx_types:subopts()) -> ok).
subscribe(Topic, SubIdOrOpts)
  when is_atom(SubIdOrOpts); is_binary(SubIdOrOpts); is_map(SubIdOrOpts) ->
    TopicBin = iolist_to_binary(Topic),
    emqx_broker:subscribe(TopicBin, SubIdOrOpts).
|
|
|
|
%% @doc Subscribe to `Topic' under subscriber id `SubId' with the
%% subscription options `SubOpts'.
%% The topic is converted to a binary and the call is forwarded to
%% `emqx_broker:subscribe/3'.
%% `SubId' must be an atom or a binary and `SubOpts' a map; anything else
%% fails with `function_clause'. The spec used to list `pid()' as an
%% accepted `SubId', which the guard has never allowed, so it is now limited
%% to `emqx_types:subid()'.
-spec(subscribe(emqx_topic:topic() | string(),
                emqx_types:subid(), emqx_types:subopts()) -> ok).
subscribe(Topic, SubId, SubOpts) when (is_atom(SubId) orelse is_binary(SubId)), is_map(SubOpts) ->
    emqx_broker:subscribe(iolist_to_binary(Topic), SubId, SubOpts).
|
|
|
|
%% @doc Publish `Message' by handing it to `emqx_broker:publish/1' and
%% return the broker's result.
-spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
publish(Message) ->
    emqx_broker:publish(Message).
|
|
|
|
%% @doc Unsubscribe from `Topic'.
%% The topic is converted to a binary before being handed to
%% `emqx_broker:unsubscribe/1'.
-spec(unsubscribe(emqx_topic:topic() | string()) -> ok).
unsubscribe(Topic) ->
    TopicBin = iolist_to_binary(Topic),
    emqx_broker:unsubscribe(TopicBin).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% PubSub management API
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc List the topics reported by `emqx_router:topics/0'.
-spec(topics() -> list(emqx_topic:topic())).
topics() ->
    emqx_router:topics().
|
|
|
|
%% @doc List the subscribers of `Topic'.
%% The topic is converted to a binary before being handed to
%% `emqx_broker:subscribers/1'.
-spec(subscribers(emqx_topic:topic() | string()) -> [pid()]).
subscribers(Topic) ->
    TopicBin = iolist_to_binary(Topic),
    emqx_broker:subscribers(TopicBin).
|
|
|
|
%% @doc List the subscriptions held by the subscriber process `Subscriber',
%% as returned by `emqx_broker:subscriptions/1'.
%% Only pids are accepted; anything else fails with `function_clause'.
-spec(subscriptions(pid()) -> [{emqx_topic:topic(), emqx_types:subopts()}]).
subscriptions(Subscriber) when is_pid(Subscriber) ->
    emqx_broker:subscriptions(Subscriber).
|
|
|
|
%% @doc Is the given subscriber subscribed to `Topic'?
%% The subscriber is identified either by pid or by subscriber id (atom or
%% binary). The topic is converted to a binary and the question is forwarded
%% to `emqx_broker:subscribed/2'.
-spec(subscribed(pid() | emqx_types:subid(), emqx_topic:topic() | string()) -> boolean()).
subscribed(Subscriber, Topic)
  when is_pid(Subscriber); is_atom(Subscriber); is_binary(Subscriber) ->
    TopicBin = iolist_to_binary(Topic),
    emqx_broker:subscribed(Subscriber, TopicBin).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Hooks API
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Register `Action' on `HookPoint' via `emqx_hooks:add/2'.
-spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok | {error, already_exists}).
hook(HookPoint, Action) ->
    emqx_hooks:add(HookPoint, Action).
|
|
|
|
%% @doc Register `Action' on `HookPoint' with one extra argument, which is
%% forwarded unchanged to `emqx_hooks:add/3'.
%% The extra argument may be an integer priority, a filter (a fun or a
%% tuple) or a list of init args. Any other term fails with
%% `function_clause'.
-spec(hook(emqx_hooks:hookpoint(),
           emqx_hooks:action(),
           emqx_hooks:filter() | integer() | list())
      -> ok | {error, already_exists}).
hook(HookPoint, Action, Extra)
  when is_integer(Extra);     %% priority
       is_function(Extra);    %% filter given as a fun
       is_tuple(Extra);       %% filter given as a tuple
       is_list(Extra) ->      %% init args
    emqx_hooks:add(HookPoint, Action, Extra).
|
|
|
|
%% @doc Register `Action' on `HookPoint' with both a filter and a priority
%% via `emqx_hooks:add/4'.
-spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter(), integer())
      -> ok | {error, already_exists}).
hook(HookPoint, Action, Filter, Priority) ->
    emqx_hooks:add(HookPoint, Action, Filter, Priority).
|
|
|
|
%% @doc Remove `Action' from `HookPoint' via `emqx_hooks:del/2'.
-spec(unhook(emqx_hooks:hookpoint(), emqx_hooks:action() | {module(), atom()}) -> ok).
unhook(HookPoint, Action) ->
    emqx_hooks:del(HookPoint, Action).
|
|
|
|
%% @doc Run the callbacks registered on `HookPoint' with `Args' via
%% `emqx_hooks:run/2' and return its result.
-spec(run_hook(emqx_hooks:hookpoint(), list(any())) -> ok | stop).
run_hook(HookPoint, Args) ->
    emqx_hooks:run(HookPoint, Args).
|
|
|
|
%% @doc Run the callbacks registered on `HookPoint' with `Args', starting
%% from the accumulator `Acc0', via `emqx_hooks:run_fold/3', and return its
%% result.
-spec(run_fold_hook(emqx_hooks:hookpoint(), list(any()), any()) -> any()).
run_fold_hook(HookPoint, Args, Acc0) ->
    emqx_hooks:run_fold(HookPoint, Args, Acc0).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Shutdown and reboot
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Shut the broker down with reason `normal'.
shutdown() ->
    shutdown(normal).
|
|
|
|
%% @doc Shut the broker down for `Reason'.
%% Logs the shutdown, runs the edition-specific `on_shutdown/1' callback,
%% unloads the plugins and finally stops the default started applications
%% in reverse order.
shutdown(Reason) ->
    ok = emqx_misc:maybe_mute_rpc_log(),
    ?LOG(critical, "emqx shutdown for ~s", [Reason]),
    on_shutdown(Reason),
    _ = emqx_plugins:unload(),
    %% Stop in the opposite order of the start list.
    StopOrder = lists:reverse(default_started_applications()),
    lists:foreach(fun application:stop/1, StopOrder).
|
|
|
|
%% @doc Start the default applications again and run `on_reboot/0'.
%% If emqx_dashboard is currently running it is stopped first and started
%% again only after the default applications, because the dashboard must be
%% started after mnesia.
reboot() ->
    DashboardWasRunning = is_application_running(emqx_dashboard),
    _ = DashboardWasRunning andalso application:stop(emqx_dashboard),
    lists:foreach(fun application:start/1, default_started_applications()),
    _ = DashboardWasRunning andalso application:start(emqx_dashboard),
    on_reboot().
|
|
|
|
%% Is `App' listed under the `started' key of `application:info/0'?
is_application_running(App) ->
    Info = application:info(),
    proplists:is_defined(App, proplists:get_value(started, Info)).
|
|
|
|
-ifdef(EMQX_ENTERPRISE).
%% Enterprise edition: after a reboot, bootstrap the license and load the
%% dynamic license. Any exception raised while doing so is logged at
%% `critical' level and otherwise ignored, so this always returns `ok'.
on_reboot() ->
    try
        _ = emqx_license_api:bootstrap_license(),
        emqx_license:load_dynamic_license()
    of
        _Ignored -> ok
    catch
        Class:Error:Stacktrace ->
            ?LOG(critical, "~p while rebooting: ~p, ~p", [Class, Error, Stacktrace]),
            ok
    end.

%% Enterprise edition: when shutting down for reason `join', call
%% `emqx_modules:sync_load_modules_file/0' first. Nothing is done for any
%% other reason.
on_shutdown(join) ->
    _ = emqx_modules:sync_load_modules_file(),
    ok;
on_shutdown(_OtherReason) ->
    ok.

-else.
%% Open-source edition: nothing to do after a reboot.
on_reboot() ->
    ok.

%% Open-source edition: nothing to do before a shutdown.
on_shutdown(_Reason) ->
    ok.
-endif.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|
|
%% Applications that `shutdown/1' stops and `reboot/0' starts again, before
%% dependent applications are added by `expand_apps/1'.
applications_need_restart() ->
    [gproc, esockd, ranch, cowboy, ekka, emqx] ++ edition_specific_apps().

%% Applications appended to the restart list depending on the edition:
%% none for the enterprise build, emqx_modules otherwise.
-ifdef(EMQX_ENTERPRISE).
edition_specific_apps() ->
    [].
-else.
edition_specific_apps() ->
    [emqx_modules].
-endif.
|
|
|
|
%% persistent_term key under which the expanded application list is cached.
-define(PK_START_APPS, {?MODULE, default_started_applications}).

%% @doc The applications to stop on shutdown and start on reboot.
%% The list is computed once by expanding `applications_need_restart/0'
%% with `expand_apps/1' and is then cached in a persistent term.
default_started_applications() ->
    case persistent_term:get(?PK_START_APPS, undefined) of
        undefined -> compute_and_cache_started_applications();
        Cached -> Cached
    end.

%% Expand the base list, store it under ?PK_START_APPS and return it.
compute_and_cache_started_applications() ->
    Expanded = expand_apps(applications_need_restart()),
    ok = persistent_term:put(?PK_START_APPS, Expanded),
    Expanded.
|
|
|
|
%% @doc Expand the application list with dependent apps.
%% Every name in `AppNames' is expanded by `expand_an_app/2' against the
%% applications currently reported by `application:which_applications/0'.
%% The per-app results are concatenated in order and duplicates are then
%% removed with `remove_duplicated/1'.
expand_apps(AppNames) ->
    RunningApps = application:which_applications(),
    Expanded = lists:append([expand_an_app(Name, RunningApps) || Name <- AppNames]),
    remove_duplicated(Expanded).
|
|
|
|
%% Return `App' followed by the applications from `Apps' that list it in
%% their `applications' key, each of them expanded the same way.
%% `Apps' has the shape returned by `application:which_applications/0',
%% i.e. a list of `{Name, Description, Vsn}' tuples.
expand_an_app(App, Apps) ->
    expand_an_app(App, Apps, [App]).

%% Walk `Apps' from left to right, collecting into the accumulator.
%% NOTE: a dependent application is itself expanded only against the
%% entries that come after it in the list.
expand_an_app(_App, [], Collected) ->
    Collected;
expand_an_app(App, [{Candidate, _Descr, _Vsn} | Remaining], Collected) ->
    {ok, CandidateDeps} = application:get_key(Candidate, applications),
    Dependents =
        case lists:member(App, CandidateDeps) of
            %% Candidate depends on App: pull it in with its own dependents
            true -> expand_an_app(Candidate, Remaining);
            false -> []
        end,
    expand_an_app(App, Remaining, Collected ++ Dependents).
|
|
|
|
%% Remove duplicates from a list, keeping the LAST occurrence of every
%% element and the relative order of the elements that remain.
%% Elements are compared the way `lists:member/2' compares them.
remove_duplicated([]) ->
    [];
remove_duplicated([Head | Tail]) ->
    Deduplicated = remove_duplicated(Tail),
    case lists:member(Head, Tail) of
        %% Head shows up again later, so drop this earlier occurrence
        true -> Deduplicated;
        false -> [Head | Deduplicated]
    end.
|
|
|
|
%% Load application environment settings from `ConfFile'.
%% The file must contain exactly one term: a list of `{App, Vals}' pairs,
%% where `Vals' is a list of `{Par, Val}' pairs. Each pair is written with
%% `application:set_env/3'. Crashes with `badmatch' when the file cannot be
%% consulted or does not hold exactly one term.
reload_config(ConfFile) ->
    {ok, [Conf]} = file:consult(ConfFile),
    lists:foreach(fun set_app_env/1, Conf).

%% Write every `{Par, Val}' pair of one `{App, Vals}' entry into the
%% environment of `App'. Entries of `Vals' that are not pairs are skipped.
set_app_env({App, Vals}) ->
    _ = [application:set_env(App, Par, Val) || {Par, Val} <- Vals],
    ok.
|