diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index fc050f4b8..e8af40341 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -25,25 +25,23 @@ # certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" # cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" # } -# ## we will create one MQTT connection for each element of the `ingress_channels` -# ingress_channels: [{ -# ## the `id` will be used as part of the clientid -# id = "pull_msgs_from_aws" +# ## We will create one MQTT connection for each element of the `ingress_channels` +# ## Syntax: ingress_channels. +# ingress_channels.pull_msgs_from_aws { # subscribe_remote_topic = "aws/#" # subscribe_qos = 1 # local_topic = "from_aws/${topic}" # payload = "${payload}" # qos = "${qos}" # retain = "${retain}" -# }] -# ## we will create one MQTT connection for each element of the `egress_channels` -# egress_channels: [{ -# ## the `id` will be used as part of the clientid -# id = "push_msgs_to_aws" +# } +# ## We will create one MQTT connection for each element of the `egress_channels` +# ## Syntax: egress_channels. +# egress_channels.push_msgs_to_aws { # subscribe_local_topic = "emqx/#" # remote_topic = "from_emqx/${topic}" # payload = "${payload}" # qos = 1 # retain = false -# }] +# } #} diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index 94cdaa30b..87eb40372 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -5,9 +5,9 @@ %%====================================================================================== %% Hocon Schema Definitions -roots() -> ["bridges"]. +roots() -> [bridges]. -fields("bridges") -> +fields(bridges) -> [{mqtt, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "mqtt_bridge")))}]; fields("mqtt_bridge") -> diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 125b5cbe4..36ffd5706 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -88,20 +88,20 @@ on_start(InstId, Conf) -> logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]), NamePrefix = binary_to_list(InstId), BasicConf = basic_config(Conf), - InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, sub_bridges => []}}, - InOutConfigs = check_channel_id_dup(maps:get(ingress_channels, Conf, []) - ++ maps:get(egress_channels, Conf, [])), + InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, channels => []}}, + InOutConfigs = taged_map_list(ingress_channels, maps:get(ingress_channels, Conf, #{})) + ++ taged_map_list(egress_channels, maps:get(egress_channels, Conf, #{})), lists:foldl(fun (_InOutConf, {error, Reason}) -> {error, Reason}; - (InOutConf, {ok, #{sub_bridges := SubBridges} = Res}) -> + (InOutConf, {ok, #{channels := SubBridges} = Res}) -> case create_channel(InOutConf, NamePrefix, BasicConf) of {error, Reason} -> {error, Reason}; - {ok, Name} -> {ok, Res#{sub_bridges => [Name | SubBridges]}} + {ok, Name} -> {ok, Res#{channels => [Name | SubBridges]}} end end, InitRes, InOutConfigs). -on_stop(InstId, #{sub_bridges := NameList}) -> +on_stop(InstId, #{channels := NameList}) -> logger:info("stopping mqtt connector: ~p", [InstId]), lists:foreach(fun(Name) -> remove_channel(Name) @@ -117,63 +117,53 @@ on_query(InstId, {publish_to_local, Msg}, _AfterQuery, _State) -> on_query(InstId, {publish_to_remote, Msg}, _AfterQuery, _State) -> logger:debug("publish to remote node, connector: ~p, msg: ~p", [InstId, Msg]). -on_health_check(_InstId, #{sub_bridges := NameList} = State) -> +on_health_check(_InstId, #{channels := NameList} = State) -> Results = [{Name, emqx_connector_mqtt_worker:ping(Name)} || Name <- NameList], case lists:all(fun({_, pong}) -> true; ({_, _}) -> false end, Results) of true -> {ok, State}; - false -> {error, {some_sub_bridge_down, Results}, State} + false -> {error, {some_channel_down, Results}, State} end. -check_channel_id_dup(Confs) -> - lists:foreach(fun(#{id := Id}) -> - case length([Id || #{id := Id0} <- Confs, Id0 == Id]) of - 1 -> ok; - L when L > 1 -> error({mqtt_bridge_conf, {duplicate_id_found, Id}}) - end - end, Confs), - Confs. - -%% this is an `ingress_channels` bridge -create_channel(#{subscribe_remote_topic := RemoteT, local_topic := LocalT, id := Id} = InConf, - NamePrefix, BasicConf) -> - Name = bridge_name(NamePrefix, Id), +create_channel({{ingress_channels, Id}, #{subscribe_remote_topic := RemoteT, + local_topic := LocalT} = Conf}, NamePrefix, BasicConf) -> + Name = ingress_channel_name(NamePrefix, Id), logger:info("creating ingress channel ~p, remote ~s -> local ~s", [Name, RemoteT, LocalT]), - create_sub_bridge(BasicConf#{ + do_create_channel(BasicConf#{ name => Name, - clientid => clientid(Id), - subscriptions => InConf, forwards => undefined}); -%% this is an `egress_channels` bridge -create_channel(#{subscribe_local_topic := LocalT, remote_topic := RemoteT, id := Id} = OutConf, - NamePrefix, BasicConf) -> - Name = bridge_name(NamePrefix, Id), + clientid => clientid(Name), + subscriptions => Conf, forwards => undefined}); + +create_channel({{egress_channels, Id}, #{subscribe_local_topic := LocalT, + remote_topic := RemoteT} = Conf}, NamePrefix, BasicConf) -> + Name = egress_channel_name(NamePrefix, Id), logger:info("creating egress channel ~p, local ~s -> remote ~s", [Name, LocalT, RemoteT]), - create_sub_bridge(BasicConf#{ - name => bridge_name(NamePrefix, Id), - clientid => clientid(Id), - subscriptions => undefined, forwards => OutConf}). + do_create_channel(BasicConf#{ + name => Name, + clientid => clientid(Name), + subscriptions => undefined, forwards => Conf}). -remove_channel(BridgeName) -> - logger:info("removing channel ~p", [BridgeName]), - case ?MODULE:drop_bridge(BridgeName) of +remove_channel(ChannelName) -> + logger:info("removing channel ~p", [ChannelName]), + case ?MODULE:drop_bridge(ChannelName) of ok -> ok; {error, not_found} -> ok; {error, Reason} -> - logger:error("stop channel ~p failed, error: ~p", [BridgeName, Reason]) + logger:error("stop channel ~p failed, error: ~p", [ChannelName, Reason]) end. -create_sub_bridge(#{name := Name} = Conf) -> +do_create_channel(#{name := Name} = Conf) -> case ?MODULE:create_bridge(Conf) of {ok, _Pid} -> - start_sub_bridge(Name); + start_channel(Name); {error, {already_started, _Pid}} -> {ok, Name}; {error, Reason} -> {error, Reason} end. -start_sub_bridge(Name) -> +start_channel(Name) -> case emqx_connector_mqtt_worker:ensure_started(Name) of ok -> {ok, Name}; {error, Reason} -> {error, Reason} @@ -210,11 +200,19 @@ basic_config(#{ if_record_metrics => true }. -bridge_name(Prefix, Id) -> - list_to_atom(str(Prefix) ++ ":" ++ str(Id)). +taged_map_list(Tag, Map) -> + [{{Tag, K}, V} || {K, V} <- maps:to_list(Map)]. + +ingress_channel_name(Prefix, Id) -> + channel_name("ingress_channels", Prefix, Id). +egress_channel_name(Prefix, Id) -> + channel_name("egress_channels", Prefix, Id). + +channel_name(Type, Prefix, Id) -> + list_to_atom(str(Prefix) ++ ":" ++ Type ++ ":" ++ str(Id)). clientid(Id) -> - list_to_binary(str(Id) ++ ":" ++ emqx_misc:gen_id(16)). + list_to_binary(str(Id) ++ ":" ++ emqx_misc:gen_id(8)). str(A) when is_atom(A) -> atom_to_list(A); diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 28995b666..89fe5f581 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -38,8 +38,8 @@ fields("config") -> , {retry_interval, hoconsc:mk(emqx_schema:duration_ms(), #{default => "30s"})} , {max_inflight, hoconsc:mk(integer(), #{default => 32})} , {replayq, hoconsc:mk(hoconsc:ref(?MODULE, "replayq"))} - , {ingress_channels, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "ingress_channels")), #{default => []})} - , {egress_channels, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "egress_channels")), #{default => []})} + , {ingress_channels, hoconsc:mk(hoconsc:map(id, hoconsc:ref(?MODULE, "ingress_channels")), #{default => []})} + , {egress_channels, hoconsc:mk(hoconsc:map(id, hoconsc:ref(?MODULE, "egress_channels")), #{default => []})} ] ++ emqx_connector_schema_lib:ssl_fields(); fields("ingress_channels") -> @@ -61,9 +61,6 @@ fields("replayq") -> ]. common_inout_confs() -> - [{id, #{type => binary(), nullable => false}}] ++ publish_confs(). - -publish_confs() -> [ {qos, hoconsc:mk(qos(), #{default => <<"${qos}">>})} , {retain, hoconsc:mk(hoconsc:union([boolean(), binary()]), #{default => <<"${retain}">>})} , {payload, hoconsc:mk(binary(), #{default => <<"${payload}">>})}