Merge emqx30 (#2181)

* Change the reason code in will topic acl check (#2168)

* Fix bridge bug (#2160)

* Limit bridge QoS less than 1

* Improve shared sub dispatch implementation. (#2144)

* Upgrade ekka, esockd libraries

* Improve the 'try_open_session' function

* Reload config (#2180)
This commit is contained in:
turtleDeng 2019-01-25 13:01:48 +08:00 committed by GitHub
parent b8929a46c1
commit 00863acea3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 140 additions and 96 deletions

View File

@ -8,8 +8,8 @@ DEPS = jsx gproc gen_rpc ekka esockd cowboy replayq
dep_jsx = hex-emqx 2.9.0
dep_gproc = hex-emqx 0.8.0
dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.0
dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.3
dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.1
dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.4
dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.3
dep_cowboy = hex-emqx 2.4.0
dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1

View File

@ -6,9 +6,9 @@
%% appended to deps in rebar.config.script
{github_emqx_deps,
[{gen_rpc, "2.3.0"},
{ekka, "v0.5.1"},
{ekka, "v0.5.3"},
{replayq, "v0.1.1"},
{esockd, "v5.4.3"},
{esockd, "v5.4.4"},
{cuttlefish, "v2.2.1"}
]}.

View File

@ -17,7 +17,7 @@
-include("emqx.hrl").
%% Start/Stop the application
-export([start/0, is_running/1, stop/0]).
-export([start/0, restart/1, is_running/1, stop/0]).
%% PubSub API
-export([subscribe/1, subscribe/2, subscribe/3]).
@ -47,6 +47,12 @@ start() ->
%% Check Mnesia
application:ensure_all_started(?APP).
-spec(restart(string()) -> ok).
restart(ConfFile) ->
reload_config(ConfFile),
shutdown(),
reboot().
%% @doc Stop emqx application.
-spec(stop() -> ok | {error, term()}).
stop() ->
@ -158,3 +164,11 @@ shutdown(Reason) ->
reboot() ->
lists:foreach(fun application:start/1, [gproc, esockd, ranch, cowboy, ekka, emqx]).
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
reload_config(ConfFile) ->
{ok, [Conf]} = file:consult(ConfFile),
lists:foreach(fun({App, Vals}) ->
[application:set_env(App, Par, Val) || {Par, Val} <- Vals]
end, Conf).

View File

@ -114,12 +114,12 @@ init([Options]) ->
ReconnectInterval = get_value(reconnect_interval, Options, 30000),
Mountpoint = format_mountpoint(get_value(mountpoint, Options)),
QueueOptions = get_value(queue, Options),
{ok, #state{mountpoint = Mountpoint,
queue_option = QueueOptions,
readq = [],
writeq = [],
options = Options,
reconnect_interval = ReconnectInterval}}.
{ok, #state{mountpoint = Mountpoint,
queue_option = QueueOptions,
readq = [],
writeq = [],
options = Options,
reconnect_interval = ReconnectInterval}}.
handle_call(start_bridge, _From, State = #state{client_pid = undefined}) ->
{Msg, NewState} = bridge(start, State),
@ -228,16 +228,19 @@ handle_info(replay, State = #state{client_pid = ClientPid, readq = ReadQ}) ->
%%----------------------------------------------------------------
%% received local node message
%%----------------------------------------------------------------
handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}},
handle_info({dispatch, _, #message{topic = Topic, qos = QoS, payload = Payload, flags = #{retain := Retain}}},
State = #state{client_pid = undefined,
mountpoint = Mountpoint}) ->
mountpoint = Mountpoint})
when QoS =< 1 ->
Msg = #mqtt_msg{qos = 1,
retain = Retain,
topic = mountpoint(Mountpoint, Topic),
payload = Payload},
{noreply, en_writeq({undefined, Msg}, State)};
handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}},
State = #state{client_pid = Pid, mountpoint = Mountpoint}) ->
handle_info({dispatch, _, #message{topic = Topic, qos = QoS ,payload = Payload, flags = #{retain := Retain}}},
State = #state{client_pid = Pid,
mountpoint = Mountpoint})
when QoS =< 1 ->
Msg = #mqtt_msg{qos = 1,
retain = Retain,
topic = mountpoint(Mountpoint, Topic),
@ -347,7 +350,6 @@ format_mountpoint(undefined) ->
format_mountpoint(Prefix) ->
binary:replace(bin(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
en_writeq(Msg, State = #state{replayq = ReplayQ,
queue_option = #{mem_cache := false}}) ->
NewReplayQ = replayq:append(ReplayQ, [Msg]),
@ -369,16 +371,21 @@ publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], NewReadQ) ->
publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | NewReadQ]).
delete(PktId, State = #state{ replayq = ReplayQ,
queue_option = #{ mem_cache := false }}) ->
readq = [],
queue_option = #{ mem_cache := false}}) ->
{NewReplayQ, NewAckRef, Msgs} = replayq:pop(ReplayQ, #{count_limit => 1}),
logger:debug("[Msg] PacketId ~p, Msg: ~p", [PktId, Msgs]),
ok = replayq:ack(NewReplayQ, NewAckRef),
case Msgs of
[{PktId, Msg}] ->
logger:debug("[Msg] PacketId ~p, Msg: ~p", [PktId, Msg]),
replayq:ack(ReplayQ, NewAckRef),
State#state{ replayq = NewReplayQ, ackref = NewAckRef};
_ ->
[{PktId, _Msg}] ->
self() ! pop,
State
State#state{ replayq = NewReplayQ, ackref = NewAckRef };
[{_PktId, _Msg}] ->
NewReplayQ1 = replayq:append(NewReplayQ, Msgs),
self() ! pop,
State#state{ replayq = NewReplayQ1, ackref = NewAckRef };
_Empty ->
State#state{ replayq = NewReplayQ, ackref = NewAckRef}
end;
delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref = AckRef}) ->
ok = replayq:ack(ReplayQ, AckRef),
@ -388,8 +395,16 @@ delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref
delete(PktId, State = #state{readq = [], writeq = WriteQ}) ->
State#state{writeq = lists:keydelete(PktId, 1, WriteQ)};
delete(PktId, State = #state{readq = ReadQ}) ->
State#state{readq = lists:keydelete(PktId, 1, ReadQ)}.
delete(PktId, State = #state{readq = ReadQ, replayq = ReplayQ, ackref = AckRef}) ->
NewReadQ = lists:keydelete(PktId, 1, ReadQ),
case NewReadQ of
[] ->
ok = replayq:ack(ReplayQ, AckRef),
self() ! pop;
_NewReadQ ->
ok
end,
State#state{ readq = NewReadQ }.
bridge(Action, State = #state{options = Options,
replayq = ReplayQ,
@ -397,7 +412,7 @@ bridge(Action, State = #state{options = Options,
= QueueOption
= #{batch_size := BatchSize}})
when BatchSize > 0 ->
case emqx_client:start_link([{owner, self()}|options(Options)]) of
case emqx_client:start_link([{owner, self()} | options(Options)]) of
{ok, ClientPid} ->
case emqx_client:connect(ClientPid) of
{ok, _} ->

View File

@ -21,6 +21,7 @@
-export([init/2]).
-export([info/1]).
-export([attrs/1]).
-export([attr/2]).
-export([caps/1]).
-export([stats/1]).
-export([client_id/1]).
@ -162,6 +163,28 @@ attrs(#pstate{zone = Zone,
{is_bridge, IsBridge},
{connected_at, ConnectedAt}].
attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
get_property('Receive-Maximum', ConnProps, 65535);
attr(max_inflight, #pstate{zone = Zone}) ->
emqx_zone:get_env(Zone, max_inflight, 65535);
attr(expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
get_property('Session-Expiry-Interval', ConnProps, 0);
attr(expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}) ->
case CleanStart of
true -> 0;
false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
end;
attr(topic_alias_maximum, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
get_property('Topic-Alias-Maximum', ConnProps, 0);
attr(topic_alias_maximum, #pstate{zone = Zone}) ->
emqx_zone:get_env(Zone, max_topic_alias, 0);
attr(Name, PState) ->
Attrs = lists:zip(record_info(fields, pstate), tl(tuple_to_list(PState))),
case lists:keyfind(Name, 1, Attrs) of
{_, Value} -> Value;
false -> undefined
end.
caps(#pstate{zone = Zone}) ->
emqx_mqtt_caps:get_caps(Zone).
@ -348,8 +371,8 @@ process_packet(?CONNECT_PACKET(
PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}),
emqx_logger:set_metadata_client_id(PState3#pstate.client_id),
%% Open session
SessAttrs = lists:foldl(fun set_session_attrs/2, #{will_msg => make_will_msg(ConnPkt)}, [{max_inflight, PState3}, {expiry_interval, PState3}, {misc, PState3}]),
case try_open_session(SessAttrs) of
SessAttrs = #{will_msg => make_will_msg(ConnPkt)},
case try_open_session(SessAttrs, PState3) of
{ok, SPid, SP} ->
PState4 = PState3#pstate{session = SPid, connected = true},
ok = emqx_cm:register_connection(client_id(PState4)),
@ -673,54 +696,26 @@ maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps})
maybe_assign_client_id(PState) ->
PState.
try_open_session(SessAttrs = #{zone := _,
client_id := _,
conn_pid := _,
username := _,
will_msg := _,
clean_start := _}) ->
case emqx_sm:open_session(SessAttrs) of
try_open_session(SessAttrs, PState = #pstate{zone = Zone,
client_id = ClientId,
conn_pid = ConnPid,
username = Username,
clean_start = CleanStart}) ->
case emqx_sm:open_session(
maps:merge(#{zone => Zone,
client_id => ClientId,
conn_pid => ConnPid,
username => Username,
clean_start => CleanStart,
max_inflight => attr(max_inflight, PState),
expiry_interval => attr(expiry_interval, PState),
topic_alias_maximum => attr(topic_alias_maximum, PState)},
SessAttrs)) of
{ok, SPid} ->
{ok, SPid, false};
Other -> Other
end.
set_session_attrs({max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) ->
maps:put(max_inflight, get_property('Receive-Maximum', ConnProps, 65535), SessAttrs);
set_session_attrs({max_inflight, #pstate{zone = Zone}}, SessAttrs) ->
maps:put(max_inflight, emqx_zone:get_env(Zone, max_inflight, 65535), SessAttrs);
set_session_attrs({expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) ->
maps:put(expiry_interval, get_property('Session-Expiry-Interval', ConnProps, 0), SessAttrs);
set_session_attrs({expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}}, SessAttrs) ->
maps:put(expiry_interval, case CleanStart of
true -> 0;
false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
end, SessAttrs);
set_session_attrs({topic_alias_maximum, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) ->
maps:put(topic_alias_maximum, get_property('Topic-Alias-Maximum', ConnProps, 0), SessAttrs);
set_session_attrs({topic_alias_maximum, #pstate{zone = Zone}}, SessAttrs) ->
maps:put(topic_alias_maximum, emqx_zone:get_env(Zone, max_topic_alias, 0), SessAttrs);
set_session_attrs({misc, #pstate{zone = Zone,
client_id = ClientId,
conn_pid = ConnPid,
username = Username,
clean_start = CleanStart}}, SessAttrs) ->
SessAttrs#{zone => Zone,
client_id => ClientId,
conn_pid => ConnPid,
username => Username,
clean_start => CleanStart};
set_session_attrs(_, SessAttrs) ->
SessAttrs.
authenticate(Credentials, Password) ->
case emqx_access_control:authenticate(Credentials, Password) of
ok -> {ok, false};
@ -821,7 +816,7 @@ check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, PState) ->
allow -> ok;
deny ->
?LOG(warning, "Will message (to ~s) validation failed, acl denied", [WillTopic]),
{error, ?RC_UNSPECIFIED_ERROR}
{error, ?RC_NOT_AUTHORIZED}
end.
check_publish(Packet, PState) ->
@ -978,3 +973,4 @@ reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) ->
undefined;
reason_codes_compat(PktType, ReasonCodes, _ProtoVer) ->
[emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes].

View File

@ -184,7 +184,7 @@ info(State = #state{conn_pid = ConnPid,
{upgrade_qos, UpgradeQoS},
{inflight, Inflight},
{retry_interval, RetryInterval},
{mqueue_len, MQueue},
{mqueue_len, emqx_mqueue:len(MQueue)},
{awaiting_rel, AwaitingRel},
{max_awaiting_rel, MaxAwaitingRel},
{await_rel_timeout, AwaitRelTimeout}].

View File

@ -91,18 +91,12 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}, F
case pick(strategy(), ClientId, Group, Topic, FailedSubs) of
false ->
Delivery;
SubPid ->
case do_dispatch(SubPid, Topic, Msg) of
{Type, SubPid} ->
case do_dispatch(SubPid, Topic, Msg, Type) of
ok ->
Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]};
{error, _Reason} ->
%% failed to dispatch to this sub, try next
%% 'Reason' is discarded so far, meaning for QoS1/2 messages
%% if all subscribers are off line, the dispatch would faile
%% even if there are sessions not expired yet.
%% If required, we can make use of the 'no_connection' reason to perform
%% retry without requiring acks, so the messages can be delivered
%% to sessions of offline clients
%% Failed to dispatch to this sub, try next.
dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])
end
end.
@ -115,19 +109,23 @@ strategy() ->
ack_enabled() ->
emqx_config:get_env(shared_dispatch_ack_enabled, false).
do_dispatch(SubPid, Topic, Msg) when SubPid =:= self() ->
do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() ->
%% Deadlock otherwise
_ = erlang:send(SubPid, {dispatch, Topic, Msg}),
ok;
do_dispatch(SubPid, Topic, Msg) ->
dispatch_per_qos(SubPid, Topic, Msg).
do_dispatch(SubPid, Topic, Msg, Type) ->
dispatch_per_qos(SubPid, Topic, Msg, Type).
%% return either 'ok' (when everything is fine) or 'error'
dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg) ->
dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
%% For QoS 0 message, send it as regular dispatch
_ = erlang:send(SubPid, {dispatch, Topic, Msg}),
ok;
dispatch_per_qos(SubPid, Topic, Msg) ->
dispatch_per_qos(SubPid, Topic, Msg, retry) ->
%% Retry implies all subscribers nack:ed, send again without ack
_ = erlang:send(SubPid, {dispatch, Topic, Msg}),
ok;
dispatch_per_qos(SubPid, Topic, Msg, fresh) ->
case ack_enabled() of
true ->
dispatch_with_ack(SubPid, Topic, Msg);
@ -211,24 +209,32 @@ pick(sticky, ClientId, Group, Topic, FailedSubs) ->
true ->
%% the old subscriber is still alive
%% keep using it for sticky strategy
Sub0;
{fresh, Sub0};
false ->
%% randomly pick one for the first message
Sub = do_pick(random, ClientId, Group, Topic, FailedSubs),
{Type, Sub} = do_pick(random, ClientId, Group, Topic, [Sub0 | FailedSubs]),
%% stick to whatever pick result
erlang:put({shared_sub_sticky, Group, Topic}, Sub),
Sub
{Type, Sub}
end;
pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
do_pick(Strategy, ClientId, Group, Topic, FailedSubs).
do_pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
case subscribers(Group, Topic) -- FailedSubs of
[] -> false;
[Sub] -> Sub;
All -> pick_subscriber(Group, Topic, Strategy, ClientId, All)
All = subscribers(Group, Topic),
case All -- FailedSubs of
[] when FailedSubs =:= [] ->
%% Genuinely no subscriber
false;
[] ->
%% All offline? pick one anyway
{retry, pick_subscriber(Group, Topic, Strategy, ClientId, All)};
Subs ->
%% More than one available
{fresh, pick_subscriber(Group, Topic, Strategy, ClientId, Subs)}
end.
pick_subscriber(_Group, _Topic, _Strategy, _ClientId, [Sub]) -> Sub;
pick_subscriber(Group, Topic, Strategy, ClientId, Subs) ->
Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, length(Subs)),
lists:nth(Nth, Subs).

View File

@ -72,6 +72,9 @@ t_random_basic(_) ->
%% out which member it picked, then close its connection
%% send the second message, the message should be 'nack'ed
%% by the sticky session and delivered to the 2nd session.
%% After the connection for the 2nd session is also closed,
%% i.e. when all clients are offline, the following message(s)
%% should be delivered randomly.
t_no_connection_nack(_) ->
ok = ensure_config(sticky),
Publisher = <<"publisher">>,
@ -117,7 +120,7 @@ t_no_connection_nack(_) ->
%% sleep then make synced calls to session processes to ensure that
%% the connection pid's 'EXIT' message is propagated to the session process
%% also to be sure sessions are still alive
timer:sleep(5),
timer:sleep(2),
_ = emqx_session:info(SPid1),
_ = emqx_session:info(SPid2),
%% Now we know what is the other still alive connection
@ -128,11 +131,21 @@ t_no_connection_nack(_) ->
SendF(Id),
?wait(Received(Id, TheOtherConnPid), 1000)
end, PacketIdList),
%% Now close the 2nd (last connection)
emqx_mock_client:stop(TheOtherConnPid),
timer:sleep(2),
%% both sessions should have conn_pid = undefined
?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
%% send more messages, but all should be queued in session state
lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
{_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
{_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
?assertEqual(length(PacketIdList), L1 + L2),
%% clean up
emqx_mock_client:close_session(PubConnPid),
emqx_sm:close_session(SPid1),
emqx_sm:close_session(SPid2),
emqx_mock_client:close_session(TheOtherConnPid),
ok.
t_random(_) ->