From ad5ece8c33be400dbe54bc8c375a1fe21a8a59a1 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 9 Apr 2018 14:56:33 +0800 Subject: [PATCH 01/10] Upgrade the lager_console_backend config --- priv/emq.schema | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/priv/emq.schema b/priv/emq.schema index 169445005..85eb16272 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -434,7 +434,7 @@ end}. ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf), ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf), - ConsoleHandler = {lager_console_backend, ConsoleLogLevel}, + ConsoleHandler = {lager_console_backend, [ConsoleLogLevel]}, ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile}, {level, ConsoleLogLevel}, {size, cuttlefish:conf_get("log.console.size", Conf)}, From 18100cacf916138bb29bff065a2b7245d77cbdcf Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 13 Apr 2018 16:00:46 +0800 Subject: [PATCH 02/10] fix #1562 dup flag not set when re-deliver The dup flag is not set when redeliver the PUBLISH messages for QoS1 and QoS2 --- src/emqttd_session.erl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index dfba46b3e..58436c8d8 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -32,7 +32,7 @@ %% %% QoS 1 and QoS 2 messages pending transmission to the Client. %% -%% QoS 2 messages which have been received from the Client, but have not +%% QoS 2 messages which have been received from the Client, but have not %% been completely acknowledged. %% %% Optionally, QoS 0 messages pending transmission to the Client. @@ -720,7 +720,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q}) -> %%-------------------------------------------------------------------- redeliver(Msg = #mqtt_message{qos = QoS}, State) -> - deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State); + deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS0 -> false; true -> true end}, State); redeliver({pubrel, PacketId}, #state{client_pid = Pid}) -> Pid ! {redeliver, {?PUBREL, PacketId}}. @@ -759,7 +759,7 @@ acked(pubrec, PacketId, State = #state{client_id = ClientId, case Inflight:lookup(PacketId) of {publish, Msg, _Ts} -> emqttd_hooks:run('message.acked', [ClientId, Username], Msg), - State#state{inflight = Inflight:update(PacketId, {pubrel, PacketId, os:timestamp()})}; + State#state{inflight = Inflight:update(PacketId, {pubrel, PacketId, os:timestamp()})}; {pubrel, PacketId, _Ts} -> ?LOG(warning, "Duplicated PUBREC Packet: ~p", [PacketId], State), State @@ -853,4 +853,3 @@ shutdown(Reason, State) -> gc(State) -> emqttd_gc:maybe_force_gc(#state.force_gc_count, State). - From f13654dbce8dbb860163d281b57ce3c27f231f15 Mon Sep 17 00:00:00 2001 From: turtled Date: Tue, 17 Apr 2018 11:39:03 +0800 Subject: [PATCH 03/10] Support set k8s namespace --- etc/emq.conf | 6 ++++++ priv/emq.schema | 7 ++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/etc/emq.conf b/etc/emq.conf index 3bcadfaf8..f296b8ff9 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -135,6 +135,12 @@ cluster.autoclean = 5m ## Value: String ## cluster.k8s.app_name = emq +## Kubernates Namespace +## +## Value: String +## cluster.k8s.namespace = default + + ##-------------------------------------------------------------------- ## Node Args ##-------------------------------------------------------------------- diff --git a/priv/emq.schema b/priv/emq.schema index 85eb16272..a06d838ed 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -124,6 +124,10 @@ {datatype, string} ]}. +{mapping, "cluster.k8s.namespace", "ekka.cluster_discovery", [ + {datatype, string} +]}. + {translation, "ekka.cluster_discovery", fun(Conf) -> Strategy = cuttlefish:conf_get("cluster.discovery", Conf), Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, @@ -152,7 +156,8 @@ [{apiserver, cuttlefish:conf_get("cluster.k8s.apiserver", Conf)}, {service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)}, {address_type, cuttlefish:conf_get("cluster.k8s.address_type", Conf, ip)}, - {app_name, cuttlefish:conf_get("cluster.k8s.app_name", Conf)}]; + {app_name, cuttlefish:conf_get("cluster.k8s.app_name", Conf)}, + {namespace, cuttlefish:conf_get("cluster.k8s.namespace", Conf)}]; (manual) -> [ ] end, From 5039b751f4c4c42c27aa184c16ec09715286192b Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Tue, 17 Apr 2018 16:52:05 +0800 Subject: [PATCH 04/10] clean dead persistent session on connect --- src/emqttd_packet.erl | 12 ++++++------ src/emqttd_protocol.erl | 16 ++++++++-------- src/emqttd_sm.erl | 21 +++++++++++---------- 3 files changed, 25 insertions(+), 24 deletions(-) diff --git a/src/emqttd_packet.erl b/src/emqttd_packet.erl index f269f3dbe..d909089d1 100644 --- a/src/emqttd_packet.erl +++ b/src/emqttd_packet.erl @@ -56,7 +56,7 @@ format_header(#mqtt_packet_header{type = Type, dup = Dup, qos = QoS, retain = Retain}, S) -> - S1 = if + S1 = if S == undefined -> <<>>; true -> [", ", S] end, @@ -78,13 +78,13 @@ format_variable(#mqtt_packet_connect{ clean_sess = CleanSess, keep_alive = KeepAlive, client_id = ClientId, - will_topic = WillTopic, - will_msg = WillMsg, - username = Username, + will_topic = WillTopic, + will_msg = WillMsg, + username = Username, password = Password}) -> Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanSess=~s, KeepAlive=~p, Username=~s, Password=~s", Args = [ClientId, ProtoName, ProtoVer, CleanSess, KeepAlive, Username, format_password(Password)], - {Format1, Args1} = if + {Format1, Args1} = if WillFlag -> { Format ++ ", Will(Q~p, R~p, Topic=~s, Msg=~s)", Args ++ [WillQoS, i(WillRetain), WillTopic, WillMsg] }; true -> {Format, Args} @@ -93,7 +93,7 @@ format_variable(#mqtt_packet_connect{ format_variable(#mqtt_packet_connack{ack_flags = AckFlags, return_code = ReturnCode}) -> - io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReturnCode]); + io_lib:format("AckFlags=~p, ReturnCode=~p", [AckFlags, ReturnCode]); format_variable(#mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId}) -> diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 3531b165f..ea6a11ebf 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -137,7 +137,7 @@ session(#proto_state{session = Session}) -> %% CONNECT – Client requests a connection to a Server -%% A Client can only send the CONNECT Packet once over a Network Connection. +%% A Client can only send the CONNECT Packet once over a Network Connection. -spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, term()}). received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false, stats_data = Stats}) -> @@ -228,7 +228,8 @@ process(?CONNECT_PACKET(Var), State0) -> %% ACCEPT {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}}; {error, Error} -> - {stop, {shutdown, Error}, State2} + ?LOG(error, "Username '~s' login failed for ~p", [Username, Error], State2), + {?CONNACK_SERVER, false, State2} end; {error, Reason}-> ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1), @@ -449,14 +450,14 @@ start_keepalive(Sec, #proto_state{keepalive_backoff = Backoff}) when Sec > 0 -> validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) -> case validate_protocol(Connect) of - true -> + true -> case validate_clientid(Connect, ProtoState) of - true -> + true -> ?CONNACK_ACCEPT; - false -> + false -> ?CONNACK_INVALID_ID end; - false -> + false -> ?CONNACK_PROTO_VER end. @@ -498,7 +499,7 @@ validate_packet(?SUBSCRIBE_PACKET(_PacketId, TopicTable)) -> validate_packet(?UNSUBSCRIBE_PACKET(_PacketId, Topics)) -> validate_topics(filter, Topics); -validate_packet(_Packet) -> +validate_packet(_Packet) -> ok. validate_topics(_Type, []) -> @@ -593,4 +594,3 @@ unmount(MountPoint, Msg = #mqtt_message{topic = Topic}) -> {MountPoint, Topic0} -> Msg#mqtt_message{topic = Topic0}; _ -> Msg end. - diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index e2e332041..5d9b900c3 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -183,15 +183,16 @@ handle_cast(Msg, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> case dict:find(MRef, State#state.monitors) of {ok, ClientId} -> - case mnesia:dirty_read({mqtt_session, ClientId}) of - [] -> - ok; - [Sess = #mqtt_session{sess_pid = DownPid}] -> - mnesia:dirty_delete_object(Sess); - [_Sess] -> - ok - end, - {noreply, erase_monitor(MRef, State), hibernate}; + NewState = + case mnesia:dirty_read({mqtt_session, ClientId}) of + [] -> State; + [Sess = #mqtt_session{sess_pid = DownPid}] -> + mnesia:dirty_delete_object(Sess), + erase_monitor(MRef, State); + [_Sess] -> + State + end, + {noreply, NewState, hibernate}; error -> lager:error("MRef of session ~p not found", [DownPid]), {noreply, State} @@ -256,6 +257,7 @@ resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid} {ok, SessPid}; false -> ?LOG(error, "Cannot resume ~p which seems already dead!", [SessPid], Session), + remove_session(Session), {error, session_died} end; @@ -305,4 +307,3 @@ monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) -> erase_monitor(MRef, State = #state{monitors = Monitors}) -> erlang:demonitor(MRef, [flush]), State#state{monitors = dict:erase(MRef, Monitors)}. - From 27fcb73483850af7c5abeef89efd22a317f87283 Mon Sep 17 00:00:00 2001 From: Frank Feng Date: Fri, 20 Apr 2018 14:07:41 +0800 Subject: [PATCH 05/10] fix spec of function setstats/3 --- src/emqttd_stats.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index 4471a2814..bfd60ea15 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -143,7 +143,7 @@ setstat(Stat, Val) -> ets:update_element(?STATS_TAB, Stat, {2, Val}). %% @doc Set stats with max --spec(setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean()). +-spec(setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> ok). setstats(Stat, MaxStat, Val) -> gen_server:cast(?MODULE, {setstats, Stat, MaxStat, Val}). From 490ac8f4491469d4fd6480f377e56a372aafe8f0 Mon Sep 17 00:00:00 2001 From: Frank Feng Date: Fri, 20 Apr 2018 14:33:13 +0800 Subject: [PATCH 06/10] match {error,Reason} --- src/emqttd_vm.erl | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/src/emqttd_vm.erl b/src/emqttd_vm.erl index 368463d35..b23b884fa 100644 --- a/src/emqttd_vm.erl +++ b/src/emqttd_vm.erl @@ -156,9 +156,9 @@ microsecs() -> (Mega * 1000000 + Sec) * 1000000 + Micro. loads() -> - [{load1, ftos(cpu_sup:avg1()/256)}, - {load5, ftos(cpu_sup:avg5()/256)}, - {load15, ftos(cpu_sup:avg15()/256)}]. + [{load1, ftos(avg1()/256)}, + {load5, ftos(avg5()/256)}, + {load15, ftos(avg15()/256)}]. get_system_info() -> [{Key, format_system_info(Key, get_system_info(Key))} || Key <- ?SYSTEM_INFO]. @@ -427,3 +427,27 @@ mapping([{owner, V}|Entries], Acc) when is_pid(V) -> mapping([{Key, Value}|Entries], Acc) -> mapping(Entries, [{Key, Value}|Acc]). +avg1() -> + case cpu_sup:avg1() of + SystemLoad when is_integer(SystemLoad) -> + SystemLoad; + {error, Reason} -> + lager:error("Get the average system load in the last minute fail for ~p~n", [Reason]), + 0.00 + end. +avg5() -> + case cpu_sup:avg5() of + SystemLoad when is_integer(SystemLoad) -> + SystemLoad; + {error, Reason} -> + lager:error("Get the average system load in the last 5 minutes fail for ~p~n", [Reason]), + 0.00 + end. +avg15() -> + case cpu_sup:avg15() of + SystemLoad when is_integer(SystemLoad) -> + SystemLoad; + {error, Reason} -> + lager:error("Get the average system load in the last 15 minutes fail for ~p~n", [Reason]), + 0.00 + end. From 92f3036f1ffdc4f53052b89ed6ba8dafb80f68c0 Mon Sep 17 00:00:00 2001 From: turtled Date: Sat, 21 Apr 2018 10:43:43 +0800 Subject: [PATCH 07/10] Client sub/unsub mount topic --- src/emqttd_protocol.erl | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index ea6a11ebf..c2399f6a3 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -162,11 +162,12 @@ received(Packet = ?PACKET(Type), State = #proto_state{stats_data = Stats}) -> subscribe(RawTopicTable, ProtoState = #proto_state{client_id = ClientId, username = Username, - session = Session}) -> + session = Session, + mountpoint = MountPoint}) -> TopicTable = parse_topic_table(RawTopicTable), case emqttd_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of {ok, TopicTable1} -> - emqttd_session:subscribe(Session, TopicTable1); + emqttd_session:subscribe(Session, mount(MountPoint, TopicTable1)); {stop, _} -> ok end, @@ -174,10 +175,11 @@ subscribe(RawTopicTable, ProtoState = #proto_state{client_id = ClientId, unsubscribe(RawTopics, ProtoState = #proto_state{client_id = ClientId, username = Username, - session = Session}) -> + session = Session, + mountpoint = MountPoint}) -> case emqttd_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of {ok, TopicTable} -> - emqttd_session:unsubscribe(Session, TopicTable); + emqttd_session:unsubscribe(Session, mount(MountPoint, TopicTable)); {stop, _} -> ok end, From 5b0c752181b3c9bbf0e87de9af0ef702c3f0ce70 Mon Sep 17 00:00:00 2001 From: turtled Date: Sat, 21 Apr 2018 10:44:29 +0800 Subject: [PATCH 08/10] 2.3.7 --- src/emqttd.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd.app.src b/src/emqttd.app.src index c0aa24661..a5ec4a7ed 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,6 +1,6 @@ {application,emqttd, [{description,"Erlang MQTT Broker"}, - {vsn,"2.3.6"}, + {vsn,"2.3.7"}, {modules,[]}, {registered,[emqttd_sup]}, {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb, From f717b734c303adf9692d72469bef1a5ea24bea38 Mon Sep 17 00:00:00 2001 From: turtled Date: Sat, 21 Apr 2018 11:17:27 +0800 Subject: [PATCH 09/10] Update ekka 0.2.3 --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index bd36085c1..22df025cc 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ dep_gproc = git https://github.com/uwiger/gproc dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 dep_lager = git https://github.com/basho/lager master dep_esockd = git https://github.com/emqtt/esockd v5.2.1 -dep_ekka = git https://github.com/emqtt/ekka v0.2.2 +dep_ekka = git https://github.com/emqtt/ekka v0.2.3 dep_mochiweb = git https://github.com/emqtt/mochiweb v4.2.2 dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog From e7e8131f2a93668a4fcd3c55e8bcc7c0160466e5 Mon Sep 17 00:00:00 2001 From: HeeeJianBo Date: Sat, 21 Apr 2018 11:39:19 +0800 Subject: [PATCH 10/10] Align code --- src/emqttd_vm.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqttd_vm.erl b/src/emqttd_vm.erl index b23b884fa..725d7c920 100644 --- a/src/emqttd_vm.erl +++ b/src/emqttd_vm.erl @@ -157,8 +157,8 @@ microsecs() -> loads() -> [{load1, ftos(avg1()/256)}, - {load5, ftos(avg5()/256)}, - {load15, ftos(avg15()/256)}]. + {load5, ftos(avg5()/256)}, + {load15, ftos(avg15()/256)}]. get_system_info() -> [{Key, format_system_info(Key, get_system_info(Key))} || Key <- ?SYSTEM_INFO].