diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index b5b339ea9..1ef1c6617 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -256,6 +256,38 @@ publish_messages( publish_confirmation_timeout := PublishConfirmationTimeout }, Messages +) -> + try + publish_messages( + Conn, + RabbitMQ, + DeliveryMode, + Exchange, + RoutingKey, + PayloadTmpl, + Messages, + WaitForPublishConfirmations, + PublishConfirmationTimeout + ) + catch + %% if send a message to a non-existent exchange, RabbitMQ client will crash + %% {shutdown,{server_initiated_close,404,<<"NOT_FOUND - no exchange 'xyz' in vhost '/'">>} + %% so we catch and return {recoverable_error, Reason} to increase metrics + _Type:Reason -> + Msg = iolist_to_binary(io_lib:format("RabbitMQ: publish_failed: ~p", [Reason])), + erlang:error({recoverable_error, Msg}) + end. + +publish_messages( + Conn, + RabbitMQ, + DeliveryMode, + Exchange, + RoutingKey, + PayloadTmpl, + Messages, + WaitForPublishConfirmations, + PublishConfirmationTimeout ) -> case maps:find(Conn, RabbitMQ) of {ok, Channel} -> diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl index 1a44c5f63..cf68c20e6 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl @@ -95,6 +95,9 @@ rabbitmq_source() -> parse_and_check(<<"sources">>, emqx_bridge_v2_schema, Source, Name). rabbitmq_action() -> + rabbitmq_action(rabbit_mq_exchange()). + +rabbitmq_action(Exchange) -> Name = atom_to_binary(?MODULE), Action = #{ <<"actions">> => #{ @@ -103,12 +106,16 @@ rabbitmq_action() -> <<"connector">> => Name, <<"enable">> => true, <<"parameters">> => #{ - <<"exchange">> => rabbit_mq_exchange(), + <<"exchange">> => Exchange, <<"payload_template">> => <<"${.payload}">>, <<"routing_key">> => rabbit_mq_routing_key(), <<"delivery_mode">> => <<"non_persistent">>, <<"publish_confirmation_timeout">> => <<"30s">>, <<"wait_for_publish_confirmations">> => true + }, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"1s">>, + <<"inflight_window">> => 1 } } } @@ -131,7 +138,10 @@ delete_source(Name) -> ok = emqx_bridge_v2:remove(sources, ?TYPE, Name). create_action(Name) -> - Action = rabbitmq_action(), + create_action(Name, rabbit_mq_exchange()). + +create_action(Name, Exchange) -> + Action = rabbitmq_action(Exchange), {ok, _} = emqx_bridge_v2:create(actions, ?TYPE, Name, Action). delete_action(Name) -> @@ -210,14 +220,14 @@ t_source(Config) -> t_source_probe(_Config) -> Name = atom_to_binary(?FUNCTION_NAME), Source = rabbitmq_source(), - {ok, Res0} = probe_bridge_api(Name, "sources_probe", Source), + {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(source, ?TYPE, Name, Source), ?assertMatch({{_, 204, _}, _, _}, Res0), ok. t_action_probe(_Config) -> Name = atom_to_binary(?FUNCTION_NAME), Action = rabbitmq_action(), - {ok, Res0} = probe_bridge_api(Name, "actions_probe", Action), + {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action), ?assertMatch({{_, 204, _}, _, _}, Res0), ok. @@ -262,6 +272,95 @@ t_action(Config) -> ), ok. +t_action_not_exist_exchange(_Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + create_action(Name, <<"not_exist_exchange">>), + Actions = emqx_bridge_v2:list(actions), + Any = fun(#{name := BName}) -> BName =:= Name end, + ?assert(lists:any(Any, Actions), Actions), + Topic = <<"lkadfaodik">>, + {ok, #{id := RuleId}} = emqx_rule_engine:create_rule( + #{ + sql => <<"select * from \"", Topic/binary, "\"">>, + id => atom_to_binary(?FUNCTION_NAME), + actions => [<<"rabbitmq:", Name/binary>>], + description => <<"bridge_v2 send msg to rabbitmq action failed">> + } + ), + on_exit(fun() -> + emqx_rule_engine:delete_rule(RuleId), + _ = delete_action(Name) + end), + {ok, C1} = emqtt:start_link([{clean_start, true}]), + {ok, _} = emqtt:connect(C1), + Payload = payload(), + PayloadBin = emqx_utils_json:encode(Payload), + {ok, _} = emqtt:publish(C1, Topic, #{}, PayloadBin, [{qos, 1}, {retain, false}]), + ok = emqtt:disconnect(C1), + InstanceId = instance_id(actions, Name), + waiting_for_disconnected_alarms(InstanceId), + waiting_for_dropped_count(InstanceId), + #{counters := Counters} = emqx_resource:get_metrics(InstanceId), + %% dropped + 1 + ?assertMatch( + #{ + dropped := 1, + success := 0, + matched := 1, + failed := 0, + received := 0 + }, + Counters + ), + ok = delete_action(Name), + ActionsAfterDelete = emqx_bridge_v2:list(actions), + ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete), + ok. + +waiting_for_disconnected_alarms(InstanceId) -> + waiting_for_disconnected_alarms(InstanceId, 0). + +waiting_for_disconnected_alarms(_InstanceId, 200) -> + throw(not_receive_disconnect_alarm); +waiting_for_disconnected_alarms(InstanceId, Count) -> + case emqx_alarm:get_alarms(activated) of + [] -> + ct:sleep(100), + waiting_for_disconnected_alarms(InstanceId, Count + 1); + [Alarm] -> + ?assertMatch( + #{ + message := + <<"resource down: #{error => not_connected,status => disconnected}">>, + name := InstanceId + }, + Alarm + ) + end. + +waiting_for_dropped_count(InstanceId) -> + waiting_for_dropped_count(InstanceId, 0). + +waiting_for_dropped_count(_InstanceId, 400) -> + throw(not_receive_dropped_count); +waiting_for_dropped_count(InstanceId, Count) -> + #{ + counters := #{ + dropped := Dropped, + success := 0, + matched := 1, + failed := 0, + received := 0 + } + } = emqx_resource:get_metrics(InstanceId), + case Dropped of + 1 -> + ok; + 0 -> + ct:sleep(400), + waiting_for_dropped_count(InstanceId, Count + 1) + end. + receive_messages(Count) -> receive_messages(Count, []). receive_messages(0, Msgs) -> @@ -301,24 +400,6 @@ send_test_message_to_rabbitmq(Config) -> ), ok. -probe_bridge_api(Name, PathStr, Config) -> - Params = Config#{<<"type">> => ?TYPE, <<"name">> => Name}, - Path = emqx_mgmt_api_test_util:api_path([PathStr]), - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - Opts = #{return_all => true}, - ct:pal("probing bridge (via http): ~p", [Params]), - Res = - case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of - {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> - {ok, Res0}; - {error, {Status, Headers, Body0}} -> - {error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}}; - Error -> - Error - end, - ct:pal("bridge probe result: ~p", [Res]), - Res. - instance_id(Type, Name) -> ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name), BridgeId = emqx_bridge_resource:bridge_id(?TYPE, Name),