Merge pull request #9449 from thalesmg/gcp-pubsub-ee50

feat(gcp_pubsub): implement GCP PubSub bridge (ee5.0)
This commit is contained in:
Thales Macedo Garitezi 2022-12-14 10:31:12 -03:00 committed by GitHub
commit f827062f0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 3486 additions and 79 deletions

View File

@ -66,6 +66,9 @@
-export([clear_screen/0]). -export([clear_screen/0]).
-export([with_mock/4]). -export([with_mock/4]).
-export([
on_exit/1
]).
%% Toxiproxy API %% Toxiproxy API
-export([ -export([
@ -161,7 +164,17 @@ boot_modules(Mods) ->
-spec start_apps(Apps :: apps()) -> ok. -spec start_apps(Apps :: apps()) -> ok.
start_apps(Apps) -> start_apps(Apps) ->
start_apps(Apps, fun(_) -> ok end). %% to avoid keeping the `db_hostname' that is set when loading
%% `system_monitor' application in `emqx_machine', and then it
%% crashing when trying to connect.
%% FIXME: add an `enable' option to sysmon_top and use that to
%% decide whether to start it or not.
DefaultHandler =
fun(_) ->
application:set_env(system_monitor, db_hostname, ""),
ok
end,
start_apps(Apps, DefaultHandler).
-spec start_apps(Apps :: apps(), Handler :: special_config_handler()) -> ok. -spec start_apps(Apps :: apps(), Handler :: special_config_handler()) -> ok.
start_apps(Apps, SpecAppConfig) when is_function(SpecAppConfig) -> start_apps(Apps, SpecAppConfig) when is_function(SpecAppConfig) ->
@ -920,3 +933,21 @@ latency_up_proxy(off, Name, ProxyHost, ProxyPort) ->
[], [],
[{body_format, binary}] [{body_format, binary}]
). ).
%%-------------------------------------------------------------------------------
%% Testcase teardown utilities
%%-------------------------------------------------------------------------------
get_or_spawn_janitor() ->
case get({?MODULE, janitor_proc}) of
undefined ->
{ok, Janitor} = emqx_test_janitor:start_link(),
put({?MODULE, janitor_proc}, Janitor),
Janitor;
Janitor ->
Janitor
end.
on_exit(Fun) ->
Janitor = get_or_spawn_janitor(),
ok = emqx_test_janitor:push_on_exit_callback(Janitor, Fun).

View File

@ -0,0 +1,69 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_test_janitor).
-behaviour(gen_server).
%% `gen_server' API
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
]).
%% API
-export([
start_link/0,
push_on_exit_callback/2
]).
%%----------------------------------------------------------------------------------
%% API
%%----------------------------------------------------------------------------------
start_link() ->
gen_server:start_link(?MODULE, self(), []).
push_on_exit_callback(Server, Callback) when is_function(Callback, 0) ->
gen_server:call(Server, {push, Callback}).
%%----------------------------------------------------------------------------------
%% `gen_server' API
%%----------------------------------------------------------------------------------
init(Parent) ->
process_flag(trap_exit, true),
Ref = monitor(process, Parent),
{ok, #{callbacks => [], owner => {Ref, Parent}}}.
terminate(_Reason, #{callbacks := Callbacks}) ->
lists:foreach(fun(Fun) -> Fun() end, Callbacks).
handle_call({push, Callback}, _From, State = #{callbacks := Callbacks}) ->
{reply, ok, State#{callbacks := [Callback | Callbacks]}};
handle_call(_Req, _From, State) ->
{reply, error, State}.
handle_cast(_Req, State) ->
{noreply, State}.
handle_info({'DOWN', Ref, process, Parent, _Reason}, State = #{owner := {Ref, Parent}}) ->
{stop, normal, State};
handle_info(_Msg, State) ->
{noreply, State}.

View File

@ -51,9 +51,9 @@
-define(EGRESS_DIR_BRIDGES(T), -define(EGRESS_DIR_BRIDGES(T),
T == webhook; T == webhook;
T == mysql; T == mysql;
T == gcp_pubsub;
T == influxdb_api_v1; T == influxdb_api_v1;
T == influxdb_api_v2 T == influxdb_api_v2
%% T == influxdb_udp
). ).
load() -> load() ->

View File

@ -0,0 +1,17 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-define(JWT_TABLE, emqx_connector_jwt).

View File

@ -7,6 +7,7 @@
{deps, [ {deps, [
{emqx, {path, "../emqx"}}, {emqx, {path, "../emqx"}},
{emqx_resource, {path, "../emqx_resource"}},
{eldap2, {git, "https://github.com/emqx/eldap2", {tag, "v0.2.2"}}}, {eldap2, {git, "https://github.com/emqx/eldap2", {tag, "v0.2.2"}}},
{mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}}, {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.1"}}},
{epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7-emqx.2"}}}, {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7-emqx.2"}}},

View File

@ -16,6 +16,7 @@
mysql, mysql,
mongodb, mongodb,
ehttpc, ehttpc,
jose,
emqx, emqx,
emqtt emqtt
]}, ]},

View File

@ -0,0 +1,46 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_jwt).
-include("emqx_connector_tables.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
%% API
-export([
lookup_jwt/1,
lookup_jwt/2
]).
-type jwt() :: binary().
-spec lookup_jwt(resource_id()) -> {ok, jwt()} | {error, not_found}.
lookup_jwt(ResourceId) ->
?MODULE:lookup_jwt(?JWT_TABLE, ResourceId).
-spec lookup_jwt(ets:table(), resource_id()) -> {ok, jwt()} | {error, not_found}.
lookup_jwt(TId, ResourceId) ->
try
case ets:lookup(TId, {ResourceId, jwt}) of
[{{ResourceId, jwt}, JWT}] ->
{ok, JWT};
[] ->
{error, not_found}
end
catch
error:badarg ->
{error, not_found}
end.

View File

@ -0,0 +1,99 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_jwt_sup).
-behaviour(supervisor).
-include("emqx_connector_tables.hrl").
-export([
start_link/0,
ensure_worker_present/2,
ensure_worker_deleted/1
]).
-export([init/1]).
-type worker_id() :: term().
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
ensure_jwt_table(),
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 5,
auto_shutdown => never
},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.
%% @doc Starts a new JWT worker. The caller should use
%% `emqx_connector_jwt_sup:ensure_jwt/1' to ensure that a JWT has
%% been stored, if synchronization is needed.
-spec ensure_worker_present(worker_id(), map()) ->
{ok, supervisor:child()} | {error, term()}.
ensure_worker_present(Id, Config) ->
ChildSpec = jwt_worker_child_spec(Id, Config),
case supervisor:start_child(?MODULE, ChildSpec) of
{ok, Pid} ->
{ok, Pid};
{error, {already_started, Pid}} ->
{ok, Pid};
{error, already_present} ->
supervisor:restart_child(?MODULE, Id)
end.
%% @doc Stops a given JWT worker by its id.
-spec ensure_worker_deleted(worker_id()) -> ok.
ensure_worker_deleted(Id) ->
case supervisor:terminate_child(?MODULE, Id) of
ok ->
_ = supervisor:delete_child(?MODULE, Id),
ok;
{error, not_found} ->
ok
end.
jwt_worker_child_spec(Id, Config) ->
#{
id => Id,
start => {emqx_connector_jwt_worker, start_link, [Config]},
restart => transient,
type => worker,
significant => false,
shutdown => brutal_kill,
modules => [emqx_connector_jwt_worker]
}.
-spec ensure_jwt_table() -> ok.
ensure_jwt_table() ->
case ets:whereis(?JWT_TABLE) of
undefined ->
Opts = [
named_table,
public,
{read_concurrency, true},
ordered_set
],
_ = ets:new(?JWT_TABLE, Opts),
ok;
_ ->
ok
end.

View File

@ -0,0 +1,237 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_jwt_worker).
-behaviour(gen_server).
%% API
-export([
start_link/1,
ensure_jwt/1
]).
%% gen_server API
-export([
init/1,
handle_continue/2,
handle_call/3,
handle_cast/2,
handle_info/2,
format_status/1,
format_status/2
]).
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("jose/include/jose_jwk.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-type config() :: #{
private_key := binary(),
resource_id := resource_id(),
expiration := timer:time(),
table := ets:table(),
iss := binary(),
sub := binary(),
aud := binary(),
kid := binary(),
alg := binary()
}.
-type jwt() :: binary().
-type state() :: #{
refresh_timer := undefined | timer:tref(),
resource_id := resource_id(),
expiration := timer:time(),
table := ets:table(),
jwt := undefined | jwt(),
%% only undefined during startup
jwk := undefined | jose_jwk:key(),
iss := binary(),
sub := binary(),
aud := binary(),
kid := binary(),
alg := binary()
}.
-define(refresh_jwt, refresh_jwt).
%%-----------------------------------------------------------------------------------------
%% API
%%-----------------------------------------------------------------------------------------
-spec start_link(config()) -> gen_server:start_ret().
start_link(
#{
private_key := _,
expiration := _,
resource_id := _,
table := _,
iss := _,
sub := _,
aud := _,
kid := _,
alg := _
} = Config
) ->
gen_server:start_link(?MODULE, Config, []).
-spec ensure_jwt(pid()) -> reference().
ensure_jwt(Worker) ->
Ref = alias([reply]),
gen_server:cast(Worker, {ensure_jwt, Ref}),
Ref.
%%-----------------------------------------------------------------------------------------
%% gen_server API
%%-----------------------------------------------------------------------------------------
-spec init(config()) ->
{ok, state(), {continue, {make_key, binary()}}}
| {stop, {error, term()}}.
init(#{private_key := PrivateKeyPEM} = Config) ->
State0 = maps:without([private_key], Config),
State = State0#{
jwk => undefined,
jwt => undefined,
refresh_timer => undefined
},
{ok, State, {continue, {make_key, PrivateKeyPEM}}}.
handle_continue({make_key, PrivateKeyPEM}, State0) ->
?tp(connector_jwt_worker_make_key, #{state => State0}),
case jose_jwk:from_pem(PrivateKeyPEM) of
JWK = #jose_jwk{} ->
State = State0#{jwk := JWK},
{noreply, State, {continue, create_token}};
[] ->
?tp(connector_jwt_worker_startup_error, #{error => empty_key}),
{stop, {shutdown, {error, empty_key}}, State0};
{error, Reason} ->
Error = {invalid_private_key, Reason},
?tp(connector_jwt_worker_startup_error, #{error => Error}),
{stop, {shutdown, {error, Error}}, State0};
Error0 ->
Error = {invalid_private_key, Error0},
?tp(connector_jwt_worker_startup_error, #{error => Error}),
{stop, {shutdown, {error, Error}}, State0}
end;
handle_continue(create_token, State0) ->
State = generate_and_store_jwt(State0),
{noreply, State}.
handle_call(_Req, _From, State) ->
{reply, {error, bad_call}, State}.
handle_cast({ensure_jwt, From}, State0 = #{jwt := JWT}) ->
State =
case JWT of
undefined ->
generate_and_store_jwt(State0);
_ ->
State0
end,
From ! {From, token_created},
{noreply, State};
handle_cast(_Req, State) ->
{noreply, State}.
handle_info({timeout, TRef, ?refresh_jwt}, State0 = #{refresh_timer := TRef}) ->
State = generate_and_store_jwt(State0),
{noreply, State};
handle_info(_Msg, State) ->
{noreply, State}.
format_status(State) ->
censor_secrets(State).
format_status(_Opt, [_PDict, State0]) ->
State = censor_secrets(State0),
[{data, [{"State", State}]}].
%%-----------------------------------------------------------------------------------------
%% Helper fns
%%-----------------------------------------------------------------------------------------
-spec do_generate_jwt(state()) -> jwt().
do_generate_jwt(
#{
expiration := ExpirationMS,
iss := Iss,
sub := Sub,
aud := Aud,
kid := KId,
alg := Alg,
jwk := JWK
} = _State
) ->
Headers = #{
<<"alg">> => Alg,
<<"kid">> => KId
},
Now = erlang:system_time(seconds),
ExpirationS = erlang:convert_time_unit(ExpirationMS, millisecond, second),
Claims = #{
<<"iss">> => Iss,
<<"sub">> => Sub,
<<"aud">> => Aud,
<<"iat">> => Now,
<<"exp">> => Now + ExpirationS
},
JWT0 = jose_jwt:sign(JWK, Headers, Claims),
{_, JWT} = jose_jws:compact(JWT0),
JWT.
-spec generate_and_store_jwt(state()) -> state().
generate_and_store_jwt(State0) ->
JWT = do_generate_jwt(State0),
store_jwt(State0, JWT),
?tp(connector_jwt_worker_refresh, #{jwt => JWT}),
State1 = State0#{jwt := JWT},
ensure_timer(State1).
-spec store_jwt(state(), jwt()) -> ok.
store_jwt(#{resource_id := ResourceId, table := TId}, JWT) ->
true = ets:insert(TId, {{ResourceId, jwt}, JWT}),
?tp(connector_jwt_worker_token_stored, #{resource_id => ResourceId}),
ok.
-spec ensure_timer(state()) -> state().
ensure_timer(
State = #{
refresh_timer := undefined,
expiration := ExpirationMS0
}
) ->
ExpirationMS = max(5_000, ExpirationMS0 - 5_000),
TRef = erlang:start_timer(ExpirationMS, self(), ?refresh_jwt),
State#{refresh_timer => TRef};
ensure_timer(State) ->
State.
-spec censor_secrets(state()) -> map().
censor_secrets(State) ->
maps:map(
fun
(Key, _Value) when
Key =:= jwt;
Key =:= jwk
->
"******";
(_Key, Value) ->
Value
end,
State
).

View File

@ -192,11 +192,11 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
on_query_async( on_query_async(
_InstId, _InstId,
{send_message, Msg}, {send_message, Msg},
{ReplayFun, Args}, {ReplyFun, Args},
#{name := InstanceId} #{name := InstanceId}
) -> ) ->
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplayFun, Args}). emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}).
on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) -> on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) ->
AutoReconn = maps:get(auto_reconnect, Conf, true), AutoReconn = maps:get(auto_reconnect, Conf, true),

View File

@ -33,7 +33,8 @@ init([]) ->
period => 20 period => 20
}, },
ChildSpecs = [ ChildSpecs = [
child_spec(emqx_connector_mqtt) child_spec(emqx_connector_mqtt),
child_spec(emqx_connector_jwt_sup)
], ],
{ok, {SupFlags, ChildSpecs}}. {ok, {SupFlags, ChildSpecs}}.
@ -46,5 +47,3 @@ child_spec(Mod) ->
type => supervisor, type => supervisor,
modules => [Mod] modules => [Mod]
}. }.
%% internal functions

View File

@ -0,0 +1,69 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_jwt_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx_connector_tables.hrl").
-compile([export_all, nowarn_export_all]).
%%-----------------------------------------------------------------------------
%% CT boilerplate
%%-----------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps([emqx_connector]),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_connector]),
ok.
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, _Config) ->
ets:delete_all_objects(?JWT_TABLE),
ok.
%%-----------------------------------------------------------------------------
%% Helper fns
%%-----------------------------------------------------------------------------
insert_jwt(TId, ResourceId, JWT) ->
ets:insert(TId, {{ResourceId, jwt}, JWT}).
%%-----------------------------------------------------------------------------
%% Test cases
%%-----------------------------------------------------------------------------
t_lookup_jwt_ok(_Config) ->
TId = ?JWT_TABLE,
JWT = <<"some jwt">>,
ResourceId = <<"resource id">>,
true = insert_jwt(TId, ResourceId, JWT),
?assertEqual({ok, JWT}, emqx_connector_jwt:lookup_jwt(ResourceId)),
ok.
t_lookup_jwt_missing(_Config) ->
ResourceId = <<"resource id">>,
?assertEqual({error, not_found}, emqx_connector_jwt:lookup_jwt(ResourceId)),
ok.

View File

@ -0,0 +1,340 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_jwt_worker_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("jose/include/jose_jwt.hrl").
-include_lib("jose/include/jose_jws.hrl").
-compile([export_all, nowarn_export_all]).
%%-----------------------------------------------------------------------------
%% CT boilerplate
%%-----------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
%%-----------------------------------------------------------------------------
%% Helper fns
%%-----------------------------------------------------------------------------
generate_private_key_pem() ->
PublicExponent = 65537,
Size = 2048,
Key = public_key:generate_key({rsa, Size, PublicExponent}),
DERKey = public_key:der_encode('PrivateKeyInfo', Key),
public_key:pem_encode([{'PrivateKeyInfo', DERKey, not_encrypted}]).
generate_config() ->
PrivateKeyPEM = generate_private_key_pem(),
ResourceID = emqx_guid:gen(),
#{
private_key => PrivateKeyPEM,
expiration => timer:hours(1),
resource_id => ResourceID,
table => ets:new(test_jwt_table, [ordered_set, public]),
iss => <<"issuer">>,
sub => <<"subject">>,
aud => <<"audience">>,
kid => <<"key id">>,
alg => <<"RS256">>
}.
is_expired(JWT) ->
#jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT),
Now = erlang:system_time(seconds),
Now >= Exp.
%%-----------------------------------------------------------------------------
%% Test cases
%%-----------------------------------------------------------------------------
t_create_success(_Config) ->
Config = generate_config(),
Res = emqx_connector_jwt_worker:start_link(Config),
?assertMatch({ok, _}, Res),
{ok, Worker} = Res,
Ref = emqx_connector_jwt_worker:ensure_jwt(Worker),
receive
{Ref, token_created} ->
ok
after 1_000 ->
ct:fail(
"should have confirmed token creation; msgs: ~0p",
[process_info(self(), messages)]
)
end,
ok.
t_empty_key(_Config) ->
Config0 = generate_config(),
Config = Config0#{private_key := <<>>},
process_flag(trap_exit, true),
?check_trace(
?wait_async_action(
?assertMatch({ok, _}, emqx_connector_jwt_worker:start_link(Config)),
#{?snk_kind := connector_jwt_worker_startup_error},
1_000
),
fun(Trace) ->
?assertMatch(
[#{error := empty_key}],
?of_kind(connector_jwt_worker_startup_error, Trace)
),
ok
end
),
ok.
t_unknown_error(_Config) ->
Config0 = generate_config(),
Config = Config0#{private_key := <<>>},
process_flag(trap_exit, true),
?check_trace(
{_, {ok, _}} = ?wait_async_action(
emqx_common_test_helpers:with_mock(
jose_jwk,
from_pem,
fun(_PrivateKeyPEM) -> {error, some_strange_error} end,
fun() ->
?assertMatch({ok, _}, emqx_connector_jwt_worker:start_link(Config))
end
),
#{?snk_kind := connector_jwt_worker_startup_error},
1_000
),
fun(Trace) ->
?assertMatch(
[#{error := {invalid_private_key, some_strange_error}}],
?of_kind(connector_jwt_worker_startup_error, Trace)
),
ok
end
),
ok.
t_invalid_pem(_Config) ->
Config0 = generate_config(),
InvalidPEM = public_key:pem_encode([
{'PrivateKeyInfo', <<"xxxxxx">>, not_encrypted},
{'PrivateKeyInfo', <<"xxxxxx">>, not_encrypted}
]),
Config = Config0#{private_key := InvalidPEM},
process_flag(trap_exit, true),
?check_trace(
?wait_async_action(
?assertMatch({ok, _}, emqx_connector_jwt_worker:start_link(Config)),
#{?snk_kind := connector_jwt_worker_startup_error},
1_000
),
fun(Trace) ->
?assertMatch(
[#{error := {invalid_private_key, _}}],
?of_kind(connector_jwt_worker_startup_error, Trace)
),
ok
end
),
ok.
t_refresh(_Config) ->
Config0 =
#{
table := Table,
resource_id := ResourceId
} = generate_config(),
Config = Config0#{expiration => 5_000},
?check_trace(
begin
{{ok, _Pid}, {ok, _Event}} =
?wait_async_action(
emqx_connector_jwt_worker:start_link(Config),
#{?snk_kind := connector_jwt_worker_token_stored},
5_000
),
{ok, FirstJWT} = emqx_connector_jwt:lookup_jwt(Table, ResourceId),
?block_until(
#{
?snk_kind := connector_jwt_worker_refresh,
jwt := JWT0
} when JWT0 =/= FirstJWT,
15_000
),
{ok, SecondJWT} = emqx_connector_jwt:lookup_jwt(Table, ResourceId),
?assertNot(is_expired(SecondJWT)),
?assert(is_expired(FirstJWT)),
{FirstJWT, SecondJWT}
end,
fun({FirstJWT, SecondJWT}, Trace) ->
?assertMatch(
[_, _ | _],
?of_kind(connector_jwt_worker_token_stored, Trace)
),
?assertNotEqual(FirstJWT, SecondJWT),
ok
end
),
ok.
t_format_status(_Config) ->
Config = generate_config(),
{ok, Pid} = emqx_connector_jwt_worker:start_link(Config),
{status, _, _, Props} = sys:get_status(Pid),
[State] = [
State
|| Info = [_ | _] <- Props,
{data, Data = [_ | _]} <- Info,
{"State", State} <- Data
],
?assertMatch(
#{
jwt := "******",
jwk := "******"
},
State
),
ok.
t_lookup_ok(_Config) ->
Config =
#{
table := Table,
resource_id := ResourceId,
private_key := PrivateKeyPEM,
aud := Aud,
iss := Iss,
sub := Sub,
kid := KId
} = generate_config(),
{ok, Worker} = emqx_connector_jwt_worker:start_link(Config),
Ref = emqx_connector_jwt_worker:ensure_jwt(Worker),
receive
{Ref, token_created} ->
ok
after 500 ->
error(timeout)
end,
Res = emqx_connector_jwt:lookup_jwt(Table, ResourceId),
?assertMatch({ok, _}, Res),
{ok, JWT} = Res,
?assert(is_binary(JWT)),
JWK = jose_jwk:from_pem(PrivateKeyPEM),
{IsValid, ParsedJWT, JWS} = jose_jwt:verify_strict(JWK, [<<"RS256">>], JWT),
?assertMatch(
#jose_jwt{
fields = #{
<<"aud">> := Aud,
<<"iss">> := Iss,
<<"sub">> := Sub,
<<"exp">> := _,
<<"iat">> := _
}
},
ParsedJWT
),
?assertNot(is_expired(JWT)),
?assertMatch(
#jose_jws{
alg = {_, 'RS256'},
fields = #{
<<"kid">> := KId,
<<"typ">> := <<"JWT">>
}
},
JWS
),
?assert(IsValid),
ok.
t_lookup_not_found(_Config) ->
Table = ets:new(test_jwt_table, [ordered_set, public]),
InexistentResource = <<"xxx">>,
?assertEqual(
{error, not_found},
emqx_connector_jwt:lookup_jwt(Table, InexistentResource)
),
ok.
t_lookup_badarg(_Config) ->
InexistentTable = i_dont_exist,
InexistentResource = <<"xxx">>,
?assertEqual(
{error, not_found},
emqx_connector_jwt:lookup_jwt(InexistentTable, InexistentResource)
),
ok.
t_start_supervised_worker(_Config) ->
{ok, _} = emqx_connector_jwt_sup:start_link(),
Config = #{resource_id := ResourceId} = generate_config(),
{ok, Pid} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config),
Ref = emqx_connector_jwt_worker:ensure_jwt(Pid),
receive
{Ref, token_created} ->
ok
after 5_000 ->
ct:fail("timeout")
end,
MRef = monitor(process, Pid),
?assert(is_process_alive(Pid)),
ok = emqx_connector_jwt_sup:ensure_worker_deleted(ResourceId),
receive
{'DOWN', MRef, process, Pid, _} ->
ok
after 1_000 ->
ct:fail("timeout")
end,
ok.
t_start_supervised_worker_already_started(_Config) ->
{ok, _} = emqx_connector_jwt_sup:start_link(),
Config = #{resource_id := ResourceId} = generate_config(),
{ok, Pid0} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config),
{ok, Pid1} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config),
?assertEqual(Pid0, Pid1),
ok.
t_start_supervised_worker_already_present(_Config) ->
{ok, _} = emqx_connector_jwt_sup:start_link(),
Config = #{resource_id := ResourceId} = generate_config(),
{ok, Pid0} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config),
Ref = monitor(process, Pid0),
exit(Pid0, {shutdown, normal}),
receive
{'DOWN', Ref, process, Pid0, {shutdown, normal}} -> ok
after 1_000 -> error(worker_didnt_stop)
end,
{ok, Pid1} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config),
?assertNotEqual(Pid0, Pid1),
ok.
t_unknown_requests(_Config) ->
Config = generate_config(),
{ok, Worker} = emqx_connector_jwt_worker:start_link(Config),
Worker ! unknown_info,
gen_server:cast(Worker, unknown_cast),
?assertEqual({error, bad_call}, gen_server:call(Worker, unknown_call)),
ok.

View File

@ -0,0 +1,105 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_web_hook_server).
-compile([nowarn_export_all, export_all]).
-behaviour(supervisor).
-behaviour(cowboy_handler).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
start_link(Port, Path) ->
start_link(Port, Path, false).
start_link(Port, Path, SSLOpts) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [Port, Path, SSLOpts]).
stop() ->
try
gen_server:stop(?MODULE)
catch
exit:noproc ->
ok
end.
set_handler(F) when is_function(F, 2) ->
true = ets:insert(?MODULE, {handler, F}),
ok.
%%------------------------------------------------------------------------------
%% supervisor API
%%------------------------------------------------------------------------------
init([Port, Path, SSLOpts]) ->
Dispatch = cowboy_router:compile(
[
{'_', [{Path, ?MODULE, []}]}
]
),
ProtoOpts = #{env => #{dispatch => Dispatch}},
Tab = ets:new(?MODULE, [set, named_table, public]),
ets:insert(Tab, {handler, fun default_handler/2}),
{Transport, TransOpts, CowboyModule} = transport_settings(Port, SSLOpts),
ChildSpec = ranch:child_spec(?MODULE, Transport, TransOpts, CowboyModule, ProtoOpts),
{ok, {#{}, [ChildSpec]}}.
%%------------------------------------------------------------------------------
%% cowboy_server API
%%------------------------------------------------------------------------------
init(Req, State) ->
[{handler, Handler}] = ets:lookup(?MODULE, handler),
Handler(Req, State).
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
transport_settings(Port, _SSLOpts = false) ->
TransOpts = #{
socket_opts => [{port, Port}],
connection_type => supervisor
},
{ranch_tcp, TransOpts, cowboy_clear};
transport_settings(Port, SSLOpts) ->
TransOpts = #{
socket_opts => [
{port, Port},
{next_protocols_advertised, [<<"h2">>, <<"http/1.1">>]},
{alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}
| SSLOpts
],
connection_type => supervisor
},
{ranch_ssl, TransOpts, cowboy_tls}.
default_handler(Req0, State) ->
Req = cowboy_req:reply(
400,
#{<<"content-type">> => <<"text/plain">>},
<<"">>,
Req0
),
{ok, Req, State}.

View File

@ -702,6 +702,8 @@ typename_to_spec("wordsize()", _Mod) ->
#{type => string, example => <<"1024KB">>}; #{type => string, example => <<"1024KB">>};
typename_to_spec("map()", _Mod) -> typename_to_spec("map()", _Mod) ->
#{type => object, example => #{}}; #{type => object, example => #{}};
typename_to_spec("service_account_json()", _Mod) ->
#{type => object, example => #{}};
typename_to_spec("#{" ++ _, Mod) -> typename_to_spec("#{" ++ _, Mod) ->
typename_to_spec("map()", Mod); typename_to_spec("map()", Mod);
typename_to_spec("qos()", _Mod) -> typename_to_spec("qos()", _Mod) ->

View File

@ -76,6 +76,8 @@
-type query_result() :: -type query_result() ::
ok ok
| {ok, term()} | {ok, term()}
| {ok, term(), term()}
| {ok, term(), term(), term()}
| {error, {recoverable_error, term()}} | {error, {recoverable_error, term()}}
| {error, term()}. | {error, term()}.

View File

@ -163,8 +163,8 @@ running(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} =
running({call, From}, {query, Request, _Opts}, St) -> running({call, From}, {query, Request, _Opts}, St) ->
query_or_acc(From, Request, St); query_or_acc(From, Request, St);
running(cast, {query, Request, Opts}, St) -> running(cast, {query, Request, Opts}, St) ->
ReplayFun = maps:get(async_reply_fun, Opts, undefined), ReplyFun = maps:get(async_reply_fun, Opts, undefined),
query_or_acc(ReplayFun, Request, St); query_or_acc(ReplyFun, Request, St);
running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
flush(St#{tref := undefined}); flush(St#{tref := undefined});
running(info, {flush, _Ref}, _St) -> running(info, {flush, _Ref}, _St) ->
@ -191,11 +191,11 @@ blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) ->
_ = reply_caller(Id, ?REPLY(From, Request, false, Error)), _ = reply_caller(Id, ?REPLY(From, Request, false, Error)),
{keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request, false))])}}; {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request, false))])}};
blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) -> blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) ->
ReplayFun = maps:get(async_reply_fun, Opts, undefined), ReplyFun = maps:get(async_reply_fun, Opts, undefined),
Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
_ = reply_caller(Id, ?REPLY(ReplayFun, Request, false, Error)), _ = reply_caller(Id, ?REPLY(ReplyFun, Request, false, Error)),
{keep_state, St#{ {keep_state, St#{
queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplayFun, Request, false))]) queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))])
}}. }}.
terminate(_Reason, #{id := Id, index := Index}) -> terminate(_Reason, #{id := Id, index := Index}) ->

View File

@ -0,0 +1,147 @@
emqx_ee_bridge_gcp_pubsub {
desc_config {
desc {
en: """Configuration for a GCP PubSub bridge."""
zh: """GCP PubSub 桥接配置"""
}
label {
en: "GCP PubSub Bridge Configuration"
zh: "GCP PubSub 桥接配置"
}
}
desc_type {
desc {
en: """The Bridge Type"""
zh: """桥接类型"""
}
label {
en: "Bridge Type"
zh: "桥接类型"
}
}
desc_name {
desc {
en: """Bridge name, used as a human-readable description of the bridge."""
zh: """桥接名字,可读描述"""
}
label {
en: "Bridge Name"
zh: "桥接名字"
}
}
connect_timeout {
desc {
en: "The timeout when connecting to the HTTP server."
zh: "连接 HTTP 服务器的超时时间。"
}
label: {
en: "Connect Timeout"
zh: "连接超时"
}
}
max_retries {
desc {
en: "Max retry times if an error occurs when sending a request."
zh: "请求出错时的最大重试次数。"
}
label: {
en: "Max Retries"
zh: "最大重试次数"
}
}
pool_size {
desc {
en: "The pool size."
zh: "连接池大小。"
}
label: {
en: "Pool Size"
zh: "连接池大小"
}
}
pipelining {
desc {
en: "A positive integer. Whether to send HTTP requests continuously, when set to 1, it means that after each HTTP request is sent, you need to wait for the server to return and then continue to send the next request."
zh: "正整数,设置最大可发送的异步 HTTP 请求数量。当设置为 1 时,表示每次发送完成 HTTP 请求后都需要等待服务器返回,再继续发送下一个请求。"
}
label: {
en: "HTTP Pipelineing"
zh: "HTTP 流水线"
}
}
request_timeout {
desc {
en: "HTTP request timeout."
zh: "HTTP 请求超时。"
}
label: {
en: "Request Timeout"
zh: "HTTP 请求超时"
}
}
payload_template {
desc {
en: "The template for formatting the outgoing messages. If undefined, will send all the available context in JSON format."
zh: "用于格式化外发信息的模板。 如果未定义将以JSON格式发送所有可用的上下文。"
}
label: {
en: "Payload template"
zh: "HTTP 请求消息体模板"
}
}
local_topic {
desc {
en: """The MQTT topic filter to be forwarded to GCP PubSub. All MQTT 'PUBLISH' messages with the topic
matching `local_topic` will be forwarded.</br>
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
configured, then both the data got from the rule and the MQTT messages that match local_topic
will be forwarded.
"""
zh: """发送到 'local_topic' 的消息都会转发到 GCP PubSub。 </br>
注意:如果这个 Bridge 被用作规则EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发到 GCP PubSub。
"""
}
label {
en: "Local Topic"
zh: "本地 Topic"
}
}
pubsub_topic {
desc {
en: "The GCP PubSub topic to publish messages to."
zh: "要发布消息的GCP PubSub主题。"
}
label: {
en: "GCP PubSub Topic"
zh: "GCP PubSub 主题"
}
}
service_account_json {
desc {
en: "JSON containing the GCP Service Account credentials to be used with PubSub.\n"
"When a GCP Service Account is created "
"(as described in https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount), "
"you have the option of downloading the credentials in JSON form. That's the "
"file needed."
zh: "包含将与 PubSub 一起使用的 GCP 服务账户凭证的 JSON。\n"
"当创建GCP服务账户时"
"如https://developers.google.com/identity/protocols/oauth2/service-account#creatinganaccount"
"可以选择下载 JSON 形式的凭证,然后在该配置项中使用。"
}
label: {
en: "GCP Service Account Credentials"
zh: "GCP 服务账户凭证"
}
}
}

View File

@ -14,13 +14,13 @@
api_schemas(Method) -> api_schemas(Method) ->
[ [
ref(emqx_ee_bridge_gcp_pubsub, Method),
ref(emqx_ee_bridge_kafka, Method), ref(emqx_ee_bridge_kafka, Method),
ref(emqx_ee_bridge_mysql, Method), ref(emqx_ee_bridge_mysql, Method),
ref(emqx_ee_bridge_mongodb, Method ++ "_rs"), ref(emqx_ee_bridge_mongodb, Method ++ "_rs"),
ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"), ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"),
ref(emqx_ee_bridge_mongodb, Method ++ "_single"), ref(emqx_ee_bridge_mongodb, Method ++ "_single"),
ref(emqx_ee_bridge_hstreamdb, Method), ref(emqx_ee_bridge_hstreamdb, Method),
%% ref(emqx_ee_bridge_influxdb, Method ++ "_udp"),
ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"), ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"),
ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2") ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2")
]. ].
@ -29,6 +29,7 @@ schema_modules() ->
[ [
emqx_ee_bridge_kafka, emqx_ee_bridge_kafka,
emqx_ee_bridge_hstreamdb, emqx_ee_bridge_hstreamdb,
emqx_ee_bridge_gcp_pubsub,
emqx_ee_bridge_influxdb, emqx_ee_bridge_influxdb,
emqx_ee_bridge_mongodb, emqx_ee_bridge_mongodb,
emqx_ee_bridge_mysql emqx_ee_bridge_mysql
@ -49,11 +50,11 @@ examples(Method) ->
resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
resource_type(kafka) -> emqx_bridge_impl_kafka; resource_type(kafka) -> emqx_bridge_impl_kafka;
resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb; resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
resource_type(gcp_pubsub) -> emqx_ee_connector_gcp_pubsub;
resource_type(mongodb_rs) -> emqx_connector_mongo; resource_type(mongodb_rs) -> emqx_connector_mongo;
resource_type(mongodb_sharded) -> emqx_connector_mongo; resource_type(mongodb_sharded) -> emqx_connector_mongo;
resource_type(mongodb_single) -> emqx_connector_mongo; resource_type(mongodb_single) -> emqx_connector_mongo;
resource_type(mysql) -> emqx_connector_mysql; resource_type(mysql) -> emqx_connector_mysql;
resource_type(influxdb_udp) -> emqx_ee_connector_influxdb;
resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb; resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb;
resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb. resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb.
@ -75,6 +76,14 @@ fields(bridges) ->
required => false required => false
} }
)}, )},
{gcp_pubsub,
mk(
hoconsc:map(name, ref(emqx_ee_bridge_gcp_pubsub, "config")),
#{
desc => <<"EMQX Enterprise Config">>,
required => false
}
)},
{mysql, {mysql,
mk( mk(
hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")), hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")),
@ -109,7 +118,6 @@ influxdb_structs() ->
} }
)} )}
|| Protocol <- [ || Protocol <- [
%% influxdb_udp,
influxdb_api_v1, influxdb_api_v1,
influxdb_api_v2 influxdb_api_v2
] ]

View File

@ -0,0 +1,231 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_bridge_gcp_pubsub).
-include_lib("emqx_bridge/include/emqx_bridge.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]).
%% hocon_schema API
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
-export([
service_account_json_validator/1,
service_account_json_converter/1
]).
%% emqx_ee_bridge "unofficial" API
-export([conn_bridge_examples/1]).
-type service_account_json() :: map().
-reflect_type([service_account_json/0]).
-define(DEFAULT_PIPELINE_SIZE, 100).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() ->
"bridge_gcp_pubsub".
roots() ->
[].
fields("config") ->
emqx_bridge_schema:common_bridge_fields() ++
emqx_resource_schema:fields("resource_opts") ++
fields(bridge_config);
fields(bridge_config) ->
[
{connect_timeout,
sc(
emqx_schema:duration_ms(),
#{
default => "15s",
desc => ?DESC("connect_timeout")
}
)},
{pool_size,
sc(
pos_integer(),
#{
default => 8,
desc => ?DESC("pool_size")
}
)},
{pipelining,
sc(
pos_integer(),
#{
default => ?DEFAULT_PIPELINE_SIZE,
desc => ?DESC("pipelining")
}
)},
{max_retries,
sc(
non_neg_integer(),
#{
required => false,
default => 2,
desc => ?DESC("max_retries")
}
)},
{request_timeout,
sc(
emqx_schema:duration_ms(),
#{
required => false,
default => "15s",
desc => ?DESC("request_timeout")
}
)},
{payload_template,
sc(
binary(),
#{
default => <<>>,
desc => ?DESC("payload_template")
}
)},
{local_topic,
sc(
binary(),
#{
desc => ?DESC("local_topic")
}
)},
{pubsub_topic,
sc(
binary(),
#{
required => true,
desc => ?DESC("pubsub_topic")
}
)},
{service_account_json,
sc(
service_account_json(),
#{
required => true,
validator => fun ?MODULE:service_account_json_validator/1,
converter => fun ?MODULE:service_account_json_converter/1,
sensitive => true,
desc => ?DESC("service_account_json")
}
)}
];
fields("get") ->
emqx_bridge_schema:metrics_status_fields() ++ fields("post");
fields("post") ->
[type_field(), name_field() | fields("config")];
fields("put") ->
fields("config").
desc("config") ->
?DESC("desc_config");
desc(_) ->
undefined.
conn_bridge_examples(Method) ->
[
#{
<<"gcp_pubsub">> => #{
summary => <<"GCP PubSub Bridge">>,
value => values(Method)
}
}
].
values(get) ->
maps:merge(values(post), ?METRICS_EXAMPLE);
values(post) ->
#{
<<"pubsub_topic">> => <<"mytopic">>,
<<"service_account_json">> =>
#{
<<"auth_provider_x509_cert_url">> =>
<<"https://www.googleapis.com/oauth2/v1/certs">>,
<<"auth_uri">> =>
<<"https://accounts.google.com/o/oauth2/auth">>,
<<"client_email">> =>
<<"test@myproject.iam.gserviceaccount.com">>,
<<"client_id">> => <<"123812831923812319190">>,
<<"client_x509_cert_url">> =>
<<
"https://www.googleapis.com/robot/v1/"
"metadata/x509/test%40myproject.iam.gserviceaccount.com"
>>,
<<"private_key">> =>
<<
"-----BEGIN PRIVATE KEY-----\n"
"MIIEvQI..."
>>,
<<"private_key_id">> => <<"kid">>,
<<"project_id">> => <<"myproject">>,
<<"token_uri">> =>
<<"https://oauth2.googleapis.com/token">>,
<<"type">> => <<"service_account">>
}
};
values(put) ->
values(post).
%%-------------------------------------------------------------------------------------------------
%% Helper fns
%%-------------------------------------------------------------------------------------------------
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
type_field() ->
{type, mk(enum([gcp_pubsub]), #{required => true, desc => ?DESC("desc_type")})}.
name_field() ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
-spec service_account_json_validator(map()) ->
ok
| {error, {wrong_type, term()}}
| {error, {missing_keys, [binary()]}}.
service_account_json_validator(Map) ->
ExpectedKeys = [
<<"type">>,
<<"project_id">>,
<<"private_key_id">>,
<<"private_key">>,
<<"client_email">>
],
MissingKeys = lists:sort([
K
|| K <- ExpectedKeys,
not maps:is_key(K, Map)
]),
Type = maps:get(<<"type">>, Map, null),
case {MissingKeys, Type} of
{[], <<"service_account">>} ->
ok;
{[], Type} ->
{error, {wrong_type, Type}};
{_, _} ->
{error, {missing_keys, MissingKeys}}
end.
service_account_json_converter(Map) when is_map(Map) ->
ExpectedKeys = [
<<"type">>,
<<"project_id">>,
<<"private_key_id">>,
<<"private_key">>,
<<"client_email">>
],
maps:with(ExpectedKeys, Map);
service_account_json_converter(Val) ->
Val.

View File

@ -64,34 +64,10 @@ wait_until_kafka_is_up(Attempts) ->
end. end.
init_per_suite(Config) -> init_per_suite(Config) ->
%% Need to unload emqx_authz. See emqx_machine_SUITE:init_per_suite for ok = emqx_common_test_helpers:start_apps([emqx_conf]),
%% more info. ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge, emqx_rule_engine]),
application:unload(emqx_authz), {ok, _} = application:ensure_all_started(emqx_connector),
%% some configs in emqx_conf app are mandatory emqx_mgmt_api_test_util:init_suite(),
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_rule_engine, emqx_bridge, emqx_management, emqx_dashboard],
fun set_special_configs/1
),
application:set_env(emqx_machine, applications, [
emqx_prometheus,
emqx_modules,
emqx_dashboard,
emqx_gateway,
emqx_statsd,
emqx_resource,
emqx_rule_engine,
emqx_bridge,
emqx_ee_bridge,
emqx_plugin_libs,
emqx_management,
emqx_retainer,
emqx_exhook,
emqx_authn,
emqx_authz,
emqx_plugin
]),
{ok, _} = application:ensure_all_started(emqx_machine),
wait_until_kafka_is_up(), wait_until_kafka_is_up(),
%% Wait until bridges API is up %% Wait until bridges API is up
(fun WaitUntilRestApiUp() -> (fun WaitUntilRestApiUp() ->
@ -106,32 +82,12 @@ init_per_suite(Config) ->
end)(), end)(),
Config. Config.
end_per_suite(Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([ emqx_mgmt_api_test_util:end_suite(),
emqx_prometheus, ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
emqx_modules, ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource, emqx_rule_engine]),
emqx_dashboard, _ = application:stop(emqx_connector),
emqx_gateway, ok.
emqx_statsd,
emqx_resource,
emqx_rule_engine,
emqx_bridge,
emqx_ee_bridge,
emqx_plugin_libs,
emqx_management,
emqx_retainer,
emqx_exhook,
emqx_authn,
emqx_authz,
emqx_plugin,
emqx_conf,
emqx_bridge,
emqx_management,
emqx_dashboard,
emqx_machine
]),
mria:stop(),
Config.
set_special_configs(emqx_management) -> set_special_configs(emqx_management) ->
Listeners = #{http => #{port => 8081}}, Listeners = #{http => #{port => 8081}},

File diff suppressed because it is too large Load Diff

View File

@ -53,6 +53,7 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
delete_all_bridges(),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]), ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource]), ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource]),
_ = application:stop(emqx_connector), _ = application:stop(emqx_connector),
@ -222,8 +223,6 @@ end_per_group(_Group, _Config) ->
ok. ok.
init_per_testcase(_Testcase, Config) -> init_per_testcase(_Testcase, Config) ->
%% catch clear_db(Config),
%% delete_bridge(Config),
delete_all_bridges(), delete_all_bridges(),
Config. Config.
@ -232,8 +231,6 @@ end_per_testcase(_Testcase, Config) ->
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
ok = snabbkaffe:stop(), ok = snabbkaffe:stop(),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
%% catch clear_db(Config),
%% delete_bridge(Config),
delete_all_bridges(), delete_all_bridges(),
ok. ok.

View File

@ -0,0 +1,599 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_connector_gcp_pubsub).
-behaviour(emqx_resource).
-include_lib("emqx_connector/include/emqx_connector_tables.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% `emqx_resource' API
-export([
callback_mode/0,
on_start/2,
on_stop/2,
on_query/3,
on_query_async/4,
on_batch_query/3,
on_batch_query_async/4,
on_get_status/2,
is_buffer_supported/0
]).
-export([reply_delegator/3]).
-type bridge_id() :: binary().
-type jwt_worker() :: binary().
-type service_account_json() :: emqx_ee_bridge_gcp_pubsub:service_account_json().
-type config() :: #{
connect_timeout := emqx_schema:duration_ms(),
max_retries := non_neg_integer(),
pubsub_topic := binary(),
request_timeout := emqx_schema:duration_ms(),
service_account_json := service_account_json(),
any() => term()
}.
-type state() :: #{
connect_timeout := timer:time(),
instance_id := manager_id(),
jwt_worker_id := jwt_worker(),
max_retries := non_neg_integer(),
payload_template := emqx_plugin_libs_rule:tmpl_token(),
pool_name := atom(),
project_id := binary(),
pubsub_topic := binary(),
request_timeout := timer:time()
}.
-type headers() :: [{binary(), iodata()}].
-type body() :: iodata().
-type status_code() :: 100..599.
-define(DEFAULT_PIPELINE_SIZE, 100).
%%-------------------------------------------------------------------------------------------------
%% emqx_resource API
%%-------------------------------------------------------------------------------------------------
%% TODO: check
is_buffer_supported() -> false.
callback_mode() -> async_if_possible.
-spec on_start(manager_id(), config()) -> {ok, state()} | {error, term()}.
on_start(
InstanceId,
#{
connect_timeout := ConnectTimeout,
max_retries := MaxRetries,
payload_template := PayloadTemplate,
pool_size := PoolSize,
pubsub_topic := PubSubTopic,
request_timeout := RequestTimeout
} = Config
) ->
?SLOG(info, #{
msg => "starting_gcp_pubsub_bridge",
connector => InstanceId,
config => Config
}),
%% emulating the emulator behavior
%% https://cloud.google.com/pubsub/docs/emulator
HostPort = os:getenv("PUBSUB_EMULATOR_HOST", "pubsub.googleapis.com:443"),
{Host, Port} = emqx_connector_schema_lib:parse_server(
HostPort, #{host_type => hostname, default_port => 443}
),
PoolType = random,
Transport = tls,
TransportOpts = emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none}),
NTransportOpts = emqx_misc:ipv6_probe(TransportOpts),
PoolOpts = [
{host, Host},
{port, Port},
{connect_timeout, ConnectTimeout},
{keepalive, 30_000},
{pool_type, PoolType},
{pool_size, PoolSize},
{transport, Transport},
{transport_opts, NTransportOpts},
{enable_pipelining, maps:get(enable_pipelining, Config, ?DEFAULT_PIPELINE_SIZE)}
],
#{
jwt_worker_id := JWTWorkerId,
project_id := ProjectId
} = ensure_jwt_worker(InstanceId, Config),
PoolName = emqx_plugin_libs_pool:pool_name(InstanceId),
State = #{
connect_timeout => ConnectTimeout,
instance_id => InstanceId,
jwt_worker_id => JWTWorkerId,
max_retries => MaxRetries,
payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate),
pool_name => PoolName,
project_id => ProjectId,
pubsub_topic => PubSubTopic,
request_timeout => RequestTimeout
},
?tp(
gcp_pubsub_on_start_before_starting_pool,
#{
instance_id => InstanceId,
pool_name => PoolName,
pool_opts => PoolOpts
}
),
?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => PoolName}),
case ehttpc_sup:start_pool(PoolName, PoolOpts) of
{ok, _} ->
{ok, State};
{error, {already_started, _}} ->
?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => PoolName}),
{ok, State};
{error, Reason} ->
?tp(gcp_pubsub_ehttpc_pool_start_failure, #{
pool_name => PoolName,
reason => Reason
}),
{error, Reason}
end.
-spec on_stop(manager_id(), state()) -> ok | {error, term()}.
on_stop(
InstanceId,
_State = #{
jwt_worker_id := JWTWorkerId,
pool_name := PoolName
}
) ->
?tp(gcp_pubsub_stop, #{instance_id => InstanceId, jwt_worker_id => JWTWorkerId}),
?SLOG(info, #{
msg => "stopping_gcp_pubsub_bridge",
connector => InstanceId
}),
emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
ehttpc_sup:stop_pool(PoolName).
-spec on_query(
bridge_id(),
{send_message, map()},
state()
) ->
{ok, status_code(), headers()}
| {ok, status_code(), headers(), body()}
| {error, {recoverable_error, term()}}
| {error, term()}.
on_query(BridgeId, {send_message, Selected}, State) ->
Requests = [{send_message, Selected}],
?TRACE(
"QUERY_SYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => BridgeId, state => State}
),
do_send_requests_sync(State, Requests, BridgeId).
-spec on_query_async(
bridge_id(),
{send_message, map()},
{ReplyFun :: function(), Args :: list()},
state()
) -> ok.
on_query_async(BridgeId, {send_message, Selected}, ReplyFunAndArgs, State) ->
Requests = [{send_message, Selected}],
?TRACE(
"QUERY_ASYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => BridgeId, state => State}
),
do_send_requests_async(State, Requests, ReplyFunAndArgs, BridgeId).
-spec on_batch_query(
bridge_id(),
[{send_message, map()}],
state()
) ->
{ok, status_code(), headers()}
| {ok, status_code(), headers(), body()}
| {error, {recoverable_error, term()}}
| {error, term()}.
on_batch_query(BridgeId, Requests, State) ->
?TRACE(
"QUERY_SYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => BridgeId, state => State}
),
do_send_requests_sync(State, Requests, BridgeId).
-spec on_batch_query_async(
bridge_id(),
[{send_message, map()}],
{ReplyFun :: function(), Args :: list()},
state()
) -> ok.
on_batch_query_async(BridgeId, Requests, ReplyFunAndArgs, State) ->
?TRACE(
"QUERY_ASYNC",
"gcp_pubsub_received",
#{requests => Requests, connector => BridgeId, state => State}
),
do_send_requests_async(State, Requests, ReplyFunAndArgs, BridgeId).
-spec on_get_status(manager_id(), state()) -> connected | disconnected.
on_get_status(InstanceId, State) ->
#{
connect_timeout := Timeout,
pool_name := PoolName
} = State,
case do_get_status(InstanceId, PoolName, Timeout) of
true ->
connected;
false ->
?SLOG(error, #{
msg => "gcp_pubsub_bridge_get_status_failed",
state => State
}),
disconnected
end.
%%-------------------------------------------------------------------------------------------------
%% Helper fns
%%-------------------------------------------------------------------------------------------------
-spec ensure_jwt_worker(manager_id(), config()) ->
#{
jwt_worker_id := jwt_worker(),
project_id := binary()
}.
ensure_jwt_worker(InstanceId, #{
service_account_json := ServiceAccountJSON,
pubsub_topic := PubSubTopic
}) ->
#{
project_id := ProjectId,
private_key_id := KId,
private_key := PrivateKeyPEM,
client_email := ServiceAccountEmail
} = ServiceAccountJSON,
%% fixed for pubsub; trailing slash is important.
Aud = <<"https://pubsub.googleapis.com/">>,
ExpirationMS = timer:hours(1),
Alg = <<"RS256">>,
Config = #{
private_key => PrivateKeyPEM,
resource_id => InstanceId,
expiration => ExpirationMS,
table => ?JWT_TABLE,
iss => ServiceAccountEmail,
sub => ServiceAccountEmail,
aud => Aud,
kid => KId,
alg => Alg
},
JWTWorkerId = <<"gcp_pubsub_jwt_worker:", InstanceId/binary>>,
Worker =
case emqx_connector_jwt_sup:ensure_worker_present(JWTWorkerId, Config) of
{ok, Worker0} ->
Worker0;
Error ->
?tp(
gcp_pubsub_bridge_jwt_worker_failed_to_start,
#{instance_id => InstanceId, reason => Error}
),
?SLOG(error, #{
msg => "failed_to_start_gcp_pubsub_jwt_worker",
instance_id => InstanceId,
pubsub_topic => PubSubTopic,
reason => Error
}),
_ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
throw(failed_to_start_jwt_worker)
end,
MRef = monitor(process, Worker),
Ref = emqx_connector_jwt_worker:ensure_jwt(Worker),
%% to ensure that this resource and its actions will be ready to
%% serve when started, we must ensure that the first JWT has been
%% produced by the worker.
receive
{Ref, token_created} ->
?tp(gcp_pubsub_bridge_jwt_created, #{resource_id => InstanceId}),
demonitor(MRef, [flush]),
ok;
{'DOWN', MRef, process, Worker, Reason} ->
?tp(
gcp_pubsub_bridge_jwt_worker_failed_to_start,
#{
resource_id => InstanceId,
reason => Reason
}
),
?SLOG(error, #{
msg => "gcp_pubsub_bridge_jwt_worker_failed_to_start",
connector => InstanceId,
reason => Reason
}),
_ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
throw(failed_to_start_jwt_worker)
after 10_000 ->
?tp(gcp_pubsub_bridge_jwt_timeout, #{resource_id => InstanceId}),
?SLOG(warning, #{
msg => "gcp_pubsub_bridge_jwt_timeout",
connector => InstanceId
}),
demonitor(MRef, [flush]),
_ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId),
throw(timeout_creating_jwt)
end,
#{
jwt_worker_id => JWTWorkerId,
project_id => ProjectId
}.
-spec encode_payload(state(), Selected :: map()) -> #{data := binary()}.
encode_payload(_State = #{payload_template := PayloadTemplate}, Selected) ->
Interpolated =
case PayloadTemplate of
[] -> emqx_json:encode(Selected);
_ -> emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, Selected)
end,
#{data => base64:encode(Interpolated)}.
-spec to_pubsub_request([#{data := binary()}]) -> binary().
to_pubsub_request(Payloads) ->
emqx_json:encode(#{messages => Payloads}).
-spec publish_path(state()) -> binary().
publish_path(
_State = #{
project_id := ProjectId,
pubsub_topic := PubSubTopic
}
) ->
<<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>.
-spec get_jwt_authorization_header(resource_id()) -> [{binary(), binary()}].
get_jwt_authorization_header(InstanceId) ->
case emqx_connector_jwt:lookup_jwt(?JWT_TABLE, InstanceId) of
%% Since we synchronize the JWT creation during resource start
%% (see `on_start/2'), this will be always be populated.
{ok, JWT} ->
[{<<"Authorization">>, <<"Bearer ", JWT/binary>>}]
end.
-spec do_send_requests_sync(
state(),
[{send_message, map()}],
resource_id()
) ->
{ok, status_code(), headers()}
| {ok, status_code(), headers(), body()}
| {error, {recoverable_error, term()}}
| {error, term()}.
do_send_requests_sync(State, Requests, ResourceId) ->
#{
pool_name := PoolName,
instance_id := InstanceId,
max_retries := MaxRetries,
request_timeout := RequestTimeout
} = State,
?tp(
gcp_pubsub_bridge_do_send_requests,
#{
query_mode => sync,
instance_id => InstanceId,
resource_id => ResourceId,
requests => Requests
}
),
Headers = get_jwt_authorization_header(InstanceId),
Payloads =
lists:map(
fun({send_message, Selected}) ->
encode_payload(State, Selected)
end,
Requests
),
Body = to_pubsub_request(Payloads),
Path = publish_path(State),
Method = post,
Request = {Path, Headers, Body},
case
ehttpc:request(
PoolName,
Method,
Request,
RequestTimeout,
MaxRetries
)
of
{error, Reason} when
Reason =:= econnrefused;
%% this comes directly from `gun'...
Reason =:= {closed, "The connection was lost."};
Reason =:= timeout
->
?tp(
warning,
gcp_pubsub_request_failed,
#{
reason => Reason,
query_mode => sync,
recoverable_error => true,
connector => ResourceId
}
),
{error, {recoverable_error, Reason}};
{error, Reason} = Result ->
?tp(
error,
gcp_pubsub_request_failed,
#{
reason => Reason,
query_mode => sync,
recoverable_error => false,
connector => ResourceId
}
),
Result;
{ok, StatusCode, _} = Result when StatusCode >= 200 andalso StatusCode < 300 ->
?tp(
gcp_pubsub_response,
#{
response => Result,
query_mode => sync,
connector => ResourceId
}
),
Result;
{ok, StatusCode, _, _} = Result when StatusCode >= 200 andalso StatusCode < 300 ->
?tp(
gcp_pubsub_response,
#{
response => Result,
query_mode => sync,
connector => ResourceId
}
),
Result;
{ok, StatusCode, RespHeaders} = _Result ->
?tp(
gcp_pubsub_response,
#{
response => _Result,
query_mode => sync,
connector => ResourceId
}
),
?SLOG(error, #{
msg => "gcp_pubsub_error_response",
request => Request,
connector => ResourceId,
status_code => StatusCode
}),
{error, #{status_code => StatusCode, headers => RespHeaders}};
{ok, StatusCode, RespHeaders, RespBody} = _Result ->
?tp(
gcp_pubsub_response,
#{
response => _Result,
query_mode => sync,
connector => ResourceId
}
),
?SLOG(error, #{
msg => "gcp_pubsub_error_response",
request => Request,
connector => ResourceId,
status_code => StatusCode
}),
{error, #{status_code => StatusCode, headers => RespHeaders, body => RespBody}}
end.
-spec do_send_requests_async(
state(),
[{send_message, map()}],
{ReplyFun :: function(), Args :: list()},
resource_id()
) -> ok.
do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) ->
#{
pool_name := PoolName,
instance_id := InstanceId,
request_timeout := RequestTimeout
} = State,
?tp(
gcp_pubsub_bridge_do_send_requests,
#{
query_mode => async,
instance_id => InstanceId,
resource_id => ResourceId,
requests => Requests
}
),
Headers = get_jwt_authorization_header(InstanceId),
Payloads =
lists:map(
fun({send_message, Selected}) ->
encode_payload(State, Selected)
end,
Requests
),
Body = to_pubsub_request(Payloads),
Path = publish_path(State),
Method = post,
Request = {Path, Headers, Body},
Worker = ehttpc_pool:pick_worker(PoolName),
ok = ehttpc:request_async(
Worker,
Method,
Request,
RequestTimeout,
{fun ?MODULE:reply_delegator/3, [ResourceId, ReplyFunAndArgs]}
).
-spec reply_delegator(
resource_id(),
{ReplyFun :: function(), Args :: list()},
term() | {error, econnrefused | timeout | term()}
) -> ok.
reply_delegator(_ResourceId, ReplyFunAndArgs, Result) ->
case Result of
{error, Reason} when
Reason =:= econnrefused;
%% this comes directly from `gun'...
Reason =:= {closed, "The connection was lost."};
Reason =:= timeout
->
?tp(
gcp_pubsub_request_failed,
#{
reason => Reason,
query_mode => async,
recoverable_error => true,
connector => _ResourceId
}
),
Result1 = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
_ ->
?tp(
gcp_pubsub_response,
#{
response => Result,
query_mode => async,
connector => _ResourceId
}
),
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
end.
-spec do_get_status(manager_id(), atom(), timer:time()) -> boolean().
do_get_status(InstanceId, PoolName, Timeout) ->
Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)],
DoPerWorker =
fun(Worker) ->
case ehttpc:health_check(Worker, Timeout) of
ok ->
true;
{error, Reason} ->
?SLOG(error, #{
msg => "ehttpc_health_check_failed",
instance_id => InstanceId,
reason => Reason,
worker => Worker
}),
false
end
end,
try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
[_ | _] = Status ->
lists:all(fun(St) -> St =:= true end, Status);
[] ->
false
catch
exit:timeout ->
false
end.

View File

@ -87,7 +87,7 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client
on_query_async( on_query_async(
InstId, InstId,
{send_message, Data}, {send_message, Data},
{ReplayFun, Args}, {ReplyFun, Args},
_State = #{write_syntax := SyntaxLines, client := Client} _State = #{write_syntax := SyntaxLines, client := Client}
) -> ) ->
case data_to_points(Data, SyntaxLines) of case data_to_points(Data, SyntaxLines) of
@ -96,7 +96,7 @@ on_query_async(
influxdb_connector_send_query, influxdb_connector_send_query,
#{points => Points, batch => false, mode => async} #{points => Points, batch => false, mode => async}
), ),
do_async_query(InstId, Client, Points, {ReplayFun, Args}); do_async_query(InstId, Client, Points, {ReplyFun, Args});
{error, ErrorPoints} = Err -> {error, ErrorPoints} = Err ->
?tp( ?tp(
influxdb_connector_send_query_error, influxdb_connector_send_query_error,
@ -109,7 +109,7 @@ on_query_async(
on_batch_query_async( on_batch_query_async(
InstId, InstId,
BatchData, BatchData,
{ReplayFun, Args}, {ReplyFun, Args},
#{write_syntax := SyntaxLines, client := Client} #{write_syntax := SyntaxLines, client := Client}
) -> ) ->
case parse_batch_data(InstId, BatchData, SyntaxLines) of case parse_batch_data(InstId, BatchData, SyntaxLines) of
@ -118,7 +118,7 @@ on_batch_query_async(
influxdb_connector_send_query, influxdb_connector_send_query,
#{points => Points, batch => true, mode => async} #{points => Points, batch => true, mode => async}
), ),
do_async_query(InstId, Client, Points, {ReplayFun, Args}); do_async_query(InstId, Client, Points, {ReplyFun, Args});
{error, Reason} -> {error, Reason} ->
?tp( ?tp(
influxdb_connector_send_query_error, influxdb_connector_send_query_error,
@ -336,13 +336,13 @@ do_query(InstId, Client, Points) ->
Err Err
end. end.
do_async_query(InstId, Client, Points, ReplayFunAndArgs) -> do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "influxdb write point async", msg => "influxdb write point async",
connector => InstId, connector => InstId,
points => Points points => Points
}), }),
ok = influxdb:write_async(Client, Points, ReplayFunAndArgs). ok = influxdb:write_async(Client, Points, ReplyFunAndArgs).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Tags & Fields Config Trans %% Tags & Fields Config Trans

View File

@ -10,6 +10,7 @@ main(_) ->
Conf = [merge(Conf0, Cfgs1), Conf = [merge(Conf0, Cfgs1),
io_lib:nl() io_lib:nl()
], ],
ok = filelib:ensure_dir("apps/emqx_dashboard/priv/i18n.conf"),
ok = file:write_file("apps/emqx_dashboard/priv/i18n.conf", Conf). ok = file:write_file("apps/emqx_dashboard/priv/i18n.conf", Conf).
merge(BaseConf, Cfgs) -> merge(BaseConf, Cfgs) ->