commit
0d1f1066d9
|
@ -92,7 +92,7 @@ jobs:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v3
|
||||||
with:
|
with:
|
||||||
repository: emqx/emqx-fvt
|
repository: emqx/emqx-fvt
|
||||||
ref: broker-autotest-v4
|
ref: broker-autotest-v5
|
||||||
path: scripts
|
path: scripts
|
||||||
- uses: actions/setup-java@v3
|
- uses: actions/setup-java@v3
|
||||||
with:
|
with:
|
||||||
|
@ -191,7 +191,7 @@ jobs:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v3
|
||||||
with:
|
with:
|
||||||
repository: emqx/emqx-fvt
|
repository: emqx/emqx-fvt
|
||||||
ref: broker-autotest-v4
|
ref: broker-autotest-v5
|
||||||
path: scripts
|
path: scripts
|
||||||
- uses: actions/setup-java@v3
|
- uses: actions/setup-java@v3
|
||||||
with:
|
with:
|
||||||
|
@ -297,7 +297,7 @@ jobs:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v3
|
||||||
with:
|
with:
|
||||||
repository: emqx/emqx-fvt
|
repository: emqx/emqx-fvt
|
||||||
ref: broker-autotest-v4
|
ref: broker-autotest-v5
|
||||||
path: scripts
|
path: scripts
|
||||||
- uses: actions/setup-java@v3
|
- uses: actions/setup-java@v3
|
||||||
with:
|
with:
|
||||||
|
@ -396,7 +396,7 @@ jobs:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v3
|
||||||
with:
|
with:
|
||||||
repository: emqx/emqx-fvt
|
repository: emqx/emqx-fvt
|
||||||
ref: broker-autotest-v4
|
ref: broker-autotest-v5
|
||||||
path: scripts
|
path: scripts
|
||||||
- name: run jwks_server
|
- name: run jwks_server
|
||||||
timeout-minutes: 10
|
timeout-minutes: 10
|
||||||
|
@ -496,7 +496,7 @@ jobs:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v3
|
||||||
with:
|
with:
|
||||||
repository: emqx/emqx-fvt
|
repository: emqx/emqx-fvt
|
||||||
ref: broker-autotest-v4
|
ref: broker-autotest-v5
|
||||||
path: scripts
|
path: scripts
|
||||||
- uses: actions/setup-java@v3
|
- uses: actions/setup-java@v3
|
||||||
with:
|
with:
|
||||||
|
|
|
@ -152,7 +152,7 @@ deep_convert(Val, _, _Args) ->
|
||||||
|
|
||||||
-spec unsafe_atom_key_map(#{binary() | atom() => any()}) -> #{atom() => any()}.
|
-spec unsafe_atom_key_map(#{binary() | atom() => any()}) -> #{atom() => any()}.
|
||||||
unsafe_atom_key_map(Map) ->
|
unsafe_atom_key_map(Map) ->
|
||||||
covert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end).
|
convert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end).
|
||||||
|
|
||||||
-spec binary_key_map(map()) -> map().
|
-spec binary_key_map(map()) -> map().
|
||||||
binary_key_map(Map) ->
|
binary_key_map(Map) ->
|
||||||
|
@ -167,7 +167,7 @@ binary_key_map(Map) ->
|
||||||
|
|
||||||
-spec safe_atom_key_map(#{binary() | atom() => any()}) -> #{atom() => any()}.
|
-spec safe_atom_key_map(#{binary() | atom() => any()}) -> #{atom() => any()}.
|
||||||
safe_atom_key_map(Map) ->
|
safe_atom_key_map(Map) ->
|
||||||
covert_keys_to_atom(Map, fun(K) -> binary_to_existing_atom(K, utf8) end).
|
convert_keys_to_atom(Map, fun(K) -> binary_to_existing_atom(K, utf8) end).
|
||||||
|
|
||||||
-spec jsonable_map(map() | list()) -> map() | list().
|
-spec jsonable_map(map() | list()) -> map() | list().
|
||||||
jsonable_map(Map) ->
|
jsonable_map(Map) ->
|
||||||
|
@ -221,7 +221,7 @@ binary_string(Val) ->
|
||||||
Val.
|
Val.
|
||||||
|
|
||||||
%%---------------------------------------------------------------------------
|
%%---------------------------------------------------------------------------
|
||||||
covert_keys_to_atom(BinKeyMap, Conv) ->
|
convert_keys_to_atom(BinKeyMap, Conv) ->
|
||||||
deep_convert(
|
deep_convert(
|
||||||
BinKeyMap,
|
BinKeyMap,
|
||||||
fun
|
fun
|
||||||
|
|
|
@ -1,28 +1,28 @@
|
||||||
emqx_authz_api_mnesia {
|
emqx_authz_api_mnesia {
|
||||||
users_username_get {
|
users_username_get {
|
||||||
desc {
|
desc {
|
||||||
en: """Show the list of record for username"""
|
en: """Show the list of rules for users"""
|
||||||
zh: """获取内置数据库中所有用户名类型的规则记录"""
|
zh: """获取内置数据库中所有用户名类型的规则记录"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
users_username_post {
|
users_username_post {
|
||||||
desc {
|
desc {
|
||||||
en: """Add new records for username"""
|
en: """Add new rule for 'username'"""
|
||||||
zh: """添加内置数据库中用户名类型的规则记录"""
|
zh: """添加内置数据库中用户名类型的规则记录"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
users_clientid_get {
|
users_clientid_get {
|
||||||
desc {
|
desc {
|
||||||
en: """Show the list of record for clientid"""
|
en: """Show the list of rules for clients"""
|
||||||
zh: """获取内置数据库中所有客户端标识符类型的规则记录"""
|
zh: """获取内置数据库中所有客户端标识符类型的规则记录"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
users_clientid_post {
|
users_clientid_post {
|
||||||
desc {
|
desc {
|
||||||
en: """Add new records for clientid"""
|
en: """Add new rule for 'clientid'"""
|
||||||
zh: """添加内置数据库中客户端标识符类型的规则记录"""
|
zh: """添加内置数据库中客户端标识符类型的规则记录"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -30,71 +30,71 @@ emqx_authz_api_mnesia {
|
||||||
|
|
||||||
user_username_get {
|
user_username_get {
|
||||||
desc {
|
desc {
|
||||||
en: """Get record info for username"""
|
en: """Get rule for 'username'"""
|
||||||
zh: """获取内置数据库中指定用户名类型的规则记录"""
|
zh: """获取内置数据库中指定用户名类型的规则记录"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
user_username_put {
|
user_username_put {
|
||||||
desc {
|
desc {
|
||||||
en: """Set record for username"""
|
en: """Set rule for 'username'"""
|
||||||
zh: """更新内置数据库中指定用户名类型的规则记录"""
|
zh: """更新内置数据库中指定用户名类型的规则记录"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
user_username_delete {
|
user_username_delete {
|
||||||
desc {
|
desc {
|
||||||
en: """Delete one record for username"""
|
en: """Delete rule for 'username'"""
|
||||||
zh: """删除内置数据库中指定用户名类型的规则记录"""
|
zh: """删除内置数据库中指定用户名类型的规则记录"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
user_clientid_get {
|
user_clientid_get {
|
||||||
desc {
|
desc {
|
||||||
en: """Get record info for clientid"""
|
en: """Get rule for 'clientid'"""
|
||||||
zh: """获取内置数据库中指定客户端标识符类型的规则记录"""
|
zh: """获取内置数据库中指定客户端标识符类型的规则记录"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
user_clientid_put {
|
user_clientid_put {
|
||||||
desc {
|
desc {
|
||||||
en: """Set record for clientid"""
|
en: """Set rule for 'clientid'"""
|
||||||
zh: """更新内置数据库中指定客户端标识符类型的规则记录"""
|
zh: """更新内置数据库中指定客户端标识符类型的规则记录"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
user_clientid_delete {
|
user_clientid_delete {
|
||||||
desc {
|
desc {
|
||||||
en: """Delete one record for clientid"""
|
en: """Delete rule for 'clientid'"""
|
||||||
zh: """删除内置数据库中指定客户端标识符类型的规则记录"""
|
zh: """删除内置数据库中指定客户端标识符类型的规则记录"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rules_all_get {
|
||||||
rules_for_all_get {
|
|
||||||
desc {
|
desc {
|
||||||
en: """Show the list of rules for all"""
|
en: """Show the list of rules for 'all'"""
|
||||||
zh: """列出为所有客户端启用的规则列表"""
|
zh: """列出为所有客户端启用的规则列表"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rules_for_all_post {
|
rules_all_post {
|
||||||
desc {
|
desc {
|
||||||
en: """
|
en: """Create/Update the list of rules for 'all'."""
|
||||||
Create/Update the list of rules for all.
|
zh: """创建/更新 为所有客户端启用的规则列表。"""
|
||||||
Set a empty list to clean up rules
|
|
||||||
"""
|
|
||||||
zh: """
|
|
||||||
创建/更新 为所有客户端启用的规则列表。
|
|
||||||
设为空列表以清楚所有规则
|
|
||||||
"""
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
purge_all_delete {
|
rules_all_delete {
|
||||||
desc {
|
desc {
|
||||||
en: """Purge all records for username/clientid/all"""
|
en: """Delete rules for 'all'"""
|
||||||
zh: """清除所有内置数据库中的规则, 用户名/客户端标识符/所有"""
|
zh: """删除 `all` 规则"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rules_delete {
|
||||||
|
desc {
|
||||||
|
en: """Delete all rules for all 'users', 'clients' and 'all'"""
|
||||||
|
zh: """清除内置数据库中的所有类型('users' 、'clients' 、'all')的所有规则"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,7 +44,7 @@
|
||||||
user/2,
|
user/2,
|
||||||
client/2,
|
client/2,
|
||||||
all/2,
|
all/2,
|
||||||
purge/2
|
rules/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% query funs
|
%% query funs
|
||||||
|
@ -70,19 +70,19 @@ api_spec() ->
|
||||||
|
|
||||||
paths() ->
|
paths() ->
|
||||||
[
|
[
|
||||||
"/authorization/sources/built_in_database/username",
|
"/authorization/sources/built_in_database/rules/users",
|
||||||
"/authorization/sources/built_in_database/clientid",
|
"/authorization/sources/built_in_database/rules/clients",
|
||||||
"/authorization/sources/built_in_database/username/:username",
|
"/authorization/sources/built_in_database/rules/users/:username",
|
||||||
"/authorization/sources/built_in_database/clientid/:clientid",
|
"/authorization/sources/built_in_database/rules/clients/:clientid",
|
||||||
"/authorization/sources/built_in_database/all",
|
"/authorization/sources/built_in_database/rules/all",
|
||||||
"/authorization/sources/built_in_database/purge-all"
|
"/authorization/sources/built_in_database/rules"
|
||||||
].
|
].
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Schema for each URI
|
%% Schema for each URI
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
schema("/authorization/sources/built_in_database/username") ->
|
schema("/authorization/sources/built_in_database/rules/users") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => users,
|
'operationId' => users,
|
||||||
get =>
|
get =>
|
||||||
|
@ -128,7 +128,7 @@ schema("/authorization/sources/built_in_database/username") ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
schema("/authorization/sources/built_in_database/clientid") ->
|
schema("/authorization/sources/built_in_database/rules/clients") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => clients,
|
'operationId' => clients,
|
||||||
get =>
|
get =>
|
||||||
|
@ -174,7 +174,7 @@ schema("/authorization/sources/built_in_database/clientid") ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
schema("/authorization/sources/built_in_database/username/:username") ->
|
schema("/authorization/sources/built_in_database/rules/users/:username") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => user,
|
'operationId' => user,
|
||||||
get =>
|
get =>
|
||||||
|
@ -227,7 +227,7 @@ schema("/authorization/sources/built_in_database/username/:username") ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
schema("/authorization/sources/built_in_database/clientid/:clientid") ->
|
schema("/authorization/sources/built_in_database/rules/clients/:clientid") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => client,
|
'operationId' => client,
|
||||||
get =>
|
get =>
|
||||||
|
@ -280,20 +280,20 @@ schema("/authorization/sources/built_in_database/clientid/:clientid") ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
schema("/authorization/sources/built_in_database/all") ->
|
schema("/authorization/sources/built_in_database/rules/all") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => all,
|
'operationId' => all,
|
||||||
get =>
|
get =>
|
||||||
#{
|
#{
|
||||||
tags => [<<"authorization">>],
|
tags => [<<"authorization">>],
|
||||||
description => ?DESC(rules_for_all_get),
|
description => ?DESC(rules_all_get),
|
||||||
responses =>
|
responses =>
|
||||||
#{200 => swagger_with_example({rules, ?TYPE_REF}, {all, ?PUT_MAP_EXAMPLE})}
|
#{200 => swagger_with_example({rules, ?TYPE_REF}, {all, ?PUT_MAP_EXAMPLE})}
|
||||||
},
|
},
|
||||||
post =>
|
post =>
|
||||||
#{
|
#{
|
||||||
tags => [<<"authorization">>],
|
tags => [<<"authorization">>],
|
||||||
description => ?DESC(rules_for_all_post),
|
description => ?DESC(rules_all_post),
|
||||||
'requestBody' =>
|
'requestBody' =>
|
||||||
swagger_with_example({rules, ?TYPE_REF}, {all, ?PUT_MAP_EXAMPLE}),
|
swagger_with_example({rules, ?TYPE_REF}, {all, ?PUT_MAP_EXAMPLE}),
|
||||||
responses =>
|
responses =>
|
||||||
|
@ -303,15 +303,24 @@ schema("/authorization/sources/built_in_database/all") ->
|
||||||
[?BAD_REQUEST], <<"Bad rule schema">>
|
[?BAD_REQUEST], <<"Bad rule schema">>
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
};
|
|
||||||
schema("/authorization/sources/built_in_database/purge-all") ->
|
|
||||||
#{
|
|
||||||
'operationId' => purge,
|
|
||||||
delete =>
|
delete =>
|
||||||
#{
|
#{
|
||||||
tags => [<<"authorization">>],
|
tags => [<<"authorization">>],
|
||||||
description => ?DESC(purge_all_delete),
|
description => ?DESC(rules_all_delete),
|
||||||
|
responses =>
|
||||||
|
#{
|
||||||
|
204 => <<"Deleted">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/authorization/sources/built_in_database/rules") ->
|
||||||
|
#{
|
||||||
|
'operationId' => rules,
|
||||||
|
delete =>
|
||||||
|
#{
|
||||||
|
tags => [<<"authorization">>],
|
||||||
|
description => ?DESC(rules_delete),
|
||||||
responses =>
|
responses =>
|
||||||
#{
|
#{
|
||||||
204 => <<"Deleted">>,
|
204 => <<"Deleted">>,
|
||||||
|
@ -555,9 +564,12 @@ all(get, _) ->
|
||||||
end;
|
end;
|
||||||
all(post, #{body := #{<<"rules">> := Rules}}) ->
|
all(post, #{body := #{<<"rules">> := Rules}}) ->
|
||||||
emqx_authz_mnesia:store_rules(all, format_rules(Rules)),
|
emqx_authz_mnesia:store_rules(all, format_rules(Rules)),
|
||||||
|
{204};
|
||||||
|
all(delete, _) ->
|
||||||
|
emqx_authz_mnesia:store_rules(all, []),
|
||||||
{204}.
|
{204}.
|
||||||
|
|
||||||
purge(delete, _) ->
|
rules(delete, _) ->
|
||||||
case emqx_authz_api_sources:get_raw_source(<<"built_in_database">>) of
|
case emqx_authz_api_sources:get_raw_source(<<"built_in_database">>) of
|
||||||
[#{<<"enable">> := false}] ->
|
[#{<<"enable">> := false}] ->
|
||||||
ok = emqx_authz_mnesia:purge_rules(),
|
ok = emqx_authz_mnesia:purge_rules(),
|
||||||
|
|
|
@ -70,21 +70,21 @@ t_api(_) ->
|
||||||
{ok, 204, _} =
|
{ok, 204, _} =
|
||||||
request(
|
request(
|
||||||
post,
|
post,
|
||||||
uri(["authorization", "sources", "built_in_database", "username"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "users"]),
|
||||||
[?USERNAME_RULES_EXAMPLE]
|
[?USERNAME_RULES_EXAMPLE]
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, 409, _} =
|
{ok, 409, _} =
|
||||||
request(
|
request(
|
||||||
post,
|
post,
|
||||||
uri(["authorization", "sources", "built_in_database", "username"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "users"]),
|
||||||
[?USERNAME_RULES_EXAMPLE]
|
[?USERNAME_RULES_EXAMPLE]
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, 200, Request1} =
|
{ok, 200, Request1} =
|
||||||
request(
|
request(
|
||||||
get,
|
get,
|
||||||
uri(["authorization", "sources", "built_in_database", "username"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "users"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
#{
|
#{
|
||||||
|
@ -105,7 +105,8 @@ t_api(_) ->
|
||||||
"authorization",
|
"authorization",
|
||||||
"sources",
|
"sources",
|
||||||
"built_in_database",
|
"built_in_database",
|
||||||
"username?page=1&limit=20&like_username=noexist"
|
"rules",
|
||||||
|
"users?page=1&limit=20&like_username=noexist"
|
||||||
]),
|
]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
|
@ -124,7 +125,7 @@ t_api(_) ->
|
||||||
{ok, 200, Request2} =
|
{ok, 200, Request2} =
|
||||||
request(
|
request(
|
||||||
get,
|
get,
|
||||||
uri(["authorization", "sources", "built_in_database", "username", "user1"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "users", "user1"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
#{<<"username">> := <<"user1">>, <<"rules">> := Rules1} = jsx:decode(Request2),
|
#{<<"username">> := <<"user1">>, <<"rules">> := Rules1} = jsx:decode(Request2),
|
||||||
|
@ -132,13 +133,13 @@ t_api(_) ->
|
||||||
{ok, 204, _} =
|
{ok, 204, _} =
|
||||||
request(
|
request(
|
||||||
put,
|
put,
|
||||||
uri(["authorization", "sources", "built_in_database", "username", "user1"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "users", "user1"]),
|
||||||
?USERNAME_RULES_EXAMPLE#{rules => []}
|
?USERNAME_RULES_EXAMPLE#{rules => []}
|
||||||
),
|
),
|
||||||
{ok, 200, Request3} =
|
{ok, 200, Request3} =
|
||||||
request(
|
request(
|
||||||
get,
|
get,
|
||||||
uri(["authorization", "sources", "built_in_database", "username", "user1"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "users", "user1"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
#{<<"username">> := <<"user1">>, <<"rules">> := Rules2} = jsx:decode(Request3),
|
#{<<"username">> := <<"user1">>, <<"rules">> := Rules2} = jsx:decode(Request3),
|
||||||
|
@ -147,19 +148,19 @@ t_api(_) ->
|
||||||
{ok, 204, _} =
|
{ok, 204, _} =
|
||||||
request(
|
request(
|
||||||
delete,
|
delete,
|
||||||
uri(["authorization", "sources", "built_in_database", "username", "user1"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "users", "user1"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
{ok, 404, _} =
|
{ok, 404, _} =
|
||||||
request(
|
request(
|
||||||
get,
|
get,
|
||||||
uri(["authorization", "sources", "built_in_database", "username", "user1"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "users", "user1"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
{ok, 404, _} =
|
{ok, 404, _} =
|
||||||
request(
|
request(
|
||||||
delete,
|
delete,
|
||||||
uri(["authorization", "sources", "built_in_database", "username", "user1"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "users", "user1"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
|
|
||||||
|
@ -167,34 +168,34 @@ t_api(_) ->
|
||||||
{ok, 204, _} =
|
{ok, 204, _} =
|
||||||
request(
|
request(
|
||||||
post,
|
post,
|
||||||
uri(["authorization", "sources", "built_in_database", "username"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "users"]),
|
||||||
[?USERNAME_RULES_EXAMPLE]
|
[?USERNAME_RULES_EXAMPLE]
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, 204, _} =
|
{ok, 204, _} =
|
||||||
request(
|
request(
|
||||||
post,
|
post,
|
||||||
uri(["authorization", "sources", "built_in_database", "clientid"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "clients"]),
|
||||||
[?CLIENTID_RULES_EXAMPLE]
|
[?CLIENTID_RULES_EXAMPLE]
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, 409, _} =
|
{ok, 409, _} =
|
||||||
request(
|
request(
|
||||||
post,
|
post,
|
||||||
uri(["authorization", "sources", "built_in_database", "clientid"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "clients"]),
|
||||||
[?CLIENTID_RULES_EXAMPLE]
|
[?CLIENTID_RULES_EXAMPLE]
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, 200, Request4} =
|
{ok, 200, Request4} =
|
||||||
request(
|
request(
|
||||||
get,
|
get,
|
||||||
uri(["authorization", "sources", "built_in_database", "clientid"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "clients"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
{ok, 200, Request5} =
|
{ok, 200, Request5} =
|
||||||
request(
|
request(
|
||||||
get,
|
get,
|
||||||
uri(["authorization", "sources", "built_in_database", "clientid", "client1"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "clients", "client1"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
#{
|
#{
|
||||||
|
@ -208,13 +209,13 @@ t_api(_) ->
|
||||||
{ok, 204, _} =
|
{ok, 204, _} =
|
||||||
request(
|
request(
|
||||||
put,
|
put,
|
||||||
uri(["authorization", "sources", "built_in_database", "clientid", "client1"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "clients", "client1"]),
|
||||||
?CLIENTID_RULES_EXAMPLE#{rules => []}
|
?CLIENTID_RULES_EXAMPLE#{rules => []}
|
||||||
),
|
),
|
||||||
{ok, 200, Request6} =
|
{ok, 200, Request6} =
|
||||||
request(
|
request(
|
||||||
get,
|
get,
|
||||||
uri(["authorization", "sources", "built_in_database", "clientid", "client1"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "clients", "client1"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
#{<<"clientid">> := <<"client1">>, <<"rules">> := Rules4} = jsx:decode(Request6),
|
#{<<"clientid">> := <<"client1">>, <<"rules">> := Rules4} = jsx:decode(Request6),
|
||||||
|
@ -223,32 +224,32 @@ t_api(_) ->
|
||||||
{ok, 204, _} =
|
{ok, 204, _} =
|
||||||
request(
|
request(
|
||||||
delete,
|
delete,
|
||||||
uri(["authorization", "sources", "built_in_database", "clientid", "client1"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "clients", "client1"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
{ok, 404, _} =
|
{ok, 404, _} =
|
||||||
request(
|
request(
|
||||||
get,
|
get,
|
||||||
uri(["authorization", "sources", "built_in_database", "clientid", "client1"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "clients", "client1"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
{ok, 404, _} =
|
{ok, 404, _} =
|
||||||
request(
|
request(
|
||||||
delete,
|
delete,
|
||||||
uri(["authorization", "sources", "built_in_database", "clientid", "client1"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "clients", "client1"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, 204, _} =
|
{ok, 204, _} =
|
||||||
request(
|
request(
|
||||||
post,
|
post,
|
||||||
uri(["authorization", "sources", "built_in_database", "all"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "all"]),
|
||||||
?ALL_RULES_EXAMPLE
|
?ALL_RULES_EXAMPLE
|
||||||
),
|
),
|
||||||
{ok, 200, Request7} =
|
{ok, 200, Request7} =
|
||||||
request(
|
request(
|
||||||
get,
|
get,
|
||||||
uri(["authorization", "sources", "built_in_database", "all"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "all"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
#{<<"rules">> := Rules5} = jsx:decode(Request7),
|
#{<<"rules">> := Rules5} = jsx:decode(Request7),
|
||||||
|
@ -256,15 +257,14 @@ t_api(_) ->
|
||||||
|
|
||||||
{ok, 204, _} =
|
{ok, 204, _} =
|
||||||
request(
|
request(
|
||||||
post,
|
delete,
|
||||||
uri(["authorization", "sources", "built_in_database", "all"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "all"]),
|
||||||
|
[]
|
||||||
?ALL_RULES_EXAMPLE#{rules => []}
|
|
||||||
),
|
),
|
||||||
{ok, 200, Request8} =
|
{ok, 200, Request8} =
|
||||||
request(
|
request(
|
||||||
get,
|
get,
|
||||||
uri(["authorization", "sources", "built_in_database", "all"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "all"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
#{<<"rules">> := Rules6} = jsx:decode(Request8),
|
#{<<"rules">> := Rules6} = jsx:decode(Request8),
|
||||||
|
@ -273,7 +273,7 @@ t_api(_) ->
|
||||||
{ok, 204, _} =
|
{ok, 204, _} =
|
||||||
request(
|
request(
|
||||||
post,
|
post,
|
||||||
uri(["authorization", "sources", "built_in_database", "username"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "users"]),
|
||||||
[
|
[
|
||||||
#{username => erlang:integer_to_binary(N), rules => []}
|
#{username => erlang:integer_to_binary(N), rules => []}
|
||||||
|| N <- lists:seq(1, 20)
|
|| N <- lists:seq(1, 20)
|
||||||
|
@ -282,7 +282,7 @@ t_api(_) ->
|
||||||
{ok, 200, Request9} =
|
{ok, 200, Request9} =
|
||||||
request(
|
request(
|
||||||
get,
|
get,
|
||||||
uri(["authorization", "sources", "built_in_database", "username?page=2&limit=5"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "users?page=2&limit=5"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
#{<<"data">> := Data1} = jsx:decode(Request9),
|
#{<<"data">> := Data1} = jsx:decode(Request9),
|
||||||
|
@ -291,7 +291,7 @@ t_api(_) ->
|
||||||
{ok, 204, _} =
|
{ok, 204, _} =
|
||||||
request(
|
request(
|
||||||
post,
|
post,
|
||||||
uri(["authorization", "sources", "built_in_database", "clientid"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "clients"]),
|
||||||
[
|
[
|
||||||
#{clientid => erlang:integer_to_binary(N), rules => []}
|
#{clientid => erlang:integer_to_binary(N), rules => []}
|
||||||
|| N <- lists:seq(1, 20)
|
|| N <- lists:seq(1, 20)
|
||||||
|
@ -300,7 +300,7 @@ t_api(_) ->
|
||||||
{ok, 200, Request10} =
|
{ok, 200, Request10} =
|
||||||
request(
|
request(
|
||||||
get,
|
get,
|
||||||
uri(["authorization", "sources", "built_in_database", "clientid?limit=5"]),
|
uri(["authorization", "sources", "built_in_database", "rules", "clients?limit=5"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
#{<<"data">> := Data2} = jsx:decode(Request10),
|
#{<<"data">> := Data2} = jsx:decode(Request10),
|
||||||
|
@ -309,7 +309,7 @@ t_api(_) ->
|
||||||
{ok, 400, Msg1} =
|
{ok, 400, Msg1} =
|
||||||
request(
|
request(
|
||||||
delete,
|
delete,
|
||||||
uri(["authorization", "sources", "built_in_database", "purge-all"]),
|
uri(["authorization", "sources", "built_in_database", "rules"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
?assertMatch({match, _}, re:run(Msg1, "must\sbe\sdisabled\sbefore")),
|
?assertMatch({match, _}, re:run(Msg1, "must\sbe\sdisabled\sbefore")),
|
||||||
|
@ -335,7 +335,7 @@ t_api(_) ->
|
||||||
{ok, 204, _} =
|
{ok, 204, _} =
|
||||||
request(
|
request(
|
||||||
delete,
|
delete,
|
||||||
uri(["authorization", "sources", "built_in_database", "purge-all"]),
|
uri(["authorization", "sources", "built_in_database", "rules"]),
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
?assertEqual(0, emqx_authz_mnesia:record_count()),
|
?assertEqual(0, emqx_authz_mnesia:record_count()),
|
||||||
|
|
|
@ -2,8 +2,8 @@ emqx_bridge_api {
|
||||||
|
|
||||||
desc_param_path_operation_cluster {
|
desc_param_path_operation_cluster {
|
||||||
desc {
|
desc {
|
||||||
en: """Operations can be one of: enable, disable, start, stop, restart"""
|
en: """Operations can be one of: stop, restart"""
|
||||||
zh: """集群可用操作:启用、禁用、启动、停止、重新启动"""
|
zh: """集群可用操作:停止、重新启动"""
|
||||||
}
|
}
|
||||||
label: {
|
label: {
|
||||||
en: "Cluster Operation"
|
en: "Cluster Operation"
|
||||||
|
@ -44,6 +44,16 @@ emqx_bridge_api {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
desc_param_path_enable {
|
||||||
|
desc {
|
||||||
|
en: """Whether or not the bridge is enabled"""
|
||||||
|
zh: """是否启用桥接"""
|
||||||
|
}
|
||||||
|
label: {
|
||||||
|
en: "Enable bridge"
|
||||||
|
zh: "启用桥接"
|
||||||
|
}
|
||||||
|
}
|
||||||
desc_api1 {
|
desc_api1 {
|
||||||
desc {
|
desc {
|
||||||
en: """List all created bridges"""
|
en: """List all created bridges"""
|
||||||
|
@ -112,8 +122,8 @@ emqx_bridge_api {
|
||||||
|
|
||||||
desc_api7 {
|
desc_api7 {
|
||||||
desc {
|
desc {
|
||||||
en: """Enable/Disable/Stop/Restart bridges on all nodes in the cluster."""
|
en: """Stop/Restart bridges on all nodes in the cluster."""
|
||||||
zh: """在集群中的所有节点上启用/禁用/停止/重新启动 Bridge。"""
|
zh: """停止或启用所有节点上的桥接"""
|
||||||
}
|
}
|
||||||
label: {
|
label: {
|
||||||
en: "Cluster Bridge Operate"
|
en: "Cluster Bridge Operate"
|
||||||
|
@ -123,10 +133,8 @@ emqx_bridge_api {
|
||||||
|
|
||||||
desc_api8 {
|
desc_api8 {
|
||||||
desc {
|
desc {
|
||||||
en: """Stop/Restart bridges on a specific node.
|
en: """Stop/Restart bridges on a specific node."""
|
||||||
NOTE: It's not allowed to disable/enable bridges on a single node."""
|
zh: """在某个节点上停止/重新启动 Bridge。"""
|
||||||
zh: """在某个节点上停止/重新启动 Bridge。
|
|
||||||
NOTE:不允许在单节点上启用/禁用 Bridge"""
|
|
||||||
}
|
}
|
||||||
label: {
|
label: {
|
||||||
en: "Node Bridge Operate"
|
en: "Node Bridge Operate"
|
||||||
|
@ -134,4 +142,43 @@ NOTE:不允许在单节点上启用/禁用 Bridge"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
desc_api9 {
|
||||||
|
desc {
|
||||||
|
en: """
|
||||||
|
Test creating a new bridge by given ID </br>
|
||||||
|
The ID must be of format '{type}:{name}'
|
||||||
|
"""
|
||||||
|
zh: """
|
||||||
|
通过给定的 ID 测试创建一个新的桥接。 </br>
|
||||||
|
ID 的格式必须为 ’{type}:{name}”
|
||||||
|
"""
|
||||||
|
}
|
||||||
|
label: {
|
||||||
|
en: "Test Bridge Creation"
|
||||||
|
zh: "测试桥接创建"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
desc_bridge_metrics {
|
||||||
|
desc {
|
||||||
|
en: """Get bridge metrics by Id"""
|
||||||
|
zh: """通过 Id 来获取桥接的指标信息"""
|
||||||
|
}
|
||||||
|
label: {
|
||||||
|
en: "Get Bridge Metrics"
|
||||||
|
zh: "获取桥接的指标"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
desc_enable_bridge {
|
||||||
|
desc {
|
||||||
|
en: """Enable or Disable bridges on all nodes in the cluster."""
|
||||||
|
zh: """启用或禁用所有节点上的桥接"""
|
||||||
|
}
|
||||||
|
label: {
|
||||||
|
en: "Cluster Bridge Enable"
|
||||||
|
zh: "是否启用集群内的桥接"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,9 +36,12 @@
|
||||||
-export([
|
-export([
|
||||||
'/bridges'/2,
|
'/bridges'/2,
|
||||||
'/bridges/:id'/2,
|
'/bridges/:id'/2,
|
||||||
'/bridges/:id/operation/:operation'/2,
|
'/bridges/:id/enable/:enable'/2,
|
||||||
'/nodes/:node/bridges/:id/operation/:operation'/2,
|
'/bridges/:id/:operation'/2,
|
||||||
'/bridges/:id/reset_metrics'/2
|
'/nodes/:node/bridges/:id/:operation'/2,
|
||||||
|
'/bridges/:id/metrics'/2,
|
||||||
|
'/bridges/:id/metrics/reset'/2,
|
||||||
|
'/bridges_probe'/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([lookup_from_local_node/2]).
|
-export([lookup_from_local_node/2]).
|
||||||
|
@ -66,9 +69,12 @@ paths() ->
|
||||||
[
|
[
|
||||||
"/bridges",
|
"/bridges",
|
||||||
"/bridges/:id",
|
"/bridges/:id",
|
||||||
"/bridges/:id/operation/:operation",
|
"/bridges/:id/enable/:enable",
|
||||||
"/nodes/:node/bridges/:id/operation/:operation",
|
"/bridges/:id/:operation",
|
||||||
"/bridges/:id/reset_metrics"
|
"/nodes/:node/bridges/:id/:operation",
|
||||||
|
"/bridges/:id/metrics",
|
||||||
|
"/bridges/:id/metrics/reset",
|
||||||
|
"/bridges_probe"
|
||||||
].
|
].
|
||||||
|
|
||||||
error_schema(Code, Message) when is_atom(Code) ->
|
error_schema(Code, Message) when is_atom(Code) ->
|
||||||
|
@ -87,7 +93,7 @@ get_response_body_schema() ->
|
||||||
param_path_operation_cluster() ->
|
param_path_operation_cluster() ->
|
||||||
{operation,
|
{operation,
|
||||||
mk(
|
mk(
|
||||||
enum([enable, disable, stop, restart]),
|
enum([stop, restart]),
|
||||||
#{
|
#{
|
||||||
in => path,
|
in => path,
|
||||||
required => true,
|
required => true,
|
||||||
|
@ -103,7 +109,7 @@ param_path_operation_on_node() ->
|
||||||
#{
|
#{
|
||||||
in => path,
|
in => path,
|
||||||
required => true,
|
required => true,
|
||||||
example => <<"start">>,
|
example => <<"stop">>,
|
||||||
desc => ?DESC("desc_param_path_operation_on_node")
|
desc => ?DESC("desc_param_path_operation_on_node")
|
||||||
}
|
}
|
||||||
)}.
|
)}.
|
||||||
|
@ -132,19 +138,34 @@ param_path_id() ->
|
||||||
}
|
}
|
||||||
)}.
|
)}.
|
||||||
|
|
||||||
bridge_info_array_example(Method) ->
|
param_path_enable() ->
|
||||||
[Config || #{value := Config} <- maps:values(bridge_info_examples(Method))].
|
{enable,
|
||||||
|
mk(
|
||||||
|
boolean(),
|
||||||
|
#{
|
||||||
|
in => path,
|
||||||
|
required => true,
|
||||||
|
desc => ?DESC("desc_param_path_enable"),
|
||||||
|
example => true
|
||||||
|
}
|
||||||
|
)}.
|
||||||
|
|
||||||
|
bridge_info_array_example(Method, WithMetrics) ->
|
||||||
|
[Config || #{value := Config} <- maps:values(bridge_info_examples(Method, WithMetrics))].
|
||||||
|
|
||||||
bridge_info_examples(Method) ->
|
bridge_info_examples(Method) ->
|
||||||
|
bridge_info_examples(Method, false).
|
||||||
|
|
||||||
|
bridge_info_examples(Method, WithMetrics) ->
|
||||||
maps:merge(
|
maps:merge(
|
||||||
#{
|
#{
|
||||||
<<"webhook_example">> => #{
|
<<"webhook_example">> => #{
|
||||||
summary => <<"WebHook">>,
|
summary => <<"WebHook">>,
|
||||||
value => info_example(webhook, Method)
|
value => info_example(webhook, Method, WithMetrics)
|
||||||
},
|
},
|
||||||
<<"mqtt_example">> => #{
|
<<"mqtt_example">> => #{
|
||||||
summary => <<"MQTT Bridge">>,
|
summary => <<"MQTT Bridge">>,
|
||||||
value => info_example(mqtt, Method)
|
value => info_example(mqtt, Method, WithMetrics)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
ee_bridge_examples(Method)
|
ee_bridge_examples(Method)
|
||||||
|
@ -157,24 +178,24 @@ ee_bridge_examples(Method) ->
|
||||||
_:_ -> #{}
|
_:_ -> #{}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
info_example(Type, Method) ->
|
info_example(Type, Method, WithMetrics) ->
|
||||||
maps:merge(
|
maps:merge(
|
||||||
info_example_basic(Type),
|
info_example_basic(Type),
|
||||||
method_example(Type, Method)
|
method_example(Type, Method, WithMetrics)
|
||||||
).
|
).
|
||||||
|
|
||||||
method_example(Type, Method) when Method == get; Method == post ->
|
method_example(Type, Method, WithMetrics) when Method == get; Method == post ->
|
||||||
SType = atom_to_list(Type),
|
SType = atom_to_list(Type),
|
||||||
SName = SType ++ "_example",
|
SName = SType ++ "_example",
|
||||||
TypeNameExam = #{
|
TypeNameExam = #{
|
||||||
type => bin(SType),
|
type => bin(SType),
|
||||||
name => bin(SName)
|
name => bin(SName)
|
||||||
},
|
},
|
||||||
maybe_with_metrics_example(TypeNameExam, Method);
|
maybe_with_metrics_example(TypeNameExam, Method, WithMetrics);
|
||||||
method_example(_Type, put) ->
|
method_example(_Type, put, _WithMetrics) ->
|
||||||
#{}.
|
#{}.
|
||||||
|
|
||||||
maybe_with_metrics_example(TypeNameExam, get) ->
|
maybe_with_metrics_example(TypeNameExam, get, true) ->
|
||||||
TypeNameExam#{
|
TypeNameExam#{
|
||||||
metrics => ?EMPTY_METRICS,
|
metrics => ?EMPTY_METRICS,
|
||||||
node_metrics => [
|
node_metrics => [
|
||||||
|
@ -184,7 +205,7 @@ maybe_with_metrics_example(TypeNameExam, get) ->
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
};
|
};
|
||||||
maybe_with_metrics_example(TypeNameExam, _) ->
|
maybe_with_metrics_example(TypeNameExam, _, _) ->
|
||||||
TypeNameExam.
|
TypeNameExam.
|
||||||
|
|
||||||
info_example_basic(webhook) ->
|
info_example_basic(webhook) ->
|
||||||
|
@ -274,7 +295,7 @@ schema("/bridges") ->
|
||||||
responses => #{
|
responses => #{
|
||||||
200 => emqx_dashboard_swagger:schema_with_example(
|
200 => emqx_dashboard_swagger:schema_with_example(
|
||||||
array(emqx_bridge_schema:get_response()),
|
array(emqx_bridge_schema:get_response()),
|
||||||
bridge_info_array_example(get)
|
bridge_info_array_example(get, true)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -334,9 +355,23 @@ schema("/bridges/:id") ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
schema("/bridges/:id/reset_metrics") ->
|
schema("/bridges/:id/metrics") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => '/bridges/:id/reset_metrics',
|
'operationId' => '/bridges/:id/metrics',
|
||||||
|
get => #{
|
||||||
|
tags => [<<"bridges">>],
|
||||||
|
summary => <<"Get Bridge Metrics">>,
|
||||||
|
description => ?DESC("desc_bridge_metrics"),
|
||||||
|
parameters => [param_path_id()],
|
||||||
|
responses => #{
|
||||||
|
200 => emqx_bridge_schema:metrics_fields(),
|
||||||
|
404 => error_schema('NOT_FOUND', "Bridge not found")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/bridges/:id/metrics/reset") ->
|
||||||
|
#{
|
||||||
|
'operationId' => '/bridges/:id/metrics/reset',
|
||||||
put => #{
|
put => #{
|
||||||
tags => [<<"bridges">>],
|
tags => [<<"bridges">>],
|
||||||
summary => <<"Reset Bridge Metrics">>,
|
summary => <<"Reset Bridge Metrics">>,
|
||||||
|
@ -348,12 +383,29 @@ schema("/bridges/:id/reset_metrics") ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
schema("/bridges/:id/operation/:operation") ->
|
schema("/bridges/:id/enable/:enable") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => '/bridges/:id/operation/:operation',
|
'operationId' => '/bridges/:id/enable/:enable',
|
||||||
|
put =>
|
||||||
|
#{
|
||||||
|
tags => [<<"bridges">>],
|
||||||
|
summary => <<"Enable or Disable Bridge">>,
|
||||||
|
desc => ?DESC("desc_enable_bridge"),
|
||||||
|
parameters => [param_path_id(), param_path_enable()],
|
||||||
|
responses =>
|
||||||
|
#{
|
||||||
|
204 => <<"Success">>,
|
||||||
|
400 => error_schema('INVALID_ID', "Bad bridge ID"),
|
||||||
|
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
schema("/bridges/:id/:operation") ->
|
||||||
|
#{
|
||||||
|
'operationId' => '/bridges/:id/:operation',
|
||||||
post => #{
|
post => #{
|
||||||
tags => [<<"bridges">>],
|
tags => [<<"bridges">>],
|
||||||
summary => <<"Enable/Disable/Stop/Restart Bridge">>,
|
summary => <<"Stop or Restart Bridge">>,
|
||||||
description => ?DESC("desc_api7"),
|
description => ?DESC("desc_api7"),
|
||||||
parameters => [
|
parameters => [
|
||||||
param_path_id(),
|
param_path_id(),
|
||||||
|
@ -366,9 +418,9 @@ schema("/bridges/:id/operation/:operation") ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
schema("/nodes/:node/bridges/:id/operation/:operation") ->
|
schema("/nodes/:node/bridges/:id/:operation") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => '/nodes/:node/bridges/:id/operation/:operation',
|
'operationId' => '/nodes/:node/bridges/:id/:operation',
|
||||||
post => #{
|
post => #{
|
||||||
tags => [<<"bridges">>],
|
tags => [<<"bridges">>],
|
||||||
summary => <<"Stop/Restart Bridge">>,
|
summary => <<"Stop/Restart Bridge">>,
|
||||||
|
@ -385,6 +437,23 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
|
||||||
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
|
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
schema("/bridges_probe") ->
|
||||||
|
#{
|
||||||
|
'operationId' => '/bridges_probe',
|
||||||
|
post => #{
|
||||||
|
tags => [<<"bridges">>],
|
||||||
|
desc => ?DESC("desc_api9"),
|
||||||
|
summary => <<"Test creating bridge">>,
|
||||||
|
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
|
||||||
|
emqx_bridge_schema:post_request(),
|
||||||
|
bridge_info_examples(post)
|
||||||
|
),
|
||||||
|
responses => #{
|
||||||
|
204 => <<"Test bridge OK">>,
|
||||||
|
400 => error_schema(['TEST_FAILED'], "bridge test failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
'/bridges'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
|
'/bridges'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
|
||||||
|
@ -455,7 +524,10 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
'/bridges/:id/reset_metrics'(put, #{bindings := #{id := Id}}) ->
|
'/bridges/:id/metrics'(get, #{bindings := #{id := Id}}) ->
|
||||||
|
?TRY_PARSE_ID(Id, lookup_from_all_nodes_metrics(BridgeType, BridgeName, 200)).
|
||||||
|
|
||||||
|
'/bridges/:id/metrics/reset'(put, #{bindings := #{id := Id}}) ->
|
||||||
?TRY_PARSE_ID(
|
?TRY_PARSE_ID(
|
||||||
Id,
|
Id,
|
||||||
case
|
case
|
||||||
|
@ -468,11 +540,33 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
|
'/bridges_probe'(post, Request) ->
|
||||||
|
RequestMeta = #{module => ?MODULE, method => post, path => "/bridges_probe"},
|
||||||
|
case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of
|
||||||
|
{ok, #{body := #{<<"type">> := ConnType} = Params}} ->
|
||||||
|
case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params)) of
|
||||||
|
ok ->
|
||||||
|
{204};
|
||||||
|
{error, Error} ->
|
||||||
|
{400, error_msg('TEST_FAILED', Error)}
|
||||||
|
end;
|
||||||
|
BadRequest ->
|
||||||
|
BadRequest
|
||||||
|
end.
|
||||||
|
|
||||||
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
|
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
|
||||||
|
FormatFun = fun format_bridge_info_without_metrics/1,
|
||||||
|
do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun).
|
||||||
|
|
||||||
|
lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) ->
|
||||||
|
FormatFun = fun format_bridge_metrics/1,
|
||||||
|
do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun).
|
||||||
|
|
||||||
|
do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
|
||||||
Nodes = mria_mnesia:running_nodes(),
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
|
case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
|
||||||
{ok, [{ok, _} | _] = Results} ->
|
{ok, [{ok, _} | _] = Results} ->
|
||||||
{SuccCode, format_bridge_info([R || {ok, R} <- Results])};
|
{SuccCode, FormatFun([R || {ok, R} <- Results])};
|
||||||
{ok, [{error, not_found} | _]} ->
|
{ok, [{error, not_found} | _]} ->
|
||||||
{404, error_msg('NOT_FOUND', <<"not_found">>)};
|
{404, error_msg('NOT_FOUND', <<"not_found">>)};
|
||||||
{error, ErrL} ->
|
{error, ErrL} ->
|
||||||
|
@ -485,19 +579,16 @@ lookup_from_local_node(BridgeType, BridgeName) ->
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
'/bridges/:id/operation/:operation'(post, #{
|
'/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
|
||||||
bindings :=
|
|
||||||
#{id := Id, operation := Op}
|
|
||||||
}) ->
|
|
||||||
?TRY_PARSE_ID(
|
?TRY_PARSE_ID(
|
||||||
Id,
|
Id,
|
||||||
case operation_func(Op) of
|
case enable_func(Enable) of
|
||||||
invalid ->
|
invalid ->
|
||||||
{400, error_msg('BAD_REQUEST', <<"invalid operation">>)};
|
{400, error_msg('BAD_REQUEST', <<"invalid operation">>)};
|
||||||
OperFunc when OperFunc == enable; OperFunc == disable ->
|
OperFunc ->
|
||||||
case emqx_bridge:disable_enable(OperFunc, BridgeType, BridgeName) of
|
case emqx_bridge:disable_enable(OperFunc, BridgeType, BridgeName) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
{200};
|
{204};
|
||||||
{error, {pre_config_update, _, bridge_not_found}} ->
|
{error, {pre_config_update, _, bridge_not_found}} ->
|
||||||
{404, error_msg('NOT_FOUND', <<"bridge not found">>)};
|
{404, error_msg('NOT_FOUND', <<"bridge not found">>)};
|
||||||
{error, {_, _, timeout}} ->
|
{error, {_, _, timeout}} ->
|
||||||
|
@ -506,14 +597,26 @@ lookup_from_local_node(BridgeType, BridgeName) ->
|
||||||
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
|
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{500, error_msg('INTERNAL_ERROR', Reason)}
|
{500, error_msg('INTERNAL_ERROR', Reason)}
|
||||||
end;
|
end
|
||||||
|
end
|
||||||
|
).
|
||||||
|
|
||||||
|
'/bridges/:id/:operation'(post, #{
|
||||||
|
bindings :=
|
||||||
|
#{id := Id, operation := Op}
|
||||||
|
}) ->
|
||||||
|
?TRY_PARSE_ID(
|
||||||
|
Id,
|
||||||
|
case operation_func(Op) of
|
||||||
|
invalid ->
|
||||||
|
{400, error_msg('BAD_REQUEST', <<"invalid operation">>)};
|
||||||
OperFunc ->
|
OperFunc ->
|
||||||
Nodes = mria_mnesia:running_nodes(),
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName)
|
operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName)
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
'/nodes/:node/bridges/:id/operation/:operation'(post, #{
|
'/nodes/:node/bridges/:id/:operation'(post, #{
|
||||||
bindings :=
|
bindings :=
|
||||||
#{id := Id, operation := Op, node := Node}
|
#{id := Id, operation := Op, node := Node}
|
||||||
}) ->
|
}) ->
|
||||||
|
@ -543,10 +646,12 @@ node_operation_func(_) -> invalid.
|
||||||
|
|
||||||
operation_func(<<"stop">>) -> stop;
|
operation_func(<<"stop">>) -> stop;
|
||||||
operation_func(<<"restart">>) -> restart;
|
operation_func(<<"restart">>) -> restart;
|
||||||
operation_func(<<"enable">>) -> enable;
|
|
||||||
operation_func(<<"disable">>) -> disable;
|
|
||||||
operation_func(_) -> invalid.
|
operation_func(_) -> invalid.
|
||||||
|
|
||||||
|
enable_func(<<"true">>) -> enable;
|
||||||
|
enable_func(<<"false">>) -> disable;
|
||||||
|
enable_func(_) -> invalid.
|
||||||
|
|
||||||
operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) ->
|
operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) ->
|
||||||
RpcFunc =
|
RpcFunc =
|
||||||
case OperFunc of
|
case OperFunc of
|
||||||
|
@ -572,7 +677,7 @@ zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(#{type := Type, name := Name}, Acc) ->
|
fun(#{type := Type, name := Name}, Acc) ->
|
||||||
Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes),
|
Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes),
|
||||||
[format_bridge_info(Bridges) | Acc]
|
[format_bridge_info_with_metrics(Bridges) | Acc]
|
||||||
end,
|
end,
|
||||||
[],
|
[],
|
||||||
BridgesFirstNode
|
BridgesFirstNode
|
||||||
|
@ -606,7 +711,7 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
|
||||||
BridgesAllNodes
|
BridgesAllNodes
|
||||||
).
|
).
|
||||||
|
|
||||||
format_bridge_info([FirstBridge | _] = Bridges) ->
|
format_bridge_info_with_metrics([FirstBridge | _] = Bridges) ->
|
||||||
Res = maps:remove(node, FirstBridge),
|
Res = maps:remove(node, FirstBridge),
|
||||||
NodeStatus = collect_status(Bridges),
|
NodeStatus = collect_status(Bridges),
|
||||||
NodeMetrics = collect_metrics(Bridges),
|
NodeMetrics = collect_metrics(Bridges),
|
||||||
|
@ -617,6 +722,14 @@ format_bridge_info([FirstBridge | _] = Bridges) ->
|
||||||
node_metrics => NodeMetrics
|
node_metrics => NodeMetrics
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
format_bridge_info_without_metrics(Bridges) ->
|
||||||
|
Res = format_bridge_info_with_metrics(Bridges),
|
||||||
|
maps:without([metrics, node_metrics], Res).
|
||||||
|
|
||||||
|
format_bridge_metrics(Bridges) ->
|
||||||
|
Res = format_bridge_info_with_metrics(Bridges),
|
||||||
|
maps:with([metrics, node_metrics], Res).
|
||||||
|
|
||||||
collect_status(Bridges) ->
|
collect_status(Bridges) ->
|
||||||
[maps:with([node, status], B) || B <- Bridges].
|
[maps:with([node, status], B) || B <- Bridges].
|
||||||
|
|
||||||
|
|
|
@ -215,14 +215,16 @@ recreate(Type, Name, Conf, Opts) ->
|
||||||
Opts
|
Opts
|
||||||
).
|
).
|
||||||
|
|
||||||
create_dry_run(Type, Conf) ->
|
create_dry_run(Type, Conf0) ->
|
||||||
TmpPath = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]),
|
TmpPath = iolist_to_binary(["bridges-create-dry-run:", emqx_misc:gen_id(8)]),
|
||||||
|
Conf = emqx_map_lib:safe_atom_key_map(Conf0),
|
||||||
case emqx_connector_ssl:convert_certs(TmpPath, Conf) of
|
case emqx_connector_ssl:convert_certs(TmpPath, Conf) of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
{ok, ConfNew} ->
|
{ok, ConfNew} ->
|
||||||
|
ParseConf = parse_confs(bin(Type), TmpPath, ConfNew),
|
||||||
Res = emqx_resource:create_dry_run_local(
|
Res = emqx_resource:create_dry_run_local(
|
||||||
bridge_to_resource_type(Type), ConfNew
|
bridge_to_resource_type(Type), ParseConf
|
||||||
),
|
),
|
||||||
_ = maybe_clear_certs(TmpPath, ConfNew),
|
_ = maybe_clear_certs(TmpPath, ConfNew),
|
||||||
Res
|
Res
|
||||||
|
|
|
@ -51,7 +51,7 @@ fields("post") ->
|
||||||
fields("put") ->
|
fields("put") ->
|
||||||
fields("config");
|
fields("config");
|
||||||
fields("get") ->
|
fields("get") ->
|
||||||
emqx_bridge_schema:metrics_status_fields() ++ fields("config").
|
emqx_bridge_schema:status_fields() ++ fields("config").
|
||||||
|
|
||||||
desc("config") ->
|
desc("config") ->
|
||||||
?DESC("config");
|
?DESC("config");
|
||||||
|
|
|
@ -30,7 +30,8 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
common_bridge_fields/0,
|
common_bridge_fields/0,
|
||||||
metrics_status_fields/0
|
status_fields/0,
|
||||||
|
metrics_fields/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%%======================================================================================
|
%%======================================================================================
|
||||||
|
@ -83,19 +84,23 @@ common_bridge_fields() ->
|
||||||
)}
|
)}
|
||||||
].
|
].
|
||||||
|
|
||||||
metrics_status_fields() ->
|
status_fields() ->
|
||||||
|
[
|
||||||
|
{"status", mk(status(), #{desc => ?DESC("desc_status")})},
|
||||||
|
{"node_status",
|
||||||
|
mk(
|
||||||
|
hoconsc:array(ref(?MODULE, "node_status")),
|
||||||
|
#{desc => ?DESC("desc_node_status")}
|
||||||
|
)}
|
||||||
|
].
|
||||||
|
|
||||||
|
metrics_fields() ->
|
||||||
[
|
[
|
||||||
{"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("desc_metrics")})},
|
{"metrics", mk(ref(?MODULE, "metrics"), #{desc => ?DESC("desc_metrics")})},
|
||||||
{"node_metrics",
|
{"node_metrics",
|
||||||
mk(
|
mk(
|
||||||
hoconsc:array(ref(?MODULE, "node_metrics")),
|
hoconsc:array(ref(?MODULE, "node_metrics")),
|
||||||
#{desc => ?DESC("desc_node_metrics")}
|
#{desc => ?DESC("desc_node_metrics")}
|
||||||
)},
|
|
||||||
{"status", mk(status(), #{desc => ?DESC("desc_status")})},
|
|
||||||
{"node_status",
|
|
||||||
mk(
|
|
||||||
hoconsc:array(ref(?MODULE, "node_status")),
|
|
||||||
#{desc => ?DESC("desc_node_status")}
|
|
||||||
)}
|
)}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,7 @@ fields("post") ->
|
||||||
fields("put") ->
|
fields("put") ->
|
||||||
fields("config");
|
fields("config");
|
||||||
fields("get") ->
|
fields("get") ->
|
||||||
emqx_bridge_schema:metrics_status_fields() ++ fields("post");
|
emqx_bridge_schema:status_fields() ++ fields("post");
|
||||||
fields("creation_opts") ->
|
fields("creation_opts") ->
|
||||||
lists:filter(
|
lists:filter(
|
||||||
fun({K, _V}) ->
|
fun({K, _V}) ->
|
||||||
|
|
|
@ -33,13 +33,21 @@
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
).
|
).
|
||||||
-define(HTTP_BRIDGE(URL, TYPE, NAME), #{
|
-define(BRIDGE(NAME, TYPE), #{
|
||||||
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
<<"type">> => TYPE,
|
<<"type">> => TYPE,
|
||||||
<<"name">> => NAME,
|
<<"name">> => NAME
|
||||||
|
}).
|
||||||
|
-define(MQTT_BRIDGE(SERVER), ?BRIDGE(<<"mqtt_egress_test_bridge">>, <<"mqtt">>)#{
|
||||||
|
<<"server">> => SERVER,
|
||||||
|
<<"username">> => <<"user1">>,
|
||||||
|
<<"password">> => <<"">>,
|
||||||
|
<<"proto_ver">> => <<"v5">>
|
||||||
|
}).
|
||||||
|
-define(HTTP_BRIDGE(URL, TYPE, NAME), ?BRIDGE(NAME, TYPE)#{
|
||||||
<<"url">> => URL,
|
<<"url">> => URL,
|
||||||
<<"local_topic">> => <<"emqx_webhook/#">>,
|
<<"local_topic">> => <<"emqx_webhook/#">>,
|
||||||
<<"method">> => <<"post">>,
|
<<"method">> => <<"post">>,
|
||||||
<<"ssl">> => #{<<"enable">> => false},
|
|
||||||
<<"body">> => <<"${payload}">>,
|
<<"body">> => <<"${payload}">>,
|
||||||
<<"headers">> => #{
|
<<"headers">> => #{
|
||||||
<<"content-type">> => <<"application/json">>
|
<<"content-type">> => <<"application/json">>
|
||||||
|
@ -187,8 +195,6 @@ t_http_crud_apis(Config) ->
|
||||||
<<"enable">> := true,
|
<<"enable">> := true,
|
||||||
<<"status">> := _,
|
<<"status">> := _,
|
||||||
<<"node_status">> := [_ | _],
|
<<"node_status">> := [_ | _],
|
||||||
<<"metrics">> := _,
|
|
||||||
<<"node_metrics">> := [_ | _],
|
|
||||||
<<"url">> := URL1
|
<<"url">> := URL1
|
||||||
} = jsx:decode(Bridge),
|
} = jsx:decode(Bridge),
|
||||||
|
|
||||||
|
@ -225,8 +231,6 @@ t_http_crud_apis(Config) ->
|
||||||
<<"enable">> := true,
|
<<"enable">> := true,
|
||||||
<<"status">> := _,
|
<<"status">> := _,
|
||||||
<<"node_status">> := [_ | _],
|
<<"node_status">> := [_ | _],
|
||||||
<<"metrics">> := _,
|
|
||||||
<<"node_metrics">> := [_ | _],
|
|
||||||
<<"url">> := URL2
|
<<"url">> := URL2
|
||||||
},
|
},
|
||||||
jsx:decode(Bridge2)
|
jsx:decode(Bridge2)
|
||||||
|
@ -259,8 +263,6 @@ t_http_crud_apis(Config) ->
|
||||||
<<"enable">> := true,
|
<<"enable">> := true,
|
||||||
<<"status">> := _,
|
<<"status">> := _,
|
||||||
<<"node_status">> := [_ | _],
|
<<"node_status">> := [_ | _],
|
||||||
<<"metrics">> := _,
|
|
||||||
<<"node_metrics">> := [_ | _],
|
|
||||||
<<"url">> := URL2
|
<<"url">> := URL2
|
||||||
},
|
},
|
||||||
jsx:decode(Bridge3Str)
|
jsx:decode(Bridge3Str)
|
||||||
|
@ -456,8 +458,6 @@ do_start_stop_bridges(Type, Config) ->
|
||||||
<<"enable">> := true,
|
<<"enable">> := true,
|
||||||
<<"status">> := <<"connected">>,
|
<<"status">> := <<"connected">>,
|
||||||
<<"node_status">> := [_ | _],
|
<<"node_status">> := [_ | _],
|
||||||
<<"metrics">> := _,
|
|
||||||
<<"node_metrics">> := [_ | _],
|
|
||||||
<<"url">> := URL1
|
<<"url">> := URL1
|
||||||
} = jsx:decode(Bridge),
|
} = jsx:decode(Bridge),
|
||||||
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
|
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
|
||||||
|
@ -502,25 +502,23 @@ t_enable_disable_bridges(Config) ->
|
||||||
<<"enable">> := true,
|
<<"enable">> := true,
|
||||||
<<"status">> := <<"connected">>,
|
<<"status">> := <<"connected">>,
|
||||||
<<"node_status">> := [_ | _],
|
<<"node_status">> := [_ | _],
|
||||||
<<"metrics">> := _,
|
|
||||||
<<"node_metrics">> := [_ | _],
|
|
||||||
<<"url">> := URL1
|
<<"url">> := URL1
|
||||||
} = jsx:decode(Bridge),
|
} = jsx:decode(Bridge),
|
||||||
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
|
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
|
||||||
%% disable it
|
%% disable it
|
||||||
{ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>),
|
{ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>),
|
||||||
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
|
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
|
||||||
?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)),
|
?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)),
|
||||||
%% enable again
|
%% enable again
|
||||||
{ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>),
|
{ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
|
||||||
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
|
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
|
||||||
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)),
|
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)),
|
||||||
%% enable an already started bridge
|
%% enable an already started bridge
|
||||||
{ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>),
|
{ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
|
||||||
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
|
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
|
||||||
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)),
|
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)),
|
||||||
%% disable it again
|
%% disable it again
|
||||||
{ok, 200, <<>>} = request(post, operation_path(cluster, disable, BridgeID), <<"">>),
|
{ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>),
|
||||||
|
|
||||||
{ok, 403, Res} = request(post, operation_path(node, restart, BridgeID), <<"">>),
|
{ok, 403, Res} = request(post, operation_path(node, restart, BridgeID), <<"">>),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
|
@ -529,7 +527,7 @@ t_enable_disable_bridges(Config) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
%% enable a stopped bridge
|
%% enable a stopped bridge
|
||||||
{ok, 200, <<>>} = request(post, operation_path(cluster, enable, BridgeID), <<"">>),
|
{ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
|
||||||
{ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
|
{ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
|
||||||
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)),
|
?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)),
|
||||||
%% delete the bridge
|
%% delete the bridge
|
||||||
|
@ -555,12 +553,10 @@ t_reset_bridges(Config) ->
|
||||||
<<"enable">> := true,
|
<<"enable">> := true,
|
||||||
<<"status">> := <<"connected">>,
|
<<"status">> := <<"connected">>,
|
||||||
<<"node_status">> := [_ | _],
|
<<"node_status">> := [_ | _],
|
||||||
<<"metrics">> := _,
|
|
||||||
<<"node_metrics">> := [_ | _],
|
|
||||||
<<"url">> := URL1
|
<<"url">> := URL1
|
||||||
} = jsx:decode(Bridge),
|
} = jsx:decode(Bridge),
|
||||||
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
|
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
|
||||||
{ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "reset_metrics"]), []),
|
{ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []),
|
||||||
|
|
||||||
%% delete the bridge
|
%% delete the bridge
|
||||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
|
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
|
||||||
|
@ -599,10 +595,155 @@ t_with_redact_update(_Config) ->
|
||||||
?assertEqual(Password, Value),
|
?assertEqual(Password, Value),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_bridges_probe(Config) ->
|
||||||
|
Port = ?config(port, Config),
|
||||||
|
URL = ?URL(Port, "some_path"),
|
||||||
|
|
||||||
|
{ok, 204, <<>>} = request(
|
||||||
|
post,
|
||||||
|
uri(["bridges_probe"]),
|
||||||
|
?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% second time with same name is ok since no real bridge created
|
||||||
|
{ok, 204, <<>>} = request(
|
||||||
|
post,
|
||||||
|
uri(["bridges_probe"]),
|
||||||
|
?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME)
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, 400, NxDomain} = request(
|
||||||
|
post,
|
||||||
|
uri(["bridges_probe"]),
|
||||||
|
?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>, ?BRIDGE_TYPE, ?BRIDGE_NAME)
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"code">> := <<"TEST_FAILED">>,
|
||||||
|
<<"message">> := _
|
||||||
|
},
|
||||||
|
jsx:decode(NxDomain)
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, 204, _} = request(
|
||||||
|
post,
|
||||||
|
uri(["bridges_probe"]),
|
||||||
|
?MQTT_BRIDGE(<<"127.0.0.1:1883">>)
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, 400, ConnRefused} = request(
|
||||||
|
post,
|
||||||
|
uri(["bridges_probe"]),
|
||||||
|
?MQTT_BRIDGE(<<"127.0.0.1:2883">>)
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"code">> := <<"TEST_FAILED">>,
|
||||||
|
<<"message">> := <<"#{reason => econnrefused", _/binary>>
|
||||||
|
},
|
||||||
|
jsx:decode(ConnRefused)
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, 400, BadReq} = request(
|
||||||
|
post,
|
||||||
|
uri(["bridges_probe"]),
|
||||||
|
?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>)
|
||||||
|
),
|
||||||
|
?assertMatch(#{<<"code">> := <<"BAD_REQUEST">>}, jsx:decode(BadReq)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_metrics(Config) ->
|
||||||
|
Port = ?config(port, Config),
|
||||||
|
%% assert we there's no bridges at first
|
||||||
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||||
|
|
||||||
|
%% then we add a webhook bridge, using POST
|
||||||
|
%% POST /bridges/ will create a bridge
|
||||||
|
URL1 = ?URL(Port, "path1"),
|
||||||
|
Name = ?BRIDGE_NAME,
|
||||||
|
{ok, 201, Bridge} = request(
|
||||||
|
post,
|
||||||
|
uri(["bridges"]),
|
||||||
|
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
|
||||||
|
),
|
||||||
|
|
||||||
|
%ct:pal("---bridge: ~p", [Bridge]),
|
||||||
|
#{
|
||||||
|
<<"type">> := ?BRIDGE_TYPE,
|
||||||
|
<<"name">> := Name,
|
||||||
|
<<"enable">> := true,
|
||||||
|
<<"status">> := _,
|
||||||
|
<<"node_status">> := [_ | _],
|
||||||
|
<<"url">> := URL1
|
||||||
|
} = jsx:decode(Bridge),
|
||||||
|
|
||||||
|
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
|
||||||
|
|
||||||
|
%% check for empty bridge metrics
|
||||||
|
{ok, 200, Bridge1Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"metrics">> := #{<<"success">> := 0},
|
||||||
|
<<"node_metrics">> := [_ | _]
|
||||||
|
},
|
||||||
|
jsx:decode(Bridge1Str)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% check that the bridge doesn't contain metrics anymore
|
||||||
|
{ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID]), []),
|
||||||
|
Decoded = jsx:decode(Bridge2Str),
|
||||||
|
?assertNot(maps:is_key(<<"metrics">>, Decoded)),
|
||||||
|
?assertNot(maps:is_key(<<"node_metrics">>, Decoded)),
|
||||||
|
|
||||||
|
%% send an message to emqx and the message should be forwarded to the HTTP server
|
||||||
|
Body = <<"my msg">>,
|
||||||
|
emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
|
||||||
|
?assert(
|
||||||
|
receive
|
||||||
|
{http_server, received, #{
|
||||||
|
method := <<"POST">>,
|
||||||
|
path := <<"/path1">>,
|
||||||
|
body := Body
|
||||||
|
}} ->
|
||||||
|
true;
|
||||||
|
Msg ->
|
||||||
|
ct:pal("error: http got unexpected request: ~p", [Msg]),
|
||||||
|
false
|
||||||
|
after 100 ->
|
||||||
|
false
|
||||||
|
end
|
||||||
|
),
|
||||||
|
|
||||||
|
%% check for non-empty bridge metrics
|
||||||
|
{ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"metrics">> := #{<<"success">> := 1},
|
||||||
|
<<"node_metrics">> := [_ | _]
|
||||||
|
},
|
||||||
|
jsx:decode(Bridge3Str)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% check for non-empty metrics when listing all bridges
|
||||||
|
{ok, 200, BridgesStr} = request(get, uri(["bridges"]), []),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
<<"metrics">> := #{<<"success">> := 1},
|
||||||
|
<<"node_metrics">> := [_ | _]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
jsx:decode(BridgesStr)
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
operation_path(node, Oper, BridgeID) ->
|
operation_path(node, Oper, BridgeID) ->
|
||||||
uri(["nodes", node(), "bridges", BridgeID, "operation", Oper]);
|
uri(["nodes", node(), "bridges", BridgeID, Oper]);
|
||||||
operation_path(cluster, Oper, BridgeID) ->
|
operation_path(cluster, Oper, BridgeID) ->
|
||||||
uri(["bridges", BridgeID, "operation", Oper]).
|
uri(["bridges", BridgeID, Oper]).
|
||||||
|
|
||||||
|
enable_path(Enable, BridgeID) ->
|
||||||
|
uri(["bridges", BridgeID, "enable", Enable]).
|
||||||
|
|
||||||
str(S) when is_list(S) -> S;
|
str(S) when is_list(S) -> S;
|
||||||
str(S) when is_binary(S) -> binary_to_list(S).
|
str(S) when is_binary(S) -> binary_to_list(S).
|
||||||
|
|
|
@ -187,7 +187,7 @@ t_mqtt_conn_bridge_ingress(_) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []),
|
{ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
#{
|
||||||
<<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
|
<<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
|
||||||
|
@ -200,7 +200,7 @@ t_mqtt_conn_bridge_ingress(_) ->
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
jsx:decode(BridgeStr)
|
jsx:decode(BridgeMetricsStr)
|
||||||
),
|
),
|
||||||
|
|
||||||
%% delete the bridge
|
%% delete the bridge
|
||||||
|
@ -255,7 +255,7 @@ t_mqtt_conn_bridge_egress(_) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
{ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
#{
|
||||||
<<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
|
<<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
|
||||||
|
@ -268,7 +268,7 @@ t_mqtt_conn_bridge_egress(_) ->
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
jsx:decode(BridgeStr)
|
jsx:decode(BridgeMetricsStr)
|
||||||
),
|
),
|
||||||
|
|
||||||
%% delete the bridge
|
%% delete the bridge
|
||||||
|
@ -354,7 +354,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
|
||||||
Payload = <<"hello">>,
|
Payload = <<"hello">>,
|
||||||
emqx:subscribe(RemoteTopic),
|
emqx:subscribe(RemoteTopic),
|
||||||
|
|
||||||
{ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
{ok, 200, BridgeMetricsStr1} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
||||||
#{
|
#{
|
||||||
<<"metrics">> := #{
|
<<"metrics">> := #{
|
||||||
<<"matched">> := CntMatched1, <<"success">> := CntSuccess1, <<"failed">> := 0
|
<<"matched">> := CntMatched1, <<"success">> := CntSuccess1, <<"failed">> := 0
|
||||||
|
@ -371,7 +371,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
} = jsx:decode(BridgeStr1),
|
} = jsx:decode(BridgeMetricsStr1),
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
%% PUBLISH a message to the 'local' broker, as we have only one broker,
|
%% PUBLISH a message to the 'local' broker, as we have only one broker,
|
||||||
%% the remote broker is also the local one.
|
%% the remote broker is also the local one.
|
||||||
|
@ -393,7 +393,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
timer:sleep(1000),
|
timer:sleep(1000),
|
||||||
{ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
{ok, 200, BridgeMetricsStr2} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
||||||
#{
|
#{
|
||||||
<<"metrics">> := #{
|
<<"metrics">> := #{
|
||||||
<<"matched">> := CntMatched2, <<"success">> := CntSuccess2, <<"failed">> := 0
|
<<"matched">> := CntMatched2, <<"success">> := CntSuccess2, <<"failed">> := 0
|
||||||
|
@ -410,7 +410,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
} = jsx:decode(BridgeStr2),
|
} = jsx:decode(BridgeMetricsStr2),
|
||||||
?assertEqual(CntMatched2, CntMatched1 + 1),
|
?assertEqual(CntMatched2, CntMatched1 + 1),
|
||||||
?assertEqual(CntSuccess2, CntSuccess1 + 1),
|
?assertEqual(CntSuccess2, CntSuccess1 + 1),
|
||||||
?assertEqual(NodeCntMatched2, NodeCntMatched1 + 1),
|
?assertEqual(NodeCntMatched2, NodeCntMatched1 + 1),
|
||||||
|
@ -513,7 +513,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []),
|
{ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
#{
|
||||||
<<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
|
<<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1},
|
||||||
|
@ -526,7 +526,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
jsx:decode(BridgeStr)
|
jsx:decode(BridgeMetricsStr)
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
||||||
|
@ -627,7 +627,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
{ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
#{
|
||||||
<<"metrics">> := #{<<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0},
|
<<"metrics">> := #{<<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0},
|
||||||
|
@ -641,7 +641,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
jsx:decode(BridgeStr)
|
jsx:decode(BridgeMetricsStr)
|
||||||
),
|
),
|
||||||
|
|
||||||
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
||||||
|
@ -693,7 +693,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
||||||
assert_mqtt_msg_received(RemoteTopic, Payload0),
|
assert_mqtt_msg_received(RemoteTopic, Payload0),
|
||||||
|
|
||||||
%% verify the metrics of the bridge
|
%% verify the metrics of the bridge
|
||||||
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
{ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
#{
|
||||||
<<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
|
<<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
|
||||||
|
@ -706,7 +706,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
jsx:decode(BridgeStr)
|
jsx:decode(BridgeMetricsStr)
|
||||||
),
|
),
|
||||||
|
|
||||||
%% stop the listener 1883 to make the bridge disconnected
|
%% stop the listener 1883 to make the bridge disconnected
|
||||||
|
@ -740,7 +740,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
||||||
|
|
||||||
%% verify the metrics of the bridge, the message should be queued
|
%% verify the metrics of the bridge, the message should be queued
|
||||||
{ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
{ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
||||||
|
{ok, 200, BridgeMetricsStr1} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
||||||
Decoded1 = jsx:decode(BridgeStr1),
|
Decoded1 = jsx:decode(BridgeStr1),
|
||||||
|
DecodedMetrics1 = jsx:decode(BridgeMetricsStr1),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
Status when (Status == <<"connected">> orelse Status == <<"connecting">>),
|
Status when (Status == <<"connected">> orelse Status == <<"connecting">>),
|
||||||
maps:get(<<"status">>, Decoded1)
|
maps:get(<<"status">>, Decoded1)
|
||||||
|
@ -753,7 +755,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
||||||
<<"failed">> := 0,
|
<<"failed">> := 0,
|
||||||
<<"queuing">> := 2
|
<<"queuing">> := 2
|
||||||
} when Matched >= 3,
|
} when Matched >= 3,
|
||||||
maps:get(<<"metrics">>, Decoded1)
|
maps:get(<<"metrics">>, DecodedMetrics1)
|
||||||
),
|
),
|
||||||
|
|
||||||
%% start the listener 1883 to make the bridge reconnected
|
%% start the listener 1883 to make the bridge reconnected
|
||||||
|
@ -761,10 +763,12 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
||||||
timer:sleep(1500),
|
timer:sleep(1500),
|
||||||
%% verify the metrics of the bridge, the 2 queued messages should have been sent
|
%% verify the metrics of the bridge, the 2 queued messages should have been sent
|
||||||
{ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
{ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []),
|
||||||
|
{ok, 200, BridgeMetricsStr2} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []),
|
||||||
|
Decoded2 = jsx:decode(BridgeStr2),
|
||||||
|
?assertEqual(<<"connected">>, maps:get(<<"status">>, Decoded2)),
|
||||||
%% matched >= 3 because of possible retries.
|
%% matched >= 3 because of possible retries.
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
#{
|
||||||
<<"status">> := <<"connected">>,
|
|
||||||
<<"metrics">> := #{
|
<<"metrics">> := #{
|
||||||
<<"matched">> := Matched,
|
<<"matched">> := Matched,
|
||||||
<<"success">> := 3,
|
<<"success">> := 3,
|
||||||
|
@ -773,7 +777,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
||||||
<<"retried">> := _
|
<<"retried">> := _
|
||||||
}
|
}
|
||||||
} when Matched >= 3,
|
} when Matched >= 3,
|
||||||
jsx:decode(BridgeStr2)
|
jsx:decode(BridgeMetricsStr2)
|
||||||
),
|
),
|
||||||
%% also verify the 2 messages have been sent to the remote broker
|
%% also verify the 2 messages have been sent to the remote broker
|
||||||
assert_mqtt_msg_received(RemoteTopic, Payload1),
|
assert_mqtt_msg_received(RemoteTopic, Payload1),
|
||||||
|
|
|
@ -384,14 +384,10 @@ on_query_async(
|
||||||
|
|
||||||
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
|
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
|
||||||
case do_get_status(PoolName, Timeout) of
|
case do_get_status(PoolName, Timeout) of
|
||||||
true ->
|
ok ->
|
||||||
connected;
|
{connected, State};
|
||||||
false ->
|
{error, Reason} ->
|
||||||
?SLOG(error, #{
|
{disconnected, State, Reason}
|
||||||
msg => "http_connector_get_status_failed",
|
|
||||||
state => State
|
|
||||||
}),
|
|
||||||
disconnected
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_get_status(PoolName, Timeout) ->
|
do_get_status(PoolName, Timeout) ->
|
||||||
|
@ -400,24 +396,28 @@ do_get_status(PoolName, Timeout) ->
|
||||||
fun(Worker) ->
|
fun(Worker) ->
|
||||||
case ehttpc:health_check(Worker, Timeout) of
|
case ehttpc:health_check(Worker, Timeout) of
|
||||||
ok ->
|
ok ->
|
||||||
true;
|
ok;
|
||||||
{error, Reason} ->
|
{error, Reason} = Error ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "ehttpc_health_check_failed",
|
msg => "http_connector_get_status_failed",
|
||||||
reason => Reason,
|
reason => Reason,
|
||||||
worker => Worker
|
worker => Worker
|
||||||
}),
|
}),
|
||||||
false
|
Error
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
|
try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
|
||||||
[_ | _] = Status ->
|
% we crash in case of non-empty lists since we don't know what to do in that case
|
||||||
lists:all(fun(St) -> St =:= true end, Status);
|
[_ | _] = Results ->
|
||||||
[] ->
|
case [E || {error, _} = E <- Results] of
|
||||||
false
|
[] ->
|
||||||
|
ok;
|
||||||
|
Errors ->
|
||||||
|
hd(Errors)
|
||||||
|
end
|
||||||
catch
|
catch
|
||||||
exit:timeout ->
|
exit:timeout ->
|
||||||
false
|
{error, timeout}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -65,10 +65,21 @@ roots() ->
|
||||||
|
|
||||||
fields(config) ->
|
fields(config) ->
|
||||||
[{server, server()}] ++
|
[{server, server()}] ++
|
||||||
emqx_connector_schema_lib:relational_db_fields() ++
|
add_default_username(emqx_connector_schema_lib:relational_db_fields(), []) ++
|
||||||
emqx_connector_schema_lib:ssl_fields() ++
|
emqx_connector_schema_lib:ssl_fields() ++
|
||||||
emqx_connector_schema_lib:prepare_statement_fields().
|
emqx_connector_schema_lib:prepare_statement_fields().
|
||||||
|
|
||||||
|
add_default_username([{username, OrigUsernameFn} | Tail], Head) ->
|
||||||
|
Head ++ [{username, add_default_fn(OrigUsernameFn, <<"root">>)} | Tail];
|
||||||
|
add_default_username([Field | Tail], Head) ->
|
||||||
|
add_default_username(Tail, Head ++ [Field]).
|
||||||
|
|
||||||
|
add_default_fn(OrigFn, Default) ->
|
||||||
|
fun
|
||||||
|
(default) -> Default;
|
||||||
|
(Field) -> OrigFn(Field)
|
||||||
|
end.
|
||||||
|
|
||||||
server() ->
|
server() ->
|
||||||
Meta = #{desc => ?DESC("server")},
|
Meta = #{desc => ?DESC("server")},
|
||||||
emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS).
|
emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS).
|
||||||
|
@ -82,7 +93,7 @@ on_start(
|
||||||
#{
|
#{
|
||||||
server := Server,
|
server := Server,
|
||||||
database := DB,
|
database := DB,
|
||||||
username := User,
|
username := Username,
|
||||||
pool_size := PoolSize,
|
pool_size := PoolSize,
|
||||||
ssl := SSL
|
ssl := SSL
|
||||||
} = Config
|
} = Config
|
||||||
|
@ -101,13 +112,15 @@ on_start(
|
||||||
[]
|
[]
|
||||||
end,
|
end,
|
||||||
Options = [
|
Options = [
|
||||||
{host, Host},
|
maybe_password_opt(maps:get(password, Config, undefined))
|
||||||
{port, Port},
|
| [
|
||||||
{user, User},
|
{host, Host},
|
||||||
{password, maps:get(password, Config, <<>>)},
|
{port, Port},
|
||||||
{database, DB},
|
{user, Username},
|
||||||
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
|
{database, DB},
|
||||||
{pool_size, PoolSize}
|
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
|
||||||
|
{pool_size, PoolSize}
|
||||||
|
]
|
||||||
],
|
],
|
||||||
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
||||||
Prepares = parse_prepare_sql(Config),
|
Prepares = parse_prepare_sql(Config),
|
||||||
|
@ -123,6 +136,11 @@ on_start(
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
maybe_password_opt(undefined) ->
|
||||||
|
[];
|
||||||
|
maybe_password_opt(Password) ->
|
||||||
|
{password, Password}.
|
||||||
|
|
||||||
on_stop(InstId, #{poolname := PoolName}) ->
|
on_stop(InstId, #{poolname := PoolName}) ->
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{
|
||||||
msg => "stopping_mysql_connector",
|
msg => "stopping_mysql_connector",
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_resource, [
|
{application, emqx_resource, [
|
||||||
{description, "Manager for all external resources"},
|
{description, "Manager for all external resources"},
|
||||||
{vsn, "0.1.4"},
|
{vsn, "0.1.5"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_resource_app, []}},
|
{mod, {emqx_resource_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -174,6 +174,9 @@ create_dry_run(ResourceType, Config) ->
|
||||||
case wait_for_ready(ResId, 15000) of
|
case wait_for_ready(ResId, 15000) of
|
||||||
ok ->
|
ok ->
|
||||||
remove(ResId);
|
remove(ResId);
|
||||||
|
{error, Reason} ->
|
||||||
|
_ = remove(ResId),
|
||||||
|
{error, Reason};
|
||||||
timeout ->
|
timeout ->
|
||||||
_ = remove(ResId),
|
_ = remove(ResId),
|
||||||
{error, timeout}
|
{error, timeout}
|
||||||
|
@ -631,16 +634,18 @@ data_record_to_external_map_with_metrics(Data) ->
|
||||||
metrics => get_metrics(Data#data.id)
|
metrics => get_metrics(Data#data.id)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-spec wait_for_ready(resource_id(), integer()) -> ok | timeout.
|
-spec wait_for_ready(resource_id(), integer()) -> ok | timeout | {error, term()}.
|
||||||
wait_for_ready(ResId, WaitTime) ->
|
wait_for_ready(ResId, WaitTime) ->
|
||||||
do_wait_for_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY).
|
do_wait_for_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY).
|
||||||
|
|
||||||
do_wait_for_ready(_ResId, 0) ->
|
do_wait_for_ready(_ResId, 0) ->
|
||||||
timeout;
|
timeout;
|
||||||
do_wait_for_ready(ResId, Retry) ->
|
do_wait_for_ready(ResId, Retry) ->
|
||||||
case ets_lookup(ResId) of
|
case read_cache(ResId) of
|
||||||
{ok, _Group, #{status := connected}} ->
|
{_Group, #data{status = connected}} ->
|
||||||
ok;
|
ok;
|
||||||
|
{_Group, #data{status = disconnected, error = Reason}} ->
|
||||||
|
{error, Reason};
|
||||||
_ ->
|
_ ->
|
||||||
timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
|
timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
|
||||||
do_wait_for_ready(ResId, Retry - 1)
|
do_wait_for_ready(ResId, Retry - 1)
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Refactor `/authorization/sources/built_in_database/` by adding `rules/` to the path.
|
|
@ -0,0 +1 @@
|
||||||
|
重构 `/authorization/sources/built_in_database/` 接口,将 `rules/` 添加到了其路径中。
|
|
@ -0,0 +1 @@
|
||||||
|
`/bridges_probe` API endpoint to test params for creating a new data bridge.
|
|
@ -0,0 +1 @@
|
||||||
|
添加新 API 接口 `/bridges_probe` 用于测试创建桥接的参数是否可用。
|
|
@ -0,0 +1,5 @@
|
||||||
|
Refactor of /bridges API to make it more consistent with other APIs:
|
||||||
|
- bridge enable/disable is now done via the endpoint `/bridges/{id}/enable/[true,false]`
|
||||||
|
- `/bridges/{id}/operation/{operation}` endpoints are now `/bridges/{id}/{operation}`
|
||||||
|
- metrics are moved out from the GET `/bridges/{id}` response and can now be fetched via `/bridges/{id}/metrics`
|
||||||
|
- the `bridges/{id}/reset_metrics` endpoint is now `/bridges/{id}/metrics/reset`
|
|
@ -0,0 +1,5 @@
|
||||||
|
重构部分 /bridges 的API 使得其和其他 API 能够更加一致:
|
||||||
|
- 桥接的启用和禁用现在是通过 `/bridges/{id}/enable/[true,false]` API 来实现的
|
||||||
|
- 使用 `/bridges/{id}/{operation}` 替换了旧的 `/bridges/{id}/operation/{operation}` API
|
||||||
|
- 指标数据从 `/bridges/{id}` 的响应消息中移除,现在可以使用新的 API `/bridges/{id}/metrics` 进行访问
|
||||||
|
- 使用 `/bridges/{id}/metrics/reset` 替换了旧的 `bridges/{id}/reset_metrics` API
|
|
@ -124,7 +124,7 @@ fields(bridge_config) ->
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
fields("get") ->
|
fields("get") ->
|
||||||
emqx_bridge_schema:metrics_status_fields() ++ fields("post");
|
emqx_bridge_schema:status_fields() ++ fields("post");
|
||||||
fields("post") ->
|
fields("post") ->
|
||||||
[type_field(), name_field() | fields("config")];
|
[type_field(), name_field() | fields("config")];
|
||||||
fields("put") ->
|
fields("put") ->
|
||||||
|
|
|
@ -67,7 +67,7 @@ fields("post") ->
|
||||||
fields("put") ->
|
fields("put") ->
|
||||||
fields("config");
|
fields("config");
|
||||||
fields("get") ->
|
fields("get") ->
|
||||||
emqx_bridge_schema:metrics_status_fields() ++ fields("post").
|
emqx_bridge_schema:status_fields() ++ fields("post").
|
||||||
|
|
||||||
field(connector) ->
|
field(connector) ->
|
||||||
mk(
|
mk(
|
||||||
|
|
|
@ -139,7 +139,7 @@ method_fileds(get, ConnectorType) ->
|
||||||
influxdb_bridge_common_fields() ++
|
influxdb_bridge_common_fields() ++
|
||||||
connector_fields(ConnectorType) ++
|
connector_fields(ConnectorType) ++
|
||||||
type_name_fields(ConnectorType) ++
|
type_name_fields(ConnectorType) ++
|
||||||
emqx_bridge_schema:metrics_status_fields();
|
emqx_bridge_schema:status_fields();
|
||||||
method_fileds(put, ConnectorType) ->
|
method_fileds(put, ConnectorType) ->
|
||||||
influxdb_bridge_common_fields() ++
|
influxdb_bridge_common_fields() ++
|
||||||
connector_fields(ConnectorType).
|
connector_fields(ConnectorType).
|
||||||
|
|
|
@ -67,7 +67,7 @@ fields("post") ->
|
||||||
fields("put") ->
|
fields("put") ->
|
||||||
fields("config");
|
fields("config");
|
||||||
fields("get") ->
|
fields("get") ->
|
||||||
emqx_bridge_schema:metrics_status_fields() ++ fields("post");
|
emqx_bridge_schema:status_fields() ++ fields("post");
|
||||||
fields("config") ->
|
fields("config") ->
|
||||||
[
|
[
|
||||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||||
|
|
|
@ -59,15 +59,15 @@ fields("put_sharded") ->
|
||||||
fields("put_single") ->
|
fields("put_single") ->
|
||||||
fields(mongodb_single);
|
fields(mongodb_single);
|
||||||
fields("get_rs") ->
|
fields("get_rs") ->
|
||||||
emqx_bridge_schema:metrics_status_fields() ++
|
emqx_bridge_schema:status_fields() ++
|
||||||
fields(mongodb_rs) ++
|
fields(mongodb_rs) ++
|
||||||
type_and_name_fields(mongodb_rs);
|
type_and_name_fields(mongodb_rs);
|
||||||
fields("get_sharded") ->
|
fields("get_sharded") ->
|
||||||
emqx_bridge_schema:metrics_status_fields() ++
|
emqx_bridge_schema:status_fields() ++
|
||||||
fields(mongodb_sharded) ++
|
fields(mongodb_sharded) ++
|
||||||
type_and_name_fields(mongodb_sharded);
|
type_and_name_fields(mongodb_sharded);
|
||||||
fields("get_single") ->
|
fields("get_single") ->
|
||||||
emqx_bridge_schema:metrics_status_fields() ++
|
emqx_bridge_schema:status_fields() ++
|
||||||
fields(mongodb_single) ++
|
fields(mongodb_single) ++
|
||||||
type_and_name_fields(mongodb_single);
|
type_and_name_fields(mongodb_single);
|
||||||
fields("creation_opts") ->
|
fields("creation_opts") ->
|
||||||
|
|
|
@ -104,7 +104,7 @@ fields("post") ->
|
||||||
fields("put") ->
|
fields("put") ->
|
||||||
fields("config");
|
fields("config");
|
||||||
fields("get") ->
|
fields("get") ->
|
||||||
emqx_bridge_schema:metrics_status_fields() ++ fields("post").
|
emqx_bridge_schema:status_fields() ++ fields("post").
|
||||||
|
|
||||||
desc("config") ->
|
desc("config") ->
|
||||||
?DESC("desc_config");
|
?DESC("desc_config");
|
||||||
|
|
|
@ -106,7 +106,7 @@ fields("post") ->
|
||||||
fields("put") ->
|
fields("put") ->
|
||||||
fields("config");
|
fields("config");
|
||||||
fields("get") ->
|
fields("get") ->
|
||||||
emqx_bridge_schema:metrics_status_fields() ++ fields("post").
|
emqx_bridge_schema:status_fields() ++ fields("post").
|
||||||
|
|
||||||
fields("post", Type) ->
|
fields("post", Type) ->
|
||||||
[type_field(Type), name_field() | fields("config")].
|
[type_field(Type), name_field() | fields("config")].
|
||||||
|
|
|
@ -126,7 +126,7 @@ method_fileds(get, ConnectorType) ->
|
||||||
redis_bridge_common_fields() ++
|
redis_bridge_common_fields() ++
|
||||||
connector_fields(ConnectorType) ++
|
connector_fields(ConnectorType) ++
|
||||||
type_name_fields(ConnectorType) ++
|
type_name_fields(ConnectorType) ++
|
||||||
emqx_bridge_schema:metrics_status_fields();
|
emqx_bridge_schema:status_fields();
|
||||||
method_fileds(put, ConnectorType) ->
|
method_fileds(put, ConnectorType) ->
|
||||||
redis_bridge_common_fields() ++
|
redis_bridge_common_fields() ++
|
||||||
connector_fields(ConnectorType).
|
connector_fields(ConnectorType).
|
||||||
|
|
|
@ -220,9 +220,10 @@ kafka_bridge_rest_api_helper(Config) ->
|
||||||
BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName,
|
BridgeIdUrlEnc = BridgeType ++ UrlEscColon ++ BridgeName,
|
||||||
BridgesParts = ["bridges"],
|
BridgesParts = ["bridges"],
|
||||||
BridgesPartsIdDeleteAlsoActions = ["bridges", BridgeIdUrlEnc ++ "?also_delete_dep_actions"],
|
BridgesPartsIdDeleteAlsoActions = ["bridges", BridgeIdUrlEnc ++ "?also_delete_dep_actions"],
|
||||||
OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end,
|
OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, OpName] end,
|
||||||
BridgesPartsOpDisable = OpUrlFun("disable"),
|
EnableFun = fun(Enable) -> ["bridges", BridgeIdUrlEnc, "enable", Enable] end,
|
||||||
BridgesPartsOpEnable = OpUrlFun("enable"),
|
BridgesPartsOpDisable = EnableFun("false"),
|
||||||
|
BridgesPartsOpEnable = EnableFun("true"),
|
||||||
BridgesPartsOpRestart = OpUrlFun("restart"),
|
BridgesPartsOpRestart = OpUrlFun("restart"),
|
||||||
BridgesPartsOpStop = OpUrlFun("stop"),
|
BridgesPartsOpStop = OpUrlFun("stop"),
|
||||||
%% List bridges
|
%% List bridges
|
||||||
|
@ -321,10 +322,10 @@ kafka_bridge_rest_api_helper(Config) ->
|
||||||
?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)),
|
?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)),
|
||||||
?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)),
|
?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)),
|
||||||
%% Perform operations
|
%% Perform operations
|
||||||
{ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
|
{ok, 204, _} = show(http_put(show(BridgesPartsOpDisable), #{})),
|
||||||
{ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
|
{ok, 204, _} = show(http_put(show(BridgesPartsOpDisable), #{})),
|
||||||
{ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})),
|
{ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})),
|
||||||
{ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})),
|
{ok, 204, _} = show(http_put(show(BridgesPartsOpEnable), #{})),
|
||||||
{ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
|
{ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
|
||||||
{ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
|
{ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
|
||||||
{ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})),
|
{ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})),
|
||||||
|
|
Loading…
Reference in New Issue