emqx/bin/nodetool

389 lines
14 KiB
Erlang
Executable File

#!/usr/bin/env escript
%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
%% ex: ft=erlang ts=4 sw=4 et
%% -------------------------------------------------------------------
%%
%% nodetool: Helper Script for interacting with live nodes
%%
%% -------------------------------------------------------------------
-mode(compile).
%% Script entry point.
%% Optionally starts epmd (never on Windows), puts the release libraries on
%% the code path, then dispatches on the first command-line argument.
main(Args) ->
    ok = maybe_start_epmd(os:type()),
    ok = add_libs_dir(),
    run_command(Args).

%% Start epmd only on non-Windows hosts, and only when the emulator was
%% started with `-start_epmd true'.
maybe_start_epmd({win32, nt}) ->
    ok;
maybe_start_epmd(_Nix) ->
    case init:get_argument(start_epmd) of
        {ok, [["true"]]} -> ok = start_epmd();
        _ -> ok
    end.

%% "hocon" is forwarded to hocon_cli, "check_license_key" validates a license
%% key, and everything else is handled by do/1.
run_command(["hocon" | Rest]) ->
    hocon_cli:main(Rest);
run_command(["check_license_key", Key0]) ->
    check_license(#{key => cleanup_key(Key0)});
run_command(Args) ->
    do(Args).
%% The key arrives as the printed (string) form of a binary, e.g. "<<...>>".
%% Remove the first "<<" and the last ">>" and return what is left as a binary.
cleanup_key(Str0) ->
    WithoutOpen = iolist_to_binary(string:replace(Str0, "<<", "", leading)),
    WithoutClose = string:replace(WithoutOpen, ">>", "", trailing),
    iolist_to_binary(WithoutClose).
%% Run a nodetool command.
%% First handles the local-only commands (mnesia_dir, chkconfig), then
%% consumes -name / -sname / -setcookie, starts distribution, checks that the
%% target node answers, and finally dispatches on the remaining arguments.
%% Ends by stopping net_kernel unless a branch halted the VM earlier.
do(Args) ->
%% Local-only commands: each handler is expected to halt the VM; if one
%% returns, do_with_halt/3 reports that and halts itself.
ok = do_with_halt(Args, "mnesia_dir", fun create_mnesia_dir/2),
%% "chkconfig -config File" form: two values follow the option.
ok = do_with_halt(Args, "chkconfig", fun("-config", X) -> chkconfig(X) end),
%% "chkconfig File" form.
%% NOTE(review): when "chkconfig" is present, the call above either halts or
%% raises (take_args/4 raises if fewer than two values follow, and the fun has
%% no clause for a first value other than "-config"), so this line looks
%% unreachable for that input -- confirm.
ok = do_with_halt(Args, "chkconfig", fun chkconfig/1),
%% -name: start distribution with long names under a maintenance node name
%% and remember the target node atom in the process dictionary.
Args1 = do_with_ret(Args, "-name",
fun(TargetName) ->
ThisNode = this_node_name(longnames, TargetName),
{ok, _} = net_kernel:start([ThisNode, longnames]),
put(target_node, nodename(TargetName))
end),
%% -sname: same as above, with short names.
Args2 = do_with_ret(Args1, "-sname",
fun(TargetName) ->
ThisNode = this_node_name(shortnames, TargetName),
{ok, _} = net_kernel:start([ThisNode, shortnames]),
put(target_node, nodename(TargetName))
end),
%% -setcookie: set the cookie of this (maintenance) node.
RestArgs = do_with_ret(Args2, "-setcookie",
fun(Cookie) ->
erlang:set_cookie(node(), list_to_atom(Cookie))
end),
%% Start crypto, public_key and ssl; the results are not checked.
[application:start(App) || App <- [crypto, public_key, ssl]],
%% 'undefined' when neither -name nor -sname was given.
TargetNode = get(target_node),
%% See if the node is currently running -- if it's not, we'll bail
case {net_kernel:hidden_connect_node(TargetNode), net_adm:ping(TargetNode)} of
{true, pong} ->
ok;
{false, pong} ->
io:format(standard_error, "Failed to connect to node ~p\n", [TargetNode]),
halt(1);
{_, pang} ->
io:format(standard_error, "Node ~p not responding to pings.\n", [TargetNode]),
halt(1)
end,
%% Mute logger from now on.
%% Otherwise Erlang distribution over TLS (inet_tls_dist) warning logs
%% and supervisor reports may contaminate io:format outputs
logger:set_primary_config(level, none),
case RestArgs of
["getpid"] ->
%% Print the OS pid of the target node as an integer.
io:format("~p\n", [list_to_integer(rpc:call(TargetNode, os, getpid, []))]);
["ping"] ->
%% If we got this far, the node already responded to a ping, so just dump
%% a "pong"
io:format("pong\n");
["stop"] ->
%% Only 'ok' and {badrpc, nodedown} are matched; any other result raises
%% case_clause.
case rpc:call(TargetNode, emqx_machine, graceful_shutdown, [], 60000) of
ok ->
ok;
{badrpc, nodedown} ->
%% nodetool commands are always executed after a ping
%% which if the code gets here, it's because the target node
%% has shutdown before RPC returns.
ok
end;
["rpc", Module, Function | RpcArgs] ->
%% The remaining arguments are passed as ONE list argument, i.e. the call is
%% Module:Function(RpcArgs), with a 60 second timeout.
case rpc:call(TargetNode, list_to_atom(Module), list_to_atom(Function),
[RpcArgs], 60000) of
ok ->
ok;
{error, cmd_not_found} ->
halt(1);
{error, Reason} ->
io:format("RPC to ~s error: ~p\n", [TargetNode, Reason]),
halt(1);
{badrpc, Reason} ->
io:format("RPC to ~s failed: ~p\n", [TargetNode, Reason]),
halt(1);
_ ->
halt(1)
end;
["rpc_infinity", Module, Function | RpcArgs] ->
%% Same call shape as "rpc", but without a timeout.
case rpc:call(TargetNode, list_to_atom(Module), list_to_atom(Function), [RpcArgs], infinity) of
ok ->
ok;
{badrpc, Reason} ->
io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
halt(1);
_ ->
halt(1)
end;
["rpcterms", Module, Function | ArgsAsString] ->
%% The remaining arguments are concatenated and parsed as Erlang terms by
%% consult/1; the parsed value is used as the argument list of the call.
case rpc:call(TargetNode, list_to_atom(Module), list_to_atom(Function),
consult(lists:flatten(ArgsAsString)), 60000) of
{badrpc, Reason} ->
io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
halt(1);
Other ->
io:format("~p\n", [Other])
end;
["eval" | ListOfArgs] ->
%% Parse the arguments as Erlang expressions locally ...
Parsed = parse_eval_args(ListOfArgs),
% and evaluate it on the remote node
case rpc:call(TargetNode, erl_eval, exprs, [Parsed, [] ]) of
{value, Value, _} ->
io:format ("~p~n",[Value]);
{badrpc, Reason} ->
io:format("RPC to ~p failed: ~p~n", [TargetNode, Reason]),
halt(1)
end;
Other ->
%% Unknown command: print what was received and the usage line. This branch
%% does not halt, so the script still ends via net_kernel:stop() below.
io:format("Other: ~p~n", [Other]),
io:format("Usage: nodetool chkconfig|getpid|ping|stop|rpc|rpc_infinity|rpcterms|eval|cold_eval [Terms] [RPC]\n")
end,
net_kernel:stop().
%% Turn the command-line words of an "eval" command into parsed Erlang
%% expressions (the abstract forms expected by erl_eval:exprs/2).
parse_eval_args(Args) ->
    %% The shell may have split the expression into several words and dropped
    %% the spaces, so glue everything back into one string first.
    Joined = join(Args, " "),
    String = binary_to_list(list_to_binary(Joined)),
    %% As a convenience, add the terminating '.' when the user left it out.
    Source =
        case lists:suffix(".", String) of
            true -> String;
            false -> String ++ "."
        end,
    %% Scan and parse; anything malformed fails the match and crashes.
    {ok, Tokens, _} = erl_scan:string(Source),
    {ok, Exprs} = erl_parse:parse_exprs(Tokens),
    Exprs.
%% If option Name is present in Args, call Handler with the values that
%% follow it (as many as the handler's arity) and return Args without the
%% option and its values. Returns Args untouched when the option is absent.
%% The handler's return value is discarded.
do_with_ret(Args, Name, Handler) ->
    {arity, NumValues} = erlang:fun_info(Handler, arity),
    case take_args(Args, Name, NumValues) of
        {Values, Remaining} ->
            _ = erlang:apply(Handler, Values),
            Remaining;
        false ->
            Args
    end.
%% If option Name is present in Args, call Handler with the values that
%% follow it. The handler is expected to halt the VM itself; if it returns,
%% that is reported on standard_error and the VM is halted with the current
%% source line number as exit status. Returns ok when the option is absent.
do_with_halt(Args, Name, Handler) ->
    {arity, NumValues} = erlang:fun_info(Handler, arity),
    case take_args(Args, Name, NumValues) of
        {Values, _Remaining} ->
            erlang:apply(Handler, Values),
            %% only reached when the handler failed to halt
            io:format(standard_error, "~s handler did not halt", [Name]),
            halt(?LINE);
        false ->
            ok
    end.
%% Look up option OptName in Args and take the OptArity values following it.
%%
%% Returns {OptValues, RemainingArgs} when the option is found, where
%% RemainingArgs is Args without the (first occurrence of the) option and its
%% values; returns 'false' when the option is absent.
%% Raises {not_enough_args_for, OptName} when fewer than OptArity values
%% follow the option.
take_args(Args, OptName, 0) ->
    %% A flag without values: return the same {Values, Rest} shape as the
    %% general case, since callers match on that tuple.
    case lists:member(OptName, Args) of
        true -> {[], lists:delete(OptName, Args)};
        false -> false
    end;
take_args(Args, OptName, OptArity) ->
    take_args(Args, OptName, OptArity, _Scanned = []).

%% Walk the argument list, keeping the already inspected arguments (in
%% reverse order) in Scanned so they can be put back in front of the tail.
take_args([], _, _, _) ->
    false; %% no such option
take_args([Name | Rest], Name, Arity, Scanned) ->
    length(Rest) >= Arity orelse error({not_enough_args_for, Name}),
    {Result, Tail} = lists:split(Arity, Rest),
    {Result, lists:reverse(Scanned) ++ Tail};
take_args([Other | Rest], Name, Arity, Scanned) ->
    take_args(Rest, Name, Arity, [Other | Scanned]).
%% Launch epmd as a daemon through the shell. The executable path is quoted,
%% and the command is required to produce no output at all.
start_epmd() ->
    Command = lists:concat(["\"", epmd_path(), "\" -daemon"]),
    [] = os:cmd(Command),
    ok.
%% Locate the epmd executable: look in the directory of this script first,
%% then fall back to the regular executable search path. Prints a message
%% and halts with status 1 when epmd cannot be found.
epmd_path() ->
    Exe = "epmd",
    ScriptDir = filename:dirname(escript:script_name()),
    case os:find_executable(Exe, ScriptDir) of
        false -> global_epmd_path(Exe);
        Local -> Local
    end.

%% Fallback lookup of epmd via os:find_executable/1.
global_epmd_path(Exe) ->
    case os:find_executable(Exe) of
        false ->
            io:format("Could not find epmd.~n"),
            halt(1);
        Global ->
            Global
    end.
%% Convert a node name string into a node atom. A name that already has a
%% host part ("node@host") is used as is; a bare name gets the host part of
%% the local node appended.
nodename(Name) ->
    case split_node_name(Name) of
        [_Node, _Host] ->
            list_to_atom(Name);
        [Node] ->
            [_, LocalHost] = split_node_name(atom_to_list(node())),
            list_to_atom(lists:concat([Node, "@", LocalHost]))
    end.

%% Split a node name string on "@".
split_node_name(Name) ->
    re:split(Name, "@", [{return, list}, unicode]).
%% Build the name of this maintenance node from the target node name:
%% "remsh_maint_" ++ target name ++ suffix id. With longnames the target name
%% must be of the form "node@host", and the host part is kept.
this_node_name(longnames, Name) ->
    [Node, Host] = re:split(Name, "@", [{return, list}, unicode]),
    Suffix = node_name_suffix_id(),
    list_to_atom(lists:concat(["remsh_maint_", Node, Suffix, "@", Host]));
this_node_name(shortnames, Name) ->
    Suffix = node_name_suffix_id(),
    list_to_atom(lists:concat(["remsh_maint_", Name, Suffix])).
%% Node name suffix: the first three characters of the reversed OS pid
%% string, i.e. the last (up to) three digits of the pid in reverse order.
node_name_suffix_id() ->
    ReversedPid = string:reverse(os:getpid()),
    string:slice(ReversedPid, 0, 3).
%% Create the directory DataDir/NodeName, print its path to stdout (without
%% a trailing newline) and halt with status 0.
%% NOTE(review): the original note here only said "For windows???" --
%% presumably this is needed on Windows only; confirm.
create_mnesia_dir(DataDir, NodeName) ->
    Dir = filename:join(DataDir, NodeName),
    %% The result of make_dir is not checked.
    _ = file:make_dir(Dir),
    io:format("~s", [Dir]),
    halt(0).
%% Check the Erlang-term config file File and halt the VM with the outcome.
%% Never returns:
%%   - halts with 0 when the file validates, or when only warnings were found;
%%   - halts with 1 when validation reported at least one {error, _} issue,
%%     or when the file could not be read or parsed.
%% Validation issues and read/parse errors are written to standard_error.
chkconfig(File) ->
    case file:consult(File) of
        {ok, Terms} ->
            case validate(Terms) of
                ok ->
                    halt(0);
                {error, Problems} ->
                    lists:foreach(fun print_issue/1, Problems),
                    %% halt(1) if any problems were errors
                    HasErrors = [x || {error, _} <- Problems] =/= [],
                    halt(case HasErrors of
                             true -> 1;
                             false -> 0
                         end)
            end;
        {error, {Line, Mod, Term}} ->
            %% The message is plain text, not a format string, so it is
            %% written with put_chars rather than being handed to io:format
            %% as the Format argument.
            io:put_chars(standard_error,
                         ["Error on line ", file:format_error({Line, Mod, Term}), "\n"]),
            halt(1);
        {error, Error} ->
            io:put_chars(standard_error,
                         ["Error reading config file: ", File, " ",
                          file:format_error(Error), "\n"]),
            halt(1)
    end.
%% Check that the license described by Config can be read.
%% Returns ok on success; otherwise prints the error to standard_error and
%% halts with status 1.
check_license(Config) ->
    ok = ensure_application_load(emqx_license),
    %% This is only a formal validity check, making sure the node is able to
    %% start with the given license. A formally valid license may still be
    %% expired: the node then starts, but cannot accept connections because
    %% of the connection limits. It may later receive a license update from
    %% the cluster.
    case emqx_license:read_license(Config) of
        {error, Error} ->
            io:format(standard_error, "Error reading license: ~p~n", [Error]),
            halt(1);
        {ok, _} ->
            ok
    end.
%%
%% Given a string or binary, parse it into a list of terms, a la
%% file:consult/1. Returns the list of terms, or {error, Info} when the
%% scanner rejects the input.
%%
consult(Str) when is_list(Str) ->
    consult([], Str, []);
consult(Bin) when is_binary(Bin) ->
    consult([], binary_to_list(Bin), []).

%% Scan one dot-terminated term at a time, collecting the parsed terms in
%% Acc (in reverse order). Cont is the scanner continuation.
consult(Cont, Str, Acc) ->
    case erl_scan:tokens(Cont, Str, 0) of
        {more, Cont1} ->
            %% input ran out in the middle of a term: tell the scanner so
            consult(Cont1, eof, Acc);
        {done, {ok, Tokens, _}, Remaining} ->
            {ok, Term} = erl_parse:parse_term(Tokens),
            consult([], Remaining, [Term | Acc]);
        {done, {eof, _Other}, _Remaining} ->
            lists:reverse(Acc);
        {done, {error, Info, _}, _Remaining} ->
            {error, Info}
    end.
%%
%% Validation of the app.config contents.
%% Expects the consulted file to hold exactly one term. Runs every
%% validation fun on it and returns ok when all of them return true,
%% otherwise {error, Failures} with every result that was not true.
%%
validate([Terms]) ->
    Outcomes = [Check(Terms) || Check <- get_validation_funs()],
    case lists:filter(fun(Outcome) -> Outcome /= true end, Outcomes) of
        [] -> ok;
        Failures -> {error, Failures}
    end.
%% Validation funs applied to the app.config term by validate/1.
%% No checks are defined at the moment.
get_validation_funs() ->
    [].
%% Write one validation issue, {warning, Text} or {error, Text}, to
%% standard_error.
print_issue({Severity, Text}) when Severity =:= warning; Severity =:= error ->
    io:format(standard_error, issue_format(Severity), [Text]).

%% Format string used for each kind of issue.
issue_format(warning) -> "Warning in app.config: ~s~n";
issue_format(error) -> "Error in app.config: ~s~n".
%% Local copy of string:join/2: concatenate the strings of a list, putting
%% Sep between consecutive elements. string:join/2 is being obsoleted in
%% favour of lists:join/2, which only appeared in OTP 19.0 and is therefore
%% too new for the supported versions. Should be good enough for most
%% unicode use cases.
join([], Sep) when is_list(Sep) ->
    [];
join([First | Others], Sep) ->
    Tail = lists:foldr(fun(Item, Acc) -> Sep ++ Item ++ Acc end, [], Others),
    First ++ Tail.
%% Put the ebin directory of every library of the current release on the
%% code path. The release root is read from RUNNER_ROOT_DIR (must be set and
%% non-empty), the release version from REL_VSN, and the library list from
%% the entry with that version in <root>/releases/RELEASES.
%% Returns ok; raises when the RELEASES file cannot be read.
add_libs_dir() ->
    [_ | _] = RootDir = os:getenv("RUNNER_ROOT_DIR"),
    CurrentVsn = os:getenv("REL_VSN"),
    RelFile = filename:join([RootDir, "releases", "RELEASES"]),
    AddLib = fun({LibName, LibVsn, _}) -> add_lib_dir(RootDir, LibName, LibVsn) end,
    case file:consult(RelFile) of
        {error, Reason} ->
            %% e.g. the RELEASES file was deleted by the release handler
            error({failed_to_read_RELEASES_file, RelFile, Reason});
        {ok, [Releases]} ->
            %% the match fails (and crashes) if no release has this version
            {release, _Name, _AppVsn, _ErtsVsn, Libs, _State} =
                lists:keyfind(CurrentVsn, 3, Releases),
            lists:foreach(AddLib, Libs)
    end.
%% Prepend <RootDir>/lib/<Name>-<Vsn>/ebin to the code path and, for emqx
%% applications, load the application as well. Raises error(LibDir) when
%% the directory cannot be added.
add_lib_dir(RootDir, Name, Vsn) ->
    AppDirName = atom_to_list(Name) ++ "-" ++ Vsn,
    LibDir = filename:join([RootDir, lib, AppDirName, ebin]),
    case code:add_patha(LibDir) of
        {error, _} ->
            error(LibDir);
        true ->
            %% Load the emqx applications into the application controller
            %% before the HOCON configuration check runs: this is what lets
            %% schemas be discovered dynamically.
            %% See `emqx_gateway_schema:fields(gateway)`
            is_emqx_application(Name) andalso ensure_application_load(Name),
            ok
    end.
%% True when the application name (atom or string) starts with "emqx_".
is_emqx_application(Name) ->
    NameStr =
        case is_atom(Name) of
            true -> atom_to_list(Name);
            false -> Name
        end,
    case NameStr of
        "emqx_" ++ _ -> true;
        _ -> false
    end.
%% Load application Name into the application controller. An application
%% that is already loaded counts as success; any other failure raises
%% {failed_to_load_application, Name, Reason}.
ensure_application_load(Name) ->
    case application:load(Name) of
        {error, {already_loaded, _}} ->
            ok;
        {error, Reason} ->
            error({failed_to_load_application, Name, Reason});
        ok ->
            ok
    end.