refactor(bridge): change confs for channel from array to map

This commit is contained in:
Shawn 2021-09-17 13:46:25 +08:00
parent ab2cdfeab1
commit 502f962b4c
4 changed files with 52 additions and 59 deletions

View File

@ -25,25 +25,23 @@
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
# ## we will create one MQTT connection for each element of the `ingress_channels`
# ingress_channels: [{
# ## the `id` will be used as part of the clientid
# id = "pull_msgs_from_aws"
# ## We will create one MQTT connection for each element of the `ingress_channels`
# ## Syntax: ingress_channels.<id>
# ingress_channels.pull_msgs_from_aws {
# subscribe_remote_topic = "aws/#"
# subscribe_qos = 1
# local_topic = "from_aws/${topic}"
# payload = "${payload}"
# qos = "${qos}"
# retain = "${retain}"
# }]
# ## we will create one MQTT connection for each element of the `egress_channels`
# egress_channels: [{
# ## the `id` will be used as part of the clientid
# id = "push_msgs_to_aws"
# }
# ## We will create one MQTT connection for each element of the `egress_channels`
# ## Syntax: egress_channels.<id>
# egress_channels.push_msgs_to_aws {
# subscribe_local_topic = "emqx/#"
# remote_topic = "from_emqx/${topic}"
# payload = "${payload}"
# qos = 1
# retain = false
# }]
# }
#}

View File

@ -5,9 +5,9 @@
%%======================================================================================
%% Hocon Schema Definitions
roots() -> ["bridges"].
roots() -> [bridges].
fields("bridges") ->
fields(bridges) ->
[{mqtt, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "mqtt_bridge")))}];
fields("mqtt_bridge") ->

View File

@ -88,20 +88,20 @@ on_start(InstId, Conf) ->
logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]),
NamePrefix = binary_to_list(InstId),
BasicConf = basic_config(Conf),
InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, sub_bridges => []}},
InOutConfigs = check_channel_id_dup(maps:get(ingress_channels, Conf, [])
++ maps:get(egress_channels, Conf, [])),
InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, channels => []}},
InOutConfigs = taged_map_list(ingress_channels, maps:get(ingress_channels, Conf, #{}))
++ taged_map_list(egress_channels, maps:get(egress_channels, Conf, #{})),
lists:foldl(fun
(_InOutConf, {error, Reason}) ->
{error, Reason};
(InOutConf, {ok, #{sub_bridges := SubBridges} = Res}) ->
(InOutConf, {ok, #{channels := SubBridges} = Res}) ->
case create_channel(InOutConf, NamePrefix, BasicConf) of
{error, Reason} -> {error, Reason};
{ok, Name} -> {ok, Res#{sub_bridges => [Name | SubBridges]}}
{ok, Name} -> {ok, Res#{channels => [Name | SubBridges]}}
end
end, InitRes, InOutConfigs).
on_stop(InstId, #{sub_bridges := NameList}) ->
on_stop(InstId, #{channels := NameList}) ->
logger:info("stopping mqtt connector: ~p", [InstId]),
lists:foreach(fun(Name) ->
remove_channel(Name)
@ -117,63 +117,53 @@ on_query(InstId, {publish_to_local, Msg}, _AfterQuery, _State) ->
on_query(InstId, {publish_to_remote, Msg}, _AfterQuery, _State) ->
logger:debug("publish to remote node, connector: ~p, msg: ~p", [InstId, Msg]).
on_health_check(_InstId, #{sub_bridges := NameList} = State) ->
on_health_check(_InstId, #{channels := NameList} = State) ->
Results = [{Name, emqx_connector_mqtt_worker:ping(Name)} || Name <- NameList],
case lists:all(fun({_, pong}) -> true; ({_, _}) -> false end, Results) of
true -> {ok, State};
false -> {error, {some_sub_bridge_down, Results}, State}
false -> {error, {some_channel_down, Results}, State}
end.
check_channel_id_dup(Confs) ->
lists:foreach(fun(#{id := Id}) ->
case length([Id || #{id := Id0} <- Confs, Id0 == Id]) of
1 -> ok;
L when L > 1 -> error({mqtt_bridge_conf, {duplicate_id_found, Id}})
end
end, Confs),
Confs.
%% this is an `ingress_channels` bridge
create_channel(#{subscribe_remote_topic := RemoteT, local_topic := LocalT, id := Id} = InConf,
NamePrefix, BasicConf) ->
Name = bridge_name(NamePrefix, Id),
create_channel({{ingress_channels, Id}, #{subscribe_remote_topic := RemoteT,
local_topic := LocalT} = Conf}, NamePrefix, BasicConf) ->
Name = ingress_channel_name(NamePrefix, Id),
logger:info("creating ingress channel ~p, remote ~s -> local ~s",
[Name, RemoteT, LocalT]),
create_sub_bridge(BasicConf#{
do_create_channel(BasicConf#{
name => Name,
clientid => clientid(Id),
subscriptions => InConf, forwards => undefined});
%% this is an `egress_channels` bridge
create_channel(#{subscribe_local_topic := LocalT, remote_topic := RemoteT, id := Id} = OutConf,
NamePrefix, BasicConf) ->
Name = bridge_name(NamePrefix, Id),
clientid => clientid(Name),
subscriptions => Conf, forwards => undefined});
create_channel({{egress_channels, Id}, #{subscribe_local_topic := LocalT,
remote_topic := RemoteT} = Conf}, NamePrefix, BasicConf) ->
Name = egress_channel_name(NamePrefix, Id),
logger:info("creating egress channel ~p, local ~s -> remote ~s",
[Name, LocalT, RemoteT]),
create_sub_bridge(BasicConf#{
name => bridge_name(NamePrefix, Id),
clientid => clientid(Id),
subscriptions => undefined, forwards => OutConf}).
do_create_channel(BasicConf#{
name => Name,
clientid => clientid(Name),
subscriptions => undefined, forwards => Conf}).
remove_channel(BridgeName) ->
logger:info("removing channel ~p", [BridgeName]),
case ?MODULE:drop_bridge(BridgeName) of
remove_channel(ChannelName) ->
logger:info("removing channel ~p", [ChannelName]),
case ?MODULE:drop_bridge(ChannelName) of
ok -> ok;
{error, not_found} -> ok;
{error, Reason} ->
logger:error("stop channel ~p failed, error: ~p", [BridgeName, Reason])
logger:error("stop channel ~p failed, error: ~p", [ChannelName, Reason])
end.
create_sub_bridge(#{name := Name} = Conf) ->
do_create_channel(#{name := Name} = Conf) ->
case ?MODULE:create_bridge(Conf) of
{ok, _Pid} ->
start_sub_bridge(Name);
start_channel(Name);
{error, {already_started, _Pid}} ->
{ok, Name};
{error, Reason} ->
{error, Reason}
end.
start_sub_bridge(Name) ->
start_channel(Name) ->
case emqx_connector_mqtt_worker:ensure_started(Name) of
ok -> {ok, Name};
{error, Reason} -> {error, Reason}
@ -210,11 +200,19 @@ basic_config(#{
if_record_metrics => true
}.
bridge_name(Prefix, Id) ->
list_to_atom(str(Prefix) ++ ":" ++ str(Id)).
taged_map_list(Tag, Map) ->
[{{Tag, K}, V} || {K, V} <- maps:to_list(Map)].
ingress_channel_name(Prefix, Id) ->
channel_name("ingress_channels", Prefix, Id).
egress_channel_name(Prefix, Id) ->
channel_name("egress_channels", Prefix, Id).
channel_name(Type, Prefix, Id) ->
list_to_atom(str(Prefix) ++ ":" ++ Type ++ ":" ++ str(Id)).
clientid(Id) ->
list_to_binary(str(Id) ++ ":" ++ emqx_misc:gen_id(16)).
list_to_binary(str(Id) ++ ":" ++ emqx_misc:gen_id(8)).
str(A) when is_atom(A) ->
atom_to_list(A);

View File

@ -38,8 +38,8 @@ fields("config") ->
, {retry_interval, hoconsc:mk(emqx_schema:duration_ms(), #{default => "30s"})}
, {max_inflight, hoconsc:mk(integer(), #{default => 32})}
, {replayq, hoconsc:mk(hoconsc:ref(?MODULE, "replayq"))}
, {ingress_channels, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "ingress_channels")), #{default => []})}
, {egress_channels, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "egress_channels")), #{default => []})}
, {ingress_channels, hoconsc:mk(hoconsc:map(id, hoconsc:ref(?MODULE, "ingress_channels")), #{default => []})}
, {egress_channels, hoconsc:mk(hoconsc:map(id, hoconsc:ref(?MODULE, "egress_channels")), #{default => []})}
] ++ emqx_connector_schema_lib:ssl_fields();
fields("ingress_channels") ->
@ -61,9 +61,6 @@ fields("replayq") ->
].
common_inout_confs() ->
[{id, #{type => binary(), nullable => false}}] ++ publish_confs().
publish_confs() ->
[ {qos, hoconsc:mk(qos(), #{default => <<"${qos}">>})}
, {retain, hoconsc:mk(hoconsc:union([boolean(), binary()]), #{default => <<"${retain}">>})}
, {payload, hoconsc:mk(binary(), #{default => <<"${payload}">>})}