diff --git a/apps/emqx_authn/test/emqx_authn_http_SUITE.erl b/apps/emqx_authn/test/emqx_authn_http_SUITE.erl index 2c0716e8b..146e9ef32 100644 --- a/apps/emqx_authn/test/emqx_authn_http_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_http_SUITE.erl @@ -153,9 +153,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_http:authenticate( Credentials, State)). diff --git a/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl index edd91be55..855a2226d 100644 --- a/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mongo_SUITE.erl @@ -146,9 +146,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_mongodb:authenticate( #{username => <<"plain">>, password => <<"plain">> diff --git a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl index 95eecdead..ffe98be65 100644 --- a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl @@ -159,9 +159,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_mysql:authenticate( #{username => <<"plain">>, password => <<"plain">> diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl index e33f5c100..56044faf4 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl @@ -159,9 +159,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_pgsql:authenticate( #{username => <<"plain">>, password => <<"plain">> diff --git a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl index de556a7bd..eeb674f86 100644 --- a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl @@ -164,9 +164,8 @@ t_destroy(_Config) -> ?GLOBAL), % Authenticator should not be usable anymore - ?assertException( - error, - _, + ?assertMatch( + ignore, emqx_authn_redis:authenticate( #{username => <<"plain">>, password => <<"plain">> diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index d46ce217e..2e610b2b9 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -39,11 +39,14 @@ , lookup/3 , list/0 , list_bridges_by_connector/1 + , create/2 , create/3 , recreate/2 , recreate/3 , create_dry_run/2 + , remove/1 , remove/3 + , update/2 , update/3 , start/2 , stop/2 @@ -80,17 +83,36 @@ unload_hook() -> on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> case maps:get(sys, Flags, false) of false -> - lists:foreach(fun (Id) -> - send_message(Id, emqx_rule_events:eventmsg_publish(Message)) - end, get_matched_bridges(Topic)); + Msg = emqx_rule_events:eventmsg_publish(Message), + send_to_matched_egress_bridges(Topic, Msg); true -> ok end, {ok, Message}. +send_to_matched_egress_bridges(Topic, Msg) -> + lists:foreach(fun (Id) -> + try send_message(Id, Msg) of + ok -> ok; + Error -> ?SLOG(error, #{msg => "send_message_to_bridge_failed", + bridge => Id, error => Error}) + catch Err:Reason:ST -> + ?SLOG(error, #{msg => "send_message_to_bridge_crash", + bridge => Id, error => Err, reason => Reason, + stacktrace => ST}) + end + end, get_matched_bridges(Topic)). + send_message(BridgeId, Message) -> {BridgeType, BridgeName} = parse_bridge_id(BridgeId), ResId = emqx_bridge:resource_id(BridgeType, BridgeName), - emqx_resource:query(ResId, {send_message, Message}). + case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of + not_found -> + {error, {bridge_not_found, BridgeId}}; + #{enable := true} -> + emqx_resource:query(ResId, {send_message, Message}); + #{enable := false} -> + {error, {bridge_stopped, BridgeId}} + end. config_key_path() -> [bridges]. @@ -188,6 +210,10 @@ stop(Type, Name) -> restart(Type, Name) -> emqx_resource:restart(resource_id(Type, Name)). +create(BridgeId, Conf) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + create(BridgeType, BridgeName, Conf). + create(Type, Name, Conf) -> ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, config => Conf}), @@ -198,6 +224,10 @@ create(Type, Name, Conf) -> {error, Reason} -> {error, Reason} end. +update(BridgeId, {OldConf, Conf}) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + update(BridgeType, BridgeName, {OldConf, Conf}). + update(Type, Name, {OldConf, Conf}) -> %% TODO: sometimes its not necessary to restart the bridge connection. %% @@ -244,6 +274,10 @@ create_dry_run(Type, Conf) -> Error end. +remove(BridgeId) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + remove(BridgeType, BridgeName, #{}). + remove(Type, Name, _Conf) -> ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}), case emqx_resource:remove_local(resource_id(Type, Name)) of @@ -279,6 +313,8 @@ get_matched_bridges(Topic) -> end, Acc0, Conf) end, [], Bridges). +get_matched_bridge_id(#{enable := false}, _Topic, _BType, _BName, Acc) -> + Acc; get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) -> case emqx_topic:match(Topic, Filter) of true -> [bridge_id(BType, BName) | Acc]; @@ -309,21 +345,21 @@ parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) {Type, ConnName} -> ConnectorConfs = emqx:get_config([connectors, Type, ConnName]), make_resource_confs(Direction, ConnectorConfs, - maps:without([connector, direction], Conf), Name); + maps:without([connector, direction], Conf), Type, Name); {_ConnType, _ConnName} -> error({cannot_use_connector_with_different_type, ConnId}) end; -parse_confs(_Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) +parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) when is_map(ConnectorConfs) -> make_resource_confs(Direction, ConnectorConfs, - maps:without([connector, direction], Conf), Name). + maps:without([connector, direction], Conf), Type, Name). -make_resource_confs(ingress, ConnectorConfs, BridgeConf, Name) -> - BName = bin(Name), +make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) -> + BName = bridge_id(Type, Name), ConnectorConfs#{ ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>} }; -make_resource_confs(egress, ConnectorConfs, BridgeConf, _Name) -> +make_resource_confs(egress, ConnectorConfs, BridgeConf, _Type, _Name) -> ConnectorConfs#{ egress => BridgeConf }. diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl index 494911d21..152289cf1 100644 --- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl @@ -68,7 +68,6 @@ How long will the HTTP request timeout. fields("post") -> [ type_field() - , name_field() ] ++ fields("bridge"); fields("put") -> @@ -103,8 +102,5 @@ id_field() -> type_field() -> {type, mk(http, #{desc => "The Bridge Type"})}. -name_field() -> - {name, mk(binary(), #{desc => "The Bridge Name"})}. - method() -> enum([post, put, get, delete]). diff --git a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl index 3de011b4c..96c9a1d38 100644 --- a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl @@ -24,11 +24,9 @@ fields("egress") -> fields("post_ingress") -> [ type_field() - , name_field() ] ++ proplists:delete(enable, fields("ingress")); fields("post_egress") -> [ type_field() - , name_field() ] ++ proplists:delete(enable, fields("egress")); fields("put_ingress") -> @@ -49,9 +47,3 @@ id_field() -> type_field() -> {type, mk(mqtt, #{desc => "The Bridge Type"})}. - -name_field() -> - {name, mk(binary(), - #{ desc => "The Bridge Name" - , example => "some_bridge_name" - })}. diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index c216e905c..beeff6d3e 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -68,10 +68,6 @@ fields("put") -> fields("post") -> [ {type, mk(mqtt, #{desc => "The Connector Type"})} - , {name, mk(binary(), - #{ desc => "The Connector Name" - , example => <<"my_mqtt_connector">> - })} ] ++ fields("put"). %% =================================================================== diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index d7abcda84..7d5021f82 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -165,7 +165,8 @@ handle_publish(Msg, undefined) -> ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as" "_'ingress'_is_not_configured", message => Msg}); -handle_publish(Msg, Vars) -> +handle_publish(Msg0, Vars) -> + Msg = format_msg_received(Msg0), ?SLOG(debug, #{msg => "publish_to_local_broker", message => Msg, vars => Vars}), case Vars of @@ -173,11 +174,7 @@ handle_publish(Msg, Vars) -> _ = erlang:apply(Mod, Func, [Msg | Args]); _ -> ok end, - case maps:get(local_topic, Vars, undefined) of - undefined -> ok; - _Topic -> - emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)) - end. + maybe_publish_to_local_broker(Msg0, Vars). handle_disconnected(Reason, Parent) -> Parent ! {disconnected, self(), Reason}. @@ -197,3 +194,45 @@ sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) -> process_config(Config) -> maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config). + +maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) -> + case maps:get(local_topic, Vars, undefined) of + undefined -> + ok; %% local topic is not set, discard it + _ -> + case emqx_topic:match(Topic, SubTopic) of + true -> + _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)), + ok; + false -> + ?SLOG(warning, #{msg => "discard_message_as_topic_not_matched", + message => Msg, subscribed => SubTopic, got_topic => Topic}) + end + end. + +format_msg_received(#{dup := Dup, payload := Payload, properties := Props, + qos := QoS, retain := Retain, topic := Topic}) -> + #{event => '$bridges/mqtt', + id => emqx_guid:to_hexstr(emqx_guid:gen()), + payload => Payload, + topic => Topic, + qos => QoS, + dup => Dup, + retain => Retain, + pub_props => printable_maps(Props), + timestamp => erlang:system_time(millisecond) + }. + +printable_maps(undefined) -> #{}; +printable_maps(Headers) -> + maps:fold( + fun ('User-Property', V0, AccIn) when is_list(V0) -> + AccIn#{ + 'User-Property' => maps:from_list(V0), + 'User-Property-Pairs' => [#{ + key => Key, + value => Value + } || {Key, Value} <- V0] + }; + (K, V0, AccIn) -> AccIn#{K => V0} + end, #{}, Headers). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index a0dd9eec1..35bcf3de1 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -78,10 +78,9 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> Msg#message{topic = topic(Mountpoint, Topic)}. %% published from remote node over a MQTT connection -to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0, +to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, #{local_topic := TopicToken, payload := PayloadToken, local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> - MapMsg = format_msg_received(MapMsg0), Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), @@ -90,33 +89,6 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0, emqx_message:set_flags(#{dup => Dup, retain => Retain}, emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). -format_msg_received(#{dup := Dup, payload := Payload, properties := Props, - qos := QoS, retain := Retain, topic := Topic}) -> - #{event => '$bridges/mqtt', - id => emqx_guid:to_hexstr(emqx_guid:gen()), - payload => Payload, - topic => Topic, - qos => QoS, - flags => #{dup => Dup, retain => Retain}, - pub_props => printable_maps(Props), - timestamp => erlang:system_time(millisecond), - node => node() - }. - -printable_maps(undefined) -> #{}; -printable_maps(Headers) -> - maps:fold( - fun ('User-Property', V0, AccIn) when is_list(V0) -> - AccIn#{ - 'User-Property' => maps:from_list(V0), - 'User-Property-Pairs' => [#{ - key => Key, - value => Value - } || {Key, Value} <- V0] - }; - (K, V0, AccIn) -> AccIn#{K => V0} - end, #{}, Headers). - process_payload([], Msg) -> emqx_json:encode(Msg); process_payload(Tks, Msg) -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 1a96a3596..936982e75 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -22,7 +22,10 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(CONF_DEFAULT, <<"connectors: {}">>). +%% output functions +-export([ inspect/3 + ]). + -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). -define(CONNECTR_TYPE, <<"mqtt">>). -define(CONNECTR_NAME, <<"test_connector">>). @@ -67,6 +70,9 @@ <<"failed">> := FAILED, <<"rate">> := SPEED, <<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}). +inspect(Selected, _Envs, _Args) -> + persistent_term:put(?MODULE, #{inspect => Selected}). + all() -> emqx_common_test_helpers:all(?MODULE). @@ -89,21 +95,35 @@ init_per_suite(Config) -> %% some testcases (may from other app) already get emqx_connector started _ = application:stop(emqx_resource), _ = application:stop(emqx_connector), - ok = emqx_common_test_helpers:start_apps([emqx_connector, emqx_bridge, emqx_dashboard]), - ok = emqx_config:init_load(emqx_connector_schema, ?CONF_DEFAULT), + ok = emqx_common_test_helpers:start_apps([emqx_rule_engine, emqx_connector, + emqx_bridge, emqx_dashboard]), + ok = emqx_config:init_load(emqx_connector_schema, <<"connectors: {}">>), + ok = emqx_config:init_load(emqx_rule_engine_schema, <<"rule_engine {rules {}}">>), ok = emqx_config:init_load(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_connector, emqx_bridge, emqx_dashboard]), + emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_connector, emqx_bridge, emqx_dashboard]), ok. init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), Config. end_per_testcase(_, _Config) -> + clear_resources(), ok. +clear_resources() -> + lists:foreach(fun(#{id := Id}) -> + ok = emqx_rule_engine:delete_rule(Id) + end, emqx_rule_engine:get_rules()), + lists:foreach(fun(#{id := Id}) -> + ok = emqx_bridge:remove(Id) + end, emqx_bridge:list()), + lists:foreach(fun(#{<<"id">> := Id}) -> + ok = emqx_connector:delete(Id) + end, emqx_connector:list()). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -223,7 +243,6 @@ t_mqtt_conn_bridge_ingress(_) -> %% PUBLISH a message to the 'remote' broker, as we have only one broker, %% the remote broker is also the local one. emqx:publish(emqx_message:make(RemoteTopic, Payload)), - %% we should receive a message on the local broker, with specified topic ?assert( receive @@ -435,6 +454,150 @@ t_mqtt_conn_testing(_) -> <<"name">> => ?BRIDGE_NAME_EGRESS }). +t_ingress_mqtt_bridge_with_rules(_) -> + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), + #{ <<"id">> := ConnctorID } = jsx:decode(Connector), + + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_INGRESS(ConnctorID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_INGRESS + }), + #{ <<"id">> := BridgeIDIngress } = jsx:decode(Bridge), + + {ok, 201, Rule} = request(post, uri(["rules"]), + #{<<"name">> => <<"A rule get messages from a source mqtt bridge">>, + <<"enable">> => true, + <<"outputs">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}], + <<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">> + }), + #{<<"id">> := RuleId} = jsx:decode(Rule), + + %% we now test if the bridge works as expected + + RemoteTopic = <<"remote_topic/1">>, + LocalTopic = <<"local_topic/", RemoteTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(LocalTopic), + %% PUBLISH a message to the 'remote' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(RemoteTopic, Payload)), + %% we should receive a message on the local broker, with specified topic + ?assert( + receive + {deliver, LocalTopic, #message{payload = Payload}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + %% and also the rule should be matched, with matched + 1: + {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), + #{ <<"id">> := RuleId + , <<"metrics">> := #{<<"matched">> := 1} + } = jsx:decode(Rule1), + %% we also check if the outputs of the rule is triggered + ?assertMatch(#{inspect := #{ + event := '$bridges/mqtt', + id := MsgId, + payload := Payload, + topic := RemoteTopic, + qos := 0, + dup := false, + retain := false, + pub_props := #{}, + timestamp := _ + }} when is_binary(MsgId), persistent_term:get(?MODULE)), + + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). + +t_egress_mqtt_bridge_with_rules(_) -> + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), + #{ <<"id">> := ConnctorID } = jsx:decode(Connector), + + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), + #{ <<"id">> := BridgeIDEgress } = jsx:decode(Bridge), + + {ok, 201, Rule} = request(post, uri(["rules"]), + #{<<"name">> => <<"A rule send messages to a sink mqtt bridge">>, + <<"enable">> => true, + <<"outputs">> => [BridgeIDEgress], + <<"sql">> => <<"SELECT * from \"t/1\"">> + }), + #{<<"id">> := RuleId} = jsx:decode(Rule), + + %% we now test if the bridge works as expected + LocalTopic = <<"local_topic/1">>, + RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic, #message{payload = Payload}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + emqx:unsubscribe(RemoteTopic), + + %% PUBLISH a message to the rule. + Payload2 = <<"hi">>, + RuleTopic = <<"t/1">>, + RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>, + emqx:subscribe(RemoteTopic2), + emqx:publish(emqx_message:make(RuleTopic, Payload2)), + {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), + #{ <<"id">> := RuleId + , <<"metrics">> := #{<<"matched">> := 1} + } = jsx:decode(Rule1), + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic2, #message{payload = Payload2}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch(#{ <<"id">> := BridgeIDEgress + , <<"metrics">> := ?metrics(2, 2, 0, _, _, _) + , <<"node_metrics">> := + [#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}] + }, jsx:decode(BridgeStr)), + + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). + %%-------------------------------------------------------------------- %% HTTP Request %%-------------------------------------------------------------------- diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 08c230401..ed1de18cf 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -25,7 +25,7 @@ mod := module(), config := resource_config(), state := resource_state(), - status := started | stopped, + status := started | stopped | starting, metrics := emqx_plugin_libs_metrics:metrics() }. -type resource_group() :: binary(). @@ -41,3 +41,5 @@ %% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback %% actions upon query failure -type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}. + +-define(TEST_ID_PREFIX, "_test_:"). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 37c4caa2e..5ec5fd92a 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -82,7 +82,6 @@ ]). -define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => true}). - -define(DEFAULT_RESOURCE_GROUP, <<"default">>). -optional_callbacks([ on_query/4 @@ -170,7 +169,7 @@ create_dry_run(ResourceType, Config) -> -spec create_dry_run_local(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run_local(ResourceType, Config) -> - InstId = iolist_to_binary(emqx_misc:gen_id(16)), + InstId = emqx_resource_instance:make_test_id(), call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}). -spec recreate(instance_id(), resource_type(), resource_config(), term()) -> @@ -201,14 +200,18 @@ query(InstId, Request) -> -spec query(instance_id(), Request :: term(), after_query()) -> Result :: term(). query(InstId, Request, AfterQuery) -> case get_instance(InstId) of + {ok, #{status := starting}} -> + query_error(starting, <<"cannot serve query when the resource " + "instance is still starting">>); {ok, #{status := stopped}} -> - error({resource_stopped, InstId}); + query_error(stopped, <<"cannot serve query when the resource " + "instance is stopped">>); {ok, #{mod := Mod, state := ResourceState, status := started}} -> %% the resource state is readonly to Module:on_query/4 %% and the `after_query()` functions should be thread safe Mod:on_query(InstId, Request, AfterQuery, ResourceState); - {error, Reason} -> - error({get_instance, {InstId, Reason}}) + {error, not_found} -> + query_error(not_found, <<"the resource id not exists">>) end. -spec restart(instance_id()) -> ok | {error, Reason :: term()}. @@ -368,3 +371,6 @@ cluster_call(Func, Args) -> {ok, _TxnId, Result} -> Result; Failed -> Failed end. + +query_error(Reason, Msg) -> + {error, {?MODULE, #{reason => Reason, msg => Msg}}}. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 745b8b684..c413691d9 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -26,6 +26,7 @@ -export([ lookup/1 , get_metrics/1 , list_all/0 + , make_test_id/0 ]). -export([ hash_call/2 @@ -61,7 +62,7 @@ hash_call(InstId, Request) -> hash_call(InstId, Request, Timeout) -> gen_server:call(pick(InstId), Request, Timeout). --spec lookup(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}. +-spec lookup(instance_id()) -> {ok, resource_data()} | {error, not_found}. lookup(InstId) -> case ets:lookup(emqx_resource_instance, InstId) of [] -> {error, not_found}; @@ -69,6 +70,10 @@ lookup(InstId) -> {ok, Data#{id => InstId, metrics => get_metrics(InstId)}} end. +make_test_id() -> + RandId = iolist_to_binary(emqx_misc:gen_id(16)), + <>. + get_metrics(InstId) -> emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId). @@ -146,7 +151,7 @@ do_recreate(InstId, ResourceType, NewConfig, Params) -> {ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} -> Config = emqx_resource:call_config_merge(ResourceType, OldConfig, NewConfig, Params), - TestInstId = iolist_to_binary(emqx_misc:gen_id(16)), + TestInstId = make_test_id(), case do_create_dry_run(TestInstId, ResourceType, Config) of ok -> do_remove(ResourceType, InstId, ResourceState, false), @@ -166,7 +171,9 @@ do_create(InstId, ResourceType, Config, Opts) -> {ok, _} -> {ok, already_created}; _ -> Res0 = #{id => InstId, mod => ResourceType, config => Config, - status => stopped, state => undefined}, + status => starting, state => undefined}, + %% The `emqx_resource:call_start/3` need the instance exist beforehand + ets:insert(emqx_resource_instance, {InstId, Res0}), case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId), @@ -181,6 +188,7 @@ do_create(InstId, ResourceType, Config, Opts) -> ets:insert(emqx_resource_instance, {InstId, Res0}), {ok, Res0}; {error, Reason} when ForceCreate == false -> + ets:delete(emqx_resource_instance, InstId), {error, Reason} end end. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 6b2e5903e..4e9c35efc 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -96,9 +96,7 @@ t_query(_) -> ?assert(false) end, - ?assertException( - error, - {get_instance, _Reason}, + ?assertMatch({error, {emqx_resource, #{reason := not_found}}}, emqx_resource:query(<<"unknown">>, get_state)), ok = emqx_resource:remove_local(?ID). @@ -142,7 +140,8 @@ t_stop_start(_) -> ?assertNot(is_process_alive(Pid0)), - ?assertException(error, {resource_stopped, ?ID}, emqx_resource:query(?ID, get_state)), + ?assertMatch({error, {emqx_resource, #{reason := stopped}}}, + emqx_resource:query(?ID, get_state)), ok = emqx_resource:restart(?ID), diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 60a7cbaad..049829c59 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -101,7 +101,7 @@ do_apply_rule(#{ true -> ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId), Collection2 = filter_collection(Input, InCase, DoEach, Collection), - {ok, [handle_output_list(Outputs, Coll, Input) || Coll <- Collection2]}; + {ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]}; false -> {error, nomatch} end; @@ -118,7 +118,7 @@ do_apply_rule(#{id := RuleId, {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId), - {ok, handle_output_list(Outputs, Selected, Input)}; + {ok, handle_output_list(RuleId, Outputs, Selected, Input)}; false -> {error, nomatch} end. @@ -231,15 +231,17 @@ number(Bin) -> catch error:badarg -> binary_to_float(Bin) end. -handle_output_list(Outputs, Selected, Envs) -> - [handle_output(Out, Selected, Envs) || Out <- Outputs]. +handle_output_list(RuleId, Outputs, Selected, Envs) -> + [handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs]. -handle_output(OutId, Selected, Envs) -> +handle_output(RuleId, OutId, Selected, Envs) -> try do_handle_output(OutId, Selected, Envs) catch Err:Reason:ST -> - ?SLOG(error, #{msg => "output_failed", + ok = emqx_plugin_libs_metrics:inc_failed(rule_metrics, RuleId), + Level = case Err of throw -> debug; _ -> error end, + ?SLOG(Level, #{msg => "output_failed", output => OutId, exception => Err, reason => Reason, diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl index 389c382f0..4b6a084eb 100644 --- a/apps/emqx_statsd/src/emqx_statsd_api.erl +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -59,13 +59,13 @@ statsd(put, #{body := Body}) -> Body, #{rawconf_with_defaults => true, override_to => cluster}) of {ok, #{raw_config := NewConfig, config := Config}} -> + _ = emqx_statsd_sup:stop_child(?APP), case maps:get(<<"enable">>, Body) of true -> ok = emqx_statsd_sup:ensure_child_stopped(?APP), ok = emqx_statsd_sup:ensure_child_started(?APP, maps:get(config, Config)); false -> ok = emqx_statsd_sup:ensure_child_stopped(?APP) - end, {200, NewConfig}; {error, Reason} -> Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),