#!/usr/bin/env escript
%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
%%! +Q 65536 +S 1
%% ex: ft=erlang ts=4 sw=4 et
%% -------------------------------------------------------------------
%%
%% nodetool: Helper Script for interacting with live nodes
%%
%% -------------------------------------------------------------------
-mode(compile).

-define(SHUTDOWN_TIMEOUT_MS, 120_000).

%% Script entry point.
%% Optionally starts epmd, then routes the command line either to a
%% locally executed sub-command ("hocon", "check_license_key") or to
%% do/1 for everything else.
main(Args) ->
    maybe_start_epmd(os:type()),
    dispatch(Args).

%% Start epmd only when not on Windows NT and only when the emulator
%% was given the init argument `-start_epmd true`.
maybe_start_epmd({win32, nt}) ->
    ok;
maybe_start_epmd(_Nix) ->
    case init:get_argument(start_epmd) of
        {ok, [["true"]]} ->
            ok = start_epmd();
        _ ->
            ok
    end.

%% Select the sub-command from the leading argument(s).
dispatch(["hocon" | HoconArgs]) ->
    ok = add_libs_dir(),
    %% hand the remaining arguments over to hocon_cli
    hocon_cli:main(HoconArgs);
dispatch(["check_license_key", RawKey]) ->
    ok = add_libs_dir(),
    check_license(#{key => cleanup_key(RawKey)});
dispatch(Args) ->
    do(Args).

%% The key arrives as the string (list) rendering of a binary, so the
%% first "<<" and the last ">>" are stripped from it.
%% Returns the cleaned key as a binary.
cleanup_key(Str0) ->
    WithoutOpening = string:replace(Str0, "<<", "", leading),
    Str1 = iolist_to_binary(WithoutOpening),
    WithoutClosing = string:replace(Str1, ">>", "", trailing),
    iolist_to_binary(WithoutClosing).
%% Execute a nodetool command against a live node.
%%
%% Argument processing happens in this order:
%%   1. "mnesia_dir" (followed by two values) is handled by
%%      create_mnesia_dir/2, which is expected to halt the VM.
%%   2. "-name" / "-sname" start distribution for this script and store
%%      the target node under the process dictionary key `target_node'.
%%   3. "-setcookie" sets the cookie of the local node.
%%   4. The target node is connected and pinged; the script halts with
%%      status 1 when that fails.
%%   5. Whatever arguments remain select the command to run.
do(Args) ->
    ok = do_with_halt(Args, "mnesia_dir", fun create_mnesia_dir/2),
    Args1 = do_with_ret(
        Args,
        "-name",
        fun(TargetName) ->
            ThisNode = this_node_name(longnames, TargetName),
            {ok, _} = net_kernel:start([ThisNode, longnames]),
            put(target_node, nodename(TargetName))
        end
    ),
    Args2 = do_with_ret(
        Args1,
        "-sname",
        fun(TargetName) ->
            ThisNode = this_node_name(shortnames, TargetName),
            {ok, _} = net_kernel:start([ThisNode, shortnames]),
            put(target_node, nodename(TargetName))
        end
    ),
    RestArgs = do_with_ret(
        Args2,
        "-setcookie",
        fun(Cookie) ->
            erlang:set_cookie(node(), list_to_atom(Cookie))
        end
    ),

    %% Start results are deliberately ignored (best effort).
    lists:foreach(
        fun(App) -> _ = application:start(App) end,
        [crypto, public_key, ssl]
    ),
    TargetNode = get(target_node),

    %% See if the node is currently running -- if it's not, we'll bail
    case {net_kernel:hidden_connect_node(TargetNode), net_adm:ping(TargetNode)} of
        {true, pong} ->
            ok;
        {false, pong} ->
            io:format(standard_error, "Failed to connect to node ~p\n", [TargetNode]),
            halt(1);
        {_, pang} ->
            io:format(standard_error, "Node ~p not responding to pings.\n", [TargetNode]),
            halt(1)
    end,

    %% Mute logger from now on.
    %% Otherwise Erlang distribution over TLS (inet_tls_dist) warning logs
    %% and supervisor reports may contaminate io:format outputs
    logger:set_primary_config(level, none),
    case RestArgs of
        ["getpid"] ->
            io:format("~p\n", [list_to_integer(rpc:call(TargetNode, os, getpid, []))]);
        ["ping"] ->
            %% If we got this far, the node already responded to a ping, so just dump
            %% a "pong"
            io:format("pong\n");
        ["stop"] ->
            %% Print a progress message periodically while the RPC is pending.
            Pid = start_shutdown_status(),
            Res = rpc:call(TargetNode, emqx_machine, graceful_shutdown, [], ?SHUTDOWN_TIMEOUT_MS),
            true = stop_shutdown_status(Pid),
            case Res of
                ok ->
                    ok;
                {badrpc, timeout} ->
                    io:format(
                        "EMQX is still shutting down, it failed to stop gracefully "
                        "within the configured timeout of: ~ps\n",
                        [erlang:convert_time_unit(?SHUTDOWN_TIMEOUT_MS, millisecond, second)]
                    ),
                    halt(1);
                {badrpc, nodedown} ->
                    %% Commands only run after a successful ping, so getting
                    %% nodedown here means the target node went down before
                    %% the RPC returned.
                    ok
            end;
        ["rpc", Module, Function | RpcArgs] ->
            %% The remaining arguments are passed as ONE list argument.
            case
                rpc:call(
                    TargetNode,
                    list_to_atom(Module),
                    list_to_atom(Function),
                    [RpcArgs],
                    60000
                )
            of
                ok ->
                    ok;
                {error, cmd_not_found} ->
                    halt(1);
                {error, Reason} ->
                    io:format("RPC to ~s error: ~p\n", [TargetNode, Reason]),
                    halt(1);
                {badrpc, Reason} ->
                    io:format("RPC to ~s failed: ~p\n", [TargetNode, Reason]),
                    halt(1);
                _ ->
                    halt(1)
            end;
        ["rpc_infinity", Module, Function | RpcArgs] ->
            %% Same as "rpc" but without an RPC timeout.
            case
                rpc:call(
                    TargetNode,
                    list_to_atom(Module),
                    list_to_atom(Function),
                    [RpcArgs],
                    infinity
                )
            of
                ok ->
                    ok;
                {badrpc, Reason} ->
                    io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
                    halt(1);
                _ ->
                    halt(1)
            end;
        ["rpcterms", Module, Function | ArgsAsString] ->
            %% The argument strings are parsed into Erlang terms by consult/1
            %% and the result is used as the RPC argument list.
            case
                rpc:call(
                    TargetNode,
                    list_to_atom(Module),
                    list_to_atom(Function),
                    consult(lists:flatten(ArgsAsString)),
                    60000
                )
            of
                {badrpc, Reason} ->
                    io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
                    halt(1);
                Other ->
                    io:format("~p\n", [Other])
            end;
        ["eval" | ListOfArgs] ->
            % parse args locally in the remsh node
            Parsed = parse_eval_args(ListOfArgs),
            % and evaluate it on the remote node
            case rpc:call(TargetNode, emqx_ctl, run_command, [eval_erl, Parsed], infinity) of
                {ok, Value} ->
                    io:format("~p~n", [Value]);
                {badrpc, Reason} ->
                    io:format("RPC to ~p failed: ~p~n", [TargetNode, Reason]),
                    halt(1)
            end;
        Other ->
            %% NOTE(review): an unknown command prints the usage text but does
            %% not halt with a failing status; the usage also lists `cold_eval',
            %% which has no clause here -- confirm both are intended.
            io:format("Other: ~p~n", [Other]),
            io:format(
                "Usage: nodetool getpid|ping|stop|rpc|rpc_infinity|rpcterms|eval|cold_eval [Terms] [RPC]\n"
            )
    end,
    net_kernel:stop().

%% Spawn a linked process that periodically reports shutdown progress.
%% Returns its pid.
start_shutdown_status() ->
    spawn_link(fun shutdown_status_loop/0).

%% Unlink from and terminate the progress reporter started by
%% start_shutdown_status/0. Returns `true'.
stop_shutdown_status(Pid) ->
    true = unlink(Pid),
    true = exit(Pid, stop).

%% Print a "please wait" message every 10 seconds until killed.
shutdown_status_loop() ->
    timer:sleep(10_000),
    io:format("EMQX is shutting down, please wait...\n", []),
    shutdown_status_loop().

%% Turn the "eval" command-line arguments into parsed Erlang expressions.
%% Crashes with badmatch when the text cannot be scanned or parsed.
parse_eval_args(Args) ->
    % shells may process args into more than one, and end up stripping
    % spaces, so this converts all of that to a single string to parse
    String = lists:flatten(lists:join(" ", Args)),

    % then just as a convenience to users, if they forgot a trailing
    % '.' add it for them.
    Normalized =
        case lists:reverse(String) of
            [$. | _] -> String;
            R -> lists:reverse([$. | R])
        end,

    % then scan and parse the string
    {ok, Scanned, _} = erl_scan:string(Normalized),
    {ok, Parsed} = erl_parse:parse_exprs(Scanned),
    Parsed.

%% Look for option `Name' in `Args'. When present, call `Handler' with
%% as many of the following arguments as the handler's arity and return
%% the argument list with the option and its values removed.
%% When absent, `Args' is returned unchanged.
do_with_ret(Args, Name, Handler) ->
    {arity, Arity} = erlang:fun_info(Handler, arity),
    case take_args(Args, Name, Arity) of
        false ->
            Args;
        {Args1, Rest} ->
            _ = erlang:apply(Handler, Args1),
            Rest
    end.

%% Like do_with_ret/3, but the handler is expected to halt the VM.
%% Returns `ok' when the option is absent. If the handler returns, an
%% error is printed and the VM is halted with the current source line
%% number as the status argument.
do_with_halt(Args, Name, Handler) ->
    {arity, Arity} = erlang:fun_info(Handler, arity),
    case take_args(Args, Name, Arity) of
        false ->
            ok;
        {Args1, _Rest} ->
            %% should halt
            erlang:apply(Handler, Args1),
            io:format(standard_error, "~s handler did not halt~n", [Name]),
            halt(?LINE)
    end.

%% Return `{OptionArgs, RemainingArgs}' if the option is found, otherwise
%% 'false'. An arity of 0 yields `{[], RemainingArgs}', which is the shape
%% both do_with_ret/3 and do_with_halt/3 match on.
take_args(Args, OptName, OptArity) ->
    take_args(Args, OptName, OptArity, _Scanned = []).
%% Walk the argument list looking for option `Name'.
%% `Scanned' accumulates (in reverse) the arguments seen before the option.
take_args([], _Name, _Arity, _Scanned) ->
    %% no such option
    false;
take_args([Name | Following], Name, Arity, Scanned) ->
    case length(Following) >= Arity of
        true -> ok;
        false -> error({not_enough_args_for, Name})
    end,
    {Taken, Remaining} = lists:split(Arity, Following),
    {Taken, lists:reverse(Scanned, Remaining)};
take_args([Skipped | Following], Name, Arity, Scanned) ->
    take_args(Following, Name, Arity, [Skipped | Scanned]).

%% Launch epmd as a daemon; the command is expected to print nothing.
start_epmd() ->
    Command = lists:append(["\"", epmd_path(), "\" -daemon"]),
    [] = os:cmd(Command),
    ok.

%% Locate the epmd executable: first in the directory of this script,
%% then on the regular search path. Halts with status 1 when not found.
epmd_path() ->
    ScriptDir = filename:dirname(escript:script_name()),
    epmd_or_fallback(os:find_executable("epmd", ScriptDir)).

%% Use the epmd found next to the script, else search globally.
epmd_or_fallback(false) ->
    epmd_or_halt(os:find_executable("epmd"));
epmd_or_fallback(LocalEpmd) ->
    LocalEpmd.

%% Use the globally found epmd, else report and halt.
epmd_or_halt(false) ->
    io:format("Could not find epmd.~n"),
    halt(1);
epmd_or_halt(GlobalEpmd) ->
    GlobalEpmd.

%% Split a node name string at every "@" into a list of strings.
split_at_sign(Str) ->
    re:split(Str, "@", [{return, list}, unicode]).

%% Convert a target name string into a node name atom. A name without
%% a host part borrows the host part of the local node.
nodename(Name) ->
    case split_at_sign(Name) of
        [_Node, _Host] ->
            list_to_atom(Name);
        [Node] ->
            [_, LocalHost] = split_at_sign(atom_to_list(node())),
            list_to_atom(lists:concat([Node, "@", LocalHost]))
    end.

%% Build the node name used by this script itself, derived from the
%% target name: "remsh_maint_" ++ node part ++ suffix (++ "@" ++ host
%% for long names).
this_node_name(longnames, Name) ->
    [Node, Host] = split_at_sign(Name),
    Parts = ["remsh_maint_", Node, node_name_suffix_id(), "@", Host],
    list_to_atom(lists:concat(Parts));
this_node_name(shortnames, Name) ->
    Parts = ["remsh_maint_", Name, node_name_suffix_id()],
    list_to_atom(lists:concat(Parts)).

%% Node name suffix: the first three characters of the reversed OS pid
%% string, i.e. the pid's last three digits in reverse order.
node_name_suffix_id() ->
    ReversedPid = string:reverse(os:getpid()),
    string:slice(ReversedPid, 0, 3).

%% For windows???
%% Create the directory DataDir/NodeName (the result of the creation is
%% not checked), print its path to stdout and halt with status 0.
create_mnesia_dir(DataDir, NodeName) ->
    Dir = filename:join(DataDir, NodeName),
    file:make_dir(Dir),
    io:format("~s", [Dir]),
    halt(0).

%% Read the license described by `Config' through emqx_license.
%%
%% This only verifies that the license is formally valid, so that the
%% node is able to start with it. A valid license may still be expired;
%% in that case the node starts but cannot accept connections because
%% of connection limits, and it may later receive a license update from
%% the cluster.
%%
%% Returns `ok', or prints the error to stderr and halts with status 1.
check_license(Config) ->
    ok = ensure_application_load(emqx_license),
    case emqx_license:read_license(Config) of
        {error, Error} ->
            io:format(standard_error, "Error reading license: ~p~n", [Error]),
            halt(1);
        {ok, _} ->
            ok
    end.

%%
%% Parse a string or binary into a list of terms, in the manner of
%% file:consult/1.
%%
consult(Bin) when is_binary(Bin) ->
    consult([], binary_to_list(Bin), []);
consult(Str) when is_list(Str) ->
    consult([], Str, []).

%% Scan one dot-terminated term at a time, collecting parsed terms in
%% `Acc'. Returns the terms in input order, or `{error, Info}' when
%% scanning fails.
consult(Cont, Str, Acc) ->
    case erl_scan:tokens(Cont, Str, 0) of
        {more, Cont1} ->
            %% input exhausted in the middle of a term: signal end of input
            consult(Cont1, eof, Acc);
        {done, {ok, Tokens, _}, Remaining} ->
            {ok, Term} = erl_parse:parse_term(Tokens),
            consult([], Remaining, [Term | Acc]);
        {done, {eof, _Other}, _Remaining} ->
            lists:reverse(Acc);
        {done, {error, Info, _}, _Remaining} ->
            {error, Info}
    end.

%% Add the ebin directory of every library of the current release
%% (REL_VSN, looked up in $RUNNER_ROOT_DIR/releases/RELEASES) to the
%% code path, followed by the patches directories when they exist.
add_libs_dir() ->
    [_ | _] = RootDir = os:getenv("RUNNER_ROOT_DIR"),
    CurrentVsn = os:getenv("REL_VSN"),
    RelFile = filename:join([RootDir, "releases", "RELEASES"]),
    case file:consult(RelFile) of
        {error, Reason} ->
            %% the RELEASES file may have been deleted by the release handler
            error({failed_to_read_RELEASES_file, RelFile, Reason});
        {ok, [Releases]} ->
            {release, _Name, _AppVsn, _ErtsVsn, Libs, _State} =
                lists:keyfind(CurrentVsn, 3, Releases),
            AddLib = fun({Name, Vsn, _}) -> add_lib_dir(RootDir, Name, Vsn) end,
            lists:foreach(AddLib, Libs)
    end,
    PatchesDirs = [
        filename:join([RootDir, "data", "patches"]),
        "/var/lib/emqx/patches"
    ],
    lists:foreach(fun(Dir) -> ok = add_patches_dir(Dir) end, PatchesDirs).

%% Prepend `PatchesDir' to the code path when it is an existing directory.
add_patches_dir(PatchesDir) ->
    filelib:is_dir(PatchesDir) andalso (true = code:add_patha(PatchesDir)),
    ok.

%% Prepend RootDir/lib/Name-Vsn/ebin to the code path; raises an error
%% carrying the directory when that fails.
add_lib_dir(RootDir, Name, Vsn) ->
    AppDir = atom_to_list(Name) ++ "-" ++ Vsn,
    LibDir = filename:join([RootDir, lib, AppDir, ebin]),
    case code:add_patha(LibDir) of
        {error, _} ->
            error(LibDir);
        true ->
            %% Load every emqx application into the application controller
            %% before the HOCON configuration check runs.
            %%
            %% This supports dynamically searching for schemas.
            %% See `emqx_gateway_schema:fields(gateway)`
            is_emqx_application(Name) andalso ensure_application_load(Name),
            ok
    end.
%% True when the application name (atom or string) starts with "emqx_".
%% Any other term yields false.
is_emqx_application(Name) when is_atom(Name) ->
    is_emqx_application(atom_to_list(Name));
is_emqx_application(Name) ->
    case Name of
        [$e, $m, $q, $x, $_ | _] -> true;
        _ -> false
    end.

%% Load application `Name'; an already loaded application counts as
%% success. Any other failure raises
%% `{failed_to_load_application, Name, Reason}'.
ensure_application_load(Name) ->
    load_result(application:load(Name), Name).

%% Map the result of application:load/1 to `ok' or an error exception.
load_result(ok, _Name) ->
    ok;
load_result({error, {already_loaded, _}}, _Name) ->
    ok;
load_result({error, Reason}, Name) ->
    error({failed_to_load_application, Name, Reason}).