feat: refactor RocketMQ bridge to connector and action

Fixes:
https://emqx.atlassian.net/browse/EMQX-11467
This commit is contained in:
Kjell Winblad 2024-02-05 15:29:19 +01:00
parent 811edb32a2
commit e284a83f73
9 changed files with 317 additions and 44 deletions

View File

@ -94,6 +94,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_matrix_action_info,
emqx_bridge_mongodb_action_info,
emqx_bridge_oracle_action_info,
emqx_bridge_rocketmq_action_info,
emqx_bridge_influxdb_action_info,
emqx_bridge_cassandra_action_info,
emqx_bridge_mysql_action_info,

View File

@ -1,9 +1,9 @@
{application, emqx_bridge_rocketmq, [
{description, "EMQX Enterprise RocketMQ Bridge"},
{vsn, "0.1.4"},
{vsn, "0.1.5"},
{registered, []},
{applications, [kernel, stdlib, emqx_resource, rocketmq]},
{env, []},
{env, [{emqx_action_info_modules, [emqx_bridge_rocketmq_action_info]}]},
{modules, []},
{links, []}
]}.

View File

@ -8,12 +8,7 @@
-include_lib("emqx_bridge/include/emqx_bridge.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([
conn_bridge_examples/1,
values/1
]).
-import(hoconsc, [mk/2, enum/1]).
-export([
namespace/0,
@ -22,6 +17,14 @@
desc/1
]).
-export([
bridge_v2_examples/1,
connector_examples/1,
conn_bridge_examples/1
]).
-define(CONNECTOR_TYPE, rocketmq).
-define(ACTION_TYPE, ?CONNECTOR_TYPE).
-define(DEFAULT_TEMPLATE, <<>>).
-define(DEFFAULT_REQ_TIMEOUT, <<"15s">>).
@ -33,14 +36,14 @@ conn_bridge_examples(Method) ->
#{
<<"rocketmq">> => #{
summary => <<"RocketMQ Bridge">>,
value => values(Method)
value => conn_bridge_example_values(Method)
}
}
].
values(get) ->
values(post);
values(post) ->
conn_bridge_example_values(get) ->
conn_bridge_example_values(post);
conn_bridge_example_values(post) ->
#{
enable => true,
type => rocketmq,
@ -58,8 +61,59 @@ values(post) ->
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
}
};
values(put) ->
values(post).
conn_bridge_example_values(put) ->
conn_bridge_example_values(post).
%% TODO fix these examples
connector_examples(Method) ->
[
#{
<<"rocketmq">> =>
#{
summary => <<"RocketMQ Connector">>,
value => emqx_connector_schema:connector_values(
Method, ?CONNECTOR_TYPE, connector_values()
)
}
}
].
connector_values() ->
#{
<<"enable">> => true,
<<"servers">> => <<"127.0.0.1:9876">>,
<<"pool_size">> => 8,
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"15s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
}
}.
bridge_v2_examples(Method) ->
[
#{
<<"rocketmq">> =>
#{
summary => <<"RocketMQ Action">>,
value => emqx_bridge_v2_schema:action_values(
Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
)
}
}
].
action_values() ->
#{
<<"parameters">> => #{
<<"topic">> => <<"TopicTest">>,
<<"template">> => ?DEFAULT_TEMPLATE,
<<"refresh_interval">> => <<"3s">>,
<<"send_buffer">> => <<"1024KB">>,
<<"sync_timeout">> => <<"3s">>
}
}.
%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
@ -67,6 +121,84 @@ namespace() -> "bridge_rocketmq".
roots() -> [].
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
emqx_connector_schema:api_fields(
Field,
?CONNECTOR_TYPE,
fields("config_connector") -- emqx_connector_schema:common_fields()
);
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(rocketmq_action));
fields(action) ->
{?ACTION_TYPE,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(?MODULE, rocketmq_action)),
#{
desc => <<"RocketMQ Action Config">>,
required => false
}
)};
fields(rocketmq_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
hoconsc:mk(
hoconsc:ref(?MODULE, action_parameters),
#{
required => true,
desc => ?DESC("action_parameters")
}
)
);
fields(action_parameters) ->
Parameters =
[
{template,
mk(
binary(),
#{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE}
)}
] ++ emqx_bridge_rocketmq_connector:fields(config),
lists:foldl(
fun(Key, Acc) ->
proplists:delete(Key, Acc)
end,
Parameters,
[
servers,
pool_size,
auto_reconnect,
access_key,
secret_key,
security_token
]
);
fields("config_connector") ->
Config =
emqx_connector_schema:common_fields() ++
emqx_bridge_rocketmq_connector:fields(config) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
lists:foldl(
fun(Key, Acc) ->
proplists:delete(Key, Acc)
end,
Config,
[
topic,
sync_timeout,
refresh_interval,
send_buffer,
auto_reconnect
]
);
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields("config") ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@ -94,6 +226,16 @@ desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for RocketMQ using `", string:to_upper(Method), "` method."];
desc("creation_opts") ->
?DESC(emqx_resource_schema, "creation_opts");
desc("config_connector") ->
?DESC("config_connector");
desc(rocketmq_action) ->
?DESC("rocketmq_action");
desc(action_parameters) ->
?DESC("action_parameters");
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_) ->
undefined.

View File

@ -0,0 +1,22 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_rocketmq_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
bridge_v1_type_name() -> rocketmq.
action_type_name() -> rocketmq.
connector_type_name() -> rocketmq.
schema_module() -> emqx_bridge_rocketmq.

View File

@ -21,10 +21,14 @@
on_stop/2,
on_query/3,
on_batch_query/3,
on_get_status/2
on_get_status/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]).
-import(hoconsc, [mk/2, enum/1, ref/2]).
-import(hoconsc, [mk/2]).
-define(ROCKETMQ_HOST_OPTIONS, #{
default_port => 9876
@ -82,7 +86,12 @@ callback_mode() -> always_sync.
on_start(
InstanceId,
#{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config
#{
servers := BinServers,
access_key := AccessKey,
secret_key := SecretKey,
security_token := SecurityToken
} = Config
) ->
?SLOG(info, #{
msg => "starting_rocketmq_connector",
@ -94,18 +103,18 @@ on_start(
emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS)
),
ClientId = client_id(InstanceId),
TopicTks = emqx_placeholder:preproc_tmpl(Topic),
#{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config),
ClientCfg = #{acl_info => AclInfo},
Templates = parse_template(Config),
ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
ClientCfg = #{acl_info => ACLInfo},
State = #{
client_id => ClientId,
topic => Topic,
topic_tokens => TopicTks,
sync_timeout => SyncTimeout,
templates => Templates,
producers_opts => ProducerOpts
acl_info => ACLInfo,
installed_channels => #{}
% topic => Topic,
% topic_tokens => TopicTks,
% sync_timeout => SyncTimeout,
% templates => Templates,
% producers_opts => ProducerOpts
},
ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId),
@ -123,6 +132,64 @@ on_start(
{error, Reason}
end.
on_add_channel(
_InstId,
#{
installed_channels := InstalledChannels,
acl_info := ACLInfo
} = OldState,
ChannelId,
ChannelConfig
) ->
{ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo),
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}.
create_channel_state(
#{parameters := Conf} = _ChannelConfig,
ACLInfo
) ->
#{
topic := Topic,
sync_timeout := SyncTimeout
} = Conf,
TopicTks = emqx_placeholder:preproc_tmpl(Topic),
ProducerOpts = make_producer_opts(Conf, ACLInfo),
Templates = parse_template(Conf),
State = #{
topic => Topic,
topic_tokens => TopicTks,
templates => Templates,
sync_timeout => SyncTimeout,
acl_info => ACLInfo,
producers_opts => ProducerOpts
},
{ok, State}.
on_remove_channel(
_InstId,
#{
installed_channels := InstalledChannels
} = OldState,
ChannelId
) ->
NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}.
on_get_channel_status(
_ResId,
_ChannelId,
_State
) ->
?status_connected.
on_get_channels(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId).
on_stop(InstanceId, _State) ->
?SLOG(info, #{
msg => "stopping_rocketmq_connector",
@ -144,7 +211,7 @@ on_query(InstanceId, Query, State) ->
do_query(InstanceId, Query, send_sync, State).
%% We only support batch inserts and all messages must have the same topic
on_batch_query(InstanceId, [{send_message, _Msg} | _] = Query, State) ->
on_batch_query(InstanceId, [{_ChannelId, _Msg} | _] = Query, State) ->
do_query(InstanceId, Query, batch_send_sync, State);
on_batch_query(_InstanceId, Query, _State) ->
{error, {unrecoverable_error, {invalid_request, Query}}}.
@ -154,11 +221,11 @@ on_get_status(_InstanceId, #{client_id := ClientId}) ->
{ok, Pid} ->
status_result(rocketmq_client:get_status(Pid));
_ ->
connecting
?status_connecting
end.
status_result(_Status = true) -> connected;
status_result(_Status) -> connecting.
status_result(_Status = true) -> ?status_connected;
status_result(_Status) -> ?status_connecting.
%%========================================================================================
%% Helper fns
@ -169,11 +236,8 @@ do_query(
Query,
QueryFunc,
#{
templates := Templates,
client_id := ClientId,
topic_tokens := TopicTks,
producers_opts := ProducerOpts,
sync_timeout := RequestTimeout
installed_channels := Channels
} = State
) ->
?TRACE(
@ -181,6 +245,13 @@ do_query(
"rocketmq_connector_received",
#{connector => InstanceId, query => Query, state => State}
),
ChannelId = get_channel_id(Query),
#{
topic_tokens := TopicTks,
templates := Templates,
sync_timeout := RequestTimeout,
producers_opts := ProducerOpts
} = maps:get(ChannelId, Channels),
TopicKey = get_topic_key(Query, TopicTks),
Data = apply_template(Query, Templates),
@ -209,6 +280,9 @@ do_query(
Result
end.
get_channel_id({ChannelId, _}) -> ChannelId;
get_channel_id([{ChannelId, _} | _]) -> ChannelId.
safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout) ->
try
Producers = get_producers(InstanceId, ClientId, TopicKey, ProducerOpts),
@ -275,14 +349,11 @@ is_sensitive_key(_) ->
make_producer_opts(
#{
access_key := AccessKey,
secret_key := SecretKey,
security_token := SecurityToken,
send_buffer := SendBuff,
refresh_interval := RefreshInterval
}
},
ACLInfo
) ->
ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
#{
tcp_opts => [{sndbuf, SendBuff}],
ref_topic_route_interval => RefreshInterval,

View File

@ -196,14 +196,15 @@ create_bridge_http(Params) ->
send_message(Config, Payload) ->
Name = ?GET_CONFIG(rocketmq_name, Config),
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
emqx_bridge:send_message(BridgeID, Payload).
ActionId = emqx_bridge_v2:id(BridgeType, Name),
emqx_bridge_v2:query(BridgeType, Name, {ActionId, Payload}, #{}).
query_resource(Config, Request) ->
Name = ?GET_CONFIG(rocketmq_name, Config),
BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request, #{timeout => 500}).
ID = emqx_bridge_v2:id(BridgeType, Name),
ResID = emqx_connector_resource:resource_id(BridgeType, Name),
emqx_resource:query(ID, Request, #{timeout => 500, connector_resource_id => ResID}).
%%------------------------------------------------------------------------------
%% Testcases
@ -273,6 +274,7 @@ t_get_status(Config) ->
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)),
ok.
t_simple_query(Config) ->
@ -280,7 +282,10 @@ t_simple_query(Config) ->
{ok, _},
create_bridge(Config)
),
Request = {send_message, #{message => <<"Hello">>}},
Type = ?GET_CONFIG(rocketmq_bridge_type, Config),
Name = ?GET_CONFIG(rocketmq_name, Config),
ActionId = emqx_bridge_v2:id(Type, Name),
Request = {ActionId, #{message => <<"Hello">>}},
Result = query_resource(Config, Request),
?assertEqual(ok, Result),
ok.

View File

@ -54,6 +54,8 @@ resource_type(timescale) ->
emqx_postgresql;
resource_type(redis) ->
emqx_bridge_redis_connector;
resource_type(rocketmq) ->
emqx_bridge_rocketmq_connector;
resource_type(iotdb) ->
emqx_bridge_iotdb_connector;
resource_type(elasticsearch) ->
@ -199,6 +201,14 @@ connector_structs() ->
required => false
}
)},
{rocketmq,
mk(
hoconsc:map(name, ref(emqx_bridge_rocketmq, "config_connector")),
#{
desc => <<"RocketMQ Connector Config">>,
required => false
}
)},
{syskeeper_forwarder,
mk(
hoconsc:map(name, ref(emqx_bridge_syskeeper_connector, config)),
@ -291,6 +301,7 @@ schema_modules() ->
emqx_bridge_timescale,
emqx_postgresql_connector_schema,
emqx_bridge_redis_schema,
emqx_bridge_rocketmq,
emqx_bridge_iotdb_connector,
emqx_bridge_es_connector,
emqx_bridge_rabbitmq_connector_schema,
@ -327,6 +338,7 @@ api_schemas(Method) ->
api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"),
api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"),
api_ref(emqx_bridge_rocketmq, <<"rocketmq">>, Method ++ "_connector"),
api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method),
api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method),
api_ref(emqx_bridge_opents_connector, <<"opents">>, Method),

View File

@ -150,6 +150,8 @@ connector_type_to_bridge_types(pgsql) ->
[pgsql];
connector_type_to_bridge_types(redis) ->
[redis, redis_single, redis_sentinel, redis_cluster];
connector_type_to_bridge_types(rocketmq) ->
[rocketmq];
connector_type_to_bridge_types(syskeeper_forwarder) ->
[syskeeper_forwarder];
connector_type_to_bridge_types(syskeeper_proxy) ->

View File

@ -41,4 +41,22 @@ template.desc:
template.label:
"""Template"""
action_parameters.desc:
"""Action specific configuration."""
action_parameters.label:
"""Action"""
rocketmq_action.desc:
"""Configuration for RocketMQ Action"""
rocketmq_action.label:
"""RocketMQ Action Configuration"""
config_connector.desc:
"""Configuration for an RocketMQ Client."""
config_connector.label:
"""RocketMQ Client Configuration"""
}