emqx/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl

210 lines
6.6 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol
-module(emqx_bridge_mqtt).
-behaviour(emqx_bridge_connect).
%% behaviour callbacks
-export([ start/1
, send/2
, stop/1
]).
%% optional behaviour callbacks
-export([ ensure_subscribed/3
, ensure_unsubscribed/2
]).
%% callbacks for emqtt
-export([ handle_puback/2
, handle_publish/2
, handle_disconnected/2
]).
%% for testing
-ifdef(TEST).
-export([ replvar/1 ]).
-endif.
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-define(ACK_REF(ClientPid, PktId), {ClientPid, PktId}).
%% Messages towards ack collector process
-define(REF_IDS(Ref, Ids), {Ref, Ids}).
%%--------------------------------------------------------------------
%% emqx_bridge_connect callbacks
%%--------------------------------------------------------------------
start(Config = #{address := Address}) ->
Parent = self(),
Mountpoint = maps:get(receive_mountpoint, Config, undefined),
Handlers = make_hdlr(Parent, Mountpoint),
{Host, Port} = case string:tokens(Address, ":") of
[H] -> {H, 1883};
[H, P] -> {H, list_to_integer(P)}
end,
ClientConfig = Config#{msg_handler => Handlers,
host => Host,
port => Port,
force_ping => true
},
case emqtt:start_link(replvar(ClientConfig)) of
{ok, Pid} ->
case emqtt:connect(Pid) of
{ok, _} ->
try
subscribe_remote_topics(Pid, maps:get(subscriptions, Config, [])),
{ok, #{client_pid => Pid}}
catch
throw : Reason ->
ok = stop(#{client_pid => Pid}),
{error, Reason}
end;
{error, Reason} ->
ok = stop(#{client_pid => Pid}),
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.
stop(#{client_pid := Pid}) ->
safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000),
ok.
ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) ->
case emqtt:subscribe(Pid, Topic, QoS) of
{ok, _, _} -> ok;
Error -> Error
end;
ensure_subscribed(_Conn, _Topic, _QoS) ->
%% return ok for now
%% next re-connect should should call start with new topic added to config
ok.
ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) ->
case emqtt:unsubscribe(Pid, Topic) of
{ok, _, _} -> ok;
Error -> Error
end;
ensure_unsubscribed(_, _) ->
%% return ok for now
%% next re-connect should should call start with this topic deleted from config
ok.
safe_stop(Pid, StopF, Timeout) ->
MRef = monitor(process, Pid),
unlink(Pid),
try
StopF()
catch
_ : _ ->
ok
end,
receive
{'DOWN', MRef, _, _, _} ->
ok
after
Timeout ->
exit(Pid, kill)
end.
send(Conn, Msgs) ->
send(Conn, Msgs, []).
send(_Conn, [], []) ->
%% all messages in the batch are QoS-0
Ref = make_ref(),
%% QoS-0 messages do not have packet ID
%% the batch ack is simulated with a loop-back message
self() ! {batch_ack, Ref},
{ok, Ref};
send(_Conn, [], PktIds) ->
%% PktIds is not an empty list if there is any non-QoS-0 message in the batch,
%% And the worker should wait for all acks
{ok, PktIds};
send(#{client_pid := ClientPid} = Conn, [Msg | Rest], PktIds) ->
case emqtt:publish(ClientPid, Msg) of
ok ->
send(Conn, Rest, PktIds);
{ok, PktId} ->
send(Conn, Rest, [PktId | PktIds]);
{error, Reason} ->
%% NOTE: There is no partial success of a batch and recover from the middle
%% only to retry all messages in one batch
{error, Reason}
end.
handle_puback(#{packet_id := PktId, reason_code := RC}, Parent)
when RC =:= ?RC_SUCCESS;
RC =:= ?RC_NO_MATCHING_SUBSCRIBERS ->
Parent ! {batch_ack, PktId}, ok;
handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) ->
?LOG(warning, "Publish ~p to remote node failed, reason_code: ~p", [PktId, RC]).
handle_publish(Msg, Mountpoint) ->
emqx_broker:publish(emqx_bridge_msg:to_broker_msg(Msg, Mountpoint)).
handle_disconnected(Reason, Parent) ->
Parent ! {disconnected, self(), Reason}.
make_hdlr(Parent, Mountpoint) ->
#{puback => {fun ?MODULE:handle_puback/2, [Parent]},
publish => {fun ?MODULE:handle_publish/2, [Mountpoint]},
disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]}
}.
subscribe_remote_topics(ClientPid, Subscriptions) ->
lists:foreach(fun({Topic, Qos}) ->
case emqtt:subscribe(ClientPid, Topic, Qos) of
{ok, _, _} -> ok;
Error -> throw(Error)
end
end, Subscriptions).
replvar(Options) ->
replvar([topic, clientid, max_inflight], Options).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
replvar([], Options) ->
Options;
replvar([Key|More], Options) ->
case maps:get(Key, Options, undefined) of
undefined ->
replvar(More, Options);
Val ->
replvar(More, maps:put(Key, feedvar(Key, Val, Options), Options))
end.
%% ${node} => node()
feedvar(Key, Value, _) when Key =:= topic; Key =:= clientid ->
iolist_to_binary(re:replace(Value, "\\${node}", atom_to_list(node())));
feedvar(max_inflight, 0, _) ->
infinity;
feedvar(max_inflight, Size, _) ->
Size.