From c4dd167cb99695f4a3b4b02f6b38c8436f10592c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 3 Jul 2024 14:56:20 -0300 Subject: [PATCH] feat: implement couchbase connector and action Fixes https://emqx.atlassian.net/browse/EMQX-12545 --- .../docker-compose-couchbase.yaml | 30 ++ .ci/docker-compose-file/toxiproxy.json | 6 + apps/emqx_bridge/src/emqx_action_info.erl | 51 +- .../test/emqx_bridge_v2_testlib.erl | 44 ++ .../emqx_bridge_azure_blob_storage_SUITE.erl | 5 +- apps/emqx_bridge_couchbase/BSL.txt | 94 ++++ apps/emqx_bridge_couchbase/README.md | 16 + apps/emqx_bridge_couchbase/docker-ct | 2 + apps/emqx_bridge_couchbase/mix.exs | 30 ++ apps/emqx_bridge_couchbase/rebar.config | 14 + .../src/emqx_bridge_couchbase.app.src | 21 + .../src/emqx_bridge_couchbase.hrl | 19 + .../src/emqx_bridge_couchbase_action_info.erl | 34 ++ .../emqx_bridge_couchbase_action_schema.erl | 148 ++++++ .../src/emqx_bridge_couchbase_connector.erl | 393 ++++++++++++++ .../emqx_bridge_couchbase_connector_info.erl | 73 +++ ...emqx_bridge_couchbase_connector_schema.erl | 136 +++++ .../test/emqx_bridge_couchbase_SUITE.erl | 484 ++++++++++++++++++ .../src/emqx_connector_info.erl | 33 +- .../src/schema/emqx_connector_schema.erl | 8 + apps/emqx_machine/priv/reboot_lists.eterm | 1 + changes/ee/feat-13415.en.md | 1 + mix.exs | 1 + rebar.config.erl | 1 + .../emqx_bridge_couchbase_action_schema.hocon | 22 + ...qx_bridge_couchbase_connector_schema.hocon | 44 ++ scripts/ct/run.sh | 3 + scripts/spellcheck/dicts/emqx.txt | 1 + 28 files changed, 1672 insertions(+), 43 deletions(-) create mode 100644 .ci/docker-compose-file/docker-compose-couchbase.yaml create mode 100644 apps/emqx_bridge_couchbase/BSL.txt create mode 100644 apps/emqx_bridge_couchbase/README.md create mode 100644 apps/emqx_bridge_couchbase/docker-ct create mode 100644 apps/emqx_bridge_couchbase/mix.exs create mode 100644 apps/emqx_bridge_couchbase/rebar.config create mode 100644 apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase.app.src create mode 100644 apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase.hrl create mode 100644 apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_action_info.erl create mode 100644 apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_action_schema.erl create mode 100644 apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector.erl create mode 100644 apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector_info.erl create mode 100644 apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector_schema.erl create mode 100644 apps/emqx_bridge_couchbase/test/emqx_bridge_couchbase_SUITE.erl create mode 100644 changes/ee/feat-13415.en.md create mode 100644 rel/i18n/emqx_bridge_couchbase_action_schema.hocon create mode 100644 rel/i18n/emqx_bridge_couchbase_connector_schema.hocon diff --git a/.ci/docker-compose-file/docker-compose-couchbase.yaml b/.ci/docker-compose-file/docker-compose-couchbase.yaml new file mode 100644 index 000000000..9ec910a8a --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-couchbase.yaml @@ -0,0 +1,30 @@ +version: '3.9' + +services: + couchbase: + container_name: couchbase + hostname: couchbase + image: ghcr.io/emqx/couchbase:1.0.0 + restart: always + expose: + - 8091-8093 + # ports: + # - "8091-8093:8091-8093" + networks: + - emqx_bridge + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8093/admin/ping"] + interval: 30s + timeout: 5s + retries: 4 + environment: + - CLUSTER=localhost + - USER=admin + - PASS=public + - PORT=8091 + - RAMSIZEMB=2048 + - RAMSIZEINDEXMB=512 + - RAMSIZEFTSMB=512 + - BUCKETS=mqtt + - BUCKETSIZES=100 + - AUTOREBALANCE=true diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index d53f3715f..dbf890d2d 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -221,5 +221,11 @@ "listen": "0.0.0.0:10000", "upstream": "azurite:10000", "enabled": true + }, + { + "name": "couchbase", + "listen": "0.0.0.0:8093", + "upstream": "couchbase:8093", + "enabled": true } ] diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 1f1c37a4e..ef4c72cee 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -89,37 +89,38 @@ -if(?EMQX_RELEASE_EDITION == ee). hard_coded_action_info_modules_ee() -> [ - emqx_bridge_azure_event_hub_action_info, emqx_bridge_azure_blob_storage_action_info, - emqx_bridge_confluent_producer_action_info, - emqx_bridge_dynamo_action_info, - emqx_bridge_gcp_pubsub_consumer_action_info, - emqx_bridge_gcp_pubsub_producer_action_info, - emqx_bridge_kafka_producer_action_info, - emqx_bridge_kafka_consumer_action_info, - emqx_bridge_kinesis_action_info, - emqx_bridge_hstreamdb_action_info, - emqx_bridge_matrix_action_info, - emqx_bridge_mongodb_action_info, - emqx_bridge_oracle_action_info, - emqx_bridge_rocketmq_action_info, - emqx_bridge_influxdb_action_info, + emqx_bridge_azure_event_hub_action_info, emqx_bridge_cassandra_action_info, emqx_bridge_clickhouse_action_info, - emqx_bridge_mysql_action_info, - emqx_bridge_pgsql_action_info, - emqx_bridge_syskeeper_action_info, - emqx_bridge_sqlserver_action_info, - emqx_bridge_timescale_action_info, - emqx_bridge_redis_action_info, - emqx_bridge_iotdb_action_info, + emqx_bridge_confluent_producer_action_info, + emqx_bridge_couchbase_action_info, + emqx_bridge_dynamo_action_info, emqx_bridge_es_action_info, - emqx_bridge_opents_action_info, - emqx_bridge_rabbitmq_action_info, - emqx_bridge_pulsar_action_info, + emqx_bridge_gcp_pubsub_consumer_action_info, + emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_greptimedb_action_info, + emqx_bridge_hstreamdb_action_info, + emqx_bridge_influxdb_action_info, + emqx_bridge_iotdb_action_info, + emqx_bridge_kafka_consumer_action_info, + emqx_bridge_kafka_producer_action_info, + emqx_bridge_kinesis_action_info, + emqx_bridge_matrix_action_info, + emqx_bridge_mongodb_action_info, + emqx_bridge_mysql_action_info, + emqx_bridge_opents_action_info, + emqx_bridge_oracle_action_info, + emqx_bridge_pgsql_action_info, + emqx_bridge_pulsar_action_info, + emqx_bridge_rabbitmq_action_info, + emqx_bridge_redis_action_info, + emqx_bridge_rocketmq_action_info, + emqx_bridge_s3_upload_action_info, + emqx_bridge_sqlserver_action_info, + emqx_bridge_syskeeper_action_info, emqx_bridge_tdengine_action_info, - emqx_bridge_s3_upload_action_info + emqx_bridge_timescale_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 6271df14a..d98f4f926 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -807,6 +807,50 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> end, ok. +t_rule_action(TCConfig) -> + t_rule_action(TCConfig, _Opts = #{}). + +%% Similar to `t_sync_query', but using only API functions and rule actions to trigger the +%% bridge, instead of lower level functions. +t_rule_action(TCConfig, Opts) -> + TraceCheckers = maps:get(trace_checkers, Opts, []), + PostPublishFn = maps:get(post_publish_fn, Opts, fun(Context) -> Context end), + PrePublishFn = maps:get(pre_publish_fn, Opts, fun(Context) -> Context end), + PayloadFn = maps:get(payload_fn, Opts, fun() -> emqx_guid:to_hexstr(emqx_guid:gen()) end), + PublishFn = maps:get( + publish_fn, + Opts, + fun(#{rule_topic := RuleTopic, payload_fn := PayloadFnIn} = Context) -> + Payload = PayloadFnIn(), + {ok, C} = emqtt:start_link(#{clean_start => true}), + {ok, _} = emqtt:connect(C), + ?assertMatch({ok, _}, emqtt:publish(C, RuleTopic, Payload, [{qos, 2}])), + ok = emqtt:stop(C), + Context#{payload => Payload} + end + ), + ?check_trace( + begin + #{type := Type} = get_common_values(TCConfig), + ?assertMatch({ok, _}, create_bridge_api(TCConfig)), + RuleTopic = emqx_topic:join([<<"test">>, emqx_utils_conv:bin(Type)]), + {ok, _} = create_rule_and_action_http(Type, RuleTopic, TCConfig), + ResourceId = resource_id(TCConfig), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + Context0 = #{rule_topic => RuleTopic, payload_fn => PayloadFn}, + Context1 = PrePublishFn(Context0), + Context2 = PublishFn(Context1), + PostPublishFn(Context2), + ok + end, + TraceCheckers + ), + ok. + %% Like `t_sync_query', but we send the message while the connector is %% `?status_disconnected' and test that, after recovery, the buffered message eventually %% is sent. diff --git a/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl b/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl index d8b93632a..264299669 100644 --- a/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl +++ b/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl @@ -92,7 +92,7 @@ init_per_testcase(TestCase, Config0) -> ConnectorConfig = connector_config(Name, Endpoint), ContainerName = container_name(Name), %% TODO: switch based on test - ActionConfig = + ActionConfig0 = case lists:member(TestCase, direct_action_cases()) of true -> direct_action_config(#{ @@ -105,6 +105,7 @@ init_per_testcase(TestCase, Config0) -> parameters => #{container => ContainerName} }) end, + ActionConfig = emqx_bridge_v2_testlib:parse_and_check(?ACTION_TYPE_BIN, Name, ActionConfig0), Client = new_control_driver(Endpoint), ct:pal("container name: ~s", [ContainerName]), ok = ensure_new_container(ContainerName, Client), @@ -185,7 +186,7 @@ connector_config(Name, Endpoint) -> <<"start_timeout">> => <<"5s">> } }, - emqx_bridge_v2_testlib:parse_and_check_connector(?ACTION_TYPE_BIN, Name, InnerConfigMap0). + emqx_bridge_v2_testlib:parse_and_check_connector(?CONNECTOR_TYPE_BIN, Name, InnerConfigMap0). direct_action_config(Overrides0) -> Overrides = emqx_utils_maps:binary_key_map(Overrides0), diff --git a/apps/emqx_bridge_couchbase/BSL.txt b/apps/emqx_bridge_couchbase/BSL.txt new file mode 100644 index 000000000..3c50ee962 --- /dev/null +++ b/apps/emqx_bridge_couchbase/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2024 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2028-05-17 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_couchbase/README.md b/apps/emqx_bridge_couchbase/README.md new file mode 100644 index 000000000..148efb795 --- /dev/null +++ b/apps/emqx_bridge_couchbase/README.md @@ -0,0 +1,16 @@ +# EMQX Couchbase Bridge + +This application provides connector and action implementations for the EMQX to integrate with Couchbase as part of the EMQX data integration pipelines. +Users can leverage [EMQX Rule Engine](https://docs.emqx.com/en/enterprise/latest/data-integration/rules.html) to create rules that publish message data to Couchbase. + +## Documentation + +Refer to [Rules engine](https://docs.emqx.com/en/enterprise/latest/data-integration/rules.html) for the EMQX rules engine introduction. + +## Contributing + +Please see our [contributing.md](../../CONTRIBUTING.md). + +## License + +EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt). diff --git a/apps/emqx_bridge_couchbase/docker-ct b/apps/emqx_bridge_couchbase/docker-ct new file mode 100644 index 000000000..8a4b7a3cc --- /dev/null +++ b/apps/emqx_bridge_couchbase/docker-ct @@ -0,0 +1,2 @@ +toxiproxy +couchbase diff --git a/apps/emqx_bridge_couchbase/mix.exs b/apps/emqx_bridge_couchbase/mix.exs new file mode 100644 index 000000000..b74901db2 --- /dev/null +++ b/apps/emqx_bridge_couchbase/mix.exs @@ -0,0 +1,30 @@ +defmodule EMQXBridgeCouchbase.MixProject do + use Mix.Project + alias EMQXUmbrella.MixProject, as: UMP + + def project do + [ + app: :emqx_bridge_couchbase, + version: "0.1.0", + build_path: "../../_build", + erlc_options: UMP.erlc_options(), + erlc_paths: UMP.erlc_paths(), + deps_path: "../../deps", + lockfile: "../../mix.lock", + elixir: "~> 1.14", + start_permanent: Mix.env() == :prod, + deps: deps() + ] + end + + def application do + [extra_applications: UMP.extra_applications()] + end + + def deps() do + [ + {:emqx_resource, in_umbrella: true}, + UMP.common_dep(:ehttpc) + ] + end +end diff --git a/apps/emqx_bridge_couchbase/rebar.config b/apps/emqx_bridge_couchbase/rebar.config new file mode 100644 index 000000000..79ce978f9 --- /dev/null +++ b/apps/emqx_bridge_couchbase/rebar.config @@ -0,0 +1,14 @@ +%% -*- mode: erlang; -*- + +{erl_opts, [ + warn_unused_vars, + warn_shadow_vars, + warn_unused_import, + warn_obsolete_guard, + warnings_as_errors, + debug_info +]}. + +{deps, [ + {emqx_resource, {path, "../../apps/emqx_resource"}} +]}. diff --git a/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase.app.src b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase.app.src new file mode 100644 index 000000000..9fd153f29 --- /dev/null +++ b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase.app.src @@ -0,0 +1,21 @@ +{application, emqx_bridge_couchbase, [ + {description, "EMQX Enterprise Couchbase Bridge"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + ehttpc, + emqx_resource + ]}, + {env, [ + {emqx_action_info_modules, [ + emqx_bridge_couchbase_action_info + ]}, + {emqx_connector_info_modules, [ + emqx_bridge_couchbase_connector_info + ]} + ]}, + {modules, []}, + {links, []} +]}. diff --git a/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase.hrl b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase.hrl new file mode 100644 index 000000000..51ce22a0d --- /dev/null +++ b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase.hrl @@ -0,0 +1,19 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-ifndef(__EMQX_BRIDGE_COUCHBASE_HRL__). +-define(__EMQX_BRIDGE_COUCHBASE_HRL__, true). + +-define(CONNECTOR_TYPE, couchbase). +-define(CONNECTOR_TYPE_BIN, <<"couchbase">>). + +-define(ACTION_TYPE, couchbase). +-define(ACTION_TYPE_BIN, <<"couchbase">>). + +-define(SERVER_OPTIONS, #{ + default_port => 8093 +}). + +%% END ifndef(__EMQX_BRIDGE_COUCHBASE_HRL__) +-endif. diff --git a/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_action_info.erl b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_action_info.erl new file mode 100644 index 000000000..c89a48ce9 --- /dev/null +++ b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_action_info.erl @@ -0,0 +1,34 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_couchbase_action_info). + +-behaviour(emqx_action_info). + +-include("emqx_bridge_couchbase.hrl"). + +%% `emqx_action_info' API +-export([ + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +%%------------------------------------------------------------------------------ +%% `emqx_action_info' API +%%------------------------------------------------------------------------------ + +action_type_name() -> ?ACTION_TYPE. + +connector_type_name() -> ?CONNECTOR_TYPE. + +schema_module() -> emqx_bridge_couchbase_action_schema. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ diff --git a/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_action_schema.erl b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_action_schema.erl new file mode 100644 index 000000000..469561b8a --- /dev/null +++ b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_action_schema.erl @@ -0,0 +1,148 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_couchbase_action_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include("emqx_bridge_couchbase.hrl"). + +%% `hocon_schema' API +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% `emqx_bridge_v2_schema' "unofficial" API +-export([ + bridge_v2_examples/1 +]). + +%% API +-export([]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +%%------------------------------------------------------------------------------------------------- +%% `hocon_schema' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> + "action_couchbase". + +roots() -> + []. + +fields(Field) when + Field == "get_bridge_v2"; + Field == "put_bridge_v2"; + Field == "post_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(?ACTION_TYPE)); +fields(action) -> + {?ACTION_TYPE, + mk( + hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION_TYPE)), + #{ + desc => <<"Couchbase Action Config">>, + required => false + } + )}; +fields(?ACTION_TYPE) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk( + ref(parameters), + #{ + required => true, + desc => ?DESC("parameters") + } + ), + #{resource_opts_ref => ref(action_resource_opts)} + ); +fields(parameters) -> + [ + {sql, mk(emqx_schema:template(), #{required => true, desc => ?DESC("sql")})}, + {max_retries, mk(non_neg_integer(), #{required => false, desc => ?DESC("max_retries")})} + ]; +fields(action_resource_opts) -> + Fields = emqx_bridge_v2_schema:action_resource_opts_fields(), + lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time]). + +desc(Name) when + Name =:= ?ACTION_TYPE; + Name =:= parameters +-> + ?DESC(Name); +desc(action_resource_opts) -> + ?DESC(emqx_resource_schema, "creation_opts"); +desc(_Name) -> + undefined. + +%%------------------------------------------------------------------------------------------------- +%% `emqx_bridge_v2_schema' "unofficial" API +%%------------------------------------------------------------------------------------------------- + +bridge_v2_examples(Method) -> + [ + #{ + ?ACTION_TYPE_BIN => #{ + summary => <<"Couchbase Action">>, + value => action_example(Method) + } + } + ]. + +action_example(post) -> + maps:merge( + action_example(put), + #{ + type => ?ACTION_TYPE_BIN, + name => <<"my_action">> + } + ); +action_example(get) -> + maps:merge( + action_example(put), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +action_example(put) -> + #{ + enable => true, + description => <<"my action">>, + connector => <<"my_connector">>, + parameters => + #{ + sql => <<"insert into mqtt (key, value) values (${.id}, ${.payload})">> + }, + resource_opts => + #{ + %% batch is not yet supported + %% batch_time => <<"0ms">>, + %% batch_size => 1, + health_check_interval => <<"30s">>, + inflight_window => 100, + query_mode => <<"sync">>, + request_ttl => <<"45s">>, + worker_pool_size => 16 + } + }. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ + +ref(Name) -> hoconsc:ref(?MODULE, Name). +mk(Type, Meta) -> hoconsc:mk(Type, Meta). diff --git a/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector.erl b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector.erl new file mode 100644 index 000000000..1e7122800 --- /dev/null +++ b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector.erl @@ -0,0 +1,393 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_couchbase_connector). + +-behaviour(emqx_resource). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). +-include("emqx_bridge_couchbase.hrl"). + +%% `emqx_resource' API +-export([ + callback_mode/0, + + on_start/2, + on_stop/2, + on_get_status/2, + + on_get_channels/1, + on_add_channel/4, + on_remove_channel/3, + on_get_channel_status/3, + + on_query/3 + %% on_batch_query/3 +]). + +%% API +-export([ + query/3 +]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +-define(SUCCESS(STATUS_CODE), (STATUS_CODE >= 200 andalso STATUS_CODE < 300)). + +%% Ad-hoc requests +-record(sql_query, {sql :: sql(), opts :: ad_hoc_query_opts()}). + +-type connector_config() :: #{ + connect_timeout := pos_integer(), + password := emqx_secret:t(binary()), + pipelining := pos_integer(), + pool_size := pos_integer(), + server := binary(), + username := binary() +}. +-type connector_state() :: #{ + installed_actions := #{action_resource_id() => action_state()} +}. + +-type action_config() :: #{ + parameters := #{ + sql := binary(), + max_retries := non_neg_integer() + }, + resource_opts := #{request_ttl := timeout()} +}. +-type action_state() :: #{ + args_template := emqx_template_sql:row_template(), + max_retries := non_neg_integer(), + request_ttl := timeout(), + sql := iolist() +}. + +-type query() :: action_query() | ad_hoc_query(). +-type action_query() :: {_Tag :: channel_id(), _Data :: map()}. +-type ad_hoc_query() :: #sql_query{}. + +-type sql() :: iolist(). +-type ad_hoc_query_opts() :: map(). + +%%------------------------------------------------------------------------------ +%% `emqx_resource' API +%%------------------------------------------------------------------------------ + +-spec callback_mode() -> callback_mode(). +callback_mode() -> + always_sync. + +-spec on_start(connector_resource_id(), connector_config()) -> + {ok, connector_state()} | {error, _Reason}. +on_start(ConnResId, ConnConfig) -> + #{ + server := Server, + connect_timeout := ConnectTimeout, + password := Password, + pipelining := Pipelining, + pool_size := PoolSize, + username := Username + } = ConnConfig, + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?SERVER_OPTIONS), + State = #{ + installed_actions => #{}, + password => Password, + username => Username + }, + {Transport, TransportOpts0} = + case maps:get(ssl, ConnConfig) of + #{enable := true} = TLSConfig -> + TLSOpts = emqx_tls_lib:to_client_opts(TLSConfig), + {tls, TLSOpts}; + _ -> + {tcp, []} + end, + TransportOpts = emqx_utils:ipv6_probe(TransportOpts0), + PoolOpts = [ + {host, Host}, + {port, Port}, + {connect_timeout, ConnectTimeout}, + {keepalive, 30_000}, + {pool_type, random}, + {pool_size, PoolSize}, + {transport, Transport}, + {transport_opts, TransportOpts}, + {enable_pipelining, Pipelining} + ], + case ehttpc_sup:start_pool(ConnResId, PoolOpts) of + {ok, _} -> + {ok, State}; + {error, {already_started, _}} -> + {ok, State}; + {error, Reason} -> + {error, Reason} + end. + +-spec on_stop(connector_resource_id(), connector_state()) -> ok. +on_stop(ConnResId, _ConnState) -> + Res = ehttpc_sup:stop_pool(ConnResId), + ?tp("couchbase_connector_stop", #{instance_id => ConnResId}), + Res. + +-spec on_get_status(connector_resource_id(), connector_state()) -> + ?status_connected | ?status_disconnected. +on_get_status(ConnResId, _ConnState) -> + health_check_pool_workers(ConnResId). + +-spec on_add_channel( + connector_resource_id(), + connector_state(), + action_resource_id(), + action_config() +) -> + {ok, connector_state()}. +on_add_channel(_ConnResId, ConnState0, ActionResId, ActionConfig) -> + ActionState = create_action(ActionConfig), + ConnState = emqx_utils_maps:deep_put([installed_actions, ActionResId], ConnState0, ActionState), + {ok, ConnState}. + +-spec on_remove_channel( + connector_resource_id(), + connector_state(), + action_resource_id() +) -> + {ok, connector_state()}. +on_remove_channel( + _ConnResId, ConnState0 = #{installed_actions := InstalledActions0}, ActionResId +) when + is_map_key(ActionResId, InstalledActions0) +-> + #{installed_actions := InstalledActions0} = ConnState0, + InstalledActions = maps:remove(ActionResId, InstalledActions0), + ConnState = ConnState0#{installed_actions := InstalledActions}, + {ok, ConnState}; +on_remove_channel(_ConnResId, ConnState, _ActionResId) -> + {ok, ConnState}. + +-spec on_get_channels(connector_resource_id()) -> + [{action_resource_id(), action_config()}]. +on_get_channels(ConnResId) -> + emqx_bridge_v2:get_channels_for_connector(ConnResId). + +-spec on_get_channel_status( + connector_resource_id(), + action_resource_id(), + connector_state() +) -> + ?status_connected | ?status_disconnected. +on_get_channel_status( + _ConnResId, + ActionResId, + _ConnState = #{installed_actions := InstalledActions} +) when is_map_key(ActionResId, InstalledActions) -> + %% Is it possible to infer table existence and whatnot from an arbitrary statement? + ?status_connected; +on_get_channel_status(_ConnResId, _ActionResId, _ConnState) -> + ?status_disconnected. + +-spec on_query(connector_resource_id(), query(), connector_state()) -> + {ok, _Result} | {error, _Reason}. +on_query(ConnResId, {Tag, Data}, #{installed_actions := InstalledActions} = ConnState) when + is_map_key(Tag, InstalledActions) +-> + ?tp("couchbase_on_query_enter", #{}), + ActionState = maps:get(Tag, InstalledActions), + run_action(ConnResId, Data, ActionState, ConnState); +on_query(ConnResId, #sql_query{sql = SQL, opts = Opts}, ConnState) -> + run_query(ConnResId, SQL, Opts, ConnState); +on_query(_ConnResId, Query, _ConnState) -> + {error, {unrecoverable_error, {invalid_query, Query}}}. + +%% -spec on_batch_query(connector_resource_id(), [query()], connector_state()) -> +%% {ok, _Result} | {error, _Reason}. +%% on_batch_query(_ConnResId, [{Tag, _} | Rest], #{installed_actions := InstalledActions}) when +%% is_map_key(Tag, InstalledActions) +%% -> +%% ActionState = maps:get(Tag, InstalledActions), +%% todo; +%% on_batch_query(_ConnResId, Batch, _ConnState) -> +%% {error, {unrecoverable_error, {bad_batch, Batch}}}. + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +-spec query(connector_resource_id(), _SQL :: iolist(), _Opts :: map()) -> _TODO. +query(ConnResId, SQL, Opts) -> + emqx_resource:simple_sync_query(ConnResId, #sql_query{sql = SQL, opts = Opts}). + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ + +-spec create_action(action_config()) -> action_state(). +create_action(ActionConfig) -> + #{ + parameters := #{ + sql := SQL0, + max_retries := MaxRetries + }, + resource_opts := #{request_ttl := RequestTTL} + } = ActionConfig, + {SQL, ArgsTemplate} = emqx_template_sql:parse_prepstmt(SQL0, #{parameters => '$n'}), + #{ + args_template => ArgsTemplate, + max_retries => MaxRetries, + request_ttl => RequestTTL, + sql => SQL + }. + +-spec health_check_pool_workers(connector_resource_id()) -> + ?status_connected | ?status_connecting | ?status_disconnected. +health_check_pool_workers(ConnResId) -> + Timeout = emqx_resource_pool:health_check_timeout(), + Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(ConnResId)], + try + emqx_utils:pmap(fun(Worker) -> ehttpc:health_check(Worker, Timeout) end, Workers, Timeout) + of + [] -> + ?status_connecting; + [_ | _] = Results -> + case [E || {error, _} = E <- Results] of + [] -> + ping(ConnResId, Timeout); + [{error, Reason} | _] -> + ?SLOG(info, #{ + msg => "couchbase_health_check_failed", + reason => emqx_utils:redact(Reason), + connector => ConnResId + }), + ?status_disconnected + end + catch + exit:timeout -> + ?SLOG(info, #{ + msg => "couchbase_health_check_failed", + reason => timeout, + connector => ConnResId + }), + ?status_disconnected + end. + +ping(ConnResId, RequestTTL) -> + MaxRetries = 3, + Request = {<<"/admin/ping">>, []}, + Response = ehttpc:request(ConnResId, get, Request, RequestTTL, MaxRetries), + case Response of + {ok, 200, _Headers, Body0} -> + case maybe_decode_json(Body0) of + #{<<"status">> := <<"OK">>} -> + ?status_connected; + _ -> + ?tp("couchbase_bad_ping_response", #{response => Response}), + ?status_disconnected + end; + _ -> + ?tp("couchbase_bad_ping_response", #{response => Response}), + ?status_disconnected + end. + +render_args(Context, ArgsTemplate) -> + %% Missing values will become `null'. + {Rendered, _Missing} = emqx_template_sql:render_prepstmt(ArgsTemplate, Context), + Rendered. + +auth_header(ConnState) -> + #{ + username := Username, + password := Password0 + } = ConnState, + Password = emqx_secret:unwrap(Password0), + BasicAuth = base64:encode(<>), + {<<"Authorization">>, [<<"Basic ">>, BasicAuth]}. + +run_action(ConnResId, Data, ActionState, ConnState) -> + #{ + args_template := ArgsTemplate, + sql := SQL, + request_ttl := RequestTTL, + max_retries := MaxRetries + } = ActionState, + Args = render_args(Data, ArgsTemplate), + do_query(ConnResId, SQL, Args, RequestTTL, MaxRetries, ConnState). + +run_query(ConnResId, SQL, Opts, ConnState) -> + RequestTTL = maps:get(request_ttl, Opts, timer:seconds(15)), + MaxRetries = maps:get(max_retries, Opts, 3), + Args = maps:get(args, Opts, undefined), + do_query(ConnResId, SQL, Args, RequestTTL, MaxRetries, ConnState). + +do_query(ConnResId, SQL, Args, RequestTTL, MaxRetries, ConnState) -> + Body0 = #{statement => iolist_to_binary(SQL)}, + Body1 = emqx_utils_maps:put_if(Body0, args, Args, Args =/= undefined), + Body = emqx_utils_json:encode(Body1), + Request = { + <<"/query/service">>, + [ + auth_header(ConnState), + {<<"Content-Type">>, <<"application/json">>} + ], + Body + }, + Response0 = ehttpc:request(ConnResId, post, Request, RequestTTL, MaxRetries), + Response = map_response(Response0), + ?tp("couchbase_response", #{response => Response, request => Request}), + Response. + +maybe_decode_json(Raw) -> + case emqx_utils_json:safe_decode(Raw, [return_maps]) of + {ok, JSON} -> + JSON; + {error, _} -> + Raw + end. + +map_response({error, Reason}) when + Reason =:= econnrefused; + Reason =:= timeout; + Reason =:= normal; + Reason =:= {shutdown, normal}; + Reason =:= {shutdown, closed} +-> + ?tp("couchbase_query_error", #{reason => Reason}), + {error, {recoverable_error, Reason}}; +map_response({error, {closed, _Message} = Reason}) -> + %% _Message = "The connection was lost." + ?tp("couchbase_query_error", #{reason => Reason}), + {error, {recoverable_error, Reason}}; +map_response({error, Reason}) -> + ?tp("couchbase_query_error", #{reason => Reason}), + {error, {unrecoverable_error, Reason}}; +map_response({ok, StatusCode, Headers}) when ?SUCCESS(StatusCode) -> + ?tp("couchbase_query_success", #{}), + {ok, #{status_code => StatusCode, headers => Headers}}; +map_response({ok, StatusCode, Headers, Body0}) when ?SUCCESS(StatusCode) -> + %% couchbase returns status = 200 with "status: errors" and "errors" key in body... 🫠 + case maybe_decode_json(Body0) of + #{<<"status">> := <<"success">>} = Body -> + ?tp("couchbase_query_success", #{}), + {ok, #{status_code => StatusCode, headers => Headers, body => Body}}; + Body -> + ?tp("couchbase_query_error", #{reason => {StatusCode, Headers, Body}}), + {error, + {unrecoverable_error, #{ + status_code => StatusCode, headers => Headers, body => Body + }}} + end; +map_response({ok, StatusCode, Headers}) -> + ?tp("couchbase_query_error", #{reason => {StatusCode, Headers}}), + {error, {unrecoverable_error, #{status_code => StatusCode, headers => Headers}}}; +map_response({ok, StatusCode, Headers, Body0}) -> + Body = maybe_decode_json(Body0), + ?tp("couchbase_query_error", #{reason => {StatusCode, Headers, Body}}), + {error, + {unrecoverable_error, #{ + status_code => StatusCode, headers => Headers, body => Body + }}}. diff --git a/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector_info.erl b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector_info.erl new file mode 100644 index 000000000..c09fcfe08 --- /dev/null +++ b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector_info.erl @@ -0,0 +1,73 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_couchbase_connector_info). + +-behaviour(emqx_connector_info). + +-include("emqx_bridge_couchbase.hrl"). + +%% `emqx_connector_info' API +-export([ + type_name/0, + bridge_types/0, + resource_callback_module/0, + config_schema/0, + schema_module/0, + api_schema/1 +]). + +%% API +-export([]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +-define(SCHEMA_MOD, emqx_bridge_couchbase_connector_schema). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +%%------------------------------------------------------------------------------ +%% `emqx_connector_info' API +%%------------------------------------------------------------------------------ + +type_name() -> + ?CONNECTOR_TYPE. + +bridge_types() -> + [?ACTION_TYPE]. + +resource_callback_module() -> + emqx_bridge_couchbase_connector. + +config_schema() -> + {?CONNECTOR_TYPE, + hoconsc:mk( + hoconsc:map( + name, + hoconsc:ref( + ?SCHEMA_MOD, + "config_connector" + ) + ), + #{ + desc => <<"Couchbase Connector Config">>, + required => false + } + )}. + +schema_module() -> + ?SCHEMA_MOD. + +api_schema(Method) -> + emqx_connector_schema:api_ref( + ?SCHEMA_MOD, ?CONNECTOR_TYPE_BIN, Method ++ "_connector" + ). + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ diff --git a/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector_schema.erl b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector_schema.erl new file mode 100644 index 000000000..c34f0592b --- /dev/null +++ b/apps/emqx_bridge_couchbase/src/emqx_bridge_couchbase_connector_schema.erl @@ -0,0 +1,136 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_couchbase_connector_schema). + +-behaviour(hocon_schema). +-behaviour(emqx_connector_examples). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include("emqx_bridge_couchbase.hrl"). + +%% `hocon_schema' API +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% `emqx_connector_examples' API +-export([ + connector_examples/1 +]). + +%% API +-export([]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +%%------------------------------------------------------------------------------------------------- +%% `hocon_schema' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> + "connector_couchbase". + +roots() -> + []. + +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, fields(connector_config)); +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ fields(connector_config); +fields(connector_config) -> + [ + {server, + emqx_schema:servers_sc( + #{required => true, desc => ?DESC("server")}, + ?SERVER_OPTIONS + )}, + {connect_timeout, + mk(emqx_schema:timeout_duration_ms(), #{ + default => <<"15s">>, desc => ?DESC("connect_timeout") + })}, + {pipelining, mk(pos_integer(), #{default => 100, desc => ?DESC("pipelining")})}, + {pool_size, mk(pos_integer(), #{default => 8, desc => ?DESC("pool_size")})}, + {username, mk(binary(), #{required => true, desc => ?DESC("username")})}, + {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})} + ] ++ + emqx_connector_schema:resource_opts() ++ + emqx_connector_schema_lib:ssl_fields(). + +desc("config_connector") -> + ?DESC("config_connector"); +desc(resource_opts) -> + ?DESC(emqx_resource_schema, resource_opts); +desc(_Name) -> + undefined. + +%%------------------------------------------------------------------------------------------------- +%% `emqx_connector_examples' API +%%------------------------------------------------------------------------------------------------- + +connector_examples(Method) -> + [ + #{ + <<"couchbase">> => #{ + summary => <<"Couchbase Connector">>, + value => connector_example(Method) + } + } + ]. + +connector_example(get) -> + maps:merge( + connector_example(put), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +connector_example(post) -> + maps:merge( + connector_example(put), + #{ + type => atom_to_binary(?CONNECTOR_TYPE), + name => <<"my_connector">> + } + ); +connector_example(put) -> + #{ + enable => true, + description => <<"My connector">>, + server => <<"couchbase:8093">>, + username => <<"admin">>, + password => <<"******">>, + ssl => #{enable => true}, + resource_opts => #{ + health_check_interval => <<"45s">>, + start_after_created => true, + start_timeout => <<"5s">> + } + }. + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ + +mk(Type, Meta) -> hoconsc:mk(Type, Meta). diff --git a/apps/emqx_bridge_couchbase/test/emqx_bridge_couchbase_SUITE.erl b/apps/emqx_bridge_couchbase/test/emqx_bridge_couchbase_SUITE.erl new file mode 100644 index 000000000..0733d7191 --- /dev/null +++ b/apps/emqx_bridge_couchbase/test/emqx_bridge_couchbase_SUITE.erl @@ -0,0 +1,484 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_couchbase_SUITE). + +-feature(maybe_expr, enable). + +-compile(nowarn_export_all). +-compile(export_all). + +-elvis([{elvis_text_style, line_length, #{skip_comments => whole_line}}]). + +%% -import(emqx_common_test_helpers, [on_exit/1]). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include("../src/emqx_bridge_couchbase.hrl"). + +-define(USERNAME, <<"admin">>). +-define(PASSWORD, <<"public">>). +-define(BUCKET, <<"mqtt">>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Host = os:getenv("COUCHBASE_HOST", "toxiproxy"), + DirectHost = os:getenv("COUCHBASE_DIRECT_HOST", "couchbase"), + Port = list_to_integer(os:getenv("COUCHBASE_PORT", "8093")), + AdminPort = list_to_integer(os:getenv("COUCHBASE_ADMIN_PORT", "8091")), + Server = iolist_to_binary([Host, ":", integer_to_binary(Port)]), + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + ProxyName = "couchbase", + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of + true -> + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_bridge_couchbase, + emqx_bridge, + emqx_rule_engine, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [ + {apps, Apps}, + {proxy_name, ProxyName}, + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort}, + {server, Server}, + {host, Host}, + {direct_host, DirectHost}, + {admin_port, AdminPort} + | Config + ]; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_couchbase); + _ -> + {skip, no_couchbase} + end + end. + +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), + ok. + +init_per_testcase(TestCase, Config0) -> + ct:timetrap(timer:seconds(16)), + Server = ?config(server, Config0), + UniqueNum = integer_to_binary(erlang:unique_integer()), + Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>, + ConnectorConfig = connector_config(Name, Server), + ActionConfig0 = action_config(Name, #{connector => Name}), + ActionConfig = emqx_bridge_v2_testlib:parse_and_check(?ACTION_TYPE_BIN, Name, ActionConfig0), + Config = [ + {bridge_kind, action}, + {action_type, ?ACTION_TYPE_BIN}, + {action_name, Name}, + {action_config, ActionConfig}, + {connector_name, Name}, + {connector_type, ?CONNECTOR_TYPE_BIN}, + {connector_config, ConnectorConfig} + | Config0 + ], + start_admin_client(Config). + +end_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + delete_scope(scope(), Config), + stop_admin_client(Config), + emqx_common_test_helpers:call_janitor(), + ok = snabbkaffe:stop(), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +start_admin_client(Config) -> + DirectHost = ?config(direct_host, Config), + AdminPort = ?config(admin_port, Config), + AdminPool = <<"couchbase_SUITE_admin">>, + PoolOpts = [ + {host, DirectHost}, + {port, AdminPort}, + {transport, tcp} + ], + {ok, _} = ehttpc_sup:start_pool(AdminPool, PoolOpts), + [{admin_pool, AdminPool} | Config]. + +stop_admin_client(Config) -> + AdminPool = ?config(admin_pool, Config), + ok = ehttpc_sup:stop_pool(AdminPool), + ok. + +connector_config(Name, Server) -> + InnerConfigMap0 = + #{ + <<"enable">> => true, + <<"tags">> => [<<"bridge">>], + <<"description">> => <<"my cool bridge">>, + <<"server">> => Server, + <<"username">> => ?USERNAME, + <<"password">> => ?PASSWORD, + <<"ssl">> => #{<<"enable">> => false}, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"1s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">> + } + }, + emqx_bridge_v2_testlib:parse_and_check_connector(?CONNECTOR_TYPE_BIN, Name, InnerConfigMap0). + +action_config(Name, Overrides0) -> + Overrides = emqx_utils_maps:binary_key_map(Overrides0), + CommonConfig = + #{ + <<"enable">> => true, + <<"connector">> => <<"please override">>, + <<"parameters">> => + #{ + <<"sql">> => sql(Name), + <<"max_retries">> => 3 + }, + <<"resource_opts">> => #{ + %% Batch is not yet supported + %% <<"batch_size">> => 1, + %% <<"batch_time">> => <<"0ms">>, + <<"buffer_mode">> => <<"memory_only">>, + <<"buffer_seg_bytes">> => <<"10MB">>, + <<"health_check_interval">> => <<"1s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"metrics_flush_interval">> => <<"1s">>, + <<"query_mode">> => <<"sync">>, + <<"request_ttl">> => <<"15s">>, + <<"resume_interval">> => <<"1s">>, + <<"worker_pool_size">> => <<"1">> + } + }, + emqx_utils_maps:deep_merge(CommonConfig, Overrides). + +auth_header() -> + BasicAuth = base64:encode(<>), + {<<"Authorization">>, [<<"Basic ">>, BasicAuth]}. + +ensure_scope(Scope, Config) -> + case get_scope(Scope, Config) of + {ok, _} -> + ct:pal("scope ~s already exists", [Scope]), + ok; + undefined -> + ct:pal("creating scope ~s", [Scope]), + {200, _} = create_scope(Scope, Config), + ok + end. + +ensure_collection(Scope, Collection, Opts, Config) -> + case get_collection(Scope, Collection, Config) of + {ok, _} -> + ct:pal("collection ~s.~s already exists", [Scope, Collection]), + ok; + undefined -> + ct:pal("creating collection ~s.~s", [Scope, Collection]), + {200, _} = create_collection(Scope, Collection, Opts, Config), + ok + end. + +create_scope(Scope, Config) -> + AdminPool = ?config(admin_pool, Config), + ReqBody = [<<"name=">>, Scope], + Request = { + [<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes">>], + [ + auth_header(), + {<<"Content-Type">>, <<"application/x-www-form-urlencoded">>} + ], + ReqBody + }, + RequestTTL = timer:seconds(5), + MaxRetries = 3, + {ok, StatusCode, _Headers, Body0} = ehttpc:request( + AdminPool, post, Request, RequestTTL, MaxRetries + ), + Body = maybe_decode_json(Body0), + ct:pal("create scope response:\n ~p", [{StatusCode, Body}]), + {StatusCode, Body}. + +create_collection(Scope, Collection, _Opts, Config) -> + AdminPool = ?config(admin_pool, Config), + ReqBody = [<<"name=">>, Collection], + Request = { + [<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes/">>, Scope, <<"/collections">>], + [ + auth_header(), + {<<"Content-Type">>, <<"application/x-www-form-urlencoded">>} + ], + ReqBody + }, + RequestTTL = timer:seconds(5), + MaxRetries = 3, + {ok, StatusCode, _Headers, Body0} = ehttpc:request( + AdminPool, post, Request, RequestTTL, MaxRetries + ), + Body = maybe_decode_json(Body0), + ct:pal("create collection response:\n ~p", [{StatusCode, Body}]), + {StatusCode, Body}. + +delete_scope(Scope, Config) -> + AdminPool = ?config(admin_pool, Config), + Request = { + [<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes/">>, Scope], + [auth_header()] + }, + RequestTTL = timer:seconds(5), + MaxRetries = 3, + {ok, StatusCode, _Headers, Body0} = ehttpc:request( + AdminPool, delete, Request, RequestTTL, MaxRetries + ), + Body = maybe_decode_json(Body0), + ct:pal("delete scope response:\n ~p", [{StatusCode, Body}]), + {StatusCode, Body}. + +get_scopes(Config) -> + AdminPool = ?config(admin_pool, Config), + Request = { + [<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes">>], + [auth_header()] + }, + RequestTTL = timer:seconds(5), + MaxRetries = 3, + {ok, 200, _Headers, Body0} = ehttpc:request(AdminPool, get, Request, RequestTTL, MaxRetries), + Body = maybe_decode_json(Body0), + ct:pal("get scopes response:\n ~p", [Body]), + Body. + +get_scope(Scope, Config) -> + #{<<"scopes">> := Scopes} = get_scopes(Config), + fetch_with_name(Scopes, Scope). + +get_collections(Scope, Config) -> + maybe + {ok, #{<<"collections">> := Cs}} = get_scope(Scope, Config), + {ok, Cs} + end. + +get_collection(Scope, Collection, Config) -> + maybe + {ok, Cs} = get_collections(Scope, Config), + fetch_with_name(Cs, Collection) + end. + +fetch_with_name(Xs, Name) -> + case [X || X = #{<<"name">> := N} <- Xs, N =:= Name] of + [] -> + undefined; + [X] -> + {ok, X} + end. + +maybe_decode_json(Body) -> + case emqx_utils_json:safe_decode(Body, [return_maps]) of + {ok, JSON} -> + JSON; + {error, _} -> + Body + end. + +%% Collection creation is async... Trying to insert or select from a recently created +%% collection might result in error 12003, "Keyspace not found in CB datastore". +%% https://www.couchbase.com/forums/t/error-creating-primary-index-immediately-after-collection-creation-keyspace-not-found-in-cb-datastore/32479 +wait_until_collection_is_ready(Scope, Collection, Config) -> + wait_until_collection_is_ready(Scope, Collection, Config, _N = 5, _SleepMS = 200). + +wait_until_collection_is_ready(Scope, Collection, _Config, N, _SleepMS) when N < 0 -> + error({collection_not_ready_timeout, Scope, Collection}); +wait_until_collection_is_ready(Scope, Collection, Config, N, SleepMS) -> + case get_all_rows(Scope, Collection, Config) of + {ok, _} -> + ct:pal("collection ~s.~s ready", [Scope, Collection]), + ok; + Resp -> + ct:pal("waiting for collection ~s.~s error response:\n ~p", [Scope, Collection, Resp]), + ct:sleep(SleepMS), + wait_until_collection_is_ready(Scope, Collection, Config, N - 1, SleepMS) + end. + +scope() -> + <<"some_scope">>. + +sql(Name) -> + Scope = scope(), + iolist_to_binary([ + <<"insert into default:mqtt.">>, + Scope, + <<".">>, + <<"`">>, + Name, + <<"`">>, + <<" (key, value) values (${.id}, ${.})">> + ]). + +get_all_rows(Scope, Collection, Config) -> + ConnResId = emqx_bridge_v2_testlib:connector_resource_id(Config), + SQL = iolist_to_binary([ + <<"select * from default:mqtt.">>, + Scope, + <<".">>, + <<"`">>, + Collection, + <<"`">> + ]), + Opts = #{}, + Resp = emqx_bridge_couchbase_connector:query(ConnResId, SQL, Opts), + ct:pal("get rows response:\n ~p", [Resp]), + case Resp of + {ok, #{ + status_code := 200, body := #{<<"status">> := <<"success">>, <<"results">> := Rows0} + }} -> + Rows = lists:map( + fun(#{Collection := Value}) -> + maybe_decode_json(Value) + end, + Rows0 + ), + {ok, Rows}; + {error, _} -> + Resp + end. + +proplist_update(Proplist, K, Fn) -> + {K, OldV} = lists:keyfind(K, 1, Proplist), + NewV = Fn(OldV), + lists:keystore(K, 1, Proplist, {K, NewV}). + +pre_publish_fn(Scope, Collection, Config) -> + fun(Context) -> + ensure_scope(Scope, Config), + ensure_collection(Scope, Collection, _Opts = #{}, Config), + wait_until_collection_is_ready(Scope, Collection, Config), + Context + end. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_start_stop(Config) -> + ok = emqx_bridge_v2_testlib:t_start_stop(Config, "couchbase_connector_stop"), + ok. + +t_create_via_http(Config) -> + ok = emqx_bridge_v2_testlib:t_create_via_http(Config), + ok. + +t_on_get_status(Config) -> + ok = emqx_bridge_v2_testlib:t_on_get_status(Config), + ok. + +t_rule_action(Config) -> + Scope = scope(), + Collection = ?config(action_name, Config), + PrePublishFn = pre_publish_fn(Scope, Collection, Config), + PostPublishFn = fun(#{payload := Payload} = Context) -> + %% need to retry because things are async in couchbase + ?retry( + 100, + 10, + ?assertMatch( + {ok, [#{<<"payload">> := Payload}]}, + get_all_rows(Scope, Collection, Config) + ) + ), + Context + end, + Opts = #{pre_publish_fn => PrePublishFn, post_publish_fn => PostPublishFn}, + ok = emqx_bridge_v2_testlib:t_rule_action(Config, Opts), + ok. + +%% batch is not yet supported +skip_t_rule_action_batch(Config0) -> + Config = proplist_update(Config0, action_config, fun(ActionConfig) -> + emqx_utils_maps:deep_merge( + ActionConfig, + #{ + <<"resource_opts">> => #{ + <<"batch_size">> => 10, + <<"batch_time">> => <<"100ms">> + } + } + ) + end), + Scope = scope(), + Collection = ?config(action_name, Config), + PrePublishFn = pre_publish_fn(Scope, Collection, Config), + PublishFn = fun(#{rule_topic := RuleTopic, payload_fn := PayloadFn} = Context) -> + Payloads = emqx_utils:pmap( + fun(_) -> + Payload = PayloadFn(), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + {ok, C} = emqtt:start_link(#{clean_start => true, clientid => ClientId}), + {ok, _} = emqtt:connect(C), + ?assertMatch({ok, _}, emqtt:publish(C, RuleTopic, Payload, [{qos, 2}])), + Payload + end, + lists:seq(1, 10) + ), + Context#{payloads => Payloads} + end, + PostPublishFn = fun(#{payloads := _Payloads} = Context) -> + %% need to retry because things are async in couchbase + ?retry( + 200, + 10, + ?assertMatch( + {ok, [#{<<"payload">> := todo}]}, + get_all_rows(Scope, Collection, Config) + ) + ), + Context + end, + Opts = #{ + pre_publish_fn => PrePublishFn, + publish_fn => PublishFn, + post_publish_fn => PostPublishFn + }, + ok = emqx_bridge_v2_testlib:t_rule_action(Config, Opts), + ok. + +t_sync_query_down(Config) -> + Scope = scope(), + Collection = ?config(action_name, Config), + MakeMsgFn = fun(RuleTopic) -> + ensure_scope(Scope, Config), + ensure_collection(Scope, Collection, _Opts = #{}, Config), + wait_until_collection_is_ready(Scope, Collection, Config), + emqx_message:make(RuleTopic, <<"hi">>) + end, + Opts = #{ + make_message_fn => MakeMsgFn, + enter_tp_filter => ?match_event(#{?snk_kind := "couchbase_on_query_enter"}), + error_tp_filter => ?match_event(#{?snk_kind := "couchbase_query_error"}), + success_tp_filter => ?match_event(#{?snk_kind := "couchbase_query_success"}) + }, + emqx_bridge_v2_testlib:t_sync_query_down(Config, Opts), + ok. diff --git a/apps/emqx_connector/src/emqx_connector_info.erl b/apps/emqx_connector/src/emqx_connector_info.erl index a3f06edf0..3c44b4da7 100644 --- a/apps/emqx_connector/src/emqx_connector_info.erl +++ b/apps/emqx_connector/src/emqx_connector_info.erl @@ -77,38 +77,39 @@ -if(?EMQX_RELEASE_EDITION == ee). hard_coded_connector_info_modules_ee() -> [ - emqx_bridge_dynamo_connector_info, + emqx_bridge_azure_blob_storage_connector_info, emqx_bridge_azure_event_hub_connector_info, + emqx_bridge_cassandra_connector_info, + emqx_bridge_clickhouse_connector_info, emqx_bridge_confluent_producer_connector_info, + emqx_bridge_couchbase_connector_info, + emqx_bridge_dynamo_connector_info, + emqx_bridge_es_connector_info, emqx_bridge_gcp_pubsub_consumer_connector_info, emqx_bridge_gcp_pubsub_producer_connector_info, + emqx_bridge_greptimedb_connector_info, emqx_bridge_hstreamdb_connector_info, + emqx_bridge_influxdb_connector_info, + emqx_bridge_iotdb_connector_info, emqx_bridge_kafka_consumer_connector_info, emqx_bridge_kafka_producer_connector_info, emqx_bridge_kinesis_connector_info, emqx_bridge_matrix_connector_info, - emqx_bridge_pgsql_connector_info, - emqx_bridge_timescale_connector_info, emqx_bridge_mongodb_connector_info, - emqx_bridge_oracle_connector_info, - emqx_bridge_influxdb_connector_info, - emqx_bridge_cassandra_connector_info, - emqx_bridge_clickhouse_connector_info, emqx_bridge_mysql_connector_info, + emqx_bridge_opents_connector_info, + emqx_bridge_oracle_connector_info, + emqx_bridge_pgsql_connector_info, + emqx_bridge_pulsar_connector_info, + emqx_bridge_rabbitmq_connector_info, emqx_bridge_redis_connector_info, emqx_bridge_rocketmq_connector_info, + emqx_bridge_s3_connector_info, + emqx_bridge_sqlserver_connector_info, emqx_bridge_syskeeper_connector_info, emqx_bridge_syskeeper_proxy_connector_info, - emqx_bridge_sqlserver_connector_info, - emqx_bridge_iotdb_connector_info, - emqx_bridge_es_connector_info, - emqx_bridge_opents_connector_info, - emqx_bridge_greptimedb_connector_info, - emqx_bridge_pulsar_connector_info, emqx_bridge_tdengine_connector_info, - emqx_bridge_rabbitmq_connector_info, - emqx_bridge_s3_connector_info, - emqx_bridge_azure_blob_storage_connector_info + emqx_bridge_timescale_connector_info ]. -else. hard_coded_connector_info_modules_ee() -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 4dc714e72..060f3ba83 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -50,6 +50,7 @@ -export([ common_resource_opts_subfields/0, common_resource_opts_subfields_bin/0, + resource_opts/0, resource_opts_fields/0, resource_opts_fields/1, resource_opts_ref/2, @@ -431,6 +432,8 @@ roots() -> fields(connectors) -> connector_info_fields_connectors(); +fields(resource_opts) -> + resource_opts_fields(); fields("node_status") -> [ node_name(), @@ -454,6 +457,8 @@ desc(connectors) -> ?DESC("desc_connectors"); desc("node_status") -> ?DESC("desc_node_status"); +desc(resource_opts) -> + ?DESC(emqx_resource_schema, "creation_opts"); desc(_) -> undefined. @@ -548,6 +553,9 @@ common_resource_opts_subfields() -> common_resource_opts_subfields_bin() -> lists:map(fun atom_to_binary/1, common_resource_opts_subfields()). +resource_opts() -> + resource_opts_ref(?MODULE, resource_opts). + resource_opts_fields() -> resource_opts_fields(_Overrides = []). diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index 277d9fd66..4830f51c4 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -121,6 +121,7 @@ emqx_s3, emqx_bridge_s3, emqx_bridge_azure_blob_storage, + emqx_bridge_couchbase, emqx_schema_registry, emqx_eviction_agent, emqx_node_rebalance, diff --git a/changes/ee/feat-13415.en.md b/changes/ee/feat-13415.en.md new file mode 100644 index 000000000..f06551b23 --- /dev/null +++ b/changes/ee/feat-13415.en.md @@ -0,0 +1 @@ +Implemented Couchbase data integration. diff --git a/mix.exs b/mix.exs index 8228a3ee9..9e9e778b0 100644 --- a/mix.exs +++ b/mix.exs @@ -341,6 +341,7 @@ defmodule EMQXUmbrella.MixProject do :emqx_s3, :emqx_bridge_s3, :emqx_bridge_azure_blob_storage, + :emqx_bridge_couchbase, :emqx_schema_registry, :emqx_schema_validation, :emqx_message_transformation, diff --git a/rebar.config.erl b/rebar.config.erl index 5e949ff5a..c2f63a2ad 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -105,6 +105,7 @@ is_community_umbrella_app("apps/emqx_ft") -> false; is_community_umbrella_app("apps/emqx_s3") -> false; is_community_umbrella_app("apps/emqx_bridge_s3") -> false; is_community_umbrella_app("apps/emqx_bridge_azure_blob_storage") -> false; +is_community_umbrella_app("apps/emqx_bridge_couchbase") -> false; is_community_umbrella_app("apps/emqx_schema_registry") -> false; is_community_umbrella_app("apps/emqx_enterprise") -> false; is_community_umbrella_app("apps/emqx_bridge_kinesis") -> false; diff --git a/rel/i18n/emqx_bridge_couchbase_action_schema.hocon b/rel/i18n/emqx_bridge_couchbase_action_schema.hocon new file mode 100644 index 000000000..8194d8571 --- /dev/null +++ b/rel/i18n/emqx_bridge_couchbase_action_schema.hocon @@ -0,0 +1,22 @@ +emqx_bridge_couchbase_action_schema { + couchbase.label: + """Upload to Couchbase""" + couchbase.desc: + """Action that takes incoming events and uploads them to the Couchbase service.""" + + parameters.label: + """Couchbase action parameters""" + parameters.desc: + """Set of parameters for the action.""" + + sql.label: + """SQL Template""" + sql.desc: + """SQL Template""" + + max_retries.label: + """Max Retries""" + max_retries.desc: + """Max retry times if error on sending request.""" + +} diff --git a/rel/i18n/emqx_bridge_couchbase_connector_schema.hocon b/rel/i18n/emqx_bridge_couchbase_connector_schema.hocon new file mode 100644 index 000000000..cc039207b --- /dev/null +++ b/rel/i18n/emqx_bridge_couchbase_connector_schema.hocon @@ -0,0 +1,44 @@ +emqx_bridge_couchbase_connector_schema { + config_connector.label: + """Couchbase Connector Configuration""" + config_connector.desc: + """Configuration for a connector to Couchbase service.""" + + username.label: + """Username""" + username.desc: + """Username for Couchbase service.""" + + password.label: + """Password""" + password.desc: + """Password for Couchbase service.""" + + password.label: + """Password""" + password.desc: + """Password for Couchbase service.""" + + server.label: + """Server Host""" + server.desc: + """The IPv4 or IPv6 address or the hostname to connect to.
+ A host entry has the following form: `Host[:Port]`.
+ The Couchbase default query service port 8093 is used if `[:Port]` is not specified.""" + + pipelining.label: + """HTTP Pipelining""" + pipelining.desc: + """A positive integer. Whether to send HTTP requests continuously, when set to 1, it means that after each HTTP request is sent, you need to wait for the server to return and then continue to send the next request.""" + + connect_timeout.label: + """Connect Timeout""" + connect_timeout.desc: + """The timeout when connecting to the HTTP server.""" + + pool_size.label: + """Pool Size""" + pool_size.desc: + """The pool size.""" + +} diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index d1de79b5b..4a6bf3e38 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -253,6 +253,9 @@ for dep in ${CT_DEPS}; do azurite) FILES+=( '.ci/docker-compose-file/docker-compose-azurite.yaml' ) ;; + couchbase) + FILES+=( '.ci/docker-compose-file/docker-compose-couchbase.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1 diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index 347020b63..3892bda73 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -13,6 +13,7 @@ CMD CN CONNACK CoAP +Couchbase CRLs Cygwin DES