From 9853d00cadb9ba446c769353d565ceb2c5d54230 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 13 Mar 2023 11:17:53 +0800 Subject: [PATCH 1/3] feat(bridges): integrate RocketMQ into data bridges --- apps/emqx_bridge/src/emqx_bridge.erl | 3 +- .../i18n/emqx_ee_bridge_rocketmq.conf | 70 ++++ lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl | 17 +- .../src/emqx_ee_bridge_rocketmq.erl | 120 ++++++ .../i18n/emqx_ee_connector_rocketmq.conf | 66 ++++ lib-ee/emqx_ee_connector/rebar.config | 1 + .../src/emqx_ee_connector.app.src | 3 +- .../src/emqx_ee_connector_rocketmq.erl | 342 ++++++++++++++++++ 8 files changed, 617 insertions(+), 5 deletions(-) create mode 100644 lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_rocketmq.conf create mode 100644 lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl create mode 100644 lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_rocketmq.conf create mode 100644 lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 98ce6a8b0..087c72dc3 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -67,7 +67,8 @@ T == timescale; T == matrix; T == tdengine; - T == dynamo + T == dynamo; + T == rocketmq ). load() -> diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_rocketmq.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_rocketmq.conf new file mode 100644 index 000000000..2e33e6c07 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_rocketmq.conf @@ -0,0 +1,70 @@ +emqx_ee_bridge_rocketmq { + + local_topic { + desc { + en: """The MQTT topic filter to be forwarded to RocketMQ. All MQTT `PUBLISH` messages with the topic +matching the `local_topic` will be forwarded.
+NOTE: if the bridge is used as a rule action, `local_topic` should be left empty otherwise the messages will be duplicated.""" + zh: """发送到 'local_topic' 的消息都会转发到 RocketMQ。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + + template { + desc { + en: """Template, the default value is empty. When this value is empty the whole message will be stored in the RocketMQ""" + zh: """模板, 默认为空,为空时将会将整个消息转发给 RocketMQ""" + } + label { + en: "Template" + zh: "模板" + } + } + config_enable { + desc { + en: """Enable or disable this bridge""" + zh: """启用/禁用桥接""" + } + label { + en: "Enable Or Disable Bridge" + zh: "启用/禁用桥接" + } + } + + desc_config { + desc { + en: """Configuration for a RocketMQ bridge.""" + zh: """RocketMQ 桥接配置""" + } + label: { + en: "RocketMQ Bridge Configuration" + zh: "RocketMQ 桥接配置" + } + } + + desc_type { + desc { + en: """The Bridge Type""" + zh: """Bridge 类型""" + } + label { + en: "Bridge Type" + zh: "桥接类型" + } + } + + desc_name { + desc { + en: """Bridge name.""" + zh: """桥接名字""" + } + label { + en: "Bridge Name" + zh: "桥接名字" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index ec81b7935..3989c3ab2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -32,7 +32,8 @@ api_schemas(Method) -> ref(emqx_ee_bridge_matrix, Method), ref(emqx_ee_bridge_tdengine, Method), ref(emqx_ee_bridge_clickhouse, Method), - ref(emqx_ee_bridge_dynamo, Method) + ref(emqx_ee_bridge_dynamo, Method), + ref(emqx_ee_bridge_rocketmq, Method) ]. schema_modules() -> @@ -49,7 +50,8 @@ schema_modules() -> emqx_ee_bridge_matrix, emqx_ee_bridge_tdengine, emqx_ee_bridge_clickhouse, - emqx_ee_bridge_dynamo + emqx_ee_bridge_dynamo, + emqx_ee_bridge_rocketmq ]. examples(Method) -> @@ -85,7 +87,8 @@ resource_type(timescale) -> emqx_connector_pgsql; resource_type(matrix) -> emqx_connector_pgsql; resource_type(tdengine) -> emqx_ee_connector_tdengine; resource_type(clickhouse) -> emqx_ee_connector_clickhouse; -resource_type(dynamo) -> emqx_ee_connector_dynamo. +resource_type(dynamo) -> emqx_ee_connector_dynamo; +resource_type(rocketmq) -> emqx_ee_connector_rocketmq. fields(bridges) -> [ @@ -128,6 +131,14 @@ fields(bridges) -> desc => <<"Dynamo Bridge Config">>, required => false } + )}, + {rocketmq, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_rocketmq, "config")), + #{ + desc => <<"RocketMQ Bridge Config">>, + required => false + } )} ] ++ kafka_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++ clickhouse_structs(). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl new file mode 100644 index 000000000..d81ffc54c --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl @@ -0,0 +1,120 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_rocketmq). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1, + values/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(DEFAULT_TEMPLATE, <<>>). +-define(DEFFAULT_REQ_TIMEOUT, <<"15s">>). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_examples(Method) -> + [ + #{ + <<"rocketmq">> => #{ + summary => <<"RocketMQ Bridge">>, + value => values(Method) + } + } + ]. + +values(get) -> + maps:merge(values(post), ?METRICS_EXAMPLE); +values(post) -> + #{ + enable => true, + type => rocketmq, + name => <<"foo">>, + server => <<"127.0.0.1:9876">>, + topic => <<"TopicTest">>, + template => ?DEFAULT_TEMPLATE, + local_topic => <<"local/topic/#">>, + resource_opts => #{ + worker_pool_size => 1, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => sync, + max_queue_bytes => ?DEFAULT_QUEUE_SIZE + } + }; +values(put) -> + values(post). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_rocketmq". + +roots() -> []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {template, + mk( + binary(), + #{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE} + )}, + {local_topic, + mk( + binary(), + #{desc => ?DESC("local_topic"), required => false} + )}, + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{<<"request_timeout">> => ?DEFFAULT_REQ_TIMEOUT}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ] ++ + (emqx_ee_connector_rocketmq:fields(config) -- + emqx_connector_schema_lib:prepare_statement_fields()); +fields("creation_opts") -> + emqx_resource_schema:fields("creation_opts_sync_only"); +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for RocketMQ using `", string:to_upper(Method), "` method."]; +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- + +type_field() -> + {type, mk(enum([rocketmq]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_rocketmq.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_rocketmq.conf new file mode 100644 index 000000000..d4a610212 --- /dev/null +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_rocketmq.conf @@ -0,0 +1,66 @@ +emqx_ee_connector_rocketmq { + + server { + desc { + en: """ +The IPv4 or IPv6 address or the hostname to connect to.
+A host entry has the following form: `Host[:Port]`.
+The RocketMQ default port 9876 is used if `[:Port]` is not specified. +""" + zh: """ +将要连接的 IPv4 或 IPv6 地址,或者主机名。
+主机名具有以下形式:`Host[:Port]`。
+如果未指定 `[:Port]`,则使用 RocketMQ 默认端口 9876。 +""" + } + label: { + en: "Server Host" + zh: "服务器地址" + } + } + + topic { + desc { + en: """RocketMQ Topic""" + zh: """RocketMQ 主题""" + } + label: { + en: "RocketMQ Topic" + zh: "RocketMQ 主题" + } + } + + refresh_interval { + desc { + en: """RocketMQ Topic Route Refresh Interval.""" + zh: """RocketMQ 主题路由更新间隔。""" + } + label: { + en: "Topic Route Refresh Interval" + zh: "主题路由更新间隔" + } + } + + send_buffer { + desc { + en: """The socket send buffer size of the RocketMQ driver client.""" + zh: """RocketMQ 驱动的套字节发送消息的缓冲区大小""" + } + label: { + en: "Send Buffer Size" + zh: "发送消息的缓冲区大小" + } + } + + security_token { + desc { + en: """RocketMQ Server Security Token""" + zh: """RocketMQ 服务器安全令牌""" + } + label: { + en: "Security Token" + zh: "安全令牌" + } + } + +} diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config index 76f6ccfba..96b3df6a3 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/lib-ee/emqx_ee_connector/rebar.config @@ -5,6 +5,7 @@ {tdengine, {git, "https://github.com/emqx/tdengine-client-erl", {tag, "0.1.5"}}}, {clickhouse, {git, "https://github.com/emqx/clickhouse-client-erl", {tag, "0.2"}}}, {erlcloud, {git, "https://github.com/emqx/erlcloud.git", {tag,"3.5.16-emqx-1"}}}, + {rocketmq, {git, "https://github.com/emqx/rocketmq-client-erl.git", {tag, "v0.5.1"}}}, {emqx, {path, "../../apps/emqx"}} ]}. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 6f40f7158..d8921198c 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -11,7 +11,8 @@ wolff, brod, clickhouse, - erlcloud + erlcloud, + rocketmq ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl new file mode 100644 index 000000000..d41f83e1d --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -0,0 +1,342 @@ +%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_connector_rocketmq). + +-behaviour(emqx_resource). + +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-export([roots/0, fields/1]). + +%% `emqx_resource' API +-export([ + callback_mode/0, + is_buffer_supported/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_get_status/2 +]). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-define(ROCKETMQ_HOST_OPTIONS, #{ + default_port => 9876 +}). + +-ifdef(TEST). +-export([execute/2]). +-endif. + +%%===================================================================== +%% Hocon schema +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields(config) -> + [ + {server, server()}, + {topic, + mk( + binary(), + #{default => <<"TopicTest">>, desc => ?DESC(topic)} + )}, + {refresh_interval, + mk( + emqx_schema:duration(), + #{default => <<"3s">>, desc => ?DESC(refresh_interval)} + )}, + {send_buffer, + mk( + emqx_schema:bytesize(), + #{default => <<"1024KB">>, desc => ?DESC(send_buffer)} + )}, + {security_token, mk(binary(), #{default => <<>>, desc => ?DESC(security_token)})} + | relational_fields() + ]. + +add_default_username(Fields) -> + lists:map( + fun + ({username, OrigUsernameFn}) -> + {username, add_default_fn(OrigUsernameFn, <<"">>)}; + (Field) -> + Field + end, + Fields + ). + +add_default_fn(OrigFn, Default) -> + fun + (default) -> Default; + (Field) -> OrigFn(Field) + end. + +server() -> + Meta = #{desc => ?DESC("server")}, + emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS). + +relational_fields() -> + Fields = [username, password, auto_reconnect], + Values = lists:filter( + fun({E, _}) -> lists:member(E, Fields) end, + emqx_connector_schema_lib:relational_db_fields() + ), + add_default_username(Values). + +%%======================================================================================== +%% `emqx_resource' API +%%======================================================================================== + +callback_mode() -> always_sync. + +is_buffer_supported() -> false. + +on_start( + InstanceId, + #{server := Server, topic := Topic} = Config1 +) -> + ?SLOG(info, #{ + msg => "starting_rocketmq_connector", + connector => InstanceId, + config => redact(Config1) + }), + Config = maps:merge(default_security_info(), Config1), + {Host, Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS), + + Server1 = [{Host, Port}], + ClientId = client_id(InstanceId), + ClientCfg = #{acl_info => #{}}, + + TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic), + ProducerOpts = make_producer_opts(Config), + Templates = parse_template(Config), + ProducersMapPID = create_producers_map(ClientId), + State = #{ + client_id => ClientId, + topic_tokens => TopicTks, + config => Config, + templates => Templates, + producers_map_pid => ProducersMapPID, + producers_opts => ProducerOpts + }, + + case rocketmq:ensure_supervised_client(ClientId, Server1, ClientCfg) of + {ok, _Pid} -> + {ok, State}; + {error, _Reason} = Error -> + ?tp( + rocketmq_connector_start_failed, + #{error => _Reason} + ), + Error + end. + +on_stop(InstanceId, #{client_id := ClientId, producers_map_pid := Pid} = _State) -> + ?SLOG(info, #{ + msg => "stopping_rocketmq_connector", + connector => InstanceId + }), + Pid ! ok, + ok = rocketmq:stop_and_delete_supervised_client(ClientId). + +on_query(InstanceId, Query, State) -> + do_query(InstanceId, Query, send_sync, State). + +%% We only support batch inserts and all messages must have the same topic +on_batch_query(InstanceId, [{send_message, _Msg} | _] = Query, State) -> + do_query(InstanceId, Query, batch_send_sync, State); +on_batch_query(_InstanceId, Query, _State) -> + {error, {unrecoverable_error, {invalid_request, Query}}}. + +on_get_status(_InstanceId, #{client_id := ClientId}) -> + case rocketmq_client_sup:find_client(ClientId) of + {ok, _Pid} -> + connected; + _ -> + connecting + end. + +%%======================================================================================== +%% Helper fns +%%======================================================================================== + +do_query( + InstanceId, + Query, + QueryFunc, + #{ + templates := Templates, + client_id := ClientId, + topic_tokens := TopicTks, + producers_opts := ProducerOpts, + config := #{topic := RawTopic, resource_opts := #{request_timeout := RequestTimeout}} + } = State +) -> + ?TRACE( + "QUERY", + "rocketmq_connector_received", + #{connector => InstanceId, query => Query, state => State} + ), + + TopicKey = get_topic_key(Query, RawTopic, TopicTks), + Data = apply_template(Query, Templates), + + Result = safe_do_produce( + InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout + ), + case Result of + {error, Reason} -> + ?tp( + rocketmq_connector_query_return, + #{error => Reason} + ), + ?SLOG(error, #{ + msg => "rocketmq_connector_do_query_failed", + connector => InstanceId, + query => Query, + reason => Reason + }), + Result; + _ -> + ?tp( + rocketmq_connector_query_return, + #{result => Result} + ), + Result + end. + +safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout) -> + try + Producers = get_producers(ClientId, TopicKey, ProducerOpts), + produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout) + catch + _Type:Reason -> + {error, {unrecoverable_error, Reason}} + end. + +produce(_InstanceId, QueryFunc, Producers, Data, RequestTimeout) -> + rocketmq:QueryFunc(Producers, Data, RequestTimeout). + +parse_template(Config) -> + Templates = + case maps:get(template, Config, undefined) of + undefined -> #{}; + <<>> -> #{}; + Template -> #{send_message => Template} + end, + + parse_template(maps:to_list(Templates), #{}). + +parse_template([{Key, H} | T], Templates) -> + ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(H), + parse_template( + T, + Templates#{Key => ParamsTks} + ); +parse_template([], Templates) -> + Templates. + +get_topic_key({_, Msg}, RawTopic, TopicTks) -> + {RawTopic, emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg)}; +get_topic_key([Query | _], RawTopic, TopicTks) -> + get_topic_key(Query, RawTopic, TopicTks). + +apply_template({Key, Msg} = _Req, Templates) -> + case maps:get(Key, Templates, undefined) of + undefined -> + emqx_json:encode(Msg); + Template -> + emqx_plugin_libs_rule:proc_tmpl(Template, Msg) + end; +apply_template([{Key, _} | _] = Reqs, Templates) -> + case maps:get(Key, Templates, undefined) of + undefined -> + [emqx_json:encode(Msg) || {_, Msg} <- Reqs]; + Template -> + [emqx_plugin_libs_rule:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs] + end. + +client_id(InstanceId) -> + Name = emqx_resource_manager:manager_id_to_resource_id(InstanceId), + erlang:binary_to_atom(Name, utf8). + +redact(Msg) -> + emqx_misc:redact(Msg, fun is_sensitive_key/1). + +is_sensitive_key(security_token) -> + true; +is_sensitive_key(_) -> + false. + +make_producer_opts( + #{ + username := Username, + password := Password, + security_token := SecurityToken, + send_buffer := SendBuff, + refresh_interval := RefreshInterval + } +) -> + ACLInfo = acl_info(Username, Password, SecurityToken), + #{ + tcp_opts => [{sndbuf, SendBuff}], + ref_topic_route_interval => RefreshInterval, + acl_info => ACLInfo + }. + +acl_info(<<>>, <<>>, <<>>) -> + #{}; +acl_info(Username, Password, <<>>) when is_binary(Username), is_binary(Password) -> + #{ + access_key => Username, + secret_key => Password + }; +acl_info(Username, Password, SecurityToken) when + is_binary(Username), is_binary(Password), is_binary(SecurityToken) +-> + #{ + access_key => Username, + secret_key => Password, + security_token => SecurityToken + }; +acl_info(_, _, _) -> + #{}. + +create_producers_map(ClientId) -> + erlang:spawn(fun() -> + case ets:whereis(ClientId) of + undefined -> + _ = ets:new(ClientId, [public, named_table]), + ok; + _ -> + ok + end, + receive + _Msg -> + ok + end + end). + +get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) -> + case ets:lookup(ClientId, TopicKey) of + [{_, Producers0}] -> + Producers0; + _ -> + ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic1]), + {ok, Producers0} = rocketmq:ensure_supervised_producers( + ClientId, ProducerGroup, Topic1, ProducerOpts + ), + ets:insert(ClientId, {TopicKey, Producers0}), + Producers0 + end. + +default_security_info() -> + #{username => <<>>, password => <<>>, security_token => <<>>}. From 4ad3579966945a40750e9ee2b3c87fe4148a6d89 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 20 Mar 2023 11:14:25 +0800 Subject: [PATCH 2/3] test(bridges): add test suite for RocketMQ --- .../docker-compose-rocketmq.yaml | 34 +++ .../docker-compose-toxiproxy.yaml | 1 + .../rocketmq/conf/broker.conf | 22 ++ .../rocketmq/logs/.gitkeep | 0 .../rocketmq/store/.gitkeep | 0 .ci/docker-compose-file/toxiproxy.json | 6 + lib-ee/emqx_ee_bridge/docker-ct | 1 + .../test/emqx_ee_bridge_rocketmq_SUITE.erl | 267 ++++++++++++++++++ .../src/emqx_ee_connector_rocketmq.erl | 4 - scripts/ct/run.sh | 3 + 10 files changed, 334 insertions(+), 4 deletions(-) create mode 100644 .ci/docker-compose-file/docker-compose-rocketmq.yaml create mode 100644 .ci/docker-compose-file/rocketmq/conf/broker.conf create mode 100644 .ci/docker-compose-file/rocketmq/logs/.gitkeep create mode 100644 .ci/docker-compose-file/rocketmq/store/.gitkeep create mode 100644 lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl diff --git a/.ci/docker-compose-file/docker-compose-rocketmq.yaml b/.ci/docker-compose-file/docker-compose-rocketmq.yaml new file mode 100644 index 000000000..3c872a7c2 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-rocketmq.yaml @@ -0,0 +1,34 @@ +version: '3.9' + +services: + mqnamesrv: + image: apache/rocketmq:4.9.4 + container_name: rocketmq_namesrv +# ports: +# - 9876:9876 + volumes: + - ./rocketmq/logs:/opt/logs + - ./rocketmq/store:/opt/store + command: ./mqnamesrv + networks: + - emqx_bridge + + mqbroker: + image: apache/rocketmq:4.9.4 + container_name: rocketmq_broker +# ports: +# - 10909:10909 +# - 10911:10911 + volumes: + - ./rocketmq/logs:/opt/logs + - ./rocketmq/store:/opt/store + - ./rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf + environment: + NAMESRV_ADDR: "rocketmq_namesrv:9876" + JAVA_OPTS: " -Duser.home=/opt" + JAVA_OPT_EXT: "-server -Xms1024m -Xmx1024m -Xmn1024m" + command: ./mqbroker -c /etc/rocketmq/broker.conf + depends_on: + - mqnamesrv + networks: + - emqx_bridge diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml index 16f18b6c2..24f1d90b2 100644 --- a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -22,6 +22,7 @@ services: - 15433:5433 - 16041:6041 - 18000:8000 + - 19876:9876 command: - "-host=0.0.0.0" - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/rocketmq/conf/broker.conf b/.ci/docker-compose-file/rocketmq/conf/broker.conf new file mode 100644 index 000000000..c343090e4 --- /dev/null +++ b/.ci/docker-compose-file/rocketmq/conf/broker.conf @@ -0,0 +1,22 @@ +brokerClusterName=DefaultCluster +brokerName=broker-a +brokerId=0 + +brokerIP1=rocketmq_broker + +defaultTopicQueueNums=4 +autoCreateTopicEnable=true +autoCreateSubscriptionGroup=true + +listenPort=10911 +deleteWhen=04 + +fileReservedTime=120 +mapedFileSizeCommitLog=1073741824 +mapedFileSizeConsumeQueue=300000 +diskMaxUsedSpaceRatio=100 +maxMessageSize=65536 + +brokerRole=ASYNC_MASTER + +flushDiskType=ASYNC_FLUSH diff --git a/.ci/docker-compose-file/rocketmq/logs/.gitkeep b/.ci/docker-compose-file/rocketmq/logs/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/.ci/docker-compose-file/rocketmq/store/.gitkeep b/.ci/docker-compose-file/rocketmq/store/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index 2f8c4341b..e22735091 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -77,5 +77,11 @@ "listen": "0.0.0.0:9295", "upstream": "kafka-1.emqx.net:9295", "enabled": true + }, + { + "name": "rocketmq", + "listen": "0.0.0.0:9876", + "upstream": "rocketmq_namesrv:9876", + "enabled": true } ] diff --git a/lib-ee/emqx_ee_bridge/docker-ct b/lib-ee/emqx_ee_bridge/docker-ct index ac1728ad2..116bc44ad 100644 --- a/lib-ee/emqx_ee_bridge/docker-ct +++ b/lib-ee/emqx_ee_bridge/docker-ct @@ -10,3 +10,4 @@ pgsql tdengine clickhouse dynamo +rocketmq diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl new file mode 100644 index 000000000..cd02b65d0 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl @@ -0,0 +1,267 @@ +%%-------------------------------------------------------------------- +% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_bridge_rocketmq_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +% Bridge defaults +-define(TOPIC, "TopicTest"). +-define(BATCH_SIZE, 10). +-define(PAYLOAD, <<"HELLO">>). + +-define(GET_CONFIG(KEY__, CFG__), proplists:get_value(KEY__, CFG__)). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, with_batch}, + {group, without_batch} + ]. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {with_batch, TCs}, + {without_batch, TCs} + ]. + +init_per_group(with_batch, Config0) -> + Config = [{batch_size, ?BATCH_SIZE} | Config0], + common_init(Config); +init_per_group(without_batch, Config0) -> + Config = [{batch_size, 1} | Config0], + common_init(Config); +init_per_group(_Group, Config) -> + Config. + +end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok; +end_per_group(_Group, _Config) -> + ok. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), + ok. + +init_per_testcase(_Testcase, Config) -> + delete_bridge(Config), + Config. + +end_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok = snabbkaffe:stop(), + delete_bridge(Config), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +common_init(ConfigT) -> + BridgeType = <<"rocketmq">>, + Host = os:getenv("ROCKETMQ_HOST", "toxiproxy"), + Port = list_to_integer(os:getenv("ROCKETMQ_PORT", "9876")), + + Config0 = [ + {host, Host}, + {port, Port}, + {query_mode, sync}, + {proxy_name, "rocketmq"} + | ConfigT + ], + + case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of + true -> + % Setup toxiproxy + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + % Ensure EE bridge module is loaded + _ = application:load(emqx_ee_bridge), + _ = emqx_ee_bridge:module_info(), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + emqx_mgmt_api_test_util:init_suite(), + {Name, RocketMQConf} = rocketmq_config(BridgeType, Config0), + Config = + [ + {rocketmq_config, RocketMQConf}, + {rocketmq_bridge_type, BridgeType}, + {rocketmq_name, Name}, + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort} + | Config0 + ], + Config; + false -> + case os:getenv("IS_CI") of + false -> + {skip, no_rocketmq}; + _ -> + throw(no_rocketmq) + end + end. + +rocketmq_config(BridgeType, Config) -> + Port = integer_to_list(?GET_CONFIG(port, Config)), + Server = ?GET_CONFIG(host, Config) ++ ":" ++ Port, + Name = atom_to_binary(?MODULE), + BatchSize = ?config(batch_size, Config), + QueryMode = ?config(query_mode, Config), + ConfigString = + io_lib:format( + "bridges.~s.~s {\n" + " enable = true\n" + " server = ~p\n" + " topic = ~p\n" + " resource_opts = {\n" + " request_timeout = 1500ms\n" + " batch_size = ~b\n" + " query_mode = ~s\n" + " }\n" + "}", + [ + BridgeType, + Name, + Server, + ?TOPIC, + BatchSize, + QueryMode + ] + ), + {Name, parse_and_check(ConfigString, BridgeType, Name)}. + +parse_and_check(ConfigString, BridgeType, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf, + Config. + +create_bridge(Config) -> + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + Name = ?GET_CONFIG(rocketmq_name, Config), + RocketMQConf = ?GET_CONFIG(rocketmq_config, Config), + emqx_bridge:create(BridgeType, Name, RocketMQConf). + +delete_bridge(Config) -> + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + Name = ?GET_CONFIG(rocketmq_name, Config), + emqx_bridge:remove(BridgeType, Name). + +create_bridge_http(Params) -> + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + +send_message(Config, Payload) -> + Name = ?GET_CONFIG(rocketmq_name, Config), + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), + emqx_bridge:send_message(BridgeID, Payload). + +query_resource(Config, Request) -> + Name = ?GET_CONFIG(rocketmq_name, Config), + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + emqx_resource:query(ResourceID, Request, #{timeout => 500}). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_setup_via_config_and_publish(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + SentData = #{payload => ?PAYLOAD}, + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config, SentData)), + #{?snk_kind := rocketmq_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(rocketmq_connector_query_return, Trace0), + ?assertMatch([#{result := ok}], Trace), + ok + end + ), + ok. + +t_setup_via_http_api_and_publish(Config) -> + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + Name = ?GET_CONFIG(rocketmq_name, Config), + RocketMQConf = ?GET_CONFIG(rocketmq_config, Config), + RocketMQConf2 = RocketMQConf#{ + <<"name">> => Name, + <<"type">> => BridgeType + }, + ?assertMatch( + {ok, _}, + create_bridge_http(RocketMQConf2) + ), + SentData = #{payload => ?PAYLOAD}, + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config, SentData)), + #{?snk_kind := rocketmq_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(rocketmq_connector_query_return, Trace0), + ?assertMatch([#{result := ok}], Trace), + ok + end + ), + ok. + +t_get_status(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + + Name = ?GET_CONFIG(rocketmq_name, Config), + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)), + ok. + +t_simple_query(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Request = {send_message, #{message => <<"Hello">>}}, + Result = query_resource(Config, Request), + ?assertEqual(ok, Result), + ok. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index d41f83e1d..84f2e2a89 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -31,10 +31,6 @@ default_port => 9876 }). --ifdef(TEST). --export([execute/2]). --endif. - %%===================================================================== %% Hocon schema roots() -> diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index bf7b2073d..a4eeb366d 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -170,6 +170,9 @@ for dep in ${CT_DEPS}; do dynamo) FILES+=( '.ci/docker-compose-file/docker-compose-dynamo.yaml' ) ;; + rocketmq) + FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1 From 17e207cb71dd5247a2718ff0db6c043b1d038b2c Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 16 Mar 2023 22:58:47 +0800 Subject: [PATCH 3/3] chore: fix spellcheck && update changes --- changes/ee/feat-10143.en.md | 1 + changes/ee/feat-10143.zh.md | 1 + scripts/spellcheck/dicts/emqx.txt | 1 + 3 files changed, 3 insertions(+) create mode 100644 changes/ee/feat-10143.en.md create mode 100644 changes/ee/feat-10143.zh.md diff --git a/changes/ee/feat-10143.en.md b/changes/ee/feat-10143.en.md new file mode 100644 index 000000000..67fc13dc2 --- /dev/null +++ b/changes/ee/feat-10143.en.md @@ -0,0 +1 @@ +Add `RocketMQ` data integration bridge. diff --git a/changes/ee/feat-10143.zh.md b/changes/ee/feat-10143.zh.md new file mode 100644 index 000000000..85a13ffa7 --- /dev/null +++ b/changes/ee/feat-10143.zh.md @@ -0,0 +1 @@ +为数据桥接增加 `RocketMQ` 支持。 diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index b027f92ec..b0d663ead 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -271,3 +271,4 @@ nif TDengine clickhouse FormatType +RocketMQ