Merge pull request #12488 from kjellwinblad/kjell/refactor/rocketmq_bridge/EMQX-11467

feat: refactor RocketMQ bridge to connector and action
This commit is contained in:
Kjell Winblad 2024-02-13 16:19:21 +01:00 committed by GitHub
commit 366827390e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 309 additions and 44 deletions

View File

@ -94,6 +94,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_matrix_action_info, emqx_bridge_matrix_action_info,
emqx_bridge_mongodb_action_info, emqx_bridge_mongodb_action_info,
emqx_bridge_oracle_action_info, emqx_bridge_oracle_action_info,
emqx_bridge_rocketmq_action_info,
emqx_bridge_influxdb_action_info, emqx_bridge_influxdb_action_info,
emqx_bridge_cassandra_action_info, emqx_bridge_cassandra_action_info,
emqx_bridge_mysql_action_info, emqx_bridge_mysql_action_info,

View File

@ -1,9 +1,9 @@
{application, emqx_bridge_rocketmq, [ {application, emqx_bridge_rocketmq, [
{description, "EMQX Enterprise RocketMQ Bridge"}, {description, "EMQX Enterprise RocketMQ Bridge"},
{vsn, "0.1.4"}, {vsn, "0.1.5"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx_resource, rocketmq]}, {applications, [kernel, stdlib, emqx_resource, rocketmq]},
{env, []}, {env, [{emqx_action_info_modules, [emqx_bridge_rocketmq_action_info]}]},
{modules, []}, {modules, []},
{links, []} {links, []}
]}. ]}.

View File

@ -8,12 +8,7 @@
-include_lib("emqx_bridge/include/emqx_bridge.hrl"). -include_lib("emqx_bridge/include/emqx_bridge.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1]).
-export([
conn_bridge_examples/1,
values/1
]).
-export([ -export([
namespace/0, namespace/0,
@ -22,6 +17,14 @@
desc/1 desc/1
]). ]).
-export([
bridge_v2_examples/1,
connector_examples/1,
conn_bridge_examples/1
]).
-define(CONNECTOR_TYPE, rocketmq).
-define(ACTION_TYPE, ?CONNECTOR_TYPE).
-define(DEFAULT_TEMPLATE, <<>>). -define(DEFAULT_TEMPLATE, <<>>).
-define(DEFFAULT_REQ_TIMEOUT, <<"15s">>). -define(DEFFAULT_REQ_TIMEOUT, <<"15s">>).
@ -33,14 +36,14 @@ conn_bridge_examples(Method) ->
#{ #{
<<"rocketmq">> => #{ <<"rocketmq">> => #{
summary => <<"RocketMQ Bridge">>, summary => <<"RocketMQ Bridge">>,
value => values(Method) value => conn_bridge_example_values(Method)
} }
} }
]. ].
values(get) -> conn_bridge_example_values(get) ->
values(post); conn_bridge_example_values(post);
values(post) -> conn_bridge_example_values(post) ->
#{ #{
enable => true, enable => true,
type => rocketmq, type => rocketmq,
@ -58,8 +61,57 @@ values(post) ->
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
} }
}; };
values(put) -> conn_bridge_example_values(put) ->
values(post). conn_bridge_example_values(post).
connector_examples(Method) ->
[
#{
<<"rocketmq">> =>
#{
summary => <<"RocketMQ Connector">>,
value => emqx_connector_schema:connector_values(
Method, ?CONNECTOR_TYPE, connector_values()
)
}
}
].
connector_values() ->
#{
<<"enable">> => true,
<<"servers">> => <<"127.0.0.1:9876">>,
<<"pool_size">> => 8,
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"15s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
}
}.
bridge_v2_examples(Method) ->
[
#{
<<"rocketmq">> =>
#{
summary => <<"RocketMQ Action">>,
value => emqx_bridge_v2_schema:action_values(
Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
)
}
}
].
action_values() ->
#{
<<"parameters">> => #{
<<"topic">> => <<"TopicTest">>,
<<"template">> => ?DEFAULT_TEMPLATE,
<<"refresh_interval">> => <<"3s">>,
<<"send_buffer">> => <<"1024KB">>,
<<"sync_timeout">> => <<"3s">>
}
}.
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions %% Hocon Schema Definitions
@ -67,6 +119,84 @@ namespace() -> "bridge_rocketmq".
roots() -> []. roots() -> [].
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
emqx_connector_schema:api_fields(
Field,
?CONNECTOR_TYPE,
fields("config_connector") -- emqx_connector_schema:common_fields()
);
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(rocketmq_action));
fields(action) ->
{?ACTION_TYPE,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(?MODULE, rocketmq_action)),
#{
desc => <<"RocketMQ Action Config">>,
required => false
}
)};
fields(rocketmq_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
hoconsc:mk(
hoconsc:ref(?MODULE, action_parameters),
#{
required => true,
desc => ?DESC("action_parameters")
}
)
);
fields(action_parameters) ->
Parameters =
[
{template,
mk(
binary(),
#{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE}
)}
] ++ emqx_bridge_rocketmq_connector:fields(config),
lists:foldl(
fun(Key, Acc) ->
proplists:delete(Key, Acc)
end,
Parameters,
[
servers,
pool_size,
auto_reconnect,
access_key,
secret_key,
security_token
]
);
fields("config_connector") ->
Config =
emqx_connector_schema:common_fields() ++
emqx_bridge_rocketmq_connector:fields(config) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
lists:foldl(
fun(Key, Acc) ->
proplists:delete(Key, Acc)
end,
Config,
[
topic,
sync_timeout,
refresh_interval,
send_buffer,
auto_reconnect
]
);
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields("config") -> fields("config") ->
[ [
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@ -94,6 +224,14 @@ desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for RocketMQ using `", string:to_upper(Method), "` method."]; ["Configuration for RocketMQ using `", string:to_upper(Method), "` method."];
desc("config_connector") ->
?DESC("config_connector");
desc(rocketmq_action) ->
?DESC("rocketmq_action");
desc(action_parameters) ->
?DESC("action_parameters");
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_) -> desc(_) ->
undefined. undefined.

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_rocketmq_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
bridge_v1_type_name() -> rocketmq.
action_type_name() -> rocketmq.
connector_type_name() -> rocketmq.
schema_module() -> emqx_bridge_rocketmq.

View File

@ -21,10 +21,14 @@
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_batch_query/3, on_batch_query/3,
on_get_status/2 on_get_status/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]). ]).
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2]).
-define(ROCKETMQ_HOST_OPTIONS, #{ -define(ROCKETMQ_HOST_OPTIONS, #{
default_port => 9876 default_port => 9876
@ -82,7 +86,12 @@ callback_mode() -> always_sync.
on_start( on_start(
InstanceId, InstanceId,
#{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config #{
servers := BinServers,
access_key := AccessKey,
secret_key := SecretKey,
security_token := SecurityToken
} = Config
) -> ) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_rocketmq_connector", msg => "starting_rocketmq_connector",
@ -94,18 +103,13 @@ on_start(
emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS) emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS)
), ),
ClientId = client_id(InstanceId), ClientId = client_id(InstanceId),
TopicTks = emqx_placeholder:preproc_tmpl(Topic), ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
#{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config), ClientCfg = #{acl_info => ACLInfo},
ClientCfg = #{acl_info => AclInfo},
Templates = parse_template(Config),
State = #{ State = #{
client_id => ClientId, client_id => ClientId,
topic => Topic, acl_info => ACLInfo,
topic_tokens => TopicTks, installed_channels => #{}
sync_timeout => SyncTimeout,
templates => Templates,
producers_opts => ProducerOpts
}, },
ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId), ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId),
@ -123,6 +127,64 @@ on_start(
{error, Reason} {error, Reason}
end. end.
on_add_channel(
_InstId,
#{
installed_channels := InstalledChannels,
acl_info := ACLInfo
} = OldState,
ChannelId,
ChannelConfig
) ->
{ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo),
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}.
create_channel_state(
#{parameters := Conf} = _ChannelConfig,
ACLInfo
) ->
#{
topic := Topic,
sync_timeout := SyncTimeout
} = Conf,
TopicTks = emqx_placeholder:preproc_tmpl(Topic),
ProducerOpts = make_producer_opts(Conf, ACLInfo),
Templates = parse_template(Conf),
State = #{
topic => Topic,
topic_tokens => TopicTks,
templates => Templates,
sync_timeout => SyncTimeout,
acl_info => ACLInfo,
producers_opts => ProducerOpts
},
{ok, State}.
on_remove_channel(
_InstId,
#{
installed_channels := InstalledChannels
} = OldState,
ChannelId
) ->
NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}.
on_get_channel_status(
_ResId,
_ChannelId,
_State
) ->
?status_connected.
on_get_channels(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId).
on_stop(InstanceId, _State) -> on_stop(InstanceId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_rocketmq_connector", msg => "stopping_rocketmq_connector",
@ -144,7 +206,7 @@ on_query(InstanceId, Query, State) ->
do_query(InstanceId, Query, send_sync, State). do_query(InstanceId, Query, send_sync, State).
%% We only support batch inserts and all messages must have the same topic %% We only support batch inserts and all messages must have the same topic
on_batch_query(InstanceId, [{send_message, _Msg} | _] = Query, State) -> on_batch_query(InstanceId, [{_ChannelId, _Msg} | _] = Query, State) ->
do_query(InstanceId, Query, batch_send_sync, State); do_query(InstanceId, Query, batch_send_sync, State);
on_batch_query(_InstanceId, Query, _State) -> on_batch_query(_InstanceId, Query, _State) ->
{error, {unrecoverable_error, {invalid_request, Query}}}. {error, {unrecoverable_error, {invalid_request, Query}}}.
@ -154,11 +216,11 @@ on_get_status(_InstanceId, #{client_id := ClientId}) ->
{ok, Pid} -> {ok, Pid} ->
status_result(rocketmq_client:get_status(Pid)); status_result(rocketmq_client:get_status(Pid));
_ -> _ ->
connecting ?status_connecting
end. end.
status_result(_Status = true) -> connected; status_result(_Status = true) -> ?status_connected;
status_result(_Status) -> connecting. status_result(_Status) -> ?status_connecting.
%%======================================================================================== %%========================================================================================
%% Helper fns %% Helper fns
@ -169,11 +231,8 @@ do_query(
Query, Query,
QueryFunc, QueryFunc,
#{ #{
templates := Templates,
client_id := ClientId, client_id := ClientId,
topic_tokens := TopicTks, installed_channels := Channels
producers_opts := ProducerOpts,
sync_timeout := RequestTimeout
} = State } = State
) -> ) ->
?TRACE( ?TRACE(
@ -181,6 +240,13 @@ do_query(
"rocketmq_connector_received", "rocketmq_connector_received",
#{connector => InstanceId, query => Query, state => State} #{connector => InstanceId, query => Query, state => State}
), ),
ChannelId = get_channel_id(Query),
#{
topic_tokens := TopicTks,
templates := Templates,
sync_timeout := RequestTimeout,
producers_opts := ProducerOpts
} = maps:get(ChannelId, Channels),
TopicKey = get_topic_key(Query, TopicTks), TopicKey = get_topic_key(Query, TopicTks),
Data = apply_template(Query, Templates), Data = apply_template(Query, Templates),
@ -209,6 +275,9 @@ do_query(
Result Result
end. end.
get_channel_id({ChannelId, _}) -> ChannelId;
get_channel_id([{ChannelId, _} | _]) -> ChannelId.
safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout) -> safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout) ->
try try
Producers = get_producers(InstanceId, ClientId, TopicKey, ProducerOpts), Producers = get_producers(InstanceId, ClientId, TopicKey, ProducerOpts),
@ -275,14 +344,11 @@ is_sensitive_key(_) ->
make_producer_opts( make_producer_opts(
#{ #{
access_key := AccessKey,
secret_key := SecretKey,
security_token := SecurityToken,
send_buffer := SendBuff, send_buffer := SendBuff,
refresh_interval := RefreshInterval refresh_interval := RefreshInterval
} },
ACLInfo
) -> ) ->
ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
#{ #{
tcp_opts => [{sndbuf, SendBuff}], tcp_opts => [{sndbuf, SendBuff}],
ref_topic_route_interval => RefreshInterval, ref_topic_route_interval => RefreshInterval,

View File

@ -196,14 +196,15 @@ create_bridge_http(Params) ->
send_message(Config, Payload) -> send_message(Config, Payload) ->
Name = ?GET_CONFIG(rocketmq_name, Config), Name = ?GET_CONFIG(rocketmq_name, Config),
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), ActionId = emqx_bridge_v2:id(BridgeType, Name),
emqx_bridge:send_message(BridgeID, Payload). emqx_bridge_v2:query(BridgeType, Name, {ActionId, Payload}, #{}).
query_resource(Config, Request) -> query_resource(Config, Request) ->
Name = ?GET_CONFIG(rocketmq_name, Config), Name = ?GET_CONFIG(rocketmq_name, Config),
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ID = emqx_bridge_v2:id(BridgeType, Name),
emqx_resource:query(ResourceID, Request, #{timeout => 500}). ResID = emqx_connector_resource:resource_id(BridgeType, Name),
emqx_resource:query(ID, Request, #{timeout => 500, connector_resource_id => ResID}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
@ -273,6 +274,7 @@ t_get_status(Config) ->
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)), ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)),
ok. ok.
t_simple_query(Config) -> t_simple_query(Config) ->
@ -280,7 +282,10 @@ t_simple_query(Config) ->
{ok, _}, {ok, _},
create_bridge(Config) create_bridge(Config)
), ),
Request = {send_message, #{message => <<"Hello">>}}, Type = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),
ActionId = emqx_bridge_v2:id(Type, Name),
Request = {ActionId, #{message => <<"Hello">>}},
Result = query_resource(Config, Request), Result = query_resource(Config, Request),
?assertEqual(ok, Result), ?assertEqual(ok, Result),
ok. ok.

View File

@ -54,6 +54,8 @@ resource_type(timescale) ->
emqx_postgresql; emqx_postgresql;
resource_type(redis) -> resource_type(redis) ->
emqx_bridge_redis_connector; emqx_bridge_redis_connector;
resource_type(rocketmq) ->
emqx_bridge_rocketmq_connector;
resource_type(iotdb) -> resource_type(iotdb) ->
emqx_bridge_iotdb_connector; emqx_bridge_iotdb_connector;
resource_type(elasticsearch) -> resource_type(elasticsearch) ->
@ -201,6 +203,14 @@ connector_structs() ->
required => false required => false
} }
)}, )},
{rocketmq,
mk(
hoconsc:map(name, ref(emqx_bridge_rocketmq, "config_connector")),
#{
desc => <<"RocketMQ Connector Config">>,
required => false
}
)},
{syskeeper_forwarder, {syskeeper_forwarder,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_syskeeper_connector, config)), hoconsc:map(name, ref(emqx_bridge_syskeeper_connector, config)),
@ -301,6 +311,7 @@ schema_modules() ->
emqx_bridge_timescale, emqx_bridge_timescale,
emqx_postgresql_connector_schema, emqx_postgresql_connector_schema,
emqx_bridge_redis_schema, emqx_bridge_redis_schema,
emqx_bridge_rocketmq,
emqx_bridge_iotdb_connector, emqx_bridge_iotdb_connector,
emqx_bridge_es_connector, emqx_bridge_es_connector,
emqx_bridge_rabbitmq_connector_schema, emqx_bridge_rabbitmq_connector_schema,
@ -338,6 +349,7 @@ api_schemas(Method) ->
api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"), api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"),
api_ref(emqx_bridge_rocketmq, <<"rocketmq">>, Method ++ "_connector"),
api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method), api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method),
api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method), api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method),
api_ref(emqx_bridge_opents_connector, <<"opents">>, Method), api_ref(emqx_bridge_opents_connector, <<"opents">>, Method),

View File

@ -150,6 +150,8 @@ connector_type_to_bridge_types(pgsql) ->
[pgsql]; [pgsql];
connector_type_to_bridge_types(redis) -> connector_type_to_bridge_types(redis) ->
[redis, redis_single, redis_sentinel, redis_cluster]; [redis, redis_single, redis_sentinel, redis_cluster];
connector_type_to_bridge_types(rocketmq) ->
[rocketmq];
connector_type_to_bridge_types(syskeeper_forwarder) -> connector_type_to_bridge_types(syskeeper_forwarder) ->
[syskeeper_forwarder]; [syskeeper_forwarder];
connector_type_to_bridge_types(syskeeper_proxy) -> connector_type_to_bridge_types(syskeeper_proxy) ->

View File

@ -0,0 +1 @@
The RocketMQ bridge has been split into connector and action components. Old RocketMQ bridges will be upgraded automatically.

View File

@ -41,4 +41,22 @@ template.desc:
template.label: template.label:
"""Template""" """Template"""
action_parameters.desc:
"""Action specific configuration."""
action_parameters.label:
"""Action"""
rocketmq_action.desc:
"""Configuration for RocketMQ Action"""
rocketmq_action.label:
"""RocketMQ Action Configuration"""
config_connector.desc:
"""Configuration for an RocketMQ Client."""
config_connector.label:
"""RocketMQ Client Configuration"""
} }