From 2ff92d28807de0ebe42b48eace55789c86b9cdf7 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 7 Jun 2021 16:12:06 +0800 Subject: [PATCH] feat(emqx_data_bridge): add HTTP APIs for data bridge --- .../src/emqx_connector_mysql.erl | 2 - .../emqx_data_bridge/src/emqx_data_bridge.erl | 7 ++ .../src/emqx_data_bridge_api.erl | 98 +++++++++++++++++++ .../src/emqx_data_bridge_monitor.erl | 6 +- apps/emqx_resource/src/emqx_resource.erl | 72 +++++++++++--- apps/emqx_resource/src/emqx_resource_api.erl | 59 ++--------- .../src/emqx_resource_instance.erl | 2 +- 7 files changed, 176 insertions(+), 70 deletions(-) create mode 100644 apps/emqx_data_bridge/src/emqx_data_bridge_api.erl diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 1bc1bbc80..a00de8426 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -18,8 +18,6 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). --emqx_resource_api_path("connectors/mysql"). - -export([ on_jsonify/1 , on_api_reply_format/1 ]). diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge.erl b/apps/emqx_data_bridge/src/emqx_data_bridge.erl index cd26b9d3e..0f6c42246 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge.erl @@ -1,9 +1,16 @@ -module(emqx_data_bridge). -export([ load_bridges/0 + , resource_type/1 + , resource_id/1 ]). load_bridges() -> Bridges = proplists:get_value(bridges, application:get_all_env(emqx_data_bridge), []), emqx_data_bridge_monitor:ensure_all_started(Bridges). + +resource_type(<<"mysql">>) -> emqx_connector_mysql. + +resource_id(BridgeName) -> + <<"bridge:", BridgeName/binary>>. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl new file mode 100644 index 000000000..02b1edb99 --- /dev/null +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl @@ -0,0 +1,98 @@ +-module(emqx_data_bridge_api). + +-rest_api(#{ name => list_data_bridges + , method => 'GET' + , path => "/data_bridges" + , func => list_bridges + , descr => "List all data bridges" + }). + +-rest_api(#{ name => get_data_bridge + , method => 'GET' + , path => "/data_bridges/:bin:name" + , func => get_bridge + , descr => "Get a data bridge by name" + }). + +-rest_api(#{ name => create_data_bridge + , method => 'POST' + , path => "/data_bridges/:bin:name" + , func => create_bridge + , descr => "Create a new data bridge" + }). + +-rest_api(#{ name => update_data_bridge + , method => 'POST' + , path => "/data_bridges/:bin:name" + , func => update_bridge + , descr => "Update an existing data bridge" + }). + +-rest_api(#{ name => delete_data_bridge + , method => 'DELETE' + , path => "/data_bridges/:bin:name" + , func => delete_bridge + , descr => "Delete an existing data bridge" + }). + +-export([ list_bridges/2 + , get_bridge/2 + , create_bridge/2 + , update_bridge/2 + , delete_bridge/2 + ]). + +list_bridges(_Binding, _Params) -> + {200, #{code => 0, data => emqx_resource_api:list_instances(fun is_bridge/1)}}. + +get_bridge(#{name := Name}, _Params) -> + case emqx_resource:get_instance(emqx_data_bridge:resource_id(Name)) of + {ok, Data} -> + {200, #{code => 0, data => emqx_resource_api:format_data(Data)}}; + {error, not_found} -> + {404, #{code => 102, message => <<"not_found: ", Name/binary>>}} + end. + +create_bridge(#{name := Name}, Params) -> + Config = proplists:get_value(<<"config">>, Params), + BridgeType = proplists:get_value(<<"type">>, Params), + case emqx_resource:check_and_create( + emqx_data_bridge:resource_id(Name), + emqx_data_bridge:resource_type(BridgeType), Config) of + {ok, Data} -> + {200, #{code => 0, data => emqx_resource_api:format_data(Data)}}; + {error, already_created} -> + {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}}; + {error, Reason0} -> + Reason = emqx_data_bridge_api:stringnify(Reason0), + {500, #{code => 102, message => <<"create bridge ", Name/binary, + " failed:", Reason/binary>>}} + end. + +update_bridge(#{name := Name}, Params) -> + Config = proplists:get_value(<<"config">>, Params), + BridgeType = proplists:get_value(<<"type">>, Params), + case emqx_resource:check_and_udpate( + emqx_data_bridge:resource_id(Name), + emqx_data_bridge:resource_type(BridgeType), Config, []) of + {ok, Data} -> + {200, #{code => 0, data => emqx_resource_api:format_data(Data)}}; + {error, not_found} -> + {400, #{code => 102, message => <<"bridge not_found: ", Name/binary>>}}; + {error, Reason0} -> + Reason = emqx_data_bridge_api:stringnify(Reason0), + {500, #{code => 102, message => <<"update bridge ", Name/binary, + " failed:", Reason/binary>>}} + end. + +delete_bridge(#{name := Name}, _Params) -> + case emqx_resource:remove(emqx_data_bridge:resource_id(Name)) of + ok -> {200, #{code => 0, data => #{}}}; + {error, Reason} -> + {500, #{code => 102, message => emqx_data_bridge_api:stringnify(Reason)}} + end. + +is_bridge(#{id := <<"bridge:", _/binary>>}) -> + true; +is_bridge(_Data) -> + false. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl index 22c216038..6073602b9 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_monitor.erl @@ -54,11 +54,11 @@ load_bridges(Configs) -> load_bridge(#{<<"name">> := Name, <<"type">> := Type, <<"config">> := Config}) -> - case emqx_resource:check_and_load_instance(Name, resource_type(Type), Config) of + case emqx_resource:check_and_create_local( + emqx_data_bridge:resource_id(Name), + emqx_data_bridge:resource_type(Type), Config) of {ok, _} -> ok; {error, already_created} -> ok; {error, Reason} -> error({load_bridge, Reason}) end. - -resource_type(<<"mysql">>) -> emqx_connector_mysql. \ No newline at end of file diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index b909b0e50..8a974fe36 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -38,7 +38,10 @@ %% APIs for instances -export([ check_config/2 - , check_and_load_instance/3 + , check_and_create/3 + , check_and_create_local/3 + , check_and_update/4 + , check_and_update_local/4 , resource_type_from_str/1 ]). @@ -46,10 +49,13 @@ %% provisional solution: rpc:multical to all the nodes for creating/updating/removing %% todo: replicate operations -export([ create/3 %% store the config and start the instance + , create_local/3 , create_dry_run/3 %% run start/2, health_check/2 and stop/1 sequentially + , create_dry_run_local/3 , update/4 %% update the config, stop the old instance and start the new one - %% it will create a new resource when the id does not exist + , update_local/4 , remove/1 %% remove the config and stop the instance + , remove_local/1 ]). %% Calls to the callback module with current resource state @@ -73,7 +79,7 @@ -export([ list_instances/0 %% list all the instances, id only. , list_instances_verbose/0 %% list all the instances , get_instance/1 %% return the data of the instance - , get_instance_by_type/1 %% return all the instances of the same resource type + , list_instances_by_type/1 %% return all the instances of the same resource type % , dependents/1 % , inc_counter/2 %% increment the counter of the instance % , inc_counter/3 %% increment the counter by a given integer @@ -152,22 +158,42 @@ query_failed({_, {OnFailed, Args}}) -> -spec create(instance_id(), resource_type(), resource_config()) -> {ok, resource_data()} | {error, Reason :: term()}. create(InstId, ResourceType, Config) -> - ?CLUSTER_CALL(call_instance, [InstId, {create, InstId, ResourceType, Config}], {ok, _}). + ?CLUSTER_CALL(create_local, [InstId, ResourceType, Config], {ok, _}). + +-spec create_local(instance_id(), resource_type(), resource_config()) -> + {ok, resource_data()} | {error, Reason :: term()}. +create_local(InstId, ResourceType, Config) -> + call_instance(InstId, {create, InstId, ResourceType, Config}). -spec create_dry_run(instance_id(), resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run(InstId, ResourceType, Config) -> - ?CLUSTER_CALL(call_instance, [InstId, {create_dry_run, InstId, ResourceType, Config}]). + ?CLUSTER_CALL(create_dry_run_local, [InstId, ResourceType, Config]). + +-spec create_dry_run_local(instance_id(), resource_type(), resource_config()) -> + ok | {error, Reason :: term()}. +create_dry_run_local(InstId, ResourceType, Config) -> + call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}). -spec update(instance_id(), resource_type(), resource_config(), term()) -> {ok, resource_data()} | {error, Reason :: term()}. update(InstId, ResourceType, Config, Params) -> - ?CLUSTER_CALL(call_instance, [InstId, {update, InstId, ResourceType, Config, Params}], {ok, _}). + ?CLUSTER_CALL(update_local, [InstId, ResourceType, Config, Params], {ok, _}). + +-spec update_local(instance_id(), resource_type(), resource_config(), term()) -> + {ok, resource_data()} | {error, Reason :: term()}. +update_local(InstId, ResourceType, Config, Params) -> + call_instance(InstId, {update, InstId, ResourceType, Config, Params}). -spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> - ?CLUSTER_CALL(call_instance, [InstId, {remove, InstId}]). + ?CLUSTER_CALL(remove_local, [InstId]). +-spec remove_local(instance_id()) -> ok | {error, Reason :: term()}. +remove_local(InstId) -> + call_instance(InstId, {remove, InstId}). + +%% ================================================================================= -spec query(instance_id(), Request :: term()) -> Result :: term(). query(InstId, Request) -> query(InstId, Request, undefined). @@ -209,8 +235,8 @@ list_instances() -> list_instances_verbose() -> emqx_resource_instance:list_all(). --spec get_instance_by_type(module()) -> [resource_data()]. -get_instance_by_type(ResourceType) -> +-spec list_instances_by_type(module()) -> [resource_data()]. +list_instances_by_type(ResourceType) -> emqx_resource_instance:lookup_by_type(ResourceType). -spec call_start(instance_id(), module(), resource_config()) -> @@ -271,11 +297,33 @@ do_check_config(ResourceType, MapConfig) -> Config -> {ok, maps:get(<<"config">>, hocon:richmap_to_map(Config))} end. --spec check_and_load_instance(instance_id(), resource_type(), binary() | term()) -> +-spec check_and_create(instance_id(), resource_type(), binary() | term()) -> {ok, resource_data()} | {error, term()}. -check_and_load_instance(InstId, ResourceType, Config) -> +check_and_create(InstId, ResourceType, Config) -> + check_and_do(ResourceType, Config, + fun(InstConf) -> create(InstId, ResourceType, InstConf) end). + +-spec check_and_create_local(instance_id(), resource_type(), binary() | term()) -> + {ok, resource_data()} | {error, term()}. +check_and_create_local(InstId, ResourceType, Config) -> + check_and_do(ResourceType, Config, + fun(InstConf) -> create_local(InstId, ResourceType, InstConf) end). + +-spec check_and_update(instance_id(), resource_type(), binary() | term(), term()) -> + {ok, resource_data()} | {error, term()}. +check_and_update(InstId, ResourceType, Config, Params) -> + check_and_do(ResourceType, Config, + fun(InstConf) -> update(InstId, ResourceType, InstConf, Params) end). + +-spec check_and_update_local(instance_id(), resource_type(), binary() | term(), term()) -> + {ok, resource_data()} | {error, term()}. +check_and_update_local(InstId, ResourceType, Config, Params) -> + check_and_do(ResourceType, Config, + fun(InstConf) -> update_local(InstId, ResourceType, InstConf, Params) end). + +check_and_do(ResourceType, Config, Do) when is_function(Do) -> case check_config(ResourceType, Config) of - {ok, InstConf} -> emqx_resource_instance:create_local(InstId, ResourceType, InstConf); + {ok, InstConf} -> Do(InstConf); Error -> Error end. diff --git a/apps/emqx_resource/src/emqx_resource_api.erl b/apps/emqx_resource/src/emqx_resource_api.erl index a92d41b75..fe1ca4509 100644 --- a/apps/emqx_resource/src/emqx_resource_api.erl +++ b/apps/emqx_resource/src/emqx_resource_api.erl @@ -15,61 +15,16 @@ %%-------------------------------------------------------------------- -module(emqx_resource_api). --export([ get_all/3 - , get/3 - , put/3 - , delete/3 +-export([ list_instances/1 + , format_data/1 + , stringnify/1 ]). --export([default_api_reply_format/1]). +list_instances(Filter) -> + [format_data(Data) || Data <- emqx_resource:list_instances_verbose(), Filter(Data)]. -get_all(Mod, _Binding, _Params) -> - {200, #{code => 0, data => - [format_data(Mod, Data) || Data <- emqx_resource:list_instances_verbose()]}}. - -get(Mod, #{id := Id}, _Params) -> - case emqx_resource:get_instance(stringnify(Id)) of - {ok, Data} -> - {200, #{code => 0, data => format_data(Mod, Data)}}; - {error, not_found} -> - {404, #{code => 102, message => {resource_instance_not_found, stringnify(Id)}}} - end. - -put(Mod, #{id := Id}, Params) -> - ConfigParams = proplists:get_value(<<"config">>, Params), - ResourceTypeStr = proplists:get_value(<<"resource_type">>, Params, #{}), - case emqx_resource:resource_type_from_str(ResourceTypeStr) of - {ok, ResourceType} -> - do_put(Mod, stringnify(Id), ConfigParams, ResourceType, Params); - {error, Reason} -> - {404, #{code => 102, message => stringnify(Reason)}} - end. - -do_put(Mod, Id, ConfigParams, ResourceType, Params) -> - case emqx_resource:check_config(ResourceType, ConfigParams) of - {ok, Config} -> - case emqx_resource:update(Id, ResourceType, Config, Params) of - {ok, Data} -> - {200, #{code => 0, data => format_data(Mod, Data)}}; - {error, Reason} -> - {500, #{code => 102, message => stringnify(Reason)}} - end; - {error, Reason} -> - {400, #{code => 108, message => stringnify(Reason)}} - end. - -delete(_Mod, #{id := Id}, _Params) -> - case emqx_resource:remove(stringnify(Id)) of - ok -> {200, #{code => 0, data => #{}}}; - {error, Reason} -> - {500, #{code => 102, message => stringnify(Reason)}} - end. - -format_data(Mod, Data) -> - emqx_resource:call_api_reply_format(Mod, Data). - -default_api_reply_format(#{id := Id, mod := Mod, status := Status, config := Config}) -> - #{node => node(), id => Id, status => Status, resource_type => Mod, +format_data(#{id := Id, mod := Mod, status := Status, config := Config}) -> + #{id => Id, status => Status, resource_type => Mod, config => emqx_resource:call_jsonify(Mod, Config)}. stringnify(Bin) when is_binary(Bin) -> Bin; diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 470263a6d..1e924e249 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -167,7 +167,7 @@ do_update(InstId, ResourceType, NewConfig, Params) -> {ok, #{mod := Mod}} when Mod =/= ResourceType -> {error, updating_to_incorrect_resource_type}; {error, not_found} -> - do_create(InstId, ResourceType, NewConfig) + {error, not_found} end. do_create(InstId, ResourceType, Config) ->