From b0e2b7db0c0bc84e4e42d8e35b94cc22dee91e83 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sat, 27 Jul 2019 12:59:07 +0800 Subject: [PATCH] Support inter-node messages via RPC cast --- etc/emqx.conf | 4 ++ include/emqx.hrl | 3 +- priv/emqx.schema | 6 +++ src/emqx.erl | 2 +- src/emqx_broker.erl | 88 ++++++++++++++++++++++---------------- src/emqx_packet.erl | 4 +- src/emqx_session.erl | 2 +- src/emqx_shared_sub.erl | 8 ++-- src/emqx_types.erl | 14 ++++-- test/emqx_broker_SUITE.erl | 5 ++- 10 files changed, 82 insertions(+), 54 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index bebefadf0..d5850d513 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -292,6 +292,10 @@ node.dist_listen_max = 6369 ##-------------------------------------------------------------------- ## RPC ##-------------------------------------------------------------------- +## RPC Mode. +## +## Value: sync | async +rpc.mode = async ## TCP server port for RPC. ## diff --git a/include/emqx.hrl b/include/emqx.hrl index 7afe009a2..24e4a6c3e 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -73,8 +73,7 @@ -record(delivery, { sender :: pid(), %% Sender of the delivery - message :: #message{}, %% The message delivered - results :: list() %% Dispatches of the message + message :: #message{} %% The message delivered }). %%-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index 9d2896713..b55da3f22 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -338,6 +338,12 @@ end}. %% RPC %%-------------------------------------------------------------------- +%% RPC Mode. +{mapping, "rpc.mode", "emqx.rpc_mode", [ + {default, async}, + {datatype, {enum, [sync, async]}} +]}. + %% RPC server port. {mapping, "rpc.tcp_server_port", "gen_rpc.tcp_server_port", [ {default, 5369}, diff --git a/src/emqx.erl b/src/emqx.erl index 3c7622887..ffb791de8 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -114,7 +114,7 @@ subscribe(Topic, SubOpts) when is_map(SubOpts) -> subscribe(Topic, SubId, SubOpts) when (is_atom(SubId) orelse is_binary(SubId)), is_map(SubOpts) -> emqx_broker:subscribe(iolist_to_binary(Topic), SubId, SubOpts). --spec(publish(emqx_types:message()) -> emqx_types:deliver_results()). +-spec(publish(emqx_types:message()) -> emqx_types:publish_result()). publish(Msg) -> emqx_broker:publish(Msg). diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index b9a4eb1bd..fee327927 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -191,7 +191,7 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) -> %% Publish %%------------------------------------------------------------------------------ --spec(publish(emqx_types:message()) -> emqx_types:deliver_results()). +-spec(publish(emqx_types:message()) -> emqx_types:publish_result()). publish(Msg) when is_record(Msg, message) -> _ = emqx_tracer:trace(publish, Msg), Headers = Msg#message.headers, @@ -200,8 +200,7 @@ publish(Msg) when is_record(Msg, message) -> ?LOG(notice, "Publishing interrupted: ~s", [emqx_message:format(Msg)]), []; #message{topic = Topic} = Msg1 -> - Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)), - Delivery#delivery.results + route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)) end. %% Called internally @@ -217,27 +216,27 @@ safe_publish(Msg) when is_record(Msg, message) -> end. delivery(Msg) -> - #delivery{sender = self(), message = Msg, results = []}. + #delivery{sender = self(), message = Msg}. %%------------------------------------------------------------------------------ %% Route %%------------------------------------------------------------------------------ - -route([], Delivery = #delivery{message = Msg}) -> +-spec(route([emqx_types:route_entry()], emqx_types:delivery()) -> emqx_types:publish_result()). +route([], #delivery{message = Msg}) -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), - inc_dropped_cnt(Msg#message.topic), Delivery; - -route([{To, Node}], Delivery) when Node =:= node() -> - dispatch(To, Delivery); - -route([{To, Node}], Delivery = #delivery{results = Results}) when is_atom(Node) -> - forward(Node, To, Delivery#delivery{results = [{route, Node, To}|Results]}); - -route([{To, Group}], Delivery) when is_tuple(Group); is_binary(Group) -> - emqx_shared_sub:dispatch(Group, To, Delivery); - + inc_dropped_cnt(Msg#message.topic), + []; route(Routes, Delivery) -> - lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes). + lists:foldl(fun(Route, Acc) -> + [do_route(Route, Delivery) | Acc] + end, [], Routes). + +do_route({To, Node}, Delivery) when Node =:= node() -> + {Node, To, dispatch(To, Delivery)}; +do_route({To, Node}, Delivery) when is_atom(Node) -> + {Node, To, forward(Node, To, Delivery, emqx_config:get_env(rpc_mode, async))}; +do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) -> + {share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}. aggre([]) -> []; @@ -254,45 +253,58 @@ aggre(Routes) -> end, [], Routes). %% @doc Forward message to another node. -forward(Node, To, Delivery) -> - %% rpc:call to ensure the delivery, but the latency:( +-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async) + -> emqx_types:deliver_result()). +forward(Node, To, Delivery, async) -> + case emqx_rpc:cast(Node, ?BROKER, dispatch, [To, Delivery]) of + true -> ok; + {badrpc, Reason} -> + ?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]), + {error, badrpc} + end; + +forward(Node, To, Delivery, sync) -> case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of {badrpc, Reason} -> - ?LOG(error, "Failed to forward msg to ~s: ~p", [Node, Reason]), - Delivery; - Delivery1 -> Delivery1 + ?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]), + {error, badrpc}; + Result -> Result end. --spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:delivery()). -dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) -> +-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). +dispatch(Topic, #delivery{message = Msg}) -> case subscribers(Topic) of [] -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), inc_dropped_cnt(Topic), - Delivery; + {error, no_subscribers}; [Sub] -> %% optimize? - Cnt = dispatch(Sub, Topic, Msg), - Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]}; + dispatch(Sub, Topic, Msg); Subs -> - Cnt = lists:foldl( - fun(Sub, Acc) -> - dispatch(Sub, Topic, Msg) + Acc - end, 0, Subs), - Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]} + lists:foldl( + fun(Sub, Res) -> + case dispatch(Sub, Topic, Msg) of + ok -> Res; + Err -> Err + end + end, ok, Subs) end. dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> case erlang:is_process_alive(SubPid) of true -> SubPid ! {dispatch, Topic, Msg}, - 1; - false -> 0 + ok; + false -> {error, subscriber_die} end; dispatch({shard, I}, Topic, Msg) -> lists:foldl( - fun(SubPid, Cnt) -> - dispatch(SubPid, Topic, Msg) + Cnt - end, 0, subscribers({shard, Topic, I})). + fun(SubPid, Res) -> + case dispatch(SubPid, Topic, Msg) of + ok -> Res; + Err -> Err + end + end, ok, subscribers({shard, Topic, I})). inc_dropped_cnt(<<"$SYS/", _/binary>>) -> ok; diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index ba28bddac..19d93933a 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -233,11 +233,11 @@ format_variable(#mqtt_packet_puback{packet_id = PacketId}) -> format_variable(#mqtt_packet_subscribe{packet_id = PacketId, topic_filters = TopicFilters}) -> - io_lib:format("PacketId=~p, TopicFilters=~p", [PacketId, TopicFilters]); + io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, TopicFilters]); format_variable(#mqtt_packet_unsubscribe{packet_id = PacketId, topic_filters = Topics}) -> - io_lib:format("PacketId=~p, TopicFilters=~p", [PacketId, Topics]); + io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, Topics]); format_variable(#mqtt_packet_suback{packet_id = PacketId, reason_codes = ReasonCodes}) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 7d23c9e62..7cdcbece3 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -257,7 +257,7 @@ subscribe(SPid, PacketId, Properties, TopicFilters) -> %% @doc Called by connection processes when publishing messages -spec(publish(spid(), emqx_mqtt_types:packet_id(), emqx_types:message()) - -> {ok, emqx_types:deliver_results()} | {error, term()}). + -> {ok, emqx_types:publish_result()} | {error, term()}). publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) -> %% Publish QoS0 message directly {ok, emqx_broker:publish(Msg)}; diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 3b0d48189..d42fec7d3 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -103,19 +103,19 @@ record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. -spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery()) - -> emqx_types:delivery()). + -> emqx_topic:deliver_result()). dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery, _FailedSubs = []). -dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}, FailedSubs) -> +dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> #message{from = ClientId} = Msg, case pick(strategy(), ClientId, Group, Topic, FailedSubs) of false -> - Delivery; + {error, no_subscribers}; {Type, SubPid} -> case do_dispatch(SubPid, Topic, Msg, Type) of ok -> - Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]}; + ok; {error, _Reason} -> %% Failed to dispatch to this sub, try next. dispatch(Group, Topic, Delivery, [SubPid | FailedSubs]) diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 50e7f78c0..59f4f9336 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -46,10 +46,13 @@ ]). -export_type([ delivery/0 - , deliver_results/0 + , publish_result/0 + , deliver_result/0 ]). --export_type([route/0]). +-export_type([ route/0 + , route_entry/0 + ]). -export_type([ alarm/0 , plugin/0 @@ -99,9 +102,12 @@ -type(message() :: #message{}). -type(banned() :: #banned{}). -type(delivery() :: #delivery{}). --type(deliver_results() :: [{route, node(), topic()} | - {dispatch, topic(), pos_integer()}]). +-type(deliver_result() :: ok | {error, term()}). +-type(publish_result() :: [ {node(), topic(), deliver_result()} + | {share, topic(), deliver_result()}]). -type(route() :: #route{}). +-type(sub_group() :: tuple() | binary()). +-type(route_entry() :: {topic(), node()} | {topic, sub_group()}). -type(alarm() :: #alarm{}). -type(plugin() :: #plugin{}). -type(command() :: #command{}). diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 14cd9e4ca..76bf49b71 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -74,8 +74,9 @@ publish(_) -> dispatch_with_no_sub(_) -> Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>), - Delivery = #delivery{sender = self(), message = Msg, results = []}, - ?assertEqual(Delivery, emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)). + Delivery = #delivery{sender = self(), message = Msg}, + ?assertEqual([{node(),<<"no_subscribers">>,{error,no_subscribers}}], + emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)). pubsub(_) -> true = emqx:is_running(node()),