From 7f9a311988b3d3c8b42bd7e3f9c528c126288e7b Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 15 Apr 2024 20:08:56 +0200 Subject: [PATCH] fix(rocketmq action): all actions used only one topic This commit makes sure that a set of rocketmq workers are started for each topic. Before this commit all actions for a rocketmq connector used the same workers which all were bound to a single topic so all messages got delivered to that topic regardless of the action configuration. We should have automatic tests to check that the messages actually go to different topics but this needs to go into another PR since we have to deliver the fix fast and the rocketmq library does not support reading from topics. Fixes: https://emqx.atlassian.net/browse/EEC-1006 --- .../rocketmq/conf/plain_acl.yml | 1 + .../src/emqx_bridge_rocketmq.app.src | 2 +- .../src/emqx_bridge_rocketmq_connector.erl | 9 +++- .../test/emqx_bridge_rocketmq_SUITE.erl | 54 +++++++++++++++++++ 4 files changed, 63 insertions(+), 3 deletions(-) diff --git a/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml b/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml index e2c41a87f..e78e47fe5 100644 --- a/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml +++ b/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml @@ -9,3 +9,4 @@ accounts: defaultGroupPerm: PUB|SUB topicPerms: - TopicTest=PUB|SUB + - Topic2=PUB|SUB diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src index 564e36a88..1f001218c 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rocketmq, [ {description, "EMQX Enterprise RocketMQ Bridge"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, rocketmq]}, {env, [{emqx_action_info_modules, [emqx_bridge_rocketmq_action_info]}]}, diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index 1af520a93..bd5154df5 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -139,13 +139,14 @@ on_add_channel( ChannelId, ChannelConfig ) -> - {ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo), + {ok, ChannelState} = create_channel_state(ChannelId, ChannelConfig, ACLInfo), NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), %% Update state NewState = OldState#{installed_channels => NewInstalledChannels}, {ok, NewState}. create_channel_state( + ChannelId, #{parameters := Conf} = _ChannelConfig, ACLInfo ) -> @@ -154,7 +155,7 @@ create_channel_state( sync_timeout := SyncTimeout } = Conf, TopicTks = emqx_placeholder:preproc_tmpl(Topic), - ProducerOpts = make_producer_opts(Conf, ACLInfo), + ProducerOpts = make_producer_opts(ChannelId, Conf, ACLInfo), Templates = parse_template(Conf), State = #{ topic => Topic, @@ -349,6 +350,7 @@ is_sensitive_key(_) -> false. make_producer_opts( + ChannelId, #{ send_buffer := SendBuff, refresh_interval := RefreshInterval @@ -356,6 +358,9 @@ make_producer_opts( ACLInfo ) -> #{ + %% TODO: the name needs to be an atom but this may cause atom leak so we + %% should figure out a way to avoid this + name => binary_to_atom(ChannelId), tcp_opts => [{sndbuf, SendBuff}], ref_topic_route_interval => RefreshInterval, acl_info => emqx_secret:wrap(ACLInfo) diff --git a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl index a056ae3d2..7af6c7eea 100644 --- a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl +++ b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl @@ -263,6 +263,60 @@ t_setup_via_http_api_and_publish(Config) -> ), ok. +t_setup_two_actions_via_http_api_and_publish(Config) -> + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + Name = ?GET_CONFIG(rocketmq_name, Config), + RocketMQConf = ?GET_CONFIG(rocketmq_config, Config), + RocketMQConf2 = RocketMQConf#{ + <<"name">> => Name, + <<"type">> => BridgeType + }, + ?assertMatch( + {ok, _}, + create_bridge_http(RocketMQConf2) + ), + {ok, #{raw_config := ActionConf}} = emqx_bridge_v2:lookup(actions, BridgeType, Name), + Topic2 = <<"Topic2">>, + ActionConf2 = emqx_utils_maps:deep_force_put( + [<<"parameters">>, <<"topic">>], ActionConf, Topic2 + ), + Action2Name = atom_to_binary(?FUNCTION_NAME), + {ok, _} = emqx_bridge_v2:create(BridgeType, Action2Name, ActionConf2), + SentData = #{payload => ?PAYLOAD}, + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config, SentData)), + #{?snk_kind := rocketmq_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(rocketmq_connector_query_return, Trace0), + ?assertMatch([#{result := ok}], Trace), + ok + end + ), + Config2 = proplists:delete(rocketmq_name, Config), + Config3 = [{rocketmq_name, Action2Name} | Config2], + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config3, SentData)), + #{?snk_kind := rocketmq_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(rocketmq_connector_query_return, Trace0), + ?assertMatch([#{result := ok}], Trace), + ok + end + ), + ok. + t_get_status(Config) -> ?assertMatch( {ok, _},