Upgrade esockd and add more test cases

This commit is contained in:
Feng Lee 2018-05-22 13:01:19 +08:00
parent bf253ab9b3
commit bffdd2ba74
41 changed files with 892 additions and 819 deletions

View File

@ -243,7 +243,7 @@
is_bridge = false :: boolean(), is_bridge = false :: boolean(),
clean_start = true :: boolean(), clean_start = true :: boolean(),
will_flag = false :: boolean(), will_flag = false :: boolean(),
will_qos = ?QOS_1 :: mqtt_qos(), will_qos = ?QOS_0 :: mqtt_qos(),
will_retain = false :: boolean(), will_retain = false :: boolean(),
keepalive = 0 :: non_neg_integer(), keepalive = 0 :: non_neg_integer(),
properties = undefined :: mqtt_properties(), properties = undefined :: mqtt_properties(),
@ -339,7 +339,8 @@
-define(CONNACK_PACKET(ReasonCode), -define(CONNACK_PACKET(ReasonCode),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{reason_code = ReasonCode}}). variable = #mqtt_packet_connack{ack_flags = 0,
reason_code = ReasonCode}}).
-define(CONNACK_PACKET(ReasonCode, SessPresent), -define(CONNACK_PACKET(ReasonCode, SessPresent),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
@ -348,9 +349,9 @@
-define(CONNACK_PACKET(ReasonCode, SessPresent, Properties), -define(CONNACK_PACKET(ReasonCode, SessPresent, Properties),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{ack_flags = SessPresent, variable = #mqtt_packet_connack{ack_flags = SessPresent,
reason_code = ReasonCode, reason_code = ReasonCode,
properties = Properties}}). properties = Properties}}).
-define(AUTH_PACKET(), -define(AUTH_PACKET(),
#mqtt_packet{header = #mqtt_packet_header{type = ?AUTH}, #mqtt_packet{header = #mqtt_packet_header{type = ?AUTH},
@ -370,15 +371,16 @@
qos = Qos}, qos = Qos},
variable = #mqtt_packet_publish{packet_id = PacketId}}). variable = #mqtt_packet_publish{packet_id = PacketId}}).
-define(PUBLISH_PACKET(Qos, Topic, PacketId, Payload), -define(PUBLISH_PACKET(QoS, Topic, PacketId, Payload),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
qos = Qos}, qos = QoS},
variable = #mqtt_packet_publish{topic_name = Topic, variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId}, packet_id = PacketId},
payload = Payload}). payload = Payload}).
-define(PUBLISH_PACKET(Header, Topic, PacketId, Properties, Payload), -define(PUBLISH_PACKET(QoS, Topic, PacketId, Properties, Payload),
#mqtt_packet{header = Header = #mqtt_packet_header{type = ?PUBLISH}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
qos = QoS},
variable = #mqtt_packet_publish{topic_name = Topic, variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId, packet_id = PacketId,
properties = Properties}, properties = Properties},
@ -386,7 +388,8 @@
-define(PUBACK_PACKET(PacketId), -define(PUBACK_PACKET(PacketId),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK},
variable = #mqtt_packet_puback{packet_id = PacketId}}). variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = 0}}).
-define(PUBACK_PACKET(PacketId, ReasonCode, Properties), -define(PUBACK_PACKET(PacketId, ReasonCode, Properties),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK},
@ -396,7 +399,8 @@
-define(PUBREC_PACKET(PacketId), -define(PUBREC_PACKET(PacketId),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC},
variable = #mqtt_packet_puback{packet_id = PacketId}}). variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = 0}}).
-define(PUBREC_PACKET(PacketId, ReasonCode, Properties), -define(PUBREC_PACKET(PacketId, ReasonCode, Properties),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC},
@ -406,7 +410,8 @@
-define(PUBREL_PACKET(PacketId), -define(PUBREL_PACKET(PacketId),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1},
variable = #mqtt_packet_puback{packet_id = PacketId}}). variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = 0}}).
-define(PUBREL_PACKET(PacketId, ReasonCode, Properties), -define(PUBREL_PACKET(PacketId, ReasonCode, Properties),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL, qos = ?QOS_1},
@ -416,7 +421,8 @@
-define(PUBCOMP_PACKET(PacketId), -define(PUBCOMP_PACKET(PacketId),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP},
variable = #mqtt_packet_puback{packet_id = PacketId}}). variable = #mqtt_packet_puback{packet_id = PacketId,
reason_code = 0}}).
-define(PUBCOMP_PACKET(PacketId, ReasonCode, Properties), -define(PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP},
@ -467,7 +473,8 @@
reason_codes = ReasonCodes}}). reason_codes = ReasonCodes}}).
-define(DISCONNECT_PACKET(), -define(DISCONNECT_PACKET(),
#mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT}}). #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT},
variable = #mqtt_packet_disconnect{reason_code = 0}}).
-define(DISCONNECT_PACKET(ReasonCode), -define(DISCONNECT_PACKET(ReasonCode),
#mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT}, #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT},

View File

@ -114,7 +114,7 @@ tab_key(acl) -> acl_modules.
%% @doc Stop access control server. %% @doc Stop access control server.
stop() -> stop() ->
gen_server:call(?MODULE, stop). gen_server:stop(?MODULE, normal, infinity).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks

View File

@ -22,16 +22,11 @@
-export([start_link/2]). -export([start_link/2]).
-export([subscribe/1, subscribe/2, subscribe/3, unsubscribe/1, unsubscribe/2]). -export([subscribe/1, subscribe/2, subscribe/3, subscribe/4]).
-export([publish/1, publish/2]). -export([publish/1, publish/2]).
-export([unsubscribe/1, unsubscribe/2]).
-export([dispatch/2, dispatch/3]). -export([dispatch/2, dispatch/3]).
-export([subscriptions/1, subscribers/1, subscribed/2]). -export([subscriptions/1, subscribers/1, subscribed/2]).
-export([topics/0]).
-export([get_subopts/2, set_subopts/3]). -export([get_subopts/2, set_subopts/3]).
%% gen_server Function Exports %% gen_server Function Exports
@ -41,7 +36,6 @@
-record(state, {pool, id, submon}). -record(state, {pool, id, submon}).
-define(BROKER, ?MODULE). -define(BROKER, ?MODULE).
-define(TIMEOUT, 120000). -define(TIMEOUT, 120000).
%% ETS tables %% ETS tables
@ -104,7 +98,7 @@ unsubscribe(Topic, Subscriber, Timeout) ->
-spec(publish(message()) -> delivery() | stopped). -spec(publish(message()) -> delivery() | stopped).
publish(Msg = #message{from = From}) -> publish(Msg = #message{from = From}) ->
%% Hook to trace? %% Hook to trace?
trace(public, From, Msg), trace(publish, From, Msg),
case emqx_hooks:run('message.publish', [], Msg) of case emqx_hooks:run('message.publish', [], Msg) of
{ok, Msg1 = #message{topic = Topic}} -> {ok, Msg1 = #message{topic = Topic}} ->
publish(Topic, Msg1); publish(Topic, Msg1);
@ -226,15 +220,20 @@ subscribed(Topic, {SubId, SubPid}) when is_binary(Topic),
is_pid(SubPid) -> is_pid(SubPid) ->
ets:member(?SUBOPTION, {Topic, {SubId, SubPid}}). ets:member(?SUBOPTION, {Topic, {SubId, SubPid}}).
topics() -> emqx_router:topics(). -spec(get_subopts(topic(), subscriber()) -> [suboption()]).
get_subopts(Topic, Subscriber) when is_binary(Topic) -> get_subopts(Topic, Subscriber) when is_binary(Topic) ->
try ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2) try ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)
catch error:badarg -> [] catch error:badarg -> []
end. end.
-spec(set_subopts(topic(), subscriber(), [suboption()]) -> boolean()).
set_subopts(Topic, Subscriber, Opts) when is_binary(Topic), is_list(Opts) -> set_subopts(Topic, Subscriber, Opts) when is_binary(Topic), is_list(Opts) ->
gen_server:call(pick(Subscriber), {set_subopts, Topic, Subscriber, Opts}). case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of
[{_, OldOpts}] ->
Opts1 = lists:usort(lists:umerge(Opts, OldOpts)),
ets:insert(?SUBOPTION, {{Topic, Subscriber}, Opts1});
[] -> false
end.
with_subpid(SubPid) when is_pid(SubPid) -> with_subpid(SubPid) when is_pid(SubPid) ->
SubPid; SubPid;
@ -267,18 +266,8 @@ init([Pool, Id]) ->
gproc_pool:connect_worker(Pool, {Pool, Id}), gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #state{pool = Pool, id = Id, submon = emqx_pmon:new()}}. {ok, #state{pool = Pool, id = Id, submon = emqx_pmon:new()}}.
handle_call({set_subopts, Topic, Subscriber, Opts}, _From, State) -> handle_call(Req, _From, State) ->
case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of emqx_logger:error("[Broker] Unexpected request: ~p", [Req]),
[{_, OldOpts}] ->
Opts1 = lists:usort(lists:umerge(Opts, OldOpts)),
ets:insert(?SUBOPTION, {{Topic, Subscriber}, Opts1}),
{reply, ok, State};
[] ->
{reply, {error, not_found}, State}
end;
handle_call(Request, _From, State) ->
emqx_logger:error("[Broker] Unexpected request: ~p", [Request]),
{reply, ignore, State}. {reply, ignore, State}.
handle_cast({From, {subscribe, Topic, Subscriber, Options}}, State) -> handle_cast({From, {subscribe, Topic, Subscriber, Options}}, State) ->

View File

@ -22,11 +22,7 @@
-export([init/1]). -export([init/1]).
-import(lists, [foreach/2]). -define(TAB_OPTS, [public, {read_concurrency, true}, {write_concurrency, true}]).
-define(TAB_OPTS, [public,
{read_concurrency, true},
{write_concurrency, true}]).
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
@ -37,9 +33,9 @@ start_link() ->
init([]) -> init([]) ->
%% Create the pubsub tables %% Create the pubsub tables
foreach(fun create_tab/1, [subscription, subscriber, suboption]), lists:foreach(fun create_tab/1, [subscription, subscriber, suboption]),
%% Shared subscription %% Shared Subscription
SharedSub = {shared_sub, {emqx_shared_sub, start_link, []}, SharedSub = {shared_sub, {emqx_shared_sub, start_link, []},
permanent, 5000, worker, [emqx_shared_sub]}, permanent, 5000, worker, [emqx_shared_sub]},
@ -47,13 +43,12 @@ init([]) ->
Helper = {broker_helper, {emqx_broker_helper, start_link, []}, Helper = {broker_helper, {emqx_broker_helper, start_link, []},
permanent, 5000, worker, [emqx_broker_helper]}, permanent, 5000, worker, [emqx_broker_helper]},
%% Broker pool %% Broker Pool
PoolArgs = [broker, hash, emqx_vm:schedulers() * 2, BrokerPool = emqx_pool_sup:spec(emqx_broker_pool,
{emqx_broker, start_link, []}], [broker, hash, emqx_vm:schedulers() * 2,
{emqx_broker, start_link, []}]),
PoolSup = emqx_pool_sup:spec(eqmx_broker_pool, PoolArgs), {ok, {{one_for_all, 0, 1}, [SharedSub, Helper, BrokerPool]}}.
{ok, {{one_for_all, 0, 1}, [SharedSub, Helper, PoolSup]}}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Create tables %% Create tables

View File

@ -32,7 +32,9 @@
-export([pubrel/2, pubrel/3, pubrel/4]). -export([pubrel/2, pubrel/3, pubrel/4]).
-export([pubcomp/2, pubcomp/3, pubcomp/4]). -export([pubcomp/2, pubcomp/3, pubcomp/4]).
-export([subscriptions/1]). -export([subscriptions/1]).
-export([info/1]). -export([info/1, stop/1]).
%% For test cases
-export([pause/1, resume/1]).
-export([initialized/3, waiting_for_connack/3, connected/3]). -export([initialized/3, waiting_for_connack/3, connected/3]).
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). -export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]).
@ -60,12 +62,12 @@
| {will_topic, iodata()} | {will_topic, iodata()}
| {will_payload, iodata()} | {will_payload, iodata()}
| {will_retain, boolean()} | {will_retain, boolean()}
| {will_qos, mqtt_qos() | mqtt_qos_name()} | {will_qos, qos()}
| {will_props, mqtt_properties()} | {will_props, properties()}
| {auto_ack, boolean()} | {auto_ack, boolean()}
| {ack_timeout, pos_integer()} | {ack_timeout, pos_integer()}
| {force_ping, boolean()} | {force_ping, boolean()}
| {properties, mqtt_properties()}). | {properties, properties()}).
-export_type([host/0, option/0]). -export_type([host/0, option/0]).
@ -80,18 +82,17 @@
bridge_mode :: boolean(), bridge_mode :: boolean(),
client_id :: binary(), client_id :: binary(),
clean_start :: boolean(), clean_start :: boolean(),
session_present :: boolean(),
username :: binary() | undefined, username :: binary() | undefined,
password :: binary() | undefined, password :: binary() | undefined,
proto_ver :: mqtt_version(), proto_ver :: mqtt_version(),
proto_name :: iodata(), proto_name :: iodata(),
keepalive :: non_neg_integer(), keepalive :: non_neg_integer(),
keepalive_timer :: reference() | undefined, keepalive_timer :: reference() | undefined,
expiry_interval :: pos_integer(),
force_ping :: boolean(), force_ping :: boolean(),
paused :: boolean(),
will_flag :: boolean(), will_flag :: boolean(),
will_msg :: mqtt_message(), will_msg :: mqtt_message(),
properties :: mqtt_properties(), properties :: properties(),
pending_calls :: list(), pending_calls :: list(),
subscriptions :: map(), subscriptions :: map(),
max_inflight :: infinity | pos_integer(), max_inflight :: infinity | pos_integer(),
@ -102,8 +103,9 @@
ack_timer :: reference(), ack_timer :: reference(),
retry_interval :: pos_integer(), retry_interval :: pos_integer(),
retry_timer :: reference(), retry_timer :: reference(),
last_packet_id :: mqtt_packet_id(), session_present :: boolean(),
parse_state :: emqx_parser:state()}). last_packet_id :: packet_id(),
parse_state :: emqx_frame:state()}).
-record(call, {id, from, req, ts}). -record(call, {id, from, req, ts}).
@ -254,7 +256,6 @@ publish(Client, Topic, Payload, QoS) when is_binary(Topic), ?IS_QOS(QoS) ->
publish(Client, Topic, Payload, Opts) when is_binary(Topic), is_list(Opts) -> publish(Client, Topic, Payload, Opts) when is_binary(Topic), is_list(Opts) ->
publish(Client, Topic, #{}, Payload, Opts). publish(Client, Topic, #{}, Payload, Opts).
%% MQTT Version 5.0
-spec(publish(client(), topic(), properties(), payload(), [pubopt()]) -spec(publish(client(), topic(), properties(), payload(), [pubopt()])
-> ok | {ok, packet_id()} | {error, term()}). -> ok | {ok, packet_id()} | {error, term()}).
publish(Client, Topic, Properties, Payload, Opts) publish(Client, Topic, Properties, Payload, Opts)
@ -262,9 +263,9 @@ publish(Client, Topic, Properties, Payload, Opts)
ok = emqx_mqtt_properties:validate(Properties), ok = emqx_mqtt_properties:validate(Properties),
Retain = proplists:get_bool(retain, Opts), Retain = proplists:get_bool(retain, Opts),
QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)), QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)),
publish(Client, #mqtt_message{topic = Topic, publish(Client, #mqtt_message{qos = QoS,
qos = QoS,
retain = Retain, retain = Retain,
topic = Topic,
properties = Properties, properties = Properties,
payload = iolist_to_binary(Payload)}). payload = iolist_to_binary(Payload)}).
@ -279,7 +280,6 @@ unsubscribe(Client, Topic) when is_binary(Topic) ->
unsubscribe(Client, Topics) when is_list(Topics) -> unsubscribe(Client, Topics) when is_list(Topics) ->
unsubscribe(Client, #{}, Topics). unsubscribe(Client, #{}, Topics).
%% MQTT Version 5.0
-spec(unsubscribe(client(), properties(), topic() | [topic()]) -> subscribe_ret()). -spec(unsubscribe(client(), properties(), topic() | [topic()]) -> subscribe_ret()).
unsubscribe(Client, Properties, Topic) when is_map(Properties), is_binary(Topic) -> unsubscribe(Client, Properties, Topic) when is_map(Properties), is_binary(Topic) ->
unsubscribe(Client, Properties, [Topic]); unsubscribe(Client, Properties, [Topic]);
@ -303,47 +303,43 @@ disconnect(Client, ReasonCode, Properties) ->
gen_statem:call(Client, {disconnect, ReasonCode, Properties}). gen_statem:call(Client, {disconnect, ReasonCode, Properties}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% For test cases. %% For test cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
puback(Client, PacketId) when is_integer(PacketId) -> puback(Client, PacketId) when is_integer(PacketId) ->
puback(Client, PacketId, ?RC_SUCCESS). puback(Client, PacketId, ?RC_SUCCESS).
puback(Client, PacketId, ReasonCode) when is_integer(PacketId), puback(Client, PacketId, ReasonCode)
is_integer(ReasonCode) -> when is_integer(PacketId), is_integer(ReasonCode) ->
puback(Client, PacketId, ReasonCode, #{}). puback(Client, PacketId, ReasonCode, #{}).
puback(Client, PacketId, ReasonCode, Properties) when is_integer(PacketId), puback(Client, PacketId, ReasonCode, Properties)
is_integer(ReasonCode), when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) ->
is_map(Properties) ->
gen_statem:cast(Client, {puback, PacketId, ReasonCode, Properties}). gen_statem:cast(Client, {puback, PacketId, ReasonCode, Properties}).
pubrec(Client, PacketId) when is_integer(PacketId) -> pubrec(Client, PacketId) when is_integer(PacketId) ->
pubrec(Client, PacketId, ?RC_SUCCESS). pubrec(Client, PacketId, ?RC_SUCCESS).
pubrec(Client, PacketId, ReasonCode) when is_integer(PacketId), pubrec(Client, PacketId, ReasonCode)
is_integer(ReasonCode) -> when is_integer(PacketId), is_integer(ReasonCode) ->
pubrec(Client, PacketId, ReasonCode, #{}). pubrec(Client, PacketId, ReasonCode, #{}).
pubrec(Client, PacketId, ReasonCode, Properties) when is_integer(PacketId), pubrec(Client, PacketId, ReasonCode, Properties)
is_integer(ReasonCode), when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) ->
is_map(Properties) ->
gen_statem:cast(Client, {pubrec, PacketId, ReasonCode, Properties}). gen_statem:cast(Client, {pubrec, PacketId, ReasonCode, Properties}).
pubrel(Client, PacketId) when is_integer(PacketId) -> pubrel(Client, PacketId) when is_integer(PacketId) ->
pubrel(Client, PacketId, ?RC_SUCCESS). pubrel(Client, PacketId, ?RC_SUCCESS).
pubrel(Client, PacketId, ReasonCode) when is_integer(PacketId), pubrel(Client, PacketId, ReasonCode)
is_integer(ReasonCode) -> when is_integer(PacketId), is_integer(ReasonCode) ->
pubrel(Client, PacketId, ReasonCode, #{}). pubrel(Client, PacketId, ReasonCode, #{}).
pubrel(Client, PacketId, ReasonCode, Properties) when is_integer(PacketId), pubrel(Client, PacketId, ReasonCode, Properties)
is_integer(ReasonCode), when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) ->
is_map(Properties) ->
gen_statem:cast(Client, {pubrel, PacketId, ReasonCode, Properties}). gen_statem:cast(Client, {pubrel, PacketId, ReasonCode, Properties}).
pubcomp(Client, PacketId) when is_integer(PacketId) -> pubcomp(Client, PacketId) when is_integer(PacketId) ->
pubcomp(Client, PacketId, ?RC_SUCCESS). pubcomp(Client, PacketId, ?RC_SUCCESS).
pubcomp(Client, PacketId, ReasonCode) when is_integer(PacketId), pubcomp(Client, PacketId, ReasonCode)
is_integer(ReasonCode) -> when is_integer(PacketId), is_integer(ReasonCode) ->
pubcomp(Client, PacketId, ReasonCode, #{}). pubcomp(Client, PacketId, ReasonCode, #{}).
pubcomp(Client, PacketId, ReasonCode, Properties) when is_integer(PacketId), pubcomp(Client, PacketId, ReasonCode, Properties)
is_integer(ReasonCode), when is_integer(PacketId), is_integer(ReasonCode), is_map(Properties) ->
is_map(Properties) ->
gen_statem:cast(Client, {pubcomp, PacketId, ReasonCode, Properties}). gen_statem:cast(Client, {pubcomp, PacketId, ReasonCode, Properties}).
subscriptions(Client) -> subscriptions(Client) ->
@ -352,6 +348,15 @@ subscriptions(Client) ->
info(Client) -> info(Client) ->
gen_statem:call(Client, info). gen_statem:call(Client, info).
stop(Client) ->
gen_statem:call(Client, stop).
pause(Client) ->
gen_statem:call(Client, pause).
resume(Client) ->
gen_statem:call(Client, resume).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_statem callbacks %% gen_statem callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -375,6 +380,7 @@ init([Options]) ->
proto_name = <<"MQTT">>, proto_name = <<"MQTT">>,
keepalive = ?DEFAULT_KEEPALIVE, keepalive = ?DEFAULT_KEEPALIVE,
force_ping = false, force_ping = false,
paused = false,
will_flag = false, will_flag = false,
will_msg = #mqtt_message{}, will_msg = #mqtt_message{},
pending_calls = [], pending_calls = [],
@ -496,8 +502,8 @@ init_will_msg({qos, QoS}, WillMsg) ->
init_parse_state(State = #state{proto_ver = Ver, properties = Properties}) -> init_parse_state(State = #state{proto_ver = Ver, properties = Properties}) ->
Size = maps:get('Maximum-Packet-Size', Properties, ?MAX_PACKET_SIZE), Size = maps:get('Maximum-Packet-Size', Properties, ?MAX_PACKET_SIZE),
State#state{parse_state = emqx_parser:initial_state([{max_len, Size}, State#state{parse_state = emqx_frame:initial_state(
{version, Ver}])}. #{max_packet_size => Size, version => Ver})}.
callback_mode() -> state_functions. callback_mode() -> state_functions.
@ -519,17 +525,17 @@ initialized({call, From}, connect, State = #state{sock_opts = SockOpts,
initialized(EventType, EventContent, State) -> initialized(EventType, EventContent, State) ->
handle_event(EventType, EventContent, initialized, State). handle_event(EventType, EventContent, initialized, State).
mqtt_connect(State = #state{client_id = ClientId, mqtt_connect(State = #state{client_id = ClientId,
clean_start = CleanStart, clean_start = CleanStart,
bridge_mode = IsBridge, bridge_mode = IsBridge,
username = Username, username = Username,
password = Password, password = Password,
proto_ver = ProtoVer, proto_ver = ProtoVer,
proto_name = ProtoName, proto_name = ProtoName,
keepalive = KeepAlive, keepalive = KeepAlive,
will_flag = WillFlag, will_flag = WillFlag,
will_msg = WillMsg, will_msg = WillMsg,
properties = Properties}) -> properties = Properties}) ->
?WILL_MSG(WillQos, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg, ?WILL_MSG(WillQos, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg,
ConnProps = emqx_mqtt_properties:filter(?CONNECT, maps:to_list(Properties)), ConnProps = emqx_mqtt_properties:filter(?CONNECT, maps:to_list(Properties)),
send(?CONNECT_PACKET( send(?CONNECT_PACKET(
@ -597,6 +603,15 @@ connected({call, From}, info, State) ->
Info = lists:zip(record_info(fields, state), tl(tuple_to_list(State))), Info = lists:zip(record_info(fields, state), tl(tuple_to_list(State))),
{keep_state, State, [{reply, From, Info}]}; {keep_state, State, [{reply, From, Info}]};
connected({call, From}, pause, State) ->
{keep_state, State#state{paused = true}, [{reply, From, ok}]};
connected({call, From}, resume, State) ->
{keep_state, State#state{paused = false}, [{reply, From, ok}]};
connected({call, From}, stop, _State) ->
{stop_and_reply, normal, [{reply, From, ok}]};
connected({call, From}, SubReq = {subscribe, Properties, Topics}, connected({call, From}, SubReq = {subscribe, Properties, Topics},
State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) -> State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) ->
case send(?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of case send(?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of
@ -622,14 +637,14 @@ connected({call, From}, {publish, Msg = #mqtt_message{qos = ?QOS_0}}, State) ->
connected({call, From}, {publish, Msg = #mqtt_message{qos = Qos}}, connected({call, From}, {publish, Msg = #mqtt_message{qos = Qos}},
State = #state{inflight = Inflight, last_packet_id = PacketId}) State = #state{inflight = Inflight, last_packet_id = PacketId})
when (Qos =:= ?QOS_1); (Qos =:= ?QOS_2) -> when (Qos =:= ?QOS_1); (Qos =:= ?QOS_2) ->
case Inflight:is_full() of case emqx_inflight:is_full(Inflight) of
true -> true ->
{keep_state, State, [{reply, From, {error, inflight_full}}]}; {keep_state, State, [{reply, From, {error, inflight_full}}]};
false -> false ->
Msg1 = Msg#mqtt_message{packet_id = PacketId}, Msg1 = Msg#mqtt_message{packet_id = PacketId},
case send(Msg1, State) of case send(Msg1, State) of
{ok, NewState} -> {ok, NewState} ->
Inflight1 = Inflight:insert(PacketId, {publish, Msg1, os:timestamp()}), Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight),
{keep_state, ensure_retry_timer(NewState#state{inflight = Inflight1}), {keep_state, ensure_retry_timer(NewState#state{inflight = Inflight1}),
[{reply, From, {ok, PacketId}}]}; [{reply, From, {ok, PacketId}}]};
Error = {error, Reason} -> Error = {error, Reason} ->
@ -679,8 +694,12 @@ connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) ->
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) -> connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) ->
{keep_state, deliver_msg(packet_to_msg(Packet), State)}; {keep_state, deliver_msg(packet_to_msg(Packet), State)};
connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), State = #state{paused = true}) ->
{keep_state, State};
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId),
State = #state{auto_ack = AutoAck}) -> State = #state{auto_ack = AutoAck}) ->
_ = deliver_msg(packet_to_msg(Packet), State), _ = deliver_msg(packet_to_msg(Packet), State),
case AutoAck of case AutoAck of
true -> send_puback(?PUBACK_PACKET(PacketId), State); true -> send_puback(?PUBACK_PACKET(PacketId), State);
@ -698,12 +717,12 @@ connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId),
connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties), connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties),
State = #state{owner = Owner, inflight = Inflight}) -> State = #state{owner = Owner, inflight = Inflight}) ->
case Inflight:lookup(PacketId) of case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, #mqtt_message{packet_id = PacketId}, _Ts}} -> {value, {publish, #mqtt_message{packet_id = PacketId}, _Ts}} ->
Owner ! {puback, #{packet_id => PacketId, Owner ! {puback, #{packet_id => PacketId,
reason_code => ReasonCode, reason_code => ReasonCode,
properties => Properties}}, properties => Properties}},
{keep_state, State#state{inflight = Inflight:delete(PacketId)}}; {keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}};
none -> none ->
emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]), emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]),
{keep_state, State} {keep_state, State}
@ -711,9 +730,9 @@ connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties),
connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) -> connected(cast, ?PUBREC_PACKET(PacketId), State = #state{inflight = Inflight}) ->
send_puback(?PUBREL_PACKET(PacketId), send_puback(?PUBREL_PACKET(PacketId),
case Inflight:lookup(PacketId) of case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, _Msg, _Ts}} -> {value, {publish, _Msg, _Ts}} ->
Inflight1 = Inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}), Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight),
State#state{inflight = Inflight1}; State#state{inflight = Inflight1};
{value, {pubrel, _Ref, _Ts}} -> {value, {pubrel, _Ref, _Ts}} ->
emqx_logger:warning("Duplicated PUBREC Packet: ~p", [PacketId]), emqx_logger:warning("Duplicated PUBREC Packet: ~p", [PacketId]),
@ -741,12 +760,12 @@ connected(cast, ?PUBREL_PACKET(PacketId),
connected(cast, ?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), connected(cast, ?PUBCOMP_PACKET(PacketId, ReasonCode, Properties),
State = #state{owner = Owner, inflight = Inflight}) -> State = #state{owner = Owner, inflight = Inflight}) ->
case Inflight:lookup(PacketId) of case emqx_inflight:lookup(PacketId, Inflight) of
{value, {pubrel, _PacketId, _Ts}} -> {value, {pubrel, _PacketId, _Ts}} ->
Owner ! {puback, #{packet_id => PacketId, Owner ! {puback, #{packet_id => PacketId,
reason_code => ReasonCode, reason_code => ReasonCode,
properties => Properties}}, properties => Properties}},
{keep_state, State#state{inflight = Inflight:delete(PacketId)}}; {keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}};
none -> none ->
emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]), emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]),
{keep_state, State} {keep_state, State}
@ -797,8 +816,8 @@ connected(info, {timeout, _TRef, keepalive}, State = #state{force_ping = true})
end; end;
connected(info, {timeout, TRef, keepalive}, connected(info, {timeout, TRef, keepalive},
State = #state{socket = Sock, keepalive_timer = TRef}) -> State = #state{socket = Sock, paused = Paused, keepalive_timer = TRef}) ->
case should_ping(Sock) of case (not Paused) andalso should_ping(Sock) of
true -> true ->
case send(?PACKET(?PINGREQ), State) of case send(?PACKET(?PINGREQ), State) of
{ok, NewState} -> {ok, NewState} ->
@ -820,7 +839,7 @@ connected(info, {timeout, TRef, ack}, State = #state{ack_timer = TRef,
connected(info, {timeout, TRef, retry}, State = #state{retry_timer = TRef, connected(info, {timeout, TRef, retry}, State = #state{retry_timer = TRef,
inflight = Inflight}) -> inflight = Inflight}) ->
case Inflight:is_empty() of case emqx_inflight:is_empty(Inflight) of
true -> {keep_state, State#state{retry_timer = undefined}}; true -> {keep_state, State#state{retry_timer = undefined}};
false -> retry_send(State) false -> retry_send(State)
end; end;
@ -928,7 +947,7 @@ ensure_retry_timer(_Interval, State) ->
retry_send(State = #state{inflight = Inflight}) -> retry_send(State = #state{inflight = Inflight}) ->
SortFun = fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end, SortFun = fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end,
Msgs = lists:sort(SortFun, Inflight:values()), Msgs = lists:sort(SortFun, emqx_inflight:values(Inflight)),
retry_send(Msgs, os:timestamp(), State ). retry_send(Msgs, os:timestamp(), State ).
retry_send([], _Now, State) -> retry_send([], _Now, State) ->
@ -948,7 +967,7 @@ retry_send(publish, Msg = #mqtt_message{qos = QoS, packet_id = PacketId},
Msg1 = Msg#mqtt_message{dup = (QoS =:= ?QOS1)}, Msg1 = Msg#mqtt_message{dup = (QoS =:= ?QOS1)},
case send(Msg1, State) of case send(Msg1, State) of
{ok, NewState} -> {ok, NewState} ->
Inflight1 = Inflight:update(PacketId, {publish, Msg1, Now}), Inflight1 = emqx_inflight:update(PacketId, {publish, Msg1, Now}, Inflight),
{ok, NewState#state{inflight = Inflight1}}; {ok, NewState#state{inflight = Inflight1}};
Error = {error, _Reason} -> Error = {error, _Reason} ->
Error Error
@ -956,7 +975,7 @@ retry_send(publish, Msg = #mqtt_message{qos = QoS, packet_id = PacketId},
retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) -> retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) ->
case send(?PUBREL_PACKET(PacketId), State) of case send(?PUBREL_PACKET(PacketId), State) of
{ok, NewState} -> {ok, NewState} ->
Inflight1 = Inflight:update(PacketId, {pubrel, PacketId, Now}), Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, Now}, Inflight),
{ok, NewState#state{inflight = Inflight1}}; {ok, NewState#state{inflight = Inflight1}};
Error = {error, _Reason} -> Error = {error, _Reason} ->
Error Error
@ -1028,7 +1047,7 @@ send(Msg, State) when is_record(Msg, mqtt_message) ->
send(Packet, State = #state{socket = Sock, proto_ver = Ver}) send(Packet, State = #state{socket = Sock, proto_ver = Ver})
when is_record(Packet, mqtt_packet) -> when is_record(Packet, mqtt_packet) ->
Data = emqx_serializer:serialize(Packet, [{version, Ver}]), Data = emqx_frame:serialize(Packet, #{version => Ver}),
emqx_logger:debug("SEND Data: ~p", [Data]), emqx_logger:debug("SEND Data: ~p", [Data]),
case emqx_client_sock:send(Sock, Data) of case emqx_client_sock:send(Sock, Data) of
ok -> {ok, next_packet_id(State)}; ok -> {ok, next_packet_id(State)};
@ -1045,7 +1064,7 @@ receive_loop(<<>>, State) ->
{keep_state, State}; {keep_state, State};
receive_loop(Bytes, State = #state{parse_state = ParseState}) -> receive_loop(Bytes, State = #state{parse_state = ParseState}) ->
case catch emqx_parser:parse(Bytes, ParseState) of case catch emqx_frame:parse(Bytes, ParseState) of
{ok, Packet, Rest} -> {ok, Packet, Rest} ->
ok = gen_statem:cast(self(), Packet), ok = gen_statem:cast(self(), Packet),
receive_loop(Rest, init_parse_state(State)); receive_loop(Rest, init_parse_state(State));

View File

@ -27,6 +27,8 @@
-export([get_env/1, get_env/2]). -export([get_env/1, get_env/2]).
-export([populate/1]).
-export([read/1, write/2, dump/2, reload/1, get/2, get/3, set/3]). -export([read/1, write/2, dump/2, reload/1, get/2, get/3, set/3]).
-type(env() :: {atom(), term()}). -type(env() :: {atom(), term()}).
@ -42,6 +44,10 @@ get_env(Key, Default) ->
get_env(Key) -> get_env(Key) ->
application:get_env(?APP, Key). application:get_env(?APP, Key).
%% TODO:
populate(_App) ->
ok.
%% @doc Read the configuration of an application. %% @doc Read the configuration of an application.
-spec(read(atom()) -> {ok, list(env())} | {error, term()}). -spec(read(atom()) -> {ok, list(env())} | {error, term()}).
read(App) -> read(App) ->

View File

@ -27,7 +27,7 @@
-import(proplists, [get_value/2, get_value/3]). -import(proplists, [get_value/2, get_value/3]).
%% API Function Exports %% API Function Exports
-export([start_link/2]). -export([start_link/3]).
%% Management and Monitor API %% Management and Monitor API
-export([info/1, stats/1, kick/1, clean_acl_cache/2]). -export([info/1, stats/1, kick/1, clean_acl_cache/2]).
@ -44,11 +44,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]). code_change/3, terminate/2]).
%% TODO: How to emit stats?
-export([handle_pre_hibernate/1]).
%% Unused fields: connname, peerhost, peerport %% Unused fields: connname, peerhost, peerport
-record(state, {connection, peername, conn_state, await_recv, -record(state, {transport, socket, peername, conn_state, await_recv,
rate_limit, max_packet_size, proto_state, parse_state, rate_limit, max_packet_size, proto_state, parse_state,
keepalive, enable_stats, idle_timeout, force_gc_count}). keepalive, enable_stats, idle_timeout, force_gc_count}).
@ -60,8 +57,8 @@
emqx_logger:Level("Client(~s): " ++ Format, emqx_logger:Level("Client(~s): " ++ Format,
[esockd_net:format(State#state.peername) | Args])). [esockd_net:format(State#state.peername) | Args])).
start_link(Conn, Env) -> start_link(Transport, Sock, Env) ->
{ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}. {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, Env]])}.
info(CPid) -> info(CPid) ->
gen_server:call(CPid, info). gen_server:call(CPid, info).
@ -72,11 +69,11 @@ stats(CPid) ->
kick(CPid) -> kick(CPid) ->
gen_server:call(CPid, kick). gen_server:call(CPid, kick).
set_rate_limit(Cpid, Rl) -> set_rate_limit(CPid, Rl) ->
gen_server:call(Cpid, {set_rate_limit, Rl}). gen_server:call(CPid, {set_rate_limit, Rl}).
get_rate_limit(Cpid) -> get_rate_limit(CPid) ->
gen_server:call(Cpid, get_rate_limit). gen_server:call(CPid, get_rate_limit).
subscribe(CPid, TopicTable) -> subscribe(CPid, TopicTable) ->
CPid ! {subscribe, TopicTable}. CPid ! {subscribe, TopicTable}.
@ -94,26 +91,25 @@ clean_acl_cache(CPid, Topic) ->
%% gen_server Callbacks %% gen_server Callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([Conn0, Env]) -> init([Transport, Sock, Env]) ->
{ok, Conn} = Conn0:wait(), case Transport:wait(Sock) of
case Conn:peername() of {ok, NewSock} ->
{ok, Peername} -> do_init(Conn, Env, Peername); {ok, Peername} = Transport:ensure_ok_or_exit(peername, [NewSock]),
{error, enotconn} -> Conn:fast_close(), do_init(Transport, Sock, Peername, Env);
exit(normal); {error, Reason} ->
{error, Reason} -> Conn:fast_close(), {stop, Reason}
exit({shutdown, Reason})
end. end.
do_init(Conn, Env, Peername) -> do_init(Transport, Sock, Peername, Env) ->
%% Send Fun RateLimit = get_value(rate_limit, Env),
SendFun = send_fun(Conn, Peername),
RateLimit = get_value(rate_limit, Conn:opts()),
PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE), PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE),
ProtoState = emqx_protocol:init(Conn, Peername, SendFun, Env), SendFun = send_fun(Transport, Sock, Peername),
ProtoState = emqx_protocol:init(Transport, Sock, Peername, SendFun, Env),
EnableStats = get_value(client_enable_stats, Env, false), EnableStats = get_value(client_enable_stats, Env, false),
IdleTimout = get_value(client_idle_timeout, Env, 30000), IdleTimout = get_value(client_idle_timeout, Env, 30000),
ForceGcCount = emqx_gc:conn_max_gc_count(), ForceGcCount = emqx_gc:conn_max_gc_count(),
State = run_socket(#state{connection = Conn, State = run_socket(#state{transport = Transport,
socket = Sock,
peername = Peername, peername = Peername,
await_recv = false, await_recv = false,
conn_state = running, conn_state = running,
@ -123,18 +119,17 @@ do_init(Conn, Env, Peername) ->
enable_stats = EnableStats, enable_stats = EnableStats,
idle_timeout = IdleTimout, idle_timeout = IdleTimout,
force_gc_count = ForceGcCount}), force_gc_count = ForceGcCount}),
gen_server:enter_loop(?MODULE, [{hibernate_after, 10000}], gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}],
init_parse_state(State), self(), IdleTimout). init_parse_state(State), self(), IdleTimout).
send_fun(Conn, Peername) -> send_fun(Transport, Sock, Peername) ->
Self = self(), Self = self(),
fun(Packet) -> fun(Packet) ->
Data = emqx_serializer:serialize(Packet), Data = emqx_frame:serialize(Packet),
?LOG(debug, "SEND ~p", [Data], #state{peername = Peername}), ?LOG(debug, "SEND ~p", [Data], #state{peername = Peername}),
emqx_metrics:inc('bytes/sent', iolist_size(Data)), emqx_metrics:inc('bytes/sent', iolist_size(Data)),
try Conn:async_send(Data) of try Transport:async_send(Sock, Data) of
ok -> ok; ok -> ok;
true -> ok; %% Compatible with esockd 4.x
{error, Reason} -> Self ! {shutdown, Reason} {error, Reason} -> Self ! {shutdown, Reason}
catch catch
error:Error -> Self ! {shutdown, Error} error:Error -> Self ! {shutdown, Error}
@ -142,12 +137,9 @@ send_fun(Conn, Peername) ->
end. end.
init_parse_state(State = #state{max_packet_size = Size, proto_state = ProtoState}) -> init_parse_state(State = #state{max_packet_size = Size, proto_state = ProtoState}) ->
emqx_parser:initial_state([{max_len, Size}, Version = emqx_protocol:get(proto_ver, ProtoState),
{ver, emqx_protocol:get(proto_ver, ProtoState)}]), State#state{parse_state = emqx_frame:initial_state(
State. #{max_packet_size => Size, version => Version})}.
handle_pre_hibernate(State) ->
{hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}.
handle_call(info, From, State = #state{proto_state = ProtoState}) -> handle_call(info, From, State = #state{proto_state = ProtoState}) ->
ProtoInfo = emqx_protocol:info(ProtoState), ProtoInfo = emqx_protocol:info(ProtoState),
@ -252,12 +244,13 @@ handle_info({inet_reply, _Sock, ok}, State) ->
handle_info({inet_reply, _Sock, {error, Reason}}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
shutdown(Reason, State); shutdown(Reason, State);
handle_info({keepalive, start, Interval}, State = #state{connection = Conn}) -> handle_info({keepalive, start, Interval},
State = #state{transport = Transport, socket = Sock}) ->
?LOG(debug, "Keepalive at the interval of ~p", [Interval], State), ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State),
StatFun = fun() -> StatFun = fun() ->
case Conn:getstat([recv_oct]) of case Transport:getstat(Sock, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
{error, Error} -> {error, Error} Error -> Error
end end
end, end,
case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of
@ -284,12 +277,13 @@ handle_info(Info, State) ->
?LOG(error, "Unexpected Info: ~p", [Info], State), ?LOG(error, "Unexpected Info: ~p", [Info], State),
{noreply, State}. {noreply, State}.
terminate(Reason, State = #state{connection = Conn, terminate(Reason, State = #state{transport = Transport,
socket = Sock,
keepalive = KeepAlive, keepalive = KeepAlive,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
?LOG(debug, "Terminated for ~p", [Reason], State), ?LOG(debug, "Terminated for ~p", [Reason], State),
Conn:fast_close(), Transport:fast_close(Sock),
emqx_keepalive:cancel(KeepAlive), emqx_keepalive:cancel(KeepAlive),
case {ProtoState, Reason} of case {ProtoState, Reason} of
{undefined, _} -> {undefined, _} ->
@ -314,7 +308,7 @@ received(<<>>, State) ->
received(Bytes, State = #state{parse_state = ParseState, received(Bytes, State = #state{parse_state = ParseState,
proto_state = ProtoState, proto_state = ProtoState,
idle_timeout = IdleTimeout}) -> idle_timeout = IdleTimeout}) ->
case catch emqx_parser:parse(Bytes, ParseState) of case catch emqx_frame:parse(Bytes, ParseState) of
{more, NewParseState} -> {more, NewParseState} ->
{noreply, State#state{parse_state = NewParseState}, IdleTimeout}; {noreply, State#state{parse_state = NewParseState}, IdleTimeout};
{ok, Packet, Rest} -> {ok, Packet, Rest} ->
@ -355,8 +349,8 @@ run_socket(State = #state{conn_state = blocked}) ->
State; State;
run_socket(State = #state{await_recv = true}) -> run_socket(State = #state{await_recv = true}) ->
State; State;
run_socket(State = #state{connection = Conn}) -> run_socket(State = #state{transport = Transport, socket = Sock}) ->
Conn:async_recv(0, infinity), Transport:async_recv(Sock, 0, infinity),
State#state{await_recv = true}. State#state{await_recv = true}.
with_proto(Fun, State = #state{proto_state = ProtoState}) -> with_proto(Fun, State = #state{proto_state = ProtoState}) ->
@ -375,8 +369,11 @@ emit_stats(ClientId, State) ->
emqx_cm:set_client_stats(ClientId, Stats), emqx_cm:set_client_stats(ClientId, Stats),
State. State.
sock_stats(#state{connection = Conn}) -> sock_stats(#state{transport = Transport, socket = Sock}) ->
case Conn:getstat(?SOCK_STATS) of {ok, Ss} -> Ss; {error, _} -> [] end. case Transport:getstat(Sock, ?SOCK_STATS) of
{ok, Ss} -> Ss;
_Error -> []
end.
reply(Reply, State) -> reply(Reply, State) ->
{reply, Reply, State, hibernate}. {reply, Reply, State, hibernate}.
@ -387,7 +384,7 @@ shutdown(Reason, State) ->
stop(Reason, State) -> stop(Reason, State) ->
{stop, Reason, State}. {stop, Reason, State}.
gc(State = #state{connection = Conn}) -> gc(State = #state{transport = Transport, socket = Sock}) ->
Cb = fun() -> Conn:gc(), emit_stats(State) end, Cb = fun() -> Transport:gc(Sock), emit_stats(State) end,
emqx_gc:maybe_force_gc(#state.force_gc_count, State, Cb). emqx_gc:maybe_force_gc(#state.force_gc_count, State, Cb).

View File

@ -40,24 +40,21 @@
start_link() -> start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%% @doc Register a command
-spec(register_command(cmd(), {module(), atom()}) -> ok). -spec(register_command(cmd(), {module(), atom()}) -> ok).
register_command(Cmd, MF) when is_atom(Cmd) -> register_command(Cmd, MF) when is_atom(Cmd) ->
register_command(Cmd, MF, []). register_command(Cmd, MF, []).
%% @doc Register a command with options
-spec(register_command(cmd(), {module(), atom()}, list()) -> ok). -spec(register_command(cmd(), {module(), atom()}, list()) -> ok).
register_command(Cmd, MF, Opts) when is_atom(Cmd) -> register_command(Cmd, MF, Opts) when is_atom(Cmd) ->
cast({register_command, Cmd, MF, Opts}). cast({register_command, Cmd, MF, Opts}).
%% @doc Unregister a command
-spec(unregister_command(cmd()) -> ok). -spec(unregister_command(cmd()) -> ok).
unregister_command(Cmd) when is_atom(Cmd) -> unregister_command(Cmd) when is_atom(Cmd) ->
cast({unregister_command, Cmd}). cast({unregister_command, Cmd}).
cast(Msg) -> gen_server:cast(?SERVER, Msg). cast(Msg) ->
gen_server:cast(?SERVER, Msg).
%% @doc Run a command
-spec(run_command(cmd(), [string()]) -> ok | {error, term()}). -spec(run_command(cmd(), [string()]) -> ok | {error, term()}).
run_command(help, []) -> run_command(help, []) ->
usage(); usage();
@ -68,7 +65,7 @@ run_command(Cmd, Args) when is_atom(Cmd) ->
_ -> ok _ -> ok
catch catch
_:Reason -> _:Reason ->
emqx_logger:error("[CTL] Cmd error:~p, stacktrace:~p", emqx_logger:error("[CTL] CMD Error:~p, Stacktrace:~p",
[Reason, erlang:get_stacktrace()]), [Reason, erlang:get_stacktrace()]),
{error, Reason} {error, Reason}
end; end;
@ -76,7 +73,6 @@ run_command(Cmd, Args) when is_atom(Cmd) ->
usage(), {error, cmd_not_found} usage(), {error, cmd_not_found}
end. end.
%% @doc Lookup a command
-spec(lookup_command(cmd()) -> [{module(), atom()}]). -spec(lookup_command(cmd()) -> [{module(), atom()}]).
lookup_command(Cmd) when is_atom(Cmd) -> lookup_command(Cmd) when is_atom(Cmd) ->
case ets:match(?TAB, {{'_', Cmd}, '$1', '_'}) of case ets:match(?TAB, {{'_', Cmd}, '$1', '_'}) of
@ -84,7 +80,6 @@ lookup_command(Cmd) when is_atom(Cmd) ->
[] -> [] [] -> []
end. end.
%% @doc Usage
usage() -> usage() ->
io:format("Usage: ~s~n", [?MODULE]), io:format("Usage: ~s~n", [?MODULE]),
[begin io:format("~80..-s~n", [""]), Mod:Cmd(usage) end [begin io:format("~80..-s~n", [""]), Mod:Cmd(usage) end

View File

@ -167,6 +167,7 @@ parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin,
?QOS_0 -> {undefined, Rest}; ?QOS_0 -> {undefined, Rest};
_ -> parse_packet_id(Rest) _ -> parse_packet_id(Rest)
end, end,
io:format("Rest1: ~p~n", [Rest1]),
{Properties, Payload} = parse_properties(Rest1, Ver), {Properties, Payload} = parse_properties(Rest1, Ver),
{#mqtt_packet_publish{topic_name = TopicName, {#mqtt_packet_publish{topic_name = TopicName,
packet_id = PacketId, packet_id = PacketId,
@ -577,8 +578,10 @@ serialize_property('Shared-Subscription-Available', Val) ->
<<16#2A, Val>>. <<16#2A, Val>>.
serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) -> serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) ->
<< <<(serialize_utf8_string(Topic))/binary, (serialize_subopts(SubOpts)) >> << <<(serialize_utf8_string(Topic))/binary,
|| {Topic, SubOpts} <- TopicFilters >>; ?RESERVED:2, Rh:2, (flag(Rap)):1,(flag(Nl)):1, QoS:2 >>
|| {Topic, #mqtt_subopts{rh = Rh, rap = Rap, nl = Nl, qos = QoS}}
<- TopicFilters >>;
serialize_topic_filters(subscribe, TopicFilters, _Ver) -> serialize_topic_filters(subscribe, TopicFilters, _Ver) ->
<< <<(serialize_utf8_string(Topic))/binary, ?RESERVED:6, QoS:2>> << <<(serialize_utf8_string(Topic))/binary, ?RESERVED:6, QoS:2>>
@ -587,9 +590,6 @@ serialize_topic_filters(subscribe, TopicFilters, _Ver) ->
serialize_topic_filters(unsubscribe, TopicFilters, _Ver) -> serialize_topic_filters(unsubscribe, TopicFilters, _Ver) ->
<< <<(serialize_utf8_string(Topic))/binary>> || Topic <- TopicFilters >>. << <<(serialize_utf8_string(Topic))/binary>> || Topic <- TopicFilters >>.
serialize_subopts(#mqtt_subopts{rh = Rh, rap = Rap, nl = Nl, qos = QoS}) ->
<<?RESERVED:2, Rh:2, (flag(Rap)):1, (flag(Nl)):1, QoS:2>>.
serialize_reason_codes(undefined) -> serialize_reason_codes(undefined) ->
<<>>; <<>>;
serialize_reason_codes(ReasonCodes) when is_list(ReasonCodes) -> serialize_reason_codes(ReasonCodes) when is_list(ReasonCodes) ->

View File

@ -19,72 +19,72 @@
-export([new/1, contain/2, lookup/2, insert/3, update/3, delete/2, values/1, -export([new/1, contain/2, lookup/2, insert/3, update/3, delete/2, values/1,
to_list/1, size/1, max_size/1, is_full/1, is_empty/1, window/1]). to_list/1, size/1, max_size/1, is_full/1, is_empty/1, window/1]).
-type(inflight() :: {?MODULE, list()}). -type(inflight() :: {max_size, gb_trees:tree()}).
-export_type([inflight/0]). -export_type([inflight/0]).
-spec(new(non_neg_integer()) -> inflight()). -spec(new(non_neg_integer()) -> inflight()).
new(MaxSize) when MaxSize >= 0 -> new(MaxSize) when MaxSize >= 0 ->
{?MODULE, [MaxSize, gb_trees:empty()]}. {MaxSize, gb_trees:empty()}.
-spec(contain(Key :: any(), inflight()) -> boolean()). -spec(contain(Key :: term(), inflight()) -> boolean()).
contain(Key, {?MODULE, [_MaxSize, Tree]}) -> contain(Key, {_MaxSize, Tree}) ->
gb_trees:is_defined(Key, Tree). gb_trees:is_defined(Key, Tree).
-spec(lookup(Key :: any(), inflight()) -> {value, any()} | none). -spec(lookup(Key :: term(), inflight()) -> {value, term()} | none).
lookup(Key, {?MODULE, [_MaxSize, Tree]}) -> lookup(Key, {_MaxSize, Tree}) ->
gb_trees:lookup(Key, Tree). gb_trees:lookup(Key, Tree).
-spec(insert(Key :: any(), Value :: any(), inflight()) -> inflight()). -spec(insert(Key :: term(), Value :: term(), inflight()) -> inflight()).
insert(Key, Value, {?MODULE, [MaxSize, Tree]}) -> insert(Key, Value, {MaxSize, Tree}) ->
{?MODULE, [MaxSize, gb_trees:insert(Key, Value, Tree)]}. {MaxSize, gb_trees:insert(Key, Value, Tree)}.
-spec(delete(Key :: any(), inflight()) -> inflight()). -spec(delete(Key :: term(), inflight()) -> inflight()).
delete(Key, {?MODULE, [MaxSize, Tree]}) -> delete(Key, {MaxSize, Tree}) ->
{?MODULE, [MaxSize, gb_trees:delete(Key, Tree)]}. {MaxSize, gb_trees:delete(Key, Tree)}.
-spec(update(Key :: any(), Val :: any(), inflight()) -> inflight()). -spec(update(Key :: term(), Val :: term(), inflight()) -> inflight()).
update(Key, Val, {?MODULE, [MaxSize, Tree]}) -> update(Key, Val, {MaxSize, Tree}) ->
{?MODULE, [MaxSize, gb_trees:update(Key, Val, Tree)]}. {MaxSize, gb_trees:update(Key, Val, Tree)}.
-spec(is_full(inflight()) -> boolean()). -spec(is_full(inflight()) -> boolean()).
is_full({?MODULE, [0, _Tree]}) -> is_full({0, _Tree}) ->
false; false;
is_full({?MODULE, [MaxSize, Tree]}) -> is_full({MaxSize, Tree}) ->
MaxSize =< gb_trees:size(Tree). MaxSize =< gb_trees:size(Tree).
-spec(is_empty(inflight()) -> boolean()). -spec(is_empty(inflight()) -> boolean()).
is_empty({?MODULE, [_MaxSize, Tree]}) -> is_empty({_MaxSize, Tree}) ->
gb_trees:is_empty(Tree). gb_trees:is_empty(Tree).
-spec(smallest(inflight()) -> {K :: any(), V :: any()}). -spec(smallest(inflight()) -> {K :: term(), V :: term()}).
smallest({?MODULE, [_MaxSize, Tree]}) -> smallest({_MaxSize, Tree}) ->
gb_trees:smallest(Tree). gb_trees:smallest(Tree).
-spec(largest(inflight()) -> {K :: any(), V :: any()}). -spec(largest(inflight()) -> {K :: term(), V :: term()}).
largest({?MODULE, [_MaxSize, Tree]}) -> largest({_MaxSize, Tree}) ->
gb_trees:largest(Tree). gb_trees:largest(Tree).
-spec(values(inflight()) -> list()). -spec(values(inflight()) -> list()).
values({?MODULE, [_MaxSize, Tree]}) -> values({_MaxSize, Tree}) ->
gb_trees:values(Tree). gb_trees:values(Tree).
-spec(to_list(inflight()) -> list({K :: any(), V :: any()})). -spec(to_list(inflight()) -> list({K :: term(), V :: term()})).
to_list({?MODULE, [_MaxSize, Tree]}) -> to_list({_MaxSize, Tree}) ->
gb_trees:to_list(Tree). gb_trees:to_list(Tree).
-spec(window(inflight()) -> list()). -spec(window(inflight()) -> list()).
window(Inflight = {?MODULE, [_MaxSize, Tree]}) -> window(Inflight = {_MaxSize, Tree}) ->
case gb_trees:is_empty(Tree) of case gb_trees:is_empty(Tree) of
true -> []; true -> [];
false -> [Key || {Key, _Val} <- [smallest(Inflight), largest(Inflight)]] false -> [Key || {Key, _Val} <- [smallest(Inflight), largest(Inflight)]]
end. end.
-spec(size(inflight()) -> non_neg_integer()). -spec(size(inflight()) -> non_neg_integer()).
size({?MODULE, [_MaxSize, Tree]}) -> size({_MaxSize, Tree}) ->
gb_trees:size(Tree). gb_trees:size(Tree).
-spec(max_size(inflight()) -> non_neg_integer()). -spec(max_size(inflight()) -> non_neg_integer()).
max_size({?MODULE, [MaxSize, _Tree]}) -> max_size({MaxSize, _Tree}) ->
MaxSize. MaxSize.

View File

@ -1,18 +1,18 @@
%%-------------------------------------------------------------------- %%%===================================================================
%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%%
%% Licensed under the Apache License, Version 2.0 (the "License"); %%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at %%% You may obtain a copy of the License at
%% %%%
%% http://www.apache.org/licenses/LICENSE-2.0 %%% http://www.apache.org/licenses/LICENSE-2.0
%% %%%
%% Unless required by applicable law or agreed to in writing, software %%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, %%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %%% See the License for the specific language governing permissions and
%% limitations under the License. %%% limitations under the License.
%%-------------------------------------------------------------------- %%%===================================================================
-module(emqx_protocol). -module(emqx_protocol).
@ -25,7 +25,7 @@
-import(proplists, [get_value/2, get_value/3]). -import(proplists, [get_value/2, get_value/3]).
%% API %% API
-export([init/3, init/4, get/2, info/1, stats/1, clientid/1, client/1, session/1]). -export([init/3, init/5, get/2, info/1, stats/1, clientid/1, client/1, session/1]).
-export([subscribe/2, unsubscribe/2, pubrel/2, shutdown/2]). -export([subscribe/2, unsubscribe/2, pubrel/2, shutdown/2]).
@ -76,8 +76,9 @@ init(Peername, SendFun, Opts) ->
keepalive_backoff = Backoff, keepalive_backoff = Backoff,
stats_data = #proto_stats{enable_stats = EnableStats}}. stats_data = #proto_stats{enable_stats = EnableStats}}.
init(Conn, Peername, SendFun, Opts) -> init(_Transport, _Sock, Peername, SendFun, Opts) ->
enrich_opt(Conn:opts(), Conn, init(Peername, SendFun, Opts)). init(Peername, SendFun, Opts).
%%enrich_opt(Conn:opts(), Conn, ).
enrich_opt([], _Conn, State) -> enrich_opt([], _Conn, State) ->
State; State;

View File

@ -117,12 +117,10 @@ del_route(From, Topic, Dest) when is_binary(Topic) ->
Route = #route{topic = Topic, dest = Dest}, Route = #route{topic = Topic, dest = Dest},
cast(pick(Topic), {del_route, From, Route}). cast(pick(Topic), {del_route, From, Route}).
%% @doc Has routes?
-spec(has_routes(topic()) -> boolean()). -spec(has_routes(topic()) -> boolean()).
has_routes(Topic) when is_binary(Topic) -> has_routes(Topic) when is_binary(Topic) ->
ets:member(?ROUTE, Topic). ets:member(?ROUTE, Topic).
%% @doc Get topics
-spec(topics() -> list(topic())). -spec(topics() -> list(topic())).
topics() -> mnesia:dirty_all_keys(?ROUTE). topics() -> mnesia:dirty_all_keys(?ROUTE).

View File

@ -40,7 +40,6 @@
-compile({no_auto_import, [monitor/1]}). -compile({no_auto_import, [monitor/1]}).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(ROUTE, emqx_route). -define(ROUTE, emqx_route).
-define(ROUTING_NODE, emqx_routing_node). -define(ROUTING_NODE, emqx_routing_node).
@ -96,7 +95,7 @@ init([]) ->
end end
end, [], mnesia:dirty_all_keys(?ROUTING_NODE)), end, [], mnesia:dirty_all_keys(?ROUTING_NODE)),
emqx_stats:update_interval(route_stats, stats_fun()), emqx_stats:update_interval(route_stats, stats_fun()),
{ok, #state{nodes = Nodes}}. {ok, #state{nodes = Nodes}, hibernate}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
emqx_logger:error("[RouterHelper] Unexpected request: ~p", [Req]), emqx_logger:error("[RouterHelper] Unexpected request: ~p", [Req]),
@ -124,7 +123,7 @@ handle_info({nodedown, Node}, State = #state{nodes = Nodes}) ->
mnesia:transaction(fun cleanup_routes/1, [Node]) mnesia:transaction(fun cleanup_routes/1, [Node])
end), end),
mnesia:dirty_delete(?ROUTING_NODE, Node), mnesia:dirty_delete(?ROUTING_NODE, Node),
{noreply, State#state{nodes = lists:delete(Node, Nodes)}}; {noreply, State#state{nodes = lists:delete(Node, Nodes)}, hibernate};
handle_info({membership, {mnesia, down, Node}}, State) -> handle_info({membership, {mnesia, down, Node}}, State) ->
handle_info({nodedown, Node}, State); handle_info({nodedown, Node}, State);

View File

@ -1,18 +1,18 @@
%%-------------------------------------------------------------------- %%%===================================================================
%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%%
%% Licensed under the Apache License, Version 2.0 (the "License"); %%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at %%% You may obtain a copy of the License at
%% %%%
%% http://www.apache.org/licenses/LICENSE-2.0 %%% http://www.apache.org/licenses/LICENSE-2.0
%% %%%
%% Unless required by applicable law or agreed to in writing, software %%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, %%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %%% See the License for the specific language governing permissions and
%% limitations under the License. %%% limitations under the License.
%%-------------------------------------------------------------------- %%%===================================================================
-module(emqx_session). -module(emqx_session).
@ -246,7 +246,7 @@ stats(#state{max_subscriptions = MaxSubscriptions,
[{max_subscriptions, MaxSubscriptions}, [{max_subscriptions, MaxSubscriptions},
{subscriptions, maps:size(Subscriptions)}, {subscriptions, maps:size(Subscriptions)},
{max_inflight, MaxInflight}, {max_inflight, MaxInflight},
{inflight_len, Inflight:size()}, {inflight_len, emqx_inflight:size(Inflight)},
{max_mqueue, ?MQueue:max_len(MQueue)}, {max_mqueue, ?MQueue:max_len(MQueue)},
{mqueue_len, ?MQueue:len(MQueue)}, {mqueue_len, ?MQueue:len(MQueue)},
{mqueue_dropped, ?MQueue:dropped(MQueue)}, {mqueue_dropped, ?MQueue:dropped(MQueue)},
@ -405,12 +405,12 @@ handle_cast({unsubscribe, From, TopicTable},
%% PUBACK: %% PUBACK:
handle_cast({puback, PacketId}, State = #state{inflight = Inflight}) -> handle_cast({puback, PacketId}, State = #state{inflight = Inflight}) ->
{noreply, {noreply,
case Inflight:contain(PacketId) of case emqx_inflight:contain(PacketId, Inflight) of
true -> true ->
dequeue(acked(puback, PacketId, State)); dequeue(acked(puback, PacketId, State));
false -> false ->
?LOG(warning, "PUBACK ~p missed inflight: ~p", ?LOG(warning, "PUBACK ~p missed inflight: ~p",
[PacketId, Inflight:window()], State), [PacketId, emqx_inflight:window(Inflight)], State),
emqx_metrics:inc('packets/puback/missed'), emqx_metrics:inc('packets/puback/missed'),
State State
end, hibernate}; end, hibernate};
@ -418,12 +418,12 @@ handle_cast({puback, PacketId}, State = #state{inflight = Inflight}) ->
%% PUBREC: %% PUBREC:
handle_cast({pubrec, PacketId}, State = #state{inflight = Inflight}) -> handle_cast({pubrec, PacketId}, State = #state{inflight = Inflight}) ->
{noreply, {noreply,
case Inflight:contain(PacketId) of case emqx_inflight:contain(PacketId, Inflight) of
true -> true ->
acked(pubrec, PacketId, State); acked(pubrec, PacketId, State);
false -> false ->
?LOG(warning, "PUBREC ~p missed inflight: ~p", ?LOG(warning, "PUBREC ~p missed inflight: ~p",
[PacketId, Inflight:window()], State), [PacketId, emqx_inflight:window(Inflight)], State),
emqx_metrics:inc('packets/pubrec/missed'), emqx_metrics:inc('packets/pubrec/missed'),
State State
end, hibernate}; end, hibernate};
@ -446,12 +446,12 @@ handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) ->
%% PUBCOMP: %% PUBCOMP:
handle_cast({pubcomp, PacketId}, State = #state{inflight = Inflight}) -> handle_cast({pubcomp, PacketId}, State = #state{inflight = Inflight}) ->
{noreply, {noreply,
case Inflight:contain(PacketId) of case emqx_inflight:contain(PacketId, Inflight) of
true -> true ->
dequeue(acked(pubcomp, PacketId, State)); dequeue(acked(pubcomp, PacketId, State));
false -> false ->
?LOG(warning, "The PUBCOMP ~p is not inflight: ~p", ?LOG(warning, "The PUBCOMP ~p is not inflight: ~p",
[PacketId, Inflight:window()], State), [PacketId, emqx_inflight:window(Inflight)], State),
emqx_metrics:inc('packets/pubcomp/missed'), emqx_metrics:inc('packets/pubcomp/missed'),
State State
end, hibernate}; end, hibernate};
@ -581,11 +581,11 @@ kick(ClientId, OldPid, Pid) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Redeliver at once if Force is true %% Redeliver at once if Force is true
retry_delivery(Force, State = #state{inflight = Inflight}) -> retry_delivery(Force, State = #state{inflight = Inflight}) ->
case Inflight:is_empty() of case emqx_inflight:is_empty(Inflight) of
true -> State; true -> State;
false -> Msgs = lists:sort(sortfun(inflight), Inflight:values()), false -> Msgs = lists:sort(sortfun(inflight),
emqx_inflight:values(Inflight)),
retry_delivery(Force, Msgs, os:timestamp(), State) retry_delivery(Force, Msgs, os:timestamp(), State)
end. end.
@ -601,11 +601,11 @@ retry_delivery(Force, [{Type, Msg, Ts} | Msgs], Now,
case {Type, Msg} of case {Type, Msg} of
{publish, Msg = #message{headers = #{packet_id := PacketId}}} -> {publish, Msg = #message{headers = #{packet_id := PacketId}}} ->
redeliver(Msg, State), redeliver(Msg, State),
Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}), Inflight1 = emqx_inflight:update(PacketId, {publish, Msg, Now}, Inflight),
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}); retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
{pubrel, PacketId} -> {pubrel, PacketId} ->
redeliver({pubrel, PacketId}, State), redeliver({pubrel, PacketId}, State),
Inflight1 = Inflight:update(PacketId, {pubrel, PacketId, Now}), Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, Now}, Inflight),
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}) retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1})
end; end;
true -> true ->
@ -678,7 +678,7 @@ dispatch(Msg = #message{qos = ?QOS0}, State) ->
dispatch(Msg = #message{qos = QoS}, dispatch(Msg = #message{qos = QoS},
State = #state{next_msg_id = MsgId, inflight = Inflight}) State = #state{next_msg_id = MsgId, inflight = Inflight})
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
case Inflight:is_full() of case emqx_inflight:is_full(Inflight) of
true -> true ->
enqueue_msg(Msg, State); enqueue_msg(Msg, State);
false -> false ->
@ -719,15 +719,15 @@ await(Msg = #message{headers = #{packet_id := PacketId}},
true -> State#state{retry_timer = start_timer(Interval, retry_delivery)}; true -> State#state{retry_timer = start_timer(Interval, retry_delivery)};
false -> State false -> State
end, end,
State1#state{inflight = Inflight:insert(PacketId, {publish, Msg, os:timestamp()})}. State1#state{inflight = emqx_inflight:insert(PacketId, {publish, Msg, os:timestamp()}, Inflight)}.
acked(puback, PacketId, State = #state{client_id = ClientId, acked(puback, PacketId, State = #state{client_id = ClientId,
username = Username, username = Username,
inflight = Inflight}) -> inflight = Inflight}) ->
case Inflight:lookup(PacketId) of case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, Msg, _Ts}} -> {value, {publish, Msg, _Ts}} ->
emqx_hooks:run('message.acked', [ClientId, Username], Msg), emqx_hooks:run('message.acked', [ClientId, Username], Msg),
State#state{inflight = Inflight:delete(PacketId)}; State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none -> none ->
?LOG(warning, "Duplicated PUBACK Packet: ~p", [PacketId], State), ?LOG(warning, "Duplicated PUBACK Packet: ~p", [PacketId], State),
State State
@ -736,10 +736,10 @@ acked(puback, PacketId, State = #state{client_id = ClientId,
acked(pubrec, PacketId, State = #state{client_id = ClientId, acked(pubrec, PacketId, State = #state{client_id = ClientId,
username = Username, username = Username,
inflight = Inflight}) -> inflight = Inflight}) ->
case Inflight:lookup(PacketId) of case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, Msg, _Ts}} -> {value, {publish, Msg, _Ts}} ->
emqx_hooks:run('message.acked', [ClientId, Username], Msg), emqx_hooks:run('message.acked', [ClientId, Username], Msg),
State#state{inflight = Inflight:update(PacketId, {pubrel, PacketId, os:timestamp()})}; State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
{value, {pubrel, PacketId, _Ts}} -> {value, {pubrel, PacketId, _Ts}} ->
?LOG(warning, "Duplicated PUBREC Packet: ~p", [PacketId], State), ?LOG(warning, "Duplicated PUBREC Packet: ~p", [PacketId], State),
State; State;
@ -749,7 +749,7 @@ acked(pubrec, PacketId, State = #state{client_id = ClientId,
end; end;
acked(pubcomp, PacketId, State = #state{inflight = Inflight}) -> acked(pubcomp, PacketId, State = #state{inflight = Inflight}) ->
State#state{inflight = Inflight:delete(PacketId)}. State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Dequeue %% Dequeue
@ -760,7 +760,7 @@ dequeue(State = #state{client_pid = undefined}) ->
State; State;
dequeue(State = #state{inflight = Inflight}) -> dequeue(State = #state{inflight = Inflight}) ->
case Inflight:is_full() of case emqx_inflight:is_full(Inflight) of
true -> State; true -> State;
false -> dequeue2(State) false -> dequeue2(State)
end. end.

View File

@ -30,7 +30,7 @@ init([]) ->
permanent, 5000, worker, [emqx_sys]}, permanent, 5000, worker, [emqx_sys]},
{ok, Env} = emqx_config:get_env(sysmon), {ok, Env} = emqx_config:get_env(sysmon),
Sysmon = {sys_mon, {emqx_sysmon, start_link, [Env]}, Sysmon = {sys_mon, {emqx_sys_mon, start_link, [Env]},
permanent, 5000, worker, [emqx_sys_mon]}, permanent, 5000, worker, [emqx_sys_mon]},
{ok, {{one_for_one, 10, 100}, [Sys, Sysmon]}}. {ok, {{one_for_one, 10, 100}, [Sys, Sysmon]}}.

View File

@ -1,18 +1,18 @@
%%-------------------------------------------------------------------- %%%===================================================================
%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%%
%% Licensed under the Apache License, Version 2.0 (the "License"); %%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at %%% You may obtain a copy of the License at
%% %%%
%% http://www.apache.org/licenses/LICENSE-2.0 %%% http://www.apache.org/licenses/LICENSE-2.0
%% %%%
%% Unless required by applicable law or agreed to in writing, software %%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, %%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %%% See the License for the specific language governing permissions and
%% limitations under the License. %%% limitations under the License.
%%-------------------------------------------------------------------- %%%===================================================================
-module(emqx_ws_connection). -module(emqx_ws_connection).
@ -44,8 +44,9 @@
-export([handle_pre_hibernate/1]). -export([handle_pre_hibernate/1]).
%% WebSocket Client State %% WebSocket Client State
-record(wsclient_state, {ws_pid, peername, connection, proto_state, keepalive, -record(wsclient_state, {ws_pid, transport, socket, peername,
enable_stats, force_gc_count}). proto_state, keepalive, enable_stats,
force_gc_count}).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
@ -85,27 +86,29 @@ clean_acl_cache(CPid, Topic) ->
init([Env, WsPid, Req, ReplyChannel]) -> init([Env, WsPid, Req, ReplyChannel]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
Conn = Req:get(connection),
true = link(WsPid), true = link(WsPid),
case Req:get(peername) of Transport = mochiweb_request:get(transport, Req),
Sock = mochiweb_request:get(socket, Req),
case mochiweb_request:get(peername, Req) of
{ok, Peername} -> {ok, Peername} ->
Headers = mochiweb_headers:to_list(mochiweb_request:get(headers, Req)), Headers = mochiweb_headers:to_list(mochiweb_request:get(headers, Req)),
ProtoState = emqx_protocol:init(Conn, Peername, send_fun(ReplyChannel), ProtoState = emqx_protocol:init(Transport, Sock, Peername, send_fun(ReplyChannel),
[{ws_initial_headers, Headers} | Env]), [{ws_initial_headers, Headers} | Env]),
IdleTimeout = get_value(client_idle_timeout, Env, 30000), IdleTimeout = get_value(client_idle_timeout, Env, 30000),
EnableStats = get_value(client_enable_stats, Env, false), EnableStats = get_value(client_enable_stats, Env, false),
ForceGcCount = emqx_gc:conn_max_gc_count(), ForceGcCount = emqx_gc:conn_max_gc_count(),
{ok, #wsclient_state{connection = Conn, {ok, #wsclient_state{transport = Transport,
socket = Sock,
ws_pid = WsPid, ws_pid = WsPid,
peername = Peername, peername = Peername,
proto_state = ProtoState, proto_state = ProtoState,
enable_stats = EnableStats, enable_stats = EnableStats,
force_gc_count = ForceGcCount}, force_gc_count = ForceGcCount},
IdleTimeout, {backoff, 2000, 2000, 20000}, ?MODULE}; IdleTimeout, {backoff, 2000, 2000, 20000}, ?MODULE};
{error, enotconn} -> Conn:fast_close(), {error, enotconn} -> Transport:fast_close(Sock),
exit(WsPid, normal), exit(WsPid, normal),
exit(normal); exit(normal);
{error, Reason} -> Conn:fast_close(), {error, Reason} -> Transport:fast_close(Sock),
exit(WsPid, normal), exit(WsPid, normal),
exit({shutdown, Reason}) exit({shutdown, Reason})
end. end.
@ -205,9 +208,10 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
handle_info({shutdown, Reason}, State) -> handle_info({shutdown, Reason}, State) ->
shutdown(Reason, State); shutdown(Reason, State);
handle_info({keepalive, start, Interval}, State = #wsclient_state{connection = Conn}) -> handle_info({keepalive, start, Interval},
State = #wsclient_state{transport = Transport, socket =Sock}) ->
?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State), ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State),
case emqx_keepalive:start(stat_fun(Conn), Interval, {keepalive, check}) of case emqx_keepalive:start(stat_fun(Transport, Sock), Interval, {keepalive, check}) of
{ok, KeepAlive} -> {ok, KeepAlive} ->
{noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate}; {noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate};
{error, Error} -> {error, Error} ->
@ -265,7 +269,7 @@ code_change(_OldVsn, State, _Extra) ->
send_fun(ReplyChannel) -> send_fun(ReplyChannel) ->
Self = self(), Self = self(),
fun(Packet) -> fun(Packet) ->
Data = emqx_serializer:serialize(Packet), Data = emqx_frame:serialize(Packet),
emqx_metrics:inc('bytes/sent', iolist_size(Data)), emqx_metrics:inc('bytes/sent', iolist_size(Data)),
case ReplyChannel({binary, Data}) of case ReplyChannel({binary, Data}) of
ok -> ok; ok -> ok;
@ -273,9 +277,9 @@ send_fun(ReplyChannel) ->
end end
end. end.
stat_fun(Conn) -> stat_fun(Transport, Sock) ->
fun() -> fun() ->
case Conn:getstat([recv_oct]) of case Transport:getstat(Sock, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
{error, Error} -> {error, Error} {error, Error} -> {error, Error}
end end
@ -293,8 +297,8 @@ emit_stats(ClientId, State) ->
emqx_cm:set_client_stats(ClientId, Stats), emqx_cm:set_client_stats(ClientId, Stats),
State. State.
wsock_stats(#wsclient_state{connection = Conn}) -> wsock_stats(#wsclient_state{transport = Transport, socket = Sock}) ->
case Conn:getstat(?SOCK_STATS) of case Transport:getstat(Sock, ?SOCK_STATS) of
{ok, Ss} -> Ss; {ok, Ss} -> Ss;
{error, _} -> [] {error, _} -> []
end. end.

View File

@ -17,6 +17,7 @@
-module(emqx_SUITE). -module(emqx_SUITE).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqttc/include/emqttc_packet.hrl"). -include_lib("emqttc/include/emqttc_packet.hrl").

View File

View File

@ -17,6 +17,7 @@
-module(emqx_access_SUITE). -module(emqx_access_SUITE).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl"). -include("emqx.hrl").

View File

@ -21,6 +21,7 @@
-define(BASE62, emqx_base62). -define(BASE62, emqx_base62).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
all() -> [t_base62_encode]. all() -> [t_base62_encode].

View File

@ -16,6 +16,7 @@
-module(emqx_broker_SUITE). -module(emqx_broker_SUITE).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
-define(APP, emqx). -define(APP, emqx).
@ -24,6 +25,7 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl").
all() -> all() ->
[ [
@ -147,7 +149,7 @@ start_session(_) ->
{ok, ClientPid} = emqx_mock_client:start_link(<<"clientId">>), {ok, ClientPid} = emqx_mock_client:start_link(<<"clientId">>),
{ok, SessPid} = emqx_mock_client:start_session(ClientPid), {ok, SessPid} = emqx_mock_client:start_session(ClientPid),
Message = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>), Message = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>),
Message1 = Message#mqtt_message{pktid = 1}, Message1 = Message#mqtt_message{packet_id = 1},
emqx_session:publish(SessPid, Message1), emqx_session:publish(SessPid, Message1),
emqx_session:pubrel(SessPid, 1), emqx_session:pubrel(SessPid, 1),
emqx_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]), emqx_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]),
@ -228,7 +230,7 @@ hook_fun7(arg, initArg) -> any.
hook_fun8(arg, initArg) -> stop. hook_fun8(arg, initArg) -> stop.
set_alarms(_) -> set_alarms(_) ->
AlarmTest = #mqtt_alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"}, AlarmTest = #alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"},
emqx_alarm:set_alarm(AlarmTest), emqx_alarm:set_alarm(AlarmTest),
Alarms = emqx_alarm:get_alarms(), Alarms = emqx_alarm:get_alarms(),
?assertEqual(1, length(Alarms)), ?assertEqual(1, length(Alarms)),

View File

@ -0,0 +1,41 @@
%%%===================================================================
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%===================================================================
-module(emqx_client_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> [].
groups() -> [].
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
init_per_group(_Group, Config) ->
Config.
end_per_group(_Group, _Config) ->
ok.

View File

@ -1,22 +1,23 @@
%%-------------------------------------------------------------------- %%%===================================================================
%% Copyright (c) 2017 EMQ Enterprise, Inc. (http://emqtt.io) %%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%%
%% Licensed under the Apache License, Version 2.0 (the "License"); %%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at %%% You may obtain a copy of the License at
%% %%%
%% http://www.apache.org/licenses/LICENSE-2.0 %%% http://www.apache.org/licenses/LICENSE-2.0
%% %%%
%% Unless required by applicable law or agreed to in writing, software %%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, %%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %%% See the License for the specific language governing permissions and
%% limitations under the License. %%% limitations under the License.
%%-------------------------------------------------------------------- %%%===================================================================
-module(emqx_ct_broker_helpers). -module(emqx_ct_broker_helpers).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
-define(APP, emqx). -define(APP, emqx).
@ -57,13 +58,11 @@ local_path(Components) ->
set_app_env({App, Lists}) -> set_app_env({App, Lists}) ->
lists:foreach(fun({acl_file, _Var}) -> lists:foreach(fun({acl_file, _Var}) ->
application:set_env(App, acl_file, local_path(["etc", "acl.conf"])); application:set_env(App, acl_file, local_path(["etc", "acl.conf"]));
({license_file, _Var}) ->
application:set_env(App, license_file, local_path(["etc", "emqx.lic"]));
({plugins_loaded_file, _Var}) -> ({plugins_loaded_file, _Var}) ->
application:set_env(App, plugins_loaded_file, local_path(["test", "emqx_SUITE_data","loaded_plugins"])); application:set_env(App, plugins_loaded_file, local_path(["test", "emqx_SUITE_data","loaded_plugins"]));
({Par, Var}) -> ({Par, Var}) ->
application:set_env(App, Par, Var) application:set_env(App, Par, Var)
end, Lists). end, Lists).
change_opts(SslType) -> change_opts(SslType) ->

24
test/emqx_ct_helpers.erl Normal file
View File

@ -0,0 +1,24 @@
%%%===================================================================
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%===================================================================
-module(emqx_ct_helpers).
-export([ensure_mnesia_stopped/0]).
ensure_mnesia_stopped() ->
ekka_mnesia:ensure_stopped(),
ekka_mnesia:delete_schema().

View File

@ -284,9 +284,7 @@ serialize_parse_publish_v5(_) ->
'Correlation-Data' => <<"correlation-id">>, 'Correlation-Data' => <<"correlation-id">>,
'Subscription-Identifier' => 1, 'Subscription-Identifier' => 1,
'Content-Type' => <<"text/json">>}, 'Content-Type' => <<"text/json">>},
Packet = ?PUBLISH_PACKET(#mqtt_packet_header{type = ?PUBLISH}, Packet = ?PUBLISH_PACKET(?QOS_1, <<"$share/group/topic">>, 1, Props, <<"payload">>),
<<"$share/group/topic">>, 1, Props,
<<"payload">>),
?assertEqual({ok, Packet, <<>>}, ?assertEqual({ok, Packet, <<>>},
parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
@ -335,7 +333,7 @@ serialize_parse_subscribe(_) ->
Bin = <<130,11,0,2,0,6,84,111,112,105,99,65,2>>, Bin = <<130,11,0,2,0,6,84,111,112,105,99,65,2>>,
TopicFilters = [{<<"TopicA">>, #mqtt_subopts{qos = 2}}], TopicFilters = [{<<"TopicA">>, #mqtt_subopts{qos = 2}}],
Packet = ?SUBSCRIBE_PACKET(2, TopicFilters), Packet = ?SUBSCRIBE_PACKET(2, TopicFilters),
?assertEqual(Bin, serialize(Packet)), ?assertEqual(Bin, iolist_to_binary(serialize(Packet))),
?assertEqual({ok, Packet, <<>>}, parse(Bin)). ?assertEqual({ok, Packet, <<>>}, parse(Bin)).
serialize_parse_subscribe_v5(_) -> serialize_parse_subscribe_v5(_) ->
@ -343,7 +341,8 @@ serialize_parse_subscribe_v5(_) ->
{<<"TopicQos1">>, #mqtt_subopts{rh = 1, qos =?QOS_1}}], {<<"TopicQos1">>, #mqtt_subopts{rh = 1, qos =?QOS_1}}],
Packet = ?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 16#FFFFFFF}, Packet = ?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 16#FFFFFFF},
TopicFilters), TopicFilters),
?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). ?assertEqual({ok, Packet, <<>>},
parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
serialize_parse_suback(_) -> serialize_parse_suback(_) ->
Packet = ?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128]), Packet = ?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128]),

View File

@ -19,6 +19,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
all() -> [t_guid_gen, t_guid_hexstr, t_guid_base62]. all() -> [t_guid_gen, t_guid_hexstr, t_guid_base62].

View File

@ -1,61 +1,62 @@
%%-------------------------------------------------------------------- %%%===================================================================
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%%
%% Licensed under the Apache License, Version 2.0 (the "License"); %%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at %%% You may obtain a copy of the License at
%% %%%
%% http://www.apache.org/licenses/LICENSE-2.0 %%% http://www.apache.org/licenses/LICENSE-2.0
%% %%%
%% Unless required by applicable law or agreed to in writing, software %%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, %%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %%% See the License for the specific language governing permissions and
%% limitations under the License. %%% limitations under the License.
%%-------------------------------------------------------------------- %%%===================================================================
-module(emqx_inflight_SUITE). -module(emqx_inflight_SUITE).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
%% CT
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
all() -> [t_contain, t_lookup, t_insert, t_update, t_delete, t_window, -import(emqx_inflight, [new/1, contain/2, insert/3, lookup/2, update/3,
t_is_full, t_is_empty]. delete/2, is_empty/1, is_full/1]).
all() ->
[t_contain, t_lookup, t_insert, t_update, t_delete, t_window,
t_is_full, t_is_empty].
t_contain(_) -> t_contain(_) ->
Inflight = emqx_inflight:new(0), ?assertNot(contain(k, new(0))),
?assertNot(Inflight:contain(k)), ?assert(contain(k, insert(k, v, new(0)))).
Inflight1 = Inflight:insert(k, v),
?assert(Inflight1:contain(k)).
t_lookup(_) -> t_lookup(_) ->
Inflight = (emqx_inflight:new(0)):insert(k, v), Inflight = insert(k, v, new(0)),
?assertEqual(v, Inflight:lookup(k)). ?assertEqual({value, v}, lookup(k, Inflight)),
?assertEqual(none, lookup(x, Inflight)).
t_insert(_) -> t_insert(_) ->
Inflight = ((emqx_inflight:new(0)):insert(k1, v1)):insert(k2, v2), Inflight = insert(k2, v2, insert(k1, v1, new(0))),
?assertEqual(v2, Inflight:lookup(k2)). ?assertEqual({value, v1}, lookup(k1, Inflight)),
?assertEqual({value, v2}, lookup(k2, Inflight)).
t_update(_) -> t_update(_) ->
Inflight = ((emqx_inflight:new(0)):insert(k, v1)):update(k, v2), Inflight = update(k, v2, insert(k, v1, new(0))),
?assertEqual(v2, Inflight:lookup(k)). ?assertEqual({value, v2}, lookup(k, Inflight)).
t_delete(_) -> t_delete(_) ->
Inflight = ((emqx_inflight:new(0)):insert(k, v1)):delete(k), ?assert(is_empty(delete(k, insert(k, v1, new(0))))).
?assert(Inflight:is_empty()).
t_window(_) -> t_window(_) ->
?assertEqual([], (emqx_inflight:new(10)):window()), ?assertEqual([], emqx_inflight:window(new(10))),
Inflight = ((emqx_inflight:new(0)):insert(1, 1)):insert(2, 2), Inflight = insert(2, 2, insert(1, 1, new(0))),
?assertEqual([1, 2], Inflight:window()). ?assertEqual([1, 2], emqx_inflight:window(Inflight)).
t_is_full(_) -> t_is_full(_) ->
Inflight = ((emqx_inflight:new(1)):insert(k, v1)), ?assert(is_full(insert(k, v1, new(1)))).
?assert(Inflight:is_full()).
t_is_empty(_) -> t_is_empty(_) ->
Inflight = ((emqx_inflight:new(1)):insert(k, v1)), ?assertNot(is_empty(insert(k, v1, new(1)))).
?assertNot(Inflight:is_empty()).

View File

@ -19,6 +19,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
-define(SOCKOPTS, [ -define(SOCKOPTS, [
binary, binary,

View File

@ -19,6 +19,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
-define(SOCKOPTS, [binary, -define(SOCKOPTS, [binary,
{packet, raw}, {packet, raw},

View File

@ -17,6 +17,7 @@
-module(emqx_mod_SUITE). -module(emqx_mod_SUITE).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl"). -include("emqx.hrl").

View File

@ -0,0 +1,220 @@
%%%===================================================================
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%===================================================================
-module(emqx_mqtt_compat_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-import(lists, [nth/2]).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(TOPICS, [<<"TopicA">>, <<"TopicA/B">>, <<"Topic/C">>, <<"TopicA/C">>,
<<"/TopicA">>]).
-define(WILD_TOPICS, [<<"TopicA/+">>, <<"+/C">>, <<"#">>, <<"/#">>, <<"/+">>,
<<"+/+">>, <<"TopicA/#">>]).
all() ->
[basic_test,
retained_message_test,
will_message_test,
zero_length_clientid_test,
offline_message_queueing_test,
overlapping_subscriptions_test,
keepalive_test,
redelivery_on_reconnect_test,
subscribe_failure_test,
dollar_topics_test].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
receive_messages(Count) ->
receive_messages(Count, []).
receive_messages(0, Msgs) ->
Msgs;
receive_messages(Count, Msgs) ->
receive
{public, Msg} ->
receive_messages(Count-1, [Msg|Msgs]);
_Other ->
receive_messages(Count, Msgs)
after 10 ->
Msgs
end.
basic_test(_Config) ->
Topic = nth(1, ?TOPICS),
ct:print("Basic test starting"),
{ok, C, _} = emqx_client:start_link(),
{ok, _, [0]} = emqx_client:subscribe(C, Topic, qos2),
ok = emqx_client:publish(C, Topic, <<"qos 0">>),
{ok, _} = emqx_client:publish(C, Topic, <<"qos 1">>, 1),
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
ok = emqx_client:disconnect(C),
?assertEqual(3, length(receive_messages(3))).
retained_message_test(_Config) ->
ct:print("Retained message test starting"),
%% Retained messages
{ok, C1, _} = emqx_client:start_link([{clean_start, true}]),
ok = emqx_client:publish(C1, nth(1, ?TOPICS), <<"qos 0">>, [{qos, 0}, {retain, true}]),
{ok, _} = emqx_client:publish(C1, nth(3, ?TOPICS), <<"qos 1">>, [{qos, 1}, {retain, true}]),
{ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<"qos 2">>, [{qos, 2}, {retain, true}]),
timer:sleep(10),
{ok, #{}, [0]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
ok = emqx_client:disconnect(C1),
?assertEqual(3, length(receive_messages(10))),
%% Clear retained messages
{ok, C2, _} = emqx_client:start_link([{clean_start, true}]),
ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"">>, [{qos, 0}, {retain, true}]),
{ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"">>, [{qos, 1}, {retain, true}]),
{ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"">>, [{qos, 2}, {retain, true}]),
timer:sleep(10), %% wait for QoS 2 exchange to be completed
{ok, _, [0]} = emqx_client:subscribe(C2, nth(6, ?WILD_TOPICS), 2),
timer:sleep(10),
ok = emqx_client:disconnect(),
?assertEqual(0, length(receive_messages(3))).
will_message_test(_Config) ->
{ok, C1, _} = emqx_client:start_link([{clean_start, true},
{will_topic = nth(3, ?TOPICS)},
{will_payload, <<"client disconnected">>},
{keepalive, 2}]),
{ok, C2, _} = emqx_client:start_link(),
{ok, _, [2]} = emqx_client:subscribe(C2, nth(3, ?TOPICS), 2),
timer:sleep(10),
ok = emqx_client:stop(C1),
timer:sleep(5),
ok = emqx_client:disconnect(C2),
?assertEqual(1, length(receive_messages(1))),
ct:print("Will message test succeeded").
zero_length_clientid_test(_Config) ->
ct:print("Zero length clientid test starting"),
{error, _} = emqx_client:start_link([{clean_start, false},
{client_id, <<>>}]),
{ok, _, _} = emqx_client:start_link([{clean_start, true},
{client_id, <<>>}]),
ct:print("Zero length clientid test succeeded").
offline_message_queueing_test(_) ->
{ok, C1, _} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c1">>}]),
{ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
ok = emqx_client:disconnect(C1),
{ok, C2, _} = emqx_client:start_link([{clean_start, true},
{client_id, <<"c2">>}]),
ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0),
{ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1),
{ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2),
timer:sleep(10),
emqx_client:disconnect(C2),
{ok, C3, _} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c1">>}]),
timer:sleep(10),
emqx_client:disconnect(C3),
?assertEqual(3, length(receive_messages(3))).
overlapping_subscriptions_test(_) ->
{ok, C, _} = emqx_client:start_link([]),
{ok, _, [2, 1]} = emqx_client:subscribe(C, [{nth(7, ?WILD_TOPICS), 2},
{nth(1, ?WILD_TOPICS), 1}]),
timer:sleep(10),
{ok, _} = emqx_client:publish(C, nth(4, ?TOPICS), <<"overlapping topic filters">>, 2),
time:sleep(10),
emqx_client:disconnect(C),
Num = receive_messages(2),
?assert(lists:member(Num, [1, 2])),
if
Num == 1 ->
ct:print("This server is publishing one message for all
matching overlapping subscriptions, not one for each.");
Num == 2 ->
ct:print("This server is publishing one message per each
matching overlapping subscription.");
true -> ok
end.
keepalive_test(_) ->
ct:print("Keepalive test starting"),
{ok, C1, _} = emqx_client:start_link([{clean_start, true},
{keepalive, 5},
{will_topic, nth(5, ?TOPICS)},
{will_payload, <<"keepalive expiry">>}]),
ok = emqx_client:pause(C1),
{ok, C2, _} = emqx_client:start_link([{clean_start, true},
{keepalive, 0}]),
{ok, _, [2]} = emqx_client:subscribe(C2, nth(5, ?TOPICS), 2),
timer:sleep(15000),
ok = emqx_client:disconnect(C2),
?assertEqual(1, length(receive_messages(1))),
ct:print("Keepalive test succeeded").
redelivery_on_reconnect_test(_) ->
ct:print("Redelivery on reconnect test starting"),
{ok, C1, _} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c">>}]),
{ok, _, [2]} = emqx_client:subscribe(C1, nth(7, ?WILD_TOPICS), 2),
timer:sleep(10),
ok = emqx_client:pause(C1),
{ok, _} = emqx_client:publish(C1, nth(2, ?TOPICS), <<>>,
[{qos, 1}, {retain, false}]),
{ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<>>,
[{qos, 2}, {retain, false}]),
time:sleep(10),
ok = emqx_client:disconnect(C1),
?assertEqual(0, length(receive_messages(2))),
{ok, C2, _} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c">>}]),
timer:sleep(10),
ok = emqx_client:disconnect(C2),
?assertEqual(2, length(receive_messages(2))).
subscribe_failure_test(_) ->
ct:print("Subscribe failure test starting"),
{ok, C, _} = emqx_client:start_link([]),
{ok, _, [16#80]} = emqx_client:subscribe(C, <<"$SYS/#">>, 2),
timer:sleep(10),
ct:print("Subscribe failure test succeeded").
dollar_topics_test(_) ->
ct:print("$ topics test starting"),
{ok, C, _} = emqx_client:start_link([{clean_start, true},
{keepalive, 0}]),
{ok, _, [2]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 2),
{ok, _} = emqx_client:publish(C, <<"$", (nth(2, ?TOPICS))>>,
<<"">>, [{qos, 1}, {retain, false}]),
timer:sleep(10),
?assertEqual(0, length(receive_messages(1))),
ok = emqx_client:disconnect(C),
ct:print("$ topics test succeeded").

View File

@ -17,8 +17,10 @@
-module(emqx_mqueue_SUITE). -module(emqx_mqueue_SUITE).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").

View File

@ -18,6 +18,7 @@
%% CT %% CT
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
all() -> [{group, keepalive}]. all() -> [{group, keepalive}].

View File

@ -16,9 +16,11 @@
-module(emqx_pqueue_SUITE). -module(emqx_pqueue_SUITE).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
-define(PQ, emqx_pqueue). -define(PQ, emqx_pqueue).

View File

@ -1,320 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_protocol_SUITE).
-compile(export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(emqx_serializer, [serialize/1]).
all() ->
[{group, parser},
{group, serializer},
{group, packet},
{group, message}].
groups() ->
[{parser, [],
[parse_connect,
parse_bridge,
parse_publish,
parse_puback,
parse_pubrec,
parse_pubrel,
parse_pubcomp,
parse_subscribe,
parse_unsubscribe,
parse_pingreq,
parse_disconnect]},
{serializer, [],
[serialize_connect,
serialize_connack,
serialize_publish,
serialize_puback,
serialize_pubrel,
serialize_subscribe,
serialize_suback,
serialize_unsubscribe,
serialize_unsuback,
serialize_pingreq,
serialize_pingresp,
serialize_disconnect]},
{packet, [],
[packet_proto_name,
packet_type_name,
packet_connack_name,
packet_format]},
{message, [],
[message_make,
message_from_packet,
message_flag]}].
%%--------------------------------------------------------------------
%% Parse Cases
%%--------------------------------------------------------------------
parse_connect(_) ->
Parser = emqx_parser:initial_state(),
%% CONNECT(Q0, R0, D0, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
V31ConnBin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_connect{proto_ver = 3,
proto_name = <<"MQIsdp">>,
client_id = <<"mosqpub/10451-iMac.loca">>,
clean_sess = true,
keep_alive = 60}}, <<>>} = emqx_parser:parse(V31ConnBin, Parser),
%% CONNECT(Q0, R0, D0, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
V311ConnBin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_connect{proto_ver = 4,
proto_name = <<"MQTT">>,
client_id = <<"mosqpub/10451-iMac.loca">>,
clean_sess = true,
keep_alive = 60 } }, <<>>} = emqx_parser:parse(V311ConnBin, Parser),
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId="", ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60)
V311ConnWithoutClientId = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_connect{proto_ver = 4,
proto_name = <<"MQTT">>,
client_id = <<>>,
clean_sess = true,
keep_alive = 60 } }, <<>>} = emqx_parser:parse(V311ConnWithoutClientId, Parser),
%%CONNECT(Q0, R0, D0, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60,
%% Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg))
ConnBinWithWill = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,112,117,98,108,105,99>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_connect{proto_ver = 3,
proto_name = <<"MQIsdp">>,
client_id = <<"mosqpub/10452-iMac.loca">>,
clean_sess = true,
keep_alive = 60,
will_retain = false,
will_qos = 1,
will_flag = true,
will_topic = <<"/will">>,
will_msg = <<"willmsg">>,
username = <<"test">>,
password = <<"public">>}}, <<>>} = emqx_parser:parse(ConnBinWithWill, Parser),
ok.
parse_bridge(_) ->
Parser = emqx_parser:initial_state(),
Data = <<16,86,0,6,77,81,73,115,100,112,131,44,0,60,0,19,67,95,48,48,58,48,67,58,50,57,58,50,66,58,55,55,58,53,50,
0,48,36,83,89,83,47,98,114,111,107,101,114,47,99,111,110,110,101,99,116,105,111,110,47,67,95,48,48,58,48,
67,58,50,57,58,50,66,58,55,55,58,53,50,47,115,116,97,116,101,0,1,48>>,
%% CONNECT(Q0, R0, D0, ClientId=C_00:0C:29:2B:77:52, ProtoName=MQIsdp, ProtoVsn=131, CleanSess=false, KeepAlive=60,
%% Username=undefined, Password=undefined, Will(Q1, R1, Topic=$SYS/broker/connection/C_00:0C:29:2B:77:52/state, Msg=0))
{ok, #mqtt_packet{variable = Variable}, <<>>} = emqx_parser:parse(Data, Parser),
#mqtt_packet_connect{client_id = <<"C_00:0C:29:2B:77:52">>,
proto_ver = 16#03,
proto_name = <<"MQIsdp">>,
will_retain = true,
will_qos = 1,
will_flag = true,
clean_sess = false,
keep_alive = 60,
will_topic = <<"$SYS/broker/connection/C_00:0C:29:2B:77:52/state">>,
will_msg = <<"0">>} = Variable.
parse_publish(_) ->
Parser = emqx_parser:initial_state(),
%%PUBLISH(Qos=1, Retain=false, Dup=false, TopicName=a/b/c, PacketId=1, Payload=<<"hahah">>)
PubBin = <<50,14,0,5,97,47,98,47,99,0,1,104,97,104,97,104>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
dup = false,
qos = 1,
retain = false},
variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>,
packet_id = 1},
payload = <<"hahah">> }, <<>>} = emqx_parser:parse(PubBin, Parser),
%PUBLISH(Qos=0, Retain=false, Dup=false, TopicName=xxx/yyy, PacketId=undefined, Payload=<<"hello">>)
%DISCONNECT(Qos=0, Retain=false, Dup=false)
PubBin1 = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111,224,0>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
dup = false,
qos = 0,
retain = false},
variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>,
packet_id = undefined},
payload = <<"hello">> }, <<224,0>>} = emqx_parser:parse(PubBin1, Parser),
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT,
dup = false,
qos = 0,
retain = false}}, <<>>} = emqx_parser:parse(<<224, 0>>, Parser).
parse_puback(_) ->
Parser = emqx_parser:initial_state(),
%%PUBACK(Qos=0, Retain=false, Dup=false, PacketId=1)
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK,
dup = false,
qos = 0,
retain = false}}, <<>>} = emqx_parser:parse(<<64,2,0,1>>, Parser).
parse_pubrec(_) ->
Parser = emqx_parser:initial_state(),
%%PUBREC(Qos=0, Retain=false, Dup=false, PacketId=1)
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREC,
dup = false,
qos = 0,
retain = false}}, <<>>} = emqx_parser:parse(<<5:4,0:4,2,0,1>>, Parser).
parse_pubrel(_) ->
Parser = emqx_parser:initial_state(),
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBREL,
dup = false,
qos = 1,
retain = false}}, <<>>} = emqx_parser:parse(<<6:4,2:4,2,0,1>>, Parser).
parse_pubcomp(_) ->
Parser = emqx_parser:initial_state(),
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBCOMP,
dup = false,
qos = 0,
retain = false}}, <<>>} = emqx_parser:parse(<<7:4,0:4,2,0,1>>, Parser).
parse_subscribe(_) ->
Parser = emqx_parser:initial_state(),
%% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}])
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE,
dup = false,
qos = 1,
retain = false},
variable = #mqtt_packet_subscribe{packet_id = 2,
topic_table = [{<<"TopicA">>,2}]} }, <<>>}
= emqx_parser:parse(<<130,11,0,2,0,6,84,111,112,105,99,65,2>>, Parser).
parse_unsubscribe(_) ->
Parser = emqx_parser:initial_state(),
%% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>])
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE,
dup = false,
qos = 1,
retain = false},
variable = #mqtt_packet_unsubscribe{packet_id = 2,
topics = [<<"TopicA">>]}}, <<>>}
= emqx_parser:parse(<<162,10,0,2,0,6,84,111,112,105,99,65>>, Parser).
parse_pingreq(_) ->
Parser = emqx_parser:initial_state(),
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PINGREQ,
dup = false,
qos = 0,
retain = false}}, <<>>}
= emqx_parser:parse(<<?PINGREQ:4, 0:4, 0:8>>, Parser).
parse_disconnect(_) ->
Parser = emqx_parser:initial_state(),
%DISCONNECT(Qos=0, Retain=false, Dup=false)
Bin = <<224, 0>>,
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?DISCONNECT,
dup = false,
qos = 0,
retain = false}}, <<>>} = emqx_parser:parse(Bin, Parser).
%%--------------------------------------------------------------------
%% Packet Cases
%%--------------------------------------------------------------------
packet_proto_name(_) ->
?assertEqual(<<"MQIsdp">>, emqx_packet:protocol_name(3)),
?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(4)).
packet_type_name(_) ->
?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)),
?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)).
packet_connack_name(_) ->
?assertEqual('CONNACK_ACCEPT', emqx_packet:connack_name(?CONNACK_ACCEPT)),
?assertEqual('CONNACK_PROTO_VER', emqx_packet:connack_name(?CONNACK_PROTO_VER)),
?assertEqual('CONNACK_INVALID_ID', emqx_packet:connack_name(?CONNACK_INVALID_ID)),
?assertEqual('CONNACK_SERVER', emqx_packet:connack_name(?CONNACK_SERVER)),
?assertEqual('CONNACK_CREDENTIALS', emqx_packet:connack_name(?CONNACK_CREDENTIALS)),
?assertEqual('CONNACK_AUTH', emqx_packet:connack_name(?CONNACK_AUTH)).
packet_format(_) ->
io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]),
io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]),
io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]),
io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]),
io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]).
%%--------------------------------------------------------------------
%% Message Cases
%%--------------------------------------------------------------------
message_make(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
?assertEqual(0, Msg#mqtt_message.qos),
Msg1 = emqx_message:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>),
?assert(is_binary(Msg1#mqtt_message.id)),
?assertEqual(2, Msg1#mqtt_message.qos).
message_from_packet(_) ->
Msg = emqx_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)),
?assertEqual(1, Msg#mqtt_message.qos),
?assertEqual(10, Msg#mqtt_message.pktid),
?assertEqual(<<"topic">>, Msg#mqtt_message.topic),
WillMsg = emqx_message:from_packet(#mqtt_packet_connect{will_flag = true,
will_topic = <<"WillTopic">>,
will_msg = <<"WillMsg">>}),
?assertEqual(<<"WillTopic">>, WillMsg#mqtt_message.topic),
?assertEqual(<<"WillMsg">>, WillMsg#mqtt_message.payload),
Msg2 = emqx_message:from_packet(<<"username">>, <<"clientid">>,
?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)),
?assertEqual({<<"clientid">>, <<"username">>}, Msg2#mqtt_message.from),
io:format("~s", [emqx_message:format(Msg2)]).
message_flag(_) ->
Pkt = ?PUBLISH_PACKET(1, <<"t">>, 2, <<"payload">>),
Msg2 = emqx_message:from_packet(<<"clientid">>, Pkt),
Msg3 = emqx_message:set_flag(retain, Msg2),
Msg4 = emqx_message:set_flag(dup, Msg3),
?assert(Msg4#mqtt_message.dup),
?assert(Msg4#mqtt_message.retain),
Msg5 = emqx_message:set_flag(Msg4),
Msg6 = emqx_message:unset_flag(dup, Msg5),
Msg7 = emqx_message:unset_flag(retain, Msg6),
?assertNot(Msg7#mqtt_message.dup),
?assertNot(Msg7#mqtt_message.retain),
emqx_message:unset_flag(Msg7),
emqx_message:to_packet(Msg7).

View File

@ -0,0 +1,147 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_protocol_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(emqx_serializer, [serialize/1]).
all() ->
[{group, parser},
{group, serializer},
{group, packet},
{group, message}].
groups() ->
[{parser, [],
[parse_connect,
parse_bridge,
parse_publish,
parse_puback,
parse_pubrec,
parse_pubrel,
parse_pubcomp,
parse_subscribe,
parse_unsubscribe,
parse_pingreq,
parse_disconnect]},
{serializer, [],
[serialize_connect,
serialize_connack,
serialize_publish,
serialize_puback,
serialize_pubrel,
serialize_subscribe,
serialize_suback,
serialize_unsubscribe,
serialize_unsuback,
serialize_pingreq,
serialize_pingresp,
serialize_disconnect]},
{packet, [],
[packet_proto_name,
packet_type_name,
packet_connack_name,
packet_format]},
{message, [],
[message_make,
message_from_packet,
message_flag]}].
%%--------------------------------------------------------------------
%% Packet Cases
%%--------------------------------------------------------------------
packet_proto_name(_) ->
?assertEqual(<<"MQIsdp">>, emqx_packet:protocol_name(3)),
?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(4)).
packet_type_name(_) ->
?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)),
?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)).
packet_connack_name(_) ->
?assertEqual('CONNACK_ACCEPT', emqx_packet:connack_name(?CONNACK_ACCEPT)),
?assertEqual('CONNACK_PROTO_VER', emqx_packet:connack_name(?CONNACK_PROTO_VER)),
?assertEqual('CONNACK_INVALID_ID', emqx_packet:connack_name(?CONNACK_INVALID_ID)),
?assertEqual('CONNACK_SERVER', emqx_packet:connack_name(?CONNACK_SERVER)),
?assertEqual('CONNACK_CREDENTIALS', emqx_packet:connack_name(?CONNACK_CREDENTIALS)),
?assertEqual('CONNACK_AUTH', emqx_packet:connack_name(?CONNACK_AUTH)).
packet_format(_) ->
io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]),
io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]),
io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]),
io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]),
io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]).
%%--------------------------------------------------------------------
%% Message Cases
%%--------------------------------------------------------------------
message_make(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
?assertEqual(0, Msg#mqtt_message.qos),
Msg1 = emqx_message:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>),
?assert(is_binary(Msg1#mqtt_message.id)),
?assertEqual(2, Msg1#mqtt_message.qos).
message_from_packet(_) ->
Msg = emqx_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)),
?assertEqual(1, Msg#mqtt_message.qos),
?assertEqual(10, Msg#mqtt_message.pktid),
?assertEqual(<<"topic">>, Msg#mqtt_message.topic),
WillMsg = emqx_message:from_packet(#mqtt_packet_connect{will_flag = true,
will_topic = <<"WillTopic">>,
will_msg = <<"WillMsg">>}),
?assertEqual(<<"WillTopic">>, WillMsg#mqtt_message.topic),
?assertEqual(<<"WillMsg">>, WillMsg#mqtt_message.payload),
Msg2 = emqx_message:from_packet(<<"username">>, <<"clientid">>,
?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)),
?assertEqual({<<"clientid">>, <<"username">>}, Msg2#mqtt_message.from),
io:format("~s", [emqx_message:format(Msg2)]).
message_flag(_) ->
Pkt = ?PUBLISH_PACKET(1, <<"t">>, 2, <<"payload">>),
Msg2 = emqx_message:from_packet(<<"clientid">>, Pkt),
Msg3 = emqx_message:set_flag(retain, Msg2),
Msg4 = emqx_message:set_flag(dup, Msg3),
?assert(Msg4#mqtt_message.dup),
?assert(Msg4#mqtt_message.retain),
Msg5 = emqx_message:set_flag(Msg4),
Msg6 = emqx_message:unset_flag(dup, Msg5),
Msg7 = emqx_message:unset_flag(retain, Msg6),
?assertNot(Msg7#mqtt_message.dup),
?assertNot(Msg7#mqtt_message.retain),
emqx_message:unset_flag(Msg7),
emqx_message:to_packet(Msg7).

View File

@ -1,57 +1,44 @@
%%-------------------------------------------------------------------- %%%===================================================================
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) %%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%%
%% Licensed under the Apache License, Version 2.0 (the "License"); %%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at %%% You may obtain a copy of the License at
%% %%%
%% http://www.apache.org/licenses/LICENSE-2.0 %%% http://www.apache.org/licenses/LICENSE-2.0
%% %%%
%% Unless required by applicable law or agreed to in writing, software %%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, %%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %%% See the License for the specific language governing permissions and
%% limitations under the License. %%% limitations under the License.
%%-------------------------------------------------------------------- %%%===================================================================
-module(emqx_router_SUITE). -module(emqx_router_SUITE).
-include("emqx.hrl"). -include("emqx.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
-define(R, emqx_router). -define(R, emqx_router).
-define(TABS, [emqx_route, emqx_trie, emqx_trie_node]).
all() -> all() ->
[{group, route}, [{group, route}].
{group, local_route}].
groups() -> groups() ->
[{route, [sequence], [{route, [sequence],
[t_get_topics, [add_del_route,
t_add_del_route, match_routes]}].
t_match_route,
t_print,
t_has_route,
t_unused]},
{local_route, [sequence],
[t_get_local_topics,
t_add_del_local_route,
t_match_local_route]}].
init_per_suite(Config) -> init_per_suite(Config) ->
ekka:start(), emqx_ct_broker_helpers:run_setup_steps(),
ekka_mnesia:ensure_started(),
{ok, _} = emqx_router_sup:start_link(),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_router:stop(), emqx_ct_broker_helpers:run_teardown_steps().
ekka:stop(),
ekka_mnesia:ensure_stopped(),
ekka_mnesia:delete_schema().
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
Config. Config.
@ -59,87 +46,49 @@ init_per_testcase(_TestCase, Config) ->
end_per_testcase(_TestCase, _Config) -> end_per_testcase(_TestCase, _Config) ->
clear_tables(). clear_tables().
t_get_topics(_) -> add_del_route(_) ->
?R:add_route(<<"a/b/c">>), From = {self(), make_ref()},
?R:add_route(<<"a/b/c">>), ?R:add_route(From, <<"a/b/c">>, node()),
?R:add_route(<<"a/+/b">>), ?R:add_route(From, <<"a/b/c">>, node()),
?R:add_route(From, <<"a/+/b">>, node()),
?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())), ?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())),
?R:del_route(<<"a/b/c">>), ?R:del_route(From, <<"a/b/c">>, node()),
?R:del_route(<<"a/+/b">>), ?R:del_route(From, <<"a/+/b">>, node()),
?assertEqual([], lists:sort(?R:topics())). ?assertEqual([], lists:sort(?R:topics())).
t_add_del_route(_) -> match_routes(_) ->
%%Node = node(), From = {self(), make_ref()},
?R:add_route(<<"a/b/c">>), ?R:add_route(From, <<"a/b/c">>, node()),
?R:add_route(<<"a/+/b">>), ?R:add_route(From, <<"a/+/c">>, node()),
?R:del_route(<<"a/b/c">>), ?R:add_route(From, <<"a/b/#">>, node()),
?R:del_route(<<"a/+/b">>). ?R:add_route(From, <<"#">>, node()),
?assertEqual([#route{topic = <<"#">>, dest = node()},
#route{topic = <<"a/+/c">>, dest = node()},
#route{topic = <<"a/b/#">>, dest = node()},
#route{topic = <<"a/b/c">>, dest = node()}],
lists:sort(?R:match_routes(<<"a/b/c">>))).
t_match_route(_) -> has_routes(_) ->
Node = node(), From = {self(), make_ref()},
?R:add_route(<<"a/b/c">>), ?R:add_route(From, <<"devices/+/messages">>, node()),
?R:add_route(<<"a/+/c">>), ?assert(?R:has_routes(<<"devices/+/messages">>)).
?R:add_route(<<"a/b/#">>),
?R:add_route(<<"#">>),
?assertEqual([#route{topic = <<"#">>, node = Node},
#route{topic = <<"a/+/c">>, node = Node},
#route{topic = <<"a/b/#">>, node = Node},
#route{topic = <<"a/b/c">>, node = Node}],
lists:sort(?R:match(<<"a/b/c">>))).
t_has_route(_) ->
?R:add_route(<<"devices/+/messages">>),
?assert(?R:has_route(<<"devices/+/messages">>)).
t_get_local_topics(_) ->
?R:add_local_route(<<"a/b/c">>),
?R:add_local_route(<<"x/+/y">>),
?R:add_local_route(<<"z/#">>),
?assertEqual([<<"z/#">>, <<"x/+/y">>, <<"a/b/c">>], ?R:local_topics()),
?R:del_local_route(<<"x/+/y">>),
?R:del_local_route(<<"z/#">>),
?assertEqual([<<"a/b/c">>], ?R:local_topics()).
t_add_del_local_route(_) ->
Node = node(),
?R:add_local_route(<<"a/b/c">>),
?R:add_local_route(<<"x/+/y">>),
?R:add_local_route(<<"z/#">>),
?assertEqual([{<<"a/b/c">>, Node},
{<<"x/+/y">>, Node},
{<<"z/#">>, Node}],
lists:sort(?R:get_local_routes())),
?R:del_local_route(<<"x/+/y">>),
?R:del_local_route(<<"z/#">>),
?assertEqual([{<<"a/b/c">>, Node}], lists:sort(?R:get_local_routes())).
t_match_local_route(_) ->
?R:add_local_route(<<"$SYS/#">>),
?R:add_local_route(<<"a/b/c">>),
?R:add_local_route(<<"a/+/c">>),
?R:add_local_route(<<"a/b/#">>),
?R:add_local_route(<<"#">>),
Matched = [Topic || #route{topic = {local, Topic}} <- ?R:match_local(<<"a/b/c">>)],
?assertEqual([<<"#">>, <<"a/+/c">>, <<"a/b/#">>, <<"a/b/c">>], lists:sort(Matched)).
clear_tables() -> clear_tables() ->
?R:clean_local_routes(), lists:foreach(fun mnesia:clear_table/1, ?TABS).
lists:foreach(fun mnesia:clear_table/1, [route, trie, trie_node]).
router_add_del(_) -> router_add_del(_) ->
%% Add
?R:add_route(<<"#">>), ?R:add_route(<<"#">>),
?R:add_route(<<"a/b/c">>), ?R:add_route(<<"a/b/c">>),
?R:add_route(<<"+/#">>), ?R:add_route(<<"+/#">>),
Routes = [R1, R2 | _] = [ Routes = [R1, R2 | _] = [
#route{topic = <<"#">>, node = node()}, #route{topic = <<"#">>, dest = node()},
#route{topic = <<"+/#">>, node = node()}, #route{topic = <<"+/#">>, dest = node()},
#route{topic = <<"a/b/c">>, node = node()}], #route{topic = <<"a/b/c">>, dest = node()}],
Routes = lists:sort(?R:match(<<"a/b/c">>)), ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))),
%% Batch Add %% Batch Add
lists:foreach(fun(R) -> ?R:add_route(R) end, Routes), lists:foreach(fun(R) -> ?R:add_route(R) end, Routes),
Routes = lists:sort(?R:match(<<"a/b/c">>)), ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))),
%% Del %% Del
?R:del_route(<<"a/b/c">>), ?R:del_route(<<"a/b/c">>),
@ -147,25 +96,10 @@ router_add_del(_) ->
{atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]), {atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]),
%% Batch Del %% Batch Del
R3 = #route{topic = <<"#">>, node = 'a@127.0.0.1'}, R3 = #route{topic = <<"#">>, dest = 'a@127.0.0.1'},
?R:add_route(R3), ?R:add_route(R3),
?R:del_route(R1), ?R:del_route(R1),
?R:del_route(R2), ?R:del_route(R2),
?R:del_route(R3), ?R:del_route(R3),
[] = lists:sort(?R:match(<<"a/b/c">>)). [] = lists:sort(?R:match(<<"a/b/c">>)).
t_print(_) ->
Routes = [#route{topic = <<"a/b/c">>, node = node()},
#route{topic = <<"#">>, node = node()},
#route{topic = <<"+/#">>, node = node()}],
lists:foreach(fun(R) -> ?R:add_route(R) end, Routes),
?R:print(<<"a/b/c">>),
?R:del_route(<<"+/#">>),
?R:del_route(<<"a/b/c">>),
?R:del_route(<<"#">>).
t_unused(_) ->
gen_server:call(?R, bad_call),
gen_server:cast(?R, bad_msg),
?R ! bad_info.

View File

@ -19,6 +19,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
all() -> [t_time_now_to]. all() -> [t_time_now_to].

View File

@ -20,6 +20,7 @@
%% CT %% CT
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
-import(emqx_topic, [wildcard/1, match/2, validate/1, triples/1, join/1, -import(emqx_topic, [wildcard/1, match/2, validate/1, triples/1, join/1,
words/1, systop/1, feed_var/3, parse/1, parse/2]). words/1, systop/1, feed_var/3, parse/1, parse/2]).

View File

@ -1,39 +1,40 @@
%%-------------------------------------------------------------------- %%%===================================================================
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
%% %%%
%% Licensed under the Apache License, Version 2.0 (the "License"); %%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at %%% You may obtain a copy of the License at
%% %%%
%% http://www.apache.org/licenses/LICENSE-2.0 %%% http://www.apache.org/licenses/LICENSE-2.0
%% %%%
%% Unless required by applicable law or agreed to in writing, software %%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, %%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %%% See the License for the specific language governing permissions and
%% limitations under the License. %%% limitations under the License.
%%-------------------------------------------------------------------- %%%===================================================================
-module(emqx_trie_SUITE). -module(emqx_trie_SUITE).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl"). -include("emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(TRIE, emqx_trie). -define(TRIE, emqx_trie).
-define(TRIE_TABS, [emqx_trie, emqx_trie_node]).
-include_lib("eunit/include/eunit.hrl").
all() -> all() ->
[t_insert, t_match, t_match2, t_match3, t_delete, t_delete2, t_delete3]. [t_insert, t_match, t_match2, t_match3, t_delete, t_delete2, t_delete3].
init_per_suite(Config) -> init_per_suite(Config) ->
ekka_mnesia:ensure_started(), application:load(emqx),
?TRIE:mnesia(boot), ok = ekka:start(),
?TRIE:mnesia(copy),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ekka:stop(),
ekka_mnesia:ensure_stopped(), ekka_mnesia:ensure_stopped(),
ekka_mnesia:delete_schema(). ekka_mnesia:delete_schema().
@ -131,5 +132,5 @@ t_delete3(_) ->
end). end).
clear_tables() -> clear_tables() ->
lists:foreach(fun mnesia:clear_table/1, [emqx_trie, emqx_trie_node]). lists:foreach(fun mnesia:clear_table/1, ?TRIE_TABS).

View File

@ -17,6 +17,7 @@
-module(emqx_vm_SUITE). -module(emqx_vm_SUITE).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").