chore(exhook): reformat exhook codes

This commit is contained in:
firest 2022-04-13 17:30:37 +08:00
parent cfb6c4b0be
commit 1a4afabe9f
19 changed files with 2299 additions and 1613 deletions

View File

@ -21,26 +21,26 @@
-define(HOOKS_REF_COUNTER, emqx_exhook_ref_counter).
-define(HOOKS_METRICS, emqx_exhook_metrics).
-define(ENABLED_HOOKS,
[ {'client.connect', {emqx_exhook_handler, on_client_connect, []}}
, {'client.connack', {emqx_exhook_handler, on_client_connack, []}}
, {'client.connected', {emqx_exhook_handler, on_client_connected, []}}
, {'client.disconnected', {emqx_exhook_handler, on_client_disconnected, []}}
, {'client.authenticate', {emqx_exhook_handler, on_client_authenticate, []}}
, {'client.authorize', {emqx_exhook_handler, on_client_authorize, []}}
, {'client.subscribe', {emqx_exhook_handler, on_client_subscribe, []}}
, {'client.unsubscribe', {emqx_exhook_handler, on_client_unsubscribe, []}}
, {'session.created', {emqx_exhook_handler, on_session_created, []}}
, {'session.subscribed', {emqx_exhook_handler, on_session_subscribed, []}}
, {'session.unsubscribed',{emqx_exhook_handler, on_session_unsubscribed, []}}
, {'session.resumed', {emqx_exhook_handler, on_session_resumed, []}}
, {'session.discarded', {emqx_exhook_handler, on_session_discarded, []}}
, {'session.takenover', {emqx_exhook_handler, on_session_takenover, []}}
, {'session.terminated', {emqx_exhook_handler, on_session_terminated, []}}
, {'message.publish', {emqx_exhook_handler, on_message_publish, []}}
, {'message.delivered', {emqx_exhook_handler, on_message_delivered, []}}
, {'message.acked', {emqx_exhook_handler, on_message_acked, []}}
, {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}}
-define(ENABLED_HOOKS, [
{'client.connect', {emqx_exhook_handler, on_client_connect, []}},
{'client.connack', {emqx_exhook_handler, on_client_connack, []}},
{'client.connected', {emqx_exhook_handler, on_client_connected, []}},
{'client.disconnected', {emqx_exhook_handler, on_client_disconnected, []}},
{'client.authenticate', {emqx_exhook_handler, on_client_authenticate, []}},
{'client.authorize', {emqx_exhook_handler, on_client_authorize, []}},
{'client.subscribe', {emqx_exhook_handler, on_client_subscribe, []}},
{'client.unsubscribe', {emqx_exhook_handler, on_client_unsubscribe, []}},
{'session.created', {emqx_exhook_handler, on_session_created, []}},
{'session.subscribed', {emqx_exhook_handler, on_session_subscribed, []}},
{'session.unsubscribed', {emqx_exhook_handler, on_session_unsubscribed, []}},
{'session.resumed', {emqx_exhook_handler, on_session_resumed, []}},
{'session.discarded', {emqx_exhook_handler, on_session_discarded, []}},
{'session.takenover', {emqx_exhook_handler, on_session_takenover, []}},
{'session.terminated', {emqx_exhook_handler, on_session_terminated, []}},
{'message.publish', {emqx_exhook_handler, on_message_publish, []}},
{'message.delivered', {emqx_exhook_handler, on_message_delivered, []}},
{'message.acked', {emqx_exhook_handler, on_message_acked, []}},
{'message.dropped', {emqx_exhook_handler, on_message_dropped, []}}
]).
-endif.

View File

@ -1,42 +1,57 @@
%%-*- mode: erlang -*-
{plugins,
[rebar3_proper,
{plugins, [
rebar3_proper,
{grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}}
]}.
{deps,
[ {emqx, {path, "../emqx"}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}}
{deps, [
{emqx, {path, "../emqx"}},
{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}}
]}.
{grpc,
[{protos, ["priv/protos"]},
{gpb_opts, [{module_name_prefix, "emqx_"},
{module_name_suffix, "_pb"}]}
{grpc, [
{protos, ["priv/protos"]},
{gpb_opts, [
{module_name_prefix, "emqx_"},
{module_name_suffix, "_pb"}
]}
]}.
{provider_hooks,
[{pre, [{compile, {grpc, gen}},
{clean, {grpc, clean}}]}
{provider_hooks, [
{pre, [
{compile, {grpc, gen}},
{clean, {grpc, clean}}
]}
]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
{erl_opts, [
warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
{parse_transform}
]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{xref_checks, [
undefined_function_calls,
undefined_functions,
locals_not_used,
deprecated_function_calls,
warnings_as_errors,
deprecated_functions
]}.
{xref_ignores, [emqx_exhook_pb]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{cover_excl_mods, [emqx_exhook_pb,
{cover_excl_mods, [
emqx_exhook_pb,
emqx_exhook_v_1_hook_provider_bhvr,
emqx_exhook_v_1_hook_provider_client]}.
emqx_exhook_v_1_hook_provider_client
]}.
{project_plugins, [erlfmt]}.

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang -*-
{application, emqx_exhook,
[{description, "EMQX Extension for Hook"},
{application, emqx_exhook, [
{description, "EMQX Extension for Hook"},
{vsn, "5.0.0"},
{modules, []},
{registered, []},

View File

@ -5,5 +5,4 @@
],
[
{<<".*">>, []}
]
}.
]}.

View File

@ -19,8 +19,9 @@
-include("emqx_exhook.hrl").
-include_lib("emqx/include/logger.hrl").
-export([ cast/2
, call_fold/3
-export([
cast/2,
call_fold/3
]).
%% exported for `emqx_telemetry'
@ -38,11 +39,15 @@ cast(_, _, []) ->
ok;
cast(Hookpoint, Req, [ServerName | More]) ->
%% XXX: Need a real asynchronous running
_ = emqx_exhook_server:call(Hookpoint, Req,
emqx_exhook_mgr:server(ServerName)),
_ = emqx_exhook_server:call(
Hookpoint,
Req,
emqx_exhook_mgr:server(ServerName)
),
cast(Hookpoint, Req, More).
-spec call_fold(atom(), term(), function()) -> {ok, term()}
-spec call_fold(atom(), term(), function()) ->
{ok, term()}
| {stop, term()}.
call_fold(Hookpoint, Req, AccFun) ->
case emqx_exhook_mgr:running() of
@ -90,12 +95,18 @@ deny_action_result('message.publish', Msg) ->
%%--------------------------------------------------------------------
-spec get_basic_usage_info() ->
#{ num_servers => non_neg_integer()
, servers =>
[#{ driver => Driver
, hooks => [emqx_exhook_server:hookpoint()]
}]
} when Driver :: grpc.
#{
num_servers => non_neg_integer(),
servers =>
[
#{
driver => Driver,
hooks => [emqx_exhook_server:hookpoint()]
}
]
}
when
Driver :: grpc.
get_basic_usage_info() ->
try
Servers = emqx_exhook_mgr:running(),
@ -105,18 +116,22 @@ get_basic_usage_info() ->
fun(ServerName) ->
Hooks = emqx_exhook_mgr:hooks(ServerName),
HookNames = lists:map(fun(#{name := Name}) -> Name end, Hooks),
#{ hooks => HookNames
, %% currently, only grpc driver exists.
#{
hooks => HookNames,
%% currently, only grpc driver exists.
driver => grpc
}
end,
Servers),
#{ num_servers => NumServers
, servers => ServerInfo
Servers
),
#{
num_servers => NumServers,
servers => ServerInfo
}
catch
_:_ ->
#{ num_servers => 0
, servers => []
#{
num_servers => 0,
servers => []
}
end.

View File

@ -33,10 +33,13 @@
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(BAD_RPC, 'BAD_RPC').
-dialyzer([{nowarn_function, [ fill_cluster_server_info/5
, nodes_server_info/5
, fill_server_hooks_info/4
]}]).
-dialyzer([
{nowarn_function, [
fill_cluster_server_info/5,
nodes_server_info/5,
fill_server_hooks_info/4
]}
]).
%%--------------------------------------------------------------------
%% schema
@ -51,68 +54,82 @@ paths() -> ["/exhooks", "/exhooks/:name", "/exhooks/:name/move", "/exhooks/:name
schema(("/exhooks")) ->
#{
'operationId' => exhooks,
get => #{tags => ?TAGS,
get => #{
tags => ?TAGS,
desc => <<"List all servers">>,
responses => #{200 => mk(array(ref(detail_server_info)), #{})}
},
post => #{tags => ?TAGS,
post => #{
tags => ?TAGS,
desc => <<"Add a servers">>,
'requestBody' => server_conf_schema(),
responses => #{201 => mk(ref(detail_server_info), #{}),
responses => #{
201 => mk(ref(detail_server_info), #{}),
500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
}
}
};
schema("/exhooks/:name") ->
#{'operationId' => action_with_name,
get => #{tags => ?TAGS,
#{
'operationId' => action_with_name,
get => #{
tags => ?TAGS,
desc => <<"Get the detail information of server">>,
parameters => params_server_name_in_path(),
responses => #{200 => mk(ref(detail_server_info), #{}),
responses => #{
200 => mk(ref(detail_server_info), #{}),
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>)
}
},
put => #{tags => ?TAGS,
put => #{
tags => ?TAGS,
desc => <<"Update the server">>,
parameters => params_server_name_in_path(),
'requestBody' => server_conf_schema(),
responses => #{200 => <<>>,
responses => #{
200 => <<>>,
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
}
},
delete => #{tags => ?TAGS,
delete => #{
tags => ?TAGS,
desc => <<"Delete the server">>,
parameters => params_server_name_in_path(),
responses => #{204 => <<>>,
responses => #{
204 => <<>>,
500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
}
}
};
schema("/exhooks/:name/hooks") ->
#{'operationId' => server_hooks,
get => #{tags => ?TAGS,
#{
'operationId' => server_hooks,
get => #{
tags => ?TAGS,
desc => <<"Get the hooks information of server">>,
parameters => params_server_name_in_path(),
responses => #{200 => mk(array(ref(list_hook_info)), #{}),
responses => #{
200 => mk(array(ref(list_hook_info)), #{}),
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>)
}
}
};
schema("/exhooks/:name/move") ->
#{'operationId' => move,
post => #{tags => ?TAGS,
#{
'operationId' => move,
post => #{
tags => ?TAGS,
desc =>
<<"Move the server.\n",
"NOTE: The position should be \"front|rear|before:{name}|after:{name}\"\n">>,
parameters => params_server_name_in_path(),
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
ref(move_req),
position_example()),
responses => #{204 => <<"No Content">>,
position_example()
),
responses => #{
204 => <<"No Content">>,
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
}
@ -120,72 +137,90 @@ schema("/exhooks/:name/move") ->
}.
fields(move_req) ->
[{position, mk(string(), #{ desc => <<"The target position to be moved.">>
, example => <<"front">>})}];
[
{position,
mk(string(), #{
desc => <<"The target position to be moved.">>,
example => <<"front">>
})}
];
fields(detail_server_info) ->
[ {metrics, mk(ref(metrics), #{})}
, {node_metrics, mk(array(ref(node_metrics)), #{})}
, {node_status, mk(array(ref(node_status)), #{})}
, {hooks, mk(array(ref(hook_info)), #{})}
[
{metrics, mk(ref(metrics), #{})},
{node_metrics, mk(array(ref(node_metrics)), #{})},
{node_status, mk(array(ref(node_status)), #{})},
{hooks, mk(array(ref(hook_info)), #{})}
] ++ emqx_exhook_schema:server_config();
fields(list_hook_info) ->
[ {name, mk(binary(), #{desc => <<"The hook's name">>})}
, {params, mk(map(name, binary()),
#{desc => <<"The parameters used when the hook is registered">>})}
, {metrics, mk(ref(metrics), #{})}
, {node_metrics, mk(array(ref(node_metrics)), #{})}
[
{name, mk(binary(), #{desc => <<"The hook's name">>})},
{params,
mk(
map(name, binary()),
#{desc => <<"The parameters used when the hook is registered">>}
)},
{metrics, mk(ref(metrics), #{})},
{node_metrics, mk(array(ref(node_metrics)), #{})}
];
fields(node_metrics) ->
[ {node, mk(string(), #{})}
, {metrics, mk(ref(metrics), #{})}
[
{node, mk(string(), #{})},
{metrics, mk(ref(metrics), #{})}
];
fields(node_status) ->
[ {node, mk(string(), #{})}
, {status, mk(enum([running, waiting, stopped, error]), #{})}
[
{node, mk(string(), #{})},
{status, mk(enum([running, waiting, stopped, error]), #{})}
];
fields(hook_info) ->
[ {name, mk(binary(), #{desc => <<"The hook's name">>})}
, {params, mk(map(name, binary()),
#{desc => <<"The parameters used when the hook is registered">>})}
[
{name, mk(binary(), #{desc => <<"The hook's name">>})},
{params,
mk(
map(name, binary()),
#{desc => <<"The parameters used when the hook is registered">>}
)}
];
fields(metrics) ->
[ {succeed, mk(integer(), #{})}
, {failed, mk(integer(), #{})}
, {rate, mk(integer(), #{})}
, {max_rate, mk(integer(), #{})}
[
{succeed, mk(integer(), #{})},
{failed, mk(integer(), #{})},
{rate, mk(integer(), #{})},
{max_rate, mk(integer(), #{})}
];
fields(server_config) ->
emqx_exhook_schema:server_config().
params_server_name_in_path() ->
[{name, mk(string(), #{in => path,
[
{name,
mk(string(), #{
in => path,
required => true,
example => <<"default">>})}
example => <<"default">>
})}
].
server_conf_schema() ->
SSL = #{ enable => false
, cacertfile => emqx:cert_file(<<"cacert.pem">>)
, certfile => emqx:cert_file(<<"cert.pem">>)
, keyfile => emqx:cert_file(<<"key.pem">>)
SSL = #{
enable => false,
cacertfile => emqx:cert_file(<<"cacert.pem">>),
certfile => emqx:cert_file(<<"cert.pem">>),
keyfile => emqx:cert_file(<<"key.pem">>)
},
schema_with_example(ref(server_config),
#{ name => "default"
, enable => true
, url => <<"http://127.0.0.1:8081">>
, request_timeout => "5s"
, failed_action => deny
, auto_reconnect => "60s"
, pool_size => 8
, ssl => SSL
}).
schema_with_example(
ref(server_config),
#{
name => "default",
enable => true,
url => <<"http://127.0.0.1:8081">>,
request_timeout => "5s",
failed_action => deny,
auto_reconnect => "60s",
pool_size => 8,
ssl => SSL
}
).
%%--------------------------------------------------------------------
%% API
@ -194,7 +229,6 @@ exhooks(get, _) ->
Confs = emqx:get_config([exhook, servers]),
Infos = nodes_all_server_info(Confs),
{200, Infos};
exhooks(post, #{body := Body}) ->
{ok, _} = emqx_exhook_mgr:update_config([exhook, servers], {add, Body}),
#{<<"name">> := Name} = Body,
@ -202,34 +236,45 @@ exhooks(post, #{body := Body}) ->
action_with_name(get, #{bindings := #{name := Name}}) ->
get_nodes_server_info(Name);
action_with_name(put, #{bindings := #{name := Name}, body := Body}) ->
case emqx_exhook_mgr:update_config([exhook, servers],
{update, Name, Body}) of
case
emqx_exhook_mgr:update_config(
[exhook, servers],
{update, Name, Body}
)
of
{ok, not_found} ->
{400, #{code => <<"BAD_REQUEST">>,
{400, #{
code => <<"BAD_REQUEST">>,
message => <<"Server not found">>
}};
{ok, {error, Reason}} ->
{400, #{code => <<"BAD_REQUEST">>,
{400, #{
code => <<"BAD_REQUEST">>,
message => unicode:characters_to_binary(
io_lib:format("Error Reason:~p~n", [Reason]))
io_lib:format("Error Reason:~p~n", [Reason])
)
}};
{ok, _} ->
{200};
{error, Error} ->
{500, #{code => <<"BAD_RPC">>,
{500, #{
code => <<"BAD_RPC">>,
message => Error
}}
end;
action_with_name(delete, #{bindings := #{name := Name}}) ->
case emqx_exhook_mgr:update_config([exhook, servers],
{delete, Name}) of
case
emqx_exhook_mgr:update_config(
[exhook, servers],
{delete, Name}
)
of
{ok, _} ->
{200};
{error, Error} ->
{500, #{code => <<"BAD_RPC">>,
{500, #{
code => <<"BAD_RPC">>,
message => Error
}}
end.
@ -237,21 +282,28 @@ action_with_name(delete, #{bindings := #{name := Name}}) ->
move(post, #{bindings := #{name := Name}, body := #{<<"position">> := RawPosition}}) ->
case parse_position(RawPosition) of
{ok, Position} ->
case emqx_exhook_mgr:update_config([exhook, servers],
{move, Name, Position}) of
case
emqx_exhook_mgr:update_config(
[exhook, servers],
{move, Name, Position}
)
of
{ok, ok} ->
{204};
{ok, not_found} ->
{400, #{code => <<"BAD_REQUEST">>,
{400, #{
code => <<"BAD_REQUEST">>,
message => <<"Server not found">>
}};
{error, Error} ->
{500, #{code => <<"BAD_RPC">>,
{500, #{
code => <<"BAD_RPC">>,
message => Error
}}
end;
{error, invalid_position} ->
{400, #{code => <<"BAD_REQUEST">>,
{400, #{
code => <<"BAD_REQUEST">>,
message => <<"Invalid Position">>
}}
end.
@ -260,7 +312,8 @@ server_hooks(get, #{bindings := #{name := Name}}) ->
Confs = emqx:get_config([exhook, servers]),
case lists:search(fun(#{name := CfgName}) -> CfgName =:= Name end, Confs) of
false ->
{400, #{code => <<"BAD_REQUEST">>,
{400, #{
code => <<"BAD_REQUEST">>,
message => <<"Server not found">>
}};
_ ->
@ -272,7 +325,8 @@ get_nodes_server_info(Name) ->
Confs = emqx:get_config([exhook, servers]),
case lists:search(fun(#{name := CfgName}) -> CfgName =:= Name end, Confs) of
false ->
{400, #{code => <<"BAD_REQUEST">>,
{400, #{
code => <<"BAD_REQUEST">>,
message => <<"Server not found">>
}};
{value, Conf} ->
@ -292,17 +346,17 @@ node_all_server_info([#{name := ServerName} = Conf | T], AllInfos, Default, Acc)
Info = fill_cluster_server_info(AllInfos, [], [], ServerName, Default),
AllInfo = maps:merge(Conf, Info),
node_all_server_info(T, AllInfos, Default, [AllInfo | Acc]);
node_all_server_info([], _, _, Acc) ->
lists:reverse(Acc).
fill_cluster_server_info([{Node, {error, _}} | T], StatusL, MetricsL, ServerName, Default) ->
fill_cluster_server_info(T,
fill_cluster_server_info(
T,
[#{node => Node, status => error} | StatusL],
[#{node => Node, metrics => Default} | MetricsL],
ServerName,
Default);
Default
);
fill_cluster_server_info([{Node, Result} | T], StatusL, MetricsL, ServerName, Default) ->
#{status := Status, metrics := Metrics} = Result,
fill_cluster_server_info(
@ -310,11 +364,12 @@ fill_cluster_server_info([{Node, Result} | T], StatusL, MetricsL, ServerName, De
[#{node => Node, status => maps:get(ServerName, Status, error)} | StatusL],
[#{node => Node, metrics => maps:get(ServerName, Metrics, Default)} | MetricsL],
ServerName,
Default);
Default
);
fill_cluster_server_info([], StatusL, MetricsL, ServerName, _) ->
Metrics = emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
#{metrics => Metrics,
#{
metrics => Metrics,
node_metrics => MetricsL,
node_status => StatusL,
hooks => emqx_exhook_mgr:hooks(ServerName)
@ -329,24 +384,25 @@ nodes_server_info(Name) ->
nodes_server_info(InfoL, Name, Default, [], []).
nodes_server_info([{Node, {error, _}} | T], Name, Default, StatusL, MetricsL) ->
nodes_server_info(T,
nodes_server_info(
T,
Name,
Default,
[#{node => Node, status => error} | StatusL],
[#{node => Node, metrics => Default} | MetricsL]
);
nodes_server_info([{Node, Result} | T], Name, Default, StatusL, MetricsL) ->
#{status := Status, metrics := Metrics} = Result,
nodes_server_info(T,
nodes_server_info(
T,
Name,
Default,
[#{node => Node, status => Status} | StatusL],
[#{node => Node, metrics => Metrics} | MetricsL]
);
nodes_server_info([], Name, _, StatusL, MetricsL) ->
#{metrics => emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
#{
metrics => emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
node_status => StatusL,
node_metrics => MetricsL,
hooks => emqx_exhook_mgr:hooks(Name)
@ -357,7 +413,8 @@ nodes_server_info([], Name, _, StatusL, MetricsL) ->
%%--------------------------------------------------------------------
get_nodes_server_hooks_info(Name) ->
case emqx_exhook_mgr:hooks(Name) of
[] -> [];
[] ->
[];
Hooks ->
AllInfos = call_cluster(fun(Nodes) ->
emqx_exhook_proto_v1:server_hooks_metrics(Nodes, Name)
@ -370,18 +427,15 @@ get_nodes_server_hooks_info([#{name := Name} = Spec | T], AllInfos, Default, Acc
Info = fill_server_hooks_info(AllInfos, Name, Default, []),
AllInfo = maps:merge(Spec, Info),
get_nodes_server_hooks_info(T, AllInfos, Default, [AllInfo | Acc]);
get_nodes_server_hooks_info([], _, _, Acc) ->
Acc.
fill_server_hooks_info([{_, {error, _}} | T], Name, Default, MetricsL) ->
fill_server_hooks_info(T, Name, Default, MetricsL);
fill_server_hooks_info([{Node, MetricsMap} | T], Name, Default, MetricsL) ->
Metrics = maps:get(Name, MetricsMap, Default),
NodeMetrics = #{node => Node, metrics => Metrics},
fill_server_hooks_info(T, Name, Default, [NodeMetrics | MetricsL]);
fill_server_hooks_info([], _Name, _Default, MetricsL) ->
Metrics = emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
#{metrics => Metrics, node_metrics => MetricsL}.
@ -397,24 +451,32 @@ call_cluster(Fun) ->
Ret = Fun(Nodes),
lists:zip(Nodes, lists:map(fun emqx_rpc:unwrap_erpc/1, Ret)).
%%--------------------------------------------------------------------
%% Internal Funcs
%%--------------------------------------------------------------------
position_example() ->
#{ front =>
#{ summary => <<"absolute position 'front'">>
, value => #{<<"position">> => <<"front">>}}
, rear =>
#{ summary => <<"absolute position 'rear'">>
, value => #{<<"position">> => <<"rear">>}}
, related_before =>
#{ summary => <<"relative position 'before'">>
, value => #{<<"position">> => <<"before:default">>}}
, related_after =>
#{ summary => <<"relative position 'after'">>
, value => #{<<"position">> => <<"after:default">>}}
#{
front =>
#{
summary => <<"absolute position 'front'">>,
value => #{<<"position">> => <<"front">>}
},
rear =>
#{
summary => <<"absolute position 'rear'">>,
value => #{<<"position">> => <<"rear">>}
},
related_before =>
#{
summary => <<"relative position 'before'">>,
value => #{<<"position">> => <<"before:default">>}
},
related_after =>
#{
summary => <<"relative position 'after'">>,
value => #{<<"position">> => <<"after:default">>}
}
}.
parse_position(<<"front">>) ->

View File

@ -20,9 +20,10 @@
-include("emqx_exhook.hrl").
-export([ start/2
, stop/1
, prep_stop/1
-export([
start/2,
stop/1,
prep_stop/1
]).
%%--------------------------------------------------------------------

View File

@ -20,47 +20,53 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-export([ on_client_connect/2
, on_client_connack/3
, on_client_connected/2
, on_client_disconnected/3
, on_client_authenticate/2
, on_client_authorize/4
, on_client_subscribe/3
, on_client_unsubscribe/3
-export([
on_client_connect/2,
on_client_connack/3,
on_client_connected/2,
on_client_disconnected/3,
on_client_authenticate/2,
on_client_authorize/4,
on_client_subscribe/3,
on_client_unsubscribe/3
]).
%% Session Lifecircle Hooks
-export([ on_session_created/2
, on_session_subscribed/3
, on_session_unsubscribed/3
, on_session_resumed/2
, on_session_discarded/2
, on_session_takenover/2
, on_session_terminated/3
-export([
on_session_created/2,
on_session_subscribed/3,
on_session_unsubscribed/3,
on_session_resumed/2,
on_session_discarded/2,
on_session_takenover/2,
on_session_terminated/3
]).
-export([ on_message_publish/1
, on_message_dropped/3
, on_message_delivered/2
, on_message_acked/2
-export([
on_message_publish/1,
on_message_dropped/3,
on_message_delivered/2,
on_message_acked/2
]).
%% Utils
-export([ message/1
, headers/1
, stringfy/1
, merge_responsed_bool/2
, merge_responsed_message/2
, assign_to_message/2
, clientinfo/1
-export([
message/1,
headers/1,
stringfy/1,
merge_responsed_bool/2,
merge_responsed_message/2,
assign_to_message/2,
clientinfo/1
]).
-import(emqx_exhook,
[ cast/2
, call_fold/3
]).
-import(
emqx_exhook,
[
cast/2,
call_fold/3
]
).
-elvis([{elvis_style, god_modules, disable}]).
@ -69,15 +75,18 @@
%%--------------------------------------------------------------------
on_client_connect(ConnInfo, Props) ->
Req = #{conninfo => conninfo(ConnInfo),
Req = #{
conninfo => conninfo(ConnInfo),
props => properties(Props)
},
cast('client.connect', Req).
on_client_connack(ConnInfo, Rc, Props) ->
Req = #{conninfo => conninfo(ConnInfo),
Req = #{
conninfo => conninfo(ConnInfo),
result_code => stringfy(Rc),
props => properties(Props)},
props => properties(Props)
},
cast('client.connack', Req).
on_client_connected(ClientInfo, _ConnInfo) ->
@ -85,7 +94,8 @@ on_client_connected(ClientInfo, _ConnInfo) ->
cast('client.connected', Req).
on_client_disconnected(ClientInfo, Reason, _ConnInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo),
Req = #{
clientinfo => clientinfo(ClientInfo),
reason => stringfy(Reason)
},
cast('client.disconnected', Req).
@ -98,14 +108,24 @@ on_client_authenticate(ClientInfo, AuthResult) ->
%% detailed info too.
%%
Bool = AuthResult == ok,
Req = #{clientinfo => clientinfo(ClientInfo),
Req = #{
clientinfo => clientinfo(ClientInfo),
result => Bool
},
case call_fold('client.authenticate', Req,
fun merge_responsed_bool/2) of
case
call_fold(
'client.authenticate',
Req,
fun merge_responsed_bool/2
)
of
{StopOrOk, #{result := Result0}} when is_boolean(Result0) ->
Result = case Result0 of true -> ok; _ -> {error, not_authorized} end,
Result =
case Result0 of
true -> ok;
_ -> {error, not_authorized}
end,
{StopOrOk, Result};
_ ->
{ok, AuthResult}
@ -113,32 +133,46 @@ on_client_authenticate(ClientInfo, AuthResult) ->
on_client_authorize(ClientInfo, PubSub, Topic, Result) ->
Bool = Result == allow,
Type = case PubSub of
Type =
case PubSub of
publish -> 'PUBLISH';
subscribe -> 'SUBSCRIBE'
end,
Req = #{clientinfo => clientinfo(ClientInfo),
Req = #{
clientinfo => clientinfo(ClientInfo),
type => Type,
topic => Topic,
result => Bool
},
case call_fold('client.authorize', Req,
fun merge_responsed_bool/2) of
case
call_fold(
'client.authorize',
Req,
fun merge_responsed_bool/2
)
of
{StopOrOk, #{result := Result0}} when is_boolean(Result0) ->
NResult = case Result0 of true -> allow; _ -> deny end,
NResult =
case Result0 of
true -> allow;
_ -> deny
end,
{StopOrOk, NResult};
_ -> {ok, Result}
_ ->
{ok, Result}
end.
on_client_subscribe(ClientInfo, Props, TopicFilters) ->
Req = #{clientinfo => clientinfo(ClientInfo),
Req = #{
clientinfo => clientinfo(ClientInfo),
props => properties(Props),
topic_filters => topicfilters(TopicFilters)
},
cast('client.subscribe', Req).
on_client_unsubscribe(ClientInfo, Props, TopicFilters) ->
Req = #{clientinfo => clientinfo(ClientInfo),
Req = #{
clientinfo => clientinfo(ClientInfo),
props => properties(Props),
topic_filters => topicfilters(TopicFilters)
},
@ -153,14 +187,16 @@ on_session_created(ClientInfo, _SessInfo) ->
cast('session.created', Req).
on_session_subscribed(ClientInfo, Topic, SubOpts) ->
Req = #{clientinfo => clientinfo(ClientInfo),
Req = #{
clientinfo => clientinfo(ClientInfo),
topic => Topic,
subopts => maps:with([qos, share, rh, rap, nl], SubOpts)
},
cast('session.subscribed', Req).
on_session_unsubscribed(ClientInfo, Topic, _SubOpts) ->
Req = #{clientinfo => clientinfo(ClientInfo),
Req = #{
clientinfo => clientinfo(ClientInfo),
topic => Topic
},
cast('session.unsubscribed', Req).
@ -178,8 +214,10 @@ on_session_takenover(ClientInfo, _SessInfo) ->
cast('session.takenover', Req).
on_session_terminated(ClientInfo, Reason, _SessInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo),
reason => stringfy(Reason)},
Req = #{
clientinfo => clientinfo(ClientInfo),
reason => stringfy(Reason)
},
cast('session.terminated', Req).
%%--------------------------------------------------------------------
@ -190,17 +228,24 @@ on_message_publish(#message{topic = <<"$SYS/", _/binary>>}) ->
ok;
on_message_publish(Message) ->
Req = #{message => message(Message)},
case call_fold('message.publish', Req,
fun emqx_exhook_handler:merge_responsed_message/2) of
case
call_fold(
'message.publish',
Req,
fun emqx_exhook_handler:merge_responsed_message/2
)
of
{StopOrOk, #{message := NMessage}} ->
{StopOrOk, assign_to_message(NMessage, Message)};
_ -> {ok, Message}
_ ->
{ok, Message}
end.
on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) ->
ok;
on_message_dropped(Message, _By, Reason) ->
Req = #{message => message(Message),
Req = #{
message => message(Message),
reason => stringfy(Reason)
},
cast('message.dropped', Req).
@ -208,7 +253,8 @@ on_message_dropped(Message, _By, Reason) ->
on_message_delivered(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
ok;
on_message_delivered(ClientInfo, Message) ->
Req = #{clientinfo => clientinfo(ClientInfo),
Req = #{
clientinfo => clientinfo(ClientInfo),
message => message(Message)
},
cast('message.delivered', Req).
@ -216,7 +262,8 @@ on_message_delivered(ClientInfo, Message) ->
on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
ok;
on_message_acked(ClientInfo, Message) ->
Req = #{clientinfo => clientinfo(ClientInfo),
Req = #{
clientinfo => clientinfo(ClientInfo),
message => message(Message)
},
cast('message.acked', Req).
@ -224,34 +271,59 @@ on_message_acked(ClientInfo, Message) ->
%%--------------------------------------------------------------------
%% Types
properties(undefined) -> [];
properties(undefined) ->
[];
properties(M) when is_map(M) ->
maps:fold(fun(K, V, Acc) ->
[#{name => stringfy(K),
value => stringfy(V)} | Acc]
end, [], M).
maps:fold(
fun(K, V, Acc) ->
[
#{
name => stringfy(K),
value => stringfy(V)
}
| Acc
]
end,
[],
M
).
conninfo(ConnInfo =
#{clientid := ClientId,
conninfo(
ConnInfo =
#{
clientid := ClientId,
peername := {Peerhost, _},
sockname := {_, SockPort}}) ->
sockname := {_, SockPort}
}
) ->
Username = maps:get(username, ConnInfo, undefined),
ProtoName = maps:get(proto_name, ConnInfo, undefined),
ProtoVer = maps:get(proto_ver, ConnInfo, undefined),
Keepalive = maps:get(keepalive, ConnInfo, 0),
#{node => stringfy(node()),
#{
node => stringfy(node()),
clientid => ClientId,
username => maybe(Username),
peerhost => ntoa(Peerhost),
sockport => SockPort,
proto_name => ProtoName,
proto_ver => stringfy(ProtoVer),
keepalive => Keepalive}.
keepalive => Keepalive
}.
clientinfo(ClientInfo =
#{clientid := ClientId, username := Username, peerhost := PeerHost,
sockport := SockPort, protocol := Protocol, mountpoint := Mountpoiont}) ->
#{node => stringfy(node()),
clientinfo(
ClientInfo =
#{
clientid := ClientId,
username := Username,
peerhost := PeerHost,
sockport := SockPort,
protocol := Protocol,
mountpoint := Mountpoiont
}
) ->
#{
node => stringfy(node()),
clientid => ClientId,
username => maybe(Username),
password => maybe(maps:get(password, ClientInfo, undefined)),
@ -262,11 +334,20 @@ clientinfo(ClientInfo =
is_superuser => maps:get(is_superuser, ClientInfo, false),
anonymous => maps:get(anonymous, ClientInfo, true),
cn => maybe(maps:get(cn, ClientInfo, undefined)),
dn => maybe(maps:get(dn, ClientInfo, undefined))}.
dn => maybe(maps:get(dn, ClientInfo, undefined))
}.
message(#message{id = Id, qos = Qos, from = From, topic = Topic,
payload = Payload, timestamp = Ts, headers = Headers}) ->
#{node => stringfy(node()),
message(#message{
id = Id,
qos = Qos,
from = From,
topic = Topic,
payload = Payload,
timestamp = Ts,
headers = Headers
}) ->
#{
node => stringfy(node()),
id => emqx_guid:to_hexstr(Id),
qos => Qos,
from => stringfy(From),
@ -281,7 +362,8 @@ headers(Headers) ->
maps:fold(
fun
(_, undefined, Acc) ->
Acc; %% Ignore undefined value
%% Ignore undefined value
Acc;
(K, V, Acc) ->
case lists:member(K, Ls) of
true ->
@ -289,11 +371,16 @@ headers(Headers) ->
_ ->
Acc
end
end, #{}, Headers).
end,
#{},
Headers
).
bin(K, V) when K == username;
bin(K, V) when
K == username;
K == protocol;
K == allow_publish ->
K == allow_publish
->
bin(V);
bin(peerhost, V) ->
bin(inet:ntoa(V)).
@ -302,8 +389,14 @@ bin(V) when is_binary(V) -> V;
bin(V) when is_atom(V) -> atom_to_binary(V);
bin(V) when is_list(V) -> iolist_to_binary(V).
assign_to_message(InMessage = #{qos := Qos, topic := Topic,
payload := Payload}, Message) ->
assign_to_message(
InMessage = #{
qos := Qos,
topic := Topic,
payload := Payload
},
Message
) ->
NMsg = Message#message{qos = Qos, topic = Topic, payload = Payload},
enrich_header(maps:get(headers, InMessage, #{}), NMsg).
@ -344,8 +437,9 @@ stringfy(Term) ->
%% see exhook.proto
merge_responsed_bool(_Req, #{type := 'IGNORE'}) ->
ignore;
merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}})
when is_boolean(NewBool) ->
merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}}) when
is_boolean(NewBool)
->
{ret(Type), Req#{result => NewBool}};
merge_responsed_bool(_Req, Resp) ->
?SLOG(warning, #{msg => "unknown_responsed_value", resp => Resp}),

View File

@ -19,19 +19,28 @@
-include("emqx_exhook.hrl").
%% API
-export([ init/0, succeed/2, failed/2
, update/1, new_metrics_info/0, servers_metrics/0
, on_server_deleted/1, server_metrics/1, hooks_metrics/1
, metrics_aggregate/1, metrics_aggregate_by_key/2
, metrics_aggregate_by/2
-export([
init/0,
succeed/2,
failed/2,
update/1,
new_metrics_info/0,
servers_metrics/0,
on_server_deleted/1,
server_metrics/1,
hooks_metrics/1,
metrics_aggregate/1,
metrics_aggregate_by_key/2,
metrics_aggregate_by/2
]).
-record(metrics, { index :: index()
, succeed = 0 :: non_neg_integer()
, failed = 0 :: non_neg_integer()
, rate = 0 :: non_neg_integer()
, max_rate = 0 :: non_neg_integer()
, window_rate :: integer()
-record(metrics, {
index :: index(),
succeed = 0 :: non_neg_integer(),
failed = 0 :: non_neg_integer(),
rate = 0 :: non_neg_integer(),
max_rate = 0 :: non_neg_integer(),
window_rate :: integer()
}).
-type metrics() :: #metrics{}.
@ -41,10 +50,11 @@
-type hooks_metrics() :: #{hookpoint() => metrics_info()}.
-type servers_metrics() :: #{server_name() => metrics_info()}.
-type metrics_info() :: #{ succeed := non_neg_integer()
, failed := non_neg_integer()
, rate := number()
, max_rate := number()
-type metrics_info() :: #{
succeed := non_neg_integer(),
failed := non_neg_integer(),
rate := number(),
max_rate := number()
}.
-define(INDEX(ServerName, HookPoint), {ServerName, HookPoint}).
@ -54,16 +64,23 @@
%%% API
%%--------------------------------------------------------------------
init() ->
_ = ets:new(?HOOKS_METRICS,
[ set, named_table, public
, {keypos, #metrics.index}, {write_concurrency, true}
, {read_concurrency, true}
]),
_ = ets:new(
?HOOKS_METRICS,
[
set,
named_table,
public,
{keypos, #metrics.index},
{write_concurrency, true},
{read_concurrency, true}
]
),
ok.
-spec new_metric_info() -> metrics_info().
new_metric_info() ->
#{succeed => 0,
#{
succeed => 0,
failed => 0,
rate => 0,
max_rate => 0
@ -71,33 +88,50 @@ new_metric_info() ->
-spec succeed(server_name(), hookpoint()) -> ok.
succeed(Server, Hook) ->
inc(Server, Hook, #metrics.succeed,
#metrics{ index = {Server, Hook}
, window_rate = 0
, succeed = 0
}).
inc(
Server,
Hook,
#metrics.succeed,
#metrics{
index = {Server, Hook},
window_rate = 0,
succeed = 0
}
).
-spec failed(server_name(), hookpoint()) -> ok.
failed(Server, Hook) ->
inc(Server, Hook, #metrics.failed,
#metrics{ index = {Server, Hook}
, window_rate = 0
, failed = 0
}).
inc(
Server,
Hook,
#metrics.failed,
#metrics{
index = {Server, Hook},
window_rate = 0,
failed = 0
}
).
-spec update(pos_integer()) -> true.
update(Interval) ->
Fun = fun(#metrics{rate = Rate,
Fun = fun(
#metrics{
rate = Rate,
window_rate = WindowRate,
max_rate = MaxRate} = Metrics,
_) ->
max_rate = MaxRate
} = Metrics,
_
) ->
case calc_metric(WindowRate, Interval) of
Rate -> true;
Rate ->
true;
NewRate ->
MaxRate2 = erlang:max(MaxRate, NewRate),
Metrics2 = Metrics#metrics{rate = NewRate,
Metrics2 = Metrics#metrics{
rate = NewRate,
window_rate = 0,
max_rate = MaxRate2},
max_rate = MaxRate2
},
ets:insert(?HOOKS_METRICS, Metrics2)
end
end,
@ -106,24 +140,36 @@ update(Interval) ->
-spec on_server_deleted(server_name()) -> true.
on_server_deleted(Name) ->
ets:match_delete(?HOOKS_METRICS,
{metrics, {Name, '_'}, '_', '_', '_', '_', '_'}).
ets:match_delete(
?HOOKS_METRICS,
{metrics, {Name, '_'}, '_', '_', '_', '_', '_'}
).
-spec server_metrics(server_name()) -> metrics_info().
server_metrics(SvrName) ->
Hooks = ets:match_object(?HOOKS_METRICS,
{metrics, {SvrName, '_'}, '_', '_', '_', '_', '_'}),
Hooks = ets:match_object(
?HOOKS_METRICS,
{metrics, {SvrName, '_'}, '_', '_', '_', '_', '_'}
),
Fold = fun(#metrics{succeed = Succeed,
Fold = fun(
#metrics{
succeed = Succeed,
failed = Failed,
rate = Rate,
max_rate = MaxRate},
Acc) ->
[#{ succeed => Succeed
, failed => Failed
, rate => Rate
, max_rate => MaxRate
} | Acc]
max_rate = MaxRate
},
Acc
) ->
[
#{
succeed => Succeed,
failed => Failed,
rate => Rate,
max_rate => MaxRate
}
| Acc
]
end,
AllMetrics = lists:foldl(Fold, [], Hooks),
@ -133,18 +179,22 @@ server_metrics(SvrName) ->
servers_metrics() ->
AllMetrics = ets:tab2list(?HOOKS_METRICS),
GroupFun = fun(#metrics{index = ?INDEX(ServerName, _),
GroupFun = fun(
#metrics{
index = ?INDEX(ServerName, _),
succeed = Succeed,
failed = Failed,
rate = Rate,
max_rate = MaxRate
},
Acc) ->
Acc
) ->
SvrGroup = maps:get(ServerName, Acc, []),
Metrics = #{ succeed => Succeed
, failed => Failed
, rate => Rate
, max_rate => MaxRate
Metrics = #{
succeed => Succeed,
failed => Failed,
rate => Rate,
max_rate => MaxRate
},
Acc#{ServerName => [Metrics | SvrGroup]}
end,
@ -156,20 +206,29 @@ servers_metrics() ->
-spec hooks_metrics(server_name()) -> hooks_metrics().
hooks_metrics(SvrName) ->
Hooks = ets:match_object(?HOOKS_METRICS,
{metrics, {SvrName, '_'}, '_', '_', '_', '_', '_'}),
Hooks = ets:match_object(
?HOOKS_METRICS,
{metrics, {SvrName, '_'}, '_', '_', '_', '_', '_'}
),
Fold = fun(#metrics{index = ?INDEX(_, HookPoint),
Fold = fun(
#metrics{
index = ?INDEX(_, HookPoint),
succeed = Succeed,
failed = Failed,
rate = Rate,
max_rate = MaxRate},
Acc) ->
Acc#{HookPoint => #{ succeed => Succeed
, failed => Failed
, rate => Rate
, max_rate => MaxRate
}}
max_rate = MaxRate
},
Acc
) ->
Acc#{
HookPoint => #{
succeed => Succeed,
failed => Failed,
rate => Rate,
max_rate => MaxRate
}
}
end,
lists:foldl(Fold, #{}, Hooks).
@ -178,12 +237,14 @@ hooks_metrics(SvrName) ->
metrics_aggregate(MetricsL) ->
metrics_aggregate_by(fun(X) -> X end, MetricsL).
-spec metrics_aggregate_by_key(Key, list(HasMetrics)) -> metrics_info()
when Key :: any(),
-spec metrics_aggregate_by_key(Key, list(HasMetrics)) -> metrics_info() when
Key :: any(),
HasMetrics :: #{Key => metrics_info()}.
metrics_aggregate_by_key(Key, MetricsL) ->
metrics_aggregate_by(fun(X) -> maps:get(Key, X, new_metrics_info()) end,
MetricsL).
metrics_aggregate_by(
fun(X) -> maps:get(Key, X, new_metrics_info()) end,
MetricsL
).
%%--------------------------------------------------------------------
%%% Internal functions
@ -191,18 +252,21 @@ metrics_aggregate_by_key(Key, MetricsL) ->
-spec inc(server_name(), hookpoint(), pos_integer(), metrics()) -> ok.
inc(Server, Hook, Pos, Default) ->
Index = {Server, Hook},
_ = ets:update_counter(?HOOKS_METRICS,
_ = ets:update_counter(
?HOOKS_METRICS,
Index,
[{#metrics.window_rate, 1}, {Pos, 1}],
Default),
Default
),
ok.
-spec new_metrics_info() -> metrics_info().
new_metrics_info() ->
#{ succeed => 0
, failed => 0
, rate => 0
, max_rate => 0
#{
succeed => 0,
failed => 0,
rate => 0,
max_rate => 0
}.
-spec calc_metric(non_neg_integer(), non_neg_integer()) -> non_neg_integer().
@ -211,26 +275,31 @@ calc_metric(Val, Interval) ->
erlang:ceil(Val * 1000 / Interval).
-spec metrics_add(metrics_info(), metrics_info()) -> metrics_info().
metrics_add(#{succeed := S1, failed := F1, rate := R1, max_rate := M1}
, #{succeed := S2, failed := F2, rate := R2, max_rate := M2} = Acc) ->
Acc#{ succeed := S1 + S2
, failed := F1 + F2
, rate := R1 + R2
, max_rate := M1 + M2
metrics_add(
#{succeed := S1, failed := F1, rate := R1, max_rate := M1},
#{succeed := S2, failed := F2, rate := R2, max_rate := M2} = Acc
) ->
Acc#{
succeed := S1 + S2,
failed := F1 + F2,
rate := R1 + R2,
max_rate := M1 + M2
}.
-spec metrics_aggregate_by(fun((X) -> metrics_info()), list(X)) -> metrics_info()
when X :: any().
-spec metrics_aggregate_by(fun((X) -> metrics_info()), list(X)) -> metrics_info() when
X :: any().
metrics_aggregate_by(_, []) ->
new_metric_info();
metrics_aggregate_by(Fun, MetricsL) ->
Fold = fun(E, Acc) -> metrics_add(Fun(E), Acc) end,
#{rate := Rate,
max_rate := MaxRate} = Result = lists:foldl(Fold, new_metric_info(), MetricsL),
#{
rate := Rate,
max_rate := MaxRate
} = Result = lists:foldl(Fold, new_metric_info(), MetricsL),
Len = erlang:length(MetricsL),
Result#{rate := Rate div Len,
Result#{
rate := Rate div Len,
max_rate := MaxRate div Len
}.

View File

@ -26,39 +26,44 @@
-export([start_link/0]).
%% Mgmt API
-export([ list/0
, lookup/1
, enable/1
, disable/1
, server_info/1
, all_servers_info/0
, server_hooks_metrics/1
-export([
list/0,
lookup/1,
enable/1,
disable/1,
server_info/1,
all_servers_info/0,
server_hooks_metrics/1
]).
%% Helper funcs
-export([ running/0
, server/1
, hooks/1
, init_ref_counter_table/0
-export([
running/0,
server/1,
hooks/1,
init_ref_counter_table/0
]).
-export([ update_config/2
, pre_config_update/3
, post_config_update/5
-export([
update_config/2,
pre_config_update/3,
post_config_update/5
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-export([roots/0]).
-type state() :: #{%% Running servers
%% Running servers
-type state() :: #{
running := servers(),
%% Wait to reload servers
waiting := servers(),
@ -74,14 +79,16 @@
-type server() :: server_options().
-type server_options() :: map().
-type position() :: front
-type position() ::
front
| rear
| {before, binary()}
| {'after', binary()}.
-type orders() :: #{server_name() => integer()}.
-type server_info() :: #{name := server_name(),
-type server_info() :: #{
name := server_name(),
status := running | waiting | stopped,
atom() => term()
@ -96,7 +103,8 @@
%% APIs
%%--------------------------------------------------------------------
-spec start_link() -> ignore
-spec start_link() ->
ignore
| {ok, pid()}
| {error, any()}.
start_link() ->
@ -146,26 +154,30 @@ update_config(KeyPath, UpdateReq) ->
pre_config_update(_, {add, Conf}, OldConf) ->
{ok, OldConf ++ [Conf]};
pre_config_update(_, {update, Name, Conf}, OldConf) ->
case replace_conf(Name, fun(_) -> Conf end, OldConf) of
not_found -> {error, not_found};
NewConf -> {ok, NewConf}
end;
pre_config_update(_, {delete, ToDelete}, OldConf) ->
{ok, lists:dropwhile(fun(#{<<"name">> := Name}) -> Name =:= ToDelete end,
OldConf)};
{ok,
lists:dropwhile(
fun(#{<<"name">> := Name}) -> Name =:= ToDelete end,
OldConf
)};
pre_config_update(_, {move, Name, Position}, OldConf) ->
case do_move(Name, Position, OldConf) of
not_found -> {error, not_found};
NewConf -> {ok, NewConf}
end;
pre_config_update(_, {enable, Name, Enable}, OldConf) ->
case replace_conf(Name,
fun(Conf) -> Conf#{<<"enable">> => Enable} end, OldConf) of
case
replace_conf(
Name,
fun(Conf) -> Conf#{<<"enable">> => Enable} end,
OldConf
)
of
not_found -> {error, not_found};
NewConf -> {ok, NewConf}
end.
@ -186,13 +198,16 @@ init([]) ->
{Waiting, Running, Stopped} = load_all_servers(ServerL),
Orders = reorder(ServerL),
refresh_tick(),
{ok, ensure_reload_timer(
#{waiting => Waiting,
{ok,
ensure_reload_timer(
#{
waiting => Waiting,
running => Running,
stopped => Stopped,
trefs => #{},
orders => Orders
})}.
}
)}.
-spec load_all_servers(list(server_options())) -> {servers(), servers(), servers()}.
load_all_servers(ServerL) ->
@ -208,15 +223,19 @@ load_all_servers([#{name := Name} = Options | More], Waiting, Running, Stopped)
disable ->
load_all_servers(More, Waiting, Running, Stopped#{Name => Options})
end;
load_all_servers([], Waiting, Running, Stopped) ->
{Waiting, Running, Stopped}.
handle_call(list, _From, State = #{running := Running,
handle_call(
list,
_From,
State = #{
running := Running,
waiting := Waiting,
stopped := Stopped,
orders := Orders}) ->
orders := Orders
}
) ->
R = get_servers_info(running, Running),
W = get_servers_info(waiting, Waiting),
S = get_servers_info(stopped, Stopped),
@ -225,29 +244,33 @@ handle_call(list, _From, State = #{running := Running,
OrderServers = sort_name_by_order(Servers, Orders),
{reply, OrderServers, State};
handle_call({update_config, {move, _Name, _Position}, NewConfL},
handle_call(
{update_config, {move, _Name, _Position}, NewConfL},
_From,
State) ->
State
) ->
Orders = reorder(NewConfL),
{reply, ok, State#{orders := Orders}};
handle_call({update_config, {delete, ToDelete}, _}, _From, State) ->
{ok, #{orders := Orders,
{ok,
#{
orders := Orders,
stopped := Stopped
} = State2} = do_unload_server(ToDelete, State),
State3 = State2#{stopped := maps:remove(ToDelete, Stopped),
State3 = State2#{
stopped := maps:remove(ToDelete, Stopped),
orders := maps:remove(ToDelete, Orders)
},
emqx_exhook_metrics:on_server_deleted(ToDelete),
{reply, ok, State3};
handle_call({update_config, {add, RawConf}, NewConfL},
handle_call(
{update_config, {add, RawConf}, NewConfL},
_From,
#{running := Running, waiting := Waitting, stopped := Stopped} = State) ->
#{running := Running, waiting := Waitting, stopped := Stopped} = State
) ->
{_, #{name := Name} = Conf} = emqx_config:check_config(?MODULE, RawConf),
case emqx_exhook_server:load(Name, Conf) of
@ -262,7 +285,6 @@ handle_call({update_config, {add, RawConf}, NewConfL},
end,
Orders = reorder(NewConfL),
{reply, ok, State2#{orders := Orders}};
handle_call({lookup, Name}, _From, State) ->
case where_is_server(Name, State) of
not_found ->
@ -271,51 +293,57 @@ handle_call({lookup, Name}, _From, State) ->
Result = maps:merge(Conf, #{status => Where})
end,
{reply, Result, State};
handle_call({update_config, {update, Name, _Conf}, NewConfL}, _From, State) ->
{Result, State2} = restart_server(Name, NewConfL, State),
{reply, Result, State2};
handle_call({update_config, {enable, Name, _Enable}, NewConfL}, _From, State) ->
{Result, State2} = restart_server(Name, NewConfL, State),
{reply, Result, State2};
handle_call({server_info, Name}, _From, State) ->
case where_is_server(Name, State) of
not_found ->
Result = not_found;
{Status, _} ->
HooksMetrics = emqx_exhook_metrics:server_metrics(Name),
Result = #{ status => Status
, metrics => HooksMetrics
Result = #{
status => Status,
metrics => HooksMetrics
}
end,
{reply, Result, State};
handle_call(all_servers_info, _From, #{running := Running,
handle_call(
all_servers_info,
_From,
#{
running := Running,
waiting := Waiting,
stopped := Stopped} = State) ->
stopped := Stopped
} = State
) ->
MakeStatus = fun(Status, Servers, Acc) ->
lists:foldl(fun(Name, IAcc) -> IAcc#{Name => Status} end,
lists:foldl(
fun(Name, IAcc) -> IAcc#{Name => Status} end,
Acc,
maps:keys(Servers))
maps:keys(Servers)
)
end,
Status = lists:foldl(fun({Status, Servers}, Acc) -> MakeStatus(Status, Servers, Acc) end,
Status = lists:foldl(
fun({Status, Servers}, Acc) -> MakeStatus(Status, Servers, Acc) end,
#{},
[{running, Running}, {waiting, Waiting}, {stopped, Stopped}]),
[{running, Running}, {waiting, Waiting}, {stopped, Stopped}]
),
Metrics = emqx_exhook_metrics:servers_metrics(),
Result = #{ status => Status
, metrics => Metrics
Result = #{
status => Status,
metrics => Metrics
},
{reply, Result, State};
handle_call({server_hooks_metrics, Name}, _From, State) ->
Result = emqx_exhook_metrics:hooks_metrics(Name),
{reply, Result, State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
@ -331,26 +359,32 @@ handle_info({timeout, _Ref, {reload, Name}}, State) ->
{error, not_found} ->
{noreply, NState};
{error, Reason} ->
?SLOG(warning,
#{msg => "failed_to_reload_exhook_callback_server",
?SLOG(
warning,
#{
msg => "failed_to_reload_exhook_callback_server",
reason => Reason,
name => Name}),
name => Name
}
),
{noreply, ensure_reload_timer(NState)}
end;
handle_info(refresh_tick, State) ->
refresh_tick(),
emqx_exhook_metrics:update(?REFRESH_INTERVAL),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, State = #{running := Running}) ->
_ = maps:fold(fun(Name, _, AccIn) ->
_ = maps:fold(
fun(Name, _, AccIn) ->
{ok, NAccIn} = do_unload_server(Name, AccIn),
NAccIn
end, State, Running),
end,
State,
Running
),
_ = unload_exhooks(),
ok.
@ -362,10 +396,13 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
unload_exhooks() ->
[emqx:unhook(Name, {M, F}) ||
{Name, {M, F, _A}} <- ?ENABLED_HOOKS].
[
emqx:unhook(Name, {M, F})
|| {Name, {M, F, _A}} <- ?ENABLED_HOOKS
].
-spec do_load_server(server_name(), state()) -> {{error, not_found}, state()}
-spec do_load_server(server_name(), state()) ->
{{error, not_found}, state()}
| {{error, already_started}, state()}
| {ok, state()}.
do_load_server(Name, State = #{orders := Orders}) ->
@ -378,14 +415,18 @@ do_load_server(Name, State = #{orders := Orders}) ->
State2 = clean_reload_timer(Name, State),
{Options, Map2} = maps:take(Name, Map),
State3 = State2#{Where := Map2},
#{running := Running,
stopped := Stopped} = State3,
#{
running := Running,
stopped := Stopped
} = State3,
case emqx_exhook_server:load(Name, Options) of
{ok, ServerState} ->
save(Name, ServerState),
update_order(Orders),
?SLOG(info, #{msg => "load_exhook_callback_server_ok",
name => Name}),
?SLOG(info, #{
msg => "load_exhook_callback_server_ok",
name => Name
}),
{ok, State3#{running := maps:put(Name, Options, Running)}};
{error, Reason} ->
{{error, Reason}, State};
@ -397,11 +438,15 @@ do_load_server(Name, State = #{orders := Orders}) ->
-spec do_unload_server(server_name(), state()) -> {ok, state()}.
do_unload_server(Name, #{stopped := Stopped} = State) ->
case where_is_server(Name, State) of
{stopped, _} -> {ok, State};
{stopped, _} ->
{ok, State};
{waiting, Waiting} ->
{Options, Waiting2} = maps:take(Name, Waiting),
{ok, clean_reload_timer(Name,
State#{waiting := Waiting2,
{ok,
clean_reload_timer(
Name,
State#{
waiting := Waiting2,
stopped := maps:put(Name, Options, Stopped)
}
)};
@ -410,32 +455,41 @@ do_unload_server(Name, #{stopped := Stopped} = State) ->
ok = unsave(Name),
ok = emqx_exhook_server:unload(Service),
{Options, Running2} = maps:take(Name, Running),
{ok, State#{running := Running2,
{ok, State#{
running := Running2,
stopped := maps:put(Name, Options, Stopped)
}};
not_found -> {ok, State}
not_found ->
{ok, State}
end.
-spec ensure_reload_timer(state()) -> state().
ensure_reload_timer(State = #{waiting := Waiting,
ensure_reload_timer(
State = #{
waiting := Waiting,
stopped := Stopped,
trefs := TRefs}) ->
trefs := TRefs
}
) ->
Iter = maps:iterator(Waiting),
{Waitting2, Stopped2, TRefs2} =
ensure_reload_timer(maps:next(Iter), Waiting, Stopped, TRefs),
State#{waiting := Waitting2,
State#{
waiting := Waitting2,
stopped := Stopped2,
trefs := TRefs2}.
trefs := TRefs2
}.
ensure_reload_timer(none, Waiting, Stopped, TimerRef) ->
{Waiting, Stopped, TimerRef};
ensure_reload_timer({Name, #{auto_reconnect := Intv}, Iter},
ensure_reload_timer(
{Name, #{auto_reconnect := Intv}, Iter},
Waiting,
Stopped,
TimerRef) when is_integer(Intv) ->
TimerRef
) when is_integer(Intv) ->
Next = maps:next(Iter),
case maps:is_key(Name, TimerRef) of
true ->
@ -445,17 +499,19 @@ ensure_reload_timer({Name, #{auto_reconnect := Intv}, Iter},
TimerRef2 = maps:put(Name, Ref, TimerRef),
ensure_reload_timer(Next, Waiting, Stopped, TimerRef2)
end;
ensure_reload_timer({Name, Opts, Iter}, Waiting, Stopped, TimerRef) ->
ensure_reload_timer(maps:next(Iter),
ensure_reload_timer(
maps:next(Iter),
maps:remove(Name, Waiting),
maps:put(Name, Opts, Stopped),
TimerRef).
TimerRef
).
-spec clean_reload_timer(server_name(), state()) -> state().
clean_reload_timer(Name, State = #{trefs := TRefs}) ->
case maps:take(Name, TRefs) of
error -> State;
error ->
State;
{TRef, NTRefs} ->
_ = erlang:cancel_timer(TRef),
State#{trefs := NTRefs}
@ -468,31 +524,24 @@ do_move(Name, Position, ConfL) ->
move([#{<<"name">> := Name} = Server | T], Name, Position, HeadL) ->
move_to(Position, Server, lists:reverse(HeadL) ++ T);
move([Server | T], Name, Position, HeadL) ->
move(T, Name, Position, [Server | HeadL]);
move([], _Name, _Position, _HeadL) ->
not_found.
move_to(?CMD_MOVE_FRONT, Server, ServerL) ->
[Server | ServerL];
move_to(?CMD_MOVE_REAR, Server, ServerL) ->
ServerL ++ [Server];
move_to(Position, Server, ServerL) ->
move_to(ServerL, Position, Server, []).
move_to([#{<<"name">> := Name} | _] = T, ?CMD_MOVE_BEFORE(Name), Server, HeadL) ->
lists:reverse(HeadL) ++ [Server | T];
move_to([#{<<"name">> := Name} = H | T], ?CMD_MOVE_AFTER(Name), Server, HeadL) ->
lists:reverse(HeadL) ++ [H, Server | T];
move_to([H | T], Position, Server, HeadL) ->
move_to(T, Position, Server, [H | HeadL]);
move_to([], _Position, _Server, _HeadL) ->
not_found.
@ -504,32 +553,34 @@ reorder(ServerL) ->
reorder([#{name := Name} | T], Order, Orders) ->
reorder(T, Order + 1, Orders#{Name => Order});
reorder([], _Order, Orders) ->
Orders.
get_servers_info(Status, Map) ->
Fold = fun(Name, Conf, Acc) ->
[maps:merge(Conf, #{status => Status,
hooks => hooks(Name)}) | Acc]
[
maps:merge(Conf, #{
status => Status,
hooks => hooks(Name)
})
| Acc
]
end,
maps:fold(Fold, [], Map).
where_is_server(Name, #{running := Running}) when is_map_key(Name, Running) ->
{running, Running};
where_is_server(Name, #{waiting := Waiting}) when is_map_key(Name, Waiting) ->
{waiting, Waiting};
where_is_server(Name, #{stopped := Stopped}) when is_map_key(Name, Stopped) ->
{stopped, Stopped};
where_is_server(_, _) ->
not_found.
-type replace_fun() :: fun((server_options()) -> server_options()).
-spec replace_conf(binary(), replace_fun(), list(server_options())) -> not_found
-spec replace_conf(binary(), replace_fun(), list(server_options())) ->
not_found
| list(server_options()).
replace_conf(Name, ReplaceFun, ConfL) ->
replace_conf(ConfL, Name, ReplaceFun, []).
@ -537,14 +588,13 @@ replace_conf(Name, ReplaceFun, ConfL) ->
replace_conf([#{<<"name">> := Name} = H | T], Name, ReplaceFun, HeadL) ->
New = ReplaceFun(H),
lists:reverse(HeadL) ++ [New | T];
replace_conf([H | T], Name, ReplaceFun, HeadL) ->
replace_conf(T, Name, ReplaceFun, [H | HeadL]);
replace_conf([], _, _, _) ->
not_found.
-spec restart_server(binary(), list(server_options()), state()) -> {ok, state()}
-spec restart_server(binary(), list(server_options()), state()) ->
{ok, state()}
| {{error, term()}, state()}.
restart_server(Name, ConfL, State) ->
case lists:search(fun(#{name := CName}) -> CName =:= Name end, ConfL) of
@ -567,12 +617,15 @@ restart_server(Name, ConfL, State) ->
end.
sort_name_by_order(Names, Orders) ->
lists:sort(fun(A, B) when is_binary(A) ->
lists:sort(
fun
(A, B) when is_binary(A) ->
maps:get(A, Orders) < maps:get(B, Orders);
(#{name := A}, #{name := B}) ->
maps:get(A, Orders) < maps:get(B, Orders)
end,
Names).
Names
).
refresh_tick() ->
erlang:send_after(?REFRESH_INTERVAL, self(), ?FUNCTION_NAME).

View File

@ -39,46 +39,67 @@ namespace() -> exhook.
roots() -> [exhook].
fields(exhook) ->
[{servers,
sc(hoconsc:array(ref(server)),
#{ default => []
, desc => "List of exhook servers."
})}
[
{servers,
sc(
hoconsc:array(ref(server)),
#{
default => [],
desc => "List of exhook servers."
}
)}
];
fields(server) ->
[ {name, sc(binary(),
#{ desc => "Name of the exhook server."
})}
, {enable, sc(boolean(),
#{ default => true
, desc => "Enable the exhook server."
})}
, {url, sc(binary(),
#{ desc => "URL of the gRPC server."
})}
, {request_timeout, sc(duration(),
#{ default => "5s"
, desc => "The timeout to request gRPC server."
})}
, {failed_action, failed_action()}
, {ssl,
sc(ref(ssl_conf), #{})}
, {auto_reconnect,
sc(hoconsc:union([false, duration()]),
#{ default => "60s"
, desc => "Whether to automatically reconnect (initialize) the gRPC server.<br/>"
[
{name,
sc(
binary(),
#{desc => "Name of the exhook server."}
)},
{enable,
sc(
boolean(),
#{
default => true,
desc => "Enable the exhook server."
}
)},
{url,
sc(
binary(),
#{desc => "URL of the gRPC server."}
)},
{request_timeout,
sc(
duration(),
#{
default => "5s",
desc => "The timeout to request gRPC server."
}
)},
{failed_action, failed_action()},
{ssl, sc(ref(ssl_conf), #{})},
{auto_reconnect,
sc(
hoconsc:union([false, duration()]),
#{
default => "60s",
desc =>
"Whether to automatically reconnect (initialize) the gRPC server.<br/>"
"When gRPC is not available, exhook tries to request the gRPC service at "
"that interval and reinitialize the list of mounted hooks."
})}
, {pool_size,
sc(integer(),
#{ default => 8
, example => 8
, desc => "The process pool size for gRPC client."
})}
}
)},
{pool_size,
sc(
integer(),
#{
default => 8,
example => 8,
desc => "The process pool size for gRPC client."
}
)}
];
fields(ssl_conf) ->
Schema = emqx_schema:client_ssl_opts_schema(#{}),
lists:keydelete(user_lookup_fun, 1, Schema).
@ -99,11 +120,15 @@ ref(Field) ->
hoconsc:ref(?MODULE, Field).
failed_action() ->
sc(hoconsc:enum([deny, ignore]),
#{ default => deny
, desc => "The value that is returned when the request "
sc(
hoconsc:enum([deny, ignore]),
#{
default => deny,
desc =>
"The value that is returned when the request "
"to the gRPC server fails for any reason."
}).
}
).
server_config() ->
fields(server).

View File

@ -22,25 +22,28 @@
-define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client).
%% Load/Unload
-export([ load/2
, unload/1
-export([
load/2,
unload/1
]).
%% APIs
-export([call/3]).
%% Infos
-export([ name/1
, hooks/1
, format/1
, failed_action/1
-export([
name/1,
hooks/1,
format/1,
failed_action/1
]).
-ifdef(TEST).
-export([hk2func/1]).
-endif.
-type server() :: #{%% Server name (equal to grpc client channel name)
%% Server name (equal to grpc client channel name)
-type server() :: #{
name := binary(),
%% The function options
options := map(),
@ -52,8 +55,8 @@
prefix := list()
}.
-type hookpoint() :: 'client.connect'
-type hookpoint() ::
'client.connect'
| 'client.connack'
| 'client.connected'
| 'client.disconnected'
@ -86,14 +89,16 @@
-spec load(binary(), map()) -> {ok, server()} | {error, term()} | disable.
load(_Name, #{enable := false}) ->
disable;
load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts) ->
ReqOpts = #{timeout => Timeout, failed_action => FailedAction},
{SvrAddr, ClientOpts} = channel_opts(Opts),
case emqx_exhook_sup:start_grpc_client_channel(
case
emqx_exhook_sup:start_grpc_client_channel(
Name,
SvrAddr,
ClientOpts) of
ClientOpts
)
of
{ok, _ChannPoolPid} ->
case do_init(Name, ReqOpts) of
{ok, HookSpecs} ->
@ -102,41 +107,52 @@ load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts)
ensure_metrics(Prefix, HookSpecs),
%% Ensure hooks
ensure_hooks(HookSpecs),
{ok, #{name => Name,
{ok, #{
name => Name,
options => ReqOpts,
channel => _ChannPoolPid,
hookspec => HookSpecs,
prefix => Prefix }};
prefix => Prefix
}};
{error, _} = E ->
emqx_exhook_sup:stop_grpc_client_channel(Name),
E
end;
{error, _} = E -> E
{error, _} = E ->
E
end.
%% @private
channel_opts(Opts = #{url := URL}) ->
ClientOpts = maps:merge(#{pool_size => erlang:system_info(schedulers)},
Opts),
ClientOpts = maps:merge(
#{pool_size => erlang:system_info(schedulers)},
Opts
),
case uri_string:parse(URL) of
#{scheme := <<"http">>, host := Host, port := Port} ->
{format_http_uri("http", Host, Port), ClientOpts};
#{scheme := <<"https">>, host := Host, port := Port} ->
SslOpts =
case maps:get(ssl, Opts, undefined) of
undefined -> [];
#{enable := false} -> [];
undefined ->
[];
#{enable := false} ->
[];
MapOpts ->
filter(
[{cacertfile, maps:get(cacertfile, MapOpts, undefined)},
[
{cacertfile, maps:get(cacertfile, MapOpts, undefined)},
{certfile, maps:get(certfile, MapOpts, undefined)},
{keyfile, maps:get(keyfile, MapOpts, undefined)}
])
]
)
end,
NClientOpts = ClientOpts#{
gun_opts =>
#{transport => ssl,
transport_opts => SslOpts}
#{
transport => ssl,
transport_opts => SslOpts
}
},
{format_http_uri("https", Host, Port), NClientOpts};
Error ->
@ -162,18 +178,23 @@ do_deinit(Name, ReqOpts) ->
do_init(ChannName, ReqOpts) ->
%% BrokerInfo defined at: exhook.protos
BrokerInfo = maps:with([version, sysdescr, uptime, datetime],
maps:from_list(emqx_sys:info())),
BrokerInfo = maps:with(
[version, sysdescr, uptime, datetime],
maps:from_list(emqx_sys:info())
),
Req = #{broker => BrokerInfo},
case do_call(ChannName, undefined, 'on_provider_loaded', Req, ReqOpts) of
{ok, InitialResp} ->
try
{ok, resolve_hookspec(maps:get(hooks, InitialResp, []))}
catch _:Reason:Stk ->
?SLOG(error, #{msg => "failed_to_init_channel",
catch
_:Reason:Stk ->
?SLOG(error, #{
msg => "failed_to_init_channel",
channel_name => ChannName,
reason => Reason,
stacktrace => Stk}),
stacktrace => Stk
}),
{error, Reason}
end;
{error, Reason} ->
@ -184,34 +205,46 @@ do_init(ChannName, ReqOpts) ->
resolve_hookspec(HookSpecs) when is_list(HookSpecs) ->
MessageHooks = message_hooks(),
AvailableHooks = available_hooks(),
lists:foldr(fun(HookSpec, Acc) ->
lists:foldr(
fun(HookSpec, Acc) ->
case maps:get(name, HookSpec, undefined) of
undefined -> Acc;
undefined ->
Acc;
Name0 ->
Name = try
Name =
try
binary_to_existing_atom(Name0, utf8)
catch T:R:_ -> {T,R}
catch
T:R:_ -> {T, R}
end,
case {lists:member(Name, AvailableHooks),
lists:member(Name, MessageHooks)} of
case {lists:member(Name, AvailableHooks), lists:member(Name, MessageHooks)} of
{false, _} ->
error({unknown_hookpoint, Name});
{true, false} ->
Acc#{Name => #{}};
{true, true} ->
Acc#{Name => #{
topics => maps:get(topics, HookSpec, [])}}
Acc#{
Name => #{
topics => maps:get(topics, HookSpec, [])
}
}
end
end
end, #{}, HookSpecs).
end,
#{},
HookSpecs
).
ensure_metrics(Prefix, HookSpecs) ->
Keys = [list_to_atom(Prefix ++ atom_to_list(Hookpoint))
|| Hookpoint <- maps:keys(HookSpecs)],
Keys = [
list_to_atom(Prefix ++ atom_to_list(Hookpoint))
|| Hookpoint <- maps:keys(HookSpecs)
],
lists:foreach(fun emqx_metrics:ensure/1, Keys).
ensure_hooks(HookSpecs) ->
lists:foreach(fun(Hookpoint) ->
lists:foreach(
fun(Hookpoint) ->
case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
false ->
?SLOG(error, #{msg => "skipped_unknown_hookpoint", hookpoint => Hookpoint});
@ -219,25 +252,33 @@ ensure_hooks(HookSpecs) ->
emqx_hooks:put(Hookpoint, {M, F, A}),
ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
end
end, maps:keys(HookSpecs)).
end,
maps:keys(HookSpecs)
).
may_unload_hooks(HookSpecs) ->
lists:foreach(fun(Hookpoint) ->
lists:foreach(
fun(Hookpoint) ->
case ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, -1}, {Hookpoint, 0}) of
Cnt when Cnt =< 0 ->
case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
{Hookpoint, {M, F, _A}} ->
emqx_hooks:del(Hookpoint, {M, F});
_ -> ok
_ ->
ok
end,
ets:delete(?HOOKS_REF_COUNTER, Hookpoint);
_ -> ok
_ ->
ok
end
end, maps:keys(HookSpecs)).
end,
maps:keys(HookSpecs)
).
format(#{name := Name, hookspec := Hooks}) ->
lists:flatten(
io_lib:format("name=~ts, hooks=~0p, active=true", [Name, Hooks])).
io_lib:format("name=~ts, hooks=~0p, active=true", [Name, Hooks])
).
%%--------------------------------------------------------------------
%% APIs
@ -248,28 +289,41 @@ name(#{name := Name}) ->
hooks(#{hookspec := Hooks}) ->
FoldFun = fun(Hook, Params, Acc) ->
[#{ name => Hook
, params => Params
} | Acc]
[
#{
name => Hook,
params => Params
}
| Acc
]
end,
maps:fold(FoldFun, [], Hooks).
-spec call(hookpoint(), map(), server()) -> ignore
-spec call(hookpoint(), map(), server()) ->
ignore
| {ok, Resp :: term()}
| {error, term()}.
call(Hookpoint, Req, #{name := ChannName, options := ReqOpts,
hookspec := Hooks, prefix := Prefix}) ->
call(Hookpoint, Req, #{
name := ChannName,
options := ReqOpts,
hookspec := Hooks,
prefix := Prefix
}) ->
case maps:get(Hookpoint, Hooks, undefined) of
undefined -> ignore;
undefined ->
ignore;
Opts ->
NeedCall = case lists:member(Hookpoint, message_hooks()) of
false -> true;
NeedCall =
case lists:member(Hookpoint, message_hooks()) of
false ->
true;
_ ->
#{message := #{topic := Topic}} = Req,
match_topic_filter(Topic, maps:get(topics, Opts, []))
end,
case NeedCall of
false -> ignore;
false ->
ignore;
_ ->
inc_metrics(Prefix, Hookpoint),
GrpcFun = hk2func(Hookpoint),
@ -293,42 +347,67 @@ match_topic_filter(TopicName, TopicFilter) ->
-ifdef(TEST).
-define(CALL_PB_CLIENT(ChanneName, Fun, Req, Options),
apply(?PB_CLIENT_MOD, Fun, [Req, #{<<"channel">> => ChannName}, Options])).
apply(?PB_CLIENT_MOD, Fun, [Req, #{<<"channel">> => ChannName}, Options])
).
-else.
-define(CALL_PB_CLIENT(ChanneName, Fun, Req, Options),
apply(?PB_CLIENT_MOD, Fun, [Req, Options])).
apply(?PB_CLIENT_MOD, Fun, [Req, Options])
).
-endif.
-spec do_call(binary(), atom(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) ->
Options = ReqOpts#{channel => ChannName},
?SLOG(debug, #{msg => "do_call", module => ?PB_CLIENT_MOD, function => Fun,
req => Req, options => Options}),
?SLOG(debug, #{
msg => "do_call",
module => ?PB_CLIENT_MOD,
function => Fun,
req => Req,
options => Options
}),
case catch ?CALL_PB_CLIENT(ChanneName, Fun, Req, Options) of
{ok, Resp, Metadata} ->
?SLOG(debug, #{msg => "do_call_ok", resp => Resp, metadata => Metadata}),
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:succeed/2),
{ok, Resp};
{error, {Code, Msg}, _Metadata} ->
?SLOG(error, #{msg => "exhook_call_error", module => ?PB_CLIENT_MOD, function => Fun,
req => Req, options => Options, code => Code, packet => Msg}),
?SLOG(error, #{
msg => "exhook_call_error",
module => ?PB_CLIENT_MOD,
function => Fun,
req => Req,
options => Options,
code => Code,
packet => Msg
}),
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
{error, {Code, Msg}};
{error, Reason} ->
?SLOG(error, #{msg => "exhook_call_error", module => ?PB_CLIENT_MOD, function => Fun,
req => Req, options => Options, reason => Reason}),
?SLOG(error, #{
msg => "exhook_call_error",
module => ?PB_CLIENT_MOD,
function => Fun,
req => Req,
options => Options,
reason => Reason
}),
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
{error, Reason};
{'EXIT', {Reason, Stk}} ->
?SLOG(error, #{msg => "exhook_call_exception", module => ?PB_CLIENT_MOD, function => Fun,
req => Req, options => Options, stacktrace => Stk}),
?SLOG(error, #{
msg => "exhook_call_exception",
module => ?PB_CLIENT_MOD,
function => Fun,
req => Req,
options => Options,
stacktrace => Stk
}),
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
{error, Reason}
end.
update_metrics(undefined, _ChannName, _Fun) ->
ok;
update_metrics(Hookpoint, ChannName, Fun) ->
Fun(ChannName, Hookpoint).
@ -362,14 +441,30 @@ hk2func('message.dropped') ->'on_message_dropped'.
-compile({inline, [message_hooks/0]}).
message_hooks() ->
['message.publish', 'message.delivered',
'message.acked', 'message.dropped'].
[
'message.publish',
'message.delivered',
'message.acked',
'message.dropped'
].
-compile({inline, [available_hooks/0]}).
available_hooks() ->
['client.connect', 'client.connack', 'client.connected',
'client.disconnected', 'client.authenticate', 'client.authorize',
'client.subscribe', 'client.unsubscribe',
'session.created', 'session.subscribed', 'session.unsubscribed',
'session.resumed', 'session.discarded', 'session.takenover',
'session.terminated' | message_hooks()].
[
'client.connect',
'client.connack',
'client.connected',
'client.disconnected',
'client.authenticate',
'client.authorize',
'client.subscribe',
'client.unsubscribe',
'session.created',
'session.subscribed',
'session.unsubscribed',
'session.resumed',
'session.discarded',
'session.takenover',
'session.terminated'
| message_hooks()
].

View File

@ -18,21 +18,22 @@
-behaviour(supervisor).
-export([ start_link/0
, init/1
-export([
start_link/0,
init/1
]).
-export([ start_grpc_client_channel/3
, stop_grpc_client_channel/1
-export([
start_grpc_client_channel/3,
stop_grpc_client_channel/1
]).
-define(CHILD(Mod, Type, Args),
#{ id => Mod
, start => {Mod, start_link, Args}
, type => Type
, shutdown => 15000
}
).
-define(CHILD(Mod, Type, Args), #{
id => Mod,
start => {Mod, start_link, Args},
type => Type,
shutdown => 15000
}).
%%--------------------------------------------------------------------
%% Supervisor APIs & Callbacks
@ -54,7 +55,8 @@ init([]) ->
-spec start_grpc_client_channel(
binary(),
uri_string:uri_string(),
grpc_client_sup:options()) -> {ok, pid()} | {error, term()}.
grpc_client_sup:options()
) -> {ok, pid()} | {error, term()}.
start_grpc_client_channel(Name, SvrAddr, Options) ->
grpc_client_sup:create_channel_pool(Name, SvrAddr, Options).

View File

@ -18,11 +18,12 @@
-behaviour(emqx_bpapi).
-export([ introduced_in/0
-export([
introduced_in/0,
, all_servers_info/1
, server_info/2
, server_hooks_metrics/2
all_servers_info/1,
server_info/2,
server_hooks_metrics/2
]).
-include_lib("emqx/include/bpapi.hrl").

View File

@ -23,26 +23,27 @@
-include_lib("common_test/include/ct.hrl").
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(CONF_DEFAULT, <<"
exhook {
servers = [
{ name = default,
url = \"http://127.0.0.1:9000\"
},
{ name = enable,
enable = false,
url = \"http://127.0.0.1:9000\"
},
{ name = error,
url = \"http://127.0.0.1:9001\"
},
{ name = not_reconnect,
auto_reconnect = false,
url = \"http://127.0.0.1:9001\"
}
]
}
">>).
-define(CONF_DEFAULT, <<
"\n"
"exhook {\n"
" servers = [\n"
" { name = default,\n"
" url = \"http://127.0.0.1:9000\"\n"
" },\n"
" { name = enable,\n"
" enable = false,\n"
" url = \"http://127.0.0.1:9000\"\n"
" },\n"
" { name = error,\n"
" url = \"http://127.0.0.1:9001\"\n"
" },\n"
" { name = not_reconnect,\n"
" auto_reconnect = false,\n"
" url = \"http://127.0.0.1:9001\"\n"
" }\n"
" ]\n"
"}\n"
>>).
%%--------------------------------------------------------------------
%% Setups
@ -79,7 +80,8 @@ init_per_testcase(_, Config) ->
end_per_testcase(_, Config) ->
case erlang:whereis(node()) of
undefined -> ok;
undefined ->
ok;
P ->
erlang:unlink(P),
erlang:exit(P, kill)
@ -95,22 +97,29 @@ load_cfg(Cfg) ->
t_access_failed_if_no_server_running(_) ->
emqx_exhook_mgr:disable(<<"default">>),
ClientInfo = #{clientid => <<"user-id-1">>,
ClientInfo = #{
clientid => <<"user-id-1">>,
username => <<"usera">>,
peerhost => {127, 0, 0, 1},
sockport => 1883,
protocol => mqtt,
mountpoint => undefined
},
?assertMatch({stop, {error, not_authorized}},
emqx_exhook_handler:on_client_authenticate(ClientInfo, #{auth_result => success})),
?assertMatch(
{stop, {error, not_authorized}},
emqx_exhook_handler:on_client_authenticate(ClientInfo, #{auth_result => success})
),
?assertMatch({stop, deny},
emqx_exhook_handler:on_client_authorize(ClientInfo, publish, <<"t/1">>, allow)),
?assertMatch(
{stop, deny},
emqx_exhook_handler:on_client_authorize(ClientInfo, publish, <<"t/1">>, allow)
),
Message = emqx_message:make(<<"t/1">>, <<"abc">>),
?assertMatch({stop, Message},
emqx_exhook_handler:on_message_publish(Message)),
?assertMatch(
{stop, Message},
emqx_exhook_handler:on_message_publish(Message)
),
emqx_exhook_mgr:enable(<<"default">>).
t_lookup(_) ->
@ -120,9 +129,14 @@ t_lookup(_) ->
t_list(_) ->
[H | _] = emqx_exhook_mgr:list(),
?assertMatch(#{name := _,
?assertMatch(
#{
name := _,
status := _,
hooks := _}, H).
hooks := _
},
H
).
t_unexpected(_) ->
ok = gen_server:cast(emqx_exhook_mgr, unexpected),
@ -149,9 +163,11 @@ t_error_update_conf(_) ->
ErrorAnd = #{<<"name">> => Name, <<"url">> => <<"http://127.0.0.1:9001">>},
{ok, _} = emqx_exhook_mgr:update_config(Path, {add, ErrorAnd}),
DisableAnd = #{<<"name">> => Name,
DisableAnd = #{
<<"name">> => Name,
<<"url">> => <<"http://127.0.0.1:9001">>,
<<"enable">> => false},
<<"enable">> => false
},
{ok, _} = emqx_exhook_mgr:update_config(Path, {add, DisableAnd}),
{ok, _} = emqx_exhook_mgr:update_config(Path, {delete, <<"error">>}),
@ -179,10 +195,12 @@ t_metrics(_) ->
t_handler(_) ->
%% connect
{ok, C} = emqtt:start_link([{host, "localhost"},
{ok, C} = emqtt:start_link([
{host, "localhost"},
{port, 1883},
{username, <<"gooduser">>},
{clientid, <<"exhook_gooduser">>}]),
{clientid, <<"exhook_gooduser">>}
]),
{ok, _} = emqtt:connect(C),
%% pub/sub
@ -212,7 +230,8 @@ t_handler(_) ->
ok.
t_simulated_handler(_) ->
ClientInfo = #{clientid => <<"user-id-1">>,
ClientInfo = #{
clientid => <<"user-id-1">>,
username => <<"usera">>,
peerhost => {127, 0, 0, 1},
sockport => 1883,
@ -232,8 +251,9 @@ t_misc_test(_) ->
ok.
t_get_basic_usage_info(_Config) ->
#{ num_servers := NumServers
, servers := Servers
#{
num_servers := NumServers,
servers := Servers
} = emqx_exhook:get_basic_usage_info(),
?assertEqual(1, NumServers),
?assertMatch([_], Servers),
@ -261,7 +281,8 @@ t_get_basic_usage_info(_Config) ->
'session.terminated',
'session.unsubscribed'
],
lists:sort(Hooks)).
lists:sort(Hooks)
).
%%--------------------------------------------------------------------
%% Utils
@ -276,14 +297,17 @@ unmeck_print() ->
meck:unload(emqx_ctl).
loaded_exhook_hookpoints() ->
lists:filtermap(fun(E) ->
lists:filtermap(
fun(E) ->
Name = element(2, E),
Callbacks = element(3, E),
case lists:any(fun is_exhook_callback/1, Callbacks) of
true -> {true, Name};
_ -> false
end
end, ets:tab2list(emqx_hooks)).
end,
ets:tab2list(emqx_hooks)
).
is_exhook_callback(Cb) ->
Action = element(2, Cb),

View File

@ -26,19 +26,29 @@
-define(BASE_PATH, "api").
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(CONF_DEFAULT, <<"
exhook {
servers =
[ { name = default,
url = \"http://127.0.0.1:9000\"
}
]
}
">>).
-define(CONF_DEFAULT, <<
"\n"
"exhook {\n"
" servers =\n"
" [ { name = default,\n"
" url = \"http://127.0.0.1:9000\"\n"
" }\n"
" ]\n"
"}\n"
>>).
all() ->
[ t_list, t_get, t_add, t_move_front, t_move_rear
, t_move_before, t_move_after, t_delete, t_hooks, t_update
[
t_list,
t_get,
t_add,
t_move_front,
t_move_rear,
t_move_before,
t_move_after,
t_delete,
t_hooks,
t_update
].
init_per_suite(Config) ->
@ -71,7 +81,6 @@ init_per_testcase(t_add, Config) ->
_ = emqx_exhook_demo_svr:start(<<"test1">>, 9001),
timer:sleep(200),
Config;
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(),
timer:sleep(200),
@ -79,7 +88,8 @@ init_per_testcase(_, Config) ->
end_per_testcase(_, Config) ->
case erlang:whereis(node()) of
undefined -> ok;
undefined ->
ok;
P ->
erlang:unlink(P),
erlang:exit(P, kill)
@ -87,108 +97,168 @@ end_per_testcase(_, Config) ->
Config.
t_list(_) ->
{ok, Data} = request_api(get, api_path(["exhooks"]), "",
auth_header_()),
{ok, Data} = request_api(
get,
api_path(["exhooks"]),
"",
auth_header_()
),
List = decode_json(Data),
?assertEqual(1, length(List)),
[Svr] = List,
?assertMatch(#{name := <<"default">>,
?assertMatch(
#{
name := <<"default">>,
metrics := _,
node_metrics := _,
node_status := _,
hooks := _
}, Svr).
},
Svr
).
t_get(_) ->
{ok, Data} = request_api(get, api_path(["exhooks", "default"]), "",
auth_header_()),
{ok, Data} = request_api(
get,
api_path(["exhooks", "default"]),
"",
auth_header_()
),
Svr = decode_json(Data),
?assertMatch(#{name := <<"default">>,
?assertMatch(
#{
name := <<"default">>,
metrics := _,
node_metrics := _,
node_status := _,
hooks := _
}, Svr).
},
Svr
).
t_add(Cfg) ->
Template = proplists:get_value(template, Cfg),
Instance = Template#{name => <<"test1">>,
Instance = Template#{
name => <<"test1">>,
url => "http://127.0.0.1:9001"
},
{ok, Data} = request_api(post, api_path(["exhooks"]), "",
auth_header_(), Instance),
{ok, Data} = request_api(
post,
api_path(["exhooks"]),
"",
auth_header_(),
Instance
),
Svr = decode_json(Data),
?assertMatch(#{name := <<"test1">>,
?assertMatch(
#{
name := <<"test1">>,
metrics := _,
node_metrics := _,
node_status := _,
hooks := _}, Svr),
hooks := _
},
Svr
),
?assertMatch([<<"default">>, <<"test1">>], emqx_exhook_mgr:running()).
t_move_front(_) ->
Result = request_api(post, api_path(["exhooks", "default", "move"]), "",
Result = request_api(
post,
api_path(["exhooks", "default", "move"]),
"",
auth_header_(),
#{position => <<"front">>}),
#{position => <<"front">>}
),
?assertMatch({ok, <<>>}, Result),
?assertMatch([<<"default">>, <<"test1">>], emqx_exhook_mgr:running()).
t_move_rear(_) ->
Result = request_api(post, api_path(["exhooks", "default", "move"]), "",
Result = request_api(
post,
api_path(["exhooks", "default", "move"]),
"",
auth_header_(),
#{position => <<"rear">>}),
#{position => <<"rear">>}
),
?assertMatch({ok, <<>>}, Result),
?assertMatch([<<"test1">>, <<"default">>], emqx_exhook_mgr:running()).
t_move_before(_) ->
Result = request_api(post, api_path(["exhooks", "default", "move"]), "",
Result = request_api(
post,
api_path(["exhooks", "default", "move"]),
"",
auth_header_(),
#{position => <<"before:test1">>}),
#{position => <<"before:test1">>}
),
?assertMatch({ok, <<>>}, Result),
?assertMatch([<<"default">>, <<"test1">>], emqx_exhook_mgr:running()).
t_move_after(_) ->
Result = request_api(post, api_path(["exhooks", "default", "move"]), "",
Result = request_api(
post,
api_path(["exhooks", "default", "move"]),
"",
auth_header_(),
#{position => <<"after:test1">>}),
#{position => <<"after:test1">>}
),
?assertMatch({ok, <<>>}, Result),
?assertMatch([<<"test1">>, <<"default">>], emqx_exhook_mgr:running()).
t_delete(_) ->
Result = request_api(delete, api_path(["exhooks", "test1"]), "",
auth_header_()),
Result = request_api(
delete,
api_path(["exhooks", "test1"]),
"",
auth_header_()
),
?assertMatch({ok, <<>>}, Result),
?assertMatch([<<"default">>], emqx_exhook_mgr:running()).
t_hooks(_Cfg) ->
{ok, Data} = request_api(get, api_path(["exhooks", "default", "hooks"]), "",
auth_header_()),
{ok, Data} = request_api(
get,
api_path(["exhooks", "default", "hooks"]),
"",
auth_header_()
),
[Hook1 | _] = decode_json(Data),
?assertMatch(#{name := _,
?assertMatch(
#{
name := _,
params := _,
metrics := _,
node_metrics := _
}, Hook1).
},
Hook1
).
t_update(Cfg) ->
Template = proplists:get_value(template, Cfg),
Instance = Template#{enable => false},
{ok, <<>>} = request_api(put, api_path(["exhooks", "default"]), "",
auth_header_(), Instance),
{ok, <<>>} = request_api(
put,
api_path(["exhooks", "default"]),
"",
auth_header_(),
Instance
),
?assertMatch([], emqx_exhook_mgr:running()).
@ -203,13 +273,15 @@ request_api(Method, Url, QueryParams, Auth) ->
request_api(Method, Url, QueryParams, Auth, []).
request_api(Method, Url, QueryParams, Auth, []) ->
NewUrl = case QueryParams of
NewUrl =
case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth]});
request_api(Method, Url, QueryParams, Auth, Body) ->
NewUrl = case QueryParams of
NewUrl =
case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
@ -219,8 +291,9 @@ do_request_api(Method, Request)->
case httpc:request(Method, Request, [], [{body_format, binary}]) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _, Return} }
when Code =:= 200 orelse Code =:= 204 orelse Code =:= 201 ->
{ok, {{"HTTP/1.1", Code, _}, _, Return}} when
Code =:= 200 orelse Code =:= 204 orelse Code =:= 201
->
{ok, Return};
{ok, {Reason, _, _}} ->
{error, Reason}

View File

@ -19,36 +19,38 @@
-behaviour(emqx_exhook_v_1_hook_provider_bhvr).
%%
-export([ start/0
, start/2
, stop/0
, stop/1
, take/0
, in/1
-export([
start/0,
start/2,
stop/0,
stop/1,
take/0,
in/1
]).
%% gRPC server HookProvider callbacks
-export([ on_provider_loaded/2
, on_provider_unloaded/2
, on_client_connect/2
, on_client_connack/2
, on_client_connected/2
, on_client_disconnected/2
, on_client_authenticate/2
, on_client_authorize/2
, on_client_subscribe/2
, on_client_unsubscribe/2
, on_session_created/2
, on_session_subscribed/2
, on_session_unsubscribed/2
, on_session_resumed/2
, on_session_discarded/2
, on_session_takenover/2
, on_session_terminated/2
, on_message_publish/2
, on_message_delivered/2
, on_message_dropped/2
, on_message_acked/2
-export([
on_provider_loaded/2,
on_provider_unloaded/2,
on_client_connect/2,
on_client_connack/2,
on_client_connected/2,
on_client_disconnected/2,
on_client_authenticate/2,
on_client_authorize/2,
on_client_subscribe/2,
on_client_unsubscribe/2,
on_session_created/2,
on_session_subscribed/2,
on_session_unsubscribed/2,
on_session_resumed/2,
on_session_discarded/2,
on_session_takenover/2,
on_session_terminated/2,
on_message_publish/2,
on_message_delivered/2,
on_message_dropped/2,
on_message_acked/2
]).
-define(PORT, 9000).
@ -75,15 +77,18 @@ stop(Name) ->
take() ->
to_atom_name(?NAME) ! {take, self()},
receive {value, V} -> V
after 5000 -> error(timeout) end.
receive
{value, V} -> V
after 5000 -> error(timeout)
end.
in({FunName, Req}) ->
to_atom_name(?NAME) ! {in, FunName, Req}.
mgr_main(Name, Port) ->
application:ensure_all_started(grpc),
Services = #{protos => [emqx_exhook_pb],
Services = #{
protos => [emqx_exhook_pb],
services => #{'emqx.exhook.v1.HookProvider' => emqx_exhook_demo_svr}
},
Options = [],
@ -103,9 +108,12 @@ mgr_loop([Svr, Q, Takes]) ->
end.
reply(Q1, Q2) ->
case queue:len(Q1) =:= 0 orelse
queue:len(Q2) =:= 0 of
true -> {Q1, Q2};
case
queue:len(Q1) =:= 0 orelse
queue:len(Q2) =:= 0
of
true ->
{Q1, Q2};
_ ->
{{value, {Name, V}}, NQ1} = queue:out(Q1),
{{value, From}, NQ2} = queue:out(Q2),
@ -115,7 +123,6 @@ reply(Q1, Q2) ->
to_atom_name(Name) when is_atom(Name) ->
Name;
to_atom_name(Name) ->
erlang:binary_to_atom(Name).
@ -123,14 +130,16 @@ to_atom_name(Name) ->
%% callbacks
%%--------------------------------------------------------------------
-spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()}
-spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:loaded_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_provider_loaded(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{hooks => [
{ok,
#{
hooks => [
#{name => <<"client.connect">>},
#{name => <<"client.connack">>},
#{name => <<"client.connected">>},
@ -149,49 +158,52 @@ on_provider_loaded(Req, Md) ->
#{name => <<"message.publish">>},
#{name => <<"message.delivered">>},
#{name => <<"message.acked">>},
#{name => <<"message.dropped">>}]}, Md}.
-spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
#{name => <<"message.dropped">>}
]
},
Md}.
-spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_provider_unloaded(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_client_connect(emqx_exhook_pb:client_connect_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_client_connect(emqx_exhook_pb:client_connect_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_connect(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_client_connack(emqx_exhook_pb:client_connack_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_client_connack(emqx_exhook_pb:client_connack_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_connack(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_client_connected(emqx_exhook_pb:client_connected_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_client_connected(emqx_exhook_pb:client_connected_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_connected(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_client_disconnected(emqx_exhook_pb:client_disconnected_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_client_disconnected(emqx_exhook_pb:client_disconnected_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_disconnected(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_client_authenticate(emqx_exhook_pb:client_authenticate_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
-spec on_client_authenticate(emqx_exhook_pb:client_authenticate_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_authenticate(#{clientinfo := #{username := Username}} = Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
@ -199,20 +211,32 @@ on_client_authenticate(#{clientinfo := #{username := Username}} = Req, Md) ->
%% some cases for testing
case Username of
<<"baduser">> ->
{ok, #{type => 'STOP_AND_RETURN',
value => {bool_result, false}}, Md};
{ok,
#{
type => 'STOP_AND_RETURN',
value => {bool_result, false}
},
Md};
<<"gooduser">> ->
{ok, #{type => 'STOP_AND_RETURN',
value => {bool_result, true}}, Md};
{ok,
#{
type => 'STOP_AND_RETURN',
value => {bool_result, true}
},
Md};
<<"normaluser">> ->
{ok, #{type => 'CONTINUE',
value => {bool_result, true}}, Md};
{ok,
#{
type => 'CONTINUE',
value => {bool_result, true}
},
Md};
_ ->
{ok, #{type => 'IGNORE'}, Md}
end.
-spec on_client_authorize(emqx_exhook_pb:client_authorize_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
-spec on_client_authorize(emqx_exhook_pb:client_authorize_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_authorize(#{clientinfo := #{username := Username}} = Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
@ -220,92 +244,104 @@ on_client_authorize(#{clientinfo := #{username := Username}} = Req, Md) ->
%% some cases for testing
case Username of
<<"baduser">> ->
{ok, #{type => 'STOP_AND_RETURN',
value => {bool_result, false}}, Md};
{ok,
#{
type => 'STOP_AND_RETURN',
value => {bool_result, false}
},
Md};
<<"gooduser">> ->
{ok, #{type => 'STOP_AND_RETURN',
value => {bool_result, true}}, Md};
{ok,
#{
type => 'STOP_AND_RETURN',
value => {bool_result, true}
},
Md};
<<"normaluser">> ->
{ok, #{type => 'CONTINUE',
value => {bool_result, true}}, Md};
{ok,
#{
type => 'CONTINUE',
value => {bool_result, true}
},
Md};
_ ->
{ok, #{type => 'IGNORE'}, Md}
end.
-spec on_client_subscribe(emqx_exhook_pb:client_subscribe_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_client_subscribe(emqx_exhook_pb:client_subscribe_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_subscribe(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_client_unsubscribe(emqx_exhook_pb:client_unsubscribe_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_client_unsubscribe(emqx_exhook_pb:client_unsubscribe_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_unsubscribe(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_session_created(emqx_exhook_pb:session_created_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_session_created(emqx_exhook_pb:session_created_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_created(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_session_subscribed(emqx_exhook_pb:session_subscribed_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_session_subscribed(emqx_exhook_pb:session_subscribed_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_subscribed(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_session_unsubscribed(emqx_exhook_pb:session_unsubscribed_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_session_unsubscribed(emqx_exhook_pb:session_unsubscribed_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_unsubscribed(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_session_resumed(emqx_exhook_pb:session_resumed_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_session_resumed(emqx_exhook_pb:session_resumed_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_resumed(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_session_discarded(emqx_exhook_pb:session_discarded_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_session_discarded(emqx_exhook_pb:session_discarded_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_discarded(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_session_takenover(emqx_exhook_pb:session_takenover_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_session_takenover(emqx_exhook_pb:session_takenover_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_takenover(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_session_terminated(emqx_exhook_pb:session_terminated_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_session_terminated(emqx_exhook_pb:session_terminated_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_terminated(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_message_publish(emqx_exhook_pb:message_publish_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
-spec on_message_publish(emqx_exhook_pb:message_publish_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
@ -313,49 +349,66 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
%% some cases for testing
case From of
<<"baduser">> ->
NMsg = deny(Msg#{qos => 0,
NMsg = deny(Msg#{
qos => 0,
topic => <<"">>,
payload => <<"">>
}),
{ok, #{type => 'STOP_AND_RETURN',
value => {message, NMsg}}, Md};
{ok,
#{
type => 'STOP_AND_RETURN',
value => {message, NMsg}
},
Md};
<<"gooduser">> ->
NMsg = allow(Msg#{topic => From,
payload => From}),
{ok, #{type => 'STOP_AND_RETURN',
value => {message, NMsg}}, Md};
NMsg = allow(Msg#{
topic => From,
payload => From
}),
{ok,
#{
type => 'STOP_AND_RETURN',
value => {message, NMsg}
},
Md};
_ ->
{ok, #{type => 'IGNORE'}, Md}
end.
deny(Msg) ->
NHeader = maps:put(<<"allow_publish">>, <<"false">>,
maps:get(headers, Msg, #{})),
NHeader = maps:put(
<<"allow_publish">>,
<<"false">>,
maps:get(headers, Msg, #{})
),
maps:put(headers, NHeader, Msg).
allow(Msg) ->
NHeader = maps:put(<<"allow_publish">>, <<"true">>,
maps:get(headers, Msg, #{})),
NHeader = maps:put(
<<"allow_publish">>,
<<"true">>,
maps:get(headers, Msg, #{})
),
maps:put(headers, NHeader, Msg).
-spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_message_delivered(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_message_dropped(emqx_exhook_pb:message_dropped_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_message_dropped(emqx_exhook_pb:message_dropped_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_message_dropped(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_message_acked(emqx_exhook_pb:message_acked_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
-spec on_message_acked(emqx_exhook_pb:message_acked_request(), grpc:metadata()) ->
{ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_message_acked(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),

View File

@ -26,19 +26,20 @@
-define(TARGET_HOOK, 'message.publish').
-define(CONF, <<"
exhook {
servers = [
{ name = succed,
url = \"http://127.0.0.1:9000\"
},
{ name = failed,
failed_action = ignore,
url = \"http://127.0.0.1:9001\"
},
]
}
">>).
-define(CONF, <<
"\n"
"exhook {\n"
" servers = [\n"
" { name = succed,\n"
" url = \"http://127.0.0.1:9000\"\n"
" },\n"
" { name = failed,\n"
" failed_action = ignore,\n"
" url = \"http://127.0.0.1:9001\"\n"
" },\n"
" ]\n"
"}\n"
>>).
%%--------------------------------------------------------------------
%% Setups
@ -133,14 +134,18 @@ t_hooks_metrics(_) ->
repeat(Repeat, 5),
timer:sleep(200),
HM = emqx_exhook_metrics:hooks_metrics(<<"succed">>),
?assertMatch(#{'message.publish' :=
#{failed := 0, succeed := 5}}, HM)
?assertMatch(
#{
'message.publish' :=
#{failed := 0, succeed := 5}
},
HM
)
end,
with_connection(Test),
ok.
t_on_server_deleted(_) ->
Test = fun(C) ->
Repeat = fun() ->
emqtt:publish(C, <<"/exhook/metrics">>, <<>>, qos0)
@ -165,7 +170,8 @@ clear_metrics() ->
ets:delete_all_objects(?HOOKS_METRICS).
init_injections(Injects) ->
lists:map(fun({Name, _}) ->
lists:map(
fun({Name, _}) ->
Str = erlang:atom_to_list(Name),
case lists:prefix("on_", Str) of
true ->
@ -183,10 +189,12 @@ init_injections(Injects) ->
false
end
end,
emqx_exhook_demo_svr:module_info(exports)).
emqx_exhook_demo_svr:module_info(exports)
).
hook_injects() ->
#{?SvrFun(<<"failed">>, emqx_exhook_server:hk2func(?TARGET_HOOK)) =>
#{
?SvrFun(<<"failed">>, emqx_exhook_server:hk2func(?TARGET_HOOK)) =>
fun(_Req, _Md) ->
{error, "Error due to test"}
end,
@ -201,14 +209,17 @@ hook_injects() ->
}.
with_connection(Fun) ->
{ok, C} = emqtt:start_link([{host, "localhost"},
{ok, C} = emqtt:start_link([
{host, "localhost"},
{port, 1883},
{username, <<"admin">>},
{clientid, <<"exhook_tester">>}]),
{clientid, <<"exhook_tester">>}
]),
{ok, _} = emqtt:connect(C),
try
Fun(C)
catch Type:Error:Trace ->
catch
Type:Error:Trace ->
emqtt:stop(C),
erlang:raise(Type, Error, Trace)
end.

View File

@ -19,78 +19,97 @@
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(emqx_proper_types,
[ conninfo/0
, clientinfo/0
, sessioninfo/0
, message/0
, connack_return_code/0
, topictab/0
, topic/0
, subopts/0
]).
-define(CONF_DEFAULT, <<"
exhook {
servers =
[ { name = default,
url = \"http://127.0.0.1:9000\"
}
-import(
emqx_proper_types,
[
conninfo/0,
clientinfo/0,
sessioninfo/0,
message/0,
connack_return_code/0,
topictab/0,
topic/0,
subopts/0
]
}
">>).
).
-define(CONF_DEFAULT, <<
"\n"
"exhook {\n"
" servers =\n"
" [ { name = default,\n"
" url = \"http://127.0.0.1:9000\"\n"
" }\n"
" ]\n"
"}\n"
>>).
-define(ALL(Vars, Types, Exprs),
?SETUP(fun() ->
?SETUP(
fun() ->
State = do_setup(),
fun() -> do_teardown(State) end
end, ?FORALL(Vars, Types, Exprs))).
end,
?FORALL(Vars, Types, Exprs)
)
).
%%--------------------------------------------------------------------
%% Properties
%%--------------------------------------------------------------------
prop_client_connect() ->
?ALL({ConnInfo, ConnProps},
?ALL(
{ConnInfo, ConnProps},
{conninfo(), conn_properties()},
begin
ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]),
{'on_client_connect', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{props => properties(ConnProps),
#{
props => properties(ConnProps),
conninfo => from_conninfo(ConnInfo)
},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_client_connack() ->
?ALL({ConnInfo, Rc, AckProps},
?ALL(
{ConnInfo, Rc, AckProps},
{conninfo(), connack_return_code(), ack_properties()},
begin
ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]),
{'on_client_connack', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{props => properties(AckProps),
#{
props => properties(AckProps),
result_code => atom_to_binary(Rc, utf8),
conninfo => from_conninfo(ConnInfo)
},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_client_authenticate() ->
?ALL({ClientInfo0, AuthResult},
?ALL(
{ClientInfo0, AuthResult},
{clientinfo(), authresult()},
begin
ClientInfo = inject_magic_into(username, ClientInfo0),
OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult),
ExpectedAuthResult = case maps:get(username, ClientInfo) of
<<"baduser">> -> {error, not_authorized};
<<"gooduser">> -> ok;
<<"normaluser">> -> ok;
_ -> case AuthResult of
ExpectedAuthResult =
case maps:get(username, ClientInfo) of
<<"baduser">> ->
{error, not_authorized};
<<"gooduser">> ->
ok;
<<"normaluser">> ->
ok;
_ ->
case AuthResult of
ok -> ok;
_ -> {error, not_authorized}
end
@ -99,24 +118,28 @@ prop_client_authenticate() ->
{'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{result => authresult_to_bool(AuthResult),
#{
result => authresult_to_bool(AuthResult),
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_client_authorize() ->
?ALL({ClientInfo0, PubSub, Topic, Result},
{clientinfo(), oneof([publish, subscribe]),
topic(), oneof([allow, deny])},
?ALL(
{ClientInfo0, PubSub, Topic, Result},
{clientinfo(), oneof([publish, subscribe]), topic(), oneof([allow, deny])},
begin
ClientInfo = inject_magic_into(username, ClientInfo0),
OutResult = emqx_hooks:run_fold(
'client.authorize',
[ClientInfo, PubSub, Topic],
Result),
ExpectedOutResult = case maps:get(username, ClientInfo) of
Result
),
ExpectedOutResult =
case maps:get(username, ClientInfo) of
<<"baduser">> -> deny;
<<"gooduser">> -> allow;
<<"normaluser">> -> allow;
@ -126,192 +149,231 @@ prop_client_authorize() ->
{'on_client_authorize', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{result => aclresult_to_bool(Result),
#{
result => aclresult_to_bool(Result),
type => pubsub_to_enum(PubSub),
topic => Topic,
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_client_connected() ->
?ALL({ClientInfo, ConnInfo},
?ALL(
{ClientInfo, ConnInfo},
{clientinfo(), conninfo()},
begin
ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]),
{'on_client_connected', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{clientinfo => from_clientinfo(ClientInfo)
},
#{clientinfo => from_clientinfo(ClientInfo)},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_client_disconnected() ->
?ALL({ClientInfo, Reason, ConnInfo},
?ALL(
{ClientInfo, Reason, ConnInfo},
{clientinfo(), shutdown_reason(), conninfo()},
begin
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]),
{'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{reason => stringfy(Reason),
#{
reason => stringfy(Reason),
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_client_subscribe() ->
?ALL({ClientInfo, SubProps, TopicTab},
?ALL(
{ClientInfo, SubProps, TopicTab},
{clientinfo(), sub_properties(), topictab()},
begin
ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]),
{'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{props => properties(SubProps),
#{
props => properties(SubProps),
topic_filters => topicfilters(TopicTab),
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_client_unsubscribe() ->
?ALL({ClientInfo, UnSubProps, TopicTab},
?ALL(
{ClientInfo, UnSubProps, TopicTab},
{clientinfo(), unsub_properties(), topictab()},
begin
ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]),
{'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{props => properties(UnSubProps),
#{
props => properties(UnSubProps),
topic_filters => topicfilters(TopicTab),
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_session_created() ->
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
?ALL(
{ClientInfo, SessInfo},
{clientinfo(), sessioninfo()},
begin
ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]),
{'on_session_created', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{clientinfo => from_clientinfo(ClientInfo)
},
#{clientinfo => from_clientinfo(ClientInfo)},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_session_subscribed() ->
?ALL({ClientInfo, Topic, SubOpts},
?ALL(
{ClientInfo, Topic, SubOpts},
{clientinfo(), topic(), subopts()},
begin
ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
{'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{topic => Topic,
#{
topic => Topic,
subopts => subopts(SubOpts),
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_session_unsubscribed() ->
?ALL({ClientInfo, Topic, SubOpts},
?ALL(
{ClientInfo, Topic, SubOpts},
{clientinfo(), topic(), subopts()},
begin
ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]),
{'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{topic => Topic,
#{
topic => Topic,
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_session_resumed() ->
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
?ALL(
{ClientInfo, SessInfo},
{clientinfo(), sessioninfo()},
begin
ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]),
{'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{clientinfo => from_clientinfo(ClientInfo)
},
#{clientinfo => from_clientinfo(ClientInfo)},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_session_discared() ->
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
?ALL(
{ClientInfo, SessInfo},
{clientinfo(), sessioninfo()},
begin
ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]),
{'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{clientinfo => from_clientinfo(ClientInfo)
},
#{clientinfo => from_clientinfo(ClientInfo)},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_session_takenover() ->
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
?ALL(
{ClientInfo, SessInfo},
{clientinfo(), sessioninfo()},
begin
ok = emqx_hooks:run('session.takenover', [ClientInfo, SessInfo]),
{'on_session_takenover', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{clientinfo => from_clientinfo(ClientInfo)
},
#{clientinfo => from_clientinfo(ClientInfo)},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_session_terminated() ->
?ALL({ClientInfo, Reason, SessInfo},
?ALL(
{ClientInfo, Reason, SessInfo},
{clientinfo(), shutdown_reason(), sessioninfo()},
begin
ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]),
{'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{reason => stringfy(Reason),
#{
reason => stringfy(Reason),
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
end
).
prop_message_publish() ->
?ALL(Msg0, message(),
?ALL(
Msg0,
message(),
begin
Msg = emqx_message:from_map(
inject_magic_into(from, emqx_message:to_map(Msg0))),
inject_magic_into(from, emqx_message:to_map(Msg0))
),
OutMsg = emqx_hooks:run_fold('message.publish', [], Msg),
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
true ->
?assertEqual(Msg, OutMsg),
skip;
_ ->
ExpectedOutMsg = case emqx_message:from(Msg) of
ExpectedOutMsg =
case emqx_message:from(Msg) of
<<"baduser">> ->
MsgMap = #{headers := Headers}
= emqx_message:to_map(Msg),
MsgMap =
#{headers := Headers} =
emqx_message:to_map(Msg),
emqx_message:from_map(
MsgMap#{qos => 0,
MsgMap#{
qos => 0,
topic => <<"">>,
payload => <<"">>,
headers => maps:put(allow_publish, false, Headers)
});
}
);
<<"gooduser">> = From ->
MsgMap = #{headers := Headers}
= emqx_message:to_map(Msg),
MsgMap =
#{headers := Headers} =
emqx_message:to_map(Msg),
emqx_message:from_map(
MsgMap#{topic => From,
MsgMap#{
topic => From,
payload => From,
headers => maps:put(allow_publish, true, Headers)
});
}
);
_ ->
Msg
end,
@ -319,63 +381,78 @@ prop_message_publish() ->
{'on_message_publish', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{message => from_message(Msg)
},
#{message => from_message(Msg)},
?assertEqual(Expected, Resp)
end,
true
end).
end
).
prop_message_dropped() ->
?ALL({Msg, By, Reason}, {message(), hardcoded, shutdown_reason()},
?ALL(
{Msg, By, Reason},
{message(), hardcoded, shutdown_reason()},
begin
ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]),
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
true -> skip;
true ->
skip;
_ ->
{'on_message_dropped', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{reason => stringfy(Reason),
#{
reason => stringfy(Reason),
message => from_message(Msg)
},
?assertEqual(Expected, Resp)
end,
true
end).
end
).
prop_message_delivered() ->
?ALL({ClientInfo, Msg}, {clientinfo(), message()},
?ALL(
{ClientInfo, Msg},
{clientinfo(), message()},
begin
ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]),
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
true -> skip;
true ->
skip;
_ ->
{'on_message_delivered', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{clientinfo => from_clientinfo(ClientInfo),
#{
clientinfo => from_clientinfo(ClientInfo),
message => from_message(Msg)
},
?assertEqual(Expected, Resp)
end,
true
end).
end
).
prop_message_acked() ->
?ALL({ClientInfo, Msg}, {clientinfo(), message()},
?ALL(
{ClientInfo, Msg},
{clientinfo(), message()},
begin
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
true -> skip;
true ->
skip;
_ ->
{'on_message_acked', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{clientinfo => from_clientinfo(ClientInfo),
#{
clientinfo => from_clientinfo(ClientInfo),
message => from_message(Msg)
},
?assertEqual(Expected, Resp)
end,
true
end).
end
).
nodestr() ->
stringfy(node()).
@ -396,12 +473,22 @@ ntoa(IP) ->
maybe(undefined) -> <<>>;
maybe(B) -> B.
properties(undefined) -> [];
properties(undefined) ->
[];
properties(M) when is_map(M) ->
maps:fold(fun(K, V, Acc) ->
[#{name => stringfy(K),
value => stringfy(V)} | Acc]
end, [], M).
maps:fold(
fun(K, V, Acc) ->
[
#{
name => stringfy(K),
value => stringfy(V)
}
| Acc
]
end,
[],
M
).
topicfilters(Tfs) when is_list(Tfs) ->
[#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
@ -417,7 +504,8 @@ stringfy(Term) ->
unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
subopts(SubOpts) ->
#{qos => maps:get(qos, SubOpts, 0),
#{
qos => maps:get(qos, SubOpts, 0),
rh => maps:get(rh, SubOpts, 0),
rap => maps:get(rap, SubOpts, 0),
nl => maps:get(nl, SubOpts, 0),
@ -434,7 +522,8 @@ pubsub_to_enum(publish) -> 'PUBLISH';
pubsub_to_enum(subscribe) -> 'SUBSCRIBE'.
from_conninfo(ConnInfo) ->
#{node => nodestr(),
#{
node => nodestr(),
clientid => maps:get(clientid, ConnInfo),
username => maybe(maps:get(username, ConnInfo, <<>>)),
peerhost => peerhost(ConnInfo),
@ -445,7 +534,8 @@ from_conninfo(ConnInfo) ->
}.
from_clientinfo(ClientInfo) ->
#{node => nodestr(),
#{
node => nodestr(),
clientid => maps:get(clientid, ClientInfo),
username => maybe(maps:get(username, ClientInfo, <<>>)),
password => maybe(maps:get(password, ClientInfo, <<>>)),
@ -460,7 +550,8 @@ from_clientinfo(ClientInfo) ->
}.
from_message(Msg) ->
#{node => nodestr(),
#{
node => nodestr(),
id => emqx_guid:to_hexstr(emqx_message:id(Msg)),
qos => emqx_message:qos(Msg),
from => stringfy(emqx_message:from(Msg)),
@ -468,7 +559,8 @@ from_message(Msg) ->
payload => emqx_message:payload(Msg),
timestamp => emqx_message:timestamp(Msg),
headers => emqx_exhook_handler:headers(
emqx_message:get_headers(Msg))
emqx_message:get_headers(Msg)
)
}.
%%--------------------------------------------------------------------
@ -513,17 +605,19 @@ shutdown_reason() ->
oneof([utf8(), {shutdown, emqx_proper_types:limited_atom()}]).
authresult() ->
?LET(RC, connack_return_code(),
?LET(
RC,
connack_return_code(),
case RC of
success -> ok;
_ -> {error, RC}
end).
end
).
inject_magic_into(Key, Object) ->
case castspell() of
muggles -> Object;
Spell ->
Object#{Key => Spell}
Spell -> Object#{Key => Spell}
end.
castspell() ->