diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index ac9116cbd..6dc893043 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -249,7 +249,7 @@ publish(Msg) when is_record(Msg, message) -> []; Msg1 = #message{topic = Topic} -> PersistRes = persist_publish(Msg1), - PersistRes ++ route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)) + route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1), PersistRes) end. persist_publish(Msg) -> @@ -289,18 +289,20 @@ delivery(Msg) -> #delivery{sender = self(), message = Msg}. %% Route %%-------------------------------------------------------------------- --spec route([emqx_types:route_entry()], emqx_types:delivery()) -> +-spec route([emqx_types:route_entry()], emqx_types:delivery(), nil() | [persisted]) -> emqx_types:publish_result(). -route([], #delivery{message = Msg}) -> +route([], #delivery{message = Msg}, _PersistRes = []) -> ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]), ok = inc_dropped_cnt(Msg), []; -route(Routes, Delivery) -> +route([], _Delivery, PersistRes = [_ | _]) -> + PersistRes; +route(Routes, Delivery, PersistRes) -> lists:foldl( fun(Route, Acc) -> [do_route(Route, Delivery) | Acc] end, - [], + PersistRes, Routes ). diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index c46d726f4..73c88adc8 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -450,6 +450,32 @@ t_message_gc(Config) -> ), ok. +t_metrics_not_dropped(_Config) -> + %% Asserts that, if only persisted sessions are subscribed to a topic being published + %% to, we don't bump the `message.dropped' metric, nor we run the equivalent hook. + Sub = connect(<>, true, 30), + on_exit(fun() -> emqtt:stop(Sub) end), + Pub = connect(<>, true, 30), + on_exit(fun() -> emqtt:stop(Pub) end), + Hookpoint = 'message.dropped', + emqx_hooks:add(Hookpoint, {?MODULE, on_message_dropped, [self()]}, 1_000), + on_exit(fun() -> emqx_hooks:del(Hookpoint, {?MODULE, on_message_dropped}) end), + + DroppedBefore = emqx_metrics:val('messages.dropped'), + DroppedNoSubBefore = emqx_metrics:val('messages.dropped.no_subscribers'), + + {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Sub, <<"t/+">>, ?QOS_1), + emqtt:publish(Pub, <<"t/ps">>, <<"payload">>, ?QOS_1), + ?assertMatch([_], receive_messages(1, 1_500)), + + DroppedAfter = emqx_metrics:val('messages.dropped'), + DroppedNoSubAfter = emqx_metrics:val('messages.dropped.no_subscribers'), + + ?assertEqual(DroppedBefore, DroppedAfter), + ?assertEqual(DroppedNoSubBefore, DroppedNoSubAfter), + + ok. + %% connect(ClientId, CleanStart, EI) -> @@ -542,3 +568,8 @@ message(Topic, Payload, PublishedAt) -> timestamp = PublishedAt, id = emqx_guid:gen() }. + +on_message_dropped(Msg, Context, Res, TestPid) -> + ErrCtx = #{msg => Msg, ctx => Context, res => Res}, + ct:pal("this hook should not be called.\n ~p", [ErrCtx]), + exit(TestPid, {hookpoint_called, ErrCtx}).