diff --git a/apps/emqx/src/emqx_authentication.erl b/apps/emqx/src/emqx_authentication.erl
index b86322c94..9728e7a23 100644
--- a/apps/emqx/src/emqx_authentication.erl
+++ b/apps/emqx/src/emqx_authentication.erl
@@ -79,8 +79,8 @@
]).
%% proxy callback
--export([ pre_config_update/2
- , post_config_update/4
+-export([ pre_config_update/3
+ , post_config_update/5
]).
-export_type([ authenticator_id/0
@@ -238,10 +238,10 @@ get_enabled(Authenticators) ->
%% APIs
%%------------------------------------------------------------------------------
-pre_config_update(UpdateReq, OldConfig) ->
+pre_config_update(_, UpdateReq, OldConfig) ->
emqx_authentication_config:pre_config_update(UpdateReq, OldConfig).
-post_config_update(UpdateReq, NewConfig, OldConfig, AppEnvs) ->
+post_config_update(_, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
emqx_authentication_config:post_config_update(UpdateReq, NewConfig, OldConfig, AppEnvs).
%% @doc Get all registered authentication providers.
diff --git a/apps/emqx/src/emqx_authentication_config.erl b/apps/emqx/src/emqx_authentication_config.erl
index 2f7e55eba..a2d8cada2 100644
--- a/apps/emqx/src/emqx_authentication_config.erl
+++ b/apps/emqx/src/emqx_authentication_config.erl
@@ -19,8 +19,8 @@
-behaviour(emqx_config_handler).
--export([ pre_config_update/2
- , post_config_update/4
+-export([ pre_config_update/3
+ , post_config_update/5
]).
-export([ authenticator_id/1
@@ -53,9 +53,9 @@
%% Callbacks of config handler
%%------------------------------------------------------------------------------
--spec pre_config_update(update_request(), emqx_config:raw_config())
+-spec pre_config_update(list(atom()), update_request(), emqx_config:raw_config())
-> {ok, map() | list()} | {error, term()}.
-pre_config_update(UpdateReq, OldConfig) ->
+pre_config_update(_, UpdateReq, OldConfig) ->
try do_pre_config_update(UpdateReq, to_list(OldConfig)) of
{error, Reason} -> {error, Reason};
{ok, NewConfig} -> {ok, return_map(NewConfig)}
@@ -102,9 +102,9 @@ do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}
end
end.
--spec post_config_update(update_request(), map() | list(), emqx_config:raw_config(), emqx_config:app_envs())
+-spec post_config_update(list(atom()), update_request(), map() | list(), emqx_config:raw_config(), emqx_config:app_envs())
-> ok | {ok, map()} | {error, term()}.
-post_config_update(UpdateReq, NewConfig, OldConfig, AppEnvs) ->
+post_config_update(_, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
do_post_config_update(UpdateReq, check_configs(to_list(NewConfig)), OldConfig, AppEnvs).
do_post_config_update({create_authenticator, ChainName, Config}, _NewConfig, _OldConfig, _AppEnvs) ->
diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl
index c75f0ee4d..d99e18e70 100644
--- a/apps/emqx/src/emqx_config_handler.erl
+++ b/apps/emqx/src/emqx_config_handler.erl
@@ -45,14 +45,14 @@
-type handler_name() :: module().
-type handlers() :: #{emqx_config:config_key() => handlers(), ?MOD => handler_name()}.
--optional_callbacks([ pre_config_update/2
- , post_config_update/4
+-optional_callbacks([ pre_config_update/3
+ , post_config_update/5
]).
--callback pre_config_update(emqx_config:update_request(), emqx_config:raw_config()) ->
+-callback pre_config_update([atom()], emqx_config:update_request(), emqx_config:raw_config()) ->
{ok, emqx_config:update_request()} | {error, term()}.
--callback post_config_update(emqx_config:update_request(), emqx_config:config(),
+-callback post_config_update([atom()], emqx_config:update_request(), emqx_config:config(),
emqx_config:config(), emqx_config:app_envs()) ->
ok | {ok, Result::any()} | {error, Reason::term()}.
@@ -181,14 +181,20 @@ process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) ->
Error -> Error
end.
-do_update_config([], Handlers, OldRawConf, UpdateReq) ->
- call_pre_config_update(Handlers, OldRawConf, UpdateReq);
-do_update_config([ConfKey | ConfKeyPath], Handlers, OldRawConf, UpdateReq) ->
+do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq) ->
+ do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, []).
+
+do_update_config([], Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
+ call_pre_config_update(Handlers, OldRawConf, UpdateReq, ConfKeyPath);
+do_update_config([ConfKey | SubConfKeyPath], Handlers, OldRawConf,
+ UpdateReq, ConfKeyPath0) ->
+ ConfKeyPath = ConfKeyPath0 ++ [ConfKey],
SubOldRawConf = get_sub_config(bin(ConfKey), OldRawConf),
SubHandlers = get_sub_handlers(ConfKey, Handlers),
- case do_update_config(ConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq) of
+ case do_update_config(SubConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq, ConfKeyPath) of
{ok, NewUpdateReq} ->
- call_pre_config_update(Handlers, OldRawConf, #{bin(ConfKey) => NewUpdateReq});
+ call_pre_config_update(Handlers, OldRawConf, #{bin(ConfKey) => NewUpdateReq},
+ ConfKeyPath);
Error ->
Error
end.
@@ -211,18 +217,25 @@ check_and_save_configs(SchemaModule, ConfKeyPath, Handlers, NewRawConf, Override
Error -> Error
end.
-do_post_config_update([], Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, Result) ->
- call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, up_req(UpdateArgs), Result);
-do_post_config_update([ConfKey | ConfKeyPath], Handlers, OldConf, NewConf, AppEnvs, UpdateArgs,
- Result) ->
+do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, Result) ->
+ do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs,
+ Result, []).
+
+do_post_config_update([], Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, Result,
+ ConfKeyPath) ->
+ call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, up_req(UpdateArgs),
+ Result, ConfKeyPath);
+do_post_config_update([ConfKey | SubConfKeyPath], Handlers, OldConf, NewConf, AppEnvs,
+ UpdateArgs, Result, ConfKeyPath0) ->
+ ConfKeyPath = ConfKeyPath0 ++ [ConfKey],
SubOldConf = get_sub_config(ConfKey, OldConf),
SubNewConf = get_sub_config(ConfKey, NewConf),
SubHandlers = get_sub_handlers(ConfKey, Handlers),
- case do_post_config_update(ConfKeyPath, SubHandlers, SubOldConf, SubNewConf, AppEnvs,
- UpdateArgs, Result) of
+ case do_post_config_update(SubConfKeyPath, SubHandlers, SubOldConf, SubNewConf, AppEnvs,
+ UpdateArgs, Result, ConfKeyPath) of
{ok, Result1} ->
call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, up_req(UpdateArgs),
- Result1);
+ Result1, ConfKeyPath);
Error -> Error
end.
@@ -237,22 +250,23 @@ get_sub_config(ConfKey, Conf) when is_map(Conf) ->
get_sub_config(_, _Conf) -> %% the Conf is a primitive
undefined.
-call_pre_config_update(Handlers, OldRawConf, UpdateReq) ->
+call_pre_config_update(Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
HandlerName = maps:get(?MOD, Handlers, undefined),
- case erlang:function_exported(HandlerName, pre_config_update, 2) of
+ case erlang:function_exported(HandlerName, pre_config_update, 3) of
true ->
- case HandlerName:pre_config_update(UpdateReq, OldRawConf) of
+ case HandlerName:pre_config_update(ConfKeyPath, UpdateReq, OldRawConf) of
{ok, NewUpdateReq} -> {ok, NewUpdateReq};
{error, Reason} -> {error, {pre_config_update, HandlerName, Reason}}
end;
false -> merge_to_old_config(UpdateReq, OldRawConf)
end.
-call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, UpdateReq, Result) ->
+call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, UpdateReq, Result, ConfKeyPath) ->
HandlerName = maps:get(?MOD, Handlers, undefined),
- case erlang:function_exported(HandlerName, post_config_update, 4) of
+ case erlang:function_exported(HandlerName, post_config_update, 5) of
true ->
- case HandlerName:post_config_update(UpdateReq, NewConf, OldConf, AppEnvs) of
+ case HandlerName:post_config_update(ConfKeyPath, UpdateReq, NewConf, OldConf,
+ AppEnvs) of
ok -> {ok, Result};
{ok, Result1} ->
{ok, Result#{HandlerName => Result1}};
diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl
index c6ef92b95..2b9a76fe3 100644
--- a/apps/emqx/src/emqx_listeners.erl
+++ b/apps/emqx/src/emqx_listeners.erl
@@ -46,7 +46,7 @@
, parse_listener_id/1
]).
--export([post_config_update/4]).
+-export([post_config_update/5]).
-define(CONF_KEY_PATH, [listeners]).
-define(TYPES_STRING, ["tcp","ssl","ws","wss","quic"]).
@@ -272,7 +272,7 @@ delete_authentication(Type, ListenerName, _Conf) ->
emqx_authentication:delete_chain(listener_id(Type, ListenerName)).
%% Update the listeners at runtime
-post_config_update(_Req, NewListeners, OldListeners, _AppEnvs) ->
+post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) ->
#{added := Added, removed := Removed, changed := Updated}
= diff_listeners(NewListeners, OldListeners),
perform_listener_changes(fun stop_listener/3, Removed),
diff --git a/apps/emqx/src/emqx_logger.erl b/apps/emqx/src/emqx_logger.erl
index 29f5bd597..c3e91be4b 100644
--- a/apps/emqx/src/emqx_logger.erl
+++ b/apps/emqx/src/emqx_logger.erl
@@ -70,7 +70,7 @@
, stop_log_handler/1
]).
--export([post_config_update/4]).
+-export([post_config_update/5]).
-type(peername_str() :: list()).
-type(logger_dst() :: file:filename() | console | unknown).
@@ -123,7 +123,7 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%% emqx_config_handler callbacks
%%--------------------------------------------------------------------
-post_config_update(_Req, _NewConf, _OldConf, AppEnvs) ->
+post_config_update(_, _Req, _NewConf, _OldConf, AppEnvs) ->
gen_server:call(?MODULE, {update_config, AppEnvs}, 5000).
%%--------------------------------------------------------------------
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index 3e3318475..b7f3c5690 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -1326,7 +1326,7 @@ to_bar_separated_list(Str) ->
{ok, string:tokens(Str, "| ")}.
to_ip_port(Str) ->
- case string:tokens(Str, ":") of
+ case string:tokens(Str, ": ") of
[Ip, Port] ->
PortVal = list_to_integer(Port),
case inet:parse_address(Ip) of
@@ -1377,7 +1377,7 @@ validate_alarm_actions(Actions) ->
end.
parse_user_lookup_fun(StrConf) ->
- [ModStr, FunStr] = string:tokens(str(StrConf), ":"),
+ [ModStr, FunStr] = string:tokens(str(StrConf), ": "),
Mod = list_to_atom(ModStr),
Fun = list_to_atom(FunStr),
{fun Mod:Fun/3, undefined}.
diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl
index cfabdefa9..4496e0299 100644
--- a/apps/emqx_authz/src/emqx_authz.erl
+++ b/apps/emqx_authz/src/emqx_authz.erl
@@ -36,7 +36,7 @@
, authorize/5
]).
--export([post_config_update/4, pre_config_update/2]).
+-export([post_config_update/5, pre_config_update/3]).
-export([acl_conf_file/0]).
@@ -127,13 +127,13 @@ do_update({_, Sources}, _Conf) when is_list(Sources)->
do_update({Op, Sources}, Conf) ->
error({bad_request, #{op => Op, sources => Sources, conf => Conf}}).
-pre_config_update(Cmd, Conf) ->
+pre_config_update(_, Cmd, Conf) ->
{ok, do_update(Cmd, Conf)}.
-post_config_update(_, undefined, _Conf, _AppEnvs) ->
+post_config_update(_, _, undefined, _Conf, _AppEnvs) ->
ok;
-post_config_update(Cmd, NewSources, _OldSource, _AppEnvs) ->
+post_config_update(_, Cmd, NewSources, _OldSource, _AppEnvs) ->
ok = do_post_update(Cmd, NewSources),
ok = emqx_authz_cache:drain_cache().
diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf
index 82087387d..c658bc4ce 100644
--- a/apps/emqx_bridge/etc/emqx_bridge.conf
+++ b/apps/emqx_bridge/etc/emqx_bridge.conf
@@ -4,7 +4,7 @@
## MQTT bridges to/from another MQTT broker
#bridges.mqtt.my_ingress_mqtt_bridge {
-# connector = my_mqtt_connector
+# connector = "mqtt:my_mqtt_connector"
# direction = ingress
# ## topic mappings for this bridge
# from_remote_topic = "aws/#"
@@ -13,11 +13,10 @@
# payload = "${payload}"
# qos = "${qos}"
# retain = "${retain}"
-#
#}
#
#bridges.mqtt.my_egress_mqtt_bridge {
-# connector = my_mqtt_connector
+# connector = "mqtt:my_mqtt_connector"
# direction = egress
# ## topic mappings for this bridge
# from_local_topic = "emqx/#"
@@ -26,10 +25,10 @@
# qos = 1
# retain = false
#}
-
+#
## HTTP bridges to an HTTP server
#bridges.http.my_http_bridge {
-# ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url string
+# ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url
# url = "http://localhost:9901/messages/${topic}"
# request_timeout = "30s"
# connect_timeout = "30s"
diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl
index 34d851155..a1e436c61 100644
--- a/apps/emqx_bridge/src/emqx_bridge.erl
+++ b/apps/emqx_bridge/src/emqx_bridge.erl
@@ -18,7 +18,7 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
--export([post_config_update/4]).
+-export([post_config_update/5]).
-export([ load_hook/0
, reload_hook/0
@@ -39,6 +39,9 @@
, lookup/3
, list/0
, create/3
+ , recreate/2
+ , recreate/3
+ , create_dry_run/2
, remove/3
, update/3
, start/2
@@ -90,13 +93,15 @@ send_message(BridgeId, Message) ->
config_key_path() ->
[bridges].
+resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
resource_type(mqtt) -> emqx_connector_mqtt;
+resource_type(<<"http">>) -> emqx_connector_http;
resource_type(http) -> emqx_connector_http.
bridge_type(emqx_connector_mqtt) -> mqtt;
bridge_type(emqx_connector_http) -> http.
-post_config_update(_Req, NewConf, OldConf, _AppEnv) ->
+post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated}
= diff_confs(NewConf, OldConf),
Result = perform_bridge_changes([
@@ -179,7 +184,7 @@ create(Type, Name, Conf) ->
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
config => Conf}),
ResId = resource_id(Type, Name),
- case emqx_resource:create(ResId,
+ case emqx_resource:create_local(ResId,
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf)) of
{ok, already_created} ->
emqx_resource:get_instance(ResId);
@@ -200,12 +205,27 @@ update(Type, Name, {_OldConf, Conf}) ->
%%
?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
config => Conf}),
- emqx_resource:recreate(resource_id(Type, Name),
+ recreate(Type, Name, Conf).
+
+recreate(Type, Name) ->
+ recreate(Type, Name, emqx:get_raw_config([bridges, Type, Name])).
+
+recreate(Type, Name, Conf) ->
+ emqx_resource:recreate_local(resource_id(Type, Name),
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []).
+create_dry_run(Type, Conf) ->
+ Conf0 = Conf#{<<"ingress">> => #{<<"from_remote_topic">> => <<"t">>}},
+ case emqx_resource:check_config(emqx_bridge:resource_type(Type), Conf0) of
+ {ok, Conf1} ->
+ emqx_resource:create_dry_run_local(emqx_bridge:resource_type(Type), Conf1);
+ {error, _} = Error ->
+ Error
+ end.
+
remove(Type, Name, _Conf) ->
?SLOG(info, #{msg => "remove bridge", type => Type, name => Name}),
- case emqx_resource:remove(resource_id(Type, Name)) of
+ case emqx_resource:remove_local(resource_id(Type, Name)) of
ok -> ok;
{error, not_found} -> ok;
{error, Reason} ->
@@ -264,8 +284,18 @@ parse_confs(http, _Name,
, request_timeout => ReqTimeout
}
};
-parse_confs(Type, Name, #{connector := ConnName, direction := Direction} = Conf) ->
- ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
+parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf)
+ when is_binary(ConnId) ->
+ case emqx_connector:parse_connector_id(ConnId) of
+ {Type, ConnName} ->
+ ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
+ make_resource_confs(Direction, ConnectorConfs,
+ maps:without([connector, direction], Conf), Name);
+ {_ConnType, _ConnName} ->
+ error({cannot_use_connector_with_different_type, ConnId})
+ end;
+parse_confs(_Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf)
+ when is_map(ConnectorConfs) ->
make_resource_confs(Direction, ConnectorConfs,
maps:without([connector, direction], Conf), Name).
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index 2c6b1954d..fe8c02341 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -22,8 +22,8 @@
-export([ list_create_bridges_in_cluster/2
, list_local_bridges/1
, crud_bridges_in_cluster/2
- , crud_local_bridges/4
, manage_bridges/2
+ , lookup_from_local_node/2
]).
-define(TYPES, [mqtt, http]).
@@ -220,7 +220,15 @@ param_path_operation()->
}.
list_create_bridges_in_cluster(post, #{body := #{<<"id">> := Id} = Conf}) ->
- crud_bridges_in_cluster(post, Id, maps:remove(<<"id">>, Conf));
+ ?TRY_PARSE_ID(Id,
+ case emqx_bridge:lookup(BridgeType, BridgeName) of
+ {ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
+ {error, not_found} ->
+ case ensure_bridge(BridgeType, BridgeName, maps:remove(<<"id">>, Conf)) of
+ ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 201);
+ {error, Error} -> {400, Error}
+ end
+ end);
list_create_bridges_in_cluster(get, _Params) ->
{200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
@@ -229,67 +237,46 @@ list_local_bridges(Node) when Node =:= node() ->
list_local_bridges(Node) ->
rpc_call(Node, list_local_bridges, [Node]).
-crud_bridges_in_cluster(Method, #{bindings := #{id := Id}, body := Body}) ->
- crud_bridges_in_cluster(Method, Id, Body).
+crud_bridges_in_cluster(get, #{bindings := #{id := Id}}) ->
+ ?TRY_PARSE_ID(Id, lookup_from_all_nodes(Id, BridgeType, BridgeName, 200));
-crud_bridges_in_cluster(Method, Id, Body) ->
- Results = [crud_local_bridges(Node, Method, Id, Body) || Node <- mria_mnesia:running_nodes()],
- Filter = fun ({200}) -> false;
- ({Code, _}) when Code == 200; Code == 201 -> false;
- (_) -> true
- end,
- case lists:filter(Filter, Results) of
- [] ->
- case Results of
- [{200} | _] -> {200};
- [{Code, _} | _] when Code == 200; Code == 201 ->
- {Code, format_bridge_info([Bridge || {_, Bridge} <- Results])}
- end;
- Errors ->
- hd(Errors)
- end.
-
-crud_local_bridges(Node, Method, Id, Body) when Node =/= node() ->
- rpc_call(Node, crud_local_bridges, [Node, Method, Id, Body]);
-
-crud_local_bridges(_, get, Id, _Body) ->
- ?TRY_PARSE_ID(Id, case emqx_bridge:lookup(BridgeType, BridgeName) of
- {ok, Data} -> {200, format_resp(Data)};
- {error, not_found} ->
- {404, #{code => 102, message => <<"not_found: ", Id/binary>>}}
- end);
-
-crud_local_bridges(_, post, Id, Conf) ->
- ?TRY_PARSE_ID(Id,
- case emqx_bridge:lookup(BridgeType, BridgeName) of
- {ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
- {error, not_found} ->
- case ensure_bridge(Id, BridgeType, BridgeName, Conf) of
- {ok, Resp} -> {201, Resp};
- {error, Error} -> {400, Error}
- end
- end);
-
-crud_local_bridges(_, put, Id, Conf) ->
+crud_bridges_in_cluster(put, #{bindings := #{id := Id}, body := Conf}) ->
?TRY_PARSE_ID(Id,
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} ->
- case ensure_bridge(Id, BridgeType, BridgeName, Conf) of
- {ok, Resp} -> {200, Resp};
+ case ensure_bridge(BridgeType, BridgeName, Conf) of
+ ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 200);
{error, Error} -> {400, Error}
end;
{error, not_found} ->
{404, #{code => 'NOT_FOUND', message => <<"bridge not found">>}}
end);
-crud_local_bridges(_, delete, Id, _Body) ->
+crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id,
- case emqx:remove_config(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName]) of
+ case emqx_conf:remove(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
+ #{override_to => cluster}) of
{ok, _} -> {204};
{error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end).
+lookup_from_all_nodes(Id, BridgeType, BridgeName, SuccCode) ->
+ case rpc_multicall(lookup_from_local_node, [BridgeType, BridgeName]) of
+ {ok, [{ok, _} | _] = Results} ->
+ {SuccCode, format_bridge_info([R || {ok, R} <- Results])};
+ {ok, [{error, not_found} | _]} ->
+ {404, error_msg('NOT_FOUND', <<"not_found: ", Id/binary>>)};
+ {error, ErrL} ->
+ {500, error_msg('UNKNOWN_ERROR', ErrL)}
+ end.
+
+lookup_from_local_node(BridgeType, BridgeName) ->
+ case emqx_bridge:lookup(BridgeType, BridgeName) of
+ {ok, Res} -> {ok, format_resp(Res)};
+ Error -> Error
+ end.
+
manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}}) ->
OperFun =
fun (<<"start">>) -> start;
@@ -304,13 +291,12 @@ manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}})
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end).
-ensure_bridge(Id, BridgeType, BridgeName, Conf) ->
- case emqx:update_config(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf,
- #{rawconf_with_defaults => true}) of
- {ok, #{raw_config := RawConf, post_config_update := #{emqx_bridge := Data}}} ->
- {ok, format_resp(#{id => Id, raw_config => RawConf, resource_data => Data})};
+ensure_bridge(BridgeType, BridgeName, Conf) ->
+ case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf,
+ #{override_to => cluster}) of
+ {ok, _} -> ok;
{error, Reason} ->
- {error, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
+ {error, error_msg('BAD_ARG', Reason)}
end.
zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
@@ -368,6 +354,14 @@ format_resp(#{id := Id, raw_config := RawConf, resource_data := #{mod := Mod, st
metrics => ?METRICS(0,0,0,0,0)
}.
+rpc_multicall(Func, Args) ->
+ Nodes = mria_mnesia:running_nodes(),
+ ResL = erpc:multicall(Nodes, ?MODULE, Func, Args, 15000),
+ case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of
+ [] -> {ok, [Res || {ok, Res} <- ResL]};
+ ErrL -> {error, ErrL}
+ end.
+
rpc_call(Node, Fun, Args) ->
rpc_call(Node, ?MODULE, Fun, Args).
@@ -378,3 +372,8 @@ rpc_call(Node, Mod, Fun, Args) ->
{badrpc, Reason} -> {error, Reason};
Res -> Res
end.
+
+error_msg(Code, Msg) when is_binary(Msg) ->
+ #{code => Code, message => Msg};
+error_msg(Code, Msg) ->
+ #{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}.
diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
index fcabfaf83..e898d0dc4 100644
--- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
@@ -128,7 +128,7 @@ t_http_crud_apis(_) ->
%% assert we there's no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
- %% then we add a http bridge, using PUT
+ %% then we add a http bridge, using POST
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl
index c3dfa8c49..733d9824f 100644
--- a/apps/emqx_conf/src/emqx_conf.erl
+++ b/apps/emqx_conf/src/emqx_conf.erl
@@ -81,7 +81,10 @@ get_node_and_config(KeyPath) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(KeyPath, UpdateReq, Opts0) ->
Args = [KeyPath, UpdateReq, Opts0],
- multicall(emqx, update_config, Args).
+ case multicall(emqx, update_config, Args) of
+ {ok, _TnxId, Res} -> Res;
+ {error, Res} -> Res
+ end.
%% @doc Update the specified node's key path in local-override.conf.
-spec update(node(), emqx_map_lib:config_key_path(), emqx_config:update_request(),
diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl
index 0034fb28b..940e958e3 100644
--- a/apps/emqx_connector/src/emqx_connector.erl
+++ b/apps/emqx_connector/src/emqx_connector.erl
@@ -24,15 +24,45 @@
-export([ list/0
, lookup/1
, lookup/2
+ , create_dry_run/2
, update/2
, update/3
, delete/1
, delete/2
]).
+-export([ post_config_update/5
+ ]).
+
config_key_path() ->
[connectors].
+post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
+ ConnId = connector_id(Type, Name),
+ LinkedBridgeIds = lists:foldl(fun
+ (#{id := BId, raw_config := #{<<"connector">> := ConnId0}}, Acc)
+ when ConnId0 == ConnId ->
+ [BId | Acc];
+ (_, Acc) -> Acc
+ end, [], emqx_bridge:list()),
+ case LinkedBridgeIds of
+ [] -> ok;
+ _ -> {error, {dependency_bridges_exist, LinkedBridgeIds}}
+ end;
+post_config_update([connectors, Type, Name], _Req, NewConf, _OldConf, _AppEnvs) ->
+ ConnId = connector_id(Type, Name),
+ lists:foreach(fun
+ (#{id := BId, raw_config := #{<<"connector">> := ConnId0}}) when ConnId0 == ConnId ->
+ {BType, BName} = emqx_bridge:parse_bridge_id(BId),
+ BridgeConf = emqx:get_config([bridges, BType, BName]),
+ case emqx_bridge:recreate(BType, BName, BridgeConf#{connector => NewConf}) of
+ {ok, _} -> ok;
+ {error, Reason} -> error({update_bridge_error, Reason})
+ end;
+ (_) ->
+ ok
+ end, emqx_bridge:list()).
+
connector_id(Type0, Name0) ->
Type = bin(Type0),
Name = bin(Name0),
@@ -62,6 +92,9 @@ lookup(Type, Name) ->
Conf -> {ok, Conf#{<<"id">> => Id}}
end.
+create_dry_run(Type, Conf) ->
+ emqx_bridge:create_dry_run(Type, Conf).
+
update(Id, Conf) when is_binary(Id) ->
{Type, Name} = parse_connector_id(Id),
update(Type, Name, Conf).
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index 490722e1d..6eb397519 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -28,11 +28,13 @@
-export([api_spec/0, paths/0, schema/1, namespace/0]).
%% API callbacks
--export(['/connectors'/2, '/connectors/:id'/2]).
+-export(['/connectors_test'/2, '/connectors'/2, '/connectors/:id'/2]).
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_connector:parse_connector_id(Id) of
- {ConnType, ConnName} -> EXPR
+ {ConnType, ConnName} ->
+ _ = ConnName,
+ EXPR
catch
error:{invalid_bridge_id, Id0} ->
{400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
@@ -44,7 +46,7 @@ namespace() -> "connector".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
-paths() -> ["/connectors", "/connectors/:id"].
+paths() -> ["/connectors_test", "/connectors", "/connectors/:id"].
error_schema(Code, Message) ->
[ {code, mk(string(), #{example => Code})}
@@ -55,6 +57,10 @@ connector_info() ->
hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_info")
]).
+connector_test_info() ->
+ hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_test_info")
+ ]).
+
connector_req() ->
hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector")
]).
@@ -62,6 +68,22 @@ connector_req() ->
param_path_id() ->
[{id, mk(binary(), #{in => path, example => <<"mqtt:my_mqtt_connector">>})}].
+schema("/connectors_test") ->
+ #{
+ operationId => '/connectors_test',
+ post => #{
+ tags => [<<"connectors">>],
+ description => <<"Test creating a new connector by given Id
"
+ "The Id must be of format :">>,
+ summary => <<"Test creating connector">>,
+ requestBody => connector_test_info(),
+ responses => #{
+ 200 => <<"Test connector OK">>,
+ 400 => error_schema('TEST_FAILED', "connector test failed")
+ }
+ }
+ };
+
schema("/connectors") ->
#{
operationId => '/connectors',
@@ -116,11 +138,18 @@ schema("/connectors/:id") ->
summary => <<"Delete connector">>,
parameters => param_path_id(),
responses => #{
- 200 => <<"Delete connector successfully">>,
+ 204 => <<"Delete connector successfully">>,
400 => error_schema('DELETE_FAIL', "Delete failed")
}}
}.
+'/connectors_test'(post, #{body := #{<<"bridge_type">> := ConnType} = Params}) ->
+ case emqx_connector:create_dry_run(ConnType, maps:remove(<<"bridge_type">>, Params)) of
+ ok -> {200};
+ {error, Error} ->
+ {400, error_msg('BAD_ARG', Error)}
+ end.
+
'/connectors'(get, _Request) ->
{200, emqx_connector:list()};
@@ -161,7 +190,7 @@ schema("/connectors/:id") ->
case emqx_connector:lookup(ConnType, ConnName) of
{ok, _} ->
case emqx_connector:delete(ConnType, ConnName) of
- {ok, _} -> {200};
+ {ok, _} -> {204};
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
end;
{error, not_found} ->
diff --git a/apps/emqx_connector/src/emqx_connector_app.erl b/apps/emqx_connector/src/emqx_connector_app.erl
index 4de078076..93e577fde 100644
--- a/apps/emqx_connector/src/emqx_connector_app.erl
+++ b/apps/emqx_connector/src/emqx_connector_app.erl
@@ -20,11 +20,15 @@
-export([start/2, stop/1]).
+-define(CONF_HDLR_PATH, (emqx_connector:config_key_path() ++ ['?', '?'])).
+
start(_StartType, _StartArgs) ->
+ ok = emqx_config_handler:add_handler(?CONF_HDLR_PATH, emqx_connector),
emqx_connector_mqtt_worker:register_metrics(),
emqx_connector_sup:start_link().
stop(_State) ->
+ emqx_config_handler:remove_handler(?CONF_HDLR_PATH),
ok.
%% internal functions
diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl
index abd3d2f7b..1d55afc96 100644
--- a/apps/emqx_connector/src/emqx_connector_mqtt.erl
+++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl
@@ -138,6 +138,8 @@ on_health_check(_InstId, #{name := InstanceId} = State) ->
_ -> {error, {connector_down, InstanceId}, State}
end.
+make_sub_confs(EmptyMap) when map_size(EmptyMap) == 0 ->
+ undefined;
make_sub_confs(undefined) ->
undefined;
make_sub_confs(SubRemoteConf) ->
@@ -148,6 +150,8 @@ make_sub_confs(SubRemoteConf) ->
SubConf#{on_message_received => MFA}
end.
+make_forward_confs(EmptyMap) when map_size(EmptyMap) == 0 ->
+ undefined;
make_forward_confs(undefined) ->
undefined;
make_forward_confs(FrowardConf) ->
diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl
index 6e353b524..264a6dbd6 100644
--- a/apps/emqx_connector/src/emqx_connector_schema.erl
+++ b/apps/emqx_connector/src/emqx_connector_schema.erl
@@ -25,6 +25,10 @@ fields("mqtt_connector") ->
fields("mqtt_connector_info") ->
[{id, sc(binary(), #{desc => "The connector Id"})}]
+ ++ fields("mqtt_connector");
+
+fields("mqtt_connector_test_info") ->
+ [{bridge_type, sc(mqtt, #{desc => "The Bridge Type"})}]
++ fields("mqtt_connector").
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl
index 9ecfb56b3..b8a32c401 100644
--- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl
+++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl
@@ -105,7 +105,7 @@ servers(validator) -> [?NOT_EMPTY("the value of the field 'servers' cannot be em
servers(_) -> undefined.
to_ip_port(Str) ->
- case string:tokens(Str, ":") of
+ case string:tokens(Str, ": ") of
[Ip, Port] ->
case inet:parse_address(Ip) of
{ok, R} -> {ok, {R, list_to_integer(Port)}};
@@ -121,7 +121,7 @@ ip_port_to_string({Ip, Port}) when is_tuple(Ip) ->
to_servers(Str) ->
{ok, lists:map(fun(Server) ->
- case string:tokens(Server, ":") of
+ case string:tokens(Server, ": ") of
[Ip] ->
[{host, Ip}];
[Ip, Port] ->
diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
new file mode 100644
index 000000000..c9bd58394
--- /dev/null
+++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
@@ -0,0 +1,308 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_connector_api_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include("emqx/include/emqx.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(CONF_DEFAULT, <<"connectors: {}">>).
+-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
+-define(CONNECTR_ID, <<"mqtt:test_connector">>).
+-define(BRIDGE_ID, <<"mqtt:test_bridge">>).
+-define(MQTT_CONNECOTR(Username),
+#{
+ <<"server">> => <<"127.0.0.1:1883">>,
+ <<"username">> => Username,
+ <<"password">> => <<"">>,
+ <<"proto_ver">> => <<"v4">>,
+ <<"ssl">> => #{<<"enable">> => false}
+}).
+-define(MQTT_CONNECOTR2(Server),
+ ?MQTT_CONNECOTR(<<"user1">>)#{<<"server">> => Server}).
+
+-define(MQTT_BRIDGE(ID),
+#{
+ <<"connector">> => ID,
+ <<"direction">> => <<"ingress">>,
+ <<"from_remote_topic">> => <<"remote_topic/#">>,
+ <<"to_local_topic">> => <<"local_topic/${topic}">>,
+ <<"subscribe_qos">> => 1,
+ <<"payload">> => <<"${payload}">>,
+ <<"qos">> => <<"${qos}">>,
+ <<"retain">> => <<"${retain}">>
+}).
+
+all() ->
+ emqx_common_test_helpers:all(?MODULE).
+
+groups() ->
+ [].
+
+suite() ->
+ [{timetrap,{seconds,30}}].
+
+init_per_suite(Config) ->
+ ok = emqx_config:put([emqx_dashboard], #{
+ default_username => <<"admin">>,
+ default_password => <<"public">>,
+ listeners => [#{
+ protocol => http,
+ port => 18083
+ }]
+ }),
+ ok = application:load(emqx_conf),
+ ok = emqx_common_test_helpers:start_apps([emqx_connector, emqx_bridge, emqx_dashboard]),
+ ok = emqx_config:init_load(emqx_connector_schema, ?CONF_DEFAULT),
+ ok = emqx_config:init_load(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT),
+ Config.
+
+end_per_suite(_Config) ->
+ ok = ekka:stop(),
+ emqx_common_test_helpers:stop_apps([emqx_connector, emqx_bridge, emqx_dashboard]),
+ ok.
+
+init_per_testcase(_, Config) ->
+ {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
+ Config.
+end_per_testcase(_, _Config) ->
+ ok.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_mqtt_crud_apis(_) ->
+ %% assert we there's no connectors at first
+ {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
+
+ %% then we add a mqtt connector, using POST
+ %% POST /connectors/ will create a connector
+ User1 = <<"user1">>,
+ {ok, 201, Connector} = request(post, uri(["connectors"]),
+ ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
+
+ %ct:pal("---connector: ~p", [Connector]),
+ ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+ , <<"server">> := <<"127.0.0.1:1883">>
+ , <<"username">> := User1
+ , <<"password">> := <<"">>
+ , <<"proto_ver">> := <<"v4">>
+ , <<"ssl">> := #{<<"enable">> := false}
+ }, jsx:decode(Connector)),
+
+ %% create a again returns an error
+ {ok, 400, RetMsg} = request(post, uri(["connectors"]),
+ ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
+ ?assertMatch(
+ #{ <<"code">> := _
+ , <<"message">> := <<"connector already exists">>
+ }, jsx:decode(RetMsg)),
+
+ %% update the request-path of the connector
+ User2 = <<"user2">>,
+ {ok, 200, Connector2} = request(put, uri(["connectors", ?CONNECTR_ID]),
+ ?MQTT_CONNECOTR(User2)),
+ ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+ , <<"server">> := <<"127.0.0.1:1883">>
+ , <<"username">> := User2
+ , <<"password">> := <<"">>
+ , <<"proto_ver">> := <<"v4">>
+ , <<"ssl">> := #{<<"enable">> := false}
+ }, jsx:decode(Connector2)),
+
+ %% list all connectors again, assert Connector2 is in it
+ {ok, 200, Connector2Str} = request(get, uri(["connectors"]), []),
+ ?assertMatch([#{ <<"id">> := ?CONNECTR_ID
+ , <<"server">> := <<"127.0.0.1:1883">>
+ , <<"username">> := User2
+ , <<"password">> := <<"">>
+ , <<"proto_ver">> := <<"v4">>
+ , <<"ssl">> := #{<<"enable">> := false}
+ }], jsx:decode(Connector2Str)),
+
+ %% get the connector by id
+ {ok, 200, Connector3Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []),
+ ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+ , <<"server">> := <<"127.0.0.1:1883">>
+ , <<"username">> := User2
+ , <<"password">> := <<"">>
+ , <<"proto_ver">> := <<"v4">>
+ , <<"ssl">> := #{<<"enable">> := false}
+ }, jsx:decode(Connector3Str)),
+
+ %% delete the connector
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
+ {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
+
+ %% update a deleted connector returns an error
+ {ok, 404, ErrMsg2} = request(put, uri(["connectors", ?CONNECTR_ID]),
+ ?MQTT_CONNECOTR(User2)),
+ ?assertMatch(
+ #{ <<"code">> := _
+ , <<"message">> := <<"connector not found">>
+ }, jsx:decode(ErrMsg2)),
+ ok.
+
+t_mqtt_conn_bridge(_) ->
+ %% assert we there's no connectors and no bridges at first
+ {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
+ {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+ %% then we add a mqtt connector, using POST
+ User1 = <<"user1">>,
+ {ok, 201, Connector} = request(post, uri(["connectors"]),
+ ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
+
+ %ct:pal("---connector: ~p", [Connector]),
+ ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+ , <<"server">> := <<"127.0.0.1:1883">>
+ , <<"username">> := User1
+ , <<"password">> := <<"">>
+ , <<"proto_ver">> := <<"v4">>
+ , <<"ssl">> := #{<<"enable">> := false}
+ }, jsx:decode(Connector)),
+
+ %% ... and a MQTT bridge, using POST
+ %% we bind this bridge to the connector created just now
+ {ok, 201, Bridge} = request(post, uri(["bridges"]),
+ ?MQTT_BRIDGE(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID}),
+
+ %ct:pal("---bridge: ~p", [Bridge]),
+ ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+ , <<"bridge_type">> := <<"mqtt">>
+ , <<"status">> := <<"connected">>
+ , <<"connector">> := ?CONNECTR_ID
+ }, jsx:decode(Bridge)),
+
+ %% we now test if the bridge works as expected
+
+ RemoteTopic = <<"remote_topic/1">>,
+ LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
+ Payload = <<"hello">>,
+ emqx:subscribe(LocalTopic),
+ %% PUBLISH a message to the 'remote' broker, as we have only one broker,
+ %% the remote broker is also the local one.
+ emqx:publish(emqx_message:make(RemoteTopic, Payload)),
+
+ %% we should receive a message on the local broker, with specified topic
+ ?assert(
+ receive
+ {deliver, LocalTopic, #message{payload = Payload}} ->
+ ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
+ true;
+ Msg ->
+ ct:pal("Msg: ~p", [Msg]),
+ false
+ after 100 ->
+ false
+ end),
+
+ %% delete the bridge
+ {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
+ {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+ %% delete the connector
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
+ {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
+ ok.
+
+%% t_mqtt_conn_update:
+%% - update a connector should also update all of the the bridges
+%% - cannot delete a connector that is used by at least one bridge
+t_mqtt_conn_update(_) ->
+ %% assert we there's no connectors and no bridges at first
+ {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
+ {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+ %% then we add a mqtt connector, using POST
+ {ok, 201, Connector} = request(post, uri(["connectors"]),
+ ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{<<"id">> => ?CONNECTR_ID}),
+
+ %ct:pal("---connector: ~p", [Connector]),
+ ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+ , <<"server">> := <<"127.0.0.1:1883">>
+ }, jsx:decode(Connector)),
+
+ %% ... and a MQTT bridge, using POST
+ %% we bind this bridge to the connector created just now
+ {ok, 201, Bridge} = request(post, uri(["bridges"]),
+ ?MQTT_BRIDGE(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID}),
+ ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+ , <<"bridge_type">> := <<"mqtt">>
+ , <<"status">> := <<"connected">>
+ , <<"connector">> := ?CONNECTR_ID
+ }, jsx:decode(Bridge)),
+
+ %% then we try to update 'server' of the connector, to an unavailable IP address
+ %% the update should fail because of 'unreachable' or 'connrefused'
+ {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ?CONNECTR_ID]),
+ ?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)),
+ %% we fix the 'server' parameter to a normal one, it should work
+ {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]),
+ ?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)),
+ %% delete the bridge
+ {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
+ {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+ %% delete the connector
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
+ {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
+
+t_mqtt_conn_testing(_) ->
+ %% APIs for testing the connectivity
+ %% then we add a mqtt connector, using POST
+ {ok, 200, <<>>} = request(post, uri(["connectors_test"]),
+ ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{<<"bridge_type">> => <<"mqtt">>}),
+ {ok, 400, _} = request(post, uri(["connectors_test"]),
+ ?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)#{<<"bridge_type">> => <<"mqtt">>}).
+
+%%--------------------------------------------------------------------
+%% HTTP Request
+%%--------------------------------------------------------------------
+-define(HOST, "http://127.0.0.1:18083/").
+-define(API_VERSION, "v5").
+-define(BASE_PATH, "api").
+
+request(Method, Url, Body) ->
+ Request = case Body of
+ [] -> {Url, [auth_header_()]};
+ _ -> {Url, [auth_header_()], "application/json", jsx:encode(Body)}
+ end,
+ ct:pal("Method: ~p, Request: ~p", [Method, Request]),
+ case httpc:request(Method, Request, [], [{body_format, binary}]) of
+ {error, socket_closed_remotely} ->
+ {error, socket_closed_remotely};
+ {ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } ->
+ {ok, Code, Return};
+ {ok, {Reason, _, _}} ->
+ {error, Reason}
+ end.
+
+uri() -> uri([]).
+uri(Parts) when is_list(Parts) ->
+ NParts = [E || E <- Parts],
+ ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
+
+auth_header_() ->
+ Username = <<"admin">>,
+ Password = <<"public">>,
+ {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
+ {"Authorization", "Bearer " ++ binary_to_list(Token)}.
+
diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl
index 9e5893639..ddbf99189 100644
--- a/apps/emqx_gateway/src/emqx_gateway_conf.erl
+++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl
@@ -52,8 +52,8 @@
]).
%% callbacks for emqx_config_handler
--export([ pre_config_update/2
- , post_config_update/4
+-export([ pre_config_update/3
+ , post_config_update/5
]).
-type atom_or_bin() :: atom() | binary().
@@ -246,10 +246,11 @@ bin(B) when is_binary(B) ->
%% Config Handler
%%--------------------------------------------------------------------
--spec pre_config_update(emqx_config:update_request(),
+-spec pre_config_update(list(atom()),
+ emqx_config:update_request(),
emqx_config:raw_config()) ->
{ok, emqx_config:update_request()} | {error, term()}.
-pre_config_update({load_gateway, GwName, Conf}, RawConf) ->
+pre_config_update(_, {load_gateway, GwName, Conf}, RawConf) ->
case maps:get(GwName, RawConf, undefined) of
undefined ->
NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf),
@@ -257,7 +258,7 @@ pre_config_update({load_gateway, GwName, Conf}, RawConf) ->
_ ->
{error, already_exist}
end;
-pre_config_update({update_gateway, GwName, Conf}, RawConf) ->
+pre_config_update(_, {update_gateway, GwName, Conf}, RawConf) ->
case maps:get(GwName, RawConf, undefined) of
undefined ->
{error, not_found};
@@ -266,14 +267,14 @@ pre_config_update({update_gateway, GwName, Conf}, RawConf) ->
<<"authentication">>], Conf),
{ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})}
end;
-pre_config_update({unload_gateway, GwName}, RawConf) ->
+pre_config_update(_, {unload_gateway, GwName}, RawConf) ->
_ = tune_gw_certs(fun clear_certs/2,
GwName,
maps:get(GwName, RawConf, #{})
),
{ok, maps:remove(GwName, RawConf)};
-pre_config_update({add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
+pre_config_update(_, {add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"listeners">>, LType, LName], RawConf, undefined) of
undefined ->
@@ -285,7 +286,7 @@ pre_config_update({add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
_ ->
{error, already_exist}
end;
-pre_config_update({update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
+pre_config_update(_, {update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"listeners">>, LType, LName], RawConf, undefined) of
undefined ->
@@ -298,7 +299,7 @@ pre_config_update({update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
#{GwName => #{<<"listeners">> => NListener}})}
end;
-pre_config_update({remove_listener, GwName, {LType, LName}}, RawConf) ->
+pre_config_update(_, {remove_listener, GwName, {LType, LName}}, RawConf) ->
Path = [GwName, <<"listeners">>, LType, LName],
case emqx_map_lib:deep_get(Path, RawConf, undefined) of
undefined ->
@@ -308,7 +309,7 @@ pre_config_update({remove_listener, GwName, {LType, LName}}, RawConf) ->
{ok, emqx_map_lib:deep_remove(Path, RawConf)}
end;
-pre_config_update({add_authn, GwName, Conf}, RawConf) ->
+pre_config_update(_, {add_authn, GwName, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"authentication">>], RawConf, undefined) of
undefined ->
@@ -318,7 +319,7 @@ pre_config_update({add_authn, GwName, Conf}, RawConf) ->
_ ->
{error, already_exist}
end;
-pre_config_update({add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
+pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"listeners">>, LType, LName],
RawConf, undefined) of
@@ -336,7 +337,7 @@ pre_config_update({add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
{error, already_exist}
end
end;
-pre_config_update({update_authn, GwName, Conf}, RawConf) ->
+pre_config_update(_, {update_authn, GwName, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"authentication">>], RawConf, undefined) of
undefined ->
@@ -346,7 +347,7 @@ pre_config_update({update_authn, GwName, Conf}, RawConf) ->
RawConf,
#{GwName => #{<<"authentication">> => Conf}})}
end;
-pre_config_update({update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
+pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"listeners">>, LType, LName],
RawConf, undefined) of
@@ -368,22 +369,24 @@ pre_config_update({update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
{ok, emqx_map_lib:deep_merge(RawConf, NGateway)}
end
end;
-pre_config_update({remove_authn, GwName}, RawConf) ->
+pre_config_update(_, {remove_authn, GwName}, RawConf) ->
{ok, emqx_map_lib:deep_remove(
[GwName, <<"authentication">>], RawConf)};
-pre_config_update({remove_authn, GwName, {LType, LName}}, RawConf) ->
+pre_config_update(_, {remove_authn, GwName, {LType, LName}}, RawConf) ->
Path = [GwName, <<"listeners">>, LType, LName, <<"authentication">>],
{ok, emqx_map_lib:deep_remove(Path, RawConf)};
-pre_config_update(UnknownReq, _RawConf) ->
+pre_config_update(_, UnknownReq, _RawConf) ->
logger:error("Unknown configuration update request: ~0p", [UnknownReq]),
{error, badreq}.
--spec post_config_update(emqx_config:update_request(), emqx_config:config(),
+-spec post_config_update(list(atom()),
+ emqx_config:update_request(),
+ emqx_config:config(),
emqx_config:config(), emqx_config:app_envs())
-> ok | {ok, Result::any()} | {error, Reason::term()}.
-post_config_update(Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
+post_config_update(_, Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
[_Tag, GwName0 | _] = tuple_to_list(Req),
GwName = binary_to_existing_atom(GwName0),
@@ -398,7 +401,7 @@ post_config_update(Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
{New, Old} when is_map(New), is_map(Old) ->
emqx_gateway:update(GwName, New)
end;
-post_config_update(_Req, _NewConfig, _OldConfig, _AppEnvs) ->
+post_config_update(_, _Req, _NewConfig, _OldConfig, _AppEnvs) ->
ok.
%%--------------------------------------------------------------------
diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl
index 6d420e0d7..ee12d2c89 100644
--- a/apps/emqx_resource/src/emqx_resource.erl
+++ b/apps/emqx_resource/src/emqx_resource.erl
@@ -84,7 +84,7 @@
% , inc_counter/3 %% increment the counter by a given integer
]).
--define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => false}).
+-define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => true}).
-optional_callbacks([ on_query/4
, on_health_check/2
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl
index 42f652d59..bcf0fa02a 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl
@@ -25,7 +25,7 @@
-export([start_link/0]).
--export([ post_config_update/4
+-export([ post_config_update/5
, config_key_path/0
]).
@@ -81,7 +81,7 @@ start_link() ->
%%------------------------------------------------------------------------------
%% The config handler for emqx_rule_engine
%%------------------------------------------------------------------------------
-post_config_update(_Req, NewRules, OldRules, _AppEnvs) ->
+post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) ->
#{added := Added, removed := Removed, changed := Updated}
= emqx_map_lib:diff_maps(NewRules, OldRules),
maps_foreach(fun({Id, {_Old, New}}) ->