diff --git a/apps/emqx_authn/src/emqx_authn_api.erl b/apps/emqx_authn/src/emqx_authn_api.erl index 64ce3e6a4..a7e073581 100644 --- a/apps/emqx_authn/src/emqx_authn_api.erl +++ b/apps/emqx_authn/src/emqx_authn_api.erl @@ -979,7 +979,7 @@ authenticator_examples() -> mechanism => <<"password-based">>, backend => <<"http">>, method => <<"post">>, - url => <<"http://127.0.0.2:8080">>, + url => <<"http://127.0.0.1:18083">>, headers => #{ <<"content-type">> => <<"application/json">> }, diff --git a/apps/emqx_authn/test/emqx_authn_http_SUITE.erl b/apps/emqx_authn/test/emqx_authn_http_SUITE.erl index 2c0716e8b..146e9ef32 100644 --- a/apps/emqx_authn/test/emqx_authn_http_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_http_SUITE.erl @@ -153,9 +153,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_http:authenticate( Credentials, State)). diff --git a/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl index edd91be55..855a2226d 100644 --- a/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl @@ -146,9 +146,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_mongodb:authenticate( #{username => <<"plain">>, password => <<"plain">> diff --git a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl index 95eecdead..ffe98be65 100644 --- a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl @@ -159,9 +159,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_mysql:authenticate( #{username => <<"plain">>, password => <<"plain">> diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl index e33f5c100..56044faf4 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl @@ -159,9 +159,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_pgsql:authenticate( #{username => <<"plain">>, password => <<"plain">> diff --git a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl index de556a7bd..eeb674f86 100644 --- a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl @@ -164,9 +164,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_redis:authenticate( #{username => <<"plain">>, password => <<"plain">> diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl index 558e5005c..81b0d70a4 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl @@ -80,8 +80,15 @@ format(Rule = #{topic := Topic}) when is_map(Rule) -> }. update_(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE -> - {ok, _} = emqx:update_config([auto_subscribe, topics], Topics), - update_hook(); + case emqx_conf:update([auto_subscribe, topics], + Topics, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{raw_config := NewTopics}} -> + ok = update_hook(), + {ok, NewTopics}; + {error, Reason} -> + {error, Reason} + end; update_(_Topics) -> {error, quota_exceeded}. diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl index d1207544a..cb5372d5d 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_api.erl @@ -22,6 +22,7 @@ -export([auto_subscribe/2]). +-define(INTERNAL_ERROR, 'INTERNAL_ERROR'). -define(EXCEED_LIMIT, 'EXCEED_LIMIT'). -define(BAD_REQUEST, 'BAD_REQUEST'). @@ -90,6 +91,9 @@ auto_subscribe(put, #{body := Params}) -> Message = list_to_binary(io_lib:format("Max auto subscribe topic count is ~p", [emqx_auto_subscribe:max_limit()])), {409, #{code => ?EXCEED_LIMIT, message => Message}}; - ok -> - {200, emqx_auto_subscribe:list()} + {error, Reason} -> + Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), + {500, #{code => ?INTERNAL_ERROR, message => Message}}; + {ok, NewTopics} -> + {200, NewTopics} end. diff --git a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl index 0e5022533..92eb9a9ab 100644 --- a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl +++ b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl @@ -85,7 +85,7 @@ init_per_suite(Config) -> } ] }">>), - emqx_common_test_helpers:start_apps([emqx_dashboard, ?APP], fun set_special_configs/1), + emqx_common_test_helpers:start_apps([emqx_dashboard, emqx_conf, ?APP], fun set_special_configs/1), Config. set_special_configs(emqx_dashboard) -> @@ -113,15 +113,17 @@ topic_config(T) -> end_per_suite(_) -> application:unload(emqx_management), + application:unload(emqx_conf), application:unload(?APP), meck:unload(emqx_resource), meck:unload(emqx_schema), - emqx_common_test_helpers:stop_apps([emqx_dashboard, ?APP]). + emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_conf, ?APP]). t_auto_subscribe(_) -> + emqx_auto_subscribe:update([#{<<"topic">> => Topic} || Topic <- ?TOPICS]), {ok, Client} = emqtt:start_link(#{username => ?CLIENT_USERNAME, clientid => ?CLIENT_ID}), {ok, _} = emqtt:connect(Client), - timer:sleep(100), + timer:sleep(200), ?assertEqual(check_subs(length(?TOPICS)), ok), emqtt:disconnect(Client), ok. @@ -148,6 +150,7 @@ t_update(_) -> check_subs(Count) -> Subs = ets:tab2list(emqx_suboption), + ct:pal("---> ~p ~p ~n", [Subs, Count]), ?assert(length(Subs) >= Count), check_subs((Subs), ?ENSURE_TOPICS). diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index d46ce217e..f27603bf9 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -35,15 +35,19 @@ ]). -export([ load/0 + , lookup/1 , lookup/2 , lookup/3 , list/0 , list_bridges_by_connector/1 + , create/2 , create/3 , recreate/2 , recreate/3 , create_dry_run/2 + , remove/1 , remove/3 + , update/2 , update/3 , start/2 , stop/2 @@ -80,17 +84,36 @@ unload_hook() -> on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> case maps:get(sys, Flags, false) of false -> - lists:foreach(fun (Id) -> - send_message(Id, emqx_rule_events:eventmsg_publish(Message)) - end, get_matched_bridges(Topic)); + Msg = emqx_rule_events:eventmsg_publish(Message), + send_to_matched_egress_bridges(Topic, Msg); true -> ok end, {ok, Message}. +send_to_matched_egress_bridges(Topic, Msg) -> + lists:foreach(fun (Id) -> + try send_message(Id, Msg) of + ok -> ok; + Error -> ?SLOG(error, #{msg => "send_message_to_bridge_failed", + bridge => Id, error => Error}) + catch Err:Reason:ST -> + ?SLOG(error, #{msg => "send_message_to_bridge_crash", + bridge => Id, error => Err, reason => Reason, + stacktrace => ST}) + end + end, get_matched_bridges(Topic)). + send_message(BridgeId, Message) -> {BridgeType, BridgeName} = parse_bridge_id(BridgeId), ResId = emqx_bridge:resource_id(BridgeType, BridgeName), - emqx_resource:query(ResId, {send_message, Message}). + case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of + not_found -> + {error, {bridge_not_found, BridgeId}}; + #{enable := true} -> + emqx_resource:query(ResId, {send_message, Message}); + #{enable := false} -> + {error, {bridge_stopped, BridgeId}} + end. config_key_path() -> [bridges]. @@ -169,6 +192,10 @@ list_bridges_by_connector(ConnectorId) -> [B || B = #{raw_config := #{<<"connector">> := Id}} <- list(), ConnectorId =:= Id]. +lookup(Id) -> + {Type, Name} = parse_bridge_id(Id), + lookup(Type, Name). + lookup(Type, Name) -> RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), lookup(Type, Name, RawConf). @@ -188,16 +215,24 @@ stop(Type, Name) -> restart(Type, Name) -> emqx_resource:restart(resource_id(Type, Name)). +create(BridgeId, Conf) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + create(BridgeType, BridgeName, Conf). + create(Type, Name, Conf) -> ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, config => Conf}), case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type), - parse_confs(Type, Name, Conf), #{force_create => true}) of + parse_confs(Type, Name, Conf), #{async_create => true}) of {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf); {ok, _} -> maybe_disable_bridge(Type, Name, Conf); {error, Reason} -> {error, Reason} end. +update(BridgeId, {OldConf, Conf}) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + update(BridgeType, BridgeName, {OldConf, Conf}). + update(Type, Name, {OldConf, Conf}) -> %% TODO: sometimes its not necessary to restart the bridge connection. %% @@ -217,7 +252,7 @@ update(Type, Name, {OldConf, Conf}) -> ?SLOG(warning, #{ msg => "updating_a_non-exist_bridge_need_create_a_new_one" , type => Type, name => Name, config => Conf}), create(Type, Name, Conf); - {error, Reason} -> {update_bridge_failed, Reason} + {error, Reason} -> {error, {update_bridge_failed, Reason}} end; true -> %% we don't need to recreate the bridge if this config change is only to @@ -229,11 +264,12 @@ update(Type, Name, {OldConf, Conf}) -> end. recreate(Type, Name) -> - recreate(Type, Name, emqx:get_raw_config([bridges, Type, Name])). + recreate(Type, Name, emqx:get_config([bridges, Type, Name])). recreate(Type, Name, Conf) -> emqx_resource:recreate_local(resource_id(Type, Name), - emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []). + emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), + #{async_create => true}). create_dry_run(Type, Conf) -> Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}}, @@ -244,6 +280,10 @@ create_dry_run(Type, Conf) -> Error end. +remove(BridgeId) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + remove(BridgeType, BridgeName, #{}). + remove(Type, Name, _Conf) -> ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}), case emqx_resource:remove_local(resource_id(Type, Name)) of @@ -279,6 +319,8 @@ get_matched_bridges(Topic) -> end, Acc0, Conf) end, [], Bridges). +get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) -> + Acc; get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) -> case emqx_topic:match(Topic, Filter) of true -> [bridge_id(BType, BName) | Acc]; @@ -309,21 +351,21 @@ parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) {Type, ConnName} -> ConnectorConfs = emqx:get_config([connectors, Type, ConnName]), make_resource_confs(Direction, ConnectorConfs, - maps:without([connector, direction], Conf), Name); + maps:without([connector, direction], Conf), Type, Name); {_ConnType, _ConnName} -> error({cannot_use_connector_with_different_type, ConnId}) end; -parse_confs(_Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) +parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) when is_map(ConnectorConfs) -> make_resource_confs(Direction, ConnectorConfs, - maps:without([connector, direction], Conf), Name). + maps:without([connector, direction], Conf), Type, Name). -make_resource_confs(ingress, ConnectorConfs, BridgeConf, Name) -> - BName = bin(Name), +make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) -> + BName = bridge_id(Type, Name), ConnectorConfs#{ ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>} }; -make_resource_confs(egress, ConnectorConfs, BridgeConf, _Name) -> +make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) -> ConnectorConfs#{ egress => BridgeConf }. diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl index 494911d21..152289cf1 100644 --- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl @@ -68,7 +68,6 @@ How long will the HTTP request timeout. fields("post") -> [ type_field() - , name_field() ] ++ fields("bridge"); fields("put") -> @@ -103,8 +102,5 @@ id_field() -> type_field() -> {type, mk(http, #{desc => "The Bridge Type"})}. -name_field() -> - {name, mk(binary(), #{desc => "The Bridge Name"})}. - method() -> enum([post, put, get, delete]). diff --git a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl index 3de011b4c..96c9a1d38 100644 --- a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl @@ -24,11 +24,9 @@ fields("egress") -> fields("post_ingress") -> [ type_field() - , name_field() ] ++ proplists:delete(enable, fields("ingress")); fields("post_egress") -> [ type_field() - , name_field() ] ++ proplists:delete(enable, fields("egress")); fields("put_ingress") -> @@ -49,9 +47,3 @@ id_field() -> type_field() -> {type, mk(mqtt, #{desc => "The Bridge Type"})}. - -name_field() -> - {name, mk(binary(), - #{ desc => "The Bridge Name" - , example => "some_bridge_name" - })}. diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 807ad32f6..16da7395f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -160,6 +160,7 @@ t_http_crud_apis(_) -> } = jsx:decode(Bridge), %% send an message to emqx and the message should be forwarded to the HTTP server + wait_for_resource_ready(BridgeID, 5), Body = <<"my msg">>, emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), ?assert( @@ -212,6 +213,7 @@ t_http_crud_apis(_) -> }, jsx:decode(Bridge3Str)), %% send an message to emqx again, check the path has been changed + wait_for_resource_ready(BridgeID, 5), emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), ?assert( receive @@ -320,3 +322,14 @@ auth_header_() -> operation_path(Oper, BridgeID) -> uri(["bridges", BridgeID, "operation", Oper]). + +wait_for_resource_ready(InstId, 0) -> + ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]), + ct:fail(wait_resource_timeout); +wait_for_resource_ready(InstId, Retry) -> + case emqx_bridge:lookup(InstId) of + {ok, #{resource_data := #{status := started}}} -> ok; + _ -> + timer:sleep(100), + wait_for_resource_ready(InstId, Retry-1) + end. diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 940e958e3..db1caefbb 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -37,31 +37,26 @@ config_key_path() -> [connectors]. +-dialyzer([{nowarn_function, [post_config_update/5]}, error_handling]). post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> ConnId = connector_id(Type, Name), - LinkedBridgeIds = lists:foldl(fun - (#{id := BId, raw_config := #{<<"connector">> := ConnId0}}, Acc) - when ConnId0 == ConnId -> - [BId | Acc]; - (_, Acc) -> Acc - end, [], emqx_bridge:list()), - case LinkedBridgeIds of - [] -> ok; - _ -> {error, {dependency_bridges_exist, LinkedBridgeIds}} + try foreach_linked_bridges(ConnId, fun(#{id := BId}) -> + throw({dependency_bridges_exist, BId}) + end) + catch throw:Error -> {error, Error} end; -post_config_update([connectors, Type, Name], _Req, NewConf, _OldConf, _AppEnvs) -> +post_config_update([connectors, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> ConnId = connector_id(Type, Name), - lists:foreach(fun - (#{id := BId, raw_config := #{<<"connector">> := ConnId0}}) when ConnId0 == ConnId -> + foreach_linked_bridges(ConnId, + fun(#{id := BId}) -> {BType, BName} = emqx_bridge:parse_bridge_id(BId), BridgeConf = emqx:get_config([bridges, BType, BName]), - case emqx_bridge:recreate(BType, BName, BridgeConf#{connector => NewConf}) of - {ok, _} -> ok; + case emqx_bridge:update(BType, BName, {BridgeConf#{connector => OldConf}, + BridgeConf#{connector => NewConf}}) of + ok -> ok; {error, Reason} -> error({update_bridge_error, Reason}) - end; - (_) -> - ok - end, emqx_bridge:list()). + end + end). connector_id(Type0, Name0) -> Type = bin(Type0), @@ -112,3 +107,10 @@ delete(Type, Name) -> bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). + +foreach_linked_bridges(ConnId, Do) -> + lists:foreach(fun + (#{raw_config := #{<<"connector">> := ConnId0}} = Bridge) when ConnId0 == ConnId -> + Do(Bridge); + (_) -> ok + end, emqx_bridge:list()). diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 4989cf17e..72938649c 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -253,6 +253,10 @@ schema("/connectors/:id") -> {ok, _} -> case emqx_connector:delete(ConnType, ConnName) of {ok, _} -> {204}; + {error, {post_config_update, _, {dependency_bridges_exist, BridgeID}}} -> + {403, error_msg('DEPENDENCY_EXISTS', + <<"Cannot remove the connector as it's in use by a bridge: ", + BridgeID/binary>>)}; {error, Error} -> {400, error_msg('BAD_ARG', Error)} end; {error, not_found} -> diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index c216e905c..beeff6d3e 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -68,10 +68,6 @@ fields("put") -> fields("post") -> [ {type, mk(mqtt, #{desc => "The Connector Type"})} - , {name, mk(binary(), - #{ desc => "The Connector Name" - , example => <<"my_mqtt_connector">> - })} ] ++ fields("put"). %% =================================================================== diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index d7abcda84..7d5021f82 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -165,7 +165,8 @@ handle_publish(Msg, undefined) -> ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as" "_'ingress'_is_not_configured", message => Msg}); -handle_publish(Msg, Vars) -> +handle_publish(Msg0, Vars) -> + Msg = format_msg_received(Msg0), ?SLOG(debug, #{msg => "publish_to_local_broker", message => Msg, vars => Vars}), case Vars of @@ -173,11 +174,7 @@ handle_publish(Msg, Vars) -> _ = erlang:apply(Mod, Func, [Msg | Args]); _ -> ok end, - case maps:get(local_topic, Vars, undefined) of - undefined -> ok; - _Topic -> - emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)) - end. + maybe_publish_to_local_broker(Msg0, Vars). handle_disconnected(Reason, Parent) -> Parent ! {disconnected, self(), Reason}. @@ -197,3 +194,45 @@ sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) -> process_config(Config) -> maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config). + +maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) -> + case maps:get(local_topic, Vars, undefined) of + undefined -> + ok; %% local topic is not set, discard it + _ -> + case emqx_topic:match(Topic, SubTopic) of + true -> + _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)), + ok; + false -> + ?SLOG(warning, #{msg => "discard_message_as_topic_not_matched", + message => Msg, subscribed => SubTopic, got_topic => Topic}) + end + end. + +format_msg_received(#{dup := Dup, payload := Payload, properties := Props, + qos := QoS, retain := Retain, topic := Topic}) -> + #{event => '$bridges/mqtt', + id => emqx_guid:to_hexstr(emqx_guid:gen()), + payload => Payload, + topic => Topic, + qos => QoS, + dup => Dup, + retain => Retain, + pub_props => printable_maps(Props), + timestamp => erlang:system_time(millisecond) + }. + +printable_maps(undefined) -> #{}; +printable_maps(Headers) -> + maps:fold( + fun ('User-Property', V0, AccIn) when is_list(V0) -> + AccIn#{ + 'User-Property' => maps:from_list(V0), + 'User-Property-Pairs' => [#{ + key => Key, + value => Value + } || {Key, Value} <- V0] + }; + (K, V0, AccIn) -> AccIn#{K => V0} + end, #{}, Headers). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index a0dd9eec1..35bcf3de1 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -78,10 +78,9 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> Msg#message{topic = topic(Mountpoint, Topic)}. %% published from remote node over a MQTT connection -to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0, +to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, #{local_topic := TopicToken, payload := PayloadToken, local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> - MapMsg = format_msg_received(MapMsg0), Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), @@ -90,33 +89,6 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0, emqx_message:set_flags(#{dup => Dup, retain => Retain}, emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). -format_msg_received(#{dup := Dup, payload := Payload, properties := Props, - qos := QoS, retain := Retain, topic := Topic}) -> - #{event => '$bridges/mqtt', - id => emqx_guid:to_hexstr(emqx_guid:gen()), - payload => Payload, - topic => Topic, - qos => QoS, - flags => #{dup => Dup, retain => Retain}, - pub_props => printable_maps(Props), - timestamp => erlang:system_time(millisecond), - node => node() - }. - -printable_maps(undefined) -> #{}; -printable_maps(Headers) -> - maps:fold( - fun ('User-Property', V0, AccIn) when is_list(V0) -> - AccIn#{ - 'User-Property' => maps:from_list(V0), - 'User-Property-Pairs' => [#{ - key => Key, - value => Value - } || {Key, Value} <- V0] - }; - (K, V0, AccIn) -> AccIn#{K => V0} - end, #{}, Headers). - process_payload([], Msg) -> emqx_json:encode(Msg); process_payload(Tks, Msg) -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 1a96a3596..12a3a8e23 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -22,7 +22,10 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(CONF_DEFAULT, <<"connectors: {}">>). +%% output functions +-export([ inspect/3 + ]). + -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). -define(CONNECTR_TYPE, <<"mqtt">>). -define(CONNECTR_NAME, <<"test_connector">>). @@ -67,6 +70,9 @@ <<"failed">> := FAILED, <<"rate">> := SPEED, <<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}). +inspect(Selected, _Envs, _Args) -> + persistent_term:put(?MODULE, #{inspect => Selected}). + all() -> emqx_common_test_helpers:all(?MODULE). @@ -89,21 +95,38 @@ init_per_suite(Config) -> %% some testcases (may from other app) already get emqx_connector started _ = application:stop(emqx_resource), _ = application:stop(emqx_connector), - ok = emqx_common_test_helpers:start_apps([emqx_connector, emqx_bridge, emqx_dashboard]), - ok = emqx_config:init_load(emqx_connector_schema, ?CONF_DEFAULT), + ok = emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_connector, + emqx_bridge, emqx_dashboard]), + ok = emqx_config:init_load(emqx_connector_schema, <<"connectors: {}">>), + ok = emqx_config:init_load(emqx_rule_engine_schema, <<"rule_engine {rules {}}">>), ok = emqx_config:init_load(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_connector, emqx_bridge, emqx_dashboard]), + emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_connector, emqx_bridge, emqx_dashboard]), ok. init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), + %% assert we there's no connectors and no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), Config. end_per_testcase(_, _Config) -> + clear_resources(), ok. +clear_resources() -> + lists:foreach(fun(#{id := Id}) -> + ok = emqx_rule_engine:delete_rule(Id) + end, emqx_rule_engine:get_rules()), + lists:foreach(fun(#{id := Id}) -> + ok = emqx_bridge:remove(Id) + end, emqx_bridge:list()), + lists:foreach(fun(#{<<"id">> := Id}) -> + ok = emqx_connector:delete(Id) + end, emqx_connector:list()). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -180,10 +203,6 @@ t_mqtt_crud_apis(_) -> ok. t_mqtt_conn_bridge_ingress(_) -> - %% assert we there's no connectors and no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - %% then we add a mqtt connector, using POST User1 = <<"user1">>, {ok, 201, Connector} = request(post, uri(["connectors"]), @@ -222,8 +241,8 @@ t_mqtt_conn_bridge_ingress(_) -> emqx:subscribe(LocalTopic), %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDIngress, 5), emqx:publish(emqx_message:make(RemoteTopic, Payload)), - %% we should receive a message on the local broker, with specified topic ?assert( receive @@ -253,10 +272,6 @@ t_mqtt_conn_bridge_ingress(_) -> ok. t_mqtt_conn_bridge_egress(_) -> - %% assert we there's no connectors and no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - %% then we add a mqtt connector, using POST User1 = <<"user1">>, {ok, 201, Connector} = request(post, uri(["connectors"]), @@ -295,6 +310,7 @@ t_mqtt_conn_bridge_egress(_) -> emqx:subscribe(RemoteTopic), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDEgress, 5), emqx:publish(emqx_message:make(LocalTopic, Payload)), %% we should receive a message on the "remote" broker, with specified topic @@ -331,10 +347,6 @@ t_mqtt_conn_bridge_egress(_) -> %% - update a connector should also update all of the the bridges %% - cannot delete a connector that is used by at least one bridge t_mqtt_conn_update(_) -> - %% assert we there's no connectors and no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - %% then we add a mqtt connector, using POST {ok, 201, Connector} = request(post, uri(["connectors"]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>) @@ -360,6 +372,7 @@ t_mqtt_conn_update(_) -> , <<"status">> := <<"connected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + wait_for_resource_ready(BridgeIDEgress, 2), %% then we try to update 'server' of the connector, to an unavailable IP address %% the update should fail because of 'unreachable' or 'connrefused' @@ -377,10 +390,6 @@ t_mqtt_conn_update(_) -> {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). t_mqtt_conn_update2(_) -> - %% assert we there's no connectors and no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - %% then we add a mqtt connector, using POST %% but this connector is point to a unreachable server "2603" {ok, 201, Connector} = request(post, uri(["connectors"]), @@ -406,6 +415,11 @@ t_mqtt_conn_update2(_) -> , <<"status">> := <<"disconnected">> , <<"connector">> := ConnctorID } = jsx:decode(Bridge), + %% We try to fix the 'server' parameter, to another unavailable server.. + %% The update should success: we don't check the connectivity of the new config + %% if the resource is now disconnected. + {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), + ?MQTT_CONNECOTR2(<<"127.0.0.1:2604">>)), %% we fix the 'server' parameter to a normal one, it should work {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), @@ -421,6 +435,34 @@ t_mqtt_conn_update2(_) -> {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). +t_mqtt_conn_update3(_) -> + %% we add a mqtt connector, using POST + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>) + #{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), + #{ <<"id">> := ConnctorID } = jsx:decode(Connector), + + %% ... and a MQTT bridge, using POST + %% we bind this bridge to the connector created just now + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), + #{ <<"id">> := BridgeIDEgress + , <<"connector">> := ConnctorID + } = jsx:decode(Bridge), + wait_for_resource_ready(BridgeIDEgress, 2), + + %% delete the connector should fail because it is in use by a bridge + {ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []), + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + %% the connector now can be deleted without problems + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). + t_mqtt_conn_testing(_) -> %% APIs for testing the connectivity %% then we add a mqtt connector, using POST @@ -435,6 +477,153 @@ t_mqtt_conn_testing(_) -> <<"name">> => ?BRIDGE_NAME_EGRESS }). +t_ingress_mqtt_bridge_with_rules(_) -> + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), + #{ <<"id">> := ConnctorID } = jsx:decode(Connector), + + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_INGRESS(ConnctorID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_INGRESS + }), + #{ <<"id">> := BridgeIDIngress } = jsx:decode(Bridge), + + {ok, 201, Rule} = request(post, uri(["rules"]), + #{<<"name">> => <<"A rule get messages from a source mqtt bridge">>, + <<"enable">> => true, + <<"outputs">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}], + <<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">> + }), + #{<<"id">> := RuleId} = jsx:decode(Rule), + + %% we now test if the bridge works as expected + + RemoteTopic = <<"remote_topic/1">>, + LocalTopic = <<"local_topic/", RemoteTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(LocalTopic), + %% PUBLISH a message to the 'remote' broker, as we have only one broker, + %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDIngress, 5), + emqx:publish(emqx_message:make(RemoteTopic, Payload)), + %% we should receive a message on the local broker, with specified topic + ?assert( + receive + {deliver, LocalTopic, #message{payload = Payload}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + %% and also the rule should be matched, with matched + 1: + {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), + #{ <<"id">> := RuleId + , <<"metrics">> := #{<<"matched">> := 1} + } = jsx:decode(Rule1), + %% we also check if the outputs of the rule is triggered + ?assertMatch(#{inspect := #{ + event := '$bridges/mqtt', + id := MsgId, + payload := Payload, + topic := RemoteTopic, + qos := 0, + dup := false, + retain := false, + pub_props := #{}, + timestamp := _ + }} when is_binary(MsgId), persistent_term:get(?MODULE)), + + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). + +t_egress_mqtt_bridge_with_rules(_) -> + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), + #{ <<"id">> := ConnctorID } = jsx:decode(Connector), + + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), + #{ <<"id">> := BridgeIDEgress } = jsx:decode(Bridge), + + {ok, 201, Rule} = request(post, uri(["rules"]), + #{<<"name">> => <<"A rule send messages to a sink mqtt bridge">>, + <<"enable">> => true, + <<"outputs">> => [BridgeIDEgress], + <<"sql">> => <<"SELECT * from \"t/1\"">> + }), + #{<<"id">> := RuleId} = jsx:decode(Rule), + + %% we now test if the bridge works as expected + LocalTopic = <<"local_topic/1">>, + RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + wait_for_resource_ready(BridgeIDEgress, 5), + emqx:publish(emqx_message:make(LocalTopic, Payload)), + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic, #message{payload = Payload}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + emqx:unsubscribe(RemoteTopic), + + %% PUBLISH a message to the rule. + Payload2 = <<"hi">>, + RuleTopic = <<"t/1">>, + RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>, + emqx:subscribe(RemoteTopic2), + wait_for_resource_ready(BridgeIDEgress, 5), + emqx:publish(emqx_message:make(RuleTopic, Payload2)), + {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), + #{ <<"id">> := RuleId + , <<"metrics">> := #{<<"matched">> := 1} + } = jsx:decode(Rule1), + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic2, #message{payload = Payload2}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch(#{ <<"id">> := BridgeIDEgress + , <<"metrics">> := ?metrics(2, 2, 0, _, _, _) + , <<"node_metrics">> := + [#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}] + }, jsx:decode(BridgeStr)), + + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). + %%-------------------------------------------------------------------- %% HTTP Request %%-------------------------------------------------------------------- @@ -468,3 +657,13 @@ auth_header_() -> {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), {"Authorization", "Bearer " ++ binary_to_list(Token)}. +wait_for_resource_ready(InstId, 0) -> + ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]), + ct:fail(wait_resource_timeout); +wait_for_resource_ready(InstId, Retry) -> + case emqx_bridge:lookup(InstId) of + {ok, #{resource_data := #{status := started}}} -> ok; + _ -> + timer:sleep(100), + wait_for_resource_ready(InstId, Retry-1) + end. diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 36569d52e..fef0a1ae1 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -155,7 +155,7 @@ format_delayed(#delayed_message{key = {ExpectTimeStamp, Id}, delayed = Delayed, }, case WithPayload of true -> - Result#{payload => base64:encode(Payload)}; + Result#{payload => Payload}; _ -> Result end. @@ -187,7 +187,7 @@ delete_delayed_message(Id0) -> mria:dirty_delete(?TAB, {Timestamp, Id}) end. update_config(Config) -> - {ok, _} = emqx:update_config([delayed], Config). + emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}). %%-------------------------------------------------------------------- %% gen_server callback diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 9199d7b2c..5caad8aa1 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -25,12 +25,14 @@ -define(MAX_PAYLOAD_LENGTH, 2048). -define(PAYLOAD_TOO_LARGE, 'PAYLOAD_TOO_LARGE'). --export([status/2 - , delayed_messages/2 - , delayed_message/2 -]). +-export([ status/2 + , delayed_messages/2 + , delayed_message/2 + ]). --export([paths/0, fields/1, schema/1]). +-export([ paths/0 + , fields/1 + , schema/1]). %% for rpc -export([update_config_/1]). @@ -40,6 +42,7 @@ -define(ALREADY_ENABLED, 'ALREADY_ENABLED'). -define(ALREADY_DISABLED, 'ALREADY_DISABLED'). +-define(INTERNAL_ERROR, 'INTERNAL_ERROR'). -define(BAD_REQUEST, 'BAD_REQUEST'). -define(MESSAGE_ID_NOT_FOUND, 'MESSAGE_ID_NOT_FOUND'). @@ -49,7 +52,11 @@ api_spec() -> emqx_dashboard_swagger:spec(?MODULE). -paths() -> ["/mqtt/delayed", "/mqtt/delayed/messages", "/mqtt/delayed/messages/:msgid"]. +paths() -> + [ "/mqtt/delayed" + , "/mqtt/delayed/messages" + , "/mqtt/delayed/messages/:msgid" + ]. schema("/mqtt/delayed") -> #{ @@ -189,8 +196,7 @@ get_status() -> update_config(Config) -> case generate_config(Config) of {ok, Config} -> - update_config_(Config), - {200, get_status()}; + update_config_(Config); {error, {Code, Message}} -> {400, #{code => Code, message => Message}} end. @@ -215,29 +221,28 @@ generate_max_delayed_messages(Config) -> {ok, Config}. update_config_(Config) -> - lists:foreach(fun(Node) -> - update_config_(Node, Config) - end, mria_mnesia:running_nodes()). - -update_config_(Node, Config) when Node =:= node() -> - _ = emqx_delayed:update_config(Config), - case maps:get(<<"enable">>, Config, undefined) of - undefined -> - ignore; - true -> - emqx_delayed:enable(); - false -> - emqx_delayed:disable() - end, - case maps:get(<<"max_delayed_messages">>, Config, undefined) of - undefined -> - ignore; - Max -> - ok = emqx_delayed:set_max_delayed_messages(Max) - end; - -update_config_(Node, Config) -> - rpc_call(Node, ?MODULE, ?FUNCTION_NAME, [Node, Config]). + case emqx_delayed:update_config(Config) of + {ok, #{raw_config := NewDelayed}} -> + case maps:get(<<"enable">>, Config, undefined) of + undefined -> + ignore; + true -> + emqx_delayed:enable(); + false -> + emqx_delayed:disable() + end, + case maps:get(<<"max_delayed_messages">>, Config, undefined) of + undefined -> + ignore; + Max -> + ok = emqx_delayed:set_max_delayed_messages(Max) + end, + {200, NewDelayed}; + {error, Reason} -> + Message = list_to_binary( + io_lib:format("Update config failed ~p", [Reason])), + {500, ?INTERNAL_ERROR, Message} + end. generate_http_code_map(id_schema_error, Id) -> #{code => ?MESSAGE_ID_SCHEMA_ERROR, message => @@ -245,9 +250,3 @@ generate_http_code_map(id_schema_error, Id) -> generate_http_code_map(not_found, Id) -> #{code => ?MESSAGE_ID_NOT_FOUND, message => iolist_to_binary(io_lib:format("Message ID ~p not found", [Id]))}. - -rpc_call(Node, Module, Fun, Args) -> - case rpc:call(Node, Module, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Result -> Result - end. diff --git a/apps/emqx_modules/src/emqx_event_message.erl b/apps/emqx_modules/src/emqx_event_message.erl index ccdb75ccb..3af57a38d 100644 --- a/apps/emqx_modules/src/emqx_event_message.erl +++ b/apps/emqx_modules/src/emqx_event_message.erl @@ -44,8 +44,15 @@ list() -> update(Params) -> disable(), - {ok, _} = emqx:update_config([event_message], Params), - enable(). + case emqx_conf:update([event_message], + Params, + #{rawconf_with_defaults => true, override_to => cluster}) of + {ok, #{raw_config := NewEventMessage}} -> + enable(), + {ok, NewEventMessage}; + {error, Reason} -> + {error, Reason} + end. enable() -> lists:foreach(fun({_Topic, false}) -> ok; diff --git a/apps/emqx_modules/src/emqx_event_message_api.erl b/apps/emqx_modules/src/emqx_event_message_api.erl index 80e5825d1..e27311e15 100644 --- a/apps/emqx_modules/src/emqx_event_message_api.erl +++ b/apps/emqx_modules/src/emqx_event_message_api.erl @@ -53,5 +53,10 @@ event_message(get, _Params) -> {200, emqx_event_message:list()}; event_message(put, #{body := Body}) -> - _ = emqx_event_message:update(Body), - {200, emqx_event_message:list()}. + case emqx_event_message:update(Body) of + {ok, NewConfig} -> + {200, NewConfig}; + {error, Reason} -> + Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), + {500, 'INTERNAL_ERROR', Message} + end. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 08c230401..363e40a5f 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -25,7 +25,7 @@ mod := module(), config := resource_config(), state := resource_state(), - status := started | stopped, + status := started | stopped | starting, metrics := emqx_plugin_libs_metrics:metrics() }. -type resource_group() :: binary(). @@ -33,7 +33,7 @@ %% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails, %% the 'status' of the resource will be 'stopped' in this case. %% Defaults to 'false' - force_create => boolean() + async_create => boolean() }. -type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} | undefined. @@ -41,3 +41,5 @@ %% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback %% actions upon query failure -type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}. + +-define(TEST_ID_PREFIX, "_test_:"). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 37c4caa2e..12ae912e8 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -58,6 +58,7 @@ %% Calls to the callback module with current resource state %% They also save the state after the call finished (except query/2,3). -export([ restart/1 %% restart the instance. + , restart/2 , health_check/1 %% verify if the resource is working normally , stop/1 %% stop the instance , query/2 %% query the instance @@ -68,7 +69,6 @@ -export([ call_start/3 %% start the instance , call_health_check/3 %% verify if the resource is working normally , call_stop/3 %% stop the instance - , call_config_merge/4 %% merge the config when updating , call_jsonify/2 ]). @@ -82,17 +82,13 @@ ]). -define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => true}). - -define(DEFAULT_RESOURCE_GROUP, <<"default">>). -optional_callbacks([ on_query/4 , on_health_check/2 - , on_config_merge/3 , on_jsonify/1 ]). --callback on_config_merge(resource_config(), resource_config(), term()) -> resource_config(). - -callback on_jsonify(resource_config()) -> jsx:json_term(). %% when calling emqx_resource:start/1 @@ -170,18 +166,17 @@ create_dry_run(ResourceType, Config) -> -spec create_dry_run_local(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run_local(ResourceType, Config) -> - InstId = iolist_to_binary(emqx_misc:gen_id(16)), - call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}). + call_instance(<>, {create_dry_run, ResourceType, Config}). --spec recreate(instance_id(), resource_type(), resource_config(), term()) -> +-spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate(InstId, ResourceType, Config, Params) -> - cluster_call(recreate_local, [InstId, ResourceType, Config, Params]). +recreate(InstId, ResourceType, Config, Opts) -> + cluster_call(recreate_local, [InstId, ResourceType, Config, Opts]). --spec recreate_local(instance_id(), resource_type(), resource_config(), term()) -> +-spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate_local(InstId, ResourceType, Config, Params) -> - call_instance(InstId, {recreate, InstId, ResourceType, Config, Params}). +recreate_local(InstId, ResourceType, Config, Opts) -> + call_instance(InstId, {recreate, InstId, ResourceType, Config, Opts}). -spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> @@ -201,19 +196,27 @@ query(InstId, Request) -> -spec query(instance_id(), Request :: term(), after_query()) -> Result :: term(). query(InstId, Request, AfterQuery) -> case get_instance(InstId) of + {ok, #{status := starting}} -> + query_error(starting, <<"cannot serve query when the resource " + "instance is still starting">>); {ok, #{status := stopped}} -> - error({resource_stopped, InstId}); + query_error(stopped, <<"cannot serve query when the resource " + "instance is stopped">>); {ok, #{mod := Mod, state := ResourceState, status := started}} -> %% the resource state is readonly to Module:on_query/4 %% and the `after_query()` functions should be thread safe Mod:on_query(InstId, Request, AfterQuery, ResourceState); - {error, Reason} -> - error({get_instance, {InstId, Reason}}) + {error, not_found} -> + query_error(not_found, <<"the resource id not exists">>) end. -spec restart(instance_id()) -> ok | {error, Reason :: term()}. restart(InstId) -> - call_instance(InstId, {restart, InstId}). + restart(InstId, #{}). + +-spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. +restart(InstId, Opts) -> + call_instance(InstId, {restart, InstId, Opts}). -spec stop(instance_id()) -> ok | {error, Reason :: term()}. stop(InstId) -> @@ -273,14 +276,6 @@ call_health_check(InstId, Mod, ResourceState) -> call_stop(InstId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_stop(InstId, ResourceState)). --spec call_config_merge(module(), resource_config(), resource_config(), term()) -> - resource_config(). -call_config_merge(Mod, OldConfig, NewConfig, Params) -> - case erlang:function_exported(Mod, on_config_merge, 3) of - true -> ?SAFE_CALL(Mod:on_config_merge(OldConfig, NewConfig, Params)); - false -> NewConfig - end. - -spec call_jsonify(module(), resource_config()) -> jsx:json_term(). call_jsonify(Mod, Config) -> case erlang:function_exported(Mod, on_jsonify, 1) of @@ -327,17 +322,17 @@ check_and_create_local(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end). --spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), term()) -> +-spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> {ok, resource_data()} | {error, term()}. -check_and_recreate(InstId, ResourceType, RawConfig, Params) -> +check_and_recreate(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, - fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Params) end). + fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Opts) end). --spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), term()) -> +-spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> {ok, resource_data()} | {error, term()}. -check_and_recreate_local(InstId, ResourceType, RawConfig, Params) -> +check_and_recreate_local(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, - fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Params) end). + fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Opts) end). check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> case check_config(ResourceType, RawConfig) of @@ -368,3 +363,6 @@ cluster_call(Func, Args) -> {ok, _TxnId, Result} -> Result; Failed -> Failed end. + +query_error(Reason, Msg) -> + {error, {?MODULE, #{reason => Reason, msg => Msg}}}. diff --git a/apps/emqx_resource/src/emqx_resource_health_check.erl b/apps/emqx_resource/src/emqx_resource_health_check.erl new file mode 100644 index 000000000..032ff6999 --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_health_check.erl @@ -0,0 +1,66 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_resource_health_check). + +-export([ start_link/2 + , create_checker/2 + , delete_checker/1 + ]). + +-export([health_check/2]). + +-define(SUP, emqx_resource_health_check_sup). +-define(ID(NAME), {resource_health_check, NAME}). + +child_spec(Name, Sleep) -> + #{id => ?ID(Name), + start => {?MODULE, start_link, [Name, Sleep]}, + restart => transient, + shutdown => 5000, type => worker, modules => [?MODULE]}. + +start_link(Name, Sleep) -> + Pid = proc_lib:spawn_link(?MODULE, health_check, [Name, Sleep]), + {ok, Pid}. + +create_checker(Name, Sleep) -> + create_checker(Name, Sleep, false). + +create_checker(Name, Sleep, Retry) -> + case supervisor:start_child(?SUP, child_spec(Name, Sleep)) of + {ok, _} -> ok; + {error, already_present} -> ok; + {error, {already_started, _}} when Retry == false -> + ok = delete_checker(Name), + create_checker(Name, Sleep, true); + Error -> Error + end. + +delete_checker(Name) -> + case supervisor:terminate_child(?SUP, ?ID(Name)) of + ok -> supervisor:delete_child(?SUP, ?ID(Name)); + Error -> Error + end. + +health_check(Name, SleepTime) -> + case emqx_resource:health_check(Name) of + ok -> + emqx_alarm:deactivate(Name); + {error, _} -> + emqx_alarm:activate(Name, #{name => Name}, + <>) + end, + timer:sleep(SleepTime), + health_check(Name, SleepTime). diff --git a/apps/emqx_resource/src/emqx_resource_health_check_sup.erl b/apps/emqx_resource/src/emqx_resource_health_check_sup.erl new file mode 100644 index 000000000..e17186114 --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_health_check_sup.erl @@ -0,0 +1,29 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_resource_health_check_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, + {ok, {SupFlags, []}}. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 745b8b684..86318e355 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -26,6 +26,7 @@ -export([ lookup/1 , get_metrics/1 , list_all/0 + , make_test_id/0 ]). -export([ hash_call/2 @@ -61,7 +62,7 @@ hash_call(InstId, Request) -> hash_call(InstId, Request, Timeout) -> gen_server:call(pick(InstId), Request, Timeout). --spec lookup(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}. +-spec lookup(instance_id()) -> {ok, resource_data()} | {error, not_found}. lookup(InstId) -> case ets:lookup(emqx_resource_instance, InstId) of [] -> {error, not_found}; @@ -69,6 +70,10 @@ lookup(InstId) -> {ok, Data#{id => InstId, metrics => get_metrics(InstId)}} end. +make_test_id() -> + RandId = iolist_to_binary(emqx_misc:gen_id(16)), + <>. + get_metrics(InstId) -> emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId). @@ -98,17 +103,17 @@ init({Pool, Id}) -> handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) -> {reply, do_create(InstId, ResourceType, Config, Opts), State}; -handle_call({create_dry_run, InstId, ResourceType, Config}, _From, State) -> - {reply, do_create_dry_run(InstId, ResourceType, Config), State}; +handle_call({create_dry_run, ResourceType, Config}, _From, State) -> + {reply, do_create_dry_run(ResourceType, Config), State}; -handle_call({recreate, InstId, ResourceType, Config, Params}, _From, State) -> - {reply, do_recreate(InstId, ResourceType, Config, Params), State}; +handle_call({recreate, InstId, ResourceType, Config, Opts}, _From, State) -> + {reply, do_recreate(InstId, ResourceType, Config, Opts), State}; handle_call({remove, InstId}, _From, State) -> {reply, do_remove(InstId), State}; -handle_call({restart, InstId}, _From, State) -> - {reply, do_restart(InstId), State}; +handle_call({restart, InstId, Opts}, _From, State) -> + {reply, do_restart(InstId, Opts), State}; handle_call({stop, InstId}, _From, State) -> {reply, do_stop(InstId), State}; @@ -135,25 +140,30 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ %% suppress the race condition check, as these functions are protected in gproc workers --dialyzer({nowarn_function, [do_recreate/4, - do_create/4, - do_restart/1, - do_stop/1, - do_health_check/1]}). +-dialyzer({nowarn_function, [ do_recreate/4 + , do_create/4 + , do_restart/2 + , do_start/4 + , do_stop/1 + , do_health_check/1 + , start_and_check/5 + ]}). -do_recreate(InstId, ResourceType, NewConfig, Params) -> +do_recreate(InstId, ResourceType, NewConfig, Opts) -> case lookup(InstId) of - {ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} -> - Config = emqx_resource:call_config_merge(ResourceType, OldConfig, - NewConfig, Params), - TestInstId = iolist_to_binary(emqx_misc:gen_id(16)), - case do_create_dry_run(TestInstId, ResourceType, Config) of + {ok, #{mod := ResourceType, status := started} = Data} -> + %% If this resource is in use (status='started'), we should make sure + %% the new config is OK before removing the old one. + case do_create_dry_run(ResourceType, NewConfig) of ok -> - do_remove(ResourceType, InstId, ResourceState, false), - do_create(InstId, ResourceType, Config, #{force_create => true}); + do_remove(Data, false), + do_create(InstId, ResourceType, NewConfig, Opts); Error -> Error end; + {ok, #{mod := ResourceType, status := _} = Data} -> + do_remove(Data, false), + do_create(InstId, ResourceType, NewConfig, Opts); {ok, #{mod := Mod}} when Mod =/= ResourceType -> {error, updating_to_incorrect_resource_type}; {error, not_found} -> @@ -161,98 +171,96 @@ do_recreate(InstId, ResourceType, NewConfig, Params) -> end. do_create(InstId, ResourceType, Config, Opts) -> - ForceCreate = maps:get(force_create, Opts, false), case lookup(InstId) of - {ok, _} -> {ok, already_created}; - _ -> - Res0 = #{id => InstId, mod => ResourceType, config => Config, - status => stopped, state => undefined}, - case emqx_resource:call_start(InstId, ResourceType, Config) of - {ok, ResourceState} -> - ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId), - %% this is the first time we do health check, this will update the - %% status and then do ets:insert/2 - _ = do_health_check(Res0#{state => ResourceState}), + {ok, _} -> + {ok, already_created}; + {error, not_found} -> + case do_start(InstId, ResourceType, Config, Opts) of + ok -> + ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId), {ok, force_lookup(InstId)}; - {error, Reason} when ForceCreate == true -> - logger:error("start ~ts resource ~ts failed: ~p, " - "force_create it as a stopped resource", - [ResourceType, InstId, Reason]), - ets:insert(emqx_resource_instance, {InstId, Res0}), - {ok, Res0}; - {error, Reason} when ForceCreate == false -> - {error, Reason} + Error -> + Error end end. -do_create_dry_run(InstId, ResourceType, Config) -> +do_create_dry_run(ResourceType, Config) -> + InstId = make_test_id(), case emqx_resource:call_start(InstId, ResourceType, Config) of - {ok, ResourceState0} -> - Return = case emqx_resource:call_health_check(InstId, ResourceType, ResourceState0) of - {ok, ResourceState1} -> ok; - {error, Reason, ResourceState1} -> - {error, Reason} - end, - _ = emqx_resource:call_stop(InstId, ResourceType, ResourceState1), - Return; + {ok, ResourceState} -> + case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of + {ok, _} -> ok; + {error, Reason, _} -> {error, Reason} + end; {error, Reason} -> {error, Reason} end. -do_remove(InstId) -> - case lookup(InstId) of - {ok, #{mod := Mod, state := ResourceState}} -> - do_remove(Mod, InstId, ResourceState); - Error -> - Error - end. +do_remove(Instance) -> + do_remove(Instance, true). -do_remove(Mod, InstId, ResourceState) -> - do_remove(Mod, InstId, ResourceState, true). - -do_remove(Mod, InstId, ResourceState, ClearMetrics) -> - _ = emqx_resource:call_stop(InstId, Mod, ResourceState), +do_remove(InstId, ClearMetrics) when is_binary(InstId) -> + do_with_instance_data(InstId, fun do_remove/2, [ClearMetrics]); +do_remove(#{id := InstId} = Data, ClearMetrics) -> + _ = do_stop(Data), ets:delete(emqx_resource_instance, InstId), case ClearMetrics of true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId); false -> ok + end, + ok. + +do_restart(InstId, Opts) -> + case lookup(InstId) of + {ok, #{mod := ResourceType, config := Config} = Data} -> + ok = do_stop(Data), + do_start(InstId, ResourceType, Config, Opts); + Error -> + Error end. -do_restart(InstId) -> - case lookup(InstId) of - {ok, #{mod := Mod, state := ResourceState, config := Config} = Data} -> - _ = case ResourceState of - undefined -> ok; - _ -> emqx_resource:call_stop(InstId, Mod, ResourceState) - end, - case emqx_resource:call_start(InstId, Mod, Config) of - {ok, NewResourceState} -> - ets:insert(emqx_resource_instance, - {InstId, Data#{state => NewResourceState, status => started}}), - ok; - {error, Reason} -> - ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), - {error, Reason} +do_start(InstId, ResourceType, Config, Opts) when is_binary(InstId) -> + InitData = #{id => InstId, mod => ResourceType, config => Config, + status => starting, state => undefined}, + %% The `emqx_resource:call_start/3` need the instance exist beforehand + ets:insert(emqx_resource_instance, {InstId, InitData}), + case maps:get(async_create, Opts, false) of + false -> + start_and_check(InstId, ResourceType, Config, Opts, InitData); + true -> + spawn(fun() -> + start_and_check(InstId, ResourceType, Config, Opts, InitData) + end), + ok + end. + +start_and_check(InstId, ResourceType, Config, Opts, Data) -> + case emqx_resource:call_start(InstId, ResourceType, Config) of + {ok, ResourceState} -> + Data2 = Data#{state => ResourceState}, + ets:insert(emqx_resource_instance, {InstId, Data2}), + case maps:get(async_create, Opts, false) of + false -> do_health_check(Data2); + true -> emqx_resource_health_check:create_checker(InstId, + maps:get(health_check_interval, Opts, 15000)) end; - Error -> - Error + {error, Reason} -> + ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), + {error, Reason} end. -do_stop(InstId) -> - case lookup(InstId) of - {ok, #{mod := Mod, state := ResourceState} = Data} -> - _ = emqx_resource:call_stop(InstId, Mod, ResourceState), - ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), - ok; - Error -> - Error - end. +do_stop(InstId) when is_binary(InstId) -> + do_with_instance_data(InstId, fun do_stop/1, []); +do_stop(#{state := undefined}) -> + ok; +do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) -> + _ = emqx_resource:call_stop(InstId, Mod, ResourceState), + _ = emqx_resource_health_check:delete_checker(InstId), + ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), + ok. do_health_check(InstId) when is_binary(InstId) -> - case lookup(InstId) of - {ok, Data} -> do_health_check(Data); - Error -> Error - end; + do_with_instance_data(InstId, fun do_health_check/1, []); do_health_check(#{state := undefined}) -> {error, resource_not_initialized}; do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) -> @@ -272,6 +280,12 @@ do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) -> %% internal functions %%------------------------------------------------------------------------------ +do_with_instance_data(InstId, Do, Args) -> + case lookup(InstId) of + {ok, Data} -> erlang:apply(Do, [Data | Args]); + Error -> Error + end. + proc_name(Mod, Id) -> list_to_atom(lists:concat([Mod, "_", Id])). diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl index 534777b69..99d601ec4 100644 --- a/apps/emqx_resource/src/emqx_resource_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_sup.erl @@ -45,7 +45,12 @@ init([]) -> restart => transient, shutdown => 5000, type => worker, modules => [Mod]} end || Idx <- lists:seq(1, ?POOL_SIZE)], - {ok, {SupFlags, [Metrics | ResourceInsts]}}. + HealthCheck = + #{id => emqx_resource_health_check_sup, + start => {emqx_resource_health_check_sup, start_link, []}, + restart => transient, + shutdown => infinity, type => supervisor, modules => [emqx_resource_health_check_sup]}, + {ok, {SupFlags, [HealthCheck, Metrics | ResourceInsts]}}. %% internal functions ensure_pool(Pool, Type, Opts) -> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 6b2e5903e..4e9c35efc 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -96,9 +96,7 @@ t_query(_) -> ?assert(false) end, - ?assertException( - error, - {get_instance, _Reason}, + ?assertMatch({error, {emqx_resource, #{reason := not_found}}}, emqx_resource:query(<<"unknown">>, get_state)), ok = emqx_resource:remove_local(?ID). @@ -142,7 +140,8 @@ t_stop_start(_) -> ?assertNot(is_process_alive(Pid0)), - ?assertException(error, {resource_stopped, ?ID}, emqx_resource:query(?ID, get_state)), + ?assertMatch({error, {emqx_resource, #{reason := stopped}}}, + emqx_resource:query(?ID, get_state)), ok = emqx_resource:restart(?ID), diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index b60ac5627..7739d60dc 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -84,7 +84,7 @@ with_topic_api() -> parameters => parameters(), responses => #{ <<"200">> => object_schema(message_props(), <<"List retained messages">>), - <<"404">> => error_schema(<<"Reatined Not Exists">>, ['NOT_FOUND']), + <<"404">> => error_schema(<<"Retained Not Exists">>, ['NOT_FOUND']), <<"405">> => schema(<<"NotAllowed">>) } }, diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 60a7cbaad..049829c59 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -101,7 +101,7 @@ do_apply_rule(#{ true -> ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId), Collection2 = filter_collection(Input, InCase, DoEach, Collection), - {ok, [handle_output_list(Outputs, Coll, Input) || Coll <- Collection2]}; + {ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]}; false -> {error, nomatch} end; @@ -118,7 +118,7 @@ do_apply_rule(#{id := RuleId, {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId), - {ok, handle_output_list(Outputs, Selected, Input)}; + {ok, handle_output_list(RuleId, Outputs, Selected, Input)}; false -> {error, nomatch} end. @@ -231,15 +231,17 @@ number(Bin) -> catch error:badarg -> binary_to_float(Bin) end. -handle_output_list(Outputs, Selected, Envs) -> - [handle_output(Out, Selected, Envs) || Out <- Outputs]. +handle_output_list(RuleId, Outputs, Selected, Envs) -> + [handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs]. -handle_output(OutId, Selected, Envs) -> +handle_output(RuleId, OutId, Selected, Envs) -> try do_handle_output(OutId, Selected, Envs) catch Err:Reason:ST -> - ?SLOG(error, #{msg => "output_failed", + ok = emqx_plugin_libs_metrics:inc_failed(rule_metrics, RuleId), + Level = case Err of throw -> debug; _ -> error end, + ?SLOG(Level, #{msg => "output_failed", output => OutId, exception => Err, reason => Reason, diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl index b267a9412..d545003b0 100644 --- a/apps/emqx_statsd/src/emqx_statsd_api.erl +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -59,12 +59,10 @@ statsd(put, #{body := Body}) -> Body, #{rawconf_with_defaults => true, override_to => cluster}) of {ok, #{raw_config := NewConfig, config := Config}} -> - _ = case maps:get(<<"enable">>, Body) of - true -> - _ = emqx_statsd_sup:stop_child(?APP), - emqx_statsd_sup:start_child(?APP, maps:get(config, Config)); - false -> - _ = emqx_statsd_sup:stop_child(?APP) + _ = emqx_statsd_sup:stop_child(?APP), + case maps:get(<<"enable">>, Body) of + true -> emqx_statsd_sup:start_child(?APP, maps:get(config, Config)); + false -> ok end, {200, NewConfig}; {error, Reason} ->