fix(broker): Check broker status before dispatch

Fixes: #6388
This commit is contained in:
k32 2021-12-15 18:59:09 +01:00
parent 10449a8909
commit 618113d507
2 changed files with 48 additions and 28 deletions

View File

@ -24,6 +24,7 @@
%% Start/Stop the application
-export([ start/0
, is_running/0
, is_running/1
, stop/0
]).
@ -85,10 +86,17 @@ stop() ->
%% @doc Is emqx running?
-spec(is_running(node()) -> boolean()).
is_running(Node) ->
case rpc:call(Node, erlang, whereis, [?APP]) of
case rpc:call(Node, ?MODULE, is_running, []) of
{badrpc, _} -> false;
undefined -> false;
Pid when is_pid(Pid) -> true
Result -> Result
end.
%% @doc Is emqx running on this node?
-spec(is_running() -> boolean()).
is_running() ->
case whereis(?APP) of
undefined -> false;
_ -> true
end.
%%--------------------------------------------------------------------

View File

@ -298,33 +298,18 @@ forward(Node, To, Delivery, sync) ->
end.
-spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
dispatch(Topic, #delivery{message = Msg}) ->
DispN = lists:foldl(
fun(Sub, N) ->
N + dispatch(Sub, Topic, Msg)
end, 0, subscribers(Topic)),
case DispN of
0 ->
ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
ok = inc_dropped_cnt(Msg),
{error, no_subscribers};
_ ->
{ok, DispN}
dispatch(Topic, Delivery) ->
case emqx:is_running() of
true ->
do_dispatch(Topic, Delivery);
false ->
%% In a rare case emqx_router_helper process may delay
%% cleanup of the routing table and the peers will
%% dispatch messages to a node that is not fully
%% initialized. Handle this case gracefully:
{error, not_running}
end.
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
case erlang:is_process_alive(SubPid) of
true ->
SubPid ! {deliver, Topic, Msg}, 1;
false -> 0
end;
dispatch({shard, I}, Topic, Msg) ->
lists:foldl(
fun(SubPid, N) ->
N + dispatch(SubPid, Topic, Msg)
end, 0, subscribers({shard, Topic, I})).
-compile({inline, [inc_dropped_cnt/1]}).
inc_dropped_cnt(Msg) ->
case emqx_message:is_sys(Msg) of
@ -516,3 +501,30 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
-spec(do_dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
do_dispatch(Topic, #delivery{message = Msg}) ->
DispN = lists:foldl(
fun(Sub, N) ->
N + do_dispatch(Sub, Topic, Msg)
end, 0, subscribers(Topic)),
case DispN of
0 ->
ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
ok = inc_dropped_cnt(Msg),
{error, no_subscribers};
_ ->
{ok, DispN}
end.
do_dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
case erlang:is_process_alive(SubPid) of
true ->
SubPid ! {deliver, Topic, Msg}, 1;
false -> 0
end;
do_dispatch({shard, I}, Topic, Msg) ->
lists:foldl(
fun(SubPid, N) ->
N + do_dispatch(SubPid, Topic, Msg)
end, 0, subscribers({shard, Topic, I})).