From bffdd2ba7441e6462919ec70244474d7f9188e9f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 22 May 2018 13:01:19 +0800 Subject: [PATCH] Upgrade esockd and add more test cases --- include/emqx_mqtt.hrl | 33 +-- src/emqx_access_control.erl | 2 +- src/emqx_broker.erl | 37 ++-- src/emqx_broker_sup.erl | 21 +- src/emqx_client.erl | 145 +++++++------ src/emqx_config.erl | 8 +- src/emqx_connection.erl | 93 ++++---- src/emqx_ctl.erl | 15 +- src/emqx_frame.erl | 10 +- src/emqx_inflight.erl | 56 ++--- src/emqx_protocol.erl | 37 ++-- src/emqx_router.erl | 2 - src/emqx_router_helper.erl | 5 +- src/emqx_session.erl | 70 +++--- src/emqx_sys_sup.erl | 2 +- src/emqx_ws_connection.erl | 64 +++--- test/emqx_SUITE.erl | 1 + test/emqx_SUITE_data/loaded_plugins | 0 test/emqx_access_SUITE.erl | 1 + test/emqx_base62_SUITE.erl | 1 + test/emqx_broker_SUITE.erl | 8 +- test/emqx_client_SUITE.erl | 41 ++++ test/emqx_ct_broker_helpers.erl | 39 ++-- test/emqx_ct_helpers.erl | 24 +++ test/emqx_frame_SUITE.erl | 9 +- test/emqx_guid_SUITE.erl | 1 + test/emqx_inflight_SUITE.erl | 75 +++---- test/emqx_lib_SUITE.erl | 1 + test/emqx_misc_SUITE.erl | 1 + test/emqx_mod_SUITE.erl | 3 +- test/emqx_mqtt_compat_SUITE.erl | 220 +++++++++++++++++++ test/emqx_mqueue_SUITE.erl | 2 + test/emqx_net_SUITE.erl | 1 + test/emqx_pqueue_SUITE.erl | 2 + test/emqx_protocol_SUITE.erl | 320 ---------------------------- test/emqx_protocol_SUITE.erl.bk | 147 +++++++++++++ test/emqx_router_SUITE.erl | 168 +++++---------- test/emqx_time_SUITE.erl | 1 + test/emqx_topic_SUITE.erl | 1 + test/emqx_trie_SUITE.erl | 43 ++-- test/emqx_vm_SUITE.erl | 1 + 41 files changed, 892 insertions(+), 819 deletions(-) create mode 100644 test/emqx_SUITE_data/loaded_plugins create mode 100644 test/emqx_client_SUITE.erl create mode 100644 test/emqx_ct_helpers.erl create mode 100644 test/emqx_mqtt_compat_SUITE.erl delete mode 100644 test/emqx_protocol_SUITE.erl create mode 100644 test/emqx_protocol_SUITE.erl.bk diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 793c9d501..4dae631f4 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -243,7 +243,7 @@ is_bridge = false :: boolean(), clean_start = true :: boolean(), will_flag = false :: boolean(), - will_qos = ?QOS_1 :: mqtt_qos(), + will_qos = ?QOS_0 :: mqtt_qos(), will_retain = false :: boolean(), keepalive = 0 :: non_neg_integer(), properties = undefined :: mqtt_properties(), @@ -339,7 +339,8 @@ -define(CONNACK_PACKET(ReasonCode), #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, - variable = #mqtt_packet_connack{reason_code = ReasonCode}}). + variable = #mqtt_packet_connack{ack_flags = 0, + reason_code = ReasonCode}}). -define(CONNACK_PACKET(ReasonCode, SessPresent), #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, @@ -348,9 +349,9 @@ -define(CONNACK_PACKET(ReasonCode, SessPresent, Properties), #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, - variable = #mqtt_packet_connack{ack_flags = SessPresent, + variable = #mqtt_packet_connack{ack_flags = SessPresent, reason_code = ReasonCode, - properties = Properties}}). + properties = Properties}}). -define(AUTH_PACKET(), #mqtt_packet{header = #mqtt_packet_header{type = ?AUTH}, @@ -370,15 +371,16 @@ qos = Qos}, variable = #mqtt_packet_publish{packet_id = PacketId}}). --define(PUBLISH_PACKET(Qos, Topic, PacketId, Payload), +-define(PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - qos = Qos}, + qos = QoS}, variable = #mqtt_packet_publish{topic_name = Topic, packet_id = PacketId}, payload = Payload}). --define(PUBLISH_PACKET(Header, Topic, PacketId, Properties, Payload), - #mqtt_packet{header = Header = #mqtt_packet_header{type = ?PUBLISH}, +-define(PUBLISH_PACKET(QoS, Topic, PacketId, Properties, Payload), + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + qos = QoS}, variable = #mqtt_packet_publish{topic_name = Topic, packet_id = PacketId, properties = Properties}, @@ -386,7 +388,8 @@ -define(PUBACK_PACKET(PacketId), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, - variable = #mqtt_packet_puback{packet_id = PacketId}}). + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = 0}}). -define(PUBACK_PACKET(PacketId, ReasonCode, Properties), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, @@ -396,7 +399,8 @@ -define(PUBREC_PACKET(PacketId), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, - variable = #mqtt_packet_puback{packet_id = PacketId}}). + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = 0}}). -define(PUBREC_PACKET(PacketId, ReasonCode, Properties), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, @@ -406,7 +410,8 @@ -define(PUBREL_PACKET(PacketId), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1}, - variable = #mqtt_packet_puback{packet_id = PacketId}}). + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = 0}}). -define(PUBREL_PACKET(PacketId, ReasonCode, Properties), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1}, @@ -416,7 +421,8 @@ -define(PUBCOMP_PACKET(PacketId), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP}, - variable = #mqtt_packet_puback{packet_id = PacketId}}). + variable = #mqtt_packet_puback{packet_id = PacketId, + reason_code = 0}}). -define(PUBCOMP_PACKET(PacketId, ReasonCode, Properties), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP}, @@ -467,7 +473,8 @@ reason_codes = ReasonCodes}}). -define(DISCONNECT_PACKET(), - #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT}}). + #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT}, + variable = #mqtt_packet_disconnect{reason_code = 0}}). -define(DISCONNECT_PACKET(ReasonCode), #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT}, diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 85713b7cc..aa51c0df0 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -114,7 +114,7 @@ tab_key(acl) -> acl_modules. %% @doc Stop access control server. stop() -> - gen_server:call(?MODULE, stop). + gen_server:stop(?MODULE, normal, infinity). %%-------------------------------------------------------------------- %% gen_server callbacks diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 57db2defe..aaacb086d 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -22,16 +22,11 @@ -export([start_link/2]). --export([subscribe/1, subscribe/2, subscribe/3, unsubscribe/1, unsubscribe/2]). - +-export([subscribe/1, subscribe/2, subscribe/3, subscribe/4]). -export([publish/1, publish/2]). - +-export([unsubscribe/1, unsubscribe/2]). -export([dispatch/2, dispatch/3]). - -export([subscriptions/1, subscribers/1, subscribed/2]). - --export([topics/0]). - -export([get_subopts/2, set_subopts/3]). %% gen_server Function Exports @@ -41,7 +36,6 @@ -record(state, {pool, id, submon}). -define(BROKER, ?MODULE). - -define(TIMEOUT, 120000). %% ETS tables @@ -104,7 +98,7 @@ unsubscribe(Topic, Subscriber, Timeout) -> -spec(publish(message()) -> delivery() | stopped). publish(Msg = #message{from = From}) -> %% Hook to trace? - trace(public, From, Msg), + trace(publish, From, Msg), case emqx_hooks:run('message.publish', [], Msg) of {ok, Msg1 = #message{topic = Topic}} -> publish(Topic, Msg1); @@ -226,15 +220,20 @@ subscribed(Topic, {SubId, SubPid}) when is_binary(Topic), is_pid(SubPid) -> ets:member(?SUBOPTION, {Topic, {SubId, SubPid}}). -topics() -> emqx_router:topics(). - +-spec(get_subopts(topic(), subscriber()) -> [suboption()]). get_subopts(Topic, Subscriber) when is_binary(Topic) -> try ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2) catch error:badarg -> [] end. +-spec(set_subopts(topic(), subscriber(), [suboption()]) -> boolean()). set_subopts(Topic, Subscriber, Opts) when is_binary(Topic), is_list(Opts) -> - gen_server:call(pick(Subscriber), {set_subopts, Topic, Subscriber, Opts}). + case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of + [{_, OldOpts}] -> + Opts1 = lists:usort(lists:umerge(Opts, OldOpts)), + ets:insert(?SUBOPTION, {{Topic, Subscriber}, Opts1}); + [] -> false + end. with_subpid(SubPid) when is_pid(SubPid) -> SubPid; @@ -267,18 +266,8 @@ init([Pool, Id]) -> gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #state{pool = Pool, id = Id, submon = emqx_pmon:new()}}. -handle_call({set_subopts, Topic, Subscriber, Opts}, _From, State) -> - case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of - [{_, OldOpts}] -> - Opts1 = lists:usort(lists:umerge(Opts, OldOpts)), - ets:insert(?SUBOPTION, {{Topic, Subscriber}, Opts1}), - {reply, ok, State}; - [] -> - {reply, {error, not_found}, State} - end; - -handle_call(Request, _From, State) -> - emqx_logger:error("[Broker] Unexpected request: ~p", [Request]), +handle_call(Req, _From, State) -> + emqx_logger:error("[Broker] Unexpected request: ~p", [Req]), {reply, ignore, State}. handle_cast({From, {subscribe, Topic, Subscriber, Options}}, State) -> diff --git a/src/emqx_broker_sup.erl b/src/emqx_broker_sup.erl index c16b5c63d..034d87711 100644 --- a/src/emqx_broker_sup.erl +++ b/src/emqx_broker_sup.erl @@ -22,11 +22,7 @@ -export([init/1]). --import(lists, [foreach/2]). - --define(TAB_OPTS, [public, - {read_concurrency, true}, - {write_concurrency, true}]). +-define(TAB_OPTS, [public, {read_concurrency, true}, {write_concurrency, true}]). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). @@ -37,9 +33,9 @@ start_link() -> init([]) -> %% Create the pubsub tables - foreach(fun create_tab/1, [subscription, subscriber, suboption]), + lists:foreach(fun create_tab/1, [subscription, subscriber, suboption]), - %% Shared subscription + %% Shared Subscription SharedSub = {shared_sub, {emqx_shared_sub, start_link, []}, permanent, 5000, worker, [emqx_shared_sub]}, @@ -47,13 +43,12 @@ init([]) -> Helper = {broker_helper, {emqx_broker_helper, start_link, []}, permanent, 5000, worker, [emqx_broker_helper]}, - %% Broker pool - PoolArgs = [broker, hash, emqx_vm:schedulers() * 2, - {emqx_broker, start_link, []}], + %% Broker Pool + BrokerPool = emqx_pool_sup:spec(emqx_broker_pool, + [broker, hash, emqx_vm:schedulers() * 2, + {emqx_broker, start_link, []}]), - PoolSup = emqx_pool_sup:spec(eqmx_broker_pool, PoolArgs), - - {ok, {{one_for_all, 0, 1}, [SharedSub, Helper, PoolSup]}}. + {ok, {{one_for_all, 0, 1}, [SharedSub, Helper, BrokerPool]}}. %%-------------------------------------------------------------------- %% Create tables diff --git a/src/emqx_client.erl b/src/emqx_client.erl index ef640fe43..277d7ecd8 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -32,7 +32,9 @@ -export([pubrel/2, pubrel/3, pubrel/4]). -export([pubcomp/2, pubcomp/3, pubcomp/4]). -export([subscriptions/1]). --export([info/1]). +-export([info/1, stop/1]). +%% For test cases +-export([pause/1, resume/1]). -export([initialized/3, waiting_for_connack/3, connected/3]). -export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). @@ -60,12 +62,12 @@ | {will_topic, iodata()} | {will_payload, iodata()} | {will_retain, boolean()} - | {will_qos, mqtt_qos() | mqtt_qos_name()} - | {will_props, mqtt_properties()} + | {will_qos, qos()} + | {will_props, properties()} | {auto_ack, boolean()} | {ack_timeout, pos_integer()} | {force_ping, boolean()} - | {properties, mqtt_properties()}). + | {properties, properties()}). -export_type([host/0, option/0]). @@ -80,18 +82,17 @@ bridge_mode :: boolean(), client_id :: binary(), clean_start :: boolean(), - session_present :: boolean(), username :: binary() | undefined, password :: binary() | undefined, proto_ver :: mqtt_version(), proto_name :: iodata(), keepalive :: non_neg_integer(), keepalive_timer :: reference() | undefined, - expiry_interval :: pos_integer(), force_ping :: boolean(), + paused :: boolean(), will_flag :: boolean(), will_msg :: mqtt_message(), - properties :: mqtt_properties(), + properties :: properties(), pending_calls :: list(), subscriptions :: map(), max_inflight :: infinity | pos_integer(), @@ -102,8 +103,9 @@ ack_timer :: reference(), retry_interval :: pos_integer(), retry_timer :: reference(), - last_packet_id :: mqtt_packet_id(), - parse_state :: emqx_parser:state()}). + session_present :: boolean(), + last_packet_id :: packet_id(), + parse_state :: emqx_frame:state()}). -record(call, {id, from, req, ts}). @@ -254,7 +256,6 @@ publish(Client, Topic, Payload, QoS) when is_binary(Topic), ?IS_QOS(QoS) -> publish(Client, Topic, Payload, Opts) when is_binary(Topic), is_list(Opts) -> publish(Client, Topic, #{}, Payload, Opts). -%% MQTT Version 5.0 -spec(publish(client(), topic(), properties(), payload(), [pubopt()]) -> ok | {ok, packet_id()} | {error, term()}). publish(Client, Topic, Properties, Payload, Opts) @@ -262,9 +263,9 @@ publish(Client, Topic, Properties, Payload, Opts) ok = emqx_mqtt_properties:validate(Properties), Retain = proplists:get_bool(retain, Opts), QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)), - publish(Client, #mqtt_message{topic = Topic, - qos = QoS, + publish(Client, #mqtt_message{qos = QoS, retain = Retain, + topic = Topic, properties = Properties, payload = iolist_to_binary(Payload)}). @@ -279,7 +280,6 @@ unsubscribe(Client, Topic) when is_binary(Topic) -> unsubscribe(Client, Topics) when is_list(Topics) -> unsubscribe(Client, #{}, Topics). -%% MQTT Version 5.0 -spec(unsubscribe(client(), properties(), topic() | [topic()]) -> subscribe_ret()). unsubscribe(Client, Properties, Topic) when is_map(Properties), is_binary(Topic) -> unsubscribe(Client, Properties, [Topic]); @@ -303,47 +303,43 @@ disconnect(Client, ReasonCode, Properties) -> gen_statem:call(Client, {disconnect, ReasonCode, Properties}). %%-------------------------------------------------------------------- -%% For test cases. +%% For test cases %%-------------------------------------------------------------------- puback(Client, PacketId) when is_integer(PacketId) -> puback(Client, PacketId, ?RC_SUCCESS). -puback(Client, PacketId, ReasonCode) when is_integer(PacketId), - is_integer(ReasonCode) -> +puback(Client, PacketId, ReasonCode) + when is_integer(PacketId), is_integer(ReasonCode) -> puback(Client, PacketId, ReasonCode, #{}). -puback(Client, PacketId, ReasonCode, Properties) when is_integer(PacketId), - is_integer(ReasonCode), - is_map(Properties) -> +puback(Client, PacketId, ReasonCode, Properties) + when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) -> gen_statem:cast(Client, {puback, PacketId, ReasonCode, Properties}). pubrec(Client, PacketId) when is_integer(PacketId) -> pubrec(Client, PacketId, ?RC_SUCCESS). -pubrec(Client, PacketId, ReasonCode) when is_integer(PacketId), - is_integer(ReasonCode) -> +pubrec(Client, PacketId, ReasonCode) + when is_integer(PacketId), is_integer(ReasonCode) -> pubrec(Client, PacketId, ReasonCode, #{}). -pubrec(Client, PacketId, ReasonCode, Properties) when is_integer(PacketId), - is_integer(ReasonCode), - is_map(Properties) -> +pubrec(Client, PacketId, ReasonCode, Properties) + when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) -> gen_statem:cast(Client, {pubrec, PacketId, ReasonCode, Properties}). pubrel(Client, PacketId) when is_integer(PacketId) -> pubrel(Client, PacketId, ?RC_SUCCESS). -pubrel(Client, PacketId, ReasonCode) when is_integer(PacketId), - is_integer(ReasonCode) -> +pubrel(Client, PacketId, ReasonCode) + when is_integer(PacketId), is_integer(ReasonCode) -> pubrel(Client, PacketId, ReasonCode, #{}). -pubrel(Client, PacketId, ReasonCode, Properties) when is_integer(PacketId), - is_integer(ReasonCode), - is_map(Properties) -> +pubrel(Client, PacketId, ReasonCode, Properties) + when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) -> gen_statem:cast(Client, {pubrel, PacketId, ReasonCode, Properties}). pubcomp(Client, PacketId) when is_integer(PacketId) -> pubcomp(Client, PacketId, ?RC_SUCCESS). -pubcomp(Client, PacketId, ReasonCode) when is_integer(PacketId), - is_integer(ReasonCode) -> +pubcomp(Client, PacketId, ReasonCode) + when is_integer(PacketId), is_integer(ReasonCode) -> pubcomp(Client, PacketId, ReasonCode, #{}). -pubcomp(Client, PacketId, ReasonCode, Properties) when is_integer(PacketId), - is_integer(ReasonCode), - is_map(Properties) -> +pubcomp(Client, PacketId, ReasonCode, Properties) + when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) -> gen_statem:cast(Client, {pubcomp, PacketId, ReasonCode, Properties}). subscriptions(Client) -> @@ -352,6 +348,15 @@ subscriptions(Client) -> info(Client) -> gen_statem:call(Client, info). +stop(Client) -> + gen_statem:call(Client, stop). + +pause(Client) -> + gen_statem:call(Client, pause). + +resume(Client) -> + gen_statem:call(Client, resume). + %%-------------------------------------------------------------------- %% gen_statem callbacks %%-------------------------------------------------------------------- @@ -375,6 +380,7 @@ init([Options]) -> proto_name = <<"MQTT">>, keepalive = ?DEFAULT_KEEPALIVE, force_ping = false, + paused = false, will_flag = false, will_msg = #mqtt_message{}, pending_calls = [], @@ -496,8 +502,8 @@ init_will_msg({qos, QoS}, WillMsg) -> init_parse_state(State = #state{proto_ver = Ver, properties = Properties}) -> Size = maps:get('Maximum-Packet-Size', Properties, ?MAX_PACKET_SIZE), - State#state{parse_state = emqx_parser:initial_state([{max_len, Size}, - {version, Ver}])}. + State#state{parse_state = emqx_frame:initial_state( + #{max_packet_size => Size, version => Ver})}. callback_mode() -> state_functions. @@ -519,17 +525,17 @@ initialized({call, From}, connect, State = #state{sock_opts = SockOpts, initialized(EventType, EventContent, State) -> handle_event(EventType, EventContent, initialized, State). -mqtt_connect(State = #state{client_id = ClientId, - clean_start = CleanStart, - bridge_mode = IsBridge, - username = Username, - password = Password, - proto_ver = ProtoVer, - proto_name = ProtoName, - keepalive = KeepAlive, - will_flag = WillFlag, - will_msg = WillMsg, - properties = Properties}) -> +mqtt_connect(State = #state{client_id = ClientId, + clean_start = CleanStart, + bridge_mode = IsBridge, + username = Username, + password = Password, + proto_ver = ProtoVer, + proto_name = ProtoName, + keepalive = KeepAlive, + will_flag = WillFlag, + will_msg = WillMsg, + properties = Properties}) -> ?WILL_MSG(WillQos, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg, ConnProps = emqx_mqtt_properties:filter(?CONNECT, maps:to_list(Properties)), send(?CONNECT_PACKET( @@ -597,6 +603,15 @@ connected({call, From}, info, State) -> Info = lists:zip(record_info(fields, state), tl(tuple_to_list(State))), {keep_state, State, [{reply, From, Info}]}; +connected({call, From}, pause, State) -> + {keep_state, State#state{paused = true}, [{reply, From, ok}]}; + +connected({call, From}, resume, State) -> + {keep_state, State#state{paused = false}, [{reply, From, ok}]}; + +connected({call, From}, stop, _State) -> + {stop_and_reply, normal, [{reply, From, ok}]}; + connected({call, From}, SubReq = {subscribe, Properties, Topics}, State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) -> case send(?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of @@ -622,14 +637,14 @@ connected({call, From}, {publish, Msg = #mqtt_message{qos = ?QOS_0}}, State) -> connected({call, From}, {publish, Msg = #mqtt_message{qos = Qos}}, State = #state{inflight = Inflight, last_packet_id = PacketId}) when (Qos =:= ?QOS_1); (Qos =:= ?QOS_2) -> - case Inflight:is_full() of + case emqx_inflight:is_full(Inflight) of true -> {keep_state, State, [{reply, From, {error, inflight_full}}]}; false -> Msg1 = Msg#mqtt_message{packet_id = PacketId}, case send(Msg1, State) of {ok, NewState} -> - Inflight1 = Inflight:insert(PacketId, {publish, Msg1, os:timestamp()}), + Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight), {keep_state, ensure_retry_timer(NewState#state{inflight = Inflight1}), [{reply, From, {ok, PacketId}}]}; Error = {error, Reason} -> @@ -679,8 +694,12 @@ connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) -> connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) -> {keep_state, deliver_msg(packet_to_msg(Packet), State)}; +connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), State = #state{paused = true}) -> + {keep_state, State}; + connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), State = #state{auto_ack = AutoAck}) -> + _ = deliver_msg(packet_to_msg(Packet), State), case AutoAck of true -> send_puback(?PUBACK_PACKET(PacketId), State); @@ -698,12 +717,12 @@ connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties), State = #state{owner = Owner, inflight = Inflight}) -> - case Inflight:lookup(PacketId) of + case emqx_inflight:lookup(PacketId, Inflight) of {value, {publish, #mqtt_message{packet_id = PacketId}, _Ts}} -> Owner ! {puback, #{packet_id => PacketId, reason_code => ReasonCode, properties => Properties}}, - {keep_state, State#state{inflight = Inflight:delete(PacketId)}}; + {keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}}; none -> emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]), {keep_state, State} @@ -711,9 +730,9 @@ connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties), connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) -> send_puback(?PUBREL_PACKET(PacketId), - case Inflight:lookup(PacketId) of + case emqx_inflight:lookup(PacketId, Inflight) of {value, {publish, _Msg, _Ts}} -> - Inflight1 = Inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}), + Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight), State#state{inflight = Inflight1}; {value, {pubrel, _Ref, _Ts}} -> emqx_logger:warning("Duplicated PUBREC Packet: ~p", [PacketId]), @@ -741,12 +760,12 @@ connected(cast, ?PUBREL_PACKET(PacketId), connected(cast, ?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), State = #state{owner = Owner, inflight = Inflight}) -> - case Inflight:lookup(PacketId) of + case emqx_inflight:lookup(PacketId, Inflight) of {value, {pubrel, _PacketId, _Ts}} -> Owner ! {puback, #{packet_id => PacketId, reason_code => ReasonCode, properties => Properties}}, - {keep_state, State#state{inflight = Inflight:delete(PacketId)}}; + {keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}}; none -> emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]), {keep_state, State} @@ -797,8 +816,8 @@ connected(info, {timeout, _TRef, keepalive}, State = #state{force_ping = true}) end; connected(info, {timeout, TRef, keepalive}, - State = #state{socket = Sock, keepalive_timer = TRef}) -> - case should_ping(Sock) of + State = #state{socket = Sock, paused = Paused, keepalive_timer = TRef}) -> + case (not Paused) andalso should_ping(Sock) of true -> case send(?PACKET(?PINGREQ), State) of {ok, NewState} -> @@ -820,7 +839,7 @@ connected(info, {timeout, TRef, ack}, State = #state{ack_timer = TRef, connected(info, {timeout, TRef, retry}, State = #state{retry_timer = TRef, inflight = Inflight}) -> - case Inflight:is_empty() of + case emqx_inflight:is_empty(Inflight) of true -> {keep_state, State#state{retry_timer = undefined}}; false -> retry_send(State) end; @@ -928,7 +947,7 @@ ensure_retry_timer(_Interval, State) -> retry_send(State = #state{inflight = Inflight}) -> SortFun = fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end, - Msgs = lists:sort(SortFun, Inflight:values()), + Msgs = lists:sort(SortFun, emqx_inflight:values(Inflight)), retry_send(Msgs, os:timestamp(), State ). retry_send([], _Now, State) -> @@ -948,7 +967,7 @@ retry_send(publish, Msg = #mqtt_message{qos = QoS, packet_id = PacketId}, Msg1 = Msg#mqtt_message{dup = (QoS =:= ?QOS1)}, case send(Msg1, State) of {ok, NewState} -> - Inflight1 = Inflight:update(PacketId, {publish, Msg1, Now}), + Inflight1 = emqx_inflight:update(PacketId, {publish, Msg1, Now}, Inflight), {ok, NewState#state{inflight = Inflight1}}; Error = {error, _Reason} -> Error @@ -956,7 +975,7 @@ retry_send(publish, Msg = #mqtt_message{qos = QoS, packet_id = PacketId}, retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) -> case send(?PUBREL_PACKET(PacketId), State) of {ok, NewState} -> - Inflight1 = Inflight:update(PacketId, {pubrel, PacketId, Now}), + Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, Now}, Inflight), {ok, NewState#state{inflight = Inflight1}}; Error = {error, _Reason} -> Error @@ -1028,7 +1047,7 @@ send(Msg, State) when is_record(Msg, mqtt_message) -> send(Packet, State = #state{socket = Sock, proto_ver = Ver}) when is_record(Packet, mqtt_packet) -> - Data = emqx_serializer:serialize(Packet, [{version, Ver}]), + Data = emqx_frame:serialize(Packet, #{version => Ver}), emqx_logger:debug("SEND Data: ~p", [Data]), case emqx_client_sock:send(Sock, Data) of ok -> {ok, next_packet_id(State)}; @@ -1045,7 +1064,7 @@ receive_loop(<<>>, State) -> {keep_state, State}; receive_loop(Bytes, State = #state{parse_state = ParseState}) -> - case catch emqx_parser:parse(Bytes, ParseState) of + case catch emqx_frame:parse(Bytes, ParseState) of {ok, Packet, Rest} -> ok = gen_statem:cast(self(), Packet), receive_loop(Rest, init_parse_state(State)); diff --git a/src/emqx_config.erl b/src/emqx_config.erl index a1ea69394..ef1d8ec9f 100644 --- a/src/emqx_config.erl +++ b/src/emqx_config.erl @@ -17,7 +17,7 @@ %% @doc Hot Configuration %% %% TODO: How to persist the configuration? -%% +%% %% 1. Store in mnesia database? %% 2. Store in dets? %% 3. Store in data/app.config? @@ -27,6 +27,8 @@ -export([get_env/1, get_env/2]). +-export([populate/1]). + -export([read/1, write/2, dump/2, reload/1, get/2, get/3, set/3]). -type(env() :: {atom(), term()}). @@ -42,6 +44,10 @@ get_env(Key, Default) -> get_env(Key) -> application:get_env(?APP, Key). +%% TODO: +populate(_App) -> + ok. + %% @doc Read the configuration of an application. -spec(read(atom()) -> {ok, list(env())} | {error, term()}). read(App) -> diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 930424d38..fe0518072 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -27,7 +27,7 @@ -import(proplists, [get_value/2, get_value/3]). %% API Function Exports --export([start_link/2]). +-export([start_link/3]). %% Management and Monitor API -export([info/1, stats/1, kick/1, clean_acl_cache/2]). @@ -44,11 +44,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). -%% TODO: How to emit stats? --export([handle_pre_hibernate/1]). - %% Unused fields: connname, peerhost, peerport --record(state, {connection, peername, conn_state, await_recv, +-record(state, {transport, socket, peername, conn_state, await_recv, rate_limit, max_packet_size, proto_state, parse_state, keepalive, enable_stats, idle_timeout, force_gc_count}). @@ -60,8 +57,8 @@ emqx_logger:Level("Client(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). -start_link(Conn, Env) -> - {ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}. +start_link(Transport, Sock, Env) -> + {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, Env]])}. info(CPid) -> gen_server:call(CPid, info). @@ -72,11 +69,11 @@ stats(CPid) -> kick(CPid) -> gen_server:call(CPid, kick). -set_rate_limit(Cpid, Rl) -> - gen_server:call(Cpid, {set_rate_limit, Rl}). +set_rate_limit(CPid, Rl) -> + gen_server:call(CPid, {set_rate_limit, Rl}). -get_rate_limit(Cpid) -> - gen_server:call(Cpid, get_rate_limit). +get_rate_limit(CPid) -> + gen_server:call(CPid, get_rate_limit). subscribe(CPid, TopicTable) -> CPid ! {subscribe, TopicTable}. @@ -94,26 +91,25 @@ clean_acl_cache(CPid, Topic) -> %% gen_server Callbacks %%-------------------------------------------------------------------- -init([Conn0, Env]) -> - {ok, Conn} = Conn0:wait(), - case Conn:peername() of - {ok, Peername} -> do_init(Conn, Env, Peername); - {error, enotconn} -> Conn:fast_close(), - exit(normal); - {error, Reason} -> Conn:fast_close(), - exit({shutdown, Reason}) +init([Transport, Sock, Env]) -> + case Transport:wait(Sock) of + {ok, NewSock} -> + {ok, Peername} = Transport:ensure_ok_or_exit(peername, [NewSock]), + do_init(Transport, Sock, Peername, Env); + {error, Reason} -> + {stop, Reason} end. -do_init(Conn, Env, Peername) -> - %% Send Fun - SendFun = send_fun(Conn, Peername), - RateLimit = get_value(rate_limit, Conn:opts()), +do_init(Transport, Sock, Peername, Env) -> + RateLimit = get_value(rate_limit, Env), PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE), - ProtoState = emqx_protocol:init(Conn, Peername, SendFun, Env), + SendFun = send_fun(Transport, Sock, Peername), + ProtoState = emqx_protocol:init(Transport, Sock, Peername, SendFun, Env), EnableStats = get_value(client_enable_stats, Env, false), IdleTimout = get_value(client_idle_timeout, Env, 30000), ForceGcCount = emqx_gc:conn_max_gc_count(), - State = run_socket(#state{connection = Conn, + State = run_socket(#state{transport = Transport, + socket = Sock, peername = Peername, await_recv = false, conn_state = running, @@ -123,18 +119,17 @@ do_init(Conn, Env, Peername) -> enable_stats = EnableStats, idle_timeout = IdleTimout, force_gc_count = ForceGcCount}), - gen_server:enter_loop(?MODULE, [{hibernate_after, 10000}], + gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], init_parse_state(State), self(), IdleTimout). -send_fun(Conn, Peername) -> +send_fun(Transport, Sock, Peername) -> Self = self(), fun(Packet) -> - Data = emqx_serializer:serialize(Packet), + Data = emqx_frame:serialize(Packet), ?LOG(debug, "SEND ~p", [Data], #state{peername = Peername}), emqx_metrics:inc('bytes/sent', iolist_size(Data)), - try Conn:async_send(Data) of + try Transport:async_send(Sock, Data) of ok -> ok; - true -> ok; %% Compatible with esockd 4.x {error, Reason} -> Self ! {shutdown, Reason} catch error:Error -> Self ! {shutdown, Error} @@ -142,12 +137,9 @@ send_fun(Conn, Peername) -> end. init_parse_state(State = #state{max_packet_size = Size, proto_state = ProtoState}) -> - emqx_parser:initial_state([{max_len, Size}, - {ver, emqx_protocol:get(proto_ver, ProtoState)}]), - State. - -handle_pre_hibernate(State) -> - {hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}. + Version = emqx_protocol:get(proto_ver, ProtoState), + State#state{parse_state = emqx_frame:initial_state( + #{max_packet_size => Size, version => Version})}. handle_call(info, From, State = #state{proto_state = ProtoState}) -> ProtoInfo = emqx_protocol:info(ProtoState), @@ -252,12 +244,13 @@ handle_info({inet_reply, _Sock, ok}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); -handle_info({keepalive, start, Interval}, State = #state{connection = Conn}) -> +handle_info({keepalive, start, Interval}, + State = #state{transport = Transport, socket = Sock}) -> ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State), StatFun = fun() -> - case Conn:getstat([recv_oct]) of + case Transport:getstat(Sock, [recv_oct]) of {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; - {error, Error} -> {error, Error} + Error -> Error end end, case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of @@ -284,12 +277,13 @@ handle_info(Info, State) -> ?LOG(error, "Unexpected Info: ~p", [Info], State), {noreply, State}. -terminate(Reason, State = #state{connection = Conn, +terminate(Reason, State = #state{transport = Transport, + socket = Sock, keepalive = KeepAlive, proto_state = ProtoState}) -> ?LOG(debug, "Terminated for ~p", [Reason], State), - Conn:fast_close(), + Transport:fast_close(Sock), emqx_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of {undefined, _} -> @@ -314,7 +308,7 @@ received(<<>>, State) -> received(Bytes, State = #state{parse_state = ParseState, proto_state = ProtoState, idle_timeout = IdleTimeout}) -> - case catch emqx_parser:parse(Bytes, ParseState) of + case catch emqx_frame:parse(Bytes, ParseState) of {more, NewParseState} -> {noreply, State#state{parse_state = NewParseState}, IdleTimeout}; {ok, Packet, Rest} -> @@ -355,8 +349,8 @@ run_socket(State = #state{conn_state = blocked}) -> State; run_socket(State = #state{await_recv = true}) -> State; -run_socket(State = #state{connection = Conn}) -> - Conn:async_recv(0, infinity), +run_socket(State = #state{transport = Transport, socket = Sock}) -> + Transport:async_recv(Sock, 0, infinity), State#state{await_recv = true}. with_proto(Fun, State = #state{proto_state = ProtoState}) -> @@ -375,8 +369,11 @@ emit_stats(ClientId, State) -> emqx_cm:set_client_stats(ClientId, Stats), State. -sock_stats(#state{connection = Conn}) -> - case Conn:getstat(?SOCK_STATS) of {ok, Ss} -> Ss; {error, _} -> [] end. +sock_stats(#state{transport = Transport, socket = Sock}) -> + case Transport:getstat(Sock, ?SOCK_STATS) of + {ok, Ss} -> Ss; + _Error -> [] + end. reply(Reply, State) -> {reply, Reply, State, hibernate}. @@ -387,7 +384,7 @@ shutdown(Reason, State) -> stop(Reason, State) -> {stop, Reason, State}. -gc(State = #state{connection = Conn}) -> - Cb = fun() -> Conn:gc(), emit_stats(State) end, +gc(State = #state{transport = Transport, socket = Sock}) -> + Cb = fun() -> Transport:gc(Sock), emit_stats(State) end, emqx_gc:maybe_force_gc(#state.force_gc_count, State, Cb). diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index 805982e0e..a2228564a 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -40,24 +40,21 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -%% @doc Register a command -spec(register_command(cmd(), {module(), atom()}) -> ok). register_command(Cmd, MF) when is_atom(Cmd) -> register_command(Cmd, MF, []). -%% @doc Register a command with options -spec(register_command(cmd(), {module(), atom()}, list()) -> ok). register_command(Cmd, MF, Opts) when is_atom(Cmd) -> cast({register_command, Cmd, MF, Opts}). -%% @doc Unregister a command -spec(unregister_command(cmd()) -> ok). unregister_command(Cmd) when is_atom(Cmd) -> cast({unregister_command, Cmd}). -cast(Msg) -> gen_server:cast(?SERVER, Msg). +cast(Msg) -> + gen_server:cast(?SERVER, Msg). -%% @doc Run a command -spec(run_command(cmd(), [string()]) -> ok | {error, term()}). run_command(help, []) -> usage(); @@ -68,7 +65,7 @@ run_command(Cmd, Args) when is_atom(Cmd) -> _ -> ok catch _:Reason -> - emqx_logger:error("[CTL] Cmd error:~p, stacktrace:~p", + emqx_logger:error("[CTL] CMD Error:~p, Stacktrace:~p", [Reason, erlang:get_stacktrace()]), {error, Reason} end; @@ -76,7 +73,6 @@ run_command(Cmd, Args) when is_atom(Cmd) -> usage(), {error, cmd_not_found} end. -%% @doc Lookup a command -spec(lookup_command(cmd()) -> [{module(), atom()}]). lookup_command(Cmd) when is_atom(Cmd) -> case ets:match(?TAB, {{'_', Cmd}, '$1', '_'}) of @@ -84,7 +80,6 @@ lookup_command(Cmd) when is_atom(Cmd) -> [] -> [] end. -%% @doc Usage usage() -> io:format("Usage: ~s~n", [?MODULE]), [begin io:format("~80..-s~n", [""]), Mod:Cmd(usage) end @@ -144,7 +139,7 @@ next_seq(State = #state{seq = Seq}) -> -include_lib("eunit/include/eunit.hrl"). register_command_test_() -> - {setup, + {setup, fun() -> {ok, InitState} = emqx_ctl:init([]), InitState @@ -152,7 +147,7 @@ register_command_test_() -> fun(State) -> ok = emqx_ctl:terminate(shutdown, State) end, - fun(State = #state{seq = Seq}) -> + fun(State = #state{seq = Seq}) -> emqx_ctl:handle_cast({register_command, test0, {?MODULE, test0}, []}, State), [?_assertMatch([{{0,test0},{?MODULE, test0}, []}], ets:lookup(?TAB, {Seq,test0}))] end diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 7f01ee706..61f8f9e50 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -167,6 +167,7 @@ parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, ?QOS_0 -> {undefined, Rest}; _ -> parse_packet_id(Rest) end, + io:format("Rest1: ~p~n", [Rest1]), {Properties, Payload} = parse_properties(Rest1, Ver), {#mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId, @@ -577,8 +578,10 @@ serialize_property('Shared-Subscription-Available', Val) -> <<16#2A, Val>>. serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) -> - << <<(serialize_utf8_string(Topic))/binary, (serialize_subopts(SubOpts)) >> - || {Topic, SubOpts} <- TopicFilters >>; + << <<(serialize_utf8_string(Topic))/binary, + ?RESERVED:2, Rh:2, (flag(Rap)):1,(flag(Nl)):1, QoS:2 >> + || {Topic, #mqtt_subopts{rh = Rh, rap = Rap, nl = Nl, qos = QoS}} + <- TopicFilters >>; serialize_topic_filters(subscribe, TopicFilters, _Ver) -> << <<(serialize_utf8_string(Topic))/binary, ?RESERVED:6, QoS:2>> @@ -587,9 +590,6 @@ serialize_topic_filters(subscribe, TopicFilters, _Ver) -> serialize_topic_filters(unsubscribe, TopicFilters, _Ver) -> << <<(serialize_utf8_string(Topic))/binary>> || Topic <- TopicFilters >>. -serialize_subopts(#mqtt_subopts{rh = Rh, rap = Rap, nl = Nl, qos = QoS}) -> - <>. - serialize_reason_codes(undefined) -> <<>>; serialize_reason_codes(ReasonCodes) when is_list(ReasonCodes) -> diff --git a/src/emqx_inflight.erl b/src/emqx_inflight.erl index 10b3043b3..e2d77d95b 100644 --- a/src/emqx_inflight.erl +++ b/src/emqx_inflight.erl @@ -19,72 +19,72 @@ -export([new/1, contain/2, lookup/2, insert/3, update/3, delete/2, values/1, to_list/1, size/1, max_size/1, is_full/1, is_empty/1, window/1]). --type(inflight() :: {?MODULE, list()}). +-type(inflight() :: {max_size, gb_trees:tree()}). -export_type([inflight/0]). -spec(new(non_neg_integer()) -> inflight()). new(MaxSize) when MaxSize >= 0 -> - {?MODULE, [MaxSize, gb_trees:empty()]}. + {MaxSize, gb_trees:empty()}. --spec(contain(Key :: any(), inflight()) -> boolean()). -contain(Key, {?MODULE, [_MaxSize, Tree]}) -> +-spec(contain(Key :: term(), inflight()) -> boolean()). +contain(Key, {_MaxSize, Tree}) -> gb_trees:is_defined(Key, Tree). --spec(lookup(Key :: any(), inflight()) -> {value, any()} | none). -lookup(Key, {?MODULE, [_MaxSize, Tree]}) -> +-spec(lookup(Key :: term(), inflight()) -> {value, term()} | none). +lookup(Key, {_MaxSize, Tree}) -> gb_trees:lookup(Key, Tree). --spec(insert(Key :: any(), Value :: any(), inflight()) -> inflight()). -insert(Key, Value, {?MODULE, [MaxSize, Tree]}) -> - {?MODULE, [MaxSize, gb_trees:insert(Key, Value, Tree)]}. +-spec(insert(Key :: term(), Value :: term(), inflight()) -> inflight()). +insert(Key, Value, {MaxSize, Tree}) -> + {MaxSize, gb_trees:insert(Key, Value, Tree)}. --spec(delete(Key :: any(), inflight()) -> inflight()). -delete(Key, {?MODULE, [MaxSize, Tree]}) -> - {?MODULE, [MaxSize, gb_trees:delete(Key, Tree)]}. +-spec(delete(Key :: term(), inflight()) -> inflight()). +delete(Key, {MaxSize, Tree}) -> + {MaxSize, gb_trees:delete(Key, Tree)}. --spec(update(Key :: any(), Val :: any(), inflight()) -> inflight()). -update(Key, Val, {?MODULE, [MaxSize, Tree]}) -> - {?MODULE, [MaxSize, gb_trees:update(Key, Val, Tree)]}. +-spec(update(Key :: term(), Val :: term(), inflight()) -> inflight()). +update(Key, Val, {MaxSize, Tree}) -> + {MaxSize, gb_trees:update(Key, Val, Tree)}. -spec(is_full(inflight()) -> boolean()). -is_full({?MODULE, [0, _Tree]}) -> +is_full({0, _Tree}) -> false; -is_full({?MODULE, [MaxSize, Tree]}) -> +is_full({MaxSize, Tree}) -> MaxSize =< gb_trees:size(Tree). -spec(is_empty(inflight()) -> boolean()). -is_empty({?MODULE, [_MaxSize, Tree]}) -> +is_empty({_MaxSize, Tree}) -> gb_trees:is_empty(Tree). --spec(smallest(inflight()) -> {K :: any(), V :: any()}). -smallest({?MODULE, [_MaxSize, Tree]}) -> +-spec(smallest(inflight()) -> {K :: term(), V :: term()}). +smallest({_MaxSize, Tree}) -> gb_trees:smallest(Tree). --spec(largest(inflight()) -> {K :: any(), V :: any()}). -largest({?MODULE, [_MaxSize, Tree]}) -> +-spec(largest(inflight()) -> {K :: term(), V :: term()}). +largest({_MaxSize, Tree}) -> gb_trees:largest(Tree). -spec(values(inflight()) -> list()). -values({?MODULE, [_MaxSize, Tree]}) -> +values({_MaxSize, Tree}) -> gb_trees:values(Tree). --spec(to_list(inflight()) -> list({K :: any(), V :: any()})). -to_list({?MODULE, [_MaxSize, Tree]}) -> +-spec(to_list(inflight()) -> list({K :: term(), V :: term()})). +to_list({_MaxSize, Tree}) -> gb_trees:to_list(Tree). -spec(window(inflight()) -> list()). -window(Inflight = {?MODULE, [_MaxSize, Tree]}) -> +window(Inflight = {_MaxSize, Tree}) -> case gb_trees:is_empty(Tree) of true -> []; false -> [Key || {Key, _Val} <- [smallest(Inflight), largest(Inflight)]] end. -spec(size(inflight()) -> non_neg_integer()). -size({?MODULE, [_MaxSize, Tree]}) -> +size({_MaxSize, Tree}) -> gb_trees:size(Tree). -spec(max_size(inflight()) -> non_neg_integer()). -max_size({?MODULE, [MaxSize, _Tree]}) -> +max_size({MaxSize, _Tree}) -> MaxSize. diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 370ab4739..30fbb1848 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -1,18 +1,18 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- +%%%=================================================================== +%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%%=================================================================== -module(emqx_protocol). @@ -25,7 +25,7 @@ -import(proplists, [get_value/2, get_value/3]). %% API --export([init/3, init/4, get/2, info/1, stats/1, clientid/1, client/1, session/1]). +-export([init/3, init/5, get/2, info/1, stats/1, clientid/1, client/1, session/1]). -export([subscribe/2, unsubscribe/2, pubrel/2, shutdown/2]). @@ -76,8 +76,9 @@ init(Peername, SendFun, Opts) -> keepalive_backoff = Backoff, stats_data = #proto_stats{enable_stats = EnableStats}}. -init(Conn, Peername, SendFun, Opts) -> - enrich_opt(Conn:opts(), Conn, init(Peername, SendFun, Opts)). +init(_Transport, _Sock, Peername, SendFun, Opts) -> + init(Peername, SendFun, Opts). + %%enrich_opt(Conn:opts(), Conn, ). enrich_opt([], _Conn, State) -> State; diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 56f94eb08..7aff3a446 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -117,12 +117,10 @@ del_route(From, Topic, Dest) when is_binary(Topic) -> Route = #route{topic = Topic, dest = Dest}, cast(pick(Topic), {del_route, From, Route}). -%% @doc Has routes? -spec(has_routes(topic()) -> boolean()). has_routes(Topic) when is_binary(Topic) -> ets:member(?ROUTE, Topic). -%% @doc Get topics -spec(topics() -> list(topic())). topics() -> mnesia:dirty_all_keys(?ROUTE). diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index f4a19aca9..0606f70fd 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -40,7 +40,6 @@ -compile({no_auto_import, [monitor/1]}). -define(SERVER, ?MODULE). - -define(ROUTE, emqx_route). -define(ROUTING_NODE, emqx_routing_node). @@ -96,7 +95,7 @@ init([]) -> end end, [], mnesia:dirty_all_keys(?ROUTING_NODE)), emqx_stats:update_interval(route_stats, stats_fun()), - {ok, #state{nodes = Nodes}}. + {ok, #state{nodes = Nodes}, hibernate}. handle_call(Req, _From, State) -> emqx_logger:error("[RouterHelper] Unexpected request: ~p", [Req]), @@ -124,7 +123,7 @@ handle_info({nodedown, Node}, State = #state{nodes = Nodes}) -> mnesia:transaction(fun cleanup_routes/1, [Node]) end), mnesia:dirty_delete(?ROUTING_NODE, Node), - {noreply, State#state{nodes = lists:delete(Node, Nodes)}}; + {noreply, State#state{nodes = lists:delete(Node, Nodes)}, hibernate}; handle_info({membership, {mnesia, down, Node}}, State) -> handle_info({nodedown, Node}, State); diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 1dbe1e83a..83d53abc6 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -1,18 +1,18 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- +%%%=================================================================== +%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%%=================================================================== -module(emqx_session). @@ -246,7 +246,7 @@ stats(#state{max_subscriptions = MaxSubscriptions, [{max_subscriptions, MaxSubscriptions}, {subscriptions, maps:size(Subscriptions)}, {max_inflight, MaxInflight}, - {inflight_len, Inflight:size()}, + {inflight_len, emqx_inflight:size(Inflight)}, {max_mqueue, ?MQueue:max_len(MQueue)}, {mqueue_len, ?MQueue:len(MQueue)}, {mqueue_dropped, ?MQueue:dropped(MQueue)}, @@ -405,12 +405,12 @@ handle_cast({unsubscribe, From, TopicTable}, %% PUBACK: handle_cast({puback, PacketId}, State = #state{inflight = Inflight}) -> {noreply, - case Inflight:contain(PacketId) of + case emqx_inflight:contain(PacketId, Inflight) of true -> dequeue(acked(puback, PacketId, State)); false -> ?LOG(warning, "PUBACK ~p missed inflight: ~p", - [PacketId, Inflight:window()], State), + [PacketId, emqx_inflight:window(Inflight)], State), emqx_metrics:inc('packets/puback/missed'), State end, hibernate}; @@ -418,12 +418,12 @@ handle_cast({puback, PacketId}, State = #state{inflight = Inflight}) -> %% PUBREC: handle_cast({pubrec, PacketId}, State = #state{inflight = Inflight}) -> {noreply, - case Inflight:contain(PacketId) of + case emqx_inflight:contain(PacketId, Inflight) of true -> acked(pubrec, PacketId, State); false -> ?LOG(warning, "PUBREC ~p missed inflight: ~p", - [PacketId, Inflight:window()], State), + [PacketId, emqx_inflight:window(Inflight)], State), emqx_metrics:inc('packets/pubrec/missed'), State end, hibernate}; @@ -446,12 +446,12 @@ handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) -> %% PUBCOMP: handle_cast({pubcomp, PacketId}, State = #state{inflight = Inflight}) -> {noreply, - case Inflight:contain(PacketId) of + case emqx_inflight:contain(PacketId, Inflight) of true -> dequeue(acked(pubcomp, PacketId, State)); false -> ?LOG(warning, "The PUBCOMP ~p is not inflight: ~p", - [PacketId, Inflight:window()], State), + [PacketId, emqx_inflight:window(Inflight)], State), emqx_metrics:inc('packets/pubcomp/missed'), State end, hibernate}; @@ -581,11 +581,11 @@ kick(ClientId, OldPid, Pid) -> %%-------------------------------------------------------------------- %% Redeliver at once if Force is true - retry_delivery(Force, State = #state{inflight = Inflight}) -> - case Inflight:is_empty() of + case emqx_inflight:is_empty(Inflight) of true -> State; - false -> Msgs = lists:sort(sortfun(inflight), Inflight:values()), + false -> Msgs = lists:sort(sortfun(inflight), + emqx_inflight:values(Inflight)), retry_delivery(Force, Msgs, os:timestamp(), State) end. @@ -601,11 +601,11 @@ retry_delivery(Force, [{Type, Msg, Ts} | Msgs], Now, case {Type, Msg} of {publish, Msg = #message{headers = #{packet_id := PacketId}}} -> redeliver(Msg, State), - Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}), + Inflight1 = emqx_inflight:update(PacketId, {publish, Msg, Now}, Inflight), retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}); {pubrel, PacketId} -> redeliver({pubrel, PacketId}, State), - Inflight1 = Inflight:update(PacketId, {pubrel, PacketId, Now}), + Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, Now}, Inflight), retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}) end; true -> @@ -678,7 +678,7 @@ dispatch(Msg = #message{qos = ?QOS0}, State) -> dispatch(Msg = #message{qos = QoS}, State = #state{next_msg_id = MsgId, inflight = Inflight}) when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> - case Inflight:is_full() of + case emqx_inflight:is_full(Inflight) of true -> enqueue_msg(Msg, State); false -> @@ -719,15 +719,15 @@ await(Msg = #message{headers = #{packet_id := PacketId}}, true -> State#state{retry_timer = start_timer(Interval, retry_delivery)}; false -> State end, - State1#state{inflight = Inflight:insert(PacketId, {publish, Msg, os:timestamp()})}. + State1#state{inflight = emqx_inflight:insert(PacketId, {publish, Msg, os:timestamp()}, Inflight)}. acked(puback, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) -> - case Inflight:lookup(PacketId) of + case emqx_inflight:lookup(PacketId, Inflight) of {value, {publish, Msg, _Ts}} -> emqx_hooks:run('message.acked', [ClientId, Username], Msg), - State#state{inflight = Inflight:delete(PacketId)}; + State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> ?LOG(warning, "Duplicated PUBACK Packet: ~p", [PacketId], State), State @@ -736,10 +736,10 @@ acked(puback, PacketId, State = #state{client_id = ClientId, acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) -> - case Inflight:lookup(PacketId) of + case emqx_inflight:lookup(PacketId, Inflight) of {value, {publish, Msg, _Ts}} -> emqx_hooks:run('message.acked', [ClientId, Username], Msg), - State#state{inflight = Inflight:update(PacketId, {pubrel, PacketId, os:timestamp()})}; + State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)}; {value, {pubrel, PacketId, _Ts}} -> ?LOG(warning, "Duplicated PUBREC Packet: ~p", [PacketId], State), State; @@ -749,7 +749,7 @@ acked(pubrec, PacketId, State = #state{client_id = ClientId, end; acked(pubcomp, PacketId, State = #state{inflight = Inflight}) -> - State#state{inflight = Inflight:delete(PacketId)}. + State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}. %%-------------------------------------------------------------------- %% Dequeue @@ -760,7 +760,7 @@ dequeue(State = #state{client_pid = undefined}) -> State; dequeue(State = #state{inflight = Inflight}) -> - case Inflight:is_full() of + case emqx_inflight:is_full(Inflight) of true -> State; false -> dequeue2(State) end. diff --git a/src/emqx_sys_sup.erl b/src/emqx_sys_sup.erl index 745817469..69d6f4cb5 100644 --- a/src/emqx_sys_sup.erl +++ b/src/emqx_sys_sup.erl @@ -30,7 +30,7 @@ init([]) -> permanent, 5000, worker, [emqx_sys]}, {ok, Env} = emqx_config:get_env(sysmon), - Sysmon = {sys_mon, {emqx_sysmon, start_link, [Env]}, + Sysmon = {sys_mon, {emqx_sys_mon, start_link, [Env]}, permanent, 5000, worker, [emqx_sys_mon]}, {ok, {{one_for_one, 10, 100}, [Sys, Sysmon]}}. diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 161104568..83a55c21e 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -1,18 +1,18 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- +%%%=================================================================== +%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%%=================================================================== -module(emqx_ws_connection). @@ -44,8 +44,9 @@ -export([handle_pre_hibernate/1]). %% WebSocket Client State --record(wsclient_state, {ws_pid, peername, connection, proto_state, keepalive, - enable_stats, force_gc_count}). +-record(wsclient_state, {ws_pid, transport, socket, peername, + proto_state, keepalive, enable_stats, + force_gc_count}). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -85,27 +86,29 @@ clean_acl_cache(CPid, Topic) -> init([Env, WsPid, Req, ReplyChannel]) -> process_flag(trap_exit, true), - Conn = Req:get(connection), true = link(WsPid), - case Req:get(peername) of + Transport = mochiweb_request:get(transport, Req), + Sock = mochiweb_request:get(socket, Req), + case mochiweb_request:get(peername, Req) of {ok, Peername} -> Headers = mochiweb_headers:to_list(mochiweb_request:get(headers, Req)), - ProtoState = emqx_protocol:init(Conn, Peername, send_fun(ReplyChannel), + ProtoState = emqx_protocol:init(Transport, Sock, Peername, send_fun(ReplyChannel), [{ws_initial_headers, Headers} | Env]), IdleTimeout = get_value(client_idle_timeout, Env, 30000), EnableStats = get_value(client_enable_stats, Env, false), ForceGcCount = emqx_gc:conn_max_gc_count(), - {ok, #wsclient_state{connection = Conn, + {ok, #wsclient_state{transport = Transport, + socket = Sock, ws_pid = WsPid, peername = Peername, proto_state = ProtoState, enable_stats = EnableStats, force_gc_count = ForceGcCount}, IdleTimeout, {backoff, 2000, 2000, 20000}, ?MODULE}; - {error, enotconn} -> Conn:fast_close(), + {error, enotconn} -> Transport:fast_close(Sock), exit(WsPid, normal), exit(normal); - {error, Reason} -> Conn:fast_close(), + {error, Reason} -> Transport:fast_close(Sock), exit(WsPid, normal), exit({shutdown, Reason}) end. @@ -205,9 +208,10 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> handle_info({shutdown, Reason}, State) -> shutdown(Reason, State); -handle_info({keepalive, start, Interval}, State = #wsclient_state{connection = Conn}) -> +handle_info({keepalive, start, Interval}, + State = #wsclient_state{transport = Transport, socket =Sock}) -> ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State), - case emqx_keepalive:start(stat_fun(Conn), Interval, {keepalive, check}) of + case emqx_keepalive:start(stat_fun(Transport, Sock), Interval, {keepalive, check}) of {ok, KeepAlive} -> {noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate}; {error, Error} -> @@ -265,7 +269,7 @@ code_change(_OldVsn, State, _Extra) -> send_fun(ReplyChannel) -> Self = self(), fun(Packet) -> - Data = emqx_serializer:serialize(Packet), + Data = emqx_frame:serialize(Packet), emqx_metrics:inc('bytes/sent', iolist_size(Data)), case ReplyChannel({binary, Data}) of ok -> ok; @@ -273,9 +277,9 @@ send_fun(ReplyChannel) -> end end. -stat_fun(Conn) -> +stat_fun(Transport, Sock) -> fun() -> - case Conn:getstat([recv_oct]) of + case Transport:getstat(Sock, [recv_oct]) of {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; {error, Error} -> {error, Error} end @@ -293,8 +297,8 @@ emit_stats(ClientId, State) -> emqx_cm:set_client_stats(ClientId, Stats), State. -wsock_stats(#wsclient_state{connection = Conn}) -> - case Conn:getstat(?SOCK_STATS) of +wsock_stats(#wsclient_state{transport = Transport, socket = Sock}) -> + case Transport:getstat(Sock, ?SOCK_STATS) of {ok, Ss} -> Ss; {error, _} -> [] end. diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index b47b23942..e3e0bbb38 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -17,6 +17,7 @@ -module(emqx_SUITE). -compile(export_all). +-compile(nowarn_export_all). -include_lib("emqttc/include/emqttc_packet.hrl"). diff --git a/test/emqx_SUITE_data/loaded_plugins b/test/emqx_SUITE_data/loaded_plugins new file mode 100644 index 000000000..e69de29bb diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index aee73b873..270d55f57 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -17,6 +17,7 @@ -module(emqx_access_SUITE). -compile(export_all). +-compile(nowarn_export_all). -include("emqx.hrl"). diff --git a/test/emqx_base62_SUITE.erl b/test/emqx_base62_SUITE.erl index 6baf724d1..e0cb0e26a 100644 --- a/test/emqx_base62_SUITE.erl +++ b/test/emqx_base62_SUITE.erl @@ -21,6 +21,7 @@ -define(BASE62, emqx_base62). -compile(export_all). +-compile(nowarn_export_all). all() -> [t_base62_encode]. diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index e0ec205f7..e77d689d4 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -16,6 +16,7 @@ -module(emqx_broker_SUITE). -compile(export_all). +-compile(nowarn_export_all). -define(APP, emqx). @@ -24,6 +25,7 @@ -include_lib("common_test/include/ct.hrl"). -include("emqx.hrl"). +-include("emqx_mqtt.hrl"). all() -> [ @@ -56,7 +58,7 @@ init_per_suite(Config) -> end_per_suite(Config) -> emqx_ct_broker_helpers:run_teardown_steps(). - + %%-------------------------------------------------------------------- %% PubSub Test %%-------------------------------------------------------------------- @@ -147,7 +149,7 @@ start_session(_) -> {ok, ClientPid} = emqx_mock_client:start_link(<<"clientId">>), {ok, SessPid} = emqx_mock_client:start_session(ClientPid), Message = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>), - Message1 = Message#mqtt_message{pktid = 1}, + Message1 = Message#mqtt_message{packet_id = 1}, emqx_session:publish(SessPid, Message1), emqx_session:pubrel(SessPid, 1), emqx_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]), @@ -228,7 +230,7 @@ hook_fun7(arg, initArg) -> any. hook_fun8(arg, initArg) -> stop. set_alarms(_) -> - AlarmTest = #mqtt_alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"}, + AlarmTest = #alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"}, emqx_alarm:set_alarm(AlarmTest), Alarms = emqx_alarm:get_alarms(), ?assertEqual(1, length(Alarms)), diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl new file mode 100644 index 000000000..7b2d5aaae --- /dev/null +++ b/test/emqx_client_SUITE.erl @@ -0,0 +1,41 @@ +%%%=================================================================== +%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%%=================================================================== + +-module(emqx_client_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx_mqtt.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> []. + +groups() -> []. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + diff --git a/test/emqx_ct_broker_helpers.erl b/test/emqx_ct_broker_helpers.erl index 3dfe84fcc..a62297a49 100644 --- a/test/emqx_ct_broker_helpers.erl +++ b/test/emqx_ct_broker_helpers.erl @@ -1,22 +1,23 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- +%%%=================================================================== +%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%%=================================================================== -module(emqx_ct_broker_helpers). -compile(export_all). +-compile(nowarn_export_all). -define(APP, emqx). @@ -57,13 +58,11 @@ local_path(Components) -> set_app_env({App, Lists}) -> lists:foreach(fun({acl_file, _Var}) -> - application:set_env(App, acl_file, local_path(["etc", "acl.conf"])); - ({license_file, _Var}) -> - application:set_env(App, license_file, local_path(["etc", "emqx.lic"])); + application:set_env(App, acl_file, local_path(["etc", "acl.conf"])); ({plugins_loaded_file, _Var}) -> - application:set_env(App, plugins_loaded_file, local_path(["test", "emqx_SUITE_data","loaded_plugins"])); + application:set_env(App, plugins_loaded_file, local_path(["test", "emqx_SUITE_data","loaded_plugins"])); ({Par, Var}) -> - application:set_env(App, Par, Var) + application:set_env(App, Par, Var) end, Lists). change_opts(SslType) -> diff --git a/test/emqx_ct_helpers.erl b/test/emqx_ct_helpers.erl new file mode 100644 index 000000000..c1618eccd --- /dev/null +++ b/test/emqx_ct_helpers.erl @@ -0,0 +1,24 @@ +%%%=================================================================== +%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%%=================================================================== + +-module(emqx_ct_helpers). + +-export([ensure_mnesia_stopped/0]). + +ensure_mnesia_stopped() -> + ekka_mnesia:ensure_stopped(), + ekka_mnesia:delete_schema(). + diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index c177fda8e..c4ec83024 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -284,9 +284,7 @@ serialize_parse_publish_v5(_) -> 'Correlation-Data' => <<"correlation-id">>, 'Subscription-Identifier' => 1, 'Content-Type' => <<"text/json">>}, - Packet = ?PUBLISH_PACKET(#mqtt_packet_header{type = ?PUBLISH}, - <<"$share/group/topic">>, 1, Props, - <<"payload">>), + Packet = ?PUBLISH_PACKET(?QOS_1, <<"$share/group/topic">>, 1, Props, <<"payload">>), ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). @@ -335,7 +333,7 @@ serialize_parse_subscribe(_) -> Bin = <<130,11,0,2,0,6,84,111,112,105,99,65,2>>, TopicFilters = [{<<"TopicA">>, #mqtt_subopts{qos = 2}}], Packet = ?SUBSCRIBE_PACKET(2, TopicFilters), - ?assertEqual(Bin, serialize(Packet)), + ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), ?assertEqual({ok, Packet, <<>>}, parse(Bin)). serialize_parse_subscribe_v5(_) -> @@ -343,7 +341,8 @@ serialize_parse_subscribe_v5(_) -> {<<"TopicQos1">>, #mqtt_subopts{rh = 1, qos =?QOS_1}}], Packet = ?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 16#FFFFFFF}, TopicFilters), - ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + ?assertEqual({ok, Packet, <<>>}, + parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). serialize_parse_suback(_) -> Packet = ?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128]), diff --git a/test/emqx_guid_SUITE.erl b/test/emqx_guid_SUITE.erl index a6978bac8..0dee1da48 100644 --- a/test/emqx_guid_SUITE.erl +++ b/test/emqx_guid_SUITE.erl @@ -19,6 +19,7 @@ -include_lib("eunit/include/eunit.hrl"). -compile(export_all). +-compile(nowarn_export_all). all() -> [t_guid_gen, t_guid_hexstr, t_guid_base62]. diff --git a/test/emqx_inflight_SUITE.erl b/test/emqx_inflight_SUITE.erl index 903d5c7b5..de3accc06 100644 --- a/test/emqx_inflight_SUITE.erl +++ b/test/emqx_inflight_SUITE.erl @@ -1,61 +1,62 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- +%%%=================================================================== +%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%%=================================================================== -module(emqx_inflight_SUITE). -include_lib("eunit/include/eunit.hrl"). -%% CT -compile(export_all). +-compile(nowarn_export_all). -all() -> [t_contain, t_lookup, t_insert, t_update, t_delete, t_window, - t_is_full, t_is_empty]. +-import(emqx_inflight, [new/1, contain/2, insert/3, lookup/2, update/3, + delete/2, is_empty/1, is_full/1]). + +all() -> + [t_contain, t_lookup, t_insert, t_update, t_delete, t_window, + t_is_full, t_is_empty]. t_contain(_) -> - Inflight = emqx_inflight:new(0), - ?assertNot(Inflight:contain(k)), - Inflight1 = Inflight:insert(k, v), - ?assert(Inflight1:contain(k)). + ?assertNot(contain(k, new(0))), + ?assert(contain(k, insert(k, v, new(0)))). t_lookup(_) -> - Inflight = (emqx_inflight:new(0)):insert(k, v), - ?assertEqual(v, Inflight:lookup(k)). + Inflight = insert(k, v, new(0)), + ?assertEqual({value, v}, lookup(k, Inflight)), + ?assertEqual(none, lookup(x, Inflight)). t_insert(_) -> - Inflight = ((emqx_inflight:new(0)):insert(k1, v1)):insert(k2, v2), - ?assertEqual(v2, Inflight:lookup(k2)). + Inflight = insert(k2, v2, insert(k1, v1, new(0))), + ?assertEqual({value, v1}, lookup(k1, Inflight)), + ?assertEqual({value, v2}, lookup(k2, Inflight)). t_update(_) -> - Inflight = ((emqx_inflight:new(0)):insert(k, v1)):update(k, v2), - ?assertEqual(v2, Inflight:lookup(k)). + Inflight = update(k, v2, insert(k, v1, new(0))), + ?assertEqual({value, v2}, lookup(k, Inflight)). t_delete(_) -> - Inflight = ((emqx_inflight:new(0)):insert(k, v1)):delete(k), - ?assert(Inflight:is_empty()). + ?assert(is_empty(delete(k, insert(k, v1, new(0))))). t_window(_) -> - ?assertEqual([], (emqx_inflight:new(10)):window()), - Inflight = ((emqx_inflight:new(0)):insert(1, 1)):insert(2, 2), - ?assertEqual([1, 2], Inflight:window()). + ?assertEqual([], emqx_inflight:window(new(10))), + Inflight = insert(2, 2, insert(1, 1, new(0))), + ?assertEqual([1, 2], emqx_inflight:window(Inflight)). t_is_full(_) -> - Inflight = ((emqx_inflight:new(1)):insert(k, v1)), - ?assert(Inflight:is_full()). + ?assert(is_full(insert(k, v1, new(1)))). t_is_empty(_) -> - Inflight = ((emqx_inflight:new(1)):insert(k, v1)), - ?assertNot(Inflight:is_empty()). + ?assertNot(is_empty(insert(k, v1, new(1)))). diff --git a/test/emqx_lib_SUITE.erl b/test/emqx_lib_SUITE.erl index 0b24fb059..2cd24bb63 100644 --- a/test/emqx_lib_SUITE.erl +++ b/test/emqx_lib_SUITE.erl @@ -19,6 +19,7 @@ -include_lib("eunit/include/eunit.hrl"). -compile(export_all). +-compile(nowarn_export_all). -define(SOCKOPTS, [ binary, diff --git a/test/emqx_misc_SUITE.erl b/test/emqx_misc_SUITE.erl index e1283a9d4..4b7ec74f6 100644 --- a/test/emqx_misc_SUITE.erl +++ b/test/emqx_misc_SUITE.erl @@ -19,6 +19,7 @@ -include_lib("eunit/include/eunit.hrl"). -compile(export_all). +-compile(nowarn_export_all). -define(SOCKOPTS, [binary, {packet, raw}, diff --git a/test/emqx_mod_SUITE.erl b/test/emqx_mod_SUITE.erl index e87bddecc..963d39c45 100644 --- a/test/emqx_mod_SUITE.erl +++ b/test/emqx_mod_SUITE.erl @@ -17,10 +17,11 @@ -module(emqx_mod_SUITE). -compile(export_all). +-compile(nowarn_export_all). -include("emqx.hrl"). all() -> [mod_subscription_rep]. mod_subscription_rep(_) -> ok. - + diff --git a/test/emqx_mqtt_compat_SUITE.erl b/test/emqx_mqtt_compat_SUITE.erl new file mode 100644 index 000000000..d27c094ea --- /dev/null +++ b/test/emqx_mqtt_compat_SUITE.erl @@ -0,0 +1,220 @@ +%%%=================================================================== +%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%%=================================================================== + +-module(emqx_mqtt_compat_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-import(lists, [nth/2]). + +-include("emqx_mqtt.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +-include_lib("common_test/include/ct.hrl"). + +-define(TOPICS, [<<"TopicA">>, <<"TopicA/B">>, <<"Topic/C">>, <<"TopicA/C">>, + <<"/TopicA">>]). + +-define(WILD_TOPICS, [<<"TopicA/+">>, <<"+/C">>, <<"#">>, <<"/#">>, <<"/+">>, + <<"+/+">>, <<"TopicA/#">>]). + +all() -> + [basic_test, + retained_message_test, + will_message_test, + zero_length_clientid_test, + offline_message_queueing_test, + overlapping_subscriptions_test, + keepalive_test, + redelivery_on_reconnect_test, + subscribe_failure_test, + dollar_topics_test]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +receive_messages(Count) -> + receive_messages(Count, []). + +receive_messages(0, Msgs) -> + Msgs; +receive_messages(Count, Msgs) -> + receive + {public, Msg} -> + receive_messages(Count-1, [Msg|Msgs]); + _Other -> + receive_messages(Count, Msgs) + after 10 -> + Msgs + end. + +basic_test(_Config) -> + Topic = nth(1, ?TOPICS), + ct:print("Basic test starting"), + {ok, C, _} = emqx_client:start_link(), + {ok, _, [0]} = emqx_client:subscribe(C, Topic, qos2), + ok = emqx_client:publish(C, Topic, <<"qos 0">>), + {ok, _} = emqx_client:publish(C, Topic, <<"qos 1">>, 1), + {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2), + ok = emqx_client:disconnect(C), + ?assertEqual(3, length(receive_messages(3))). + +retained_message_test(_Config) -> + ct:print("Retained message test starting"), + + %% Retained messages + {ok, C1, _} = emqx_client:start_link([{clean_start, true}]), + ok = emqx_client:publish(C1, nth(1, ?TOPICS), <<"qos 0">>, [{qos, 0}, {retain, true}]), + {ok, _} = emqx_client:publish(C1, nth(3, ?TOPICS), <<"qos 1">>, [{qos, 1}, {retain, true}]), + {ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<"qos 2">>, [{qos, 2}, {retain, true}]), + timer:sleep(10), + {ok, #{}, [0]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2), + ok = emqx_client:disconnect(C1), + ?assertEqual(3, length(receive_messages(10))), + + %% Clear retained messages + {ok, C2, _} = emqx_client:start_link([{clean_start, true}]), + ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"">>, [{qos, 0}, {retain, true}]), + {ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"">>, [{qos, 1}, {retain, true}]), + {ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"">>, [{qos, 2}, {retain, true}]), + timer:sleep(10), %% wait for QoS 2 exchange to be completed + {ok, _, [0]} = emqx_client:subscribe(C2, nth(6, ?WILD_TOPICS), 2), + timer:sleep(10), + ok = emqx_client:disconnect(), + ?assertEqual(0, length(receive_messages(3))). + +will_message_test(_Config) -> + {ok, C1, _} = emqx_client:start_link([{clean_start, true}, + {will_topic = nth(3, ?TOPICS)}, + {will_payload, <<"client disconnected">>}, + {keepalive, 2}]), + {ok, C2, _} = emqx_client:start_link(), + {ok, _, [2]} = emqx_client:subscribe(C2, nth(3, ?TOPICS), 2), + timer:sleep(10), + ok = emqx_client:stop(C1), + timer:sleep(5), + ok = emqx_client:disconnect(C2), + ?assertEqual(1, length(receive_messages(1))), + ct:print("Will message test succeeded"). + +zero_length_clientid_test(_Config) -> + ct:print("Zero length clientid test starting"), + {error, _} = emqx_client:start_link([{clean_start, false}, + {client_id, <<>>}]), + {ok, _, _} = emqx_client:start_link([{clean_start, true}, + {client_id, <<>>}]), + ct:print("Zero length clientid test succeeded"). + +offline_message_queueing_test(_) -> + {ok, C1, _} = emqx_client:start_link([{clean_start, false}, + {client_id, <<"c1">>}]), + {ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2), + ok = emqx_client:disconnect(C1), + {ok, C2, _} = emqx_client:start_link([{clean_start, true}, + {client_id, <<"c2">>}]), + + ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0), + {ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1), + {ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2), + timer:sleep(10), + emqx_client:disconnect(C2), + {ok, C3, _} = emqx_client:start_link([{clean_start, false}, + {client_id, <<"c1">>}]), + timer:sleep(10), + emqx_client:disconnect(C3), + ?assertEqual(3, length(receive_messages(3))). + +overlapping_subscriptions_test(_) -> + {ok, C, _} = emqx_client:start_link([]), + {ok, _, [2, 1]} = emqx_client:subscribe(C, [{nth(7, ?WILD_TOPICS), 2}, + {nth(1, ?WILD_TOPICS), 1}]), + timer:sleep(10), + {ok, _} = emqx_client:publish(C, nth(4, ?TOPICS), <<"overlapping topic filters">>, 2), + time:sleep(10), + emqx_client:disconnect(C), + Num = receive_messages(2), + ?assert(lists:member(Num, [1, 2])), + if + Num == 1 -> + ct:print("This server is publishing one message for all + matching overlapping subscriptions, not one for each."); + Num == 2 -> + ct:print("This server is publishing one message per each + matching overlapping subscription."); + true -> ok + end. + +keepalive_test(_) -> + ct:print("Keepalive test starting"), + {ok, C1, _} = emqx_client:start_link([{clean_start, true}, + {keepalive, 5}, + {will_topic, nth(5, ?TOPICS)}, + {will_payload, <<"keepalive expiry">>}]), + ok = emqx_client:pause(C1), + + {ok, C2, _} = emqx_client:start_link([{clean_start, true}, + {keepalive, 0}]), + {ok, _, [2]} = emqx_client:subscribe(C2, nth(5, ?TOPICS), 2), + timer:sleep(15000), + ok = emqx_client:disconnect(C2), + ?assertEqual(1, length(receive_messages(1))), + ct:print("Keepalive test succeeded"). + +redelivery_on_reconnect_test(_) -> + ct:print("Redelivery on reconnect test starting"), + {ok, C1, _} = emqx_client:start_link([{clean_start, false}, + {client_id, <<"c">>}]), + {ok, _, [2]} = emqx_client:subscribe(C1, nth(7, ?WILD_TOPICS), 2), + timer:sleep(10), + ok = emqx_client:pause(C1), + {ok, _} = emqx_client:publish(C1, nth(2, ?TOPICS), <<>>, + [{qos, 1}, {retain, false}]), + {ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<>>, + [{qos, 2}, {retain, false}]), + time:sleep(10), + ok = emqx_client:disconnect(C1), + ?assertEqual(0, length(receive_messages(2))), + {ok, C2, _} = emqx_client:start_link([{clean_start, false}, + {client_id, <<"c">>}]), + timer:sleep(10), + ok = emqx_client:disconnect(C2), + ?assertEqual(2, length(receive_messages(2))). + +subscribe_failure_test(_) -> + ct:print("Subscribe failure test starting"), + {ok, C, _} = emqx_client:start_link([]), + {ok, _, [16#80]} = emqx_client:subscribe(C, <<"$SYS/#">>, 2), + timer:sleep(10), + ct:print("Subscribe failure test succeeded"). + +dollar_topics_test(_) -> + ct:print("$ topics test starting"), + {ok, C, _} = emqx_client:start_link([{clean_start, true}, + {keepalive, 0}]), + {ok, _, [2]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 2), + {ok, _} = emqx_client:publish(C, <<"$", (nth(2, ?TOPICS))>>, + <<"">>, [{qos, 1}, {retain, false}]), + timer:sleep(10), + ?assertEqual(0, length(receive_messages(1))), + ok = emqx_client:disconnect(C), + ct:print("$ topics test succeeded"). + diff --git a/test/emqx_mqueue_SUITE.erl b/test/emqx_mqueue_SUITE.erl index 3123ab94f..d174d980e 100644 --- a/test/emqx_mqueue_SUITE.erl +++ b/test/emqx_mqueue_SUITE.erl @@ -17,8 +17,10 @@ -module(emqx_mqueue_SUITE). -compile(export_all). +-compile(nowarn_export_all). -include("emqx.hrl"). +-include("emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). diff --git a/test/emqx_net_SUITE.erl b/test/emqx_net_SUITE.erl index a729ad1a7..34f3d54e2 100644 --- a/test/emqx_net_SUITE.erl +++ b/test/emqx_net_SUITE.erl @@ -18,6 +18,7 @@ %% CT -compile(export_all). +-compile(nowarn_export_all). all() -> [{group, keepalive}]. diff --git a/test/emqx_pqueue_SUITE.erl b/test/emqx_pqueue_SUITE.erl index c798bf100..55b2dc01b 100644 --- a/test/emqx_pqueue_SUITE.erl +++ b/test/emqx_pqueue_SUITE.erl @@ -16,9 +16,11 @@ -module(emqx_pqueue_SUITE). +-include("emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). -compile(export_all). +-compile(nowarn_export_all). -define(PQ, emqx_pqueue). diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl deleted file mode 100644 index 0d86beea0..000000000 --- a/test/emqx_protocol_SUITE.erl +++ /dev/null @@ -1,320 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_protocol_SUITE). - --compile(export_all). - --include("emqx.hrl"). - --include("emqx_mqtt.hrl"). - --include_lib("eunit/include/eunit.hrl"). - --import(emqx_serializer, [serialize/1]). - -all() -> - [{group, parser}, - {group, serializer}, - {group, packet}, - {group, message}]. - -groups() -> - [{parser, [], - [parse_connect, - parse_bridge, - parse_publish, - parse_puback, - parse_pubrec, - parse_pubrel, - parse_pubcomp, - parse_subscribe, - parse_unsubscribe, - parse_pingreq, - parse_disconnect]}, - {serializer, [], - [serialize_connect, - serialize_connack, - serialize_publish, - serialize_puback, - serialize_pubrel, - serialize_subscribe, - serialize_suback, - serialize_unsubscribe, - serialize_unsuback, - serialize_pingreq, - serialize_pingresp, - serialize_disconnect]}, - {packet, [], - [packet_proto_name, - packet_type_name, - packet_connack_name, - packet_format]}, - {message, [], - [message_make, - message_from_packet, - message_flag]}]. - -%%-------------------------------------------------------------------- -%% Parse Cases -%%-------------------------------------------------------------------- - -parse_connect(_) -> - Parser = emqx_parser:initial_state(), - %% CONNECT(Q0, R0, D0, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined) - V31ConnBin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>, - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT, - dup = false, - qos = 0, - retain = false}, - variable = #mqtt_packet_connect{proto_ver = 3, - proto_name = <<"MQIsdp">>, - client_id = <<"mosqpub/10451-iMac.loca">>, - clean_sess = true, - keep_alive = 60}}, <<>>} = emqx_parser:parse(V31ConnBin, Parser), - %% CONNECT(Q0, R0, D0, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined) - V311ConnBin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>, - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT, - dup = false, - qos = 0, - retain = false}, - variable = #mqtt_packet_connect{proto_ver = 4, - proto_name = <<"MQTT">>, - client_id = <<"mosqpub/10451-iMac.loca">>, - clean_sess = true, - keep_alive = 60 } }, <<>>} = emqx_parser:parse(V311ConnBin, Parser), - - %% CONNECT(Qos=0, Retain=false, Dup=false, ClientId="", ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60) - V311ConnWithoutClientId = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>, - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT, - dup = false, - qos = 0, - retain = false}, - variable = #mqtt_packet_connect{proto_ver = 4, - proto_name = <<"MQTT">>, - client_id = <<>>, - clean_sess = true, - keep_alive = 60 } }, <<>>} = emqx_parser:parse(V311ConnWithoutClientId, Parser), - %%CONNECT(Q0, R0, D0, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, - %% Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg)) - ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>, - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT, - dup = false, - qos = 0, - retain = false}, - variable = #mqtt_packet_connect{proto_ver = 3, - proto_name = <<"MQIsdp">>, - client_id = <<"mosqpub/10452-iMac.loca">>, - clean_sess = true, - keep_alive = 60, - will_retain = false, - will_qos = 1, - will_flag = true, - will_topic = <<"/will">>, - will_msg = <<"willmsg">>, - username = <<"test">>, - password = <<"public">>}}, <<>>} = emqx_parser:parse(ConnBinWithWill, Parser), - ok. - -parse_bridge(_) -> - Parser = emqx_parser:initial_state(), - Data = <<16,86,0,6,77,81,73,115,100,112,131,44,0,60,0,19,67,95,48,48,58,48,67,58,50,57,58,50,66,58,55,55,58,53,50, - 0,48,36,83,89,83,47,98,114,111,107,101,114,47,99,111,110,110,101,99,116,105,111,110,47,67,95,48,48,58,48, - 67,58,50,57,58,50,66,58,55,55,58,53,50,47,115,116,97,116,101,0,1,48>>, - - %% CONNECT(Q0, R0, D0, ClientId=C_00:0C:29:2B:77:52, ProtoName=MQIsdp, ProtoVsn=131, CleanSess=false, KeepAlive=60, - %% Username=undefined, Password=undefined, Will(Q1, R1, Topic=$SYS/broker/connection/C_00:0C:29:2B:77:52/state, Msg=0)) - {ok, #mqtt_packet{variable = Variable}, <<>>} = emqx_parser:parse(Data, Parser), - #mqtt_packet_connect{client_id = <<"C_00:0C:29:2B:77:52">>, - proto_ver = 16#03, - proto_name = <<"MQIsdp">>, - will_retain = true, - will_qos = 1, - will_flag = true, - clean_sess = false, - keep_alive = 60, - will_topic = <<"$SYS/broker/connection/C_00:0C:29:2B:77:52/state">>, - will_msg = <<"0">>} = Variable. - -parse_publish(_) -> - Parser = emqx_parser:initial_state(), - %%PUBLISH(Qos=1, Retain=false, Dup=false, TopicName=a/b/c, PacketId=1, Payload=<<"hahah">>) - PubBin = <<50,14,0,5,97,47,98,47,99,0,1,104,97,104,97,104>>, - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - dup = false, - qos = 1, - retain = false}, - variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>, - packet_id = 1}, - payload = <<"hahah">> }, <<>>} = emqx_parser:parse(PubBin, Parser), - - %PUBLISH(Qos=0, Retain=false, Dup=false, TopicName=xxx/yyy, PacketId=undefined, Payload=<<"hello">>) - %DISCONNECT(Qos=0, Retain=false, Dup=false) - PubBin1 = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111,224,0>>, - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - dup = false, - qos = 0, - retain = false}, - variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>, - packet_id = undefined}, - payload = <<"hello">> }, <<224,0>>} = emqx_parser:parse(PubBin1, Parser), - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT, - dup = false, - qos = 0, - retain = false}}, <<>>} = emqx_parser:parse(<<224, 0>>, Parser). - -parse_puback(_) -> - Parser = emqx_parser:initial_state(), - %%PUBACK(Qos=0, Retain=false, Dup=false, PacketId=1) - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK, - dup = false, - qos = 0, - retain = false}}, <<>>} = emqx_parser:parse(<<64,2,0,1>>, Parser). -parse_pubrec(_) -> - Parser = emqx_parser:initial_state(), - %%PUBREC(Qos=0, Retain=false, Dup=false, PacketId=1) - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC, - dup = false, - qos = 0, - retain = false}}, <<>>} = emqx_parser:parse(<<5:4,0:4,2,0,1>>, Parser). - -parse_pubrel(_) -> - Parser = emqx_parser:initial_state(), - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, - dup = false, - qos = 1, - retain = false}}, <<>>} = emqx_parser:parse(<<6:4,2:4,2,0,1>>, Parser). - -parse_pubcomp(_) -> - Parser = emqx_parser:initial_state(), - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP, - dup = false, - qos = 0, - retain = false}}, <<>>} = emqx_parser:parse(<<7:4,0:4,2,0,1>>, Parser). - -parse_subscribe(_) -> - Parser = emqx_parser:initial_state(), - %% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}]) - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, - dup = false, - qos = 1, - retain = false}, - variable = #mqtt_packet_subscribe{packet_id = 2, - topic_table = [{<<"TopicA">>,2}]} }, <<>>} - = emqx_parser:parse(<<130,11,0,2,0,6,84,111,112,105,99,65,2>>, Parser). - -parse_unsubscribe(_) -> - Parser = emqx_parser:initial_state(), - %% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>]) - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE, - dup = false, - qos = 1, - retain = false}, - variable = #mqtt_packet_unsubscribe{packet_id = 2, - topics = [<<"TopicA">>]}}, <<>>} - = emqx_parser:parse(<<162,10,0,2,0,6,84,111,112,105,99,65>>, Parser). - -parse_pingreq(_) -> - Parser = emqx_parser:initial_state(), - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PINGREQ, - dup = false, - qos = 0, - retain = false}}, <<>>} - = emqx_parser:parse(<>, Parser). - -parse_disconnect(_) -> - Parser = emqx_parser:initial_state(), - %DISCONNECT(Qos=0, Retain=false, Dup=false) - Bin = <<224, 0>>, - {ok, #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT, - dup = false, - qos = 0, - retain = false}}, <<>>} = emqx_parser:parse(Bin, Parser). - -%%-------------------------------------------------------------------- -%% Packet Cases -%%-------------------------------------------------------------------- - -packet_proto_name(_) -> - ?assertEqual(<<"MQIsdp">>, emqx_packet:protocol_name(3)), - ?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(4)). - -packet_type_name(_) -> - ?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)), - ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). - -packet_connack_name(_) -> - ?assertEqual('CONNACK_ACCEPT', emqx_packet:connack_name(?CONNACK_ACCEPT)), - ?assertEqual('CONNACK_PROTO_VER', emqx_packet:connack_name(?CONNACK_PROTO_VER)), - ?assertEqual('CONNACK_INVALID_ID', emqx_packet:connack_name(?CONNACK_INVALID_ID)), - ?assertEqual('CONNACK_SERVER', emqx_packet:connack_name(?CONNACK_SERVER)), - ?assertEqual('CONNACK_CREDENTIALS', emqx_packet:connack_name(?CONNACK_CREDENTIALS)), - ?assertEqual('CONNACK_AUTH', emqx_packet:connack_name(?CONNACK_AUTH)). - -packet_format(_) -> - io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]), - io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), - io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]), - io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), - io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), - io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]), - io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]), - io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]), - io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), - io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). - -%%-------------------------------------------------------------------- -%% Message Cases -%%-------------------------------------------------------------------- - -message_make(_) -> - Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), - ?assertEqual(0, Msg#mqtt_message.qos), - Msg1 = emqx_message:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>), - ?assert(is_binary(Msg1#mqtt_message.id)), - ?assertEqual(2, Msg1#mqtt_message.qos). - -message_from_packet(_) -> - Msg = emqx_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)), - ?assertEqual(1, Msg#mqtt_message.qos), - ?assertEqual(10, Msg#mqtt_message.pktid), - ?assertEqual(<<"topic">>, Msg#mqtt_message.topic), - WillMsg = emqx_message:from_packet(#mqtt_packet_connect{will_flag = true, - will_topic = <<"WillTopic">>, - will_msg = <<"WillMsg">>}), - ?assertEqual(<<"WillTopic">>, WillMsg#mqtt_message.topic), - ?assertEqual(<<"WillMsg">>, WillMsg#mqtt_message.payload), - - Msg2 = emqx_message:from_packet(<<"username">>, <<"clientid">>, - ?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)), - ?assertEqual({<<"clientid">>, <<"username">>}, Msg2#mqtt_message.from), - io:format("~s", [emqx_message:format(Msg2)]). - -message_flag(_) -> - Pkt = ?PUBLISH_PACKET(1, <<"t">>, 2, <<"payload">>), - Msg2 = emqx_message:from_packet(<<"clientid">>, Pkt), - Msg3 = emqx_message:set_flag(retain, Msg2), - Msg4 = emqx_message:set_flag(dup, Msg3), - ?assert(Msg4#mqtt_message.dup), - ?assert(Msg4#mqtt_message.retain), - Msg5 = emqx_message:set_flag(Msg4), - Msg6 = emqx_message:unset_flag(dup, Msg5), - Msg7 = emqx_message:unset_flag(retain, Msg6), - ?assertNot(Msg7#mqtt_message.dup), - ?assertNot(Msg7#mqtt_message.retain), - emqx_message:unset_flag(Msg7), - emqx_message:to_packet(Msg7). - diff --git a/test/emqx_protocol_SUITE.erl.bk b/test/emqx_protocol_SUITE.erl.bk new file mode 100644 index 000000000..f2d6d306a --- /dev/null +++ b/test/emqx_protocol_SUITE.erl.bk @@ -0,0 +1,147 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_protocol_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx.hrl"). + +-include("emqx_mqtt.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +-import(emqx_serializer, [serialize/1]). + +all() -> + [{group, parser}, + {group, serializer}, + {group, packet}, + {group, message}]. + +groups() -> + [{parser, [], + [parse_connect, + parse_bridge, + parse_publish, + parse_puback, + parse_pubrec, + parse_pubrel, + parse_pubcomp, + parse_subscribe, + parse_unsubscribe, + parse_pingreq, + parse_disconnect]}, + {serializer, [], + [serialize_connect, + serialize_connack, + serialize_publish, + serialize_puback, + serialize_pubrel, + serialize_subscribe, + serialize_suback, + serialize_unsubscribe, + serialize_unsuback, + serialize_pingreq, + serialize_pingresp, + serialize_disconnect]}, + {packet, [], + [packet_proto_name, + packet_type_name, + packet_connack_name, + packet_format]}, + {message, [], + [message_make, + message_from_packet, + message_flag]}]. + + + +%%-------------------------------------------------------------------- +%% Packet Cases +%%-------------------------------------------------------------------- + +packet_proto_name(_) -> + ?assertEqual(<<"MQIsdp">>, emqx_packet:protocol_name(3)), + ?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(4)). + +packet_type_name(_) -> + ?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)), + ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). + +packet_connack_name(_) -> + ?assertEqual('CONNACK_ACCEPT', emqx_packet:connack_name(?CONNACK_ACCEPT)), + ?assertEqual('CONNACK_PROTO_VER', emqx_packet:connack_name(?CONNACK_PROTO_VER)), + ?assertEqual('CONNACK_INVALID_ID', emqx_packet:connack_name(?CONNACK_INVALID_ID)), + ?assertEqual('CONNACK_SERVER', emqx_packet:connack_name(?CONNACK_SERVER)), + ?assertEqual('CONNACK_CREDENTIALS', emqx_packet:connack_name(?CONNACK_CREDENTIALS)), + ?assertEqual('CONNACK_AUTH', emqx_packet:connack_name(?CONNACK_AUTH)). + +packet_format(_) -> + io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]), + io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), + io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]), + io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), + io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), + io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]), + io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]), + io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]), + io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), + io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). + +%%-------------------------------------------------------------------- +%% Message Cases +%%-------------------------------------------------------------------- + +message_make(_) -> + Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), + ?assertEqual(0, Msg#mqtt_message.qos), + Msg1 = emqx_message:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>), + ?assert(is_binary(Msg1#mqtt_message.id)), + ?assertEqual(2, Msg1#mqtt_message.qos). + +message_from_packet(_) -> + Msg = emqx_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)), + ?assertEqual(1, Msg#mqtt_message.qos), + ?assertEqual(10, Msg#mqtt_message.pktid), + ?assertEqual(<<"topic">>, Msg#mqtt_message.topic), + WillMsg = emqx_message:from_packet(#mqtt_packet_connect{will_flag = true, + will_topic = <<"WillTopic">>, + will_msg = <<"WillMsg">>}), + ?assertEqual(<<"WillTopic">>, WillMsg#mqtt_message.topic), + ?assertEqual(<<"WillMsg">>, WillMsg#mqtt_message.payload), + + Msg2 = emqx_message:from_packet(<<"username">>, <<"clientid">>, + ?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)), + ?assertEqual({<<"clientid">>, <<"username">>}, Msg2#mqtt_message.from), + io:format("~s", [emqx_message:format(Msg2)]). + +message_flag(_) -> + Pkt = ?PUBLISH_PACKET(1, <<"t">>, 2, <<"payload">>), + Msg2 = emqx_message:from_packet(<<"clientid">>, Pkt), + Msg3 = emqx_message:set_flag(retain, Msg2), + Msg4 = emqx_message:set_flag(dup, Msg3), + ?assert(Msg4#mqtt_message.dup), + ?assert(Msg4#mqtt_message.retain), + Msg5 = emqx_message:set_flag(Msg4), + Msg6 = emqx_message:unset_flag(dup, Msg5), + Msg7 = emqx_message:unset_flag(retain, Msg6), + ?assertNot(Msg7#mqtt_message.dup), + ?assertNot(Msg7#mqtt_message.retain), + emqx_message:unset_flag(Msg7), + emqx_message:to_packet(Msg7). + diff --git a/test/emqx_router_SUITE.erl b/test/emqx_router_SUITE.erl index c46ae27c0..17da6d97d 100644 --- a/test/emqx_router_SUITE.erl +++ b/test/emqx_router_SUITE.erl @@ -1,57 +1,44 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- +%%%=================================================================== +%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%%=================================================================== -module(emqx_router_SUITE). -include("emqx.hrl"). - -include_lib("eunit/include/eunit.hrl"). -compile(export_all). +-compile(nowarn_export_all). -define(R, emqx_router). +-define(TABS, [emqx_route, emqx_trie, emqx_trie_node]). all() -> - [{group, route}, - {group, local_route}]. + [{group, route}]. groups() -> [{route, [sequence], - [t_get_topics, - t_add_del_route, - t_match_route, - t_print, - t_has_route, - t_unused]}, - {local_route, [sequence], - [t_get_local_topics, - t_add_del_local_route, - t_match_local_route]}]. + [add_del_route, + match_routes]}]. init_per_suite(Config) -> - ekka:start(), - ekka_mnesia:ensure_started(), - {ok, _} = emqx_router_sup:start_link(), + emqx_ct_broker_helpers:run_setup_steps(), Config. end_per_suite(_Config) -> - emqx_router:stop(), - ekka:stop(), - ekka_mnesia:ensure_stopped(), - ekka_mnesia:delete_schema(). + emqx_ct_broker_helpers:run_teardown_steps(). init_per_testcase(_TestCase, Config) -> Config. @@ -59,87 +46,49 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, _Config) -> clear_tables(). -t_get_topics(_) -> - ?R:add_route(<<"a/b/c">>), - ?R:add_route(<<"a/b/c">>), - ?R:add_route(<<"a/+/b">>), +add_del_route(_) -> + From = {self(), make_ref()}, + ?R:add_route(From, <<"a/b/c">>, node()), + ?R:add_route(From, <<"a/b/c">>, node()), + ?R:add_route(From, <<"a/+/b">>, node()), ?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())), - ?R:del_route(<<"a/b/c">>), - ?R:del_route(<<"a/+/b">>), + ?R:del_route(From, <<"a/b/c">>, node()), + ?R:del_route(From, <<"a/+/b">>, node()), ?assertEqual([], lists:sort(?R:topics())). -t_add_del_route(_) -> - %%Node = node(), - ?R:add_route(<<"a/b/c">>), - ?R:add_route(<<"a/+/b">>), - ?R:del_route(<<"a/b/c">>), - ?R:del_route(<<"a/+/b">>). +match_routes(_) -> + From = {self(), make_ref()}, + ?R:add_route(From, <<"a/b/c">>, node()), + ?R:add_route(From, <<"a/+/c">>, node()), + ?R:add_route(From, <<"a/b/#">>, node()), + ?R:add_route(From, <<"#">>, node()), + ?assertEqual([#route{topic = <<"#">>, dest = node()}, + #route{topic = <<"a/+/c">>, dest = node()}, + #route{topic = <<"a/b/#">>, dest = node()}, + #route{topic = <<"a/b/c">>, dest = node()}], + lists:sort(?R:match_routes(<<"a/b/c">>))). -t_match_route(_) -> - Node = node(), - ?R:add_route(<<"a/b/c">>), - ?R:add_route(<<"a/+/c">>), - ?R:add_route(<<"a/b/#">>), - ?R:add_route(<<"#">>), - ?assertEqual([#route{topic = <<"#">>, node = Node}, - #route{topic = <<"a/+/c">>, node = Node}, - #route{topic = <<"a/b/#">>, node = Node}, - #route{topic = <<"a/b/c">>, node = Node}], - lists:sort(?R:match(<<"a/b/c">>))). - -t_has_route(_) -> - ?R:add_route(<<"devices/+/messages">>), - ?assert(?R:has_route(<<"devices/+/messages">>)). - -t_get_local_topics(_) -> - ?R:add_local_route(<<"a/b/c">>), - ?R:add_local_route(<<"x/+/y">>), - ?R:add_local_route(<<"z/#">>), - ?assertEqual([<<"z/#">>, <<"x/+/y">>, <<"a/b/c">>], ?R:local_topics()), - ?R:del_local_route(<<"x/+/y">>), - ?R:del_local_route(<<"z/#">>), - ?assertEqual([<<"a/b/c">>], ?R:local_topics()). - -t_add_del_local_route(_) -> - Node = node(), - ?R:add_local_route(<<"a/b/c">>), - ?R:add_local_route(<<"x/+/y">>), - ?R:add_local_route(<<"z/#">>), - ?assertEqual([{<<"a/b/c">>, Node}, - {<<"x/+/y">>, Node}, - {<<"z/#">>, Node}], - lists:sort(?R:get_local_routes())), - ?R:del_local_route(<<"x/+/y">>), - ?R:del_local_route(<<"z/#">>), - ?assertEqual([{<<"a/b/c">>, Node}], lists:sort(?R:get_local_routes())). - -t_match_local_route(_) -> - ?R:add_local_route(<<"$SYS/#">>), - ?R:add_local_route(<<"a/b/c">>), - ?R:add_local_route(<<"a/+/c">>), - ?R:add_local_route(<<"a/b/#">>), - ?R:add_local_route(<<"#">>), - Matched = [Topic || #route{topic = {local, Topic}} <- ?R:match_local(<<"a/b/c">>)], - ?assertEqual([<<"#">>, <<"a/+/c">>, <<"a/b/#">>, <<"a/b/c">>], lists:sort(Matched)). +has_routes(_) -> + From = {self(), make_ref()}, + ?R:add_route(From, <<"devices/+/messages">>, node()), + ?assert(?R:has_routes(<<"devices/+/messages">>)). clear_tables() -> - ?R:clean_local_routes(), - lists:foreach(fun mnesia:clear_table/1, [route, trie, trie_node]). + lists:foreach(fun mnesia:clear_table/1, ?TABS). router_add_del(_) -> - %% Add ?R:add_route(<<"#">>), ?R:add_route(<<"a/b/c">>), ?R:add_route(<<"+/#">>), Routes = [R1, R2 | _] = [ - #route{topic = <<"#">>, node = node()}, - #route{topic = <<"+/#">>, node = node()}, - #route{topic = <<"a/b/c">>, node = node()}], - Routes = lists:sort(?R:match(<<"a/b/c">>)), + #route{topic = <<"#">>, dest = node()}, + #route{topic = <<"+/#">>, dest = node()}, + #route{topic = <<"a/b/c">>, dest = node()}], + ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))), %% Batch Add lists:foreach(fun(R) -> ?R:add_route(R) end, Routes), - Routes = lists:sort(?R:match(<<"a/b/c">>)), + ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))), %% Del ?R:del_route(<<"a/b/c">>), @@ -147,25 +96,10 @@ router_add_del(_) -> {atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]), %% Batch Del - R3 = #route{topic = <<"#">>, node = 'a@127.0.0.1'}, + R3 = #route{topic = <<"#">>, dest = 'a@127.0.0.1'}, ?R:add_route(R3), ?R:del_route(R1), ?R:del_route(R2), ?R:del_route(R3), [] = lists:sort(?R:match(<<"a/b/c">>)). -t_print(_) -> - Routes = [#route{topic = <<"a/b/c">>, node = node()}, - #route{topic = <<"#">>, node = node()}, - #route{topic = <<"+/#">>, node = node()}], - lists:foreach(fun(R) -> ?R:add_route(R) end, Routes), - ?R:print(<<"a/b/c">>), - ?R:del_route(<<"+/#">>), - ?R:del_route(<<"a/b/c">>), - ?R:del_route(<<"#">>). - -t_unused(_) -> - gen_server:call(?R, bad_call), - gen_server:cast(?R, bad_msg), - ?R ! bad_info. - diff --git a/test/emqx_time_SUITE.erl b/test/emqx_time_SUITE.erl index 9a0514e74..8ff8a4437 100644 --- a/test/emqx_time_SUITE.erl +++ b/test/emqx_time_SUITE.erl @@ -19,6 +19,7 @@ -include_lib("eunit/include/eunit.hrl"). -compile(export_all). +-compile(nowarn_export_all). all() -> [t_time_now_to]. diff --git a/test/emqx_topic_SUITE.erl b/test/emqx_topic_SUITE.erl index a60921ada..87fb2c906 100644 --- a/test/emqx_topic_SUITE.erl +++ b/test/emqx_topic_SUITE.erl @@ -20,6 +20,7 @@ %% CT -compile(export_all). +-compile(nowarn_export_all). -import(emqx_topic, [wildcard/1, match/2, validate/1, triples/1, join/1, words/1, systop/1, feed_var/3, parse/1, parse/2]). diff --git a/test/emqx_trie_SUITE.erl b/test/emqx_trie_SUITE.erl index 87eb52fea..98d14e7e1 100644 --- a/test/emqx_trie_SUITE.erl +++ b/test/emqx_trie_SUITE.erl @@ -1,39 +1,40 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- +%%%=================================================================== +%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%%=================================================================== -module(emqx_trie_SUITE). -compile(export_all). +-compile(nowarn_export_all). -include("emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). -define(TRIE, emqx_trie). - --include_lib("eunit/include/eunit.hrl"). +-define(TRIE_TABS, [emqx_trie, emqx_trie_node]). all() -> [t_insert, t_match, t_match2, t_match3, t_delete, t_delete2, t_delete3]. init_per_suite(Config) -> - ekka_mnesia:ensure_started(), - ?TRIE:mnesia(boot), - ?TRIE:mnesia(copy), + application:load(emqx), + ok = ekka:start(), Config. end_per_suite(_Config) -> + ekka:stop(), ekka_mnesia:ensure_stopped(), ekka_mnesia:delete_schema(). @@ -131,5 +132,5 @@ t_delete3(_) -> end). clear_tables() -> - lists:foreach(fun mnesia:clear_table/1, [emqx_trie, emqx_trie_node]). + lists:foreach(fun mnesia:clear_table/1, ?TRIE_TABS). diff --git a/test/emqx_vm_SUITE.erl b/test/emqx_vm_SUITE.erl index 07f6a22cc..1f5c4b2b0 100644 --- a/test/emqx_vm_SUITE.erl +++ b/test/emqx_vm_SUITE.erl @@ -17,6 +17,7 @@ -module(emqx_vm_SUITE). -compile(export_all). +-compile(nowarn_export_all). -include_lib("common_test/include/ct.hrl").