diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl
index 113867aed..4bc81a085 100644
--- a/apps/emqx/test/emqx_common_test_helpers.erl
+++ b/apps/emqx/test/emqx_common_test_helpers.erl
@@ -132,7 +132,7 @@ start_apps(Apps) ->
start_apps(Apps, Handler) when is_function(Handler) ->
%% Load all application code to beam vm first
%% Because, minirest, ekka etc.. application will scan these modules
- lists:foreach(fun load/1, [emqx_machine, emqx_conf, emqx | Apps]),
+ lists:foreach(fun load/1, [emqx_conf, emqx | Apps]),
ekka:start(),
lists:foreach(fun(App) -> start_app(App, Handler) end, [emqx | Apps]).
diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf
index 5762c6f42..82087387d 100644
--- a/apps/emqx_bridge/etc/emqx_bridge.conf
+++ b/apps/emqx_bridge/etc/emqx_bridge.conf
@@ -3,73 +3,54 @@
##--------------------------------------------------------------------
## MQTT bridges to/from another MQTT broker
-bridges.mqtt.my_mqtt_bridge_to_aws {
- server = "127.0.0.1:1883"
- proto_ver = "v4"
- username = "username1"
- password = ""
- clean_start = true
- keepalive = 300
- retry_interval = "30s"
- max_inflight = 32
- reconnect_interval = "30s"
- bridge_mode = true
- replayq {
- dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
- seg_bytes = "100MB"
- offload = false
- }
- ssl {
- enable = false
- keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
- certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
- cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
- }
-
- ## topic mappings for this bridge
- ingress {
- from_remote_topic = "aws/#"
- subscribe_qos = 1
- to_local_topic = "from_aws/${topic}"
- payload = "${payload}"
- qos = "${qos}"
- retain = "${retain}"
- }
-
- egress {
- from_local_topic = "emqx/#"
- to_remote_topic = "from_emqx/${topic}"
- payload = "${payload}"
- qos = 1
- retain = false
- }
-
-}
+#bridges.mqtt.my_ingress_mqtt_bridge {
+# connector = my_mqtt_connector
+# direction = ingress
+# ## topic mappings for this bridge
+# from_remote_topic = "aws/#"
+# subscribe_qos = 1
+# to_local_topic = "from_aws/${topic}"
+# payload = "${payload}"
+# qos = "${qos}"
+# retain = "${retain}"
+#
+#}
+#
+#bridges.mqtt.my_egress_mqtt_bridge {
+# connector = my_mqtt_connector
+# direction = egress
+# ## topic mappings for this bridge
+# from_local_topic = "emqx/#"
+# to_remote_topic = "from_emqx/${topic}"
+# payload = "${payload}"
+# qos = 1
+# retain = false
+#}
## HTTP bridges to an HTTP server
-bridges.http.my_http_bridge {
- ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url string
- url = "http://localhost:9901/messages/${topic}"
- request_timeout = "30s"
- connect_timeout = "30s"
- max_retries = 3
- retry_interval = "10s"
- pool_type = "random"
- pool_size = 4
- enable_pipelining = true
- ssl {
- enable = false
- keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
- certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
- cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
- }
-
- from_local_topic = "emqx_http/#"
- ## the following config entries can use placehodler variables:
- ## url, method, body, headers
- method = post
- body = "${payload}"
- headers {
- "content-type": "application/json"
- }
-}
+#bridges.http.my_http_bridge {
+# ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url string
+# url = "http://localhost:9901/messages/${topic}"
+# request_timeout = "30s"
+# connect_timeout = "30s"
+# max_retries = 3
+# retry_interval = "10s"
+# pool_type = "random"
+# pool_size = 4
+# enable_pipelining = true
+# ssl {
+# enable = false
+# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
+# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
+# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
+# }
+#
+# from_local_topic = "emqx_http/#"
+# ## the following config entries can use placehodler variables:
+# ## url, method, body, headers
+# method = post
+# body = "${payload}"
+# headers {
+# "content-type": "application/json"
+# }
+#}
diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl
index feab8dda4..f40e57133 100644
--- a/apps/emqx_bridge/src/emqx_bridge.erl
+++ b/apps/emqx_bridge/src/emqx_bridge.erl
@@ -84,7 +84,7 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
send_message(BridgeId, Message) ->
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
- emqx_resource:query(ResId, {send_message, BridgeId, Message}).
+ emqx_resource:query(ResId, {send_message, Message}).
config_key_path() ->
[bridges].
@@ -178,7 +178,7 @@ create(Type, Name, Conf) ->
config => Conf}),
ResId = resource_id(Type, Name),
case emqx_resource:create(ResId,
- emqx_bridge:resource_type(Type), parse_confs(Type, Conf)) of
+ emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf)) of
{ok, already_created} ->
emqx_resource:get_instance(ResId);
{ok, Data} ->
@@ -199,7 +199,7 @@ update(Type, Name, {_OldConf, Conf}) ->
?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
config => Conf}),
emqx_resource:recreate(resource_id(Type, Name),
- emqx_bridge:resource_type(Type), parse_confs(Type, Conf), []).
+ emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []).
remove(Type, Name, _Conf) ->
?SLOG(info, #{msg => "remove bridge", type => Type, name => Name}),
@@ -227,8 +227,12 @@ get_matched_bridges(Topic) ->
Bridges = emqx:get_config([bridges], #{}),
maps:fold(fun (BType, Conf, Acc0) ->
maps:fold(fun
- (BName, #{egress := Egress}, Acc1) ->
+ %% Confs for MQTT, Kafka bridges have the `direction` flag
+ (_BName, #{direction := ingress}, Acc1) ->
+ Acc1;
+ (BName, #{direction := egress} = Egress, Acc1) ->
get_matched_bridge_id(Egress, Topic, BType, BName, Acc1);
+ %% HTTP, MySQL bridges only have egress direction
(BName, BridgeConf, Acc1) ->
get_matched_bridge_id(BridgeConf, Topic, BType, BName, Acc1)
end, Acc0, Conf)
@@ -240,12 +244,13 @@ get_matched_bridge_id(#{from_local_topic := Filter}, Topic, BType, BName, Acc) -
false -> Acc
end.
-parse_confs(http, #{ url := Url
- , method := Method
- , body := Body
- , headers := Headers
- , request_timeout := ReqTimeout
- } = Conf) ->
+parse_confs(http, _Name,
+ #{ url := Url
+ , method := Method
+ , body := Body
+ , headers := Headers
+ , request_timeout := ReqTimeout
+ } = Conf) ->
{BaseUrl, Path} = parse_url(Url),
{ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl),
Conf#{ base_url => BaseUrl2
@@ -257,8 +262,20 @@ parse_confs(http, #{ url := Url
, request_timeout => ReqTimeout
}
};
-parse_confs(_Type, Conf) ->
- Conf.
+parse_confs(Type, Name, #{connector := ConnName, direction := Direction} = Conf) ->
+ ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
+ make_resource_confs(Direction, ConnectorConfs,
+ maps:without([connector, direction], Conf), Name).
+
+make_resource_confs(ingress, ConnectorConfs, BridgeConf, Name) ->
+ BName = bin(Name),
+ ConnectorConfs#{
+ ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>}
+ };
+make_resource_confs(egress, ConnectorConfs, BridgeConf, _Name) ->
+ ConnectorConfs#{
+ egress => BridgeConf
+ }.
parse_url(Url) ->
case string:split(Url, "//", leading) of
diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl
index acab647c2..26a1d5bd1 100644
--- a/apps/emqx_bridge/src/emqx_bridge_schema.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl
@@ -11,17 +11,26 @@ roots() -> [bridges].
fields(bridges) ->
[ {mqtt,
- sc(hoconsc:map(name, ref("mqtt_bridge")),
+ sc(hoconsc:map(name, hoconsc:union([ ref("ingress_mqtt_bridge")
+ , ref("egress_mqtt_bridge")
+ ])),
#{ desc => "MQTT bridges"
- })}
+ })}
, {http,
sc(hoconsc:map(name, ref("http_bridge")),
#{ desc => "HTTP bridges"
- })}
+ })}
];
-fields("mqtt_bridge") ->
- emqx_connector_mqtt:fields("config");
+fields("ingress_mqtt_bridge") ->
+ [ direction(ingress, emqx_connector_mqtt_schema:ingress_desc())
+ , connector_name()
+ ] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
+
+fields("egress_mqtt_bridge") ->
+ [ direction(egress, emqx_connector_mqtt_schema:egress_desc())
+ , connector_name()
+ ] ++ emqx_connector_mqtt_schema:fields("egress");
fields("http_bridge") ->
basic_config_http() ++
@@ -85,6 +94,24 @@ How long will the HTTP request timeout.
})}
].
+direction(Dir, Desc) ->
+ {direction,
+ sc(Dir,
+ #{ nullable => false
+ , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
" ++
+ Desc
+ })}.
+
+connector_name() ->
+ {connector,
+ sc(binary(),
+ #{ nullable => false
+ , desc =>"""
+The connector name to be used for this bridge.
+Connectors are configured by 'connectors..
+"""
+ })}.
+
basic_config_http() ->
proplists:delete(base_url, emqx_connector_http:fields(config)).
diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
index d85cae5f4..12d05da7b 100644
--- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
@@ -112,6 +112,7 @@ t_crud_apis(_) ->
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a http bridge now
+ %% PUT /bridges/:id will create or update a bridge
{ok, 200, Bridge} = request(put, uri(["bridges", "http:test_bridge"]), ?HTTP_BRIDGE(?URL1)),
%ct:pal("---bridge: ~p", [Bridge]),
?assertMatch([ #{ <<"id">> := <<"http:test_bridge">>
@@ -139,11 +140,35 @@ t_crud_apis(_) ->
, <<"url">> := ?URL2
}], jsx:decode(Bridge2Str)),
+ %% get the bridge by id
+ {ok, 200, Bridge3Str} = request(get, uri(["bridges", "http:test_bridge"]), []),
+ ?assertMatch([#{ <<"id">> := <<"http:test_bridge">>
+ , <<"bridge_type">> := <<"http">>
+ , <<"is_connected">> := _
+ , <<"node">> := _
+ , <<"url">> := ?URL2
+ }], jsx:decode(Bridge3Str)),
+
%% delete the bridge
{ok,200,<<>>} = request(delete, uri(["bridges", "http:test_bridge"]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok.
+t_change_is_connnected_to_status() ->
+ error(not_implimented).
+
+t_start_stop_bridges(_) ->
+ start_http_server(9901, fun handle_fun_200_ok/1),
+ {ok, 200, Bridge} = request(put, uri(["bridges", "http:test_bridge"]), ?HTTP_BRIDGE(?URL1)),
+ ?assertMatch( #{ <<"id">> := <<"http:test_bridge">>
+ , <<"bridge_type">> := <<"http">>
+ , <<"is_connected">> := true
+ , <<"node">> := _
+ , <<"url">> := ?URL1
+ }, jsx:decode(Bridge)),
+ {ok, 200, <<>>} = request(put,
+ uri(["nodes", node(), "bridges", "http:test_bridge", "operation", "stop"]),
+ ?HTTP_BRIDGE(?URL1)).
%%--------------------------------------------------------------------
%% HTTP Request
diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl
index 350ee9e3a..d8bb2423b 100644
--- a/apps/emqx_conf/src/emqx_conf_schema.erl
+++ b/apps/emqx_conf/src/emqx_conf_schema.erl
@@ -56,6 +56,7 @@
, emqx_exhook_schema
, emqx_psk_schema
, emqx_limiter_schema
+ , emqx_connector_schema
]).
namespace() -> undefined.
diff --git a/apps/emqx_connector/etc/emqx_connector.conf b/apps/emqx_connector/etc/emqx_connector.conf
new file mode 100644
index 000000000..d2473eb13
--- /dev/null
+++ b/apps/emqx_connector/etc/emqx_connector.conf
@@ -0,0 +1,23 @@
+#connectors.mqtt.my_mqtt_connector {
+# server = "127.0.0.1:1883"
+# proto_ver = "v4"
+# username = "username1"
+# password = ""
+# clean_start = true
+# keepalive = 300
+# retry_interval = "30s"
+# max_inflight = 32
+# reconnect_interval = "30s"
+# bridge_mode = true
+# replayq {
+# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
+# seg_bytes = "100MB"
+# offload = false
+# }
+# ssl {
+# enable = false
+# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
+# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
+# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
+# }
+#}
\ No newline at end of file
diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl
index 87ebf3863..5bfd72b6b 100644
--- a/apps/emqx_connector/src/emqx_connector_http.erl
+++ b/apps/emqx_connector/src/emqx_connector_http.erl
@@ -178,9 +178,9 @@ on_stop(InstId, #{pool_name := PoolName}) ->
connector => InstId}),
ehttpc_sup:stop_pool(PoolName).
-on_query(InstId, {send_message, BridgeId, Msg}, AfterQuery, State) ->
+on_query(InstId, {send_message, Msg}, AfterQuery, State) ->
case maps:get(request, State, undefined) of
- undefined -> ?SLOG(error, #{msg => "request not found", bridge_id => BridgeId});
+ undefined -> ?SLOG(error, #{msg => "request not found", connector => InstId});
Request ->
#{method := Method, path := Path, body := Body, headers := Headers,
request_timeout := Timeout} = process_request(Request, Msg),
diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl
index 05eab7704..abd3d2f7b 100644
--- a/apps/emqx_connector/src/emqx_connector_mqtt.erl
+++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl
@@ -89,64 +89,70 @@ drop_bridge(Name) ->
%% ===================================================================
%% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called
%% if the bridge received msgs from the remote broker.
-on_message_received(Msg, BridgeId) ->
- Name = atom_to_binary(BridgeId, utf8),
- emqx:run_hook(<<"$bridges/", Name/binary>>, [Msg]).
+on_message_received(Msg, HookPoint) ->
+ emqx:run_hook(HookPoint, [Msg]).
%% ===================================================================
on_start(InstId, Conf) ->
- "bridge:" ++ NamePrefix = binary_to_list(InstId),
- BridgeId = list_to_atom(NamePrefix),
+ InstanceId = binary_to_atom(InstId, utf8),
?SLOG(info, #{msg => "starting mqtt connector",
- connector => BridgeId, config => Conf}),
+ connector => InstanceId, config => Conf}),
BasicConf = basic_config(Conf),
- SubRemoteConf = maps:get(ingress, Conf, #{}),
- FrowardConf = maps:get(egress, Conf, #{}),
BridgeConf = BasicConf#{
- name => BridgeId,
- clientid => clientid(BridgeId),
- subscriptions => SubRemoteConf#{
- to_local_topic => maps:get(to_local_topic, SubRemoteConf, undefined),
- on_message_received => {fun ?MODULE:on_message_received/2, [BridgeId]}
- },
- forwards => FrowardConf#{
- from_local_topic => maps:get(from_local_topic, FrowardConf, undefined)
- }
+ name => InstanceId,
+ clientid => clientid(InstanceId),
+ subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)),
+ forwards => make_forward_confs(maps:get(egress, Conf, undefined))
},
case ?MODULE:create_bridge(BridgeConf) of
{ok, _Pid} ->
- case emqx_connector_mqtt_worker:ensure_started(BridgeId) of
- ok -> {ok, #{name => BridgeId}};
+ case emqx_connector_mqtt_worker:ensure_started(InstanceId) of
+ ok -> {ok, #{name => InstanceId}};
{error, Reason} -> {error, Reason}
end;
{error, {already_started, _Pid}} ->
- {ok, #{name => BridgeId}};
+ {ok, #{name => InstanceId}};
{error, Reason} ->
{error, Reason}
end.
-on_stop(_InstId, #{name := BridgeId}) ->
+on_stop(_InstId, #{name := InstanceId}) ->
?SLOG(info, #{msg => "stopping mqtt connector",
- connector => BridgeId}),
- case ?MODULE:drop_bridge(BridgeId) of
+ connector => InstanceId}),
+ case ?MODULE:drop_bridge(InstanceId) of
ok -> ok;
{error, not_found} -> ok;
{error, Reason} ->
?SLOG(error, #{msg => "stop mqtt connector",
- connector => BridgeId, reason => Reason})
+ connector => InstanceId, reason => Reason})
end.
-on_query(_InstId, {send_message, BridgeId, Msg}, _AfterQuery, _State) ->
+on_query(_InstId, {send_message, Msg}, _AfterQuery, #{name := InstanceId}) ->
?SLOG(debug, #{msg => "send msg to remote node", message => Msg,
- connector => BridgeId}),
- emqx_connector_mqtt_worker:send_to_remote(BridgeId, Msg).
+ connector => InstanceId}),
+ emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg).
-on_health_check(_InstId, #{name := BridgeId} = State) ->
- case emqx_connector_mqtt_worker:ping(BridgeId) of
+on_health_check(_InstId, #{name := InstanceId} = State) ->
+ case emqx_connector_mqtt_worker:ping(InstanceId) of
pong -> {ok, State};
- _ -> {error, {connector_down, BridgeId}, State}
+ _ -> {error, {connector_down, InstanceId}, State}
end.
+make_sub_confs(undefined) ->
+ undefined;
+make_sub_confs(SubRemoteConf) ->
+ case maps:take(hookpoint, SubRemoteConf) of
+ error -> SubRemoteConf;
+ {HookPoint, SubConf} ->
+ MFA = {?MODULE, on_message_received, [HookPoint]},
+ SubConf#{on_message_received => MFA}
+ end.
+
+make_forward_confs(undefined) ->
+ undefined;
+make_forward_confs(FrowardConf) ->
+ FrowardConf.
+
basic_config(#{
server := Server,
reconnect_interval := ReconnIntv,
diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl
new file mode 100644
index 000000000..f2b99cf3e
--- /dev/null
+++ b/apps/emqx_connector/src/emqx_connector_schema.erl
@@ -0,0 +1,28 @@
+-module(emqx_connector_schema).
+
+-behaviour(hocon_schema).
+
+-include_lib("typerefl/include/types.hrl").
+
+-export([roots/0, fields/1]).
+
+%%======================================================================================
+%% Hocon Schema Definitions
+
+roots() -> ["connectors"].
+
+fields("connectors") ->
+ [ {mqtt,
+ sc(hoconsc:map(name,
+ hoconsc:union([ ref("mqtt_connector")
+ ])),
+ #{ desc => "MQTT bridges"
+ })}
+ ];
+
+fields("mqtt_connector") ->
+ emqx_connector_mqtt_schema:fields("connector").
+
+sc(Type, Meta) -> hoconsc:mk(Type, Meta).
+
+ref(Field) -> hoconsc:ref(?MODULE, Field).
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl
index 9b529b340..4cc240d9d 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl
@@ -160,13 +160,17 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) ->
handle_publish(Msg, undefined) ->
?SLOG(error, #{msg => "cannot publish to local broker as"
- " ingress_channles' is not configured",
+ " 'ingress' is not configured",
message => Msg});
-handle_publish(Msg, #{on_message_received := {OnMsgRcvdFunc, Args}} = Vars) ->
+handle_publish(Msg, Vars) ->
?SLOG(debug, #{msg => "publish to local broker",
message => Msg, vars => Vars}),
emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1),
- _ = erlang:apply(OnMsgRcvdFunc, [Msg | Args]),
+ case Vars of
+ #{on_message_received := {Mod, Func, Args}} ->
+ _ = erlang:apply(Mod, Func, [Msg | Args]);
+ _ -> ok
+ end,
case maps:get(to_local_topic, Vars, undefined) of
undefined -> ok;
_Topic ->
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
index 5a52dc613..6436a4c96 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
@@ -21,7 +21,12 @@
-behaviour(hocon_schema).
-export([ roots/0
- , fields/1]).
+ , fields/1
+ ]).
+
+-export([ ingress_desc/0
+ , egress_desc/0
+ ]).
-import(emqx_schema, [mk_duration/2]).
@@ -29,6 +34,10 @@ roots() ->
fields("config").
fields("config") ->
+ fields("connector") ++
+ topic_mappings();
+
+fields("connector") ->
[ {server,
sc(emqx_schema:ip_port(),
#{ default => "127.0.0.1:1883"
@@ -76,31 +85,6 @@ fields("config") ->
sc(ref("replayq"),
#{ desc => """
Queue messages in disk files.
-"""
- })}
- , {ingress,
- sc(ref("ingress"),
- #{ default => #{}
- , desc => """
-The ingress config defines how this bridge receive messages from the remote MQTT broker, and then
-send them to the local broker.
-Template with variables is allowed in 'to_local_topic', 'subscribe_qos', 'qos', 'retain',
-'payload'.
-NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also to_local_topic is
-configured, then messages got from the remote broker will be sent to both the 'to_local_topic' and
-the rule.
-"""
- })}
- , {egress,
- sc(ref("egress"),
- #{ default => #{}
- , desc => """
-The egress config defines how this bridge forwards messages from the local broker to the remote
-broker.
-Template with variables is allowed in 'to_remote_topic', 'qos', 'retain', 'payload'.
-NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic
-is configured, then both the data got from the rule and the MQTT messages that matches
-from_local_topic will be forwarded.
"""
})}
] ++ emqx_connector_schema_lib:ssl_fields();
@@ -122,6 +106,12 @@ fields("ingress") ->
#{ desc => """
Send messages to which topic of the local broker.
Template with variables is allowed.
+"""
+ })}
+ , {hookpoint,
+ sc(binary(),
+ #{ desc => """
+The hookpoint will be triggered when there's any message received from the remote broker.
"""
})}
] ++ common_inout_confs();
@@ -170,6 +160,38 @@ the memory cache reaches 'seg_bytes'.
})}
].
+topic_mappings() ->
+ [ {ingress,
+ sc(ref("ingress"),
+ #{ default => #{}
+ , desc => ingress_desc()
+ })}
+ , {egress,
+ sc(ref("egress"),
+ #{ default => #{}
+ , desc => egress_desc()
+ })}
+ ].
+
+ingress_desc() -> """
+The ingress config defines how this bridge receive messages from the remote MQTT broker, and then
+send them to the local broker.
+Template with variables is allowed in 'to_local_topic', 'subscribe_qos', 'qos', 'retain',
+'payload'.
+NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also to_local_topic is
+configured, then messages got from the remote broker will be sent to both the 'to_local_topic' and
+the rule.
+""".
+
+egress_desc() -> """
+The egress config defines how this bridge forwards messages from the local broker to the remote
+broker.
+Template with variables is allowed in 'to_remote_topic', 'qos', 'retain', 'payload'.
+NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic
+is configured, then both the data got from the rule and the MQTT messages that matches
+from_local_topic will be forwarded.
+""".
+
common_inout_confs() ->
[ {qos,
sc(qos(),
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl
index 4027ee898..95424fe3a 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl
@@ -381,7 +381,7 @@ pop_and_send_loop(#{replayq := Q} = State, N) ->
do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) ->
?SLOG(error, #{msg => "cannot forward messages to remote broker"
- " as forwards is not configured",
+ " as 'egress' is not configured",
messages => Msg});
do_send(#{inflight := Inflight,
connection := Connection,
diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl
index 614dc841b..e80eb2e5f 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_events.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl
@@ -68,7 +68,7 @@ reload() ->
ok = emqx_rule_engine:load_hooks_for_rule(Rule)
end, emqx_rule_engine:get_rules()).
-load(<<"$bridges/", _ChannelId/binary>> = BridgeTopic) ->
+load(<<"$bridges/", _BridgeId/binary>> = BridgeTopic) ->
emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received,
[#{bridge_topic => BridgeTopic}]});
load(Topic) ->