From 07d815b3f76de52690341c7329d00c6ada53fb04 Mon Sep 17 00:00:00 2001 From: GilbertWong Date: Tue, 23 Jul 2019 14:45:24 +0800 Subject: [PATCH 1/6] Update gen_rpc tag to 2.4.0 --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index db7590567..c28a43276 100644 --- a/rebar.config +++ b/rebar.config @@ -5,7 +5,7 @@ {replayq, "0.1.1"}, %hex {esockd, "5.5.0"}, %hex {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.8"}}}, - {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.3.1"}}}, + {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.0"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}. From b61d72cdcd6915e72e5d86f7e15b217462eeb8f7 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 26 Jul 2019 11:22:25 +0800 Subject: [PATCH 2/6] Add options for gen_rpc --- etc/emqx.conf | 20 ++++++++++++++++++++ priv/emqx.schema | 29 +++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/etc/emqx.conf b/etc/emqx.conf index df3be2a9a..bebefadf0 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -303,6 +303,11 @@ rpc.tcp_server_port = 5369 ## Value: Port [1024-65535] rpc.tcp_client_port = 5369 +## Number of utgoing RPC connections. +## +## Value: Interger [1-256] +rpc.tcp_client_num = 32 + ## RCP Client connect timeout. ## ## Value: Seconds @@ -338,6 +343,21 @@ rpc.socket_keepalive_interval = 75s ## Value: Integer rpc.socket_keepalive_count = 9 +## Size of TCP send buffer. +## +## Value: Bytes +rpc.socket_sndbuf = 1MB + +## Size of TCP receive buffer. +## +## Value: Seconds +rpc.socket_recbuf = 1MB + +## Size of user-level software socket buffer. +## +## Value: Seconds +rpc.socket_buffer = 1MB + ##-------------------------------------------------------------------- ## Log ##-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index 1d308476d..9d2896713 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -350,6 +350,13 @@ end}. {datatype, integer} ]}. +%% Default TCP port for outgoing connections +{mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num", [ + {default, 32}, + {datatype, integer}, + {validators, ["range:gt_0_lt_256"]} +]}. + %% Client connect timeout {mapping, "rpc.connect_timeout", "gen_rpc.connect_timeout", [ {default, "5s"}, @@ -392,6 +399,28 @@ end}. {datatype, integer} ]}. +%% Size of TCP send buffer +{mapping, "rpc.socket_sndbuf", "gen_rpc.socket_sndbuf", [ + {default, "1MB"}, + {datatype, bytesize} +]}. + +%% Size of TCP receive buffer +{mapping, "rpc.socket_recbuf", "gen_rpc.socket_recbuf", [ + {default, "1MB"}, + {datatype, bytesize} +]}. + +%% Size of TCP receive buffer +{mapping, "rpc.socket_buffer", "gen_rpc.socket_buffer", [ + {default, "1MB"}, + {datatype, bytesize} +]}. + +{validator, "range:gt_0_lt_256", "must greater than 0 and less than 256", + fun(X) -> X > 0 andalso X < 256 end +}. + %%-------------------------------------------------------------------- %% Log %%-------------------------------------------------------------------- From c5ff9029743a66732d8de91a56ddce686e35ba91 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 26 Jul 2019 14:46:24 +0800 Subject: [PATCH 3/6] Update gen_rpc to 2.4.1 --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index c28a43276..4c1fd58e8 100644 --- a/rebar.config +++ b/rebar.config @@ -5,7 +5,7 @@ {replayq, "0.1.1"}, %hex {esockd, "5.5.0"}, %hex {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.8"}}}, - {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.0"}}}, + {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}. From 11fdf101f0c88e9dd10d6e97814af62df64419d7 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 26 Jul 2019 15:13:58 +0800 Subject: [PATCH 4/6] Configurable rpc client number --- src/emqx_rpc.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/emqx_rpc.erl b/src/emqx_rpc.erl index 96adf6605..5d0370240 100644 --- a/src/emqx_rpc.erl +++ b/src/emqx_rpc.erl @@ -32,7 +32,8 @@ cast(Node, Mod, Fun, Args) -> filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)). rpc_node(Node) -> - {Node, erlang:system_info(scheduler_id)}. + {ok, ClientNum} = application:get_env(gen_rpc, tcp_client_num), + {Node, rand:uniform(ClientNum)}. rpc_nodes(Nodes) -> rpc_nodes(Nodes, []). From b0e2b7db0c0bc84e4e42d8e35b94cc22dee91e83 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sat, 27 Jul 2019 12:59:07 +0800 Subject: [PATCH 5/6] Support inter-node messages via RPC cast --- etc/emqx.conf | 4 ++ include/emqx.hrl | 3 +- priv/emqx.schema | 6 +++ src/emqx.erl | 2 +- src/emqx_broker.erl | 88 ++++++++++++++++++++++---------------- src/emqx_packet.erl | 4 +- src/emqx_session.erl | 2 +- src/emqx_shared_sub.erl | 8 ++-- src/emqx_types.erl | 14 ++++-- test/emqx_broker_SUITE.erl | 5 ++- 10 files changed, 82 insertions(+), 54 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index bebefadf0..d5850d513 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -292,6 +292,10 @@ node.dist_listen_max = 6369 ##-------------------------------------------------------------------- ## RPC ##-------------------------------------------------------------------- +## RPC Mode. +## +## Value: sync | async +rpc.mode = async ## TCP server port for RPC. ## diff --git a/include/emqx.hrl b/include/emqx.hrl index 7afe009a2..24e4a6c3e 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -73,8 +73,7 @@ -record(delivery, { sender :: pid(), %% Sender of the delivery - message :: #message{}, %% The message delivered - results :: list() %% Dispatches of the message + message :: #message{} %% The message delivered }). %%-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index 9d2896713..b55da3f22 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -338,6 +338,12 @@ end}. %% RPC %%-------------------------------------------------------------------- +%% RPC Mode. +{mapping, "rpc.mode", "emqx.rpc_mode", [ + {default, async}, + {datatype, {enum, [sync, async]}} +]}. + %% RPC server port. {mapping, "rpc.tcp_server_port", "gen_rpc.tcp_server_port", [ {default, 5369}, diff --git a/src/emqx.erl b/src/emqx.erl index 3c7622887..ffb791de8 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -114,7 +114,7 @@ subscribe(Topic, SubOpts) when is_map(SubOpts) -> subscribe(Topic, SubId, SubOpts) when (is_atom(SubId) orelse is_binary(SubId)), is_map(SubOpts) -> emqx_broker:subscribe(iolist_to_binary(Topic), SubId, SubOpts). --spec(publish(emqx_types:message()) -> emqx_types:deliver_results()). +-spec(publish(emqx_types:message()) -> emqx_types:publish_result()). publish(Msg) -> emqx_broker:publish(Msg). diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index b9a4eb1bd..fee327927 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -191,7 +191,7 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) -> %% Publish %%------------------------------------------------------------------------------ --spec(publish(emqx_types:message()) -> emqx_types:deliver_results()). +-spec(publish(emqx_types:message()) -> emqx_types:publish_result()). publish(Msg) when is_record(Msg, message) -> _ = emqx_tracer:trace(publish, Msg), Headers = Msg#message.headers, @@ -200,8 +200,7 @@ publish(Msg) when is_record(Msg, message) -> ?LOG(notice, "Publishing interrupted: ~s", [emqx_message:format(Msg)]), []; #message{topic = Topic} = Msg1 -> - Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)), - Delivery#delivery.results + route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)) end. %% Called internally @@ -217,27 +216,27 @@ safe_publish(Msg) when is_record(Msg, message) -> end. delivery(Msg) -> - #delivery{sender = self(), message = Msg, results = []}. + #delivery{sender = self(), message = Msg}. %%------------------------------------------------------------------------------ %% Route %%------------------------------------------------------------------------------ - -route([], Delivery = #delivery{message = Msg}) -> +-spec(route([emqx_types:route_entry()], emqx_types:delivery()) -> emqx_types:publish_result()). +route([], #delivery{message = Msg}) -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), - inc_dropped_cnt(Msg#message.topic), Delivery; - -route([{To, Node}], Delivery) when Node =:= node() -> - dispatch(To, Delivery); - -route([{To, Node}], Delivery = #delivery{results = Results}) when is_atom(Node) -> - forward(Node, To, Delivery#delivery{results = [{route, Node, To}|Results]}); - -route([{To, Group}], Delivery) when is_tuple(Group); is_binary(Group) -> - emqx_shared_sub:dispatch(Group, To, Delivery); - + inc_dropped_cnt(Msg#message.topic), + []; route(Routes, Delivery) -> - lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes). + lists:foldl(fun(Route, Acc) -> + [do_route(Route, Delivery) | Acc] + end, [], Routes). + +do_route({To, Node}, Delivery) when Node =:= node() -> + {Node, To, dispatch(To, Delivery)}; +do_route({To, Node}, Delivery) when is_atom(Node) -> + {Node, To, forward(Node, To, Delivery, emqx_config:get_env(rpc_mode, async))}; +do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) -> + {share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}. aggre([]) -> []; @@ -254,45 +253,58 @@ aggre(Routes) -> end, [], Routes). %% @doc Forward message to another node. -forward(Node, To, Delivery) -> - %% rpc:call to ensure the delivery, but the latency:( +-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async) + -> emqx_types:deliver_result()). +forward(Node, To, Delivery, async) -> + case emqx_rpc:cast(Node, ?BROKER, dispatch, [To, Delivery]) of + true -> ok; + {badrpc, Reason} -> + ?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]), + {error, badrpc} + end; + +forward(Node, To, Delivery, sync) -> case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of {badrpc, Reason} -> - ?LOG(error, "Failed to forward msg to ~s: ~p", [Node, Reason]), - Delivery; - Delivery1 -> Delivery1 + ?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]), + {error, badrpc}; + Result -> Result end. --spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:delivery()). -dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) -> +-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()). +dispatch(Topic, #delivery{message = Msg}) -> case subscribers(Topic) of [] -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), inc_dropped_cnt(Topic), - Delivery; + {error, no_subscribers}; [Sub] -> %% optimize? - Cnt = dispatch(Sub, Topic, Msg), - Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]}; + dispatch(Sub, Topic, Msg); Subs -> - Cnt = lists:foldl( - fun(Sub, Acc) -> - dispatch(Sub, Topic, Msg) + Acc - end, 0, Subs), - Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]} + lists:foldl( + fun(Sub, Res) -> + case dispatch(Sub, Topic, Msg) of + ok -> Res; + Err -> Err + end + end, ok, Subs) end. dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> case erlang:is_process_alive(SubPid) of true -> SubPid ! {dispatch, Topic, Msg}, - 1; - false -> 0 + ok; + false -> {error, subscriber_die} end; dispatch({shard, I}, Topic, Msg) -> lists:foldl( - fun(SubPid, Cnt) -> - dispatch(SubPid, Topic, Msg) + Cnt - end, 0, subscribers({shard, Topic, I})). + fun(SubPid, Res) -> + case dispatch(SubPid, Topic, Msg) of + ok -> Res; + Err -> Err + end + end, ok, subscribers({shard, Topic, I})). inc_dropped_cnt(<<"$SYS/", _/binary>>) -> ok; diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index ba28bddac..19d93933a 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -233,11 +233,11 @@ format_variable(#mqtt_packet_puback{packet_id = PacketId}) -> format_variable(#mqtt_packet_subscribe{packet_id = PacketId, topic_filters = TopicFilters}) -> - io_lib:format("PacketId=~p, TopicFilters=~p", [PacketId, TopicFilters]); + io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, TopicFilters]); format_variable(#mqtt_packet_unsubscribe{packet_id = PacketId, topic_filters = Topics}) -> - io_lib:format("PacketId=~p, TopicFilters=~p", [PacketId, Topics]); + io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, Topics]); format_variable(#mqtt_packet_suback{packet_id = PacketId, reason_codes = ReasonCodes}) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 7d23c9e62..7cdcbece3 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -257,7 +257,7 @@ subscribe(SPid, PacketId, Properties, TopicFilters) -> %% @doc Called by connection processes when publishing messages -spec(publish(spid(), emqx_mqtt_types:packet_id(), emqx_types:message()) - -> {ok, emqx_types:deliver_results()} | {error, term()}). + -> {ok, emqx_types:publish_result()} | {error, term()}). publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) -> %% Publish QoS0 message directly {ok, emqx_broker:publish(Msg)}; diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 3b0d48189..d42fec7d3 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -103,19 +103,19 @@ record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. -spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery()) - -> emqx_types:delivery()). + -> emqx_topic:deliver_result()). dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery, _FailedSubs = []). -dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}, FailedSubs) -> +dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> #message{from = ClientId} = Msg, case pick(strategy(), ClientId, Group, Topic, FailedSubs) of false -> - Delivery; + {error, no_subscribers}; {Type, SubPid} -> case do_dispatch(SubPid, Topic, Msg, Type) of ok -> - Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]}; + ok; {error, _Reason} -> %% Failed to dispatch to this sub, try next. dispatch(Group, Topic, Delivery, [SubPid | FailedSubs]) diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 50e7f78c0..59f4f9336 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -46,10 +46,13 @@ ]). -export_type([ delivery/0 - , deliver_results/0 + , publish_result/0 + , deliver_result/0 ]). --export_type([route/0]). +-export_type([ route/0 + , route_entry/0 + ]). -export_type([ alarm/0 , plugin/0 @@ -99,9 +102,12 @@ -type(message() :: #message{}). -type(banned() :: #banned{}). -type(delivery() :: #delivery{}). --type(deliver_results() :: [{route, node(), topic()} | - {dispatch, topic(), pos_integer()}]). +-type(deliver_result() :: ok | {error, term()}). +-type(publish_result() :: [ {node(), topic(), deliver_result()} + | {share, topic(), deliver_result()}]). -type(route() :: #route{}). +-type(sub_group() :: tuple() | binary()). +-type(route_entry() :: {topic(), node()} | {topic, sub_group()}). -type(alarm() :: #alarm{}). -type(plugin() :: #plugin{}). -type(command() :: #command{}). diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 14cd9e4ca..76bf49b71 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -74,8 +74,9 @@ publish(_) -> dispatch_with_no_sub(_) -> Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>), - Delivery = #delivery{sender = self(), message = Msg, results = []}, - ?assertEqual(Delivery, emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)). + Delivery = #delivery{sender = self(), message = Msg}, + ?assertEqual([{node(),<<"no_subscribers">>,{error,no_subscribers}}], + emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)). pubsub(_) -> true = emqx:is_running(node()), From 20a7ed6f53a203d14b4477cbad2086c524d9f3ea Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Sat, 27 Jul 2019 20:44:44 +0800 Subject: [PATCH 6/6] RPC batch --- etc/emqx.conf | 8 ++++++++ priv/emqx.schema | 5 +++++ src/emqx_shared_sub.erl | 2 +- 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index d5850d513..2f03ec7c1 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -297,6 +297,14 @@ node.dist_listen_max = 6369 ## Value: sync | async rpc.mode = async +## Max batch size of async RPC requests. +## +## Value: Integer +## Zero or negative value disables rpc batching. +## +## NOTE: RPC batch won't work when rpc.mode = sync +rpc.async_batch_size = 256 + ## TCP server port for RPC. ## ## Value: Port [1024-65535] diff --git a/priv/emqx.schema b/priv/emqx.schema index b55da3f22..c7d6d0af5 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -344,6 +344,11 @@ end}. {datatype, {enum, [sync, async]}} ]}. +{mapping, "rpc.async_batch_size", "gen_rpc.max_batch_size", [ + {default, 256}, + {datatype, integer} +]}. + %% RPC server port. {mapping, "rpc.tcp_server_port", "gen_rpc.tcp_server_port", [ {default, 5369}, diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index d42fec7d3..3184fabb4 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -103,7 +103,7 @@ record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. -spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery()) - -> emqx_topic:deliver_result()). + -> emqx_types:deliver_result()). dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery, _FailedSubs = []).