diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index f4af9445b..bd98c43d6 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 134ba15b6..b5b339ea9 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -317,17 +317,17 @@ handle_result(Res) -> make_channel(PoolName, ChannelId, Params) -> Conns = get_rabbitmq_connections(PoolName), - make_channel(Conns, PoolName, ChannelId, Params, #{}). + make_channel(Conns, ChannelId, Params, #{}). -make_channel([], _PoolName, _ChannelId, _Param, Acc) -> +make_channel([], _ChannelId, _Param, Acc) -> {ok, Acc}; -make_channel([Conn | Conns], PoolName, ChannelId, Params, Acc) -> +make_channel([Conn | Conns], ChannelId, Params, Acc) -> maybe {ok, RabbitMQChannel} ?= amqp_connection:open_channel(Conn), ok ?= try_confirm_channel(Params, RabbitMQChannel), - ok ?= try_subscribe(Params, RabbitMQChannel, PoolName, ChannelId), + ok ?= try_subscribe(Params, RabbitMQChannel, ChannelId), NewAcc = Acc#{Conn => RabbitMQChannel}, - make_channel(Conns, PoolName, ChannelId, Params, NewAcc) + make_channel(Conns, ChannelId, Params, NewAcc) end. %% We need to enable confirmations if we want to wait for them @@ -396,16 +396,15 @@ get_rabbitmq_connections(PoolName) -> try_subscribe( #{queue := Queue, no_ack := NoAck, config_root := sources} = Params, RabbitChan, - PoolName, ChannelId ) -> - WorkState = {RabbitChan, PoolName, Params}, + WorkState = {RabbitChan, ChannelId, Params}, {ok, ConsumePid} = emqx_bridge_rabbitmq_sup:ensure_started(ChannelId, WorkState), BasicConsume = #'basic.consume'{queue = Queue, no_ack = NoAck}, #'basic.consume_ok'{consumer_tag = _} = amqp_channel:subscribe(RabbitChan, BasicConsume, ConsumePid), ok; -try_subscribe(#{config_root := actions}, _RabbitChan, _PoolName, _ChannelId) -> +try_subscribe(#{config_root := actions}, _RabbitChan, _ChannelId) -> ok. try_unsubscribe(ChannelId, Channels) -> diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl index 711eab013..1a44c5f63 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl @@ -9,6 +9,7 @@ -include_lib("emqx_connector/include/emqx_connector.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). @@ -189,9 +190,35 @@ t_source(Config) -> receive_messages(1) ), ok = emqtt:disconnect(C1), + InstanceId = instance_id(sources, Name), + #{counters := Counters} = emqx_resource:get_metrics(InstanceId), ok = delete_source(Name), SourcesAfterDelete = emqx_bridge_v2:list(sources), ?assertNot(lists:any(Any, SourcesAfterDelete), SourcesAfterDelete), + ?assertMatch( + #{ + dropped := 0, + success := 0, + matched := 0, + failed := 0, + received := 1 + }, + Counters + ), + ok. + +t_source_probe(_Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + Source = rabbitmq_source(), + {ok, Res0} = probe_bridge_api(Name, "sources_probe", Source), + ?assertMatch({{_, 204, _}, _, _}, Res0), + ok. + +t_action_probe(_Config) -> + Name = atom_to_binary(?FUNCTION_NAME), + Action = rabbitmq_action(), + {ok, Res0} = probe_bridge_api(Name, "actions_probe", Action), + ?assertMatch({{_, 204, _}, _, _}, Res0), ok. t_action(Config) -> @@ -218,9 +245,21 @@ t_action(Config) -> Msg = receive_message_from_rabbitmq(Config), ?assertMatch(Payload, Msg), ok = emqtt:disconnect(C1), + InstanceId = instance_id(actions, Name), + #{counters := Counters} = emqx_resource:get_metrics(InstanceId), ok = delete_action(Name), ActionsAfterDelete = emqx_bridge_v2:list(actions), ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete), + ?assertMatch( + #{ + dropped := 0, + success := 0, + matched := 1, + failed := 0, + received := 0 + }, + Counters + ), ok. receive_messages(Count) -> @@ -261,3 +300,31 @@ send_test_message_to_rabbitmq(Config) -> } ), ok. + +probe_bridge_api(Name, PathStr, Config) -> + Params = Config#{<<"type">> => ?TYPE, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path([PathStr]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("probing bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of + {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> + {ok, Res0}; + {error, {Status, Headers, Body0}} -> + {error, {Status, Headers, emqx_bridge_testlib:try_decode_error(Body0)}}; + Error -> + Error + end, + ct:pal("bridge probe result: ~p", [Res]), + Res. + +instance_id(Type, Name) -> + ConnectorId = emqx_bridge_resource:resource_id(Type, ?TYPE, Name), + BridgeId = emqx_bridge_resource:bridge_id(?TYPE, Name), + TypeBin = + case Type of + sources -> <<"source:">>; + actions -> <<"action:">> + end, + <>.