diff --git a/Makefile b/Makefile
index b07092c91..16c9795b1 100644
--- a/Makefile
+++ b/Makefile
@@ -8,8 +8,8 @@ DEPS = jsx gproc gen_rpc ekka esockd cowboy replayq
 dep_jsx = hex-emqx 2.9.0
 dep_gproc = hex-emqx 0.8.0
 dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.0
-dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.3
-dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.1
+dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.4
+dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.3
 dep_cowboy = hex-emqx 2.4.0
 dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1
 
diff --git a/rebar.config b/rebar.config
index 993790aa9..7486e7267 100644
--- a/rebar.config
+++ b/rebar.config
@@ -6,9 +6,9 @@
 %% appended to deps in rebar.config.script
 {github_emqx_deps,
  [{gen_rpc, "2.3.0"},
-  {ekka, "v0.5.1"},
+  {ekka, "v0.5.3"},
   {replayq, "v0.1.1"},
-  {esockd, "v5.4.3"},
+  {esockd, "v5.4.4"},
   {cuttlefish, "v2.2.1"}
  ]}.
 
diff --git a/src/emqx.erl b/src/emqx.erl
index 76e966a59..728b35cfc 100644
--- a/src/emqx.erl
+++ b/src/emqx.erl
@@ -17,7 +17,7 @@
 -include("emqx.hrl").
 
 %% Start/Stop the application
--export([start/0, is_running/1, stop/0]).
+-export([start/0, restart/1, is_running/1, stop/0]).
 
 %% PubSub API
 -export([subscribe/1, subscribe/2, subscribe/3]).
@@ -47,6 +47,13 @@ start() ->
     %% Check Mnesia
     application:ensure_all_started(?APP).
 
+%% @doc Apply the environment in ConfFile, then run shutdown/0 and reboot/0.
+-spec(restart(string()) -> ok).
+restart(ConfFile) ->
+    reload_config(ConfFile),
+    shutdown(),
+    reboot().
+
 %% @doc Stop emqx application.
 -spec(stop() -> ok | {error, term()}).
 stop() ->
@@ -158,3 +165,11 @@ shutdown(Reason) ->
 reboot() ->
     lists:foreach(fun application:start/1, [gproc, esockd, ranch, cowboy, ekka, emqx]).
 
+%%------------------------------------------------------------------------------
+%% Internal functions
+%%------------------------------------------------------------------------------
+reload_config(ConfFile) ->
+    {ok, [Conf]} = file:consult(ConfFile), % the file must hold exactly one term: a list of {App, Vals}
+    lists:foreach(fun({App, Vals}) ->
+                      [application:set_env(App, Par, Val) || {Par, Val} <- Vals]
+                  end, Conf).
diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl
index 133b3b621..5d3ae3e91 100644
--- a/src/emqx_bridge.erl
+++ b/src/emqx_bridge.erl
@@ -114,12 +114,12 @@ init([Options]) ->
     ReconnectInterval = get_value(reconnect_interval, Options, 30000),
     Mountpoint = format_mountpoint(get_value(mountpoint, Options)),
     QueueOptions = get_value(queue, Options),
-    {ok, #state{mountpoint = Mountpoint,
-                queue_option = QueueOptions,
-                readq = [],
-                writeq = [],
-                options = Options,
-                reconnect_interval = ReconnectInterval}}.
+    {ok, #state{mountpoint = Mountpoint,
+                queue_option = QueueOptions,
+                readq = [],
+                writeq = [],
+                options = Options,
+                reconnect_interval = ReconnectInterval}}.
 
handle_call(start_bridge, _From, State = #state{client_pid = undefined}) ->
     {Msg, NewState} = bridge(start, State),
@@ -228,16 +228,19 @@ handle_info(replay, State = #state{client_pid = ClientPid, readq = ReadQ}) ->
 %%----------------------------------------------------------------
 %% received local node message
 %%----------------------------------------------------------------
-handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}},
+handle_info({dispatch, _, #message{topic = Topic, qos = QoS, payload = Payload, flags = #{retain := Retain}}},
             State = #state{client_pid = undefined,
-                           mountpoint = Mountpoint}) ->
+                           mountpoint = Mountpoint})
+  when QoS =< 1 ->
     Msg = #mqtt_msg{qos = 1,
                     retain = Retain,
                     topic = mountpoint(Mountpoint, Topic),
                     payload = Payload},
     {noreply, en_writeq({undefined, Msg}, State)};
-handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}},
-            State = #state{client_pid = Pid, mountpoint = Mountpoint}) ->
+handle_info({dispatch, _, #message{topic = Topic, qos = QoS, payload = Payload, flags = #{retain := Retain}}},
+            State = #state{client_pid = Pid,
+                           mountpoint = Mountpoint})
+  when QoS =< 1 ->
     Msg = #mqtt_msg{qos = 1,
                     retain = Retain,
                     topic = mountpoint(Mountpoint, Topic),
@@ -347,7 +350,6 @@ format_mountpoint(undefined) ->
 format_mountpoint(Prefix) ->
     binary:replace(bin(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
 
-
 en_writeq(Msg, State = #state{replayq = ReplayQ,
                               queue_option = #{mem_cache := false}}) ->
     NewReplayQ = replayq:append(ReplayQ, [Msg]),
@@ -369,16 +371,21 @@ publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], NewReadQ) ->
     publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | NewReadQ]).
 
delete(PktId, State = #state{ replayq = ReplayQ,
-                             queue_option = #{ mem_cache := false }}) ->
+                             readq = [],
+                             queue_option = #{ mem_cache := false}}) ->
     {NewReplayQ, NewAckRef, Msgs} = replayq:pop(ReplayQ, #{count_limit => 1}),
+    logger:debug("[Msg] PacketId ~p, Msg: ~p", [PktId, Msgs]),
+    ok = replayq:ack(NewReplayQ, NewAckRef), % acked right after the pop, before Msgs is inspected
     case Msgs of
-        [{PktId, Msg}] ->
-            logger:debug("[Msg] PacketId ~p, Msg: ~p", [PktId, Msg]),
-            replayq:ack(ReplayQ, NewAckRef),
-            State#state{ replayq = NewReplayQ, ackref = NewAckRef};
-        _ ->
+        [{PktId, _Msg}] -> % the popped entry carries the packet id being deleted
             self() ! pop,
-            State
+            State#state{ replayq = NewReplayQ, ackref = NewAckRef };
+        [{_PktId, _Msg}] -> % a different packet id: re-append the popped entry to the queue
+            NewReplayQ1 = replayq:append(NewReplayQ, Msgs),
+            self() ! pop,
+            State#state{ replayq = NewReplayQ1, ackref = NewAckRef };
+        _Empty -> % any other pop result (presumably nothing was popped - TODO confirm)
+            State#state{ replayq = NewReplayQ, ackref = NewAckRef}
     end;
 delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref = AckRef}) ->
     ok = replayq:ack(ReplayQ, AckRef),
@@ -388,8 +395,16 @@ delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref
 delete(PktId, State = #state{readq = [], writeq = WriteQ}) ->
     State#state{writeq = lists:keydelete(PktId, 1, WriteQ)};
 
-delete(PktId, State = #state{readq = ReadQ}) ->
-    State#state{readq = lists:keydelete(PktId, 1, ReadQ)}.
+delete(PktId, State = #state{readq = ReadQ, replayq = ReplayQ, ackref = AckRef}) ->
+    NewReadQ = lists:keydelete(PktId, 1, ReadQ),
+    case NewReadQ of
+        [] -> % read queue is now empty: ack with the stored ackref and send 'pop' to self
+            ok = replayq:ack(ReplayQ, AckRef),
+            self() ! pop;
+        _NewReadQ ->
+            ok
+    end,
+    State#state{ readq = NewReadQ }.
 
bridge(Action, State = #state{options = Options,
                               replayq = ReplayQ,
@@ -397,7 +412,7 @@ bridge(Action, State = #state{options = Options,
                               = QueueOption
                               = #{batch_size := BatchSize}})
   when BatchSize > 0 ->
-    case emqx_client:start_link([{owner, self()}|options(Options)]) of
+    case emqx_client:start_link([{owner, self()} | options(Options)]) of
         {ok, ClientPid} ->
             case emqx_client:connect(ClientPid) of
                 {ok, _} ->
diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl
index 8aafb313f..9b9ef7f7d 100644
--- a/src/emqx_protocol.erl
+++ b/src/emqx_protocol.erl
@@ -21,6 +21,7 @@
 -export([init/2]).
 -export([info/1]).
 -export([attrs/1]).
+-export([attr/2]).
 -export([caps/1]).
 -export([stats/1]).
 -export([client_id/1]).
@@ -162,6 +163,28 @@ attrs(#pstate{zone = Zone,
      {is_bridge, IsBridge},
      {connected_at, ConnectedAt}].
 
+attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
+    get_property('Receive-Maximum', ConnProps, 65535);
+attr(max_inflight, #pstate{zone = Zone}) ->
+    emqx_zone:get_env(Zone, max_inflight, 65535);
+attr(expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
+    get_property('Session-Expiry-Interval', ConnProps, 0);
+attr(expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}) ->
+    case CleanStart of
+        true -> 0;
+        false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
+    end;
+attr(topic_alias_maximum, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
+    get_property('Topic-Alias-Maximum', ConnProps, 0);
+attr(topic_alias_maximum, #pstate{zone = Zone}) ->
+    emqx_zone:get_env(Zone, max_topic_alias, 0);
+attr(Name, PState) -> % any other name: look it up among the pstate record fields
+    Attrs = lists:zip(record_info(fields, pstate), tl(tuple_to_list(PState))),
+    case lists:keyfind(Name, 1, Attrs) of
+        {_, Value} -> Value;
+        false -> undefined % Name is not a field of the pstate record
+    end.
+
 caps(#pstate{zone = Zone}) ->
     emqx_mqtt_caps:get_caps(Zone).
 
@@ -348,8 +371,8 @@ process_packet(?CONNECT_PACKET(
                     PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}),
                     emqx_logger:set_metadata_client_id(PState3#pstate.client_id),
                     %% Open session
-                    SessAttrs = lists:foldl(fun set_session_attrs/2, #{will_msg => make_will_msg(ConnPkt)}, [{max_inflight, PState3}, {expiry_interval, PState3}, {misc, PState3}]),
-                    case try_open_session(SessAttrs) of
+                    SessAttrs = #{will_msg => make_will_msg(ConnPkt)},
+                    case try_open_session(SessAttrs, PState3) of
                         {ok, SPid, SP} ->
                             PState4 = PState3#pstate{session = SPid, connected = true},
                             ok = emqx_cm:register_connection(client_id(PState4)),
@@ -673,54 +696,26 @@ maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps})
 maybe_assign_client_id(PState) ->
     PState.
 
-try_open_session(SessAttrs = #{zone := _,
-                               client_id := _,
-                               conn_pid := _,
-                               username := _,
-                               will_msg := _,
-                               clean_start := _}) ->
-    case emqx_sm:open_session(SessAttrs) of
+try_open_session(SessAttrs, PState = #pstate{zone = Zone,
+                                             client_id = ClientId,
+                                             conn_pid = ConnPid,
+                                             username = Username,
+                                             clean_start = CleanStart}) ->
+    case emqx_sm:open_session(
+           maps:merge(#{zone => Zone,
+                        client_id => ClientId,
+                        conn_pid => ConnPid,
+                        username => Username,
+                        clean_start => CleanStart,
+                        max_inflight => attr(max_inflight, PState),
+                        expiry_interval => attr(expiry_interval, PState),
+                        topic_alias_maximum => attr(topic_alias_maximum, PState)},
+                      SessAttrs)) of % keys present in SessAttrs win over the values built here
         {ok, SPid} ->
             {ok, SPid, false};
         Other ->
             Other
     end.
- -set_session_attrs({max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) -> - maps:put(max_inflight, get_property('Receive-Maximum', ConnProps, 65535), SessAttrs); - -set_session_attrs({max_inflight, #pstate{zone = Zone}}, SessAttrs) -> - maps:put(max_inflight, emqx_zone:get_env(Zone, max_inflight, 65535), SessAttrs); - -set_session_attrs({expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) -> - maps:put(expiry_interval, get_property('Session-Expiry-Interval', ConnProps, 0), SessAttrs); - -set_session_attrs({expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}}, SessAttrs) -> - maps:put(expiry_interval, case CleanStart of - true -> 0; - false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) - end, SessAttrs); - -set_session_attrs({topic_alias_maximum, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) -> - maps:put(topic_alias_maximum, get_property('Topic-Alias-Maximum', ConnProps, 0), SessAttrs); - -set_session_attrs({topic_alias_maximum, #pstate{zone = Zone}}, SessAttrs) -> - maps:put(topic_alias_maximum, emqx_zone:get_env(Zone, max_topic_alias, 0), SessAttrs); - -set_session_attrs({misc, #pstate{zone = Zone, - client_id = ClientId, - conn_pid = ConnPid, - username = Username, - clean_start = CleanStart}}, SessAttrs) -> - SessAttrs#{zone => Zone, - client_id => ClientId, - conn_pid => ConnPid, - username => Username, - clean_start => CleanStart}; - -set_session_attrs(_, SessAttrs) -> - SessAttrs. - authenticate(Credentials, Password) -> case emqx_access_control:authenticate(Credentials, Password) of ok -> {ok, false}; @@ -821,7 +816,7 @@ check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, PState) -> allow -> ok; deny -> ?LOG(warning, "Will message (to ~s) validation failed, acl denied", [WillTopic]), - {error, ?RC_UNSPECIFIED_ERROR} + {error, ?RC_NOT_AUTHORIZED} end. 
check_publish(Packet, PState) ->
diff --git a/src/emqx_session.erl b/src/emqx_session.erl
index 098d76c52..689d84b29 100644
--- a/src/emqx_session.erl
+++ b/src/emqx_session.erl
@@ -184,7 +184,7 @@ info(State = #state{conn_pid = ConnPid,
      {upgrade_qos, UpgradeQoS},
      {inflight, Inflight},
      {retry_interval, RetryInterval},
-     {mqueue_len, MQueue},
+     {mqueue_len, emqx_mqueue:len(MQueue)},
      {awaiting_rel, AwaitingRel},
      {max_awaiting_rel, MaxAwaitingRel},
      {await_rel_timeout, AwaitRelTimeout}].
diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl
index 67f03a93c..e14963bc7 100644
--- a/src/emqx_shared_sub.erl
+++ b/src/emqx_shared_sub.erl
@@ -91,18 +91,12 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}, F
     case pick(strategy(), ClientId, Group, Topic, FailedSubs) of
         false ->
             Delivery;
-        SubPid ->
-            case do_dispatch(SubPid, Topic, Msg) of
+        {Type, SubPid} ->
+            case do_dispatch(SubPid, Topic, Msg, Type) of
                 ok ->
                     Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]};
                 {error, _Reason} ->
-                    %% failed to dispatch to this sub, try next
-                    %% 'Reason' is discarded so far, meaning for QoS1/2 messages
-                    %% if all subscribers are off line, the dispatch would faile
-                    %% even if there are sessions not expired yet.
-                    %% If required, we can make use of the 'no_connection' reason to perform
-                    %% retry without requiring acks, so the messages can be delivered
-                    %% to sessions of offline clients
+                    %% Failed to dispatch to this sub, try next.
                     dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])
             end
     end.
@@ -115,19 +109,23 @@ strategy() ->
 ack_enabled() ->
     emqx_config:get_env(shared_dispatch_ack_enabled, false).
 
-do_dispatch(SubPid, Topic, Msg) when SubPid =:= self() ->
+do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() ->
     %% Deadlock otherwise
     _ = erlang:send(SubPid, {dispatch, Topic, Msg}),
     ok;
-do_dispatch(SubPid, Topic, Msg) ->
-    dispatch_per_qos(SubPid, Topic, Msg).
+do_dispatch(SubPid, Topic, Msg, Type) ->
+    dispatch_per_qos(SubPid, Topic, Msg, Type).
 
 %% return either 'ok' (when everything is fine) or 'error'
-dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg) ->
+dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
     %% For QoS 0 message, send it as regular dispatch
     _ = erlang:send(SubPid, {dispatch, Topic, Msg}),
     ok;
-dispatch_per_qos(SubPid, Topic, Msg) ->
+dispatch_per_qos(SubPid, Topic, Msg, retry) ->
+    %% Retry implies all subscribers nack:ed, send again without ack
+    _ = erlang:send(SubPid, {dispatch, Topic, Msg}),
+    ok;
+dispatch_per_qos(SubPid, Topic, Msg, fresh) ->
     case ack_enabled() of
         true ->
             dispatch_with_ack(SubPid, Topic, Msg);
@@ -211,24 +209,37 @@ pick(sticky, ClientId, Group, Topic, FailedSubs) ->
         true ->
             %% the old subscriber is still alive
             %% keep using it for sticky strategy
-            Sub0;
+            {fresh, Sub0};
         false ->
             %% randomly pick one for the first message
-            Sub = do_pick(random, ClientId, Group, Topic, FailedSubs),
-            %% stick to whatever pick result
-            erlang:put({shared_sub_sticky, Group, Topic}, Sub),
-            Sub
+            case do_pick(random, ClientId, Group, Topic, [Sub0 | FailedSubs]) of
+                false ->
+                    %% no subscriber at all, so there is nothing to stick to
+                    false;
+                {Type, Sub} ->
+                    %% stick to whatever pick result
+                    erlang:put({shared_sub_sticky, Group, Topic}, Sub),
+                    {Type, Sub}
+            end
     end;
 pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
     do_pick(Strategy, ClientId, Group, Topic, FailedSubs).
 
 do_pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
-    case subscribers(Group, Topic) -- FailedSubs of
-        [] -> false;
-        [Sub] -> Sub;
-        All -> pick_subscriber(Group, Topic, Strategy, ClientId, All)
+    All = subscribers(Group, Topic),
+    case All -- FailedSubs of
+        [] when All =:= [] ->
+            %% Genuinely no subscriber
+            false;
+        [] ->
+            %% All offline? pick one anyway
+            {retry, pick_subscriber(Group, Topic, Strategy, ClientId, All)};
+        Subs ->
+            %% At least one subscriber has not failed yet
+            {fresh, pick_subscriber(Group, Topic, Strategy, ClientId, Subs)}
     end.
 
+pick_subscriber(_Group, _Topic, _Strategy, _ClientId, [Sub]) -> Sub;
 pick_subscriber(Group, Topic, Strategy, ClientId, Subs) ->
     Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, length(Subs)),
     lists:nth(Nth, Subs).
diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl
index dd0e20ad1..cc5a5f515 100644
--- a/test/emqx_shared_sub_SUITE.erl
+++ b/test/emqx_shared_sub_SUITE.erl
@@ -72,6 +72,9 @@ t_random_basic(_) ->
 %% out which member it picked, then close its connection
 %% send the second message, the message should be 'nack'ed
 %% by the sticky session and delivered to the 2nd session.
+%% After the connection for the 2nd session is also closed,
+%% i.e. when all clients are offline, the following message(s)
+%% should be delivered randomly.
 t_no_connection_nack(_) ->
     ok = ensure_config(sticky),
     Publisher = <<"publisher">>,
@@ -117,7 +120,7 @@ t_no_connection_nack(_) ->
     %% sleep then make synced calls to session processes to ensure that
     %% the connection pid's 'EXIT' message is propagated to the session process
     %% also to be sure sessions are still alive
-    timer:sleep(5),
+    timer:sleep(2),
     _ = emqx_session:info(SPid1),
     _ = emqx_session:info(SPid2),
     %% Now we know what is the other still alive connection
@@ -128,11 +131,21 @@ t_no_connection_nack(_) ->
                           SendF(Id),
                           ?wait(Received(Id, TheOtherConnPid), 1000)
                   end, PacketIdList),
+    %% Now close the 2nd (last connection)
+    emqx_mock_client:stop(TheOtherConnPid),
+    timer:sleep(2),
+    %% both sessions should have conn_pid = undefined
+    ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
+    ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
+    %% send more messages, but all should be queued in session state
+    lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
+    {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
+    {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
+    ?assertEqual(length(PacketIdList), L1 + L2),
     %% clean up
     emqx_mock_client:close_session(PubConnPid),
     emqx_sm:close_session(SPid1),
     emqx_sm:close_session(SPid2),
-    emqx_mock_client:close_session(TheOtherConnPid),
     ok.
 
 t_random(_) ->