Merge pull request #6577 from lafirest/fix/revert_rap

fix: Revert "fix(emqx_retainer): add support for RAP falg"
This commit is contained in:
lafirest 2021-12-30 11:34:46 +08:00 committed by GitHub
commit 753a7eba6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 27 deletions

View File

@ -27,10 +27,10 @@
, on_message_publish/2 , on_message_publish/2
]). ]).
-export([ dispatch/5 -export([ dispatch/4
, delete_message/2 , delete_message/2
, store_retained/2 , store_retained/2
, deliver/6]). , deliver/5]).
-export([ get_expiry_time/1 -export([ get_expiry_time/1
, update_config/1 , update_config/1
@ -78,7 +78,7 @@ on_session_subscribed(_, _, #{share := ShareName}, _) when ShareName =/= undefin
on_session_subscribed(_, Topic, #{rh := Rh} = Opts, Context) -> on_session_subscribed(_, Topic, #{rh := Rh} = Opts, Context) ->
IsNew = maps:get(is_new, Opts, true), IsNew = maps:get(is_new, Opts, true),
case Rh =:= 0 orelse (Rh =:= 1 andalso IsNew) of case Rh =:= 0 orelse (Rh =:= 1 andalso IsNew) of
true -> dispatch(Context, Topic, Opts); true -> dispatch(Context, Topic);
_ -> ok _ -> ok
end. end.
@ -111,26 +111,26 @@ on_message_publish(Msg, _) ->
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec dispatch(context(), pid(), topic(), emqx_types:subopts(), cursor()) -> ok. -spec dispatch(context(), pid(), topic(), cursor()) -> ok.
dispatch(Context, Pid, Topic, Opts, Cursor) -> dispatch(Context, Pid, Topic, Cursor) ->
Mod = get_backend_module(), Mod = get_backend_module(),
case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
false -> false ->
{ok, Result} = Mod:read_message(Context, Topic), {ok, Result} = Mod:read_message(Context, Topic),
deliver(Result, Context, Pid, Topic, Opts, undefined); deliver(Result, Context, Pid, Topic, undefined);
true -> true ->
{ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor), {ok, Result, NewCursor} = Mod:match_messages(Context, Topic, Cursor),
deliver(Result, Context, Pid, Topic, Opts, NewCursor) deliver(Result, Context, Pid, Topic, NewCursor)
end. end.
deliver([], Context, Pid, Topic, Opts, Cursor) -> deliver([], Context, Pid, Topic, Cursor) ->
case Cursor of case Cursor of
undefined -> undefined ->
ok; ok;
_ -> _ ->
dispatch(Context, Pid, Topic, Opts, Cursor) dispatch(Context, Pid, Topic, Cursor)
end; end;
deliver(Result, #{context_id := Id} = Context, Pid, Topic, Opts, Cursor) -> deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) ->
case erlang:is_process_alive(Pid) of case erlang:is_process_alive(Pid) of
false -> false ->
ok; ok;
@ -138,12 +138,12 @@ deliver(Result, #{context_id := Id} = Context, Pid, Topic, Opts, Cursor) ->
#{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([?APP, flow_control]), #{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([?APP, flow_control]),
case MaxDeliverNum of case MaxDeliverNum of
0 -> 0 ->
_ = [Pid ! {deliver, Topic, handle_retain_opts(Opts, Msg)} || Msg <- Result], _ = [Pid ! {deliver, Topic, Msg} || Msg <- Result],
ok; ok;
_ -> _ ->
case do_deliver(Result, Id, Pid, Topic, Opts) of case do_deliver(Result, Id, Pid, Topic) of
ok -> ok ->
deliver([], Context, Pid, Topic, Opts, Cursor); deliver([], Context, Pid, Topic, Cursor);
abort -> abort ->
ok ok
end end
@ -280,9 +280,9 @@ is_too_big(Size) ->
Limit > 0 andalso (Size > Limit). Limit > 0 andalso (Size > Limit).
%% @private %% @private
dispatch(Context, Topic, Opts) -> dispatch(Context, Topic) ->
emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/5, emqx_retainer_pool:async_submit(fun ?MODULE:dispatch/4,
[Context, self(), Topic, Opts, undefined]). [Context, self(), Topic, undefined]).
-spec delete_message(context(), topic()) -> ok. -spec delete_message(context(), topic()) -> ok.
delete_message(Context, Topic) -> delete_message(Context, Topic) ->
@ -305,16 +305,16 @@ clean(Context) ->
Mod = get_backend_module(), Mod = get_backend_module(),
Mod:clean(Context). Mod:clean(Context).
-spec do_deliver(list(term()), pos_integer(), pid(), topic(), emqx_types:subopts()) -> ok | abort. -spec do_deliver(list(term()), pos_integer(), pid(), topic()) -> ok | abort.
do_deliver([Msg | T], Id, Pid, Topic, Opts) -> do_deliver([Msg | T], Id, Pid, Topic) ->
case require_semaphore(?DELIVER_SEMAPHORE, Id) of case require_semaphore(?DELIVER_SEMAPHORE, Id) of
true -> true ->
Pid ! {deliver, Topic, handle_retain_opts(Opts, Msg)}, Pid ! {deliver, Topic, Msg},
do_deliver(T, Id, Pid, Topic, Opts); do_deliver(T, Id, Pid, Topic);
_ -> _ ->
abort abort
end; end;
do_deliver([], _, _, _, _) -> do_deliver([], _, _, _) ->
ok. ok.
-spec require_semaphore(semaphore(), pos_integer()) -> boolean(). -spec require_semaphore(semaphore(), pos_integer()) -> boolean().
@ -484,9 +484,3 @@ load(Context) ->
unload() -> unload() ->
emqx:unhook('message.publish', {?MODULE, on_message_publish}), emqx:unhook('message.publish', {?MODULE, on_message_publish}),
emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}). emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}).
handle_retain_opts(#{rap := 0}, Message) ->
emqx_message:set_header(retain, false, Message);
handle_retain_opts(_, Message) ->
Message.