404 lines
13 KiB
Erlang
Executable File
404 lines
13 KiB
Erlang
Executable File
#!/usr/bin/env escript
|
|
%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
|
|
%%! +Q 65536 +S 1
|
|
%% ex: ft=erlang ts=4 sw=4 et
|
|
%% -------------------------------------------------------------------
|
|
%%
|
|
%% nodetool: Helper Script for interacting with live nodes
|
|
%%
|
|
%% -------------------------------------------------------------------
|
|
-mode(compile).
|
|
|
|
-define(SHUTDOWN_TIMEOUT_MS, 120_000).
|
|
|
|
main(Args) ->
|
|
case os:type() of
|
|
{win32, nt} ->
|
|
ok;
|
|
_nix ->
|
|
case init:get_argument(start_epmd) of
|
|
{ok, [["true"]]} ->
|
|
ok = start_epmd();
|
|
_ ->
|
|
ok
|
|
end
|
|
end,
|
|
case Args of
|
|
["hocon" | Rest] ->
|
|
ok = add_libs_dir(),
|
|
%% forward the call to hocon_cli
|
|
hocon_cli:main(Rest);
|
|
["check_license_key", Key0] ->
|
|
ok = add_libs_dir(),
|
|
Key = cleanup_key(Key0),
|
|
check_license(#{key => Key});
|
|
_ ->
|
|
do(Args)
|
|
end.
|
|
|
|
%% the key is a string (list) representation of a binary, so we need
|
|
%% to remove the leading and trailing angle brackets.
|
|
cleanup_key(Str0) ->
|
|
Str1 = iolist_to_binary(string:replace(Str0, "<<", "", leading)),
|
|
iolist_to_binary(string:replace(Str1, ">>", "", trailing)).
|
|
|
|
do(Args) ->
|
|
ok = do_with_halt(Args, "mnesia_dir", fun create_mnesia_dir/2),
|
|
Args1 = do_with_ret(
|
|
Args,
|
|
"-name",
|
|
fun(TargetName) ->
|
|
ThisNode = this_node_name(longnames, TargetName),
|
|
{ok, _} = net_kernel:start([ThisNode, longnames]),
|
|
put(target_node, nodename(TargetName))
|
|
end
|
|
),
|
|
Args2 = do_with_ret(
|
|
Args1,
|
|
"-sname",
|
|
fun(TargetName) ->
|
|
ThisNode = this_node_name(shortnames, TargetName),
|
|
{ok, _} = net_kernel:start([ThisNode, shortnames]),
|
|
put(target_node, nodename(TargetName))
|
|
end
|
|
),
|
|
RestArgs = do_with_ret(
|
|
Args2,
|
|
"-setcookie",
|
|
fun(Cookie) ->
|
|
erlang:set_cookie(node(), list_to_atom(Cookie))
|
|
end
|
|
),
|
|
|
|
[application:start(App) || App <- [crypto, public_key, ssl]],
|
|
TargetNode = get(target_node),
|
|
|
|
%% See if the node is currently running -- if it's not, we'll bail
|
|
case {net_kernel:hidden_connect_node(TargetNode), net_adm:ping(TargetNode)} of
|
|
{true, pong} ->
|
|
ok;
|
|
{false, pong} ->
|
|
io:format(standard_error, "Failed to connect to node ~p\n", [TargetNode]),
|
|
halt(1);
|
|
{_, pang} ->
|
|
io:format(standard_error, "Node ~p not responding to pings.\n", [TargetNode]),
|
|
halt(1)
|
|
end,
|
|
|
|
%% Mute logger from now on.
|
|
%% Otherwise Erlang distribution over TLS (inet_tls_dist) warning logs
|
|
%% and supervisor reports may contaminate io:format outputs
|
|
logger:set_primary_config(level, none),
|
|
case RestArgs of
|
|
["getpid"] ->
|
|
io:format("~p\n", [list_to_integer(rpc:call(TargetNode, os, getpid, []))]);
|
|
["ping"] ->
|
|
%% If we got this far, the node already responded to a ping, so just dump
|
|
%% a "pong"
|
|
io:format("pong\n");
|
|
["stop"] ->
|
|
Pid = start_shutdown_status(),
|
|
Res = rpc:call(TargetNode, emqx_machine, graceful_shutdown, [], ?SHUTDOWN_TIMEOUT_MS),
|
|
true = stop_shutdown_status(Pid),
|
|
case Res of
|
|
ok ->
|
|
ok;
|
|
{badrpc, timeout} ->
|
|
io:format(
|
|
"EMQX is still shutting down, it failed to stop gracefully "
|
|
"within the configured timeout of: ~ps\n",
|
|
[erlang:convert_time_unit(?SHUTDOWN_TIMEOUT_MS, millisecond, second)]
|
|
),
|
|
halt(1);
|
|
{badrpc, nodedown} ->
|
|
%% nodetool commands are always executed after a ping
|
|
%% which if the code gets here, it's because the target node
|
|
%% has shutdown before RPC returns.
|
|
ok
|
|
end;
|
|
["rpc", Module, Function | RpcArgs] ->
|
|
case
|
|
rpc:call(
|
|
TargetNode,
|
|
list_to_atom(Module),
|
|
list_to_atom(Function),
|
|
[RpcArgs],
|
|
60000
|
|
)
|
|
of
|
|
ok ->
|
|
ok;
|
|
{error, cmd_not_found} ->
|
|
halt(1);
|
|
{error, Reason} ->
|
|
io:format("RPC to ~s error: ~p\n", [TargetNode, Reason]),
|
|
halt(1);
|
|
{badrpc, Reason} ->
|
|
io:format("RPC to ~s failed: ~p\n", [TargetNode, Reason]),
|
|
halt(1);
|
|
_ ->
|
|
halt(1)
|
|
end;
|
|
["rpc_infinity", Module, Function | RpcArgs] ->
|
|
case
|
|
rpc:call(
|
|
TargetNode, list_to_atom(Module), list_to_atom(Function), [RpcArgs], infinity
|
|
)
|
|
of
|
|
ok ->
|
|
ok;
|
|
{badrpc, Reason} ->
|
|
io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
|
|
halt(1);
|
|
_ ->
|
|
halt(1)
|
|
end;
|
|
["rpcterms", Module, Function | ArgsAsString] ->
|
|
case
|
|
rpc:call(
|
|
TargetNode,
|
|
list_to_atom(Module),
|
|
list_to_atom(Function),
|
|
consult(lists:flatten(ArgsAsString)),
|
|
60000
|
|
)
|
|
of
|
|
{badrpc, Reason} ->
|
|
io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
|
|
halt(1);
|
|
Other ->
|
|
io:format("~p\n", [Other])
|
|
end;
|
|
["eval" | ListOfArgs] ->
|
|
% parse args locally in the remsh node
|
|
Parsed = parse_eval_args(ListOfArgs),
|
|
% and evaluate it on the remote node
|
|
case rpc:call(TargetNode, emqx_ctl, run_command, [eval_erl, Parsed], infinity) of
|
|
{ok, Value} ->
|
|
io:format("~p~n", [Value]);
|
|
{badrpc, Reason} ->
|
|
io:format("RPC to ~p failed: ~p~n", [TargetNode, Reason]),
|
|
halt(1)
|
|
end;
|
|
Other ->
|
|
io:format("Other: ~p~n", [Other]),
|
|
io:format(
|
|
"Usage: nodetool getpid|ping|stop|rpc|rpc_infinity|rpcterms|eval|cold_eval [Terms] [RPC]\n"
|
|
)
|
|
end,
|
|
net_kernel:stop().
|
|
|
|
start_shutdown_status() ->
|
|
spawn_link(fun shutdown_status_loop/0).
|
|
|
|
stop_shutdown_status(Pid) ->
|
|
true = unlink(Pid),
|
|
true = exit(Pid, stop).
|
|
|
|
shutdown_status_loop() ->
|
|
timer:sleep(10_000),
|
|
io:format("EMQX is shutting down, please wait...\n", []),
|
|
shutdown_status_loop().
|
|
|
|
parse_eval_args(Args) ->
|
|
% shells may process args into more than one, and end up stripping
|
|
% spaces, so this converts all of that to a single string to parse
|
|
String = lists:flatten(lists:join(" ", Args)),
|
|
|
|
% then just as a convenience to users, if they forgot a trailing
|
|
% '.' add it for them.
|
|
Normalized =
|
|
case lists:reverse(String) of
|
|
[$. | _] -> String;
|
|
R -> lists:reverse([$. | R])
|
|
end,
|
|
|
|
% then scan and parse the string
|
|
{ok, Scanned, _} = erl_scan:string(Normalized),
|
|
{ok, Parsed} = erl_parse:parse_exprs(Scanned),
|
|
Parsed.
|
|
|
|
do_with_ret(Args, Name, Handler) ->
|
|
{arity, Arity} = erlang:fun_info(Handler, arity),
|
|
case take_args(Args, Name, Arity) of
|
|
false ->
|
|
Args;
|
|
{Args1, Rest} ->
|
|
_ = erlang:apply(Handler, Args1),
|
|
Rest
|
|
end.
|
|
|
|
do_with_halt(Args, Name, Handler) ->
|
|
{arity, Arity} = erlang:fun_info(Handler, arity),
|
|
case take_args(Args, Name, Arity) of
|
|
false ->
|
|
ok;
|
|
{Args1, _Rest} ->
|
|
%% should halt
|
|
erlang:apply(Handler, Args1),
|
|
io:format(standard_error, "~s handler did not halt", [Name]),
|
|
halt(?LINE)
|
|
end.
|
|
|
|
%% Return option args list if found, otherwise 'false'.
|
|
take_args(Args, OptName, 0) ->
|
|
lists:member(OptName, Args) andalso [];
|
|
take_args(Args, OptName, OptArity) ->
|
|
take_args(Args, OptName, OptArity, _Scanned = []).
|
|
|
|
%% no such option
|
|
take_args([], _, _, _) ->
|
|
false;
|
|
take_args([Name | Rest], Name, Arity, Scanned) ->
|
|
length(Rest) >= Arity orelse error({not_enough_args_for, Name}),
|
|
{Result, Tail} = lists:split(Arity, Rest),
|
|
{Result, lists:reverse(Scanned) ++ Tail};
|
|
take_args([Other | Rest], Name, Arity, Scanned) ->
|
|
take_args(Rest, Name, Arity, [Other | Scanned]).
|
|
|
|
start_epmd() ->
|
|
[] = os:cmd("\"" ++ epmd_path() ++ "\" -daemon"),
|
|
ok.
|
|
|
|
epmd_path() ->
|
|
ErtsBinDir = filename:dirname(escript:script_name()),
|
|
Name = "epmd",
|
|
case os:find_executable(Name, ErtsBinDir) of
|
|
false ->
|
|
case os:find_executable(Name) of
|
|
false ->
|
|
io:format("Could not find epmd.~n"),
|
|
halt(1);
|
|
GlobalEpmd ->
|
|
GlobalEpmd
|
|
end;
|
|
Epmd ->
|
|
Epmd
|
|
end.
|
|
|
|
nodename(Name) ->
|
|
case re:split(Name, "@", [{return, list}, unicode]) of
|
|
[_Node, _Host] ->
|
|
list_to_atom(Name);
|
|
[Node] ->
|
|
[_, Host] = re:split(atom_to_list(node()), "@", [{return, list}, unicode]),
|
|
list_to_atom(lists:concat([Node, "@", Host]))
|
|
end.
|
|
|
|
this_node_name(longnames, Name) ->
|
|
[Node, Host] = re:split(Name, "@", [{return, list}, unicode]),
|
|
list_to_atom(lists:concat(["remsh_maint_", Node, node_name_suffix_id(), "@", Host]));
|
|
this_node_name(shortnames, Name) ->
|
|
list_to_atom(lists:concat(["remsh_maint_", Name, node_name_suffix_id()])).
|
|
|
|
%% use the reversed value that from pid mod 1000 as the node name suffix
|
|
node_name_suffix_id() ->
|
|
Pid = os:getpid(),
|
|
string:slice(string:reverse(Pid), 0, 3).
|
|
|
|
%% For windows???
|
|
create_mnesia_dir(DataDir, NodeName) ->
|
|
MnesiaDir = filename:join(DataDir, NodeName),
|
|
file:make_dir(MnesiaDir),
|
|
io:format("~s", [MnesiaDir]),
|
|
halt(0).
|
|
|
|
check_license(Config) ->
|
|
ok = ensure_application_load(emqx_license),
|
|
%% This checks formal license validity to ensure
|
|
%% that the node can successfully start with the given license.
|
|
|
|
%% However, a valid license may be expired. In this case, the node will
|
|
%% start but will not be able to receive connections due to connection limits.
|
|
%% It may receive license updates from the cluster further.
|
|
case emqx_license:read_license(Config) of
|
|
{ok, _} ->
|
|
ok;
|
|
{error, Error} ->
|
|
io:format(standard_error, "Error reading license: ~p~n", [Error]),
|
|
halt(1)
|
|
end.
|
|
|
|
%%
|
|
%% Given a string or binary, parse it into a list of terms, ala file:consult/0
|
|
%%
|
|
consult(Str) when is_list(Str) ->
|
|
consult([], Str, []);
|
|
consult(Bin) when is_binary(Bin) ->
|
|
consult([], binary_to_list(Bin), []).
|
|
|
|
consult(Cont, Str, Acc) ->
|
|
case erl_scan:tokens(Cont, Str, 0) of
|
|
{done, Result, Remaining} ->
|
|
case Result of
|
|
{ok, Tokens, _} ->
|
|
{ok, Term} = erl_parse:parse_term(Tokens),
|
|
consult([], Remaining, [Term | Acc]);
|
|
{eof, _Other} ->
|
|
lists:reverse(Acc);
|
|
{error, Info, _} ->
|
|
{error, Info}
|
|
end;
|
|
{more, Cont1} ->
|
|
consult(Cont1, eof, Acc)
|
|
end.
|
|
|
|
add_libs_dir() ->
|
|
[_ | _] = RootDir = os:getenv("RUNNER_ROOT_DIR"),
|
|
CurrentVsn = os:getenv("REL_VSN"),
|
|
RelFile = filename:join([RootDir, "releases", "RELEASES"]),
|
|
case file:consult(RelFile) of
|
|
{ok, [Releases]} ->
|
|
Release = lists:keyfind(CurrentVsn, 3, Releases),
|
|
{release, _Name, _AppVsn, _ErtsVsn, Libs, _State} = Release,
|
|
lists:foreach(
|
|
fun({Name, Vsn, _}) ->
|
|
add_lib_dir(RootDir, Name, Vsn)
|
|
end,
|
|
Libs
|
|
);
|
|
{error, Reason} ->
|
|
%% rel file was been deleted by release handler
|
|
error({failed_to_read_RELEASES_file, RelFile, Reason})
|
|
end,
|
|
ok = add_patches_dir(filename:join([RootDir, "data", "patches"])),
|
|
ok = add_patches_dir("/var/lib/emqx/patches").
|
|
|
|
add_patches_dir(PatchesDir) ->
|
|
case filelib:is_dir(PatchesDir) of
|
|
true ->
|
|
true = code:add_patha(PatchesDir),
|
|
ok;
|
|
false ->
|
|
ok
|
|
end.
|
|
|
|
add_lib_dir(RootDir, Name, Vsn) ->
|
|
LibDir = filename:join([RootDir, lib, atom_to_list(Name) ++ "-" ++ Vsn, ebin]),
|
|
case code:add_patha(LibDir) of
|
|
true ->
|
|
%% load all applications into application controller, before performing
|
|
%% the configuration check of HOCON
|
|
%%
|
|
%% It helps to implement the feature of dynamically searching schema.
|
|
%% See `emqx_gateway_schema:fields(gateway)`
|
|
is_emqx_application(Name) andalso ensure_application_load(Name),
|
|
ok;
|
|
{error, _} ->
|
|
error(LibDir)
|
|
end.
|
|
|
|
is_emqx_application(Name) when is_atom(Name) ->
|
|
is_emqx_application(atom_to_list(Name));
|
|
is_emqx_application("emqx_" ++ _Rest) ->
|
|
true;
|
|
is_emqx_application(_) ->
|
|
false.
|
|
|
|
ensure_application_load(Name) ->
|
|
case application:load(Name) of
|
|
ok -> ok;
|
|
{error, {already_loaded, _}} -> ok;
|
|
{error, Reason} -> error({failed_to_load_application, Name, Reason})
|
|
end.
|