210 lines
6.6 KiB
Erlang
210 lines
6.6 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol
|
|
|
|
-module(emqx_bridge_mqtt).
|
|
|
|
-behaviour(emqx_bridge_connect).
|
|
|
|
%% behaviour callbacks
|
|
-export([ start/1
|
|
, send/2
|
|
, stop/1
|
|
]).
|
|
|
|
%% optional behaviour callbacks
|
|
-export([ ensure_subscribed/3
|
|
, ensure_unsubscribed/2
|
|
]).
|
|
|
|
%% callbacks for emqtt
|
|
-export([ handle_puback/2
|
|
, handle_publish/2
|
|
, handle_disconnected/2
|
|
]).
|
|
|
|
%% for testing
|
|
-ifdef(TEST).
|
|
-export([ replvar/1 ]).
|
|
-endif.
|
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
-define(ACK_REF(ClientPid, PktId), {ClientPid, PktId}).
|
|
|
|
%% Messages towards ack collector process
|
|
-define(REF_IDS(Ref, Ids), {Ref, Ids}).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% emqx_bridge_connect callbacks
|
|
%%--------------------------------------------------------------------
|
|
|
|
start(Config = #{address := Address}) ->
|
|
Parent = self(),
|
|
Mountpoint = maps:get(receive_mountpoint, Config, undefined),
|
|
Handlers = make_hdlr(Parent, Mountpoint),
|
|
{Host, Port} = case string:tokens(Address, ":") of
|
|
[H] -> {H, 1883};
|
|
[H, P] -> {H, list_to_integer(P)}
|
|
end,
|
|
ClientConfig = Config#{msg_handler => Handlers,
|
|
host => Host,
|
|
port => Port,
|
|
force_ping => true
|
|
},
|
|
case emqtt:start_link(replvar(ClientConfig)) of
|
|
{ok, Pid} ->
|
|
case emqtt:connect(Pid) of
|
|
{ok, _} ->
|
|
try
|
|
subscribe_remote_topics(Pid, maps:get(subscriptions, Config, [])),
|
|
{ok, #{client_pid => Pid}}
|
|
catch
|
|
throw : Reason ->
|
|
ok = stop(#{client_pid => Pid}),
|
|
{error, Reason}
|
|
end;
|
|
{error, Reason} ->
|
|
ok = stop(#{client_pid => Pid}),
|
|
{error, Reason}
|
|
end;
|
|
{error, Reason} ->
|
|
{error, Reason}
|
|
end.
|
|
|
|
stop(#{client_pid := Pid}) ->
|
|
safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000),
|
|
ok.
|
|
|
|
ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) ->
|
|
case emqtt:subscribe(Pid, Topic, QoS) of
|
|
{ok, _, _} -> ok;
|
|
Error -> Error
|
|
end;
|
|
ensure_subscribed(_Conn, _Topic, _QoS) ->
|
|
%% return ok for now
|
|
%% next re-connect should should call start with new topic added to config
|
|
ok.
|
|
|
|
ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) ->
|
|
case emqtt:unsubscribe(Pid, Topic) of
|
|
{ok, _, _} -> ok;
|
|
Error -> Error
|
|
end;
|
|
ensure_unsubscribed(_, _) ->
|
|
%% return ok for now
|
|
%% next re-connect should should call start with this topic deleted from config
|
|
ok.
|
|
|
|
safe_stop(Pid, StopF, Timeout) ->
|
|
MRef = monitor(process, Pid),
|
|
unlink(Pid),
|
|
try
|
|
StopF()
|
|
catch
|
|
_ : _ ->
|
|
ok
|
|
end,
|
|
receive
|
|
{'DOWN', MRef, _, _, _} ->
|
|
ok
|
|
after
|
|
Timeout ->
|
|
exit(Pid, kill)
|
|
end.
|
|
|
|
send(Conn, Msgs) ->
|
|
send(Conn, Msgs, []).
|
|
|
|
send(_Conn, [], []) ->
|
|
%% all messages in the batch are QoS-0
|
|
Ref = make_ref(),
|
|
%% QoS-0 messages do not have packet ID
|
|
%% the batch ack is simulated with a loop-back message
|
|
self() ! {batch_ack, Ref},
|
|
{ok, Ref};
|
|
send(_Conn, [], PktIds) ->
|
|
%% PktIds is not an empty list if there is any non-QoS-0 message in the batch,
|
|
%% And the worker should wait for all acks
|
|
{ok, PktIds};
|
|
send(#{client_pid := ClientPid} = Conn, [Msg | Rest], PktIds) ->
|
|
case emqtt:publish(ClientPid, Msg) of
|
|
ok ->
|
|
send(Conn, Rest, PktIds);
|
|
{ok, PktId} ->
|
|
send(Conn, Rest, [PktId | PktIds]);
|
|
{error, Reason} ->
|
|
%% NOTE: There is no partial success of a batch and recover from the middle
|
|
%% only to retry all messages in one batch
|
|
{error, Reason}
|
|
end.
|
|
|
|
handle_puback(#{packet_id := PktId, reason_code := RC}, Parent)
|
|
when RC =:= ?RC_SUCCESS;
|
|
RC =:= ?RC_NO_MATCHING_SUBSCRIBERS ->
|
|
Parent ! {batch_ack, PktId}, ok;
|
|
handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) ->
|
|
?LOG(warning, "Publish ~p to remote node failed, reason_code: ~p", [PktId, RC]).
|
|
|
|
handle_publish(Msg, Mountpoint) ->
|
|
emqx_broker:publish(emqx_bridge_msg:to_broker_msg(Msg, Mountpoint)).
|
|
|
|
handle_disconnected(Reason, Parent) ->
|
|
Parent ! {disconnected, self(), Reason}.
|
|
|
|
make_hdlr(Parent, Mountpoint) ->
|
|
#{puback => {fun ?MODULE:handle_puback/2, [Parent]},
|
|
publish => {fun ?MODULE:handle_publish/2, [Mountpoint]},
|
|
disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]}
|
|
}.
|
|
|
|
subscribe_remote_topics(ClientPid, Subscriptions) ->
|
|
lists:foreach(fun({Topic, Qos}) ->
|
|
case emqtt:subscribe(ClientPid, Topic, Qos) of
|
|
{ok, _, _} -> ok;
|
|
Error -> throw(Error)
|
|
end
|
|
end, Subscriptions).
|
|
|
|
replvar(Options) ->
|
|
replvar([topic, clientid, max_inflight], Options).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal funcs
|
|
%%--------------------------------------------------------------------
|
|
|
|
replvar([], Options) ->
|
|
Options;
|
|
replvar([Key|More], Options) ->
|
|
case maps:get(Key, Options, undefined) of
|
|
undefined ->
|
|
replvar(More, Options);
|
|
Val ->
|
|
replvar(More, maps:put(Key, feedvar(Key, Val, Options), Options))
|
|
end.
|
|
|
|
%% ${node} => node()
|
|
feedvar(Key, Value, _) when Key =:= topic; Key =:= clientid ->
|
|
iolist_to_binary(re:replace(Value, "\\${node}", atom_to_list(node())));
|
|
|
|
feedvar(max_inflight, 0, _) ->
|
|
infinity;
|
|
|
|
feedvar(max_inflight, Size, _) ->
|
|
Size.
|