Fix conflicts

This commit is contained in:
turtled 2019-07-31 16:43:26 +08:00
commit 4aff37772f
12 changed files with 147 additions and 56 deletions

View File

@ -292,6 +292,18 @@ node.dist_listen_max = 6369
##--------------------------------------------------------------------
## RPC
##--------------------------------------------------------------------
## RPC Mode.
##
## Value: sync | async
rpc.mode = async
## Max batch size of async RPC requests.
##
## Value: Integer
## Zero or negative value disables rpc batching.
##
## NOTE: RPC batch won't work when rpc.mode = sync
rpc.async_batch_size = 256
## TCP server port for RPC.
##
@ -303,6 +315,11 @@ rpc.tcp_server_port = 5369
## Value: Port [1024-65535]
rpc.tcp_client_port = 5369
## Number of utgoing RPC connections.
##
## Value: Interger [1-256]
rpc.tcp_client_num = 32
## RCP Client connect timeout.
##
## Value: Seconds
@ -338,6 +355,21 @@ rpc.socket_keepalive_interval = 75s
## Value: Integer
rpc.socket_keepalive_count = 9
## Size of TCP send buffer.
##
## Value: Bytes
rpc.socket_sndbuf = 1MB
## Size of TCP receive buffer.
##
## Value: Seconds
rpc.socket_recbuf = 1MB
## Size of user-level software socket buffer.
##
## Value: Seconds
rpc.socket_buffer = 1MB
##--------------------------------------------------------------------
## Log
##--------------------------------------------------------------------

View File

@ -73,8 +73,7 @@
-record(delivery, {
sender :: pid(), %% Sender of the delivery
message :: #message{}, %% The message delivered
results :: list() %% Dispatches of the message
message :: #message{} %% The message delivered
}).
%%--------------------------------------------------------------------

View File

@ -338,6 +338,17 @@ end}.
%% RPC
%%--------------------------------------------------------------------
%% RPC Mode.
{mapping, "rpc.mode", "emqx.rpc_mode", [
{default, async},
{datatype, {enum, [sync, async]}}
]}.
{mapping, "rpc.async_batch_size", "gen_rpc.max_batch_size", [
{default, 256},
{datatype, integer}
]}.
%% RPC server port.
{mapping, "rpc.tcp_server_port", "gen_rpc.tcp_server_port", [
{default, 5369},
@ -350,6 +361,13 @@ end}.
{datatype, integer}
]}.
%% Default TCP port for outgoing connections
{mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num", [
{default, 32},
{datatype, integer},
{validators, ["range:gt_0_lt_256"]}
]}.
%% Client connect timeout
{mapping, "rpc.connect_timeout", "gen_rpc.connect_timeout", [
{default, "5s"},
@ -392,6 +410,28 @@ end}.
{datatype, integer}
]}.
%% Size of TCP send buffer
{mapping, "rpc.socket_sndbuf", "gen_rpc.socket_sndbuf", [
{default, "1MB"},
{datatype, bytesize}
]}.
%% Size of TCP receive buffer
{mapping, "rpc.socket_recbuf", "gen_rpc.socket_recbuf", [
{default, "1MB"},
{datatype, bytesize}
]}.
%% Size of TCP receive buffer
{mapping, "rpc.socket_buffer", "gen_rpc.socket_buffer", [
{default, "1MB"},
{datatype, bytesize}
]}.
{validator, "range:gt_0_lt_256", "must greater than 0 and less than 256",
fun(X) -> X > 0 andalso X < 256 end
}.
%%--------------------------------------------------------------------
%% Log
%%--------------------------------------------------------------------

View File

@ -5,7 +5,7 @@
{replayq, "0.1.1"}, %hex
{esockd, "5.5.0"}, %hex
{ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.8"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.0"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
]}.

View File

@ -114,7 +114,7 @@ subscribe(Topic, SubOpts) when is_map(SubOpts) ->
subscribe(Topic, SubId, SubOpts) when (is_atom(SubId) orelse is_binary(SubId)), is_map(SubOpts) ->
emqx_broker:subscribe(iolist_to_binary(Topic), SubId, SubOpts).
-spec(publish(emqx_types:message()) -> emqx_types:deliver_results()).
-spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
publish(Msg) ->
emqx_broker:publish(Msg).

View File

@ -191,7 +191,7 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) ->
%% Publish
%%------------------------------------------------------------------------------
-spec(publish(emqx_types:message()) -> emqx_types:deliver_results()).
-spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
publish(Msg) when is_record(Msg, message) ->
_ = emqx_tracer:trace(publish, Msg),
Headers = Msg#message.headers,
@ -200,8 +200,7 @@ publish(Msg) when is_record(Msg, message) ->
?LOG(notice, "Publishing interrupted: ~s", [emqx_message:format(Msg)]),
[];
#message{topic = Topic} = Msg1 ->
Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
Delivery#delivery.results
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
end.
%% Called internally
@ -217,27 +216,27 @@ safe_publish(Msg) when is_record(Msg, message) ->
end.
delivery(Msg) ->
#delivery{sender = self(), message = Msg, results = []}.
#delivery{sender = self(), message = Msg}.
%%------------------------------------------------------------------------------
%% Route
%%------------------------------------------------------------------------------
route([], Delivery = #delivery{message = Msg}) ->
-spec(route([emqx_types:route_entry()], emqx_types:delivery()) -> emqx_types:publish_result()).
route([], #delivery{message = Msg}) ->
emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
inc_dropped_cnt(Msg#message.topic), Delivery;
route([{To, Node}], Delivery) when Node =:= node() ->
dispatch(To, Delivery);
route([{To, Node}], Delivery = #delivery{results = Results}) when is_atom(Node) ->
forward(Node, To, Delivery#delivery{results = [{route, Node, To}|Results]});
route([{To, Group}], Delivery) when is_tuple(Group); is_binary(Group) ->
emqx_shared_sub:dispatch(Group, To, Delivery);
inc_dropped_cnt(Msg#message.topic),
[];
route(Routes, Delivery) ->
lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes).
lists:foldl(fun(Route, Acc) ->
[do_route(Route, Delivery) | Acc]
end, [], Routes).
do_route({To, Node}, Delivery) when Node =:= node() ->
{Node, To, dispatch(To, Delivery)};
do_route({To, Node}, Delivery) when is_atom(Node) ->
{Node, To, forward(Node, To, Delivery, emqx_config:get_env(rpc_mode, async))};
do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
{share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.
aggre([]) ->
[];
@ -254,45 +253,58 @@ aggre(Routes) ->
end, [], Routes).
%% @doc Forward message to another node.
forward(Node, To, Delivery) ->
%% rpc:call to ensure the delivery, but the latency:(
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async)
-> emqx_types:deliver_result()).
forward(Node, To, Delivery, async) ->
case emqx_rpc:cast(Node, ?BROKER, dispatch, [To, Delivery]) of
true -> ok;
{badrpc, Reason} ->
?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]),
{error, badrpc}
end;
forward(Node, To, Delivery, sync) ->
case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
{badrpc, Reason} ->
?LOG(error, "Failed to forward msg to ~s: ~p", [Node, Reason]),
Delivery;
Delivery1 -> Delivery1
?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]),
{error, badrpc};
Result -> Result
end.
-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:delivery()).
dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) ->
-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
dispatch(Topic, #delivery{message = Msg}) ->
case subscribers(Topic) of
[] ->
emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
inc_dropped_cnt(Topic),
Delivery;
{error, no_subscribers};
[Sub] -> %% optimize?
Cnt = dispatch(Sub, Topic, Msg),
Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]};
dispatch(Sub, Topic, Msg);
Subs ->
Cnt = lists:foldl(
fun(Sub, Acc) ->
dispatch(Sub, Topic, Msg) + Acc
end, 0, Subs),
Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]}
lists:foldl(
fun(Sub, Res) ->
case dispatch(Sub, Topic, Msg) of
ok -> Res;
Err -> Err
end
end, ok, Subs)
end.
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
case erlang:is_process_alive(SubPid) of
true ->
SubPid ! {dispatch, Topic, Msg},
1;
false -> 0
ok;
false -> {error, subscriber_die}
end;
dispatch({shard, I}, Topic, Msg) ->
lists:foldl(
fun(SubPid, Cnt) ->
dispatch(SubPid, Topic, Msg) + Cnt
end, 0, subscribers({shard, Topic, I})).
fun(SubPid, Res) ->
case dispatch(SubPid, Topic, Msg) of
ok -> Res;
Err -> Err
end
end, ok, subscribers({shard, Topic, I})).
inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
ok;

View File

@ -233,11 +233,11 @@ format_variable(#mqtt_packet_puback{packet_id = PacketId}) ->
format_variable(#mqtt_packet_subscribe{packet_id = PacketId,
topic_filters = TopicFilters}) ->
io_lib:format("PacketId=~p, TopicFilters=~p", [PacketId, TopicFilters]);
io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, TopicFilters]);
format_variable(#mqtt_packet_unsubscribe{packet_id = PacketId,
topic_filters = Topics}) ->
io_lib:format("PacketId=~p, TopicFilters=~p", [PacketId, Topics]);
io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, Topics]);
format_variable(#mqtt_packet_suback{packet_id = PacketId,
reason_codes = ReasonCodes}) ->

View File

@ -32,7 +32,8 @@ cast(Node, Mod, Fun, Args) ->
filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)).
rpc_node(Node) ->
{Node, erlang:system_info(scheduler_id)}.
{ok, ClientNum} = application:get_env(gen_rpc, tcp_client_num),
{Node, rand:uniform(ClientNum)}.
rpc_nodes(Nodes) ->
rpc_nodes(Nodes, []).

View File

@ -257,7 +257,7 @@ subscribe(SPid, PacketId, Properties, TopicFilters) ->
%% @doc Called by connection processes when publishing messages
-spec(publish(spid(), emqx_mqtt_types:packet_id(), emqx_types:message())
-> {ok, emqx_types:deliver_results()} | {error, term()}).
-> {ok, emqx_types:publish_result()} | {error, term()}).
publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) ->
%% Publish QoS0 message directly
{ok, emqx_broker:publish(Msg)};

View File

@ -103,19 +103,19 @@ record(Group, Topic, SubPid) ->
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
-spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery())
-> emqx_types:delivery()).
-> emqx_types:deliver_result()).
dispatch(Group, Topic, Delivery) ->
dispatch(Group, Topic, Delivery, _FailedSubs = []).
dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}, FailedSubs) ->
dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
#message{from = ClientId} = Msg,
case pick(strategy(), ClientId, Group, Topic, FailedSubs) of
false ->
Delivery;
{error, no_subscribers};
{Type, SubPid} ->
case do_dispatch(SubPid, Topic, Msg, Type) of
ok ->
Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]};
ok;
{error, _Reason} ->
%% Failed to dispatch to this sub, try next.
dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])

View File

@ -46,10 +46,13 @@
]).
-export_type([ delivery/0
, deliver_results/0
, publish_result/0
, deliver_result/0
]).
-export_type([route/0]).
-export_type([ route/0
, route_entry/0
]).
-export_type([ alarm/0
, plugin/0
@ -99,9 +102,12 @@
-type(message() :: #message{}).
-type(banned() :: #banned{}).
-type(delivery() :: #delivery{}).
-type(deliver_results() :: [{route, node(), topic()} |
{dispatch, topic(), pos_integer()}]).
-type(deliver_result() :: ok | {error, term()}).
-type(publish_result() :: [ {node(), topic(), deliver_result()}
| {share, topic(), deliver_result()}]).
-type(route() :: #route{}).
-type(sub_group() :: tuple() | binary()).
-type(route_entry() :: {topic(), node()} | {topic, sub_group()}).
-type(alarm() :: #alarm{}).
-type(plugin() :: #plugin{}).
-type(command() :: #command{}).

View File

@ -74,8 +74,9 @@ publish(_) ->
dispatch_with_no_sub(_) ->
Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>),
Delivery = #delivery{sender = self(), message = Msg, results = []},
?assertEqual(Delivery, emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)).
Delivery = #delivery{sender = self(), message = Msg},
?assertEqual([{node(),<<"no_subscribers">>,{error,no_subscribers}}],
emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)).
pubsub(_) ->
true = emqx:is_running(node()),