diff --git a/Makefile b/Makefile index 368aff6a9..f9c8bd201 100644 --- a/Makefile +++ b/Makefile @@ -35,11 +35,13 @@ EUNIT_OPTS = verbose # CT_SUITES = emqx_frame ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat -CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight \ - emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ - emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ - emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \ - emqx_mountpoint emqx_listeners emqx_protocol emqx_pool emqx_shared_sub + +CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_connection emqx_session \ + emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \ + emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mqtt_caps \ + emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ + emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ + emqx_listeners emqx_protocol emqx_pool emqx_shared_sub CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) @@ -138,4 +140,3 @@ dep-vsn-check: {[], []} -> halt(0); \ {Rebar, Mk} -> erlang:error({deps_version_discrepancy, [{rebar, Rebar}, {mk, Mk}]}) \ end." - diff --git a/etc/acl.conf b/etc/acl.conf index fb85f3f20..b60ea5e7a 100644 --- a/etc/acl.conf +++ b/etc/acl.conf @@ -1,6 +1,6 @@ %%-------------------------------------------------------------------- %% -%% [ACL](https://github.com/emqtt/emqttd/wiki/ACL) +%% [ACL](http://emqtt.io/docs/v2/config.html#allow-anonymous-and-acl-file) %% %% -type who() :: all | binary() | %% {ipaddr, esockd_access:cidr()} | diff --git a/etc/emqx.conf b/etc/emqx.conf index 18a7d6fd6..874a4c560 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -459,6 +459,12 @@ acl_cache_ttl = 1m ## MQTT Protocol ##-------------------------------------------------------------------- +## Response Topic Prefix +## +## Value: String +## Default: emqxrspv1 +mqtt.response_topic_prefix = emqxrspv1 + ## Maximum MQTT packet size allowed. ## ## Value: Bytes diff --git a/include/emqx.hrl b/include/emqx.hrl index bca6fe519..984f4c9e2 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -37,9 +37,6 @@ %% Queue topic -define(QUEUE, <<"$queue/">>). -%% Shared topic --define(SHARE, <<"$share/">>). - %%-------------------------------------------------------------------- %% Message and Delivery %%-------------------------------------------------------------------- diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 74bfc1120..b9bf6b55e 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -65,9 +65,9 @@ end). -define(IS_QOS_NAME(I), - (I =:= qos0; I =:= at_most_once; - I =:= qos1; I =:= at_least_once; - I =:= qos2; I =:= exactly_once)). + (I =:= qos0 orelse I =:= at_most_once orelse + I =:= qos1 orelse I =:= at_least_once orelse + I =:= qos2 orelse I =:= exactly_once)). %%-------------------------------------------------------------------- %% Maximum ClientId Length. @@ -527,5 +527,8 @@ -define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}). --endif. +-define(SHARE, "$share"). +-define(SHARE(Group, Topic), emqx_topic:join([<>, Group, Topic])). +-define(IS_SHARE(Topic), case Topic of <> -> true; _ -> false end). +-endif. diff --git a/priv/emqx.schema b/priv/emqx.schema index 42b894401..becb4bff4 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -597,6 +597,11 @@ end}. %% MQTT Protocol %%-------------------------------------------------------------------- +%% @doc Response Topic Prefix +{mapping, "mqtt.response_topic_prefix", "emqx.response_topic_prefix",[ + {datatype, string} +]}. + %% @doc Max Packet Size Allowed, 1MB by default. {mapping, "mqtt.max_packet_size", "emqx.max_packet_size", [ {default, "1MB"}, @@ -1797,4 +1802,3 @@ end}. {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] end}. - diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 35e8276d2..f0946e92d 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -317,13 +317,6 @@ handle_call(Req, _From, State) -> emqx_logger:error("[Broker] unexpected call: ~p", [Req]), {reply, ignored, State}. -resubscribe(From, {Subscriber, SubOpts, Topic}, State) -> - {SubPid, _} = Subscriber, - Group = maps:get(share, SubOpts, undefined), - true = do_subscribe(Group, Topic, Subscriber, SubOpts), - emqx_shared_sub:subscribe(Group, Topic, SubPid), - emqx_router:add_route(From, Topic, dest(Group)), - {noreply, monitor_subscriber(Subscriber, State)}. handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}}, State) -> @@ -385,6 +378,14 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ +resubscribe(From, {Subscriber, SubOpts, Topic}, State) -> + {SubPid, _} = Subscriber, + Group = maps:get(share, SubOpts, undefined), + true = do_subscribe(Group, Topic, Subscriber, SubOpts), + emqx_shared_sub:subscribe(Group, Topic, SubPid), + emqx_router:add_route(From, Topic, dest(Group)), + {noreply, monitor_subscriber(Subscriber, State)}. + insert_subscriber(Group, Topic, Subscriber) -> Subscribers = subscribers(Topic), case lists:member(Subscriber, Subscribers) of diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index fecf98a7b..e597a233e 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -19,6 +19,9 @@ -export([start_link/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% internal export +-export([stats_fun/0]). + -define(HELPER, ?MODULE). -record(state, {}). @@ -32,7 +35,9 @@ start_link() -> %%------------------------------------------------------------------------------ init([]) -> - emqx_stats:update_interval(broker_stats, stats_fun()), + %% Use M:F/A for callback, not anonymous function because + %% fun M:F/A is small, also no badfun risk during hot beam reload + emqx_stats:update_interval(broker_stats, fun ?MODULE:stats_fun/0), {ok, #state{}, hibernate}. handle_call(Req, _From, State) -> @@ -58,14 +63,12 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ stats_fun() -> - fun() -> - safe_update_stats(emqx_subscriber, - 'subscribers/count', 'subscribers/max'), - safe_update_stats(emqx_subscription, - 'subscriptions/count', 'subscriptions/max'), - safe_update_stats(emqx_suboptions, - 'suboptions/count', 'suboptions/max') - end. + safe_update_stats(emqx_subscriber, + 'subscribers/count', 'subscribers/max'), + safe_update_stats(emqx_subscription, + 'subscriptions/count', 'subscriptions/max'), + safe_update_stats(emqx_suboptions, + 'suboptions/count', 'suboptions/max'). safe_update_stats(Tab, Stat, MaxStat) -> case ets:info(Tab, size) of diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 7ef9c5968..22c37d26f 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -19,7 +19,8 @@ -include("emqx_mqtt.hrl"). -export([start_link/0, start_link/1]). - +-export([request/5, request/6, request_async/7, receive_response/3]). +-export([set_request_handler/2, sub_request_topic/3, sub_request_topic/4]). -export([subscribe/2, subscribe/3, subscribe/4]). -export([publish/2, publish/3, publish/4, publish/5]). -export([unsubscribe/2, unsubscribe/3]). @@ -37,8 +38,34 @@ -export([initialized/3, waiting_for_connack/3, connected/3]). -export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). +-export_type([client/0, properties/0, payload/0, + pubopt/0, subopt/0, request_input/0, + response_payload/0, request_handler/0, + corr_data/0]). +-export_type([host/0, option/0]). + +%% Default timeout +-define(DEFAULT_KEEPALIVE, 60000). +-define(DEFAULT_ACK_TIMEOUT, 30000). +-define(DEFAULT_CONNECT_TIMEOUT, 60000). + +-define(PROPERTY(Name, Val), #state{properties = #{Name := Val}}). + +-define(WILL_MSG(QoS, Retain, Topic, Props, Payload), + #mqtt_msg{qos = QoS, retain = Retain, topic = Topic, props = Props, payload = Payload}). + +-define(RESPONSE_TIMEOUT_SECONDS, timer:seconds(5)). + +-define(NO_HANDLER, undefined). + +-define(NO_GROUP, <<>>). + +-define(NO_CLIENT_ID, <<>>). + -type(host() :: inet:ip_address() | inet:hostname()). +-type corr_data() :: binary(). + -type(option() :: {name, atom()} | {owner, pid()} | {host, host()} @@ -57,6 +84,7 @@ | {keepalive, non_neg_integer()} | {max_inflight, pos_integer()} | {retry_interval, timeout()} + | {request_handler, request_handler()} | {will_topic, iodata()} | {will_payload, iodata()} | {will_retain, boolean()} @@ -67,8 +95,6 @@ | {force_ping, boolean()} | {properties, properties()}). --export_type([host/0, option/0]). - -record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false, packet_id, topic, props, payload}). @@ -106,6 +132,7 @@ ack_timer :: reference(), retry_interval :: pos_integer(), retry_timer :: reference(), + request_handler :: request_handler(), session_present :: boolean(), last_packet_id :: packet_id(), parse_state :: emqx_frame:state()}). @@ -124,7 +151,7 @@ -type(qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos()). --type(pubopt() :: {retain, boolean()} | {qos, qos()}). +-type(pubopt() :: {retain, boolean()} | {qos, qos()} | {timeout, timeout()}). -type(subopt() :: {rh, 0 | 1 | 2} | {rap, boolean()} @@ -135,23 +162,35 @@ -type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}). --export_type([client/0, topic/0, qos/0, properties/0, payload/0, - packet_id/0, pubopt/0, subopt/0, reason_code/0]). +-type(request_input() :: binary()). -%% Default timeout --define(DEFAULT_KEEPALIVE, 60000). --define(DEFAULT_ACK_TIMEOUT, 30000). --define(DEFAULT_CONNECT_TIMEOUT, 60000). +-type(response_payload() :: binary()). --define(PROPERTY(Name, Val), #state{properties = #{Name := Val}}). +-type(request_handler() :: fun((request_input()) -> response_payload())). --define(WILL_MSG(QoS, Retain, Topic, Props, Payload), - #mqtt_msg{qos = QoS, retain = Retain, topic = Topic, props = Props, payload = Payload}). +-type(group() :: binary()). %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ +%% @doc Swap in a new request handler on the fly. +-spec(set_request_handler(client(), request_handler()) -> ok). +set_request_handler(Responser, RequestHandler) -> + gen_statem:call(Responser, {set_request_handler, RequestHandler}). + +%% @doc Subscribe to request topic. +-spec(sub_request_topic(client(), qos(), topic()) -> ok). +sub_request_topic(Client, QoS, Topic) -> + sub_request_topic(Client, QoS, Topic, ?NO_GROUP). + +%% @doc Share-subscribe to request topic. +-spec(sub_request_topic(client(), qos(), topic(), group()) -> ok). +sub_request_topic(Client, QoS, Topic, Group) -> + Properties = get_properties(Client), + NewTopic = make_req_rsp_topic(Properties, Topic, Group), + subscribe_req_rsp_topic(Client, QoS, NewTopic). + -spec(start_link() -> gen_statem:start_ret()). start_link() -> start_link([]). @@ -248,12 +287,82 @@ parse_subopt([{nl, false} | Opts], Result) -> parse_subopt([{qos, QoS} | Opts], Result) -> parse_subopt(Opts, Result#{qos := ?QOS_I(QoS)}). +-spec(request(client(), topic(), topic(), payload(), qos() | [pubopt()]) + -> ok | {ok, packet_id()} | {error, term()}). +request(Client, ResponseTopic, RequestTopic, Payload, QoS) when is_binary(ResponseTopic), is_atom(QoS) -> + request(Client, ResponseTopic, RequestTopic, Payload, [{qos, ?QOS_I(QoS)}]); +request(Client, ResponseTopic, RequestTopic, Payload, QoS) when is_binary(ResponseTopic), ?IS_QOS(QoS) -> + request(Client, ResponseTopic, RequestTopic, Payload, [{qos, QoS}]); +request(Client, ResponseTopic, RequestTopic, Payload, Opts) when is_binary(ResponseTopic), is_list(Opts) -> + request(Client, ResponseTopic, RequestTopic, Payload, Opts, _Properties = #{}). + +%% @doc Send a request to request topic and wait for response. +-spec(request(client(), topic(), topic(), payload(), [pubopt()], properties()) + -> {ok, response_payload()} | {error, term()}). +request(Client, ResponseTopic, RequestTopic, Payload, Opts, Properties) -> + CorrData = make_corr_data(), + case request_async(Client, ResponseTopic, RequestTopic, + Payload, Opts, Properties, CorrData) of + ok -> receive_response(Client, CorrData, Opts); + {error, Reason} -> {error, Reason} + end. + +%% @doc Get client properties. +-spec(get_properties(client()) -> properties()). +get_properties(Client) -> gen_statem:call(Client, get_properties, infinity). + +%% @doc Send a request, but do not wait for response. +%% The caller should expect a `{publish, Response}' message, +%% or call `receive_response/3' to receive the message. +-spec(request_async(client(), topic(), topic(), payload(), + [pubopt()], properties(), corr_data()) -> ok | {error, any()}). +request_async(Client, ResponseTopic, RequestTopic, Payload, Opts, Properties, CorrData) + when is_binary(ResponseTopic), + is_binary(RequestTopic), + is_map(Properties), + is_list(Opts) -> + ok = emqx_mqtt_props:validate(Properties), + Retain = proplists:get_bool(retain, Opts), + QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)), + ClientProperties = get_properties(Client), + NewResponseTopic = make_req_rsp_topic(ClientProperties, ResponseTopic), + NewRequestTopic = make_req_rsp_topic(ClientProperties, RequestTopic), + %% This is perhaps not optimal to subscribe the response topic for + %% each and every request even though the response topic is always the same + ok = sub_response_topic(Client, QoS, NewResponseTopic), + NewProperties = maps:merge(Properties, #{'Response-Topic' => NewResponseTopic, + 'Correlation-Data' => CorrData}), + case publish(Client, #mqtt_msg{qos = QoS, + retain = Retain, + topic = NewRequestTopic, + props = NewProperties, + payload = iolist_to_binary(Payload)}) of + ok -> ok; + {ok, _PacketId} -> ok; %% assume auto_ack + {error, Reason} -> {error, Reason} + end. + +%% @doc Block wait the response for a request sent earlier. +-spec(receive_response(client(), corr_data(), [pubopt()]) + -> {ok, response_payload()} | {error, any()}). +receive_response(Client, CorrData, Opts) -> + TimeOut = proplists:get_value(timeout, Opts, ?RESPONSE_TIMEOUT_SECONDS), + MRef = erlang:monitor(process, Client), + TRef = erlang:start_timer(TimeOut, self(), response), + try + receive_response(Client, CorrData, TRef, MRef) + after + erlang:cancel_timer(TRef), + receive {timeout, TRef, _} -> ok after 0 -> ok end, + erlang:demonitor(MRef, [flush]) + end. + -spec(publish(client(), topic(), payload()) -> ok | {error, term()}). publish(Client, Topic, Payload) when is_binary(Topic) -> publish(Client, #mqtt_msg{topic = Topic, qos = ?QOS_0, payload = iolist_to_binary(Payload)}). -spec(publish(client(), topic(), payload(), qos() | [pubopt()]) - -> ok | {ok, packet_id()} | {error, term()}). + -> ok | {ok, packet_id()} | {error, term()}). publish(Client, Topic, Payload, QoS) when is_binary(Topic), is_atom(QoS) -> publish(Client, Topic, Payload, [{qos, ?QOS_I(QoS)}]); publish(Client, Topic, Payload, QoS) when is_binary(Topic), ?IS_QOS(QoS) -> @@ -369,7 +478,7 @@ init([Options]) -> process_flag(trap_exit, true), ClientId = case {proplists:get_value(proto_ver, Options, v4), proplists:get_value(client_id, Options)} of - {v5, undefined} -> <<>>; + {v5, undefined} -> ?NO_CLIENT_ID; {_ver, undefined} -> random_client_id(); {_ver, Id} -> iolist_to_binary(Id) end, @@ -396,6 +505,7 @@ init([Options]) -> auto_ack = true, ack_timeout = ?DEFAULT_ACK_TIMEOUT, retry_interval = 0, + request_handler = ?NO_HANDLER, connect_timeout = ?DEFAULT_CONNECT_TIMEOUT, last_packet_id = 1}), {ok, initialized, init_parse_state(State)}. @@ -488,6 +598,8 @@ init([{auto_ack, AutoAck} | Opts], State) when is_boolean(AutoAck) -> init(Opts, State#state{auto_ack = AutoAck}); init([{retry_interval, I} | Opts], State) -> init(Opts, State#state{retry_interval = timer:seconds(I)}); +init([{request_handler, Handler} | Opts], State) -> + init(Opts, State#state{request_handler = Handler}); init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) -> init(Opts, State#state{bridge_mode = Mode}); init([_Opt | Opts], State) -> @@ -562,7 +674,8 @@ mqtt_connect(State = #state{client_id = ClientId, waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS, SessPresent, Properties), - State = #state{properties = AllProps}) -> + State = #state{properties = AllProps, + client_id = ClientId}) -> case take_call(connect, State) of {value, #call{from = From}, State1} -> AllProps1 = case Properties of @@ -570,7 +683,8 @@ waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS, _ -> maps:merge(AllProps, Properties) end, Reply = {ok, self(), Properties}, - State2 = State1#state{properties = AllProps1, + State2 = State1#state{client_id = assign_id(ClientId, AllProps1), + properties = AllProps1, session_present = SessPresent}, {next_state, connected, ensure_keepalive_timer(State2), [{reply, From, Reply}]}; @@ -616,6 +730,15 @@ connected({call, From}, resume, State) -> connected({call, From}, stop, _State) -> {stop_and_reply, normal, [{reply, From, ok}]}; +connected({call, From}, get_properties, State = #state{properties = Properties}) -> + {keep_state, State, [{reply, From, Properties}]}; + +connected({call, From}, client_id, State = #state{client_id = ClientId}) -> + {keep_state, State, [{reply, From, ClientId}]}; + +connected({call, From}, {set_request_handler, RequestHandler}, State) -> + {keep_state, State#state{request_handler = RequestHandler}, [{reply, From, ok}]}; + connected({call, From}, SubReq = {subscribe, Properties, Topics}, State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) -> case send(?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of @@ -695,29 +818,30 @@ connected(cast, {pubrel, PacketId, ReasonCode, Properties}, State) -> connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) -> send_puback(?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), State); -connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) -> - {keep_state, deliver(packet_to_msg(Packet), State)}; - connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), State = #state{paused = true}) -> {keep_state, State}; -connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), - State = #state{auto_ack = AutoAck}) -> +connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, Properties, Payload), + State) when Properties =/= undefined -> + NewState = response_publish(Properties, State, ?QOS_0, Payload), + {keep_state, deliver(packet_to_msg(Packet), NewState)}; +connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) -> + {keep_state, deliver(packet_to_msg(Packet), State)}; - _ = deliver(packet_to_msg(Packet), State), - case AutoAck of - true -> send_puback(?PUBACK_PACKET(PacketId), State); - false -> {keep_state, State} - end; +connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, Properties, Payload), State) + when Properties =/= undefined -> + NewState = response_publish(Properties, State, ?QOS_1, Payload), + publish_process(?QOS_1, Packet, NewState); -connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), - State = #state{awaiting_rel = AwaitingRel}) -> - case send_puback(?PUBREC_PACKET(PacketId), State) of - {keep_state, NewState} -> - AwaitingRel1 = maps:put(PacketId, Packet, AwaitingRel), - {keep_state, NewState#state{awaiting_rel = AwaitingRel1}}; - Stop -> Stop - end; +connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) -> + publish_process(?QOS_1, Packet, State); + +connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, Properties, Payload), State) + when Properties =/= undefined -> + NewState = response_publish(Properties, State, ?QOS_2, Payload), + publish_process(?QOS_2, Packet, NewState); +connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> + publish_process(?QOS_2, Packet, State); connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties), State = #state{owner = Owner, inflight = Inflight}) -> @@ -899,6 +1023,132 @@ code_change(_Vsn, State, Data, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ +%% Subscribe to response topic. +-spec(sub_response_topic(client(), qos(), topic()) -> ok). +sub_response_topic(Client, QoS, Topic) when is_binary(Topic) -> + subscribe_req_rsp_topic(Client, QoS, Topic). + +receive_response(Client, CorrData, TRef, MRef) -> + receive + {publish, Response} -> + {ok, Properties} = maps:find(properties, Response), + case maps:find('Correlation-Data', Properties) of + {ok, CorrData} -> + maps:find(payload, Response); + _ -> + emqx_logger:debug("Discarded stale response: ~p", [Response]), + receive_response(Client, CorrData, TRef, MRef) + end; + {timeout, TRef, response} -> + {error, timeout}; + {'DOWN', MRef, process, _, _} -> + {error, client_down} + end. + +%% Make a unique correlation data for each request. +%% It has to be unique because stale responses should be discarded. +make_corr_data() -> term_to_binary(make_ref()). + +%% Shared function for request and response topic subscription. +subscribe_req_rsp_topic(Client, QoS, Topic) -> + %% It is a Protocol Error to set the No Local bit to 1 on a Shared Subscription + {ok, _Props, _QoS} = subscribe(Client, [{Topic, [{rh, 2}, {rap, false}, + {nl, not ?IS_SHARE(Topic)}, + {qos, QoS}]}]), + emqx_logger:debug("Subscribed to topic ~s", [Topic]), + ok. + +%% Make a request or response topic. +make_req_rsp_topic(Properties, Topic) -> + make_req_rsp_topic(Properties, Topic, ?NO_GROUP). + +%% Same as make_req_rsp_topic/2, but allow shared subscription (for request topics) +make_req_rsp_topic(Properties, Topic, Group) -> + case maps:find('Response-Information', Properties) of + {ok, ResponseInformation} when ResponseInformation =/= <<>> -> + emqx_topic:join([req_rsp_topic_prefix(Group, ResponseInformation), Topic]); + _ -> + erlang:error(no_response_information) + end. + +req_rsp_topic_prefix(?NO_GROUP, Prefix) -> Prefix; +req_rsp_topic_prefix(Group, Prefix) -> ?SHARE(Group, Prefix). + +assign_id(?NO_CLIENT_ID, Props) -> + case maps:find('Assigned-Client-Identifier', Props) of + {ok, Value} -> + Value; + _ -> + error(bad_client_id) + end; +assign_id(Id, _Props) -> + Id. + +publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), State = #state{auto_ack = AutoAck}) -> + _ = deliver(packet_to_msg(Packet), State), + case AutoAck of + true -> send_puback(?PUBACK_PACKET(PacketId), State); + false -> {keep_state, State} + end; +publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), + State = #state{awaiting_rel = AwaitingRel}) -> + case send_puback(?PUBREC_PACKET(PacketId), State) of + {keep_state, NewState} -> + AwaitingRel1 = maps:put(PacketId, Packet, AwaitingRel), + {keep_state, NewState#state{awaiting_rel = AwaitingRel1}}; + Stop -> Stop + end. + +response_publish(undefined, State, _QoS, _Payload) -> + State; +response_publish(Properties, State = #state{request_handler = RequestHandler}, QoS, Payload) -> + case maps:find('Response-Topic', Properties) of + {ok, ResponseTopic} -> + case RequestHandler of + ?NO_HANDLER -> State; + _ -> do_publish(ResponseTopic, Properties, State, QoS, Payload) + end; + _ -> + State + end. + +do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler}, ?QOS_0, Payload) -> + Msg = #mqtt_msg{qos = ?QOS_0, + retain = false, + topic = ResponseTopic, + props = Properties, + payload = RequestHandler(Payload) + }, + case send(Msg, State) of + {ok, NewState} -> NewState; + _Error -> State + end; +do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler, + inflight = Inflight, + last_packet_id = PacketId}, + QoS, Payload) + when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2)-> + case emqx_inflight:is_full(Inflight) of + true -> + emqx_logger:error("Inflight is full"), + State; + false -> + Msg = #mqtt_msg{packet_id = PacketId, + qos = QoS, + retain = false, + topic = ResponseTopic, + props = Properties, + payload = RequestHandler(Payload)}, + case send(Msg, State) of + {ok, NewState} -> + Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg, os:timestamp()}, Inflight), + ensure_retry_timer(NewState#state{inflight = Inflight1}); + {error, Reason} -> + emqx_logger:error("Send failed: ~p", [Reason]), + State + end + end. + ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) -> ensure_keepalive_timer(timer:seconds(Secs), State); ensure_keepalive_timer(State = #state{keepalive = 0}) -> @@ -986,10 +1236,15 @@ retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) -> deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, topic = Topic, props = Props, payload = Payload}, - State = #state{owner = Owner}) -> - Owner ! {publish, #{qos => QoS, dup => Dup, retain => Retain, packet_id => PacketId, - topic => Topic, properties => Props, payload => Payload, - client_pid => self()}}, + State = #state{owner = Owner, request_handler = RequestHandler}) -> + case RequestHandler of + ?NO_HANDLER -> + Owner ! {publish, #{qos => QoS, dup => Dup, retain => Retain, packet_id => PacketId, + topic => Topic, properties => Props, payload => Payload, + client_pid => self()}}; + _ -> + ok + end, State. packet_to_msg(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, @@ -1001,7 +1256,7 @@ packet_to_msg(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, properties = Props}, payload = Payload}) -> #mqtt_msg{qos = QoS, retain = R, dup = Dup, packet_id = PacketId, - topic = Topic, props = Props, payload = Payload}. + topic = Topic, props = Props, payload = Payload}. msg_to_packet(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, topic = Topic, props = Props, payload = Payload}) -> @@ -1070,7 +1325,6 @@ receive_loop(Bytes, State = #state{parse_state = ParseState}) -> {error, Reason} -> {stop, Reason}; {'EXIT', Error} -> - io:format("client stop"), {stop, Error} end. diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 3e9958939..19892b386 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -30,6 +30,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% internal export +-export([update_conn_stats/0]). + -define(CM, ?MODULE). %% ETS Tables. @@ -125,7 +128,7 @@ init([]) -> _ = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]), _ = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts), _ = emqx_tables:new(?CONN_STATS_TAB, TabOpts), - ok = emqx_stats:update_interval(cm_stats, fun update_conn_stats/0), + ok = emqx_stats:update_interval(cm_stats, fun ?MODULE:update_conn_stats/0), {ok, #{conn_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 1db586cea..2040e595e 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -63,8 +63,8 @@ validate(?PUBLISH_PACKET(_QoS, Topic, _, Properties, _)) -> ((not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid)) andalso validate_properties(?PUBLISH, Properties); -validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' := 0}})) -> - error(protocol_error); +validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = Properties})) -> + validate_properties(?CONNECT, Properties); validate(_Packet) -> true. @@ -82,11 +82,24 @@ validate_properties(?PUBLISH, #{'Topic-Alias':= I}) error(topic_alias_invalid); validate_properties(?PUBLISH, #{'Subscription-Identifier' := _I}) -> error(protocol_error); +validate_properties(?PUBLISH, #{'Response-Topic' := ResponseTopic}) -> + case emqx_topic:wildcard(ResponseTopic) of + true -> + error(protocol_error); + false -> + true + end; +validate_properties(?CONNECT, #{'Receive-Maximum' := 0}) -> + error(protocol_error); +validate_properties(?CONNECT, #{'Request-Response-Information' := ReqRespInfo}) + when ReqRespInfo =/= 0, ReqRespInfo =/= 1 -> + error(protocol_error); +validate_properties(?CONNECT, #{'Request-Problem-Information' := ReqProInfo}) + when ReqProInfo =/= 0, ReqProInfo =/= 1 -> + error(protocol_error); validate_properties(_, _) -> true. - - validate_subscription({Topic, #{qos := QoS}}) -> emqx_topic:validate(filter, Topic) andalso validate_qos(QoS). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index bd440cf39..7dd0b37e8 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -82,27 +82,27 @@ -spec(init(map(), list()) -> state()). init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> Zone = proplists:get_value(zone, Options), - #pstate{zone = Zone, - sendfun = SendFun, - peername = Peername, - peercert = Peercert, - proto_ver = ?MQTT_PROTO_V4, - proto_name = <<"MQTT">>, - client_id = <<>>, - is_assigned = false, - conn_pid = self(), - username = init_username(Peercert, Options), - is_super = false, - clean_start = false, - topic_aliases = #{}, - packet_size = emqx_zone:get_env(Zone, max_packet_size), - mountpoint = emqx_zone:get_env(Zone, mountpoint), - is_bridge = false, - enable_ban = emqx_zone:get_env(Zone, enable_ban, false), - enable_acl = emqx_zone:get_env(Zone, enable_acl), - recv_stats = #{msg => 0, pkt => 0}, - send_stats = #{msg => 0, pkt => 0}, - connected = false}. + #pstate{zone = Zone, + sendfun = SendFun, + peername = Peername, + peercert = Peercert, + proto_ver = ?MQTT_PROTO_V4, + proto_name = <<"MQTT">>, + client_id = <<>>, + is_assigned = false, + conn_pid = self(), + username = init_username(Peercert, Options), + is_super = false, + clean_start = false, + topic_aliases = #{}, + packet_size = emqx_zone:get_env(Zone, max_packet_size), + mountpoint = emqx_zone:get_env(Zone, mountpoint), + is_bridge = false, + enable_ban = emqx_zone:get_env(Zone, enable_ban, false), + enable_acl = emqx_zone:get_env(Zone, enable_acl), + recv_stats = #{msg => 0, pkt => 0}, + send_stats = #{msg => 0, pkt => 0}, + connected = false}. init_username(Peercert, Options) -> case proplists:get_value(peer_cert_as_username, Options) of @@ -194,6 +194,13 @@ parser(#pstate{packet_size = Size, proto_ver = Ver}) -> %% Packet Received %%------------------------------------------------------------------------------ +set_protover(?CONNECT_PACKET(#mqtt_packet_connect{ + proto_ver = ProtoVer}), + PState) -> + PState#pstate{ proto_ver = ProtoVer }; +set_protover(_Packet, PState) -> + PState. + -spec(received(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()} | {error, term(), state()} | {stop, term(), state()}). received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT -> @@ -203,14 +210,31 @@ received(?PACKET(?CONNECT), PState = #pstate{connected = true}) -> {error, proto_unexpected_connect, PState}; received(Packet = ?PACKET(Type), PState) -> - trace(recv, Packet, PState), - case catch emqx_packet:validate(Packet) of + PState1 = set_protover(Packet, PState), + trace(recv, Packet, PState1), + try emqx_packet:validate(Packet) of true -> - {Packet1, PState1} = preprocess_properties(Packet, PState), - process_packet(Packet1, inc_stats(recv, Type, PState1)); - {'EXIT', {Reason, _Stacktrace}} -> - deliver({disconnect, rc(Reason)}, PState), - {error, Reason, PState} + {Packet1, PState2} = preprocess_properties(Packet, PState1), + process_packet(Packet1, inc_stats(recv, Type, PState2)) + catch + error : protocol_error -> + deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState1), + {error, protocol_error, PState}; + error : subscription_identifier_invalid -> + deliver({disconnect, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED}, PState1), + {error, subscription_identifier_invalid, PState1}; + error : topic_alias_invalid -> + deliver({disconnect, ?RC_TOPIC_ALIAS_INVALID}, PState1), + {error, topic_alias_invalid, PState1}; + error : topic_filters_invalid -> + deliver({disconnect, ?RC_TOPIC_FILTER_INVALID}, PState1), + {error, topic_filters_invalid, PState1}; + error : topic_name_invalid -> + deliver({disconnect, ?RC_TOPIC_FILTER_INVALID}, PState1), + {error, topic_filters_invalid, PState1}; + error : Reason -> + deliver({disconnect, ?RC_MALFORMED_PACKET}, PState1), + {error, Reason, PState1} end. %%------------------------------------------------------------------------------ @@ -309,7 +333,7 @@ process_packet(?CONNECT_PACKET( {error, Error} -> ?LOG(error, "Failed to open session: ~p", [Error], PState1), {?RC_UNSPECIFIED_ERROR, PState1} - end; + end; {error, Reason} -> ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], PState2), {?RC_NOT_AUTHORIZED, PState1} @@ -417,13 +441,13 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), process_packet(?PACKET(?PINGREQ), PState) -> send(?PACKET(?PINGRESP), PState); -process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}), +process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}), PState = #pstate{session = SPid, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) -> case Interval =/= 0 andalso OldInterval =:= 0 of - true -> + true -> deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState), {error, protocol_error, PState#pstate{will_msg = undefined}}; - false -> + false -> emqx_session:update_expiry_interval(SPid, Interval), %% Clean willmsg {stop, normal, PState#pstate{will_msg = undefined}} @@ -491,7 +515,14 @@ deliver({connack, ReasonCode}, PState) -> deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, proto_ver = ?MQTT_PROTO_V5, client_id = ClientId, + conn_props = ConnProps, is_assigned = IsAssigned}) -> + ResponseInformation = case maps:find('Request-Response-Information', ConnProps) of + {ok, 1} -> + iolist_to_binary(emqx_config:get_env(response_topic_prefix)); + _ -> + <<>> + end, #{max_packet_size := MaxPktSize, max_qos_allowed := MaxQoS, mqtt_retain_available := Retain, @@ -503,18 +534,21 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, 'Topic-Alias-Maximum' => MaxAlias, 'Wildcard-Subscription-Available' => flag(Wildcard), 'Subscription-Identifier-Available' => 1, + 'Response-Information' => ResponseInformation, + 'Shared-Subscription-Available' => flag(Shared)}, - Props1 = if - MaxQoS =:= ?QOS_2 -> + Props1 = if + MaxQoS =:= ?QOS_2 -> Props; true -> maps:put('Maximum-QoS', MaxQoS, Props) end, - + Props2 = if IsAssigned -> Props1#{'Assigned-Client-Identifier' => ClientId}; true -> Props1 + end, Props3 = case emqx_zone:get_env(Zone, server_keepalive) of @@ -621,7 +655,7 @@ set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, c true -> case CleanStart of true -> 0; - false -> + false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) end end, SessAttrs); @@ -762,7 +796,11 @@ check_sub_acl(TopicFilters, PState) -> fun({Topic, SubOpts}, {Ok, Acc}) -> case emqx_access_control:check_acl(Credentials, subscribe, Topic) of allow -> {Ok, [{Topic, SubOpts}|Acc]}; - deny -> {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]} + deny -> + emqx_logger:warning([{client, PState#pstate.client_id}], + "ACL(~s) Cannot SUBSCRIBE ~p for ACL Deny", + [PState#pstate.client_id, Topic]), + {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]} end end, {ok, []}, TopicFilters). @@ -814,14 +852,6 @@ start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 -> Backoff = emqx_zone:get_env(Zone, keepalive_backoff, 0.75), self() ! {keepalive, start, round(Secs * Backoff)}. -rc(Reason) -> - case Reason of - protocol_error -> ?RC_PROTOCOL_ERROR; - topic_filters_invalid -> ?RC_TOPIC_FILTER_INVALID; - topic_name_invalid -> ?RC_TOPIC_NAME_INVALID; - _ -> ?RC_MALFORMED_PACKET - end. - %%----------------------------------------------------------------------------- %% Parse topic filters %%----------------------------------------------------------------------------- diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index d0d0e8075..5431c9900 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -31,6 +31,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% internal export +-export([stats_fun/0]). + -record(routing_node, {name, const = unused}). -record(state, {nodes = []}). @@ -90,7 +93,7 @@ init([]) -> [Node | Acc] end end, [], mnesia:dirty_all_keys(?ROUTING_NODE)), - emqx_stats:update_interval(route_stats, stats_fun()), + emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0), {ok, #state{nodes = Nodes}, hibernate}. handle_call(Req, _From, State) -> @@ -143,13 +146,11 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ stats_fun() -> - fun() -> - case ets:info(?ROUTE, size) of - undefined -> ok; - Size -> - emqx_stats:setstat('routes/count', 'routes/max', Size), - emqx_stats:setstat('topics/count', 'topics/max', Size) - end + case ets:info(?ROUTE, size) of + undefined -> ok; + Size -> + emqx_stats:setstat('routes/count', 'routes/max', Size), + emqx_stats:setstat('topics/count', 'topics/max', Size) end. cleanup_routes(Node) -> diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 1fa0b488b..d45548a78 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -31,6 +31,9 @@ %% Internal functions for rpc -export([dispatch/3]). +%% Internal function for stats +-export([stats_fun/0]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -210,7 +213,7 @@ init([]) -> _ = emqx_tables:new(?SESSION_P_TAB, TabOpts), _ = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts), _ = emqx_tables:new(?SESSION_STATS_TAB, TabOpts), - emqx_stats:update_interval(sm_stats, stats_fun()), + emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0), {ok, #{session_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> @@ -251,10 +254,8 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ stats_fun() -> - fun() -> - safe_update_stats(?SESSION_TAB, 'sessions/count', 'sessions/max'), - safe_update_stats(?SESSION_P_TAB, 'sessions/persistent/count', 'sessions/persistent/max') - end. + safe_update_stats(?SESSION_TAB, 'sessions/count', 'sessions/max'), + safe_update_stats(?SESSION_P_TAB, 'sessions/persistent/count', 'sessions/persistent/max'). safe_update_stats(Tab, Stat, MaxStat) -> case ets:info(Tab, size) of diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index 510b2d91a..61ff6cbc3 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -18,7 +18,7 @@ -include("emqx.hrl"). --export([start_link/0]). +-export([start_link/0, start_link/1, stop/0]). %% Stats API. -export([getstats/0, getstat/1]). @@ -31,7 +31,8 @@ code_change/3]). -record(update, {name, countdown, interval, func}). --record(state, {timer, updates :: [#update{}]}). +-record(state, {timer, updates :: [#update{}], + tick_ms :: timeout()}). -type(stats() :: list({atom(), non_neg_integer()})). @@ -77,10 +78,20 @@ -define(TAB, ?MODULE). -define(SERVER, ?MODULE). +-type opts() :: #{tick_ms := timeout()}. + %% @doc Start stats server -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + start_link(#{tick_ms => timer:seconds(1)}). + +-spec(start_link(opts()) -> emqx_types:startlink_ret()). +start_link(Opts) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, Opts, []). + +-spec(stop() -> ok). +stop() -> + gen_server:call(?SERVER, stop, infinity). %% @doc Generate stats fun -spec(statsfun(Stat :: atom()) -> fun()). @@ -140,16 +151,18 @@ cast(Msg) -> %% gen_server callbacks %%------------------------------------------------------------------------------ -init([]) -> +init(#{tick_ms := TickMs}) -> _ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]), Stats = lists:append([?CONNECTION_STATS, ?SESSION_STATS, ?PUBSUB_STATS, ?ROUTE_STATS, ?RETAINED_STATS]), true = ets:insert(?TAB, [{Name, 0} || Name <- Stats]), - {ok, start_timer(#state{updates = []}), hibernate}. + {ok, start_timer(#state{updates = [], tick_ms = TickMs}), hibernate}. -start_timer(State) -> - State#state{timer = emqx_misc:start_timer(timer:seconds(1), tick)}. +start_timer(#state{tick_ms = Ms} = State) -> + State#state{timer = emqx_misc:start_timer(Ms, tick)}. +handle_call(stop, _From, State) -> + {stop, normal, _Reply = ok, State}; handle_call(Req, _From, State) -> emqx_logger:error("[Stats] unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -201,7 +214,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #state{timer = TRef}) -> - timer:cancel(TRef). + emqx_misc:cancel_timer(TRef). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 6540bbef2..f7e229a13 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -34,6 +34,8 @@ -define(MAX_TOPIC_LEN, 4096). +-include("emqx_mqtt.hrl"). + %% @doc Is wildcard topic? -spec(wildcard(topic() | words()) -> true | false). wildcard(Topic) when is_binary(Topic) -> @@ -180,11 +182,11 @@ parse(Topic) when is_binary(Topic) -> parse(Topic = <<"$queue/", _/binary>>, #{share := _Group}) -> error({invalid_topic, Topic}); -parse(Topic = <<"$share/", _/binary>>, #{share := _Group}) -> +parse(Topic = <>, #{share := _Group}) -> error({invalid_topic, Topic}); parse(<<"$queue/", Topic1/binary>>, Options) -> parse(Topic1, maps:put(share, <<"$queue">>, Options)); -parse(Topic = <<"$share/", Topic1/binary>>, Options) -> +parse(Topic = <>, Options) -> case binary:split(Topic1, <<"/">>) of [<<>>] -> error({invalid_topic, Topic}); [_] -> error({invalid_topic, Topic}); diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index ddff04490..8d8819a8f 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -27,7 +27,6 @@ -record(ssl_socket, {tcp, ssl}). --type(socket() :: inet:socket() | #ssl_socket{}). -define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{ client_id = <<"mqtt_client">>, @@ -112,7 +111,7 @@ mqtt_connect_with_tcp(_) -> %% Issue #599 %% Empty clientId and clean_session = false {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), - Packet = raw_send_serialise(?CLIENT2), + Packet = raw_send_serialize(?CLIENT2), emqx_client_sock:send(Sock, Packet), {ok, Data} = gen_tcp:recv(Sock, 0), {ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), _} = raw_recv_pase(Data), @@ -133,7 +132,7 @@ mqtt_connect_with_ssl_oneway(_) -> ClientSsl = emqx_ct_broker_helpers:client_ssl(), {ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock} = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000), - Packet = raw_send_serialise(?CLIENT), + Packet = raw_send_serialize(?CLIENT), emqx_client_sock:setopts(Sock, [{active, once}]), emqx_client_sock:send(Sock, Packet), ?assert( @@ -151,7 +150,7 @@ mqtt_connect_with_ssl_twoway(_Config) -> ClientSsl = emqx_ct_broker_helpers:client_ssl_twoway(), {ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock} = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000), - Packet = raw_send_serialise(?CLIENT), + Packet = raw_send_serialize(?CLIENT), emqx_client_sock:setopts(Sock, [{active, once}]), emqx_client_sock:send(Sock, Packet), timer:sleep(500), @@ -161,6 +160,7 @@ mqtt_connect_with_ssl_twoway(_Config) -> after 1000 -> false end), + ssl:close(SslSock), emqx_client_sock:close(Sock). mqtt_connect_with_ws(_Config) -> @@ -168,19 +168,19 @@ mqtt_connect_with_ws(_Config) -> {ok, _} = rfc6455_client:open(WS), %% Connect Packet - Packet = raw_send_serialise(?CLIENT), + Packet = raw_send_serialize(?CLIENT), ok = rfc6455_client:send_binary(WS, Packet), {binary, CONACK} = rfc6455_client:recv(WS), {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(CONACK), %% Sub Packet - SubPacket = raw_send_serialise(?SUBPACKET), + SubPacket = raw_send_serialize(?SUBPACKET), rfc6455_client:send_binary(WS, SubPacket), {binary, SubAck} = rfc6455_client:recv(WS), {ok, ?SUBACK_PACKET(?PACKETID, ?SUBCODE), _} = raw_recv_pase(SubAck), %% Pub Packet QoS 1 - PubPacket = raw_send_serialise(?PUBPACKET), + PubPacket = raw_send_serialize(?PUBPACKET), rfc6455_client:send_binary(WS, PubPacket), {binary, PubAck} = rfc6455_client:recv(WS), {ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(PubAck), @@ -190,22 +190,21 @@ mqtt_connect_with_ws(_Config) -> %%issue 1811 packet_size(_Config) -> {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), - Packet = raw_send_serialise(?CLIENT), + Packet = raw_send_serialize(?CLIENT), emqx_client_sock:send(Sock, Packet), {ok, Data} = gen_tcp:recv(Sock, 0), {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), %% Pub Packet QoS 1 - PubPacket = raw_send_serialise(?BIG_PUBPACKET), + PubPacket = raw_send_serialize(?BIG_PUBPACKET), emqx_client_sock:send(Sock, PubPacket), {ok, Data1} = gen_tcp:recv(Sock, 0), {ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(Data1), emqx_client_sock:close(Sock). -raw_send_serialise(Packet) -> +raw_send_serialize(Packet) -> emqx_frame:serialize(Packet). raw_recv_pase(P) -> emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, version => ?MQTT_PROTO_V4} }). - diff --git a/test/emqx_mqtt_compat_SUITE.erl b/test/emqx_client_SUITE.erl similarity index 60% rename from test/emqx_mqtt_compat_SUITE.erl rename to test/emqx_client_SUITE.erl index af2583678..2ad3dbaf7 100644 --- a/test/emqx_mqtt_compat_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -12,7 +12,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_mqtt_compat_SUITE). +-module(emqx_client_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -32,15 +32,24 @@ <<"+/+">>, <<"TopicA/#">>]). all() -> - [basic_test, - will_message_test, - zero_length_clientid_test, - offline_message_queueing_test, - overlapping_subscriptions_test, - %% keepalive_test, - redelivery_on_reconnect_test, - %% subscribe_failure_test, - dollar_topics_test]. + [ {group, mqttv4}, + {group, mqttv5} + ]. + +groups() -> + [{mqttv4, [non_parallel_tests], + [basic_test, + will_message_test, + offline_message_queueing_test, + overlapping_subscriptions_test, + %% keepalive_test, + redelivery_on_reconnect_test, + %% subscribe_failure_test, + dollar_topics_test]}, + {mqttv5, [non_parallel_tests], + [request_response, + share_sub_request_topic]} +]. init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), @@ -49,6 +58,77 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). +request_response_exception(QoS) -> + {ok, Client, _} = emqx_client:start_link([{proto_ver, v5}, + {properties, #{ 'Request-Response-Information' => 0 }}]), + ?assertError(no_response_information, + emqx_client:sub_request_topic(Client, QoS, <<"request_topic">>)), + ok = emqx_client:disconnect(Client). + +request_response_per_qos(QoS) -> + {ok, Requester, _} = emqx_client:start_link([{proto_ver, v5}, + {client_id, <<"requester">>}, + {properties, #{ 'Request-Response-Information' => 1}}]), + {ok, Responser, _} = emqx_client:start_link([{proto_ver, v5}, + {client_id, <<"responser">>}, + {properties, #{ 'Request-Response-Information' => 1}}, + {request_handler, fun(_Req) -> <<"ResponseTest">> end}]), + ok = emqx_client:sub_request_topic(Responser, QoS, <<"request_topic">>), + {ok, <<"ResponseTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS), + ok = emqx_client:set_request_handler(Responser, fun(<<"request_payload">>) -> + <<"ResponseFunctionTest">>; + (_) -> + <<"404">> + end), + {ok, <<"ResponseFunctionTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS), + {ok, <<"404">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"invalid_request">>, QoS), + ok = emqx_client:disconnect(Responser), + ok = emqx_client:disconnect(Requester). + +request_response(_Config) -> + request_response_per_qos(?QOS_2), + request_response_per_qos(?QOS_1), + request_response_per_qos(?QOS_0), + request_response_exception(?QOS_0), + request_response_exception(?QOS_1), + request_response_exception(?QOS_2). + +share_sub_request_topic(_Config) -> + share_sub_request_topic_per_qos(?QOS_2), + share_sub_request_topic_per_qos(?QOS_1), + share_sub_request_topic_per_qos(?QOS_0). + +share_sub_request_topic_per_qos(QoS) -> + application:set_env(?APPLICATION, shared_subscription_strategy, random), + ReqTopic = <<"request-topic">>, + RspTopic = <<"response-topic">>, + Group = <<"g1">>, + Properties = #{ 'Request-Response-Information' => 1}, + Opts = fun(ClientId) -> [{proto_ver, v5}, + {client_id, atom_to_binary(ClientId, utf8)}, + {properties, Properties} + ] end, + {ok, Requester, _} = emqx_client:start_link(Opts(requester)), + {ok, Responser1, _} = emqx_client:start_link([{request_handler, fun(Req) -> <<"1-", Req/binary>> end} | Opts(requester1)]), + {ok, Responser2, _} = emqx_client:start_link([{request_handler, fun(Req) -> <<"2-", Req/binary>> end} | Opts(requester2)]), + ok = emqx_client:sub_request_topic(Responser1, QoS, ReqTopic, Group), + ok = emqx_client:sub_request_topic(Responser2, QoS, ReqTopic, Group), + %% Send a request, wait for response, validate response then return responser ID + ReqFun = fun(Req) -> + {ok, Rsp} = emqx_client:request(Requester, RspTopic, ReqTopic, Req, QoS), + case Rsp of + <<"1-", Req/binary>> -> 1; + <<"2-", Req/binary>> -> 2 + end + end, + Ids = lists:map(fun(I) -> ReqFun(integer_to_binary(I)) end, lists:seq(1, 100)), + %% we are testing with random shared-dispatch strategy, + %% fail if not all responsers got a chance to handle requests + ?assertEqual([1, 2], lists:usort(Ids)), + ok = emqx_client:disconnect(Responser1), + ok = emqx_client:disconnect(Responser2), + ok = emqx_client:disconnect(Requester). + receive_messages(Count) -> receive_messages(Count, []). @@ -59,7 +139,7 @@ receive_messages(Count, Msgs) -> {publish, Msg} -> receive_messages(Count-1, [Msg|Msgs]); Other -> - ct:log("~p~n", [Other]), + ct:log("~p~n", [Other]), receive_messages(Count, Msgs) after 10 -> Msgs @@ -90,18 +170,6 @@ will_message_test(_Config) -> ok = emqx_client:disconnect(C2), ct:print("Will message test succeeded"). -zero_length_clientid_test(_Config) -> - ct:print("Zero length clientid test starting"), - - %% TODO: There are some controversies on the situation when - %% clean_start flag is true and clientid is zero length. - - %% {error, _} = emqx_client:start_link([{clean_start, false}, - %% {client_id, <<>>}]), - {ok, _, _} = emqx_client:start_link([{clean_start, true}, - {client_id, <<>>}]), - ct:print("Zero length clientid test succeeded"). - offline_message_queueing_test(_) -> {ok, C1, _} = emqx_client:start_link([{clean_start, false}, {client_id, <<"c1">>}]), @@ -109,7 +177,7 @@ offline_message_queueing_test(_) -> ok = emqx_client:disconnect(C1), {ok, C2, _} = emqx_client:start_link([{clean_start, true}, {client_id, <<"c2">>}]), - + ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0), {ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1), {ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2), @@ -196,4 +264,3 @@ dollar_topics_test(_) -> ?assertEqual(0, length(receive_messages(1))), ok = emqx_client:disconnect(C), ct:print("$ topics test succeeded"). - diff --git a/test/emqx_misc_SUITE.erl b/test/emqx_misc_SUITE.erl deleted file mode 100644 index 766691869..000000000 --- a/test/emqx_misc_SUITE.erl +++ /dev/null @@ -1,45 +0,0 @@ -%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_misc_SUITE). - --include_lib("eunit/include/eunit.hrl"). - --compile(export_all). --compile(nowarn_export_all). - --define(SOCKOPTS, [binary, - {packet, raw}, - {reuseaddr, true}, - {backlog, 512}, - {nodelay, true}]). - -all() -> [t_merge_opts]. - -t_merge_opts(_) -> - Opts = emqx_misc:merge_opts(?SOCKOPTS, [raw, - binary, - {backlog, 1024}, - {nodelay, false}, - {max_clients, 1024}, - {acceptors, 16}]), - ?assertEqual(1024, proplists:get_value(backlog, Opts)), - ?assertEqual(1024, proplists:get_value(max_clients, Opts)), - [binary, raw, - {acceptors, 16}, - {backlog, 1024}, - {max_clients, 1024}, - {nodelay, false}, - {packet, raw}, - {reuseaddr, true}] = lists:sort(Opts). diff --git a/test/emqx_misc_tests.erl b/test/emqx_misc_tests.erl index 50513ee86..92b0973e8 100644 --- a/test/emqx_misc_tests.erl +++ b/test/emqx_misc_tests.erl @@ -15,6 +15,30 @@ -module(emqx_misc_tests). -include_lib("eunit/include/eunit.hrl"). +-define(SOCKOPTS, [binary, + {packet, raw}, + {reuseaddr, true}, + {backlog, 512}, + {nodelay, true}]). + + +t_merge_opts_test() -> + Opts = emqx_misc:merge_opts(?SOCKOPTS, [raw, + binary, + {backlog, 1024}, + {nodelay, false}, + {max_clients, 1024}, + {acceptors, 16}]), + ?assertEqual(1024, proplists:get_value(backlog, Opts)), + ?assertEqual(1024, proplists:get_value(max_clients, Opts)), + [binary, raw, + {acceptors, 16}, + {backlog, 1024}, + {max_clients, 1024}, + {nodelay, false}, + {packet, raw}, + {reuseaddr, true}] = lists:sort(Opts). + timer_cancel_flush_test() -> Timer = emqx_misc:start_timer(0, foo), ok = emqx_misc:cancel_timer(Timer), @@ -39,4 +63,3 @@ message_queue_too_long_test() -> conn_proc_mng_policy(L) -> emqx_misc:conn_proc_mng_policy(#{message_queue_len => L}). - diff --git a/test/emqx_packet_SUITE.erl b/test/emqx_packet_SUITE.erl index ec4205957..dda27c45b 100644 --- a/test/emqx_packet_SUITE.erl +++ b/test/emqx_packet_SUITE.erl @@ -44,14 +44,49 @@ packet_type_name(_) -> ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). packet_validate(_) -> - ?assertEqual(true, emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, [{<<"topic">>, #{qos => ?QOS0}}]))), - ?assertEqual(true, emqx_packet:validate(?UNSUBSCRIBE_PACKET(89, [<<"topic">>]))), - ?assertEqual(true, emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{}))), - ?assertEqual(true, emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' => 1}}))), - case catch emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' => 0}})) of - {'EXIT', {protocol_error, _}} -> ?assertEqual(true, true); - true -> ?assertEqual(true, false) - end. + ?assert(emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, [{<<"topic">>, #{qos => ?QOS0}}]))), + ?assert(emqx_packet:validate(?UNSUBSCRIBE_PACKET(89, [<<"topic">>]))), + ?assert(emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{}))), + ?assert(emqx_packet:validate(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1}, <<"payload">>))), + ?assert(emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' => 1}}))), + ?assertError(subscription_identifier_invalid, + emqx_packet:validate( + ?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => -1}, + [{<<"topic">>, #{qos => ?QOS0}}]))), + ?assertError(topic_filters_invalid, + emqx_packet:validate(?UNSUBSCRIBE_PACKET(1,[]))), + ?assertError(topic_name_invalid, + emqx_packet:validate(?PUBLISH_PACKET(1,<<>>,1,#{},<<"payload">>))), + ?assertError(topic_name_invalid, + emqx_packet:validate(?PUBLISH_PACKET + (1, <<"+/+">>, 1, #{}, <<"payload">>))), + ?assertError(topic_alias_invalid, + emqx_packet:validate( + ?PUBLISH_PACKET + (1, <<"topic">>, 1, #{'Topic-Alias' => 0}, <<"payload">>))), + ?assertError(protocol_error, + emqx_packet:validate( + ?PUBLISH_PACKET(1, <<"topic">>, 1, + #{'Subscription-Identifier' => 10}, <<"payload">>))), + ?assertError(protocol_error, + emqx_packet:validate( + ?PUBLISH_PACKET(1, <<"topic">>, 1, + #{'Response-Topic' => <<"+/+">>}, <<"payload">>))), + ?assertError(protocol_error, + emqx_packet:validate( + ?CONNECT_PACKET(#mqtt_packet_connect{ + properties = + #{'Request-Response-Information' => -1}}))), + ?assertError(protocol_error, + emqx_packet:validate( + ?CONNECT_PACKET(#mqtt_packet_connect{ + properties = + #{'Request-Problem-Information' => 2}}))), + ?assertError(protocol_error, + emqx_packet:validate( + ?CONNECT_PACKET(#mqtt_packet_connect{ + properties = + #{'Receive-Maximum' => 0}}))). packet_message(_) -> Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, @@ -83,16 +118,14 @@ packet_format(_) -> io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). packet_will_msg(_) -> - Pkt = #mqtt_packet_connect{ will_flag = true, - client_id = <<"clientid">>, - username = "test", - will_retain = true, - will_qos = ?QOS2, - will_topic = <<"topic">>, - will_props = #{}, + Pkt = #mqtt_packet_connect{ will_flag = true, + client_id = <<"clientid">>, + username = "test", + will_retain = true, + will_qos = ?QOS2, + will_topic = <<"topic">>, + will_props = #{}, will_payload = <<"payload">>}, Msg = emqx_packet:will_msg(Pkt), ?assertEqual(<<"clientid">>, Msg#message.from), ?assertEqual(<<"topic">>, Msg#message.topic). - - diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 2323519b1..f97f475f8 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -19,114 +19,211 @@ -compile(export_all). -compile(nowarn_export_all). --include("emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-include_lib("common_test/include/ct.hrl"). -include("emqx_mqtt.hrl"). --include_lib("eunit/include/eunit.hrl"). +-define(TOPICS, [<<"TopicA">>, <<"TopicA/B">>, <<"Topic/C">>, <<"TopicA/C">>, + <<"/TopicA">>]). --import(emqx_serializer, [serialize/1]). +-define(CLIENT2, ?CONNECT_PACKET(#mqtt_packet_connect{ + username = <<"admin">>, + clean_start = false, + password = <<"public">>})). all() -> - [%% {group, parser}, - %% {group, serializer}, - {group, packet}, - {group, message}]. + [ + {group, mqttv4}, + {group, mqttv5}]. groups() -> - [%% {parser, [], - %% [ - %% parse_connect, - %% parse_bridge, - %% parse_publish, - %% parse_puback, - %% parse_pubrec, - %% parse_pubrel, - %% parse_pubcomp, - %% parse_subscribe, - %% parse_unsubscribe, - %% parse_pingreq, - %% parse_disconnect]}, - %% {serializer, [], - %% [serialize_connect, - %% serialize_connack, - %% serialize_publish, - %% serialize_puback, - %% serialize_pubrel, - %% serialize_subscribe, - %% serialize_suback, - %% serialize_unsubscribe, - %% serialize_unsuback, - %% serialize_pingreq, - %% serialize_pingresp, - %% serialize_disconnect]}, - {packet, [], - [packet_proto_name, - packet_type_name, - packet_format]}, - {message, [], - [message_make - %% message_from_packet - ]} - ]. + [{mqttv4, + [sequence], + [ + connect_v4, + subscribe_v4 + ]}, + {mqttv5, + [sequence], + [ + connect_v5, + subscribe_v5 + ] + }]. + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps(), + Config. + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +with_connection(DoFun) -> + {ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883, + [binary, {packet, raw}, + {active, false}], 3000), + try + DoFun(Sock) + after + emqx_client_sock:close(Sock) + end. + +connect_v4(_) -> + with_connection(fun(Sock) -> + emqx_client_sock:send(Sock, raw_send_serialize(?PACKET(?PUBLISH))), + {error, closed} =gen_tcp:recv(Sock, 0) + end), + with_connection(fun(Sock) -> + ConnectPacket = raw_send_serialize(?CONNECT_PACKET + (#mqtt_packet_connect{ + client_id = <<"mqttv4_client">>, + username = <<"admin">>, + password = <<"public">>, + proto_ver = ?MQTT_PROTO_V4 + })), + emqx_client_sock:send(Sock, ConnectPacket), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_parse(Data, ?MQTT_PROTO_V4), + + emqx_client_sock:send(Sock, ConnectPacket), + {error, closed} = gen_tcp:recv(Sock, 0) + end), + ok. +connect_v5(_) -> + with_connection(fun(Sock) -> + emqx_client_sock:send(Sock, + raw_send_serialize( + ?CONNECT_PACKET(#mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + properties = + #{'Request-Response-Information' => -1}}))), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) + end), -%%-------------------------------------------------------------------- -%% Packet Cases -%%-------------------------------------------------------------------- + with_connection(fun(Sock) -> + emqx_client_sock:send(Sock, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + properties = + #{'Request-Problem-Information' => 2}}))), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5) + end), -packet_proto_name(_) -> - ?assertEqual(<<"MQIsdp">>, emqx_packet:protocol_name(3)), - ?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(4)). + with_connection(fun(Sock) -> + emqx_client_sock:send(Sock, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5, + properties = + #{'Request-Response-Information' => 1}}) + )), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, + #{'Response-Information' := _RespInfo}), _} = + raw_recv_parse(Data, ?MQTT_PROTO_V5) + end), + ok. -packet_type_name(_) -> - ?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)), - ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). +do_connect(Sock, ProtoVer) -> + emqx_client_sock:send(Sock, raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + client_id = <<"mqtt_client">>, + proto_ver = ProtoVer + }))), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_parse(Data, ProtoVer). -%% packet_connack_name(_) -> -%% ?assertEqual('CONNACK_ACCEPT', emqx_packet:connack_name(?CONNACK_ACCEPT)), -%% ?assertEqual('CONNACK_PROTO_VER', emqx_packet:connack_name(?CONNACK_PROTO_VER)), -%% ?assertEqual('CONNACK_INVALID_ID', emqx_packet:connack_name(?CONNACK_INVALID_ID)), -%% ?assertEqual('CONNACK_SERVER', emqx_packet:connack_name(?CONNACK_SERVER)), -%% ?assertEqual('CONNACK_CREDENTIALS', emqx_packet:connack_name(?CONNACK_CREDENTIALS)), -%% ?assertEqual('CONNACK_AUTH', emqx_packet:connack_name(?CONNACK_AUTH)). +subscribe_v4(_) -> + with_connection(fun(Sock) -> + do_connect(Sock, ?MQTT_PROTO_V4), + SubPacket = raw_send_serialize( + ?SUBSCRIBE_PACKET(15, + [{<<"topic">>, #{rh => 1, + qos => ?QOS_2, + rap => 0, + nl => 0, + rc => 0}}])), + emqx_client_sock:send(Sock, SubPacket), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?SUBACK_PACKET(15, _), _} = raw_recv_parse(Data, ?MQTT_PROTO_V4) + end), + ok. -packet_format(_) -> - io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]), - io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), - io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]), - io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), - io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), - io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]), - io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]), - io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]), - io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), - io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). +subscribe_v5(_) -> + with_connection(fun(Sock) -> + do_connect(Sock, ?MQTT_PROTO_V5), + SubPacket = raw_send_serialize(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => -1},[]), + #{version => ?MQTT_PROTO_V5}), + emqx_client_sock:send(Sock, SubPacket), + {ok, DisConnData} = gen_tcp:recv(Sock, 0), + {ok, ?DISCONNECT_PACKET(?RC_TOPIC_FILTER_INVALID), _} = + raw_recv_parse(DisConnData, ?MQTT_PROTO_V5) + end), + with_connection(fun(Sock) -> + do_connect(Sock, ?MQTT_PROTO_V5), + SubPacket = raw_send_serialize( + ?SUBSCRIBE_PACKET(0, #{}, [{<<"TopicQos0">>, + #{rh => 1, qos => ?QOS_2, + rap => 0, nl => 0, + rc => 0}}]), + #{version => ?MQTT_PROTO_V5}), + emqx_client_sock:send(Sock, SubPacket), + {ok, DisConnData} = gen_tcp:recv(Sock, 0), + {ok, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _} = + raw_recv_parse(DisConnData, ?MQTT_PROTO_V5) + end), + with_connection(fun(Sock) -> + do_connect(Sock, ?MQTT_PROTO_V5), + SubPacket = raw_send_serialize( + ?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 0}, + [{<<"TopicQos0">>, + #{rh => 1, qos => ?QOS_2, + rap => 0, nl => 0, + rc => 0}}]), + #{version => ?MQTT_PROTO_V5}), + emqx_client_sock:send(Sock, SubPacket), + {ok, DisConnData} = gen_tcp:recv(Sock, 0), + {ok, ?DISCONNECT_PACKET(?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED), _} = + raw_recv_parse(DisConnData, ?MQTT_PROTO_V5) + end), + with_connection(fun(Sock) -> + do_connect(Sock, ?MQTT_PROTO_V5), + SubPacket = raw_send_serialize( + ?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 1}, + [{<<"TopicQos0">>, + #{rh => 1, qos => ?QOS_2, + rap => 0, nl => 0, + rc => 0}}]), + #{version => ?MQTT_PROTO_V5}), + emqx_client_sock:send(Sock, SubPacket), + {ok, SubData} = gen_tcp:recv(Sock, 0), + {ok, ?SUBACK_PACKET(1, #{}, [2]), _} = + raw_recv_parse(SubData, ?MQTT_PROTO_V5) + end), + ok. -%%-------------------------------------------------------------------- -%% Message Cases -%%-------------------------------------------------------------------- +publish_v4(_) -> + ok. -message_make(_) -> - Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), - ?assertEqual(0, Msg#message.qos), - Msg1 = emqx_message:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>), - ?assert(is_binary(Msg1#message.id)), - ?assertEqual(qos2, Msg1#message.qos). +publish_v5(_) -> + ok. -%% message_from_packet(_) -> -%% Msg = emqx_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)), -%% ?assertEqual(1, Msg#message.qos), -%% %% ?assertEqual(10, Msg#message.pktid), -%% ?assertEqual(<<"topic">>, Msg#message.topic), -%% WillMsg = emqx_message:from_packet(#mqtt_packet_connect{will_flag = true, -%% will_topic = <<"WillTopic">>, -%% will_payload = <<"WillMsg">>}), -%% ?assertEqual(<<"WillTopic">>, WillMsg#message.topic), -%% ?assertEqual(<<"WillMsg">>, WillMsg#message.payload). - - %% Msg2 = emqx_message:fomat_packet(<<"username">>, <<"clientid">>, - %% ?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)), +raw_send_serialize(Packet) -> + emqx_frame:serialize(Packet). +raw_send_serialize(Packet, Opts) -> + emqx_frame:serialize(Packet, Opts). +raw_recv_parse(P, ProtoVersion) -> + emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, + version => ProtoVersion}}). diff --git a/test/emqx_stats_SUITE.erl b/test/emqx_stats_SUITE.erl deleted file mode 100644 index 5c7254468..000000000 --- a/test/emqx_stats_SUITE.erl +++ /dev/null @@ -1,56 +0,0 @@ -%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_stats_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("common_test/include/ct.hrl"). - -all() -> [t_set_get_state, t_update_interval]. - -t_set_get_state(_) -> - emqx_stats:start_link(), - SetConnsCount = emqx_stats:statsfun('connections/count'), - SetConnsCount(1), - 1 = emqx_stats:getstat('connections/count'), - emqx_stats:setstat('connections/count', 2), - 2 = emqx_stats:getstat('connections/count'), - emqx_stats:setstat('connections/count', 'connections/max', 3), - timer:sleep(100), - 3 = emqx_stats:getstat('connections/count'), - 3 = emqx_stats:getstat('connections/max'), - emqx_stats:setstat('connections/count', 'connections/max', 2), - timer:sleep(100), - 2 = emqx_stats:getstat('connections/count'), - 3 = emqx_stats:getstat('connections/max'), - SetConns = emqx_stats:statsfun('connections/count', 'connections/max'), - SetConns(4), - timer:sleep(100), - 4 = emqx_stats:getstat('connections/count'), - 4 = emqx_stats:getstat('connections/max'), - Conns = emqx_stats:getstats(), - 4 = proplists:get_value('connections/count', Conns), - 4 = proplists:get_value('connections/max', Conns). - -t_update_interval(_) -> - emqx_stats:start_link(), - emqx_stats:cancel_update(cm_stats), - ok = emqx_stats:update_interval(stats_test, fun update_stats/0), - timer:sleep(2500), - 1 = emqx_stats:getstat('connections/count'). - -update_stats() -> - emqx_stats:setstat('connections/count', 1). diff --git a/test/emqx_stats_tests.erl b/test/emqx_stats_tests.erl new file mode 100644 index 000000000..dd9733a88 --- /dev/null +++ b/test/emqx_stats_tests.erl @@ -0,0 +1,101 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_stats_tests). + +-include_lib("eunit/include/eunit.hrl"). + +get_state_test() -> + with_proc(fun() -> + SetConnsCount = emqx_stats:statsfun('connections/count'), + SetConnsCount(1), + 1 = emqx_stats:getstat('connections/count'), + emqx_stats:setstat('connections/count', 2), + 2 = emqx_stats:getstat('connections/count'), + emqx_stats:setstat('connections/count', 'connections/max', 3), + timer:sleep(100), + 3 = emqx_stats:getstat('connections/count'), + 3 = emqx_stats:getstat('connections/max'), + emqx_stats:setstat('connections/count', 'connections/max', 2), + timer:sleep(100), + 2 = emqx_stats:getstat('connections/count'), + 3 = emqx_stats:getstat('connections/max'), + SetConns = emqx_stats:statsfun('connections/count', 'connections/max'), + SetConns(4), + timer:sleep(100), + 4 = emqx_stats:getstat('connections/count'), + 4 = emqx_stats:getstat('connections/max'), + Conns = emqx_stats:getstats(), + 4 = proplists:get_value('connections/count', Conns), + 4 = proplists:get_value('connections/max', Conns) + end). + +update_interval_test() -> + TickMs = 200, + with_proc(fun() -> + SleepMs = TickMs * 2 + TickMs div 2, %% sleep for 2.5 ticks + emqx_stats:cancel_update(cm_stats), + UpdFun = fun() -> emqx_stats:setstat('connections/count', 1) end, + ok = emqx_stats:update_interval(stats_test, UpdFun), + timer:sleep(SleepMs), + ?assertEqual(1, emqx_stats:getstat('connections/count')) + end, TickMs). + +helper_test_() -> + TickMs = 200, + TestF = + fun(CbModule, CbFun) -> + SleepMs = TickMs + TickMs div 2, %% sleep for 1.5 ticks + Ref = make_ref(), + Tester = self(), + UpdFun = + fun() -> + CbModule:CbFun(), + Tester ! Ref, + ok + end, + ok = emqx_stats:update_interval(stats_test, UpdFun), + timer:sleep(SleepMs), + receive Ref -> ok after 2000 -> error(timeout) end + end, + MkTestFun = + fun(CbModule, CbFun) -> + fun() -> + with_proc(fun() -> TestF(CbModule, CbFun) end, TickMs) + end + end, + [{"emqx_broker_helper", MkTestFun(emqx_broker_helper, stats_fun)}, + {"emqx_sm", MkTestFun(emqx_sm, stats_fun)}, + {"emqx_router_helper", MkTestFun(emqx_router_helper, stats_fun)}, + {"emqx_cm", MkTestFun(emqx_cm, update_conn_stats)} + ]. + +with_proc(F) -> + {ok, _Pid} = emqx_stats:start_link(), + with_stop(F). + +with_proc(F, TickMs) -> + {ok, _Pid} = emqx_stats:start_link(#{tick_ms => TickMs}), + with_stop(F). + +with_stop(F) -> + try + %% make a synced call to the gen_server so we know it has + %% started running, hence it is safe to continue with less risk of race condition + ?assertEqual(ignored, gen_server:call(emqx_stats, ignored)), + F() + after + ok = emqx_stats:stop() + end. + diff --git a/test/emqx_topic_SUITE.erl b/test/emqx_topic_SUITE.erl index 816579ebc..8ce3faf79 100644 --- a/test/emqx_topic_SUITE.erl +++ b/test/emqx_topic_SUITE.erl @@ -21,7 +21,7 @@ -compile(nowarn_export_all). -import(emqx_topic, [wildcard/1, match/2, validate/1, triples/1, join/1, - words/1, systop/1, feed_var/3, parse/1, parse/2]). + words/1, systop/1, feed_var/3, parse/1]). -define(N, 10000). @@ -57,10 +57,10 @@ t_match(_) -> true = match(<<"abc">>, <<"+">>), true = match(<<"a/b/c">>, <<"a/b/c">>), false = match(<<"a/b/c">>, <<"a/c/d">>), - false = match(<<"$shared/x/y">>, <<"+">>), - false = match(<<"$shared/x/y">>, <<"+/x/y">>), - false = match(<<"$shared/x/y">>, <<"#">>), - false = match(<<"$shared/x/y">>, <<"+/+/#">>), + false = match(<<"$share/x/y">>, <<"+">>), + false = match(<<"$share/x/y">>, <<"+/x/y">>), + false = match(<<"$share/x/y">>, <<"#">>), + false = match(<<"$share/x/y">>, <<"+/+/#">>), false = match(<<"house/1/sensor/0">>, <<"house/+">>), false = match(<<"house">>, <<"house/+">>). @@ -77,10 +77,10 @@ t_match2(_) -> true = match(<<"abc">>, <<"+">>), true = match(<<"a/b/c">>, <<"a/b/c">>), false = match(<<"a/b/c">>, <<"a/c/d">>), - false = match(<<"$shared/x/y">>, <<"+">>), - false = match(<<"$shared/x/y">>, <<"+/x/y">>), - false = match(<<"$shared/x/y">>, <<"#">>), - false = match(<<"$shared/x/y">>, <<"+/+/#">>), + false = match(<<"$share/x/y">>, <<"+">>), + false = match(<<"$share/x/y">>, <<"+/x/y">>), + false = match(<<"$share/x/y">>, <<"#">>), + false = match(<<"$share/x/y">>, <<"+/+/#">>), false = match(<<"house/1/sensor/0">>, <<"house/+">>). t_match3(_) -> @@ -208,4 +208,3 @@ t_parse(_) -> ?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)), ?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)), ?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)). - diff --git a/test/rfc6455_client.erl b/test/rfc6455_client.erl index 4696f7ab3..f5d8f1ef4 100644 --- a/test/rfc6455_client.erl +++ b/test/rfc6455_client.erl @@ -153,7 +153,7 @@ do_recv2(State = #state{phase = Phase, socket = Socket, ppid = PPid}, R) -> ok end, die(Socket, PPid, WsReason, normal); - {_, _, _, Rest2} -> + {_, _, _, _Rest2} -> io:format("Unknown frame type~n"), die(Socket, PPid, {1006, "Unknown frame type"}, normal) end. diff --git a/test/ws_client.erl b/test/ws_client.erl index f049e3256..25f38164d 100644 --- a/test/ws_client.erl +++ b/test/ws_client.erl @@ -1,7 +1,5 @@ -module(ws_client). --behaviour(websocket_client_handler). - -export([ start_link/0, start_link/1,