diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 24dd05524..9be449b60 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -27,10 +27,10 @@ , on_message_publish/2 ]). --export([ dispatch/5 +-export([ dispatch/4 , delete_message/2 , store_retained/2 - , deliver/6]). + , deliver/5]). -export([ get_expiry_time/1 , update_config/1 @@ -78,7 +78,7 @@ on_session_subscribed(_, _, #{share := ShareName}, _) when ShareName =/= undefin on_session_subscribed(_, Topic, #{rh := Rh} = Opts, Context) -> IsNew = maps:get(is_new, Opts, true), case Rh =:= 0 orelse (Rh =:= 1 andalso IsNew) of - true -> dispatch(Context, Topic, Opts); + true -> dispatch(Context, Topic); _ -> ok end. @@ -111,26 +111,26 @@ on_message_publish(Msg, _) -> start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). --spec dispatch(context(), pid(), topic(), emqx_types:subopts(), cursor()) -> ok. -dispatch(Context, Pid, Topic, Opts, Cursor) -> +-spec dispatch(context(), pid(), topic(), cursor()) -> ok. +dispatch(Context, Pid, Topic, Cursor) -> Mod = get_backend_module(), case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of false -> {ok, Result} = Mod:read_message(Context, Topic), - deliver(Result, Context, Pid, Topic, Opts, undefined); + deliver(Result, Context, Pid, Topic, undefined); true -> {ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor), - deliver(Result, Context, Pid, Topic, Opts, NewCursor) + deliver(Result, Context, Pid, Topic, NewCursor) end. -deliver([], Context, Pid, Topic, Opts, Cursor) -> +deliver([], Context, Pid, Topic, Cursor) -> case Cursor of undefined -> ok; _ -> - dispatch(Context, Pid, Topic, Opts, Cursor) + dispatch(Context, Pid, Topic, Cursor) end; -deliver(Result, #{context_id := Id} = Context, Pid, Topic, Opts, Cursor) -> +deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) -> case erlang:is_process_alive(Pid) of false -> ok; @@ -138,12 +138,12 @@ deliver(Result, #{context_id := Id} = Context, Pid, Topic, Opts, Cursor) -> #{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([?APP, flow_control]), case MaxDeliverNum of 0 -> - _ = [Pid ! {deliver, Topic, handle_retain_opts(Opts, Msg)} || Msg <- Result], + _ = [Pid ! {deliver, Topic, Msg} || Msg <- Result], ok; _ -> - case do_deliver(Result, Id, Pid, Topic, Opts) of + case do_deliver(Result, Id, Pid, Topic) of ok -> - deliver([], Context, Pid, Topic, Opts, Cursor); + deliver([], Context, Pid, Topic, Cursor); abort -> ok end @@ -280,9 +280,9 @@ is_too_big(Size) -> Limit > 0 andalso (Size > Limit). %% @private -dispatch(Context, Topic, Opts) -> - emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/5, - [Context, self(), Topic, Opts, undefined]). +dispatch(Context, Topic) -> + emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/4, + [Context, self(), Topic, undefined]). -spec delete_message(context(), topic()) -> ok. delete_message(Context, Topic) -> @@ -305,16 +305,16 @@ clean(Context) -> Mod = get_backend_module(), Mod:clean(Context). --spec do_deliver(list(term()), pos_integer(), pid(), topic(), emqx_types:subopts()) -> ok | abort. -do_deliver([Msg | T], Id, Pid, Topic, Opts) -> +-spec do_deliver(list(term()), pos_integer(), pid(), topic()) -> ok | abort. +do_deliver([Msg | T], Id, Pid, Topic) -> case require_semaphore(?DELIVER_SEMAPHORE, Id) of true -> - Pid ! {deliver, Topic, handle_retain_opts(Opts, Msg)}, - do_deliver(T, Id, Pid, Topic, Opts); + Pid ! {deliver, Topic, Msg}, + do_deliver(T, Id, Pid, Topic); _ -> abort end; -do_deliver([], _, _, _, _) -> +do_deliver([], _, _, _) -> ok. -spec require_semaphore(semaphore(), pos_integer()) -> boolean(). @@ -484,9 +484,3 @@ load(Context) -> unload() -> emqx:unhook('message.publish', {?MODULE, on_message_publish}), emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}). - -handle_retain_opts(#{rap := 0}, Message) -> - emqx_message:set_header(retain, false, Message); - -handle_retain_opts(_, Message) -> - Message.