refactor: new config structure for mqtt bridge

This commit is contained in:
Shawn 2022-08-22 12:59:51 +08:00
parent 4ad04b646f
commit aea8c77b49
26 changed files with 378 additions and 1301 deletions

View File

@ -1,16 +1,4 @@
emqx_bridge_mqtt_schema {
desc_rec {
desc {
en: """Configuration for MQTT bridge."""
zh: """MQTT Bridge 配置"""
}
label: {
en: "MQTT Bridge Configuration"
zh: "MQTT Bridge 配置"
}
}
desc_type {
desc {
en: """The bridge type."""

View File

@ -11,24 +11,6 @@ emqx_bridge_schema {
}
}
desc_connector {
desc {
en: """
The ID or the configs of the connector to be used for this bridge. Connector IDs must be of format:
<code>{type}:{name}</code>.</br>
In config files, you can find the corresponding config entry for a connector by such path:
'connectors.{type}.{name}'.</br>
"""
zh: """
Bridge 使用的 Connector 的 ID 或者配置。Connector ID 的格式必须为:<code>{type}:{name}</code>.</br>
在配置文件中,您可以通过以下路径找到 Connector 的相应配置条目:'connector.{type}.{name}'。</br>"""
}
label: {
en: "Connector ID"
zh: "Connector ID"
}
}
desc_metrics {
desc {
en: """The metrics of the bridge"""
@ -85,7 +67,7 @@ Bridge 使用的 Connector 的 ID 或者配置。Connector ID 的格式必须为
}
bridges_name {
bridges_mqtt {
desc {
en: """MQTT bridges to/from another MQTT broker"""
zh: """桥接到另一个 MQTT Broker 的 MQTT Bridge"""

View File

@ -11,17 +11,6 @@ emqx_bridge_webhook_schema {
}
}
config_direction {
desc {
en: """The direction of this bridge, MUST be 'egress'"""
zh: """Bridge 的方向, 必须是 egress"""
}
label: {
en: "Bridge Direction"
zh: "Bridge 方向"
}
}
config_url {
desc {
en: """

View File

@ -37,8 +37,7 @@
create/3,
disable_enable/3,
remove/2,
list/0,
list_bridges_by_connector/1
list/0
]).
-export([send_message/2]).
@ -48,6 +47,8 @@
%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
-define(EGRESS_DIR_BRIDGES(T), T == webhook; T == mysql).
load() ->
Bridges = emqx:get_config([bridges], #{}),
lists:foreach(
@ -93,10 +94,10 @@ load_hook() ->
load_hook(Bridges) ->
lists:foreach(
fun({_Type, Bridge}) ->
fun({Type, Bridge}) ->
lists:foreach(
fun({_Name, BridgeConf}) ->
do_load_hook(BridgeConf)
do_load_hook(Type, BridgeConf)
end,
maps:to_list(Bridge)
)
@ -104,12 +105,13 @@ load_hook(Bridges) ->
maps:to_list(Bridges)
).
do_load_hook(#{local_topic := _} = Conf) ->
case maps:get(direction, Conf, egress) of
egress -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
ingress -> ok
end;
do_load_hook(_Conf) ->
do_load_hook(Type, #{local_topic := _}) when ?EGRESS_DIR_BRIDGES(Type) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
do_load_hook(mqtt, #{egress := #{local := #{topic := _}}}) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
do_load_hook(kafka, #{producer := #{mqtt := #{topic := _}}}) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
do_load_hook(_Type, _Conf) ->
ok.
unload_hook() ->
@ -197,13 +199,6 @@ list() ->
maps:to_list(emqx:get_raw_config([bridges], #{}))
).
list_bridges_by_connector(ConnectorId) ->
[
B
|| B = #{raw_config := #{<<"connector">> := Id}} <- list(),
ConnectorId =:= Id
].
lookup(Id) ->
{Type, Name} = emqx_bridge_resource:parse_bridge_id(Id),
lookup(Type, Name).
@ -303,13 +298,8 @@ get_matched_bridges(Topic) ->
maps:fold(
fun(BType, Conf, Acc0) ->
maps:fold(
fun
%% Confs for MQTT, Kafka bridges have the `direction` flag
(_BName, #{direction := ingress}, Acc1) ->
Acc1;
(BName, #{direction := egress} = Egress, Acc1) ->
%% WebHook, MySQL bridges only have egress direction
get_matched_bridge_id(Egress, Topic, BType, BName, Acc1)
fun(BName, BConf, Acc1) ->
get_matched_bridge_id(BType, BConf, Topic, BName, Acc1)
end,
Acc0,
Conf
@ -319,9 +309,18 @@ get_matched_bridges(Topic) ->
Bridges
).
get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) ->
get_matched_bridge_id(_BType, #{enable := false}, _Topic, _BName, Acc) ->
Acc;
get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) ->
get_matched_bridge_id(BType, #{local_topic := Filter}, Topic, BName, Acc) when
?EGRESS_DIR_BRIDGES(BType)
->
do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc);
get_matched_bridge_id(mqtt, #{egress := #{local := #{topic := Filter}}}, Topic, BName, Acc) ->
do_get_matched_bridge_id(Topic, Filter, mqtt, BName, Acc);
get_matched_bridge_id(kafka, #{producer := #{mqtt := #{topic := Filter}}}, Topic, BName, Acc) ->
do_get_matched_bridge_id(Topic, Filter, kafka, BName, Acc).
do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc) ->
case emqx_topic:match(Topic, Filter) of
true -> [emqx_bridge_resource:bridge_id(BType, BName) | Acc];
false -> Acc

View File

@ -42,8 +42,6 @@
-export([lookup_from_local_node/2]).
-define(CONN_TYPES, [mqtt]).
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_bridge_resource:parse_bridge_id(Id) of
{BridgeType, BridgeName} ->
@ -146,7 +144,7 @@ param_path_id() ->
#{
in => path,
required => true,
example => <<"webhook:my_webhook">>,
example => <<"webhook:webhook_example">>,
desc => ?DESC("desc_param_path_id")
}
)}.
@ -155,66 +153,45 @@ bridge_info_array_example(Method) ->
[Config || #{value := Config} <- maps:values(bridge_info_examples(Method))].
bridge_info_examples(Method) ->
maps:merge(conn_bridge_examples(Method), #{
<<"my_webhook">> => #{
summary => <<"WebHook">>,
value => info_example(webhook, awesome, Method)
}
}).
conn_bridge_examples(Method) ->
Fun =
fun(Type, Acc) ->
SType = atom_to_list(Type),
KeyIngress = bin(SType ++ "_ingress"),
KeyEgress = bin(SType ++ "_egress"),
maps:merge(Acc, #{
KeyIngress => #{
summary => bin(string:uppercase(SType) ++ " Ingress Bridge"),
value => info_example(Type, ingress, Method)
},
KeyEgress => #{
summary => bin(string:uppercase(SType) ++ " Egress Bridge"),
value => info_example(Type, egress, Method)
}
})
end,
Broker = lists:foldl(Fun, #{}, ?CONN_TYPES),
EE = ee_conn_bridge_examples(Method),
maps:merge(Broker, EE).
-if(?EMQX_RELEASE_EDITION == ee).
ee_conn_bridge_examples(Method) ->
emqx_ee_bridge:conn_bridge_examples(Method).
-else.
ee_conn_bridge_examples(_Method) ->
#{}.
-endif.
info_example(Type, Direction, Method) ->
maps:merge(
info_example_basic(Type, Direction),
method_example(Type, Direction, Method)
#{
<<"webhook_example">> => #{
summary => <<"WebHook">>,
value => info_example(webhook, Method)
},
<<"mqtt_example">> => #{
summary => <<"MQTT Bridge">>,
value => info_example(mqtt, Method)
}
},
ee_bridge_examples(Method)
).
method_example(Type, Direction, Method) when Method == get; Method == post ->
ee_bridge_examples(Method) ->
case erlang:function_exported(emqx_ee_bridge, examples, 1) of
true -> emqx_ee_bridge:examples(Method);
false -> #{}
end.
info_example(Type, Method) ->
maps:merge(
info_example_basic(Type),
method_example(Type, Method)
).
method_example(Type, Method) when Method == get; Method == post ->
SType = atom_to_list(Type),
SDir = atom_to_list(Direction),
SName =
case Type of
webhook -> "my_" ++ SType;
_ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge"
end,
TypeNameExamp = #{
SName = SType ++ "_example",
TypeNameExam = #{
type => bin(SType),
name => bin(SName)
},
maybe_with_metrics_example(TypeNameExamp, Method);
method_example(_Type, _Direction, put) ->
maybe_with_metrics_example(TypeNameExam, Method);
method_example(_Type, put) ->
#{}.
maybe_with_metrics_example(TypeNameExamp, get) ->
TypeNameExamp#{
maybe_with_metrics_example(TypeNameExam, get) ->
TypeNameExam#{
metrics => ?METRICS(0, 0, 0, 0, 0, 0),
node_metrics => [
#{
@ -223,10 +200,10 @@ maybe_with_metrics_example(TypeNameExamp, get) ->
}
]
};
maybe_with_metrics_example(TypeNameExamp, _) ->
TypeNameExamp.
maybe_with_metrics_example(TypeNameExam, _) ->
TypeNameExam.
info_example_basic(webhook, _) ->
info_example_basic(webhook) ->
#{
enable => true,
url => <<"http://localhost:9901/messages/${topic}">>,
@ -241,28 +218,52 @@ info_example_basic(webhook, _) ->
method => post,
body => <<"${payload}">>
};
info_example_basic(mqtt, ingress) ->
info_example_basic(mqtt) ->
(mqtt_main_example())#{
egress => mqtt_egress_example(),
ingress => mqtt_ingress_example()
}.
mqtt_main_example() ->
#{
enable => true,
connector => <<"mqtt:my_mqtt_connector">>,
direction => ingress,
remote_topic => <<"aws/#">>,
remote_qos => 1,
local_topic => <<"from_aws/${topic}">>,
local_qos => <<"${qos}">>,
payload => <<"${payload}">>,
retain => <<"${retain}">>
};
info_example_basic(mqtt, egress) ->
mode => cluster_shareload,
server => <<"127.0.0.1:1883">>,
proto_ver => <<"v4">>,
username => <<"foo">>,
password => <<"bar">>,
clean_start => true,
keepalive => <<"300s">>,
retry_interval => <<"15s">>,
max_inflight => 100,
ssl => #{
enable => false
}
}.
mqtt_egress_example() ->
#{
enable => true,
connector => <<"mqtt:my_mqtt_connector">>,
direction => egress,
local_topic => <<"emqx/#">>,
remote_topic => <<"from_emqx/${topic}">>,
remote_qos => <<"${qos}">>,
local => #{
topic => <<"emqx/#">>
},
remote => #{
topic => <<"from_emqx/${topic}">>,
qos => <<"${qos}">>,
payload => <<"${payload}">>,
retain => false
}
}.
mqtt_ingress_example() ->
#{
remote => #{
topic => <<"aws/#">>,
qos => 1
},
local => #{
topic => <<"from_aws/${topic}">>,
qos => <<"${qos}">>,
payload => <<"${payload}">>,
retain => <<"${retain}">>
}
}.
schema("/bridges") ->

View File

@ -45,7 +45,6 @@ stop(_State) ->
-if(?EMQX_RELEASE_EDITION == ee).
start_ee_apps() ->
{ok, _} = application:ensure_all_started(emqx_ee_bridge),
{ok, _} = application:ensure_all_started(emqx_ee_connector),
ok.
-else.
start_ee_apps() ->

View File

@ -43,6 +43,9 @@
reset_metrics/1
]).
%% bi-directional bridge with producer/consumer or ingress/egress configs
-define(IS_BI_DIR_BRIDGE(TYPE), TYPE == <<"mqtt">>; TYPE == <<"kafka">>).
-if(?EMQX_RELEASE_EDITION == ee).
bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
bridge_to_resource_type(mqtt) -> emqx_connector_mqtt;
@ -102,7 +105,7 @@ create(Type, Name, Conf, Opts) ->
resource_id(Type, Name),
<<"emqx_bridge">>,
bridge_to_resource_type(Type),
parse_confs(Type, Name, Conf),
parse_confs(bin(Type), Name, Conf),
Opts
),
maybe_disable_bridge(Type, Name, Conf).
@ -168,16 +171,13 @@ recreate(Type, Name, Conf, Opts) ->
emqx_resource:recreate_local(
resource_id(Type, Name),
bridge_to_resource_type(Type),
parse_confs(Type, Name, Conf),
parse_confs(bin(Type), Name, Conf),
Opts
).
create_dry_run(Type, Conf) ->
Conf0 = fill_dry_run_conf(Conf),
case emqx_resource:check_config(bridge_to_resource_type(Type), Conf0) of
{ok, Conf1} ->
TmpPath = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]),
case emqx_connector_ssl:convert_certs(TmpPath, Conf1) of
case emqx_connector_ssl:convert_certs(TmpPath, Conf) of
{error, Reason} ->
{error, Reason};
{ok, ConfNew} ->
@ -186,9 +186,6 @@ create_dry_run(Type, Conf) ->
),
_ = maybe_clear_certs(TmpPath, ConfNew),
Res
end;
{error, _} = Error ->
Error
end.
remove(BridgeId) ->
@ -213,19 +210,6 @@ maybe_disable_bridge(Type, Name, Conf) ->
true -> ok
end.
fill_dry_run_conf(Conf) ->
Conf#{
<<"egress">> =>
#{
<<"remote_topic">> => <<"t">>,
<<"remote_qos">> => 0,
<<"retain">> => true,
<<"payload">> => <<"val">>
},
<<"ingress">> =>
#{<<"remote_topic">> => <<"t">>}
}.
maybe_clear_certs(TmpPath, #{ssl := SslConf} = Conf) ->
%% don't remove the cert files if they are in use
case is_tmp_path_conf(TmpPath, SslConf) of
@ -245,8 +229,9 @@ is_tmp_path_conf(_TmpPath, _Conf) ->
is_tmp_path(TmpPath, File) ->
string:str(str(File), str(TmpPath)) > 0.
%% convert bridge configs to what the connector modules want
parse_confs(
Type,
<<"webhook">>,
_Name,
#{
url := Url,
@ -256,7 +241,7 @@ parse_confs(
request_timeout := ReqTimeout,
max_retries := Retry
} = Conf
) when Type == webhook orelse Type == <<"webhook">> ->
) ->
{BaseUrl, Path} = parse_url(Url),
{ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl),
Conf#{
@ -271,42 +256,14 @@ parse_confs(
max_retries => Retry
}
};
parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when
is_binary(ConnId)
->
case emqx_connector:parse_connector_id(ConnId) of
{Type, ConnName} ->
ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
make_resource_confs(
Direction,
ConnectorConfs,
maps:without([connector, direction], Conf),
Type,
Name
);
{_ConnType, _ConnName} ->
error({cannot_use_connector_with_different_type, ConnId})
end;
parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) when
is_map(ConnectorConfs)
->
make_resource_confs(
Direction,
ConnectorConfs,
maps:without([connector, direction], Conf),
Type,
Name
).
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) ->
parse_confs(Type, Name, Conf) when ?IS_BI_DIR_BRIDGE(Type) ->
%% For some drivers that can be used as data-sources, we need to provide a
%% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it
%% receives a message from the external database.
BName = bridge_id(Type, Name),
ConnectorConfs#{
ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>}
};
make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) ->
ConnectorConfs#{
egress => BridgeConf
}.
Conf#{hookpoint => <<"$bridges/", BName/binary>>};
parse_confs(_Type, _Name, Conf) ->
Conf.
parse_url(Url) ->
case string:split(Url, "//", leading) of

View File

@ -3,43 +3,28 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2]).
-import(hoconsc, [mk/2, ref/2]).
-export([roots/0, fields/1, desc/1]).
-export([roots/0, fields/1, desc/1, namespace/0]).
%%======================================================================================
%% Hocon Schema Definitions
namespace() -> "bridge_mqtt".
roots() -> [].
fields("ingress") ->
[emqx_bridge_schema:direction_field(ingress, emqx_connector_mqtt_schema:ingress_desc())] ++
emqx_bridge_schema:common_bridge_fields(mqtt_connector_ref()) ++
proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
fields("egress") ->
[emqx_bridge_schema:direction_field(egress, emqx_connector_mqtt_schema:egress_desc())] ++
emqx_bridge_schema:common_bridge_fields(mqtt_connector_ref()) ++
emqx_connector_mqtt_schema:fields("egress");
fields("post_ingress") ->
fields("config") ->
emqx_bridge_schema:common_bridge_fields() ++
emqx_connector_mqtt_schema:fields("config");
fields("post") ->
[
type_field(),
name_field()
] ++ proplists:delete(enable, fields("ingress"));
fields("post_egress") ->
[
type_field(),
name_field()
] ++ proplists:delete(enable, fields("egress"));
fields("put_ingress") ->
proplists:delete(enable, fields("ingress"));
fields("put_egress") ->
proplists:delete(enable, fields("egress"));
fields("get_ingress") ->
emqx_bridge_schema:metrics_status_fields() ++ fields("post_ingress");
fields("get_egress") ->
emqx_bridge_schema:metrics_status_fields() ++ fields("post_egress").
] ++ emqx_connector_mqtt_schema:fields("config");
fields("put") ->
emqx_connector_mqtt_schema:fields("config");
fields("get") ->
emqx_bridge_schema:metrics_status_fields() ++ fields("config").
desc(Rec) when Rec =:= "ingress"; Rec =:= "egress" ->
?DESC("desc_rec");
desc(_) ->
undefined.
@ -63,6 +48,3 @@ name_field() ->
desc => ?DESC("desc_name")
}
)}.
mqtt_connector_ref() ->
?R_REF(emqx_connector_mqtt_schema, "connector").

View File

@ -14,16 +14,13 @@
]).
-export([
common_bridge_fields/1,
metrics_status_fields/0,
direction_field/2
common_bridge_fields/0,
metrics_status_fields/0
]).
%%======================================================================================
%% Hocon Schema Definitions
-define(CONN_TYPES, [mqtt]).
%%======================================================================================
%% For HTTP APIs
get_response() ->
@ -36,34 +33,26 @@ post_request() ->
api_schema("post").
api_schema(Method) ->
Broker =
lists:flatmap(
fun(Type) ->
[
ref(schema_mod(Type), Method ++ "_ingress"),
ref(schema_mod(Type), Method ++ "_egress")
]
end,
?CONN_TYPES
) ++ [ref(Module, Method) || Module <- [emqx_bridge_webhook_schema]],
Broker = [
ref(Mod, Method)
|| Mod <- [emqx_bridge_webhook_schema, emqx_bridge_mqtt_schema]
],
EE = ee_api_schemas(Method),
hoconsc:union(Broker ++ EE).
-if(?EMQX_RELEASE_EDITION == ee).
ee_api_schemas(Method) ->
emqx_ee_bridge:api_schemas(Method).
case erlang:function_exported(emqx_ee_bridge, api_schemas, 1) of
true -> emqx_ee_bridge:api_schemas(Method);
false -> []
end.
ee_fields_bridges() ->
emqx_ee_bridge:fields(bridges).
-else.
ee_api_schemas(_) ->
[].
case erlang:function_exported(emqx_ee_bridge, fields, 1) of
true -> emqx_ee_bridge:fields(bridges);
false -> []
end.
ee_fields_bridges() ->
[].
-endif.
common_bridge_fields(ConnectorRef) ->
common_bridge_fields() ->
[
{enable,
mk(
@ -72,15 +61,6 @@ common_bridge_fields(ConnectorRef) ->
desc => ?DESC("desc_enable"),
default => true
}
)},
{connector,
mk(
hoconsc:union([binary(), ConnectorRef]),
#{
required => true,
example => <<"mqtt:my_mqtt_connector">>,
desc => ?DESC("desc_connector")
}
)}
].
@ -100,18 +80,6 @@ metrics_status_fields() ->
)}
].
direction_field(Dir, Desc) ->
{direction,
mk(
Dir,
#{
required => true,
default => egress,
desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.</br>" ++
Desc
}
)}.
%%======================================================================================
%% For config files
@ -125,21 +93,12 @@ fields(bridges) ->
mk(
hoconsc:map(name, ref(emqx_bridge_webhook_schema, "config")),
#{desc => ?DESC("bridges_webhook")}
)}
] ++
[
{T,
)},
{mqtt,
mk(
hoconsc:map(
name,
hoconsc:union([
ref(schema_mod(T), "ingress"),
ref(schema_mod(T), "egress")
])
),
#{desc => ?DESC("bridges_name")}
hoconsc:map(name, ref(emqx_bridge_mqtt_schema, "config")),
#{desc => ?DESC("bridges_mqtt")}
)}
|| T <- ?CONN_TYPES
] ++ ee_fields_bridges();
fields("metrics") ->
[
@ -181,6 +140,3 @@ status() ->
node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
schema_mod(Type) ->
list_to_atom(lists:concat(["emqx_bridge_", Type, "_schema"])).

View File

@ -50,14 +50,6 @@ basic_config() ->
desc => ?DESC("config_enable"),
default => true
}
)},
{direction,
mk(
egress,
#{
desc => ?DESC("config_direction"),
default => egress
}
)}
] ++ webhook_creation_opts() ++
proplists:delete(

View File

@ -165,7 +165,6 @@ gen_schema_json(Dir, I18nFile, SchemaModule) ->
gen_api_schema_json(Dir, I18nFile, Lang) ->
emqx_dashboard:init_i18n(I18nFile, Lang),
gen_api_schema_json_hotconf(Dir, Lang),
gen_api_schema_json_connector(Dir, Lang),
gen_api_schema_json_bridge(Dir, Lang),
emqx_dashboard:clear_i18n().
@ -174,11 +173,6 @@ gen_api_schema_json_hotconf(Dir, Lang) ->
File = schema_filename(Dir, "hot-config-schema-", Lang),
ok = do_gen_api_schema_json(File, emqx_mgmt_api_configs, SchemaInfo).
gen_api_schema_json_connector(Dir, Lang) ->
SchemaInfo = #{title => <<"EMQX Connector API Schema">>, version => <<"0.1.0">>},
File = schema_filename(Dir, "connector-api-", Lang),
ok = do_gen_api_schema_json(File, emqx_connector_api, SchemaInfo).
gen_api_schema_json_bridge(Dir, Lang) ->
SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => <<"0.1.0">>},
File = schema_filename(Dir, "bridge-api-", Lang),

View File

@ -60,7 +60,6 @@
emqx_exhook_schema,
emqx_psk_schema,
emqx_limiter_schema,
emqx_connector_schema,
emqx_slow_subs_schema
]).

View File

@ -1,5 +1,4 @@
emqx_connector_mqtt {
num_of_bridges {
desc {
en: "The current number of bridges that are using this connector."

View File

@ -1,4 +1,85 @@
emqx_connector_mqtt_schema {
ingress_desc {
desc {
en: """The ingress config defines how this bridge receive messages from the remote MQTT broker, and then
send them to the local broker.</br>
Template with variables is allowed in 'remote.qos', 'local.topic', 'local.qos', 'local.retain', 'local.payload'.</br>
NOTE: if this bridge is used as the input of a rule, and also 'local.topic' is
configured, then messages got from the remote broker will be sent to both the 'local.topic' and
the rule."""
zh: """入口配置定义了该桥接如何从远程 MQTT Broker 接收消息,然后将消息发送到本地 Broker。</br>
以下字段中允许使用带有变量的模板:'remote.qos', 'local.topic', 'local.qos', 'local.retain', 'local.payload'。</br>
注意:如果此桥接被用作规则的输入,并且配置了 'local.topic',则从远程代理获取的消息将同时被发送到 'local.topic' 和规则。
"""
}
label: {
en: "Ingress Configs"
zh: "入方向配置"
}
}
egress_desc {
desc {
en: """The egress config defines how this bridge forwards messages from the local broker to the remote broker.</br>
Template with variables is allowed in 'remote.topic', 'local.qos', 'local.retain', 'local.payload'.</br>
NOTE: if this bridge is used as the action of a rule, and also 'local.topic'
is configured, then both the data got from the rule and the MQTT messages that matches
'local.topic' will be forwarded."""
zh: """出口配置定义了该桥接如何将消息从本地 Broker 转发到远程 Broker。
以下字段中允许使用带有变量的模板:'remote.topic', 'local.qos', 'local.retain', 'local.payload'。</br>
注意:如果此桥接被用作规则的动作,并且配置了 'local.topic',则从规则输出的数据以及匹配到 'local.topic' 的 MQTT 消息都会被转发。
"""
}
label: {
en: "Egress Configs"
zh: "出方向配置"
}
}
ingress_remote {
desc {
en: """The configs about subscribing to the remote broker."""
zh: """订阅远程 Broker 相关的配置。"""
}
label: {
en: "Remote Configs"
zh: "远程配置"
}
}
ingress_local {
desc {
en: """The configs about sending message to the local broker."""
zh: """发送消息到本地 Broker 相关的配置。"""
}
label: {
en: "Local Configs"
zh: "本地配置"
}
}
egress_remote {
desc {
en: """The configs about sending message to the remote broker."""
zh: """发送消息到远程 Broker 相关的配置。"""
}
label: {
en: "Remote Configs"
zh: "远程配置"
}
}
egress_local {
desc {
en: """The configs about receiving messages from ben."""
zh: """收取本地 Broker 消息相关的配置。"""
}
label: {
en: "Local Configs"
zh: "本地配置"
}
}
mode {
desc {
en: """
@ -9,7 +90,7 @@ In 'cluster_shareload' mode, the incoming load from the remote broker is shared
using shared subscription.</br>
Note that the 'clientid' is suffixed by the node name, this is to avoid
clientid conflicts between different nodes. And we can only use shared subscription
topic filters for 'remote_topic' of ingress connections.
topic filters for 'remote.topic' of ingress connections.
"""
zh: """
MQTT 桥的模式。 </br>
@ -17,7 +98,7 @@ MQTT 桥的模式。 </br>
- cluster_shareload在 emqx 集群的每个节点上创建一个 MQTT 连接。</br>
在“cluster_shareload”模式下来自远程代理的传入负载通过共享订阅的方式接收。</br>
请注意“clientid”以节点名称为后缀这是为了避免不同节点之间的clientid冲突。
而且对于入口连接的“remote_topic”我们只能使用共享订阅主题过滤器。
而且对于入口连接的“remote.topic”我们只能使用共享订阅主题过滤器。
"""
}
label: {
@ -166,17 +247,6 @@ Template with variables is allowed.
}
}
ingress_hookpoint {
desc {
en: "The hook point will be triggered when there's any message received from the remote broker."
zh: "当从远程borker收到任何消息时将触发钩子。"
}
label: {
en: "Hookpoint"
zh: "挂载点"
}
}
egress_local_topic {
desc {
en: "The local topic to be forwarded to the remote broker"
@ -222,59 +292,6 @@ Template with variables is allowed.
}
}
dir {
desc {
en: """
The dir where the replayq file saved.</br>
Set to 'false' disables the replayq feature.
"""
zh: """
replayq 文件保存的目录。</br>
设置为 'false' 会禁用 replayq 功能。
"""
}
label: {
en: "Replyq file Save Dir"
zh: "Replyq 文件保存目录"
}
}
seg_bytes {
desc {
en: """
The size in bytes of a single segment.</br>
A segment is mapping to a file in the replayq dir. If the current segment is full, a new segment
(file) will be opened to write.
"""
zh: """
单个段的大小(以字节为单位)。</br>
一个段映射到 replayq 目录中的一个文件。 如果当前段已满,则新段(文件)将被打开写入。
"""
}
label: {
en: "Segment Size"
zh: "Segment 大小"
}
}
offload {
desc {
en: """
In offload mode, the disk queue is only used to offload queue tail segments.</br>
The messages are cached in the memory first, then it writes to the replayq files after the size of
the memory cache reaches 'seg_bytes'.
"""
zh: """
在Offload模式下磁盘队列仅用于卸载队列尾段。</br>
消息首先缓存在内存中然后写入replayq文件。内存缓大小为“seg_bytes” 指定的值。
"""
}
label: {
en: "Offload Mode"
zh: "Offload 模式"
}
}
retain {
desc {
en: """
@ -309,66 +326,15 @@ Template with variables is allowed.
}
}
desc_connector {
server_configs {
desc {
en: """Generic configuration for the connector."""
zh: """连接器的通用配置。"""
en: """Configs related to the server."""
zh: """服务器相关的配置。"""
}
label: {
en: "Connector Generic Configuration"
zh: "连接器通用配置。"
en: "Server Configs"
zh: "服务配置。"
}
}
desc_ingress {
desc {
en: """
The ingress config defines how this bridge receive messages from the remote MQTT broker, and then send them to the local broker.</br>
Template with variables is allowed in 'local_topic', 'remote_qos', 'qos', 'retain', 'payload'.</br>
NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also local_topic is configured, then messages got from the remote broker will be sent to both the 'local_topic' and the rule.
"""
zh: """
Ingress 模式定义了这个 bridge 如何从远程 MQTT broker 接收消息,然后将它们发送到本地 broker 。</br>
允许带有的模板变量: 'local_topic'、'remote_qos'、'qos'、'retain'、'payload' 。</br>
注意:如果这个 bridge 被用作规则的输入emqx 规则引擎),并且还配置了 local_topic那么从远程 broker 获取的消息将同时被发送到 'local_topic' 和规则引擎。
"""
}
label: {
en: "Ingress Config"
zh: "Ingress 模式配置"
}
}
desc_egress {
desc {
en: """
The egress config defines how this bridge forwards messages from the local broker to the remote broker.</br>
Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.</br>
NOTE: if this bridge is used as the action of a rule (emqx rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that matches local_topic will be forwarded.
"""
zh: """
Egress 模式定义了 bridge 如何将消息从本地 broker 转发到远程 broker。</br>
允许带有的模板变量: 'remote_topic'、'qos'、'retain'、'payload' 。</br>
注意:如果这个 bridge 作为规则emqx 规则引擎)的输出,并且还配置了 local_topic那么从规则引擎中获取的数据和匹配 local_topic 的 MQTT 消息都会被转发到远程 broker 。
"""
}
label: {
en: "Egress Config"
zh: "Egress 模式配置"
}
}
desc_replayq {
desc {
en: """Queue messages in disk files."""
zh: """本地磁盘消息队列"""
}
label: {
en: "Replayq"
zh: "本地磁盘消息队列"
}
}
}

View File

@ -1,31 +0,0 @@
emqx_connector_schema {
mqtt {
desc {
en: "MQTT bridges."
zh: "MQTT bridges。"
}
label: {
en: "MQTT bridges"
zh: "MQTT bridges"
}
}
desc_connector {
desc {
en: """
Configuration for EMQX connectors.</br>
A connector maintains the data related to the external resources, such as MySQL database.
"""
zh: """
EMQX 连接器的配置。</br>
连接器维护与外部资源相关的数据,比如 MySQL 数据库。
"""
}
label: {
en: "Connector"
zh: "连接器"
}
}
}

View File

@ -1,165 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector).
-export([
config_key_path/0,
pre_config_update/3,
post_config_update/5
]).
-export([
parse_connector_id/1,
connector_id/2
]).
-export([
list_raw/0,
lookup_raw/1,
lookup_raw/2,
create_dry_run/2,
update/2,
update/3,
delete/1,
delete/2
]).
config_key_path() ->
[connectors].
pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
emqx_connector_ssl:convert_certs(filename:join(Path), Conf).
-dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]).
post_config_update([connectors, Type, Name] = Path, '$remove', _, OldConf, _AppEnvs) ->
ConnId = connector_id(Type, Name),
try
foreach_linked_bridges(ConnId, fun(#{type := BType, name := BName}) ->
throw({dependency_bridges_exist, emqx_bridge_resource:bridge_id(BType, BName)})
end),
_ = emqx_connector_ssl:clear_certs(filename:join(Path), OldConf)
catch
throw:Error -> {error, Error}
end;
post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
ConnId = connector_id(Type, Name),
foreach_linked_bridges(
ConnId,
fun(#{type := BType, name := BName}) ->
BridgeConf = emqx:get_config([bridges, BType, BName]),
case
emqx_bridge_resource:update(
BType,
BName,
{BridgeConf#{connector => OldConf}, BridgeConf#{connector => NewConf}}
)
of
ok -> ok;
{error, Reason} -> error({update_bridge_error, Reason})
end
end
).
connector_id(Type0, Name0) ->
Type = bin(Type0),
Name = bin(Name0),
<<Type/binary, ":", Name/binary>>.
parse_connector_id(ConnectorId) ->
case string:split(bin(ConnectorId), ":", all) of
[Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)};
_ -> error({invalid_connector_id, ConnectorId})
end.
list_raw() ->
case get_raw_connector_conf() of
not_found ->
[];
Config ->
lists:foldl(
fun({Type, NameAndConf}, Connectors) ->
lists:foldl(
fun({Name, RawConf}, Acc) ->
[RawConf#{<<"type">> => Type, <<"name">> => Name} | Acc]
end,
Connectors,
maps:to_list(NameAndConf)
)
end,
[],
maps:to_list(Config)
)
end.
lookup_raw(Id) when is_binary(Id) ->
{Type, Name} = parse_connector_id(Id),
lookup_raw(Type, Name).
lookup_raw(Type, Name) ->
Path = [bin(P) || P <- [Type, Name]],
case get_raw_connector_conf() of
not_found ->
{error, not_found};
Conf ->
case emqx_map_lib:deep_get(Path, Conf, not_found) of
not_found -> {error, not_found};
Conf1 -> {ok, Conf1#{<<"type">> => Type, <<"name">> => Name}}
end
end.
-spec create_dry_run(module(), binary() | #{binary() => term()} | [#{binary() => term()}]) ->
ok | {error, Reason :: term()}.
create_dry_run(Type, Conf) ->
emqx_bridge_resource:create_dry_run(Type, Conf).
update(Id, Conf) when is_binary(Id) ->
{Type, Name} = parse_connector_id(Id),
update(Type, Name, Conf).
update(Type, Name, Conf) ->
emqx_conf:update(config_key_path() ++ [Type, Name], Conf, #{override_to => cluster}).
delete(Id) when is_binary(Id) ->
{Type, Name} = parse_connector_id(Id),
delete(Type, Name).
delete(Type, Name) ->
emqx_conf:remove(config_key_path() ++ [Type, Name], #{override_to => cluster}).
get_raw_connector_conf() ->
case emqx:get_raw_config(config_key_path(), not_found) of
not_found ->
not_found;
RawConf ->
#{<<"connectors">> := Conf} =
emqx_config:fill_defaults(#{<<"connectors">> => RawConf}),
Conf
end.
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
foreach_linked_bridges(ConnId, Do) ->
lists:foreach(
fun
(#{raw_config := #{<<"connector">> := ConnId0}} = Bridge) when ConnId0 == ConnId ->
Do(Bridge);
(_) ->
ok
end,
emqx_bridge:list()
).

View File

@ -1,339 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_api).
-behaviour(minirest_api).
-include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, ref/2, array/1, enum/1]).
%% Swagger specs from hocon schema
-export([api_spec/0, paths/0, schema/1, namespace/0]).
%% API callbacks
-export(['/connectors_test'/2, '/connectors'/2, '/connectors/:id'/2]).
-define(CONN_TYPES, [mqtt]).
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_connector:parse_connector_id(Id) of
{ConnType, ConnName} ->
_ = ConnName,
EXPR
catch
error:{invalid_connector_id, Id0} ->
{400, #{
code => 'INVALID_ID',
message =>
<<"invalid_connector_id: ", Id0/binary,
". Connector Ids must be of format {type}:{name}">>
}}
end
).
namespace() -> "connector".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
paths() -> ["/connectors_test", "/connectors", "/connectors/:id"].
error_schema(Codes, Message) when is_list(Message) ->
error_schema(Codes, list_to_binary(Message));
error_schema(Codes, Message) when is_binary(Message) ->
emqx_dashboard_swagger:error_codes(Codes, Message).
put_request_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
emqx_connector_schema:put_request(), connector_info_examples(put)
).
post_request_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
emqx_connector_schema:post_request(), connector_info_examples(post)
).
get_response_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
emqx_connector_schema:get_response(), connector_info_examples(get)
).
connector_info_array_example(Method) ->
[Config || #{value := Config} <- maps:values(connector_info_examples(Method))].
connector_info_examples(Method) ->
Fun =
fun(Type, Acc) ->
SType = atom_to_list(Type),
maps:merge(Acc, #{
Type => #{
summary => bin(string:uppercase(SType) ++ " Connector"),
value => info_example(Type, Method)
}
})
end,
Broker = lists:foldl(Fun, #{}, ?CONN_TYPES),
EE = ee_example(Method),
maps:merge(Broker, EE).
-if(?EMQX_RELEASE_EDITION == ee).
ee_example(Method) ->
emqx_ee_connector:connector_examples(Method).
-else.
ee_example(_Method) ->
#{}.
-endif.
info_example(Type, Method) ->
maps:merge(
info_example_basic(Type),
method_example(Type, Method)
).
method_example(Type, Method) when Method == get; Method == post ->
SType = atom_to_list(Type),
SName = "my_" ++ SType ++ "_connector",
#{
type => bin(SType),
name => bin(SName)
};
method_example(_Type, put) ->
#{}.
info_example_basic(mqtt) ->
#{
mode => cluster_shareload,
server => <<"127.0.0.1:1883">>,
reconnect_interval => <<"15s">>,
proto_ver => <<"v4">>,
username => <<"foo">>,
password => <<"bar">>,
clientid => <<"foo">>,
clean_start => true,
keepalive => <<"300s">>,
retry_interval => <<"15s">>,
max_inflight => 100,
ssl => #{
enable => false
}
}.
param_path_id() ->
[
{id,
mk(
binary(),
#{
in => path,
example => <<"mqtt:my_mqtt_connector">>,
desc => ?DESC("id")
}
)}
].
schema("/connectors_test") ->
#{
'operationId' => '/connectors_test',
post => #{
tags => [<<"connectors">>],
desc => ?DESC("conn_test_post"),
summary => <<"Test creating connector">>,
'requestBody' => post_request_body_schema(),
responses => #{
204 => <<"Test connector OK">>,
400 => error_schema(['TEST_FAILED'], "connector test failed")
}
}
};
schema("/connectors") ->
#{
'operationId' => '/connectors',
get => #{
tags => [<<"connectors">>],
desc => ?DESC("conn_get"),
summary => <<"List connectors">>,
responses => #{
200 => emqx_dashboard_swagger:schema_with_example(
array(emqx_connector_schema:get_response()),
connector_info_array_example(get)
)
}
},
post => #{
tags => [<<"connectors">>],
desc => ?DESC("conn_post"),
summary => <<"Create connector">>,
'requestBody' => post_request_body_schema(),
responses => #{
201 => get_response_body_schema(),
400 => error_schema(['ALREADY_EXISTS'], "connector already exists")
}
}
};
schema("/connectors/:id") ->
#{
'operationId' => '/connectors/:id',
get => #{
tags => [<<"connectors">>],
desc => ?DESC("conn_id_get"),
summary => <<"Get connector">>,
parameters => param_path_id(),
responses => #{
200 => get_response_body_schema(),
404 => error_schema(['NOT_FOUND'], "Connector not found"),
400 => error_schema(['INVALID_ID'], "Bad connector ID")
}
},
put => #{
tags => [<<"connectors">>],
desc => ?DESC("conn_id_put"),
summary => <<"Update connector">>,
parameters => param_path_id(),
'requestBody' => put_request_body_schema(),
responses => #{
200 => get_response_body_schema(),
404 => error_schema(['NOT_FOUND'], "Connector not found"),
400 => error_schema(['INVALID_ID'], "Bad connector ID")
}
},
delete => #{
tags => [<<"connectors">>],
desc => ?DESC("conn_id_delete"),
summary => <<"Delete connector">>,
parameters => param_path_id(),
responses => #{
204 => <<"Delete connector successfully">>,
403 => error_schema(['DEPENDENCY_EXISTS'], "Cannot remove dependent connector"),
404 => error_schema(['NOT_FOUND'], "Delete failed, not found"),
400 => error_schema(['INVALID_ID'], "Bad connector ID")
}
}
}.
'/connectors_test'(post, #{body := #{<<"type">> := ConnType} = Params}) ->
case emqx_connector:create_dry_run(ConnType, maps:remove(<<"type">>, Params)) of
ok ->
{204};
{error, Error} ->
{400, error_msg(['TEST_FAILED'], Error)}
end.
'/connectors'(get, _Request) ->
{200, [format_resp(Conn) || Conn <- emqx_connector:list_raw()]};
'/connectors'(post, #{body := #{<<"type">> := ConnType, <<"name">> := ConnName} = Params}) ->
case emqx_connector:lookup_raw(ConnType, ConnName) of
{ok, _} ->
{400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)};
{error, not_found} ->
case
emqx_connector:update(
ConnType,
ConnName,
filter_out_request_body(Params)
)
of
{ok, #{raw_config := RawConf}} ->
{201,
format_resp(RawConf#{
<<"type">> => ConnType,
<<"name">> => ConnName
})};
{error, Error} ->
{400, error_msg('BAD_REQUEST', Error)}
end
end;
'/connectors'(post, _) ->
{400, error_msg('BAD_REQUEST', <<"missing some required fields: [name, type]">>)}.
'/connectors/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(
Id,
case emqx_connector:lookup_raw(ConnType, ConnName) of
{ok, Conf} ->
{200, format_resp(Conf)};
{error, not_found} ->
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
end
);
'/connectors/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
Params = filter_out_request_body(Params0),
?TRY_PARSE_ID(
Id,
case emqx_connector:lookup_raw(ConnType, ConnName) of
{ok, _} ->
case emqx_connector:update(ConnType, ConnName, Params) of
{ok, #{raw_config := RawConf}} ->
{200,
format_resp(RawConf#{
<<"type">> => ConnType,
<<"name">> => ConnName
})};
{error, Error} ->
{500, error_msg('INTERNAL_ERROR', Error)}
end;
{error, not_found} ->
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
end
);
'/connectors/:id'(delete, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(
Id,
case emqx_connector:lookup_raw(ConnType, ConnName) of
{ok, _} ->
case emqx_connector:delete(ConnType, ConnName) of
{ok, _} ->
{204};
{error, {post_config_update, _, {dependency_bridges_exist, BridgeID}}} ->
{403,
error_msg(
'DEPENDENCY_EXISTS',
<<"Cannot remove the connector as it's in use by a bridge: ",
BridgeID/binary>>
)};
{error, Error} ->
{500, error_msg('INTERNAL_ERROR', Error)}
end;
{error, not_found} ->
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
end
).
error_msg(Code, Msg) ->
#{code => Code, message => emqx_misc:readable_error_msg(Msg)}.
format_resp(#{<<"type">> := ConnType, <<"name">> := ConnName} = RawConf) ->
NumOfBridges = length(
emqx_bridge:list_bridges_by_connector(
emqx_connector:connector_id(ConnType, ConnName)
)
),
RawConf#{
<<"type">> => ConnType,
<<"name">> => ConnName,
<<"num_of_bridges">> => NumOfBridges
}.
filter_out_request_body(Conf) ->
ExtraConfs = [<<"clientid">>, <<"num_of_bridges">>, <<"type">>, <<"name">>],
maps:without(ExtraConfs, Conf).
bin(S) when is_list(S) ->
list_to_binary(S).

View File

@ -20,15 +20,10 @@
-export([start/2, stop/1]).
-define(CONF_HDLR_PATH, (emqx_connector:config_key_path() ++ ['?', '?'])).
start(_StartType, _StartArgs) ->
ok = emqx_config_handler:add_handler(?CONF_HDLR_PATH, emqx_connector),
emqx_connector_mqtt_worker:register_metrics(),
emqx_connector_sup:start_link().
stop(_State) ->
emqx_config_handler:remove_handler(?CONF_HDLR_PATH),
ok.
%% internal functions

View File

@ -67,7 +67,7 @@ fields("get") ->
)}
] ++ fields("post");
fields("put") ->
emqx_connector_mqtt_schema:fields("connector");
emqx_connector_mqtt_schema:fields("server_configs");
fields("post") ->
[
{type,
@ -236,11 +236,9 @@ basic_config(#{
keepalive := KeepAlive,
retry_interval := RetryIntv,
max_inflight := MaxInflight,
replayq := ReplayQ,
ssl := #{enable := EnableSsl} = Ssl
}) ->
#{
replayq => ReplayQ,
%% connection opts
server => Server,
%% 30s

View File

@ -1,95 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_schema).
-behaviour(hocon_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-export([namespace/0, roots/0, fields/1, desc/1]).
-export([
get_response/0,
put_request/0,
post_request/0
]).
%% the config for webhook bridges do not need connectors
-define(CONN_TYPES, [mqtt]).
%%======================================================================================
%% For HTTP APIs
get_response() ->
http_schema("get").
put_request() ->
http_schema("put").
post_request() ->
http_schema("post").
http_schema(Method) ->
Broker = [?R_REF(schema_mod(Type), Method) || Type <- ?CONN_TYPES],
EE = ee_schemas(Method),
Schemas = Broker ++ EE,
?UNION(Schemas).
%%======================================================================================
%% Hocon Schema Definitions
namespace() -> connector.
roots() -> ["connectors"].
fields(connectors) ->
fields("connectors");
fields("connectors") ->
Broker = [
{mqtt,
?HOCON(
?MAP(name, ?R_REF(emqx_connector_mqtt_schema, "connector")),
#{desc => ?DESC("mqtt")}
)}
],
EE = ee_fields_connectors(),
Broker ++ EE.
-if(?EMQX_RELEASE_EDITION == ee).
ee_schemas(Method) ->
emqx_ee_connector:api_schemas(Method).
ee_fields_connectors() ->
emqx_ee_connector:fields(connectors).
-else.
ee_fields_connectors() ->
[].
ee_schemas(_) ->
[].
-endif.
desc(Record) when
Record =:= connectors;
Record =:= "connectors"
->
?DESC("desc_connector");
desc(_) ->
undefined.
schema_mod(Type) ->
list_to_atom(lists:concat(["emqx_connector_", Type])).

View File

@ -207,7 +207,7 @@ make_hdlr(Parent, Vars, Opts) ->
sub_remote_topics(_ClientPid, undefined) ->
ok;
sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) ->
sub_remote_topics(ClientPid, #{remote := #{topic := FromTopic, qos := QoS}}) ->
case emqtt:subscribe(ClientPid, FromTopic, QoS) of
{ok, _, _} -> ok;
Error -> throw(Error)
@ -217,12 +217,10 @@ process_config(Config) ->
maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config).
maybe_publish_to_local_broker(Msg, Vars, Props) ->
case maps:get(local_topic, Vars, undefined) of
undefined ->
case emqx_map_lib:deep_get([local, topic], Vars, undefined) of
%% local topic is not set, discard it
ok;
_ ->
_ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props))
undefined -> ok;
_ -> emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props))
end.
format_msg_received(

View File

@ -38,14 +38,16 @@
-type msg() :: emqx_types:message().
-type exp_msg() :: emqx_types:message() | #mqtt_msg{}.
-type variables() :: #{
mountpoint := undefined | binary(),
remote_topic := binary(),
remote_qos := original | integer(),
-type remote_config() :: #{
topic := binary(),
qos := original | integer(),
retain := original | boolean(),
payload := binary()
}.
-type variables() :: #{
mountpoint := undefined | binary(),
remote := remote_config()
}.
make_pub_vars(_, undefined) ->
undefined;
@ -67,10 +69,12 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
MapMsg = maps:put(retain, Retain0, Columns),
to_remote_msg(MapMsg, Vars);
to_remote_msg(MapMsg, #{
remote_topic := TopicToken,
remote := #{
topic := TopicToken,
payload := PayloadToken,
remote_qos := QoSToken,
retain := RetainToken,
qos := QoSToken,
retain := RetainToken
},
mountpoint := Mountpoint
}) when is_map(MapMsg) ->
Topic = replace_vars_in_str(TopicToken, MapMsg),
@ -91,11 +95,13 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
to_broker_msg(
#{dup := Dup} = MapMsg,
#{
local_topic := TopicToken,
local := #{
topic := TopicToken,
payload := PayloadToken,
local_qos := QoSToken,
qos := QoSToken,
retain := RetainToken,
mountpoint := Mountpoint
}
},
Props
) ->

View File

@ -28,25 +28,33 @@
desc/1
]).
-export([
ingress_desc/0,
egress_desc/0
]).
-import(emqx_schema, [mk_duration/2]).
-import(hoconsc, [mk/2, ref/2]).
namespace() -> "connector-mqtt".
roots() ->
fields("config").
fields("config") ->
fields("connector") ++
topic_mappings();
fields("connector") ->
fields("server_configs") ++
[
{ingress,
mk(
ref(?MODULE, "ingress"),
#{default => #{}}
)},
{egress,
mk(
ref(?MODULE, "egress"),
#{default => #{}}
)}
];
fields("server_configs") ->
[
{mode,
sc(
mk(
hoconsc:enum([cluster_shareload]),
#{
default => cluster_shareload,
@ -54,7 +62,7 @@ fields("connector") ->
}
)},
{server,
sc(
mk(
emqx_schema:ip_port(),
#{
required => true,
@ -68,7 +76,7 @@ fields("connector") ->
#{default => "15s"}
)},
{proto_ver,
sc(
mk(
hoconsc:enum([v3, v4, v5]),
#{
default => v4,
@ -76,7 +84,7 @@ fields("connector") ->
}
)},
{bridge_mode,
sc(
mk(
boolean(),
#{
default => false,
@ -84,7 +92,7 @@ fields("connector") ->
}
)},
{username,
sc(
mk(
binary(),
#{
default => "emqx",
@ -92,7 +100,7 @@ fields("connector") ->
}
)},
{password,
sc(
mk(
binary(),
#{
default => "emqx",
@ -101,7 +109,7 @@ fields("connector") ->
}
)},
{clean_start,
sc(
mk(
boolean(),
#{
default => true,
@ -116,20 +124,31 @@ fields("connector") ->
#{default => "15s"}
)},
{max_inflight,
sc(
mk(
non_neg_integer(),
#{
default => 32,
desc => ?DESC("max_inflight")
}
)},
{replayq, sc(ref("replayq"), #{})}
)}
] ++ emqx_connector_schema_lib:ssl_fields();
fields("ingress") ->
%% the message maybe subscribed by rules, in this case 'local_topic' is not necessary
[
{remote_topic,
sc(
{"remote",
mk(
ref(?MODULE, "ingress_remote"),
#{desc => ?DESC(emqx_connector_mqtt_schema, "ingress_remote")}
)},
{"local",
mk(
ref(?MODULE, "ingress_local"),
#{desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local")}
)}
];
fields("ingress_remote") ->
[
{topic,
mk(
binary(),
#{
required => true,
@ -137,47 +156,43 @@ fields("ingress") ->
desc => ?DESC("ingress_remote_topic")
}
)},
{remote_qos,
sc(
{qos,
mk(
qos(),
#{
default => 1,
desc => ?DESC("ingress_remote_qos")
}
)},
{local_topic,
sc(
)}
];
fields("ingress_local") ->
[
{topic,
mk(
binary(),
#{
validator => fun emqx_schema:non_empty_string/1,
desc => ?DESC("ingress_local_topic")
}
)},
{local_qos,
sc(
{qos,
mk(
qos(),
#{
default => <<"${qos}">>,
desc => ?DESC("ingress_local_qos")
}
)},
{hookpoint,
sc(
binary(),
#{desc => ?DESC("ingress_hookpoint")}
)},
{retain,
sc(
mk(
hoconsc:union([boolean(), binary()]),
#{
default => <<"${retain}">>,
desc => ?DESC("retain")
}
)},
{payload,
sc(
mk(
binary(),
#{
default => <<"${payload}">>,
@ -186,18 +201,33 @@ fields("ingress") ->
)}
];
fields("egress") ->
%% the message maybe sent from rules, in this case 'local_topic' is not necessary
[
{local_topic,
sc(
{"local",
mk(
ref(?MODULE, "egress_local"),
#{desc => ?DESC(emqx_connector_mqtt_schema, "egress_local")}
)},
{"remote",
mk(
ref(?MODULE, "egress_remote"),
#{desc => ?DESC(emqx_connector_mqtt_schema, "egress_remote")}
)}
];
fields("egress_local") ->
[
{topic,
mk(
binary(),
#{
desc => ?DESC("egress_local_topic"),
validator => fun emqx_schema:non_empty_string/1
}
)},
{remote_topic,
sc(
)}
];
fields("egress_remote") ->
[
{topic,
mk(
binary(),
#{
required => true,
@ -205,104 +235,48 @@ fields("egress") ->
desc => ?DESC("egress_remote_topic")
}
)},
{remote_qos,
sc(
{qos,
mk(
qos(),
#{
required => true,
desc => ?DESC("egress_remote_qos")
}
)},
{retain,
sc(
mk(
hoconsc:union([boolean(), binary()]),
#{
required => true,
desc => ?DESC("retain")
}
)},
{payload,
sc(
mk(
binary(),
#{
required => true,
desc => ?DESC("payload")
}
)}
];
fields("replayq") ->
[
{dir,
sc(
hoconsc:union([boolean(), string()]),
#{desc => ?DESC("dir")}
)},
{seg_bytes,
sc(
emqx_schema:bytesize(),
#{
default => "100MB",
desc => ?DESC("seg_bytes")
}
)},
{offload,
sc(
boolean(),
#{
default => false,
desc => ?DESC("offload")
}
)}
].
desc("connector") ->
?DESC("desc_connector");
desc("server_configs") ->
?DESC("server_configs");
desc("ingress") ->
ingress_desc();
?DESC("ingress_desc");
desc("ingress_remote") ->
?DESC("ingress_remote");
desc("ingress_local") ->
?DESC("ingress_local");
desc("egress") ->
egress_desc();
desc("replayq") ->
?DESC("desc_replayq");
?DESC("egress_desc");
desc("egress_remote") ->
?DESC("egress_remote");
desc("egress_local") ->
?DESC("egress_local");
desc(_) ->
undefined.
topic_mappings() ->
[
{ingress,
sc(
ref("ingress"),
#{default => #{}}
)},
{egress,
sc(
ref("egress"),
#{default => #{}}
)}
].
ingress_desc() ->
"\n"
"The ingress config defines how this bridge receive messages from the remote MQTT broker, and then\n"
"send them to the local broker.</br>\n"
"Template with variables is allowed in 'local_topic', 'remote_qos', 'qos', 'retain',\n"
"'payload'.</br>\n"
"NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also local_topic is\n"
"configured, then messages got from the remote broker will be sent to both the 'local_topic' and\n"
"the rule.\n".
egress_desc() ->
"\n"
"The egress config defines how this bridge forwards messages from the local broker to the remote\n"
"broker.</br>\n"
"Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.</br>\n"
"NOTE: if this bridge is used as the action of a rule (emqx rule engine), and also local_topic\n"
"is configured, then both the data got from the rule and the MQTT messages that matches\n"
"local_topic will be forwarded.\n".
qos() ->
hoconsc:union([emqx_schema:qos(), binary()]).
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).

View File

@ -68,7 +68,6 @@
%% APIs
-export([
start_link/1,
register_metrics/0,
stop/1
]).
@ -247,18 +246,19 @@ pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts)
pre_process_in_out(_, undefined) ->
undefined;
pre_process_in_out(in, #{local := LC} = Conf) when is_map(Conf) ->
Conf#{local => pre_process_in_out_common(LC)};
pre_process_in_out(in, Conf) when is_map(Conf) ->
Conf1 = pre_process_conf(local_topic, Conf),
Conf2 = pre_process_conf(local_qos, Conf1),
pre_process_in_out_common(Conf2);
pre_process_in_out(out, Conf) when is_map(Conf) ->
Conf1 = pre_process_conf(remote_topic, Conf),
Conf2 = pre_process_conf(remote_qos, Conf1),
pre_process_in_out_common(Conf2).
%% have no 'local' field in the config
Conf;
pre_process_in_out(out, #{remote := RC} = Conf) when is_map(Conf) ->
Conf#{remote => pre_process_in_out_common(RC)}.
pre_process_in_out_common(Conf) ->
Conf1 = pre_process_conf(payload, Conf),
pre_process_conf(retain, Conf1).
pre_process_in_out_common(Conf0) ->
Conf1 = pre_process_conf(topic, Conf0),
Conf2 = pre_process_conf(qos, Conf1),
Conf3 = pre_process_conf(payload, Conf2),
pre_process_conf(retain, Conf3).
pre_process_conf(Key, Conf) ->
case maps:find(Key, Conf) of
@ -450,7 +450,6 @@ do_send(
) ->
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
ExportMsg = fun(Message) ->
emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'),
emqx_connector_mqtt_msg:to_remote_msg(Message, Vars)
end,
?SLOG(debug, #{
@ -551,15 +550,6 @@ format_mountpoint(Prefix) ->
name(Id) -> list_to_atom(str(Id)).
register_metrics() ->
lists:foreach(
fun emqx_metrics:ensure/1,
[
'bridge.mqtt.message_sent_to_remote',
'bridge.mqtt.message_received_from_remote'
]
).
obfuscate(Map) ->
maps:fold(
fun(K, V, Acc) ->

View File

@ -7,7 +7,7 @@
-export([
api_schemas/1,
conn_bridge_examples/1,
examples/1,
resource_type/1,
fields/1
]).
@ -28,7 +28,7 @@ schema_modules() ->
emqx_ee_bridge_mysql
].
conn_bridge_examples(Method) ->
examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)

View File

@ -1,57 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_connector).
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([
fields/1,
connector_examples/1,
api_schemas/1
]).
api_schemas(Method) ->
[
ref(emqx_ee_connector_hstreamdb, Method),
ref(emqx_ee_connector_influxdb, "udp_" ++ Method),
ref(emqx_ee_connector_influxdb, "api_v1_" ++ Method),
ref(emqx_ee_connector_influxdb, "api_v2_" ++ Method)
].
fields(connectors) ->
[
{hstreamdb,
mk(
hoconsc:map(name, ref(emqx_ee_connector_hstreamdb, config)),
#{desc => <<"EMQX Enterprise Config">>}
)}
] ++ fields(influxdb);
fields(influxdb) ->
[
{
Protocol,
mk(hoconsc:map(name, ref(emqx_ee_connector_influxdb, Protocol)), #{
desc => <<"EMQX Enterprise Config">>
})
}
|| Protocol <- [influxdb_udp, influxdb_api_v1, influxdb_api_v2]
].
connector_examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, connector_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
lists:foldl(Fun, #{}, schema_modules()).
schema_modules() ->
[
emqx_ee_connector_hstreamdb,
emqx_ee_connector_influxdb
].