Merge branch 'release-5.0-beta.3' into gw-review-r4

This commit is contained in:
JianBo He 2022-01-03 16:05:46 +08:00 committed by GitHub
commit e00cf71911
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 699 additions and 311 deletions

View File

@ -979,7 +979,7 @@ authenticator_examples() ->
mechanism => <<"password-based">>, mechanism => <<"password-based">>,
backend => <<"http">>, backend => <<"http">>,
method => <<"post">>, method => <<"post">>,
url => <<"http://127.0.0.2:8080">>, url => <<"http://127.0.0.1:18083">>,
headers => #{ headers => #{
<<"content-type">> => <<"application/json">> <<"content-type">> => <<"application/json">>
}, },

View File

@ -153,9 +153,8 @@ t_destroy(_Config) ->
?GLOBAL), ?GLOBAL),
% Authenticator should not be usable anymore % Authenticator should not be usable anymore
?assertException( ?assertMatch(
error, ignore,
_,
emqx_authn_http:authenticate( emqx_authn_http:authenticate(
Credentials, Credentials,
State)). State)).

View File

@ -146,9 +146,8 @@ t_destroy(_Config) ->
?GLOBAL), ?GLOBAL),
% Authenticator should not be usable anymore % Authenticator should not be usable anymore
?assertException( ?assertMatch(
error, ignore,
_,
emqx_authn_mongodb:authenticate( emqx_authn_mongodb:authenticate(
#{username => <<"plain">>, #{username => <<"plain">>,
password => <<"plain">> password => <<"plain">>

View File

@ -159,9 +159,8 @@ t_destroy(_Config) ->
?GLOBAL), ?GLOBAL),
% Authenticator should not be usable anymore % Authenticator should not be usable anymore
?assertException( ?assertMatch(
error, ignore,
_,
emqx_authn_mysql:authenticate( emqx_authn_mysql:authenticate(
#{username => <<"plain">>, #{username => <<"plain">>,
password => <<"plain">> password => <<"plain">>

View File

@ -159,9 +159,8 @@ t_destroy(_Config) ->
?GLOBAL), ?GLOBAL),
% Authenticator should not be usable anymore % Authenticator should not be usable anymore
?assertException( ?assertMatch(
error, ignore,
_,
emqx_authn_pgsql:authenticate( emqx_authn_pgsql:authenticate(
#{username => <<"plain">>, #{username => <<"plain">>,
password => <<"plain">> password => <<"plain">>

View File

@ -164,9 +164,8 @@ t_destroy(_Config) ->
?GLOBAL), ?GLOBAL),
% Authenticator should not be usable anymore % Authenticator should not be usable anymore
?assertException( ?assertMatch(
error, ignore,
_,
emqx_authn_redis:authenticate( emqx_authn_redis:authenticate(
#{username => <<"plain">>, #{username => <<"plain">>,
password => <<"plain">> password => <<"plain">>

View File

@ -80,8 +80,15 @@ format(Rule = #{topic := Topic}) when is_map(Rule) ->
}. }.
update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE -> update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE ->
{ok, _} = emqx:update_config([auto_subscribe, topics], Topics), case emqx_conf:update([auto_subscribe, topics],
update_hook(); Topics,
#{rawconf_with_defaults => true, override_to => cluster}) of
{ok, #{raw_config := NewTopics}} ->
ok = update_hook(),
{ok, NewTopics};
{error, Reason} ->
{error, Reason}
end;
update_(_Topics) -> update_(_Topics) ->
{error, quota_exceeded}. {error, quota_exceeded}.

View File

@ -22,6 +22,7 @@
-export([auto_subscribe/2]). -export([auto_subscribe/2]).
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
-define(EXCEED_LIMIT, 'EXCEED_LIMIT'). -define(EXCEED_LIMIT, 'EXCEED_LIMIT').
-define(BAD_REQUEST, 'BAD_REQUEST'). -define(BAD_REQUEST, 'BAD_REQUEST').
@ -90,6 +91,9 @@ auto_subscribe(put, #{body := Params}) ->
Message = list_to_binary(io_lib:format("Max auto subscribe topic count is ~p", Message = list_to_binary(io_lib:format("Max auto subscribe topic count is ~p",
[emqx_auto_subscribe:max_limit()])), [emqx_auto_subscribe:max_limit()])),
{409, #{code => ?EXCEED_LIMIT, message => Message}}; {409, #{code => ?EXCEED_LIMIT, message => Message}};
ok -> {error, Reason} ->
{200, emqx_auto_subscribe:list()} Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
{500, #{code => ?INTERNAL_ERROR, message => Message}};
{ok, NewTopics} ->
{200, NewTopics}
end. end.

View File

@ -85,7 +85,7 @@ init_per_suite(Config) ->
} }
] ]
}">>), }">>),
emqx_common_test_helpers:start_apps([emqx_dashboard, ?APP], fun set_special_configs/1), emqx_common_test_helpers:start_apps([emqx_dashboard, emqx_conf, ?APP], fun set_special_configs/1),
Config. Config.
set_special_configs(emqx_dashboard) -> set_special_configs(emqx_dashboard) ->
@ -113,15 +113,17 @@ topic_config(T) ->
end_per_suite(_) -> end_per_suite(_) ->
application:unload(emqx_management), application:unload(emqx_management),
application:unload(emqx_conf),
application:unload(?APP), application:unload(?APP),
meck:unload(emqx_resource), meck:unload(emqx_resource),
meck:unload(emqx_schema), meck:unload(emqx_schema),
emqx_common_test_helpers:stop_apps([emqx_dashboard, ?APP]). emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_conf, ?APP]).
t_auto_subscribe(_) -> t_auto_subscribe(_) ->
emqx_auto_subscribe:update([#{<<"topic">> => Topic} || Topic <- ?TOPICS]),
{ok, Client} = emqtt:start_link(#{username => ?CLIENT_USERNAME, clientid => ?CLIENT_ID}), {ok, Client} = emqtt:start_link(#{username => ?CLIENT_USERNAME, clientid => ?CLIENT_ID}),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
timer:sleep(100), timer:sleep(200),
?assertEqual(check_subs(length(?TOPICS)), ok), ?assertEqual(check_subs(length(?TOPICS)), ok),
emqtt:disconnect(Client), emqtt:disconnect(Client),
ok. ok.
@ -148,6 +150,7 @@ t_update(_) ->
check_subs(Count) -> check_subs(Count) ->
Subs = ets:tab2list(emqx_suboption), Subs = ets:tab2list(emqx_suboption),
ct:pal("---> ~p ~p ~n", [Subs, Count]),
?assert(length(Subs) >= Count), ?assert(length(Subs) >= Count),
check_subs((Subs), ?ENSURE_TOPICS). check_subs((Subs), ?ENSURE_TOPICS).

View File

@ -35,15 +35,19 @@
]). ]).
-export([ load/0 -export([ load/0
, lookup/1
, lookup/2 , lookup/2
, lookup/3 , lookup/3
, list/0 , list/0
, list_bridges_by_connector/1 , list_bridges_by_connector/1
, create/2
, create/3 , create/3
, recreate/2 , recreate/2
, recreate/3 , recreate/3
, create_dry_run/2 , create_dry_run/2
, remove/1
, remove/3 , remove/3
, update/2
, update/3 , update/3
, start/2 , start/2
, stop/2 , stop/2
@ -80,17 +84,36 @@ unload_hook() ->
on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
case maps:get(sys, Flags, false) of case maps:get(sys, Flags, false) of
false -> false ->
lists:foreach(fun (Id) -> Msg = emqx_rule_events:eventmsg_publish(Message),
send_message(Id, emqx_rule_events:eventmsg_publish(Message)) send_to_matched_egress_bridges(Topic, Msg);
end, get_matched_bridges(Topic));
true -> ok true -> ok
end, end,
{ok, Message}. {ok, Message}.
send_to_matched_egress_bridges(Topic, Msg) ->
lists:foreach(fun (Id) ->
try send_message(Id, Msg) of
ok -> ok;
Error -> ?SLOG(error, #{msg => "send_message_to_bridge_failed",
bridge => Id, error => Error})
catch Err:Reason:ST ->
?SLOG(error, #{msg => "send_message_to_bridge_crash",
bridge => Id, error => Err, reason => Reason,
stacktrace => ST})
end
end, get_matched_bridges(Topic)).
send_message(BridgeId, Message) -> send_message(BridgeId, Message) ->
{BridgeType, BridgeName} = parse_bridge_id(BridgeId), {BridgeType, BridgeName} = parse_bridge_id(BridgeId),
ResId = emqx_bridge:resource_id(BridgeType, BridgeName), ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
emqx_resource:query(ResId, {send_message, Message}). case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
not_found ->
{error, {bridge_not_found, BridgeId}};
#{enable := true} ->
emqx_resource:query(ResId, {send_message, Message});
#{enable := false} ->
{error, {bridge_stopped, BridgeId}}
end.
config_key_path() -> config_key_path() ->
[bridges]. [bridges].
@ -169,6 +192,10 @@ list_bridges_by_connector(ConnectorId) ->
[B || B = #{raw_config := #{<<"connector">> := Id}} <- list(), [B || B = #{raw_config := #{<<"connector">> := Id}} <- list(),
ConnectorId =:= Id]. ConnectorId =:= Id].
lookup(Id) ->
{Type, Name} = parse_bridge_id(Id),
lookup(Type, Name).
lookup(Type, Name) -> lookup(Type, Name) ->
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
lookup(Type, Name, RawConf). lookup(Type, Name, RawConf).
@ -188,16 +215,24 @@ stop(Type, Name) ->
restart(Type, Name) -> restart(Type, Name) ->
emqx_resource:restart(resource_id(Type, Name)). emqx_resource:restart(resource_id(Type, Name)).
create(BridgeId, Conf) ->
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
create(BridgeType, BridgeName, Conf).
create(Type, Name, Conf) -> create(Type, Name, Conf) ->
?SLOG(info, #{msg => "create bridge", type => Type, name => Name, ?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
config => Conf}), config => Conf}),
case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type), case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf), #{force_create => true}) of parse_confs(Type, Name, Conf), #{async_create => true}) of
{ok, already_created} -> maybe_disable_bridge(Type, Name, Conf); {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
{ok, _} -> maybe_disable_bridge(Type, Name, Conf); {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
update(BridgeId, {OldConf, Conf}) ->
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
update(BridgeType, BridgeName, {OldConf, Conf}).
update(Type, Name, {OldConf, Conf}) -> update(Type, Name, {OldConf, Conf}) ->
%% TODO: sometimes its not necessary to restart the bridge connection. %% TODO: sometimes its not necessary to restart the bridge connection.
%% %%
@ -217,7 +252,7 @@ update(Type, Name, {OldConf, Conf}) ->
?SLOG(warning, #{ msg => "updating_a_non-exist_bridge_need_create_a_new_one" ?SLOG(warning, #{ msg => "updating_a_non-exist_bridge_need_create_a_new_one"
, type => Type, name => Name, config => Conf}), , type => Type, name => Name, config => Conf}),
create(Type, Name, Conf); create(Type, Name, Conf);
{error, Reason} -> {update_bridge_failed, Reason} {error, Reason} -> {error, {update_bridge_failed, Reason}}
end; end;
true -> true ->
%% we don't need to recreate the bridge if this config change is only to %% we don't need to recreate the bridge if this config change is only to
@ -229,11 +264,12 @@ update(Type, Name, {OldConf, Conf}) ->
end. end.
recreate(Type, Name) -> recreate(Type, Name) ->
recreate(Type, Name, emqx:get_raw_config([bridges, Type, Name])). recreate(Type, Name, emqx:get_config([bridges, Type, Name])).
recreate(Type, Name, Conf) -> recreate(Type, Name, Conf) ->
emqx_resource:recreate_local(resource_id(Type, Name), emqx_resource:recreate_local(resource_id(Type, Name),
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []). emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf),
#{async_create => true}).
create_dry_run(Type, Conf) -> create_dry_run(Type, Conf) ->
Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}}, Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}},
@ -244,6 +280,10 @@ create_dry_run(Type, Conf) ->
Error Error
end. end.
remove(BridgeId) ->
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
remove(BridgeType, BridgeName, #{}).
remove(Type, Name, _Conf) -> remove(Type, Name, _Conf) ->
?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}), ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}),
case emqx_resource:remove_local(resource_id(Type, Name)) of case emqx_resource:remove_local(resource_id(Type, Name)) of
@ -279,6 +319,8 @@ get_matched_bridges(Topic) ->
end, Acc0, Conf) end, Acc0, Conf)
end, [], Bridges). end, [], Bridges).
get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) ->
Acc;
get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) -> get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) ->
case emqx_topic:match(Topic, Filter) of case emqx_topic:match(Topic, Filter) of
true -> [bridge_id(BType, BName) | Acc]; true -> [bridge_id(BType, BName) | Acc];
@ -309,21 +351,21 @@ parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf)
{Type, ConnName} -> {Type, ConnName} ->
ConnectorConfs = emqx:get_config([connectors, Type, ConnName]), ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
make_resource_confs(Direction, ConnectorConfs, make_resource_confs(Direction, ConnectorConfs,
maps:without([connector, direction], Conf), Name); maps:without([connector, direction], Conf), Type, Name);
{_ConnType, _ConnName} -> {_ConnType, _ConnName} ->
error({cannot_use_connector_with_different_type, ConnId}) error({cannot_use_connector_with_different_type, ConnId})
end; end;
parse_confs(_Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf)
when is_map(ConnectorConfs) -> when is_map(ConnectorConfs) ->
make_resource_confs(Direction, ConnectorConfs, make_resource_confs(Direction, ConnectorConfs,
maps:without([connector, direction], Conf), Name). maps:without([connector, direction], Conf), Type, Name).
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Name) -> make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) ->
BName = bin(Name), BName = bridge_id(Type, Name),
ConnectorConfs#{ ConnectorConfs#{
ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>} ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>}
}; };
make_resource_confs(egress, ConnectorConfs, BridgeConf, _Name) -> make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) ->
ConnectorConfs#{ ConnectorConfs#{
egress => BridgeConf egress => BridgeConf
}. }.

View File

@ -68,7 +68,6 @@ How long will the HTTP request timeout.
fields("post") -> fields("post") ->
[ type_field() [ type_field()
, name_field()
] ++ fields("bridge"); ] ++ fields("bridge");
fields("put") -> fields("put") ->
@ -103,8 +102,5 @@ id_field() ->
type_field() -> type_field() ->
{type, mk(http, #{desc => "The Bridge Type"})}. {type, mk(http, #{desc => "The Bridge Type"})}.
name_field() ->
{name, mk(binary(), #{desc => "The Bridge Name"})}.
method() -> method() ->
enum([post, put, get, delete]). enum([post, put, get, delete]).

View File

@ -24,11 +24,9 @@ fields("egress") ->
fields("post_ingress") -> fields("post_ingress") ->
[ type_field() [ type_field()
, name_field()
] ++ proplists:delete(enable, fields("ingress")); ] ++ proplists:delete(enable, fields("ingress"));
fields("post_egress") -> fields("post_egress") ->
[ type_field() [ type_field()
, name_field()
] ++ proplists:delete(enable, fields("egress")); ] ++ proplists:delete(enable, fields("egress"));
fields("put_ingress") -> fields("put_ingress") ->
@ -49,9 +47,3 @@ id_field() ->
type_field() -> type_field() ->
{type, mk(mqtt, #{desc => "The Bridge Type"})}. {type, mk(mqtt, #{desc => "The Bridge Type"})}.
name_field() ->
{name, mk(binary(),
#{ desc => "The Bridge Name"
, example => "some_bridge_name"
})}.

View File

@ -160,6 +160,7 @@ t_http_crud_apis(_) ->
} = jsx:decode(Bridge), } = jsx:decode(Bridge),
%% send an message to emqx and the message should be forwarded to the HTTP server %% send an message to emqx and the message should be forwarded to the HTTP server
wait_for_resource_ready(BridgeID, 5),
Body = <<"my msg">>, Body = <<"my msg">>,
emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
?assert( ?assert(
@ -212,6 +213,7 @@ t_http_crud_apis(_) ->
}, jsx:decode(Bridge3Str)), }, jsx:decode(Bridge3Str)),
%% send an message to emqx again, check the path has been changed %% send an message to emqx again, check the path has been changed
wait_for_resource_ready(BridgeID, 5),
emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
?assert( ?assert(
receive receive
@ -320,3 +322,14 @@ auth_header_() ->
operation_path(Oper, BridgeID) -> operation_path(Oper, BridgeID) ->
uri(["bridges", BridgeID, "operation", Oper]). uri(["bridges", BridgeID, "operation", Oper]).
wait_for_resource_ready(InstId, 0) ->
ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]),
ct:fail(wait_resource_timeout);
wait_for_resource_ready(InstId, Retry) ->
case emqx_bridge:lookup(InstId) of
{ok, #{resource_data := #{status := started}}} -> ok;
_ ->
timer:sleep(100),
wait_for_resource_ready(InstId, Retry-1)
end.

View File

@ -37,31 +37,26 @@
config_key_path() -> config_key_path() ->
[connectors]. [connectors].
-dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]).
post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
ConnId = connector_id(Type, Name), ConnId = connector_id(Type, Name),
LinkedBridgeIds = lists:foldl(fun try foreach_linked_bridges(ConnId, fun(#{id := BId}) ->
(#{id := BId, raw_config := #{<<"connector">> := ConnId0}}, Acc) throw({dependency_bridges_exist, BId})
when ConnId0 == ConnId -> end)
[BId | Acc]; catch throw:Error -> {error, Error}
(_, Acc) -> Acc
end, [], emqx_bridge:list()),
case LinkedBridgeIds of
[] -> ok;
_ -> {error, {dependency_bridges_exist, LinkedBridgeIds}}
end; end;
post_config_update([connectors, Type, Name], _Req, NewConf, _OldConf, _AppEnvs) -> post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
ConnId = connector_id(Type, Name), ConnId = connector_id(Type, Name),
lists:foreach(fun foreach_linked_bridges(ConnId,
(#{id := BId, raw_config := #{<<"connector">> := ConnId0}}) when ConnId0 == ConnId -> fun(#{id := BId}) ->
{BType, BName} = emqx_bridge:parse_bridge_id(BId), {BType, BName} = emqx_bridge:parse_bridge_id(BId),
BridgeConf = emqx:get_config([bridges, BType, BName]), BridgeConf = emqx:get_config([bridges, BType, BName]),
case emqx_bridge:recreate(BType, BName, BridgeConf#{connector => NewConf}) of case emqx_bridge:update(BType, BName, {BridgeConf#{connector => OldConf},
{ok, _} -> ok; BridgeConf#{connector => NewConf}}) of
ok -> ok;
{error, Reason} -> error({update_bridge_error, Reason}) {error, Reason} -> error({update_bridge_error, Reason})
end; end
(_) -> end).
ok
end, emqx_bridge:list()).
connector_id(Type0, Name0) -> connector_id(Type0, Name0) ->
Type = bin(Type0), Type = bin(Type0),
@ -112,3 +107,10 @@ delete(Type, Name) ->
bin(Bin) when is_binary(Bin) -> Bin; bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
foreach_linked_bridges(ConnId, Do) ->
lists:foreach(fun
(#{raw_config := #{<<"connector">> := ConnId0}} = Bridge) when ConnId0 == ConnId ->
Do(Bridge);
(_) -> ok
end, emqx_bridge:list()).

View File

@ -253,6 +253,10 @@ schema("/connectors/:id") ->
{ok, _} -> {ok, _} ->
case emqx_connector:delete(ConnType, ConnName) of case emqx_connector:delete(ConnType, ConnName) of
{ok, _} -> {204}; {ok, _} -> {204};
{error, {post_config_update, _, {dependency_bridges_exist, BridgeID}}} ->
{403, error_msg('DEPENDENCY_EXISTS',
<<"Cannot remove the connector as it's in use by a bridge: ",
BridgeID/binary>>)};
{error, Error} -> {400, error_msg('BAD_ARG', Error)} {error, Error} -> {400, error_msg('BAD_ARG', Error)}
end; end;
{error, not_found} -> {error, not_found} ->

View File

@ -68,10 +68,6 @@ fields("put") ->
fields("post") -> fields("post") ->
[ {type, mk(mqtt, #{desc => "The Connector Type"})} [ {type, mk(mqtt, #{desc => "The Connector Type"})}
, {name, mk(binary(),
#{ desc => "The Connector Name"
, example => <<"my_mqtt_connector">>
})}
] ++ fields("put"). ] ++ fields("put").
%% =================================================================== %% ===================================================================

View File

@ -165,7 +165,8 @@ handle_publish(Msg, undefined) ->
?SLOG(error, #{msg => "cannot_publish_to_local_broker_as" ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as"
"_'ingress'_is_not_configured", "_'ingress'_is_not_configured",
message => Msg}); message => Msg});
handle_publish(Msg, Vars) -> handle_publish(Msg0, Vars) ->
Msg = format_msg_received(Msg0),
?SLOG(debug, #{msg => "publish_to_local_broker", ?SLOG(debug, #{msg => "publish_to_local_broker",
message => Msg, vars => Vars}), message => Msg, vars => Vars}),
case Vars of case Vars of
@ -173,11 +174,7 @@ handle_publish(Msg, Vars) ->
_ = erlang:apply(Mod, Func, [Msg | Args]); _ = erlang:apply(Mod, Func, [Msg | Args]);
_ -> ok _ -> ok
end, end,
case maps:get(local_topic, Vars, undefined) of maybe_publish_to_local_broker(Msg0, Vars).
undefined -> ok;
_Topic ->
emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars))
end.
handle_disconnected(Reason, Parent) -> handle_disconnected(Reason, Parent) ->
Parent ! {disconnected, self(), Reason}. Parent ! {disconnected, self(), Reason}.
@ -197,3 +194,45 @@ sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) ->
process_config(Config) -> process_config(Config) ->
maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config). maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config).
maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) ->
case maps:get(local_topic, Vars, undefined) of
undefined ->
ok; %% local topic is not set, discard it
_ ->
case emqx_topic:match(Topic, SubTopic) of
true ->
_ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)),
ok;
false ->
?SLOG(warning, #{msg => "discard_message_as_topic_not_matched",
message => Msg, subscribed => SubTopic, got_topic => Topic})
end
end.
format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
qos := QoS, retain := Retain, topic := Topic}) ->
#{event => '$bridges/mqtt',
id => emqx_guid:to_hexstr(emqx_guid:gen()),
payload => Payload,
topic => Topic,
qos => QoS,
dup => Dup,
retain => Retain,
pub_props => printable_maps(Props),
timestamp => erlang:system_time(millisecond)
}.
printable_maps(undefined) -> #{};
printable_maps(Headers) ->
maps:fold(
fun ('User-Property', V0, AccIn) when is_list(V0) ->
AccIn#{
'User-Property' => maps:from_list(V0),
'User-Property-Pairs' => [#{
key => Key,
value => Value
} || {Key, Value} <- V0]
};
(K, V0, AccIn) -> AccIn#{K => V0}
end, #{}, Headers).

View File

@ -78,10 +78,9 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
Msg#message{topic = topic(Mountpoint, Topic)}. Msg#message{topic = topic(Mountpoint, Topic)}.
%% published from remote node over a MQTT connection %% published from remote node over a MQTT connection
to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0, to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
#{local_topic := TopicToken, payload := PayloadToken, #{local_topic := TopicToken, payload := PayloadToken,
local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
MapMsg = format_msg_received(MapMsg0),
Topic = replace_vars_in_str(TopicToken, MapMsg), Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = process_payload(PayloadToken, MapMsg), Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg),
@ -90,33 +89,6 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0,
emqx_message:set_flags(#{dup => Dup, retain => Retain}, emqx_message:set_flags(#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))).
format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
qos := QoS, retain := Retain, topic := Topic}) ->
#{event => '$bridges/mqtt',
id => emqx_guid:to_hexstr(emqx_guid:gen()),
payload => Payload,
topic => Topic,
qos => QoS,
flags => #{dup => Dup, retain => Retain},
pub_props => printable_maps(Props),
timestamp => erlang:system_time(millisecond),
node => node()
}.
printable_maps(undefined) -> #{};
printable_maps(Headers) ->
maps:fold(
fun ('User-Property', V0, AccIn) when is_list(V0) ->
AccIn#{
'User-Property' => maps:from_list(V0),
'User-Property-Pairs' => [#{
key => Key,
value => Value
} || {Key, Value} <- V0]
};
(K, V0, AccIn) -> AccIn#{K => V0}
end, #{}, Headers).
process_payload([], Msg) -> process_payload([], Msg) ->
emqx_json:encode(Msg); emqx_json:encode(Msg);
process_payload(Tks, Msg) -> process_payload(Tks, Msg) ->

View File

@ -22,7 +22,10 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"connectors: {}">>). %% output functions
-export([ inspect/3
]).
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
-define(CONNECTR_TYPE, <<"mqtt">>). -define(CONNECTR_TYPE, <<"mqtt">>).
-define(CONNECTR_NAME, <<"test_connector">>). -define(CONNECTR_NAME, <<"test_connector">>).
@ -67,6 +70,9 @@
<<"failed">> := FAILED, <<"rate">> := SPEED, <<"failed">> := FAILED, <<"rate">> := SPEED,
<<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}). <<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}).
inspect(Selected, _Envs, _Args) ->
persistent_term:put(?MODULE, #{inspect => Selected}).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -89,21 +95,38 @@ init_per_suite(Config) ->
%% some testcases (may from other app) already get emqx_connector started %% some testcases (may from other app) already get emqx_connector started
_ = application:stop(emqx_resource), _ = application:stop(emqx_resource),
_ = application:stop(emqx_connector), _ = application:stop(emqx_connector),
ok = emqx_common_test_helpers:start_apps([emqx_connector, emqx_bridge, emqx_dashboard]), ok = emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_connector,
ok = emqx_config:init_load(emqx_connector_schema, ?CONF_DEFAULT), emqx_bridge, emqx_dashboard]),
ok = emqx_config:init_load(emqx_connector_schema, <<"connectors: {}">>),
ok = emqx_config:init_load(emqx_rule_engine_schema, <<"rule_engine {rules {}}">>),
ok = emqx_config:init_load(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT), ok = emqx_config:init_load(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_connector, emqx_bridge, emqx_dashboard]), emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_connector, emqx_bridge, emqx_dashboard]),
ok. ok.
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
%% assert we there's no connectors and no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
Config. Config.
end_per_testcase(_, _Config) -> end_per_testcase(_, _Config) ->
clear_resources(),
ok. ok.
clear_resources() ->
lists:foreach(fun(#{id := Id}) ->
ok = emqx_rule_engine:delete_rule(Id)
end, emqx_rule_engine:get_rules()),
lists:foreach(fun(#{id := Id}) ->
ok = emqx_bridge:remove(Id)
end, emqx_bridge:list()),
lists:foreach(fun(#{<<"id">> := Id}) ->
ok = emqx_connector:delete(Id)
end, emqx_connector:list()).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -180,10 +203,6 @@ t_mqtt_crud_apis(_) ->
ok. ok.
t_mqtt_conn_bridge_ingress(_) -> t_mqtt_conn_bridge_ingress(_) ->
%% assert we there's no connectors and no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a mqtt connector, using POST %% then we add a mqtt connector, using POST
User1 = <<"user1">>, User1 = <<"user1">>,
{ok, 201, Connector} = request(post, uri(["connectors"]), {ok, 201, Connector} = request(post, uri(["connectors"]),
@ -222,8 +241,8 @@ t_mqtt_conn_bridge_ingress(_) ->
emqx:subscribe(LocalTopic), emqx:subscribe(LocalTopic),
%% PUBLISH a message to the 'remote' broker, as we have only one broker, %% PUBLISH a message to the 'remote' broker, as we have only one broker,
%% the remote broker is also the local one. %% the remote broker is also the local one.
wait_for_resource_ready(BridgeIDIngress, 5),
emqx:publish(emqx_message:make(RemoteTopic, Payload)), emqx:publish(emqx_message:make(RemoteTopic, Payload)),
%% we should receive a message on the local broker, with specified topic %% we should receive a message on the local broker, with specified topic
?assert( ?assert(
receive receive
@ -253,10 +272,6 @@ t_mqtt_conn_bridge_ingress(_) ->
ok. ok.
t_mqtt_conn_bridge_egress(_) -> t_mqtt_conn_bridge_egress(_) ->
%% assert we there's no connectors and no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a mqtt connector, using POST %% then we add a mqtt connector, using POST
User1 = <<"user1">>, User1 = <<"user1">>,
{ok, 201, Connector} = request(post, uri(["connectors"]), {ok, 201, Connector} = request(post, uri(["connectors"]),
@ -295,6 +310,7 @@ t_mqtt_conn_bridge_egress(_) ->
emqx:subscribe(RemoteTopic), emqx:subscribe(RemoteTopic),
%% PUBLISH a message to the 'local' broker, as we have only one broker, %% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one. %% the remote broker is also the local one.
wait_for_resource_ready(BridgeIDEgress, 5),
emqx:publish(emqx_message:make(LocalTopic, Payload)), emqx:publish(emqx_message:make(LocalTopic, Payload)),
%% we should receive a message on the "remote" broker, with specified topic %% we should receive a message on the "remote" broker, with specified topic
@ -331,10 +347,6 @@ t_mqtt_conn_bridge_egress(_) ->
%% - update a connector should also update all of the the bridges %% - update a connector should also update all of the the bridges
%% - cannot delete a connector that is used by at least one bridge %% - cannot delete a connector that is used by at least one bridge
t_mqtt_conn_update(_) -> t_mqtt_conn_update(_) ->
%% assert we there's no connectors and no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a mqtt connector, using POST %% then we add a mqtt connector, using POST
{ok, 201, Connector} = request(post, uri(["connectors"]), {ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>) ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)
@ -360,6 +372,7 @@ t_mqtt_conn_update(_) ->
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
, <<"connector">> := ConnctorID , <<"connector">> := ConnctorID
} = jsx:decode(Bridge), } = jsx:decode(Bridge),
wait_for_resource_ready(BridgeIDEgress, 2),
%% then we try to update 'server' of the connector, to an unavailable IP address %% then we try to update 'server' of the connector, to an unavailable IP address
%% the update should fail because of 'unreachable' or 'connrefused' %% the update should fail because of 'unreachable' or 'connrefused'
@ -377,10 +390,6 @@ t_mqtt_conn_update(_) ->
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_update2(_) -> t_mqtt_conn_update2(_) ->
%% assert we there's no connectors and no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a mqtt connector, using POST %% then we add a mqtt connector, using POST
%% but this connector is point to a unreachable server "2603" %% but this connector is point to a unreachable server "2603"
{ok, 201, Connector} = request(post, uri(["connectors"]), {ok, 201, Connector} = request(post, uri(["connectors"]),
@ -406,6 +415,11 @@ t_mqtt_conn_update2(_) ->
, <<"status">> := <<"disconnected">> , <<"status">> := <<"disconnected">>
, <<"connector">> := ConnctorID , <<"connector">> := ConnctorID
} = jsx:decode(Bridge), } = jsx:decode(Bridge),
%% We try to fix the 'server' parameter, to another unavailable server..
%% The update should success: we don't check the connectivity of the new config
%% if the resource is now disconnected.
{ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR2(<<"127.0.0.1:2604">>)),
%% we fix the 'server' parameter to a normal one, it should work %% we fix the 'server' parameter to a normal one, it should work
{ok, 200, _} = request(put, uri(["connectors", ConnctorID]), {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
@ -421,6 +435,34 @@ t_mqtt_conn_update2(_) ->
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_update3(_) ->
%% we add a mqtt connector, using POST
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)
#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}),
#{ <<"id">> := BridgeIDEgress
, <<"connector">> := ConnctorID
} = jsx:decode(Bridge),
wait_for_resource_ready(BridgeIDEgress, 2),
%% delete the connector should fail because it is in use by a bridge
{ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
%% the connector now can be deleted without problems
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
t_mqtt_conn_testing(_) -> t_mqtt_conn_testing(_) ->
%% APIs for testing the connectivity %% APIs for testing the connectivity
%% then we add a mqtt connector, using POST %% then we add a mqtt connector, using POST
@ -435,6 +477,153 @@ t_mqtt_conn_testing(_) ->
<<"name">> => ?BRIDGE_NAME_EGRESS <<"name">> => ?BRIDGE_NAME_EGRESS
}). }).
t_ingress_mqtt_bridge_with_rules(_) ->
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_INGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_INGRESS
}),
#{ <<"id">> := BridgeIDIngress } = jsx:decode(Bridge),
{ok, 201, Rule} = request(post, uri(["rules"]),
#{<<"name">> => <<"A rule get messages from a source mqtt bridge">>,
<<"enable">> => true,
<<"outputs">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}],
<<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">>
}),
#{<<"id">> := RuleId} = jsx:decode(Rule),
%% we now test if the bridge works as expected
RemoteTopic = <<"remote_topic/1">>,
LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(LocalTopic),
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
%% the remote broker is also the local one.
wait_for_resource_ready(BridgeIDIngress, 5),
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
%% we should receive a message on the local broker, with specified topic
?assert(
receive
{deliver, LocalTopic, #message{payload = Payload}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end),
%% and also the rule should be matched, with matched + 1:
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
#{ <<"id">> := RuleId
, <<"metrics">> := #{<<"matched">> := 1}
} = jsx:decode(Rule1),
%% we also check if the outputs of the rule is triggered
?assertMatch(#{inspect := #{
event := '$bridges/mqtt',
id := MsgId,
payload := Payload,
topic := RemoteTopic,
qos := 0,
dup := false,
retain := false,
pub_props := #{},
timestamp := _
}} when is_binary(MsgId), persistent_term:get(?MODULE)),
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
t_egress_mqtt_bridge_with_rules(_) ->
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
#{ <<"id">> := ConnctorID } = jsx:decode(Connector),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}),
#{ <<"id">> := BridgeIDEgress } = jsx:decode(Bridge),
{ok, 201, Rule} = request(post, uri(["rules"]),
#{<<"name">> => <<"A rule send messages to a sink mqtt bridge">>,
<<"enable">> => true,
<<"outputs">> => [BridgeIDEgress],
<<"sql">> => <<"SELECT * from \"t/1\"">>
}),
#{<<"id">> := RuleId} = jsx:decode(Rule),
%% we now test if the bridge works as expected
LocalTopic = <<"local_topic/1">>,
RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(RemoteTopic),
%% PUBLISH a message to the 'local' broker, as we have only one broker,
%% the remote broker is also the local one.
wait_for_resource_ready(BridgeIDEgress, 5),
emqx:publish(emqx_message:make(LocalTopic, Payload)),
%% we should receive a message on the "remote" broker, with specified topic
?assert(
receive
{deliver, RemoteTopic, #message{payload = Payload}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end),
emqx:unsubscribe(RemoteTopic),
%% PUBLISH a message to the rule.
Payload2 = <<"hi">>,
RuleTopic = <<"t/1">>,
RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>,
emqx:subscribe(RemoteTopic2),
wait_for_resource_ready(BridgeIDEgress, 5),
emqx:publish(emqx_message:make(RuleTopic, Payload2)),
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
#{ <<"id">> := RuleId
, <<"metrics">> := #{<<"matched">> := 1}
} = jsx:decode(Rule1),
%% we should receive a message on the "remote" broker, with specified topic
?assert(
receive
{deliver, RemoteTopic2, #message{payload = Payload2}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end),
%% verify the metrics of the bridge
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(#{ <<"id">> := BridgeIDEgress
, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)
, <<"node_metrics">> :=
[#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}]
}, jsx:decode(BridgeStr)),
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% HTTP Request %% HTTP Request
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -468,3 +657,13 @@ auth_header_() ->
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}. {"Authorization", "Bearer " ++ binary_to_list(Token)}.
wait_for_resource_ready(InstId, 0) ->
ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]),
ct:fail(wait_resource_timeout);
wait_for_resource_ready(InstId, Retry) ->
case emqx_bridge:lookup(InstId) of
{ok, #{resource_data := #{status := started}}} -> ok;
_ ->
timer:sleep(100),
wait_for_resource_ready(InstId, Retry-1)
end.

View File

@ -155,7 +155,7 @@ format_delayed(#delayed_message{key = {ExpectTimeStamp, Id}, delayed = Delayed,
}, },
case WithPayload of case WithPayload of
true -> true ->
Result#{payload => base64:encode(Payload)}; Result#{payload => Payload};
_ -> _ ->
Result Result
end. end.
@ -187,7 +187,7 @@ delete_delayed_message(Id0) ->
mria:dirty_delete(?TAB, {Timestamp, Id}) mria:dirty_delete(?TAB, {Timestamp, Id})
end. end.
update_config(Config) -> update_config(Config) ->
{ok, _} = emqx:update_config([delayed], Config). emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callback %% gen_server callback

View File

@ -25,12 +25,14 @@
-define(MAX_PAYLOAD_LENGTH, 2048). -define(MAX_PAYLOAD_LENGTH, 2048).
-define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE'). -define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE').
-export([status/2 -export([ status/2
, delayed_messages/2 , delayed_messages/2
, delayed_message/2 , delayed_message/2
]). ]).
-export([paths/0, fields/1, schema/1]). -export([ paths/0
, fields/1
, schema/1]).
%% for rpc %% for rpc
-export([update_config_/1]). -export([update_config_/1]).
@ -40,6 +42,7 @@
-define(ALREADY_ENABLED, 'ALREADY_ENABLED'). -define(ALREADY_ENABLED, 'ALREADY_ENABLED').
-define(ALREADY_DISABLED, 'ALREADY_DISABLED'). -define(ALREADY_DISABLED, 'ALREADY_DISABLED').
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
-define(BAD_REQUEST, 'BAD_REQUEST'). -define(BAD_REQUEST, 'BAD_REQUEST').
-define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND').
@ -49,7 +52,11 @@
api_spec() -> api_spec() ->
emqx_dashboard_swagger:spec(?MODULE). emqx_dashboard_swagger:spec(?MODULE).
paths() -> ["/mqtt/delayed", "/mqtt/delayed/messages", "/mqtt/delayed/messages/:msgid"]. paths() ->
[ "/mqtt/delayed"
, "/mqtt/delayed/messages"
, "/mqtt/delayed/messages/:msgid"
].
schema("/mqtt/delayed") -> schema("/mqtt/delayed") ->
#{ #{
@ -189,8 +196,7 @@ get_status() ->
update_config(Config) -> update_config(Config) ->
case generate_config(Config) of case generate_config(Config) of
{ok, Config} -> {ok, Config} ->
update_config_(Config), update_config_(Config);
{200, get_status()};
{error, {Code, Message}} -> {error, {Code, Message}} ->
{400, #{code => Code, message => Message}} {400, #{code => Code, message => Message}}
end. end.
@ -215,29 +221,28 @@ generate_max_delayed_messages(Config) ->
{ok, Config}. {ok, Config}.
update_config_(Config) -> update_config_(Config) ->
lists:foreach(fun(Node) -> case emqx_delayed:update_config(Config) of
update_config_(Node, Config) {ok, #{raw_config := NewDelayed}} ->
end, mria_mnesia:running_nodes()). case maps:get(<<"enable">>, Config, undefined) of
undefined ->
update_config_(Node, Config) when Node =:= node() -> ignore;
_ = emqx_delayed:update_config(Config), true ->
case maps:get(<<"enable">>, Config, undefined) of emqx_delayed:enable();
undefined -> false ->
ignore; emqx_delayed:disable()
true -> end,
emqx_delayed:enable(); case maps:get(<<"max_delayed_messages">>, Config, undefined) of
false -> undefined ->
emqx_delayed:disable() ignore;
end, Max ->
case maps:get(<<"max_delayed_messages">>, Config, undefined) of ok = emqx_delayed:set_max_delayed_messages(Max)
undefined -> end,
ignore; {200, NewDelayed};
Max -> {error, Reason} ->
ok = emqx_delayed:set_max_delayed_messages(Max) Message = list_to_binary(
end; io_lib:format("Update config failed ~p", [Reason])),
{500, ?INTERNAL_ERROR, Message}
update_config_(Node, Config) -> end.
rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]).
generate_http_code_map(id_schema_error, Id) -> generate_http_code_map(id_schema_error, Id) ->
#{code => ?MESSAGE_ID_SCHEMA_ERROR, message => #{code => ?MESSAGE_ID_SCHEMA_ERROR, message =>
@ -245,9 +250,3 @@ generate_http_code_map(id_schema_error, Id) ->
generate_http_code_map(not_found, Id) -> generate_http_code_map(not_found, Id) ->
#{code => ?MESSAGE_ID_NOT_FOUND, message => #{code => ?MESSAGE_ID_NOT_FOUND, message =>
iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}. iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}.
rpc_call(Node, Module, Fun, Args) ->
case rpc:call(Node, Module, Fun, Args) of
{badrpc, Reason} -> {error, Reason};
Result -> Result
end.

View File

@ -44,8 +44,15 @@ list() ->
update(Params) -> update(Params) ->
disable(), disable(),
{ok, _} = emqx:update_config([event_message], Params), case emqx_conf:update([event_message],
enable(). Params,
#{rawconf_with_defaults => true, override_to => cluster}) of
{ok, #{raw_config := NewEventMessage}} ->
enable(),
{ok, NewEventMessage};
{error, Reason} ->
{error, Reason}
end.
enable() -> enable() ->
lists:foreach(fun({_Topic, false}) -> ok; lists:foreach(fun({_Topic, false}) -> ok;

View File

@ -53,5 +53,10 @@ event_message(get, _Params) ->
{200, emqx_event_message:list()}; {200, emqx_event_message:list()};
event_message(put, #{body := Body}) -> event_message(put, #{body := Body}) ->
_ = emqx_event_message:update(Body), case emqx_event_message:update(Body) of
{200, emqx_event_message:list()}. {ok, NewConfig} ->
{200, NewConfig};
{error, Reason} ->
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
{500, 'INTERNAL_ERROR', Message}
end.

View File

@ -25,7 +25,7 @@
mod := module(), mod := module(),
config := resource_config(), config := resource_config(),
state := resource_state(), state := resource_state(),
status := started | stopped, status := started | stopped | starting,
metrics := emqx_plugin_libs_metrics:metrics() metrics := emqx_plugin_libs_metrics:metrics()
}. }.
-type resource_group() :: binary(). -type resource_group() :: binary().
@ -33,7 +33,7 @@
%% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails, %% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails,
%% the 'status' of the resource will be 'stopped' in this case. %% the 'status' of the resource will be 'stopped' in this case.
%% Defaults to 'false' %% Defaults to 'false'
force_create => boolean() async_create => boolean()
}. }.
-type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} | -type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} |
undefined. undefined.
@ -41,3 +41,5 @@
%% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback %% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback
%% actions upon query failure %% actions upon query failure
-type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}. -type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}.
-define(TEST_ID_PREFIX, "_test_:").

View File

@ -58,6 +58,7 @@
%% Calls to the callback module with current resource state %% Calls to the callback module with current resource state
%% They also save the state after the call finished (except query/2,3). %% They also save the state after the call finished (except query/2,3).
-export([ restart/1 %% restart the instance. -export([ restart/1 %% restart the instance.
, restart/2
, health_check/1 %% verify if the resource is working normally , health_check/1 %% verify if the resource is working normally
, stop/1 %% stop the instance , stop/1 %% stop the instance
, query/2 %% query the instance , query/2 %% query the instance
@ -68,7 +69,6 @@
-export([ call_start/3 %% start the instance -export([ call_start/3 %% start the instance
, call_health_check/3 %% verify if the resource is working normally , call_health_check/3 %% verify if the resource is working normally
, call_stop/3 %% stop the instance , call_stop/3 %% stop the instance
, call_config_merge/4 %% merge the config when updating
, call_jsonify/2 , call_jsonify/2
]). ]).
@ -82,17 +82,13 @@
]). ]).
-define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => true}). -define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => true}).
-define(DEFAULT_RESOURCE_GROUP, <<"default">>). -define(DEFAULT_RESOURCE_GROUP, <<"default">>).
-optional_callbacks([ on_query/4 -optional_callbacks([ on_query/4
, on_health_check/2 , on_health_check/2
, on_config_merge/3
, on_jsonify/1 , on_jsonify/1
]). ]).
-callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config().
-callback on_jsonify(resource_config()) -> jsx:json_term(). -callback on_jsonify(resource_config()) -> jsx:json_term().
%% when calling emqx_resource:start/1 %% when calling emqx_resource:start/1
@ -170,18 +166,17 @@ create_dry_run(ResourceType, Config) ->
-spec create_dry_run_local(resource_type(), resource_config()) -> -spec create_dry_run_local(resource_type(), resource_config()) ->
ok | {error, Reason :: term()}. ok | {error, Reason :: term()}.
create_dry_run_local(ResourceType, Config) -> create_dry_run_local(ResourceType, Config) ->
InstId = iolist_to_binary(emqx_misc:gen_id(16)), call_instance(<<?TEST_ID_PREFIX>>, {create_dry_run, ResourceType, Config}).
call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}).
-spec recreate(instance_id(), resource_type(), resource_config(), term()) -> -spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
{ok, resource_data()} | {error, Reason :: term()}. {ok, resource_data()} | {error, Reason :: term()}.
recreate(InstId, ResourceType, Config, Params) -> recreate(InstId, ResourceType, Config, Opts) ->
cluster_call(recreate_local, [InstId, ResourceType, Config, Params]). cluster_call(recreate_local, [InstId, ResourceType, Config, Opts]).
-spec recreate_local(instance_id(), resource_type(), resource_config(), term()) -> -spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) ->
{ok, resource_data()} | {error, Reason :: term()}. {ok, resource_data()} | {error, Reason :: term()}.
recreate_local(InstId, ResourceType, Config, Params) -> recreate_local(InstId, ResourceType, Config, Opts) ->
call_instance(InstId, {recreate, InstId, ResourceType, Config, Params}). call_instance(InstId, {recreate, InstId, ResourceType, Config, Opts}).
-spec remove(instance_id()) -> ok | {error, Reason :: term()}. -spec remove(instance_id()) -> ok | {error, Reason :: term()}.
remove(InstId) -> remove(InstId) ->
@ -201,19 +196,27 @@ query(InstId, Request) ->
-spec query(instance_id(), Request :: term(), after_query()) -> Result :: term(). -spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
query(InstId, Request, AfterQuery) -> query(InstId, Request, AfterQuery) ->
case get_instance(InstId) of case get_instance(InstId) of
{ok, #{status := starting}} ->
query_error(starting, <<"cannot serve query when the resource "
"instance is still starting">>);
{ok, #{status := stopped}} -> {ok, #{status := stopped}} ->
error({resource_stopped, InstId}); query_error(stopped, <<"cannot serve query when the resource "
"instance is stopped">>);
{ok, #{mod := Mod, state := ResourceState, status := started}} -> {ok, #{mod := Mod, state := ResourceState, status := started}} ->
%% the resource state is readonly to Module:on_query/4 %% the resource state is readonly to Module:on_query/4
%% and the `after_query()` functions should be thread safe %% and the `after_query()` functions should be thread safe
Mod:on_query(InstId, Request, AfterQuery, ResourceState); Mod:on_query(InstId, Request, AfterQuery, ResourceState);
{error, Reason} -> {error, not_found} ->
error({get_instance, {InstId, Reason}}) query_error(not_found, <<"the resource id not exists">>)
end. end.
-spec restart(instance_id()) -> ok | {error, Reason :: term()}. -spec restart(instance_id()) -> ok | {error, Reason :: term()}.
restart(InstId) -> restart(InstId) ->
call_instance(InstId, {restart, InstId}). restart(InstId, #{}).
-spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
restart(InstId, Opts) ->
call_instance(InstId, {restart, InstId, Opts}).
-spec stop(instance_id()) -> ok | {error, Reason :: term()}. -spec stop(instance_id()) -> ok | {error, Reason :: term()}.
stop(InstId) -> stop(InstId) ->
@ -273,14 +276,6 @@ call_health_check(InstId, Mod, ResourceState) ->
call_stop(InstId, Mod, ResourceState) -> call_stop(InstId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_stop(InstId, ResourceState)). ?SAFE_CALL(Mod:on_stop(InstId, ResourceState)).
-spec call_config_merge(module(), resource_config(), resource_config(), term()) ->
resource_config().
call_config_merge(Mod, OldConfig, NewConfig, Params) ->
case erlang:function_exported(Mod, on_config_merge, 3) of
true -> ?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params));
false -> NewConfig
end.
-spec call_jsonify(module(), resource_config()) -> jsx:json_term(). -spec call_jsonify(module(), resource_config()) -> jsx:json_term().
call_jsonify(Mod, Config) -> call_jsonify(Mod, Config) ->
case erlang:function_exported(Mod, on_jsonify, 1) of case erlang:function_exported(Mod, on_jsonify, 1) of
@ -327,17 +322,17 @@ check_and_create_local(InstId, ResourceType, RawConfig, Opts) ->
check_and_do(ResourceType, RawConfig, check_and_do(ResourceType, RawConfig,
fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end). fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end).
-spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), term()) -> -spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), create_opts()) ->
{ok, resource_data()} | {error, term()}. {ok, resource_data()} | {error, term()}.
check_and_recreate(InstId, ResourceType, RawConfig, Params) -> check_and_recreate(InstId, ResourceType, RawConfig, Opts) ->
check_and_do(ResourceType, RawConfig, check_and_do(ResourceType, RawConfig,
fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Params) end). fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Opts) end).
-spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), term()) -> -spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), create_opts()) ->
{ok, resource_data()} | {error, term()}. {ok, resource_data()} | {error, term()}.
check_and_recreate_local(InstId, ResourceType, RawConfig, Params) -> check_and_recreate_local(InstId, ResourceType, RawConfig, Opts) ->
check_and_do(ResourceType, RawConfig, check_and_do(ResourceType, RawConfig,
fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Params) end). fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Opts) end).
check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
case check_config(ResourceType, RawConfig) of case check_config(ResourceType, RawConfig) of
@ -368,3 +363,6 @@ cluster_call(Func, Args) ->
{ok, _TxnId, Result} -> Result; {ok, _TxnId, Result} -> Result;
Failed -> Failed Failed -> Failed
end. end.
query_error(Reason, Msg) ->
{error, {?MODULE, #{reason => Reason, msg => Msg}}}.

View File

@ -0,0 +1,66 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_resource_health_check).
-export([ start_link/2
, create_checker/2
, delete_checker/1
]).
-export([health_check/2]).
-define(SUP, emqx_resource_health_check_sup).
-define(ID(NAME), {resource_health_check, NAME}).
child_spec(Name, Sleep) ->
#{id => ?ID(Name),
start => {?MODULE, start_link, [Name, Sleep]},
restart => transient,
shutdown => 5000, type => worker, modules => [?MODULE]}.
start_link(Name, Sleep) ->
Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]),
{ok, Pid}.
create_checker(Name, Sleep) ->
create_checker(Name, Sleep, false).
create_checker(Name, Sleep, Retry) ->
case supervisor:start_child(?SUP, child_spec(Name, Sleep)) of
{ok, _} -> ok;
{error, already_present} -> ok;
{error, {already_started, _}} when Retry == false ->
ok = delete_checker(Name),
create_checker(Name, Sleep, true);
Error -> Error
end.
delete_checker(Name) ->
case supervisor:terminate_child(?SUP, ?ID(Name)) of
ok -> supervisor:delete_child(?SUP, ?ID(Name));
Error -> Error
end.
health_check(Name, SleepTime) ->
case emqx_resource:health_check(Name) of
ok ->
emqx_alarm:deactivate(Name);
{error, _} ->
emqx_alarm:activate(Name, #{name => Name},
<<Name/binary, " health check failed">>)
end,
timer:sleep(SleepTime),
health_check(Name, SleepTime).

View File

@ -0,0 +1,29 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_resource_health_check_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
{ok, {SupFlags, []}}.

View File

@ -26,6 +26,7 @@
-export([ lookup/1 -export([ lookup/1
, get_metrics/1 , get_metrics/1
, list_all/0 , list_all/0
, make_test_id/0
]). ]).
-export([ hash_call/2 -export([ hash_call/2
@ -61,7 +62,7 @@ hash_call(InstId, Request) ->
hash_call(InstId, Request, Timeout) -> hash_call(InstId, Request, Timeout) ->
gen_server:call(pick(InstId), Request, Timeout). gen_server:call(pick(InstId), Request, Timeout).
-spec lookup(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}. -spec lookup(instance_id()) -> {ok, resource_data()} | {error, not_found}.
lookup(InstId) -> lookup(InstId) ->
case ets:lookup(emqx_resource_instance, InstId) of case ets:lookup(emqx_resource_instance, InstId) of
[] -> {error, not_found}; [] -> {error, not_found};
@ -69,6 +70,10 @@ lookup(InstId) ->
{ok, Data#{id => InstId, metrics => get_metrics(InstId)}} {ok, Data#{id => InstId, metrics => get_metrics(InstId)}}
end. end.
make_test_id() ->
RandId = iolist_to_binary(emqx_misc:gen_id(16)),
<<?TEST_ID_PREFIX, RandId/binary>>.
get_metrics(InstId) -> get_metrics(InstId) ->
emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId). emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId).
@ -98,17 +103,17 @@ init({Pool, Id}) ->
handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) -> handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) ->
{reply, do_create(InstId, ResourceType, Config, Opts), State}; {reply, do_create(InstId, ResourceType, Config, Opts), State};
handle_call({create_dry_run, InstId, ResourceType, Config}, _From, State) -> handle_call({create_dry_run, ResourceType, Config}, _From, State) ->
{reply, do_create_dry_run(InstId, ResourceType, Config), State}; {reply, do_create_dry_run(ResourceType, Config), State};
handle_call({recreate, InstId, ResourceType, Config, Params}, _From, State) -> handle_call({recreate, InstId, ResourceType, Config, Opts}, _From, State) ->
{reply, do_recreate(InstId, ResourceType, Config, Params), State}; {reply, do_recreate(InstId, ResourceType, Config, Opts), State};
handle_call({remove, InstId}, _From, State) -> handle_call({remove, InstId}, _From, State) ->
{reply, do_remove(InstId), State}; {reply, do_remove(InstId), State};
handle_call({restart, InstId}, _From, State) -> handle_call({restart, InstId, Opts}, _From, State) ->
{reply, do_restart(InstId), State}; {reply, do_restart(InstId, Opts), State};
handle_call({stop, InstId}, _From, State) -> handle_call({stop, InstId}, _From, State) ->
{reply, do_stop(InstId), State}; {reply, do_stop(InstId), State};
@ -135,25 +140,30 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% suppress the race condition check, as these functions are protected in gproc workers %% suppress the race condition check, as these functions are protected in gproc workers
-dialyzer({nowarn_function, [do_recreate/4, -dialyzer({nowarn_function, [ do_recreate/4
do_create/4, , do_create/4
do_restart/1, , do_restart/2
do_stop/1, , do_start/4
do_health_check/1]}). , do_stop/1
, do_health_check/1
, start_and_check/5
]}).
do_recreate(InstId, ResourceType, NewConfig, Params) -> do_recreate(InstId, ResourceType, NewConfig, Opts) ->
case lookup(InstId) of case lookup(InstId) of
{ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} -> {ok, #{mod := ResourceType, status := started} = Data} ->
Config = emqx_resource:call_config_merge(ResourceType, OldConfig, %% If this resource is in use (status='started'), we should make sure
NewConfig, Params), %% the new config is OK before removing the old one.
TestInstId = iolist_to_binary(emqx_misc:gen_id(16)), case do_create_dry_run(ResourceType, NewConfig) of
case do_create_dry_run(TestInstId, ResourceType, Config) of
ok -> ok ->
do_remove(ResourceType, InstId, ResourceState, false), do_remove(Data, false),
do_create(InstId, ResourceType, Config, #{force_create => true}); do_create(InstId, ResourceType, NewConfig, Opts);
Error -> Error ->
Error Error
end; end;
{ok, #{mod := ResourceType, status := _} = Data} ->
do_remove(Data, false),
do_create(InstId, ResourceType, NewConfig, Opts);
{ok, #{mod := Mod}} when Mod =/= ResourceType -> {ok, #{mod := Mod}} when Mod =/= ResourceType ->
{error, updating_to_incorrect_resource_type}; {error, updating_to_incorrect_resource_type};
{error, not_found} -> {error, not_found} ->
@ -161,98 +171,96 @@ do_recreate(InstId, ResourceType, NewConfig, Params) ->
end. end.
do_create(InstId, ResourceType, Config, Opts) -> do_create(InstId, ResourceType, Config, Opts) ->
ForceCreate = maps:get(force_create, Opts, false),
case lookup(InstId) of case lookup(InstId) of
{ok, _} -> {ok, already_created}; {ok, _} ->
_ -> {ok, already_created};
Res0 = #{id => InstId, mod => ResourceType, config => Config, {error, not_found} ->
status => stopped, state => undefined}, case do_start(InstId, ResourceType, Config, Opts) of
case emqx_resource:call_start(InstId, ResourceType, Config) of ok ->
{ok, ResourceState} -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId),
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId),
%% this is the first time we do health check, this will update the
%% status and then do ets:insert/2
_ = do_health_check(Res0#{state => ResourceState}),
{ok, force_lookup(InstId)}; {ok, force_lookup(InstId)};
{error, Reason} when ForceCreate == true -> Error ->
logger:error("start ~ts resource ~ts failed: ~p, " Error
"force_create it as a stopped resource",
[ResourceType, InstId, Reason]),
ets:insert(emqx_resource_instance, {InstId, Res0}),
{ok, Res0};
{error, Reason} when ForceCreate == false ->
{error, Reason}
end end
end. end.
do_create_dry_run(InstId, ResourceType, Config) -> do_create_dry_run(ResourceType, Config) ->
InstId = make_test_id(),
case emqx_resource:call_start(InstId, ResourceType, Config) of case emqx_resource:call_start(InstId, ResourceType, Config) of
{ok, ResourceState0} -> {ok, ResourceState} ->
Return = case emqx_resource:call_health_check(InstId, ResourceType, ResourceState0) of case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of
{ok, ResourceState1} -> ok; {ok, _} -> ok;
{error, Reason, ResourceState1} -> {error, Reason, _} -> {error, Reason}
{error, Reason} end;
end,
_ = emqx_resource:call_stop(InstId, ResourceType, ResourceState1),
Return;
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
end. end.
do_remove(InstId) -> do_remove(Instance) ->
case lookup(InstId) of do_remove(Instance, true).
{ok, #{mod := Mod, state := ResourceState}} ->
do_remove(Mod, InstId, ResourceState);
Error ->
Error
end.
do_remove(Mod, InstId, ResourceState) -> do_remove(InstId, ClearMetrics) when is_binary(InstId) ->
do_remove(Mod, InstId, ResourceState, true). do_with_instance_data(InstId, fun do_remove/2, [ClearMetrics]);
do_remove(#{id := InstId} = Data, ClearMetrics) ->
do_remove(Mod, InstId, ResourceState, ClearMetrics) -> _ = do_stop(Data),
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
ets:delete(emqx_resource_instance, InstId), ets:delete(emqx_resource_instance, InstId),
case ClearMetrics of case ClearMetrics of
true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId); true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId);
false -> ok false -> ok
end,
ok.
do_restart(InstId, Opts) ->
case lookup(InstId) of
{ok, #{mod := ResourceType, config := Config} = Data} ->
ok = do_stop(Data),
do_start(InstId, ResourceType, Config, Opts);
Error ->
Error
end. end.
do_restart(InstId) -> do_start(InstId, ResourceType, Config, Opts) when is_binary(InstId) ->
case lookup(InstId) of InitData = #{id => InstId, mod => ResourceType, config => Config,
{ok, #{mod := Mod, state := ResourceState, config := Config} = Data} -> status => starting, state => undefined},
_ = case ResourceState of %% The `emqx_resource:call_start/3` need the instance exist beforehand
undefined -> ok; ets:insert(emqx_resource_instance, {InstId, InitData}),
_ -> emqx_resource:call_stop(InstId, Mod, ResourceState) case maps:get(async_create, Opts, false) of
end, false ->
case emqx_resource:call_start(InstId, Mod, Config) of start_and_check(InstId, ResourceType, Config, Opts, InitData);
{ok, NewResourceState} -> true ->
ets:insert(emqx_resource_instance, spawn(fun() ->
{InstId, Data#{state => NewResourceState, status => started}}), start_and_check(InstId, ResourceType, Config, Opts, InitData)
ok; end),
{error, Reason} -> ok
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), end.
{error, Reason}
start_and_check(InstId, ResourceType, Config, Opts, Data) ->
case emqx_resource:call_start(InstId, ResourceType, Config) of
{ok, ResourceState} ->
Data2 = Data#{state => ResourceState},
ets:insert(emqx_resource_instance, {InstId, Data2}),
case maps:get(async_create, Opts, false) of
false -> do_health_check(Data2);
true -> emqx_resource_health_check:create_checker(InstId,
maps:get(health_check_interval, Opts, 15000))
end; end;
Error -> {error, Reason} ->
Error ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
{error, Reason}
end. end.
do_stop(InstId) -> do_stop(InstId) when is_binary(InstId) ->
case lookup(InstId) of do_with_instance_data(InstId, fun do_stop/1, []);
{ok, #{mod := Mod, state := ResourceState} = Data} -> do_stop(#{state := undefined}) ->
_ = emqx_resource:call_stop(InstId, Mod, ResourceState), ok;
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) ->
ok; _ = emqx_resource:call_stop(InstId, Mod, ResourceState),
Error -> _ = emqx_resource_health_check:delete_checker(InstId),
Error ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
end. ok.
do_health_check(InstId) when is_binary(InstId) -> do_health_check(InstId) when is_binary(InstId) ->
case lookup(InstId) of do_with_instance_data(InstId, fun do_health_check/1, []);
{ok, Data} -> do_health_check(Data);
Error -> Error
end;
do_health_check(#{state := undefined}) -> do_health_check(#{state := undefined}) ->
{error, resource_not_initialized}; {error, resource_not_initialized};
do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) -> do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
@ -272,6 +280,12 @@ do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
%% internal functions %% internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
do_with_instance_data(InstId, Do, Args) ->
case lookup(InstId) of
{ok, Data} -> erlang:apply(Do, [Data | Args]);
Error -> Error
end.
proc_name(Mod, Id) -> proc_name(Mod, Id) ->
list_to_atom(lists:concat([Mod, "_", Id])). list_to_atom(lists:concat([Mod, "_", Id])).

View File

@ -45,7 +45,12 @@ init([]) ->
restart => transient, restart => transient,
shutdown => 5000, type => worker, modules => [Mod]} shutdown => 5000, type => worker, modules => [Mod]}
end || Idx <- lists:seq(1, ?POOL_SIZE)], end || Idx <- lists:seq(1, ?POOL_SIZE)],
{ok, {SupFlags, [Metrics | ResourceInsts]}}. HealthCheck =
#{id => emqx_resource_health_check_sup,
start => {emqx_resource_health_check_sup, start_link, []},
restart => transient,
shutdown => infinity, type => supervisor, modules => [emqx_resource_health_check_sup]},
{ok, {SupFlags, [HealthCheck, Metrics | ResourceInsts]}}.
%% internal functions %% internal functions
ensure_pool(Pool, Type, Opts) -> ensure_pool(Pool, Type, Opts) ->

View File

@ -96,9 +96,7 @@ t_query(_) ->
?assert(false) ?assert(false)
end, end,
?assertException( ?assertMatch({error, {emqx_resource, #{reason := not_found}}},
error,
{get_instance, _Reason},
emqx_resource:query(<<"unknown">>, get_state)), emqx_resource:query(<<"unknown">>, get_state)),
ok = emqx_resource:remove_local(?ID). ok = emqx_resource:remove_local(?ID).
@ -142,7 +140,8 @@ t_stop_start(_) ->
?assertNot(is_process_alive(Pid0)), ?assertNot(is_process_alive(Pid0)),
?assertException(error, {resource_stopped, ?ID}, emqx_resource:query(?ID, get_state)), ?assertMatch({error, {emqx_resource, #{reason := stopped}}},
emqx_resource:query(?ID, get_state)),
ok = emqx_resource:restart(?ID), ok = emqx_resource:restart(?ID),

View File

@ -84,7 +84,7 @@ with_topic_api() ->
parameters => parameters(), parameters => parameters(),
responses => #{ responses => #{
<<"200">> => object_schema(message_props(), <<"List retained messages">>), <<"200">> => object_schema(message_props(), <<"List retained messages">>),
<<"404">> => error_schema(<<"Reatined Not Exists">>, ['NOT_FOUND']), <<"404">> => error_schema(<<"Retained Not Exists">>, ['NOT_FOUND']),
<<"405">> => schema(<<"NotAllowed">>) <<"405">> => schema(<<"NotAllowed">>)
} }
}, },

View File

@ -101,7 +101,7 @@ do_apply_rule(#{
true -> true ->
ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId), ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
Collection2 = filter_collection(Input, InCase, DoEach, Collection), Collection2 = filter_collection(Input, InCase, DoEach, Collection),
{ok, [handle_output_list(Outputs, Coll, Input) || Coll <- Collection2]}; {ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]};
false -> false ->
{error, nomatch} {error, nomatch}
end; end;
@ -118,7 +118,7 @@ do_apply_rule(#{id := RuleId,
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
true -> true ->
ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId), ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
{ok, handle_output_list(Outputs, Selected, Input)}; {ok, handle_output_list(RuleId, Outputs, Selected, Input)};
false -> false ->
{error, nomatch} {error, nomatch}
end. end.
@ -231,15 +231,17 @@ number(Bin) ->
catch error:badarg -> binary_to_float(Bin) catch error:badarg -> binary_to_float(Bin)
end. end.
handle_output_list(Outputs, Selected, Envs) -> handle_output_list(RuleId, Outputs, Selected, Envs) ->
[handle_output(Out, Selected, Envs) || Out <- Outputs]. [handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs].
handle_output(OutId, Selected, Envs) -> handle_output(RuleId, OutId, Selected, Envs) ->
try try
do_handle_output(OutId, Selected, Envs) do_handle_output(OutId, Selected, Envs)
catch catch
Err:Reason:ST -> Err:Reason:ST ->
?SLOG(error, #{msg => "output_failed", ok = emqx_plugin_libs_metrics:inc_failed(rule_metrics, RuleId),
Level = case Err of throw -> debug; _ -> error end,
?SLOG(Level, #{msg => "output_failed",
output => OutId, output => OutId,
exception => Err, exception => Err,
reason => Reason, reason => Reason,

View File

@ -59,12 +59,10 @@ statsd(put, #{body := Body}) ->
Body, Body,
#{rawconf_with_defaults => true, override_to => cluster}) of #{rawconf_with_defaults => true, override_to => cluster}) of
{ok, #{raw_config := NewConfig, config := Config}} -> {ok, #{raw_config := NewConfig, config := Config}} ->
_ = case maps:get(<<"enable">>, Body) of _ = emqx_statsd_sup:stop_child(?APP),
true -> case maps:get(<<"enable">>, Body) of
_ = emqx_statsd_sup:stop_child(?APP), true -> emqx_statsd_sup:start_child(?APP, maps:get(config, Config));
emqx_statsd_sup:start_child(?APP, maps:get(config, Config)); false -> ok
false ->
_ = emqx_statsd_sup:stop_child(?APP)
end, end,
{200, NewConfig}; {200, NewConfig};
{error, Reason} -> {error, Reason} ->