feat(bridges): add connector for mqtt bridge

This commit is contained in:
Shawn 2021-09-06 17:34:14 +08:00
parent bfb2df37ce
commit 9ed90ba7a9
4 changed files with 152 additions and 16 deletions

View File

@ -50,15 +50,11 @@
start(Config) ->
Parent = self(),
Address = maps:get(address, Config),
{Host, Port} = maps:get(server, Config),
Mountpoint = maps:get(receive_mountpoint, Config, undefined),
Subscriptions = maps:get(subscriptions, Config, []),
Subscriptions1 = check_subscriptions(Subscriptions),
Handlers = make_hdlr(Parent, Mountpoint),
{Host, Port} = case string:tokens(Address, ":") of
[H] -> {H, 1883};
[H, P] -> {H, list_to_integer(P)}
end,
Config1 = Config#{
msg_handler => Handlers,
host => Host,

View File

@ -39,12 +39,10 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
BridgesConf = emqx:get_config([?APP, bridges], []),
BridgeSpec = lists:map(fun bridge_spec/1, BridgesConf),
SupFlag = #{strategy => one_for_one,
intensity => 100,
period => 10},
{ok, {SupFlag, BridgeSpec}}.
{ok, {SupFlag, []}}.
bridge_spec(Config) ->
Name = list_to_atom(maps:get(name, Config)),
@ -57,7 +55,8 @@ bridge_spec(Config) ->
-spec(bridges() -> [{node(), map()}]).
bridges() ->
[{Name, emqx_bridge_worker:status(Name)} || {Name, _Pid, _, _} <- supervisor:which_children(?MODULE)].
[{Name, emqx_bridge_worker:status(Name)}
|| {Name, _Pid, _, _} <- supervisor:which_children(?MODULE)].
create_bridge(Config) ->
supervisor:start_child(?MODULE, bridge_spec(Config)).

View File

@ -135,12 +135,12 @@
%% mountpoint: The topic mount point for messages sent to remote node/cluster
%% `undefined', `<<>>' or `""' to disable
%% forwards: Local topics to subscribe.
%% queue.batch_bytes_limit: Max number of bytes to collect in a batch for each
%% replayq.batch_bytes_limit: Max number of bytes to collect in a batch for each
%% send call towards emqx_bridge_connect
%% queue.batch_count_limit: Max number of messages to collect in a batch for
%% replayq.batch_count_limit: Max number of messages to collect in a batch for
%% each send call towards emqx_bridge_connect
%% queue.replayq_dir: Directory where replayq should persist messages
%% queue.replayq_seg_bytes: Size in bytes for each replayq segment file
%% replayq.dir: Directory where replayq should persist messages
%% replayq.seg_bytes: Size in bytes for each replayq segment file
%%
%% Find more connection specific configs in the callback modules
%% of emqx_bridge_connect behaviour.
@ -208,7 +208,7 @@ init(Opts) ->
ConnectOpts = maps:get(config, Opts),
ConnectModule = conn_type(maps:get(conn_type, ConnectOpts)),
Forwards = maps:get(forwards, Opts, []),
Queue = open_replayq(maps:get(queue, Opts, #{})),
Queue = open_replayq(maps:get(replayq, Opts, #{})),
State = init_opts(Opts),
self() ! idle,
{ok, idle, State#{connect_module => ConnectModule,
@ -236,8 +236,8 @@ init_opts(Opts) ->
name => Name}.
open_replayq(QCfg) ->
Dir = maps:get(replayq_dir, QCfg, undefined),
SegBytes = maps:get(replayq_seg_bytes, QCfg, ?DEFAULT_SEG_BYTES),
Dir = maps:get(dir, QCfg, undefined),
SegBytes = maps:get(seg_bytes, QCfg, ?DEFAULT_SEG_BYTES),
MaxTotalSize = maps:get(max_total_size, QCfg, ?DEFAULT_MAX_TOTAL_SIZE),
QueueConfig = case Dir =:= undefined orelse Dir =:= "" of
true -> #{mem_only => true};

View File

@ -0,0 +1,141 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_mqtt).
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
%% callbacks of behaviour emqx_resource
-export([ on_start/2
, on_stop/2
, on_query/4
, on_health_check/2
]).
-behaviour(hocon_schema).
-export([ roots/0
, fields/1]).
%%=====================================================================
%% Hocon schema
roots() ->
[{config, #{type => hoconsc:ref(?MODULE, "config")}}].
fields("config") ->
[ {server, emqx_schema:t(string(), undefined, "127.0.0.1:1883")}
, {reconnect_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")}
, {proto_ver, fun proto_ver/1}
, {bridge_mode, emqx_schema:t(boolean(), undefined, true)}
, {clientid, emqx_schema:t(string())}
, {username, emqx_schema:t(string())}
, {password, emqx_schema:t(string())}
, {clean_start, emqx_schema:t(boolean(), undefined, true)}
, {keepalive, emqx_schema:t(integer(), undefined, 300)}
, {retry_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")}
, {max_inflight, emqx_schema:t(integer(), undefined, 32)}
, {replayq, emqx_schema:t(hoconsc:ref(?MODULE, "replayq"))}
, {in, hoconsc:array("in")}
, {out, hoconsc:array("out")}
] ++ emqx_connector_schema_lib:ssl_fields();
fields("in") ->
[ {from_remote_topic, #{type => binary(), nullable => false}}
, {to_local_topic, #{type => binary(), nullable => false}}
, {qos, emqx_schema:t(integer(), undefined, 1)}
, {payload_template, emqx_schema:t(binary(), undefined, <<"${message}">>)}
];
fields("out") ->
[ {to_remote_topic, #{type => binary(), nullable => false}}
, {from_local_topic, #{type => binary(), nullable => false}}
, {payload_template, emqx_schema:t(binary(), undefined, <<"${payload}">>)}
];
fields("replayq") ->
[ {dir, hoconsc:union([boolean(), string()])}
, {seg_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "100MB")}
, {offload_mode, emqx_schema:t(boolean(), undefined, false)}
, {max_total_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "1024MB")}
].
proto_ver(type) -> hoconsc:enum([v3, v4, v5]);
proto_ver(default) -> v4;
proto_ver(_) -> undefined.
%% ===================================================================
on_start(InstId, #{server := Server,
reconnect_interval := ReconnIntv,
proto_ver := ProtoVer,
bridge_mode := BridgeMod,
clientid := ClientID,
username := User,
password := Password,
clean_start := CleanStart,
keepalive := KeepAlive,
retry_interval := RetryIntv,
max_inflight := MaxInflight,
replayq := ReplayQ,
in := In,
out := Out,
ssl := #{enable := EnableSsl} = Ssl} = Conf) ->
logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]),
BridgeConf = Conf#{
name => InstId,
config => #{
conn_type => mqtt,
subscriptions => In,
forwards => Out,
replayq => ReplayQ,
%% connection opts
server => Server,
reconnect_interval => ReconnIntv,
proto_ver => ProtoVer,
bridge_mode => BridgeMod,
clientid => ClientID,
username => User,
password => Password,
clean_start => CleanStart,
keepalive => KeepAlive,
retry_interval => RetryIntv,
max_inflight => MaxInflight,
ssl => EnableSsl,
ssl_opts => maps:to_list(maps:remove(enable, Ssl)),
if_record_metrics => true
}
},
case emqx_bridge_mqtt_sup:create_bridge(BridgeConf) of
{ok, _Pid} ->
{ok, #{}};
{error, {already_started, _Pid}} ->
{ok, #{}};
{error, Reason} ->
{error, Reason}
end.
on_stop(InstId, #{}) ->
logger:info("stopping mqtt connector: ~p", [InstId]),
emqx_bridge_mqtt_sup:drop_bridge(InstId).
%% TODO: let the emqx_resource trigger on_query/4 automatically according to the
%% `in` and `out` config
on_query(InstId, {publish_to_local, Msg}, _AfterQuery, _State) ->
logger:debug("publish to local node, connector: ~p, msg: ~p", [InstId, Msg]);
on_query(InstId, {publish_to_remote, Msg}, _AfterQuery, _State) ->
logger:debug("publish to remote node, connector: ~p, msg: ~p", [InstId, Msg]).
on_health_check(InstId, #{}) ->
emqx_bridge_mqtt_sup:try_ping(InstId).