Merge pull request #12137 from zmstone/1207-fix-dialyzer
fix(dialyzer): batch 2
This commit is contained in:
commit
1212ccc519
|
@ -201,7 +201,8 @@ gen_salt(#{name := Other}) when Other =/= plain, Other =/= bcrypt ->
|
||||||
<<X:128/big-unsigned-integer>> = crypto:strong_rand_bytes(16),
|
<<X:128/big-unsigned-integer>> = crypto:strong_rand_bytes(16),
|
||||||
iolist_to_binary(io_lib:format("~32.16.0b", [X])).
|
iolist_to_binary(io_lib:format("~32.16.0b", [X])).
|
||||||
|
|
||||||
-spec hash(algorithm_rw(), emqx_passwd:password()) -> {emqx_passwd:hash(), emqx_passwd:salt()}.
|
-spec hash(algorithm_rw(), emqx_passwd:password()) ->
|
||||||
|
{emqx_passwd:password_hash(), emqx_passwd:salt()}.
|
||||||
hash(#{name := bcrypt, salt_rounds := _} = Algorithm, Password) ->
|
hash(#{name := bcrypt, salt_rounds := _} = Algorithm, Password) ->
|
||||||
Salt0 = gen_salt(Algorithm),
|
Salt0 = gen_salt(Algorithm),
|
||||||
Hash = emqx_passwd:hash({bcrypt, Salt0}, Password),
|
Hash = emqx_passwd:hash({bcrypt, Salt0}, Password),
|
||||||
|
@ -231,7 +232,7 @@ hash(#{name := Other, salt_position := SaltPosition} = Algorithm, Password) ->
|
||||||
-spec check_password(
|
-spec check_password(
|
||||||
algorithm(),
|
algorithm(),
|
||||||
emqx_passwd:salt(),
|
emqx_passwd:salt(),
|
||||||
emqx_passwd:hash(),
|
emqx_passwd:password_hash(),
|
||||||
emqx_passwd:password()
|
emqx_passwd:password()
|
||||||
) -> boolean().
|
) -> boolean().
|
||||||
check_password(#{name := bcrypt}, _Salt, PasswordHash, Password) ->
|
check_password(#{name := bcrypt}, _Salt, PasswordHash, Password) ->
|
||||||
|
|
|
@ -31,7 +31,7 @@ introduced_in() ->
|
||||||
"5.0.0".
|
"5.0.0".
|
||||||
|
|
||||||
-spec lookup_from_all_nodes([node()], atom(), binary()) ->
|
-spec lookup_from_all_nodes([node()], atom(), binary()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(term()).
|
||||||
lookup_from_all_nodes(Nodes, ChainName, AuthenticatorID) ->
|
lookup_from_all_nodes(Nodes, ChainName, AuthenticatorID) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes, emqx_authn_api, lookup_from_local_node, [ChainName, AuthenticatorID], ?TIMEOUT
|
Nodes, emqx_authn_api, lookup_from_local_node, [ChainName, AuthenticatorID], ?TIMEOUT
|
||||||
|
|
|
@ -16,10 +16,6 @@
|
||||||
|
|
||||||
-module(emqx_authz_rule).
|
-module(emqx_authz_rule).
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
|
||||||
-include_lib("emqx/include/emqx_placeholder.hrl").
|
|
||||||
-include("emqx_authz.hrl").
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
@ -29,9 +25,16 @@
|
||||||
-export([
|
-export([
|
||||||
match/4,
|
match/4,
|
||||||
matches/4,
|
matches/4,
|
||||||
compile/1
|
compile/1,
|
||||||
|
compile/4
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export_type([action/0, action_precompile/0]).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_placeholder.hrl").
|
||||||
|
-include("emqx_authz.hrl").
|
||||||
|
|
||||||
-type permission() :: allow | deny.
|
-type permission() :: allow | deny.
|
||||||
|
|
||||||
-type who_condition() ::
|
-type who_condition() ::
|
||||||
|
@ -73,8 +76,24 @@
|
||||||
topic_condition/0
|
topic_condition/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-type action_precompile() ::
|
||||||
|
subscribe
|
||||||
|
| publish
|
||||||
|
| {subscribe, list()}
|
||||||
|
| {publish, list()}
|
||||||
|
| all.
|
||||||
|
|
||||||
|
-type topic_filter() :: emqx_types:topic().
|
||||||
|
|
||||||
|
-type rule_precompile() :: {permission(), who_condition(), action_precompile(), [topic_filter()]}.
|
||||||
|
|
||||||
-define(IS_PERMISSION(Permission), (Permission =:= allow orelse Permission =:= deny)).
|
-define(IS_PERMISSION(Permission), (Permission =:= allow orelse Permission =:= deny)).
|
||||||
|
|
||||||
|
-spec compile(permission(), who_condition(), action_precompile(), [topic_filter()]) -> rule().
|
||||||
|
compile(Permission, Who, Action, TopicFilters) ->
|
||||||
|
compile({Permission, Who, Action, TopicFilters}).
|
||||||
|
|
||||||
|
-spec compile({permission(), all} | rule_precompile()) -> rule().
|
||||||
compile({Permission, all}) when
|
compile({Permission, all}) when
|
||||||
?IS_PERMISSION(Permission)
|
?IS_PERMISSION(Permission)
|
||||||
->
|
->
|
||||||
|
|
|
@ -31,6 +31,6 @@ introduced_in() ->
|
||||||
"5.0.0".
|
"5.0.0".
|
||||||
|
|
||||||
-spec lookup_from_all_nodes([node()], atom()) ->
|
-spec lookup_from_all_nodes([node()], atom()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(term()).
|
||||||
lookup_from_all_nodes(Nodes, Type) ->
|
lookup_from_all_nodes(Nodes, Type) ->
|
||||||
erpc:multicall(Nodes, emqx_authz_api_sources, lookup_from_local_node, [Type], ?TIMEOUT).
|
erpc:multicall(Nodes, emqx_authz_api_sources, lookup_from_local_node, [Type], ?TIMEOUT).
|
||||||
|
|
|
@ -1,14 +1,15 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_auth_mnesia, [
|
{application, emqx_auth_mnesia, [
|
||||||
{description, "EMQX Buitl-in Database Authentication and Authorization"},
|
{description, "EMQX Buitl-in Database Authentication and Authorization"},
|
||||||
{vsn, "0.1.1"},
|
{vsn, "0.1.2"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_auth_mnesia_app, []}},
|
{mod, {emqx_auth_mnesia_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
stdlib,
|
stdlib,
|
||||||
emqx,
|
emqx,
|
||||||
emqx_auth
|
emqx_auth,
|
||||||
|
esasl
|
||||||
]},
|
]},
|
||||||
{env, []},
|
{env, []},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
|
|
|
@ -32,7 +32,9 @@
|
||||||
-type clientid() :: {clientid, binary()}.
|
-type clientid() :: {clientid, binary()}.
|
||||||
-type who() :: username() | clientid() | all.
|
-type who() :: username() | clientid() | all.
|
||||||
|
|
||||||
-type rule() :: {emqx_authz_rule:permission(), emqx_authz_rule:action(), emqx_types:topic()}.
|
-type rule() :: {
|
||||||
|
emqx_authz_rule:permission(), emqx_authz_rule:action_precompile(), emqx_types:topic()
|
||||||
|
}.
|
||||||
-type rules() :: [rule()].
|
-type rules() :: [rule()].
|
||||||
|
|
||||||
-record(emqx_acl, {
|
-record(emqx_acl, {
|
||||||
|
@ -223,7 +225,7 @@ do_get_rules(Key) ->
|
||||||
do_authorize(_Client, _PubSub, _Topic, []) ->
|
do_authorize(_Client, _PubSub, _Topic, []) ->
|
||||||
nomatch;
|
nomatch;
|
||||||
do_authorize(Client, PubSub, Topic, [{Permission, Action, TopicFilter} | Tail]) ->
|
do_authorize(Client, PubSub, Topic, [{Permission, Action, TopicFilter} | Tail]) ->
|
||||||
Rule = emqx_authz_rule:compile({Permission, all, Action, [TopicFilter]}),
|
Rule = emqx_authz_rule:compile(Permission, all, Action, [TopicFilter]),
|
||||||
case emqx_authz_rule:match(Client, PubSub, Topic, Rule) of
|
case emqx_authz_rule:match(Client, PubSub, Topic, Rule) of
|
||||||
{matched, Permission} -> {matched, Permission};
|
{matched, Permission} -> {matched, Permission};
|
||||||
nomatch -> do_authorize(Client, PubSub, Topic, Tail)
|
nomatch -> do_authorize(Client, PubSub, Topic, Tail)
|
||||||
|
|
|
@ -69,7 +69,7 @@ stop_bridge_to_node(Node, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
|
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -80,7 +80,7 @@ restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
|
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -91,7 +91,7 @@ stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(term()).
|
||||||
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
|
|
@ -82,7 +82,7 @@ stop_bridge_to_node(Node, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
|
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -93,7 +93,7 @@ restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
|
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -104,7 +104,7 @@ start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
|
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -115,7 +115,7 @@ stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(term()).
|
||||||
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
|
|
@ -88,7 +88,7 @@ stop_bridge_to_node(Node, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
|
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -99,7 +99,7 @@ restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
|
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -110,7 +110,7 @@ start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
|
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -121,7 +121,7 @@ stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(term()).
|
||||||
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
|
|
@ -80,7 +80,7 @@ stop_bridge_to_node(Node, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
|
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -91,7 +91,7 @@ restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
|
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -102,7 +102,7 @@ start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
|
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -113,7 +113,7 @@ stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(term()).
|
||||||
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
|
|
@ -86,7 +86,7 @@ stop_bridge_to_node(Node, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
|
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -97,7 +97,7 @@ restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
|
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -108,7 +108,7 @@ start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
|
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -119,7 +119,7 @@ stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(term()).
|
||||||
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -147,7 +147,7 @@ v2_list_bridges_on_nodes(Nodes) ->
|
||||||
erpc:multicall(Nodes, emqx_bridge_v2, list, [], ?TIMEOUT).
|
erpc:multicall(Nodes, emqx_bridge_v2, list, [], ?TIMEOUT).
|
||||||
|
|
||||||
-spec v2_lookup_from_all_nodes([node()], key(), key()) ->
|
-spec v2_lookup_from_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(term()).
|
||||||
v2_lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
v2_lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -158,7 +158,7 @@ v2_lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec v2_get_metrics_from_all_nodes([node()], key(), key()) ->
|
-spec v2_get_metrics_from_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(term()).
|
||||||
v2_get_metrics_from_all_nodes(Nodes, ActionType, ActionName) ->
|
v2_get_metrics_from_all_nodes(Nodes, ActionType, ActionName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -169,7 +169,7 @@ v2_get_metrics_from_all_nodes(Nodes, ActionType, ActionName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec v2_start_bridge_to_all_nodes([node()], key(), key()) ->
|
-spec v2_start_bridge_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(ok).
|
||||||
v2_start_bridge_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
v2_start_bridge_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
|
|
@ -104,8 +104,7 @@ connect(Pid, Name) ->
|
||||||
config(#{remote := RC = #{}} = Conf) ->
|
config(#{remote := RC = #{}} = Conf) ->
|
||||||
Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}.
|
Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}.
|
||||||
|
|
||||||
-spec send(pid(), message(), egress()) ->
|
-spec send(pid(), message(), egress()) -> ok.
|
||||||
ok.
|
|
||||||
send(Pid, MsgIn, Egress) ->
|
send(Pid, MsgIn, Egress) ->
|
||||||
emqtt:publish(Pid, export_msg(MsgIn, Egress)).
|
emqtt:publish(Pid, export_msg(MsgIn, Egress)).
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
-type state() :: #{
|
-type state() :: #{
|
||||||
pulsar_client_id := pulsar_client_id(),
|
pulsar_client_id := pulsar_client_id(),
|
||||||
producers := pulsar_producers:producers(),
|
producers := pulsar_producers:producers(),
|
||||||
sync_timeout := infinity | time:time(),
|
sync_timeout := erlang:timeout(),
|
||||||
message_template := message_template()
|
message_template := message_template()
|
||||||
}.
|
}.
|
||||||
-type buffer_mode() :: memory | disk | hybrid.
|
-type buffer_mode() :: memory | disk | hybrid.
|
||||||
|
@ -43,8 +43,8 @@
|
||||||
bridge_name := atom(),
|
bridge_name := atom(),
|
||||||
buffer := #{
|
buffer := #{
|
||||||
mode := buffer_mode(),
|
mode := buffer_mode(),
|
||||||
per_partition_limit := emqx_schema:byte_size(),
|
per_partition_limit := emqx_schema:bytesize(),
|
||||||
segment_bytes := emqx_schema:byte_size(),
|
segment_bytes := emqx_schema:bytesize(),
|
||||||
memory_overload_protection := boolean()
|
memory_overload_protection := boolean()
|
||||||
},
|
},
|
||||||
compression := compression_mode(),
|
compression := compression_mode(),
|
||||||
|
|
|
@ -31,7 +31,7 @@
|
||||||
socket := inet:socket(),
|
socket := inet:socket(),
|
||||||
frame_state :=
|
frame_state :=
|
||||||
undefined
|
undefined
|
||||||
| emqx_bridge_sysk_frame:state(),
|
| emqx_bridge_syskeeper_frame:state(),
|
||||||
buffer := binary(),
|
buffer := binary(),
|
||||||
conf := map()
|
conf := map()
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -43,12 +43,12 @@
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
%% @doc Adds a new config handler to emqx_config_handler.
|
%% @doc Adds a new config handler to emqx_config_handler.
|
||||||
-spec add_handler(emqx_config:config_key_path(), module()) -> ok.
|
-spec add_handler(emqx_utils_maps:config_key_path(), module()) -> ok.
|
||||||
add_handler(ConfKeyPath, HandlerName) ->
|
add_handler(ConfKeyPath, HandlerName) ->
|
||||||
emqx_config_handler:add_handler(ConfKeyPath, HandlerName).
|
emqx_config_handler:add_handler(ConfKeyPath, HandlerName).
|
||||||
|
|
||||||
%% @doc remove config handler from emqx_config_handler.
|
%% @doc remove config handler from emqx_config_handler.
|
||||||
-spec remove_handler(emqx_config:config_key_path()) -> ok.
|
-spec remove_handler(emqx_utils_maps:config_key_path()) -> ok.
|
||||||
remove_handler(ConfKeyPath) ->
|
remove_handler(ConfKeyPath) ->
|
||||||
emqx_config_handler:remove_handler(ConfKeyPath).
|
emqx_config_handler:remove_handler(ConfKeyPath).
|
||||||
|
|
||||||
|
|
|
@ -48,7 +48,7 @@ init([]) ->
|
||||||
%% `emqx_connector_jwt_sup:ensure_jwt/1' to ensure that a JWT has
|
%% `emqx_connector_jwt_sup:ensure_jwt/1' to ensure that a JWT has
|
||||||
%% been stored, if synchronization is needed.
|
%% been stored, if synchronization is needed.
|
||||||
-spec ensure_worker_present(worker_id(), map()) ->
|
-spec ensure_worker_present(worker_id(), map()) ->
|
||||||
{ok, supervisor:child()} | {error, term()}.
|
{ok, pid()} | {error, term()}.
|
||||||
ensure_worker_present(Id, Config) ->
|
ensure_worker_present(Id, Config) ->
|
||||||
ChildSpec = jwt_worker_child_spec(Id, Config),
|
ChildSpec = jwt_worker_child_spec(Id, Config),
|
||||||
case supervisor:start_child(?MODULE, ChildSpec) of
|
case supervisor:start_child(?MODULE, ChildSpec) of
|
||||||
|
|
|
@ -42,7 +42,7 @@ list_connectors_on_nodes(Nodes) ->
|
||||||
-type key() :: atom() | binary() | [byte()].
|
-type key() :: atom() | binary() | [byte()].
|
||||||
|
|
||||||
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(term()).
|
||||||
lookup_from_all_nodes(Nodes, ConnectorType, ConnectorName) ->
|
lookup_from_all_nodes(Nodes, ConnectorType, ConnectorName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
@ -64,7 +64,7 @@ start_connector_to_node(Node, ConnectorType, ConnectorName) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec start_connectors_to_all_nodes([node()], key(), key()) ->
|
-spec start_connectors_to_all_nodes([node()], key(), key()) ->
|
||||||
emqx_rpc:erpc_multicall().
|
emqx_rpc:erpc_multicall(term()).
|
||||||
start_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) ->
|
start_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) ->
|
||||||
erpc:multicall(
|
erpc:multicall(
|
||||||
Nodes,
|
Nodes,
|
||||||
|
|
|
@ -181,12 +181,12 @@ fields(hasnext) ->
|
||||||
fields(meta) ->
|
fields(meta) ->
|
||||||
fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext).
|
fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext).
|
||||||
|
|
||||||
-spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema_map().
|
-spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema().
|
||||||
schema_with_example(Type, Example) ->
|
schema_with_example(Type, Example) ->
|
||||||
hoconsc:mk(Type, #{examples => #{<<"example">> => Example}}).
|
hoconsc:mk(Type, #{examples => #{<<"example">> => Example}}).
|
||||||
|
|
||||||
-spec schema_with_examples(hocon_schema:type(), map() | list(tuple())) ->
|
-spec schema_with_examples(hocon_schema:type(), map() | list(tuple())) ->
|
||||||
hocon_schema:field_schema_map().
|
hocon_schema:field_schema().
|
||||||
schema_with_examples(Type, Examples) ->
|
schema_with_examples(Type, Examples) ->
|
||||||
hoconsc:mk(Type, #{examples => #{<<"examples">> => Examples}}).
|
hoconsc:mk(Type, #{examples => #{<<"examples">> => Examples}}).
|
||||||
|
|
||||||
|
|
|
@ -43,6 +43,7 @@
|
||||||
stream_rank/0,
|
stream_rank/0,
|
||||||
iterator/0,
|
iterator/0,
|
||||||
message_id/0,
|
message_id/0,
|
||||||
|
message_store_opts/0,
|
||||||
next_result/1, next_result/0,
|
next_result/1, next_result/0,
|
||||||
store_batch_result/0,
|
store_batch_result/0,
|
||||||
make_iterator_result/1, make_iterator_result/0,
|
make_iterator_result/1, make_iterator_result/0,
|
||||||
|
|
|
@ -11,11 +11,17 @@
|
||||||
-export([iteration_options/1]).
|
-export([iteration_options/1]).
|
||||||
-export([default_iteration_options/0]).
|
-export([default_iteration_options/0]).
|
||||||
|
|
||||||
|
-export_type([
|
||||||
|
backend_config/0,
|
||||||
|
iteration_options/0
|
||||||
|
]).
|
||||||
|
|
||||||
-type backend_config() ::
|
-type backend_config() ::
|
||||||
{emqx_ds_message_storage_bitmask, emqx_ds_message_storage_bitmask:options()}
|
{emqx_ds_message_storage_bitmask, emqx_ds_storage_bitfield_lts:options()}
|
||||||
| {module(), _Options}.
|
| {module(), _Options}.
|
||||||
|
|
||||||
-export_type([backend_config/0]).
|
-type keyspace() :: atom().
|
||||||
|
-type iteration_options() :: map().
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API funcions
|
%% API funcions
|
||||||
|
@ -23,7 +29,7 @@
|
||||||
|
|
||||||
-define(APP, emqx_ds).
|
-define(APP, emqx_ds).
|
||||||
|
|
||||||
-spec keyspace_config(emqx_ds:keyspace()) -> backend_config().
|
-spec keyspace_config(keyspace()) -> backend_config().
|
||||||
keyspace_config(Keyspace) ->
|
keyspace_config(Keyspace) ->
|
||||||
DefaultKeyspaceConfig = application:get_env(
|
DefaultKeyspaceConfig = application:get_env(
|
||||||
?APP,
|
?APP,
|
||||||
|
@ -33,8 +39,8 @@ keyspace_config(Keyspace) ->
|
||||||
Keyspaces = application:get_env(?APP, keyspace_config, #{}),
|
Keyspaces = application:get_env(?APP, keyspace_config, #{}),
|
||||||
maps:get(Keyspace, Keyspaces, DefaultKeyspaceConfig).
|
maps:get(Keyspace, Keyspaces, DefaultKeyspaceConfig).
|
||||||
|
|
||||||
-spec iteration_options(emqx_ds:keyspace()) ->
|
-spec iteration_options(keyspace()) ->
|
||||||
emqx_ds_message_storage_bitmask:iteration_options().
|
iteration_options().
|
||||||
iteration_options(Keyspace) ->
|
iteration_options(Keyspace) ->
|
||||||
case keyspace_config(Keyspace) of
|
case keyspace_config(Keyspace) of
|
||||||
{emqx_ds_message_storage_bitmask, Config} ->
|
{emqx_ds_message_storage_bitmask, Config} ->
|
||||||
|
@ -43,7 +49,7 @@ iteration_options(Keyspace) ->
|
||||||
default_iteration_options()
|
default_iteration_options()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec default_iteration_options() -> emqx_ds_message_storage_bitmask:iteration_options().
|
-spec default_iteration_options() -> iteration_options().
|
||||||
default_iteration_options() ->
|
default_iteration_options() ->
|
||||||
{emqx_ds_message_storage_bitmask, Config} = default_keyspace_config(),
|
{emqx_ds_message_storage_bitmask, Config} = default_keyspace_config(),
|
||||||
maps:get(iteration, Config).
|
maps:get(iteration, Config).
|
||||||
|
@ -60,7 +66,7 @@ default_keyspace_config() ->
|
||||||
}
|
}
|
||||||
}}.
|
}}.
|
||||||
|
|
||||||
-spec db_options(emqx_ds:keyspace()) -> emqx_ds_storage_layer:db_options().
|
-spec db_options(keyspace()) -> emqx_ds_storage_layer:options().
|
||||||
db_options(Keyspace) ->
|
db_options(Keyspace) ->
|
||||||
DefaultDBOptions = application:get_env(?APP, default_db_options, []),
|
DefaultDBOptions = application:get_env(?APP, default_db_options, []),
|
||||||
Keyspaces = application:get_env(?APP, keyspace_config, #{}),
|
Keyspaces = application:get_env(?APP, keyspace_config, #{}),
|
||||||
|
|
|
@ -24,7 +24,12 @@
|
||||||
%% Debug:
|
%% Debug:
|
||||||
-export([trie_next/3, trie_insert/3, dump_to_dot/2]).
|
-export([trie_next/3, trie_insert/3, dump_to_dot/2]).
|
||||||
|
|
||||||
-export_type([options/0, static_key/0, trie/0]).
|
-export_type([
|
||||||
|
options/0,
|
||||||
|
static_key/0,
|
||||||
|
trie/0,
|
||||||
|
msg_storage_key/0
|
||||||
|
]).
|
||||||
|
|
||||||
-include_lib("stdlib/include/ms_transform.hrl").
|
-include_lib("stdlib/include/ms_transform.hrl").
|
||||||
|
|
||||||
|
|
|
@ -90,7 +90,7 @@
|
||||||
?enc := emqx_ds_storage_layer:iterator()
|
?enc := emqx_ds_storage_layer:iterator()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-type message_id() :: emqx_ds_storage_layer:message_id().
|
-type message_id() :: emqx_ds:message_id().
|
||||||
|
|
||||||
-define(batch_messages, 2).
|
-define(batch_messages, 2).
|
||||||
|
|
||||||
|
@ -219,7 +219,7 @@ do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Opt
|
||||||
emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options).
|
emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options).
|
||||||
|
|
||||||
-spec do_get_streams_v1(
|
-spec do_get_streams_v1(
|
||||||
emqx_ds:db(), emqx_ds_replicationi_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
|
emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
|
||||||
) ->
|
) ->
|
||||||
[{integer(), emqx_ds_storage_layer:stream()}].
|
[{integer(), emqx_ds_storage_layer:stream()}].
|
||||||
do_get_streams_v1(DB, Shard, TopicFilter, StartTime) ->
|
do_get_streams_v1(DB, Shard, TopicFilter, StartTime) ->
|
||||||
|
@ -227,7 +227,7 @@ do_get_streams_v1(DB, Shard, TopicFilter, StartTime) ->
|
||||||
|
|
||||||
-spec do_make_iterator_v1(
|
-spec do_make_iterator_v1(
|
||||||
emqx_ds:db(),
|
emqx_ds:db(),
|
||||||
emqx_ds_storage_layer:shard_id(),
|
emqx_ds_replication_layer:shard_id(),
|
||||||
emqx_ds_storage_layer:stream(),
|
emqx_ds_storage_layer:stream(),
|
||||||
emqx_ds:topic_filter(),
|
emqx_ds:topic_filter(),
|
||||||
emqx_ds:time()
|
emqx_ds:time()
|
||||||
|
|
|
@ -280,7 +280,7 @@ in_sync_replicas_trans(DB, Shard) ->
|
||||||
{error, no_shard}
|
{error, no_shard}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec set_leader_trans(emqx_ds:ds(), emqx_ds_replication_layer:shard_id(), node()) ->
|
-spec set_leader_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) ->
|
||||||
ok.
|
ok.
|
||||||
set_leader_trans(DB, Shard, Node) ->
|
set_leader_trans(DB, Shard, Node) ->
|
||||||
[Record0] = mnesia:wread({?SHARD_TAB, {DB, Shard}}),
|
[Record0] = mnesia:wread({?SHARD_TAB, {DB, Shard}}),
|
||||||
|
|
|
@ -26,7 +26,16 @@
|
||||||
%% internal exports:
|
%% internal exports:
|
||||||
-export([db_dir/1]).
|
-export([db_dir/1]).
|
||||||
|
|
||||||
-export_type([gen_id/0, generation/0, cf_refs/0, stream/0, iterator/0]).
|
-export_type([
|
||||||
|
gen_id/0,
|
||||||
|
generation/0,
|
||||||
|
cf_refs/0,
|
||||||
|
stream/0,
|
||||||
|
iterator/0,
|
||||||
|
shard_id/0,
|
||||||
|
options/0,
|
||||||
|
prototype/0
|
||||||
|
]).
|
||||||
|
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
|
@ -64,7 +73,7 @@
|
||||||
?enc := term()
|
?enc := term()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
%% Note: this might be stored permanently on a remote node.
|
%% Note: this might be stred permanently on a remote node.
|
||||||
-opaque iterator() ::
|
-opaque iterator() ::
|
||||||
#{
|
#{
|
||||||
?tag := ?IT,
|
?tag := ?IT,
|
||||||
|
@ -109,17 +118,19 @@
|
||||||
%% Shard (runtime):
|
%% Shard (runtime):
|
||||||
-type shard() :: shard(generation()).
|
-type shard() :: shard(generation()).
|
||||||
|
|
||||||
|
-type options() :: map().
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Generation callbacks
|
%% Generation callbacks
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
%% Create the new schema given generation id and the options.
|
%% Create the new schema given generation id and the options.
|
||||||
%% Create rocksdb column families.
|
%% Create rocksdb column families.
|
||||||
-callback create(shard_id(), rocksdb:db_handle(), gen_id(), _Options) ->
|
-callback create(shard_id(), rocksdb:db_handle(), gen_id(), Options :: map()) ->
|
||||||
{_Schema, cf_refs()}.
|
{_Schema, cf_refs()}.
|
||||||
|
|
||||||
%% Open the existing schema
|
%% Open the existing schema
|
||||||
-callback open(shard_id(), rocsdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
|
-callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
|
||||||
_Data.
|
_Data.
|
||||||
|
|
||||||
-callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) ->
|
-callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) ->
|
||||||
|
@ -138,7 +149,7 @@
|
||||||
%% API for the replication layer
|
%% API for the replication layer
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
-spec open_shard(shard_id(), emqx_ds:builtin_db_opts()) -> ok.
|
-spec open_shard(shard_id(), options()) -> ok.
|
||||||
open_shard(Shard, Options) ->
|
open_shard(Shard, Options) ->
|
||||||
emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
|
emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
|
||||||
|
|
||||||
|
@ -215,13 +226,13 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch
|
||||||
|
|
||||||
-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
|
-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
|
||||||
|
|
||||||
-spec start_link(shard_id(), emqx_ds:builtin_db_opts()) ->
|
-spec start_link(shard_id(), options()) ->
|
||||||
{ok, pid()}.
|
{ok, pid()}.
|
||||||
start_link(Shard = {_, _}, Options) ->
|
start_link(Shard = {_, _}, Options) ->
|
||||||
gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
|
gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
|
||||||
|
|
||||||
-record(s, {
|
-record(s, {
|
||||||
shard_id :: emqx_ds:shard_id(),
|
shard_id :: shard_id(),
|
||||||
db :: rocksdb:db_handle(),
|
db :: rocksdb:db_handle(),
|
||||||
cf_refs :: cf_refs(),
|
cf_refs :: cf_refs(),
|
||||||
schema :: shard_schema(),
|
schema :: shard_schema(),
|
||||||
|
@ -352,7 +363,7 @@ commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB
|
||||||
ok = put_schema_persistent(DB, Schema),
|
ok = put_schema_persistent(DB, Schema),
|
||||||
put_schema_runtime(ShardId, Runtime).
|
put_schema_runtime(ShardId, Runtime).
|
||||||
|
|
||||||
-spec rocksdb_open(shard_id(), emqx_ds:builtin_db_opts()) ->
|
-spec rocksdb_open(shard_id(), options()) ->
|
||||||
{ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}.
|
{ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}.
|
||||||
rocksdb_open(Shard, Options) ->
|
rocksdb_open(Shard, Options) ->
|
||||||
DBOptions = [
|
DBOptions = [
|
||||||
|
|
|
@ -30,7 +30,7 @@ start_link() ->
|
||||||
start_shard(Shard, Options) ->
|
start_shard(Shard, Options) ->
|
||||||
supervisor:start_child(?SUP, shard_child_spec(Shard, Options)).
|
supervisor:start_child(?SUP, shard_child_spec(Shard, Options)).
|
||||||
|
|
||||||
-spec stop_shard(emqx_ds:shard()) -> ok | {error, _}.
|
-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
|
||||||
stop_shard(Shard) ->
|
stop_shard(Shard) ->
|
||||||
ok = supervisor:terminate_child(?SUP, Shard),
|
ok = supervisor:terminate_child(?SUP, Shard),
|
||||||
ok = supervisor:delete_child(?SUP, Shard).
|
ok = supervisor:delete_child(?SUP, Shard).
|
||||||
|
|
|
@ -28,8 +28,7 @@
|
||||||
%% API funcions
|
%% API funcions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
-spec drop_db([node()], emqx_ds:db()) ->
|
-spec drop_db([node()], emqx_ds:db()) -> [emqx_rpc:erpc(ok)].
|
||||||
[{ok, ok} | erpc:caught_call_exception()].
|
|
||||||
drop_db(Node, DB) ->
|
drop_db(Node, DB) ->
|
||||||
erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]).
|
erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]).
|
||||||
|
|
||||||
|
@ -65,7 +64,7 @@ make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
|
||||||
emqx_ds_storage_layer:iterator(),
|
emqx_ds_storage_layer:iterator(),
|
||||||
pos_integer()
|
pos_integer()
|
||||||
) ->
|
) ->
|
||||||
{ok, emqx_ds_storage_layer:iterator(), [emqx_types:messages()]}
|
{ok, emqx_ds_storage_layer:iterator(), [emqx_types:message()]}
|
||||||
| {ok, end_of_stream}
|
| {ok, end_of_stream}
|
||||||
| {error, _}.
|
| {error, _}.
|
||||||
next(Node, DB, Shard, Iter, BatchSize) ->
|
next(Node, DB, Shard, Iter, BatchSize) ->
|
||||||
|
|
|
@ -72,6 +72,10 @@
|
||||||
import_config/1
|
import_config/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export_type([
|
||||||
|
server_name/0
|
||||||
|
]).
|
||||||
|
|
||||||
%% Running servers
|
%% Running servers
|
||||||
-type state() :: #{servers := servers()}.
|
-type state() :: #{servers := servers()}.
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,10 @@
|
||||||
shutdown => Timeout
|
shutdown => Timeout
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
%% TODO: export_type:
|
||||||
|
%% grpc_client_sup:options/0
|
||||||
|
-type grpc_client_sup_options() :: map().
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Supervisor APIs & Callbacks
|
%% Supervisor APIs & Callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -59,7 +63,7 @@ init([]) ->
|
||||||
-spec start_grpc_client_channel(
|
-spec start_grpc_client_channel(
|
||||||
binary(),
|
binary(),
|
||||||
uri_string:uri_string(),
|
uri_string:uri_string(),
|
||||||
grpc_client_sup:options()
|
grpc_client_sup_options()
|
||||||
) -> {ok, pid()} | {error, term()}.
|
) -> {ok, pid()} | {error, term()}.
|
||||||
start_grpc_client_channel(Name, SvrAddr, Options) ->
|
start_grpc_client_channel(Name, SvrAddr, Options) ->
|
||||||
grpc_client_sup:create_channel_pool(Name, SvrAddr, Options).
|
grpc_client_sup:create_channel_pool(Name, SvrAddr, Options).
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_ft, [
|
{application, emqx_ft, [
|
||||||
{description, "EMQX file transfer over MQTT"},
|
{description, "EMQX file transfer over MQTT"},
|
||||||
{vsn, "0.1.9"},
|
{vsn, "0.1.10"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_ft_app, []}},
|
{mod, {emqx_ft_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -220,7 +220,7 @@ error_desc('SERVICE_UNAVAILABLE') ->
|
||||||
roots() ->
|
roots() ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
-spec fields(hocon_schema:name()) -> [hoconsc:field()].
|
-spec fields(hocon_schema:name()) -> [hocon_schema:field()].
|
||||||
fields(client_id) ->
|
fields(client_id) ->
|
||||||
[
|
[
|
||||||
{clientid,
|
{clientid,
|
||||||
|
|
|
@ -36,7 +36,7 @@
|
||||||
%% Internal API
|
%% Internal API
|
||||||
-export([exporter/1]).
|
-export([exporter/1]).
|
||||||
|
|
||||||
-export_type([export/0]).
|
-export_type([export/0, exporter_conf/0]).
|
||||||
|
|
||||||
-type storage() :: emqx_ft_storage_fs:storage() | undefined.
|
-type storage() :: emqx_ft_storage_fs:storage() | undefined.
|
||||||
-type transfer() :: emqx_ft:transfer().
|
-type transfer() :: emqx_ft:transfer().
|
||||||
|
|
|
@ -56,6 +56,7 @@
|
||||||
-export_type([filefrag/1]).
|
-export_type([filefrag/1]).
|
||||||
-export_type([filefrag/0]).
|
-export_type([filefrag/0]).
|
||||||
-export_type([transferinfo/0]).
|
-export_type([transferinfo/0]).
|
||||||
|
-export_type([segmentinfo/0]).
|
||||||
|
|
||||||
-export_type([file_error/0]).
|
-export_type([file_error/0]).
|
||||||
|
|
||||||
|
@ -104,7 +105,7 @@
|
||||||
type := 'local',
|
type := 'local',
|
||||||
enable := true,
|
enable := true,
|
||||||
segments := segments(),
|
segments := segments(),
|
||||||
exporter := emqx_ft_storage_exporter:exporter()
|
exporter := emqx_ft_storage_exporter:exporter_conf()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-type file_error() ::
|
-type file_error() ::
|
||||||
|
|
|
@ -66,14 +66,14 @@ table(ReaderPid, Bytes) when is_pid(ReaderPid) andalso is_integer(Bytes) andalso
|
||||||
end,
|
end,
|
||||||
qlc:table(fun() -> NextFun(ReaderPid) end, []).
|
qlc:table(fun() -> NextFun(ReaderPid) end, []).
|
||||||
|
|
||||||
-spec start_link(pid(), filename:filename()) -> startlink_ret().
|
-spec start_link(pid(), file:name()) -> startlink_ret().
|
||||||
start_link(CallerPid, Filename) when
|
start_link(CallerPid, Filename) when
|
||||||
is_pid(CallerPid) andalso
|
is_pid(CallerPid) andalso
|
||||||
?IS_FILENAME(Filename)
|
?IS_FILENAME(Filename)
|
||||||
->
|
->
|
||||||
gen_server:start_link(?MODULE, [CallerPid, Filename], []).
|
gen_server:start_link(?MODULE, [CallerPid, Filename], []).
|
||||||
|
|
||||||
-spec start_supervised(pid(), filename:filename()) -> startlink_ret().
|
-spec start_supervised(pid(), file:name()) -> startlink_ret().
|
||||||
start_supervised(CallerPid, Filename) when
|
start_supervised(CallerPid, Filename) when
|
||||||
is_pid(CallerPid) andalso
|
is_pid(CallerPid) andalso
|
||||||
?IS_FILENAME(Filename)
|
?IS_FILENAME(Filename)
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
%% module if it integrated with emqx_gateway_conn module
|
%% module if it integrated with emqx_gateway_conn module
|
||||||
-module(emqx_gateway_channel).
|
-module(emqx_gateway_channel).
|
||||||
|
|
||||||
-export_type([gen_server_from/0]).
|
-export_type([gen_server_from/0, channel/0]).
|
||||||
|
|
||||||
-type channel() :: any().
|
-type channel() :: any().
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@
|
||||||
%% Init
|
%% Init
|
||||||
|
|
||||||
%% @doc Initialize the channel state
|
%% @doc Initialize the channel state
|
||||||
-callback init(emqx_types:conniinfo(), map()) -> channel().
|
-callback init(emqx_types:conninfo(), map()) -> channel().
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Handles
|
%% Handles
|
||||||
|
@ -49,8 +49,8 @@
|
||||||
-type gen_server_from() :: {pid(), Tag :: term()}.
|
-type gen_server_from() :: {pid(), Tag :: term()}.
|
||||||
|
|
||||||
-type reply() ::
|
-type reply() ::
|
||||||
{outgoing, emqx_gateway_frame:packet()}
|
{outgoing, emqx_gateway_frame:frame()}
|
||||||
| {outgoing, [emqx_gateway_frame:packet()]}
|
| {outgoing, [emqx_gateway_frame:frame()]}
|
||||||
| {event, conn_state() | updated}
|
| {event, conn_state() | updated}
|
||||||
| {close, Reason :: atom()}.
|
| {close, Reason :: atom()}.
|
||||||
|
|
||||||
|
|
|
@ -73,7 +73,7 @@
|
||||||
%% Parse State
|
%% Parse State
|
||||||
parse_state :: emqx_gateway_frame:parse_state(),
|
parse_state :: emqx_gateway_frame:parse_state(),
|
||||||
%% Serialize options
|
%% Serialize options
|
||||||
serialize :: emqx_gateway_frame:serialize_opts(),
|
serialize :: emqx_gateway_frame:serialize_options(),
|
||||||
%% Channel State
|
%% Channel State
|
||||||
channel :: emqx_gateway_channel:channel(),
|
channel :: emqx_gateway_channel:channel(),
|
||||||
%% GC State
|
%% GC State
|
||||||
|
|
|
@ -22,6 +22,13 @@
|
||||||
%%
|
%%
|
||||||
-module(emqx_gateway_frame).
|
-module(emqx_gateway_frame).
|
||||||
|
|
||||||
|
-export_type([
|
||||||
|
parse_state/0,
|
||||||
|
parse_result/0,
|
||||||
|
serialize_options/0,
|
||||||
|
frame/0
|
||||||
|
]).
|
||||||
|
|
||||||
-type parse_state() :: map().
|
-type parse_state() :: map().
|
||||||
|
|
||||||
-type frame() :: any().
|
-type frame() :: any().
|
||||||
|
@ -32,13 +39,6 @@
|
||||||
|
|
||||||
-type serialize_options() :: map().
|
-type serialize_options() :: map().
|
||||||
|
|
||||||
-export_type([
|
|
||||||
parse_state/0,
|
|
||||||
parse_result/0,
|
|
||||||
serialize_options/0,
|
|
||||||
frame/0
|
|
||||||
]).
|
|
||||||
|
|
||||||
%% Callbacks
|
%% Callbacks
|
||||||
|
|
||||||
%% @doc Initial the frame parser states
|
%% @doc Initial the frame parser states
|
||||||
|
|
|
@ -42,6 +42,8 @@
|
||||||
code_change/3
|
code_change/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export_type([descriptor/0]).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
reged = #{} :: #{gateway_name() => descriptor()}
|
reged = #{} :: #{gateway_name() => descriptor()}
|
||||||
}).
|
}).
|
||||||
|
|
|
@ -46,6 +46,8 @@
|
||||||
quote_mysql/1
|
quote_mysql/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export_type([tmpl_token/0]).
|
||||||
|
|
||||||
-define(PH_VAR_THIS, '$this').
|
-define(PH_VAR_THIS, '$this').
|
||||||
|
|
||||||
%% To match any pattern starts with '$' and followed by '{', and closed by a '}' char:
|
%% To match any pattern starts with '$' and followed by '{', and closed by a '}' char:
|
||||||
|
|
Loading…
Reference in New Issue