Merge pull request #10471 from lafirest/fix/dynamo_bridge

fix(dynamo): separate the implementation of connector and client of Dynamo bridge
This commit is contained in:
lafirest 2023-04-23 14:25:23 +08:00 committed by GitHub
commit 6ef032026f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 196 additions and 115 deletions

View File

@ -251,7 +251,7 @@ directly_setup_dynamo() ->
directly_query(Query) -> directly_query(Query) ->
directly_setup_dynamo(), directly_setup_dynamo(),
emqx_ee_connector_dynamo:execute(Query, ?TABLE_BIN). emqx_ee_connector_dynamo_client:execute(Query, ?TABLE_BIN).
directly_get_payload(Key) -> directly_get_payload(Key) ->
case directly_query({get_item, {<<"id">>, Key}}) of case directly_query({get_item, {<<"id">>, Key}}) of

View File

@ -28,10 +28,7 @@
]). ]).
-export([ -export([
connect/1, connect/1
do_get_status/1,
do_async_reply/2,
worker_do_query/4
]). ]).
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
@ -40,10 +37,6 @@
default_port => 8000 default_port => 8000
}). }).
-ifdef(TEST).
-export([execute/2]).
-endif.
%%===================================================================== %%=====================================================================
%% Hocon schema %% Hocon schema
roots() -> roots() ->
@ -126,45 +119,39 @@ on_stop(InstanceId, #{poolname := PoolName} = _State) ->
emqx_plugin_libs_pool:stop_pool(PoolName). emqx_plugin_libs_pool:stop_pool(PoolName).
on_query(InstanceId, Query, State) -> on_query(InstanceId, Query, State) ->
do_query(InstanceId, Query, handover, State). do_query(InstanceId, Query, sync, State).
on_query_async(InstanceId, Query, Reply, State) -> on_query_async(InstanceId, Query, ReplyCtx, State) ->
do_query( do_query(
InstanceId, InstanceId,
Query, Query,
{handover_async, {?MODULE, do_async_reply, [Reply]}}, {async, ReplyCtx},
State State
). ).
%% we only support batch insert %% we only support batch insert
on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) -> on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) ->
do_query(InstanceId, Query, handover, State); do_query(InstanceId, Query, sync, State);
on_batch_query(_InstanceId, Query, _State) -> on_batch_query(_InstanceId, Query, _State) ->
{error, {unrecoverable_error, {invalid_request, Query}}}. {error, {unrecoverable_error, {invalid_request, Query}}}.
%% we only support batch insert %% we only support batch insert
on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, Reply, State) -> on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, ReplyCtx, State) ->
do_query( do_query(
InstanceId, InstanceId,
Query, Query,
{handover_async, {?MODULE, do_async_reply, [Reply]}}, {async, ReplyCtx},
State State
); );
on_batch_query_async(_InstanceId, Query, _Reply, _State) -> on_batch_query_async(_InstanceId, Query, _Reply, _State) ->
{error, {unrecoverable_error, {invalid_request, Query}}}. {error, {unrecoverable_error, {invalid_request, Query}}}.
on_get_status(_InstanceId, #{poolname := Pool}) -> on_get_status(_InstanceId, #{poolname := Pool}) ->
Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1), Health = emqx_plugin_libs_pool:health_check_ecpool_workers(
Pool, {emqx_ee_connector_dynamo_client, is_connected, []}
),
status_result(Health). status_result(Health).
do_get_status(_Conn) ->
%% because the dynamodb driver connection process is the ecpool worker self
%% so we must call the checker function inside the worker
case erlcloud_ddb2:list_tables() of
{ok, _} -> true;
_ -> false
end.
status_result(_Status = true) -> connected; status_result(_Status = true) -> connected;
status_result(_Status = false) -> connecting. status_result(_Status = false) -> connecting.
@ -185,8 +172,8 @@ do_query(
), ),
Result = ecpool:pick_and_do( Result = ecpool:pick_and_do(
PoolName, PoolName,
{?MODULE, worker_do_query, [Table, Query, Templates]}, {emqx_ee_connector_dynamo_client, query, [ApplyMode, Table, Query, Templates]},
ApplyMode no_handover
), ),
case Result of case Result of
@ -210,47 +197,10 @@ do_query(
Result Result
end. end.
worker_do_query(_Client, Table, Query0, Templates) ->
try
Query = apply_template(Query0, Templates),
execute(Query, Table)
catch
_Type:Reason ->
{error, {unrecoverable_error, {invalid_request, Reason}}}
end.
%% some simple query commands for authn/authz or test
execute({insert_item, Msg}, Table) ->
Item = convert_to_item(Msg),
erlcloud_ddb2:put_item(Table, Item);
execute({delete_item, Key}, Table) ->
erlcloud_ddb2:delete_item(Table, Key);
execute({get_item, Key}, Table) ->
erlcloud_ddb2:get_item(Table, Key);
%% commands for data bridge query or batch query
execute({send_message, Msg}, Table) ->
Item = convert_to_item(Msg),
erlcloud_ddb2:put_item(Table, Item);
execute([{put, _} | _] = Msgs, Table) ->
%% type of batch_write_item argument :: batch_write_item_request_items()
%% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item())
%% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())}
%% batch_write_item_request() :: {put, item()} | {delete, key()}
erlcloud_ddb2:batch_write_item({Table, Msgs}).
connect(Opts) -> connect(Opts) ->
#{ Options = proplists:get_value(config, Opts),
aws_access_key_id := AccessKeyID, {ok, _Pid} = Result = emqx_ee_connector_dynamo_client:start_link(Options),
aws_secret_access_key := SecretAccessKey, Result.
host := Host,
port := Port,
schema := Schema
} = proplists:get_value(config, Opts),
erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema),
%% The dynamodb driver uses caller process as its connection process
%% so at here, the connection process is the ecpool worker self
{ok, self()}.
parse_template(Config) -> parse_template(Config) ->
Templates = Templates =
@ -283,54 +233,5 @@ get_host_schema("https://" ++ Server) ->
get_host_schema(Server) -> get_host_schema(Server) ->
{"http://", Server}. {"http://", Server}.
apply_template({Key, Msg} = Req, Templates) ->
case maps:get(Key, Templates, undefined) of
undefined ->
Req;
Template ->
{Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)}
end;
%% now there is no batch delete, so
%% 1. we can simply replace the `send_message` to `put`
%% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`,
%% so we can reduce some list map cost
apply_template([{send_message, _Msg} | _] = Msgs, Templates) ->
lists:map(
fun(Req) ->
{_, Msg} = apply_template(Req, Templates),
{put, convert_to_item(Msg)}
end,
Msgs
).
convert_to_item(Msg) when is_map(Msg), map_size(Msg) > 0 ->
maps:fold(
fun
(_K, <<>>, AccIn) ->
AccIn;
(K, V, AccIn) ->
[{convert2binary(K), convert2binary(V)} | AccIn]
end,
[],
Msg
);
convert_to_item(MsgBin) when is_binary(MsgBin) ->
Msg = emqx_utils_json:decode(MsgBin),
convert_to_item(Msg);
convert_to_item(Item) ->
erlang:throw({invalid_item, Item}).
convert2binary(Value) when is_atom(Value) ->
erlang:atom_to_binary(Value, utf8);
convert2binary(Value) when is_binary(Value); is_number(Value) ->
Value;
convert2binary(Value) when is_list(Value) ->
unicode:characters_to_binary(Value);
convert2binary(Value) when is_map(Value) ->
emqx_utils_json:encode(Value).
do_async_reply(Result, {ReplyFun, [Context]}) ->
ReplyFun(Context, Result).
redact(Data) -> redact(Data) ->
emqx_utils:redact(Data, fun(Any) -> Any =:= aws_secret_access_key end). emqx_utils:redact(Data, fun(Any) -> Any =:= aws_secret_access_key end).

View File

@ -0,0 +1,180 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_connector_dynamo_client).
-behaviour(gen_server).
%% API
-export([
start_link/1,
is_connected/1,
query/5,
query/4
]).
%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3,
format_status/2
]).
-ifdef(TEST).
-export([execute/2]).
-endif.
%%%===================================================================
%%% API
%%%===================================================================
is_connected(Pid) ->
try
gen_server:call(Pid, is_connected)
catch
_:_ ->
false
end.
query(Pid, sync, Table, Query, Templates) ->
query(Pid, Table, Query, Templates);
query(Pid, {async, ReplyCtx}, Table, Query, Templates) ->
gen_server:cast(Pid, {query, Table, Query, Templates, ReplyCtx}).
query(Pid, Table, Query, Templates) ->
gen_server:call(Pid, {query, Table, Query, Templates}, infinity).
%%--------------------------------------------------------------------
%% @doc
%% Starts Bridge which transfer data to DynamoDB
%% @endn
%%--------------------------------------------------------------------
start_link(Options) ->
gen_server:start_link(?MODULE, Options, []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% Initialize dynamodb data bridge
init(#{
aws_access_key_id := AccessKeyID,
aws_secret_access_key := SecretAccessKey,
host := Host,
port := Port,
schema := Schema
}) ->
erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema),
{ok, #{}}.
handle_call(is_connected, _From, State) ->
_ = erlcloud_ddb2:list_tables(),
{reply, true, State};
handle_call({query, Table, Query, Templates}, _From, State) ->
Result = do_query(Table, Query, Templates),
{reply, Result, State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast({query, Table, Query, Templates, {ReplyFun, [Context]}}, State) ->
Result = do_query(Table, Query, Templates),
ReplyFun(Context, Result),
{noreply, State};
handle_cast(_Request, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-spec format_status(
Opt :: normal | terminate,
Status :: list()
) -> Status :: term().
format_status(_Opt, Status) ->
Status.
%%%===================================================================
%%% Internal functions
%%%===================================================================
do_query(Table, Query0, Templates) ->
try
Query = apply_template(Query0, Templates),
execute(Query, Table)
catch
_Type:Reason ->
{error, {unrecoverable_error, {invalid_request, Reason}}}
end.
%% some simple query commands for authn/authz or test
execute({insert_item, Msg}, Table) ->
Item = convert_to_item(Msg),
erlcloud_ddb2:put_item(Table, Item);
execute({delete_item, Key}, Table) ->
erlcloud_ddb2:delete_item(Table, Key);
execute({get_item, Key}, Table) ->
erlcloud_ddb2:get_item(Table, Key);
%% commands for data bridge query or batch query
execute({send_message, Msg}, Table) ->
Item = convert_to_item(Msg),
erlcloud_ddb2:put_item(Table, Item);
execute([{put, _} | _] = Msgs, Table) ->
%% type of batch_write_item argument :: batch_write_item_request_items()
%% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item())
%% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())}
%% batch_write_item_request() :: {put, item()} | {delete, key()}
erlcloud_ddb2:batch_write_item({Table, Msgs}).
apply_template({Key, Msg} = Req, Templates) ->
case maps:get(Key, Templates, undefined) of
undefined ->
Req;
Template ->
{Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)}
end;
%% now there is no batch delete, so
%% 1. we can simply replace the `send_message` to `put`
%% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`,
%% so we can reduce some list map cost
apply_template([{send_message, _Msg} | _] = Msgs, Templates) ->
lists:map(
fun(Req) ->
{_, Msg} = apply_template(Req, Templates),
{put, convert_to_item(Msg)}
end,
Msgs
).
convert_to_item(Msg) when is_map(Msg), map_size(Msg) > 0 ->
maps:fold(
fun
(_K, <<>>, AccIn) ->
AccIn;
(K, V, AccIn) ->
[{convert2binary(K), convert2binary(V)} | AccIn]
end,
[],
Msg
);
convert_to_item(MsgBin) when is_binary(MsgBin) ->
Msg = emqx_utils_json:decode(MsgBin),
convert_to_item(Msg);
convert_to_item(Item) ->
erlang:throw({invalid_item, Item}).
convert2binary(Value) when is_atom(Value) ->
erlang:atom_to_binary(Value, utf8);
convert2binary(Value) when is_binary(Value); is_number(Value) ->
Value;
convert2binary(Value) when is_list(Value) ->
unicode:characters_to_binary(Value);
convert2binary(Value) when is_map(Value) ->
emqx_utils_json:encode(Value).