diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl index 8d442463b..2fd5447a5 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl @@ -50,15 +50,11 @@ start(Config) -> Parent = self(), - Address = maps:get(address, Config), + {Host, Port} = maps:get(server, Config), Mountpoint = maps:get(receive_mountpoint, Config, undefined), Subscriptions = maps:get(subscriptions, Config, []), Subscriptions1 = check_subscriptions(Subscriptions), Handlers = make_hdlr(Parent, Mountpoint), - {Host, Port} = case string:tokens(Address, ":") of - [H] -> {H, 1883}; - [H, P] -> {H, list_to_integer(P)} - end, Config1 = Config#{ msg_handler => Handlers, host => Host, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl index 4207067fe..23ecdb5f9 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl @@ -39,12 +39,10 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - BridgesConf = emqx:get_config([?APP, bridges], []), - BridgeSpec = lists:map(fun bridge_spec/1, BridgesConf), SupFlag = #{strategy => one_for_one, intensity => 100, period => 10}, - {ok, {SupFlag, BridgeSpec}}. + {ok, {SupFlag, []}}. bridge_spec(Config) -> Name = list_to_atom(maps:get(name, Config)), @@ -57,7 +55,8 @@ bridge_spec(Config) -> -spec(bridges() -> [{node(), map()}]). bridges() -> - [{Name, emqx_bridge_worker:status(Name)} || {Name, _Pid, _, _} <- supervisor:which_children(?MODULE)]. + [{Name, emqx_bridge_worker:status(Name)} + || {Name, _Pid, _, _} <- supervisor:which_children(?MODULE)]. create_bridge(Config) -> supervisor:start_child(?MODULE, bridge_spec(Config)). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl index 630fb4443..11d9182d9 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl @@ -135,12 +135,12 @@ %% mountpoint: The topic mount point for messages sent to remote node/cluster %% `undefined', `<<>>' or `""' to disable %% forwards: Local topics to subscribe. -%% queue.batch_bytes_limit: Max number of bytes to collect in a batch for each +%% replayq.batch_bytes_limit: Max number of bytes to collect in a batch for each %% send call towards emqx_bridge_connect -%% queue.batch_count_limit: Max number of messages to collect in a batch for +%% replayq.batch_count_limit: Max number of messages to collect in a batch for %% each send call towards emqx_bridge_connect -%% queue.replayq_dir: Directory where replayq should persist messages -%% queue.replayq_seg_bytes: Size in bytes for each replayq segment file +%% replayq.dir: Directory where replayq should persist messages +%% replayq.seg_bytes: Size in bytes for each replayq segment file %% %% Find more connection specific configs in the callback modules %% of emqx_bridge_connect behaviour. @@ -208,7 +208,7 @@ init(Opts) -> ConnectOpts = maps:get(config, Opts), ConnectModule = conn_type(maps:get(conn_type, ConnectOpts)), Forwards = maps:get(forwards, Opts, []), - Queue = open_replayq(maps:get(queue, Opts, #{})), + Queue = open_replayq(maps:get(replayq, Opts, #{})), State = init_opts(Opts), self() ! idle, {ok, idle, State#{connect_module => ConnectModule, @@ -236,8 +236,8 @@ init_opts(Opts) -> name => Name}. open_replayq(QCfg) -> - Dir = maps:get(replayq_dir, QCfg, undefined), - SegBytes = maps:get(replayq_seg_bytes, QCfg, ?DEFAULT_SEG_BYTES), + Dir = maps:get(dir, QCfg, undefined), + SegBytes = maps:get(seg_bytes, QCfg, ?DEFAULT_SEG_BYTES), MaxTotalSize = maps:get(max_total_size, QCfg, ?DEFAULT_MAX_TOTAL_SIZE), QueueConfig = case Dir =:= undefined orelse Dir =:= "" of true -> #{mem_only => true}; diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl new file mode 100644 index 000000000..15137039b --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -0,0 +1,141 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_connector_mqtt). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). + +%% callbacks of behaviour emqx_resource +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_health_check/2 + ]). + +-behaviour(hocon_schema). + +-export([ roots/0 + , fields/1]). + +%%===================================================================== +%% Hocon schema +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, "config")}}]. + +fields("config") -> + [ {server, emqx_schema:t(string(), undefined, "127.0.0.1:1883")} + , {reconnect_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")} + , {proto_ver, fun proto_ver/1} + , {bridge_mode, emqx_schema:t(boolean(), undefined, true)} + , {clientid, emqx_schema:t(string())} + , {username, emqx_schema:t(string())} + , {password, emqx_schema:t(string())} + , {clean_start, emqx_schema:t(boolean(), undefined, true)} + , {keepalive, emqx_schema:t(integer(), undefined, 300)} + , {retry_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")} + , {max_inflight, emqx_schema:t(integer(), undefined, 32)} + , {replayq, emqx_schema:t(hoconsc:ref(?MODULE, "replayq"))} + , {in, hoconsc:array("in")} + , {out, hoconsc:array("out")} + ] ++ emqx_connector_schema_lib:ssl_fields(); + +fields("in") -> + [ {from_remote_topic, #{type => binary(), nullable => false}} + , {to_local_topic, #{type => binary(), nullable => false}} + , {qos, emqx_schema:t(integer(), undefined, 1)} + , {payload_template, emqx_schema:t(binary(), undefined, <<"${message}">>)} + ]; + +fields("out") -> + [ {to_remote_topic, #{type => binary(), nullable => false}} + , {from_local_topic, #{type => binary(), nullable => false}} + , {payload_template, emqx_schema:t(binary(), undefined, <<"${payload}">>)} + ]; + +fields("replayq") -> + [ {dir, hoconsc:union([boolean(), string()])} + , {seg_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "100MB")} + , {offload_mode, emqx_schema:t(boolean(), undefined, false)} + , {max_total_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "1024MB")} + ]. + +proto_ver(type) -> hoconsc:enum([v3, v4, v5]); +proto_ver(default) -> v4; +proto_ver(_) -> undefined. + +%% =================================================================== +on_start(InstId, #{server := Server, + reconnect_interval := ReconnIntv, + proto_ver := ProtoVer, + bridge_mode := BridgeMod, + clientid := ClientID, + username := User, + password := Password, + clean_start := CleanStart, + keepalive := KeepAlive, + retry_interval := RetryIntv, + max_inflight := MaxInflight, + replayq := ReplayQ, + in := In, + out := Out, + ssl := #{enable := EnableSsl} = Ssl} = Conf) -> + logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]), + BridgeConf = Conf#{ + name => InstId, + config => #{ + conn_type => mqtt, + subscriptions => In, + forwards => Out, + replayq => ReplayQ, + %% connection opts + server => Server, + reconnect_interval => ReconnIntv, + proto_ver => ProtoVer, + bridge_mode => BridgeMod, + clientid => ClientID, + username => User, + password => Password, + clean_start => CleanStart, + keepalive => KeepAlive, + retry_interval => RetryIntv, + max_inflight => MaxInflight, + ssl => EnableSsl, + ssl_opts => maps:to_list(maps:remove(enable, Ssl)), + if_record_metrics => true + } + }, + case emqx_bridge_mqtt_sup:create_bridge(BridgeConf) of + {ok, _Pid} -> + {ok, #{}}; + {error, {already_started, _Pid}} -> + {ok, #{}}; + {error, Reason} -> + {error, Reason} + end. + +on_stop(InstId, #{}) -> + logger:info("stopping mqtt connector: ~p", [InstId]), + emqx_bridge_mqtt_sup:drop_bridge(InstId). + +%% TODO: let the emqx_resource trigger on_query/4 automatically according to the +%% `in` and `out` config +on_query(InstId, {publish_to_local, Msg}, _AfterQuery, _State) -> + logger:debug("publish to local node, connector: ~p, msg: ~p", [InstId, Msg]); +on_query(InstId, {publish_to_remote, Msg}, _AfterQuery, _State) -> + logger:debug("publish to remote node, connector: ~p, msg: ~p", [InstId, Msg]). + +on_health_check(InstId, #{}) -> + emqx_bridge_mqtt_sup:try_ping(InstId).