fix(ds): do not count persistent session-only routed messages as dropped
Fixes https://emqx.atlassian.net/browse/EMQX-11539
This commit is contained in:
parent
d122340c13
commit
609ba7e332
|
@ -249,7 +249,7 @@ publish(Msg) when is_record(Msg, message) ->
|
||||||
[];
|
[];
|
||||||
Msg1 = #message{topic = Topic} ->
|
Msg1 = #message{topic = Topic} ->
|
||||||
PersistRes = persist_publish(Msg1),
|
PersistRes = persist_publish(Msg1),
|
||||||
PersistRes ++ route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
|
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1), PersistRes)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
persist_publish(Msg) ->
|
persist_publish(Msg) ->
|
||||||
|
@ -289,18 +289,20 @@ delivery(Msg) -> #delivery{sender = self(), message = Msg}.
|
||||||
%% Route
|
%% Route
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec route([emqx_types:route_entry()], emqx_types:delivery()) ->
|
-spec route([emqx_types:route_entry()], emqx_types:delivery(), nil() | [persisted]) ->
|
||||||
emqx_types:publish_result().
|
emqx_types:publish_result().
|
||||||
route([], #delivery{message = Msg}) ->
|
route([], #delivery{message = Msg}, _PersistRes = []) ->
|
||||||
ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
|
ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
|
||||||
ok = inc_dropped_cnt(Msg),
|
ok = inc_dropped_cnt(Msg),
|
||||||
[];
|
[];
|
||||||
route(Routes, Delivery) ->
|
route([], _Delivery, PersistRes = [_ | _]) ->
|
||||||
|
PersistRes;
|
||||||
|
route(Routes, Delivery, PersistRes) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(Route, Acc) ->
|
fun(Route, Acc) ->
|
||||||
[do_route(Route, Delivery) | Acc]
|
[do_route(Route, Delivery) | Acc]
|
||||||
end,
|
end,
|
||||||
[],
|
PersistRes,
|
||||||
Routes
|
Routes
|
||||||
).
|
).
|
||||||
|
|
||||||
|
|
|
@ -450,6 +450,32 @@ t_message_gc(Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_metrics_not_dropped(_Config) ->
|
||||||
|
%% Asserts that, if only persisted sessions are subscribed to a topic being published
|
||||||
|
%% to, we don't bump the `message.dropped' metric, nor we run the equivalent hook.
|
||||||
|
Sub = connect(<<?MODULE_STRING "1">>, true, 30),
|
||||||
|
on_exit(fun() -> emqtt:stop(Sub) end),
|
||||||
|
Pub = connect(<<?MODULE_STRING "2">>, true, 30),
|
||||||
|
on_exit(fun() -> emqtt:stop(Pub) end),
|
||||||
|
Hookpoint = 'message.dropped',
|
||||||
|
emqx_hooks:add(Hookpoint, {?MODULE, on_message_dropped, [self()]}, 1_000),
|
||||||
|
on_exit(fun() -> emqx_hooks:del(Hookpoint, {?MODULE, on_message_dropped}) end),
|
||||||
|
|
||||||
|
DroppedBefore = emqx_metrics:val('messages.dropped'),
|
||||||
|
DroppedNoSubBefore = emqx_metrics:val('messages.dropped.no_subscribers'),
|
||||||
|
|
||||||
|
{ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Sub, <<"t/+">>, ?QOS_1),
|
||||||
|
emqtt:publish(Pub, <<"t/ps">>, <<"payload">>, ?QOS_1),
|
||||||
|
?assertMatch([_], receive_messages(1, 1_500)),
|
||||||
|
|
||||||
|
DroppedAfter = emqx_metrics:val('messages.dropped'),
|
||||||
|
DroppedNoSubAfter = emqx_metrics:val('messages.dropped.no_subscribers'),
|
||||||
|
|
||||||
|
?assertEqual(DroppedBefore, DroppedAfter),
|
||||||
|
?assertEqual(DroppedNoSubBefore, DroppedNoSubAfter),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
connect(ClientId, CleanStart, EI) ->
|
connect(ClientId, CleanStart, EI) ->
|
||||||
|
@ -542,3 +568,8 @@ message(Topic, Payload, PublishedAt) ->
|
||||||
timestamp = PublishedAt,
|
timestamp = PublishedAt,
|
||||||
id = emqx_guid:gen()
|
id = emqx_guid:gen()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
on_message_dropped(Msg, Context, Res, TestPid) ->
|
||||||
|
ErrCtx = #{msg => Msg, ctx => Context, res => Res},
|
||||||
|
ct:pal("this hook should not be called.\n ~p", [ErrCtx]),
|
||||||
|
exit(TestPid, {hookpoint_called, ErrCtx}).
|
||||||
|
|
Loading…
Reference in New Issue