fix(rocketmq action): all actions used only one topic

This commit makes sure that a set of rocketmq workers are started for
each topic. Before this commit all actions for a rocketmq connector used
the same workers which all were bound to a single topic so all messages
got delivered to that topic regardless of the action configuration.

We should have automatic tests to check that the messages actually go to
different topics but this needs to go into another PR since we have to
deliver the fix fast and the rocketmq library does not support reading
from topics.

Fixes:
https://emqx.atlassian.net/browse/EEC-1006
This commit is contained in:
Kjell Winblad 2024-04-15 20:08:56 +02:00
parent 76aa4cf434
commit 7f9a311988
4 changed files with 63 additions and 3 deletions

View File

@ -9,3 +9,4 @@ accounts:
defaultGroupPerm: PUB|SUB defaultGroupPerm: PUB|SUB
topicPerms: topicPerms:
- TopicTest=PUB|SUB - TopicTest=PUB|SUB
- Topic2=PUB|SUB

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rocketmq, [ {application, emqx_bridge_rocketmq, [
{description, "EMQX Enterprise RocketMQ Bridge"}, {description, "EMQX Enterprise RocketMQ Bridge"},
{vsn, "0.1.5"}, {vsn, "0.1.6"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource, rocketmq]}, {applications, [kernel, stdlib, emqx_resource, rocketmq]},
{env, [{emqx_action_info_modules, [emqx_bridge_rocketmq_action_info]}]}, {env, [{emqx_action_info_modules, [emqx_bridge_rocketmq_action_info]}]},

View File

@ -139,13 +139,14 @@ on_add_channel(
ChannelId, ChannelId,
ChannelConfig ChannelConfig
) -> ) ->
{ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo), {ok, ChannelState} = create_channel_state(ChannelId, ChannelConfig, ACLInfo),
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state %% Update state
NewState = OldState#{installed_channels => NewInstalledChannels}, NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}. {ok, NewState}.
create_channel_state( create_channel_state(
ChannelId,
#{parameters := Conf} = _ChannelConfig, #{parameters := Conf} = _ChannelConfig,
ACLInfo ACLInfo
) -> ) ->
@ -154,7 +155,7 @@ create_channel_state(
sync_timeout := SyncTimeout sync_timeout := SyncTimeout
} = Conf, } = Conf,
TopicTks = emqx_placeholder:preproc_tmpl(Topic), TopicTks = emqx_placeholder:preproc_tmpl(Topic),
ProducerOpts = make_producer_opts(Conf, ACLInfo), ProducerOpts = make_producer_opts(ChannelId, Conf, ACLInfo),
Templates = parse_template(Conf), Templates = parse_template(Conf),
State = #{ State = #{
topic => Topic, topic => Topic,
@ -349,6 +350,7 @@ is_sensitive_key(_) ->
false. false.
make_producer_opts( make_producer_opts(
ChannelId,
#{ #{
send_buffer := SendBuff, send_buffer := SendBuff,
refresh_interval := RefreshInterval refresh_interval := RefreshInterval
@ -356,6 +358,9 @@ make_producer_opts(
ACLInfo ACLInfo
) -> ) ->
#{ #{
%% TODO: the name needs to be an atom but this may cause atom leak so we
%% should figure out a way to avoid this
name => binary_to_atom(ChannelId),
tcp_opts => [{sndbuf, SendBuff}], tcp_opts => [{sndbuf, SendBuff}],
ref_topic_route_interval => RefreshInterval, ref_topic_route_interval => RefreshInterval,
acl_info => emqx_secret:wrap(ACLInfo) acl_info => emqx_secret:wrap(ACLInfo)

View File

@ -263,6 +263,60 @@ t_setup_via_http_api_and_publish(Config) ->
), ),
ok. ok.
t_setup_two_actions_via_http_api_and_publish(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),
RocketMQConf = ?GET_CONFIG(rocketmq_config, Config),
RocketMQConf2 = RocketMQConf#{
<<"name">> => Name,
<<"type">> => BridgeType
},
?assertMatch(
{ok, _},
create_bridge_http(RocketMQConf2)
),
{ok, #{raw_config := ActionConf}} = emqx_bridge_v2:lookup(actions, BridgeType, Name),
Topic2 = <<"Topic2">>,
ActionConf2 = emqx_utils_maps:deep_force_put(
[<<"parameters">>, <<"topic">>], ActionConf, Topic2
),
Action2Name = atom_to_binary(?FUNCTION_NAME),
{ok, _} = emqx_bridge_v2:create(BridgeType, Action2Name, ActionConf2),
SentData = #{payload => ?PAYLOAD},
?check_trace(
begin
?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := rocketmq_connector_query_return},
10_000
),
ok
end,
fun(Trace0) ->
Trace = ?of_kind(rocketmq_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace),
ok
end
),
Config2 = proplists:delete(rocketmq_name, Config),
Config3 = [{rocketmq_name, Action2Name} | Config2],
?check_trace(
begin
?wait_async_action(
?assertEqual(ok, send_message(Config3, SentData)),
#{?snk_kind := rocketmq_connector_query_return},
10_000
),
ok
end,
fun(Trace0) ->
Trace = ?of_kind(rocketmq_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace),
ok
end
),
ok.
t_get_status(Config) -> t_get_status(Config) ->
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},