Merge branch 'rm-emqx-client-hrl' into develop

This commit is contained in:
周子博 2019-08-02 10:31:49 +08:00
commit a98835279d
4 changed files with 16 additions and 41 deletions

View File

@ -1,21 +0,0 @@
%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-ifndef(EMQX_CLIENT_HRL).
-define(EMQX_CLIENT_HRL, true).
-include("emqx_mqtt.hrl").
-record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false,
packet_id, topic, props, payload}).
-endif.

View File

@ -20,7 +20,7 @@
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
-include("emqx_client.hrl"). -include("emqx_mqtt.hrl").
-logger_header("[Client]"). -logger_header("[Client]").
@ -144,7 +144,17 @@
| {force_ping, boolean()} | {force_ping, boolean()}
| {properties, properties()}). | {properties, properties()}).
-type(mqtt_msg() :: #mqtt_msg{}). -record(mqtt_msg, {
qos = ?QOS_0,
retain = false,
dup = false,
packet_id,
topic,
props,
payload
}).
-opaque(mqtt_msg() :: #mqtt_msg{}).
-record(state, {name :: atom(), -record(state, {name :: atom(),
owner :: pid(), owner :: pid(),

View File

@ -23,8 +23,6 @@
-export([start_link/4, stop/1]). -export([start_link/4, stop/1]).
-include("emqx_client.hrl").
-type qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos(). -type qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos().
-type topic() :: emqx_topic:topic(). -type topic() :: emqx_topic:topic().
-type handler() :: fun((CorrData :: binary(), ReqPayload :: binary()) -> RspPayload :: binary()). -type handler() :: fun((CorrData :: binary(), ReqPayload :: binary()) -> RspPayload :: binary()).
@ -65,26 +63,21 @@ handle_msg(ReqMsg, RequestHandler, Parent) ->
CorrData = maps:get('Correlation-Data', Props), CorrData = maps:get('Correlation-Data', Props),
RspProps = maps:without(['Response-Topic'], Props), RspProps = maps:without(['Response-Topic'], Props),
RspPayload = RequestHandler(CorrData, ReqPayload), RspPayload = RequestHandler(CorrData, ReqPayload),
RspMsg = #mqtt_msg{qos = QoS,
topic = RspTopic,
props = RspProps,
payload = RspPayload
},
emqx_logger:debug("~p sending response msg to topic ~s with~n" emqx_logger:debug("~p sending response msg to topic ~s with~n"
"corr-data=~p~npayload=~p", "corr-data=~p~npayload=~p",
[?MODULE, RspTopic, CorrData, RspPayload]), [?MODULE, RspTopic, CorrData, RspPayload]),
ok = send_response(RspMsg); ok = send_response(RspTopic, RspProps, RspPayload, QoS);
_ -> _ ->
Parent ! {discarded, ReqPayload}, Parent ! {discarded, ReqPayload},
ok ok
end. end.
send_response(Msg) -> send_response(Topic, Properties, Payload, QoS) ->
%% This function is evaluated by emqx_client itself. %% This function is evaluated by emqx_client itself.
%% hence delegate to another temp process for the loopback gen_statem call. %% hence delegate to another temp process for the loopback gen_statem call.
Client = self(), Client = self(),
_ = spawn_link(fun() -> _ = spawn_link(fun() ->
case emqx_client:publish(Client, Msg) of case emqx_client:publish(Client, Topic, Properties, Payload, [{qos, QoS}]) of
ok -> ok; ok -> ok;
{ok, _} -> ok; {ok, _} -> ok;
{error, Reason} -> exit({failed_to_publish_response, Reason}) {error, Reason} -> exit({failed_to_publish_response, Reason})

View File

@ -22,8 +22,6 @@
-export([start_link/3, stop/1, send/6]). -export([start_link/3, stop/1, send/6]).
-include("emqx_client.hrl").
start_link(ResponseTopic, QoS, Options0) -> start_link(ResponseTopic, QoS, Options0) ->
Parent = self(), Parent = self(),
MsgHandler = make_msg_handler(Parent), MsgHandler = make_msg_handler(Parent),
@ -48,12 +46,7 @@ send(Client, ReqTopic, RspTopic, CorrData, Payload, QoS) ->
Props = #{'Response-Topic' => RspTopic, Props = #{'Response-Topic' => RspTopic,
'Correlation-Data' => CorrData 'Correlation-Data' => CorrData
}, },
Msg = #mqtt_msg{qos = QoS, case emqx_client:publish(Client, ReqTopic, Props, Payload, [{qos, QoS}]) of
topic = ReqTopic,
props = Props,
payload = Payload
},
case emqx_client:publish(Client, Msg) of
ok -> ok; %% QoS = 0 ok -> ok; %% QoS = 0
{ok, _} -> ok; {ok, _} -> ok;
{error, _} = E -> E {error, _} = E -> E