diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 87d9c1368..d640293f0 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -66,6 +66,9 @@ -export([clear_screen/0]). -export([with_mock/4]). +-export([ + on_exit/1 +]). %% Toxiproxy API -export([ @@ -161,7 +164,17 @@ boot_modules(Mods) -> -spec start_apps(Apps :: apps()) -> ok. start_apps(Apps) -> - start_apps(Apps, fun(_) -> ok end). + %% to avoid keeping the `db_hostname' that is set when loading + %% `system_monitor' application in `emqx_machine', and then it + %% crashing when trying to connect. + %% FIXME: add an `enable' option to sysmon_top and use that to + %% decide whether to start it or not. + DefaultHandler = + fun(_) -> + application:set_env(system_monitor, db_hostname, ""), + ok + end, + start_apps(Apps, DefaultHandler). -spec start_apps(Apps :: apps(), Handler :: special_config_handler()) -> ok. start_apps(Apps, SpecAppConfig) when is_function(SpecAppConfig) -> @@ -920,3 +933,21 @@ latency_up_proxy(off, Name, ProxyHost, ProxyPort) -> [], [{body_format, binary}] ). + +%%------------------------------------------------------------------------------- +%% Testcase teardown utilities +%%------------------------------------------------------------------------------- + +get_or_spawn_janitor() -> + case get({?MODULE, janitor_proc}) of + undefined -> + {ok, Janitor} = emqx_test_janitor:start_link(), + put({?MODULE, janitor_proc}, Janitor), + Janitor; + Janitor -> + Janitor + end. + +on_exit(Fun) -> + Janitor = get_or_spawn_janitor(), + ok = emqx_test_janitor:push_on_exit_callback(Janitor, Fun). diff --git a/apps/emqx/test/emqx_test_janitor.erl b/apps/emqx/test/emqx_test_janitor.erl new file mode 100644 index 000000000..b7d2c3507 --- /dev/null +++ b/apps/emqx/test/emqx_test_janitor.erl @@ -0,0 +1,69 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_test_janitor). + +-behaviour(gen_server). + +%% `gen_server' API +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2 +]). + +%% API +-export([ + start_link/0, + push_on_exit_callback/2 +]). + +%%---------------------------------------------------------------------------------- +%% API +%%---------------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link(?MODULE, self(), []). + +push_on_exit_callback(Server, Callback) when is_function(Callback, 0) -> + gen_server:call(Server, {push, Callback}). + +%%---------------------------------------------------------------------------------- +%% `gen_server' API +%%---------------------------------------------------------------------------------- + +init(Parent) -> + process_flag(trap_exit, true), + Ref = monitor(process, Parent), + {ok, #{callbacks => [], owner => {Ref, Parent}}}. + +terminate(_Reason, #{callbacks := Callbacks}) -> + lists:foreach(fun(Fun) -> Fun() end, Callbacks). + +handle_call({push, Callback}, _From, State = #{callbacks := Callbacks}) -> + {reply, ok, State#{callbacks := [Callback | Callbacks]}}; +handle_call(_Req, _From, State) -> + {reply, error, State}. + +handle_cast(_Req, State) -> + {noreply, State}. + +handle_info({'DOWN', Ref, process, Parent, _Reason}, State = #{owner := {Ref, Parent}}) -> + {stop, normal, State}; +handle_info(_Msg, State) -> + {noreply, State}. diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 44028e900..321f8a2ae 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -51,9 +51,9 @@ -define(EGRESS_DIR_BRIDGES(T), T == webhook; T == mysql; + T == gcp_pubsub; T == influxdb_api_v1; T == influxdb_api_v2 - %% T == influxdb_udp ). load() -> diff --git a/apps/emqx_connector/include/emqx_connector_tables.hrl b/apps/emqx_connector/include/emqx_connector_tables.hrl new file mode 100644 index 000000000..86b1b0549 --- /dev/null +++ b/apps/emqx_connector/include/emqx_connector_tables.hrl @@ -0,0 +1,17 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-define(JWT_TABLE, emqx_connector_jwt). diff --git a/apps/emqx_connector/rebar.config b/apps/emqx_connector/rebar.config index 4d0b53e9a..98490a91c 100644 --- a/apps/emqx_connector/rebar.config +++ b/apps/emqx_connector/rebar.config @@ -7,6 +7,7 @@ {deps, [ {emqx, {path, "../emqx"}}, + {emqx_resource, {path, "../emqx_resource"}}, {eldap2, {git, "https://github.com/emqx/eldap2", {tag, "v0.2.2"}}}, {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}}, {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7-emqx.2"}}}, diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index 1bfce0735..8061203e7 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -16,6 +16,7 @@ mysql, mongodb, ehttpc, + jose, emqx, emqtt ]}, diff --git a/apps/emqx_connector/src/emqx_connector_jwt.erl b/apps/emqx_connector/src/emqx_connector_jwt.erl new file mode 100644 index 000000000..3dc39c675 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_jwt.erl @@ -0,0 +1,46 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_jwt). + +-include("emqx_connector_tables.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +%% API +-export([ + lookup_jwt/1, + lookup_jwt/2 +]). + +-type jwt() :: binary(). + +-spec lookup_jwt(resource_id()) -> {ok, jwt()} | {error, not_found}. +lookup_jwt(ResourceId) -> + ?MODULE:lookup_jwt(?JWT_TABLE, ResourceId). + +-spec lookup_jwt(ets:table(), resource_id()) -> {ok, jwt()} | {error, not_found}. +lookup_jwt(TId, ResourceId) -> + try + case ets:lookup(TId, {ResourceId, jwt}) of + [{{ResourceId, jwt}, JWT}] -> + {ok, JWT}; + [] -> + {error, not_found} + end + catch + error:badarg -> + {error, not_found} + end. diff --git a/apps/emqx_connector/src/emqx_connector_jwt_sup.erl b/apps/emqx_connector/src/emqx_connector_jwt_sup.erl new file mode 100644 index 000000000..611e152bd --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_jwt_sup.erl @@ -0,0 +1,99 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_jwt_sup). + +-behaviour(supervisor). + +-include("emqx_connector_tables.hrl"). + +-export([ + start_link/0, + ensure_worker_present/2, + ensure_worker_deleted/1 +]). + +-export([init/1]). + +-type worker_id() :: term(). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + ensure_jwt_table(), + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 5, + auto_shutdown => never + }, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. + +%% @doc Starts a new JWT worker. The caller should use +%% `emqx_connector_jwt_sup:ensure_jwt/1' to ensure that a JWT has +%% been stored, if synchronization is needed. +-spec ensure_worker_present(worker_id(), map()) -> + {ok, supervisor:child()} | {error, term()}. +ensure_worker_present(Id, Config) -> + ChildSpec = jwt_worker_child_spec(Id, Config), + case supervisor:start_child(?MODULE, ChildSpec) of + {ok, Pid} -> + {ok, Pid}; + {error, {already_started, Pid}} -> + {ok, Pid}; + {error, already_present} -> + supervisor:restart_child(?MODULE, Id) + end. + +%% @doc Stops a given JWT worker by its id. +-spec ensure_worker_deleted(worker_id()) -> ok. +ensure_worker_deleted(Id) -> + case supervisor:terminate_child(?MODULE, Id) of + ok -> + _ = supervisor:delete_child(?MODULE, Id), + ok; + {error, not_found} -> + ok + end. + +jwt_worker_child_spec(Id, Config) -> + #{ + id => Id, + start => {emqx_connector_jwt_worker, start_link, [Config]}, + restart => transient, + type => worker, + significant => false, + shutdown => brutal_kill, + modules => [emqx_connector_jwt_worker] + }. + +-spec ensure_jwt_table() -> ok. +ensure_jwt_table() -> + case ets:whereis(?JWT_TABLE) of + undefined -> + Opts = [ + named_table, + public, + {read_concurrency, true}, + ordered_set + ], + _ = ets:new(?JWT_TABLE, Opts), + ok; + _ -> + ok + end. diff --git a/apps/emqx_connector/src/emqx_connector_jwt_worker.erl b/apps/emqx_connector/src/emqx_connector_jwt_worker.erl new file mode 100644 index 000000000..c925aca20 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_jwt_worker.erl @@ -0,0 +1,237 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_jwt_worker). + +-behaviour(gen_server). + +%% API +-export([ + start_link/1, + ensure_jwt/1 +]). + +%% gen_server API +-export([ + init/1, + handle_continue/2, + handle_call/3, + handle_cast/2, + handle_info/2, + format_status/1, + format_status/2 +]). + +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("jose/include/jose_jwk.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-type config() :: #{ + private_key := binary(), + resource_id := resource_id(), + expiration := timer:time(), + table := ets:table(), + iss := binary(), + sub := binary(), + aud := binary(), + kid := binary(), + alg := binary() +}. +-type jwt() :: binary(). +-type state() :: #{ + refresh_timer := undefined | timer:tref(), + resource_id := resource_id(), + expiration := timer:time(), + table := ets:table(), + jwt := undefined | jwt(), + %% only undefined during startup + jwk := undefined | jose_jwk:key(), + iss := binary(), + sub := binary(), + aud := binary(), + kid := binary(), + alg := binary() +}. + +-define(refresh_jwt, refresh_jwt). + +%%----------------------------------------------------------------------------------------- +%% API +%%----------------------------------------------------------------------------------------- + +-spec start_link(config()) -> gen_server:start_ret(). +start_link( + #{ + private_key := _, + expiration := _, + resource_id := _, + table := _, + iss := _, + sub := _, + aud := _, + kid := _, + alg := _ + } = Config +) -> + gen_server:start_link(?MODULE, Config, []). + +-spec ensure_jwt(pid()) -> reference(). +ensure_jwt(Worker) -> + Ref = alias([reply]), + gen_server:cast(Worker, {ensure_jwt, Ref}), + Ref. + +%%----------------------------------------------------------------------------------------- +%% gen_server API +%%----------------------------------------------------------------------------------------- + +-spec init(config()) -> + {ok, state(), {continue, {make_key, binary()}}} + | {stop, {error, term()}}. +init(#{private_key := PrivateKeyPEM} = Config) -> + State0 = maps:without([private_key], Config), + State = State0#{ + jwk => undefined, + jwt => undefined, + refresh_timer => undefined + }, + {ok, State, {continue, {make_key, PrivateKeyPEM}}}. + +handle_continue({make_key, PrivateKeyPEM}, State0) -> + ?tp(connector_jwt_worker_make_key, #{state => State0}), + case jose_jwk:from_pem(PrivateKeyPEM) of + JWK = #jose_jwk{} -> + State = State0#{jwk := JWK}, + {noreply, State, {continue, create_token}}; + [] -> + ?tp(connector_jwt_worker_startup_error, #{error => empty_key}), + {stop, {shutdown, {error, empty_key}}, State0}; + {error, Reason} -> + Error = {invalid_private_key, Reason}, + ?tp(connector_jwt_worker_startup_error, #{error => Error}), + {stop, {shutdown, {error, Error}}, State0}; + Error0 -> + Error = {invalid_private_key, Error0}, + ?tp(connector_jwt_worker_startup_error, #{error => Error}), + {stop, {shutdown, {error, Error}}, State0} + end; +handle_continue(create_token, State0) -> + State = generate_and_store_jwt(State0), + {noreply, State}. + +handle_call(_Req, _From, State) -> + {reply, {error, bad_call}, State}. + +handle_cast({ensure_jwt, From}, State0 = #{jwt := JWT}) -> + State = + case JWT of + undefined -> + generate_and_store_jwt(State0); + _ -> + State0 + end, + From ! {From, token_created}, + {noreply, State}; +handle_cast(_Req, State) -> + {noreply, State}. + +handle_info({timeout, TRef, ?refresh_jwt}, State0 = #{refresh_timer := TRef}) -> + State = generate_and_store_jwt(State0), + {noreply, State}; +handle_info(_Msg, State) -> + {noreply, State}. + +format_status(State) -> + censor_secrets(State). + +format_status(_Opt, [_PDict, State0]) -> + State = censor_secrets(State0), + [{data, [{"State", State}]}]. + +%%----------------------------------------------------------------------------------------- +%% Helper fns +%%----------------------------------------------------------------------------------------- + +-spec do_generate_jwt(state()) -> jwt(). +do_generate_jwt( + #{ + expiration := ExpirationMS, + iss := Iss, + sub := Sub, + aud := Aud, + kid := KId, + alg := Alg, + jwk := JWK + } = _State +) -> + Headers = #{ + <<"alg">> => Alg, + <<"kid">> => KId + }, + Now = erlang:system_time(seconds), + ExpirationS = erlang:convert_time_unit(ExpirationMS, millisecond, second), + Claims = #{ + <<"iss">> => Iss, + <<"sub">> => Sub, + <<"aud">> => Aud, + <<"iat">> => Now, + <<"exp">> => Now + ExpirationS + }, + JWT0 = jose_jwt:sign(JWK, Headers, Claims), + {_, JWT} = jose_jws:compact(JWT0), + JWT. + +-spec generate_and_store_jwt(state()) -> state(). +generate_and_store_jwt(State0) -> + JWT = do_generate_jwt(State0), + store_jwt(State0, JWT), + ?tp(connector_jwt_worker_refresh, #{jwt => JWT}), + State1 = State0#{jwt := JWT}, + ensure_timer(State1). + +-spec store_jwt(state(), jwt()) -> ok. +store_jwt(#{resource_id := ResourceId, table := TId}, JWT) -> + true = ets:insert(TId, {{ResourceId, jwt}, JWT}), + ?tp(connector_jwt_worker_token_stored, #{resource_id => ResourceId}), + ok. + +-spec ensure_timer(state()) -> state(). +ensure_timer( + State = #{ + refresh_timer := undefined, + expiration := ExpirationMS0 + } +) -> + ExpirationMS = max(5_000, ExpirationMS0 - 5_000), + TRef = erlang:start_timer(ExpirationMS, self(), ?refresh_jwt), + State#{refresh_timer => TRef}; +ensure_timer(State) -> + State. + +-spec censor_secrets(state()) -> map(). +censor_secrets(State) -> + maps:map( + fun + (Key, _Value) when + Key =:= jwt; + Key =:= jwk + -> + "******"; + (_Key, Value) -> + Value + end, + State + ). diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index b063d7436..438c7b87e 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -192,11 +192,11 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> on_query_async( _InstId, {send_message, Msg}, - {ReplayFun, Args}, + {ReplyFun, Args}, #{name := InstanceId} ) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), - emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplayFun, Args}). + emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}). on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) -> AutoReconn = maps:get(auto_reconnect, Conf, true), diff --git a/apps/emqx_connector/src/emqx_connector_sup.erl b/apps/emqx_connector/src/emqx_connector_sup.erl index 29fc5b4b5..f6333bbcd 100644 --- a/apps/emqx_connector/src/emqx_connector_sup.erl +++ b/apps/emqx_connector/src/emqx_connector_sup.erl @@ -33,7 +33,8 @@ init([]) -> period => 20 }, ChildSpecs = [ - child_spec(emqx_connector_mqtt) + child_spec(emqx_connector_mqtt), + child_spec(emqx_connector_jwt_sup) ], {ok, {SupFlags, ChildSpecs}}. @@ -46,5 +47,3 @@ child_spec(Mod) -> type => supervisor, modules => [Mod] }. - -%% internal functions diff --git a/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl b/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl new file mode 100644 index 000000000..87ce70d59 --- /dev/null +++ b/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl @@ -0,0 +1,69 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_jwt_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include("emqx_connector_tables.hrl"). + +-compile([export_all, nowarn_export_all]). + +%%----------------------------------------------------------------------------- +%% CT boilerplate +%%----------------------------------------------------------------------------- + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_common_test_helpers:start_apps([emqx_connector]), + Config. + +end_per_suite(_Config) -> + emqx_common_test_helpers:stop_apps([emqx_connector]), + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ets:delete_all_objects(?JWT_TABLE), + ok. + +%%----------------------------------------------------------------------------- +%% Helper fns +%%----------------------------------------------------------------------------- + +insert_jwt(TId, ResourceId, JWT) -> + ets:insert(TId, {{ResourceId, jwt}, JWT}). + +%%----------------------------------------------------------------------------- +%% Test cases +%%----------------------------------------------------------------------------- + +t_lookup_jwt_ok(_Config) -> + TId = ?JWT_TABLE, + JWT = <<"some jwt">>, + ResourceId = <<"resource id">>, + true = insert_jwt(TId, ResourceId, JWT), + ?assertEqual({ok, JWT}, emqx_connector_jwt:lookup_jwt(ResourceId)), + ok. + +t_lookup_jwt_missing(_Config) -> + ResourceId = <<"resource id">>, + ?assertEqual({error, not_found}, emqx_connector_jwt:lookup_jwt(ResourceId)), + ok. diff --git a/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl b/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl new file mode 100644 index 000000000..74075917e --- /dev/null +++ b/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl @@ -0,0 +1,340 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_jwt_worker_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("jose/include/jose_jwt.hrl"). +-include_lib("jose/include/jose_jws.hrl"). + +-compile([export_all, nowarn_export_all]). + +%%----------------------------------------------------------------------------- +%% CT boilerplate +%%----------------------------------------------------------------------------- + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +%%----------------------------------------------------------------------------- +%% Helper fns +%%----------------------------------------------------------------------------- + +generate_private_key_pem() -> + PublicExponent = 65537, + Size = 2048, + Key = public_key:generate_key({rsa, Size, PublicExponent}), + DERKey = public_key:der_encode('PrivateKeyInfo', Key), + public_key:pem_encode([{'PrivateKeyInfo', DERKey, not_encrypted}]). + +generate_config() -> + PrivateKeyPEM = generate_private_key_pem(), + ResourceID = emqx_guid:gen(), + #{ + private_key => PrivateKeyPEM, + expiration => timer:hours(1), + resource_id => ResourceID, + table => ets:new(test_jwt_table, [ordered_set, public]), + iss => <<"issuer">>, + sub => <<"subject">>, + aud => <<"audience">>, + kid => <<"key id">>, + alg => <<"RS256">> + }. + +is_expired(JWT) -> + #jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT), + Now = erlang:system_time(seconds), + Now >= Exp. + +%%----------------------------------------------------------------------------- +%% Test cases +%%----------------------------------------------------------------------------- + +t_create_success(_Config) -> + Config = generate_config(), + Res = emqx_connector_jwt_worker:start_link(Config), + ?assertMatch({ok, _}, Res), + {ok, Worker} = Res, + Ref = emqx_connector_jwt_worker:ensure_jwt(Worker), + receive + {Ref, token_created} -> + ok + after 1_000 -> + ct:fail( + "should have confirmed token creation; msgs: ~0p", + [process_info(self(), messages)] + ) + end, + ok. + +t_empty_key(_Config) -> + Config0 = generate_config(), + Config = Config0#{private_key := <<>>}, + process_flag(trap_exit, true), + ?check_trace( + ?wait_async_action( + ?assertMatch({ok, _}, emqx_connector_jwt_worker:start_link(Config)), + #{?snk_kind := connector_jwt_worker_startup_error}, + 1_000 + ), + fun(Trace) -> + ?assertMatch( + [#{error := empty_key}], + ?of_kind(connector_jwt_worker_startup_error, Trace) + ), + ok + end + ), + ok. + +t_unknown_error(_Config) -> + Config0 = generate_config(), + Config = Config0#{private_key := <<>>}, + process_flag(trap_exit, true), + ?check_trace( + {_, {ok, _}} = ?wait_async_action( + emqx_common_test_helpers:with_mock( + jose_jwk, + from_pem, + fun(_PrivateKeyPEM) -> {error, some_strange_error} end, + fun() -> + ?assertMatch({ok, _}, emqx_connector_jwt_worker:start_link(Config)) + end + ), + #{?snk_kind := connector_jwt_worker_startup_error}, + 1_000 + ), + fun(Trace) -> + ?assertMatch( + [#{error := {invalid_private_key, some_strange_error}}], + ?of_kind(connector_jwt_worker_startup_error, Trace) + ), + ok + end + ), + ok. + +t_invalid_pem(_Config) -> + Config0 = generate_config(), + InvalidPEM = public_key:pem_encode([ + {'PrivateKeyInfo', <<"xxxxxx">>, not_encrypted}, + {'PrivateKeyInfo', <<"xxxxxx">>, not_encrypted} + ]), + Config = Config0#{private_key := InvalidPEM}, + process_flag(trap_exit, true), + ?check_trace( + ?wait_async_action( + ?assertMatch({ok, _}, emqx_connector_jwt_worker:start_link(Config)), + #{?snk_kind := connector_jwt_worker_startup_error}, + 1_000 + ), + fun(Trace) -> + ?assertMatch( + [#{error := {invalid_private_key, _}}], + ?of_kind(connector_jwt_worker_startup_error, Trace) + ), + ok + end + ), + ok. + +t_refresh(_Config) -> + Config0 = + #{ + table := Table, + resource_id := ResourceId + } = generate_config(), + Config = Config0#{expiration => 5_000}, + ?check_trace( + begin + {{ok, _Pid}, {ok, _Event}} = + ?wait_async_action( + emqx_connector_jwt_worker:start_link(Config), + #{?snk_kind := connector_jwt_worker_token_stored}, + 5_000 + ), + {ok, FirstJWT} = emqx_connector_jwt:lookup_jwt(Table, ResourceId), + ?block_until( + #{ + ?snk_kind := connector_jwt_worker_refresh, + jwt := JWT0 + } when JWT0 =/= FirstJWT, + 15_000 + ), + {ok, SecondJWT} = emqx_connector_jwt:lookup_jwt(Table, ResourceId), + ?assertNot(is_expired(SecondJWT)), + ?assert(is_expired(FirstJWT)), + {FirstJWT, SecondJWT} + end, + fun({FirstJWT, SecondJWT}, Trace) -> + ?assertMatch( + [_, _ | _], + ?of_kind(connector_jwt_worker_token_stored, Trace) + ), + ?assertNotEqual(FirstJWT, SecondJWT), + ok + end + ), + ok. + +t_format_status(_Config) -> + Config = generate_config(), + {ok, Pid} = emqx_connector_jwt_worker:start_link(Config), + {status, _, _, Props} = sys:get_status(Pid), + [State] = [ + State + || Info = [_ | _] <- Props, + {data, Data = [_ | _]} <- Info, + {"State", State} <- Data + ], + ?assertMatch( + #{ + jwt := "******", + jwk := "******" + }, + State + ), + ok. + +t_lookup_ok(_Config) -> + Config = + #{ + table := Table, + resource_id := ResourceId, + private_key := PrivateKeyPEM, + aud := Aud, + iss := Iss, + sub := Sub, + kid := KId + } = generate_config(), + {ok, Worker} = emqx_connector_jwt_worker:start_link(Config), + Ref = emqx_connector_jwt_worker:ensure_jwt(Worker), + receive + {Ref, token_created} -> + ok + after 500 -> + error(timeout) + end, + Res = emqx_connector_jwt:lookup_jwt(Table, ResourceId), + ?assertMatch({ok, _}, Res), + {ok, JWT} = Res, + ?assert(is_binary(JWT)), + JWK = jose_jwk:from_pem(PrivateKeyPEM), + {IsValid, ParsedJWT, JWS} = jose_jwt:verify_strict(JWK, [<<"RS256">>], JWT), + ?assertMatch( + #jose_jwt{ + fields = #{ + <<"aud">> := Aud, + <<"iss">> := Iss, + <<"sub">> := Sub, + <<"exp">> := _, + <<"iat">> := _ + } + }, + ParsedJWT + ), + ?assertNot(is_expired(JWT)), + ?assertMatch( + #jose_jws{ + alg = {_, 'RS256'}, + fields = #{ + <<"kid">> := KId, + <<"typ">> := <<"JWT">> + } + }, + JWS + ), + ?assert(IsValid), + ok. + +t_lookup_not_found(_Config) -> + Table = ets:new(test_jwt_table, [ordered_set, public]), + InexistentResource = <<"xxx">>, + ?assertEqual( + {error, not_found}, + emqx_connector_jwt:lookup_jwt(Table, InexistentResource) + ), + ok. + +t_lookup_badarg(_Config) -> + InexistentTable = i_dont_exist, + InexistentResource = <<"xxx">>, + ?assertEqual( + {error, not_found}, + emqx_connector_jwt:lookup_jwt(InexistentTable, InexistentResource) + ), + ok. + +t_start_supervised_worker(_Config) -> + {ok, _} = emqx_connector_jwt_sup:start_link(), + Config = #{resource_id := ResourceId} = generate_config(), + {ok, Pid} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config), + Ref = emqx_connector_jwt_worker:ensure_jwt(Pid), + receive + {Ref, token_created} -> + ok + after 5_000 -> + ct:fail("timeout") + end, + MRef = monitor(process, Pid), + ?assert(is_process_alive(Pid)), + ok = emqx_connector_jwt_sup:ensure_worker_deleted(ResourceId), + receive + {'DOWN', MRef, process, Pid, _} -> + ok + after 1_000 -> + ct:fail("timeout") + end, + ok. + +t_start_supervised_worker_already_started(_Config) -> + {ok, _} = emqx_connector_jwt_sup:start_link(), + Config = #{resource_id := ResourceId} = generate_config(), + {ok, Pid0} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config), + {ok, Pid1} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config), + ?assertEqual(Pid0, Pid1), + ok. + +t_start_supervised_worker_already_present(_Config) -> + {ok, _} = emqx_connector_jwt_sup:start_link(), + Config = #{resource_id := ResourceId} = generate_config(), + {ok, Pid0} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config), + Ref = monitor(process, Pid0), + exit(Pid0, {shutdown, normal}), + receive + {'DOWN', Ref, process, Pid0, {shutdown, normal}} -> ok + after 1_000 -> error(worker_didnt_stop) + end, + {ok, Pid1} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config), + ?assertNotEqual(Pid0, Pid1), + ok. + +t_unknown_requests(_Config) -> + Config = generate_config(), + {ok, Worker} = emqx_connector_jwt_worker:start_link(Config), + Worker ! unknown_info, + gen_server:cast(Worker, unknown_cast), + ?assertEqual({error, bad_call}, gen_server:call(Worker, unknown_call)), + ok. diff --git a/apps/emqx_connector/test/emqx_connector_web_hook_server.erl b/apps/emqx_connector/test/emqx_connector_web_hook_server.erl new file mode 100644 index 000000000..3d5a4490a --- /dev/null +++ b/apps/emqx_connector/test/emqx_connector_web_hook_server.erl @@ -0,0 +1,105 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_web_hook_server). + +-compile([nowarn_export_all, export_all]). + +-behaviour(supervisor). +-behaviour(cowboy_handler). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +start_link(Port, Path) -> + start_link(Port, Path, false). + +start_link(Port, Path, SSLOpts) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, [Port, Path, SSLOpts]). + +stop() -> + try + gen_server:stop(?MODULE) + catch + exit:noproc -> + ok + end. + +set_handler(F) when is_function(F, 2) -> + true = ets:insert(?MODULE, {handler, F}), + ok. + +%%------------------------------------------------------------------------------ +%% supervisor API +%%------------------------------------------------------------------------------ + +init([Port, Path, SSLOpts]) -> + Dispatch = cowboy_router:compile( + [ + {'_', [{Path, ?MODULE, []}]} + ] + ), + + ProtoOpts = #{env => #{dispatch => Dispatch}}, + + Tab = ets:new(?MODULE, [set, named_table, public]), + ets:insert(Tab, {handler, fun default_handler/2}), + + {Transport, TransOpts, CowboyModule} = transport_settings(Port, SSLOpts), + + ChildSpec = ranch:child_spec(?MODULE, Transport, TransOpts, CowboyModule, ProtoOpts), + + {ok, {#{}, [ChildSpec]}}. + +%%------------------------------------------------------------------------------ +%% cowboy_server API +%%------------------------------------------------------------------------------ + +init(Req, State) -> + [{handler, Handler}] = ets:lookup(?MODULE, handler), + Handler(Req, State). + +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +transport_settings(Port, _SSLOpts = false) -> + TransOpts = #{ + socket_opts => [{port, Port}], + connection_type => supervisor + }, + {ranch_tcp, TransOpts, cowboy_clear}; +transport_settings(Port, SSLOpts) -> + TransOpts = #{ + socket_opts => [ + {port, Port}, + {next_protocols_advertised, [<<"h2">>, <<"http/1.1">>]}, + {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]} + | SSLOpts + ], + connection_type => supervisor + }, + {ranch_ssl, TransOpts, cowboy_tls}. + +default_handler(Req0, State) -> + Req = cowboy_req:reply( + 400, + #{<<"content-type">> => <<"text/plain">>}, + <<"">>, + Req0 + ), + {ok, Req, State}. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 5efe9114a..6514ed9ef 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -702,6 +702,8 @@ typename_to_spec("wordsize()", _Mod) -> #{type => string, example => <<"1024KB">>}; typename_to_spec("map()", _Mod) -> #{type => object, example => #{}}; +typename_to_spec("service_account_json()", _Mod) -> + #{type => object, example => #{}}; typename_to_spec("#{" ++ _, Mod) -> typename_to_spec("map()", Mod); typename_to_spec("qos()", _Mod) -> diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 71300df72..02b1208b7 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -76,6 +76,8 @@ -type query_result() :: ok | {ok, term()} + | {ok, term(), term()} + | {ok, term(), term(), term()} | {error, {recoverable_error, term()}} | {error, term()}. diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index a36cb15b7..6722e1b43 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -163,8 +163,8 @@ running(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = running({call, From}, {query, Request, _Opts}, St) -> query_or_acc(From, Request, St); running(cast, {query, Request, Opts}, St) -> - ReplayFun = maps:get(async_reply_fun, Opts, undefined), - query_or_acc(ReplayFun, Request, St); + ReplyFun = maps:get(async_reply_fun, Opts, undefined), + query_or_acc(ReplyFun, Request, St); running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> flush(St#{tref := undefined}); running(info, {flush, _Ref}, _St) -> @@ -191,11 +191,11 @@ blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) -> _ = reply_caller(Id, ?REPLY(From, Request, false, Error)), {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request, false))])}}; blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) -> - ReplayFun = maps:get(async_reply_fun, Opts, undefined), + ReplyFun = maps:get(async_reply_fun, Opts, undefined), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), - _ = reply_caller(Id, ?REPLY(ReplayFun, Request, false, Error)), + _ = reply_caller(Id, ?REPLY(ReplyFun, Request, false, Error)), {keep_state, St#{ - queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplayFun, Request, false))]) + queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))]) }}. terminate(_Reason, #{id := Id, index := Index}) -> diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf new file mode 100644 index 000000000..dd7ce49ec --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_gcp_pubsub.conf @@ -0,0 +1,147 @@ +emqx_ee_bridge_gcp_pubsub { + desc_config { + desc { + en: """Configuration for a GCP PubSub bridge.""" + zh: """GCP PubSub 桥接配置""" + } + label { + en: "GCP PubSub Bridge Configuration" + zh: "GCP PubSub 桥接配置" + } + } + + desc_type { + desc { + en: """The Bridge Type""" + zh: """桥接类型""" + } + label { + en: "Bridge Type" + zh: "桥接类型" + } + } + desc_name { + desc { + en: """Bridge name, used as a human-readable description of the bridge.""" + zh: """桥接名字,可读描述""" + } + label { + en: "Bridge Name" + zh: "桥接名字" + } + } + + connect_timeout { + desc { + en: "The timeout when connecting to the HTTP server." + zh: "连接 HTTP 服务器的超时时间。" + } + label: { + en: "Connect Timeout" + zh: "连接超时" + } + } + + max_retries { + desc { + en: "Max retry times if an error occurs when sending a request." + zh: "请求出错时的最大重试次数。" + } + label: { + en: "Max Retries" + zh: "最大重试次数" + } + } + + pool_size { + desc { + en: "The pool size." + zh: "连接池大小。" + } + label: { + en: "Pool Size" + zh: "连接池大小" + } + } + + pipelining { + desc { + en: "A positive integer. Whether to send HTTP requests continuously, when set to 1, it means that after each HTTP request is sent, you need to wait for the server to return and then continue to send the next request." + zh: "正整数,设置最大可发送的异步 HTTP 请求数量。当设置为 1 时,表示每次发送完成 HTTP 请求后都需要等待服务器返回,再继续发送下一个请求。" + } + label: { + en: "HTTP Pipelineing" + zh: "HTTP 流水线" + } + } + + request_timeout { + desc { + en: "HTTP request timeout." + zh: "HTTP 请求超时。" + } + label: { + en: "Request Timeout" + zh: "HTTP 请求超时" + } + } + + payload_template { + desc { + en: "The template for formatting the outgoing messages. If undefined, will send all the available context in JSON format." + zh: "用于格式化外发信息的模板。 如果未定义,将以JSON格式发送所有可用的上下文。" + } + label: { + en: "Payload template" + zh: "HTTP 请求消息体模板" + } + } + + local_topic { + desc { + en: """The MQTT topic filter to be forwarded to GCP PubSub. All MQTT 'PUBLISH' messages with the topic +matching `local_topic` will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is +configured, then both the data got from the rule and the MQTT messages that match local_topic +will be forwarded. +""" + zh: """发送到 'local_topic' 的消息都会转发到 GCP PubSub。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发到 GCP PubSub。 +""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + + pubsub_topic { + desc { + en: "The GCP PubSub topic to publish messages to." + zh: "要发布消息的GCP PubSub主题。" + } + label: { + en: "GCP PubSub Topic" + zh: "GCP PubSub 主题" + } + } + + service_account_json { + desc { + en: "JSON containing the GCP Service Account credentials to be used with PubSub.\n" + "When a GCP Service Account is created " + "(as described in https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount), " + "you have the option of downloading the credentials in JSON form. That's the " + "file needed." + zh: "包含将与 PubSub 一起使用的 GCP 服务账户凭证的 JSON。\n" + "当创建GCP服务账户时" + "(如https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount)," + "可以选择下载 JSON 形式的凭证,然后在该配置项中使用。" + } + label: { + en: "GCP Service Account Credentials" + zh: "GCP 服务账户凭证" + } + } + +} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index e0d362f5e..14ae7fc8d 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -14,13 +14,13 @@ api_schemas(Method) -> [ + ref(emqx_ee_bridge_gcp_pubsub, Method), ref(emqx_ee_bridge_kafka, Method), ref(emqx_ee_bridge_mysql, Method), ref(emqx_ee_bridge_mongodb, Method ++ "_rs"), ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"), ref(emqx_ee_bridge_mongodb, Method ++ "_single"), ref(emqx_ee_bridge_hstreamdb, Method), - %% ref(emqx_ee_bridge_influxdb, Method ++ "_udp"), ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"), ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2") ]. @@ -29,6 +29,7 @@ schema_modules() -> [ emqx_ee_bridge_kafka, emqx_ee_bridge_hstreamdb, + emqx_ee_bridge_gcp_pubsub, emqx_ee_bridge_influxdb, emqx_ee_bridge_mongodb, emqx_ee_bridge_mysql @@ -49,11 +50,11 @@ examples(Method) -> resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); resource_type(kafka) -> emqx_bridge_impl_kafka; resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb; +resource_type(gcp_pubsub) -> emqx_ee_connector_gcp_pubsub; resource_type(mongodb_rs) -> emqx_connector_mongo; resource_type(mongodb_sharded) -> emqx_connector_mongo; resource_type(mongodb_single) -> emqx_connector_mongo; resource_type(mysql) -> emqx_connector_mysql; -resource_type(influxdb_udp) -> emqx_ee_connector_influxdb; resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb; resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb. @@ -75,6 +76,14 @@ fields(bridges) -> required => false } )}, + {gcp_pubsub, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_gcp_pubsub, "config")), + #{ + desc => <<"EMQX Enterprise Config">>, + required => false + } + )}, {mysql, mk( hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")), @@ -109,7 +118,6 @@ influxdb_structs() -> } )} || Protocol <- [ - %% influxdb_udp, influxdb_api_v1, influxdb_api_v2 ] diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl new file mode 100644 index 000000000..63566bed9 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl @@ -0,0 +1,231 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_bridge_gcp_pubsub). + +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +%% hocon_schema API +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). +-export([ + service_account_json_validator/1, + service_account_json_converter/1 +]). + +%% emqx_ee_bridge "unofficial" API +-export([conn_bridge_examples/1]). + +-type service_account_json() :: map(). +-reflect_type([service_account_json/0]). + +-define(DEFAULT_PIPELINE_SIZE, 100). + +%%------------------------------------------------------------------------------------------------- +%% `hocon_schema' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> + "bridge_gcp_pubsub". + +roots() -> + []. + +fields("config") -> + emqx_bridge_schema:common_bridge_fields() ++ + emqx_resource_schema:fields("resource_opts") ++ + fields(bridge_config); +fields(bridge_config) -> + [ + {connect_timeout, + sc( + emqx_schema:duration_ms(), + #{ + default => "15s", + desc => ?DESC("connect_timeout") + } + )}, + {pool_size, + sc( + pos_integer(), + #{ + default => 8, + desc => ?DESC("pool_size") + } + )}, + {pipelining, + sc( + pos_integer(), + #{ + default => ?DEFAULT_PIPELINE_SIZE, + desc => ?DESC("pipelining") + } + )}, + {max_retries, + sc( + non_neg_integer(), + #{ + required => false, + default => 2, + desc => ?DESC("max_retries") + } + )}, + {request_timeout, + sc( + emqx_schema:duration_ms(), + #{ + required => false, + default => "15s", + desc => ?DESC("request_timeout") + } + )}, + {payload_template, + sc( + binary(), + #{ + default => <<>>, + desc => ?DESC("payload_template") + } + )}, + {local_topic, + sc( + binary(), + #{ + desc => ?DESC("local_topic") + } + )}, + {pubsub_topic, + sc( + binary(), + #{ + required => true, + desc => ?DESC("pubsub_topic") + } + )}, + {service_account_json, + sc( + service_account_json(), + #{ + required => true, + validator => fun ?MODULE:service_account_json_validator/1, + converter => fun ?MODULE:service_account_json_converter/1, + sensitive => true, + desc => ?DESC("service_account_json") + } + )} + ]; +fields("get") -> + emqx_bridge_schema:metrics_status_fields() ++ fields("post"); +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"). + +desc("config") -> + ?DESC("desc_config"); +desc(_) -> + undefined. + +conn_bridge_examples(Method) -> + [ + #{ + <<"gcp_pubsub">> => #{ + summary => <<"GCP PubSub Bridge">>, + value => values(Method) + } + } + ]. + +values(get) -> + maps:merge(values(post), ?METRICS_EXAMPLE); +values(post) -> + #{ + <<"pubsub_topic">> => <<"mytopic">>, + <<"service_account_json">> => + #{ + <<"auth_provider_x509_cert_url">> => + <<"https://www.googleapis.com/oauth2/v1/certs">>, + <<"auth_uri">> => + <<"https://accounts.google.com/o/oauth2/auth">>, + <<"client_email">> => + <<"test@myproject.iam.gserviceaccount.com">>, + <<"client_id">> => <<"123812831923812319190">>, + <<"client_x509_cert_url">> => + << + "https://www.googleapis.com/robot/v1/" + "metadata/x509/test%40myproject.iam.gserviceaccount.com" + >>, + <<"private_key">> => + << + "-----BEGIN PRIVATE KEY-----\n" + "MIIEvQI..." + >>, + <<"private_key_id">> => <<"kid">>, + <<"project_id">> => <<"myproject">>, + <<"token_uri">> => + <<"https://oauth2.googleapis.com/token">>, + <<"type">> => <<"service_account">> + } + }; +values(put) -> + values(post). + +%%------------------------------------------------------------------------------------------------- +%% Helper fns +%%------------------------------------------------------------------------------------------------- + +sc(Type, Meta) -> hoconsc:mk(Type, Meta). + +type_field() -> + {type, mk(enum([gcp_pubsub]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. + +-spec service_account_json_validator(map()) -> + ok + | {error, {wrong_type, term()}} + | {error, {missing_keys, [binary()]}}. +service_account_json_validator(Map) -> + ExpectedKeys = [ + <<"type">>, + <<"project_id">>, + <<"private_key_id">>, + <<"private_key">>, + <<"client_email">> + ], + MissingKeys = lists:sort([ + K + || K <- ExpectedKeys, + not maps:is_key(K, Map) + ]), + Type = maps:get(<<"type">>, Map, null), + case {MissingKeys, Type} of + {[], <<"service_account">>} -> + ok; + {[], Type} -> + {error, {wrong_type, Type}}; + {_, _} -> + {error, {missing_keys, MissingKeys}} + end. + +service_account_json_converter(Map) when is_map(Map) -> + ExpectedKeys = [ + <<"type">>, + <<"project_id">>, + <<"private_key_id">>, + <<"private_key">>, + <<"client_email">> + ], + maps:with(ExpectedKeys, Map); +service_account_json_converter(Val) -> + Val. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 0a26e5d26..c9a8230ba 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -64,34 +64,10 @@ wait_until_kafka_is_up(Attempts) -> end. init_per_suite(Config) -> - %% Need to unload emqx_authz. See emqx_machine_SUITE:init_per_suite for - %% more info. - application:unload(emqx_authz), - %% some configs in emqx_conf app are mandatory - emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - emqx_common_test_helpers:start_apps( - [emqx_conf, emqx_rule_engine, emqx_bridge, emqx_management, emqx_dashboard], - fun set_special_configs/1 - ), - application:set_env(emqx_machine, applications, [ - emqx_prometheus, - emqx_modules, - emqx_dashboard, - emqx_gateway, - emqx_statsd, - emqx_resource, - emqx_rule_engine, - emqx_bridge, - emqx_ee_bridge, - emqx_plugin_libs, - emqx_management, - emqx_retainer, - emqx_exhook, - emqx_authn, - emqx_authz, - emqx_plugin - ]), - {ok, _} = application:ensure_all_started(emqx_machine), + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]), + {ok, _} = application:ensure_all_started(emqx_connector), + emqx_mgmt_api_test_util:init_suite(), wait_until_kafka_is_up(), %% Wait until bridges API is up (fun WaitUntilRestApiUp() -> @@ -106,32 +82,12 @@ init_per_suite(Config) -> end)(), Config. -end_per_suite(Config) -> - emqx_common_test_helpers:stop_apps([ - emqx_prometheus, - emqx_modules, - emqx_dashboard, - emqx_gateway, - emqx_statsd, - emqx_resource, - emqx_rule_engine, - emqx_bridge, - emqx_ee_bridge, - emqx_plugin_libs, - emqx_management, - emqx_retainer, - emqx_exhook, - emqx_authn, - emqx_authz, - emqx_plugin, - emqx_conf, - emqx_bridge, - emqx_management, - emqx_dashboard, - emqx_machine - ]), - mria:stop(), - Config. +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]), + _ = application:stop(emqx_connector), + ok. set_special_configs(emqx_management) -> Listeners = #{http => #{port => 8081}}, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl new file mode 100644 index 000000000..b84b7d74b --- /dev/null +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -0,0 +1,1450 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_bridge_gcp_pubsub_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("jose/include/jose_jwt.hrl"). +-include_lib("jose/include/jose_jws.hrl"). + +-define(BRIDGE_TYPE, gcp_pubsub). +-define(BRIDGE_TYPE_BIN, <<"gcp_pubsub">>). + +-import(emqx_common_test_helpers, [on_exit/1]). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + SimpleTCs = single_config_tests(), + [ + {group, with_batch}, + {group, without_batch} + | SimpleTCs + ]. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + SimpleTCs = single_config_tests(), + MatrixTCs = TCs -- SimpleTCs, + SynchronyGroups = [ + {group, sync_query}, + {group, async_query} + ], + QueueGroups = [ + {group, queue_enabled}, + {group, queue_disabled} + ], + ResourceGroups = [{group, gcp_pubsub}], + [ + {with_batch, SynchronyGroups}, + {without_batch, SynchronyGroups}, + {sync_query, QueueGroups}, + {async_query, QueueGroups}, + {queue_enabled, ResourceGroups}, + {queue_disabled, ResourceGroups}, + {gcp_pubsub, MatrixTCs} + ]. + +%% these should not be influenced by the batch/no +%% batch/sync/async/queueing matrix. +single_config_tests() -> + [ + t_not_a_json, + t_not_of_service_account_type, + t_json_missing_fields, + t_invalid_private_key, + t_jwt_worker_start_timeout, + t_failed_to_start_jwt_worker, + t_stop, + t_get_status_ok, + t_get_status_down, + t_get_status_no_worker, + t_get_status_timeout_calling_workers, + t_on_start_ehttpc_pool_already_started + ]. + +init_per_suite(Config) -> + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]), + {ok, _} = application:ensure_all_started(emqx_connector), + emqx_mgmt_api_test_util:init_suite(), + HTTPHost = "localhost", + HTTPPort = 56000, + HostPort = HTTPHost ++ ":" ++ integer_to_list(HTTPPort), + true = os:putenv("PUBSUB_EMULATOR_HOST", HostPort), + [ + {http_host, HTTPHost}, + {http_port, HTTPPort} + | Config + ]. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]), + _ = application:stop(emqx_connector), + os:unsetenv("PUBSUB_EMULATOR_HOST"), + ok. + +init_per_group(sync_query, Config) -> + [{query_mode, sync} | Config]; +init_per_group(async_query, Config) -> + [{query_mode, async} | Config]; +init_per_group(with_batch, Config) -> + [{enable_batch, true} | Config]; +init_per_group(without_batch, Config) -> + [{enable_batch, false} | Config]; +init_per_group(queue_enabled, Config) -> + [{enable_queue, true} | Config]; +init_per_group(queue_disabled, Config) -> + [{enable_queue, false} | Config]; +init_per_group(_Group, Config) -> + Config. + +end_per_group(gcp_pubsub, Config) -> + delete_bridge(Config), + ok; +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(TestCase, Config0) when + TestCase =:= t_publish_success_batch +-> + case ?config(enable_batch, Config0) of + true -> + {ok, _} = start_echo_http_server(), + delete_all_bridges(), + Tid = install_telemetry_handler(TestCase), + Config = generate_config(Config0), + [{telemetry_table, Tid} | Config]; + false -> + {skip, no_batching} + end; +init_per_testcase(TestCase, Config0) -> + {ok, _} = start_echo_http_server(), + delete_all_bridges(), + Tid = install_telemetry_handler(TestCase), + Config = generate_config(Config0), + [{telemetry_table, Tid} | Config]. + +end_per_testcase(_TestCase, _Config) -> + ok = snabbkaffe:stop(), + delete_all_bridges(), + ok = emqx_connector_web_hook_server:stop(), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +generate_config(Config0) -> + #{ + name := Name, + config_string := ConfigString, + pubsub_config := PubSubConfig, + service_account_json := ServiceAccountJSON + } = gcp_pubsub_config(Config0), + ResourceId = emqx_bridge_resource:resource_id(?BRIDGE_TYPE_BIN, Name), + BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, Name), + [ + {gcp_pubsub_name, Name}, + {gcp_pubsub_config, PubSubConfig}, + {gcp_pubsub_config_string, ConfigString}, + {service_account_json, ServiceAccountJSON}, + {resource_id, ResourceId}, + {bridge_id, BridgeId} + | Config0 + ]. + +delete_all_bridges() -> + ct:pal("deleting all bridges"), + lists:foreach( + fun(#{name := Name, type := Type}) -> + emqx_bridge:remove(Type, Name) + end, + emqx_bridge:list() + ). + +delete_bridge(Config) -> + Type = ?BRIDGE_TYPE, + Name = ?config(gcp_pubsub_name, Config), + ct:pal("deleting bridge ~p", [{Type, Name}]), + emqx_bridge:remove(Type, Name). + +create_bridge(Config) -> + create_bridge(Config, _GCPPubSubConfigOverrides = #{}). + +create_bridge(Config, GCPPubSubConfigOverrides) -> + TypeBin = ?BRIDGE_TYPE_BIN, + Name = ?config(gcp_pubsub_name, Config), + GCPPubSubConfig0 = ?config(gcp_pubsub_config, Config), + GCPPubSubConfig = emqx_map_lib:deep_merge(GCPPubSubConfig0, GCPPubSubConfigOverrides), + ct:pal("creating bridge: ~p", [GCPPubSubConfig]), + Res = emqx_bridge:create(TypeBin, Name, GCPPubSubConfig), + ct:pal("bridge creation result: ~p", [Res]), + Res. + +create_bridge_http(Config) -> + create_bridge_http(Config, _GCPPubSubConfigOverrides = #{}). + +create_bridge_http(Config, GCPPubSubConfigOverrides) -> + TypeBin = ?BRIDGE_TYPE_BIN, + Name = ?config(gcp_pubsub_name, Config), + GCPPubSubConfig0 = ?config(gcp_pubsub_config, Config), + GCPPubSubConfig = emqx_map_lib:deep_merge(GCPPubSubConfig0, GCPPubSubConfigOverrides), + Params = GCPPubSubConfig#{<<"type">> => TypeBin, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + ct:pal("creating bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res0} -> {ok, emqx_json:decode(Res0, [return_maps])}; + Error -> Error + end, + ct:pal("bridge creation result: ~p", [Res]), + Res. + +create_rule_and_action_http(Config) -> + GCPPubSubName = ?config(gcp_pubsub_name, Config), + BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, GCPPubSubName), + Params = #{ + enable => true, + sql => <<"SELECT * FROM \"t/topic\"">>, + actions => [BridgeId] + }, + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + +success_http_handler() -> + TestPid = self(), + fun(Req0, State) -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + jiffy:encode(#{messageIds => [<<"6058891368195201">>]}), + Req + ), + {ok, Rep, State} + end. + +start_echo_http_server() -> + HTTPHost = "localhost", + HTTPPort = 56000, + HTTPPath = <<"/v1/projects/myproject/topics/mytopic:publish">>, + ServerSSLOpts = + [ + {verify, verify_none}, + {versions, ['tlsv1.2', 'tlsv1.3']}, + {ciphers, ["ECDHE-RSA-AES256-GCM-SHA384", "TLS_CHACHA20_POLY1305_SHA256"]} + ] ++ certs(), + {ok, _} = emqx_connector_web_hook_server:start_link(HTTPPort, HTTPPath, ServerSSLOpts), + ok = emqx_connector_web_hook_server:set_handler(success_http_handler()), + {ok, #{ + host_port => HTTPHost ++ ":" ++ integer_to_list(HTTPPort), + host => HTTPHost, + port => HTTPPort + }}. + +certs() -> + CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"), + [ + {keyfile, filename:join([CertsPath, "key.pem"])}, + {certfile, filename:join([CertsPath, "cert.pem"])}, + {cacertfile, filename:join([CertsPath, "cacert.pem"])} + ]. + +gcp_pubsub_config(Config) -> + EnableBatch = proplists:get_value(enable_batch, Config, true), + QueryMode = proplists:get_value(query_mode, Config, sync), + EnableQueue = proplists:get_value(enable_queue, Config, false), + BatchSize = proplists:get_value(batch_size, Config, 100), + BatchTime = proplists:get_value(batch_time, Config, <<"20ms">>), + PayloadTemplate = proplists:get_value(payload_template, Config, ""), + PubSubTopic = proplists:get_value(pubsub_topic, Config, <<"mytopic">>), + PipelineSize = proplists:get_value(pipeline_size, Config, 100), + ServiceAccountJSON = proplists:get_value(pubsub_topic, Config, generate_service_account_json()), + ServiceAccountJSONStr = emqx_json:encode(ServiceAccountJSON), + GUID = emqx_guid:to_hexstr(emqx_guid:gen()), + Name = <<(atom_to_binary(?MODULE))/binary, (GUID)/binary>>, + ConfigString = + io_lib:format( + "bridges.gcp_pubsub.~s {\n" + " enable = true\n" + " connect_timeout = 1s\n" + " request_timeout = 1s\n" + " service_account_json = ~s\n" + " payload_template = ~p\n" + " pubsub_topic = ~s\n" + " pool_size = 1\n" + " pipelining = ~b\n" + " resource_opts = {\n" + " worker_pool_size = 1\n" + " enable_batch = ~p\n" + " query_mode = ~s\n" + " enable_queue = ~p\n" + " batch_size = ~b\n" + " batch_time = \"~s\"\n" + " }\n" + "}\n", + [ + Name, + ServiceAccountJSONStr, + PayloadTemplate, + PubSubTopic, + PipelineSize, + EnableBatch, + QueryMode, + EnableQueue, + BatchSize, + BatchTime + ] + ), + #{ + name => Name, + config_string => ConfigString, + pubsub_config => parse_and_check(ConfigString, Name), + service_account_json => ServiceAccountJSON + }. + +parse_and_check(ConfigString, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + TypeBin = <<"gcp_pubsub">>, + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf, + Config. + +generate_service_account_json() -> + PrivateKeyPEM = generate_private_key_pem(), + service_account_json(PrivateKeyPEM). + +generate_private_key_pem() -> + PublicExponent = 65537, + Size = 2048, + Key = public_key:generate_key({rsa, Size, PublicExponent}), + DERKey = public_key:der_encode('PrivateKeyInfo', Key), + public_key:pem_encode([{'PrivateKeyInfo', DERKey, not_encrypted}]). + +service_account_json(PrivateKeyPEM) -> + #{ + <<"type">> => <<"service_account">>, + <<"project_id">> => <<"myproject">>, + <<"private_key_id">> => <<"kid">>, + <<"private_key">> => PrivateKeyPEM, + <<"client_email">> => <<"test@myproject.iam.gserviceaccount.com">>, + <<"client_id">> => <<"123812831923812319190">>, + <<"auth_uri">> => <<"https://accounts.google.com/o/oauth2/auth">>, + <<"token_uri">> => <<"https://oauth2.googleapis.com/token">>, + <<"auth_provider_x509_cert_url">> => <<"https://www.googleapis.com/oauth2/v1/certs">>, + <<"client_x509_cert_url">> => + <<"https://www.googleapis.com/robot/v1/metadata/x509/test%40myproject.iam.gserviceaccount.com">> + }. + +metrics_mapping() -> + #{ + batching => fun emqx_resource_metrics:batching_get/1, + dropped => fun emqx_resource_metrics:dropped_get/1, + dropped_other => fun emqx_resource_metrics:dropped_other_get/1, + dropped_queue_full => fun emqx_resource_metrics:dropped_queue_full_get/1, + dropped_queue_not_enabled => fun emqx_resource_metrics:dropped_queue_not_enabled_get/1, + dropped_resource_not_found => fun emqx_resource_metrics:dropped_resource_not_found_get/1, + dropped_resource_stopped => fun emqx_resource_metrics:dropped_resource_stopped_get/1, + failed => fun emqx_resource_metrics:failed_get/1, + inflight => fun emqx_resource_metrics:inflight_get/1, + matched => fun emqx_resource_metrics:matched_get/1, + queuing => fun emqx_resource_metrics:queuing_get/1, + retried => fun emqx_resource_metrics:retried_get/1, + retried_failed => fun emqx_resource_metrics:retried_failed_get/1, + retried_success => fun emqx_resource_metrics:retried_success_get/1, + success => fun emqx_resource_metrics:success_get/1 + }. + +current_metrics(ResourceId) -> + Mapping = metrics_mapping(), + maps:from_list([ + {Metric, F(ResourceId)} + || {Metric, F} <- maps:to_list(Mapping) + ]). + +assert_metrics(ExpectedMetrics, ResourceId) -> + Mapping = metrics_mapping(), + Metrics = + lists:foldl( + fun(Metric, Acc) -> + #{Metric := Fun} = Mapping, + Value = Fun(ResourceId), + Acc#{Metric => Value} + end, + #{}, + maps:keys(ExpectedMetrics) + ), + CurrentMetrics = current_metrics(ResourceId), + ?assertEqual(ExpectedMetrics, Metrics, #{current_metrics => CurrentMetrics}), + ok. + +assert_empty_metrics(ResourceId) -> + Mapping = metrics_mapping(), + ExpectedMetrics = + lists:foldl( + fun(Metric, Acc) -> + Acc#{Metric => 0} + end, + #{}, + maps:keys(Mapping) + ), + assert_metrics(ExpectedMetrics, ResourceId). + +verify_token(ServiceAccountJSON, JWTBin) -> + #{ + <<"private_key">> := PrivateKeyPEM, + <<"private_key_id">> := KId, + <<"client_email">> := ServiceAccountEmail + } = ServiceAccountJSON, + JWK = jose_jwk:from_pem(PrivateKeyPEM), + {IsValid, JWT, JWS} = jose_jwt:verify(JWK, JWTBin), + ?assertMatch( + #jose_jwt{ + fields = + #{ + <<"aud">> := <<"https://pubsub.googleapis.com/">>, + <<"exp">> := _, + <<"iat">> := _, + <<"iss">> := ServiceAccountEmail, + <<"sub">> := ServiceAccountEmail + } + }, + JWT + ), + #jose_jwt{ + fields = + #{ + <<"exp">> := Exp, + <<"iat">> := Iat + } + } = JWT, + ?assertEqual(Iat + 60 * 60, Exp), + ?assert(Iat =< erlang:system_time(seconds)), + ?assertMatch( + #jose_jws{ + alg = {_Module, 'RS256'}, + fields = + #{ + <<"kid">> := KId, + <<"typ">> := <<"JWT">> + } + }, + JWS + ), + ?assert(IsValid, #{ + jwt => JWT, + jws => JWS + }), + ok. + +assert_valid_request_headers(Headers, ServiceAccountJSON) -> + case Headers of + #{<<"authorization">> := <<"Bearer ", JWT/binary>>} -> + verify_token(ServiceAccountJSON, JWT), + ok; + _ -> + %% better to raise a value than to use `ct:fail' + %% because it doesn't output very well... + error({expected_bearer_authn_header, #{headers => Headers}}) + end. + +assert_valid_request_body(Body) -> + BodyMap = emqx_json:decode(Body, [return_maps]), + ?assertMatch(#{<<"messages">> := [_ | _]}, BodyMap), + #{<<"messages">> := Messages} = BodyMap, + lists:map( + fun(Msg) -> + ?assertMatch(#{<<"data">> := <<_/binary>>}, Msg), + #{<<"data">> := Content64} = Msg, + Content = base64:decode(Content64), + Decoded = emqx_json:decode(Content, [return_maps]), + ct:pal("decoded payload: ~p", [Decoded]), + ?assert(is_map(Decoded)), + Decoded + end, + Messages + ). + +assert_http_request(ServiceAccountJSON) -> + receive + {http, Headers, Body} -> + assert_valid_request_headers(Headers, ServiceAccountJSON), + assert_valid_request_body(Body) + after 5_000 -> + {messages, Mailbox} = process_info(self(), messages), + error({timeout, #{mailbox => Mailbox}}) + end. + +install_telemetry_handler(TestCase) -> + Tid = ets:new(TestCase, [ordered_set, public]), + HandlerId = TestCase, + TestPid = self(), + _ = telemetry:attach_many( + HandlerId, + emqx_resource_metrics:events(), + fun(EventName, Measurements, Metadata, _Config) -> + Data = #{ + name => EventName, + measurements => Measurements, + metadata => Metadata + }, + ets:insert(Tid, {erlang:monotonic_time(), Data}), + TestPid ! {telemetry, Data}, + ok + end, + unused_config + ), + on_exit(fun() -> + telemetry:detach(HandlerId), + ets:delete(Tid) + end), + Tid. + +wait_telemetry_event(TelemetryTable, EventName, ResourceId) -> + wait_telemetry_event(TelemetryTable, EventName, ResourceId, #{timeout => 5_000, n_events => 1}). + +wait_telemetry_event( + TelemetryTable, + EventName, + ResourceId, + _Opts = #{ + timeout := Timeout, + n_events := NEvents + } +) -> + wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName). + +wait_n_events(_TelemetryTable, _ResourceId, NEvents, _Timeout, _EventName) when NEvents =< 0 -> + ok; +wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) -> + receive + {telemetry, #{name := [_, _, EventName]}} -> + wait_n_events(TelemetryTable, ResourceId, NEvents - 1, Timeout, EventName) + after Timeout -> + RecordedEvents = ets:tab2list(TelemetryTable), + CurrentMetrics = current_metrics(ResourceId), + ct:pal("recorded events: ~p", [RecordedEvents]), + ct:pal("current metrics: ~p", [CurrentMetrics]), + error({timeout_waiting_for_telemetry, EventName}) + end. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_publish_success(Config) -> + ResourceId = ?config(resource_id, Config), + ServiceAccountJSON = ?config(service_account_json, Config), + TelemetryTable = ?config(telemetry_table, Config), + Topic = <<"t/topic">>, + ?check_trace( + create_bridge(Config), + fun(Res, Trace) -> + ?assertMatch({ok, _}, Res), + ?assertMatch([_], ?of_kind(gcp_pubsub_bridge_jwt_created, Trace)), + ok + end + ), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + assert_empty_metrics(ResourceId), + Payload = <<"payload">>, + Message = emqx_message:make(Topic, Payload), + emqx:publish(Message), + DecodedMessages = assert_http_request(ServiceAccountJSON), + ?assertMatch( + [ + #{ + <<"topic">> := Topic, + <<"payload">> := Payload, + <<"metadata">> := #{<<"rule_id">> := RuleId} + } + ], + DecodedMessages + ), + %% to avoid test flakiness + wait_telemetry_event(TelemetryTable, success, ResourceId), + assert_metrics( + #{ + batching => 0, + dropped => 0, + failed => 0, + inflight => 0, + matched => 1, + queuing => 0, + retried => 0, + success => 1 + }, + ResourceId + ), + ok. + +t_publish_success_local_topic(Config) -> + ResourceId = ?config(resource_id, Config), + ServiceAccountJSON = ?config(service_account_json, Config), + TelemetryTable = ?config(telemetry_table, Config), + LocalTopic = <<"local/topic">>, + {ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}), + assert_empty_metrics(ResourceId), + Payload = <<"payload">>, + Message = emqx_message:make(LocalTopic, Payload), + emqx:publish(Message), + DecodedMessages = assert_http_request(ServiceAccountJSON), + ?assertMatch( + [ + #{ + <<"topic">> := LocalTopic, + <<"payload">> := Payload + } + ], + DecodedMessages + ), + %% to avoid test flakiness + wait_telemetry_event(TelemetryTable, success, ResourceId), + assert_metrics( + #{ + batching => 0, + dropped => 0, + failed => 0, + inflight => 0, + matched => 1, + queuing => 0, + retried => 0, + success => 1 + }, + ResourceId + ), + ok. + +t_create_via_http(Config) -> + ?check_trace( + create_bridge_http(Config), + fun(Res, Trace) -> + ?assertMatch({ok, _}, Res), + ?assertMatch([_], ?of_kind(gcp_pubsub_bridge_jwt_created, Trace)), + ok + end + ), + ok. + +t_publish_templated(Config) -> + ResourceId = ?config(resource_id, Config), + ServiceAccountJSON = ?config(service_account_json, Config), + TelemetryTable = ?config(telemetry_table, Config), + Topic = <<"t/topic">>, + PayloadTemplate = << + "{\"payload\": \"${payload}\"," + " \"pub_props\": ${pub_props}}" + >>, + ?check_trace( + create_bridge( + Config, + #{<<"payload_template">> => PayloadTemplate} + ), + fun(Res, Trace) -> + ?assertMatch({ok, _}, Res), + ?assertMatch([_], ?of_kind(gcp_pubsub_bridge_jwt_created, Trace)), + ok + end + ), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + assert_empty_metrics(ResourceId), + Payload = <<"payload">>, + Message = + emqx_message:set_header( + properties, + #{'User-Property' => #{'Correlation-Data' => <<"321">>}}, + emqx_message:make(Topic, Payload) + ), + emqx:publish(Message), + DecodedMessages = assert_http_request(ServiceAccountJSON), + ?assertMatch( + [ + #{ + <<"payload">> := Payload, + <<"pub_props">> := #{ + <<"User-Property">> := + #{ + <<"Correlation-Data">> := + <<"321">> + } + } + } + ], + DecodedMessages + ), + %% to avoid test flakiness + wait_telemetry_event(TelemetryTable, success, ResourceId), + assert_metrics( + #{ + batching => 0, + dropped => 0, + failed => 0, + inflight => 0, + matched => 1, + queuing => 0, + retried => 0, + success => 1 + }, + ResourceId + ), + ok. + +t_publish_success_batch(Config) -> + ResourceId = ?config(resource_id, Config), + ServiceAccountJSON = ?config(service_account_json, Config), + TelemetryTable = ?config(telemetry_table, Config), + Topic = <<"t/topic">>, + BatchSize = 5, + %% to give it time to form a batch + BatchTime = <<"2s">>, + ?assertMatch( + {ok, _}, + create_bridge( + Config, + #{ + <<"resource_opts">> => + #{ + <<"batch_size">> => BatchSize, + <<"batch_time">> => BatchTime + } + } + ) + ), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + assert_empty_metrics(ResourceId), + NumMessages = BatchSize * 2, + Messages = [emqx_message:make(Topic, integer_to_binary(N)) || N <- lists:seq(1, NumMessages)], + %% publish in parallel to avoid each client blocking and then + %% making 1-sized batches. also important to note that the pool + %% size for the resource (replayq buffering) must be set to 1 to + %% avoid further segmentation of batches. + emqx_misc:pmap(fun emqx:publish/1, Messages), + DecodedMessages0 = assert_http_request(ServiceAccountJSON), + ?assertEqual(BatchSize, length(DecodedMessages0)), + DecodedMessages1 = assert_http_request(ServiceAccountJSON), + ?assertEqual(BatchSize, length(DecodedMessages1)), + PublishedPayloads = + sets:from_list( + [P || #{<<"payload">> := P} <- DecodedMessages0 ++ DecodedMessages1], + [{version, 2}] + ), + ExpectedPayloads = + sets:from_list( + lists:map(fun integer_to_binary/1, lists:seq(1, NumMessages)), + [{version, 2}] + ), + ?assertEqual(ExpectedPayloads, PublishedPayloads), + wait_telemetry_event( + TelemetryTable, + success, + ResourceId, + #{timeout => 15_000, n_events => NumMessages} + ), + assert_metrics( + #{ + batching => 0, + dropped => 0, + failed => 0, + inflight => 0, + matched => NumMessages div BatchSize, + queuing => 0, + retried => 0, + success => NumMessages + }, + ResourceId + ), + ok. + +t_not_a_json(Config) -> + ?assertMatch( + {error, + {_, [ + #{ + kind := validation_error, + reason := #{exception := {error, {badmap, "not a json"}}}, + %% should be censored as it contains secrets + value := <<"******">> + } + ]}}, + create_bridge( + Config, + #{ + <<"service_account_json">> => <<"not a json">> + } + ) + ), + ok. + +t_not_of_service_account_type(Config) -> + ?assertMatch( + {error, + {_, [ + #{ + kind := validation_error, + reason := {wrong_type, <<"not a service account">>}, + %% should be censored as it contains secrets + value := <<"******">> + } + ]}}, + create_bridge( + Config, + #{ + <<"service_account_json">> => #{<<"type">> => <<"not a service account">>} + } + ) + ), + ok. + +t_json_missing_fields(Config) -> + GCPPubSubConfig0 = ?config(gcp_pubsub_config, Config), + ?assertMatch( + {error, + {_, [ + #{ + kind := validation_error, + reason := + {missing_keys, [ + <<"client_email">>, + <<"private_key">>, + <<"private_key_id">>, + <<"project_id">>, + <<"type">> + ]}, + %% should be censored as it contains secrets + value := <<"******">> + } + ]}}, + create_bridge([ + {gcp_pubsub_config, GCPPubSubConfig0#{<<"service_account_json">> := #{}}} + | Config + ]) + ), + ok. + +t_invalid_private_key(Config) -> + InvalidPrivateKeyPEM = <<"xxxxxx">>, + ?check_trace( + begin + {Res, {ok, _Event}} = + ?wait_async_action( + create_bridge( + Config, + #{ + <<"service_account_json">> => + #{<<"private_key">> => InvalidPrivateKeyPEM} + } + ), + #{?snk_kind := gcp_pubsub_bridge_jwt_worker_failed_to_start}, + 20_000 + ), + Res + end, + fun(Res, Trace) -> + ?assertMatch({ok, _}, Res), + ?assertMatch( + [#{reason := Reason}] when + Reason =:= noproc orelse + Reason =:= {shutdown, {error, empty_key}}, + ?of_kind(gcp_pubsub_bridge_jwt_worker_failed_to_start, Trace) + ), + ?assertMatch( + [#{error := empty_key}], + ?of_kind(connector_jwt_worker_startup_error, Trace) + ), + ok + end + ), + ok. + +t_jwt_worker_start_timeout(Config) -> + InvalidPrivateKeyPEM = <<"xxxxxx">>, + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := will_never_happen}, + #{?snk_kind := connector_jwt_worker_make_key} + ), + {Res, {ok, _Event}} = + ?wait_async_action( + create_bridge( + Config, + #{ + <<"service_account_json">> => + #{<<"private_key">> => InvalidPrivateKeyPEM} + } + ), + #{?snk_kind := gcp_pubsub_bridge_jwt_timeout}, + 20_000 + ), + Res + end, + fun(Res, Trace) -> + ?assertMatch({ok, _}, Res), + ?assertMatch([_], ?of_kind(gcp_pubsub_bridge_jwt_timeout, Trace)), + ok + end + ), + ok. + +t_publish_econnrefused(Config) -> + ResourceId = ?config(resource_id, Config), + %% set pipelining to 1 so that one of the 2 requests is `pending' + %% in ehttpc. + {ok, _} = create_bridge(Config, #{<<"pipelining">> => 1}), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + assert_empty_metrics(ResourceId), + ok = emqx_connector_web_hook_server:stop(), + do_econnrefused_or_timeout_test(Config, econnrefused). + +t_publish_timeout(Config) -> + ResourceId = ?config(resource_id, Config), + %% set pipelining to 1 so that one of the 2 requests is `pending' + %% in ehttpc. also, we set the batch size to 1 to also ensure the + %% requests are done separately. + {ok, _} = create_bridge(Config, #{ + <<"pipelining">> => 1, + <<"resource_opts">> => #{<<"batch_size">> => 1} + }), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + assert_empty_metrics(ResourceId), + TestPid = self(), + TimeoutHandler = + fun(Req0, State) -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + %% NOTE: cannot just hang forever; ehttpc will never + %% reply `sent' requests, so the callback will never + %% be called... We just delay responding so that a + %% late response is delivered. + timer:sleep(timer:seconds(3)), + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + jiffy:encode(#{messageIds => [<<"6058891368195201">>]}), + Req + ), + {ok, Rep, State} + end, + ok = emqx_connector_web_hook_server:set_handler(TimeoutHandler), + do_econnrefused_or_timeout_test(Config, timeout). + +do_econnrefused_or_timeout_test(Config, Error) -> + EnableQueue = ?config(enable_queue, Config), + QueryMode = ?config(query_mode, Config), + ResourceId = ?config(resource_id, Config), + TelemetryTable = ?config(telemetry_table, Config), + Topic = <<"t/topic">>, + Payload = <<"payload">>, + Message = emqx_message:make(Topic, Payload), + ?check_trace( + begin + case {QueryMode, Error} of + {sync, _} -> + {_, {ok, _}} = + ?wait_async_action( + emqx:publish(Message), + #{?snk_kind := gcp_pubsub_request_failed, recoverable_error := true}, + 15_000 + ); + {async, econnrefused} -> + %% at the time of writing, async requests + %% are never considered expired by ehttpc + %% (even if they arrive late, or never + %% arrive at all). + %% so, we set pipelining to 1 and shoot 2 + %% requests, so that the second one may expire. + {_, {ok, _}} = + ?wait_async_action( + begin + emqx:publish(Message), + emqx:publish(Message) + end, + #{ + ?snk_kind := gcp_pubsub_request_failed, + query_mode := async, + recoverable_error := true + }, + 15_000 + ); + {async, timeout} -> + %% at the time of writing, async requests + %% are never considered expired by ehttpc + %% (even if they arrive late, or never + %% arrive at all). + %% with the timeout delay, this'll succeed. + emqx:publish(Message), + emqx:publish(Message), + {ok, _} = snabbkaffe:block_until( + ?match_n_events(2, #{ + ?snk_kind := gcp_pubsub_response, + query_mode := async + }), + _Timeout1 = 15_000 + ) + end + end, + fun(Trace) -> + case {QueryMode, Error} of + {sync, _} -> + ?assertMatch( + [#{reason := Error, connector := ResourceId} | _], + ?of_kind(gcp_pubsub_request_failed, Trace) + ); + {async, econnrefused} -> + ?assertMatch( + [#{reason := Error, connector := ResourceId} | _], + ?of_kind(gcp_pubsub_request_failed, Trace) + ); + {async, timeout} -> + ?assertMatch( + [_, _ | _], + ?of_kind(gcp_pubsub_response, Trace) + ) + end, + ok + end + ), + + case {Error, QueryMode, EnableQueue} of + {_, sync, false} -> + wait_telemetry_event(TelemetryTable, dropped_queue_not_enabled, ResourceId, #{ + timeout => 10_000, + n_events => 1 + }), + assert_metrics( + #{ + batching => 0, + dropped => 1, + dropped_queue_not_enabled => 1, + failed => 0, + inflight => 0, + matched => 1, + queuing => 0, + retried => 0, + success => 0 + }, + ResourceId + ); + %% apparently, async with disabled queue doesn't mark the + %% message as dropped; and since it never considers the + %% response expired, this succeeds. + {econnrefused, async, _} -> + wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{ + timeout => 10_000, n_events => 2 + }), + CurrentMetrics = current_metrics(ResourceId), + RecordedEvents = ets:tab2list(TelemetryTable), + ct:pal("telemetry events: ~p", [RecordedEvents]), + ?assertMatch( + #{ + batching := 0, + dropped := Dropped, + failed := 0, + inflight := Inflight, + matched := Matched, + queuing := Queueing, + retried := 0, + success := 0 + } when Matched >= 1 andalso Inflight + Queueing + Dropped =< 2, + CurrentMetrics + ); + {timeout, async, _} -> + wait_telemetry_event(TelemetryTable, success, ResourceId, #{ + timeout => 10_000, n_events => 2 + }), + assert_metrics( + #{ + batching => 0, + dropped => 0, + failed => 0, + inflight => 0, + matched => 2, + queuing => 0, + retried => 0, + success => 2 + }, + ResourceId + ); + {_, sync, true} -> + wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{ + timeout => 10_000, n_events => 2 + }), + assert_metrics( + #{ + batching => 0, + dropped => 0, + failed => 0, + inflight => 0, + matched => 1, + queuing => 1, + retried => 0, + success => 0 + }, + ResourceId + ) + end, + + ok. + +%% for complete coverage; pubsub actually returns a body with message +%% ids +t_success_no_body(Config) -> + TestPid = self(), + SuccessNoBodyHandler = + fun(Req0, State) -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + <<>>, + Req + ), + {ok, Rep, State} + end, + ok = emqx_connector_web_hook_server:set_handler(SuccessNoBodyHandler), + Topic = <<"t/topic">>, + {ok, _} = create_bridge(Config), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + Payload = <<"payload">>, + Message = emqx_message:make(Topic, Payload), + ?check_trace( + {_, {ok, _}} = + ?wait_async_action( + emqx:publish(Message), + #{?snk_kind := gcp_pubsub_response}, + 5_000 + ), + fun(Trace) -> + ?assertMatch( + [#{response := {ok, 200, _Headers}}], + ?of_kind(gcp_pubsub_response, Trace) + ), + ok + end + ), + ok. + +t_failure_with_body(Config) -> + TestPid = self(), + FailureWithBodyHandler = + fun(Req0, State) -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + Rep = cowboy_req:reply( + 400, + #{<<"content-type">> => <<"application/json">>}, + jiffy:encode(#{}), + Req + ), + {ok, Rep, State} + end, + ok = emqx_connector_web_hook_server:set_handler(FailureWithBodyHandler), + Topic = <<"t/topic">>, + {ok, _} = create_bridge(Config), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + Payload = <<"payload">>, + Message = emqx_message:make(Topic, Payload), + ?check_trace( + {_, {ok, _}} = + ?wait_async_action( + emqx:publish(Message), + #{?snk_kind := gcp_pubsub_response}, + 5_000 + ), + fun(Trace) -> + ?assertMatch( + [#{response := {ok, 400, _Headers, _Body}}], + ?of_kind(gcp_pubsub_response, Trace) + ), + ok + end + ), + ok. + +t_failure_no_body(Config) -> + TestPid = self(), + FailureNoBodyHandler = + fun(Req0, State) -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + Rep = cowboy_req:reply( + 400, + #{<<"content-type">> => <<"application/json">>}, + <<>>, + Req + ), + {ok, Rep, State} + end, + ok = emqx_connector_web_hook_server:set_handler(FailureNoBodyHandler), + Topic = <<"t/topic">>, + {ok, _} = create_bridge(Config), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + Payload = <<"payload">>, + Message = emqx_message:make(Topic, Payload), + ?check_trace( + {_, {ok, _}} = + ?wait_async_action( + emqx:publish(Message), + #{?snk_kind := gcp_pubsub_response}, + 5_000 + ), + fun(Trace) -> + ?assertMatch( + [#{response := {ok, 400, _Headers}}], + ?of_kind(gcp_pubsub_response, Trace) + ), + ok + end + ), + ok. + +t_unrecoverable_error(Config) -> + ResourceId = ?config(resource_id, Config), + TelemetryTable = ?config(telemetry_table, Config), + QueryMode = ?config(query_mode, Config), + TestPid = self(), + FailureNoBodyHandler = + fun(Req0, State) -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + %% kill the gun process while it's waiting for the + %% response so we provoke an `{error, _}' response from + %% ehttpc. + lists:foreach( + fun(Pid) -> exit(Pid, kill) end, + [Pid || {_, Pid, _, _} <- supervisor:which_children(gun_sup)] + ), + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + <<>>, + Req + ), + {ok, Rep, State} + end, + ok = emqx_connector_web_hook_server:set_handler(FailureNoBodyHandler), + Topic = <<"t/topic">>, + {ok, _} = create_bridge(Config), + assert_empty_metrics(ResourceId), + {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + Payload = <<"payload">>, + Message = emqx_message:make(Topic, Payload), + ?check_trace( + {_, {ok, _}} = + case QueryMode of + sync -> + ?wait_async_action( + emqx:publish(Message), + #{?snk_kind := gcp_pubsub_request_failed}, + 5_000 + ); + async -> + ?wait_async_action( + emqx:publish(Message), + #{?snk_kind := gcp_pubsub_response}, + 5_000 + ) + end, + fun(Trace) -> + case QueryMode of + sync -> + ?assertMatch( + [#{reason := killed}], + ?of_kind(gcp_pubsub_request_failed, Trace) + ); + async -> + ?assertMatch( + [#{response := {error, killed}}], + ?of_kind(gcp_pubsub_response, Trace) + ) + end, + ok + end + ), + wait_telemetry_event(TelemetryTable, failed, ResourceId), + assert_metrics( + #{ + batching => 0, + dropped => 0, + failed => 1, + inflight => 0, + matched => 1, + queuing => 0, + retried => 0, + success => 0 + }, + ResourceId + ), + ok. + +t_failed_to_start_jwt_worker(Config) -> + ?check_trace( + emqx_common_test_helpers:with_mock( + emqx_connector_jwt_sup, + ensure_worker_present, + fun(_JWTWorkerId, _Config) -> {error, restarting} end, + fun() -> + ?assertMatch({ok, _}, create_bridge(Config)) + end + ), + fun(Trace) -> + ?assertMatch( + [#{reason := {error, restarting}}], + ?of_kind(gcp_pubsub_bridge_jwt_worker_failed_to_start, Trace) + ), + ok + end + ), + ok. + +t_stop(Config) -> + Name = ?config(gcp_pubsub_name, Config), + {ok, _} = create_bridge(Config), + ?check_trace( + ?wait_async_action( + emqx_bridge_resource:stop(?BRIDGE_TYPE, Name), + #{?snk_kind := gcp_pubsub_stop}, + 5_000 + ), + fun(Res, Trace) -> + ?assertMatch({ok, {ok, _}}, Res), + ?assertMatch([_], ?of_kind(gcp_pubsub_stop, Trace)), + ok + end + ), + ok. + +t_get_status_ok(Config) -> + ResourceId = ?config(resource_id, Config), + {ok, _} = create_bridge(Config), + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + ok. + +t_get_status_no_worker(Config) -> + ResourceId = ?config(resource_id, Config), + {ok, _} = create_bridge(Config), + emqx_common_test_helpers:with_mock( + ehttpc, + workers, + fun(_Poolname) -> [] end, + fun() -> + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), + ok + end + ), + ok. + +t_get_status_down(Config) -> + ResourceId = ?config(resource_id, Config), + {ok, _} = create_bridge(Config), + emqx_common_test_helpers:with_mock( + ehttpc, + health_check, + fun(_Worker, _Timeout) -> + {error, connect_timeout} + end, + fun() -> + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), + ok + end + ), + ok. + +t_get_status_timeout_calling_workers(Config) -> + ResourceId = ?config(resource_id, Config), + {ok, _} = create_bridge(Config), + emqx_common_test_helpers:with_mock( + ehttpc, + health_check, + fun(_Worker, _Timeout) -> + receive + after infinity -> error(impossible) + end + end, + fun() -> + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), + ok + end + ), + ok. + +t_on_start_ehttpc_pool_already_started(Config) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := pool_started}, + #{?snk_kind := gcp_pubsub_starting_ehttpc_pool} + ), + {ok, SubRef} = + snabbkaffe:subscribe( + fun + (#{?snk_kind := gcp_pubsub_on_start_before_starting_pool}) -> true; + (_) -> false + end, + 5_000 + ), + spawn_link(fun() -> {ok, _} = create_bridge(Config) end), + {ok, [#{pool_name := PoolName, pool_opts := PoolOpts}]} = snabbkaffe:receive_events( + SubRef + ), + ?assertMatch({ok, _}, ehttpc_sup:start_pool(PoolName, PoolOpts)), + ?tp(pool_started, #{}), + ?block_until(#{?snk_kind := gcp_pubsub_ehttpc_pool_already_started}, 2_000), + PoolName + end, + fun(PoolName, Trace) -> + ?assertMatch( + [#{pool_name := PoolName}], + ?of_kind(gcp_pubsub_ehttpc_pool_already_started, Trace) + ), + ok + end + ), + ok. + +t_on_start_ehttpc_pool_start_failure(Config) -> + ?check_trace( + emqx_common_test_helpers:with_mock( + ehttpc_sup, + start_pool, + fun(_PoolName, _PoolOpts) -> {error, some_error} end, + fun() -> + {ok, _} = create_bridge(Config) + end + ), + fun(Trace) -> + ?assertMatch( + [#{reason := some_error}], + ?of_kind(gcp_pubsub_ehttpc_pool_start_failure, Trace) + ), + ok + end + ), + ok. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index c2ac45551..3a7339e27 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -53,6 +53,7 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> + delete_all_bridges(), ok = emqx_common_test_helpers:stop_apps([emqx_conf]), ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource]), _ = application:stop(emqx_connector), @@ -222,8 +223,6 @@ end_per_group(_Group, _Config) -> ok. init_per_testcase(_Testcase, Config) -> - %% catch clear_db(Config), - %% delete_bridge(Config), delete_all_bridges(), Config. @@ -232,8 +231,6 @@ end_per_testcase(_Testcase, Config) -> ProxyPort = ?config(proxy_port, Config), ok = snabbkaffe:stop(), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - %% catch clear_db(Config), - %% delete_bridge(Config), delete_all_bridges(), ok. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl new file mode 100644 index 000000000..139cb89e9 --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl @@ -0,0 +1,599 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_connector_gcp_pubsub). + +-behaviour(emqx_resource). + +-include_lib("emqx_connector/include/emqx_connector_tables.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% `emqx_resource' API +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_query/3, + on_query_async/4, + on_batch_query/3, + on_batch_query_async/4, + on_get_status/2, + is_buffer_supported/0 +]). +-export([reply_delegator/3]). + +-type bridge_id() :: binary(). +-type jwt_worker() :: binary(). +-type service_account_json() :: emqx_ee_bridge_gcp_pubsub:service_account_json(). +-type config() :: #{ + connect_timeout := emqx_schema:duration_ms(), + max_retries := non_neg_integer(), + pubsub_topic := binary(), + request_timeout := emqx_schema:duration_ms(), + service_account_json := service_account_json(), + any() => term() +}. +-type state() :: #{ + connect_timeout := timer:time(), + instance_id := manager_id(), + jwt_worker_id := jwt_worker(), + max_retries := non_neg_integer(), + payload_template := emqx_plugin_libs_rule:tmpl_token(), + pool_name := atom(), + project_id := binary(), + pubsub_topic := binary(), + request_timeout := timer:time() +}. +-type headers() :: [{binary(), iodata()}]. +-type body() :: iodata(). +-type status_code() :: 100..599. + +-define(DEFAULT_PIPELINE_SIZE, 100). + +%%------------------------------------------------------------------------------------------------- +%% emqx_resource API +%%------------------------------------------------------------------------------------------------- + +%% TODO: check +is_buffer_supported() -> false. + +callback_mode() -> async_if_possible. + +-spec on_start(manager_id(), config()) -> {ok, state()} | {error, term()}. +on_start( + InstanceId, + #{ + connect_timeout := ConnectTimeout, + max_retries := MaxRetries, + payload_template := PayloadTemplate, + pool_size := PoolSize, + pubsub_topic := PubSubTopic, + request_timeout := RequestTimeout + } = Config +) -> + ?SLOG(info, #{ + msg => "starting_gcp_pubsub_bridge", + connector => InstanceId, + config => Config + }), + %% emulating the emulator behavior + %% https://cloud.google.com/pubsub/docs/emulator + HostPort = os:getenv("PUBSUB_EMULATOR_HOST", "pubsub.googleapis.com:443"), + {Host, Port} = emqx_connector_schema_lib:parse_server( + HostPort, #{host_type => hostname, default_port => 443} + ), + PoolType = random, + Transport = tls, + TransportOpts = emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none}), + NTransportOpts = emqx_misc:ipv6_probe(TransportOpts), + PoolOpts = [ + {host, Host}, + {port, Port}, + {connect_timeout, ConnectTimeout}, + {keepalive, 30_000}, + {pool_type, PoolType}, + {pool_size, PoolSize}, + {transport, Transport}, + {transport_opts, NTransportOpts}, + {enable_pipelining, maps:get(enable_pipelining, Config, ?DEFAULT_PIPELINE_SIZE)} + ], + #{ + jwt_worker_id := JWTWorkerId, + project_id := ProjectId + } = ensure_jwt_worker(InstanceId, Config), + PoolName = emqx_plugin_libs_pool:pool_name(InstanceId), + State = #{ + connect_timeout => ConnectTimeout, + instance_id => InstanceId, + jwt_worker_id => JWTWorkerId, + max_retries => MaxRetries, + payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate), + pool_name => PoolName, + project_id => ProjectId, + pubsub_topic => PubSubTopic, + request_timeout => RequestTimeout + }, + ?tp( + gcp_pubsub_on_start_before_starting_pool, + #{ + instance_id => InstanceId, + pool_name => PoolName, + pool_opts => PoolOpts + } + ), + ?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => PoolName}), + case ehttpc_sup:start_pool(PoolName, PoolOpts) of + {ok, _} -> + {ok, State}; + {error, {already_started, _}} -> + ?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => PoolName}), + {ok, State}; + {error, Reason} -> + ?tp(gcp_pubsub_ehttpc_pool_start_failure, #{ + pool_name => PoolName, + reason => Reason + }), + {error, Reason} + end. + +-spec on_stop(manager_id(), state()) -> ok | {error, term()}. +on_stop( + InstanceId, + _State = #{ + jwt_worker_id := JWTWorkerId, + pool_name := PoolName + } +) -> + ?tp(gcp_pubsub_stop, #{instance_id => InstanceId, jwt_worker_id => JWTWorkerId}), + ?SLOG(info, #{ + msg => "stopping_gcp_pubsub_bridge", + connector => InstanceId + }), + emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), + ehttpc_sup:stop_pool(PoolName). + +-spec on_query( + bridge_id(), + {send_message, map()}, + state() +) -> + {ok, status_code(), headers()} + | {ok, status_code(), headers(), body()} + | {error, {recoverable_error, term()}} + | {error, term()}. +on_query(BridgeId, {send_message, Selected}, State) -> + Requests = [{send_message, Selected}], + ?TRACE( + "QUERY_SYNC", + "gcp_pubsub_received", + #{requests => Requests, connector => BridgeId, state => State} + ), + do_send_requests_sync(State, Requests, BridgeId). + +-spec on_query_async( + bridge_id(), + {send_message, map()}, + {ReplyFun :: function(), Args :: list()}, + state() +) -> ok. +on_query_async(BridgeId, {send_message, Selected}, ReplyFunAndArgs, State) -> + Requests = [{send_message, Selected}], + ?TRACE( + "QUERY_ASYNC", + "gcp_pubsub_received", + #{requests => Requests, connector => BridgeId, state => State} + ), + do_send_requests_async(State, Requests, ReplyFunAndArgs, BridgeId). + +-spec on_batch_query( + bridge_id(), + [{send_message, map()}], + state() +) -> + {ok, status_code(), headers()} + | {ok, status_code(), headers(), body()} + | {error, {recoverable_error, term()}} + | {error, term()}. +on_batch_query(BridgeId, Requests, State) -> + ?TRACE( + "QUERY_SYNC", + "gcp_pubsub_received", + #{requests => Requests, connector => BridgeId, state => State} + ), + do_send_requests_sync(State, Requests, BridgeId). + +-spec on_batch_query_async( + bridge_id(), + [{send_message, map()}], + {ReplyFun :: function(), Args :: list()}, + state() +) -> ok. +on_batch_query_async(BridgeId, Requests, ReplyFunAndArgs, State) -> + ?TRACE( + "QUERY_ASYNC", + "gcp_pubsub_received", + #{requests => Requests, connector => BridgeId, state => State} + ), + do_send_requests_async(State, Requests, ReplyFunAndArgs, BridgeId). + +-spec on_get_status(manager_id(), state()) -> connected | disconnected. +on_get_status(InstanceId, State) -> + #{ + connect_timeout := Timeout, + pool_name := PoolName + } = State, + case do_get_status(InstanceId, PoolName, Timeout) of + true -> + connected; + false -> + ?SLOG(error, #{ + msg => "gcp_pubsub_bridge_get_status_failed", + state => State + }), + disconnected + end. + +%%------------------------------------------------------------------------------------------------- +%% Helper fns +%%------------------------------------------------------------------------------------------------- + +-spec ensure_jwt_worker(manager_id(), config()) -> + #{ + jwt_worker_id := jwt_worker(), + project_id := binary() + }. +ensure_jwt_worker(InstanceId, #{ + service_account_json := ServiceAccountJSON, + pubsub_topic := PubSubTopic +}) -> + #{ + project_id := ProjectId, + private_key_id := KId, + private_key := PrivateKeyPEM, + client_email := ServiceAccountEmail + } = ServiceAccountJSON, + %% fixed for pubsub; trailing slash is important. + Aud = <<"https://pubsub.googleapis.com/">>, + ExpirationMS = timer:hours(1), + Alg = <<"RS256">>, + Config = #{ + private_key => PrivateKeyPEM, + resource_id => InstanceId, + expiration => ExpirationMS, + table => ?JWT_TABLE, + iss => ServiceAccountEmail, + sub => ServiceAccountEmail, + aud => Aud, + kid => KId, + alg => Alg + }, + + JWTWorkerId = <<"gcp_pubsub_jwt_worker:", InstanceId/binary>>, + Worker = + case emqx_connector_jwt_sup:ensure_worker_present(JWTWorkerId, Config) of + {ok, Worker0} -> + Worker0; + Error -> + ?tp( + gcp_pubsub_bridge_jwt_worker_failed_to_start, + #{instance_id => InstanceId, reason => Error} + ), + ?SLOG(error, #{ + msg => "failed_to_start_gcp_pubsub_jwt_worker", + instance_id => InstanceId, + pubsub_topic => PubSubTopic, + reason => Error + }), + _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), + throw(failed_to_start_jwt_worker) + end, + MRef = monitor(process, Worker), + Ref = emqx_connector_jwt_worker:ensure_jwt(Worker), + + %% to ensure that this resource and its actions will be ready to + %% serve when started, we must ensure that the first JWT has been + %% produced by the worker. + receive + {Ref, token_created} -> + ?tp(gcp_pubsub_bridge_jwt_created, #{resource_id => InstanceId}), + demonitor(MRef, [flush]), + ok; + {'DOWN', MRef, process, Worker, Reason} -> + ?tp( + gcp_pubsub_bridge_jwt_worker_failed_to_start, + #{ + resource_id => InstanceId, + reason => Reason + } + ), + ?SLOG(error, #{ + msg => "gcp_pubsub_bridge_jwt_worker_failed_to_start", + connector => InstanceId, + reason => Reason + }), + _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), + throw(failed_to_start_jwt_worker) + after 10_000 -> + ?tp(gcp_pubsub_bridge_jwt_timeout, #{resource_id => InstanceId}), + ?SLOG(warning, #{ + msg => "gcp_pubsub_bridge_jwt_timeout", + connector => InstanceId + }), + demonitor(MRef, [flush]), + _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), + throw(timeout_creating_jwt) + end, + #{ + jwt_worker_id => JWTWorkerId, + project_id => ProjectId + }. + +-spec encode_payload(state(), Selected :: map()) -> #{data := binary()}. +encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) -> + Interpolated = + case PayloadTemplate of + [] -> emqx_json:encode(Selected); + _ -> emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, Selected) + end, + #{data => base64:encode(Interpolated)}. + +-spec to_pubsub_request([#{data := binary()}]) -> binary(). +to_pubsub_request(Payloads) -> + emqx_json:encode(#{messages => Payloads}). + +-spec publish_path(state()) -> binary(). +publish_path( + _State = #{ + project_id := ProjectId, + pubsub_topic := PubSubTopic + } +) -> + <<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>. + +-spec get_jwt_authorization_header(resource_id()) -> [{binary(), binary()}]. +get_jwt_authorization_header(InstanceId) -> + case emqx_connector_jwt:lookup_jwt(?JWT_TABLE, InstanceId) of + %% Since we synchronize the JWT creation during resource start + %% (see `on_start/2'), this will be always be populated. + {ok, JWT} -> + [{<<"Authorization">>, <<"Bearer ", JWT/binary>>}] + end. + +-spec do_send_requests_sync( + state(), + [{send_message, map()}], + resource_id() +) -> + {ok, status_code(), headers()} + | {ok, status_code(), headers(), body()} + | {error, {recoverable_error, term()}} + | {error, term()}. +do_send_requests_sync(State, Requests, ResourceId) -> + #{ + pool_name := PoolName, + instance_id := InstanceId, + max_retries := MaxRetries, + request_timeout := RequestTimeout + } = State, + ?tp( + gcp_pubsub_bridge_do_send_requests, + #{ + query_mode => sync, + instance_id => InstanceId, + resource_id => ResourceId, + requests => Requests + } + ), + Headers = get_jwt_authorization_header(InstanceId), + Payloads = + lists:map( + fun({send_message, Selected}) -> + encode_payload(State, Selected) + end, + Requests + ), + Body = to_pubsub_request(Payloads), + Path = publish_path(State), + Method = post, + Request = {Path, Headers, Body}, + case + ehttpc:request( + PoolName, + Method, + Request, + RequestTimeout, + MaxRetries + ) + of + {error, Reason} when + Reason =:= econnrefused; + %% this comes directly from `gun'... + Reason =:= {closed, "The connection was lost."}; + Reason =:= timeout + -> + ?tp( + warning, + gcp_pubsub_request_failed, + #{ + reason => Reason, + query_mode => sync, + recoverable_error => true, + connector => ResourceId + } + ), + {error, {recoverable_error, Reason}}; + {error, Reason} = Result -> + ?tp( + error, + gcp_pubsub_request_failed, + #{ + reason => Reason, + query_mode => sync, + recoverable_error => false, + connector => ResourceId + } + ), + Result; + {ok, StatusCode, _} = Result when StatusCode >= 200 andalso StatusCode < 300 -> + ?tp( + gcp_pubsub_response, + #{ + response => Result, + query_mode => sync, + connector => ResourceId + } + ), + Result; + {ok, StatusCode, _, _} = Result when StatusCode >= 200 andalso StatusCode < 300 -> + ?tp( + gcp_pubsub_response, + #{ + response => Result, + query_mode => sync, + connector => ResourceId + } + ), + Result; + {ok, StatusCode, RespHeaders} = _Result -> + ?tp( + gcp_pubsub_response, + #{ + response => _Result, + query_mode => sync, + connector => ResourceId + } + ), + ?SLOG(error, #{ + msg => "gcp_pubsub_error_response", + request => Request, + connector => ResourceId, + status_code => StatusCode + }), + {error, #{status_code => StatusCode, headers => RespHeaders}}; + {ok, StatusCode, RespHeaders, RespBody} = _Result -> + ?tp( + gcp_pubsub_response, + #{ + response => _Result, + query_mode => sync, + connector => ResourceId + } + ), + ?SLOG(error, #{ + msg => "gcp_pubsub_error_response", + request => Request, + connector => ResourceId, + status_code => StatusCode + }), + {error, #{status_code => StatusCode, headers => RespHeaders, body => RespBody}} + end. + +-spec do_send_requests_async( + state(), + [{send_message, map()}], + {ReplyFun :: function(), Args :: list()}, + resource_id() +) -> ok. +do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> + #{ + pool_name := PoolName, + instance_id := InstanceId, + request_timeout := RequestTimeout + } = State, + ?tp( + gcp_pubsub_bridge_do_send_requests, + #{ + query_mode => async, + instance_id => InstanceId, + resource_id => ResourceId, + requests => Requests + } + ), + Headers = get_jwt_authorization_header(InstanceId), + Payloads = + lists:map( + fun({send_message, Selected}) -> + encode_payload(State, Selected) + end, + Requests + ), + Body = to_pubsub_request(Payloads), + Path = publish_path(State), + Method = post, + Request = {Path, Headers, Body}, + Worker = ehttpc_pool:pick_worker(PoolName), + ok = ehttpc:request_async( + Worker, + Method, + Request, + RequestTimeout, + {fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]} + ). + +-spec reply_delegator( + resource_id(), + {ReplyFun :: function(), Args :: list()}, + term() | {error, econnrefused | timeout | term()} +) -> ok. +reply_delegator(_ResourceId, ReplyFunAndArgs, Result) -> + case Result of + {error, Reason} when + Reason =:= econnrefused; + %% this comes directly from `gun'... + Reason =:= {closed, "The connection was lost."}; + Reason =:= timeout + -> + ?tp( + gcp_pubsub_request_failed, + #{ + reason => Reason, + query_mode => async, + recoverable_error => true, + connector => _ResourceId + } + ), + Result1 = {error, {recoverable_error, Reason}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); + _ -> + ?tp( + gcp_pubsub_response, + #{ + response => Result, + query_mode => async, + connector => _ResourceId + } + ), + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) + end. + +-spec do_get_status(manager_id(), atom(), timer:time()) -> boolean(). +do_get_status(InstanceId, PoolName, Timeout) -> + Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)], + DoPerWorker = + fun(Worker) -> + case ehttpc:health_check(Worker, Timeout) of + ok -> + true; + {error, Reason} -> + ?SLOG(error, #{ + msg => "ehttpc_health_check_failed", + instance_id => InstanceId, + reason => Reason, + worker => Worker + }), + false + end + end, + try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of + [_ | _] = Status -> + lists:all(fun(St) -> St =:= true end, Status); + [] -> + false + catch + exit:timeout -> + false + end. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 36b2ec44d..2c5b8a8fc 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -87,7 +87,7 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client on_query_async( InstId, {send_message, Data}, - {ReplayFun, Args}, + {ReplyFun, Args}, _State = #{write_syntax := SyntaxLines, client := Client} ) -> case data_to_points(Data, SyntaxLines) of @@ -96,7 +96,7 @@ on_query_async( influxdb_connector_send_query, #{points => Points, batch => false, mode => async} ), - do_async_query(InstId, Client, Points, {ReplayFun, Args}); + do_async_query(InstId, Client, Points, {ReplyFun, Args}); {error, ErrorPoints} = Err -> ?tp( influxdb_connector_send_query_error, @@ -109,7 +109,7 @@ on_query_async( on_batch_query_async( InstId, BatchData, - {ReplayFun, Args}, + {ReplyFun, Args}, #{write_syntax := SyntaxLines, client := Client} ) -> case parse_batch_data(InstId, BatchData, SyntaxLines) of @@ -118,7 +118,7 @@ on_batch_query_async( influxdb_connector_send_query, #{points => Points, batch => true, mode => async} ), - do_async_query(InstId, Client, Points, {ReplayFun, Args}); + do_async_query(InstId, Client, Points, {ReplyFun, Args}); {error, Reason} -> ?tp( influxdb_connector_send_query_error, @@ -336,13 +336,13 @@ do_query(InstId, Client, Points) -> Err end. -do_async_query(InstId, Client, Points, ReplayFunAndArgs) -> +do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> ?SLOG(info, #{ msg => "influxdb write point async", connector => InstId, points => Points }), - ok = influxdb:write_async(Client, Points, ReplayFunAndArgs). + ok = influxdb:write_async(Client, Points, ReplyFunAndArgs). %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Config Trans diff --git a/scripts/merge-i18n.escript b/scripts/merge-i18n.escript index e98631cfc..751ca841c 100755 --- a/scripts/merge-i18n.escript +++ b/scripts/merge-i18n.escript @@ -10,6 +10,7 @@ main(_) -> Conf = [merge(Conf0, Cfgs1), io_lib:nl() ], + ok = filelib:ensure_dir("apps/emqx_dashboard/priv/i18n.conf"), ok = file:write_file("apps/emqx_dashboard/priv/i18n.conf", Conf). merge(BaseConf, Cfgs) ->