Merge pull request #12569 from zhongwencool/fix-rabbitmq-action-crash-dropped-metrics

fix: rabbitmq action failed to increase dropped metrics
This commit is contained in:
zhongwencool 2024-02-23 09:53:55 +08:00 committed by GitHub
commit 11d3df2775
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 135 additions and 22 deletions

View File

@ -256,6 +256,38 @@ publish_messages(
publish_confirmation_timeout := PublishConfirmationTimeout publish_confirmation_timeout := PublishConfirmationTimeout
}, },
Messages Messages
) ->
try
publish_messages(
Conn,
RabbitMQ,
DeliveryMode,
Exchange,
RoutingKey,
PayloadTmpl,
Messages,
WaitForPublishConfirmations,
PublishConfirmationTimeout
)
catch
%% if send a message to a non-existent exchange, RabbitMQ client will crash
%% {shutdown,{server_initiated_close,404,<<"NOT_FOUND - no exchange 'xyz' in vhost '/'">>}
%% so we catch and return {recoverable_error, Reason} to increase metrics
_Type:Reason ->
Msg = iolist_to_binary(io_lib:format("RabbitMQ: publish_failed: ~p", [Reason])),
erlang:error({recoverable_error, Msg})
end.
publish_messages(
Conn,
RabbitMQ,
DeliveryMode,
Exchange,
RoutingKey,
PayloadTmpl,
Messages,
WaitForPublishConfirmations,
PublishConfirmationTimeout
) -> ) ->
case maps:find(Conn, RabbitMQ) of case maps:find(Conn, RabbitMQ) of
{ok, Channel} -> {ok, Channel} ->

View File

@ -95,6 +95,9 @@ rabbitmq_source() ->
parse_and_check(<<"sources">>, emqx_bridge_v2_schema, Source, Name). parse_and_check(<<"sources">>, emqx_bridge_v2_schema, Source, Name).
rabbitmq_action() -> rabbitmq_action() ->
rabbitmq_action(rabbit_mq_exchange()).
rabbitmq_action(Exchange) ->
Name = atom_to_binary(?MODULE), Name = atom_to_binary(?MODULE),
Action = #{ Action = #{
<<"actions">> => #{ <<"actions">> => #{
@ -103,12 +106,16 @@ rabbitmq_action() ->
<<"connector">> => Name, <<"connector">> => Name,
<<"enable">> => true, <<"enable">> => true,
<<"parameters">> => #{ <<"parameters">> => #{
<<"exchange">> => rabbit_mq_exchange(), <<"exchange">> => Exchange,
<<"payload_template">> => <<"${.payload}">>, <<"payload_template">> => <<"${.payload}">>,
<<"routing_key">> => rabbit_mq_routing_key(), <<"routing_key">> => rabbit_mq_routing_key(),
<<"delivery_mode">> => <<"non_persistent">>, <<"delivery_mode">> => <<"non_persistent">>,
<<"publish_confirmation_timeout">> => <<"30s">>, <<"publish_confirmation_timeout">> => <<"30s">>,
<<"wait_for_publish_confirmations">> => true <<"wait_for_publish_confirmations">> => true
},
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"1s">>,
<<"inflight_window">> => 1
} }
} }
} }
@ -131,7 +138,10 @@ delete_source(Name) ->
ok = emqx_bridge_v2:remove(sources, ?TYPE, Name). ok = emqx_bridge_v2:remove(sources, ?TYPE, Name).
create_action(Name) -> create_action(Name) ->
Action = rabbitmq_action(), create_action(Name, rabbit_mq_exchange()).
create_action(Name, Exchange) ->
Action = rabbitmq_action(Exchange),
{ok, _} = emqx_bridge_v2:create(actions, ?TYPE, Name, Action). {ok, _} = emqx_bridge_v2:create(actions, ?TYPE, Name, Action).
delete_action(Name) -> delete_action(Name) ->
@ -210,14 +220,14 @@ t_source(Config) ->
t_source_probe(_Config) -> t_source_probe(_Config) ->
Name = atom_to_binary(?FUNCTION_NAME), Name = atom_to_binary(?FUNCTION_NAME),
Source = rabbitmq_source(), Source = rabbitmq_source(),
{ok, Res0} = probe_bridge_api(Name, "sources_probe", Source), {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(source, ?TYPE, Name, Source),
?assertMatch({{_, 204, _}, _, _}, Res0), ?assertMatch({{_, 204, _}, _, _}, Res0),
ok. ok.
t_action_probe(_Config) -> t_action_probe(_Config) ->
Name = atom_to_binary(?FUNCTION_NAME), Name = atom_to_binary(?FUNCTION_NAME),
Action = rabbitmq_action(), Action = rabbitmq_action(),
{ok, Res0} = probe_bridge_api(Name, "actions_probe", Action), {ok, Res0} = emqx_bridge_v2_testlib:probe_bridge_api(action, ?TYPE, Name, Action),
?assertMatch({{_, 204, _}, _, _}, Res0), ?assertMatch({{_, 204, _}, _, _}, Res0),
ok. ok.
@ -262,6 +272,95 @@ t_action(Config) ->
), ),
ok. ok.
t_action_not_exist_exchange(_Config) ->
Name = atom_to_binary(?FUNCTION_NAME),
create_action(Name, <<"not_exist_exchange">>),
Actions = emqx_bridge_v2:list(actions),
Any = fun(#{name := BName}) -> BName =:= Name end,
?assert(lists:any(Any, Actions), Actions),
Topic = <<"lkadfaodik">>,
{ok, #{id := RuleId}} = emqx_rule_engine:create_rule(
#{
sql => <<"select * from \"", Topic/binary, "\"">>,
id => atom_to_binary(?FUNCTION_NAME),
actions => [<<"rabbitmq:", Name/binary>>],
description => <<"bridge_v2 send msg to rabbitmq action failed">>
}
),
on_exit(fun() ->
emqx_rule_engine:delete_rule(RuleId),
_ = delete_action(Name)
end),
{ok, C1} = emqtt:start_link([{clean_start, true}]),
{ok, _} = emqtt:connect(C1),
Payload = payload(),
PayloadBin = emqx_utils_json:encode(Payload),
{ok, _} = emqtt:publish(C1, Topic, #{}, PayloadBin, [{qos, 1}, {retain, false}]),
ok = emqtt:disconnect(C1),
InstanceId = instance_id(actions, Name),
waiting_for_disconnected_alarms(InstanceId),
waiting_for_dropped_count(InstanceId),
#{counters := Counters} = emqx_resource:get_metrics(InstanceId),
%% dropped + 1
?assertMatch(
#{
dropped := 1,
success := 0,
matched := 1,
failed := 0,
received := 0
},
Counters
),
ok = delete_action(Name),
ActionsAfterDelete = emqx_bridge_v2:list(actions),
?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete),
ok.
waiting_for_disconnected_alarms(InstanceId) ->
waiting_for_disconnected_alarms(InstanceId, 0).
waiting_for_disconnected_alarms(_InstanceId, 200) ->
throw(not_receive_disconnect_alarm);
waiting_for_disconnected_alarms(InstanceId, Count) ->
case emqx_alarm:get_alarms(activated) of
[] ->
ct:sleep(100),
waiting_for_disconnected_alarms(InstanceId, Count + 1);
[Alarm] ->
?assertMatch(
#{
message :=
<<"resource down: #{error => not_connected,status => disconnected}">>,
name := InstanceId
},
Alarm
)
end.
waiting_for_dropped_count(InstanceId) ->
waiting_for_dropped_count(InstanceId, 0).
waiting_for_dropped_count(_InstanceId, 400) ->
throw(not_receive_dropped_count);
waiting_for_dropped_count(InstanceId, Count) ->
#{
counters := #{
dropped := Dropped,
success := 0,
matched := 1,
failed := 0,
received := 0
}
} = emqx_resource:get_metrics(InstanceId),
case Dropped of
1 ->
ok;
0 ->
ct:sleep(400),
waiting_for_dropped_count(InstanceId, Count + 1)
end.
receive_messages(Count) -> receive_messages(Count) ->
receive_messages(Count, []). receive_messages(Count, []).
receive_messages(0, Msgs) -> receive_messages(0, Msgs) ->
@ -301,24 +400,6 @@ send_test_message_to_rabbitmq(Config) ->
), ),
ok. ok.
probe_bridge_api(Name, PathStr, Config) ->
Params = Config#{<<"type">> => ?TYPE, <<"name">> => Name},
Path = emqx_mgmt_api_test_util:api_path([PathStr]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("probing bridge (via http): ~p", [Params]),
Res =
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
{ok, {{_, 204, _}, _Headers, _Body0} = Res0} ->
{ok, Res0};
{error, {Status, Headers, Body0}} ->
{error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}};
Error ->
Error
end,
ct:pal("bridge probe result: ~p", [Res]),
Res.
instance_id(Type, Name) -> instance_id(Type, Name) ->
ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name), ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name),
BridgeId = emqx_bridge_resource:bridge_id(?TYPE, Name), BridgeId = emqx_bridge_resource:bridge_id(?TYPE, Name),