feat(bridges): avoid clientid competition between bridges on different nodes

This commit is contained in:
Shawn 2021-09-10 14:19:36 +08:00
parent 07069898b1
commit d46241fe2f
10 changed files with 42 additions and 44 deletions

View File

@ -457,21 +457,21 @@ write_cert(#{<<"ssl">> := #{<<"enable">> := true} = SSL} = Source) ->
CertPath = filename:join([emqx:get_config([node, data_dir]), "certs"]),
CaCert = case maps:is_key(<<"cacertfile">>, SSL) of
true ->
{ok, CaCertFile} = write_file(filename:join([CertPath, "cacert-" ++ emqx_rule_id:gen() ++".pem"]),
{ok, CaCertFile} = write_file(filename:join([CertPath, "cacert-" ++ emqx_plugin_libs_id:gen() ++".pem"]),
maps:get(<<"cacertfile">>, SSL)),
CaCertFile;
false -> ""
end,
Cert = case maps:is_key(<<"certfile">>, SSL) of
true ->
{ok, CertFile} = write_file(filename:join([CertPath, "cert-" ++ emqx_rule_id:gen() ++".pem"]),
{ok, CertFile} = write_file(filename:join([CertPath, "cert-" ++ emqx_plugin_libs_id:gen() ++".pem"]),
maps:get(<<"certfile">>, SSL)),
CertFile;
false -> ""
end,
Key = case maps:is_key(<<"keyfile">>, SSL) of
true ->
{ok, KeyFile} = write_file(filename:join([CertPath, "key-" ++ emqx_rule_id:gen() ++".pem"]),
{ok, KeyFile} = write_file(filename:join([CertPath, "key-" ++ emqx_plugin_libs_id:gen() ++".pem"]),
maps:get(<<"keyfile">>, SSL)),
KeyFile;
false -> ""

View File

@ -151,8 +151,8 @@ set_special_configs(_App) ->
ok.
init_per_testcase(t_api, Config) ->
meck:new(emqx_rule_id, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_rule_id, gen, fun() -> "fake" end),
meck:new(emqx_plugin_libs_id, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_plugin_libs_id, gen, fun() -> "fake" end),
meck:new(emqx, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx, get_config, fun([node, data_dir]) ->
@ -165,7 +165,7 @@ init_per_testcase(t_api, Config) ->
init_per_testcase(_, Config) -> Config.
end_per_testcase(t_api, _Config) ->
meck:unload(emqx_rule_id),
meck:unload(emqx_plugin_libs_id),
meck:unload(emqx),
ok;
end_per_testcase(_, _Config) -> ok.

View File

@ -2,11 +2,9 @@
## EMQ X Bridge
##--------------------------------------------------------------------
#bridges.mqtt.my_mqtt_bridge {
#bridges.mqtt.my_mqtt_bridge_to_aws {
# server = "127.0.0.1:1883"
# proto_ver = "v4"
# ## the clientid will be the concatenation of `clientid_prefix` and ids in `in` and `out`.
# clientid_prefix = "bridge_client:"
# username = "username1"
# password = ""
# clean_start = true
@ -27,8 +25,9 @@
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
# ## we will create one MQTT connection for each element of the `in`
# in: [{
# ## we will create one MQTT connection for each element of the `message_in`
# message_in: [{
# ## the `id` will be used as part of the clientid
# id = "pull_msgs_from_aws"
# subscribe_remote_topic = "aws/#"
# subscribe_qos = 1
@ -37,8 +36,9 @@
# qos = "${qos}"
# retain = "${retain}"
# }]
# ## we will create one MQTT connection for each element of the `out`
# out: [{
# ## we will create one MQTT connection for each element of the `message_out`
# message_out: [{
# ## the `id` will be used as part of the clientid
# id = "push_msgs_to_aws"
# subscribe_local_topic = "emqx/#"
# remote_topic = "from_emqx/${topic}"

View File

@ -89,7 +89,8 @@ on_start(InstId, Conf) ->
NamePrefix = binary_to_list(InstId),
BasicConf = basic_config(Conf),
InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, sub_bridges => []}},
InOutConfigs = check_channel_id_dup(maps:get(in, Conf, []) ++ maps:get(out, Conf, [])),
InOutConfigs = check_channel_id_dup(maps:get(message_in, Conf, [])
++ maps:get(message_out, Conf, [])),
lists:foldl(fun
(_InOutConf, {error, Reason}) ->
{error, Reason};
@ -110,7 +111,7 @@ on_stop(InstId, #{}) ->
end.
%% TODO: let the emqx_resource trigger on_query/4 automatically according to the
%% `in` and `out` config
%% `message_in` and `message_out` config
on_query(InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
baisc_conf := BasicConf}) ->
logger:debug("create channel to connector: ~p, conf: ~p", [InstId, Conf]),
@ -136,19 +137,19 @@ check_channel_id_dup(Confs) ->
end, Confs),
Confs.
%% this is an `in` bridge
create_channel(#{subscribe_remote_topic := _, id := BridgeId} = InConf, NamePrefix,
#{clientid_prefix := ClientPrefix} = BasicConf) ->
logger:info("creating 'in' channel for: ~p", [BridgeId]),
create_sub_bridge(BasicConf#{name => bridge_name(NamePrefix, BridgeId),
clientid => clientid(ClientPrefix, BridgeId),
%% this is an `message_in` bridge
create_channel(#{subscribe_remote_topic := _, id := Id} = InConf, NamePrefix, BasicConf) ->
logger:info("creating 'message_in' channel for: ~p", [Id]),
create_sub_bridge(BasicConf#{
name => bridge_name(NamePrefix, Id),
clientid => clientid(Id),
subscriptions => InConf, forwards => undefined});
%% this is an `out` bridge
create_channel(#{subscribe_local_topic := _, id := BridgeId} = OutConf, NamePrefix,
#{clientid_prefix := ClientPrefix} = BasicConf) ->
logger:info("creating 'out' channel for: ~p", [BridgeId]),
create_sub_bridge(BasicConf#{name => bridge_name(NamePrefix, BridgeId),
clientid => clientid(ClientPrefix, BridgeId),
%% this is an `message_out` bridge
create_channel(#{subscribe_local_topic := _, id := Id} = OutConf, NamePrefix, BasicConf) ->
logger:info("creating 'message_out' channel for: ~p", [Id]),
create_sub_bridge(BasicConf#{
name => bridge_name(NamePrefix, Id),
clientid => clientid(Id),
subscriptions => undefined, forwards => OutConf}).
create_sub_bridge(#{name := Name} = Conf) ->
@ -172,7 +173,6 @@ basic_config(#{
reconnect_interval := ReconnIntv,
proto_ver := ProtoVer,
bridge_mode := BridgeMod,
clientid_prefix := ClientIdPrefix,
username := User,
password := Password,
clean_start := CleanStart,
@ -188,7 +188,6 @@ basic_config(#{
reconnect_interval => ReconnIntv,
proto_ver => ProtoVer,
bridge_mode => BridgeMod,
clientid_prefix => ClientIdPrefix,
username => User,
password => Password,
clean_start => CleanStart,
@ -203,8 +202,8 @@ basic_config(#{
bridge_name(Prefix, Id) ->
list_to_atom(str(Prefix) ++ ":" ++ str(Id)).
clientid(Prefix, Id) ->
list_to_binary(str(Prefix) ++ str(Id)).
clientid(Id) ->
list_to_binary(str(Id) ++ ":" ++ emqx_plugin_libs_id:gen(4)).
str(A) when is_atom(A) ->
atom_to_list(A);

View File

@ -31,7 +31,6 @@ fields("config") ->
, {reconnect_interval, hoconsc:mk(emqx_schema:duration_ms(), #{default => "30s"})}
, {proto_ver, fun proto_ver/1}
, {bridge_mode, hoconsc:mk(boolean(), #{default => true})}
, {clientid_prefix, hoconsc:mk(string(), #{default => ""})}
, {username, hoconsc:mk(string())}
, {password, hoconsc:mk(string())}
, {clean_start, hoconsc:mk(boolean(), #{default => true})}
@ -39,17 +38,17 @@ fields("config") ->
, {retry_interval, hoconsc:mk(emqx_schema:duration_ms(), #{default => "30s"})}
, {max_inflight, hoconsc:mk(integer(), #{default => 32})}
, {replayq, hoconsc:mk(hoconsc:ref(?MODULE, "replayq"))}
, {in, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "in")), #{default => []})}
, {out, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "out")), #{default => []})}
, {message_in, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "message_in")), #{default => []})}
, {message_out, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "message_out")), #{default => []})}
] ++ emqx_connector_schema_lib:ssl_fields();
fields("in") ->
fields("message_in") ->
[ {subscribe_remote_topic, #{type => binary(), nullable => false}}
, {local_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})}
, {subscribe_qos, hoconsc:mk(qos(), #{default => 1})}
] ++ common_inout_confs();
fields("out") ->
fields("message_out") ->
[ {subscribe_local_topic, #{type => binary(), nullable => false}}
, {remote_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})}
] ++ common_inout_confs();

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_id).
-module(emqx_plugin_libs_id).
-export([gen/0, gen/1]).

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_id_SUITE).
-module(emqx_plugin_libs_id_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
@ -24,5 +24,5 @@
all() -> emqx_ct:all(?MODULE).
t_gen(_) ->
?assertEqual(10, length(emqx_rule_id:gen(10))),
?assertEqual(20, length(emqx_rule_id:gen(20))).
?assertEqual(10, length(emqx_plugin_libs_id:gen(10))),
?assertEqual(20, length(emqx_plugin_libs_id:gen(20))).

View File

@ -26,7 +26,7 @@ It is intended to be used by the emqx_bridges and all other resources that need
# The Demo
The data_bridge for mysql
The bridge for mysql
---
## The callback module 'emqx_mysql_connector'

View File

@ -507,7 +507,7 @@ rule_id() ->
gen_id("rule:", fun emqx_rule_registry:get_rule/1).
gen_id(Prefix, TestFun) ->
Id = iolist_to_binary([Prefix, emqx_rule_id:gen()]),
Id = iolist_to_binary([Prefix, emqx_plugin_libs_id:gen()]),
case TestFun(Id) of
not_found -> Id;
_Res -> gen_id(Prefix, TestFun)

View File

@ -48,8 +48,8 @@ test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) ->
end.
test_rule(Sql, Select, Context, EventTopics) ->
RuleId = iolist_to_binary(["test_rule", emqx_rule_id:gen()]),
ActInstId = iolist_to_binary(["test_action", emqx_rule_id:gen()]),
RuleId = iolist_to_binary(["test_rule", emqx_plugin_libs_id:gen()]),
ActInstId = iolist_to_binary(["test_action", emqx_plugin_libs_id:gen()]),
ok = emqx_rule_metrics:create_rule_metrics(RuleId),
ok = emqx_rule_metrics:create_metrics(ActInstId),
Rule = #rule{