From edf654727c47409508db4c9bb96fa06713603b25 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Sep 2018 18:09:22 +0800 Subject: [PATCH 1/8] Rename emqx_mqtt_properties module to emqx_mqtt_props --- src/emqx_client.erl | 6 +++--- src/{emqx_mqtt_properties.erl => emqx_mqtt_props.erl} | 2 +- ...tt_properties_SUITE.erl => emqx_mqtt_props_SUITE.erl} | 9 +++++---- 3 files changed, 9 insertions(+), 8 deletions(-) rename src/{emqx_mqtt_properties.erl => emqx_mqtt_props.erl} (99%) rename test/{emqx_mqtt_properties_SUITE.erl => emqx_mqtt_props_SUITE.erl} (73%) diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 192569ca4..85d6ca59d 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -159,7 +159,7 @@ start_link() -> start_link([]). start_link(Options) when is_map(Options) -> start_link(maps:to_list(Options)); start_link(Options) when is_list(Options) -> - ok = emqx_mqtt_properties:validate( + ok = emqx_mqtt_props:validate( proplists:get_value(properties, Options, #{})), case start_client(with_owner(Options)) of {ok, Client} -> @@ -265,7 +265,7 @@ publish(Client, Topic, Payload, Opts) when is_binary(Topic), is_list(Opts) -> -> ok | {ok, packet_id()} | {error, term()}). publish(Client, Topic, Properties, Payload, Opts) when is_binary(Topic), is_map(Properties), is_list(Opts) -> - ok = emqx_mqtt_properties:validate(Properties), + ok = emqx_mqtt_props:validate(Properties), Retain = proplists:get_bool(retain, Opts), QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)), publish(Client, #mqtt_msg{qos = QoS, @@ -541,7 +541,7 @@ mqtt_connect(State = #state{client_id = ClientId, will_msg = WillMsg, properties = Properties}) -> ?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg, - ConnProps = emqx_mqtt_properties:filter(?CONNECT, Properties), + ConnProps = emqx_mqtt_props:filter(?CONNECT, Properties), send(?CONNECT_PACKET( #mqtt_packet_connect{proto_ver = ProtoVer, proto_name = ProtoName, diff --git a/src/emqx_mqtt_properties.erl b/src/emqx_mqtt_props.erl similarity index 99% rename from src/emqx_mqtt_properties.erl rename to src/emqx_mqtt_props.erl index 643156013..33acb360b 100644 --- a/src/emqx_mqtt_properties.erl +++ b/src/emqx_mqtt_props.erl @@ -13,7 +13,7 @@ %% limitations under the License. %% @doc MQTT5 Properties --module(emqx_mqtt_properties). +-module(emqx_mqtt_props). -include("emqx_mqtt.hrl"). diff --git a/test/emqx_mqtt_properties_SUITE.erl b/test/emqx_mqtt_props_SUITE.erl similarity index 73% rename from test/emqx_mqtt_properties_SUITE.erl rename to test/emqx_mqtt_props_SUITE.erl index a8301d1f4..8d3b16a14 100644 --- a/test/emqx_mqtt_properties_SUITE.erl +++ b/test/emqx_mqtt_props_SUITE.erl @@ -12,7 +12,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_mqtt_properties_SUITE). +-module(emqx_mqtt_props_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -22,6 +22,7 @@ all() -> [t_mqtt_properties_all]. t_mqtt_properties_all(_) -> - Props = emqx_mqtt_properties:filter(?CONNECT, #{'Session-Expiry-Interval' => 1, 'Maximum-Packet-Size' => 255}), - ok = emqx_mqtt_properties:validate(Props), - #{} = emqx_mqtt_properties:filter(?CONNECT, #{'Maximum-QoS' => ?QOS_2}). + Props = emqx_mqtt_props:filter(?CONNECT, #{'Session-Expiry-Interval' => 1, 'Maximum-Packet-Size' => 255}), + ok = emqx_mqtt_props:validate(Props), + #{} = emqx_mqtt_props:filter(?CONNECT, #{'Maximum-QoS' => ?QOS_2}). + From 7c688a483949a217b0587cb0b3780f8393c5e6df Mon Sep 17 00:00:00 2001 From: HuangDan Date: Thu, 6 Sep 2018 18:09:58 +0800 Subject: [PATCH 2/8] Add test case for mqtt5 connect packet --- test/emqx_SUITE.erl | 63 +++++------------ test/emqx_mqtt_packet_SUITE.erl | 117 ++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+), 47 deletions(-) create mode 100644 test/emqx_mqtt_packet_SUITE.erl diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index a56af1a8a..a08305a30 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -17,8 +17,6 @@ -compile(export_all). -compile(nowarn_export_all). --include("emqx_mqtt.hrl"). - -define(APP, emqx). -include_lib("eunit/include/eunit.hrl"). @@ -52,9 +50,7 @@ -define(PUBPACKET, ?PUBLISH_PACKET(?PUBQOS, <<"sub/topic">>, ?PACKETID, <<"publish">>)). all() -> - [{group, connect}%, - % {group, cleanSession} - ]. + [{group, connect}]. groups() -> [{connect, [non_parallel_tests], @@ -64,11 +60,7 @@ groups() -> mqtt_connect_with_ssl_oneway, mqtt_connect_with_ssl_twoway, mqtt_connect_with_ws - ]}, - {cleanSession, [sequence], - [cleanSession_validate] - } - ]. + ]}]. init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), @@ -109,15 +101,17 @@ mqtt_connect_with_ssl_oneway(_) -> emqx_ct_broker_helpers:change_opts(ssl_oneway), emqx:start(), ClientSsl = emqx_ct_broker_helpers:client_ssl(), - {ok, #ssl_socket{tcp = Sock, ssl = SslSock}} + {ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock} = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000), -%% Packet = raw_send_serialise(?CLIENT), -%% ssl:send(SslSock, Packet), -%% receive Data -> -%% ct:log("Data:~p~n", [Data]) -%% after 30000 -> -%% ok -%% end, + Packet = raw_send_serialise(?CLIENT), + emqx_client_sock:setopts(Sock, [{active, once}]), + emqx_client_sock:send(Sock, Packet), + ?assert( + receive {ssl, _, ConAck}-> + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(ConAck), true + after 1000 -> + false + end), ssl:close(SslSock). mqtt_connect_with_ssl_twoway(_Config) -> @@ -131,11 +125,12 @@ mqtt_connect_with_ssl_twoway(_Config) -> emqx_client_sock:setopts(Sock, [{active, once}]), emqx_client_sock:send(Sock, Packet), timer:sleep(500), + ?assert( receive {ssl, _, Data}-> - {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data) + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), true after 1000 -> - ok - end, + false + end), emqx_client_sock:close(Sock). mqtt_connect_with_ws(_Config) -> @@ -162,32 +157,6 @@ mqtt_connect_with_ws(_Config) -> {close, _} = rfc6455_client:close(WS), ok. -cleanSession_validate(_) -> - {ok, C1} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"c1">>}, - {clean_sess, false}]), - timer:sleep(10), - emqttc:subscribe(C1, <<"topic">>, qos0), - emqttc:disconnect(C1), - {ok, Pub} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"pub">>}]), - - emqttc:publish(Pub, <<"topic">>, <<"m1">>, [{qos, 0}]), - timer:sleep(10), - {ok, C11} = emqttc:start_link([{host, "localhost"}, - {port, 1883}, - {client_id, <<"c1">>}, - {clean_sess, false}]), - timer:sleep(100), - receive {publish, _Topic, M1} -> - ?assertEqual(<<"m1">>, M1) - after 1000 -> false - end, - emqttc:disconnect(Pub), - emqttc:disconnect(C11). - raw_send_serialise(Packet) -> emqx_frame:serialize(Packet). diff --git a/test/emqx_mqtt_packet_SUITE.erl b/test/emqx_mqtt_packet_SUITE.erl new file mode 100644 index 000000000..8bc41cb37 --- /dev/null +++ b/test/emqx_mqtt_packet_SUITE.erl @@ -0,0 +1,117 @@ +%%%=================================================================== +%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%%=================================================================== + +-module(emqx_mqtt_packet_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-import(emqx_frame, [serialize/1]). + +-include("emqx_mqtt.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +-define(INVALID_RESERVED, 1). + +-define(CONNECT_INVALID_PACKET(Var), + #mqtt_packet{header = #mqtt_packet_header{type = ?INVALID_RESERVED}, + variable = Var}). + +-define(CASE1_PROTOCOL_NAME, ?CONNECT_PACKET(#mqtt_packet_connect{ + proto_name = <<"MQTC">>, + client_id = <<"mqtt_protocol_name">>, + username = <<"admin">>, + password = <<"public">>})). + +-define(CASE2_PROTOCAL_VER, ?CONNECT_PACKET(#mqtt_packet_connect{ + client_id = <<"mqtt_client">>, + proto_ver = 6, + username = <<"admin">>, + password = <<"public">>})). + +-define(CASE3_PROTOCAL_INVALID_RESERVED, ?CONNECT_INVALID_PACKET(#mqtt_packet_connect{ + client_id = <<"mqtt_client">>, + proto_ver = 5, + username = <<"admin">>, + password = <<"public">>})). + +-define(PROTOCOL5, ?CONNECT_PACKET(#mqtt_packet_connect{ + proto_ver = 5, + keepalive = 60, + properties = #{'Message-Expiry-Interval' => 3600}, + client_id = <<"mqtt_client">>, + will_topic = <<"will_tipic">>, + will_payload = <<"will message">>, + username = <<"admin">>, + password = <<"public">>})). + + + +all() -> [{group, connect}]. + +groups() -> [{connect, [sequence], + [case1_protocol_name, + case2_protocol_ver%, + %TOTO case3_invalid_reserved + ]}]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +case1_protocol_name(_) -> + {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), + MqttPacket = serialize(?CASE1_PROTOCOL_NAME), + emqx_client_sock:send(Sock, MqttPacket), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), + Disconnect = gen_tcp:recv(Sock, 0), + ?assertEqual({error, closed}, Disconnect). + +case2_protocol_ver(_) -> + {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), + Packet = serialize(?CASE2_PROTOCAL_VER), + emqx_client_sock:send(Sock, Packet), + {ok, Data} = gen_tcp:recv(Sock, 0), + %% case1 Unacceptable protocol version + {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), + Disconnect = gen_tcp:recv(Sock, 0), + ?assertEqual({error, closed}, Disconnect). + +case3_invalid_reserved(_) -> + {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), + Packet = serialize(?CASE3_PROTOCAL_INVALID_RESERVED), + emqx_client_sock:send(Sock, Packet), + {ok, Data} = gen_tcp:recv(Sock, 0), + %% case1 Unacceptable protocol version + ct:log("Data:~p~n", [raw_recv_pase(Data)]), + {ok, ?CONNACK_PACKET(?CONNACK_PROTO_VER), _} = raw_recv_pase(Data), + Disconnect = gen_tcp:recv(Sock, 0), + ?assertEqual({error, closed}, Disconnect). + +raw_recv_pase(P) -> + emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, + version => ?MQTT_PROTO_V4} }). From 5774ba542c67b7be0ebd086858ff3122ecf987a0 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Sep 2018 18:10:01 +0800 Subject: [PATCH 3/8] Rename the emqx_mqtt_properties SUITE to emqx_mqtt_props --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 7202915cc..e12a516b1 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ EUNIT_OPTS = verbose CT_SUITES = emqx emqx_banned emqx_connection emqx_session emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight \ emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ - emqx_mqtt_compat emqx_mqtt_properties emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ + emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone \ emqx_mountpoint emqx_listeners emqx_protocol From 328d035dab7240d5a305a5024e4a4cfd2495c9e2 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Sep 2018 18:43:47 +0800 Subject: [PATCH 4/8] Replace 'state' record with map --- src/emqx_pool.erl | 18 ++++++++---------- src/emqx_pool_sup.erl | 8 ++++++-- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index 8d927ddd9..276352797 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -23,16 +23,14 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {pool, id}). - -define(POOL, ?MODULE). %% @doc Start pooler supervisor. start_link() -> emqx_pool_sup:start_link(?POOL, random, {?MODULE, start_link, []}). -%% @doc Start pool --spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}). +%% @doc Start pool. +-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()). start_link(Pool, Id) -> gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []). @@ -49,13 +47,13 @@ async_submit(Fun) -> worker() -> gproc_pool:pick_worker(pool). -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% gen_server callbacks -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #state{pool = Pool, id = Id}}. + {ok, #{pool => Pool, id => Id}}. handle_call({submit, Fun}, _From, State) -> {reply, catch run(Fun), State}; @@ -79,15 +77,15 @@ handle_info(Info, State) -> emqx_logger:error("[Pool] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{pool = Pool, id = Id}) -> +terminate(_Reason, #{pool := Pool, id := Id}) -> true = gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ run({M, F, A}) -> erlang:apply(M, F, A); diff --git a/src/emqx_pool_sup.erl b/src/emqx_pool_sup.erl index b71c15f1e..b371549c0 100644 --- a/src/emqx_pool_sup.erl +++ b/src/emqx_pool_sup.erl @@ -26,8 +26,12 @@ spec(Args) -> -spec(spec(any(), list()) -> supervisor:child_spec()). spec(ChildId, Args) -> - {ChildId, {?MODULE, start_link, Args}, - transient, infinity, supervisor, [?MODULE]}. + #{id => ChildId, + start => {?MODULE, start_link, Args}, + restart => transient, + shutdown => infinity, + type => supervisor, + modules => [?MODULE]}. -spec(start_link(atom() | tuple(), atom(), mfa()) -> {ok, pid()} | {error, term()}). start_link(Pool, Type, MFA) -> From d9ad29476a60873c7cb74e153b12f55fd5c99aaa Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 7 Sep 2018 10:23:43 +0800 Subject: [PATCH 5/8] Code Review: Update the zone module 1. Add force_reload/1 API 2. Change the default reload interval to 5 minutes --- src/emqx_zone.erl | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 209f0323c..8344ba150 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -26,10 +26,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {timer}). - -define(TAB, ?MODULE). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -58,7 +57,7 @@ set_env(Zone, Key, Val) -> init([]) -> _ = emqx_tables:new(?TAB, [set, {read_concurrency, true}]), - {ok, element(2, handle_info(reload, #state{}))}. + {ok, element(2, handle_info(reload, #{timer => undefined}))}. handle_call(Req, _From, State) -> emqx_logger:error("[Zone] unexpected call: ~p", [Req]), @@ -72,11 +71,9 @@ handle_cast(Msg, State) -> emqx_logger:error("[Zone] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info(reload, State) -> - lists:foreach( - fun({Zone, Opts}) -> - [ets:insert(?TAB, {{Zone, Key}, Val}) || {Key, Val} <- Opts] - end, emqx_config:get_env(zones, [])), +handle_info({timeout, TRef, reload}, State = #{timer := TRef}) -> + [ets:insert(?TAB, [{{Zone, Key}, Val} || {Key, Val} <- Opts]) + || {Zone, Opts} <- emqx_config:get_env(zones, [])], {noreply, ensure_reload_timer(State), hibernate}; handle_info(Info, State) -> @@ -94,5 +91,5 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ ensure_reload_timer(State) -> - State#state{timer = erlang:send_after(10000, self(), reload)}. + State#{timer := emqx_misc:start_timer(timer:minutes(5), reload)}. From 304a24ca6a0a8f0d2c6b6e035d69e1d5e2c6e7f3 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 7 Sep 2018 10:26:14 +0800 Subject: [PATCH 6/8] Code Review: Update the zone module 1. Add force_reload/0 management API 2. Change the reload interval to 5 minutes --- src/emqx_zone.erl | 37 +++++++++++++++++++++++++++++-------- test/emqx_zone_SUITE.erl | 14 ++++++++++---- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 8344ba150..dd183dbdf 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -21,16 +21,20 @@ -export([start_link/0]). -export([get_env/2, get_env/3]). -export([set_env/3]). +-export([force_reload/0]). +%% for test +-export([stop/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(TAB, ?MODULE). +-define(SERVER, ?MODULE). -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -spec(get_env(emqx_types:zone() | undefined, atom()) -> undefined | term()). get_env(undefined, Key) -> @@ -49,7 +53,15 @@ get_env(Zone, Key, Def) -> -spec(set_env(emqx_types:zone(), atom(), term()) -> ok). set_env(Zone, Key, Val) -> - gen_server:cast(?MODULE, {set_env, Zone, Key, Val}). + gen_server:cast(?SERVER, {set_env, Zone, Key, Val}). + +-spec(force_reload() -> ok). +force_reload() -> + gen_server:call(?SERVER, force_reload). + +-spec(stop() -> ok). +stop() -> + gen_server:stop(?SERVER, normal, infinity). %%------------------------------------------------------------------------------ %% gen_server callbacks @@ -59,6 +71,10 @@ init([]) -> _ = emqx_tables:new(?TAB, [set, {read_concurrency, true}]), {ok, element(2, handle_info(reload, #{timer => undefined}))}. +handle_call(force_reload, _From, State) -> + _ = do_reload(), + {reply, ok, State}; + handle_call(Req, _From, State) -> emqx_logger:error("[Zone] unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -71,10 +87,9 @@ handle_cast(Msg, State) -> emqx_logger:error("[Zone] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({timeout, TRef, reload}, State = #{timer := TRef}) -> - [ets:insert(?TAB, [{{Zone, Key}, Val} || {Key, Val} <- Opts]) - || {Zone, Opts} <- emqx_config:get_env(zones, [])], - {noreply, ensure_reload_timer(State), hibernate}; +handle_info(reload, State) -> + _ = do_reload(), + {noreply, ensure_reload_timer(State#{timer := undefined}), hibernate}; handle_info(Info, State) -> emqx_logger:error("[Zone] unexpected info: ~p", [Info]), @@ -90,6 +105,12 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -ensure_reload_timer(State) -> - State#{timer := emqx_misc:start_timer(timer:minutes(5), reload)}. +do_reload() -> + [ets:insert(?TAB, [{{Zone, Key}, Val} || {Key, Val} <- Opts]) + || {Zone, Opts} <- emqx_config:get_env(zones, [])]. + +ensure_reload_timer(State = #{timer := undefined}) -> + State#{timer := erlang:send_after(timer:minutes(5), self(), reload)}; +ensure_reload_timer(State) -> + State. diff --git a/test/emqx_zone_SUITE.erl b/test/emqx_zone_SUITE.erl index 282acc3e5..83c2ceaab 100644 --- a/test/emqx_zone_SUITE.erl +++ b/test/emqx_zone_SUITE.erl @@ -18,15 +18,21 @@ -compile(nowarn_export_all). -include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). all() -> [t_set_get_env]. t_set_get_env(_) -> - emqx_zone:start_link(), - ok = emqx_zone:set_env(china, language, chinese), - timer:sleep(100), % make sure set_env/3 is okay + application:set_env(emqx, zones, [{china, [{language, chinese}]}]), + {ok, _} = emqx_zone:start_link(), + ct:print("~p~n", [ets:tab2list(emqx_zone)]), chinese = emqx_zone:get_env(china, language), cn470 = emqx_zone:get_env(china, ism_band, cn470), undefined = emqx_zone:get_env(undefined, delay), - 500 = emqx_zone:get_env(undefined, delay, 500). + 500 = emqx_zone:get_env(undefined, delay, 500), + application:set_env(emqx, zones, [{zone1, [{key, val}]}]), + ?assertEqual(undefined, emqx_zone:get_env(zone1, key)), + emqx_zone:force_reload(), + ?assertEqual(val, emqx_zone:get_env(zone1, key)), + emqx_zone:stop(). From dd8513ad35bd24a02f37eac2c2103322b52eb25b Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 7 Sep 2018 14:10:16 +0800 Subject: [PATCH 7/8] Update for banned API Use `mnesia:foldl` to traverse mnesia rather than `mnesia:first` and `mnesia:next`, as a badarg exception would occur if the record was deleted while travering the whole table. --- include/emqx.hrl | 15 +++++++++------ src/emqx_banned.erl | 21 ++++++--------------- test/emqx_banned_SUITE.erl | 10 +++++----- 3 files changed, 20 insertions(+), 26 deletions(-) diff --git a/include/emqx.hrl b/include/emqx.hrl index 34e41b0a1..bca6fe519 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -148,13 +148,16 @@ %%-------------------------------------------------------------------- %% Banned %%-------------------------------------------------------------------- +-type(banned_who() :: {client_id, binary()} + | {username, binary()} + | {ip_address, inet:ip_address()}). -record(banned, { - key, - reason, - by, - desc, - until}). + who :: banned_who(), + reason :: binary(), + by :: binary(), + desc :: binary(), + until :: integer() + }). -endif. - diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 444f07dad..8f1c3156f 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -85,7 +85,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> - mnesia:async_dirty(fun expire_banned_items/1, [erlang:timestamp()]), + mnesia:async_dirty(fun expire_banned_items/1, [erlang:system_time(second)]), {noreply, ensure_expiry_timer(State), hibernate}; handle_info(Info, State) -> @@ -106,17 +106,8 @@ ensure_expiry_timer(State) -> State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}. expire_banned_items(Now) -> - expire_banned_item(mnesia:first(?TAB), Now). - -expire_banned_item('$end_of_table', _Now) -> - ok; -expire_banned_item(Key, Now) -> - case mnesia:read(?TAB, Key) of - [#banned{until = undefined}] -> - ok; - [B = #banned{until = Until}] when Until < Now -> - mnesia:delete_object(?TAB, B, sticky_write); - _ -> ok - end, - expire_banned_item(mnesia:next(?TAB, Key), Now). - + mnesia:foldl(fun + (B = #banned{until = Until}, _Acc) when Until < Now -> + mnesia:delete_object(?TAB, B, sticky_write); + (_, _Acc) -> ok + end, ok, ?TAB). diff --git a/test/emqx_banned_SUITE.erl b/test/emqx_banned_SUITE.erl index 9fae880d4..c8eab87b6 100644 --- a/test/emqx_banned_SUITE.erl +++ b/test/emqx_banned_SUITE.erl @@ -28,14 +28,14 @@ all() -> [t_banned_all]. t_banned_all(_) -> emqx_ct_broker_helpers:run_setup_steps(), emqx_banned:start_link(), - {MegaSecs, Secs, MicroSecs} = erlang:timestamp(), - ok = emqx_banned:add(#banned{key = {client_id, <<"TestClient">>}, + TimeNow = erlang:system_time(second), + ok = emqx_banned:add(#banned{who = {client_id, <<"TestClient">>}, reason = <<"test">>, by = <<"banned suite">>, - desc = <<"test">>, - until = {MegaSecs, Secs + 10, MicroSecs}}), + desc = <<"test">>, + until = TimeNow + 10}), % here is not expire banned test because its check interval is greater than 5 mins, but its effect has been confirmed timer:sleep(100), ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})), emqx_banned:del({client_id, <<"TestClient">>}), - ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})). \ No newline at end of file + ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})). From 1c5615c957573f84e1813c5dc1ae0862f1cc1ed0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=AD=90=E5=8D=9A?= <349832309@qq.com> Date: Fri, 7 Sep 2018 17:22:24 +0800 Subject: [PATCH 8/8] Stop emqx_zone when emqx_mqtt_caps test over --- test/emqx_mqtt_caps_SUITE.erl | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index 8b840b91c..85f6fae1d 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -57,7 +57,8 @@ t_get_set_caps(_) -> mqtt_shared_subscription => true, mqtt_wildcard_subscription => true }, - SubCaps = emqx_mqtt_caps:get_caps(zone, subscribe). + SubCaps = emqx_mqtt_caps:get_caps(zone, subscribe), + emqx_zone:stop(). t_check_pub(_) -> {ok, _} = emqx_zone:start_link(), @@ -81,7 +82,8 @@ t_check_pub(_) -> qos => ?QOS_1, retain => false }, - ok = emqx_mqtt_caps:check_pub(zone, PubProps). + ok = emqx_mqtt_caps:check_pub(zone, PubProps), + emqx_zone:stop(). t_check_sub(_) -> {ok, _} = emqx_zone:start_link(), @@ -104,7 +106,8 @@ t_check_sub(_) -> [{<<"client/stat">>, Opts#{rc => ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}}]), ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false}, [{<<"vlient/+/dsofi">>, Opts}], - [{<<"vlient/+/dsofi">>, Opts#{rc => ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}}]). + [{<<"vlient/+/dsofi">>, Opts#{rc => ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}}]), + emqx_zone:stop().