diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 0153f6570..389c9e902 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -59,7 +59,7 @@ -define(RESPONSE_TIMEOUT_SECONDS, timer:seconds(5)). --define(NO_HANDLER, undefined). +-define(NO_REQ_HANDLER, undefined). -define(NO_GROUP, <<>>). @@ -67,10 +67,23 @@ -type(host() :: inet:ip_address() | inet:hostname()). --type corr_data() :: binary(). +-type(corr_data() :: binary()). + +%% NOTE: Message handler is different from request handler. +%% Message handler is a set of callbacks defined to handle MQTT messages as well as +%% the disconnect event. +%% Request handler is a callback to handle received MQTT message as in 'request', +%% and publish another MQTT message back to the defined topic as in 'response'. +%% `owner' and `msg_handler' has no effect when `request_handler' is set. +-define(NO_MSG_HDLR, undefined). +-type(msg_handler() :: #{puback := fun((_) -> any()), + publish := fun((emqx_types:message()) -> any()), + disconnected := fun(({reason_code(), _Properties :: term()}) -> any()) + }). -type(option() :: {name, atom()} | {owner, pid()} + | {msg_handler, msg_handler()} | {host, host()} | {hosts, [{host(), inet:port_number()}]} | {port, inet:port_number()} @@ -102,6 +115,7 @@ -record(state, {name :: atom(), owner :: pid(), + msg_handler :: ?NO_MSG_HDLR | msg_handler(), host :: host(), port :: inet:port_number(), hosts :: [{host(), inet:port_number()}], @@ -497,7 +511,7 @@ init([Options]) -> auto_ack = true, ack_timeout = ?DEFAULT_ACK_TIMEOUT, retry_interval = 0, - request_handler = ?NO_HANDLER, + request_handler = ?NO_REQ_HANDLER, connect_timeout = ?DEFAULT_CONNECT_TIMEOUT, last_packet_id = 1}), {ok, initialized, init_parse_state(State)}. @@ -516,6 +530,8 @@ init([{name, Name} | Opts], State) -> init([{owner, Owner} | Opts], State) when is_pid(Owner) -> link(Owner), init(Opts, State#state{owner = Owner}); +init([{msg_handler, Hdlr} | Opts], State) -> + init(Opts, State#state{msg_handler = Hdlr}); init([{host, Host} | Opts], State) -> init(Opts, State#state{host = Host}); init([{port, Port} | Opts], State) -> @@ -857,12 +873,12 @@ connected(cast, Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> publish_process(?QOS_2, Packet, State); connected(cast, ?PUBACK_PACKET(PacketId, ReasonCode, Properties), - State = #state{owner = Owner, inflight = Inflight}) -> + State = #state{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {publish, #mqtt_msg{packet_id = PacketId}, _Ts}} -> - Owner ! {puback, #{packet_id => PacketId, - reason_code => ReasonCode, - properties => Properties}}, + ok = eval_msg_handler(State, puback, #{packet_id => PacketId, + reason_code => ReasonCode, + properties => Properties}), {keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}}; none -> emqx_logger:warning("Unexpected PUBACK: ~p", [PacketId]), @@ -899,12 +915,12 @@ connected(cast, ?PUBREL_PACKET(PacketId), end; connected(cast, ?PUBCOMP_PACKET(PacketId, ReasonCode, Properties), - State = #state{owner = Owner, inflight = Inflight}) -> + State = #state{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {pubrel, _PacketId, _Ts}} -> - Owner ! {puback, #{packet_id => PacketId, - reason_code => ReasonCode, - properties => Properties}}, + ok = eval_msg_handler(State, puback, #{packet_id => PacketId, + reason_code => ReasonCode, + properties => Properties}), {keep_state, State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}}; none -> emqx_logger:warning("Unexpected PUBCOMP Packet: ~p", [PacketId]), @@ -943,9 +959,8 @@ connected(cast, ?PACKET(?PINGRESP), State) -> false -> {keep_state, State} end; -connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), - State = #state{owner = Owner}) -> - Owner ! {disconnected, ReasonCode, Properties}, +connected(cast, ?DISCONNECT_PACKET(ReasonCode, Properties), State) -> + ok = eval_msg_handler(State, disconnected, {ReasonCode, Properties}), {stop, disconnected, State}; connected(info, {timeout, _TRef, keepalive}, State = #state{force_ping = true}) -> @@ -1101,8 +1116,8 @@ assign_id(?NO_CLIENT_ID, Props) -> assign_id(Id, _Props) -> Id. -publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), State = #state{auto_ack = AutoAck}) -> - _ = deliver(packet_to_msg(Packet), State), +publish_process(?QOS_1, Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), State0 = #state{auto_ack = AutoAck}) -> + State = deliver(packet_to_msg(Packet), State0), case AutoAck of true -> send_puback(?PUBACK_PACKET(PacketId), State); false -> {keep_state, State} @@ -1116,18 +1131,11 @@ publish_process(?QOS_2, Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), Stop -> Stop end. -response_publish(undefined, State, _QoS, _Payload) -> - State; -response_publish(Properties, State = #state{request_handler = RequestHandler}, QoS, Payload) -> - case maps:find('Response-Topic', Properties) of - {ok, ResponseTopic} -> - case RequestHandler of - ?NO_HANDLER -> State; - _ -> do_publish(ResponseTopic, Properties, State, QoS, Payload) - end; - _ -> - State - end. +response_publish(#{'Response-Topic' := ResponseTopic} = Properties, + State = #state{request_handler = RequestHandler}, QoS, Payload) + when RequestHandler =/= ?NO_REQ_HANDLER -> + do_publish(ResponseTopic, Properties, State, QoS, Payload); +response_publish(_Properties, State, _QoS, _Payload) -> State. do_publish(ResponseTopic, Properties, State = #state{request_handler = RequestHandler}, ?QOS_0, Payload) -> Msg = #mqtt_msg{qos = ?QOS_0, @@ -1251,19 +1259,33 @@ retry_send(pubrel, PacketId, Now, State = #state{inflight = Inflight}) -> Error end. +deliver(_Msg, State = #state{request_handler = Hdlr}) when Hdlr =/= ?NO_REQ_HANDLER -> + %% message has been terminated by request handler, hence should not continue processing + State; deliver(#mqtt_msg{qos = QoS, dup = Dup, retain = Retain, packet_id = PacketId, topic = Topic, props = Props, payload = Payload}, - State = #state{owner = Owner, request_handler = RequestHandler}) -> - case RequestHandler of - ?NO_HANDLER -> - Owner ! {publish, #{qos => QoS, dup => Dup, retain => Retain, packet_id => PacketId, - topic => Topic, properties => Props, payload => Payload, - client_pid => self()}}; - _ -> - ok - end, + State) -> + Msg = #{qos => QoS, dup => Dup, retain => Retain, packet_id => PacketId, + topic => Topic, properties => Props, payload => Payload, + client_pid => self()}, + ok = eval_msg_handler(State, publish, Msg), State. +eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER, + owner = Owner}, + disconnected, {ReasonCode, Properties}) -> + %% Special handling for disconnected message when there is no handler callback + Owner ! {disconnected, ReasonCode, Properties}, + ok; +eval_msg_handler(#state{msg_handler = ?NO_REQ_HANDLER, + owner = Owner}, Kind, Msg) -> + Owner ! {Kind, Msg}, + ok; +eval_msg_handler(#state{msg_handler = Handler}, Kind, Msg) -> + F = maps:get(Kind, Handler), + _ = F(Msg), + ok. + packet_to_msg(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, dup = Dup, qos = QoS,