Merge branch 'emqx30' into improve_connect

This commit is contained in:
tigercl 2018-10-19 16:03:17 +08:00 committed by GitHub
commit 55a12c1ab4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 952 additions and 404 deletions

View File

@ -35,11 +35,13 @@ EUNIT_OPTS = verbose
# CT_SUITES = emqx_frame # CT_SUITES = emqx_frame
## emqx_trie emqx_router emqx_frame emqx_mqtt_compat ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight \
emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_connection emqx_session \
emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \ emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mqtt_caps \
emqx_mountpoint emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub
CT_NODE_NAME = emqxct@127.0.0.1 CT_NODE_NAME = emqxct@127.0.0.1
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
@ -138,4 +140,3 @@ dep-vsn-check:
{[], []} -> halt(0); \ {[], []} -> halt(0); \
{Rebar, Mk} -> erlang:error({deps_version_discrepancy, [{rebar, Rebar}, {mk, Mk}]}) \ {Rebar, Mk} -> erlang:error({deps_version_discrepancy, [{rebar, Rebar}, {mk, Mk}]}) \
end." end."

View File

@ -1,6 +1,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% %%
%% [ACL](https://github.com/emqtt/emqttd/wiki/ACL) %% [ACL](http://emqtt.io/docs/v2/config.html#allow-anonymous-and-acl-file)
%% %%
%% -type who() :: all | binary() | %% -type who() :: all | binary() |
%% {ipaddr, esockd_access:cidr()} | %% {ipaddr, esockd_access:cidr()} |

View File

@ -459,6 +459,12 @@ acl_cache_ttl = 1m
## MQTT Protocol ## MQTT Protocol
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Response Topic Prefix
##
## Value: String
## Default: emqxrspv1
mqtt.response_topic_prefix = emqxrspv1
## Maximum MQTT packet size allowed. ## Maximum MQTT packet size allowed.
## ##
## Value: Bytes ## Value: Bytes

View File

@ -37,9 +37,6 @@
%% Queue topic %% Queue topic
-define(QUEUE, <<"$queue/">>). -define(QUEUE, <<"$queue/">>).
%% Shared topic
-define(SHARE, <<"$share/">>).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Message and Delivery %% Message and Delivery
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -65,9 +65,9 @@
end). end).
-define(IS_QOS_NAME(I), -define(IS_QOS_NAME(I),
(I =:= qos0; I =:= at_most_once; (I =:= qos0 orelse I =:= at_most_once orelse
I =:= qos1; I =:= at_least_once; I =:= qos1 orelse I =:= at_least_once orelse
I =:= qos2; I =:= exactly_once)). I =:= qos2 orelse I =:= exactly_once)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Maximum ClientId Length. %% Maximum ClientId Length.
@ -527,5 +527,8 @@
-define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}). -define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}).
-endif. -define(SHARE, "$share").
-define(SHARE(Group, Topic), emqx_topic:join([<<?SHARE>>, Group, Topic])).
-define(IS_SHARE(Topic), case Topic of <<?SHARE, _/binary>> -> true; _ -> false end).
-endif.

View File

@ -597,6 +597,11 @@ end}.
%% MQTT Protocol %% MQTT Protocol
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Response Topic Prefix
{mapping, "mqtt.response_topic_prefix", "emqx.response_topic_prefix",[
{datatype, string}
]}.
%% @doc Max Packet Size Allowed, 1MB by default. %% @doc Max Packet Size Allowed, 1MB by default.
{mapping, "mqtt.max_packet_size", "emqx.max_packet_size", [ {mapping, "mqtt.max_packet_size", "emqx.max_packet_size", [
{default, "1MB"}, {default, "1MB"},
@ -1797,4 +1802,3 @@ end}.
{busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
{busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
end}. end}.

View File

@ -317,13 +317,6 @@ handle_call(Req, _From, State) ->
emqx_logger:error("[Broker] unexpected call: ~p", [Req]), emqx_logger:error("[Broker] unexpected call: ~p", [Req]),
{reply, ignored, State}. {reply, ignored, State}.
resubscribe(From, {Subscriber, SubOpts, Topic}, State) ->
{SubPid, _} = Subscriber,
Group = maps:get(share, SubOpts, undefined),
true = do_subscribe(Group, Topic, Subscriber, SubOpts),
emqx_shared_sub:subscribe(Group, Topic, SubPid),
emqx_router:add_route(From, Topic, dest(Group)),
{noreply, monitor_subscriber(Subscriber, State)}.
handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}}, State) -> handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}}, State) ->
@ -385,6 +378,14 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions %% Internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
resubscribe(From, {Subscriber, SubOpts, Topic}, State) ->
{SubPid, _} = Subscriber,
Group = maps:get(share, SubOpts, undefined),
true = do_subscribe(Group, Topic, Subscriber, SubOpts),
emqx_shared_sub:subscribe(Group, Topic, SubPid),
emqx_router:add_route(From, Topic, dest(Group)),
{noreply, monitor_subscriber(Subscriber, State)}.
insert_subscriber(Group, Topic, Subscriber) -> insert_subscriber(Group, Topic, Subscriber) ->
Subscribers = subscribers(Topic), Subscribers = subscribers(Topic),
case lists:member(Subscriber, Subscribers) of case lists:member(Subscriber, Subscribers) of

View File

@ -19,6 +19,9 @@
-export([start_link/0]). -export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%% internal export
-export([stats_fun/0]).
-define(HELPER, ?MODULE). -define(HELPER, ?MODULE).
-record(state, {}). -record(state, {}).
@ -32,7 +35,9 @@ start_link() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
init([]) -> init([]) ->
emqx_stats:update_interval(broker_stats, stats_fun()), %% Use M:F/A for callback, not anonymous function because
%% fun M:F/A is small, also no badfun risk during hot beam reload
emqx_stats:update_interval(broker_stats, fun ?MODULE:stats_fun/0),
{ok, #state{}, hibernate}. {ok, #state{}, hibernate}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
@ -58,14 +63,12 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
stats_fun() -> stats_fun() ->
fun() -> safe_update_stats(emqx_subscriber,
safe_update_stats(emqx_subscriber, 'subscribers/count', 'subscribers/max'),
'subscribers/count', 'subscribers/max'), safe_update_stats(emqx_subscription,
safe_update_stats(emqx_subscription, 'subscriptions/count', 'subscriptions/max'),
'subscriptions/count', 'subscriptions/max'), safe_update_stats(emqx_suboptions,
safe_update_stats(emqx_suboptions, 'suboptions/count', 'suboptions/max').
'suboptions/count', 'suboptions/max')
end.
safe_update_stats(Tab, Stat, MaxStat) -> safe_update_stats(Tab, Stat, MaxStat) ->
case ets:info(Tab, size) of case ets:info(Tab, size) of

View File

@ -19,7 +19,8 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([start_link/0, start_link/1]). -export([start_link/0, start_link/1]).
-export([request/5, request/6, request_async/7, receive_response/3]).
-export([set_request_handler/2, sub_request_topic/3, sub_request_topic/4]).
-export([subscribe/2, subscribe/3, subscribe/4]). -export([subscribe/2, subscribe/3, subscribe/4]).
-export([publish/2, publish/3, publish/4, publish/5]). -export([publish/2, publish/3, publish/4, publish/5]).
-export([unsubscribe/2, unsubscribe/3]). -export([unsubscribe/2, unsubscribe/3]).
@ -37,8 +38,34 @@
-export([initialized/3, waiting_for_connack/3, connected/3]). -export([initialized/3, waiting_for_connack/3, connected/3]).
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). -export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]).
-export_type([client/0, properties/0, payload/0,
pubopt/0, subopt/0, request_input/0,
response_payload/0, request_handler/0,
corr_data/0]).
-export_type([host/0, option/0]).
%% Default timeout
-define(DEFAULT_KEEPALIVE, 60000).
-define(DEFAULT_ACK_TIMEOUT, 30000).
-define(DEFAULT_CONNECT_TIMEOUT, 60000).
-define(PROPERTY(Name, Val), #state{properties = #{Name := Val}}).
-define(WILL_MSG(QoS, Retain, Topic, Props, Payload),
#mqtt_msg{qos = QoS, retain = Retain, topic = Topic, props = Props, payload = Payload}).
-define(RESPONSE_TIMEOUT_SECONDS, timer:seconds(5)).
-define(NO_HANDLER, undefined).
-define(NO_GROUP, <<>>).
-define(NO_CLIENT_ID, <<>>).
-type(host() :: inet:ip_address() | inet:hostname()). -type(host() :: inet:ip_address() | inet:hostname()).
-type corr_data() :: binary().
-type(option() :: {name, atom()} -type(option() :: {name, atom()}
| {owner, pid()} | {owner, pid()}
| {host, host()} | {host, host()}
@ -57,6 +84,7 @@
| {keepalive, non_neg_integer()} | {keepalive, non_neg_integer()}
| {max_inflight, pos_integer()} | {max_inflight, pos_integer()}
| {retry_interval, timeout()} | {retry_interval, timeout()}
| {request_handler, request_handler()}
| {will_topic, iodata()} | {will_topic, iodata()}
| {will_payload, iodata()} | {will_payload, iodata()}
| {will_retain, boolean()} | {will_retain, boolean()}
@ -67,8 +95,6 @@
| {force_ping, boolean()} | {force_ping, boolean()}
| {properties, properties()}). | {properties, properties()}).
-export_type([host/0, option/0]).
-record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false, -record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false,
packet_id, topic, props, payload}). packet_id, topic, props, payload}).
@ -106,6 +132,7 @@
ack_timer :: reference(), ack_timer :: reference(),
retry_interval :: pos_integer(), retry_interval :: pos_integer(),
retry_timer :: reference(), retry_timer :: reference(),
request_handler :: request_handler(),
session_present :: boolean(), session_present :: boolean(),
last_packet_id :: packet_id(), last_packet_id :: packet_id(),
parse_state :: emqx_frame:state()}). parse_state :: emqx_frame:state()}).
@ -124,7 +151,7 @@
-type(qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos()). -type(qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos()).
-type(pubopt() :: {retain, boolean()} | {qos, qos()}). -type(pubopt() :: {retain, boolean()} | {qos, qos()} | {timeout, timeout()}).
-type(subopt() :: {rh, 0 | 1 | 2} -type(subopt() :: {rh, 0 | 1 | 2}
| {rap, boolean()} | {rap, boolean()}
@ -135,23 +162,35 @@
-type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}). -type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}).
-export_type([client/0, topic/0, qos/0, properties/0, payload/0, -type(request_input() :: binary()).
packet_id/0, pubopt/0, subopt/0, reason_code/0]).
%% Default timeout -type(response_payload() :: binary()).
-define(DEFAULT_KEEPALIVE, 60000).
-define(DEFAULT_ACK_TIMEOUT, 30000).
-define(DEFAULT_CONNECT_TIMEOUT, 60000).
-define(PROPERTY(Name, Val), #state{properties = #{Name := Val}}). -type(request_handler() :: fun((request_input()) -> response_payload())).
-define(WILL_MSG(QoS, Retain, Topic, Props, Payload), -type(group() :: binary()).
#mqtt_msg{qos = QoS, retain = Retain, topic = Topic, props = Props, payload = Payload}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% API %% API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Swap in a new request handler on the fly.
-spec(set_request_handler(client(), request_handler()) -> ok).
set_request_handler(Responser, RequestHandler) ->
gen_statem:call(Responser, {set_request_handler, RequestHandler}).
%% @doc Subscribe to request topic.
-spec(sub_request_topic(client(), qos(), topic()) -> ok).
sub_request_topic(Client, QoS, Topic) ->
sub_request_topic(Client, QoS, Topic, ?NO_GROUP).
%% @doc Share-subscribe to request topic.
-spec(sub_request_topic(client(), qos(), topic(), group()) -> ok).
sub_request_topic(Client, QoS, Topic, Group) ->
Properties = get_properties(Client),
NewTopic = make_req_rsp_topic(Properties, Topic, Group),
subscribe_req_rsp_topic(Client, QoS, NewTopic).
-spec(start_link() -> gen_statem:start_ret()). -spec(start_link() -> gen_statem:start_ret()).
start_link() -> start_link([]). start_link() -> start_link([]).
@ -248,12 +287,82 @@ parse_subopt([{nl, false} | Opts], Result) ->
parse_subopt([{qos, QoS} | Opts], Result) -> parse_subopt([{qos, QoS} | Opts], Result) ->
parse_subopt(Opts, Result#{qos := ?QOS_I(QoS)}). parse_subopt(Opts, Result#{qos := ?QOS_I(QoS)}).
-spec(request(client(), topic(), topic(), payload(), qos() | [pubopt()])
-> ok | {ok, packet_id()} | {error, term()}).
request(Client, ResponseTopic, RequestTopic, Payload, QoS) when is_binary(ResponseTopic), is_atom(QoS) ->
request(Client, ResponseTopic, RequestTopic, Payload, [{qos, ?QOS_I(QoS)}]);
request(Client, ResponseTopic, RequestTopic, Payload, QoS) when is_binary(ResponseTopic), ?IS_QOS(QoS) ->
request(Client, ResponseTopic, RequestTopic, Payload, [{qos, QoS}]);
request(Client, ResponseTopic, RequestTopic, Payload, Opts) when is_binary(ResponseTopic), is_list(Opts) ->
request(Client, ResponseTopic, RequestTopic, Payload, Opts, _Properties = #{}).
%% @doc Send a request to request topic and wait for response.
-spec(request(client(), topic(), topic(), payload(), [pubopt()], properties())
-> {ok, response_payload()} | {error, term()}).
request(Client, ResponseTopic, RequestTopic, Payload, Opts, Properties) ->
CorrData = make_corr_data(),
case request_async(Client, ResponseTopic, RequestTopic,
Payload, Opts, Properties, CorrData) of
ok -> receive_response(Client, CorrData, Opts);
{error, Reason} -> {error, Reason}
end.
%% @doc Get client properties.
-spec(get_properties(client()) -> properties()).
get_properties(Client) -> gen_statem:call(Client, get_properties, infinity).
%% @doc Send a request, but do not wait for response.
%% The caller should expect a `{publish, Response}' message,
%% or call `receive_response/3' to receive the message.
-spec(request_async(client(), topic(), topic(), payload(),
[pubopt()], properties(), corr_data()) -> ok | {error, any()}).
request_async(Client, ResponseTopic, RequestTopic, Payload, Opts, Properties, CorrData)
when is_binary(ResponseTopic),
is_binary(RequestTopic),
is_map(Properties),
is_list(Opts) ->
ok = emqx_mqtt_props:validate(Properties),
Retain = proplists:get_bool(retain, Opts),
QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)),
ClientProperties = get_properties(Client),
NewResponseTopic = make_req_rsp_topic(ClientProperties, ResponseTopic),
NewRequestTopic = make_req_rsp_topic(ClientProperties, RequestTopic),
%% This is perhaps not optimal to subscribe the response topic for
%% each and every request even though the response topic is always the same
ok = sub_response_topic(Client, QoS, NewResponseTopic),
NewProperties = maps:merge(Properties, #{'Response-Topic' => NewResponseTopic,
'Correlation-Data' => CorrData}),
case publish(Client, #mqtt_msg{qos = QoS,
retain = Retain,
topic = NewRequestTopic,
props = NewProperties,
payload = iolist_to_binary(Payload)}) of
ok -> ok;
{ok, _PacketId} -> ok; %% assume auto_ack
{error, Reason} -> {error, Reason}
end.
%% @doc Block wait the response for a request sent earlier.
-spec(receive_response(client(), corr_data(), [pubopt()])
-> {ok, response_payload()} | {error, any()}).
receive_response(Client, CorrData, Opts) ->
TimeOut = proplists:get_value(timeout, Opts, ?RESPONSE_TIMEOUT_SECONDS),
MRef = erlang:monitor(process, Client),
TRef = erlang:start_timer(TimeOut, self(), response),
try
receive_response(Client, CorrData, TRef, MRef)
after
erlang:cancel_timer(TRef),
receive {timeout, TRef, _} -> ok after 0 -> ok end,
erlang:demonitor(MRef, [flush])
end.
-spec(publish(client(), topic(), payload()) -> ok | {error, term()}). -spec(publish(client(), topic(), payload()) -> ok | {error, term()}).
publish(Client, Topic, Payload) when is_binary(Topic) -> publish(Client, Topic, Payload) when is_binary(Topic) ->
publish(Client, #mqtt_msg{topic = Topic, qos = ?QOS_0, payload = iolist_to_binary(Payload)}). publish(Client, #mqtt_msg{topic = Topic, qos = ?QOS_0, payload = iolist_to_binary(Payload)}).
-spec(publish(client(), topic(), payload(), qos() | [pubopt()]) -spec(publish(client(), topic(), payload(), qos() | [pubopt()])
-> ok | {ok, packet_id()} | {error, term()}). -> ok | {ok, packet_id()} | {error, term()}).
publish(Client, Topic, Payload, QoS) when is_binary(Topic), is_atom(QoS) -> publish(Client, Topic, Payload, QoS) when is_binary(Topic), is_atom(QoS) ->
publish(Client, Topic, Payload, [{qos, ?QOS_I(QoS)}]); publish(Client, Topic, Payload, [{qos, ?QOS_I(QoS)}]);
publish(Client, Topic, Payload, QoS) when is_binary(Topic), ?IS_QOS(QoS) -> publish(Client, Topic, Payload, QoS) when is_binary(Topic), ?IS_QOS(QoS) ->
@ -369,7 +478,7 @@ init([Options]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
ClientId = case {proplists:get_value(proto_ver, Options, v4), ClientId = case {proplists:get_value(proto_ver, Options, v4),
proplists:get_value(client_id, Options)} of proplists:get_value(client_id, Options)} of
{v5, undefined} -> <<>>; {v5, undefined} -> ?NO_CLIENT_ID;
{_ver, undefined} -> random_client_id(); {_ver, undefined} -> random_client_id();
{_ver, Id} -> iolist_to_binary(Id) {_ver, Id} -> iolist_to_binary(Id)
end, end,
@ -396,6 +505,7 @@ init([Options]) ->
auto_ack = true, auto_ack = true,
ack_timeout = ?DEFAULT_ACK_TIMEOUT, ack_timeout = ?DEFAULT_ACK_TIMEOUT,
retry_interval = 0, retry_interval = 0,
request_handler = ?NO_HANDLER,
connect_timeout = ?DEFAULT_CONNECT_TIMEOUT, connect_timeout = ?DEFAULT_CONNECT_TIMEOUT,
last_packet_id = 1}), last_packet_id = 1}),
{ok, initialized, init_parse_state(State)}. {ok, initialized, init_parse_state(State)}.
@ -488,6 +598,8 @@ init([{auto_ack, AutoAck} | Opts], State) when is_boolean(AutoAck) ->
init(Opts, State#state{auto_ack = AutoAck}); init(Opts, State#state{auto_ack = AutoAck});
init([{retry_interval, I} | Opts], State) -> init([{retry_interval, I} | Opts], State) ->
init(Opts, State#state{retry_interval = timer:seconds(I)}); init(Opts, State#state{retry_interval = timer:seconds(I)});
init([{request_handler, Handler} | Opts], State) ->
init(Opts, State#state{request_handler = Handler});
init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) -> init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) ->
init(Opts, State#state{bridge_mode = Mode}); init(Opts, State#state{bridge_mode = Mode});
init([_Opt | Opts], State) -> init([_Opt | Opts], State) ->
@ -562,7 +674,8 @@ mqtt_connect(State = #state{client_id = ClientId,
waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS, waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS,
SessPresent, SessPresent,
Properties), Properties),
State = #state{properties = AllProps}) -> State = #state{properties = AllProps,
client_id = ClientId}) ->
case take_call(connect, State) of case take_call(connect, State) of
{value, #call{from = From}, State1} -> {value, #call{from = From}, State1} ->
AllProps1 = case Properties of AllProps1 = case Properties of
@ -570,7 +683,8 @@ waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS,
_ -> maps:merge(AllProps, Properties) _ -> maps:merge(AllProps, Properties)
end, end,
Reply = {ok, self(), Properties}, Reply = {ok, self(), Properties},
State2 = State1#state{properties = AllProps1, State2 = State1#state{client_id = assign_id(ClientId, AllProps1),
properties = AllProps1,
session_present = SessPresent}, session_present = SessPresent},
{next_state, connected, ensure_keepalive_timer(State2), {next_state, connected, ensure_keepalive_timer(State2),
[{reply, From, Reply}]}; [{reply, From, Reply}]};
@ -616,6 +730,15 @@ connected({call, From}, resume, State) ->
connected({call, From}, stop, _State) -> connected({call, From}, stop, _State) ->
{stop_and_reply, normal, [{reply, From, ok}]}; {stop_and_reply, normal, [{reply, From, ok}]};
connected({call, From}, get_properties, State = #state{properties = Properties}) ->
{keep_state, State, [{reply, From, Properties}]};
connected({call, From}, client_id, State = #state{client_id = ClientId}) ->
{keep_state, State, [{reply, From, ClientId}]};
connected({call, From}, {set_request_handler, RequestHandler}, State) ->
{keep_state, State#state{request_handler = RequestHandler}, [{reply, From, ok}]};
connected({call, From}, SubReq = {subscribe, Properties, Topics}, connected({call, From}, SubReq = {subscribe, Properties, Topics},
State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) -> State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) ->
case send(?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of case send(?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of
@ -695,29 +818,30 @@ connected(cast, {pubrel, PacketId, ReasonCode, Properties}, State) ->
connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) -> connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) ->
send_puback(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), State); send_puback(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), State);
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) ->
{keep_state, deliver(packet_to_msg(Packet), State)};
connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), State = #state{paused = true}) -> connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), State = #state{paused = true}) ->
{keep_state, State}; {keep_state, State};
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, Properties, Payload),
State = #state{auto_ack = AutoAck}) -> State) when Properties =/= undefined ->
NewState = response_publish(Properties, State, ?QOS_0, Payload),
{keep_state, deliver(packet_to_msg(Packet), NewState)};
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) ->
{keep_state, deliver(packet_to_msg(Packet), State)};
_ = deliver(packet_to_msg(Packet), State), connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, Properties, Payload), State)
case AutoAck of when Properties =/= undefined ->
true -> send_puback(?PUBACK_PACKET(PacketId), State); NewState = response_publish(Properties, State, ?QOS_1, Payload),
false -> {keep_state, State} publish_process(?QOS_1, Packet, NewState);
end;
connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
State = #state{awaiting_rel = AwaitingRel}) -> publish_process(?QOS_1, Packet, State);
case send_puback(?PUBREC_PACKET(PacketId), State) of
{keep_state, NewState} -> connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, Properties, Payload), State)
AwaitingRel1 = maps:put(PacketId, Packet, AwaitingRel), when Properties =/= undefined ->
{keep_state, NewState#state{awaiting_rel = AwaitingRel1}}; NewState = response_publish(Properties, State, ?QOS_2, Payload),
Stop -> Stop publish_process(?QOS_2, Packet, NewState);
end; connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) ->
publish_process(?QOS_2, Packet, State);
connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties), connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties),
State = #state{owner = Owner, inflight = Inflight}) -> State = #state{owner = Owner, inflight = Inflight}) ->
@ -899,6 +1023,132 @@ code_change(_Vsn, State, Data, _Extra) ->
%% Internal functions %% Internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Subscribe to response topic.
-spec(sub_response_topic(client(), qos(), topic()) -> ok).
sub_response_topic(Client, QoS, Topic) when is_binary(Topic) ->
subscribe_req_rsp_topic(Client, QoS, Topic).
receive_response(Client, CorrData, TRef, MRef) ->
receive
{publish, Response} ->
{ok, Properties} = maps:find(properties, Response),
case maps:find('Correlation-Data', Properties) of
{ok, CorrData} ->
maps:find(payload, Response);
_ ->
emqx_logger:debug("Discarded stale response: ~p", [Response]),
receive_response(Client, CorrData, TRef, MRef)
end;
{timeout, TRef, response} ->
{error, timeout};
{'DOWN', MRef, process, _, _} ->
{error, client_down}
end.
%% Make a unique correlation data for each request.
%% It has to be unique because stale responses should be discarded.
make_corr_data() -> term_to_binary(make_ref()).
%% Shared function for request and response topic subscription.
subscribe_req_rsp_topic(Client, QoS, Topic) ->
%% It is a Protocol Error to set the No Local bit to 1 on a Shared Subscription
{ok, _Props, _QoS} = subscribe(Client, [{Topic, [{rh, 2}, {rap, false},
{nl, not ?IS_SHARE(Topic)},
{qos, QoS}]}]),
emqx_logger:debug("Subscribed to topic ~s", [Topic]),
ok.
%% Make a request or response topic.
make_req_rsp_topic(Properties, Topic) ->
make_req_rsp_topic(Properties, Topic, ?NO_GROUP).
%% Same as make_req_rsp_topic/2, but allow shared subscription (for request topics)
make_req_rsp_topic(Properties, Topic, Group) ->
case maps:find('Response-Information', Properties) of
{ok, ResponseInformation} when ResponseInformation =/= <<>> ->
emqx_topic:join([req_rsp_topic_prefix(Group, ResponseInformation), Topic]);
_ ->
erlang:error(no_response_information)
end.
req_rsp_topic_prefix(?NO_GROUP, Prefix) -> Prefix;
req_rsp_topic_prefix(Group, Prefix) -> ?SHARE(Group, Prefix).
assign_id(?NO_CLIENT_ID, Props) ->
case maps:find('Assigned-Client-Identifier', Props) of
{ok, Value} ->
Value;
_ ->
error(bad_client_id)
end;
assign_id(Id, _Props) ->
Id.
publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), State = #state{auto_ack = AutoAck}) ->
_ = deliver(packet_to_msg(Packet), State),
case AutoAck of
true -> send_puback(?PUBACK_PACKET(PacketId), State);
false -> {keep_state, State}
end;
publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId),
State = #state{awaiting_rel = AwaitingRel}) ->
case send_puback(?PUBREC_PACKET(PacketId), State) of
{keep_state, NewState} ->
AwaitingRel1 = maps:put(PacketId, Packet, AwaitingRel),
{keep_state, NewState#state{awaiting_rel = AwaitingRel1}};
Stop -> Stop
end.
response_publish(undefined, State, _QoS, _Payload) ->
State;
response_publish(Properties, State = #state{request_handler = RequestHandler}, QoS, Payload) ->
case maps:find('Response-Topic', Properties) of
{ok, ResponseTopic} ->
case RequestHandler of
?NO_HANDLER -> State;
_ -> do_publish(ResponseTopic, Properties, State, QoS, Payload)
end;
_ ->
State
end.
do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler}, ?QOS_0, Payload) ->
Msg = #mqtt_msg{qos = ?QOS_0,
retain = false,
topic = ResponseTopic,
props = Properties,
payload = RequestHandler(Payload)
},
case send(Msg, State) of
{ok, NewState} -> NewState;
_Error -> State
end;
do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler,
inflight = Inflight,
last_packet_id = PacketId},
QoS, Payload)
when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2)->
case emqx_inflight:is_full(Inflight) of
true ->
emqx_logger:error("Inflight is full"),
State;
false ->
Msg = #mqtt_msg{packet_id = PacketId,
qos = QoS,
retain = false,
topic = ResponseTopic,
props = Properties,
payload = RequestHandler(Payload)},
case send(Msg, State) of
{ok, NewState} ->
Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg, os:timestamp()}, Inflight),
ensure_retry_timer(NewState#state{inflight = Inflight1});
{error, Reason} ->
emqx_logger:error("Send failed: ~p", [Reason]),
State
end
end.
ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) -> ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) ->
ensure_keepalive_timer(timer:seconds(Secs), State); ensure_keepalive_timer(timer:seconds(Secs), State);
ensure_keepalive_timer(State = #state{keepalive = 0}) -> ensure_keepalive_timer(State = #state{keepalive = 0}) ->
@ -986,10 +1236,15 @@ retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) ->
deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId,
topic = Topic, props = Props, payload = Payload}, topic = Topic, props = Props, payload = Payload},
State = #state{owner = Owner}) -> State = #state{owner = Owner, request_handler = RequestHandler}) ->
Owner ! {publish, #{qos => QoS, dup => Dup, retain => Retain, packet_id => PacketId, case RequestHandler of
topic => Topic, properties => Props, payload => Payload, ?NO_HANDLER ->
client_pid => self()}}, Owner ! {publish, #{qos => QoS, dup => Dup, retain => Retain, packet_id => PacketId,
topic => Topic, properties => Props, payload => Payload,
client_pid => self()}};
_ ->
ok
end,
State. State.
packet_to_msg(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, packet_to_msg(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
@ -1001,7 +1256,7 @@ packet_to_msg(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
properties = Props}, properties = Props},
payload = Payload}) -> payload = Payload}) ->
#mqtt_msg{qos = QoS, retain = R, dup = Dup, packet_id = PacketId, #mqtt_msg{qos = QoS, retain = R, dup = Dup, packet_id = PacketId,
topic = Topic, props = Props, payload = Payload}. topic = Topic, props = Props, payload = Payload}.
msg_to_packet(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, msg_to_packet(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId,
topic = Topic, props = Props, payload = Payload}) -> topic = Topic, props = Props, payload = Payload}) ->
@ -1070,7 +1325,6 @@ receive_loop(Bytes, State = #state{parse_state = ParseState}) ->
{error, Reason} -> {error, Reason} ->
{stop, Reason}; {stop, Reason};
{'EXIT', Error} -> {'EXIT', Error} ->
io:format("client stop"),
{stop, Error} {stop, Error}
end. end.

View File

@ -30,6 +30,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]). code_change/3]).
%% internal export
-export([update_conn_stats/0]).
-define(CM, ?MODULE). -define(CM, ?MODULE).
%% ETS Tables. %% ETS Tables.
@ -125,7 +128,7 @@ init([]) ->
_ = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]), _ = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]),
_ = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts), _ = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts),
_ = emqx_tables:new(?CONN_STATS_TAB, TabOpts), _ = emqx_tables:new(?CONN_STATS_TAB, TabOpts),
ok = emqx_stats:update_interval(cm_stats, fun update_conn_stats/0), ok = emqx_stats:update_interval(cm_stats, fun ?MODULE:update_conn_stats/0),
{ok, #{conn_pmon => emqx_pmon:new()}}. {ok, #{conn_pmon => emqx_pmon:new()}}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->

View File

@ -63,8 +63,8 @@ validate(?PUBLISH_PACKET(_QoS, Topic, _, Properties, _)) ->
((not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid)) ((not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid))
andalso validate_properties(?PUBLISH, Properties); andalso validate_properties(?PUBLISH, Properties);
validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' := 0}})) -> validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = Properties})) ->
error(protocol_error); validate_properties(?CONNECT, Properties);
validate(_Packet) -> validate(_Packet) ->
true. true.
@ -82,11 +82,24 @@ validate_properties(?PUBLISH, #{'Topic-Alias':= I})
error(topic_alias_invalid); error(topic_alias_invalid);
validate_properties(?PUBLISH, #{'Subscription-Identifier' := _I}) -> validate_properties(?PUBLISH, #{'Subscription-Identifier' := _I}) ->
error(protocol_error); error(protocol_error);
validate_properties(?PUBLISH, #{'Response-Topic' := ResponseTopic}) ->
case emqx_topic:wildcard(ResponseTopic) of
true ->
error(protocol_error);
false ->
true
end;
validate_properties(?CONNECT, #{'Receive-Maximum' := 0}) ->
error(protocol_error);
validate_properties(?CONNECT, #{'Request-Response-Information' := ReqRespInfo})
when ReqRespInfo =/= 0, ReqRespInfo =/= 1 ->
error(protocol_error);
validate_properties(?CONNECT, #{'Request-Problem-Information' := ReqProInfo})
when ReqProInfo =/= 0, ReqProInfo =/= 1 ->
error(protocol_error);
validate_properties(_, _) -> validate_properties(_, _) ->
true. true.
validate_subscription({Topic, #{qos := QoS}}) -> validate_subscription({Topic, #{qos := QoS}}) ->
emqx_topic:validate(filter, Topic) andalso validate_qos(QoS). emqx_topic:validate(filter, Topic) andalso validate_qos(QoS).

View File

@ -82,27 +82,27 @@
-spec(init(map(), list()) -> state()). -spec(init(map(), list()) -> state()).
init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) ->
Zone = proplists:get_value(zone, Options), Zone = proplists:get_value(zone, Options),
#pstate{zone = Zone, #pstate{zone = Zone,
sendfun = SendFun, sendfun = SendFun,
peername = Peername, peername = Peername,
peercert = Peercert, peercert = Peercert,
proto_ver = ?MQTT_PROTO_V4, proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>, proto_name = <<"MQTT">>,
client_id = <<>>, client_id = <<>>,
is_assigned = false, is_assigned = false,
conn_pid = self(), conn_pid = self(),
username = init_username(Peercert, Options), username = init_username(Peercert, Options),
is_super = false, is_super = false,
clean_start = false, clean_start = false,
topic_aliases = #{}, topic_aliases = #{},
packet_size = emqx_zone:get_env(Zone, max_packet_size), packet_size = emqx_zone:get_env(Zone, max_packet_size),
mountpoint = emqx_zone:get_env(Zone, mountpoint), mountpoint = emqx_zone:get_env(Zone, mountpoint),
is_bridge = false, is_bridge = false,
enable_ban = emqx_zone:get_env(Zone, enable_ban, false), enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
enable_acl = emqx_zone:get_env(Zone, enable_acl), enable_acl = emqx_zone:get_env(Zone, enable_acl),
recv_stats = #{msg => 0, pkt => 0}, recv_stats = #{msg => 0, pkt => 0},
send_stats = #{msg => 0, pkt => 0}, send_stats = #{msg => 0, pkt => 0},
connected = false}. connected = false}.
init_username(Peercert, Options) -> init_username(Peercert, Options) ->
case proplists:get_value(peer_cert_as_username, Options) of case proplists:get_value(peer_cert_as_username, Options) of
@ -194,6 +194,13 @@ parser(#pstate{packet_size = Size, proto_ver = Ver}) ->
%% Packet Received %% Packet Received
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
set_protover(?CONNECT_PACKET(#mqtt_packet_connect{
proto_ver = ProtoVer}),
PState) ->
PState#pstate{ proto_ver = ProtoVer };
set_protover(_Packet, PState) ->
PState.
-spec(received(emqx_mqtt_types:packet(), state()) -> -spec(received(emqx_mqtt_types:packet(), state()) ->
{ok, state()} | {error, term()} | {error, term(), state()} | {stop, term(), state()}). {ok, state()} | {error, term()} | {error, term(), state()} | {stop, term(), state()}).
received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT -> received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT ->
@ -203,14 +210,31 @@ received(?PACKET(?CONNECT), PState = #pstate{connected = true}) ->
{error, proto_unexpected_connect, PState}; {error, proto_unexpected_connect, PState};
received(Packet = ?PACKET(Type), PState) -> received(Packet = ?PACKET(Type), PState) ->
trace(recv, Packet, PState), PState1 = set_protover(Packet, PState),
case catch emqx_packet:validate(Packet) of trace(recv, Packet, PState1),
try emqx_packet:validate(Packet) of
true -> true ->
{Packet1, PState1} = preprocess_properties(Packet, PState), {Packet1, PState2} = preprocess_properties(Packet, PState1),
process_packet(Packet1, inc_stats(recv, Type, PState1)); process_packet(Packet1, inc_stats(recv, Type, PState2))
{'EXIT', {Reason, _Stacktrace}} -> catch
deliver({disconnect, rc(Reason)}, PState), error : protocol_error ->
{error, Reason, PState} deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState1),
{error, protocol_error, PState};
error : subscription_identifier_invalid ->
deliver({disconnect, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED}, PState1),
{error, subscription_identifier_invalid, PState1};
error : topic_alias_invalid ->
deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState1),
{error, topic_alias_invalid, PState1};
error : topic_filters_invalid ->
deliver({disconnect, ?RC_TOPIC_FILTER_INVALID}, PState1),
{error, topic_filters_invalid, PState1};
error : topic_name_invalid ->
deliver({disconnect, ?RC_TOPIC_FILTER_INVALID}, PState1),
{error, topic_filters_invalid, PState1};
error : Reason ->
deliver({disconnect, ?RC_MALFORMED_PACKET}, PState1),
{error, Reason, PState1}
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -309,7 +333,7 @@ process_packet(?CONNECT_PACKET(
{error, Error} -> {error, Error} ->
?LOG(error, "Failed to open session: ~p", [Error], PState1), ?LOG(error, "Failed to open session: ~p", [Error], PState1),
{?RC_UNSPECIFIED_ERROR, PState1} {?RC_UNSPECIFIED_ERROR, PState1}
end; end;
{error, Reason} -> {error, Reason} ->
?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], PState2), ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], PState2),
{?RC_NOT_AUTHORIZED, PState1} {?RC_NOT_AUTHORIZED, PState1}
@ -417,13 +441,13 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
process_packet(?PACKET(?PINGREQ), PState) -> process_packet(?PACKET(?PINGREQ), PState) ->
send(?PACKET(?PINGRESP), PState); send(?PACKET(?PINGRESP), PState);
process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}), process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}),
PState = #pstate{session = SPid, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) -> PState = #pstate{session = SPid, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) ->
case Interval =/= 0 andalso OldInterval =:= 0 of case Interval =/= 0 andalso OldInterval =:= 0 of
true -> true ->
deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState), deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState),
{error, protocol_error, PState#pstate{will_msg = undefined}}; {error, protocol_error, PState#pstate{will_msg = undefined}};
false -> false ->
emqx_session:update_expiry_interval(SPid, Interval), emqx_session:update_expiry_interval(SPid, Interval),
%% Clean willmsg %% Clean willmsg
{stop, normal, PState#pstate{will_msg = undefined}} {stop, normal, PState#pstate{will_msg = undefined}}
@ -491,7 +515,14 @@ deliver({connack, ReasonCode}, PState) ->
deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
proto_ver = ?MQTT_PROTO_V5, proto_ver = ?MQTT_PROTO_V5,
client_id = ClientId, client_id = ClientId,
conn_props = ConnProps,
is_assigned = IsAssigned}) -> is_assigned = IsAssigned}) ->
ResponseInformation = case maps:find('Request-Response-Information', ConnProps) of
{ok, 1} ->
iolist_to_binary(emqx_config:get_env(response_topic_prefix));
_ ->
<<>>
end,
#{max_packet_size := MaxPktSize, #{max_packet_size := MaxPktSize,
max_qos_allowed := MaxQoS, max_qos_allowed := MaxQoS,
mqtt_retain_available := Retain, mqtt_retain_available := Retain,
@ -503,18 +534,21 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
'Topic-Alias-Maximum' => MaxAlias, 'Topic-Alias-Maximum' => MaxAlias,
'Wildcard-Subscription-Available' => flag(Wildcard), 'Wildcard-Subscription-Available' => flag(Wildcard),
'Subscription-Identifier-Available' => 1, 'Subscription-Identifier-Available' => 1,
'Response-Information' => ResponseInformation,
'Shared-Subscription-Available' => flag(Shared)}, 'Shared-Subscription-Available' => flag(Shared)},
Props1 = if Props1 = if
MaxQoS =:= ?QOS_2 -> MaxQoS =:= ?QOS_2 ->
Props; Props;
true -> true ->
maps:put('Maximum-QoS', MaxQoS, Props) maps:put('Maximum-QoS', MaxQoS, Props)
end, end,
Props2 = if IsAssigned -> Props2 = if IsAssigned ->
Props1#{'Assigned-Client-Identifier' => ClientId}; Props1#{'Assigned-Client-Identifier' => ClientId};
true -> Props1 true -> Props1
end, end,
Props3 = case emqx_zone:get_env(Zone, server_keepalive) of Props3 = case emqx_zone:get_env(Zone, server_keepalive) of
@ -621,7 +655,7 @@ set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, c
true -> true ->
case CleanStart of case CleanStart of
true -> 0; true -> 0;
false -> false ->
emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
end end
end, SessAttrs); end, SessAttrs);
@ -762,7 +796,11 @@ check_sub_acl(TopicFilters, PState) ->
fun({Topic, SubOpts}, {Ok, Acc}) -> fun({Topic, SubOpts}, {Ok, Acc}) ->
case emqx_access_control:check_acl(Credentials, subscribe, Topic) of case emqx_access_control:check_acl(Credentials, subscribe, Topic) of
allow -> {Ok, [{Topic, SubOpts}|Acc]}; allow -> {Ok, [{Topic, SubOpts}|Acc]};
deny -> {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]} deny ->
emqx_logger:warning([{client, PState#pstate.client_id}],
"ACL(~s) Cannot SUBSCRIBE ~p for ACL Deny",
[PState#pstate.client_id, Topic]),
{error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]}
end end
end, {ok, []}, TopicFilters). end, {ok, []}, TopicFilters).
@ -814,14 +852,6 @@ start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 ->
Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75), Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75),
self() ! {keepalive, start, round(Secs * Backoff)}. self() ! {keepalive, start, round(Secs * Backoff)}.
rc(Reason) ->
case Reason of
protocol_error -> ?RC_PROTOCOL_ERROR;
topic_filters_invalid -> ?RC_TOPIC_FILTER_INVALID;
topic_name_invalid -> ?RC_TOPIC_NAME_INVALID;
_ -> ?RC_MALFORMED_PACKET
end.
%%----------------------------------------------------------------------------- %%-----------------------------------------------------------------------------
%% Parse topic filters %% Parse topic filters
%%----------------------------------------------------------------------------- %%-----------------------------------------------------------------------------

View File

@ -31,6 +31,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]). code_change/3]).
%% internal export
-export([stats_fun/0]).
-record(routing_node, {name, const = unused}). -record(routing_node, {name, const = unused}).
-record(state, {nodes = []}). -record(state, {nodes = []}).
@ -90,7 +93,7 @@ init([]) ->
[Node | Acc] [Node | Acc]
end end
end, [], mnesia:dirty_all_keys(?ROUTING_NODE)), end, [], mnesia:dirty_all_keys(?ROUTING_NODE)),
emqx_stats:update_interval(route_stats, stats_fun()), emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0),
{ok, #state{nodes = Nodes}, hibernate}. {ok, #state{nodes = Nodes}, hibernate}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
@ -143,13 +146,11 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
stats_fun() -> stats_fun() ->
fun() -> case ets:info(?ROUTE, size) of
case ets:info(?ROUTE, size) of undefined -> ok;
undefined -> ok; Size ->
Size -> emqx_stats:setstat('routes/count', 'routes/max', Size),
emqx_stats:setstat('routes/count', 'routes/max', Size), emqx_stats:setstat('topics/count', 'topics/max', Size)
emqx_stats:setstat('topics/count', 'topics/max', Size)
end
end. end.
cleanup_routes(Node) -> cleanup_routes(Node) ->

View File

@ -31,6 +31,9 @@
%% Internal functions for rpc %% Internal functions for rpc
-export([dispatch/3]). -export([dispatch/3]).
%% Internal function for stats
-export([stats_fun/0]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]). code_change/3]).
@ -210,7 +213,7 @@ init([]) ->
_ = emqx_tables:new(?SESSION_P_TAB, TabOpts), _ = emqx_tables:new(?SESSION_P_TAB, TabOpts),
_ = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts), _ = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
_ = emqx_tables:new(?SESSION_STATS_TAB, TabOpts), _ = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
emqx_stats:update_interval(sm_stats, stats_fun()), emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0),
{ok, #{session_pmon => emqx_pmon:new()}}. {ok, #{session_pmon => emqx_pmon:new()}}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
@ -251,10 +254,8 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
stats_fun() -> stats_fun() ->
fun() -> safe_update_stats(?SESSION_TAB, 'sessions/count', 'sessions/max'),
safe_update_stats(?SESSION_TAB, 'sessions/count', 'sessions/max'), safe_update_stats(?SESSION_P_TAB, 'sessions/persistent/count', 'sessions/persistent/max').
safe_update_stats(?SESSION_P_TAB, 'sessions/persistent/count', 'sessions/persistent/max')
end.
safe_update_stats(Tab, Stat, MaxStat) -> safe_update_stats(Tab, Stat, MaxStat) ->
case ets:info(Tab, size) of case ets:info(Tab, size) of

View File

@ -18,7 +18,7 @@
-include("emqx.hrl"). -include("emqx.hrl").
-export([start_link/0]). -export([start_link/0, start_link/1, stop/0]).
%% Stats API. %% Stats API.
-export([getstats/0, getstat/1]). -export([getstats/0, getstat/1]).
@ -31,7 +31,8 @@
code_change/3]). code_change/3]).
-record(update, {name, countdown, interval, func}). -record(update, {name, countdown, interval, func}).
-record(state, {timer, updates :: [#update{}]}). -record(state, {timer, updates :: [#update{}],
tick_ms :: timeout()}).
-type(stats() :: list({atom(), non_neg_integer()})). -type(stats() :: list({atom(), non_neg_integer()})).
@ -77,10 +78,20 @@
-define(TAB, ?MODULE). -define(TAB, ?MODULE).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-type opts() :: #{tick_ms := timeout()}.
%% @doc Start stats server %% @doc Start stats server
-spec(start_link() -> emqx_types:startlink_ret()). -spec(start_link() -> emqx_types:startlink_ret()).
start_link() -> start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). start_link(#{tick_ms => timer:seconds(1)}).
-spec(start_link(opts()) -> emqx_types:startlink_ret()).
start_link(Opts) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, Opts, []).
-spec(stop() -> ok).
stop() ->
gen_server:call(?SERVER, stop, infinity).
%% @doc Generate stats fun %% @doc Generate stats fun
-spec(statsfun(Stat :: atom()) -> fun()). -spec(statsfun(Stat :: atom()) -> fun()).
@ -140,16 +151,18 @@ cast(Msg) ->
%% gen_server callbacks %% gen_server callbacks
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
init([]) -> init(#{tick_ms := TickMs}) ->
_ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]), _ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]),
Stats = lists:append([?CONNECTION_STATS, ?SESSION_STATS, ?PUBSUB_STATS, Stats = lists:append([?CONNECTION_STATS, ?SESSION_STATS, ?PUBSUB_STATS,
?ROUTE_STATS, ?RETAINED_STATS]), ?ROUTE_STATS, ?RETAINED_STATS]),
true = ets:insert(?TAB, [{Name, 0} || Name <- Stats]), true = ets:insert(?TAB, [{Name, 0} || Name <- Stats]),
{ok, start_timer(#state{updates = []}), hibernate}. {ok, start_timer(#state{updates = [], tick_ms = TickMs}), hibernate}.
start_timer(State) -> start_timer(#state{tick_ms = Ms} = State) ->
State#state{timer = emqx_misc:start_timer(timer:seconds(1), tick)}. State#state{timer = emqx_misc:start_timer(Ms, tick)}.
handle_call(stop, _From, State) ->
{stop, normal, _Reply = ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
emqx_logger:error("[Stats] unexpected call: ~p", [Req]), emqx_logger:error("[Stats] unexpected call: ~p", [Req]),
{reply, ignored, State}. {reply, ignored, State}.
@ -201,7 +214,7 @@ handle_info(Info, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{timer = TRef}) -> terminate(_Reason, #state{timer = TRef}) ->
timer:cancel(TRef). emqx_misc:cancel_timer(TRef).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.

View File

@ -34,6 +34,8 @@
-define(MAX_TOPIC_LEN, 4096). -define(MAX_TOPIC_LEN, 4096).
-include("emqx_mqtt.hrl").
%% @doc Is wildcard topic? %% @doc Is wildcard topic?
-spec(wildcard(topic() | words()) -> true | false). -spec(wildcard(topic() | words()) -> true | false).
wildcard(Topic) when is_binary(Topic) -> wildcard(Topic) when is_binary(Topic) ->
@ -180,11 +182,11 @@ parse(Topic) when is_binary(Topic) ->
parse(Topic = <<"$queue/", _/binary>>, #{share := _Group}) -> parse(Topic = <<"$queue/", _/binary>>, #{share := _Group}) ->
error({invalid_topic, Topic}); error({invalid_topic, Topic});
parse(Topic = <<"$share/", _/binary>>, #{share := _Group}) -> parse(Topic = <<?SHARE, "/", _/binary>>, #{share := _Group}) ->
error({invalid_topic, Topic}); error({invalid_topic, Topic});
parse(<<"$queue/", Topic1/binary>>, Options) -> parse(<<"$queue/", Topic1/binary>>, Options) ->
parse(Topic1, maps:put(share, <<"$queue">>, Options)); parse(Topic1, maps:put(share, <<"$queue">>, Options));
parse(Topic = <<"$share/", Topic1/binary>>, Options) -> parse(Topic = <<?SHARE, "/", Topic1/binary>>, Options) ->
case binary:split(Topic1, <<"/">>) of case binary:split(Topic1, <<"/">>) of
[<<>>] -> error({invalid_topic, Topic}); [<<>>] -> error({invalid_topic, Topic});
[_] -> error({invalid_topic, Topic}); [_] -> error({invalid_topic, Topic});

View File

@ -27,7 +27,6 @@
-record(ssl_socket, {tcp, ssl}). -record(ssl_socket, {tcp, ssl}).
-type(socket() :: inet:socket() | #ssl_socket{}).
-define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{ -define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{
client_id = <<"mqtt_client">>, client_id = <<"mqtt_client">>,
@ -112,7 +111,7 @@ mqtt_connect_with_tcp(_) ->
%% Issue #599 %% Issue #599
%% Empty clientId and clean_session = false %% Empty clientId and clean_session = false
{ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000),
Packet = raw_send_serialise(?CLIENT2), Packet = raw_send_serialize(?CLIENT2),
emqx_client_sock:send(Sock, Packet), emqx_client_sock:send(Sock, Packet),
{ok, Data} = gen_tcp:recv(Sock, 0), {ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), _} = raw_recv_pase(Data), {ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), _} = raw_recv_pase(Data),
@ -133,7 +132,7 @@ mqtt_connect_with_ssl_oneway(_) ->
ClientSsl = emqx_ct_broker_helpers:client_ssl(), ClientSsl = emqx_ct_broker_helpers:client_ssl(),
{ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock} {ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock}
= emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000), = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000),
Packet = raw_send_serialise(?CLIENT), Packet = raw_send_serialize(?CLIENT),
emqx_client_sock:setopts(Sock, [{active, once}]), emqx_client_sock:setopts(Sock, [{active, once}]),
emqx_client_sock:send(Sock, Packet), emqx_client_sock:send(Sock, Packet),
?assert( ?assert(
@ -151,7 +150,7 @@ mqtt_connect_with_ssl_twoway(_Config) ->
ClientSsl = emqx_ct_broker_helpers:client_ssl_twoway(), ClientSsl = emqx_ct_broker_helpers:client_ssl_twoway(),
{ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock} {ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock}
= emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000), = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000),
Packet = raw_send_serialise(?CLIENT), Packet = raw_send_serialize(?CLIENT),
emqx_client_sock:setopts(Sock, [{active, once}]), emqx_client_sock:setopts(Sock, [{active, once}]),
emqx_client_sock:send(Sock, Packet), emqx_client_sock:send(Sock, Packet),
timer:sleep(500), timer:sleep(500),
@ -161,6 +160,7 @@ mqtt_connect_with_ssl_twoway(_Config) ->
after 1000 -> after 1000 ->
false false
end), end),
ssl:close(SslSock),
emqx_client_sock:close(Sock). emqx_client_sock:close(Sock).
mqtt_connect_with_ws(_Config) -> mqtt_connect_with_ws(_Config) ->
@ -168,19 +168,19 @@ mqtt_connect_with_ws(_Config) ->
{ok, _} = rfc6455_client:open(WS), {ok, _} = rfc6455_client:open(WS),
%% Connect Packet %% Connect Packet
Packet = raw_send_serialise(?CLIENT), Packet = raw_send_serialize(?CLIENT),
ok = rfc6455_client:send_binary(WS, Packet), ok = rfc6455_client:send_binary(WS, Packet),
{binary, CONACK} = rfc6455_client:recv(WS), {binary, CONACK} = rfc6455_client:recv(WS),
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(CONACK), {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(CONACK),
%% Sub Packet %% Sub Packet
SubPacket = raw_send_serialise(?SUBPACKET), SubPacket = raw_send_serialize(?SUBPACKET),
rfc6455_client:send_binary(WS, SubPacket), rfc6455_client:send_binary(WS, SubPacket),
{binary, SubAck} = rfc6455_client:recv(WS), {binary, SubAck} = rfc6455_client:recv(WS),
{ok, ?SUBACK_PACKET(?PACKETID, ?SUBCODE), _} = raw_recv_pase(SubAck), {ok, ?SUBACK_PACKET(?PACKETID, ?SUBCODE), _} = raw_recv_pase(SubAck),
%% Pub Packet QoS 1 %% Pub Packet QoS 1
PubPacket = raw_send_serialise(?PUBPACKET), PubPacket = raw_send_serialize(?PUBPACKET),
rfc6455_client:send_binary(WS, PubPacket), rfc6455_client:send_binary(WS, PubPacket),
{binary, PubAck} = rfc6455_client:recv(WS), {binary, PubAck} = rfc6455_client:recv(WS),
{ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(PubAck), {ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(PubAck),
@ -190,22 +190,21 @@ mqtt_connect_with_ws(_Config) ->
%%issue 1811 %%issue 1811
packet_size(_Config) -> packet_size(_Config) ->
{ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000),
Packet = raw_send_serialise(?CLIENT), Packet = raw_send_serialize(?CLIENT),
emqx_client_sock:send(Sock, Packet), emqx_client_sock:send(Sock, Packet),
{ok, Data} = gen_tcp:recv(Sock, 0), {ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data),
%% Pub Packet QoS 1 %% Pub Packet QoS 1
PubPacket = raw_send_serialise(?BIG_PUBPACKET), PubPacket = raw_send_serialize(?BIG_PUBPACKET),
emqx_client_sock:send(Sock, PubPacket), emqx_client_sock:send(Sock, PubPacket),
{ok, Data1} = gen_tcp:recv(Sock, 0), {ok, Data1} = gen_tcp:recv(Sock, 0),
{ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(Data1), {ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(Data1),
emqx_client_sock:close(Sock). emqx_client_sock:close(Sock).
raw_send_serialise(Packet) -> raw_send_serialize(Packet) ->
emqx_frame:serialize(Packet). emqx_frame:serialize(Packet).
raw_recv_pase(P) -> raw_recv_pase(P) ->
emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
version => ?MQTT_PROTO_V4} }). version => ?MQTT_PROTO_V4} }).

View File

@ -12,7 +12,7 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
-module(emqx_mqtt_compat_SUITE). -module(emqx_client_SUITE).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
@ -32,15 +32,24 @@
<<"+/+">>, <<"TopicA/#">>]). <<"+/+">>, <<"TopicA/#">>]).
all() -> all() ->
[basic_test, [ {group, mqttv4},
will_message_test, {group, mqttv5}
zero_length_clientid_test, ].
offline_message_queueing_test,
overlapping_subscriptions_test, groups() ->
%% keepalive_test, [{mqttv4, [non_parallel_tests],
redelivery_on_reconnect_test, [basic_test,
%% subscribe_failure_test, will_message_test,
dollar_topics_test]. offline_message_queueing_test,
overlapping_subscriptions_test,
%% keepalive_test,
redelivery_on_reconnect_test,
%% subscribe_failure_test,
dollar_topics_test]},
{mqttv5, [non_parallel_tests],
[request_response,
share_sub_request_topic]}
].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(), emqx_ct_broker_helpers:run_setup_steps(),
@ -49,6 +58,77 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps(). emqx_ct_broker_helpers:run_teardown_steps().
request_response_exception(QoS) ->
{ok, Client, _} = emqx_client:start_link([{proto_ver, v5},
{properties, #{ 'Request-Response-Information' => 0 }}]),
?assertError(no_response_information,
emqx_client:sub_request_topic(Client, QoS, <<"request_topic">>)),
ok = emqx_client:disconnect(Client).
request_response_per_qos(QoS) ->
{ok, Requester, _} = emqx_client:start_link([{proto_ver, v5},
{client_id, <<"requester">>},
{properties, #{ 'Request-Response-Information' => 1}}]),
{ok, Responser, _} = emqx_client:start_link([{proto_ver, v5},
{client_id, <<"responser">>},
{properties, #{ 'Request-Response-Information' => 1}},
{request_handler, fun(_Req) -> <<"ResponseTest">> end}]),
ok = emqx_client:sub_request_topic(Responser, QoS, <<"request_topic">>),
{ok, <<"ResponseTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS),
ok = emqx_client:set_request_handler(Responser, fun(<<"request_payload">>) ->
<<"ResponseFunctionTest">>;
(_) ->
<<"404">>
end),
{ok, <<"ResponseFunctionTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS),
{ok, <<"404">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"invalid_request">>, QoS),
ok = emqx_client:disconnect(Responser),
ok = emqx_client:disconnect(Requester).
request_response(_Config) ->
request_response_per_qos(?QOS_2),
request_response_per_qos(?QOS_1),
request_response_per_qos(?QOS_0),
request_response_exception(?QOS_0),
request_response_exception(?QOS_1),
request_response_exception(?QOS_2).
share_sub_request_topic(_Config) ->
share_sub_request_topic_per_qos(?QOS_2),
share_sub_request_topic_per_qos(?QOS_1),
share_sub_request_topic_per_qos(?QOS_0).
share_sub_request_topic_per_qos(QoS) ->
application:set_env(?APPLICATION, shared_subscription_strategy, random),
ReqTopic = <<"request-topic">>,
RspTopic = <<"response-topic">>,
Group = <<"g1">>,
Properties = #{ 'Request-Response-Information' => 1},
Opts = fun(ClientId) -> [{proto_ver, v5},
{client_id, atom_to_binary(ClientId, utf8)},
{properties, Properties}
] end,
{ok, Requester, _} = emqx_client:start_link(Opts(requester)),
{ok, Responser1, _} = emqx_client:start_link([{request_handler, fun(Req) -> <<"1-", Req/binary>> end} | Opts(requester1)]),
{ok, Responser2, _} = emqx_client:start_link([{request_handler, fun(Req) -> <<"2-", Req/binary>> end} | Opts(requester2)]),
ok = emqx_client:sub_request_topic(Responser1, QoS, ReqTopic, Group),
ok = emqx_client:sub_request_topic(Responser2, QoS, ReqTopic, Group),
%% Send a request, wait for response, validate response then return responser ID
ReqFun = fun(Req) ->
{ok, Rsp} = emqx_client:request(Requester, RspTopic, ReqTopic, Req, QoS),
case Rsp of
<<"1-", Req/binary>> -> 1;
<<"2-", Req/binary>> -> 2
end
end,
Ids = lists:map(fun(I) -> ReqFun(integer_to_binary(I)) end, lists:seq(1, 100)),
%% we are testing with random shared-dispatch strategy,
%% fail if not all responsers got a chance to handle requests
?assertEqual([1, 2], lists:usort(Ids)),
ok = emqx_client:disconnect(Responser1),
ok = emqx_client:disconnect(Responser2),
ok = emqx_client:disconnect(Requester).
receive_messages(Count) -> receive_messages(Count) ->
receive_messages(Count, []). receive_messages(Count, []).
@ -59,7 +139,7 @@ receive_messages(Count, Msgs) ->
{publish, Msg} -> {publish, Msg} ->
receive_messages(Count-1, [Msg|Msgs]); receive_messages(Count-1, [Msg|Msgs]);
Other -> Other ->
ct:log("~p~n", [Other]), ct:log("~p~n", [Other]),
receive_messages(Count, Msgs) receive_messages(Count, Msgs)
after 10 -> after 10 ->
Msgs Msgs
@ -90,18 +170,6 @@ will_message_test(_Config) ->
ok = emqx_client:disconnect(C2), ok = emqx_client:disconnect(C2),
ct:print("Will message test succeeded"). ct:print("Will message test succeeded").
zero_length_clientid_test(_Config) ->
ct:print("Zero length clientid test starting"),
%% TODO: There are some controversies on the situation when
%% clean_start flag is true and clientid is zero length.
%% {error, _} = emqx_client:start_link([{clean_start, false},
%% {client_id, <<>>}]),
{ok, _, _} = emqx_client:start_link([{clean_start, true},
{client_id, <<>>}]),
ct:print("Zero length clientid test succeeded").
offline_message_queueing_test(_) -> offline_message_queueing_test(_) ->
{ok, C1, _} = emqx_client:start_link([{clean_start, false}, {ok, C1, _} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c1">>}]), {client_id, <<"c1">>}]),
@ -109,7 +177,7 @@ offline_message_queueing_test(_) ->
ok = emqx_client:disconnect(C1), ok = emqx_client:disconnect(C1),
{ok, C2, _} = emqx_client:start_link([{clean_start, true}, {ok, C2, _} = emqx_client:start_link([{clean_start, true},
{client_id, <<"c2">>}]), {client_id, <<"c2">>}]),
ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0), ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0),
{ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1), {ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1),
{ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2), {ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2),
@ -196,4 +264,3 @@ dollar_topics_test(_) ->
?assertEqual(0, length(receive_messages(1))), ?assertEqual(0, length(receive_messages(1))),
ok = emqx_client:disconnect(C), ok = emqx_client:disconnect(C),
ct:print("$ topics test succeeded"). ct:print("$ topics test succeeded").

View File

@ -1,45 +0,0 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_misc_SUITE).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-compile(nowarn_export_all).
-define(SOCKOPTS, [binary,
{packet, raw},
{reuseaddr, true},
{backlog, 512},
{nodelay, true}]).
all() -> [t_merge_opts].
t_merge_opts(_) ->
Opts = emqx_misc:merge_opts(?SOCKOPTS, [raw,
binary,
{backlog, 1024},
{nodelay, false},
{max_clients, 1024},
{acceptors, 16}]),
?assertEqual(1024, proplists:get_value(backlog, Opts)),
?assertEqual(1024, proplists:get_value(max_clients, Opts)),
[binary, raw,
{acceptors, 16},
{backlog, 1024},
{max_clients, 1024},
{nodelay, false},
{packet, raw},
{reuseaddr, true}] = lists:sort(Opts).

View File

@ -15,6 +15,30 @@
-module(emqx_misc_tests). -module(emqx_misc_tests).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(SOCKOPTS, [binary,
{packet, raw},
{reuseaddr, true},
{backlog, 512},
{nodelay, true}]).
t_merge_opts_test() ->
Opts = emqx_misc:merge_opts(?SOCKOPTS, [raw,
binary,
{backlog, 1024},
{nodelay, false},
{max_clients, 1024},
{acceptors, 16}]),
?assertEqual(1024, proplists:get_value(backlog, Opts)),
?assertEqual(1024, proplists:get_value(max_clients, Opts)),
[binary, raw,
{acceptors, 16},
{backlog, 1024},
{max_clients, 1024},
{nodelay, false},
{packet, raw},
{reuseaddr, true}] = lists:sort(Opts).
timer_cancel_flush_test() -> timer_cancel_flush_test() ->
Timer = emqx_misc:start_timer(0, foo), Timer = emqx_misc:start_timer(0, foo),
ok = emqx_misc:cancel_timer(Timer), ok = emqx_misc:cancel_timer(Timer),
@ -39,4 +63,3 @@ message_queue_too_long_test() ->
conn_proc_mng_policy(L) -> conn_proc_mng_policy(L) ->
emqx_misc:conn_proc_mng_policy(#{message_queue_len => L}). emqx_misc:conn_proc_mng_policy(#{message_queue_len => L}).

View File

@ -44,14 +44,49 @@ packet_type_name(_) ->
?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)).
packet_validate(_) -> packet_validate(_) ->
?assertEqual(true, emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, [{<<"topic">>, #{qos => ?QOS0}}]))), ?assert(emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, [{<<"topic">>, #{qos => ?QOS0}}]))),
?assertEqual(true, emqx_packet:validate(?UNSUBSCRIBE_PACKET(89, [<<"topic">>]))), ?assert(emqx_packet:validate(?UNSUBSCRIBE_PACKET(89, [<<"topic">>]))),
?assertEqual(true, emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{}))), ?assert(emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{}))),
?assertEqual(true, emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' => 1}}))), ?assert(emqx_packet:validate(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1}, <<"payload">>))),
case catch emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' => 0}})) of ?assert(emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' => 1}}))),
{'EXIT', {protocol_error, _}} -> ?assertEqual(true, true); ?assertError(subscription_identifier_invalid,
true -> ?assertEqual(true, false) emqx_packet:validate(
end. ?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => -1},
[{<<"topic">>, #{qos => ?QOS0}}]))),
?assertError(topic_filters_invalid,
emqx_packet:validate(?UNSUBSCRIBE_PACKET(1,[]))),
?assertError(topic_name_invalid,
emqx_packet:validate(?PUBLISH_PACKET(1,<<>>,1,#{},<<"payload">>))),
?assertError(topic_name_invalid,
emqx_packet:validate(?PUBLISH_PACKET
(1, <<"+/+">>, 1, #{}, <<"payload">>))),
?assertError(topic_alias_invalid,
emqx_packet:validate(
?PUBLISH_PACKET
(1, <<"topic">>, 1, #{'Topic-Alias' => 0}, <<"payload">>))),
?assertError(protocol_error,
emqx_packet:validate(
?PUBLISH_PACKET(1, <<"topic">>, 1,
#{'Subscription-Identifier' => 10}, <<"payload">>))),
?assertError(protocol_error,
emqx_packet:validate(
?PUBLISH_PACKET(1, <<"topic">>, 1,
#{'Response-Topic' => <<"+/+">>}, <<"payload">>))),
?assertError(protocol_error,
emqx_packet:validate(
?CONNECT_PACKET(#mqtt_packet_connect{
properties =
#{'Request-Response-Information' => -1}}))),
?assertError(protocol_error,
emqx_packet:validate(
?CONNECT_PACKET(#mqtt_packet_connect{
properties =
#{'Request-Problem-Information' => 2}}))),
?assertError(protocol_error,
emqx_packet:validate(
?CONNECT_PACKET(#mqtt_packet_connect{
properties =
#{'Receive-Maximum' => 0}}))).
packet_message(_) -> packet_message(_) ->
Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
@ -83,16 +118,14 @@ packet_format(_) ->
io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]).
packet_will_msg(_) -> packet_will_msg(_) ->
Pkt = #mqtt_packet_connect{ will_flag = true, Pkt = #mqtt_packet_connect{ will_flag = true,
client_id = <<"clientid">>, client_id = <<"clientid">>,
username = "test", username = "test",
will_retain = true, will_retain = true,
will_qos = ?QOS2, will_qos = ?QOS2,
will_topic = <<"topic">>, will_topic = <<"topic">>,
will_props = #{}, will_props = #{},
will_payload = <<"payload">>}, will_payload = <<"payload">>},
Msg = emqx_packet:will_msg(Pkt), Msg = emqx_packet:will_msg(Pkt),
?assertEqual(<<"clientid">>, Msg#message.from), ?assertEqual(<<"clientid">>, Msg#message.from),
?assertEqual(<<"topic">>, Msg#message.topic). ?assertEqual(<<"topic">>, Msg#message.topic).

View File

@ -19,114 +19,211 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include("emqx.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -define(TOPICS, [<<"TopicA">>, <<"TopicA/B">>, <<"Topic/C">>, <<"TopicA/C">>,
<<"/TopicA">>]).
-import(emqx_serializer, [serialize/1]). -define(CLIENT2, ?CONNECT_PACKET(#mqtt_packet_connect{
username = <<"admin">>,
clean_start = false,
password = <<"public">>})).
all() -> all() ->
[%% {group, parser}, [
%% {group, serializer}, {group, mqttv4},
{group, packet}, {group, mqttv5}].
{group, message}].
groups() -> groups() ->
[%% {parser, [], [{mqttv4,
%% [ [sequence],
%% parse_connect, [
%% parse_bridge, connect_v4,
%% parse_publish, subscribe_v4
%% parse_puback, ]},
%% parse_pubrec, {mqttv5,
%% parse_pubrel, [sequence],
%% parse_pubcomp, [
%% parse_subscribe, connect_v5,
%% parse_unsubscribe, subscribe_v5
%% parse_pingreq, ]
%% parse_disconnect]}, }].
%% {serializer, [],
%% [serialize_connect, init_per_suite(Config) ->
%% serialize_connack, emqx_ct_broker_helpers:run_setup_steps(),
%% serialize_publish, Config.
%% serialize_puback,
%% serialize_pubrel, end_per_suite(_Config) ->
%% serialize_subscribe, emqx_ct_broker_helpers:run_teardown_steps().
%% serialize_suback,
%% serialize_unsubscribe, with_connection(DoFun) ->
%% serialize_unsuback, {ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
%% serialize_pingreq, [binary, {packet, raw},
%% serialize_pingresp, {active, false}], 3000),
%% serialize_disconnect]}, try
{packet, [], DoFun(Sock)
[packet_proto_name, after
packet_type_name, emqx_client_sock:close(Sock)
packet_format]}, end.
{message, [],
[message_make connect_v4(_) ->
%% message_from_packet with_connection(fun(Sock) ->
]} emqx_client_sock:send(Sock, raw_send_serialize(?PACKET(?PUBLISH))),
]. {error, closed} =gen_tcp:recv(Sock, 0)
end),
with_connection(fun(Sock) ->
ConnectPacket = raw_send_serialize(?CONNECT_PACKET
(#mqtt_packet_connect{
client_id = <<"mqttv4_client">>,
username = <<"admin">>,
password = <<"public">>,
proto_ver = ?MQTT_PROTO_V4
})),
emqx_client_sock:send(Sock, ConnectPacket),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_parse(Data, ?MQTT_PROTO_V4),
emqx_client_sock:send(Sock, ConnectPacket),
{error, closed} = gen_tcp:recv(Sock, 0)
end),
ok.
connect_v5(_) ->
with_connection(fun(Sock) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
properties =
#{'Request-Response-Information' => -1}}))),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
end),
%%-------------------------------------------------------------------- with_connection(fun(Sock) ->
%% Packet Cases emqx_client_sock:send(Sock,
%%-------------------------------------------------------------------- raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
properties =
#{'Request-Problem-Information' => 2}}))),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
end),
packet_proto_name(_) -> with_connection(fun(Sock) ->
?assertEqual(<<"MQIsdp">>, emqx_packet:protocol_name(3)), emqx_client_sock:send(Sock,
?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(4)). raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
properties =
#{'Request-Response-Information' => 1}})
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0,
#{'Response-Information' := _RespInfo}), _} =
raw_recv_parse(Data, ?MQTT_PROTO_V5)
end),
ok.
packet_type_name(_) -> do_connect(Sock, ProtoVer) ->
?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)), emqx_client_sock:send(Sock, raw_send_serialize(
?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). ?CONNECT_PACKET(
#mqtt_packet_connect{
client_id = <<"mqtt_client">>,
proto_ver = ProtoVer
}))),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_parse(Data, ProtoVer).
%% packet_connack_name(_) -> subscribe_v4(_) ->
%% ?assertEqual('CONNACK_ACCEPT', emqx_packet:connack_name(?CONNACK_ACCEPT)), with_connection(fun(Sock) ->
%% ?assertEqual('CONNACK_PROTO_VER', emqx_packet:connack_name(?CONNACK_PROTO_VER)), do_connect(Sock, ?MQTT_PROTO_V4),
%% ?assertEqual('CONNACK_INVALID_ID', emqx_packet:connack_name(?CONNACK_INVALID_ID)), SubPacket = raw_send_serialize(
%% ?assertEqual('CONNACK_SERVER', emqx_packet:connack_name(?CONNACK_SERVER)), ?SUBSCRIBE_PACKET(15,
%% ?assertEqual('CONNACK_CREDENTIALS', emqx_packet:connack_name(?CONNACK_CREDENTIALS)), [{<<"topic">>, #{rh => 1,
%% ?assertEqual('CONNACK_AUTH', emqx_packet:connack_name(?CONNACK_AUTH)). qos => ?QOS_2,
rap => 0,
nl => 0,
rc => 0}}])),
emqx_client_sock:send(Sock, SubPacket),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?SUBACK_PACKET(15, _), _} = raw_recv_parse(Data, ?MQTT_PROTO_V4)
end),
ok.
packet_format(_) -> subscribe_v5(_) ->
io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]), with_connection(fun(Sock) ->
io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), do_connect(Sock, ?MQTT_PROTO_V5),
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]), SubPacket = raw_send_serialize(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => -1},[]),
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), #{version => ?MQTT_PROTO_V5}),
io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), emqx_client_sock:send(Sock, SubPacket),
io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]), {ok, DisConnData} = gen_tcp:recv(Sock, 0),
io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]), {ok, ?DISCONNECT_PACKET(?RC_TOPIC_FILTER_INVALID), _} =
io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]), raw_recv_parse(DisConnData, ?MQTT_PROTO_V5)
io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), end),
io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). with_connection(fun(Sock) ->
do_connect(Sock, ?MQTT_PROTO_V5),
SubPacket = raw_send_serialize(
?SUBSCRIBE_PACKET(0, #{}, [{<<"TopicQos0">>,
#{rh => 1, qos => ?QOS_2,
rap => 0, nl => 0,
rc => 0}}]),
#{version => ?MQTT_PROTO_V5}),
emqx_client_sock:send(Sock, SubPacket),
{ok, DisConnData} = gen_tcp:recv(Sock, 0),
{ok, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _} =
raw_recv_parse(DisConnData, ?MQTT_PROTO_V5)
end),
with_connection(fun(Sock) ->
do_connect(Sock, ?MQTT_PROTO_V5),
SubPacket = raw_send_serialize(
?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 0},
[{<<"TopicQos0">>,
#{rh => 1, qos => ?QOS_2,
rap => 0, nl => 0,
rc => 0}}]),
#{version => ?MQTT_PROTO_V5}),
emqx_client_sock:send(Sock, SubPacket),
{ok, DisConnData} = gen_tcp:recv(Sock, 0),
{ok, ?DISCONNECT_PACKET(?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED), _} =
raw_recv_parse(DisConnData, ?MQTT_PROTO_V5)
end),
with_connection(fun(Sock) ->
do_connect(Sock, ?MQTT_PROTO_V5),
SubPacket = raw_send_serialize(
?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 1},
[{<<"TopicQos0">>,
#{rh => 1, qos => ?QOS_2,
rap => 0, nl => 0,
rc => 0}}]),
#{version => ?MQTT_PROTO_V5}),
emqx_client_sock:send(Sock, SubPacket),
{ok, SubData} = gen_tcp:recv(Sock, 0),
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} =
raw_recv_parse(SubData, ?MQTT_PROTO_V5)
end),
ok.
%%-------------------------------------------------------------------- publish_v4(_) ->
%% Message Cases ok.
%%--------------------------------------------------------------------
message_make(_) -> publish_v5(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), ok.
?assertEqual(0, Msg#message.qos),
Msg1 = emqx_message:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>),
?assert(is_binary(Msg1#message.id)),
?assertEqual(qos2, Msg1#message.qos).
%% message_from_packet(_) -> raw_send_serialize(Packet) ->
%% Msg = emqx_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)), emqx_frame:serialize(Packet).
%% ?assertEqual(1, Msg#message.qos),
%% %% ?assertEqual(10, Msg#message.pktid),
%% ?assertEqual(<<"topic">>, Msg#message.topic),
%% WillMsg = emqx_message:from_packet(#mqtt_packet_connect{will_flag = true,
%% will_topic = <<"WillTopic">>,
%% will_payload = <<"WillMsg">>}),
%% ?assertEqual(<<"WillTopic">>, WillMsg#message.topic),
%% ?assertEqual(<<"WillMsg">>, WillMsg#message.payload).
%% Msg2 = emqx_message:fomat_packet(<<"username">>, <<"clientid">>,
%% ?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)),
raw_send_serialize(Packet, Opts) ->
emqx_frame:serialize(Packet, Opts).
raw_recv_parse(P, ProtoVersion) ->
emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
version => ProtoVersion}}).

View File

@ -1,56 +0,0 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_stats_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
all() -> [t_set_get_state, t_update_interval].
t_set_get_state(_) ->
emqx_stats:start_link(),
SetConnsCount = emqx_stats:statsfun('connections/count'),
SetConnsCount(1),
1 = emqx_stats:getstat('connections/count'),
emqx_stats:setstat('connections/count', 2),
2 = emqx_stats:getstat('connections/count'),
emqx_stats:setstat('connections/count', 'connections/max', 3),
timer:sleep(100),
3 = emqx_stats:getstat('connections/count'),
3 = emqx_stats:getstat('connections/max'),
emqx_stats:setstat('connections/count', 'connections/max', 2),
timer:sleep(100),
2 = emqx_stats:getstat('connections/count'),
3 = emqx_stats:getstat('connections/max'),
SetConns = emqx_stats:statsfun('connections/count', 'connections/max'),
SetConns(4),
timer:sleep(100),
4 = emqx_stats:getstat('connections/count'),
4 = emqx_stats:getstat('connections/max'),
Conns = emqx_stats:getstats(),
4 = proplists:get_value('connections/count', Conns),
4 = proplists:get_value('connections/max', Conns).
t_update_interval(_) ->
emqx_stats:start_link(),
emqx_stats:cancel_update(cm_stats),
ok = emqx_stats:update_interval(stats_test, fun update_stats/0),
timer:sleep(2500),
1 = emqx_stats:getstat('connections/count').
update_stats() ->
emqx_stats:setstat('connections/count', 1).

101
test/emqx_stats_tests.erl Normal file
View File

@ -0,0 +1,101 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_stats_tests).
-include_lib("eunit/include/eunit.hrl").
get_state_test() ->
with_proc(fun() ->
SetConnsCount = emqx_stats:statsfun('connections/count'),
SetConnsCount(1),
1 = emqx_stats:getstat('connections/count'),
emqx_stats:setstat('connections/count', 2),
2 = emqx_stats:getstat('connections/count'),
emqx_stats:setstat('connections/count', 'connections/max', 3),
timer:sleep(100),
3 = emqx_stats:getstat('connections/count'),
3 = emqx_stats:getstat('connections/max'),
emqx_stats:setstat('connections/count', 'connections/max', 2),
timer:sleep(100),
2 = emqx_stats:getstat('connections/count'),
3 = emqx_stats:getstat('connections/max'),
SetConns = emqx_stats:statsfun('connections/count', 'connections/max'),
SetConns(4),
timer:sleep(100),
4 = emqx_stats:getstat('connections/count'),
4 = emqx_stats:getstat('connections/max'),
Conns = emqx_stats:getstats(),
4 = proplists:get_value('connections/count', Conns),
4 = proplists:get_value('connections/max', Conns)
end).
update_interval_test() ->
TickMs = 200,
with_proc(fun() ->
SleepMs = TickMs * 2 + TickMs div 2, %% sleep for 2.5 ticks
emqx_stats:cancel_update(cm_stats),
UpdFun = fun() -> emqx_stats:setstat('connections/count', 1) end,
ok = emqx_stats:update_interval(stats_test, UpdFun),
timer:sleep(SleepMs),
?assertEqual(1, emqx_stats:getstat('connections/count'))
end, TickMs).
helper_test_() ->
TickMs = 200,
TestF =
fun(CbModule, CbFun) ->
SleepMs = TickMs + TickMs div 2, %% sleep for 1.5 ticks
Ref = make_ref(),
Tester = self(),
UpdFun =
fun() ->
CbModule:CbFun(),
Tester ! Ref,
ok
end,
ok = emqx_stats:update_interval(stats_test, UpdFun),
timer:sleep(SleepMs),
receive Ref -> ok after 2000 -> error(timeout) end
end,
MkTestFun =
fun(CbModule, CbFun) ->
fun() ->
with_proc(fun() -> TestF(CbModule, CbFun) end, TickMs)
end
end,
[{"emqx_broker_helper", MkTestFun(emqx_broker_helper, stats_fun)},
{"emqx_sm", MkTestFun(emqx_sm, stats_fun)},
{"emqx_router_helper", MkTestFun(emqx_router_helper, stats_fun)},
{"emqx_cm", MkTestFun(emqx_cm, update_conn_stats)}
].
with_proc(F) ->
{ok, _Pid} = emqx_stats:start_link(),
with_stop(F).
with_proc(F, TickMs) ->
{ok, _Pid} = emqx_stats:start_link(#{tick_ms => TickMs}),
with_stop(F).
with_stop(F) ->
try
%% make a synced call to the gen_server so we know it has
%% started running, hence it is safe to continue with less risk of race condition
?assertEqual(ignored, gen_server:call(emqx_stats, ignored)),
F()
after
ok = emqx_stats:stop()
end.

View File

@ -21,7 +21,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-import(emqx_topic, [wildcard/1, match/2, validate/1, triples/1, join/1, -import(emqx_topic, [wildcard/1, match/2, validate/1, triples/1, join/1,
words/1, systop/1, feed_var/3, parse/1, parse/2]). words/1, systop/1, feed_var/3, parse/1]).
-define(N, 10000). -define(N, 10000).
@ -57,10 +57,10 @@ t_match(_) ->
true = match(<<"abc">>, <<"+">>), true = match(<<"abc">>, <<"+">>),
true = match(<<"a/b/c">>, <<"a/b/c">>), true = match(<<"a/b/c">>, <<"a/b/c">>),
false = match(<<"a/b/c">>, <<"a/c/d">>), false = match(<<"a/b/c">>, <<"a/c/d">>),
false = match(<<"$shared/x/y">>, <<"+">>), false = match(<<"$share/x/y">>, <<"+">>),
false = match(<<"$shared/x/y">>, <<"+/x/y">>), false = match(<<"$share/x/y">>, <<"+/x/y">>),
false = match(<<"$shared/x/y">>, <<"#">>), false = match(<<"$share/x/y">>, <<"#">>),
false = match(<<"$shared/x/y">>, <<"+/+/#">>), false = match(<<"$share/x/y">>, <<"+/+/#">>),
false = match(<<"house/1/sensor/0">>, <<"house/+">>), false = match(<<"house/1/sensor/0">>, <<"house/+">>),
false = match(<<"house">>, <<"house/+">>). false = match(<<"house">>, <<"house/+">>).
@ -77,10 +77,10 @@ t_match2(_) ->
true = match(<<"abc">>, <<"+">>), true = match(<<"abc">>, <<"+">>),
true = match(<<"a/b/c">>, <<"a/b/c">>), true = match(<<"a/b/c">>, <<"a/b/c">>),
false = match(<<"a/b/c">>, <<"a/c/d">>), false = match(<<"a/b/c">>, <<"a/c/d">>),
false = match(<<"$shared/x/y">>, <<"+">>), false = match(<<"$share/x/y">>, <<"+">>),
false = match(<<"$shared/x/y">>, <<"+/x/y">>), false = match(<<"$share/x/y">>, <<"+/x/y">>),
false = match(<<"$shared/x/y">>, <<"#">>), false = match(<<"$share/x/y">>, <<"#">>),
false = match(<<"$shared/x/y">>, <<"+/+/#">>), false = match(<<"$share/x/y">>, <<"+/+/#">>),
false = match(<<"house/1/sensor/0">>, <<"house/+">>). false = match(<<"house/1/sensor/0">>, <<"house/+">>).
t_match3(_) -> t_match3(_) ->
@ -208,4 +208,3 @@ t_parse(_) ->
?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)), ?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)),
?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)), ?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)),
?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)). ?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)).

View File

@ -153,7 +153,7 @@ do_recv2(State = #state{phase = Phase, socket = Socket, ppid = PPid}, R) ->
ok ok
end, end,
die(Socket, PPid, WsReason, normal); die(Socket, PPid, WsReason, normal);
{_, _, _, Rest2} -> {_, _, _, _Rest2} ->
io:format("Unknown frame type~n"), io:format("Unknown frame type~n"),
die(Socket, PPid, {1006, "Unknown frame type"}, normal) die(Socket, PPid, {1006, "Unknown frame type"}, normal)
end. end.

View File

@ -1,7 +1,5 @@
-module(ws_client). -module(ws_client).
-behaviour(websocket_client_handler).
-export([ -export([
start_link/0, start_link/0,
start_link/1, start_link/1,