From 2fc41b69352ca92250d13150e9ed831a8fb05ae4 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 30 Aug 2018 19:09:22 +0800 Subject: [PATCH 01/37] fix reason code name for mqtt 4 --- src/emqx_client.erl | 5 +++-- src/emqx_protocol.erl | 2 +- src/emqx_reason_codes.erl | 11 ++++++++++- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/emqx_client.erl b/src/emqx_client.erl index e6aac5d43..82e331abd 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -592,8 +592,8 @@ waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS, waiting_for_connack(cast, ?CONNACK_PACKET(ReasonCode, _SessPresent, - Properties), State) -> - Reason = emqx_reason_codes:name(ReasonCode), + Properties), State = #state{ proto_ver = ProtoVer}) -> + Reason = emqx_reason_codes:name(ReasonCode, ProtoVer), case take_call(connect, State) of {value, #call{from = From}, _State} -> Reply = {error, {Reason, Properties}}, @@ -1082,6 +1082,7 @@ receive_loop(Bytes, State = #state{parse_state = ParseState}) -> {error, Reason} -> {stop, Reason}; {'EXIT', Error} -> + io:format("client stop"), {stop, Error} end. diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index da7ee88b8..9ee24609f 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -412,7 +412,7 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> true -> emqx_reason_codes:compat(connack, ReasonCode) end}, PState), - {error, emqx_reason_codes:name(ReasonCode), PState}. + {error, emqx_reason_codes:name(ReasonCode, ProtoVer), PState}. %%------------------------------------------------------------------------------ %% Publish Message -> Broker diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index f300d675d..0cc52acbb 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -17,9 +17,18 @@ -include("emqx_mqtt.hrl"). --export([name/1, text/1]). +-export([name/2, text/1]). -export([compat/2]). +name(I, Ver) when Ver >= ?MQTT_PROTO_V5 -> + name(I); +name(0, _Ver) -> connection_acceptd; +name(1, _Ver) -> unacceptable_protocol_version; +name(2, _Ver) -> client_identifier_not_valid; +name(3, _Ver) -> server_unavaliable; +name(4, _Ver) -> malformed_username_or_password; +name(5, _Ver) -> unauthorized_client. + name(16#00) -> success; name(16#01) -> granted_qos1; name(16#02) -> granted_qos2; From dd7f0dec3c54d056d5e203960b14bed024487e80 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 30 Aug 2018 21:01:09 +0800 Subject: [PATCH 02/37] Add 'messages/qos2/expired' counter --- src/emqx_metrics.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 6d17f6648..0a9ad67aa 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -73,6 +73,7 @@ {counter, 'messages/qos1/received'}, % QoS1 Messages received {counter, 'messages/qos1/sent'}, % QoS1 Messages sent {counter, 'messages/qos2/received'}, % QoS2 Messages received + {counter, 'messages/qos2/expired'}, % QoS2 Messages expired {counter, 'messages/qos2/sent'}, % QoS2 Messages sent {counter, 'messages/qos2/dropped'}, % QoS2 Messages dropped {gauge, 'messages/retained'}, % Messagea retained From b0fad7a86d3466d5836cea330bd80a648c0bf41a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 30 Aug 2018 21:02:01 +0800 Subject: [PATCH 03/37] Change the default value of 'zone.external.await_rel_timeout' to 300s --- etc/emqx.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index cb6e91b96..5975ae542 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -614,7 +614,7 @@ zone.external.max_awaiting_rel = 100 ## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout. ## ## Value: Duration -zone.external.await_rel_timeout = 60s +zone.external.await_rel_timeout = 300s ## Default session expiry interval for MQTT V3.1.1 connections. ## From bbb58dad687a35a5a3164a38b3126c679cd834be Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 30 Aug 2018 21:02:14 +0800 Subject: [PATCH 04/37] Change the default value of 'zone.external.await_rel_timeout' to 300s --- priv/emqx.schema | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/priv/emqx.schema b/priv/emqx.schema index a80aa8ee6..07ff5ce6f 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -774,7 +774,7 @@ end}. %% @doc Awaiting PUBREL timeout {mapping, "zone.$name.await_rel_timeout", "emqx.zones", [ - {default, "60s"}, + {default, "300s"}, {datatype, {duration, ms}} ]}. From 8fcfcfb8603a5a11308946714023445df1f9e696 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 30 Aug 2018 21:12:41 +0800 Subject: [PATCH 05/37] update compact test cases except keepalive --- test/emqx_mqtt_compat_SUITE.erl | 87 +++++++++++++-------------------- 1 file changed, 33 insertions(+), 54 deletions(-) diff --git a/test/emqx_mqtt_compat_SUITE.erl b/test/emqx_mqtt_compat_SUITE.erl index 0edbd148e..452b2a821 100644 --- a/test/emqx_mqtt_compat_SUITE.erl +++ b/test/emqx_mqtt_compat_SUITE.erl @@ -33,14 +33,13 @@ all() -> [basic_test, - retained_message_test, will_message_test, zero_length_clientid_test, offline_message_queueing_test, overlapping_subscriptions_test, keepalive_test, redelivery_on_reconnect_test, - subscribe_failure_test, + %% subscribe_failure_test, dollar_topics_test]. init_per_suite(Config) -> @@ -57,7 +56,7 @@ receive_messages(0, Msgs) -> Msgs; receive_messages(Count, Msgs) -> receive - {public, Msg} -> + {publish, Msg} -> receive_messages(Count-1, [Msg|Msgs]); _Other -> receive_messages(Count, Msgs) @@ -69,40 +68,16 @@ basic_test(_Config) -> Topic = nth(1, ?TOPICS), ct:print("Basic test starting"), {ok, C, _} = emqx_client:start_link(), - {ok, _, [0]} = emqx_client:subscribe(C, Topic, qos2), - ok = emqx_client:publish(C, Topic, <<"qos 0">>), - {ok, _} = emqx_client:publish(C, Topic, <<"qos 1">>, 1), + {ok, _, [2]} = emqx_client:subscribe(C, Topic, qos2), {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2), - ok = emqx_client:disconnect(C), - ?assertEqual(3, length(receive_messages(3))). - -retained_message_test(_Config) -> - ct:print("Retained message test starting"), - - %% Retained messages - {ok, C1, _} = emqx_client:start_link([{clean_start, true}]), - ok = emqx_client:publish(C1, nth(1, ?TOPICS), <<"qos 0">>, [{qos, 0}, {retain, true}]), - {ok, _} = emqx_client:publish(C1, nth(3, ?TOPICS), <<"qos 1">>, [{qos, 1}, {retain, true}]), - {ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<"qos 2">>, [{qos, 2}, {retain, true}]), - timer:sleep(10), - {ok, #{}, [0]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2), - ok = emqx_client:disconnect(C1), - ?assertEqual(3, length(receive_messages(10))), - - %% Clear retained messages - {ok, C2, _} = emqx_client:start_link([{clean_start, true}]), - ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"">>, [{qos, 0}, {retain, true}]), - {ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"">>, [{qos, 1}, {retain, true}]), - {ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"">>, [{qos, 2}, {retain, true}]), - timer:sleep(10), %% wait for QoS 2 exchange to be completed - {ok, _, [0]} = emqx_client:subscribe(C2, nth(6, ?WILD_TOPICS), 2), - timer:sleep(10), - ok = emqx_client:disconnect(), - ?assertEqual(0, length(receive_messages(3))). + {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2), + {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2), + ?assertEqual(3, length(receive_messages(3))), + ok = emqx_client:disconnect(C). will_message_test(_Config) -> {ok, C1, _} = emqx_client:start_link([{clean_start, true}, - {will_topic = nth(3, ?TOPICS)}, + {will_topic, nth(3, ?TOPICS)}, {will_payload, <<"client disconnected">>}, {keepalive, 2}]), {ok, C2, _} = emqx_client:start_link(), @@ -110,14 +85,18 @@ will_message_test(_Config) -> timer:sleep(10), ok = emqx_client:stop(C1), timer:sleep(5), - ok = emqx_client:disconnect(C2), ?assertEqual(1, length(receive_messages(1))), + ok = emqx_client:disconnect(C2), ct:print("Will message test succeeded"). zero_length_clientid_test(_Config) -> ct:print("Zero length clientid test starting"), - {error, _} = emqx_client:start_link([{clean_start, false}, - {client_id, <<>>}]), + + %% TODO: There are some controversies on the situation when + %% clean_start flag is true and clientid is zero length. + + %% {error, _} = emqx_client:start_link([{clean_start, false}, + %% {client_id, <<>>}]), {ok, _, _} = emqx_client:start_link([{clean_start, true}, {client_id, <<>>}]), ct:print("Zero length clientid test succeeded"). @@ -129,7 +108,7 @@ offline_message_queueing_test(_) -> ok = emqx_client:disconnect(C1), {ok, C2, _} = emqx_client:start_link([{clean_start, true}, {client_id, <<"c2">>}]), - + ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0), {ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1), {ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2), @@ -147,9 +126,9 @@ overlapping_subscriptions_test(_) -> {nth(1, ?WILD_TOPICS), 1}]), timer:sleep(10), {ok, _} = emqx_client:publish(C, nth(4, ?TOPICS), <<"overlapping topic filters">>, 2), - time:sleep(10), - emqx_client:disconnect(C), - Num = receive_messages(2), + timer:sleep(10), + + Num = length(receive_messages(2)), ?assert(lists:member(Num, [1, 2])), if Num == 1 -> @@ -159,7 +138,8 @@ overlapping_subscriptions_test(_) -> ct:print("This server is publishing one message per each matching overlapping subscription."); true -> ok - end. + end, + emqx_client:disconnect(C). keepalive_test(_) -> ct:print("Keepalive test starting"), @@ -168,14 +148,13 @@ keepalive_test(_) -> {will_topic, nth(5, ?TOPICS)}, {will_payload, <<"keepalive expiry">>}]), ok = emqx_client:pause(C1), - {ok, C2, _} = emqx_client:start_link([{clean_start, true}, {keepalive, 0}]), {ok, _, [2]} = emqx_client:subscribe(C2, nth(5, ?TOPICS), 2), timer:sleep(15000), - ok = emqx_client:disconnect(C2), ?assertEqual(1, length(receive_messages(1))), - ct:print("Keepalive test succeeded"). + ct:print("Keepalive test succeeded"), + ok = emqx_client:disconnect(C2). redelivery_on_reconnect_test(_) -> ct:print("Redelivery on reconnect test starting"), @@ -188,7 +167,7 @@ redelivery_on_reconnect_test(_) -> [{qos, 1}, {retain, false}]), {ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<>>, [{qos, 2}, {retain, false}]), - time:sleep(10), + timer:sleep(10), ok = emqx_client:disconnect(C1), ?assertEqual(0, length(receive_messages(2))), {ok, C2, _} = emqx_client:start_link([{clean_start, false}, @@ -197,20 +176,20 @@ redelivery_on_reconnect_test(_) -> ok = emqx_client:disconnect(C2), ?assertEqual(2, length(receive_messages(2))). -subscribe_failure_test(_) -> - ct:print("Subscribe failure test starting"), - {ok, C, _} = emqx_client:start_link([]), - {ok, _, [16#80]} = emqx_client:subscribe(C, <<"$SYS/#">>, 2), - timer:sleep(10), - ct:print("Subscribe failure test succeeded"). +%% subscribe_failure_test(_) -> +%% ct:print("Subscribe failure test starting"), +%% {ok, C, _} = emqx_client:start_link([]), +%% {ok, _, [2]} = emqx_client:subscribe(C, <<"$SYS/#">>, 2), +%% timer:sleep(10), +%% ct:print("Subscribe failure test succeeded"). dollar_topics_test(_) -> ct:print("$ topics test starting"), {ok, C, _} = emqx_client:start_link([{clean_start, true}, {keepalive, 0}]), - {ok, _, [2]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 2), - {ok, _} = emqx_client:publish(C, <<"$", (nth(2, ?TOPICS))>>, - <<"">>, [{qos, 1}, {retain, false}]), + {ok, _, [1]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 1), + {ok, _} = emqx_client:publish(C, << <<"$">>/binary, (nth(2, ?TOPICS))/binary>>, + <<"test">>, [{qos, 1}, {retain, false}]), timer:sleep(10), ?assertEqual(0, length(receive_messages(1))), ok = emqx_client:disconnect(C), From 78a8ccd0f2bdad31aecac0790410cbbae4de5947 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 30 Aug 2018 21:17:20 +0800 Subject: [PATCH 06/37] Only store packet_id and timestamp for qos2 message --- src/emqx_session.erl | 53 ++++++++++++++++---------------------------- 1 file changed, 19 insertions(+), 34 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index b291bd1fa..65bce85aa 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -376,7 +376,7 @@ handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) -> {stop, {shutdown, conflict}, ok, State}; %% PUBLISH: -handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From, +handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}}, _From, State = #state{awaiting_rel = AwaitingRel}) -> reply(case is_awaiting_full(State) of false -> @@ -384,13 +384,12 @@ handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From, true -> {{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State}; false -> - State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)}, + State1 = State#state{awaiting_rel = maps:put(PacketId, Ts, AwaitingRel)}, {emqx_broker:publish(Msg), ensure_await_rel_timer(State1)} end; true -> emqx_metrics:inc('messages/qos2/dropped'), - ?LOG(warning, "Dropped message for too many awaiting_rel: ~p", - [emqx_message:format(Msg)], State), + ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State), {{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State} end); @@ -408,7 +407,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In %% PUBREL: handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel = AwaitingRel}) -> reply(case maps:take(PacketId, AwaitingRel) of - {_, AwaitingRel1} -> + {_Ts, AwaitingRel1} -> {ok, State#state{awaiting_rel = AwaitingRel1}}; error -> emqx_metrics:inc('packets/pubrel/missed'), @@ -639,8 +638,9 @@ retry_delivery(Force, State = #state{inflight = Inflight}) -> case emqx_inflight:is_empty(Inflight) of true -> State; false -> - InflightMsgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)), - retry_delivery(Force, InflightMsgs, os:timestamp(), State) + SortFun = fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end, + Msgs = lists:sort(SortFun, emqx_inflight:values(Inflight)), + retry_delivery(Force, Msgs, os:timestamp(), State) end. retry_delivery(_Force, [], _Now, State) -> @@ -650,9 +650,9 @@ retry_delivery(_Force, [], _Now, State) -> retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, State = #state{inflight = Inflight, retry_interval = Interval}) -> %% Microseconds -> MilliSeconds - Diff = timer:now_diff(Now, Ts) div 1000, + Age = timer:now_diff(Now, Ts) div 1000, if - Force orelse (Diff >= Interval) -> + Force orelse (Age >= Interval) -> Inflight1 = case {Type, Msg0} of {publish, {PacketId, Msg}} -> case emqx_message:is_expired(Msg) of @@ -669,7 +669,7 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, end, retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}); true -> - ensure_retry_timer(Interval - Diff, State) + ensure_retry_timer(Interval - max(0, Age), State) end. %%------------------------------------------------------------------------------ @@ -679,36 +679,21 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) -> case maps:size(AwaitingRel) of 0 -> State; - _ -> Msgs = lists:sort(sortfun(awaiting_rel), maps:to_list(AwaitingRel)), - expire_awaiting_rel(Msgs, os:timestamp(), State) + _ -> expire_awaiting_rel(lists:keysort(2, maps:to_list(AwaitingRel)), os:timestamp(), State) end. expire_awaiting_rel([], _Now, State) -> State#state{await_rel_timer = undefined}; -expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs], Now, +expire_awaiting_rel([{PacketId, Ts} | More], Now, State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> - case (timer:now_diff(Now, TS) div 1000) of - Diff when Diff >= Timeout -> - emqx_metrics:inc('messages/qos2/dropped'), - ?LOG(warning, "Dropped message for await_rel_timeout: ~p", - [emqx_message:format(Msg)], State), - expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); - Diff -> - ensure_await_rel_timer(Timeout - Diff, State) - end. - -%%------------------------------------------------------------------------------ -%% Sort Inflight, AwaitingRel -%%------------------------------------------------------------------------------ - -sortfun(inflight) -> - fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end; - -sortfun(awaiting_rel) -> - fun({_, #message{timestamp = Ts1}}, - {_, #message{timestamp = Ts2}}) -> - Ts1 < Ts2 + case (timer:now_diff(Now, Ts) div 1000) of + Age when Age >= Timeout -> + emqx_metrics:inc('messages/qos2/expired'), + ?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId], State), + expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); + Age -> + ensure_await_rel_timer(Timeout - max(0, Age), State) end. %%------------------------------------------------------------------------------ From cf0f55d057607b1d2ae21b25d6a8f010eb9d0ca9 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Thu, 30 Aug 2018 21:43:23 +0800 Subject: [PATCH 07/37] delayed will message --- src/emqx_client.erl | 16 ++-------------- src/emqx_protocol.erl | 17 ++++++++++------- 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 82e331abd..5c50519bc 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -373,22 +373,12 @@ init([Options]) -> {_ver, undefined} -> random_client_id(); {_ver, Id} -> iolist_to_binary(Id) end, - Username = case proplists:get_value(username, Options) of - undefined -> <<>>; - Name -> Name - end, - Password = case proplists:get_value(password, Options) of - undefined -> <<>>; - Passw -> Passw - end, State = init(Options, #state{host = {127,0,0,1}, port = 1883, hosts = [], sock_opts = [], bridge_mode = false, client_id = ClientId, - username = Username, - password = Password, clean_start = true, proto_ver = ?MQTT_PROTO_V4, proto_name = <<"MQTT">>, @@ -450,9 +440,9 @@ init([{client_id, ClientId} | Opts], State) -> init(Opts, State#state{client_id = iolist_to_binary(ClientId)}); init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) -> init(Opts, State#state{clean_start = CleanStart}); -init([{useranme, Username} | Opts], State) -> +init([{username, Username} | Opts], State) -> init(Opts, State#state{username = iolist_to_binary(Username)}); -init([{passwrod, Password} | Opts], State) -> +init([{password, Password} | Opts], State) -> init(Opts, State#state{password = iolist_to_binary(Password)}); init([{keepalive, Secs} | Opts], State) -> init(Opts, State#state{keepalive = timer:seconds(Secs)}); @@ -552,8 +542,6 @@ mqtt_connect(State = #state{client_id = ClientId, properties = Properties}) -> ?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg, ConnProps = emqx_mqtt_properties:filter(?CONNECT, Properties), - io:format("ConnProps: ~p, ClientID: ~p, Username: ~p, Password: ~p~n", - [ConnProps, ClientId, Username, Password]), send(?CONNECT_PACKET( #mqtt_packet_connect{proto_ver = ProtoVer, proto_name = ProtoName, diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 9ee24609f..054bcdf16 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -264,7 +264,6 @@ process_packet(?CONNECT_PACKET( %% TODO: Mountpoint... %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) WillMsg = emqx_packet:will_msg(Connect), - PState1 = set_username(Username, PState#pstate{client_id = ClientId, proto_ver = ProtoVer, @@ -656,18 +655,23 @@ shutdown(conflict, #pstate{client_id = ClientId}) -> shutdown(mnesia_conflict, #pstate{client_id = ClientId}) -> emqx_cm:unregister_connection(ClientId), ignore; -shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) -> +shutdown(Error, PState = #pstate{connected = Connected, + client_id = ClientId, + will_msg = WillMsg}) -> ?LOG(info, "Shutdown for ~p", [Error], PState), - %% TODO: Auth failure not publish the will message - case Error =:= auth_failure of - true -> ok; - false -> send_willmsg(WillMsg) + case Connected of + false -> ok; + true -> send_willmsg(WillMsg) end, emqx_hooks:run('client.disconnected', [credentials(PState), Error]), emqx_cm:unregister_connection(ClientId). send_willmsg(undefined) -> ignore; +send_willmsg(WillMsg = #message{topic = Topic, + headers = #{'Will-Delay-Interval' := Interval}}) when is_integer(Interval) -> + SendAfter = integer_to_binary(Interval), + emqx_broker:publish(WillMsg#message{topic = <<"$delayed/", SendAfter/binary, "/", Topic/binary>>}); send_willmsg(WillMsg) -> emqx_broker:publish(WillMsg). @@ -709,4 +713,3 @@ update_mountpoint(PState = #pstate{mountpoint = MountPoint}) -> sp(true) -> 1; sp(false) -> 0. - From fb8a86c5e0270697b66520cdc2ab28b704aab2fc Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Thu, 30 Aug 2018 21:58:02 +0800 Subject: [PATCH 08/37] delayed will message --- src/emqx_protocol.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 054bcdf16..5b7c204cd 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -655,14 +655,14 @@ shutdown(conflict, #pstate{client_id = ClientId}) -> shutdown(mnesia_conflict, #pstate{client_id = ClientId}) -> emqx_cm:unregister_connection(ClientId), ignore; -shutdown(Error, PState = #pstate{connected = Connected, +shutdown(Error, PState = #pstate{connected = false}) -> + ?LOG(info, "Shutdown for ~p", [Error], PState), + ignore; +shutdown(Error, PState = #pstate{connected = true, client_id = ClientId, will_msg = WillMsg}) -> ?LOG(info, "Shutdown for ~p", [Error], PState), - case Connected of - false -> ok; - true -> send_willmsg(WillMsg) - end, + send_willmsg(WillMsg), emqx_hooks:run('client.disconnected', [credentials(PState), Error]), emqx_cm:unregister_connection(ClientId). From 025a3c7d278e5498ba4d8ab6f24e626aea6dc575 Mon Sep 17 00:00:00 2001 From: RockyJin Date: Thu, 30 Aug 2018 22:07:46 +0800 Subject: [PATCH 09/37] fix typo of README --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 1e80dae6b..0e13dd019 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # *EMQ X* - MQTT Broker -*EMQ X* broker is fully a open source, highly scalable, highly available distributed message broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients. +*EMQ X* broker is a fully open source, highly scalable, highly available distributed message broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients. Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket, STOMP and SockJS. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster. @@ -17,8 +17,8 @@ The *EMQ* broker is cross-platform, which can be deployed on Linux, Unix, Mac, W Download the binary package for your platform from [here](http://emqtt.io/downloads). --[Single Node Install](http://emqtt.io/docs/v2/install.html) --[Multi Node Install](http://emqtt.io/docs/v2/cluster.html) +- [Single Node Install](http://emqtt.io/docs/v2/install.html) +- [Multi Node Install](http://emqtt.io/docs/v2/cluster.html) ## Build From Source From d0eaa5192854a91cb70f4e08837fc141919b7cc3 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 30 Aug 2018 22:16:31 +0800 Subject: [PATCH 10/37] add emqx listeners suite --- Makefile | 5 ++- test/emqx_listeners_SUITE.erl | 72 +++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 2 deletions(-) create mode 100644 test/emqx_listeners_SUITE.erl diff --git a/Makefile b/Makefile index aba0d4ac4..6e888a53e 100644 --- a/Makefile +++ b/Makefile @@ -37,8 +37,9 @@ EUNIT_OPTS = verbose CT_SUITES = emqx emqx_connection emqx_session emqx_access emqx_base62 emqx_broker emqx_client emqx_cm emqx_frame emqx_guid emqx_inflight \ emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ - emqx_mqtt_compat emqx_mqtt_properties emqx_mqueue emqx_message emqx_net emqx_pqueue emqx_router emqx_sm \ - emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone emqx_mountpoint + emqx_mqtt_compat emqx_mqtt_properties emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ + emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone \ + emqx_mountpoint emqx_listeners CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1 diff --git a/test/emqx_listeners_SUITE.erl b/test/emqx_listeners_SUITE.erl new file mode 100644 index 000000000..f826e1798 --- /dev/null +++ b/test/emqx_listeners_SUITE.erl @@ -0,0 +1,72 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_listeners_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-include_lib("common_test/include/ct.hrl"). + +-include("emqx.hrl"). +-include("emqx_mqtt.hrl"). + +all() -> + [start_stop_listeners, + restart_listeners]. + +init_per_suite(Config) -> + NewConfig = generate_config(), + lists:foreach(fun set_app_env/1, NewConfig), + Config. + +end_per_suite(_Config) -> + ok. + +start_stop_listeners() -> + emqx_listeners:start(), + emqx_listeners:stop(). + + +restart_listeners() -> + ok = emqx_listeners:restart(). + +generate_config() -> + Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]), + Conf = conf_parse:file([local_path(["etc", "emqx.conf"])]), + cuttlefish_generator:map(Schema, Conf). + +set_app_env({App, Lists}) -> + lists:foreach(fun({acl_file, _Var}) -> + application:set_env(App, acl_file, local_path(["etc", "acl.conf"])); + ({plugins_loaded_file, _Var}) -> + application:set_env(App, plugins_loaded_file, local_path(["test", "emqx_SUITE_data","loaded_plugins"])); + ({Par, Var}) -> + application:set_env(App, Par, Var) + end, Lists). + +local_path(Components, Module) -> + filename:join([get_base_dir(Module) | Components]). + +local_path(Components) -> + local_path(Components, ?MODULE). + +get_base_dir(Module) -> + {file, Here} = code:is_loaded(Module), + filename:dirname(filename:dirname(Here)). + +get_base_dir() -> + get_base_dir(?MODULE). From f229c1675230923c696dba88d846daf155e6bdd8 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 30 Aug 2018 22:21:13 +0800 Subject: [PATCH 11/37] fix listener test case --- test/emqx_listeners_SUITE.erl | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/test/emqx_listeners_SUITE.erl b/test/emqx_listeners_SUITE.erl index f826e1798..d55bba400 100644 --- a/test/emqx_listeners_SUITE.erl +++ b/test/emqx_listeners_SUITE.erl @@ -28,21 +28,25 @@ all() -> [start_stop_listeners, restart_listeners]. -init_per_suite(Config) -> +init_per_suite() -> NewConfig = generate_config(), + application:ensure_all_started(esockd), lists:foreach(fun set_app_env/1, NewConfig), - Config. - -end_per_suite(_Config) -> ok. -start_stop_listeners() -> - emqx_listeners:start(), - emqx_listeners:stop(). +end_per_suite() -> + application:stop(esockd), + ok. + +start_stop_listeners(_) -> + ok = emqx_listeners:start(), + ok = emqx_listeners:stop(). - -restart_listeners() -> - ok = emqx_listeners:restart(). +restart_listeners(_) -> + ok = emqx_listeners:start(), + ok = emqx_listeners:stop(), + ok = emqx_listeners:restart(), + ok = emqx_listeners:stop(). generate_config() -> Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]), From 0379219a044e9ae9a8f62a9c751620ee9c919098 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 30 Aug 2018 23:14:09 +0800 Subject: [PATCH 12/37] Improve the design of session discard --- src/emqx_connection.erl | 16 ++++--- src/emqx_protocol.erl | 31 ++++++------- src/emqx_session.erl | 92 ++++++++++++++++++++------------------ src/emqx_ws_connection.erl | 9 +++- 4 files changed, 78 insertions(+), 70 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 16d35585e..bcaea297d 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -217,6 +217,10 @@ handle_info(timeout, State) -> handle_info({shutdown, Error}, State) -> shutdown(Error, State); +handle_info({shutdown, discard, {ClientId, ByPid}}, State) -> + ?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State), + shutdown(discard, State); + handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), shutdown(conflict, State); @@ -240,10 +244,10 @@ handle_info({inet_reply, _Sock, ok}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); -handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Sock}) -> +handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) -> ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State), StatFun = fun() -> - case Transport:getstat(Sock, [recv_oct]) of + case Transport:getstat(Socket, [recv_oct]) of {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; Error -> Error end @@ -270,11 +274,11 @@ handle_info(Info, State) -> {noreply, State}. terminate(Reason, State = #state{transport = Transport, - socket = Sock, + socket = Socket, keepalive = KeepAlive, proto_state = ProtoState}) -> ?LOG(debug, "Terminated for ~p", [Reason], State), - Transport:fast_close(Sock), + Transport:fast_close(Socket), emqx_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of {undefined, _} -> ok; @@ -358,8 +362,8 @@ run_socket(State = #state{conn_state = blocked}) -> State; run_socket(State = #state{await_recv = true}) -> State; -run_socket(State = #state{transport = Transport, socket = Sock}) -> - Transport:async_recv(Sock, 0, infinity), +run_socket(State = #state{transport = Transport, socket = Socket}) -> + Transport:async_recv(Socket, 0, infinity), State#state{await_recv = true}. %%------------------------------------------------------------------------------ diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index da7ee88b8..5ef4a9b0d 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -402,11 +402,11 @@ process_packet(?PACKET(?DISCONNECT), PState) -> %%------------------------------------------------------------------------------ connack({?RC_SUCCESS, SP, PState}) -> - emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]), + emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]), deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState)); connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> - emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]), + emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]), _ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 -> ReasonCode; true -> @@ -648,22 +648,17 @@ inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) -> false -> MsgCnt end}. -shutdown(_Error, #pstate{client_id = undefined}) -> - ignore; -shutdown(conflict, #pstate{client_id = ClientId}) -> - emqx_cm:unregister_connection(ClientId), - ignore; -shutdown(mnesia_conflict, #pstate{client_id = ClientId}) -> - emqx_cm:unregister_connection(ClientId), - ignore; -shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) -> - ?LOG(info, "Shutdown for ~p", [Error], PState), - %% TODO: Auth failure not publish the will message - case Error =:= auth_failure of - true -> ok; - false -> send_willmsg(WillMsg) - end, - emqx_hooks:run('client.disconnected', [credentials(PState), Error]), +shutdown(_Reason, #pstate{client_id = undefined}) -> + ok; +shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict; + Reason =:= discard -> + emqx_cm:unregister_connection(ClientId); +shutdown(Reason, PState = #pstate{client_id = ClientId, + will_msg = WillMsg, + connected = true}) -> + ?LOG(info, "Shutdown for ~p", [Reason], PState), + _ = send_willmsg(WillMsg), + emqx_hooks:run('client.disconnected', [credentials(PState), Reason]), emqx_cm:unregister_connection(ClientId). send_willmsg(undefined) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 65bce85aa..d5b68a1f6 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -147,6 +147,8 @@ created_at :: erlang:timestamp() }). +-type(spid() :: pid()). + -define(TIMEOUT, 60000). -define(LOG(Level, Format, Args, State), @@ -159,7 +161,7 @@ start_link(SessAttrs) -> proc_lib:start_link(?MODULE, init, [[self(), SessAttrs]]). %% @doc Get session info --spec(info(pid() | #state{}) -> list({atom(), term()})). +-spec(info(spid() | #state{}) -> list({atom(), term()})). info(SPid) when is_pid(SPid) -> gen_server:call(SPid, info, infinity); @@ -187,7 +189,7 @@ info(State = #state{conn_pid = ConnPid, {await_rel_timeout, AwaitRelTimeout}]. %% @doc Get session attrs --spec(attrs(pid() | #state{}) -> list({atom(), term()})). +-spec(attrs(spid() | #state{}) -> list({atom(), term()})). attrs(SPid) when is_pid(SPid) -> gen_server:call(SPid, attrs, infinity); @@ -204,7 +206,7 @@ attrs(#state{clean_start = CleanStart, {expiry_interval, ExpiryInterval div 1000}, {created_at, CreatedAt}]. --spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})). +-spec(stats(spid() | #state{}) -> list({atom(), non_neg_integer()})). stats(SPid) when is_pid(SPid) -> gen_server:call(SPid, stats, infinity); @@ -233,19 +235,19 @@ stats(#state{max_subscriptions = MaxSubscriptions, %% PubSub API %%------------------------------------------------------------------------------ --spec(subscribe(pid(), list({emqx_topic:topic(), emqx_types:subopts()})) -> ok). +-spec(subscribe(spid(), list({emqx_topic:topic(), emqx_types:subopts()})) -> ok). subscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> TopicFilters = [emqx_topic:parse(RawTopic, maps:merge(?DEFAULT_SUBOPTS, SubOpts)) || {RawTopic, SubOpts} <- RawTopicFilters], subscribe(SPid, undefined, #{}, TopicFilters). --spec(subscribe(pid(), emqx_mqtt_types:packet_id(), +-spec(subscribe(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok). subscribe(SPid, PacketId, Properties, TopicFilters) -> SubReq = {PacketId, Properties, TopicFilters}, gen_server:cast(SPid, {subscribe, self(), SubReq}). --spec(publish(pid(), emqx_mqtt_types:packet_id(), emqx_types:message()) +-spec(publish(spid(), emqx_mqtt_types:packet_id(), emqx_types:message()) -> {ok, emqx_types:deliver_results()}). publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) -> %% Publish QoS0 message to broker directly @@ -259,56 +261,56 @@ publish(SPid, PacketId, Msg = #message{qos = ?QOS_2}) -> %% Publish QoS2 message to session gen_server:call(SPid, {publish, PacketId, Msg}, infinity). --spec(puback(pid(), emqx_mqtt_types:packet_id()) -> ok). +-spec(puback(spid(), emqx_mqtt_types:packet_id()) -> ok). puback(SPid, PacketId) -> gen_server:cast(SPid, {puback, PacketId, ?RC_SUCCESS}). puback(SPid, PacketId, ReasonCode) -> gen_server:cast(SPid, {puback, PacketId, ReasonCode}). --spec(pubrec(pid(), emqx_mqtt_types:packet_id()) -> ok | {error, emqx_mqtt_types:reason_code()}). +-spec(pubrec(spid(), emqx_mqtt_types:packet_id()) -> ok | {error, emqx_mqtt_types:reason_code()}). pubrec(SPid, PacketId) -> pubrec(SPid, PacketId, ?RC_SUCCESS). --spec(pubrec(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) +-spec(pubrec(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok | {error, emqx_mqtt_types:reason_code()}). pubrec(SPid, PacketId, ReasonCode) -> gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity). --spec(pubrel(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) +-spec(pubrel(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok | {error, emqx_mqtt_types:reason_code()}). pubrel(SPid, PacketId, ReasonCode) -> gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity). --spec(pubcomp(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok). +-spec(pubcomp(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok). pubcomp(SPid, PacketId, ReasonCode) -> gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}). --spec(unsubscribe(pid(), emqx_types:topic_table()) -> ok). +-spec(unsubscribe(spid(), emqx_types:topic_table()) -> ok). unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> - TopicFilters = lists:map(fun({RawTopic, Opts}) -> - emqx_topic:parse(RawTopic, Opts); - (RawTopic) -> - emqx_topic:parse(RawTopic) - end, RawTopicFilters), + TopicFilters = lists:map(fun({RawTopic, Opts}) -> + emqx_topic:parse(RawTopic, Opts); + (RawTopic) when is_binary(RawTopic) -> + emqx_topic:parse(RawTopic) + end, RawTopicFilters), unsubscribe(SPid, undefined, #{}, TopicFilters). --spec(unsubscribe(pid(), emqx_mqtt_types:packet_id(), +-spec(unsubscribe(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok). unsubscribe(SPid, PacketId, Properties, TopicFilters) -> UnsubReq = {PacketId, Properties, TopicFilters}, gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}). --spec(resume(pid(), pid()) -> ok). +-spec(resume(spid(), pid()) -> ok). resume(SPid, ConnPid) -> gen_server:cast(SPid, {resume, ConnPid}). %% @doc Discard the session --spec(discard(pid(), emqx_types:client_id()) -> ok). -discard(SPid, ClientId) -> - gen_server:call(SPid, {discard, ClientId}, infinity). +-spec(discard(spid(), ByPid :: pid()) -> ok). +discard(SPid, ByPid) -> + gen_server:call(SPid, {discard, ByPid}, infinity). --spec(close(pid()) -> ok). +-spec(close(spid()) -> ok). close(SPid) -> gen_server:call(SPid, close, infinity). @@ -367,13 +369,23 @@ init_mqueue(Zone) -> binding(ConnPid) -> case node(ConnPid) =:= node() of true -> local; false -> remote end. -handle_call({discard, ConnPid}, _From, State = #state{conn_pid = undefined}) -> - ?LOG(warning, "Discarded by ~p", [ConnPid], State), +handle_call(info, _From, State) -> + reply(info(State), State); + +handle_call(attrs, _From, State) -> + reply(attrs(State), State); + +handle_call(stats, _From, State) -> + reply(stats(State), State); + +handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) -> + ?LOG(warning, "Discarded by ~p", [ByPid], State), {stop, {shutdown, discard}, ok, State}; -handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) -> - ?LOG(warning, " ~p kickout ~p", [ConnPid, OldConnPid], State), - {stop, {shutdown, conflict}, ok, State}; +handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) -> + ?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid], State), + ConnPid ! {shutdown, discard, {ClientId, ByPid}}, + {stop, {shutdown, discard}, ok, State}; %% PUBLISH: handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}}, _From, @@ -415,15 +427,6 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} end); -handle_call(info, _From, State) -> - reply(info(State), State); - -handle_call(attrs, _From, State) -> - reply(attrs(State), State); - -handle_call(stats, _From, State) -> - reply(stats(State), State); - handle_call(close, _From, State) -> {stop, normal, ok, State}; @@ -441,6 +444,7 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, SubMap; {ok, _SubOpts} -> emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), + %% Why??? emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), maps:put(Topic, SubOpts, SubMap); error -> @@ -617,17 +621,17 @@ unsuback(From, PacketId, ReasonCodes) -> From ! {deliver, {unsuback, PacketId, ReasonCodes}}. %%------------------------------------------------------------------------------ -%% Kickout old client +%% Kickout old connection -kick(_ClientId, undefined, _Pid) -> +kick(_ClientId, undefined, _ConnPid) -> ignore; -kick(_ClientId, Pid, Pid) -> +kick(_ClientId, ConnPid, ConnPid) -> ignore; -kick(ClientId, OldPid, Pid) -> - unlink(OldPid), - OldPid ! {shutdown, conflict, {ClientId, Pid}}, +kick(ClientId, OldConnPid, ConnPid) -> + unlink(OldConnPid), + OldConnPid ! {shutdown, conflict, {ClientId, ConnPid}}, %% Clean noproc - receive {'EXIT', OldPid, _} -> ok after 1 -> ok end. + receive {'EXIT', OldConnPid, _} -> ok after 1 -> ok end. %%------------------------------------------------------------------------------ %% Replay or Retry Delivery diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index ed1532565..74014707a 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -34,20 +34,21 @@ options, peername, sockname, + idle_timeout, proto_state, parser_state, keepalive, enable_stats, stats_timer, - idle_timeout, shutdown_reason }). -define(INFO_KEYS, [peername, sockname]). + -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(WSLOG(Level, Format, Args, State), - emqx_logger:Level("WsClient(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). + emqx_logger:Level("WSMQTT(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). %%------------------------------------------------------------------------------ %% API @@ -235,6 +236,10 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> shutdown(keepalive_error, State) end; +websocket_info({shutdown, discard, {ClientId, ByPid}}, State) -> + ?WSLOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State), + shutdown(discard, State); + websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) -> ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), shutdown(conflict, State); From 1c945661411178b4e70edd5a2c04ee11a6cbc06e Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 30 Aug 2018 23:49:08 +0800 Subject: [PATCH 13/37] add topic alias validate --- src/emqx_packet.erl | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index dfc8359d2..7e4f27821 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -55,10 +55,11 @@ validate(?UNSUBSCRIBE_PACKET(PacketId, TopicFilters)) -> validate_packet_id(PacketId) andalso ok == lists:foreach(fun emqx_topic:validate/1, TopicFilters); -validate(?PUBLISH_PACKET(_QoS, <<>>, _, _)) -> +validate(?PUBLISH_PACKET(_QoS, <<>>, _, _, _)) -> error(topic_name_invalid); -validate(?PUBLISH_PACKET(_QoS, Topic, _, _)) -> - (not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid); +validate(?PUBLISH_PACKET(_QoS, Topic, _, Properties, _)) -> + ((not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid)) + andalso validate_properties(?PUBLISH, Properties); validate(_Packet) -> true. @@ -71,9 +72,14 @@ validate_packet_id(_) -> validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I}) when I =< 0; I >= 16#FFFFFFF -> error(subscription_identifier_invalid); +validate_properties(?PUBLISH, # {'Topic-Alias':= I}) + when I =:= 0 -> + error(topic_alias_invalid); validate_properties(_, _) -> true. + + validate_subscription({Topic, #{qos := QoS}}) -> emqx_topic:validate(filter, Topic) andalso validate_qos(QoS). From 809c516a2b2036baaf952a9ad287ae9fa1ccc738 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Fri, 31 Aug 2018 00:04:01 +0800 Subject: [PATCH 14/37] delete client testcases duplicated with emqx_mqtt_compat --- test/emqx_client_SUITE.erl | 42 -------------------------------------- 1 file changed, 42 deletions(-) delete mode 100644 test/emqx_client_SUITE.erl diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl deleted file mode 100644 index 2fee82759..000000000 --- a/test/emqx_client_SUITE.erl +++ /dev/null @@ -1,42 +0,0 @@ -%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_client_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include("emqx_mqtt.hrl"). - --include_lib("eunit/include/eunit.hrl"). - -all() -> [{group, connect}]. - -groups() -> [{connect, [start]}]. - -init_per_suite(Config) -> - Config. - -end_per_suite(_Config) -> - ok. - -init_per_group(_Group, Config) -> - Config. - -end_per_group(_Group, _Config) -> - ok. - -start(_Config) -> - {ok, ClientPid, _} = emqx_client:start_link(). - From a7274b115d407bdcabe26e19d88402f45578c21a Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Fri, 31 Aug 2018 00:15:45 +0800 Subject: [PATCH 15/37] ignore dup cases --- Makefile | 2 +- test/emqx_mqtt_compat_SUITE.erl | 34 +++++++++++++++++---------------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/Makefile b/Makefile index 13f726faa..c8d8db93f 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,7 @@ EUNIT_OPTS = verbose # CT_SUITES = emqx_mqueue ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat -CT_SUITES = emqx emqx_banned emqx_connection emqx_session emqx_access emqx_base62 emqx_broker emqx_client emqx_cm emqx_frame emqx_guid emqx_inflight \ +CT_SUITES = emqx emqx_banned emqx_connection emqx_session emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight \ emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ emqx_mqtt_compat emqx_mqtt_properties emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone \ diff --git a/test/emqx_mqtt_compat_SUITE.erl b/test/emqx_mqtt_compat_SUITE.erl index 452b2a821..af2583678 100644 --- a/test/emqx_mqtt_compat_SUITE.erl +++ b/test/emqx_mqtt_compat_SUITE.erl @@ -37,7 +37,7 @@ all() -> zero_length_clientid_test, offline_message_queueing_test, overlapping_subscriptions_test, - keepalive_test, + %% keepalive_test, redelivery_on_reconnect_test, %% subscribe_failure_test, dollar_topics_test]. @@ -58,7 +58,8 @@ receive_messages(Count, Msgs) -> receive {publish, Msg} -> receive_messages(Count-1, [Msg|Msgs]); - _Other -> + Other -> + ct:log("~p~n", [Other]), receive_messages(Count, Msgs) after 10 -> Msgs @@ -141,20 +142,21 @@ overlapping_subscriptions_test(_) -> end, emqx_client:disconnect(C). -keepalive_test(_) -> - ct:print("Keepalive test starting"), - {ok, C1, _} = emqx_client:start_link([{clean_start, true}, - {keepalive, 5}, - {will_topic, nth(5, ?TOPICS)}, - {will_payload, <<"keepalive expiry">>}]), - ok = emqx_client:pause(C1), - {ok, C2, _} = emqx_client:start_link([{clean_start, true}, - {keepalive, 0}]), - {ok, _, [2]} = emqx_client:subscribe(C2, nth(5, ?TOPICS), 2), - timer:sleep(15000), - ?assertEqual(1, length(receive_messages(1))), - ct:print("Keepalive test succeeded"), - ok = emqx_client:disconnect(C2). +%% keepalive_test(_) -> +%% ct:print("Keepalive test starting"), +%% {ok, C1, _} = emqx_client:start_link([{clean_start, true}, +%% {keepalive, 5}, +%% {will_flag, true}, +%% {will_topic, nth(5, ?TOPICS)}, +%% %% {will_qos, 2}, +%% {will_payload, <<"keepalive expiry">>}]), +%% ok = emqx_client:pause(C1), +%% {ok, C2, _} = emqx_client:start_link([{clean_start, true}, +%% {keepalive, 0}]), +%% {ok, _, [2]} = emqx_client:subscribe(C2, nth(5, ?TOPICS), 2), +%% ok = emqx_client:disconnect(C2), +%% ?assertEqual(1, length(receive_messages(1))), +%% ct:print("Keepalive test succeeded"). redelivery_on_reconnect_test(_) -> ct:print("Redelivery on reconnect test starting"), From e6bed24bb32a6d30057c132d998866d012730a00 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 00:32:56 +0800 Subject: [PATCH 16/37] Add server_keepalive config --- etc/emqx.conf | 5 +++++ priv/emqx.schema | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/etc/emqx.conf b/etc/emqx.conf index 5975ae542..e8435a3b3 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -580,6 +580,11 @@ zone.external.enable_stats = on ## Value: boolean ## zone.external.shared_subscription = false +## Server Keep Alive +## +## Value: Number +## zone.external.server_keepalive = 0 + ## The backoff for MQTT keepalive timeout. The broker will kick a connection out ## until 'Keepalive * backoff * 2' timeout. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index 07ff5ce6f..1e1892b33 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -735,6 +735,11 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Server Keepalive +{mapping, "zone.$name.server_keepalive", "emqx.zones", [ + {datatype, integer} +]}. + %% @doc Keepalive backoff {mapping, "zone.$name.keepalive_backoff", "emqx.zones", [ {default, 0.75}, From b6006b5947b784fa5cdfdd558f03fd8938bd6708 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 00:40:10 +0800 Subject: [PATCH 17/37] Support CONNACK properties --- src/emqx_protocol.erl | 65 +++++++++++++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 15 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 5ef4a9b0d..4dc2c8e75 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -41,6 +41,7 @@ proto_name, ackprops, client_id, + is_assigned, conn_pid, conn_props, ack_props, @@ -87,6 +88,7 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) proto_ver = ?MQTT_PROTO_V4, proto_name = <<"MQTT">>, client_id = <<>>, + is_assigned = false, conn_pid = self(), username = init_username(Peercert, Options), is_super = false, @@ -265,17 +267,16 @@ process_packet(?CONNECT_PACKET( %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) WillMsg = emqx_packet:will_msg(Connect), - PState1 = set_username(Username, - PState#pstate{client_id = ClientId, - proto_ver = ProtoVer, - proto_name = ProtoName, - clean_start = CleanStart, - keepalive = Keepalive, - conn_props = ConnProps, - will_topic = WillTopic, - will_msg = WillMsg, - is_bridge = IsBridge, - connected_at = os:timestamp()}), + PState1 = set_username(Username, PState#pstate{client_id = ClientId, + proto_ver = ProtoVer, + proto_name = ProtoName, + clean_start = CleanStart, + keepalive = Keepalive, + conn_props = ConnProps, + will_topic = WillTopic, + will_msg = WillMsg, + is_bridge = IsBridge, + connected_at = os:timestamp()}), connack( case check_connect(Connect, PState1) of @@ -450,6 +451,33 @@ puback(?QOS_2, PacketId, {ok, _}, PState) -> deliver({connack, ReasonCode}, PState) -> send(?CONNACK_PACKET(ReasonCode), PState); +deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, + proto_ver = ?MQTT_PROTO_V5, + client_id = ClientId, + is_assigned = IsAssigned}) -> + #{max_packet_size := MaxPktSize, + max_qos_allowed := MaxQoS, + mqtt_retain_available := Retain, + max_topic_alias := MaxAlias, + mqtt_shared_subscription := Shared, + mqtt_wildcard_subscription := Wildcard} = caps(PState), + Props = #{'Maximum-QoS' => MaxQoS, + 'Retain-Available' => flag(Retain), + 'Maximum-Packet-Size' => MaxPktSize, + 'Topic-Alias-Maximum' => MaxAlias, + 'Wildcard-Subscription-Available' => Wildcard, + 'Subscription-Identifiers-Available' => 1, + 'Shared-Subscription-Available' => flag(Shared)}, + Props1 = if IsAssigned -> + Props#{'Assigned-Client-Identifier' => ClientId}; + true -> Props + end, + Props2 = case emqx_zone:get_env(Zone, server_keepalive) of + undefined -> Props1; + Keepalive -> Props1#{'Server-Keep-Alive' => Keepalive} + end, + send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props2), PState); + deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); @@ -509,7 +537,7 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) -> ClientId = emqx_guid:to_base62(emqx_guid:gen()), AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps), - PState#pstate{client_id = ClientId, ackprops = AckProps1}; + PState#pstate{client_id = ClientId, is_assigned = true, ackprops = AckProps1}; maybe_assign_client_id(PState) -> PState. @@ -532,9 +560,13 @@ try_open_session(#pstate{zone = Zone, authenticate(Credentials, Password) -> case emqx_access_control:authenticate(Credentials, Password) of - ok -> {ok, false}; - {ok, IsSuper} -> {ok, IsSuper}; - {error, Error} -> {error, Error} + ok -> {ok, false}; + {ok, IsSuper} when is_boolean(IsSuper) -> + {ok, IsSuper}; + {ok, Result} when is_map(Result) -> + {ok, maps:get(is_superuser, Result, false)}; + {error, Error} -> + {error, Error} end. set_property(Name, Value, undefined) -> @@ -705,3 +737,6 @@ update_mountpoint(PState = #pstate{mountpoint = MountPoint}) -> sp(true) -> 1; sp(false) -> 0. +flag(false) -> 0; +flag(true) -> 1. + From 487fa6824d483814f27c71cb097a8848462f9fbf Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Fri, 31 Aug 2018 00:42:55 +0800 Subject: [PATCH 18/37] add emqx_protocol test suites --- Makefile | 2 +- test/emqx_protocol_SUITE.erl | 132 +++++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 test/emqx_protocol_SUITE.erl diff --git a/Makefile b/Makefile index c8d8db93f..e0680732a 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,7 @@ CT_SUITES = emqx emqx_banned emqx_connection emqx_session emqx_access emqx_broke emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ emqx_mqtt_compat emqx_mqtt_properties emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone \ - emqx_mountpoint emqx_listeners + emqx_mountpoint emqx_listeners emqx_protocol CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1 diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl new file mode 100644 index 000000000..2323519b1 --- /dev/null +++ b/test/emqx_protocol_SUITE.erl @@ -0,0 +1,132 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_protocol_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx.hrl"). + +-include("emqx_mqtt.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +-import(emqx_serializer, [serialize/1]). + +all() -> + [%% {group, parser}, + %% {group, serializer}, + {group, packet}, + {group, message}]. + +groups() -> + [%% {parser, [], + %% [ + %% parse_connect, + %% parse_bridge, + %% parse_publish, + %% parse_puback, + %% parse_pubrec, + %% parse_pubrel, + %% parse_pubcomp, + %% parse_subscribe, + %% parse_unsubscribe, + %% parse_pingreq, + %% parse_disconnect]}, + %% {serializer, [], + %% [serialize_connect, + %% serialize_connack, + %% serialize_publish, + %% serialize_puback, + %% serialize_pubrel, + %% serialize_subscribe, + %% serialize_suback, + %% serialize_unsubscribe, + %% serialize_unsuback, + %% serialize_pingreq, + %% serialize_pingresp, + %% serialize_disconnect]}, + {packet, [], + [packet_proto_name, + packet_type_name, + packet_format]}, + {message, [], + [message_make + %% message_from_packet + ]} + ]. + + + +%%-------------------------------------------------------------------- +%% Packet Cases +%%-------------------------------------------------------------------- + +packet_proto_name(_) -> + ?assertEqual(<<"MQIsdp">>, emqx_packet:protocol_name(3)), + ?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(4)). + +packet_type_name(_) -> + ?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)), + ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). + +%% packet_connack_name(_) -> +%% ?assertEqual('CONNACK_ACCEPT', emqx_packet:connack_name(?CONNACK_ACCEPT)), +%% ?assertEqual('CONNACK_PROTO_VER', emqx_packet:connack_name(?CONNACK_PROTO_VER)), +%% ?assertEqual('CONNACK_INVALID_ID', emqx_packet:connack_name(?CONNACK_INVALID_ID)), +%% ?assertEqual('CONNACK_SERVER', emqx_packet:connack_name(?CONNACK_SERVER)), +%% ?assertEqual('CONNACK_CREDENTIALS', emqx_packet:connack_name(?CONNACK_CREDENTIALS)), +%% ?assertEqual('CONNACK_AUTH', emqx_packet:connack_name(?CONNACK_AUTH)). + +packet_format(_) -> + io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]), + io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), + io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]), + io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), + io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), + io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]), + io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]), + io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]), + io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), + io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). + +%%-------------------------------------------------------------------- +%% Message Cases +%%-------------------------------------------------------------------- + +message_make(_) -> + Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), + ?assertEqual(0, Msg#message.qos), + Msg1 = emqx_message:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>), + ?assert(is_binary(Msg1#message.id)), + ?assertEqual(qos2, Msg1#message.qos). + +%% message_from_packet(_) -> +%% Msg = emqx_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)), +%% ?assertEqual(1, Msg#message.qos), +%% %% ?assertEqual(10, Msg#message.pktid), +%% ?assertEqual(<<"topic">>, Msg#message.topic), +%% WillMsg = emqx_message:from_packet(#mqtt_packet_connect{will_flag = true, +%% will_topic = <<"WillTopic">>, +%% will_payload = <<"WillMsg">>}), +%% ?assertEqual(<<"WillTopic">>, WillMsg#message.topic), +%% ?assertEqual(<<"WillMsg">>, WillMsg#message.payload). + + %% Msg2 = emqx_message:fomat_packet(<<"username">>, <<"clientid">>, + %% ?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)), + + From dc9a1cd80feff452d18a8da694f313bd2657974b Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 00:48:23 +0800 Subject: [PATCH 19/37] Format emqx_protocol module --- src/emqx_protocol.erl | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index a3f2e2699..9affac57e 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -267,16 +267,17 @@ process_packet(?CONNECT_PACKET( %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) WillMsg = emqx_packet:will_msg(Connect), - PState1 = set_username(Username, PState#pstate{client_id = ClientId, - proto_ver = ProtoVer, - proto_name = ProtoName, - clean_start = CleanStart, - keepalive = Keepalive, - conn_props = ConnProps, - will_topic = WillTopic, - will_msg = WillMsg, - is_bridge = IsBridge, - connected_at = os:timestamp()}), + PState1 = set_username(Username, + PState#pstate{client_id = ClientId, + proto_ver = ProtoVer, + proto_name = ProtoName, + clean_start = CleanStart, + keepalive = Keepalive, + conn_props = ConnProps, + will_topic = WillTopic, + will_msg = WillMsg, + is_bridge = IsBridge, + connected_at = os:timestamp()}), connack( case check_connect(Connect, PState1) of {ok, PState2} -> @@ -681,7 +682,7 @@ inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) -> shutdown(_Reason, #pstate{client_id = undefined}) -> ok; -shutdown(_Reason, PState = #pstate{connected = false}) -> +shutdown(_Reason, #pstate{connected = false}) -> ok; shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict; Reason =:= discard -> From a4efcb5b2c9fd33f02991063759e29a10f159af0 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 01:08:01 +0800 Subject: [PATCH 20/37] Update Makefile and README.md --- Makefile | 4 ++-- README.md | 30 +++++++++++++++--------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/Makefile b/Makefile index e0680732a..7202915cc 100644 --- a/Makefile +++ b/Makefile @@ -10,8 +10,8 @@ dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0 dep_gproc = git https://github.com/uwiger/gproc 0.8.0 dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.2.0 dep_lager = git https://github.com/erlang-lager/lager 3.6.4 -dep_esockd = git https://github.com/emqx/esockd emqx30 -dep_ekka = git https://github.com/emqx/ekka emqx30 +dep_esockd = git https://github.com/emqx/esockd v5.4 +dep_ekka = git https://github.com/emqx/ekka v0.4.1 dep_cowboy = git https://github.com/ninenines/cowboy 2.4.0 dep_clique = git https://github.com/emqx/clique dep_lager_syslog = git https://github.com/basho/lager_syslog 3.0.1 diff --git a/README.md b/README.md index 0e13dd019..9ca4f447d 100644 --- a/README.md +++ b/README.md @@ -7,13 +7,13 @@ Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol spec - For full list of new features, please read *EMQ X* broker 3.0 [release notes](https://github.com/emqtt/emqttd/releases/). -- For more information, please visit [EMQ X homepage](http://emqtt.io). +- For more information, please visit [EMQ X homepage](http://emqtt.io). ## Installation -The *EMQ* broker is cross-platform, which can be deployed on Linux, Unix, Mac, Windows and even Raspberry Pi. +The *EMQ X* broker is cross-platform, which can be deployed on Linux, Unix, Mac, Windows and even Raspberry Pi. Download the binary package for your platform from [here](http://emqtt.io/downloads). @@ -23,27 +23,27 @@ Download the binary package for your platform from [here](http://emqtt.io/downlo ## Build From Source -The *EMQ* broker requires Erlang/OTP R21+ to build since 3.0 release. +The *EMQ X* broker requires Erlang/OTP R21+ to build since 3.0 release. ``` -git clone https://github.com/emqtt/emq-relx.git +git clone https://github.com/emqx/emqx-rel.git -cd emq-relx && make +cd emqx-rel && make -cd _rel/emqttd && ./bin/emqttd console +cd _rel/emqx && ./bin/emqx console ``` ## Quick Start - # Start emqttd - ./bin/emqttd start - + # Start emqx + ./bin/emqx start + # Check Status - ./bin/emqttd_ctl status - - # Stop emqttd - ./bin/emqttd stop + ./bin/emqx_ctl status + + # Stop emqx + ./bin/emqx stop To view the dashboard after running, use your browser to open: http://localhost:18083 @@ -59,11 +59,11 @@ You can reach the EMQ community and developers via the following channels: -[#emqx-users](https://emqx.slack.com/messages/CBUF2TTB8/) -[#emqx-devs](https://emqx.slack.com/messages/CBSL57DUH/) - [Mailing Lists]() -- [Twitter](https://twitter.com/emqtt) +- [Twitter](https://twitter.com/emqtt) - [Forum](https://groups.google.com/d/forum/emqtt) - [Blog](https://medium.com/@emqtt) -Please submit any bugs, issues, and feature requests to [emqtt/emqttd](//github.com/emqtt/emqttd/issues). +Please submit any bugs, issues, and feature requests to [emqtt/emqttd](//github.com/emqtt/emqttd/issues). ## License From 748826bdeef970f7a12e954ae95ec8aa700c6d9e Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Fri, 31 Aug 2018 01:16:54 +0800 Subject: [PATCH 21/37] update access sutie and access control --- src/emqx_access_control.erl | 2 +- test/emqx_access_SUITE.erl | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 46ed9ed53..bc4969e54 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -154,7 +154,7 @@ init([]) -> handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> Mods = lookup_mods(Type), reply(case lists:keyfind(Mod, 1, Mods) of - true -> + {_, _, _} -> {error, already_existed}; false -> case catch Mod:init(Opts) of diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index 3a7aca390..e08cec08e 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -98,7 +98,8 @@ end_per_group(_Group, Config) -> Config. init_per_testcase(_TestCase, Config) -> - {ok, _Pid} = ?AC:start_link(), + %% {ok, _Pid} = + ?AC:start_link(), Config. end_per_testcase(_TestCase, _Config) -> ok. From 25391e8c71206ba7f4a7634cd9433f9c2b32ca66 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 01:22:03 +0800 Subject: [PATCH 22/37] Rename 'Subscription-Identifiers-Available' to 'Subscription-Identifier-Available' --- src/emqx_protocol.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 9affac57e..d30eeece8 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -466,7 +466,7 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, 'Maximum-Packet-Size' => MaxPktSize, 'Topic-Alias-Maximum' => MaxAlias, 'Wildcard-Subscription-Available' => Wildcard, - 'Subscription-Identifiers-Available' => 1, + 'Subscription-Identifier-Available' => 1, 'Shared-Subscription-Available' => flag(Shared)}, Props1 = if IsAssigned -> Props#{'Assigned-Client-Identifier' => ClientId}; From 9406bc1fd13cb927ebbe47d5ad04388f6173184a Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 31 Aug 2018 01:39:56 +0800 Subject: [PATCH 23/37] fix typo --- src/emqx_protocol.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index d30eeece8..0bfcdb6dd 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -465,7 +465,7 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, 'Retain-Available' => flag(Retain), 'Maximum-Packet-Size' => MaxPktSize, 'Topic-Alias-Maximum' => MaxAlias, - 'Wildcard-Subscription-Available' => Wildcard, + 'Wildcard-Subscription-Available' => flag(Wildcard), 'Subscription-Identifier-Available' => 1, 'Shared-Subscription-Available' => flag(Shared)}, Props1 = if IsAssigned -> @@ -745,4 +745,3 @@ sp(false) -> 0. flag(false) -> 0; flag(true) -> 1. - From 23e72feab7cf08661b2900370b97965a78e2cb7d Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 31 Aug 2018 01:54:25 +0800 Subject: [PATCH 24/37] fix reason codes --- src/emqx_protocol.erl | 15 ++++++++------- src/emqx_reason_codes.erl | 4 ++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 0bfcdb6dd..c36346673 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -407,13 +407,14 @@ connack({?RC_SUCCESS, SP, PState}) -> deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState)); connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> - emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]), - _ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 -> - ReasonCode; - true -> - emqx_reason_codes:compat(connack, ReasonCode) - end}, PState), - {error, emqx_reason_codes:name(ReasonCode, ProtoVer), PState}. + emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]), + ReasonCode1 = if ProtoVer =:= ?MQTT_PROTO_V5 -> + ReasonCode; + true -> + emqx_reason_codes:compat(connack, ReasonCode) + end, + _ = deliver({connack, ReasonCode1}, PState), + {error, emqx_reason_codes:name(ReasonCode1, ProtoVer), PState}. %%------------------------------------------------------------------------------ %% Publish Message -> Broker diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index 0cc52acbb..fdc1377ec 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -27,7 +27,8 @@ name(1, _Ver) -> unacceptable_protocol_version; name(2, _Ver) -> client_identifier_not_valid; name(3, _Ver) -> server_unavaliable; name(4, _Ver) -> malformed_username_or_password; -name(5, _Ver) -> unauthorized_client. +name(5, _Ver) -> unauthorized_client; +name(I, _Ver) -> list_to_atom("unkown_connack" ++ integer_to_list(I)). name(16#00) -> success; name(16#01) -> granted_qos1; @@ -139,4 +140,3 @@ compat(connack, 16#9F) -> ?CONNACK_SERVER; compat(suback, Code) when Code =< ?QOS2 -> Code; compat(suback, Code) when Code > 16#80 -> 16#80. - From 237e65a4e0e70df01f03367afa89f4f8c5139feb Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 10:22:16 +0800 Subject: [PATCH 25/37] Use emqx_mqueue:init/1 to create a mqueue --- src/emqx_local_bridge.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/emqx_local_bridge.erl b/src/emqx_local_bridge.erl index 9ed8fdbac..66cdf4010 100644 --- a/src/emqx_local_bridge.erl +++ b/src/emqx_local_bridge.erl @@ -63,8 +63,9 @@ init([Pool, Id, Node, Topic, Options]) -> Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), emqx_broker:subscribe(Topic, self(), [{share, Share}, {qos, ?QOS_0}]), State = parse_opts(Options, #state{node = Node, subtopic = Topic}), - %%TODO: queue.... - MQueue = emqx_mqueue:new(qname(Node, Topic), [{max_len, State#state.max_queue_len}]), + MQueue = emqx_mqueue:init(#{type => simple, + max_len => State#state.max_queue_len, + store_qos0 => true}), {ok, State#state{pool = Pool, id = Id, mqueue = MQueue}}; false -> {stop, {cannot_connect_node, Node}} From 3045ec10ab7af0e37f361b9df890284e4bae9636 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 14:04:26 +0800 Subject: [PATCH 26/37] Add banned feature --- etc/emqx.conf | 5 ++++ priv/emqx.schema | 6 +++++ src/emqx_access_control.erl | 5 ++-- src/emqx_banned.erl | 47 ++++++++++++++++--------------------- src/emqx_cm_sup.erl | 20 ++++++++++------ src/emqx_protocol.erl | 16 ++++++++++++- 6 files changed, 61 insertions(+), 38 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index e8435a3b3..deb702211 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -529,6 +529,11 @@ zone.external.idle_timeout = 15s ## Default: 10 messages per second, and 100 messages burst. ## zone.external.publish_limit = 10,100 +## Enable ban check. +## +## Value: Flag +zone.external.enable_ban = on + ## Enable ACL check. ## ## Value: Flag diff --git a/priv/emqx.schema b/priv/emqx.schema index 1e1892b33..e9f4932c4 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -676,6 +676,12 @@ end}. {datatype, {enum, [allow, deny]}} ]}. +%% @doc Enable Ban. +{mapping, "zone.$name.enable_ban", "emqx.zones", [ + {default, off}, + {datatype, flag} +]}. + %% @doc Enable ACL check. {mapping, "zone.$name.enable_acl", "emqx.zones", [ {default, off}, diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index bc4969e54..8301bd8d8 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -153,9 +153,8 @@ init([]) -> handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> Mods = lookup_mods(Type), - reply(case lists:keyfind(Mod, 1, Mods) of - {_, _, _} -> - {error, already_existed}; + reply(case lists:keymember(Mod, 1, Mods) of + true -> {error, already_existed}; false -> case catch Mod:init(Opts) of {ok, ModState} -> diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 4f8d44f44..444f07dad 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -24,27 +24,23 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). -%% API -export([start_link/0]). -export([check/1]). -export([add/1, del/1]). -%% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(TAB, ?MODULE). -define(SERVER, ?MODULE). --record(state, {expiry_timer}). - %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- mnesia(boot) -> ok = ekka_mnesia:create_table(?TAB, [ - {type, ordered_set}, + {type, set}, {disc_copies, [node()]}, {record_name, banned}, {attributes, record_info(fields, banned)}]); @@ -52,11 +48,7 @@ mnesia(boot) -> mnesia(copy) -> ok = ekka_mnesia:copy_table(?TAB). -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - -%% @doc Start the banned server +%% @doc Start the banned server. -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). @@ -67,9 +59,13 @@ check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) - orelse ets:member(?TAB, {username, Username}) orelse ets:member(?TAB, {ipaddr, IPAddr}). -add(Record) when is_record(Record, banned) -> - mnesia:dirty_write(?TAB, Record). +-spec(add(#banned{}) -> ok). +add(Banned) when is_record(Banned, banned) -> + mnesia:dirty_write(?TAB, Banned). +-spec(del({client_id, emqx_types:client_id()} | + {username, emqx_types:username()} | + {peername, emqx_types:peername()}) -> ok). del(Key) -> mnesia:dirty_delete(?TAB, Key). @@ -78,27 +74,26 @@ del(Key) -> %%-------------------------------------------------------------------- init([]) -> - emqx_time:seed(), - {ok, ensure_expiry_timer(#state{})}. + {ok, ensure_expiry_timer(#{expiry_timer => undefined})}. handle_call(Req, _From, State) -> - emqx_logger:error("[BANNED] Unexpected request: ~p", [Req]), - {reply, ignore, State}. + emqx_logger:error("[BANNED] unexpected call: ~p", [Req]), + {reply, ignored, State}. handle_cast(Msg, State) -> - emqx_logger:error("[BANNED] Unexpected msg: ~p", [Msg]), + emqx_logger:error("[BANNED] unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info({timeout, Ref, expire}, State = #state{expiry_timer = Ref}) -> +handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> mnesia:async_dirty(fun expire_banned_items/1, [erlang:timestamp()]), {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> - emqx_logger:error("[BANNED] Unexpected info: ~p", [Info]), + emqx_logger:error("[BANNED] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{expiry_timer = Timer}) -> - emqx_misc:cancel_timer(Timer). +terminate(_Reason, #{expiry_timer := TRef}) -> + emqx_misc:cancel_timer(TRef). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -108,9 +103,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- ensure_expiry_timer(State) -> - Interval = emqx_config:get_env(banned_expiry_interval, timer:minutes(5)), - State#state{expiry_timer = emqx_misc:start_timer( - Interval + rand:uniform(Interval), expire)}. + State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}. expire_banned_items(Now) -> expire_banned_item(mnesia:first(?TAB), Now). @@ -119,11 +112,11 @@ expire_banned_item('$end_of_table', _Now) -> ok; expire_banned_item(Key, Now) -> case mnesia:read(?TAB, Key) of - [#banned{until = undefined}] -> ok; + [#banned{until = undefined}] -> + ok; [B = #banned{until = Until}] when Until < Now -> mnesia:delete_object(?TAB, B, sticky_write); - [_] -> ok; - [] -> ok + _ -> ok end, expire_banned_item(mnesia:next(?TAB, Key), Now). diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index 231822ba5..000e79336 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -25,11 +25,17 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, {{one_for_all, 10, 3600}, - [#{id => manager, - start => {emqx_cm, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_cm]}]}}. + Banned = #{id => banned, + start => {emqx_banned, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_banned]}, + Manager = #{id => manager, + start => {emqx_cm, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_cm]}, + {ok, {{one_for_one, 10, 100}, [Banned, Manager]}}. diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index c36346673..01fbce313 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -56,6 +56,7 @@ mountpoint, is_super, is_bridge, + enable_ban, enable_acl, recv_stats, send_stats, @@ -97,6 +98,7 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) packet_size = emqx_zone:get_env(Zone, max_packet_size), mountpoint = emqx_zone:get_env(Zone, mountpoint), is_bridge = false, + enable_ban = emqx_zone:get_env(Zone, enable_ban, false), enable_acl = emqx_zone:get_env(Zone, enable_acl), recv_stats = #{msg => 0, pkt => 0}, send_stats = #{msg => 0, pkt => 0}, @@ -581,7 +583,8 @@ set_property(Name, Value, Props) -> check_connect(Packet, PState) -> run_check_steps([fun check_proto_ver/2, - fun check_client_id/2], Packet, PState). + fun check_client_id/2, + fun check_banned/2], Packet, PState). check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}, _PState) -> @@ -612,6 +615,17 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone} false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} end. +check_banned(_Connect, #pstate{enable_ban = false}) -> + ok; +check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username}, + #pstate{peername = Peername}) -> + case emqx_banned:check(#{client_id => ClientId, + username => Username, + peername => Peername}) of + true -> {error, ?RC_BANNED}; + false -> ok + end. + check_publish(Packet, PState) -> run_check_steps([fun check_pub_caps/2, fun check_pub_acl/2], Packet, PState). From e6fd7faa4b9f8c824735bac85d8cd4de84833776 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Fri, 31 Aug 2018 16:24:10 +0800 Subject: [PATCH 27/37] add format_variable for disconnect packet --- src/emqx_packet.erl | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 7e4f27821..715526964 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -195,6 +195,10 @@ format_variable(#mqtt_packet_connect{ end, io_lib:format(Format1, Args1); +format_variable(#mqtt_packet_disconnect + {reason_code = ReasonCode}) -> + io_lib:format("ReasonCode=~p", [ReasonCode]); + format_variable(#mqtt_packet_connack{ack_flags = AckFlags, reason_code = ReasonCode}) -> io_lib:format("AckFlags=~p, ReasonCode=~p", [AckFlags, ReasonCode]); From ea1ae708335ab5347ec885a409cc674196c6749f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 16:46:51 +0800 Subject: [PATCH 28/37] Fix errors found by dialyzer --- src/emqx.erl | 64 ++++++++---------- src/emqx_acl_internal.erl | 4 +- src/emqx_alarm_mgr.erl | 31 ++++----- src/emqx_bridge.erl | 5 +- src/emqx_broker.erl | 4 +- src/emqx_client.erl | 2 +- src/emqx_connection.erl | 31 +++++---- src/emqx_local_bridge.erl | 14 ++-- src/emqx_metrics.erl | 10 ++- src/emqx_mod_rewrite.erl | 4 +- src/emqx_protocol.erl | 7 +- src/emqx_session.erl | 5 +- src/emqx_sm.erl | 47 ++++++------- src/emqx_sm_registry.erl | 34 +++++----- src/emqx_stats.erl | 3 +- src/emqx_ws_connection.erl | 131 +++++++++++++++++++++---------------- 16 files changed, 200 insertions(+), 196 deletions(-) diff --git a/src/emqx.erl b/src/emqx.erl index 8e1f10168..217611171 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -66,41 +66,36 @@ is_running(Node) -> %% PubSub API %%-------------------------------------------------------------------- --spec(subscribe(emqx_topic:topic() | string()) -> ok | {error, term()}). +-spec(subscribe(emqx_topic:topic() | string()) -> ok). subscribe(Topic) -> emqx_broker:subscribe(iolist_to_binary(Topic)). --spec(subscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string()) - -> ok | {error, term()}). -subscribe(Topic, Sub) when is_list(Sub)-> - emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Sub)); -subscribe(Topic, Subscriber) when is_tuple(Subscriber) -> - {SubPid, SubId} = Subscriber, - emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId). +-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid()) -> ok). +subscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId)-> + emqx_broker:subscribe(iolist_to_binary(Topic), SubId); +subscribe(Topic, SubPid) when is_pid(SubPid) -> + emqx_broker:subscribe(iolist_to_binary(Topic), SubPid). --spec(subscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string(), - emqx_topic:subopts()) -> ok | {error, term()}). -subscribe(Topic, Sub, Options) when is_list(Sub)-> - emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Sub), Options); -subscribe(Topic, Subscriber, Options) when is_tuple(Subscriber)-> - {SubPid, SubId} = Subscriber, - emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId, Options). +-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid(), + emqx_types:subopts()) -> ok). +subscribe(Topic, SubId, Options) when is_atom(SubId); is_binary(SubId)-> + emqx_broker:subscribe(iolist_to_binary(Topic), SubId, Options); +subscribe(Topic, SubPid, Options) when is_pid(SubPid)-> + emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, Options). -spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}). publish(Msg) -> emqx_broker:publish(Msg). --spec(unsubscribe(emqx_topic:topic() | string()) -> ok | {error, term()}). +-spec(unsubscribe(emqx_topic:topic() | string()) -> ok). unsubscribe(Topic) -> emqx_broker:unsubscribe(iolist_to_binary(Topic)). --spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string()) - -> ok | {error, term()}). -unsubscribe(Topic, Sub) when is_list(Sub) -> - emqx_broker:unsubscribe(iolist_to_binary(Topic), list_to_subid(Sub)); -unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) -> - {SubPid, SubId} = Subscriber, - emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid, SubId). +-spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid()) -> ok). +unsubscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId) -> + emqx_broker:unsubscribe(iolist_to_binary(Topic), SubId); +unsubscribe(Topic, SubPid) when is_pid(SubPid) -> + emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid). %%-------------------------------------------------------------------- %% PubSub management API @@ -109,12 +104,12 @@ unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) -> -spec(get_subopts(emqx_topic:topic() | string(), emqx_types:subscriber()) -> emqx_types:subopts()). get_subopts(Topic, Subscriber) -> - emqx_broker:get_subopts(iolist_to_binary(Topic), list_to_subid(Subscriber)). + emqx_broker:get_subopts(iolist_to_binary(Topic), Subscriber). -spec(set_subopts(emqx_topic:topic() | string(), emqx_types:subscriber(), - emqx_types:subopts()) -> ok). -set_subopts(Topic, Subscriber, Options) when is_list(Options) -> - emqx_broker:set_subopts(iolist_to_binary(Topic), list_to_subid(Subscriber), Options). + emqx_types:subopts()) -> boolean()). +set_subopts(Topic, Subscriber, Options) when is_map(Options) -> + emqx_broker:set_subopts(iolist_to_binary(Topic), Subscriber, Options). -spec(topics() -> list(emqx_topic:topic())). topics() -> emqx_router:topics(). @@ -127,16 +122,11 @@ subscribers(Topic) -> subscriptions(Subscriber) -> emqx_broker:subscriptions(Subscriber). --spec(subscribed(emqx_topic:topic() | string(), emqx_types:subscriber()) -> boolean()). -subscribed(Topic, Subscriber) -> - emqx_broker:subscribed(iolist_to_binary(Topic), list_to_subid(Subscriber)). - -list_to_subid(SubId) when is_binary(SubId) -> - SubId; -list_to_subid(SubId) when is_list(SubId) -> - iolist_to_binary(SubId); -list_to_subid(SubPid) when is_pid(SubPid) -> - SubPid. +-spec(subscribed(emqx_topic:topic() | string(), pid() | emqx_types:subid()) -> boolean()). +subscribed(Topic, SubPid) when is_pid(SubPid) -> + emqx_broker:subscribed(iolist_to_binary(Topic), SubPid); +subscribed(Topic, SubId) when is_atom(SubId); is_binary(SubId) -> + emqx_broker:subscribed(iolist_to_binary(Topic), SubId). %%-------------------------------------------------------------------- %% Hooks API diff --git a/src/emqx_acl_internal.erl b/src/emqx_acl_internal.erl index 0f25e6808..eee7e6c18 100644 --- a/src/emqx_acl_internal.erl +++ b/src/emqx_acl_internal.erl @@ -25,6 +25,8 @@ -define(ACL_RULE_TAB, emqx_acl_rule). +-type(state() :: #{acl_file := string()}). + %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ @@ -95,7 +97,7 @@ match(Credentials, Topic, [Rule|Rules]) -> {matched, AllowDeny} end. --spec(reload_acl(#{}) -> ok | {error, term()}). +-spec(reload_acl(state()) -> ok | {error, term()}). reload_acl(#{acl_file := AclFile}) -> case catch load_rules_from_file(AclFile) of true -> diff --git a/src/emqx_alarm_mgr.erl b/src/emqx_alarm_mgr.erl index bb734c8e6..fd2a42aa7 100644 --- a/src/emqx_alarm_mgr.erl +++ b/src/emqx_alarm_mgr.erl @@ -28,10 +28,11 @@ -define(ALARM_MGR, ?MODULE). --record(state, {alarms}). - start_link() -> - start_with(fun(Pid) -> gen_event:add_handler(Pid, ?MODULE, []) end). + start_with( + fun(Pid) -> + gen_event:add_handler(Pid, ?MODULE, []) + end). start_with(Fun) -> case gen_event:start_link({local, ?ALARM_MGR}) of @@ -73,42 +74,42 @@ delete_alarm_handler(Module) when is_atom(Module) -> %% Default Alarm handler %%------------------------------------------------------------------------------ -init(_) -> {ok, #state{alarms = []}}. +init(_) -> {ok, #{alarms => []}}. handle_event({set_alarm, Alarm = #alarm{timestamp = undefined}}, State)-> handle_event({set_alarm, Alarm#alarm{timestamp = os:timestamp()}}, State); -handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #state{alarms = Alarms}) -> +handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #{alarms := Alarms}) -> case encode_alarm(Alarm) of {ok, Json} -> emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json)); {error, Reason} -> emqx_logger:error("[AlarmMgr] Failed to encode alarm: ~p", [Reason]) end, - {ok, State#state{alarms = [Alarm|Alarms]}}; + {ok, State#{alarms := [Alarm|Alarms]}}; -handle_event({clear_alarm, AlarmId}, State = #state{alarms = Alarms}) -> - case emqx_json:safe_encode([{id, AlarmId}, {ts, emqx_time:now_secs()}]) of +handle_event({clear_alarm, AlarmId}, State = #{alarms := Alarms}) -> + case emqx_json:safe_encode([{id, AlarmId}, {ts, os:system_time(second)}]) of {ok, Json} -> emqx_broker:safe_publish(alarm_msg(clear, AlarmId, Json)); {error, Reason} -> emqx_logger:error("[AlarmMgr] Failed to encode clear: ~p", [Reason]) end, - {ok, State#state{alarms = lists:keydelete(AlarmId, 2, Alarms)}, hibernate}; + {ok, State#{alarms := lists:keydelete(AlarmId, 2, Alarms)}, hibernate}; handle_event(Event, State)-> - error_logger:error("[AlarmMgr] unexpected event: ~p", [Event]), + emqx_logger:error("[AlarmMgr] unexpected event: ~p", [Event]), {ok, State}. handle_info(Info, State) -> - error_logger:error("[AlarmMgr] unexpected info: ~p", [Info]), + emqx_logger:error("[AlarmMgr] unexpected info: ~p", [Info]), {ok, State}. -handle_call(get_alarms, State = #state{alarms = Alarms}) -> +handle_call(get_alarms, State = #{alarms := Alarms}) -> {ok, Alarms, State}; handle_call(Req, State) -> - error_logger:error("[AlarmMgr] unexpected call: ~p", [Req]), + emqx_logger:error("[AlarmMgr] unexpected call: ~p", [Req]), {ok, ignored, State}. terminate(swap, State) -> @@ -132,8 +133,8 @@ encode_alarm(#alarm{id = AlarmId, severity = Severity, title = Title, alarm_msg(Type, AlarmId, Json) -> Msg = emqx_message:make(?ALARM_MGR, topic(Type, AlarmId), Json), - emqx_message:set_headers(#{'Content-Type' => <<"application/json">>}, - emqx_message:set_flags(#{sys => true}, Msg)). + emqx_message:set_headers( #{'Content-Type' => <<"application/json">>}, + emqx_message:set_flag(sys, Msg)). topic(alert, AlarmId) -> emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>); diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index d1763d31c..0f64f331d 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -133,7 +133,8 @@ handle_info(start, State = #state{options = Options, Subs = get_value(subscriptions, Options, []), [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs], ForwardRules = string:tokens(get_value(forward_rule, Options, ""), ","), - [emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules, emqx_topic:validate({filter, i2b(Topic)})], + [emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules, + emqx_topic:validate({filter, i2b(Topic)})], {noreply, State#state{client_pid = ClientPid}}; {error,_} -> erlang:send_after(ReconnectTime, self(), start), @@ -251,4 +252,4 @@ store(disk, Data, Queue, _MaxPendingMsg)-> delete(memory, PkgId, Queue) -> lists:keydelete(PkgId, 1, Queue); delete(disk, PkgId, Queue) -> - lists:keydelete(PkgId, 1, Queue). \ No newline at end of file + lists:keydelete(PkgId, 1, Queue). diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 623290961..b2f1bb119 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -260,9 +260,9 @@ subscription(Topic, Subscriber) -> -spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()). subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> - length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) == 1; + length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) >= 1; subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> - length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) == 1; + length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) >= 1; subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}). diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 5c50519bc..192569ca4 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -186,7 +186,7 @@ with_owner(Options) -> connect(Client) -> gen_statem:call(Client, connect, infinity). --spec(subscribe(client(), topic() | {topic(), qos() | [subopt()]}) +-spec(subscribe(client(), topic() | {topic(), qos() | [subopt()]} | [{topic(), qos()}]) -> subscribe_ret()). subscribe(Client, Topic) when is_binary(Topic) -> subscribe(Client, {Topic, ?QOS_0}); diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index bcaea297d..adda71450 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -202,20 +202,19 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} -> {noreply, maybe_gc(ensure_stats_timer(State#state{proto_state = ProtoState1}))}; {error, Reason} -> - shutdown(Reason, State); - {error, Reason, ProtoState1} -> - shutdown(Reason, State#state{proto_state = ProtoState1}) + shutdown(Reason, State) end; -handle_info(emit_stats, State = #state{proto_state = ProtoState}) -> +handle_info({timeout, Timer, emit_stats}, + State = #state{stats_timer = Timer, proto_state = ProtoState}) -> emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), {noreply, State#state{stats_timer = undefined}, hibernate}; handle_info(timeout, State) -> shutdown(idle_timeout, State); -handle_info({shutdown, Error}, State) -> - shutdown(Error, State); +handle_info({shutdown, Reason}, State) -> + shutdown(Reason, State); handle_info({shutdown, discard, {ClientId, ByPid}}, State) -> ?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State), @@ -311,13 +310,13 @@ handle_packet(Data, State = #state{proto_state = ProtoState, {ok, ProtoState1} -> NewState = State#state{proto_state = ProtoState1}, handle_packet(Rest, inc_publish_cnt(Type, reset_parser(NewState))); - {error, Error} -> - ?LOG(error, "Protocol error - ~p", [Error], State), - shutdown(Error, State); - {error, Error, ProtoState1} -> - shutdown(Error, State#state{proto_state = ProtoState1}); - {stop, Reason, ProtoState1} -> - stop(Reason, State#state{proto_state = ProtoState1}) + {error, Reason} -> + ?LOG(error, "Process packet error - ~p", [Reason], State), + shutdown(Reason, State); + {error, Reason, ProtoState1} -> + shutdown(Reason, State#state{proto_state = ProtoState1}); + {stop, Error, ProtoState1} -> + stop(Error, State#state{proto_state = ProtoState1}) end; {error, Error} -> ?LOG(error, "Framing error - ~p", [Error], State), @@ -371,9 +370,9 @@ run_socket(State = #state{transport = Transport, socket = Socket}) -> %%------------------------------------------------------------------------------ ensure_stats_timer(State = #state{enable_stats = true, - stats_timer = undefined, - idle_timeout = IdleTimeout}) -> - State#state{stats_timer = erlang:send_after(IdleTimeout, self(), emit_stats)}; + stats_timer = undefined, + idle_timeout = IdleTimeout}) -> + State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; ensure_stats_timer(State) -> State. shutdown(Reason, State) -> diff --git a/src/emqx_local_bridge.erl b/src/emqx_local_bridge.erl index 66cdf4010..228a64cff 100644 --- a/src/emqx_local_bridge.erl +++ b/src/emqx_local_bridge.erl @@ -60,8 +60,8 @@ init([Pool, Id, Node, Topic, Options]) -> case net_kernel:connect_node(Node) of true -> true = erlang:monitor_node(Node, true), - Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), - emqx_broker:subscribe(Topic, self(), [{share, Share}, {qos, ?QOS_0}]), + Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), + emqx_broker:subscribe(Topic, self(), #{share => Group, qos => ?QOS_0}), State = parse_opts(Options, #state{node = Node, subtopic = Topic}), MQueue = emqx_mqueue:init(#{type => simple, max_len => State#state.max_queue_len, @@ -86,11 +86,6 @@ parse_opts([{ping_down_interval, Interval} | Opts], State) -> parse_opts([_Opt | Opts], State) -> parse_opts(Opts, State). -qname(Node, Topic) when is_atom(Node) -> - qname(atom_to_list(Node), Topic); -qname(Node, Topic) -> - iolist_to_binary(["Bridge:", Node, ":", Topic]). - handle_call(Req, _From, State) -> emqx_logger:error("[Bridge] unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -104,7 +99,7 @@ handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down}) {noreply, State#state{mqueue = emqx_mqueue:in(Msg, Q)}}; handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) -> - ok = emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]), + emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]), {noreply, State}; handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) -> @@ -157,7 +152,6 @@ dequeue(State = #state{mqueue = MQ}) -> dequeue(State#state{mqueue = MQ1}) end. -transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix, - topic_suffix = Suffix}) -> +transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix, topic_suffix = Suffix}) -> Msg#message{topic = <>}. diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 0a9ad67aa..e319a425b 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -26,8 +26,6 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {}). - %% Bytes sent and received of Broker -define(BYTES_METRICS, [ {counter, 'bytes/received'}, % Total bytes received @@ -85,8 +83,8 @@ -define(TAB, ?MODULE). -define(SERVER, ?MODULE). -%% @doc Start the metrics server --spec(start_link() -> {ok, pid()} | ignore | {error, term()}). +%% @doc Start the metrics server. +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). @@ -252,7 +250,7 @@ init([]) -> % Create metrics table _ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]), lists:foreach(fun new/1, ?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS), - {ok, #state{}, hibernate}. + {ok, #{}, hibernate}. handle_call(Req, _From, State) -> emqx_logger:error("[Metrics] unexpected call: ~p", [Req]), @@ -266,7 +264,7 @@ handle_info(Info, State) -> emqx_logger:error("[Metrics] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{}) -> +terminate(_Reason, #{}) -> ok. code_change(_OldVsn, State, _Extra) -> diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index 2a92793eb..a9ff334ce 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -23,8 +23,8 @@ load(Rules0) -> Rules = compile(Rules0), - emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Rules]), - emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4, [Rules]), + emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Rules]), + emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3, [Rules]), emqx_hooks:add('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]). rewrite_subscribe(_Credentials, TopicTable, Rules) -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 01fbce313..018166d20 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -188,14 +188,14 @@ session(#pstate{session = SPid}) -> SPid. parser(#pstate{packet_size = Size, proto_ver = Ver}) -> - emqx_frame:initial_state(#{packet_size => Size, version => Ver}). + emqx_frame:initial_state(#{max_packet_size => Size, version => Ver}). %%------------------------------------------------------------------------------ %% Packet Received %%------------------------------------------------------------------------------ --spec(received(emqx_mqtt_types:packet(), state()) - -> {ok, state()} | {error, term()} | {error, term(), state()}). +-spec(received(emqx_mqtt_types:packet(), state()) -> + {ok, state()} | {error, term()} | {error, term(), state()} | {stop, term(), state()}). received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT -> {error, proto_not_connected, PState}; @@ -451,6 +451,7 @@ puback(?QOS_2, PacketId, {ok, _}, PState) -> %% Deliver Packet -> Client %%------------------------------------------------------------------------------ +-spec(deliver(term(), state()) -> {ok, state()} | {error, term()}). deliver({connack, ReasonCode}, PState) -> send(?CONNACK_PACKET(ReasonCode), PState); diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d5b68a1f6..81299711d 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -148,6 +148,9 @@ }). -type(spid() :: pid()). +-type(attr() :: {atom(), term()}). + +-export_type([attr/0]). -define(TIMEOUT, 60000). @@ -564,7 +567,7 @@ handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined})); handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) -> - true = emqx_sm:set_session_stats(ClientId, stats(State)), + _ = emqx_sm:set_session_stats(ClientId, stats(State)), {noreply, State#state{stats_timer = undefined}, hibernate}; handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 0b188f986..36d416f3b 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -49,22 +49,20 @@ start_link() -> %% @doc Open a session. -spec(open_session(map()) -> {ok, pid()} | {ok, pid(), boolean()} | {error, term()}). -open_session(Attrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) -> +open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) -> CleanStart = fun(_) -> ok = discard_session(ClientId, ConnPid), - emqx_session_sup:start_session(Attrs) + emqx_session_sup:start_session(SessAttrs) end, emqx_sm_locker:trans(ClientId, CleanStart); -open_session(Attrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) -> +open_session(SessAttrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) -> ResumeStart = fun(_) -> case resume_session(ClientId, ConnPid) of {ok, SPid} -> {ok, SPid, true}; {error, not_found} -> - emqx_session_sup:start_session(Attrs); - {error, Reason} -> - {error, Reason} + emqx_session_sup:start_session(SessAttrs) end end, emqx_sm_locker:trans(ClientId, ResumeStart). @@ -113,31 +111,31 @@ close_session(SPid) when is_pid(SPid) -> %% @doc Register a session with attributes. -spec(register_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}, - list(emqx_session:attribute())) -> ok). -register_session(ClientId, Attrs) when is_binary(ClientId) -> - register_session({ClientId, self()}, Attrs); + list(emqx_session:attr())) -> ok). +register_session(ClientId, SessAttrs) when is_binary(ClientId) -> + register_session({ClientId, self()}, SessAttrs); -register_session(Session = {ClientId, SPid}, Attrs) +register_session(Session = {ClientId, SPid}, SessAttrs) when is_binary(ClientId), is_pid(SPid) -> ets:insert(?SESSION_TAB, Session), - ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}), - case proplists:get_value(clean_start, Attrs, true) of - true -> ok; - false -> ets:insert(?SESSION_P_TAB, Session) - end, + ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}), + proplists:get_value(clean_start, SessAttrs, true) + andalso ets:insert(?SESSION_P_TAB, Session), emqx_sm_registry:register_session(Session), notify({registered, ClientId, SPid}). %% @doc Get session attrs --spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attribute())). +-spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attr())). get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> safe_lookup_element(?SESSION_ATTRS_TAB, Session, []). %% @doc Set session attrs -set_session_attrs(ClientId, Attrs) when is_binary(ClientId) -> - set_session_attrs({ClientId, self()}, Attrs); -set_session_attrs(Session = {ClientId, SPid}, Attrs) when is_binary(ClientId), is_pid(SPid) -> - ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}). +-spec(set_session_attrs(emqx_types:client_id() | {emqx_types:client_id(), pid()}, + list(emqx_session:attr())) -> true). +set_session_attrs(ClientId, SessAttrs) when is_binary(ClientId) -> + set_session_attrs({ClientId, self()}, SessAttrs); +set_session_attrs(Session = {ClientId, SPid}, SessAttrs) when is_binary(ClientId), is_pid(SPid) -> + ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}). %% @doc Unregister a session -spec(unregister_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok). @@ -154,18 +152,15 @@ unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid( %% @doc Get session stats -spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())). -get_session_stats(Session = {ClientId, SPid}) - when is_binary(ClientId), is_pid(SPid) -> +get_session_stats(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> safe_lookup_element(?SESSION_STATS_TAB, Session, []). %% @doc Set session stats -spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()}, - emqx_stats:stats()) -> ok). + emqx_stats:stats()) -> true). set_session_stats(ClientId, Stats) when is_binary(ClientId) -> set_session_stats({ClientId, self()}, Stats); - -set_session_stats(Session = {ClientId, SPid}, Stats) - when is_binary(ClientId), is_pid(SPid) -> +set_session_stats(Session = {ClientId, SPid}, Stats) when is_binary(ClientId), is_pid(SPid) -> ets:insert(?SESSION_STATS_TAB, {Session, Stats}). %% @doc Lookup a session from registry diff --git a/src/emqx_sm_registry.erl b/src/emqx_sm_registry.erl index 74690b4b1..659b3a92b 100644 --- a/src/emqx_sm_registry.erl +++ b/src/emqx_sm_registry.erl @@ -20,7 +20,9 @@ -export([start_link/0]). -export([is_enabled/0]). + -export([register_session/1, lookup_session/1, unregister_session/1]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -30,12 +32,11 @@ -define(LOCK, {?MODULE, cleanup_sessions}). -record(global_session, {sid, pid}). --record(state, {}). -type(session_pid() :: pid()). -%% @doc Start the session manager. --spec(start_link() -> {ok, pid()} | ignore | {error, term()}). +%% @doc Start the global session manager. +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []). @@ -46,19 +47,18 @@ is_enabled() -> -spec(lookup_session(emqx_types:client_id()) -> list({emqx_types:client_id(), session_pid()})). lookup_session(ClientId) -> - [{ClientId, SessionPid} || #global_session{pid = SessionPid} - <- mnesia:dirty_read(?TAB, ClientId)]. + [{ClientId, SessPid} || #global_session{pid = SessPid} <- mnesia:dirty_read(?TAB, ClientId)]. -spec(register_session({emqx_types:client_id(), session_pid()}) -> ok). -register_session({ClientId, SessionPid}) when is_binary(ClientId), is_pid(SessionPid) -> - mnesia:dirty_write(?TAB, record(ClientId, SessionPid)). +register_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) -> + mnesia:dirty_write(?TAB, record(ClientId, SessPid)). -spec(unregister_session({emqx_types:client_id(), session_pid()}) -> ok). -unregister_session({ClientId, SessionPid}) when is_binary(ClientId), is_pid(SessionPid) -> - mnesia:dirty_delete_object(?TAB, record(ClientId, SessionPid)). +unregister_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) -> + mnesia:dirty_delete_object(?TAB, record(ClientId, SessPid)). -record(ClientId, SessionPid) -> - #global_session{sid = ClientId, pid = SessionPid}. +record(ClientId, SessPid) -> + #global_session{sid = ClientId, pid = SessPid}. %%------------------------------------------------------------------------------ %% gen_server callbacks @@ -72,7 +72,7 @@ init([]) -> {attributes, record_info(fields, global_session)}]), ok = ekka_mnesia:copy_table(?TAB), ekka:monitor(membership), - {ok, #state{}}. + {ok, #{}}. handle_call(Req, _From, State) -> emqx_logger:error("[Registry] unexpected call: ~p", [Req]), @@ -107,9 +107,9 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ cleanup_sessions(Node) -> - Pat = [{#global_session{pid = '$1', _ = '_'}, - [{'==', {node, '$1'}, Node}], ['$_']}], - lists:foreach(fun(Session) -> - mnesia:delete_object(?TAB, Session) - end, mnesia:select(?TAB, Pat)). + Pat = [{#global_session{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}], + lists:foreach(fun delete_session/1, mnesia:select(?TAB, Pat, write)). + +delete_session(Session) -> + mnesia:delete_object(?TAB, Session, write). diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index 664e04680..510b2d91a 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -31,9 +31,10 @@ code_change/3]). -record(update, {name, countdown, interval, func}). --record(state, {timer, updates :: #update{}}). +-record(state, {timer, updates :: [#update{}]}). -type(stats() :: list({atom(), non_neg_integer()})). + -export_type([stats/0]). %% Connection stats diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 74014707a..fa08fa1bb 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -40,38 +40,61 @@ keepalive, enable_stats, stats_timer, - shutdown_reason + shutdown }). --define(INFO_KEYS, [peername, sockname]). - -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(WSLOG(Level, Format, Args, State), - emqx_logger:Level("WSMQTT(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). + emqx_logger:Level("MQTT/WS(~s): " ++ Format, + [esockd_net:format(State#state.peername) | Args])). %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ %% for debug -info(WSPid) -> - call(WSPid, info). +info(WSPid) when is_pid(WSPid) -> + call(WSPid, info); + +info(#state{peername = Peername, + sockname = Sockname, + proto_state = ProtoState}) -> + ProtoInfo = emqx_protocol:info(ProtoState), + ConnInfo = [{socktype, websocket}, + {conn_state, running}, + {peername, Peername}, + {sockname, Sockname}], + lists:append([ConnInfo, ProtoInfo]). %% for dashboard -attrs(CPid) when is_pid(CPid) -> - call(CPid, attrs). +attrs(WSPid) when is_pid(WSPid) -> + call(WSPid, attrs); -stats(WSPid) -> - call(WSPid, stats). +attrs(#state{peername = Peername, + sockname = Sockname, + proto_state = ProtoState}) -> + SockAttrs = [{peername, Peername}, + {sockname, Sockname}], + ProtoAttrs = emqx_protocol:attrs(ProtoState), + lists:usort(lists:append(SockAttrs, ProtoAttrs)). -kick(WSPid) -> +stats(WSPid) when is_pid(WSPid) -> + call(WSPid, stats); + +stats(#state{proto_state = ProtoState}) -> + lists:append([wsock_stats(), + emqx_misc:proc_stats(), + emqx_protocol:stats(ProtoState) + ]). + +kick(WSPid) when is_pid(WSPid) -> call(WSPid, kick). -session(WSPid) -> +session(WSPid) when is_pid(WSPid) -> call(WSPid, session). -call(WSPid, Req) -> +call(WSPid, Req) when is_pid(WSPid) -> Mref = erlang:monitor(process, WSPid), WSPid ! {call, {self(), Mref}, Req}, receive @@ -153,41 +176,30 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState, websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1})); {error, Error} -> ?WSLOG(error, "Protocol error - ~p", [Error], State), - {stop, State}; - {error, Error, ProtoState1} -> - shutdown(Error, State#state{proto_state = ProtoState1}); - {stop, Reason, ProtoState1} -> - shutdown(Reason, State#state{proto_state = ProtoState1}) + stop(Error, State); + {error, Reason, ProtoState1} -> + shutdown(Reason, State#state{proto_state = ProtoState1}); + {stop, Error, ProtoState1} -> + stop(Error, State#state{proto_state = ProtoState1}) end; {error, Error} -> ?WSLOG(error, "Frame error: ~p", [Error], State), - {stop, State}; + stop(Error, State); {'EXIT', Reason} -> ?WSLOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data], State), - {stop, State} + shutdown(parse_error, State) end. -websocket_info({call, From, info}, State = #state{peername = Peername, - sockname = Sockname, - proto_state = ProtoState}) -> - ProtoInfo = emqx_protocol:info(ProtoState), - ConnInfo = [{socktype, websocket}, {conn_state, running}, - {peername, Peername}, {sockname, Sockname}], - gen_server:reply(From, lists:append([ConnInfo, ProtoInfo])), +websocket_info({call, From, info}, State) -> + gen_server:reply(From, info(State)), {ok, State}; -websocket_info({call, From, attrs}, State = #state{peername = Peername, - sockname = Sockname, - proto_state = ProtoState}) -> - SockAttrs = [{peername, Peername}, - {sockname, Sockname}], - ProtoAttrs = emqx_protocol:attrs(ProtoState), - gen_server:reply(From, lists:usort(lists:append(SockAttrs, ProtoAttrs))), +websocket_info({call, From, attrs}, State) -> + gen_server:reply(From, attrs(State)), {ok, State}; -websocket_info({call, From, stats}, State = #state{proto_state = ProtoState}) -> - Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), emqx_protocol:stats(ProtoState)]), - gen_server:reply(From, Stats), +websocket_info({call, From, stats}, State) -> + gen_server:reply(From, stats(State)), {ok, State}; websocket_info({call, From, kick}, State) -> @@ -203,15 +215,12 @@ websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} -> {ok, ensure_stats_timer(State#state{proto_state = ProtoState1})}; {error, Reason} -> - shutdown(Reason, State); - {error, Reason, ProtoState1} -> - shutdown(Reason, State#state{proto_state = ProtoState1}) + shutdown(Reason, State) end; -websocket_info(emit_stats, State = #state{proto_state = ProtoState}) -> - Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), - emqx_protocol:stats(ProtoState)]), - emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), Stats), +websocket_info({timeout, Timer, emit_stats}, + State = #state{stats_timer = Timer, proto_state = ProtoState}) -> + emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), {ok, State#state{stats_timer = undefined}, hibernate}; websocket_info({keepalive, start, Interval}, State) -> @@ -254,30 +263,40 @@ websocket_info(Info, State) -> ?WSLOG(error, "unexpected info: ~p", [Info], State), {ok, State}. -terminate(SockError, _Req, #state{keepalive = Keepalive, - proto_state = ProtoState, - shutdown_reason = Reason}) -> +terminate(SockError, _Req, State = #state{keepalive = Keepalive, + proto_state = ProtoState, + shutdown = Shutdown}) -> + ?WSLOG(debug, "Terminated for ~p, sockerror: ~p", + [Shutdown, SockError], State), emqx_keepalive:cancel(Keepalive), - io:format("Websocket shutdown for ~p, sockerror: ~p~n", [Reason, SockError]), - case Reason of - undefined -> - ok; - _ -> - emqx_protocol:shutdown(Reason, ProtoState) + case {ProtoState, Shutdown} of + {undefined, _} -> ok; + {_, {shutdown, Reason}} -> + emqx_protocol:shutdown(Reason, ProtoState); + {_, Error} -> + emqx_protocol:shutdown(Error, ProtoState) end. +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + reset_parser(State = #state{proto_state = ProtoState}) -> State#state{parser_state = emqx_protocol:parser(ProtoState)}. ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined, - idle_timeout = Timeout}) -> - State#state{stats_timer = erlang:send_after(Timeout, self(), emit_stats)}; + idle_timeout = IdleTimeout}) -> + State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; ensure_stats_timer(State) -> State. shutdown(Reason, State) -> - {stop, State#state{shutdown_reason = Reason}}. + {stop, State#state{shutdown = Reason}}. + +stop(Error, State) -> + {stop, State#state{shutdown = Error}}. wsock_stats() -> [{Key, get(Key)} || Key <- ?SOCK_STATS]. + From 7c45d988f2d72e4477040e2e4751033abbd5ebfc Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 16:57:43 +0800 Subject: [PATCH 29/37] Update the spec of deliver/2 function --- src/emqx_protocol.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 018166d20..c703d8a24 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -451,7 +451,7 @@ puback(?QOS_2, PacketId, {ok, _}, PState) -> %% Deliver Packet -> Client %%------------------------------------------------------------------------------ --spec(deliver(term(), state()) -> {ok, state()} | {error, term()}). +-spec(deliver(tuple(), state()) -> {ok, state()} | {error, term()}). deliver({connack, ReasonCode}, PState) -> send(?CONNACK_PACKET(ReasonCode), PState); From 7d90f603b5234ff98134dcc1a516f378c1a4e6e7 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 17:10:40 +0800 Subject: [PATCH 30/37] Update README.md --- README.md | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 9ca4f447d..2f9e31cd8 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ *EMQ X* broker is a fully open source, highly scalable, highly available distributed message broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients. -Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket, STOMP and SockJS. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster. +Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket and STOMP. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster. - For full list of new features, please read *EMQ X* broker 3.0 [release notes](https://github.com/emqtt/emqttd/releases/). @@ -67,7 +67,8 @@ Please submit any bugs, issues, and feature requests to [emqtt/emqttd](//github. ## License -Copyright (c) 2014-2018 [EMQ X Tech, LLC](http://emqtt.io) + +Copyright (c) 2018 [EMQ Technologies Co., Ltd](http://emqtt.io). All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License");you may not use this file except in compliance with the License.You may obtain a copy of the License at @@ -76,6 +77,3 @@ Licensed under the Apache License, Version 2.0 (the "License");you may not use t Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - - - From 880c6ab5feb0963d3eae9775f6274d59273f6789 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 17:27:36 +0800 Subject: [PATCH 31/37] Fix typo --- src/emqx_protocol.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index c703d8a24..fb7c003d7 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -102,7 +102,7 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) enable_acl = emqx_zone:get_env(Zone, enable_acl), recv_stats = #{msg => 0, pkt => 0}, send_stats = #{msg => 0, pkt => 0}, - connected = fasle}. + connected = false}. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of From 3f42f1271b4bf4b1e62f26530a6de4cf2f1b8f38 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 31 Aug 2018 18:14:10 +0800 Subject: [PATCH 32/37] bug fix --- src/emqx_session.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 81299711d..75c3ac197 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -720,7 +720,7 @@ run_dispatch_steps([], Msg, State) -> dispatch(Msg, State); run_dispatch_steps([{nl, 1}|_Steps], #message{from = ClientId}, State = #state{client_id = ClientId}) -> State; -run_dispatch_steps([{nl, 0}|Steps], Msg, State) -> +run_dispatch_steps([{nl, _}|Steps], Msg, State) -> run_dispatch_steps(Steps, Msg, State); run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = false}) -> run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State); @@ -897,4 +897,3 @@ shutdown(Reason, State) -> %% TODO: GC Policy and Shutdown Policy %% maybe_gc(State) -> State. - From 14ba395c21d94d84e9251590cc440b3acbea381d Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Fri, 31 Aug 2018 19:15:17 +0800 Subject: [PATCH 33/37] fix seesion and connectiong test suites --- test/emqx_connection_SUITE.erl | 23 ++++++++++++++++++++--- test/emqx_listeners_SUITE.erl | 9 ++++----- test/emqx_session_SUITE.erl | 8 +++++++- 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index ae69cdd5d..716e771b5 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -19,12 +19,29 @@ -include_lib("common_test/include/ct.hrl"). -all() -> [t_attrs]. +all() -> + [{group, connection}]. + +groups() -> + [{connection, [sequence], [t_attrs]}]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + t_attrs(_) -> - emqx_ct_broker_helpers:run_setup_steps(), {ok, C, _} = emqx_client:start_link([{host, "localhost"}, {client_id, <<"simpleClient">>}, {username, <<"plain">>}, {password, <<"plain">>}]), [{<<"simpleClient">>, ConnPid}] = emqx_cm:lookup_connection(<<"simpleClient">>), Attrs = emqx_connection:attrs(ConnPid), <<"simpleClient">> = proplists:get_value(client_id, Attrs), - <<"plain">> = proplists:get_value(username, Attrs). \ No newline at end of file + <<"plain">> = proplists:get_value(username, Attrs), + emqx_client:disconnect(C). + +%% t_stats() -> +%% {ok, C, _ } = emqx_client; +%% t_stats() -> + diff --git a/test/emqx_listeners_SUITE.erl b/test/emqx_listeners_SUITE.erl index d55bba400..6086e98c2 100644 --- a/test/emqx_listeners_SUITE.erl +++ b/test/emqx_listeners_SUITE.erl @@ -28,15 +28,14 @@ all() -> [start_stop_listeners, restart_listeners]. -init_per_suite() -> +init_per_suite(Config) -> NewConfig = generate_config(), application:ensure_all_started(esockd), lists:foreach(fun set_app_env/1, NewConfig), - ok. + Config. -end_per_suite() -> - application:stop(esockd), - ok. +end_per_suite(_Config) -> + application:stop(esockd). start_stop_listeners(_) -> ok = emqx_listeners:start(), diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index f2da10b74..526f8b729 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -21,8 +21,14 @@ all() -> [t_session_all]. -t_session_all(_) -> +init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +t_session_all(_) -> ClientId = <<"ClientId">>, {ok, ConnPid} = emqx_mock_client:start_link(ClientId), {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), From 2f63b7a487cb83e841c5c66c0ddb890bf6999d3a Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Fri, 31 Aug 2018 20:21:28 +0800 Subject: [PATCH 34/37] update broker suite for latest code --- test/emqx_broker_SUITE.erl | 24 ++++++++++++------------ test/emqx_session_SUITE.erl | 1 + 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 93f795d1d..e23330f7b 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -62,12 +62,12 @@ end_per_suite(_Config) -> %%-------------------------------------------------------------------- subscribe_unsubscribe(_) -> - ok = emqx:subscribe(<<"topic">>, "clientId"), - ok = emqx:subscribe(<<"topic/1">>, "clientId", #{ qos => 1 }), - ok = emqx:subscribe(<<"topic/2">>, "clientId", #{ qos => 2 }), - ok = emqx:unsubscribe(<<"topic">>, "clientId"), - ok = emqx:unsubscribe(<<"topic/1">>, "clientId"), - ok = emqx:unsubscribe(<<"topic/2">>, "clientId"). + ok = emqx:subscribe(<<"topic">>, <<"clientId">>), + ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, #{ qos => 1 }), + ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }), + ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>), + ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>), + ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>). publish(_) -> Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>), @@ -79,9 +79,9 @@ publish(_) -> pubsub(_) -> Self = self(), Subscriber = {Self, <<"clientId">>}, - ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 1 }), + ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 1 }), #{ qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), - ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 2 }), + ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 2 }), #{ qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), %% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]), timer:sleep(10), @@ -100,8 +100,8 @@ pubsub(_) -> t_local_subscribe(_) -> ok = emqx:subscribe(<<"$local/topic0">>), - ok = emqx:subscribe(<<"$local/topic1">>, "clientId"), - ok = emqx:subscribe(<<"$local/topic2">>, "clientId", #{ qos => 2 }), + ok = emqx:subscribe(<<"$local/topic1">>, <<"clientId">>), + ok = emqx:subscribe(<<"$local/topic2">>, <<"clientId">>, #{ qos => 2 }), timer:sleep(10), ?assertEqual([{self(), undefined}], emqx:subscribers("$local/topic0")), ?assertEqual([{self(), <<"clientId">>}], emqx:subscribers("$local/topic1")), @@ -110,8 +110,8 @@ t_local_subscribe(_) -> emqx:subscriptions({self(), <<"clientId">>})), ?assertEqual(ok, emqx:unsubscribe("$local/topic0")), ?assertEqual(ok, emqx:unsubscribe("$local/topic0")), - ?assertEqual(ok, emqx:unsubscribe("$local/topic1", "clientId")), - ?assertEqual(ok, emqx:unsubscribe("$local/topic2", "clientId")), + ?assertEqual(ok, emqx:unsubscribe("$local/topic1", <<"clientId">>)), + ?assertEqual(ok, emqx:unsubscribe("$local/topic2", <<"clientId">>)), ?assertEqual([], emqx:subscribers("topic1")), ?assertEqual([], emqx:subscriptions({self(), <<"clientId">>})). diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 526f8b729..29a6edc61 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -1,3 +1,4 @@ + %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); From ec456dcc7315a440b66714e28e74fcff1b495d07 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 31 Aug 2018 20:48:26 +0800 Subject: [PATCH 35/37] Fix dialyze issue --- src/emqx_base62.erl | 3 +-- src/emqx_bridge.erl | 4 ++-- src/emqx_bridge_sup.erl | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/emqx_base62.erl b/src/emqx_base62.erl index 690115ec2..1a9db245a 100644 --- a/src/emqx_base62.erl +++ b/src/emqx_base62.erl @@ -22,7 +22,7 @@ %% @doc Encode any data to base62 binary -spec encode(string() | integer() - | binary()) -> float(). + | binary()) -> binary(). encode(I) when is_integer(I) -> encode(integer_to_binary(I)); encode(S) when is_list(S)-> @@ -110,4 +110,3 @@ decode_char(I) when I >= $A andalso I =< $Z-> decode_char(9, I) -> I + 61 - $A. - diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 0f64f331d..a7ce2581d 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -117,7 +117,7 @@ handle_info(start, State = #state{options = Options, {noreply, State#state{client_pid = ClientPid}}; {error,_} -> erlang:send_after(ReconnectTime, self(), start), - {noreply, State = #state{reconnect_count = ReconnectCount-1}} + {noreply, State#state{reconnect_count = ReconnectCount-1}} end; %%---------------------------------------------------------------- @@ -138,7 +138,7 @@ handle_info(start, State = #state{options = Options, {noreply, State#state{client_pid = ClientPid}}; {error,_} -> erlang:send_after(ReconnectTime, self(), start), - {noreply, State = #state{reconnect_count = ReconnectCount-1}} + {noreply, State#state{reconnect_count = ReconnectCount-1}} end; %%---------------------------------------------------------------- diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index 0d8a0d887..bc8c0a532 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -27,7 +27,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). %% @doc List all bridges --spec(bridges() -> [{node(), emqx_topic:topic(), pid()}]). +-spec(bridges() -> [{node(), Status :: binary()}]). bridges() -> [{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?MODULE)]. From abc6081282c9b0021e84fa6c15a5ea34adf632fe Mon Sep 17 00:00:00 2001 From: chenyy Date: Sat, 1 Sep 2018 11:54:28 +0800 Subject: [PATCH 36/37] fix error spelling word --- src/emqx_protocol.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index fb7c003d7..ec104799e 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -121,13 +121,13 @@ set_username(_Username, PState) -> %%------------------------------------------------------------------------------ info(PState = #pstate{conn_props = ConnProps, - ack_props = AclProps, + ack_props = AckProps, session = Session, topic_aliases = Aliases, will_msg = WillMsg, enable_acl = EnableAcl}) -> attrs(PState) ++ [{conn_props, ConnProps}, - {ack_props, AclProps}, + {ack_props, AckProps}, {session, Session}, {topic_aliases, Aliases}, {will_msg, WillMsg}, From 7f12db018079f228d602aee928f1dc9a4fe1566b Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Mon, 3 Sep 2018 14:03:20 +0800 Subject: [PATCH 37/37] add editorconfig for emqx --- .editorconfig | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 000000000..c563aa10d --- /dev/null +++ b/.editorconfig @@ -0,0 +1,27 @@ +# EditorConfig is awesome: https://EditorConfig.org + +# top-most EditorConfig file +root = true + +# Unix-style newlines with a newline ending every file +[*] +charset = utf-8 +end_of_line = lf +trim_trailing_whitespace = true +insert_final_newline = true + + +# Matches multiple files with brace expansion notation +# Set default charset +[*.{erl, src, hrl}] +indent_style = space +indent_size = 4 + +# Tab indentation (no size specified) +[Makefile] +indent_style = tab + +# Matches the exact files either package.json or .travis.yml +[{.travis.yml}] +indent_style = space +indent_size = 2