From d46241fe2f00f9e95bfd252d5d4fc9ebd7fcc3bf Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 10 Sep 2021 14:19:36 +0800 Subject: [PATCH] feat(bridges): avoid clientid competition between bridges on different nodes --- .../emqx_authz/src/emqx_authz_api_sources.erl | 6 ++-- .../test/emqx_authz_api_sources_SUITE.erl | 6 ++-- apps/emqx_bridge/etc/emqx_bridge.conf | 14 ++++---- .../src/emqx_connector_mqtt.erl | 35 +++++++++---------- .../src/mqtt/emqx_connector_mqtt_schema.erl | 9 +++-- .../src/emqx_plugin_libs_id.erl} | 2 +- .../test/emqx_plugin_libs_id_SUITE.erl} | 6 ++-- apps/emqx_resource/examples/demo.md | 2 +- .../emqx_rule_engine/src/emqx_rule_engine.erl | 2 +- .../src/emqx_rule_sqltester.erl | 4 +-- 10 files changed, 42 insertions(+), 44 deletions(-) rename apps/{emqx_rule_engine/src/emqx_rule_id.erl => emqx_plugin_libs/src/emqx_plugin_libs_id.erl} (98%) rename apps/{emqx_rule_engine/test/emqx_rule_id_SUITE.erl => emqx_plugin_libs/test/emqx_plugin_libs_id_SUITE.erl} (85%) diff --git a/apps/emqx_authz/src/emqx_authz_api_sources.erl b/apps/emqx_authz/src/emqx_authz_api_sources.erl index 209bbc01f..a84dea333 100644 --- a/apps/emqx_authz/src/emqx_authz_api_sources.erl +++ b/apps/emqx_authz/src/emqx_authz_api_sources.erl @@ -457,21 +457,21 @@ write_cert(#{<<"ssl">> := #{<<"enable">> := true} = SSL} = Source) -> CertPath = filename:join([emqx:get_config([node, data_dir]), "certs"]), CaCert = case maps:is_key(<<"cacertfile">>, SSL) of true -> - {ok, CaCertFile} = write_file(filename:join([CertPath, "cacert-" ++ emqx_rule_id:gen() ++".pem"]), + {ok, CaCertFile} = write_file(filename:join([CertPath, "cacert-" ++ emqx_plugin_libs_id:gen() ++".pem"]), maps:get(<<"cacertfile">>, SSL)), CaCertFile; false -> "" end, Cert = case maps:is_key(<<"certfile">>, SSL) of true -> - {ok, CertFile} = write_file(filename:join([CertPath, "cert-" ++ emqx_rule_id:gen() ++".pem"]), + {ok, CertFile} = write_file(filename:join([CertPath, "cert-" ++ emqx_plugin_libs_id:gen() ++".pem"]), maps:get(<<"certfile">>, SSL)), CertFile; false -> "" end, Key = case maps:is_key(<<"keyfile">>, SSL) of true -> - {ok, KeyFile} = write_file(filename:join([CertPath, "key-" ++ emqx_rule_id:gen() ++".pem"]), + {ok, KeyFile} = write_file(filename:join([CertPath, "key-" ++ emqx_plugin_libs_id:gen() ++".pem"]), maps:get(<<"keyfile">>, SSL)), KeyFile; false -> "" diff --git a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl index 8c37189c9..afe0f19bf 100644 --- a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl @@ -151,8 +151,8 @@ set_special_configs(_App) -> ok. init_per_testcase(t_api, Config) -> - meck:new(emqx_rule_id, [non_strict, passthrough, no_history, no_link]), - meck:expect(emqx_rule_id, gen, fun() -> "fake" end), + meck:new(emqx_plugin_libs_id, [non_strict, passthrough, no_history, no_link]), + meck:expect(emqx_plugin_libs_id, gen, fun() -> "fake" end), meck:new(emqx, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx, get_config, fun([node, data_dir]) -> @@ -165,7 +165,7 @@ init_per_testcase(t_api, Config) -> init_per_testcase(_, Config) -> Config. end_per_testcase(t_api, _Config) -> - meck:unload(emqx_rule_id), + meck:unload(emqx_plugin_libs_id), meck:unload(emqx), ok; end_per_testcase(_, _Config) -> ok. diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index 08873228d..bfba34e7c 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -2,11 +2,9 @@ ## EMQ X Bridge ##-------------------------------------------------------------------- -#bridges.mqtt.my_mqtt_bridge { +#bridges.mqtt.my_mqtt_bridge_to_aws { # server = "127.0.0.1:1883" # proto_ver = "v4" -# ## the clientid will be the concatenation of `clientid_prefix` and ids in `in` and `out`. -# clientid_prefix = "bridge_client:" # username = "username1" # password = "" # clean_start = true @@ -27,8 +25,9 @@ # certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" # cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" # } -# ## we will create one MQTT connection for each element of the `in` -# in: [{ +# ## we will create one MQTT connection for each element of the `message_in` +# message_in: [{ +# ## the `id` will be used as part of the clientid # id = "pull_msgs_from_aws" # subscribe_remote_topic = "aws/#" # subscribe_qos = 1 @@ -37,8 +36,9 @@ # qos = "${qos}" # retain = "${retain}" # }] -# ## we will create one MQTT connection for each element of the `out` -# out: [{ +# ## we will create one MQTT connection for each element of the `message_out` +# message_out: [{ +# ## the `id` will be used as part of the clientid # id = "push_msgs_to_aws" # subscribe_local_topic = "emqx/#" # remote_topic = "from_emqx/${topic}" diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 6631fd23a..708bcdeb9 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -89,7 +89,8 @@ on_start(InstId, Conf) -> NamePrefix = binary_to_list(InstId), BasicConf = basic_config(Conf), InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, sub_bridges => []}}, - InOutConfigs = check_channel_id_dup(maps:get(in, Conf, []) ++ maps:get(out, Conf, [])), + InOutConfigs = check_channel_id_dup(maps:get(message_in, Conf, []) + ++ maps:get(message_out, Conf, [])), lists:foldl(fun (_InOutConf, {error, Reason}) -> {error, Reason}; @@ -110,7 +111,7 @@ on_stop(InstId, #{}) -> end. %% TODO: let the emqx_resource trigger on_query/4 automatically according to the -%% `in` and `out` config +%% `message_in` and `message_out` config on_query(InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix, baisc_conf := BasicConf}) -> logger:debug("create channel to connector: ~p, conf: ~p", [InstId, Conf]), @@ -136,19 +137,19 @@ check_channel_id_dup(Confs) -> end, Confs), Confs. -%% this is an `in` bridge -create_channel(#{subscribe_remote_topic := _, id := BridgeId} = InConf, NamePrefix, - #{clientid_prefix := ClientPrefix} = BasicConf) -> - logger:info("creating 'in' channel for: ~p", [BridgeId]), - create_sub_bridge(BasicConf#{name => bridge_name(NamePrefix, BridgeId), - clientid => clientid(ClientPrefix, BridgeId), +%% this is an `message_in` bridge +create_channel(#{subscribe_remote_topic := _, id := Id} = InConf, NamePrefix, BasicConf) -> + logger:info("creating 'message_in' channel for: ~p", [Id]), + create_sub_bridge(BasicConf#{ + name => bridge_name(NamePrefix, Id), + clientid => clientid(Id), subscriptions => InConf, forwards => undefined}); -%% this is an `out` bridge -create_channel(#{subscribe_local_topic := _, id := BridgeId} = OutConf, NamePrefix, - #{clientid_prefix := ClientPrefix} = BasicConf) -> - logger:info("creating 'out' channel for: ~p", [BridgeId]), - create_sub_bridge(BasicConf#{name => bridge_name(NamePrefix, BridgeId), - clientid => clientid(ClientPrefix, BridgeId), +%% this is an `message_out` bridge +create_channel(#{subscribe_local_topic := _, id := Id} = OutConf, NamePrefix, BasicConf) -> + logger:info("creating 'message_out' channel for: ~p", [Id]), + create_sub_bridge(BasicConf#{ + name => bridge_name(NamePrefix, Id), + clientid => clientid(Id), subscriptions => undefined, forwards => OutConf}). create_sub_bridge(#{name := Name} = Conf) -> @@ -172,7 +173,6 @@ basic_config(#{ reconnect_interval := ReconnIntv, proto_ver := ProtoVer, bridge_mode := BridgeMod, - clientid_prefix := ClientIdPrefix, username := User, password := Password, clean_start := CleanStart, @@ -188,7 +188,6 @@ basic_config(#{ reconnect_interval => ReconnIntv, proto_ver => ProtoVer, bridge_mode => BridgeMod, - clientid_prefix => ClientIdPrefix, username => User, password => Password, clean_start => CleanStart, @@ -203,8 +202,8 @@ basic_config(#{ bridge_name(Prefix, Id) -> list_to_atom(str(Prefix) ++ ":" ++ str(Id)). -clientid(Prefix, Id) -> - list_to_binary(str(Prefix) ++ str(Id)). +clientid(Id) -> + list_to_binary(str(Id) ++ ":" ++ emqx_plugin_libs_id:gen(4)). str(A) when is_atom(A) -> atom_to_list(A); diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index ed7fd4408..184a8610c 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -31,7 +31,6 @@ fields("config") -> , {reconnect_interval, hoconsc:mk(emqx_schema:duration_ms(), #{default => "30s"})} , {proto_ver, fun proto_ver/1} , {bridge_mode, hoconsc:mk(boolean(), #{default => true})} - , {clientid_prefix, hoconsc:mk(string(), #{default => ""})} , {username, hoconsc:mk(string())} , {password, hoconsc:mk(string())} , {clean_start, hoconsc:mk(boolean(), #{default => true})} @@ -39,17 +38,17 @@ fields("config") -> , {retry_interval, hoconsc:mk(emqx_schema:duration_ms(), #{default => "30s"})} , {max_inflight, hoconsc:mk(integer(), #{default => 32})} , {replayq, hoconsc:mk(hoconsc:ref(?MODULE, "replayq"))} - , {in, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "in")), #{default => []})} - , {out, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "out")), #{default => []})} + , {message_in, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "message_in")), #{default => []})} + , {message_out, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, "message_out")), #{default => []})} ] ++ emqx_connector_schema_lib:ssl_fields(); -fields("in") -> +fields("message_in") -> [ {subscribe_remote_topic, #{type => binary(), nullable => false}} , {local_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})} , {subscribe_qos, hoconsc:mk(qos(), #{default => 1})} ] ++ common_inout_confs(); -fields("out") -> +fields("message_out") -> [ {subscribe_local_topic, #{type => binary(), nullable => false}} , {remote_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})} ] ++ common_inout_confs(); diff --git a/apps/emqx_rule_engine/src/emqx_rule_id.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_id.erl similarity index 98% rename from apps/emqx_rule_engine/src/emqx_rule_id.erl rename to apps/emqx_plugin_libs/src/emqx_plugin_libs_id.erl index 3f9c1fc6d..ef5fb08c3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_id.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_id.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_rule_id). +-module(emqx_plugin_libs_id). -export([gen/0, gen/1]). diff --git a/apps/emqx_rule_engine/test/emqx_rule_id_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_id_SUITE.erl similarity index 85% rename from apps/emqx_rule_engine/test/emqx_rule_id_SUITE.erl rename to apps/emqx_plugin_libs/test/emqx_plugin_libs_id_SUITE.erl index a1e1c4f8c..d13144ed3 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_id_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_id_SUITE.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_rule_id_SUITE). +-module(emqx_plugin_libs_id_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -24,5 +24,5 @@ all() -> emqx_ct:all(?MODULE). t_gen(_) -> - ?assertEqual(10, length(emqx_rule_id:gen(10))), - ?assertEqual(20, length(emqx_rule_id:gen(20))). + ?assertEqual(10, length(emqx_plugin_libs_id:gen(10))), + ?assertEqual(20, length(emqx_plugin_libs_id:gen(20))). diff --git a/apps/emqx_resource/examples/demo.md b/apps/emqx_resource/examples/demo.md index 52dd79ee8..c5d3bb52c 100644 --- a/apps/emqx_resource/examples/demo.md +++ b/apps/emqx_resource/examples/demo.md @@ -26,7 +26,7 @@ It is intended to be used by the emqx_bridges and all other resources that need # The Demo -The data_bridge for mysql +The bridge for mysql --- ## The callback module 'emqx_mysql_connector' diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 5095bc5b1..24cfecb03 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -507,7 +507,7 @@ rule_id() -> gen_id("rule:", fun emqx_rule_registry:get_rule/1). gen_id(Prefix, TestFun) -> - Id = iolist_to_binary([Prefix, emqx_rule_id:gen()]), + Id = iolist_to_binary([Prefix, emqx_plugin_libs_id:gen()]), case TestFun(Id) of not_found -> Id; _Res -> gen_id(Prefix, TestFun) diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 1ebc808eb..3076f414c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -48,8 +48,8 @@ test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) -> end. test_rule(Sql, Select, Context, EventTopics) -> - RuleId = iolist_to_binary(["test_rule", emqx_rule_id:gen()]), - ActInstId = iolist_to_binary(["test_action", emqx_rule_id:gen()]), + RuleId = iolist_to_binary(["test_rule", emqx_plugin_libs_id:gen()]), + ActInstId = iolist_to_binary(["test_action", emqx_plugin_libs_id:gen()]), ok = emqx_rule_metrics:create_rule_metrics(RuleId), ok = emqx_rule_metrics:create_metrics(ActInstId), Rule = #rule{