From 7f9a311988b3d3c8b42bd7e3f9c528c126288e7b Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 15 Apr 2024 20:08:56 +0200 Subject: [PATCH 1/4] fix(rocketmq action): all actions used only one topic This commit makes sure that a set of rocketmq workers are started for each topic. Before this commit all actions for a rocketmq connector used the same workers which all were bound to a single topic so all messages got delivered to that topic regardless of the action configuration. We should have automatic tests to check that the messages actually go to different topics but this needs to go into another PR since we have to deliver the fix fast and the rocketmq library does not support reading from topics. Fixes: https://emqx.atlassian.net/browse/EEC-1006 --- .../rocketmq/conf/plain_acl.yml | 1 + .../src/emqx_bridge_rocketmq.app.src | 2 +- .../src/emqx_bridge_rocketmq_connector.erl | 9 +++- .../test/emqx_bridge_rocketmq_SUITE.erl | 54 +++++++++++++++++++ 4 files changed, 63 insertions(+), 3 deletions(-) diff --git a/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml b/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml index e2c41a87f..e78e47fe5 100644 --- a/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml +++ b/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml @@ -9,3 +9,4 @@ accounts: defaultGroupPerm: PUB|SUB topicPerms: - TopicTest=PUB|SUB + - Topic2=PUB|SUB diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src index 564e36a88..1f001218c 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rocketmq, [ {description, "EMQX Enterprise RocketMQ Bridge"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, rocketmq]}, {env, [{emqx_action_info_modules, [emqx_bridge_rocketmq_action_info]}]}, diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index 1af520a93..bd5154df5 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -139,13 +139,14 @@ on_add_channel( ChannelId, ChannelConfig ) -> - {ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo), + {ok, ChannelState} = create_channel_state(ChannelId, ChannelConfig, ACLInfo), NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), %% Update state NewState = OldState#{installed_channels => NewInstalledChannels}, {ok, NewState}. create_channel_state( + ChannelId, #{parameters := Conf} = _ChannelConfig, ACLInfo ) -> @@ -154,7 +155,7 @@ create_channel_state( sync_timeout := SyncTimeout } = Conf, TopicTks = emqx_placeholder:preproc_tmpl(Topic), - ProducerOpts = make_producer_opts(Conf, ACLInfo), + ProducerOpts = make_producer_opts(ChannelId, Conf, ACLInfo), Templates = parse_template(Conf), State = #{ topic => Topic, @@ -349,6 +350,7 @@ is_sensitive_key(_) -> false. make_producer_opts( + ChannelId, #{ send_buffer := SendBuff, refresh_interval := RefreshInterval @@ -356,6 +358,9 @@ make_producer_opts( ACLInfo ) -> #{ + %% TODO: the name needs to be an atom but this may cause atom leak so we + %% should figure out a way to avoid this + name => binary_to_atom(ChannelId), tcp_opts => [{sndbuf, SendBuff}], ref_topic_route_interval => RefreshInterval, acl_info => emqx_secret:wrap(ACLInfo) diff --git a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl index a056ae3d2..7af6c7eea 100644 --- a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl +++ b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl @@ -263,6 +263,60 @@ t_setup_via_http_api_and_publish(Config) -> ), ok. +t_setup_two_actions_via_http_api_and_publish(Config) -> + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + Name = ?GET_CONFIG(rocketmq_name, Config), + RocketMQConf = ?GET_CONFIG(rocketmq_config, Config), + RocketMQConf2 = RocketMQConf#{ + <<"name">> => Name, + <<"type">> => BridgeType + }, + ?assertMatch( + {ok, _}, + create_bridge_http(RocketMQConf2) + ), + {ok, #{raw_config := ActionConf}} = emqx_bridge_v2:lookup(actions, BridgeType, Name), + Topic2 = <<"Topic2">>, + ActionConf2 = emqx_utils_maps:deep_force_put( + [<<"parameters">>, <<"topic">>], ActionConf, Topic2 + ), + Action2Name = atom_to_binary(?FUNCTION_NAME), + {ok, _} = emqx_bridge_v2:create(BridgeType, Action2Name, ActionConf2), + SentData = #{payload => ?PAYLOAD}, + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config, SentData)), + #{?snk_kind := rocketmq_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(rocketmq_connector_query_return, Trace0), + ?assertMatch([#{result := ok}], Trace), + ok + end + ), + Config2 = proplists:delete(rocketmq_name, Config), + Config3 = [{rocketmq_name, Action2Name} | Config2], + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config3, SentData)), + #{?snk_kind := rocketmq_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(rocketmq_connector_query_return, Trace0), + ?assertMatch([#{result := ok}], Trace), + ok + end + ), + ok. + t_get_status(Config) -> ?assertMatch( {ok, _}, From 2fe36776b5cd15c4328e00c0beb593018ab1da82 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 15 Apr 2024 20:21:16 +0200 Subject: [PATCH 2/4] docs: add change log entry --- changes/ee/fix-12882.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ee/fix-12882.en.md diff --git a/changes/ee/fix-12882.en.md b/changes/ee/fix-12882.en.md new file mode 100644 index 000000000..804665fef --- /dev/null +++ b/changes/ee/fix-12882.en.md @@ -0,0 +1 @@ +The RocketMQ action has been fixed so that the topic configiuration works correctly. If more than one action used a single connector before this fix, all actions messages got delivered to the topic that was used first. From 1fe92bddd032fece0c10a45994166a659af058af Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 15 Apr 2024 20:37:23 +0200 Subject: [PATCH 3/4] fix(rocketmq action): make sure that topic template is respected --- .../src/emqx_bridge_rocketmq_connector.erl | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index bd5154df5..6a7b8d4bc 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -139,14 +139,13 @@ on_add_channel( ChannelId, ChannelConfig ) -> - {ok, ChannelState} = create_channel_state(ChannelId, ChannelConfig, ACLInfo), + {ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo), NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), %% Update state NewState = OldState#{installed_channels => NewInstalledChannels}, {ok, NewState}. create_channel_state( - ChannelId, #{parameters := Conf} = _ChannelConfig, ACLInfo ) -> @@ -155,7 +154,7 @@ create_channel_state( sync_timeout := SyncTimeout } = Conf, TopicTks = emqx_placeholder:preproc_tmpl(Topic), - ProducerOpts = make_producer_opts(ChannelId, Conf, ACLInfo), + ProducerOpts = make_producer_opts(Conf, ACLInfo), Templates = parse_template(Conf), State = #{ topic => Topic, @@ -203,7 +202,7 @@ on_stop(InstanceId, _State) -> ({_, client_id, ClientId}) -> destory_producers_map(ClientId), ok = rocketmq:stop_and_delete_supervised_client(ClientId); - ({_, _Topic, Producer}) -> + ({_, _ChannelId, Producer}) -> _ = rocketmq:stop_and_delete_supervised_producers(Producer) end, emqx_resource:get_allocated_resources_list(InstanceId) @@ -259,7 +258,7 @@ do_query( Data = apply_template(Query, Templates), Result = safe_do_produce( - InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout + ChannelId, InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout ), case Result of {error, Reason} -> @@ -285,9 +284,11 @@ do_query( get_channel_id({ChannelId, _}) -> ChannelId; get_channel_id([{ChannelId, _} | _]) -> ChannelId. -safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout) -> +safe_do_produce( + ChannelId, InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout +) -> try - Producers = get_producers(InstanceId, ClientId, TopicKey, ProducerOpts), + Producers = get_producers(ChannelId, InstanceId, ClientId, TopicKey, ProducerOpts), produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout) catch _Type:Reason -> @@ -350,7 +351,6 @@ is_sensitive_key(_) -> false. make_producer_opts( - ChannelId, #{ send_buffer := SendBuff, refresh_interval := RefreshInterval @@ -358,9 +358,6 @@ make_producer_opts( ACLInfo ) -> #{ - %% TODO: the name needs to be an atom but this may cause atom leak so we - %% should figure out a way to avoid this - name => binary_to_atom(ChannelId), tcp_opts => [{sndbuf, SendBuff}], ref_topic_route_interval => RefreshInterval, acl_info => emqx_secret:wrap(ACLInfo) @@ -396,16 +393,19 @@ destory_producers_map(ClientId) -> ets:delete(Tid) end. -get_producers(InstanceId, ClientId, Topic, ProducerOpts) -> - case ets:lookup(ClientId, Topic) of +get_producers(ChannelId, InstanceId, ClientId, Topic, ProducerOpts) -> + case ets:lookup(ClientId, ChannelId) of [{_, Producers}] -> Producers; _ -> - ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic]), + ProducerGroup = ChannelId, + %% TODO: the name needs to be an atom but this may cause atom leak so we + %% should figure out a way to avoid this + ProducerOpts2 = ProducerOpts#{name => binary_to_atom(ChannelId)}, {ok, Producers} = rocketmq:ensure_supervised_producers( - ClientId, ProducerGroup, Topic, ProducerOpts + ClientId, ProducerGroup, Topic, ProducerOpts2 ), - ok = emqx_resource:allocate_resource(InstanceId, Topic, Producers), - ets:insert(ClientId, {Topic, Producers}), + ok = emqx_resource:allocate_resource(InstanceId, ChannelId, Producers), + ets:insert(ClientId, {ChannelId, Producers}), Producers end. From f4818717926765ad5f2846d7ee17b3f6daa16846 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 15 Apr 2024 20:52:08 +0200 Subject: [PATCH 4/4] fix(rocketmq action): we need one producer group per channel and topic We need one producer group per channel and topic because we can have several topics per channel due to templating. --- .../src/emqx_bridge_rocketmq_connector.erl | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index 6a7b8d4bc..011d4074f 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -202,7 +202,7 @@ on_stop(InstanceId, _State) -> ({_, client_id, ClientId}) -> destory_producers_map(ClientId), ok = rocketmq:stop_and_delete_supervised_client(ClientId); - ({_, _ChannelId, Producer}) -> + ({_, _ProducerGroup, Producer}) -> _ = rocketmq:stop_and_delete_supervised_producers(Producer) end, emqx_resource:get_allocated_resources_list(InstanceId) @@ -394,18 +394,20 @@ destory_producers_map(ClientId) -> end. get_producers(ChannelId, InstanceId, ClientId, Topic, ProducerOpts) -> - case ets:lookup(ClientId, ChannelId) of + %% The topic need to be included in the name since we can have multiple + %% topics per channel due to templating. + ProducerGroup = iolist_to_binary([ChannelId, "_", Topic]), + case ets:lookup(ClientId, ProducerGroup) of [{_, Producers}] -> Producers; _ -> - ProducerGroup = ChannelId, %% TODO: the name needs to be an atom but this may cause atom leak so we %% should figure out a way to avoid this - ProducerOpts2 = ProducerOpts#{name => binary_to_atom(ChannelId)}, + ProducerOpts2 = ProducerOpts#{name => binary_to_atom(ProducerGroup)}, {ok, Producers} = rocketmq:ensure_supervised_producers( ClientId, ProducerGroup, Topic, ProducerOpts2 ), - ok = emqx_resource:allocate_resource(InstanceId, ChannelId, Producers), - ets:insert(ClientId, {ChannelId, Producers}), + ok = emqx_resource:allocate_resource(InstanceId, ProducerGroup, Producers), + ets:insert(ClientId, {ProducerGroup, Producers}), Producers end.