Merge pull request #8773 from terry-xiaoyu/remove_connector_config

Remove connector config
This commit is contained in:
Xinyu Liu 2022-08-23 09:26:10 +08:00 committed by GitHub
commit 91a9e5535c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
57 changed files with 922 additions and 2334 deletions

View File

@ -4,7 +4,7 @@
{vsn, "0.1.4"},
{modules, []},
{registered, [emqx_authn_sup, emqx_authn_registry]},
{applications, [kernel, stdlib, emqx_resource, ehttpc, epgsql, mysql, jose]},
{applications, [kernel, stdlib, emqx_resource, emqx_connector, ehttpc, epgsql, mysql, jose]},
{mod, {emqx_authn_app, []}},
{env, []},
{licenses, ["Apache-2.0"]},

View File

@ -50,7 +50,7 @@ init_per_suite(Config) ->
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
ok = start_apps([emqx_resource]),
Config;
false ->
{skip, no_mongo}
@ -61,7 +61,7 @@ end_per_suite(_Config) ->
[authentication],
?GLOBAL
),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
ok = emqx_common_test_helpers:stop_apps([emqx_authn]).
%%------------------------------------------------------------------------------

View File

@ -46,7 +46,7 @@ init_per_suite(Config) ->
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
ok = start_apps([emqx_resource]),
Config;
false ->
{skip, no_mongo}
@ -57,7 +57,7 @@ end_per_suite(_Config) ->
[authentication],
?GLOBAL
),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
ok = emqx_common_test_helpers:stop_apps([emqx_authn]).
%%------------------------------------------------------------------------------

View File

@ -58,7 +58,7 @@ init_per_suite(Config) ->
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
ok = start_apps([emqx_resource]),
{ok, _} = emqx_resource:create_local(
?MYSQL_RESOURCE,
?RESOURCE_GROUP,
@ -77,7 +77,7 @@ end_per_suite(_Config) ->
?GLOBAL
),
ok = emqx_resource:remove_local(?MYSQL_RESOURCE),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
ok = emqx_common_test_helpers:stop_apps([emqx_authn]).
%%------------------------------------------------------------------------------

View File

@ -49,7 +49,7 @@ init_per_suite(Config) ->
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
ok = start_apps([emqx_resource]),
Config;
false ->
{skip, no_mysql_tls}
@ -60,7 +60,7 @@ end_per_suite(_Config) ->
[authentication],
?GLOBAL
),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
ok = emqx_common_test_helpers:stop_apps([emqx_authn]).
%%------------------------------------------------------------------------------

View File

@ -59,7 +59,7 @@ init_per_suite(Config) ->
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
ok = start_apps([emqx_resource]),
{ok, _} = emqx_resource:create_local(
?PGSQL_RESOURCE,
?RESOURCE_GROUP,
@ -78,7 +78,7 @@ end_per_suite(_Config) ->
?GLOBAL
),
ok = emqx_resource:remove_local(?PGSQL_RESOURCE),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
ok = emqx_common_test_helpers:stop_apps([emqx_authn]).
%%------------------------------------------------------------------------------

View File

@ -49,7 +49,7 @@ init_per_suite(Config) ->
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
ok = start_apps([emqx_resource]),
Config;
false ->
{skip, no_pgsql_tls}
@ -60,7 +60,7 @@ end_per_suite(_Config) ->
[authentication],
?GLOBAL
),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
ok = emqx_common_test_helpers:stop_apps([emqx_authn]).
%%------------------------------------------------------------------------------

View File

@ -58,7 +58,7 @@ init_per_suite(Config) ->
case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
ok = start_apps([emqx_resource]),
{ok, _} = emqx_resource:create_local(
?REDIS_RESOURCE,
?RESOURCE_GROUP,
@ -77,7 +77,7 @@ end_per_suite(_Config) ->
?GLOBAL
),
ok = emqx_resource:remove_local(?REDIS_RESOURCE),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
ok = emqx_common_test_helpers:stop_apps([emqx_authn]).
%%------------------------------------------------------------------------------

View File

@ -49,7 +49,7 @@ init_per_suite(Config) ->
case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_TLS_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_authn]),
ok = start_apps([emqx_resource, emqx_connector]),
ok = start_apps([emqx_resource]),
Config;
false ->
{skip, no_redis}
@ -60,7 +60,7 @@ end_per_suite(_Config) ->
[authentication],
?GLOBAL
),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
ok = emqx_common_test_helpers:stop_apps([emqx_authn]).
%%------------------------------------------------------------------------------

View File

@ -8,6 +8,7 @@
kernel,
stdlib,
crypto,
emqx_resource,
emqx_connector
]},
{env, []},

View File

@ -42,7 +42,7 @@ init_per_suite(Config) ->
),
ok = emqx_common_test_helpers:start_apps(
[emqx_connector, emqx_conf, emqx_authz],
[emqx_conf, emqx_authz],
fun set_special_configs/1
),
Config.
@ -56,8 +56,7 @@ end_per_suite(_Config) ->
<<"sources">> => []
}
),
ok = stop_apps([emqx_resource]),
emqx_common_test_helpers:stop_apps([emqx_connector, emqx_authz, emqx_conf]),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
meck:unload(emqx_resource),
ok.

View File

@ -47,7 +47,6 @@ end_per_suite(_Config) ->
<<"sources">> => []
}
),
ok = stop_apps([emqx_resource, emqx_connector]),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf, emqx_management]),
ok.

View File

@ -45,7 +45,7 @@ end_per_suite(_Config) ->
<<"sources">> => []
}
),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]),
ok.

View File

@ -103,7 +103,7 @@ groups() ->
[].
init_per_suite(Config) ->
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create_local, fun(_, _, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, health_check, fun(St) -> {ok, St} end),
@ -120,7 +120,7 @@ init_per_suite(Config) ->
[emqx_conf, emqx_authz, emqx_dashboard],
fun set_special_configs/1
),
ok = start_apps([emqx_resource, emqx_connector]),
ok = start_apps([emqx_resource]),
Config.
end_per_suite(_Config) ->
@ -134,7 +134,7 @@ end_per_suite(_Config) ->
),
%% resource and connector should be stop first,
%% or authz_[mysql|pgsql|redis..]_SUITE would be failed
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]),
meck:unload(emqx_resource),
ok.

View File

@ -55,7 +55,6 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
ok = emqx_authz_test_lib:restore_authorizers(),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = emqx_common_test_helpers:stop_apps([emqx_authz]).
init_per_testcase(_TestCase, Config) ->

View File

@ -39,17 +39,17 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ok = stop_apps([emqx_resource, emqx_connector, cowboy]),
ok = stop_apps([emqx_resource, cowboy]),
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],
fun set_special_configs/1
),
ok = start_apps([emqx_resource, emqx_connector, cowboy]),
ok = start_apps([emqx_resource, cowboy]),
Config.
end_per_suite(_Config) ->
ok = emqx_authz_test_lib:restore_authorizers(),
ok = stop_apps([emqx_resource, emqx_connector, cowboy]),
ok = stop_apps([emqx_resource, cowboy]),
ok = emqx_common_test_helpers:stop_apps([emqx_authz]).
set_special_configs(emqx_authz) ->

View File

@ -34,14 +34,14 @@ groups() ->
[].
init_per_suite(Config) ->
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],
fun set_special_configs/1
),
ok = start_apps([emqx_resource, emqx_connector]),
ok = start_apps([emqx_resource]),
Config;
false ->
{skip, no_mongo}
@ -49,7 +49,7 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
ok = emqx_authz_test_lib:restore_authorizers(),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
ok = emqx_common_test_helpers:stop_apps([emqx_authz]).
set_special_configs(emqx_authz) ->

View File

@ -33,14 +33,14 @@ groups() ->
[].
init_per_suite(Config) ->
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],
fun set_special_configs/1
),
ok = start_apps([emqx_resource, emqx_connector]),
ok = start_apps([emqx_resource]),
{ok, _} = emqx_resource:create_local(
?MYSQL_RESOURCE,
?RESOURCE_GROUP,
@ -56,7 +56,7 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
ok = emqx_authz_test_lib:restore_authorizers(),
ok = emqx_resource:remove_local(?MYSQL_RESOURCE),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
ok = emqx_common_test_helpers:stop_apps([emqx_authz]).
init_per_testcase(_TestCase, Config) ->

View File

@ -33,14 +33,14 @@ groups() ->
[].
init_per_suite(Config) ->
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],
fun set_special_configs/1
),
ok = start_apps([emqx_resource, emqx_connector]),
ok = start_apps([emqx_resource]),
{ok, _} = emqx_resource:create_local(
?PGSQL_RESOURCE,
?RESOURCE_GROUP,
@ -56,7 +56,7 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
ok = emqx_authz_test_lib:restore_authorizers(),
ok = emqx_resource:remove_local(?PGSQL_RESOURCE),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
ok = emqx_common_test_helpers:stop_apps([emqx_authz]).
init_per_testcase(_TestCase, Config) ->

View File

@ -34,14 +34,14 @@ groups() ->
[].
init_per_suite(Config) ->
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authz],
fun set_special_configs/1
),
ok = start_apps([emqx_resource, emqx_connector]),
ok = start_apps([emqx_resource]),
{ok, _} = emqx_resource:create_local(
?REDIS_RESOURCE,
?RESOURCE_GROUP,
@ -57,7 +57,7 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
ok = emqx_authz_test_lib:restore_authorizers(),
ok = emqx_resource:remove_local(?REDIS_RESOURCE),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = stop_apps([emqx_resource]),
ok = emqx_common_test_helpers:stop_apps([emqx_authz]).
init_per_testcase(_TestCase, Config) ->

View File

@ -1,16 +1,14 @@
emqx_bridge_mqtt_schema {
desc_rec {
desc {
en: """Configuration for MQTT bridge."""
zh: """MQTT Bridge 配置"""
}
label: {
en: "MQTT Bridge Configuration"
zh: "MQTT Bridge 配置"
}
}
config {
desc {
en: """The config for MQTT Bridges."""
zh: """MQTT Bridge 的配置。"""
}
label: {
en: "Config"
zh: "配置"
}
}
desc_type {
desc {
en: """The bridge type."""

View File

@ -11,24 +11,6 @@ emqx_bridge_schema {
}
}
desc_connector {
desc {
en: """
The ID or the configs of the connector to be used for this bridge. Connector IDs must be of format:
<code>{type}:{name}</code>.</br>
In config files, you can find the corresponding config entry for a connector by such path:
'connectors.{type}.{name}'.</br>
"""
zh: """
Bridge 使用的 Connector 的 ID 或者配置。Connector ID 的格式必须为:<code>{type}:{name}</code>.</br>
在配置文件中,您可以通过以下路径找到 Connector 的相应配置条目:'connector.{type}.{name}'。</br>"""
}
label: {
en: "Connector ID"
zh: "Connector ID"
}
}
desc_metrics {
desc {
en: """The metrics of the bridge"""
@ -85,7 +67,7 @@ Bridge 使用的 Connector 的 ID 或者配置。Connector ID 的格式必须为
}
bridges_name {
bridges_mqtt {
desc {
en: """MQTT bridges to/from another MQTT broker"""
zh: """桥接到另一个 MQTT Broker 的 MQTT Bridge"""

View File

@ -11,17 +11,6 @@ emqx_bridge_webhook_schema {
}
}
config_direction {
desc {
en: """The direction of this bridge, MUST be 'egress'"""
zh: """Bridge 的方向, 必须是 egress"""
}
label: {
en: "Bridge Direction"
zh: "Bridge 方向"
}
}
config_url {
desc {
en: """

View File

@ -37,8 +37,7 @@
create/3,
disable_enable/3,
remove/2,
list/0,
list_bridges_by_connector/1
list/0
]).
-export([send_message/2]).
@ -48,6 +47,8 @@
%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
-define(EGRESS_DIR_BRIDGES(T), T == webhook; T == mysql).
load() ->
Bridges = emqx:get_config([bridges], #{}),
lists:foreach(
@ -93,10 +94,10 @@ load_hook() ->
load_hook(Bridges) ->
lists:foreach(
fun({_Type, Bridge}) ->
fun({Type, Bridge}) ->
lists:foreach(
fun({_Name, BridgeConf}) ->
do_load_hook(BridgeConf)
do_load_hook(Type, BridgeConf)
end,
maps:to_list(Bridge)
)
@ -104,12 +105,13 @@ load_hook(Bridges) ->
maps:to_list(Bridges)
).
do_load_hook(#{local_topic := _} = Conf) ->
case maps:get(direction, Conf, egress) of
egress -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
ingress -> ok
end;
do_load_hook(_Conf) ->
do_load_hook(Type, #{local_topic := _}) when ?EGRESS_DIR_BRIDGES(Type) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
do_load_hook(mqtt, #{egress := #{local := #{topic := _}}}) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
do_load_hook(kafka, #{producer := #{mqtt := #{topic := _}}}) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
do_load_hook(_Type, _Conf) ->
ok.
unload_hook() ->
@ -197,13 +199,6 @@ list() ->
maps:to_list(emqx:get_raw_config([bridges], #{}))
).
list_bridges_by_connector(ConnectorId) ->
[
B
|| B = #{raw_config := #{<<"connector">> := Id}} <- list(),
ConnectorId =:= Id
].
lookup(Id) ->
{Type, Name} = emqx_bridge_resource:parse_bridge_id(Id),
lookup(Type, Name).
@ -303,13 +298,8 @@ get_matched_bridges(Topic) ->
maps:fold(
fun(BType, Conf, Acc0) ->
maps:fold(
fun
%% Confs for MQTT, Kafka bridges have the `direction` flag
(_BName, #{direction := ingress}, Acc1) ->
Acc1;
(BName, #{direction := egress} = Egress, Acc1) ->
%% WebHook, MySQL bridges only have egress direction
get_matched_bridge_id(Egress, Topic, BType, BName, Acc1)
fun(BName, BConf, Acc1) ->
get_matched_bridge_id(BType, BConf, Topic, BName, Acc1)
end,
Acc0,
Conf
@ -319,9 +309,18 @@ get_matched_bridges(Topic) ->
Bridges
).
get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) ->
get_matched_bridge_id(_BType, #{enable := false}, _Topic, _BName, Acc) ->
Acc;
get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) ->
get_matched_bridge_id(BType, #{local_topic := Filter}, Topic, BName, Acc) when
?EGRESS_DIR_BRIDGES(BType)
->
do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc);
get_matched_bridge_id(mqtt, #{egress := #{local := #{topic := Filter}}}, Topic, BName, Acc) ->
do_get_matched_bridge_id(Topic, Filter, mqtt, BName, Acc);
get_matched_bridge_id(kafka, #{producer := #{mqtt := #{topic := Filter}}}, Topic, BName, Acc) ->
do_get_matched_bridge_id(Topic, Filter, kafka, BName, Acc).
do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc) ->
case emqx_topic:match(Topic, Filter) of
true -> [emqx_bridge_resource:bridge_id(BType, BName) | Acc];
false -> Acc

View File

@ -42,8 +42,6 @@
-export([lookup_from_local_node/2]).
-define(CONN_TYPES, [mqtt]).
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_bridge_resource:parse_bridge_id(Id) of
{BridgeType, BridgeName} ->
@ -146,7 +144,7 @@ param_path_id() ->
#{
in => path,
required => true,
example => <<"webhook:my_webhook">>,
example => <<"webhook:webhook_example">>,
desc => ?DESC("desc_param_path_id")
}
)}.
@ -155,66 +153,45 @@ bridge_info_array_example(Method) ->
[Config || #{value := Config} <- maps:values(bridge_info_examples(Method))].
bridge_info_examples(Method) ->
maps:merge(conn_bridge_examples(Method), #{
<<"my_webhook">> => #{
summary => <<"WebHook">>,
value => info_example(webhook, awesome, Method)
}
}).
conn_bridge_examples(Method) ->
Fun =
fun(Type, Acc) ->
SType = atom_to_list(Type),
KeyIngress = bin(SType ++ "_ingress"),
KeyEgress = bin(SType ++ "_egress"),
maps:merge(Acc, #{
KeyIngress => #{
summary => bin(string:uppercase(SType) ++ " Ingress Bridge"),
value => info_example(Type, ingress, Method)
},
KeyEgress => #{
summary => bin(string:uppercase(SType) ++ " Egress Bridge"),
value => info_example(Type, egress, Method)
}
})
end,
Broker = lists:foldl(Fun, #{}, ?CONN_TYPES),
EE = ee_conn_bridge_examples(Method),
maps:merge(Broker, EE).
-if(?EMQX_RELEASE_EDITION == ee).
ee_conn_bridge_examples(Method) ->
emqx_ee_bridge:conn_bridge_examples(Method).
-else.
ee_conn_bridge_examples(_Method) ->
#{}.
-endif.
info_example(Type, Direction, Method) ->
maps:merge(
info_example_basic(Type, Direction),
method_example(Type, Direction, Method)
#{
<<"webhook_example">> => #{
summary => <<"WebHook">>,
value => info_example(webhook, Method)
},
<<"mqtt_example">> => #{
summary => <<"MQTT Bridge">>,
value => info_example(mqtt, Method)
}
},
ee_bridge_examples(Method)
).
method_example(Type, Direction, Method) when Method == get; Method == post ->
ee_bridge_examples(Method) ->
case erlang:function_exported(emqx_ee_bridge, examples, 1) of
true -> emqx_ee_bridge:examples(Method);
false -> #{}
end.
info_example(Type, Method) ->
maps:merge(
info_example_basic(Type),
method_example(Type, Method)
).
method_example(Type, Method) when Method == get; Method == post ->
SType = atom_to_list(Type),
SDir = atom_to_list(Direction),
SName =
case Type of
webhook -> "my_" ++ SType;
_ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge"
end,
TypeNameExamp = #{
SName = SType ++ "_example",
TypeNameExam = #{
type => bin(SType),
name => bin(SName)
},
maybe_with_metrics_example(TypeNameExamp, Method);
method_example(_Type, _Direction, put) ->
maybe_with_metrics_example(TypeNameExam, Method);
method_example(_Type, put) ->
#{}.
maybe_with_metrics_example(TypeNameExamp, get) ->
TypeNameExamp#{
maybe_with_metrics_example(TypeNameExam, get) ->
TypeNameExam#{
metrics => ?METRICS(0, 0, 0, 0, 0, 0),
node_metrics => [
#{
@ -223,10 +200,10 @@ maybe_with_metrics_example(TypeNameExamp, get) ->
}
]
};
maybe_with_metrics_example(TypeNameExamp, _) ->
TypeNameExamp.
maybe_with_metrics_example(TypeNameExam, _) ->
TypeNameExam.
info_example_basic(webhook, _) ->
info_example_basic(webhook) ->
#{
enable => true,
url => <<"http://localhost:9901/messages/${topic}">>,
@ -241,28 +218,52 @@ info_example_basic(webhook, _) ->
method => post,
body => <<"${payload}">>
};
info_example_basic(mqtt, ingress) ->
info_example_basic(mqtt) ->
(mqtt_main_example())#{
egress => mqtt_egress_example(),
ingress => mqtt_ingress_example()
}.
mqtt_main_example() ->
#{
enable => true,
connector => <<"mqtt:my_mqtt_connector">>,
direction => ingress,
remote_topic => <<"aws/#">>,
remote_qos => 1,
local_topic => <<"from_aws/${topic}">>,
local_qos => <<"${qos}">>,
payload => <<"${payload}">>,
retain => <<"${retain}">>
};
info_example_basic(mqtt, egress) ->
mode => cluster_shareload,
server => <<"127.0.0.1:1883">>,
proto_ver => <<"v4">>,
username => <<"foo">>,
password => <<"bar">>,
clean_start => true,
keepalive => <<"300s">>,
retry_interval => <<"15s">>,
max_inflight => 100,
ssl => #{
enable => false
}
}.
mqtt_egress_example() ->
#{
enable => true,
connector => <<"mqtt:my_mqtt_connector">>,
direction => egress,
local_topic => <<"emqx/#">>,
remote_topic => <<"from_emqx/${topic}">>,
remote_qos => <<"${qos}">>,
payload => <<"${payload}">>,
retain => false
local => #{
topic => <<"emqx/#">>
},
remote => #{
topic => <<"from_emqx/${topic}">>,
qos => <<"${qos}">>,
payload => <<"${payload}">>,
retain => false
}
}.
mqtt_ingress_example() ->
#{
remote => #{
topic => <<"aws/#">>,
qos => 1
},
local => #{
topic => <<"from_aws/${topic}">>,
qos => <<"${qos}">>,
payload => <<"${payload}">>,
retain => <<"${retain}">>
}
}.
schema("/bridges") ->

View File

@ -45,7 +45,6 @@ stop(_State) ->
-if(?EMQX_RELEASE_EDITION == ee).
start_ee_apps() ->
{ok, _} = application:ensure_all_started(emqx_ee_bridge),
{ok, _} = application:ensure_all_started(emqx_ee_connector),
ok.
-else.
start_ee_apps() ->

View File

@ -43,6 +43,9 @@
reset_metrics/1
]).
%% bi-directional bridge with producer/consumer or ingress/egress configs
-define(IS_BI_DIR_BRIDGE(TYPE), TYPE == <<"mqtt">>; TYPE == <<"kafka">>).
-if(?EMQX_RELEASE_EDITION == ee).
bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
bridge_to_resource_type(mqtt) -> emqx_connector_mqtt;
@ -102,7 +105,7 @@ create(Type, Name, Conf, Opts) ->
resource_id(Type, Name),
<<"emqx_bridge">>,
bridge_to_resource_type(Type),
parse_confs(Type, Name, Conf),
parse_confs(bin(Type), Name, Conf),
Opts
),
maybe_disable_bridge(Type, Name, Conf).
@ -168,27 +171,21 @@ recreate(Type, Name, Conf, Opts) ->
emqx_resource:recreate_local(
resource_id(Type, Name),
bridge_to_resource_type(Type),
parse_confs(Type, Name, Conf),
parse_confs(bin(Type), Name, Conf),
Opts
).
create_dry_run(Type, Conf) ->
Conf0 = fill_dry_run_conf(Conf),
case emqx_resource:check_config(bridge_to_resource_type(Type), Conf0) of
{ok, Conf1} ->
TmpPath = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]),
case emqx_connector_ssl:convert_certs(TmpPath, Conf1) of
{error, Reason} ->
{error, Reason};
{ok, ConfNew} ->
Res = emqx_resource:create_dry_run_local(
bridge_to_resource_type(Type), ConfNew
),
_ = maybe_clear_certs(TmpPath, ConfNew),
Res
end;
{error, _} = Error ->
Error
TmpPath = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]),
case emqx_connector_ssl:convert_certs(TmpPath, Conf) of
{error, Reason} ->
{error, Reason};
{ok, ConfNew} ->
Res = emqx_resource:create_dry_run_local(
bridge_to_resource_type(Type), ConfNew
),
_ = maybe_clear_certs(TmpPath, ConfNew),
Res
end.
remove(BridgeId) ->
@ -213,19 +210,6 @@ maybe_disable_bridge(Type, Name, Conf) ->
true -> ok
end.
fill_dry_run_conf(Conf) ->
Conf#{
<<"egress">> =>
#{
<<"remote_topic">> => <<"t">>,
<<"remote_qos">> => 0,
<<"retain">> => true,
<<"payload">> => <<"val">>
},
<<"ingress">> =>
#{<<"remote_topic">> => <<"t">>}
}.
maybe_clear_certs(TmpPath, #{ssl := SslConf} = Conf) ->
%% don't remove the cert files if they are in use
case is_tmp_path_conf(TmpPath, SslConf) of
@ -245,8 +229,9 @@ is_tmp_path_conf(_TmpPath, _Conf) ->
is_tmp_path(TmpPath, File) ->
string:str(str(File), str(TmpPath)) > 0.
%% convert bridge configs to what the connector modules want
parse_confs(
Type,
<<"webhook">>,
_Name,
#{
url := Url,
@ -256,7 +241,7 @@ parse_confs(
request_timeout := ReqTimeout,
max_retries := Retry
} = Conf
) when Type == webhook orelse Type == <<"webhook">> ->
) ->
{BaseUrl, Path} = parse_url(Url),
{ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl),
Conf#{
@ -271,42 +256,14 @@ parse_confs(
max_retries => Retry
}
};
parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when
is_binary(ConnId)
->
case emqx_connector:parse_connector_id(ConnId) of
{Type, ConnName} ->
ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
make_resource_confs(
Direction,
ConnectorConfs,
maps:without([connector, direction], Conf),
Type,
Name
);
{_ConnType, _ConnName} ->
error({cannot_use_connector_with_different_type, ConnId})
end;
parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) when
is_map(ConnectorConfs)
->
make_resource_confs(
Direction,
ConnectorConfs,
maps:without([connector, direction], Conf),
Type,
Name
).
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) ->
parse_confs(Type, Name, Conf) when ?IS_BI_DIR_BRIDGE(Type) ->
%% For some drivers that can be used as data-sources, we need to provide a
%% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it
%% receives a message from the external database.
BName = bridge_id(Type, Name),
ConnectorConfs#{
ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>}
};
make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) ->
ConnectorConfs#{
egress => BridgeConf
}.
Conf#{hookpoint => <<"$bridges/", BName/binary>>};
parse_confs(_Type, _Name, Conf) ->
Conf.
parse_url(Url) ->
case string:split(Url, "//", leading) of

View File

@ -3,43 +3,30 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2]).
-import(hoconsc, [mk/2, ref/2]).
-export([roots/0, fields/1, desc/1]).
-export([roots/0, fields/1, desc/1, namespace/0]).
%%======================================================================================
%% Hocon Schema Definitions
namespace() -> "bridge_mqtt".
roots() -> [].
fields("ingress") ->
[emqx_bridge_schema:direction_field(ingress, emqx_connector_mqtt_schema:ingress_desc())] ++
emqx_bridge_schema:common_bridge_fields(mqtt_connector_ref()) ++
proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
fields("egress") ->
[emqx_bridge_schema:direction_field(egress, emqx_connector_mqtt_schema:egress_desc())] ++
emqx_bridge_schema:common_bridge_fields(mqtt_connector_ref()) ++
emqx_connector_mqtt_schema:fields("egress");
fields("post_ingress") ->
fields("config") ->
emqx_bridge_schema:common_bridge_fields() ++
emqx_connector_mqtt_schema:fields("config");
fields("post") ->
[
type_field(),
name_field()
] ++ proplists:delete(enable, fields("ingress"));
fields("post_egress") ->
[
type_field(),
name_field()
] ++ proplists:delete(enable, fields("egress"));
fields("put_ingress") ->
proplists:delete(enable, fields("ingress"));
fields("put_egress") ->
proplists:delete(enable, fields("egress"));
fields("get_ingress") ->
emqx_bridge_schema:metrics_status_fields() ++ fields("post_ingress");
fields("get_egress") ->
emqx_bridge_schema:metrics_status_fields() ++ fields("post_egress").
] ++ emqx_connector_mqtt_schema:fields("config");
fields("put") ->
emqx_connector_mqtt_schema:fields("config");
fields("get") ->
emqx_bridge_schema:metrics_status_fields() ++ fields("config").
desc(Rec) when Rec =:= "ingress"; Rec =:= "egress" ->
?DESC("desc_rec");
desc("config") ->
?DESC("config");
desc(_) ->
undefined.
@ -63,6 +50,3 @@ name_field() ->
desc => ?DESC("desc_name")
}
)}.
mqtt_connector_ref() ->
?R_REF(emqx_connector_mqtt_schema, "connector").

View File

@ -14,16 +14,13 @@
]).
-export([
common_bridge_fields/1,
metrics_status_fields/0,
direction_field/2
common_bridge_fields/0,
metrics_status_fields/0
]).
%%======================================================================================
%% Hocon Schema Definitions
-define(CONN_TYPES, [mqtt]).
%%======================================================================================
%% For HTTP APIs
get_response() ->
@ -36,34 +33,26 @@ post_request() ->
api_schema("post").
api_schema(Method) ->
Broker =
lists:flatmap(
fun(Type) ->
[
ref(schema_mod(Type), Method ++ "_ingress"),
ref(schema_mod(Type), Method ++ "_egress")
]
end,
?CONN_TYPES
) ++ [ref(Module, Method) || Module <- [emqx_bridge_webhook_schema]],
Broker = [
ref(Mod, Method)
|| Mod <- [emqx_bridge_webhook_schema, emqx_bridge_mqtt_schema]
],
EE = ee_api_schemas(Method),
hoconsc:union(Broker ++ EE).
-if(?EMQX_RELEASE_EDITION == ee).
ee_api_schemas(Method) ->
emqx_ee_bridge:api_schemas(Method).
case erlang:function_exported(emqx_ee_bridge, api_schemas, 1) of
true -> emqx_ee_bridge:api_schemas(Method);
false -> []
end.
ee_fields_bridges() ->
emqx_ee_bridge:fields(bridges).
-else.
ee_api_schemas(_) ->
[].
case erlang:function_exported(emqx_ee_bridge, fields, 1) of
true -> emqx_ee_bridge:fields(bridges);
false -> []
end.
ee_fields_bridges() ->
[].
-endif.
common_bridge_fields(ConnectorRef) ->
common_bridge_fields() ->
[
{enable,
mk(
@ -72,15 +61,6 @@ common_bridge_fields(ConnectorRef) ->
desc => ?DESC("desc_enable"),
default => true
}
)},
{connector,
mk(
hoconsc:union([binary(), ConnectorRef]),
#{
required => true,
example => <<"mqtt:my_mqtt_connector">>,
desc => ?DESC("desc_connector")
}
)}
].
@ -100,18 +80,6 @@ metrics_status_fields() ->
)}
].
direction_field(Dir, Desc) ->
{direction,
mk(
Dir,
#{
required => true,
default => egress,
desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.</br>" ++
Desc
}
)}.
%%======================================================================================
%% For config files
@ -125,22 +93,13 @@ fields(bridges) ->
mk(
hoconsc:map(name, ref(emqx_bridge_webhook_schema, "config")),
#{desc => ?DESC("bridges_webhook")}
)},
{mqtt,
mk(
hoconsc:map(name, ref(emqx_bridge_mqtt_schema, "config")),
#{desc => ?DESC("bridges_mqtt")}
)}
] ++
[
{T,
mk(
hoconsc:map(
name,
hoconsc:union([
ref(schema_mod(T), "ingress"),
ref(schema_mod(T), "egress")
])
),
#{desc => ?DESC("bridges_name")}
)}
|| T <- ?CONN_TYPES
] ++ ee_fields_bridges();
] ++ ee_fields_bridges();
fields("metrics") ->
[
{"matched", mk(integer(), #{desc => ?DESC("metric_matched")})},
@ -181,6 +140,3 @@ status() ->
node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
schema_mod(Type) ->
list_to_atom(lists:concat(["emqx_bridge_", Type, "_schema"])).

View File

@ -50,14 +50,6 @@ basic_config() ->
desc => ?DESC("config_enable"),
default => true
}
)},
{direction,
mk(
egress,
#{
desc => ?DESC("config_direction"),
default => egress
}
)}
] ++ webhook_creation_opts() ++
proplists:delete(

View File

@ -89,36 +89,29 @@ t_get_basic_usage_info_1(_Config) ->
).
setup_fake_telemetry_data() ->
ConnectorConf =
#{
<<"connectors">> =>
#{
<<"mqtt">> => #{
<<"my_mqtt_connector">> =>
#{server => "127.0.0.1:1883"},
<<"my_mqtt_connector2">> =>
#{server => "127.0.0.1:1884"}
}
}
},
MQTTConfig1 = #{
connector => <<"mqtt:my_mqtt_connector">>,
server => "127.0.0.1:1883",
enable => true,
direction => ingress,
remote_topic => <<"aws/#">>,
remote_qos => 1
ingress => #{
remote => #{
topic => <<"aws/#">>,
qos => 1
}
}
},
MQTTConfig2 = #{
connector => <<"mqtt:my_mqtt_connector2">>,
server => "127.0.0.1:1884",
enable => true,
direction => ingress,
remote_topic => <<"$bridges/mqtt:some_bridge_in">>,
remote_qos => 1
ingress => #{
remote => #{
topic => <<"$bridges/mqtt:some_bridge_in">>,
qos => 1
}
}
},
HTTPConfig = #{
url => <<"http://localhost:9901/messages/${topic}">>,
enable => true,
direction => egress,
local_topic => "emqx_webhook/#",
method => post,
body => <<"${payload}">>,
@ -143,7 +136,6 @@ setup_fake_telemetry_data() ->
}
},
Opts = #{raw_with_default => true},
ok = emqx_common_test_helpers:load_config(emqx_connector_schema, ConnectorConf, Opts),
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf, Opts),
ok = snabbkaffe:start_trace(),

View File

@ -0,0 +1,447 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_mqtt_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/4, uri/1]).
-include("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx_dashboard/include/emqx_dashboard.hrl").
%% output functions
-export([inspect/3]).
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
-define(TYPE_MQTT, <<"mqtt">>).
-define(NAME_MQTT, <<"my_mqtt_bridge">>).
-define(BRIDGE_NAME_INGRESS, <<"ingress_mqtt_bridge">>).
-define(BRIDGE_NAME_EGRESS, <<"egress_mqtt_bridge">>).
-define(SERVER_CONF(Username), #{
<<"server">> => <<"127.0.0.1:1883">>,
<<"username">> => Username,
<<"password">> => <<"">>,
<<"proto_ver">> => <<"v4">>,
<<"ssl">> => #{<<"enable">> => false}
}).
-define(INGRESS_CONF, #{
<<"remote">> => #{
<<"topic">> => <<"remote_topic/#">>,
<<"qos">> => 2
},
<<"local">> => #{
<<"topic">> => <<"local_topic/${topic}">>,
<<"qos">> => <<"${qos}">>,
<<"payload">> => <<"${payload}">>,
<<"retain">> => <<"${retain}">>
}
}).
-define(EGRESS_CONF, #{
<<"local">> => #{
<<"topic">> => <<"local_topic/#">>
},
<<"remote">> => #{
<<"topic">> => <<"remote_topic/${topic}">>,
<<"payload">> => <<"${payload}">>,
<<"qos">> => <<"${qos}">>,
<<"retain">> => <<"${retain}">>
}
}).
-define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX), #{
<<"matched">> := MATCH,
<<"success">> := SUCC,
<<"failed">> := FAILED,
<<"rate">> := SPEED,
<<"rate_last5m">> := SPEED5M,
<<"rate_max">> := SPEEDMAX
}).
inspect(Selected, _Envs, _Args) ->
persistent_term:put(?MODULE, #{inspect => Selected}).
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
suite() ->
[{timetrap, {seconds, 30}}].
init_per_suite(Config) ->
_ = application:load(emqx_conf),
%% some testcases (may from other app) already get emqx_connector started
_ = application:stop(emqx_resource),
_ = application:stop(emqx_connector),
ok = emqx_common_test_helpers:start_apps(
[
emqx_rule_engine,
emqx_bridge,
emqx_dashboard
],
fun set_special_configs/1
),
ok = emqx_common_test_helpers:load_config(
emqx_rule_engine_schema,
<<"rule_engine {rules {}}">>
),
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([
emqx_rule_engine,
emqx_bridge,
emqx_dashboard
]),
ok.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config(<<"connector_admin">>);
set_special_configs(_) ->
ok.
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(_, _Config) ->
clear_resources(),
ok.
clear_resources() ->
lists:foreach(
fun(#{id := Id}) ->
ok = emqx_rule_engine:delete_rule(Id)
end,
emqx_rule_engine:get_rules()
),
lists:foreach(
fun(#{type := Type, name := Name}) ->
{ok, _} = emqx_bridge:remove(Type, Name)
end,
emqx_bridge:list()
).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_mqtt_conn_bridge_ingress(_) ->
User1 = <<"user1">>,
%% create an MQTT bridge, using POST
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?SERVER_CONF(User1)#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_INGRESS,
<<"ingress">> => ?INGRESS_CONF
}
),
#{
<<"type">> := ?TYPE_MQTT,
<<"name">> := ?BRIDGE_NAME_INGRESS
} = jsx:decode(Bridge),
BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
%% we now test if the bridge works as expected
RemoteTopic = <<"remote_topic/1">>,
LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(LocalTopic),
timer:sleep(100),
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
%% we should receive a message on the local broker, with specified topic
?assert(
receive
{deliver, LocalTopic, #message{payload = Payload}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok.
t_mqtt_conn_bridge_egress(_) ->
%% then we add a mqtt connector, using POST
User1 = <<"user1">>,
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?SERVER_CONF(User1)#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF
}
),
#{
<<"type">> := ?TYPE_MQTT,
<<"name">> := ?BRIDGE_NAME_EGRESS
} = jsx:decode(Bridge),
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
%% we now test if the bridge works as expected
LocalTopic = <<"local_topic/1">>,
RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(RemoteTopic),
timer:sleep(100),
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(LocalTopic, Payload)),
%% we should receive a message on the "remote" broker, with specified topic
?assert(
receive
{deliver, RemoteTopic, #message{payload = Payload}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end
),
%% verify the metrics of the bridge
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(
#{
<<"metrics">> := ?metrics(1, 1, 0, _, _, _),
<<"node_metrics">> :=
[#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}]
},
jsx:decode(BridgeStr)
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok.
t_ingress_mqtt_bridge_with_rules(_) ->
{ok, 201, _} = request(
post,
uri(["bridges"]),
?SERVER_CONF(<<"user1">>)#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_INGRESS,
<<"ingress">> => ?INGRESS_CONF
}
),
BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS),
{ok, 201, Rule} = request(
post,
uri(["rules"]),
#{
<<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>,
<<"enable">> => true,
<<"actions">> => [#{<<"function">> => "emqx_bridge_mqtt_SUITE:inspect"}],
<<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">>
}
),
#{<<"id">> := RuleId} = jsx:decode(Rule),
%% we now test if the bridge works as expected
RemoteTopic = <<"remote_topic/1">>,
LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(LocalTopic),
timer:sleep(100),
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
%% we should receive a message on the local broker, with specified topic
?assert(
receive
{deliver, LocalTopic, #message{payload = Payload}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end
),
%% and also the rule should be matched, with matched + 1:
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
?assertMatch(
#{
<<"id">> := RuleId,
<<"metrics">> := #{
<<"matched">> := 1,
<<"passed">> := 1,
<<"failed">> := 0,
<<"failed.exception">> := 0,
<<"failed.no_result">> := 0,
<<"matched.rate">> := _,
<<"matched.rate.max">> := _,
<<"matched.rate.last5m">> := _,
<<"actions.total">> := 1,
<<"actions.success">> := 1,
<<"actions.failed">> := 0,
<<"actions.failed.out_of_service">> := 0,
<<"actions.failed.unknown">> := 0
}
},
jsx:decode(Rule1)
),
%% we also check if the actions of the rule is triggered
?assertMatch(
#{
inspect := #{
event := <<"$bridges/mqtt", _/binary>>,
id := MsgId,
payload := Payload,
topic := RemoteTopic,
qos := 0,
dup := false,
retain := false,
pub_props := #{},
timestamp := _
}
} when is_binary(MsgId),
persistent_term:get(?MODULE)
),
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []).
t_egress_mqtt_bridge_with_rules(_) ->
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?SERVER_CONF(<<"user1">>)#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF
}
),
#{<<"type">> := ?TYPE_MQTT, <<"name">> := ?BRIDGE_NAME_EGRESS} = jsx:decode(Bridge),
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
{ok, 201, Rule} = request(
post,
uri(["rules"]),
#{
<<"name">> => <<"A_rule_send_messages_to_a_sink_mqtt_bridge">>,
<<"enable">> => true,
<<"actions">> => [BridgeIDEgress],
<<"sql">> => <<"SELECT * from \"t/1\"">>
}
),
#{<<"id">> := RuleId} = jsx:decode(Rule),
%% we now test if the bridge works as expected
LocalTopic = <<"local_topic/1">>,
RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(RemoteTopic),
timer:sleep(100),
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(LocalTopic, Payload)),
%% we should receive a message on the "remote" broker, with specified topic
?assert(
receive
{deliver, RemoteTopic, #message{payload = Payload}} ->
ct:pal("remote broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end
),
emqx:unsubscribe(RemoteTopic),
%% PUBLISH a message to the rule.
Payload2 = <<"hi">>,
RuleTopic = <<"t/1">>,
RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>,
emqx:subscribe(RemoteTopic2),
timer:sleep(100),
emqx:publish(emqx_message:make(RuleTopic, Payload2)),
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
#{
<<"id">> := RuleId,
<<"metrics">> := #{
<<"matched">> := 1,
<<"passed">> := 1,
<<"failed">> := 0,
<<"failed.exception">> := 0,
<<"failed.no_result">> := 0,
<<"matched.rate">> := _,
<<"matched.rate.max">> := _,
<<"matched.rate.last5m">> := _,
<<"actions.total">> := 1,
<<"actions.success">> := 1,
<<"actions.failed">> := 0,
<<"actions.failed.out_of_service">> := 0,
<<"actions.failed.unknown">> := 0
}
} = jsx:decode(Rule1),
%% we should receive a message on the "remote" broker, with specified topic
?assert(
receive
{deliver, RemoteTopic2, #message{payload = Payload2}} ->
ct:pal("remote broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end
),
%% verify the metrics of the bridge
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(
#{
<<"metrics">> := ?metrics(2, 2, 0, _, _, _),
<<"node_metrics">> :=
[#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}]
},
jsx:decode(BridgeStr)
),
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []).
request(Method, Url, Body) ->
request(<<"connector_admin">>, Method, Url, Body).

View File

@ -165,7 +165,6 @@ gen_schema_json(Dir, I18nFile, SchemaModule) ->
gen_api_schema_json(Dir, I18nFile, Lang) ->
emqx_dashboard:init_i18n(I18nFile, Lang),
gen_api_schema_json_hotconf(Dir, Lang),
gen_api_schema_json_connector(Dir, Lang),
gen_api_schema_json_bridge(Dir, Lang),
emqx_dashboard:clear_i18n().
@ -174,11 +173,6 @@ gen_api_schema_json_hotconf(Dir, Lang) ->
File = schema_filename(Dir, "hot-config-schema-", Lang),
ok = do_gen_api_schema_json(File, emqx_mgmt_api_configs, SchemaInfo).
gen_api_schema_json_connector(Dir, Lang) ->
SchemaInfo = #{title => <<"EMQX Connector API Schema">>, version => <<"0.1.0">>},
File = schema_filename(Dir, "connector-api-", Lang),
ok = do_gen_api_schema_json(File, emqx_connector_api, SchemaInfo).
gen_api_schema_json_bridge(Dir, Lang) ->
SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => <<"0.1.0">>},
File = schema_filename(Dir, "bridge-api-", Lang),

View File

@ -60,7 +60,6 @@
emqx_exhook_schema,
emqx_psk_schema,
emqx_limiter_schema,
emqx_connector_schema,
emqx_slow_subs_schema
]).

View File

@ -1,5 +1,4 @@
emqx_connector_mqtt {
num_of_bridges {
desc {
en: "The current number of bridges that are using this connector."

View File

@ -1,4 +1,85 @@
emqx_connector_mqtt_schema {
ingress_desc {
desc {
en: """The ingress config defines how this bridge receive messages from the remote MQTT broker, and then
send them to the local broker.</br>
Template with variables is allowed in 'remote.qos', 'local.topic', 'local.qos', 'local.retain', 'local.payload'.</br>
NOTE: if this bridge is used as the input of a rule, and also 'local.topic' is
configured, then messages got from the remote broker will be sent to both the 'local.topic' and
the rule."""
zh: """入口配置定义了该桥接如何从远程 MQTT Broker 接收消息,然后将消息发送到本地 Broker。</br>
以下字段中允许使用带有变量的模板:'remote.qos', 'local.topic', 'local.qos', 'local.retain', 'local.payload'。</br>
注意:如果此桥接被用作规则的输入,并且配置了 'local.topic',则从远程代理获取的消息将同时被发送到 'local.topic' 和规则。
"""
}
label: {
en: "Ingress Configs"
zh: "入方向配置"
}
}
egress_desc {
desc {
en: """The egress config defines how this bridge forwards messages from the local broker to the remote broker.</br>
Template with variables is allowed in 'remote.topic', 'local.qos', 'local.retain', 'local.payload'.</br>
NOTE: if this bridge is used as the action of a rule, and also 'local.topic'
is configured, then both the data got from the rule and the MQTT messages that matches
'local.topic' will be forwarded."""
zh: """出口配置定义了该桥接如何将消息从本地 Broker 转发到远程 Broker。
以下字段中允许使用带有变量的模板:'remote.topic', 'local.qos', 'local.retain', 'local.payload'。</br>
注意:如果此桥接被用作规则的动作,并且配置了 'local.topic',则从规则输出的数据以及匹配到 'local.topic' 的 MQTT 消息都会被转发。
"""
}
label: {
en: "Egress Configs"
zh: "出方向配置"
}
}
ingress_remote {
desc {
en: """The configs about subscribing to the remote broker."""
zh: """订阅远程 Broker 相关的配置。"""
}
label: {
en: "Remote Configs"
zh: "远程配置"
}
}
ingress_local {
desc {
en: """The configs about sending message to the local broker."""
zh: """发送消息到本地 Broker 相关的配置。"""
}
label: {
en: "Local Configs"
zh: "本地配置"
}
}
egress_remote {
desc {
en: """The configs about sending message to the remote broker."""
zh: """发送消息到远程 Broker 相关的配置。"""
}
label: {
en: "Remote Configs"
zh: "远程配置"
}
}
egress_local {
desc {
en: """The configs about receiving messages from local broker."""
zh: """如何从本地 Broker 接收消息相关的配置。"""
}
label: {
en: "Local Configs"
zh: "本地配置"
}
}
mode {
desc {
en: """
@ -9,7 +90,7 @@ In 'cluster_shareload' mode, the incoming load from the remote broker is shared
using shared subscription.</br>
Note that the 'clientid' is suffixed by the node name, this is to avoid
clientid conflicts between different nodes. And we can only use shared subscription
topic filters for 'remote_topic' of ingress connections.
topic filters for 'remote.topic' of ingress connections.
"""
zh: """
MQTT 桥的模式。 </br>
@ -17,7 +98,7 @@ MQTT 桥的模式。 </br>
- cluster_shareload在 emqx 集群的每个节点上创建一个 MQTT 连接。</br>
在“cluster_shareload”模式下来自远程代理的传入负载通过共享订阅的方式接收。</br>
请注意“clientid”以节点名称为后缀这是为了避免不同节点之间的clientid冲突。
而且对于入口连接的“remote_topic”我们只能使用共享订阅主题过滤器。
而且对于入口连接的“remote.topic”我们只能使用共享订阅主题过滤器。
"""
}
label: {
@ -166,17 +247,6 @@ Template with variables is allowed.
}
}
ingress_hookpoint {
desc {
en: "The hook point will be triggered when there's any message received from the remote broker."
zh: "当从远程borker收到任何消息时将触发钩子。"
}
label: {
en: "Hookpoint"
zh: "挂载点"
}
}
egress_local_topic {
desc {
en: "The local topic to be forwarded to the remote broker"
@ -222,59 +292,6 @@ Template with variables is allowed.
}
}
dir {
desc {
en: """
The dir where the replayq file saved.</br>
Set to 'false' disables the replayq feature.
"""
zh: """
replayq 文件保存的目录。</br>
设置为 'false' 会禁用 replayq 功能。
"""
}
label: {
en: "Replyq file Save Dir"
zh: "Replyq 文件保存目录"
}
}
seg_bytes {
desc {
en: """
The size in bytes of a single segment.</br>
A segment is mapping to a file in the replayq dir. If the current segment is full, a new segment
(file) will be opened to write.
"""
zh: """
单个段的大小(以字节为单位)。</br>
一个段映射到 replayq 目录中的一个文件。 如果当前段已满,则新段(文件)将被打开写入。
"""
}
label: {
en: "Segment Size"
zh: "Segment 大小"
}
}
offload {
desc {
en: """
In offload mode, the disk queue is only used to offload queue tail segments.</br>
The messages are cached in the memory first, then it writes to the replayq files after the size of
the memory cache reaches 'seg_bytes'.
"""
zh: """
在Offload模式下磁盘队列仅用于卸载队列尾段。</br>
消息首先缓存在内存中然后写入replayq文件。内存缓大小为“seg_bytes” 指定的值。
"""
}
label: {
en: "Offload Mode"
zh: "Offload 模式"
}
}
retain {
desc {
en: """
@ -309,66 +326,15 @@ Template with variables is allowed.
}
}
desc_connector {
server_configs {
desc {
en: """Generic configuration for the connector."""
zh: """连接器的通用配置。"""
en: """Configs related to the server."""
zh: """服务器相关的配置。"""
}
label: {
en: "Connector Generic Configuration"
zh: "连接器通用配置。"
en: "Server Configs"
zh: "服务配置。"
}
}
desc_ingress {
desc {
en: """
The ingress config defines how this bridge receive messages from the remote MQTT broker, and then send them to the local broker.</br>
Template with variables is allowed in 'local_topic', 'remote_qos', 'qos', 'retain', 'payload'.</br>
NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also local_topic is configured, then messages got from the remote broker will be sent to both the 'local_topic' and the rule.
"""
zh: """
Ingress 模式定义了这个 bridge 如何从远程 MQTT broker 接收消息,然后将它们发送到本地 broker 。</br>
允许带有的模板变量: 'local_topic'、'remote_qos'、'qos'、'retain'、'payload' 。</br>
注意:如果这个 bridge 被用作规则的输入emqx 规则引擎),并且还配置了 local_topic那么从远程 broker 获取的消息将同时被发送到 'local_topic' 和规则引擎。
"""
}
label: {
en: "Ingress Config"
zh: "Ingress 模式配置"
}
}
desc_egress {
desc {
en: """
The egress config defines how this bridge forwards messages from the local broker to the remote broker.</br>
Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.</br>
NOTE: if this bridge is used as the action of a rule (emqx rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that matches local_topic will be forwarded.
"""
zh: """
Egress 模式定义了 bridge 如何将消息从本地 broker 转发到远程 broker。</br>
允许带有的模板变量: 'remote_topic'、'qos'、'retain'、'payload' 。</br>
注意:如果这个 bridge 作为规则emqx 规则引擎)的输出,并且还配置了 local_topic那么从规则引擎中获取的数据和匹配 local_topic 的 MQTT 消息都会被转发到远程 broker 。
"""
}
label: {
en: "Egress Config"
zh: "Egress 模式配置"
}
}
desc_replayq {
desc {
en: """Queue messages in disk files."""
zh: """本地磁盘消息队列"""
}
label: {
en: "Replayq"
zh: "本地磁盘消息队列"
}
}
}

View File

@ -1,31 +0,0 @@
emqx_connector_schema {
mqtt {
desc {
en: "MQTT bridges."
zh: "MQTT bridges。"
}
label: {
en: "MQTT bridges"
zh: "MQTT bridges"
}
}
desc_connector {
desc {
en: """
Configuration for EMQX connectors.</br>
A connector maintains the data related to the external resources, such as MySQL database.
"""
zh: """
EMQX 连接器的配置。</br>
连接器维护与外部资源相关的数据,比如 MySQL 数据库。
"""
}
label: {
en: "Connector"
zh: "连接器"
}
}
}

View File

@ -1,165 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector).
-export([
config_key_path/0,
pre_config_update/3,
post_config_update/5
]).
-export([
parse_connector_id/1,
connector_id/2
]).
-export([
list_raw/0,
lookup_raw/1,
lookup_raw/2,
create_dry_run/2,
update/2,
update/3,
delete/1,
delete/2
]).
config_key_path() ->
[connectors].
pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
emqx_connector_ssl:convert_certs(filename:join(Path), Conf).
-dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]).
post_config_update([connectors, Type, Name] = Path, '$remove', _, OldConf, _AppEnvs) ->
ConnId = connector_id(Type, Name),
try
foreach_linked_bridges(ConnId, fun(#{type := BType, name := BName}) ->
throw({dependency_bridges_exist, emqx_bridge_resource:bridge_id(BType, BName)})
end),
_ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf)
catch
throw:Error -> {error, Error}
end;
post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
ConnId = connector_id(Type, Name),
foreach_linked_bridges(
ConnId,
fun(#{type := BType, name := BName}) ->
BridgeConf = emqx:get_config([bridges, BType, BName]),
case
emqx_bridge_resource:update(
BType,
BName,
{BridgeConf#{connector => OldConf}, BridgeConf#{connector => NewConf}}
)
of
ok -> ok;
{error, Reason} -> error({update_bridge_error, Reason})
end
end
).
connector_id(Type0, Name0) ->
Type = bin(Type0),
Name = bin(Name0),
<<Type/binary, ":", Name/binary>>.
parse_connector_id(ConnectorId) ->
case string:split(bin(ConnectorId), ":", all) of
[Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)};
_ -> error({invalid_connector_id, ConnectorId})
end.
list_raw() ->
case get_raw_connector_conf() of
not_found ->
[];
Config ->
lists:foldl(
fun({Type, NameAndConf}, Connectors) ->
lists:foldl(
fun({Name, RawConf}, Acc) ->
[RawConf#{<<"type">> => Type, <<"name">> => Name} | Acc]
end,
Connectors,
maps:to_list(NameAndConf)
)
end,
[],
maps:to_list(Config)
)
end.
lookup_raw(Id) when is_binary(Id) ->
{Type, Name} = parse_connector_id(Id),
lookup_raw(Type, Name).
lookup_raw(Type, Name) ->
Path = [bin(P) || P <- [Type, Name]],
case get_raw_connector_conf() of
not_found ->
{error, not_found};
Conf ->
case emqx_map_lib:deep_get(Path, Conf, not_found) of
not_found -> {error, not_found};
Conf1 -> {ok, Conf1#{<<"type">> => Type, <<"name">> => Name}}
end
end.
-spec create_dry_run(module(), binary() | #{binary() => term()} | [#{binary() => term()}]) ->
ok | {error, Reason :: term()}.
create_dry_run(Type, Conf) ->
emqx_bridge_resource:create_dry_run(Type, Conf).
update(Id, Conf) when is_binary(Id) ->
{Type, Name} = parse_connector_id(Id),
update(Type, Name, Conf).
update(Type, Name, Conf) ->
emqx_conf:update(config_key_path() ++ [Type, Name], Conf, #{override_to => cluster}).
delete(Id) when is_binary(Id) ->
{Type, Name} = parse_connector_id(Id),
delete(Type, Name).
delete(Type, Name) ->
emqx_conf:remove(config_key_path() ++ [Type, Name], #{override_to => cluster}).
get_raw_connector_conf() ->
case emqx:get_raw_config(config_key_path(), not_found) of
not_found ->
not_found;
RawConf ->
#{<<"connectors">> := Conf} =
emqx_config:fill_defaults(#{<<"connectors">> => RawConf}),
Conf
end.
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
foreach_linked_bridges(ConnId, Do) ->
lists:foreach(
fun
(#{raw_config := #{<<"connector">> := ConnId0}} = Bridge) when ConnId0 == ConnId ->
Do(Bridge);
(_) ->
ok
end,
emqx_bridge:list()
).

View File

@ -1,339 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_api).
-behaviour(minirest_api).
-include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, ref/2, array/1, enum/1]).
%% Swagger specs from hocon schema
-export([api_spec/0, paths/0, schema/1, namespace/0]).
%% API callbacks
-export(['/connectors_test'/2, '/connectors'/2, '/connectors/:id'/2]).
-define(CONN_TYPES, [mqtt]).
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_connector:parse_connector_id(Id) of
{ConnType, ConnName} ->
_ = ConnName,
EXPR
catch
error:{invalid_connector_id, Id0} ->
{400, #{
code => 'INVALID_ID',
message =>
<<"invalid_connector_id: ", Id0/binary,
". Connector Ids must be of format {type}:{name}">>
}}
end
).
namespace() -> "connector".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
paths() -> ["/connectors_test", "/connectors", "/connectors/:id"].
error_schema(Codes, Message) when is_list(Message) ->
error_schema(Codes, list_to_binary(Message));
error_schema(Codes, Message) when is_binary(Message) ->
emqx_dashboard_swagger:error_codes(Codes, Message).
put_request_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
emqx_connector_schema:put_request(), connector_info_examples(put)
).
post_request_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
emqx_connector_schema:post_request(), connector_info_examples(post)
).
get_response_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
emqx_connector_schema:get_response(), connector_info_examples(get)
).
connector_info_array_example(Method) ->
[Config || #{value := Config} <- maps:values(connector_info_examples(Method))].
connector_info_examples(Method) ->
Fun =
fun(Type, Acc) ->
SType = atom_to_list(Type),
maps:merge(Acc, #{
Type => #{
summary => bin(string:uppercase(SType) ++ " Connector"),
value => info_example(Type, Method)
}
})
end,
Broker = lists:foldl(Fun, #{}, ?CONN_TYPES),
EE = ee_example(Method),
maps:merge(Broker, EE).
-if(?EMQX_RELEASE_EDITION == ee).
ee_example(Method) ->
emqx_ee_connector:connector_examples(Method).
-else.
ee_example(_Method) ->
#{}.
-endif.
info_example(Type, Method) ->
maps:merge(
info_example_basic(Type),
method_example(Type, Method)
).
method_example(Type, Method) when Method == get; Method == post ->
SType = atom_to_list(Type),
SName = "my_" ++ SType ++ "_connector",
#{
type => bin(SType),
name => bin(SName)
};
method_example(_Type, put) ->
#{}.
info_example_basic(mqtt) ->
#{
mode => cluster_shareload,
server => <<"127.0.0.1:1883">>,
reconnect_interval => <<"15s">>,
proto_ver => <<"v4">>,
username => <<"foo">>,
password => <<"bar">>,
clientid => <<"foo">>,
clean_start => true,
keepalive => <<"300s">>,
retry_interval => <<"15s">>,
max_inflight => 100,
ssl => #{
enable => false
}
}.
param_path_id() ->
[
{id,
mk(
binary(),
#{
in => path,
example => <<"mqtt:my_mqtt_connector">>,
desc => ?DESC("id")
}
)}
].
schema("/connectors_test") ->
#{
'operationId' => '/connectors_test',
post => #{
tags => [<<"connectors">>],
desc => ?DESC("conn_test_post"),
summary => <<"Test creating connector">>,
'requestBody' => post_request_body_schema(),
responses => #{
204 => <<"Test connector OK">>,
400 => error_schema(['TEST_FAILED'], "connector test failed")
}
}
};
schema("/connectors") ->
#{
'operationId' => '/connectors',
get => #{
tags => [<<"connectors">>],
desc => ?DESC("conn_get"),
summary => <<"List connectors">>,
responses => #{
200 => emqx_dashboard_swagger:schema_with_example(
array(emqx_connector_schema:get_response()),
connector_info_array_example(get)
)
}
},
post => #{
tags => [<<"connectors">>],
desc => ?DESC("conn_post"),
summary => <<"Create connector">>,
'requestBody' => post_request_body_schema(),
responses => #{
201 => get_response_body_schema(),
400 => error_schema(['ALREADY_EXISTS'], "connector already exists")
}
}
};
schema("/connectors/:id") ->
#{
'operationId' => '/connectors/:id',
get => #{
tags => [<<"connectors">>],
desc => ?DESC("conn_id_get"),
summary => <<"Get connector">>,
parameters => param_path_id(),
responses => #{
200 => get_response_body_schema(),
404 => error_schema(['NOT_FOUND'], "Connector not found"),
400 => error_schema(['INVALID_ID'], "Bad connector ID")
}
},
put => #{
tags => [<<"connectors">>],
desc => ?DESC("conn_id_put"),
summary => <<"Update connector">>,
parameters => param_path_id(),
'requestBody' => put_request_body_schema(),
responses => #{
200 => get_response_body_schema(),
404 => error_schema(['NOT_FOUND'], "Connector not found"),
400 => error_schema(['INVALID_ID'], "Bad connector ID")
}
},
delete => #{
tags => [<<"connectors">>],
desc => ?DESC("conn_id_delete"),
summary => <<"Delete connector">>,
parameters => param_path_id(),
responses => #{
204 => <<"Delete connector successfully">>,
403 => error_schema(['DEPENDENCY_EXISTS'], "Cannot remove dependent connector"),
404 => error_schema(['NOT_FOUND'], "Delete failed, not found"),
400 => error_schema(['INVALID_ID'], "Bad connector ID")
}
}
}.
'/connectors_test'(post, #{body := #{<<"type">> := ConnType} = Params}) ->
case emqx_connector:create_dry_run(ConnType, maps:remove(<<"type">>, Params)) of
ok ->
{204};
{error, Error} ->
{400, error_msg(['TEST_FAILED'], Error)}
end.
'/connectors'(get, _Request) ->
{200, [format_resp(Conn) || Conn <- emqx_connector:list_raw()]};
'/connectors'(post, #{body := #{<<"type">> := ConnType, <<"name">> := ConnName} = Params}) ->
case emqx_connector:lookup_raw(ConnType, ConnName) of
{ok, _} ->
{400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)};
{error, not_found} ->
case
emqx_connector:update(
ConnType,
ConnName,
filter_out_request_body(Params)
)
of
{ok, #{raw_config := RawConf}} ->
{201,
format_resp(RawConf#{
<<"type">> => ConnType,
<<"name">> => ConnName
})};
{error, Error} ->
{400, error_msg('BAD_REQUEST', Error)}
end
end;
'/connectors'(post, _) ->
{400, error_msg('BAD_REQUEST', <<"missing some required fields: [name, type]">>)}.
'/connectors/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(
Id,
case emqx_connector:lookup_raw(ConnType, ConnName) of
{ok, Conf} ->
{200, format_resp(Conf)};
{error, not_found} ->
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
end
);
'/connectors/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
Params = filter_out_request_body(Params0),
?TRY_PARSE_ID(
Id,
case emqx_connector:lookup_raw(ConnType, ConnName) of
{ok, _} ->
case emqx_connector:update(ConnType, ConnName, Params) of
{ok, #{raw_config := RawConf}} ->
{200,
format_resp(RawConf#{
<<"type">> => ConnType,
<<"name">> => ConnName
})};
{error, Error} ->
{500, error_msg('INTERNAL_ERROR', Error)}
end;
{error, not_found} ->
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
end
);
'/connectors/:id'(delete, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(
Id,
case emqx_connector:lookup_raw(ConnType, ConnName) of
{ok, _} ->
case emqx_connector:delete(ConnType, ConnName) of
{ok, _} ->
{204};
{error, {post_config_update, _, {dependency_bridges_exist, BridgeID}}} ->
{403,
error_msg(
'DEPENDENCY_EXISTS',
<<"Cannot remove the connector as it's in use by a bridge: ",
BridgeID/binary>>
)};
{error, Error} ->
{500, error_msg('INTERNAL_ERROR', Error)}
end;
{error, not_found} ->
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
end
).
error_msg(Code, Msg) ->
#{code => Code, message => emqx_misc:readable_error_msg(Msg)}.
format_resp(#{<<"type">> := ConnType, <<"name">> := ConnName} = RawConf) ->
NumOfBridges = length(
emqx_bridge:list_bridges_by_connector(
emqx_connector:connector_id(ConnType, ConnName)
)
),
RawConf#{
<<"type">> => ConnType,
<<"name">> => ConnName,
<<"num_of_bridges">> => NumOfBridges
}.
filter_out_request_body(Conf) ->
ExtraConfs = [<<"clientid">>, <<"num_of_bridges">>, <<"type">>, <<"name">>],
maps:without(ExtraConfs, Conf).
bin(S) when is_list(S) ->
list_to_binary(S).

View File

@ -20,15 +20,10 @@
-export([start/2, stop/1]).
-define(CONF_HDLR_PATH, (emqx_connector:config_key_path() ++ ['?', '?'])).
start(_StartType, _StartArgs) ->
ok = emqx_config_handler:add_handler(?CONF_HDLR_PATH, emqx_connector),
emqx_connector_mqtt_worker:register_metrics(),
emqx_connector_sup:start_link().
stop(_State) ->
emqx_config_handler:remove_handler(?CONF_HDLR_PATH),
ok.
%% internal functions

View File

@ -67,7 +67,7 @@ fields("get") ->
)}
] ++ fields("post");
fields("put") ->
emqx_connector_mqtt_schema:fields("connector");
emqx_connector_mqtt_schema:fields("server_configs");
fields("post") ->
[
{type,
@ -153,7 +153,7 @@ on_start(InstId, Conf) ->
BridgeConf = BasicConf#{
name => InstanceId,
clientid => clientid(InstId),
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), InstId),
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstId),
forwards => make_forward_confs(maps:get(egress, Conf, undefined))
},
case ?MODULE:create_bridge(BridgeConf) of
@ -204,18 +204,18 @@ ensure_mqtt_worker_started(InstanceId, BridgeConf) ->
{error, Reason} -> {error, Reason}
end.
make_sub_confs(EmptyMap, _) when map_size(EmptyMap) == 0 ->
make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 ->
undefined;
make_sub_confs(undefined, _) ->
make_sub_confs(undefined, _Conf, _) ->
undefined;
make_sub_confs(SubRemoteConf, InstId) ->
make_sub_confs(SubRemoteConf, Conf, InstId) ->
ResId = emqx_resource_manager:manager_id_to_resource_id(InstId),
case maps:take(hookpoint, SubRemoteConf) of
case maps:find(hookpoint, Conf) of
error ->
SubRemoteConf;
{HookPoint, SubConf} ->
error({no_hookpoint_provided, Conf});
{ok, HookPoint} ->
MFA = {?MODULE, on_message_received, [HookPoint, ResId]},
SubConf#{on_message_received => MFA}
SubRemoteConf#{on_message_received => MFA}
end.
make_forward_confs(EmptyMap) when map_size(EmptyMap) == 0 ->
@ -236,11 +236,9 @@ basic_config(#{
keepalive := KeepAlive,
retry_interval := RetryIntv,
max_inflight := MaxInflight,
replayq := ReplayQ,
ssl := #{enable := EnableSsl} = Ssl
}) ->
#{
replayq => ReplayQ,
%% connection opts
server => Server,
%% 30s

View File

@ -1,95 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_schema).
-behaviour(hocon_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-export([namespace/0, roots/0, fields/1, desc/1]).
-export([
get_response/0,
put_request/0,
post_request/0
]).
%% the config for webhook bridges do not need connectors
-define(CONN_TYPES, [mqtt]).
%%======================================================================================
%% For HTTP APIs
get_response() ->
http_schema("get").
put_request() ->
http_schema("put").
post_request() ->
http_schema("post").
http_schema(Method) ->
Broker = [?R_REF(schema_mod(Type), Method) || Type <- ?CONN_TYPES],
EE = ee_schemas(Method),
Schemas = Broker ++ EE,
?UNION(Schemas).
%%======================================================================================
%% Hocon Schema Definitions
namespace() -> connector.
roots() -> ["connectors"].
fields(connectors) ->
fields("connectors");
fields("connectors") ->
Broker = [
{mqtt,
?HOCON(
?MAP(name, ?R_REF(emqx_connector_mqtt_schema, "connector")),
#{desc => ?DESC("mqtt")}
)}
],
EE = ee_fields_connectors(),
Broker ++ EE.
-if(?EMQX_RELEASE_EDITION == ee).
ee_schemas(Method) ->
emqx_ee_connector:api_schemas(Method).
ee_fields_connectors() ->
emqx_ee_connector:fields(connectors).
-else.
ee_fields_connectors() ->
[].
ee_schemas(_) ->
[].
-endif.
desc(Record) when
Record =:= connectors;
Record =:= "connectors"
->
?DESC("desc_connector");
desc(_) ->
undefined.
schema_mod(Type) ->
list_to_atom(lists:concat(["emqx_connector_", Type])).

View File

@ -207,7 +207,7 @@ make_hdlr(Parent, Vars, Opts) ->
sub_remote_topics(_ClientPid, undefined) ->
ok;
sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) ->
sub_remote_topics(ClientPid, #{remote := #{topic := FromTopic, qos := QoS}}) ->
case emqtt:subscribe(ClientPid, FromTopic, QoS) of
{ok, _, _} -> ok;
Error -> throw(Error)
@ -217,12 +217,10 @@ process_config(Config) ->
maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config).
maybe_publish_to_local_broker(Msg, Vars, Props) ->
case maps:get(local_topic, Vars, undefined) of
undefined ->
%% local topic is not set, discard it
ok;
_ ->
_ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props))
case emqx_map_lib:deep_get([local, topic], Vars, undefined) of
%% local topic is not set, discard it
undefined -> ok;
_ -> emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props))
end.
format_msg_received(

View File

@ -38,14 +38,16 @@
-type msg() :: emqx_types:message().
-type exp_msg() :: emqx_types:message() | #mqtt_msg{}.
-type variables() :: #{
mountpoint := undefined | binary(),
remote_topic := binary(),
remote_qos := original | integer(),
-type remote_config() :: #{
topic := binary(),
qos := original | integer(),
retain := original | boolean(),
payload := binary()
}.
-type variables() :: #{
mountpoint := undefined | binary(),
remote := remote_config()
}.
make_pub_vars(_, undefined) ->
undefined;
@ -67,10 +69,12 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
MapMsg = maps:put(retain, Retain0, Columns),
to_remote_msg(MapMsg, Vars);
to_remote_msg(MapMsg, #{
remote_topic := TopicToken,
payload := PayloadToken,
remote_qos := QoSToken,
retain := RetainToken,
remote := #{
topic := TopicToken,
payload := PayloadToken,
qos := QoSToken,
retain := RetainToken
},
mountpoint := Mountpoint
}) when is_map(MapMsg) ->
Topic = replace_vars_in_str(TopicToken, MapMsg),
@ -91,10 +95,12 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
to_broker_msg(
#{dup := Dup} = MapMsg,
#{
local_topic := TopicToken,
payload := PayloadToken,
local_qos := QoSToken,
retain := RetainToken,
local := #{
topic := TopicToken,
payload := PayloadToken,
qos := QoSToken,
retain := RetainToken
},
mountpoint := Mountpoint
},
Props

View File

@ -28,25 +28,39 @@
desc/1
]).
-export([
ingress_desc/0,
egress_desc/0
]).
-import(emqx_schema, [mk_duration/2]).
-import(hoconsc, [mk/2, ref/2]).
namespace() -> "connector-mqtt".
roots() ->
fields("config").
fields("config") ->
fields("connector") ++
topic_mappings();
fields("connector") ->
fields("server_configs") ++
[
{"ingress",
mk(
hoconsc:union([none, ref(?MODULE, "ingress")]),
#{
default => undefined,
desc => ?DESC("ingress_desc")
}
)},
{"egress",
mk(
hoconsc:union([none, ref(?MODULE, "egress")]),
#{
default => undefined,
desc => ?DESC("egress_desc")
}
)}
];
fields("server_configs") ->
[
{mode,
sc(
mk(
hoconsc:enum([cluster_shareload]),
#{
default => cluster_shareload,
@ -54,7 +68,7 @@ fields("connector") ->
}
)},
{server,
sc(
mk(
emqx_schema:ip_port(),
#{
required => true,
@ -68,7 +82,7 @@ fields("connector") ->
#{default => "15s"}
)},
{proto_ver,
sc(
mk(
hoconsc:enum([v3, v4, v5]),
#{
default => v4,
@ -76,7 +90,7 @@ fields("connector") ->
}
)},
{bridge_mode,
sc(
mk(
boolean(),
#{
default => false,
@ -84,7 +98,7 @@ fields("connector") ->
}
)},
{username,
sc(
mk(
binary(),
#{
default => "emqx",
@ -92,7 +106,7 @@ fields("connector") ->
}
)},
{password,
sc(
mk(
binary(),
#{
default => "emqx",
@ -101,7 +115,7 @@ fields("connector") ->
}
)},
{clean_start,
sc(
mk(
boolean(),
#{
default => true,
@ -116,20 +130,31 @@ fields("connector") ->
#{default => "15s"}
)},
{max_inflight,
sc(
mk(
non_neg_integer(),
#{
default => 32,
desc => ?DESC("max_inflight")
}
)},
{replayq, sc(ref("replayq"), #{})}
)}
] ++ emqx_connector_schema_lib:ssl_fields();
fields("ingress") ->
%% the message maybe subscribed by rules, in this case 'local_topic' is not necessary
[
{remote_topic,
sc(
{"remote",
mk(
ref(?MODULE, "ingress_remote"),
#{desc => ?DESC(emqx_connector_mqtt_schema, "ingress_remote")}
)},
{"local",
mk(
ref(?MODULE, "ingress_local"),
#{desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local")}
)}
];
fields("ingress_remote") ->
[
{topic,
mk(
binary(),
#{
required => true,
@ -137,47 +162,43 @@ fields("ingress") ->
desc => ?DESC("ingress_remote_topic")
}
)},
{remote_qos,
sc(
{qos,
mk(
qos(),
#{
default => 1,
desc => ?DESC("ingress_remote_qos")
}
)},
{local_topic,
sc(
)}
];
fields("ingress_local") ->
[
{topic,
mk(
binary(),
#{
validator => fun emqx_schema:non_empty_string/1,
desc => ?DESC("ingress_local_topic")
}
)},
{local_qos,
sc(
{qos,
mk(
qos(),
#{
default => <<"${qos}">>,
desc => ?DESC("ingress_local_qos")
}
)},
{hookpoint,
sc(
binary(),
#{desc => ?DESC("ingress_hookpoint")}
)},
{retain,
sc(
mk(
hoconsc:union([boolean(), binary()]),
#{
default => <<"${retain}">>,
desc => ?DESC("retain")
}
)},
{payload,
sc(
mk(
binary(),
#{
default => <<"${payload}">>,
@ -186,18 +207,33 @@ fields("ingress") ->
)}
];
fields("egress") ->
%% the message maybe sent from rules, in this case 'local_topic' is not necessary
[
{local_topic,
sc(
{"local",
mk(
ref(?MODULE, "egress_local"),
#{desc => ?DESC(emqx_connector_mqtt_schema, "egress_local")}
)},
{"remote",
mk(
ref(?MODULE, "egress_remote"),
#{desc => ?DESC(emqx_connector_mqtt_schema, "egress_remote")}
)}
];
fields("egress_local") ->
[
{topic,
mk(
binary(),
#{
desc => ?DESC("egress_local_topic"),
validator => fun emqx_schema:non_empty_string/1
}
)},
{remote_topic,
sc(
)}
];
fields("egress_remote") ->
[
{topic,
mk(
binary(),
#{
required => true,
@ -205,104 +241,48 @@ fields("egress") ->
desc => ?DESC("egress_remote_topic")
}
)},
{remote_qos,
sc(
{qos,
mk(
qos(),
#{
required => true,
desc => ?DESC("egress_remote_qos")
}
)},
{retain,
sc(
mk(
hoconsc:union([boolean(), binary()]),
#{
required => true,
desc => ?DESC("retain")
}
)},
{payload,
sc(
mk(
binary(),
#{
required => true,
desc => ?DESC("payload")
}
)}
];
fields("replayq") ->
[
{dir,
sc(
hoconsc:union([boolean(), string()]),
#{desc => ?DESC("dir")}
)},
{seg_bytes,
sc(
emqx_schema:bytesize(),
#{
default => "100MB",
desc => ?DESC("seg_bytes")
}
)},
{offload,
sc(
boolean(),
#{
default => false,
desc => ?DESC("offload")
}
)}
].
desc("connector") ->
?DESC("desc_connector");
desc("server_configs") ->
?DESC("server_configs");
desc("ingress") ->
ingress_desc();
?DESC("ingress_desc");
desc("ingress_remote") ->
?DESC("ingress_remote");
desc("ingress_local") ->
?DESC("ingress_local");
desc("egress") ->
egress_desc();
desc("replayq") ->
?DESC("desc_replayq");
?DESC("egress_desc");
desc("egress_remote") ->
?DESC("egress_remote");
desc("egress_local") ->
?DESC("egress_local");
desc(_) ->
undefined.
topic_mappings() ->
[
{ingress,
sc(
ref("ingress"),
#{default => #{}}
)},
{egress,
sc(
ref("egress"),
#{default => #{}}
)}
].
ingress_desc() ->
"\n"
"The ingress config defines how this bridge receive messages from the remote MQTT broker, and then\n"
"send them to the local broker.</br>\n"
"Template with variables is allowed in 'local_topic', 'remote_qos', 'qos', 'retain',\n"
"'payload'.</br>\n"
"NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also local_topic is\n"
"configured, then messages got from the remote broker will be sent to both the 'local_topic' and\n"
"the rule.\n".
egress_desc() ->
"\n"
"The egress config defines how this bridge forwards messages from the local broker to the remote\n"
"broker.</br>\n"
"Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.</br>\n"
"NOTE: if this bridge is used as the action of a rule (emqx rule engine), and also local_topic\n"
"is configured, then both the data got from the rule and the MQTT messages that matches\n"
"local_topic will be forwarded.\n".
qos() ->
hoconsc:union([emqx_schema:qos(), binary()]).
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).

View File

@ -68,7 +68,6 @@
%% APIs
-export([
start_link/1,
register_metrics/0,
stop/1
]).
@ -247,18 +246,22 @@ pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts)
pre_process_in_out(_, undefined) ->
undefined;
pre_process_in_out(in, #{local := LC} = Conf) when is_map(Conf) ->
Conf#{local => pre_process_in_out_common(LC)};
pre_process_in_out(in, Conf) when is_map(Conf) ->
Conf1 = pre_process_conf(local_topic, Conf),
Conf2 = pre_process_conf(local_qos, Conf1),
pre_process_in_out_common(Conf2);
%% have no 'local' field in the config
undefined;
pre_process_in_out(out, #{remote := RC} = Conf) when is_map(Conf) ->
Conf#{remote => pre_process_in_out_common(RC)};
pre_process_in_out(out, Conf) when is_map(Conf) ->
Conf1 = pre_process_conf(remote_topic, Conf),
Conf2 = pre_process_conf(remote_qos, Conf1),
pre_process_in_out_common(Conf2).
%% have no 'remote' field in the config
undefined.
pre_process_in_out_common(Conf) ->
Conf1 = pre_process_conf(payload, Conf),
pre_process_conf(retain, Conf1).
pre_process_in_out_common(Conf0) ->
Conf1 = pre_process_conf(topic, Conf0),
Conf2 = pre_process_conf(qos, Conf1),
Conf3 = pre_process_conf(payload, Conf2),
pre_process_conf(retain, Conf3).
pre_process_conf(Key, Conf) ->
case maps:find(Key, Conf) of
@ -450,7 +453,6 @@ do_send(
) ->
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
ExportMsg = fun(Message) ->
emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'),
emqx_connector_mqtt_msg:to_remote_msg(Message, Vars)
end,
?SLOG(debug, #{
@ -551,15 +553,6 @@ format_mountpoint(Prefix) ->
name(Id) -> list_to_atom(str(Id)).
register_metrics() ->
lists:foreach(
fun emqx_metrics:ensure/1,
[
'bridge.mqtt.message_sent_to_remote',
'bridge.mqtt.message_received_from_remote'
]
).
obfuscate(Map) ->
maps:fold(
fun(K, V, Acc) ->

View File

@ -1,94 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
-define(MQTT_CONNECTOR(Username), #{
<<"server">> => <<"127.0.0.1:1883">>,
<<"username">> => Username,
<<"password">> => <<"">>,
<<"proto_ver">> => <<"v4">>,
<<"ssl">> => #{<<"enable">> => false}
}).
-define(CONNECTOR_TYPE, <<"mqtt">>).
-define(CONNECTOR_NAME, <<"test_connector_42">>).
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
suite() ->
[].
init_per_suite(Config) ->
_ = application:load(emqx_conf),
%% some testcases (may from other app) already get emqx_connector started
_ = application:stop(emqx_resource),
_ = application:stop(emqx_connector),
ok = emqx_common_test_helpers:start_apps(
[
emqx_connector,
emqx_bridge
]
),
ok = emqx_common_test_helpers:load_config(emqx_connector_schema, <<"connectors: {}">>),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([
emqx_connector,
emqx_bridge
]),
ok.
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(),
Config.
end_per_testcase(_, _Config) ->
ok.
t_list_raw_empty(_) ->
ok = emqx_config:erase(hd(emqx_connector:config_key_path())),
Result = emqx_connector:list_raw(),
?assertEqual([], Result).
t_lookup_raw_error(_) ->
Result = emqx_connector:lookup_raw(<<"foo:bar">>),
?assertEqual({error, not_found}, Result).
t_parse_connector_id_error(_) ->
?assertError(
{invalid_connector_id, <<"foobar">>}, emqx_connector:parse_connector_id(<<"foobar">>)
).
t_update_connector_does_not_exist(_) ->
Config = ?MQTT_CONNECTOR(<<"user1">>),
?assertMatch({ok, _Config}, emqx_connector:update(?CONNECTOR_TYPE, ?CONNECTOR_NAME, Config)).
t_delete_connector_does_not_exist(_) ->
?assertEqual({ok, #{post_config_update => #{}}}, emqx_connector:delete(<<"foo:bar">>)).
t_connector_id_using_list(_) ->
<<"foo:bar">> = emqx_connector:connector_id("foo", "bar").

View File

@ -1,812 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_api_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/4, uri/1]).
-include("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx_dashboard/include/emqx_dashboard.hrl").
%% output functions
-export([inspect/3]).
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
-define(CONNECTR_TYPE, <<"mqtt">>).
-define(CONNECTR_NAME, <<"test_connector">>).
-define(BRIDGE_NAME_INGRESS, <<"ingress_test_bridge">>).
-define(BRIDGE_NAME_EGRESS, <<"egress_test_bridge">>).
-define(MQTT_CONNECTOR(Username), #{
<<"server">> => <<"127.0.0.1:1883">>,
<<"username">> => Username,
<<"password">> => <<"">>,
<<"proto_ver">> => <<"v4">>,
<<"ssl">> => #{<<"enable">> => false}
}).
-define(MQTT_CONNECTOR2(Server), ?MQTT_CONNECTOR(<<"user1">>)#{<<"server">> => Server}).
-define(MQTT_BRIDGE_INGRESS(ID), #{
<<"connector">> => ID,
<<"direction">> => <<"ingress">>,
<<"remote_topic">> => <<"remote_topic/#">>,
<<"remote_qos">> => 2,
<<"local_topic">> => <<"local_topic/${topic}">>,
<<"local_qos">> => <<"${qos}">>,
<<"payload">> => <<"${payload}">>,
<<"retain">> => <<"${retain}">>
}).
-define(MQTT_BRIDGE_EGRESS(ID), #{
<<"connector">> => ID,
<<"direction">> => <<"egress">>,
<<"local_topic">> => <<"local_topic/#">>,
<<"remote_topic">> => <<"remote_topic/${topic}">>,
<<"payload">> => <<"${payload}">>,
<<"remote_qos">> => <<"${qos}">>,
<<"retain">> => <<"${retain}">>
}).
-define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX), #{
<<"matched">> := MATCH,
<<"success">> := SUCC,
<<"failed">> := FAILED,
<<"rate">> := SPEED,
<<"rate_last5m">> := SPEED5M,
<<"rate_max">> := SPEEDMAX
}).
inspect(Selected, _Envs, _Args) ->
persistent_term:put(?MODULE, #{inspect => Selected}).
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
suite() ->
[{timetrap, {seconds, 30}}].
init_per_suite(Config) ->
_ = application:load(emqx_conf),
%% some testcases (may from other app) already get emqx_connector started
_ = application:stop(emqx_resource),
_ = application:stop(emqx_connector),
ok = emqx_common_test_helpers:start_apps(
[
emqx_rule_engine,
emqx_connector,
emqx_bridge,
emqx_dashboard
],
fun set_special_configs/1
),
ok = emqx_common_test_helpers:load_config(emqx_connector_schema, <<"connectors: {}">>),
ok = emqx_common_test_helpers:load_config(
emqx_rule_engine_schema,
<<"rule_engine {rules {}}">>
),
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([
emqx_rule_engine,
emqx_connector,
emqx_bridge,
emqx_dashboard
]),
ok.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config(<<"connector_admin">>);
set_special_configs(_) ->
ok.
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(_, _Config) ->
clear_resources(),
ok.
clear_resources() ->
lists:foreach(
fun(#{id := Id}) ->
ok = emqx_rule_engine:delete_rule(Id)
end,
emqx_rule_engine:get_rules()
),
lists:foreach(
fun(#{type := Type, name := Name}) ->
{ok, _} = emqx_bridge:remove(Type, Name)
end,
emqx_bridge:list()
),
lists:foreach(
fun(#{<<"type">> := Type, <<"name">> := Name}) ->
{ok, _} = emqx_connector:delete(Type, Name)
end,
emqx_connector:list_raw()
).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_mqtt_crud_apis(_) ->
%% assert we there's no connectors at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
%% then we add a mqtt connector, using POST
%% POST /connectors/ will create a connector
User1 = <<"user1">>,
{ok, 400, <<
"{\"code\":\"BAD_REQUEST\",\"message\""
":\"missing some required fields: [name, type]\"}"
>>} =
request(
post,
uri(["connectors"]),
?MQTT_CONNECTOR(User1)#{<<"type">> => ?CONNECTR_TYPE}
),
{ok, 201, Connector} = request(
post,
uri(["connectors"]),
?MQTT_CONNECTOR(User1)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?CONNECTR_NAME
}
),
#{
<<"type">> := ?CONNECTR_TYPE,
<<"name">> := ?CONNECTR_NAME,
<<"server">> := <<"127.0.0.1:1883">>,
<<"username">> := User1,
<<"password">> := <<"">>,
<<"proto_ver">> := <<"v4">>,
<<"ssl">> := #{<<"enable">> := false}
} = jsx:decode(Connector),
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
%% update the request-path of the connector
User2 = <<"user2">>,
{ok, 200, Connector2} = request(
put,
uri(["connectors", ConnctorID]),
?MQTT_CONNECTOR(User2)
),
?assertMatch(
#{
<<"type">> := ?CONNECTR_TYPE,
<<"name">> := ?CONNECTR_NAME,
<<"server">> := <<"127.0.0.1:1883">>,
<<"username">> := User2,
<<"password">> := <<"">>,
<<"proto_ver">> := <<"v4">>,
<<"ssl">> := #{<<"enable">> := false}
},
jsx:decode(Connector2)
),
%% list all connectors again, assert Connector2 is in it
{ok, 200, Connector2Str} = request(get, uri(["connectors"]), []),
?assertMatch(
[
#{
<<"type">> := ?CONNECTR_TYPE,
<<"name">> := ?CONNECTR_NAME,
<<"server">> := <<"127.0.0.1:1883">>,
<<"username">> := User2,
<<"password">> := <<"">>,
<<"proto_ver">> := <<"v4">>,
<<"ssl">> := #{<<"enable">> := false}
}
],
jsx:decode(Connector2Str)
),
%% get the connector by id
{ok, 200, Connector3Str} = request(get, uri(["connectors", ConnctorID]), []),
?assertMatch(
#{
<<"type">> := ?CONNECTR_TYPE,
<<"name">> := ?CONNECTR_NAME,
<<"server">> := <<"127.0.0.1:1883">>,
<<"username">> := User2,
<<"password">> := <<"">>,
<<"proto_ver">> := <<"v4">>,
<<"ssl">> := #{<<"enable">> := false}
},
jsx:decode(Connector3Str)
),
%% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
%% update a deleted connector returns an error
{ok, 404, ErrMsg2} = request(
put,
uri(["connectors", ConnctorID]),
?MQTT_CONNECTOR(User2)
),
?assertMatch(
#{
<<"code">> := _,
<<"message">> := <<"connector not found">>
},
jsx:decode(ErrMsg2)
),
ok.
t_mqtt_conn_bridge_ingress(_) ->
%% then we add a mqtt connector, using POST
User1 = <<"user1">>,
{ok, 201, Connector} = request(
post,
uri(["connectors"]),
?MQTT_CONNECTOR(User1)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?CONNECTR_NAME
}
),
#{
<<"type">> := ?CONNECTR_TYPE,
<<"name">> := ?CONNECTR_NAME,
<<"server">> := <<"127.0.0.1:1883">>,
<<"num_of_bridges">> := 0,
<<"username">> := User1,
<<"password">> := <<"">>,
<<"proto_ver">> := <<"v4">>,
<<"ssl">> := #{<<"enable">> := false}
} = jsx:decode(Connector),
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
timer:sleep(50),
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?MQTT_BRIDGE_INGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_INGRESS
}
),
#{
<<"type">> := ?CONNECTR_TYPE,
<<"name">> := ?BRIDGE_NAME_INGRESS,
<<"connector">> := ConnctorID
} = jsx:decode(Bridge),
BridgeIDIngress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_INGRESS),
wait_for_resource_ready(BridgeIDIngress, 5),
%% we now test if the bridge works as expected
RemoteTopic = <<"remote_topic/1">>,
LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(LocalTopic),
timer:sleep(100),
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
%% we should receive a message on the local broker, with specified topic
?assert(
receive
{deliver, LocalTopic, #message{payload = Payload}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end
),
%% get the connector by id, verify the num_of_bridges now is 1
{ok, 200, Connector1Str} = request(get, uri(["connectors", ConnctorID]), []),
?assertMatch(#{<<"num_of_bridges">> := 1}, jsx:decode(Connector1Str)),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
ok.
t_mqtt_conn_bridge_egress(_) ->
%% then we add a mqtt connector, using POST
User1 = <<"user1">>,
{ok, 201, Connector} = request(
post,
uri(["connectors"]),
?MQTT_CONNECTOR(User1)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?CONNECTR_NAME
}
),
%ct:pal("---connector: ~p", [Connector]),
#{
<<"server">> := <<"127.0.0.1:1883">>,
<<"username">> := User1,
<<"password">> := <<"">>,
<<"proto_ver">> := <<"v4">>,
<<"ssl">> := #{<<"enable">> := false}
} = jsx:decode(Connector),
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}
),
#{
<<"type">> := ?CONNECTR_TYPE,
<<"name">> := ?BRIDGE_NAME_EGRESS,
<<"connector">> := ConnctorID
} = jsx:decode(Bridge),
BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
wait_for_resource_ready(BridgeIDEgress, 5),
%% we now test if the bridge works as expected
LocalTopic = <<"local_topic/1">>,
RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(RemoteTopic),
timer:sleep(100),
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(LocalTopic, Payload)),
%% we should receive a message on the "remote" broker, with specified topic
?assert(
receive
{deliver, RemoteTopic, #message{payload = Payload}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end
),
%% verify the metrics of the bridge
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(
#{
<<"metrics">> := ?metrics(1, 1, 0, _, _, _),
<<"node_metrics">> :=
[#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}]
},
jsx:decode(BridgeStr)
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
ok.
%% t_mqtt_conn_update:
%% - update a connector should also update all of the the bridges
%% - cannot delete a connector that is used by at least one bridge
t_mqtt_conn_update(_) ->
%% then we add a mqtt connector, using POST
{ok, 201, Connector} = request(
post,
uri(["connectors"]),
?MQTT_CONNECTOR2(<<"127.0.0.1:1883">>)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?CONNECTR_NAME
}
),
%ct:pal("---connector: ~p", [Connector]),
#{<<"server">> := <<"127.0.0.1:1883">>} = jsx:decode(Connector),
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}
),
#{
<<"type">> := ?CONNECTR_TYPE,
<<"name">> := ?BRIDGE_NAME_EGRESS,
<<"connector">> := ConnctorID
} = jsx:decode(Bridge),
BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
wait_for_resource_ready(BridgeIDEgress, 5),
%% Then we try to update 'server' of the connector, to an unavailable IP address
%% The update OK, we recreate the resource even if the resource is current connected,
%% and the target resource we're going to update is unavailable.
{ok, 200, _} = request(
put,
uri(["connectors", ConnctorID]),
?MQTT_CONNECTOR2(<<"127.0.0.1:2603">>)
),
%% we fix the 'server' parameter to a normal one, it should work
{ok, 200, _} = request(
put,
uri(["connectors", ConnctorID]),
?MQTT_CONNECTOR2(<<"127.0.0.1 : 1883">>)
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_update2(_) ->
%% then we add a mqtt connector, using POST
%% but this connector is point to a unreachable server "2603"
{ok, 201, Connector} = request(
post,
uri(["connectors"]),
?MQTT_CONNECTOR2(<<"127.0.0.1:2603">>)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?CONNECTR_NAME
}
),
#{<<"server">> := <<"127.0.0.1:2603">>} = jsx:decode(Connector),
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}
),
#{
<<"type">> := ?CONNECTR_TYPE,
<<"name">> := ?BRIDGE_NAME_EGRESS,
<<"status">> := <<"disconnected">>,
<<"connector">> := ConnctorID
} = jsx:decode(Bridge),
BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
%% We try to fix the 'server' parameter, to another unavailable server..
%% The update should success: we don't check the connectivity of the new config
%% if the resource is now disconnected.
{ok, 200, _} = request(
put,
uri(["connectors", ConnctorID]),
?MQTT_CONNECTOR2(<<"127.0.0.1:2604">>)
),
%% we fix the 'server' parameter to a normal one, it should work
{ok, 200, _} = request(
put,
uri(["connectors", ConnctorID]),
?MQTT_CONNECTOR2(<<"127.0.0.1:1883">>)
),
wait_for_resource_ready(BridgeIDEgress, 5),
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(BridgeStr)),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_update3(_) ->
%% we add a mqtt connector, using POST
{ok, 201, _} = request(
post,
uri(["connectors"]),
?MQTT_CONNECTOR2(<<"127.0.0.1:1883">>)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?CONNECTR_NAME
}
),
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}
),
#{<<"connector">> := ConnctorID} = jsx:decode(Bridge),
BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
wait_for_resource_ready(BridgeIDEgress, 5),
%% delete the connector should fail because it is in use by a bridge
{ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
%% the connector now can be deleted without problems
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
t_mqtt_conn_testing(_) ->
%% APIs for testing the connectivity
%% then we add a mqtt connector, using POST
{ok, 204, <<>>} = request(
post,
uri(["connectors_test"]),
?MQTT_CONNECTOR2(<<"127.0.0.1:1883">>)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}
),
{ok, 400, _} = request(
post,
uri(["connectors_test"]),
?MQTT_CONNECTOR2(<<"127.0.0.1:2883">>)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}
).
t_ingress_mqtt_bridge_with_rules(_) ->
{ok, 201, _} = request(
post,
uri(["connectors"]),
?MQTT_CONNECTOR(<<"user1">>)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?CONNECTR_NAME
}
),
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
{ok, 201, _} = request(
post,
uri(["bridges"]),
?MQTT_BRIDGE_INGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_INGRESS
}
),
BridgeIDIngress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_INGRESS),
{ok, 201, Rule} = request(
post,
uri(["rules"]),
#{
<<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>,
<<"enable">> => true,
<<"actions">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}],
<<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">>
}
),
#{<<"id">> := RuleId} = jsx:decode(Rule),
%% we now test if the bridge works as expected
RemoteTopic = <<"remote_topic/1">>,
LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(LocalTopic),
timer:sleep(100),
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
%% the remote broker is also the local one.
wait_for_resource_ready(BridgeIDIngress, 5),
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
%% we should receive a message on the local broker, with specified topic
?assert(
receive
{deliver, LocalTopic, #message{payload = Payload}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end
),
%% and also the rule should be matched, with matched + 1:
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
#{
<<"id">> := RuleId,
<<"metrics">> := #{
<<"matched">> := 1,
<<"passed">> := 1,
<<"failed">> := 0,
<<"failed.exception">> := 0,
<<"failed.no_result">> := 0,
<<"matched.rate">> := _,
<<"matched.rate.max">> := _,
<<"matched.rate.last5m">> := _,
<<"actions.total">> := 1,
<<"actions.success">> := 1,
<<"actions.failed">> := 0,
<<"actions.failed.out_of_service">> := 0,
<<"actions.failed.unknown">> := 0
}
} = jsx:decode(Rule1),
%% we also check if the actions of the rule is triggered
?assertMatch(
#{
inspect := #{
event := <<"$bridges/mqtt", _/binary>>,
id := MsgId,
payload := Payload,
topic := RemoteTopic,
qos := 0,
dup := false,
retain := false,
pub_props := #{},
timestamp := _
}
} when is_binary(MsgId),
persistent_term:get(?MODULE)
),
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
t_egress_mqtt_bridge_with_rules(_) ->
{ok, 201, _} = request(
post,
uri(["connectors"]),
?MQTT_CONNECTOR(<<"user1">>)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?CONNECTR_NAME
}
),
ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME),
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}
),
#{<<"type">> := ?CONNECTR_TYPE, <<"name">> := ?BRIDGE_NAME_EGRESS} = jsx:decode(Bridge),
BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS),
{ok, 201, Rule} = request(
post,
uri(["rules"]),
#{
<<"name">> => <<"A_rule_send_messages_to_a_sink_mqtt_bridge">>,
<<"enable">> => true,
<<"actions">> => [BridgeIDEgress],
<<"sql">> => <<"SELECT * from \"t/1\"">>
}
),
#{<<"id">> := RuleId} = jsx:decode(Rule),
%% we now test if the bridge works as expected
LocalTopic = <<"local_topic/1">>,
RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(RemoteTopic),
timer:sleep(100),
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
wait_for_resource_ready(BridgeIDEgress, 5),
emqx:publish(emqx_message:make(LocalTopic, Payload)),
%% we should receive a message on the "remote" broker, with specified topic
?assert(
receive
{deliver, RemoteTopic, #message{payload = Payload}} ->
ct:pal("remote broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end
),
emqx:unsubscribe(RemoteTopic),
%% PUBLISH a message to the rule.
Payload2 = <<"hi">>,
RuleTopic = <<"t/1">>,
RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>,
emqx:subscribe(RemoteTopic2),
timer:sleep(100),
wait_for_resource_ready(BridgeIDEgress, 5),
emqx:publish(emqx_message:make(RuleTopic, Payload2)),
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
#{
<<"id">> := RuleId,
<<"metrics">> := #{
<<"matched">> := 1,
<<"passed">> := 1,
<<"failed">> := 0,
<<"failed.exception">> := 0,
<<"failed.no_result">> := 0,
<<"matched.rate">> := _,
<<"matched.rate.max">> := _,
<<"matched.rate.last5m">> := _,
<<"actions.total">> := 1,
<<"actions.success">> := 1,
<<"actions.failed">> := 0,
<<"actions.failed.out_of_service">> := 0,
<<"actions.failed.unknown">> := 0
}
} = jsx:decode(Rule1),
%% we should receive a message on the "remote" broker, with specified topic
?assert(
receive
{deliver, RemoteTopic2, #message{payload = Payload2}} ->
ct:pal("remote broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end
),
%% verify the metrics of the bridge
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(
#{
<<"metrics">> := ?metrics(2, 2, 0, _, _, _),
<<"node_metrics">> :=
[#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}]
},
jsx:decode(BridgeStr)
),
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
request(Method, Url, Body) ->
request(<<"connector_admin">>, Method, Url, Body).
wait_for_resource_ready(InstId, 0) ->
ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]),
ct:fail(wait_resource_timeout);
wait_for_resource_ready(InstId, Retry) ->
case emqx_bridge:lookup(InstId) of
{ok, #{resource_data := #{status := connected}}} ->
ok;
_ ->
timer:sleep(100),
wait_for_resource_ready(InstId, Retry - 1)
end.

View File

@ -36,7 +36,8 @@ init_per_suite(Config) ->
case emqx_common_test_helpers:is_tcp_server_available(?MONGO_HOST, ?MONGO_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_connector]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
Config;
false ->
{skip, no_mongo}
@ -44,7 +45,8 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource, emqx_connector]).
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector).
init_per_testcase(_, Config) ->
Config.

View File

@ -45,22 +45,6 @@ send(SendFun, Batch) when is_function(SendFun, 2) ->
stop(_Pid) -> ok.
%% bridge worker should retry connecting remote node indefinitely
% reconnect_test() ->
% emqx_metrics:start_link(),
% emqx_connector_mqtt_worker:register_metrics(),
% Ref = make_ref(),
% Config = make_config(Ref, self(), {error, test}),
% {ok, Pid} = emqx_connector_mqtt_worker:start_link(?BRIDGE_NAME, Config),
% %% assert name registered
% ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)),
% ?WAIT({connection_start_attempt, Ref}, 1000),
% %% expect same message again
% ?WAIT({connection_start_attempt, Ref}, 1000),
% ok = emqx_connector_mqtt_worker:stop(?BRIDGE_REG_NAME),
% emqx_metrics:stop(),
% ok.
%% connect first, disconnect, then connect again
disturbance_test() ->
meck:new(emqx_connector_mqtt_mod, [passthrough, no_history]),
@ -69,7 +53,6 @@ disturbance_test() ->
meck:expect(emqx_connector_mqtt_mod, stop, 1, fun(Pid) -> stop(Pid) end),
try
emqx_metrics:start_link(),
emqx_connector_mqtt_worker:register_metrics(),
Ref = make_ref(),
TestPid = self(),
Config = make_config(Ref, TestPid, {ok, #{client_pid => TestPid}}),
@ -84,36 +67,6 @@ disturbance_test() ->
meck:unload(emqx_connector_mqtt_mod)
end.
% % %% buffer should continue taking in messages when disconnected
% buffer_when_disconnected_test_() ->
% {timeout, 10000, fun test_buffer_when_disconnected/0}.
% test_buffer_when_disconnected() ->
% Ref = make_ref(),
% Nums = lists:seq(1, 100),
% Sender = spawn_link(fun() -> receive {bridge, Pid} -> sender_loop(Pid, Nums, _Interval = 5) end end),
% SenderMref = monitor(process, Sender),
% Receiver = spawn_link(fun() -> receive {bridge, Pid} -> receiver_loop(Pid, Nums, _Interval = 1) end end),
% ReceiverMref = monitor(process, Receiver),
% SendFun = fun(Batch) ->
% BatchRef = make_ref(),
% Receiver ! {batch, BatchRef, Batch},
% {ok, BatchRef}
% end,
% Config0 = make_config(Ref, false, {ok, #{client_pid => undefined}}),
% Config = Config0#{reconnect_delay_ms => 100},
% emqx_metrics:start_link(),
% emqx_connector_mqtt_worker:register_metrics(),
% {ok, Pid} = emqx_connector_mqtt_worker:start_link(?BRIDGE_NAME, Config),
% Sender ! {bridge, Pid},
% Receiver ! {bridge, Pid},
% ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)),
% Pid ! {disconnected, Ref, test},
% ?WAIT({'DOWN', SenderMref, process, Sender, normal}, 5000),
% ?WAIT({'DOWN', ReceiverMref, process, Receiver, normal}, 1000),
% ok = emqx_connector_mqtt_worker:stop(?BRIDGE_REG_NAME),
% emqx_metrics:stop().
manual_start_stop_test() ->
meck:new(emqx_connector_mqtt_mod, [passthrough, no_history]),
meck:expect(emqx_connector_mqtt_mod, start, 1, fun(Conf) -> start(Conf) end),
@ -121,7 +74,6 @@ manual_start_stop_test() ->
meck:expect(emqx_connector_mqtt_mod, stop, 1, fun(Pid) -> stop(Pid) end),
try
emqx_metrics:start_link(),
emqx_connector_mqtt_worker:register_metrics(),
Ref = make_ref(),
TestPid = self(),
BridgeName = manual_start_stop,

View File

@ -36,7 +36,8 @@ init_per_suite(Config) ->
case emqx_common_test_helpers:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_connector]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
Config;
false ->
{skip, no_mysql}
@ -44,7 +45,8 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource, emqx_connector]).
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector).
init_per_testcase(_, Config) ->
Config.

View File

@ -36,7 +36,8 @@ init_per_suite(Config) ->
case emqx_common_test_helpers:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_DEFAULT_PORT) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_connector]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
Config;
false ->
{skip, no_pgsql}
@ -44,7 +45,8 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource, emqx_connector]).
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector).
init_per_testcase(_, Config) ->
Config.

View File

@ -46,14 +46,16 @@ init_per_suite(Config) ->
of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_connector]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
Config;
false ->
{skip, no_redis}
end.
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_resource, emqx_connector]).
ok = emqx_common_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector).
init_per_testcase(_, Config) ->
Config.

View File

@ -47,7 +47,6 @@ init_per_suite(Config) ->
emqx_prometheus,
emqx_modules,
emqx_dashboard,
emqx_connector,
emqx_gateway,
emqx_statsd,
emqx_resource,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_plugin_libs, [
{description, "EMQX Plugin utility libs"},
{vsn, "4.3.2"},
{vsn, "4.3.3"},
{modules, []},
{applications, [kernel, stdlib]},
{env, []}

View File

@ -7,7 +7,7 @@
-export([
api_schemas/1,
conn_bridge_examples/1,
examples/1,
resource_type/1,
fields/1
]).
@ -28,7 +28,7 @@ schema_modules() ->
emqx_ee_bridge_mysql
].
conn_bridge_examples(Method) ->
examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)

View File

@ -1,57 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_connector).
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([
fields/1,
connector_examples/1,
api_schemas/1
]).
api_schemas(Method) ->
[
ref(emqx_ee_connector_hstreamdb, Method),
ref(emqx_ee_connector_influxdb, "udp_" ++ Method),
ref(emqx_ee_connector_influxdb, "api_v1_" ++ Method),
ref(emqx_ee_connector_influxdb, "api_v2_" ++ Method)
].
fields(connectors) ->
[
{hstreamdb,
mk(
hoconsc:map(name, ref(emqx_ee_connector_hstreamdb, config)),
#{desc => <<"EMQX Enterprise Config">>}
)}
] ++ fields(influxdb);
fields(influxdb) ->
[
{
Protocol,
mk(hoconsc:map(name, ref(emqx_ee_connector_influxdb, Protocol)), #{
desc => <<"EMQX Enterprise Config">>
})
}
|| Protocol <- [influxdb_udp, influxdb_api_v1, influxdb_api_v2]
].
connector_examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, connector_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
lists:foldl(Fun, #{}, schema_modules()).
schema_modules() ->
[
emqx_ee_connector_hstreamdb,
emqx_ee_connector_influxdb
].