From 95d36d02045b04da54aee3a49a61c50cc28d7bea Mon Sep 17 00:00:00 2001 From: turtled Date: Mon, 27 Aug 2018 10:15:41 +0800 Subject: [PATCH] Fix share sub bug --- include/emqx.hrl | 2 +- src/emqx_broker.erl | 10 ++++++---- src/emqx_shared_sub.erl | 2 +- src/emqx_topic.erl | 2 +- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/include/emqx.hrl b/include/emqx.hrl index 0372f547e..1e2541f65 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -54,7 +54,7 @@ -type(subid() :: binary() | atom()). -type(subopts() :: #{qos => integer(), - share => '$queue' | binary(), + share => binary(), atom() => term()}). -record(subscription, { diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 7015590d8..a941367c4 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -183,16 +183,18 @@ route([{To, Node}], Delivery) when Node =:= node() -> route([{To, Node}], Delivery = #delivery{flows = Flows}) when is_atom(Node) -> forward(Node, To, Delivery#delivery{flows = [{route, Node, To}|Flows]}); -route([{To, Shared}], Delivery) when is_tuple(Shared); is_binary(Shared) -> - emqx_shared_sub:dispatch(Shared, To, Delivery); +route([{To, Group}], Delivery) when is_tuple(Group); is_binary(Group) -> + emqx_shared_sub:dispatch(Group, To, Delivery); route(Routes, Delivery) -> lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes). aggre([]) -> []; -aggre([#route{topic = To, dest = Dest}]) -> - [{To, Dest}]; +aggre([#route{topic = To, dest = Node}]) when is_atom(Node) -> + [{To, Node}]; +aggre([#route{topic = To, dest = {Group, _Node}}]) -> + [{To, Group}]; aggre(Routes) -> lists:foldl( fun(#route{topic = To, dest = Node}, Acc) when is_atom(Node) -> diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index e8a9fac10..7a70fca59 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -81,7 +81,7 @@ record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. %% TODO: dispatch strategy, ensure the delivery... -dispatch({Group, _Node}, Topic, Delivery = #delivery{message = Msg, flows = Flows}) -> +dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) -> case pick(subscribers(Group, Topic)) of false -> Delivery; SubPid -> SubPid ! {dispatch, Topic, Msg}, diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 74a405f65..b122c114b 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -185,7 +185,7 @@ parse(Topic = <<"$queue/", _/binary>>, #{share := _Group}) -> parse(Topic = <<"$share/", _/binary>>, #{share := _Group}) -> error({invalid_topic, Topic}); parse(<<"$queue/", Topic1/binary>>, Options) -> - parse(Topic1, maps:put(share, '$queue', Options)); + parse(Topic1, maps:put(share, <<"$queue">>, Options)); parse(<<"$share/", Topic1/binary>>, Options) -> [Group, Topic2] = binary:split(Topic1, <<"/">>), {Topic2, maps:put(share, Group, Options)};