diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 975b403b9..5b8985fdb 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -139,7 +139,7 @@ record(Group, Topic, SubPid) -> -spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result(). dispatch(Group, Topic, Delivery) -> - dispatch(Group, Topic, Delivery, _FailedSubs = []). + dispatch(Group, Topic, Delivery, _FailedSubs = #{}). dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> #message{from = ClientId, topic = SourceTopic} = Msg, @@ -151,9 +151,9 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> case do_dispatch(SubPid, Group, Topic, Msg1, Type) of ok -> {ok, 1}; - {error, _Reason} -> + {error, Reason} -> %% Failed to dispatch to this sub, try next. - dispatch(Group, Topic, Delivery, [SubPid | FailedSubs]) + dispatch(Group, Topic, Delivery, FailedSubs#{SubPid => Reason}) end end. @@ -262,7 +262,8 @@ redispatch_shared_message(#message{} = Msg) -> %% Note that dispatch is called with self() in failed subs %% This is done to avoid dispatching back to caller Delivery = #delivery{sender = self(), message = Msg}, - dispatch(Group, Topic, Delivery, [self()]). + FailedSubs = #{self() => sender}, + dispatch(Group, Topic, Delivery, FailedSubs). %% @hidden Return the `redispatch_to` group-topic in the message header. %% `false` is returned if the message is not a shared dispatch. @@ -307,14 +308,22 @@ maybe_ack(Msg) -> pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> Sub0 = erlang:get({shared_sub_sticky, Group, Topic}), - case is_active_sub(Sub0, FailedSubs) of + All = subscribers(Group, Topic), + case is_active_sub(Sub0, FailedSubs, All) of true -> %% the old subscriber is still alive %% keep using it for sticky strategy {fresh, Sub0}; false -> %% randomly pick one for the first message - {Type, Sub} = do_pick(random, ClientId, SourceTopic, Group, Topic, [Sub0 | FailedSubs]), + {Type, Sub} = do_pick( + random, + ClientId, + SourceTopic, + Group, + Topic, + FailedSubs#{Sub0 => noproc} + ), %% stick to whatever pick result erlang:put({shared_sub_sticky, Group, Topic}, Sub), {Type, Sub} @@ -324,7 +333,7 @@ pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> All = subscribers(Group, Topic), - case All -- FailedSubs of + case lists:filter(fun(Sub) -> not maps:is_key(Sub, FailedSubs) end, All) of [] when All =:= [] -> %% Genuinely no subscriber false; @@ -512,8 +521,10 @@ update_stats(State) -> State. %% Return 'true' if the subscriber process is alive AND not in the failed list -is_active_sub(Pid, FailedSubs) -> - is_alive_sub(Pid) andalso not lists:member(Pid, FailedSubs). +is_active_sub(Pid, FailedSubs, All) -> + lists:member(Pid, All) andalso + is_alive_sub(Pid) andalso + (not maps:is_key(Pid, FailedSubs)). %% erlang:is_process_alive/1 does not work with remote pid. is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) -> diff --git a/apps/emqx/test/emqx_session_SUITE.erl b/apps/emqx/test/emqx_session_SUITE.erl index c171382c7..91bf4659b 100644 --- a/apps/emqx/test/emqx_session_SUITE.erl +++ b/apps/emqx/test/emqx_session_SUITE.erl @@ -190,7 +190,7 @@ t_publish_qos2_with_error_return(_) -> begin Msg2 = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload2">>), - {ok, [], Session1} = emqx_session:publish(clientinfo(), PacketId2 = 2, Msg2, Session), + {ok, [], Session1} = emqx_session:publish(clientinfo(), _PacketId2 = 2, Msg2, Session), ?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)), {error, RC2 = ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish( clientinfo(), _PacketId3 = 3, Msg2, Session1 diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index a428a6fa9..1beca6d5a 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -344,6 +344,39 @@ t_sticky(Config) when is_list(Config) -> ok = ensure_config(sticky, true), test_two_messages(sticky). +%% two subscribers in one shared group +%% one unsubscribe after receiving a message +%% the other one in the group should receive the next message +t_sticky_unsubscribe(Config) when is_list(Config) -> + ok = ensure_config(sticky, false), + Topic = <<"foo/bar/sticky-unsub">>, + ClientId1 = <<"c1-sticky-unsub">>, + ClientId2 = <<"c2-sticky-unsub">>, + Group = <<"gsu">>, + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]), + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + ShareTopic = <<"$share/", Group/binary, "/", Topic/binary>>, + emqtt:subscribe(ConnPid1, {ShareTopic, 0}), + emqtt:subscribe(ConnPid2, {ShareTopic, 0}), + + Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>), + ct:sleep(100), + + emqx:publish(Message1), + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + emqtt:unsubscribe(UsedSubPid1, ShareTopic), + emqx:publish(Message2), + {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]), + ?assertNotEqual(UsedSubPid1, UsedSubPid2), + + kill_process(ConnPid1, fun(_) -> emqtt:stop(ConnPid1) end), + kill_process(ConnPid2, fun(_) -> emqtt:stop(ConnPid2) end), + ok. + t_hash(Config) when is_list(Config) -> ok = ensure_config(hash, false), test_two_messages(hash). diff --git a/apps/emqx_connector/src/emqx_connector_jwt.erl b/apps/emqx_connector/src/emqx_connector_jwt.erl index 3dc39c675..c5cd54cb9 100644 --- a/apps/emqx_connector/src/emqx_connector_jwt.erl +++ b/apps/emqx_connector/src/emqx_connector_jwt.erl @@ -16,7 +16,7 @@ -module(emqx_connector_jwt). --include("emqx_connector_tables.hrl"). +-include_lib("emqx_connector/include/emqx_connector_tables.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). %% API diff --git a/apps/emqx_connector/src/emqx_connector_jwt_sup.erl b/apps/emqx_connector/src/emqx_connector_jwt_sup.erl index 611e152bd..ac1d22b71 100644 --- a/apps/emqx_connector/src/emqx_connector_jwt_sup.erl +++ b/apps/emqx_connector/src/emqx_connector_jwt_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor). --include("emqx_connector_tables.hrl"). +-include_lib("emqx_connector/include/emqx_connector_tables.hrl"). -export([ start_link/0,