feat(emqx_data_bridge): add HTTP APIs for data bridge
This commit is contained in:
parent
4f451ee862
commit
2ff92d2880
|
@ -18,8 +18,6 @@
|
|||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("emqx_resource/include/emqx_resource_behaviour.hrl").
|
||||
|
||||
-emqx_resource_api_path("connectors/mysql").
|
||||
|
||||
-export([ on_jsonify/1
|
||||
, on_api_reply_format/1
|
||||
]).
|
||||
|
|
|
@ -1,9 +1,16 @@
|
|||
-module(emqx_data_bridge).
|
||||
|
||||
-export([ load_bridges/0
|
||||
, resource_type/1
|
||||
, resource_id/1
|
||||
]).
|
||||
|
||||
load_bridges() ->
|
||||
Bridges = proplists:get_value(bridges,
|
||||
application:get_all_env(emqx_data_bridge), []),
|
||||
emqx_data_bridge_monitor:ensure_all_started(Bridges).
|
||||
|
||||
resource_type(<<"mysql">>) -> emqx_connector_mysql.
|
||||
|
||||
resource_id(BridgeName) ->
|
||||
<<"bridge:", BridgeName/binary>>.
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
-module(emqx_data_bridge_api).
|
||||
|
||||
-rest_api(#{ name => list_data_bridges
|
||||
, method => 'GET'
|
||||
, path => "/data_bridges"
|
||||
, func => list_bridges
|
||||
, descr => "List all data bridges"
|
||||
}).
|
||||
|
||||
-rest_api(#{ name => get_data_bridge
|
||||
, method => 'GET'
|
||||
, path => "/data_bridges/:bin:name"
|
||||
, func => get_bridge
|
||||
, descr => "Get a data bridge by name"
|
||||
}).
|
||||
|
||||
-rest_api(#{ name => create_data_bridge
|
||||
, method => 'POST'
|
||||
, path => "/data_bridges/:bin:name"
|
||||
, func => create_bridge
|
||||
, descr => "Create a new data bridge"
|
||||
}).
|
||||
|
||||
-rest_api(#{ name => update_data_bridge
|
||||
, method => 'POST'
|
||||
, path => "/data_bridges/:bin:name"
|
||||
, func => update_bridge
|
||||
, descr => "Update an existing data bridge"
|
||||
}).
|
||||
|
||||
-rest_api(#{ name => delete_data_bridge
|
||||
, method => 'DELETE'
|
||||
, path => "/data_bridges/:bin:name"
|
||||
, func => delete_bridge
|
||||
, descr => "Delete an existing data bridge"
|
||||
}).
|
||||
|
||||
-export([ list_bridges/2
|
||||
, get_bridge/2
|
||||
, create_bridge/2
|
||||
, update_bridge/2
|
||||
, delete_bridge/2
|
||||
]).
|
||||
|
||||
list_bridges(_Binding, _Params) ->
|
||||
{200, #{code => 0, data => emqx_resource_api:list_instances(fun is_bridge/1)}}.
|
||||
|
||||
get_bridge(#{name := Name}, _Params) ->
|
||||
case emqx_resource:get_instance(emqx_data_bridge:resource_id(Name)) of
|
||||
{ok, Data} ->
|
||||
{200, #{code => 0, data => emqx_resource_api:format_data(Data)}};
|
||||
{error, not_found} ->
|
||||
{404, #{code => 102, message => <<"not_found: ", Name/binary>>}}
|
||||
end.
|
||||
|
||||
create_bridge(#{name := Name}, Params) ->
|
||||
Config = proplists:get_value(<<"config">>, Params),
|
||||
BridgeType = proplists:get_value(<<"type">>, Params),
|
||||
case emqx_resource:check_and_create(
|
||||
emqx_data_bridge:resource_id(Name),
|
||||
emqx_data_bridge:resource_type(BridgeType), Config) of
|
||||
{ok, Data} ->
|
||||
{200, #{code => 0, data => emqx_resource_api:format_data(Data)}};
|
||||
{error, already_created} ->
|
||||
{400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}};
|
||||
{error, Reason0} ->
|
||||
Reason = emqx_data_bridge_api:stringnify(Reason0),
|
||||
{500, #{code => 102, message => <<"create bridge ", Name/binary,
|
||||
" failed:", Reason/binary>>}}
|
||||
end.
|
||||
|
||||
update_bridge(#{name := Name}, Params) ->
|
||||
Config = proplists:get_value(<<"config">>, Params),
|
||||
BridgeType = proplists:get_value(<<"type">>, Params),
|
||||
case emqx_resource:check_and_udpate(
|
||||
emqx_data_bridge:resource_id(Name),
|
||||
emqx_data_bridge:resource_type(BridgeType), Config, []) of
|
||||
{ok, Data} ->
|
||||
{200, #{code => 0, data => emqx_resource_api:format_data(Data)}};
|
||||
{error, not_found} ->
|
||||
{400, #{code => 102, message => <<"bridge not_found: ", Name/binary>>}};
|
||||
{error, Reason0} ->
|
||||
Reason = emqx_data_bridge_api:stringnify(Reason0),
|
||||
{500, #{code => 102, message => <<"update bridge ", Name/binary,
|
||||
" failed:", Reason/binary>>}}
|
||||
end.
|
||||
|
||||
delete_bridge(#{name := Name}, _Params) ->
|
||||
case emqx_resource:remove(emqx_data_bridge:resource_id(Name)) of
|
||||
ok -> {200, #{code => 0, data => #{}}};
|
||||
{error, Reason} ->
|
||||
{500, #{code => 102, message => emqx_data_bridge_api:stringnify(Reason)}}
|
||||
end.
|
||||
|
||||
is_bridge(#{id := <<"bridge:", _/binary>>}) ->
|
||||
true;
|
||||
is_bridge(_Data) ->
|
||||
false.
|
|
@ -54,11 +54,11 @@ load_bridges(Configs) ->
|
|||
|
||||
load_bridge(#{<<"name">> := Name, <<"type">> := Type,
|
||||
<<"config">> := Config}) ->
|
||||
case emqx_resource:check_and_load_instance(Name, resource_type(Type), Config) of
|
||||
case emqx_resource:check_and_create_local(
|
||||
emqx_data_bridge:resource_id(Name),
|
||||
emqx_data_bridge:resource_type(Type), Config) of
|
||||
{ok, _} -> ok;
|
||||
{error, already_created} -> ok;
|
||||
{error, Reason} ->
|
||||
error({load_bridge, Reason})
|
||||
end.
|
||||
|
||||
resource_type(<<"mysql">>) -> emqx_connector_mysql.
|
|
@ -38,7 +38,10 @@
|
|||
%% APIs for instances
|
||||
|
||||
-export([ check_config/2
|
||||
, check_and_load_instance/3
|
||||
, check_and_create/3
|
||||
, check_and_create_local/3
|
||||
, check_and_update/4
|
||||
, check_and_update_local/4
|
||||
, resource_type_from_str/1
|
||||
]).
|
||||
|
||||
|
@ -46,10 +49,13 @@
|
|||
%% provisional solution: rpc:multical to all the nodes for creating/updating/removing
|
||||
%% todo: replicate operations
|
||||
-export([ create/3 %% store the config and start the instance
|
||||
, create_local/3
|
||||
, create_dry_run/3 %% run start/2, health_check/2 and stop/1 sequentially
|
||||
, create_dry_run_local/3
|
||||
, update/4 %% update the config, stop the old instance and start the new one
|
||||
%% it will create a new resource when the id does not exist
|
||||
, update_local/4
|
||||
, remove/1 %% remove the config and stop the instance
|
||||
, remove_local/1
|
||||
]).
|
||||
|
||||
%% Calls to the callback module with current resource state
|
||||
|
@ -73,7 +79,7 @@
|
|||
-export([ list_instances/0 %% list all the instances, id only.
|
||||
, list_instances_verbose/0 %% list all the instances
|
||||
, get_instance/1 %% return the data of the instance
|
||||
, get_instance_by_type/1 %% return all the instances of the same resource type
|
||||
, list_instances_by_type/1 %% return all the instances of the same resource type
|
||||
% , dependents/1
|
||||
% , inc_counter/2 %% increment the counter of the instance
|
||||
% , inc_counter/3 %% increment the counter by a given integer
|
||||
|
@ -152,22 +158,42 @@ query_failed({_, {OnFailed, Args}}) ->
|
|||
-spec create(instance_id(), resource_type(), resource_config()) ->
|
||||
{ok, resource_data()} | {error, Reason :: term()}.
|
||||
create(InstId, ResourceType, Config) ->
|
||||
?CLUSTER_CALL(call_instance, [InstId, {create, InstId, ResourceType, Config}], {ok, _}).
|
||||
?CLUSTER_CALL(create_local, [InstId, ResourceType, Config], {ok, _}).
|
||||
|
||||
-spec create_local(instance_id(), resource_type(), resource_config()) ->
|
||||
{ok, resource_data()} | {error, Reason :: term()}.
|
||||
create_local(InstId, ResourceType, Config) ->
|
||||
call_instance(InstId, {create, InstId, ResourceType, Config}).
|
||||
|
||||
-spec create_dry_run(instance_id(), resource_type(), resource_config()) ->
|
||||
ok | {error, Reason :: term()}.
|
||||
create_dry_run(InstId, ResourceType, Config) ->
|
||||
?CLUSTER_CALL(call_instance, [InstId, {create_dry_run, InstId, ResourceType, Config}]).
|
||||
?CLUSTER_CALL(create_dry_run_local, [InstId, ResourceType, Config]).
|
||||
|
||||
-spec create_dry_run_local(instance_id(), resource_type(), resource_config()) ->
|
||||
ok | {error, Reason :: term()}.
|
||||
create_dry_run_local(InstId, ResourceType, Config) ->
|
||||
call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}).
|
||||
|
||||
-spec update(instance_id(), resource_type(), resource_config(), term()) ->
|
||||
{ok, resource_data()} | {error, Reason :: term()}.
|
||||
update(InstId, ResourceType, Config, Params) ->
|
||||
?CLUSTER_CALL(call_instance, [InstId, {update, InstId, ResourceType, Config, Params}], {ok, _}).
|
||||
?CLUSTER_CALL(update_local, [InstId, ResourceType, Config, Params], {ok, _}).
|
||||
|
||||
-spec update_local(instance_id(), resource_type(), resource_config(), term()) ->
|
||||
{ok, resource_data()} | {error, Reason :: term()}.
|
||||
update_local(InstId, ResourceType, Config, Params) ->
|
||||
call_instance(InstId, {update, InstId, ResourceType, Config, Params}).
|
||||
|
||||
-spec remove(instance_id()) -> ok | {error, Reason :: term()}.
|
||||
remove(InstId) ->
|
||||
?CLUSTER_CALL(call_instance, [InstId, {remove, InstId}]).
|
||||
?CLUSTER_CALL(remove_local, [InstId]).
|
||||
|
||||
-spec remove_local(instance_id()) -> ok | {error, Reason :: term()}.
|
||||
remove_local(InstId) ->
|
||||
call_instance(InstId, {remove, InstId}).
|
||||
|
||||
%% =================================================================================
|
||||
-spec query(instance_id(), Request :: term()) -> Result :: term().
|
||||
query(InstId, Request) ->
|
||||
query(InstId, Request, undefined).
|
||||
|
@ -209,8 +235,8 @@ list_instances() ->
|
|||
list_instances_verbose() ->
|
||||
emqx_resource_instance:list_all().
|
||||
|
||||
-spec get_instance_by_type(module()) -> [resource_data()].
|
||||
get_instance_by_type(ResourceType) ->
|
||||
-spec list_instances_by_type(module()) -> [resource_data()].
|
||||
list_instances_by_type(ResourceType) ->
|
||||
emqx_resource_instance:lookup_by_type(ResourceType).
|
||||
|
||||
-spec call_start(instance_id(), module(), resource_config()) ->
|
||||
|
@ -271,11 +297,33 @@ do_check_config(ResourceType, MapConfig) ->
|
|||
Config -> {ok, maps:get(<<"config">>, hocon:richmap_to_map(Config))}
|
||||
end.
|
||||
|
||||
-spec check_and_load_instance(instance_id(), resource_type(), binary() | term()) ->
|
||||
-spec check_and_create(instance_id(), resource_type(), binary() | term()) ->
|
||||
{ok, resource_data()} | {error, term()}.
|
||||
check_and_load_instance(InstId, ResourceType, Config) ->
|
||||
check_and_create(InstId, ResourceType, Config) ->
|
||||
check_and_do(ResourceType, Config,
|
||||
fun(InstConf) -> create(InstId, ResourceType, InstConf) end).
|
||||
|
||||
-spec check_and_create_local(instance_id(), resource_type(), binary() | term()) ->
|
||||
{ok, resource_data()} | {error, term()}.
|
||||
check_and_create_local(InstId, ResourceType, Config) ->
|
||||
check_and_do(ResourceType, Config,
|
||||
fun(InstConf) -> create_local(InstId, ResourceType, InstConf) end).
|
||||
|
||||
-spec check_and_update(instance_id(), resource_type(), binary() | term(), term()) ->
|
||||
{ok, resource_data()} | {error, term()}.
|
||||
check_and_update(InstId, ResourceType, Config, Params) ->
|
||||
check_and_do(ResourceType, Config,
|
||||
fun(InstConf) -> update(InstId, ResourceType, InstConf, Params) end).
|
||||
|
||||
-spec check_and_update_local(instance_id(), resource_type(), binary() | term(), term()) ->
|
||||
{ok, resource_data()} | {error, term()}.
|
||||
check_and_update_local(InstId, ResourceType, Config, Params) ->
|
||||
check_and_do(ResourceType, Config,
|
||||
fun(InstConf) -> update_local(InstId, ResourceType, InstConf, Params) end).
|
||||
|
||||
check_and_do(ResourceType, Config, Do) when is_function(Do) ->
|
||||
case check_config(ResourceType, Config) of
|
||||
{ok, InstConf} -> emqx_resource_instance:create_local(InstId, ResourceType, InstConf);
|
||||
{ok, InstConf} -> Do(InstConf);
|
||||
Error -> Error
|
||||
end.
|
||||
|
||||
|
|
|
@ -15,61 +15,16 @@
|
|||
%%--------------------------------------------------------------------
|
||||
-module(emqx_resource_api).
|
||||
|
||||
-export([ get_all/3
|
||||
, get/3
|
||||
, put/3
|
||||
, delete/3
|
||||
-export([ list_instances/1
|
||||
, format_data/1
|
||||
, stringnify/1
|
||||
]).
|
||||
|
||||
-export([default_api_reply_format/1]).
|
||||
list_instances(Filter) ->
|
||||
[format_data(Data) || Data <- emqx_resource:list_instances_verbose(), Filter(Data)].
|
||||
|
||||
get_all(Mod, _Binding, _Params) ->
|
||||
{200, #{code => 0, data =>
|
||||
[format_data(Mod, Data) || Data <- emqx_resource:list_instances_verbose()]}}.
|
||||
|
||||
get(Mod, #{id := Id}, _Params) ->
|
||||
case emqx_resource:get_instance(stringnify(Id)) of
|
||||
{ok, Data} ->
|
||||
{200, #{code => 0, data => format_data(Mod, Data)}};
|
||||
{error, not_found} ->
|
||||
{404, #{code => 102, message => {resource_instance_not_found, stringnify(Id)}}}
|
||||
end.
|
||||
|
||||
put(Mod, #{id := Id}, Params) ->
|
||||
ConfigParams = proplists:get_value(<<"config">>, Params),
|
||||
ResourceTypeStr = proplists:get_value(<<"resource_type">>, Params, #{}),
|
||||
case emqx_resource:resource_type_from_str(ResourceTypeStr) of
|
||||
{ok, ResourceType} ->
|
||||
do_put(Mod, stringnify(Id), ConfigParams, ResourceType, Params);
|
||||
{error, Reason} ->
|
||||
{404, #{code => 102, message => stringnify(Reason)}}
|
||||
end.
|
||||
|
||||
do_put(Mod, Id, ConfigParams, ResourceType, Params) ->
|
||||
case emqx_resource:check_config(ResourceType, ConfigParams) of
|
||||
{ok, Config} ->
|
||||
case emqx_resource:update(Id, ResourceType, Config, Params) of
|
||||
{ok, Data} ->
|
||||
{200, #{code => 0, data => format_data(Mod, Data)}};
|
||||
{error, Reason} ->
|
||||
{500, #{code => 102, message => stringnify(Reason)}}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{400, #{code => 108, message => stringnify(Reason)}}
|
||||
end.
|
||||
|
||||
delete(_Mod, #{id := Id}, _Params) ->
|
||||
case emqx_resource:remove(stringnify(Id)) of
|
||||
ok -> {200, #{code => 0, data => #{}}};
|
||||
{error, Reason} ->
|
||||
{500, #{code => 102, message => stringnify(Reason)}}
|
||||
end.
|
||||
|
||||
format_data(Mod, Data) ->
|
||||
emqx_resource:call_api_reply_format(Mod, Data).
|
||||
|
||||
default_api_reply_format(#{id := Id, mod := Mod, status := Status, config := Config}) ->
|
||||
#{node => node(), id => Id, status => Status, resource_type => Mod,
|
||||
format_data(#{id := Id, mod := Mod, status := Status, config := Config}) ->
|
||||
#{id => Id, status => Status, resource_type => Mod,
|
||||
config => emqx_resource:call_jsonify(Mod, Config)}.
|
||||
|
||||
stringnify(Bin) when is_binary(Bin) -> Bin;
|
||||
|
|
|
@ -167,7 +167,7 @@ do_update(InstId, ResourceType, NewConfig, Params) ->
|
|||
{ok, #{mod := Mod}} when Mod =/= ResourceType ->
|
||||
{error, updating_to_incorrect_resource_type};
|
||||
{error, not_found} ->
|
||||
do_create(InstId, ResourceType, NewConfig)
|
||||
{error, not_found}
|
||||
end.
|
||||
|
||||
do_create(InstId, ResourceType, Config) ->
|
||||
|
|
Loading…
Reference in New Issue