Merge pull request #12882 from kjellwinblad/kjell/fix/rocketmq_same_topic/EEC-1006

fix(rocketmq action): all actions used only one topic
This commit is contained in:
Ivan Dyachkov 2024-04-15 22:03:57 +02:00 committed by GitHub
commit 091aa71a6b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 74 additions and 11 deletions

View File

@ -9,3 +9,4 @@ accounts:
defaultGroupPerm: PUB|SUB
topicPerms:
- TopicTest=PUB|SUB
- Topic2=PUB|SUB

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_rocketmq, [
{description, "EMQX Enterprise RocketMQ Bridge"},
{vsn, "0.1.5"},
{vsn, "0.1.6"},
{registered, []},
{applications, [kernel, stdlib, emqx_resource, rocketmq]},
{env, [{emqx_action_info_modules, [emqx_bridge_rocketmq_action_info]}]},

View File

@ -202,7 +202,7 @@ on_stop(InstanceId, _State) ->
({_, client_id, ClientId}) ->
destory_producers_map(ClientId),
ok = rocketmq:stop_and_delete_supervised_client(ClientId);
({_, _Topic, Producer}) ->
({_, _ProducerGroup, Producer}) ->
_ = rocketmq:stop_and_delete_supervised_producers(Producer)
end,
emqx_resource:get_allocated_resources_list(InstanceId)
@ -258,7 +258,7 @@ do_query(
Data = apply_template(Query, Templates),
Result = safe_do_produce(
InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout
ChannelId, InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout
),
case Result of
{error, Reason} ->
@ -284,9 +284,11 @@ do_query(
get_channel_id({ChannelId, _}) -> ChannelId;
get_channel_id([{ChannelId, _} | _]) -> ChannelId.
safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout) ->
safe_do_produce(
ChannelId, InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout
) ->
try
Producers = get_producers(InstanceId, ClientId, TopicKey, ProducerOpts),
Producers = get_producers(ChannelId, InstanceId, ClientId, TopicKey, ProducerOpts),
produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout)
catch
_Type:Reason ->
@ -391,16 +393,21 @@ destory_producers_map(ClientId) ->
ets:delete(Tid)
end.
get_producers(InstanceId, ClientId, Topic, ProducerOpts) ->
case ets:lookup(ClientId, Topic) of
get_producers(ChannelId, InstanceId, ClientId, Topic, ProducerOpts) ->
%% The topic need to be included in the name since we can have multiple
%% topics per channel due to templating.
ProducerGroup = iolist_to_binary([ChannelId, "_", Topic]),
case ets:lookup(ClientId, ProducerGroup) of
[{_, Producers}] ->
Producers;
_ ->
ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic]),
%% TODO: the name needs to be an atom but this may cause atom leak so we
%% should figure out a way to avoid this
ProducerOpts2 = ProducerOpts#{name => binary_to_atom(ProducerGroup)},
{ok, Producers} = rocketmq:ensure_supervised_producers(
ClientId, ProducerGroup, Topic, ProducerOpts
ClientId, ProducerGroup, Topic, ProducerOpts2
),
ok = emqx_resource:allocate_resource(InstanceId, Topic, Producers),
ets:insert(ClientId, {Topic, Producers}),
ok = emqx_resource:allocate_resource(InstanceId, ProducerGroup, Producers),
ets:insert(ClientId, {ProducerGroup, Producers}),
Producers
end.

View File

@ -263,6 +263,60 @@ t_setup_via_http_api_and_publish(Config) ->
),
ok.
t_setup_two_actions_via_http_api_and_publish(Config) ->
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),
RocketMQConf = ?GET_CONFIG(rocketmq_config, Config),
RocketMQConf2 = RocketMQConf#{
<<"name">> => Name,
<<"type">> => BridgeType
},
?assertMatch(
{ok, _},
create_bridge_http(RocketMQConf2)
),
{ok, #{raw_config := ActionConf}} = emqx_bridge_v2:lookup(actions, BridgeType, Name),
Topic2 = <<"Topic2">>,
ActionConf2 = emqx_utils_maps:deep_force_put(
[<<"parameters">>, <<"topic">>], ActionConf, Topic2
),
Action2Name = atom_to_binary(?FUNCTION_NAME),
{ok, _} = emqx_bridge_v2:create(BridgeType, Action2Name, ActionConf2),
SentData = #{payload => ?PAYLOAD},
?check_trace(
begin
?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := rocketmq_connector_query_return},
10_000
),
ok
end,
fun(Trace0) ->
Trace = ?of_kind(rocketmq_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace),
ok
end
),
Config2 = proplists:delete(rocketmq_name, Config),
Config3 = [{rocketmq_name, Action2Name} | Config2],
?check_trace(
begin
?wait_async_action(
?assertEqual(ok, send_message(Config3, SentData)),
#{?snk_kind := rocketmq_connector_query_return},
10_000
),
ok
end,
fun(Trace0) ->
Trace = ?of_kind(rocketmq_connector_query_return, Trace0),
?assertMatch([#{result := ok}], Trace),
ok
end
),
ok.
t_get_status(Config) ->
?assertMatch(
{ok, _},

View File

@ -0,0 +1 @@
The RocketMQ action has been fixed so that the topic configiuration works correctly. If more than one action used a single connector before this fix, all actions messages got delivered to the topic that was used first.