feat(bridges): add CRUD HTTP APIs for bridges
This commit is contained in:
parent
304c5613ac
commit
cb8dabe579
|
@ -11,11 +11,11 @@
|
||||||
{deps,
|
{deps,
|
||||||
[ {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
[ {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
||||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
|
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
|
||||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
|
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
|
||||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
|
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
|
||||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
||||||
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.17.0"}}}
|
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.17.1"}}}
|
||||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
|
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
|
||||||
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
|
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
|
||||||
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
|
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
|
||||||
|
|
|
@ -2,48 +2,48 @@
|
||||||
## EMQ X Bridge
|
## EMQ X Bridge
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
|
|
||||||
#bridges.mqtt.my_mqtt_bridge_to_aws {
|
bridges.mqtt.my_mqtt_bridge_to_aws {
|
||||||
# server = "127.0.0.1:1883"
|
server = "127.0.0.1:1883"
|
||||||
# proto_ver = "v4"
|
proto_ver = "v4"
|
||||||
# username = "username1"
|
username = "username1"
|
||||||
# password = ""
|
password = ""
|
||||||
# clean_start = true
|
clean_start = true
|
||||||
# keepalive = 300
|
keepalive = 300
|
||||||
# retry_interval = "30s"
|
retry_interval = "30s"
|
||||||
# max_inflight = 32
|
max_inflight = 32
|
||||||
# reconnect_interval = "30s"
|
reconnect_interval = "30s"
|
||||||
# bridge_mode = true
|
bridge_mode = true
|
||||||
# replayq {
|
replayq {
|
||||||
# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
|
dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
|
||||||
# seg_bytes = "100MB"
|
seg_bytes = "100MB"
|
||||||
# offload = false
|
offload = false
|
||||||
# max_total_bytes = "1GB"
|
max_total_bytes = "1GB"
|
||||||
# }
|
}
|
||||||
# ssl {
|
ssl {
|
||||||
# enable = false
|
enable = false
|
||||||
# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
|
keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
|
||||||
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
|
certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
|
||||||
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
|
cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
|
||||||
# }
|
}
|
||||||
# ## we will create one MQTT connection for each element of the `message_in`
|
## we will create one MQTT connection for each element of the `message_in`
|
||||||
# message_in: [{
|
message_in: [{
|
||||||
# ## the `id` will be used as part of the clientid
|
## the `id` will be used as part of the clientid
|
||||||
# id = "pull_msgs_from_aws"
|
id = "pull_msgs_from_aws"
|
||||||
# subscribe_remote_topic = "aws/#"
|
subscribe_remote_topic = "aws/#"
|
||||||
# subscribe_qos = 1
|
subscribe_qos = 1
|
||||||
# local_topic = "from_aws/${topic}"
|
local_topic = "from_aws/${topic}"
|
||||||
# payload = "${payload}"
|
payload = "${payload}"
|
||||||
# qos = "${qos}"
|
qos = "${qos}"
|
||||||
# retain = "${retain}"
|
retain = "${retain}"
|
||||||
# }]
|
}]
|
||||||
# ## we will create one MQTT connection for each element of the `message_out`
|
## we will create one MQTT connection for each element of the `message_out`
|
||||||
# message_out: [{
|
message_out: [{
|
||||||
# ## the `id` will be used as part of the clientid
|
## the `id` will be used as part of the clientid
|
||||||
# id = "push_msgs_to_aws"
|
id = "push_msgs_to_aws"
|
||||||
# subscribe_local_topic = "emqx/#"
|
subscribe_local_topic = "emqx/#"
|
||||||
# remote_topic = "from_emqx/${topic}"
|
remote_topic = "from_emqx/${topic}"
|
||||||
# payload = "${payload}"
|
payload = "${payload}"
|
||||||
# qos = 1
|
qos = 1
|
||||||
# retain = false
|
retain = false
|
||||||
# }]
|
}]
|
||||||
#}
|
}
|
||||||
|
|
|
@ -14,21 +14,34 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_bridge).
|
-module(emqx_bridge).
|
||||||
|
-behaviour(emqx_config_handler).
|
||||||
|
|
||||||
|
-export([post_config_update/4]).
|
||||||
|
|
||||||
-export([ load_bridges/0
|
-export([ load_bridges/0
|
||||||
, resource_type/1
|
, get_bridge/2
|
||||||
, bridge_type/1
|
, get_bridge/3
|
||||||
, name_to_resource_id/1
|
|
||||||
, resource_id_to_name/1
|
|
||||||
, list_bridges/0
|
, list_bridges/0
|
||||||
, is_bridge/1
|
, create_bridge/3
|
||||||
, config_key_path/0
|
, remove_bridge/3
|
||||||
, update_config/1
|
, update_bridge/3
|
||||||
|
, start_bridge/2
|
||||||
|
, stop_bridge/2
|
||||||
|
, restart_bridge/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
load_bridges() ->
|
-export([ config_key_path/0
|
||||||
Bridges = emqx:get_config([bridges], #{}),
|
]).
|
||||||
emqx_bridge_monitor:ensure_all_started(Bridges).
|
|
||||||
|
-export([ resource_type/1
|
||||||
|
, bridge_type/1
|
||||||
|
, resource_id/1
|
||||||
|
, resource_id/2
|
||||||
|
, parse_bridge_id/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
config_key_path() ->
|
||||||
|
[bridges].
|
||||||
|
|
||||||
resource_type(mqtt) -> emqx_connector_mqtt;
|
resource_type(mqtt) -> emqx_connector_mqtt;
|
||||||
resource_type(mysql) -> emqx_connector_mysql;
|
resource_type(mysql) -> emqx_connector_mysql;
|
||||||
|
@ -44,27 +57,136 @@ bridge_type(emqx_connector_mongo) -> mongo;
|
||||||
bridge_type(emqx_connector_redis) -> redis;
|
bridge_type(emqx_connector_redis) -> redis;
|
||||||
bridge_type(emqx_connector_ldap) -> ldap.
|
bridge_type(emqx_connector_ldap) -> ldap.
|
||||||
|
|
||||||
name_to_resource_id(BridgeName) ->
|
post_config_update(_Req, NewConf, OldConf, _AppEnv) ->
|
||||||
Name = bin(BridgeName),
|
#{added := Added, removed := Removed, changed := Updated}
|
||||||
<<"bridge:", Name/binary>>.
|
= diff_confs(NewConf, OldConf),
|
||||||
|
perform_bridge_changes([
|
||||||
|
{fun remove_bridge/3, Removed},
|
||||||
|
{fun create_bridge/3, Added},
|
||||||
|
{fun update_bridge/3, Updated}
|
||||||
|
]).
|
||||||
|
|
||||||
resource_id_to_name(<<"bridge:", BridgeName/binary>> = _ResourceId) ->
|
perform_bridge_changes(Tasks) ->
|
||||||
BridgeName.
|
perform_bridge_changes(Tasks, ok).
|
||||||
|
|
||||||
|
perform_bridge_changes([], Result) ->
|
||||||
|
Result;
|
||||||
|
perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) ->
|
||||||
|
Result = maps:fold(fun
|
||||||
|
({_Type, _Name}, _Conf, {error, Reason}) ->
|
||||||
|
{error, Reason};
|
||||||
|
({Type, Name}, Conf, _) ->
|
||||||
|
case Action(Type, Name, Conf) of
|
||||||
|
{error, Reason} -> {error, Reason};
|
||||||
|
Return -> Return
|
||||||
|
end
|
||||||
|
end, Result0, MapConfs),
|
||||||
|
perform_bridge_changes(Tasks, Result).
|
||||||
|
|
||||||
|
load_bridges() ->
|
||||||
|
Bridges = emqx:get_config([bridges], #{}),
|
||||||
|
emqx_bridge_monitor:ensure_all_started(Bridges).
|
||||||
|
|
||||||
|
resource_id(BridgeId) when is_binary(BridgeId) ->
|
||||||
|
<<"bridge:", BridgeId/binary>>.
|
||||||
|
|
||||||
|
resource_id(BridgeType, BridgeName) ->
|
||||||
|
BridgeId = bridge_id(BridgeType, BridgeName),
|
||||||
|
resource_id(BridgeId).
|
||||||
|
|
||||||
|
bridge_id(BridgeType, BridgeName) ->
|
||||||
|
Name = bin(BridgeName),
|
||||||
|
Type = bin(BridgeType),
|
||||||
|
<<Type/binary, ":", Name/binary>>.
|
||||||
|
|
||||||
|
parse_bridge_id(BridgeId) ->
|
||||||
|
try
|
||||||
|
[Type, Name] = string:split(str(BridgeId), ":", leading),
|
||||||
|
{list_to_existing_atom(Type), list_to_atom(Name)}
|
||||||
|
catch
|
||||||
|
_ : _ -> error({invalid_bridge_id, BridgeId})
|
||||||
|
end.
|
||||||
|
|
||||||
list_bridges() ->
|
list_bridges() ->
|
||||||
emqx_resource_api:list_instances(fun emqx_bridge:is_bridge/1).
|
lists:foldl(fun({Type, NameAndConf}, Bridges) ->
|
||||||
|
lists:foldl(fun({Name, RawConf}, Acc) ->
|
||||||
|
case get_bridge(Type, Name, RawConf) of
|
||||||
|
{error, not_found} -> Acc;
|
||||||
|
{ok, Res} -> [Res | Acc]
|
||||||
|
end
|
||||||
|
end, Bridges, maps:to_list(NameAndConf))
|
||||||
|
end, [], maps:to_list(emqx:get_raw_config([bridges]))).
|
||||||
|
|
||||||
is_bridge(#{id := <<"bridge:", _/binary>>}) ->
|
get_bridge(Type, Name) ->
|
||||||
true;
|
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
|
||||||
is_bridge(_Data) ->
|
get_bridge(Type, Name, RawConf).
|
||||||
false.
|
get_bridge(Type, Name, RawConf) ->
|
||||||
|
case emqx_resource:get_instance(resource_id(Type, Name)) of
|
||||||
|
{error, not_found} -> {error, not_found};
|
||||||
|
{ok, Data} -> {ok, #{id => bridge_id(Type, Name), resource_data => Data,
|
||||||
|
raw_config => RawConf}}
|
||||||
|
end.
|
||||||
|
|
||||||
config_key_path() ->
|
start_bridge(Type, Name) ->
|
||||||
[emqx_bridge, bridges].
|
restart_bridge(Type, Name).
|
||||||
|
|
||||||
update_config(ConfigReq) ->
|
stop_bridge(Type, Name) ->
|
||||||
emqx:update_config(config_key_path(), ConfigReq).
|
emqx_resource:stop(resource_id(Type, Name)).
|
||||||
|
|
||||||
|
restart_bridge(Type, Name) ->
|
||||||
|
emqx_resource:restart(resource_id(Type, Name)).
|
||||||
|
|
||||||
|
create_bridge(Type, Name, Conf) ->
|
||||||
|
ResId = resource_id(Type, Name),
|
||||||
|
case emqx_resource:create(ResId,
|
||||||
|
emqx_bridge:resource_type(Type), Conf) of
|
||||||
|
{ok, already_created} ->
|
||||||
|
emqx_resource:get_instance(ResId);
|
||||||
|
{ok, Data} ->
|
||||||
|
{ok, Data};
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
update_bridge(Type, Name, Conf) ->
|
||||||
|
%% TODO: sometimes its not necessary to restart the bridge connection.
|
||||||
|
%%
|
||||||
|
%% - if the connection related configs like `username` is updated, we should restart/start
|
||||||
|
%% or stop bridges according to the change.
|
||||||
|
%% - if the connection related configs are not update, but channel configs `message_in` or
|
||||||
|
%% `message_out` are changed, then we should not restart the bridge, we only restart/start
|
||||||
|
%% the channels.
|
||||||
|
%%
|
||||||
|
emqx_resource:recreate(resource_id(Type, Name),
|
||||||
|
emqx_bridge:resource_type(Type), Conf).
|
||||||
|
|
||||||
|
remove_bridge(Type, Name, _Conf) ->
|
||||||
|
case emqx_resource:remove(resource_id(Type, Name)) of
|
||||||
|
ok -> ok;
|
||||||
|
{error, not_found} -> ok;
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
diff_confs(NewConfs, OldConfs) ->
|
||||||
|
emqx_map_lib:diff_maps(flatten_confs(NewConfs),
|
||||||
|
flatten_confs(OldConfs)).
|
||||||
|
|
||||||
|
flatten_confs(Conf0) ->
|
||||||
|
maps:from_list(
|
||||||
|
lists:append([do_flatten_confs(Type, Conf)
|
||||||
|
|| {Type, Conf} <- maps:to_list(Conf0)])).
|
||||||
|
|
||||||
|
do_flatten_confs(Type, Conf0) ->
|
||||||
|
[{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].
|
||||||
|
|
||||||
bin(Bin) when is_binary(Bin) -> Bin;
|
bin(Bin) when is_binary(Bin) -> Bin;
|
||||||
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
||||||
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
||||||
|
|
||||||
|
str(A) when is_atom(A) ->
|
||||||
|
atom_to_list(A);
|
||||||
|
str(B) when is_binary(B) ->
|
||||||
|
binary_to_list(B);
|
||||||
|
str(S) when is_list(S) ->
|
||||||
|
S.
|
||||||
|
|
|
@ -15,128 +15,194 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_bridge_api).
|
-module(emqx_bridge_api).
|
||||||
|
|
||||||
-rest_api(#{ name => list_data_bridges
|
-behaviour(minirest_api).
|
||||||
, method => 'GET'
|
|
||||||
, path => "/data_bridges"
|
|
||||||
, func => list_bridges
|
|
||||||
, descr => "List all data bridges"
|
|
||||||
}).
|
|
||||||
|
|
||||||
-rest_api(#{ name => get_data_bridge
|
-export([api_spec/0]).
|
||||||
, method => 'GET'
|
|
||||||
, path => "/data_bridges/:bin:name"
|
|
||||||
, func => get_bridge
|
|
||||||
, descr => "Get a data bridge by name"
|
|
||||||
}).
|
|
||||||
|
|
||||||
-rest_api(#{ name => create_data_bridge
|
|
||||||
, method => 'POST'
|
|
||||||
, path => "/data_bridges/:bin:name"
|
|
||||||
, func => create_bridge
|
|
||||||
, descr => "Create a new data bridge"
|
|
||||||
}).
|
|
||||||
|
|
||||||
-rest_api(#{ name => update_data_bridge
|
|
||||||
, method => 'PUT'
|
|
||||||
, path => "/data_bridges/:bin:name"
|
|
||||||
, func => update_bridge
|
|
||||||
, descr => "Update an existing data bridge"
|
|
||||||
}).
|
|
||||||
|
|
||||||
-rest_api(#{ name => delete_data_bridge
|
|
||||||
, method => 'DELETE'
|
|
||||||
, path => "/data_bridges/:bin:name"
|
|
||||||
, func => delete_bridge
|
|
||||||
, descr => "Delete an existing data bridge"
|
|
||||||
}).
|
|
||||||
|
|
||||||
-export([ list_bridges/2
|
-export([ list_bridges/2
|
||||||
, get_bridge/2
|
, list_local_bridges/1
|
||||||
, create_bridge/2
|
, crud_bridges_cluster/2
|
||||||
, update_bridge/2
|
, crud_bridges/3
|
||||||
, delete_bridge/2
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(BRIDGE(N, T, C), #{<<"name">> => N, <<"type">> => T, <<"config">> => C}).
|
-define(TYPES, [mqtt]).
|
||||||
|
-define(BRIDGE(N, T, C), #{<<"id">> => N, <<"type">> => T, <<"config">> => C}).
|
||||||
|
-define(TRY_PARSE_ID(ID, EXPR),
|
||||||
|
try emqx_bridge:parse_bridge_id(Id) of
|
||||||
|
{BridgeType, BridgeName} -> EXPR
|
||||||
|
catch
|
||||||
|
error:{invalid_bridge_id, Id0} ->
|
||||||
|
{400, #{code => 102, message => <<"invalid_bridge_id: ", Id0/binary>>}}
|
||||||
|
end).
|
||||||
|
|
||||||
list_bridges(_Binding, _Params) ->
|
req_schema() ->
|
||||||
{200, #{code => 0, data => [format_api_reply(Data) ||
|
Schema = [
|
||||||
Data <- emqx_bridge:list_bridges()]}}.
|
case maps:to_list(emqx:get_raw_config([bridges, T], #{})) of
|
||||||
|
%% the bridge is not configured, so we have no method to get the schema
|
||||||
|
[] -> #{};
|
||||||
|
[{_K, Conf} | _] ->
|
||||||
|
emqx_mgmt_api_configs:gen_schema(Conf)
|
||||||
|
end
|
||||||
|
|| T <- ?TYPES],
|
||||||
|
#{oneOf => Schema}.
|
||||||
|
|
||||||
get_bridge(#{name := Name}, _Params) ->
|
resp_schema() ->
|
||||||
case emqx_resource:get_instance(emqx_bridge:name_to_resource_id(Name)) of
|
#{oneOf := Schema} = req_schema(),
|
||||||
{ok, Data} ->
|
AddMetadata = fun(Prop) ->
|
||||||
{200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}};
|
Prop#{is_connected => #{type => boolean},
|
||||||
|
id => #{type => string},
|
||||||
|
bridge_type => #{type => string, enum => ?TYPES},
|
||||||
|
node => #{type => string}}
|
||||||
|
end,
|
||||||
|
Schema1 = [S#{properties => AddMetadata(Prop)}
|
||||||
|
|| S = #{properties := Prop} <- Schema],
|
||||||
|
#{oneOf => Schema1}.
|
||||||
|
|
||||||
|
api_spec() ->
|
||||||
|
{bridge_apis(), []}.
|
||||||
|
|
||||||
|
bridge_apis() ->
|
||||||
|
[list_all_bridges_api(), crud_bridges_apis(), operation_apis()].
|
||||||
|
|
||||||
|
list_all_bridges_api() ->
|
||||||
|
Metadata = #{
|
||||||
|
get => #{
|
||||||
|
description => <<"List all created bridges">>,
|
||||||
|
responses => #{
|
||||||
|
<<"200">> => emqx_mgmt_util:array_schema(resp_schema(),
|
||||||
|
<<"A list of the bridges">>)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{"/bridges/", Metadata, list_bridges}.
|
||||||
|
|
||||||
|
crud_bridges_apis() ->
|
||||||
|
ReqSchema = req_schema(),
|
||||||
|
RespSchema = resp_schema(),
|
||||||
|
Metadata = #{
|
||||||
|
get => #{
|
||||||
|
description => <<"Get a bridge by Id">>,
|
||||||
|
parameters => [param_path_id()],
|
||||||
|
responses => #{
|
||||||
|
<<"200">> => emqx_mgmt_util:array_schema(RespSchema,
|
||||||
|
<<"The details of the bridge">>),
|
||||||
|
<<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND'])
|
||||||
|
}
|
||||||
|
},
|
||||||
|
put => #{
|
||||||
|
description => <<"Create or update a bridge">>,
|
||||||
|
parameters => [param_path_id()],
|
||||||
|
'requestBody' => emqx_mgmt_util:schema(ReqSchema),
|
||||||
|
responses => #{
|
||||||
|
<<"200">> => emqx_mgmt_util:array_schema(RespSchema, <<"Bridge updated">>),
|
||||||
|
<<"400">> => emqx_mgmt_util:error_schema(<<"Update bridge failed">>,
|
||||||
|
['UPDATE_FAILED'])
|
||||||
|
}
|
||||||
|
},
|
||||||
|
delete => #{
|
||||||
|
description => <<"Delete a bridge">>,
|
||||||
|
parameters => [param_path_id()],
|
||||||
|
responses => #{
|
||||||
|
<<"200">> => emqx_mgmt_util:schema(<<"Bridge deleted">>),
|
||||||
|
<<"404">> => emqx_mgmt_util:error_schema(<<"Bridge not found">>, ['NOT_FOUND'])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{"/bridges/:id", Metadata, crud_bridges_cluster}.
|
||||||
|
|
||||||
|
operation_apis() ->
|
||||||
|
Metadata = #{
|
||||||
|
post => #{
|
||||||
|
description => <<"Restart bridges on all nodes in the cluster">>,
|
||||||
|
parameters => [
|
||||||
|
param_path_id(),
|
||||||
|
param_path_operation()],
|
||||||
|
responses => #{
|
||||||
|
<<"500">> => emqx_mgmt_util:error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']),
|
||||||
|
<<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}},
|
||||||
|
{"/bridges/:id/operation/:operation", Metadata, manage_bridges}.
|
||||||
|
|
||||||
|
param_path_id() ->
|
||||||
|
#{
|
||||||
|
name => id,
|
||||||
|
in => path,
|
||||||
|
schema => #{type => string},
|
||||||
|
required => true
|
||||||
|
}.
|
||||||
|
|
||||||
|
param_path_operation()->
|
||||||
|
#{
|
||||||
|
name => operation,
|
||||||
|
in => path,
|
||||||
|
required => true,
|
||||||
|
schema => #{
|
||||||
|
type => string,
|
||||||
|
enum => [start, stop, restart]},
|
||||||
|
example => restart
|
||||||
|
}.
|
||||||
|
|
||||||
|
list_bridges(get, _Params) ->
|
||||||
|
{200, lists:append([list_local_bridges(Node) || Node <- ekka_mnesia:running_nodes()])}.
|
||||||
|
|
||||||
|
list_local_bridges(Node) when Node =:= node() ->
|
||||||
|
[format_resp(Data) || Data <- emqx_bridge:list_bridges()];
|
||||||
|
list_local_bridges(Node) ->
|
||||||
|
rpc_call(Node, list_local_bridges, [Node]).
|
||||||
|
|
||||||
|
crud_bridges_cluster(Method, Params) ->
|
||||||
|
Results = [crud_bridges(Node, Method, Params) || Node <- ekka_mnesia:running_nodes()],
|
||||||
|
case lists:filter(fun({200}) -> false; ({200, _}) -> false; (_) -> true end, Results) of
|
||||||
|
[] ->
|
||||||
|
case Results of
|
||||||
|
[{200} | _] -> {200};
|
||||||
|
_ -> {200, [Res || {200, Res} <- Results]}
|
||||||
|
end;
|
||||||
|
Errors ->
|
||||||
|
hd(Errors)
|
||||||
|
end.
|
||||||
|
|
||||||
|
crud_bridges(Node, Method, Params) when Node =/= node() ->
|
||||||
|
rpc_call(Node, crud_bridges, [Node, Method, Params]);
|
||||||
|
|
||||||
|
crud_bridges(_, get, #{bindings := #{id := Id}}) ->
|
||||||
|
?TRY_PARSE_ID(Id, case emqx_bridge:get_bridge(BridgeType, BridgeName) of
|
||||||
|
{ok, Data} -> {200, format_resp(Data)};
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
{404, #{code => 102, message => <<"not_found: ", Name/binary>>}}
|
{404, #{code => 102, message => <<"not_found: ", Id/binary>>}}
|
||||||
|
end);
|
||||||
|
|
||||||
|
crud_bridges(_, put, #{bindings := #{id := Id}, body := Conf}) ->
|
||||||
|
?TRY_PARSE_ID(Id,
|
||||||
|
case emqx:update_config(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf,
|
||||||
|
#{rawconf_with_defaults => true}) of
|
||||||
|
{ok, #{raw_config := RawConf, post_config_update := #{emqx_bridge := Data}}} ->
|
||||||
|
{200, format_resp(#{id => Id, raw_config => RawConf, resource_data => Data})};
|
||||||
|
{ok, _} -> %% the bridge already exits
|
||||||
|
{ok, Data} = emqx_bridge:get_bridge(BridgeType, BridgeName),
|
||||||
|
{200, format_resp(Data)};
|
||||||
|
{error, Reason} ->
|
||||||
|
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
|
||||||
|
end);
|
||||||
|
|
||||||
|
crud_bridges(_, delete, #{bindings := #{id := Id}}) ->
|
||||||
|
?TRY_PARSE_ID(Id,
|
||||||
|
case emqx:remove_config(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName]) of
|
||||||
|
{ok, _} -> {200};
|
||||||
|
{error, Reason} ->
|
||||||
|
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
|
||||||
|
end).
|
||||||
|
|
||||||
|
format_resp(#{id := Id, raw_config := RawConf, resource_data := #{mod := Mod, status := Status}}) ->
|
||||||
|
IsConnected = fun(started) -> true; (_) -> false end,
|
||||||
|
RawConf#{
|
||||||
|
id => Id,
|
||||||
|
node => node(),
|
||||||
|
bridge_type => emqx_bridge:bridge_type(Mod),
|
||||||
|
is_connected => IsConnected(Status)
|
||||||
|
}.
|
||||||
|
|
||||||
|
rpc_call(Node, Fun, Args) ->
|
||||||
|
case rpc:call(Node, ?MODULE, Fun, Args) of
|
||||||
|
{badrpc, Reason} -> {error, Reason};
|
||||||
|
Res -> Res
|
||||||
end.
|
end.
|
||||||
|
|
||||||
create_bridge(#{name := Name}, Params) ->
|
|
||||||
Config = proplists:get_value(<<"config">>, Params),
|
|
||||||
BridgeType = proplists:get_value(<<"type">>, Params),
|
|
||||||
case emqx_resource:check_and_create(
|
|
||||||
emqx_bridge:name_to_resource_id(Name),
|
|
||||||
emqx_bridge:resource_type(atom(BridgeType)), maps:from_list(Config)) of
|
|
||||||
{ok, already_created} ->
|
|
||||||
{400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}};
|
|
||||||
{ok, Data} ->
|
|
||||||
update_config_and_reply(Name, BridgeType, Config, Data);
|
|
||||||
{error, Reason0} ->
|
|
||||||
Reason = emqx_resource_api:stringnify(Reason0),
|
|
||||||
{500, #{code => 102, message => <<"create bridge ", Name/binary,
|
|
||||||
" failed:", Reason/binary>>}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
update_bridge(#{name := Name}, Params) ->
|
|
||||||
Config = proplists:get_value(<<"config">>, Params),
|
|
||||||
BridgeType = proplists:get_value(<<"type">>, Params),
|
|
||||||
case emqx_resource:check_and_update(
|
|
||||||
emqx_bridge:name_to_resource_id(Name),
|
|
||||||
emqx_bridge:resource_type(atom(BridgeType)), maps:from_list(Config), []) of
|
|
||||||
{ok, Data} ->
|
|
||||||
update_config_and_reply(Name, BridgeType, Config, Data);
|
|
||||||
{error, not_found} ->
|
|
||||||
{400, #{code => 102, message => <<"bridge not_found: ", Name/binary>>}};
|
|
||||||
{error, Reason0} ->
|
|
||||||
Reason = emqx_resource_api:stringnify(Reason0),
|
|
||||||
{500, #{code => 102, message => <<"update bridge ", Name/binary,
|
|
||||||
" failed:", Reason/binary>>}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
delete_bridge(#{name := Name}, _Params) ->
|
|
||||||
case emqx_resource:remove(emqx_bridge:name_to_resource_id(Name)) of
|
|
||||||
ok -> delete_config_and_reply(Name);
|
|
||||||
{error, Reason} ->
|
|
||||||
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
format_api_reply(#{resource_type := Type, id := Id, config := Conf, status := Status}) ->
|
|
||||||
#{type => emqx_bridge:bridge_type(Type),
|
|
||||||
name => emqx_bridge:resource_id_to_name(Id),
|
|
||||||
config => Conf, status => Status}.
|
|
||||||
|
|
||||||
% format_conf(#{resource_type := Type, id := Id, config := Conf}) ->
|
|
||||||
% #{type => Type, name => emqx_bridge:resource_id_to_name(Id),
|
|
||||||
% config => Conf}.
|
|
||||||
|
|
||||||
% get_all_configs() ->
|
|
||||||
% [format_conf(Data) || Data <- emqx_bridge:list_bridges()].
|
|
||||||
|
|
||||||
update_config_and_reply(Name, BridgeType, Config, Data) ->
|
|
||||||
case emqx_bridge:update_config({update, ?BRIDGE(Name, BridgeType, Config)}) of
|
|
||||||
{ok, _} ->
|
|
||||||
{200, #{code => 0, data => format_api_reply(
|
|
||||||
emqx_resource_api:format_data(Data))}};
|
|
||||||
{error, Reason} ->
|
|
||||||
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
delete_config_and_reply(Name) ->
|
|
||||||
case emqx_bridge:update_config({delete, Name}) of
|
|
||||||
{ok, _} -> {200, #{code => 0, data => #{}}};
|
|
||||||
{error, Reason} ->
|
|
||||||
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
atom(B) when is_binary(B) ->
|
|
||||||
list_to_existing_atom(binary_to_list(B)).
|
|
||||||
|
|
|
@ -17,29 +17,15 @@
|
||||||
|
|
||||||
-behaviour(application).
|
-behaviour(application).
|
||||||
|
|
||||||
-behaviour(emqx_config_handler).
|
-export([start/2, stop/1]).
|
||||||
|
|
||||||
-export([start/2, stop/1, pre_config_update/2]).
|
|
||||||
|
|
||||||
start(_StartType, _StartArgs) ->
|
start(_StartType, _StartArgs) ->
|
||||||
{ok, Sup} = emqx_bridge_sup:start_link(),
|
{ok, Sup} = emqx_bridge_sup:start_link(),
|
||||||
ok = emqx_bridge:load_bridges(),
|
ok = emqx_bridge:load_bridges(),
|
||||||
emqx_config_handler:add_handler(emqx_bridge:config_key_path(), ?MODULE),
|
emqx_config_handler:add_handler(emqx_bridge:config_key_path(), emqx_bridge),
|
||||||
{ok, Sup}.
|
{ok, Sup}.
|
||||||
|
|
||||||
stop(_State) ->
|
stop(_State) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% internal functions
|
%% internal functions
|
||||||
pre_config_update({update, Bridge = #{<<"name">> := Name}}, OldConf) ->
|
|
||||||
{ok, [Bridge | remove_bridge(Name, OldConf)]};
|
|
||||||
pre_config_update({delete, Name}, OldConf) ->
|
|
||||||
{ok, remove_bridge(Name, OldConf)};
|
|
||||||
pre_config_update(NewConf, _OldConf) when is_list(NewConf) ->
|
|
||||||
%% overwrite the entire config!
|
|
||||||
{ok, NewConf}.
|
|
||||||
|
|
||||||
remove_bridge(_Name, undefined) ->
|
|
||||||
[];
|
|
||||||
remove_bridge(Name, OldConf) ->
|
|
||||||
[B || B = #{<<"name">> := Name0} <- OldConf, Name0 =/= Name].
|
|
|
@ -75,7 +75,7 @@ load_bridges(Configs) ->
|
||||||
%% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}).
|
%% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}).
|
||||||
load_bridge(Name, Type, Config) ->
|
load_bridge(Name, Type, Config) ->
|
||||||
case emqx_resource:create_local(
|
case emqx_resource:create_local(
|
||||||
emqx_bridge:name_to_resource_id(Name),
|
emqx_bridge:resource_id(Type, Name),
|
||||||
emqx_bridge:resource_type(Type), Config) of
|
emqx_bridge:resource_type(Type), Config) of
|
||||||
{ok, already_created} -> ok;
|
{ok, already_created} -> ok;
|
||||||
{ok, _} -> ok;
|
{ok, _} -> ok;
|
||||||
|
|
|
@ -47,11 +47,11 @@
|
||||||
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.9"}}}
|
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.9"}}}
|
||||||
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
||||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
|
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
|
||||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
|
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
|
||||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
|
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
|
||||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
||||||
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.2"}}}
|
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.3"}}}
|
||||||
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
|
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
|
||||||
, {replayq, "0.3.3"}
|
, {replayq, "0.3.3"}
|
||||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
|
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
|
||||||
|
@ -60,7 +60,7 @@
|
||||||
, {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
|
, {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
|
||||||
, {getopt, "1.0.2"}
|
, {getopt, "1.0.2"}
|
||||||
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
|
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
|
||||||
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.17.0"}}}
|
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.17.1"}}}
|
||||||
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.0"}}}
|
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.0"}}}
|
||||||
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
|
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
|
||||||
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.1"}}}
|
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.1"}}}
|
||||||
|
|
Loading…
Reference in New Issue