fix(ds): do not count persistent session-only routed messages as dropped

Fixes https://emqx.atlassian.net/browse/EMQX-11539
This commit is contained in:
Thales Macedo Garitezi 2024-01-22 13:10:44 -03:00 committed by zhongwencool
parent d323fc7c27
commit 878c9ee8b1
2 changed files with 38 additions and 5 deletions

View File

@ -249,7 +249,7 @@ publish(Msg) when is_record(Msg, message) ->
[];
Msg1 = #message{topic = Topic} ->
PersistRes = persist_publish(Msg1),
PersistRes ++ route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1), PersistRes)
end.
persist_publish(Msg) ->
@ -289,18 +289,20 @@ delivery(Msg) -> #delivery{sender = self(), message = Msg}.
%% Route
%%--------------------------------------------------------------------
-spec route([emqx_types:route_entry()], emqx_types:delivery()) ->
-spec route([emqx_types:route_entry()], emqx_types:delivery(), nil() | [persisted]) ->
emqx_types:publish_result().
route([], #delivery{message = Msg}) ->
route([], #delivery{message = Msg}, _PersistRes = []) ->
ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
ok = inc_dropped_cnt(Msg),
[];
route(Routes, Delivery) ->
route([], _Delivery, PersistRes = [_ | _]) ->
PersistRes;
route(Routes, Delivery, PersistRes) ->
lists:foldl(
fun(Route, Acc) ->
[do_route(Route, Delivery) | Acc]
end,
[],
PersistRes,
Routes
).

View File

@ -450,6 +450,32 @@ t_message_gc(Config) ->
),
ok.
t_metrics_not_dropped(_Config) ->
%% Asserts that, if only persisted sessions are subscribed to a topic being published
%% to, we don't bump the `message.dropped' metric, nor we run the equivalent hook.
Sub = connect(<<?MODULE_STRING "1">>, true, 30),
on_exit(fun() -> emqtt:stop(Sub) end),
Pub = connect(<<?MODULE_STRING "2">>, true, 30),
on_exit(fun() -> emqtt:stop(Pub) end),
Hookpoint = 'message.dropped',
emqx_hooks:add(Hookpoint, {?MODULE, on_message_dropped, [self()]}, 1_000),
on_exit(fun() -> emqx_hooks:del(Hookpoint, {?MODULE, on_message_dropped}) end),
DroppedBefore = emqx_metrics:val('messages.dropped'),
DroppedNoSubBefore = emqx_metrics:val('messages.dropped.no_subscribers'),
{ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Sub, <<"t/+">>, ?QOS_1),
emqtt:publish(Pub, <<"t/ps">>, <<"payload">>, ?QOS_1),
?assertMatch([_], receive_messages(1, 1_500)),
DroppedAfter = emqx_metrics:val('messages.dropped'),
DroppedNoSubAfter = emqx_metrics:val('messages.dropped.no_subscribers'),
?assertEqual(DroppedBefore, DroppedAfter),
?assertEqual(DroppedNoSubBefore, DroppedNoSubAfter),
ok.
%%
connect(ClientId, CleanStart, EI) ->
@ -542,3 +568,8 @@ message(Topic, Payload, PublishedAt) ->
timestamp = PublishedAt,
id = emqx_guid:gen()
}.
on_message_dropped(Msg, Context, Res, TestPid) ->
ErrCtx = #{msg => Msg, ctx => Context, res => Res},
ct:pal("this hook should not be called.\n ~p", [ErrCtx]),
exit(TestPid, {hookpoint_called, ErrCtx}).