From 2611f660de89a5d0ad564b03bedbd33fe8fcddbd Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 29 Nov 2017 10:45:40 +0800 Subject: [PATCH 1/8] Fix issue #1353 - the management API should listen on 0.0.0:8080 --- etc/emq.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etc/emq.conf b/etc/emq.conf index 511292d30..60e9c421f 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -534,7 +534,7 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ##-------------------------------------------------------------------- ## HTTP Management API Listener -listener.api.mgmt = 127.0.0.1:8080 +listener.api.mgmt = 0.0.0.0:8080 listener.api.mgmt.acceptors = 4 From 26fb809dbed287854cee2d97fb717cb72751649c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 29 Nov 2017 14:09:46 +0800 Subject: [PATCH 2/8] Fix issue#1293 - the retained flags should be propagated for bridge. --- include/emqttd_protocol.hrl | 3 ++- src/emqttd_parser.erl | 5 +++-- src/emqttd_protocol.erl | 20 ++++++++++++++++---- src/emqttd_session.erl | 27 +++++++++++---------------- 4 files changed, 32 insertions(+), 23 deletions(-) diff --git a/include/emqttd_protocol.hrl b/include/emqttd_protocol.hrl index 9d9cec714..a6d6c06e6 100644 --- a/include/emqttd_protocol.hrl +++ b/include/emqttd_protocol.hrl @@ -174,7 +174,8 @@ will_topic = undefined :: undefined | binary(), will_msg = undefined :: undefined | binary(), username = undefined :: undefined | binary(), - password = undefined :: undefined | binary() + password = undefined :: undefined | binary(), + is_bridge = false :: boolean() }). -record(mqtt_packet_connack, diff --git a/src/emqttd_parser.erl b/src/emqttd_parser.erl index 4699b3f77..91df07d77 100644 --- a/src/emqttd_parser.erl +++ b/src/emqttd_parser.erl @@ -79,7 +79,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) {?CONNECT, <>} -> {ProtoName, Rest1} = parse_utf(FrameBin), %% Fix mosquitto bridge: 0x83, 0x84 - <<_Bridge:4, ProtoVersion:4, Rest2/binary>> = Rest1, + <> = Rest1, < {error, protocol_header_corrupt} end; diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 35c914b14..b30978ac6 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -44,7 +44,7 @@ clean_sess, proto_ver, proto_name, username, is_superuser, will_msg, keepalive, keepalive_backoff, max_clientid_len, session, stats_data, mountpoint, ws_initial_headers, - connected_at}). + is_bridge, connected_at}). -type(proto_state() :: #proto_state{}). @@ -180,7 +180,8 @@ process(?CONNECT_PACKET(Var), State0) -> password = Password, clean_sess = CleanSess, keep_alive = KeepAlive, - client_id = ClientId} = Var, + client_id = ClientId, + is_bridge = IsBridge} = Var, State1 = State0#proto_state{proto_ver = ProtoVer, proto_name = ProtoName, @@ -189,6 +190,7 @@ process(?CONNECT_PACKET(Var), State0) -> clean_sess = CleanSess, keepalive = KeepAlive, will_msg = willmsg(Var, State0), + is_bridge = IsBridge, connected_at = os:timestamp()}, {ReturnCode1, SessPresent, State3} = @@ -333,10 +335,11 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), -spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}). send(Msg, State = #proto_state{client_id = ClientId, username = Username, - mountpoint = MountPoint}) + mountpoint = MountPoint, + is_bridge = IsBridge}) when is_record(Msg, mqtt_message) -> emqttd_hooks:run('message.delivered', [ClientId, Username], Msg), - send(emqttd_message:to_packet(unmount(MountPoint, Msg)), State); + send(emqttd_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State); send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> @@ -543,6 +546,15 @@ check_acl(subscribe, Topic, Client) -> sp(true) -> 1; sp(false) -> 0. +%%-------------------------------------------------------------------- +%% The retained flag should be propagated for bridge. +%%-------------------------------------------------------------------- + +clean_retain(false, Msg = #mqtt_message{retain = true}) -> + Msg#mqtt_message{retain = false}; +clean_retain(true, Msg) -> + Msg. + %%-------------------------------------------------------------------- %% Mount Point %%-------------------------------------------------------------------- diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index aa08d746c..854dee0a5 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -152,9 +152,10 @@ %% Force GC Count force_gc_count :: undefined | integer(), - created_at :: erlang:timestamp(), + %% Ignore loop deliver? + ignore_loop_deliver = false :: boolean(), - ignore_loop_deliver = false :: boolean() + created_at :: erlang:timestamp() }). -define(TIMEOUT, 60000). @@ -529,17 +530,14 @@ handle_cast({destroy, ClientId}, handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). -%% Dispatch message from self publish -handle_info({dispatch, Topic, Msg = #mqtt_message{from = {ClientId, _}}}, - State = #state{client_id = ClientId, - ignore_loop_deliver = IgnoreLoopDeliver}) when is_record(Msg, mqtt_message) -> - case IgnoreLoopDeliver of - true -> {noreply, State, hibernate}; - false -> {noreply, handle_dispatch(Topic, Msg, State), hibernate} - end; +%% Ignore Messages delivered by self +handle_info({dispatch, _Topic, #mqtt_message{from = {ClientId, _}}}, + State = #state{client_id = ClientId, ignore_loop_deliver = true}) -> + hibernate(State); + %% Dispatch Message handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) -> - {noreply, handle_dispatch(Topic, Msg, State), hibernate}; + hibernate(gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))); %% Do nothing if the client has been disconnected. handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) -> @@ -552,7 +550,7 @@ handle_info({timeout, _Timer, check_awaiting_rel}, State) -> hibernate(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined}))); handle_info({timeout, _Timer, expired}, State) -> - ?LOG(debug, "Expired, shutdown now.", [], State), + ?LOG(info, "Expired, shutdown now.", [], State), shutdown(expired, State); handle_info({'EXIT', ClientPid, _Reason}, @@ -563,7 +561,7 @@ handle_info({'EXIT', ClientPid, Reason}, State = #state{clean_sess = false, client_pid = ClientPid, expiry_interval = Interval}) -> - ?LOG(debug, "Client ~p EXIT for ~p", [ClientPid, Reason], State), + ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State), ExpireTimer = start_timer(Interval, expired), State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer}, hibernate(emit_stats(State1)); @@ -687,9 +685,6 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLen}) %% Dispatch Messages %%-------------------------------------------------------------------- -handle_dispatch(Topic, Msg, State) -> - gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State)). - %% Enqueue message if the client has been disconnected dispatch(Msg, State = #state{client_pid = undefined}) -> enqueue_msg(Msg, State); From ee5c33d0bd9f70db09307f69a5dac62df736bec5 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 29 Nov 2017 22:23:59 +0800 Subject: [PATCH 3/8] Fix the 'no function clause' exception for issue #1293 --- src/emqttd_protocol.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index b30978ac6..c3905ad5a 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -552,7 +552,7 @@ sp(false) -> 0. clean_retain(false, Msg = #mqtt_message{retain = true}) -> Msg#mqtt_message{retain = false}; -clean_retain(true, Msg) -> +clean_retain(_IsBridge, Msg) -> Msg. %%-------------------------------------------------------------------- From d9500412bfb8f119c46253a59835b8257f4ff5d7 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 1 Dec 2017 09:10:46 +0800 Subject: [PATCH 4/8] Update the topic's statistics --- src/emqttd_router.erl | 4 +++- src/emqttd_stats.erl | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 4d4e22160..fa1a0c70c 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -284,5 +284,7 @@ clean_routes_(Node) -> mnesia:transaction(Clean). update_stats_() -> - emqttd_stats:setstats('routes/count', 'routes/max', mnesia:table_info(mqtt_route, size)). + Size = mnesia:table_info(mqtt_route, size), + emqttd_stats:setstats('routes/count', 'routes/max', Size), + emqttd_stats:setstats('topics/count', 'topics/max', Size). diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index 73a1471ac..6d84395e2 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -57,14 +57,14 @@ %% $SYS Topics for Subscribers -define(SYSTOP_PUBSUB, [ - 'routes/count', % ... - 'routes/max', % ... 'topics/count', % ... 'topics/max', % ... 'subscribers/count', % ... 'subscribers/max', % ... 'subscriptions/count', % ... - 'subscriptions/max' % ... + 'subscriptions/max', % ... + 'routes/count', % ... + 'routes/max' % ... ]). %% $SYS Topic for retained From d2a4e2c615718cb41006b54a813201d20a53e9b0 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 1 Dec 2017 15:46:42 +0800 Subject: [PATCH 5/8] Should not exit arbitrarily if clientid conflicts in mnesia --- src/emqttd_protocol.erl | 10 +++++----- src/emqttd_sm.erl | 3 +-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index c3905ad5a..384f93225 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -213,7 +213,7 @@ process(?CONNECT_PACKET(Var), State0) -> %% ACCEPT {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}}; {error, Error} -> - exit({shutdown, Error}) + {stop, {shutdown, Error}, State2} end; {error, Reason}-> ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1), @@ -381,12 +381,12 @@ stop_if_auth_failure(_RC, State) -> shutdown(_Error, #proto_state{client_id = undefined}) -> ignore; - -shutdown(conflict, #proto_state{client_id = _ClientId}) -> +shutdown(conflict, _State) -> + %% let it down + ignore; +shutdown(mnesia_conflict, _State) -> %% let it down - %% emqttd_cm:unreg(ClientId); ignore; - shutdown(Error, State = #proto_state{will_msg = WillMsg}) -> ?LOG(debug, "Shutdown for ~p", [Error], State), Client = client(State), diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 054e81c95..b8a68a733 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -216,8 +216,7 @@ code_change(_OldVsn, State, _Extra) -> create_session({CleanSess, {ClientId, Username}, ClientPid}, State) -> case create_session(CleanSess, {ClientId, Username}, ClientPid) of {ok, SessPid} -> - {reply, {ok, SessPid, false}, - monitor_session(ClientId, SessPid, State)}; + {reply, {ok, SessPid, false}, monitor_session(ClientId, SessPid, State)}; {error, Error} -> {reply, {error, Error}, State} end. From c308037b1a6ad24a5fb113443c540d59b4c12a37 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 1 Dec 2017 17:16:59 +0800 Subject: [PATCH 6/8] Remove the unnecessary transactions to optimize session management --- src/emqttd_sm.erl | 30 ++++++++++++------------------ src/emqttd_sm_sup.erl | 8 +++----- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index b8a68a733..a46d56fa6 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -156,7 +156,7 @@ handle_call({start_session, false, {ClientId, Username}, ClientPid}, _From, Stat {reply, {ok, SessPid, true}, State}; {error, Erorr} -> {reply, {error, Erorr}, State} - end + end end; %% Transient Session @@ -183,16 +183,14 @@ handle_cast(Msg, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> case dict:find(MRef, State#state.monitors) of {ok, ClientId} -> - mnesia:transaction(fun() -> - case mnesia:wread({mqtt_session, ClientId}) of - [] -> - ok; - [Sess = #mqtt_session{sess_pid = DownPid}] -> - mnesia:delete_object(mqtt_session, Sess, write); - [_Sess] -> - ok - end - end), + case mnesia:dirty_read({mqtt_session, ClientId}) of + [] -> + ok; + [Sess = #mqtt_session{sess_pid = DownPid}] -> + mnesia:dirty_delete_object(Sess); + [_Sess] -> + ok + end, {noreply, erase_monitor(MRef, State), hibernate}; error -> lager:error("MRef of session ~p not found", [DownPid]), @@ -283,15 +281,14 @@ destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPi remove_session(Session); %% Remote node -destroy_session(Session = #mqtt_session{client_id = ClientId, - sess_pid = SessPid}) -> +destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}) -> Node = node(SessPid), case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of ok -> remove_session(Session); {badrpc, nodedown} -> ?LOG(error, "Node '~s' down", [Node], Session), - remove_session(Session); + remove_session(Session); {badrpc, Reason} -> ?LOG(error, "Failed to destory ~p on remote node ~p for ~s", [SessPid, Node, Reason], Session), @@ -299,10 +296,7 @@ destroy_session(Session = #mqtt_session{client_id = ClientId, end. remove_session(Session) -> - case mnesia:transaction(fun mnesia:delete_object/1, [Session]) of - {atomic, ok} -> ok; - {aborted, Error} -> {error, Error} - end. + mnesia:dirty_delete_object(Session). monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) -> MRef = erlang:monitor(process, SessPid), diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index efabadb96..1c2e7f31a 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -24,8 +24,6 @@ -include("emqttd.hrl"). --define(SM, emqttd_sm). - -define(HELPER, emqttd_sm_helper). %% API @@ -44,11 +42,11 @@ init([]) -> %% Helper StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), Helper = {?HELPER, {?HELPER, start_link, [StatsFun]}, - permanent, 5000, worker, [?HELPER]}, + permanent, 5000, worker, [?HELPER]}, %% SM Pool Sup - MFA = {?SM, start_link, []}, - PoolSup = emqttd_pool_sup:spec([?SM, hash, erlang:system_info(schedulers), MFA]), + MFA = {emqttd_sm, start_link, []}, + PoolSup = emqttd_pool_sup:spec([emqttd_sm, hash, erlang:system_info(schedulers), MFA]), {ok, {{one_for_all, 10, 3600}, [Helper, PoolSup]}}. From 0716b2b2d0c0057a1cb5a5f7a4dfaf191b1410c1 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 2 Dec 2017 12:13:55 +0800 Subject: [PATCH 7/8] Enable the stats of session by default --- etc/emq.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etc/emq.conf b/etc/emq.conf index 60e9c421f..9b37860b9 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -242,7 +242,7 @@ mqtt.session.max_awaiting_rel = 100 mqtt.session.await_rel_timeout = 20s ## Enable Statistics: on | off -mqtt.session.enable_stats = off +mqtt.session.enable_stats = on ## Expired after 1 day: ## w - week From b8e0a4d5c4df3c08f7b0ef6ef320abae79eed46e Mon Sep 17 00:00:00 2001 From: HuangDan Date: Sun, 3 Dec 2017 00:16:26 +0800 Subject: [PATCH 8/8] Bump version to 2.3.1 --- Makefile | 2 +- src/emqttd.app.src | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index de2827e23..a1d77b1a3 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker -PROJECT_VERSION = 2.3.0 +PROJECT_VERSION = 2.3.1 DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt clique jsx diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 269601bb8..67af8854e 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,6 +1,6 @@ {application,emqttd, [{description,"Erlang MQTT Broker"}, - {vsn,"2.3.0"}, + {vsn,"2.3.1"}, {modules,[]}, {registered,[emqttd_sup]}, {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb,