fix: Revert "fix(emqx_retainer): add support for RAP falg"

This reverts commit b80a01554b.
This commit is contained in:
lafirest 2021-12-30 11:32:08 +08:00
parent e8acec7f56
commit ed086728cf
1 changed files with 21 additions and 27 deletions

View File

@ -27,10 +27,10 @@
, on_message_publish/2
]).
-export([ dispatch/5
-export([ dispatch/4
, delete_message/2
, store_retained/2
, deliver/6]).
, deliver/5]).
-export([ get_expiry_time/1
, update_config/1
@ -78,7 +78,7 @@ on_session_subscribed(_, _, #{share := ShareName}, _) when ShareName =/= undefin
on_session_subscribed(_, Topic, #{rh := Rh} = Opts, Context) ->
IsNew = maps:get(is_new, Opts, true),
case Rh =:= 0 orelse (Rh =:= 1 andalso IsNew) of
true -> dispatch(Context, Topic, Opts);
true -> dispatch(Context, Topic);
_ -> ok
end.
@ -111,26 +111,26 @@ on_message_publish(Msg, _) ->
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec dispatch(context(), pid(), topic(), emqx_types:subopts(), cursor()) -> ok.
dispatch(Context, Pid, Topic, Opts, Cursor) ->
-spec dispatch(context(), pid(), topic(), cursor()) -> ok.
dispatch(Context, Pid, Topic, Cursor) ->
Mod = get_backend_module(),
case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
false ->
{ok, Result} = Mod:read_message(Context, Topic),
deliver(Result, Context, Pid, Topic, Opts, undefined);
deliver(Result, Context, Pid, Topic, undefined);
true ->
{ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor),
deliver(Result, Context, Pid, Topic, Opts, NewCursor)
deliver(Result, Context, Pid, Topic, NewCursor)
end.
deliver([], Context, Pid, Topic, Opts, Cursor) ->
deliver([], Context, Pid, Topic, Cursor) ->
case Cursor of
undefined ->
ok;
_ ->
dispatch(Context, Pid, Topic, Opts, Cursor)
dispatch(Context, Pid, Topic, Cursor)
end;
deliver(Result, #{context_id := Id} = Context, Pid, Topic, Opts, Cursor) ->
deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) ->
case erlang:is_process_alive(Pid) of
false ->
ok;
@ -138,12 +138,12 @@ deliver(Result, #{context_id := Id} = Context, Pid, Topic, Opts, Cursor) ->
#{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([?APP, flow_control]),
case MaxDeliverNum of
0 ->
_ = [Pid ! {deliver, Topic, handle_retain_opts(Opts, Msg)} || Msg <- Result],
_ = [Pid ! {deliver, Topic, Msg} || Msg <- Result],
ok;
_ ->
case do_deliver(Result, Id, Pid, Topic, Opts) of
case do_deliver(Result, Id, Pid, Topic) of
ok ->
deliver([], Context, Pid, Topic, Opts, Cursor);
deliver([], Context, Pid, Topic, Cursor);
abort ->
ok
end
@ -280,9 +280,9 @@ is_too_big(Size) ->
Limit > 0 andalso (Size > Limit).
%% @private
dispatch(Context, Topic, Opts) ->
emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/5,
[Context, self(), Topic, Opts, undefined]).
dispatch(Context, Topic) ->
emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/4,
[Context, self(), Topic, undefined]).
-spec delete_message(context(), topic()) -> ok.
delete_message(Context, Topic) ->
@ -305,16 +305,16 @@ clean(Context) ->
Mod = get_backend_module(),
Mod:clean(Context).
-spec do_deliver(list(term()), pos_integer(), pid(), topic(), emqx_types:subopts()) -> ok | abort.
do_deliver([Msg | T], Id, Pid, Topic, Opts) ->
-spec do_deliver(list(term()), pos_integer(), pid(), topic()) -> ok | abort.
do_deliver([Msg | T], Id, Pid, Topic) ->
case require_semaphore(?DELIVER_SEMAPHORE, Id) of
true ->
Pid ! {deliver, Topic, handle_retain_opts(Opts, Msg)},
do_deliver(T, Id, Pid, Topic, Opts);
Pid ! {deliver, Topic, Msg},
do_deliver(T, Id, Pid, Topic);
_ ->
abort
end;
do_deliver([], _, _, _, _) ->
do_deliver([], _, _, _) ->
ok.
-spec require_semaphore(semaphore(), pos_integer()) -> boolean().
@ -484,9 +484,3 @@ load(Context) ->
unload() ->
emqx:unhook('message.publish', {?MODULE, on_message_publish}),
emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}).
handle_retain_opts(#{rap := 0}, Message) ->
emqx_message:set_header(retain, false, Message);
handle_retain_opts(_, Message) ->
Message.