diff --git a/apps/emqx/src/emqx.erl b/apps/emqx/src/emqx.erl index b2c73c9bb..69d0a3f5e 100644 --- a/apps/emqx/src/emqx.erl +++ b/apps/emqx/src/emqx.erl @@ -24,6 +24,7 @@ %% Start/Stop the application -export([ start/0 + , is_running/0 , is_running/1 , stop/0 ]). @@ -85,10 +86,17 @@ stop() -> %% @doc Is emqx running? -spec(is_running(node()) -> boolean()). is_running(Node) -> - case rpc:call(Node, erlang, whereis, [?APP]) of + case rpc:call(Node, ?MODULE, is_running, []) of {badrpc, _} -> false; - undefined -> false; - Pid when is_pid(Pid) -> true + Result -> Result + end. + +%% @doc Is emqx running on this node? +-spec(is_running() -> boolean()). +is_running() -> + case whereis(?APP) of + undefined -> false; + _ -> true end. %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 4ae61d8e5..a82ab9b45 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -298,33 +298,18 @@ forward(Node, To, Delivery, sync) -> end. -spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). -dispatch(Topic, #delivery{message = Msg}) -> - DispN = lists:foldl( - fun(Sub, N) -> - N + dispatch(Sub, Topic, Msg) - end, 0, subscribers(Topic)), - case DispN of - 0 -> - ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]), - ok = inc_dropped_cnt(Msg), - {error, no_subscribers}; - _ -> - {ok, DispN} +dispatch(Topic, Delivery) -> + case emqx:is_running() of + true -> + do_dispatch(Topic, Delivery); + false -> + %% In a rare case emqx_router_helper process may delay + %% cleanup of the routing table and the peers will + %% dispatch messages to a node that is not fully + %% initialized. Handle this case gracefully: + {error, not_running} end. -dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> - case erlang:is_process_alive(SubPid) of - true -> - SubPid ! {deliver, Topic, Msg}, 1; - false -> 0 - end; - -dispatch({shard, I}, Topic, Msg) -> - lists:foldl( - fun(SubPid, N) -> - N + dispatch(SubPid, Topic, Msg) - end, 0, subscribers({shard, Topic, I})). - -compile({inline, [inc_dropped_cnt/1]}). inc_dropped_cnt(Msg) -> case emqx_message:is_sys(Msg) of @@ -516,3 +501,30 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- + +-spec(do_dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). +do_dispatch(Topic, #delivery{message = Msg}) -> + DispN = lists:foldl( + fun(Sub, N) -> + N + do_dispatch(Sub, Topic, Msg) + end, 0, subscribers(Topic)), + case DispN of + 0 -> + ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]), + ok = inc_dropped_cnt(Msg), + {error, no_subscribers}; + _ -> + {ok, DispN} + end. + +do_dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> + case erlang:is_process_alive(SubPid) of + true -> + SubPid ! {deliver, Topic, Msg}, 1; + false -> 0 + end; +do_dispatch({shard, I}, Topic, Msg) -> + lists:foldl( + fun(SubPid, N) -> + N + do_dispatch(SubPid, Topic, Msg) + end, 0, subscribers({shard, Topic, I})).