diff --git a/rebar.config b/rebar.config index c28a43276..798d812a9 100644 --- a/rebar.config +++ b/rebar.config @@ -31,7 +31,8 @@ [{deps, [{meck, "0.8.13"}, % hex {bbmustache, "1.7.0"}, % hex - {emqx_ct_helpers, "1.1.3"} % hex + {emqx_ct_helpers, "1.1.3"}, % hex + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "v1.0.1"}}} ]} ]} ]}. diff --git a/test/emqx_request_handler.erl b/test/emqx_request_handler.erl deleted file mode 100644 index 19bd4b880..000000000 --- a/test/emqx_request_handler.erl +++ /dev/null @@ -1,97 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc This module implements a request handler based on emqx_client. -%% A request handler is a MQTT client which subscribes to a request topic, -%% processes the requests then send response to another topic which is -%% subscribed by the request sender. -%% This code is in test directory because request and response are pure -%% client-side behaviours. - --module(emqx_request_handler). - --export([start_link/4, stop/1]). - --type qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos(). --type topic() :: emqx_topic:topic(). --type handler() :: fun((CorrData :: binary(), ReqPayload :: binary()) -> RspPayload :: binary()). - --spec start_link(topic(), qos(), handler(), emqx_client:options()) -> - {ok, pid()} | {error, any()}. -start_link(RequestTopic, QoS, RequestHandler, Options0) -> - Parent = self(), - MsgHandler = make_msg_handler(RequestHandler, Parent), - Options = [{msg_handler, MsgHandler} | Options0], - case emqx_client:start_link(Options) of - {ok, Pid} -> - {ok, _} = emqx_client:connect(Pid), - try subscribe(Pid, RequestTopic, QoS) of - ok -> {ok, Pid}; - {error, _} = Error -> Error - catch - C : E : S -> - emqx_client:stop(Pid), - {error, {C, E, S}} - end; - {error, _} = Error -> Error - end. - -stop(Pid) -> - emqx_client:disconnect(Pid). - -make_msg_handler(RequestHandler, Parent) -> - #{publish => fun(Msg) -> handle_msg(Msg, RequestHandler, Parent) end, - puback => fun(_Ack) -> ok end, - disconnected => fun(_Reason) -> ok end - }. - -handle_msg(ReqMsg, RequestHandler, Parent) -> - #{qos := QoS, properties := Props, payload := ReqPayload} = ReqMsg, - case maps:find('Response-Topic', Props) of - {ok, RspTopic} when RspTopic =/= <<>> -> - CorrData = maps:get('Correlation-Data', Props), - RspProps = maps:without(['Response-Topic'], Props), - RspPayload = RequestHandler(CorrData, ReqPayload), - emqx_logger:debug("~p sending response msg to topic ~s with~n" - "corr-data=~p~npayload=~p", - [?MODULE, RspTopic, CorrData, RspPayload]), - ok = send_response(RspTopic, RspProps, RspPayload, QoS); - _ -> - Parent ! {discarded, ReqPayload}, - ok - end. - -send_response(Topic, Properties, Payload, QoS) -> - %% This function is evaluated by emqx_client itself. - %% hence delegate to another temp process for the loopback gen_statem call. - Client = self(), - _ = spawn_link(fun() -> - case emqx_client:publish(Client, Topic, Properties, Payload, [{qos, QoS}]) of - ok -> ok; - {ok, _} -> ok; - {error, Reason} -> exit({failed_to_publish_response, Reason}) - end - end), - ok. - -subscribe(Client, Topic, QoS) -> - {ok, _Props, _QoS} = - emqx_client:subscribe(Client, [{Topic, [{rh, 2}, {rap, false}, - {nl, true}, {qos, QoS}]}]), - ok. - - - diff --git a/test/emqx_request_response_SUITE.erl b/test/emqx_request_response_SUITE.erl deleted file mode 100644 index e1f56caa4..000000000 --- a/test/emqx_request_response_SUITE.erl +++ /dev/null @@ -1,71 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_request_response_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include("emqx_mqtt.hrl"). --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). - -init_per_suite(Config) -> - emqx_ct_helpers:start_apps([]), - Config. - -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([]). - -all() -> - [request_response]. - -request_response(_Config) -> - request_response_per_qos(?QOS_0), - request_response_per_qos(?QOS_1), - request_response_per_qos(?QOS_2). - -request_response_per_qos(QoS) -> - ReqTopic = <<"request_topic">>, - RspTopic = <<"response_topic">>, - {ok, Requester} = emqx_request_sender:start_link(RspTopic, QoS, - [{proto_ver, v5}, - {client_id, <<"requester">>}, - {properties, #{ 'Request-Response-Information' => 1}}]), - %% This is a square service - Square = fun(_CorrData, ReqBin) -> - I = b2i(ReqBin), - i2b(I * I) - end, - {ok, Responser} = emqx_request_handler:start_link(ReqTopic, QoS, Square, - [{proto_ver, v5}, - {client_id, <<"responser">>} - ]), - ok = emqx_request_sender:send(Requester, ReqTopic, RspTopic, <<"corr-1">>, <<"2">>, QoS), - receive - {response, <<"corr-1">>, <<"4">>} -> - ok; - Other -> - erlang:error({unexpected, Other}) - after - 100 -> - erlang:error(timeout) - end, - ok = emqx_request_sender:stop(Requester), - ok = emqx_request_handler:stop(Responser). - -b2i(B) -> binary_to_integer(B). -i2b(I) -> integer_to_binary(I). diff --git a/test/emqx_request_sender.erl b/test/emqx_request_sender.erl deleted file mode 100644 index 729cfec2b..000000000 --- a/test/emqx_request_sender.erl +++ /dev/null @@ -1,77 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc This module implements a request sender based on emqx_client. -%% A request sender is a MQTT client which sends messages to a request -%% topic, and subscribes to another topic for responses. -%% This code is in test directory because request and response are pure -%% client-side behaviours. - --module(emqx_request_sender). - --export([start_link/3, stop/1, send/6]). - -start_link(ResponseTopic, QoS, Options0) -> - Parent = self(), - MsgHandler = make_msg_handler(Parent), - Options = [{msg_handler, MsgHandler} | Options0], - case emqx_client:start_link(Options) of - {ok, Pid} -> - {ok, _} = emqx_client:connect(Pid), - try subscribe(Pid, ResponseTopic, QoS) of - ok -> {ok, Pid}; - {error, _} = Error -> Error - catch - C : E : S -> - emqx_client:stop(Pid), - {error, {C, E, S}} - end; - {error, _} = Error -> Error - end. - -%% @doc Send a message to request topic with correlation-data `CorrData'. -%% Response should be delivered as a `{response, CorrData, Payload}' -send(Client, ReqTopic, RspTopic, CorrData, Payload, QoS) -> - Props = #{'Response-Topic' => RspTopic, - 'Correlation-Data' => CorrData - }, - case emqx_client:publish(Client, ReqTopic, Props, Payload, [{qos, QoS}]) of - ok -> ok; %% QoS = 0 - {ok, _} -> ok; - {error, _} = E -> E - end. - -stop(Pid) -> - emqx_client:disconnect(Pid). - -subscribe(Client, Topic, QoS) -> - case emqx_client:subscribe(Client, Topic, QoS) of - {ok, _, _} -> ok; - {error, _} = Error -> Error - end. - -make_msg_handler(Parent) -> - #{publish => fun(Msg) -> handle_msg(Msg, Parent) end, - puback => fun(_Ack) -> ok end, - disconnected => fun(_Reason) -> ok end - }. - -handle_msg(Msg, Parent) -> - #{properties := Props, payload := Payload} = Msg, - CorrData = maps:get('Correlation-Data', Props), - Parent ! {response, CorrData, Payload}, - ok. -