From b80a01554b14442d777a7972aeac519d0afeb90e Mon Sep 17 00:00:00 2001 From: lafirest Date: Fri, 24 Dec 2021 18:03:55 +0800 Subject: [PATCH] fix(emqx_retainer): add support for RAP falg --- apps/emqx_retainer/src/emqx_retainer.erl | 48 +++++++++++++----------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 9be449b60..24dd05524 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -27,10 +27,10 @@ , on_message_publish/2 ]). --export([ dispatch/4 +-export([ dispatch/5 , delete_message/2 , store_retained/2 - , deliver/5]). + , deliver/6]). -export([ get_expiry_time/1 , update_config/1 @@ -78,7 +78,7 @@ on_session_subscribed(_, _, #{share := ShareName}, _) when ShareName =/= undefin on_session_subscribed(_, Topic, #{rh := Rh} = Opts, Context) -> IsNew = maps:get(is_new, Opts, true), case Rh =:= 0 orelse (Rh =:= 1 andalso IsNew) of - true -> dispatch(Context, Topic); + true -> dispatch(Context, Topic, Opts); _ -> ok end. @@ -111,26 +111,26 @@ on_message_publish(Msg, _) -> start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). --spec dispatch(context(), pid(), topic(), cursor()) -> ok. -dispatch(Context, Pid, Topic, Cursor) -> +-spec dispatch(context(), pid(), topic(), emqx_types:subopts(), cursor()) -> ok. +dispatch(Context, Pid, Topic, Opts, Cursor) -> Mod = get_backend_module(), case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of false -> {ok, Result} = Mod:read_message(Context, Topic), - deliver(Result, Context, Pid, Topic, undefined); + deliver(Result, Context, Pid, Topic, Opts, undefined); true -> {ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor), - deliver(Result, Context, Pid, Topic, NewCursor) + deliver(Result, Context, Pid, Topic, Opts, NewCursor) end. -deliver([], Context, Pid, Topic, Cursor) -> +deliver([], Context, Pid, Topic, Opts, Cursor) -> case Cursor of undefined -> ok; _ -> - dispatch(Context, Pid, Topic, Cursor) + dispatch(Context, Pid, Topic, Opts, Cursor) end; -deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) -> +deliver(Result, #{context_id := Id} = Context, Pid, Topic, Opts, Cursor) -> case erlang:is_process_alive(Pid) of false -> ok; @@ -138,12 +138,12 @@ deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) -> #{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([?APP, flow_control]), case MaxDeliverNum of 0 -> - _ = [Pid ! {deliver, Topic, Msg} || Msg <- Result], + _ = [Pid ! {deliver, Topic, handle_retain_opts(Opts, Msg)} || Msg <- Result], ok; _ -> - case do_deliver(Result, Id, Pid, Topic) of + case do_deliver(Result, Id, Pid, Topic, Opts) of ok -> - deliver([], Context, Pid, Topic, Cursor); + deliver([], Context, Pid, Topic, Opts, Cursor); abort -> ok end @@ -280,9 +280,9 @@ is_too_big(Size) -> Limit > 0 andalso (Size > Limit). %% @private -dispatch(Context, Topic) -> - emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/4, - [Context, self(), Topic, undefined]). +dispatch(Context, Topic, Opts) -> + emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/5, + [Context, self(), Topic, Opts, undefined]). -spec delete_message(context(), topic()) -> ok. delete_message(Context, Topic) -> @@ -305,16 +305,16 @@ clean(Context) -> Mod = get_backend_module(), Mod:clean(Context). --spec do_deliver(list(term()), pos_integer(), pid(), topic()) -> ok | abort. -do_deliver([Msg | T], Id, Pid, Topic) -> +-spec do_deliver(list(term()), pos_integer(), pid(), topic(), emqx_types:subopts()) -> ok | abort. +do_deliver([Msg | T], Id, Pid, Topic, Opts) -> case require_semaphore(?DELIVER_SEMAPHORE, Id) of true -> - Pid ! {deliver, Topic, Msg}, - do_deliver(T, Id, Pid, Topic); + Pid ! {deliver, Topic, handle_retain_opts(Opts, Msg)}, + do_deliver(T, Id, Pid, Topic, Opts); _ -> abort end; -do_deliver([], _, _, _) -> +do_deliver([], _, _, _, _) -> ok. -spec require_semaphore(semaphore(), pos_integer()) -> boolean(). @@ -484,3 +484,9 @@ load(Context) -> unload() -> emqx:unhook('message.publish', {?MODULE, on_message_publish}), emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}). + +handle_retain_opts(#{rap := 0}, Message) -> + emqx_message:set_header(retain, false, Message); + +handle_retain_opts(_, Message) -> + Message.