From 48450d1d37febb2b549cb5e510a27ebef9ab8584 Mon Sep 17 00:00:00 2001 From: spring2maz <40776645+spring2maz@users.noreply.github.com> Date: Wed, 13 Mar 2019 04:27:25 +0100 Subject: [PATCH] Move request response out of emqx client (#2293) * Delete response_topic_prefix The Response-Infomation in CONNACK was misinterpreated, it should not be a broker's global config, instead, it should be per-client or even per-session prefix/suffix mechanism * Delete request-response code from emqx_client * Add request-response test code * meck as default dependency --- temp workaround for dep-vsn-check --- etc/emqx.conf | 6 - include/emqx_client.hrl | 1 + priv/emqx.schema | 5 - rebar.config | 4 +- src/emqx_client.erl | 245 +-------------------------- src/emqx_protocol.erl | 20 ++- test/emqx_client_SUITE.erl | 91 +--------- test/emqx_protocol_SUITE.erl | 7 +- test/emqx_request_handler.erl | 102 +++++++++++ test/emqx_request_response_SUITE.erl | 68 ++++++++ test/emqx_request_sender.erl | 82 +++++++++ 11 files changed, 279 insertions(+), 352 deletions(-) create mode 100644 test/emqx_request_handler.erl create mode 100644 test/emqx_request_response_SUITE.erl create mode 100644 test/emqx_request_sender.erl diff --git a/etc/emqx.conf b/etc/emqx.conf index 758b0208b..f119b05ea 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -432,12 +432,6 @@ acl_deny_action = ignore ## MQTT Protocol ##-------------------------------------------------------------------- -## Response Topic Prefix -## -## Value: String -## Default: emqxrspv1 -mqtt.response_topic_prefix = emqxrspv1 - ## Maximum MQTT packet size allowed. ## ## Value: Bytes diff --git a/include/emqx_client.hrl b/include/emqx_client.hrl index 535b8ad55..bf2f49283 100644 --- a/include/emqx_client.hrl +++ b/include/emqx_client.hrl @@ -15,6 +15,7 @@ -ifndef(EMQX_CLIENT_HRL). -define(EMQX_CLIENT_HRL, true). +-include("emqx_mqtt.hrl"). -record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false, packet_id, topic, props, payload}). -endif. diff --git a/priv/emqx.schema b/priv/emqx.schema index 5dbbdfca0..3cd8a5032 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -563,11 +563,6 @@ end}. %% MQTT Protocol %%-------------------------------------------------------------------- -%% @doc Response Topic Prefix -{mapping, "mqtt.response_topic_prefix", "emqx.response_topic_prefix",[ - {datatype, string} -]}. - %% @doc Max Packet Size Allowed, 1MB by default. {mapping, "mqtt.max_packet_size", "emqx.max_packet_size", [ {default, "1MB"}, diff --git a/rebar.config b/rebar.config index 745c22f68..6fefc5bfb 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,7 @@ {deps, [{jsx, "2.9.0"}, {gproc, "0.8.0"}, - {cowboy, "2.4.0"} + {cowboy, "2.4.0"}, + {meck, "0.8.13"} %% temp workaround for version check ]}. %% appended to deps in rebar.config.script @@ -28,4 +29,3 @@ {plugins, [coveralls]}. -{profiles, [{test, [{deps, [{meck, "0.8.13"}]}]}]}. diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 2275164a9..4ec28d2d9 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -17,12 +17,9 @@ -behaviour(gen_statem). -include("types.hrl"). --include("emqx_mqtt.hrl"). -include("emqx_client.hrl"). -export([start_link/0, start_link/1]). --export([request/5, request/6, request_async/7, receive_response/3]). --export([set_request_handler/2, sub_request_topic/3, sub_request_topic/4]). -export([connect/1]). -export([subscribe/2, subscribe/3, subscribe/4]). -export([publish/2, publish/3, publish/4, publish/5]). @@ -41,9 +38,7 @@ -export([initialized/3, waiting_for_connack/3, connected/3, inflight_full/3]). -export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]). --export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0, - request_input/0, response_payload/0, request_handler/0, - corr_data/0, mqtt_msg/0]). +-export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0, mqtt_msg/0]). -export_type([host/0, option/0]). @@ -57,24 +52,12 @@ -define(WILL_MSG(QoS, Retain, Topic, Props, Payload), #mqtt_msg{qos = QoS, retain = Retain, topic = Topic, props = Props, payload = Payload}). --define(RESPONSE_TIMEOUT_SECONDS, timer:seconds(5)). - --define(NO_REQ_HANDLER, undefined). - --define(NO_GROUP, <<>>). - -define(NO_CLIENT_ID, <<>>). -type(host() :: inet:ip_address() | inet:hostname()). --type(corr_data() :: binary()). - -%% NOTE: Message handler is different from request handler. -%% Message handler is a set of callbacks defined to handle MQTT messages as well as -%% the disconnect event. -%% Request handler is a callback to handle received MQTT message as in 'request', -%% and publish another MQTT message back to the defined topic as in 'response'. -%% `owner' and `msg_handler' has no effect when `request_handler' is set. +%% Message handler is a set of callbacks defined to handle MQTT messages +%% as well as the disconnect event. -define(NO_MSG_HDLR, undefined). -type(msg_handler() :: #{puback := fun((_) -> any()), publish := fun((emqx_types:message()) -> any()), @@ -100,7 +83,6 @@ | {keepalive, non_neg_integer()} | {max_inflight, pos_integer()} | {retry_interval, timeout()} - | {request_handler, request_handler()} | {will_topic, iodata()} | {will_payload, iodata()} | {will_retain, boolean()} @@ -146,7 +128,6 @@ ack_timer :: reference(), retry_interval :: pos_integer(), retry_timer :: reference(), - request_handler :: request_handler(), session_present :: boolean(), last_packet_id :: packet_id(), parse_state :: emqx_frame:state()}). @@ -176,35 +157,10 @@ -type(subscribe_ret() :: {ok, properties(), [reason_code()]} | {error, term()}). --type(request_input() :: binary()). - --type(response_payload() :: binary()). - --type(request_handler() :: fun((request_input()) -> response_payload())). - --type(group() :: binary()). - %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ -%% @doc Swap in a new request handler on the fly. --spec(set_request_handler(client(), request_handler()) -> ok). -set_request_handler(Responser, RequestHandler) -> - gen_statem:call(Responser, {set_request_handler, RequestHandler}). - -%% @doc Subscribe to request topic. --spec(sub_request_topic(client(), qos(), topic()) -> ok). -sub_request_topic(Client, QoS, Topic) -> - sub_request_topic(Client, QoS, Topic, ?NO_GROUP). - -%% @doc Share-subscribe to request topic. --spec(sub_request_topic(client(), qos(), topic(), group()) -> ok). -sub_request_topic(Client, QoS, Topic, Group) -> - Properties = get_properties(Client), - NewTopic = make_req_rsp_topic(Properties, Topic, Group), - subscribe_req_rsp_topic(Client, QoS, NewTopic). - -spec(start_link() -> gen_statem:start_ret()). start_link() -> start_link([]). @@ -293,76 +249,6 @@ parse_subopt([{nl, false} | Opts], Result) -> parse_subopt([{qos, QoS} | Opts], Result) -> parse_subopt(Opts, Result#{qos := ?QOS_I(QoS)}). --spec(request(client(), topic(), topic(), payload(), qos() | [pubopt()]) - -> ok | {ok, packet_id()} | {error, term()}). -request(Client, ResponseTopic, RequestTopic, Payload, QoS) when is_binary(ResponseTopic), is_atom(QoS) -> - request(Client, ResponseTopic, RequestTopic, Payload, [{qos, ?QOS_I(QoS)}]); -request(Client, ResponseTopic, RequestTopic, Payload, QoS) when is_binary(ResponseTopic), ?IS_QOS(QoS) -> - request(Client, ResponseTopic, RequestTopic, Payload, [{qos, QoS}]); -request(Client, ResponseTopic, RequestTopic, Payload, Opts) when is_binary(ResponseTopic), is_list(Opts) -> - request(Client, ResponseTopic, RequestTopic, Payload, Opts, _Properties = #{}). - -%% @doc Send a request to request topic and wait for response. --spec(request(client(), topic(), topic(), payload(), [pubopt()], properties()) - -> {ok, response_payload()} | {error, term()}). -request(Client, ResponseTopic, RequestTopic, Payload, Opts, Properties) -> - CorrData = make_corr_data(), - case request_async(Client, ResponseTopic, RequestTopic, - Payload, Opts, Properties, CorrData) of - ok -> receive_response(Client, CorrData, Opts); - {error, Reason} -> {error, Reason} - end. - -%% @doc Get client properties. --spec(get_properties(client()) -> properties()). -get_properties(Client) -> gen_statem:call(Client, get_properties, infinity). - -%% @doc Send a request, but do not wait for response. -%% The caller should expect a `{publish, Response}' message, -%% or call `receive_response/3' to receive the message. --spec(request_async(client(), topic(), topic(), payload(), - [pubopt()], properties(), corr_data()) -> ok | {error, any()}). -request_async(Client, ResponseTopic, RequestTopic, Payload, Opts, Properties, CorrData) - when is_binary(ResponseTopic), - is_binary(RequestTopic), - is_map(Properties), - is_list(Opts) -> - ok = emqx_mqtt_props:validate(Properties), - Retain = proplists:get_bool(retain, Opts), - QoS = ?QOS_I(proplists:get_value(qos, Opts, ?QOS_0)), - ClientProperties = get_properties(Client), - NewResponseTopic = make_req_rsp_topic(ClientProperties, ResponseTopic), - NewRequestTopic = make_req_rsp_topic(ClientProperties, RequestTopic), - %% This is perhaps not optimal to subscribe the response topic for - %% each and every request even though the response topic is always the same - ok = sub_response_topic(Client, QoS, NewResponseTopic), - NewProperties = maps:merge(Properties, #{'Response-Topic' => NewResponseTopic, - 'Correlation-Data' => CorrData}), - case publish(Client, #mqtt_msg{qos = QoS, - retain = Retain, - topic = NewRequestTopic, - props = NewProperties, - payload = iolist_to_binary(Payload)}) of - ok -> ok; - {ok, _PacketId} -> ok; %% assume auto_ack - {error, Reason} -> {error, Reason} - end. - -%% @doc Block wait the response for a request sent earlier. --spec(receive_response(client(), corr_data(), [pubopt()]) - -> {ok, response_payload()} | {error, any()}). -receive_response(Client, CorrData, Opts) -> - TimeOut = proplists:get_value(timeout, Opts, ?RESPONSE_TIMEOUT_SECONDS), - MRef = erlang:monitor(process, Client), - TRef = erlang:start_timer(TimeOut, self(), response), - try - receive_response(Client, CorrData, TRef, MRef) - after - erlang:cancel_timer(TRef), - receive {timeout, TRef, _} -> ok after 0 -> ok end, - erlang:demonitor(MRef, [flush]) - end. - -spec(publish(client(), topic(), payload()) -> ok | {error, term()}). publish(Client, Topic, Payload) when is_binary(Topic) -> publish(Client, #mqtt_msg{topic = Topic, qos = ?QOS_0, payload = iolist_to_binary(Payload)}). @@ -511,7 +397,6 @@ init([Options]) -> auto_ack = true, ack_timeout = ?DEFAULT_ACK_TIMEOUT, retry_interval = 0, - request_handler = ?NO_REQ_HANDLER, connect_timeout = ?DEFAULT_CONNECT_TIMEOUT, last_packet_id = 1}), {ok, initialized, init_parse_state(State)}. @@ -616,8 +501,6 @@ init([{auto_ack, AutoAck} | Opts], State) when is_boolean(AutoAck) -> init(Opts, State#state{auto_ack = AutoAck}); init([{retry_interval, I} | Opts], State) -> init(Opts, State#state{retry_interval = timer:seconds(I)}); -init([{request_handler, Handler} | Opts], State) -> - init(Opts, State#state{request_handler = Handler}); init([{bridge_mode, Mode} | Opts], State) when is_boolean(Mode) -> init(Opts, State#state{bridge_mode = Mode}); init([_Opt | Opts], State) -> @@ -756,15 +639,9 @@ connected({call, From}, pause, State) -> connected({call, From}, resume, State) -> {keep_state, State#state{paused = false}, [{reply, From, ok}]}; -connected({call, From}, get_properties, #state{properties = Properties}) -> - {keep_state_and_data, [{reply, From, Properties}]}; - connected({call, From}, client_id, #state{client_id = ClientId}) -> {keep_state_and_data, [{reply, From, ClientId}]}; -connected({call, From}, {set_request_handler, RequestHandler}, State) -> - {keep_state, State#state{request_handler = RequestHandler}, [{reply, From, ok}]}; - connected({call, From}, SubReq = {subscribe, Properties, Topics}, State = #state{last_packet_id = PacketId, subscriptions = Subscriptions}) -> case send(?SUBSCRIBE_PACKET(PacketId, Properties, Topics), State) of @@ -846,25 +723,12 @@ connected(cast, {pubcomp, PacketId, ReasonCode, Properties}, State) -> connected(cast, ?PUBLISH_PACKET(_QoS, _PacketId), #state{paused = true}) -> keep_state_and_data; -connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, Properties, Payload), - State) when Properties =/= undefined -> - NewState = response_publish(Properties, State, ?QOS_0, Payload), - {keep_state, deliver(packet_to_msg(Packet), NewState)}; connected(cast, Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), State) -> {keep_state, deliver(packet_to_msg(Packet), State)}; -connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _Topic, _PacketId, Properties, Payload), State) - when Properties =/= undefined -> - NewState = response_publish(Properties, State, ?QOS_1, Payload), - publish_process(?QOS_1, Packet, NewState); - connected(cast, Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) -> publish_process(?QOS_1, Packet, State); -connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _Topic, _PacketId, Properties, Payload), State) - when Properties =/= undefined -> - NewState = response_publish(Properties, State, ?QOS_2, Payload), - publish_process(?QOS_2, Packet, NewState); connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> publish_process(?QOS_2, Packet, State); @@ -1086,57 +950,6 @@ delete_inflight_when_full(Packet, State0) -> false -> {next_state, connected, State} end. -%% Subscribe to response topic. --spec(sub_response_topic(client(), qos(), topic()) -> ok). -sub_response_topic(Client, QoS, Topic) when is_binary(Topic) -> - subscribe_req_rsp_topic(Client, QoS, Topic). - -receive_response(Client, CorrData, TRef, MRef) -> - receive - {publish, Response} -> - {ok, Properties} = maps:find(properties, Response), - case maps:find('Correlation-Data', Properties) of - {ok, CorrData} -> - maps:find(payload, Response); - _ -> - emqx_logger:debug("Discarded stale response: ~p", [Response]), - receive_response(Client, CorrData, TRef, MRef) - end; - {timeout, TRef, response} -> - {error, timeout}; - {'DOWN', MRef, process, _, _} -> - {error, client_down} - end. - -%% Make a unique correlation data for each request. -%% It has to be unique because stale responses should be discarded. -make_corr_data() -> term_to_binary(make_ref()). - -%% Shared function for request and response topic subscription. -subscribe_req_rsp_topic(Client, QoS, Topic) -> - %% It is a Protocol Error to set the No Local bit to 1 on a Shared Subscription - {ok, _Props, _QoS} = subscribe(Client, [{Topic, [{rh, 2}, {rap, false}, - {nl, not ?IS_SHARE(Topic)}, - {qos, QoS}]}]), - emqx_logger:debug("Subscribed to topic ~s", [Topic]), - ok. - -%% Make a request or response topic. -make_req_rsp_topic(Properties, Topic) -> - make_req_rsp_topic(Properties, Topic, ?NO_GROUP). - -%% Same as make_req_rsp_topic/2, but allow shared subscription (for request topics) -make_req_rsp_topic(Properties, Topic, Group) -> - case maps:find('Response-Information', Properties) of - {ok, ResponseInformation} when ResponseInformation =/= <<>> -> - emqx_topic:join([req_rsp_topic_prefix(Group, ResponseInformation), Topic]); - _ -> - erlang:error(no_response_information) - end. - -req_rsp_topic_prefix(?NO_GROUP, Prefix) -> Prefix; -req_rsp_topic_prefix(Group, Prefix) -> ?SHARE(Group, Prefix). - assign_id(?NO_CLIENT_ID, Props) -> case maps:find('Assigned-Client-Identifier', Props) of {ok, Value} -> @@ -1162,49 +975,6 @@ publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), Stop -> Stop end. -response_publish(#{'Response-Topic' := ResponseTopic} = Properties, - State = #state{request_handler = RequestHandler}, QoS, Payload) - when RequestHandler =/= ?NO_REQ_HANDLER -> - do_publish(ResponseTopic, Properties, State, QoS, Payload); -response_publish(_Properties, State, _QoS, _Payload) -> State. - -do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler}, ?QOS_0, Payload) -> - Msg = #mqtt_msg{qos = ?QOS_0, - retain = false, - topic = ResponseTopic, - props = Properties, - payload = RequestHandler(Payload) - }, - case send(Msg, State) of - {ok, NewState} -> NewState; - _Error -> State - end; -do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler, - inflight = Inflight, - last_packet_id = PacketId}, - QoS, Payload) - when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2)-> - case emqx_inflight:is_full(Inflight) of - true -> - emqx_logger:error("Inflight is full"), - State; - false -> - Msg = #mqtt_msg{packet_id = PacketId, - qos = QoS, - retain = false, - topic = ResponseTopic, - props = Properties, - payload = RequestHandler(Payload)}, - case send(Msg, State) of - {ok, NewState} -> - Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg, os:timestamp()}, Inflight), - ensure_retry_timer(NewState#state{inflight = Inflight1}); - {error, Reason} -> - emqx_logger:error("Send failed: ~p", [Reason]), - State - end - end. - ensure_keepalive_timer(State = ?PROPERTY('Server-Keep-Alive', Secs)) -> ensure_keepalive_timer(timer:seconds(Secs), State); ensure_keepalive_timer(State = #state{keepalive = 0}) -> @@ -1291,9 +1061,6 @@ retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) -> Error end. -deliver(_Msg, State = #state{request_handler = Hdlr}) when Hdlr =/= ?NO_REQ_HANDLER -> - %% message has been terminated by request handler, hence should not continue processing - State; deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, topic = Topic, props = Props, payload = Payload}, State) -> @@ -1303,17 +1070,17 @@ deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, ok = eval_msg_handler(State, publish, Msg), State. -eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER, +eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR, owner = Owner}, disconnected, {ReasonCode, Properties}) -> %% Special handling for disconnected message when there is no handler callback Owner ! {disconnected, ReasonCode, Properties}, ok; -eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER}, +eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR}, disconnected, _OtherReason) -> %% do nothing to be backward compatible ok; -eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER, +eval_msg_handler(#state{msg_handler = ?NO_MSG_HDLR, owner = Owner}, Kind, Msg) -> Owner ! {Kind, Msg}, ok; diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 9618a351e..96aae9c56 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -607,27 +607,31 @@ deliver({connack, ReasonCode}, PState) -> deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, proto_ver = ?MQTT_PROTO_V5, client_id = ClientId, - conn_props = ConnProps, is_assigned = IsAssigned, topic_alias_maximum = TopicAliasMaximum}) -> - ResponseInformation = case maps:find('Request-Response-Information', ConnProps) of - {ok, 1} -> - iolist_to_binary(emqx_config:get_env(response_topic_prefix)); - _ -> - <<>> - end, #{max_packet_size := MaxPktSize, max_qos_allowed := MaxQoS, mqtt_retain_available := Retain, max_topic_alias := MaxAlias, mqtt_shared_subscription := Shared, mqtt_wildcard_subscription := Wildcard} = caps(PState), + %% Response-Information is so far not set by broker. + %% i.e. It's a Client-to-Client contract for the request-response topic naming scheme. + %% According to MQTT 5.0 spec: + %% A common use of this is to pass a globally unique portion of the topic tree which + %% is reserved for this Client for at least the lifetime of its Session. + %% This often cannot just be a random name as both the requesting Client and the + %% responding Client need to be authorized to use it. + %% If we are to support it in the feature, the implementation should be flexible + %% to allow prefixing the response topic based on different ACL config. + %% e.g. prefix by username or client-id, so that unauthorized clients can not + %% subscribe requests or responses that are not intended for them. Props = #{'Retain-Available' => flag(Retain), 'Maximum-Packet-Size' => MaxPktSize, 'Topic-Alias-Maximum' => MaxAlias, 'Wildcard-Subscription-Available' => flag(Wildcard), 'Subscription-Identifier-Available' => 1, - 'Response-Information' => ResponseInformation, + %'Response-Information' => 'Shared-Subscription-Available' => flag(Shared)}, Props1 = if diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index 7b86f851c..5ac11f1af 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -32,8 +32,7 @@ <<"+/+">>, <<"TopicA/#">>]). all() -> - [{group, mqttv4}, - {group, mqttv5}]. + [{group, mqttv4}]. groups() -> [{mqttv4, [non_parallel_tests], @@ -44,10 +43,7 @@ groups() -> %% keepalive_test, redelivery_on_reconnect_test, %% subscribe_failure_test, - dollar_topics_test]}, - {mqttv5, [non_parallel_tests], - [request_response, - share_sub_request_topic]}]. + dollar_topics_test]}]. init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), @@ -56,89 +52,6 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). -request_response_exception(QoS) -> - {ok, Client} = emqx_client:start_link([{proto_ver, v5}, - {properties, #{ 'Request-Response-Information' => 0 }}]), - {ok, _} = emqx_client:connect(Client), - ?assertError(no_response_information, - emqx_client:sub_request_topic(Client, QoS, <<"request_topic">>)), - ok = emqx_client:disconnect(Client). - -request_response_per_qos(QoS) -> - {ok, Requester} = emqx_client:start_link([{proto_ver, v5}, - {client_id, <<"requester">>}, - {properties, #{ 'Request-Response-Information' => 1}}]), - {ok, _} = emqx_client:connect(Requester), - {ok, Responser} = emqx_client:start_link([{proto_ver, v5}, - {client_id, <<"responser">>}, - {properties, #{ - 'Request-Response-Information' => 1}}, - {request_handler, - fun(_Req) -> <<"ResponseTest">> end} - ]), - {ok, _} = emqx_client:connect(Responser), - ok = emqx_client:sub_request_topic(Responser, QoS, <<"request_topic">>), - {ok, <<"ResponseTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS), - ok = emqx_client:set_request_handler(Responser, fun(<<"request_payload">>) -> - <<"ResponseFunctionTest">>; - (_) -> - <<"404">> - end), - {ok, <<"ResponseFunctionTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS), - {ok, <<"404">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"invalid_request">>, QoS), - ok = emqx_client:disconnect(Responser), - ok = emqx_client:disconnect(Requester). - -request_response(_Config) -> - request_response_per_qos(?QOS_2), - request_response_per_qos(?QOS_1), - request_response_per_qos(?QOS_0), - request_response_exception(?QOS_0), - request_response_exception(?QOS_1), - request_response_exception(?QOS_2). - -share_sub_request_topic(_Config) -> - share_sub_request_topic_per_qos(?QOS_2), - share_sub_request_topic_per_qos(?QOS_1), - share_sub_request_topic_per_qos(?QOS_0). - -share_sub_request_topic_per_qos(QoS) -> - application:set_env(?APPLICATION, shared_subscription_strategy, random), - ReqTopic = <<"request-topic">>, - RspTopic = <<"response-topic">>, - Group = <<"g1">>, - Properties = #{ 'Request-Response-Information' => 1}, - Opts = fun(ClientId) -> [{proto_ver, v5}, - {client_id, atom_to_binary(ClientId, utf8)}, - {properties, Properties} - ] end, - {ok, Requester} = emqx_client:start_link(Opts(requester)), - {ok, _} = emqx_client:connect(Requester), - - {ok, Responser1} = emqx_client:start_link([{request_handler, fun(Req) -> <<"1-", Req/binary>> end} | Opts(requester1)]), - {ok, _} = emqx_client:connect(Responser1), - - {ok, Responser2} = emqx_client:start_link([{request_handler, fun(Req) -> <<"2-", Req/binary>> end} | Opts(requester2)]), - {ok, _} = emqx_client:connect(Responser2), - - ok = emqx_client:sub_request_topic(Responser1, QoS, ReqTopic, Group), - ok = emqx_client:sub_request_topic(Responser2, QoS, ReqTopic, Group), - %% Send a request, wait for response, validate response then return responser ID - ReqFun = fun(Req) -> - {ok, Rsp} = emqx_client:request(Requester, RspTopic, ReqTopic, Req, QoS), - case Rsp of - <<"1-", Req/binary>> -> 1; - <<"2-", Req/binary>> -> 2 - end - end, - Ids = lists:map(fun(I) -> ReqFun(integer_to_binary(I)) end, lists:seq(1, 100)), - %% we are testing with random shared-dispatch strategy, - %% fail if not all responsers got a chance to handle requests - ?assertEqual([1, 2], lists:usort(Ids)), - ok = emqx_client:disconnect(Responser1), - ok = emqx_client:disconnect(Responser2), - ok = emqx_client:disconnect(Requester). - receive_messages(Count) -> receive_messages(Count, []). diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 74271d685..0297d2615 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -154,9 +154,10 @@ connect_v5(_) -> #{'Request-Response-Information' => 1}}) )), {ok, Data} = gen_tcp:recv(Sock, 0), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, - #{'Response-Information' := _RespInfo}), _} = - raw_recv_parse(Data, ?MQTT_PROTO_V5) + {ok, ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), _} = + raw_recv_parse(Data, ?MQTT_PROTO_V5), + ?assertNot(maps:is_key('Response-Information', Props)), + ok end), % topic alias = 0 diff --git a/test/emqx_request_handler.erl b/test/emqx_request_handler.erl new file mode 100644 index 000000000..d80288023 --- /dev/null +++ b/test/emqx_request_handler.erl @@ -0,0 +1,102 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% @doc This module implements a request handler based on emqx_client. +%% A request handler is a MQTT client which subscribes to a request topic, +%% processes the requests then send response to another topic which is +%% subscribed by the request sender. +%% This code is in test directory because request and response are pure +%% client-side behaviours. + +-module(emqx_request_handler). + +-export([start_link/4, stop/1]). + +-include("emqx_client.hrl"). + +-type qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos(). +-type topic() :: emqx_topic:topic(). +-type handler() :: fun((CorrData :: binary(), ReqPayload :: binary()) -> RspPayload :: binary()). + +-spec start_link(topic(), qos(), handler(), emqx_client:options()) -> + {ok, pid()} | {error, any()}. +start_link(RequestTopic, QoS, RequestHandler, Options0) -> + Parent = self(), + MsgHandler = make_msg_handler(RequestHandler, Parent), + Options = [{msg_handler, MsgHandler} | Options0], + case emqx_client:start_link(Options) of + {ok, Pid} -> + {ok, _} = emqx_client:connect(Pid), + try subscribe(Pid, RequestTopic, QoS) of + ok -> {ok, Pid}; + {error, _} = Error -> Error + catch + C : E : S -> + emqx_client:stop(Pid), + {error, {C, E, S}} + end; + {error, _} = Error -> Error + end. + +stop(Pid) -> + emqx_client:disconnect(Pid). + +make_msg_handler(RequestHandler, Parent) -> + #{publish => fun(Msg) -> handle_msg(Msg, RequestHandler, Parent) end, + puback => fun(_Ack) -> ok end, + disconnected => fun(_Reason) -> ok end + }. + +handle_msg(ReqMsg, RequestHandler, Parent) -> + #{qos := QoS, properties := Props, payload := ReqPayload} = ReqMsg, + case maps:find('Response-Topic', Props) of + {ok, RspTopic} when RspTopic =/= <<>> -> + CorrData = maps:get('Correlation-Data', Props), + RspProps = maps:without(['Response-Topic'], Props), + RspPayload = RequestHandler(CorrData, ReqPayload), + RspMsg = #mqtt_msg{qos = QoS, + topic = RspTopic, + props = RspProps, + payload = RspPayload + }, + emqx_logger:debug("~p sending response msg to topic ~s with~n" + "corr-data=~p~npayload=~p", + [?MODULE, RspTopic, CorrData, RspPayload]), + ok = send_response(RspMsg); + _ -> + Parent ! {discarded, ReqPayload}, + ok + end. + +send_response(Msg) -> + %% This function is evaluated by emqx_client itself. + %% hence delegate to another temp process for the loopback gen_statem call. + Client = self(), + _ = spawn_link(fun() -> + case emqx_client:publish(Client, Msg) of + ok -> ok; + {ok, _} -> ok; + {error, Reason} -> exit({failed_to_publish_response, Reason}) + end + end), + ok. + +subscribe(Client, Topic, QoS) -> + {ok, _Props, _QoS} = + emqx_client:subscribe(Client, [{Topic, [{rh, 2}, {rap, false}, + {nl, true}, {qos, QoS}]}]), + ok. + + + diff --git a/test/emqx_request_response_SUITE.erl b/test/emqx_request_response_SUITE.erl new file mode 100644 index 000000000..6709e958e --- /dev/null +++ b/test/emqx_request_response_SUITE.erl @@ -0,0 +1,68 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_request_response_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +init_per_suite(Config) -> + emqx_ct_broker_helpers:run_setup_steps([{log_level, error} | Config]). + +end_per_suite(_Config) -> + emqx_ct_broker_helpers:run_teardown_steps(). + +all() -> + [request_response]. + +request_response(_Config) -> + request_response_per_qos(?QOS_0), + request_response_per_qos(?QOS_1), + request_response_per_qos(?QOS_2). + +request_response_per_qos(QoS) -> + ReqTopic = <<"request_topic">>, + RspTopic = <<"response_topic">>, + {ok, Requester} = emqx_request_sender:start_link(RspTopic, QoS, + [{proto_ver, v5}, + {client_id, <<"requester">>}, + {properties, #{ 'Request-Response-Information' => 1}}]), + %% This is a square service + Square = fun(_CorrData, ReqBin) -> + I = b2i(ReqBin), + i2b(I * I) + end, + {ok, Responser} = emqx_request_handler:start_link(ReqTopic, QoS, Square, + [{proto_ver, v5}, + {client_id, <<"responser">>} + ]), + ok = emqx_request_sender:send(Requester, ReqTopic, RspTopic, <<"corr-1">>, <<"2">>, QoS), + receive + {response, <<"corr-1">>, <<"4">>} -> + ok; + Other -> + erlang:error({unexpected, Other}) + after + 100 -> + erlang:error(timeout) + end, + ok = emqx_request_sender:stop(Requester), + ok = emqx_request_handler:stop(Responser). + +b2i(B) -> binary_to_integer(B). +i2b(I) -> integer_to_binary(I). diff --git a/test/emqx_request_sender.erl b/test/emqx_request_sender.erl new file mode 100644 index 000000000..e01db96c0 --- /dev/null +++ b/test/emqx_request_sender.erl @@ -0,0 +1,82 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% @doc This module implements a request sender based on emqx_client. +%% A request sender is a MQTT client which sends messages to a request +%% topic, and subscribes to another topic for responses. +%% This code is in test directory because request and response are pure +%% client-side behaviours. + +-module(emqx_request_sender). + +-export([start_link/3, stop/1, send/6]). + +-include("emqx_client.hrl"). + +start_link(ResponseTopic, QoS, Options0) -> + Parent = self(), + MsgHandler = make_msg_handler(Parent), + Options = [{msg_handler, MsgHandler} | Options0], + case emqx_client:start_link(Options) of + {ok, Pid} -> + {ok, _} = emqx_client:connect(Pid), + try subscribe(Pid, ResponseTopic, QoS) of + ok -> {ok, Pid}; + {error, _} = Error -> Error + catch + C : E : S -> + emqx_client:stop(Pid), + {error, {C, E, S}} + end; + {error, _} = Error -> Error + end. + +%% @doc Send a message to request topic with correlation-data `CorrData'. +%% Response should be delivered as a `{response, CorrData, Payload}' +send(Client, ReqTopic, RspTopic, CorrData, Payload, QoS) -> + Props = #{'Response-Topic' => RspTopic, + 'Correlation-Data' => CorrData + }, + Msg = #mqtt_msg{qos = QoS, + topic = ReqTopic, + props = Props, + payload = Payload + }, + case emqx_client:publish(Client, Msg) of + ok -> ok; %% QoS = 0 + {ok, _} -> ok; + {error, _} = E -> E + end. + +stop(Pid) -> + emqx_client:disconnect(Pid). + +subscribe(Client, Topic, QoS) -> + case emqx_client:subscribe(Client, Topic, QoS) of + {ok, _, _} -> ok; + {error, _} = Error -> Error + end. + +make_msg_handler(Parent) -> + #{publish => fun(Msg) -> handle_msg(Msg, Parent) end, + puback => fun(_Ack) -> ok end, + disconnected => fun(_Reason) -> ok end + }. + +handle_msg(Msg, Parent) -> + #{properties := Props, payload := Payload} = Msg, + CorrData = maps:get('Correlation-Data', Props), + Parent ! {response, CorrData, Payload}, + ok. +