diff --git a/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml b/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml index e2c41a87f..e78e47fe5 100644 --- a/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml +++ b/.ci/docker-compose-file/rocketmq/conf/plain_acl.yml @@ -9,3 +9,4 @@ accounts: defaultGroupPerm: PUB|SUB topicPerms: - TopicTest=PUB|SUB + - Topic2=PUB|SUB diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src index 564e36a88..1f001218c 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rocketmq, [ {description, "EMQX Enterprise RocketMQ Bridge"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, rocketmq]}, {env, [{emqx_action_info_modules, [emqx_bridge_rocketmq_action_info]}]}, diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index 1af520a93..bd5154df5 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -139,13 +139,14 @@ on_add_channel( ChannelId, ChannelConfig ) -> - {ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo), + {ok, ChannelState} = create_channel_state(ChannelId, ChannelConfig, ACLInfo), NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), %% Update state NewState = OldState#{installed_channels => NewInstalledChannels}, {ok, NewState}. create_channel_state( + ChannelId, #{parameters := Conf} = _ChannelConfig, ACLInfo ) -> @@ -154,7 +155,7 @@ create_channel_state( sync_timeout := SyncTimeout } = Conf, TopicTks = emqx_placeholder:preproc_tmpl(Topic), - ProducerOpts = make_producer_opts(Conf, ACLInfo), + ProducerOpts = make_producer_opts(ChannelId, Conf, ACLInfo), Templates = parse_template(Conf), State = #{ topic => Topic, @@ -349,6 +350,7 @@ is_sensitive_key(_) -> false. make_producer_opts( + ChannelId, #{ send_buffer := SendBuff, refresh_interval := RefreshInterval @@ -356,6 +358,9 @@ make_producer_opts( ACLInfo ) -> #{ + %% TODO: the name needs to be an atom but this may cause atom leak so we + %% should figure out a way to avoid this + name => binary_to_atom(ChannelId), tcp_opts => [{sndbuf, SendBuff}], ref_topic_route_interval => RefreshInterval, acl_info => emqx_secret:wrap(ACLInfo) diff --git a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl index a056ae3d2..7af6c7eea 100644 --- a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl +++ b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl @@ -263,6 +263,60 @@ t_setup_via_http_api_and_publish(Config) -> ), ok. +t_setup_two_actions_via_http_api_and_publish(Config) -> + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + Name = ?GET_CONFIG(rocketmq_name, Config), + RocketMQConf = ?GET_CONFIG(rocketmq_config, Config), + RocketMQConf2 = RocketMQConf#{ + <<"name">> => Name, + <<"type">> => BridgeType + }, + ?assertMatch( + {ok, _}, + create_bridge_http(RocketMQConf2) + ), + {ok, #{raw_config := ActionConf}} = emqx_bridge_v2:lookup(actions, BridgeType, Name), + Topic2 = <<"Topic2">>, + ActionConf2 = emqx_utils_maps:deep_force_put( + [<<"parameters">>, <<"topic">>], ActionConf, Topic2 + ), + Action2Name = atom_to_binary(?FUNCTION_NAME), + {ok, _} = emqx_bridge_v2:create(BridgeType, Action2Name, ActionConf2), + SentData = #{payload => ?PAYLOAD}, + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config, SentData)), + #{?snk_kind := rocketmq_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(rocketmq_connector_query_return, Trace0), + ?assertMatch([#{result := ok}], Trace), + ok + end + ), + Config2 = proplists:delete(rocketmq_name, Config), + Config3 = [{rocketmq_name, Action2Name} | Config2], + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config3, SentData)), + #{?snk_kind := rocketmq_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(rocketmq_connector_query_return, Trace0), + ?assertMatch([#{result := ok}], Trace), + ok + end + ), + ok. + t_get_status(Config) -> ?assertMatch( {ok, _},