feat(resource): add metrics to emqx_resource
This commit is contained in:
parent
4c149f92c1
commit
29ad6d215e
|
@ -21,7 +21,6 @@
|
||||||
-export([post_config_update/5]).
|
-export([post_config_update/5]).
|
||||||
|
|
||||||
-export([ load_hook/0
|
-export([ load_hook/0
|
||||||
, reload_hook/0
|
|
||||||
, unload_hook/0
|
, unload_hook/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -55,22 +54,21 @@
|
||||||
-export([ config_key_path/0
|
-export([ config_key_path/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
reload_hook() ->
|
|
||||||
unload_hook(),
|
|
||||||
load_hook().
|
|
||||||
|
|
||||||
load_hook() ->
|
load_hook() ->
|
||||||
Bridges = emqx:get_config([bridges], #{}),
|
Bridges = emqx:get_config([bridges], #{}),
|
||||||
|
load_hook(Bridges).
|
||||||
|
|
||||||
|
load_hook(Bridges) ->
|
||||||
lists:foreach(fun({_Type, Bridge}) ->
|
lists:foreach(fun({_Type, Bridge}) ->
|
||||||
lists:foreach(fun({_Name, BridgeConf}) ->
|
lists:foreach(fun({_Name, BridgeConf}) ->
|
||||||
load_hook(BridgeConf)
|
do_load_hook(BridgeConf)
|
||||||
end, maps:to_list(Bridge))
|
end, maps:to_list(Bridge))
|
||||||
end, maps:to_list(Bridges)).
|
end, maps:to_list(Bridges)).
|
||||||
|
|
||||||
load_hook(#{from_local_topic := _}) ->
|
do_load_hook(#{from_local_topic := _}) ->
|
||||||
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
|
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
|
||||||
ok;
|
ok;
|
||||||
load_hook(_Conf) -> ok.
|
do_load_hook(_Conf) -> ok.
|
||||||
|
|
||||||
unload_hook() ->
|
unload_hook() ->
|
||||||
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}).
|
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}).
|
||||||
|
@ -109,7 +107,8 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
|
||||||
{fun create/3, Added},
|
{fun create/3, Added},
|
||||||
{fun update/3, Updated}
|
{fun update/3, Updated}
|
||||||
]),
|
]),
|
||||||
ok = reload_hook(),
|
ok = unload_hook(),
|
||||||
|
ok = load_hook(NewConf),
|
||||||
Result.
|
Result.
|
||||||
|
|
||||||
perform_bridge_changes(Tasks) ->
|
perform_bridge_changes(Tasks) ->
|
||||||
|
|
|
@ -36,21 +36,21 @@
|
||||||
". Bridge Ids must be of format <bridge_type>:<name>">>}}
|
". Bridge Ids must be of format <bridge_type>:<name>">>}}
|
||||||
end).
|
end).
|
||||||
|
|
||||||
-define(METRICS(SUCC, FAILED, RATE, RATE_5, RATE_MAX),
|
-define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX),
|
||||||
#{
|
#{ matched => MATCH,
|
||||||
success => SUCC,
|
success => SUCC,
|
||||||
failed => FAILED,
|
failed => FAILED,
|
||||||
rate => RATE,
|
speed => RATE,
|
||||||
rate_last5m => RATE_5,
|
speed_last5m => RATE_5,
|
||||||
rate_max => RATE_MAX
|
speed_max => RATE_MAX
|
||||||
}).
|
}).
|
||||||
-define(MATCH_METRICS(SUCC, FAILED, RATE, RATE_5, RATE_MAX),
|
-define(metrics(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX),
|
||||||
#{
|
#{ matched := MATCH,
|
||||||
success := SUCC,
|
success := SUCC,
|
||||||
failed := FAILED,
|
failed := FAILED,
|
||||||
rate := RATE,
|
speed := RATE,
|
||||||
rate_last5m := RATE_5,
|
speed_last5m := RATE_5,
|
||||||
rate_max := RATE_MAX
|
speed_max := RATE_MAX
|
||||||
}).
|
}).
|
||||||
|
|
||||||
req_schema() ->
|
req_schema() ->
|
||||||
|
@ -73,11 +73,12 @@ status_schema() ->
|
||||||
metrics_schema() ->
|
metrics_schema() ->
|
||||||
#{ type => object
|
#{ type => object
|
||||||
, properties => #{
|
, properties => #{
|
||||||
|
matched => #{type => integer, example => "0"},
|
||||||
success => #{type => integer, example => "0"},
|
success => #{type => integer, example => "0"},
|
||||||
failed => #{type => integer, example => "0"},
|
failed => #{type => integer, example => "0"},
|
||||||
rate => #{type => number, format => float, example => "0.0"},
|
speed => #{type => number, format => float, example => "0.0"},
|
||||||
rate_last5m => #{type => number, format => float, example => "0.0"},
|
speed_last5m => #{type => number, format => float, example => "0.0"},
|
||||||
rate_max => #{type => number, format => float, example => "0.0"}
|
speed_max => #{type => number, format => float, example => "0.0"}
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
@ -337,21 +338,22 @@ collect_metrics(Bridges) ->
|
||||||
[maps:with([node, metrics], B) || B <- Bridges].
|
[maps:with([node, metrics], B) || B <- Bridges].
|
||||||
|
|
||||||
aggregate_metrics(AllMetrics) ->
|
aggregate_metrics(AllMetrics) ->
|
||||||
InitMetrics = ?METRICS(0,0,0,0,0),
|
InitMetrics = ?METRICS(0,0,0,0,0,0),
|
||||||
lists:foldl(fun(#{metrics := ?MATCH_METRICS(Succ1, Failed1, Rate1, Rate5m1, RateMax1)},
|
lists:foldl(fun(#{metrics := ?metrics(Match1, Succ1, Failed1, Rate1, Rate5m1, RateMax1)},
|
||||||
?MATCH_METRICS(Succ0, Failed0, Rate0, Rate5m0, RateMax0)) ->
|
?metrics(Match0, Succ0, Failed0, Rate0, Rate5m0, RateMax0)) ->
|
||||||
?METRICS(Succ1 + Succ0, Failed1 + Failed0,
|
?METRICS(Match1 + Match0, Succ1 + Succ0, Failed1 + Failed0,
|
||||||
Rate1 + Rate0, Rate5m1 + Rate5m0, RateMax1 + RateMax0)
|
Rate1 + Rate0, Rate5m1 + Rate5m0, RateMax1 + RateMax0)
|
||||||
end, InitMetrics, AllMetrics).
|
end, InitMetrics, AllMetrics).
|
||||||
|
|
||||||
format_resp(#{id := Id, raw_config := RawConf, resource_data := #{mod := Mod, status := Status}}) ->
|
format_resp(#{id := Id, raw_config := RawConf,
|
||||||
|
resource_data := #{mod := Mod, status := Status, metrics := Metrics}}) ->
|
||||||
IsConnected = fun(started) -> connected; (_) -> disconnected end,
|
IsConnected = fun(started) -> connected; (_) -> disconnected end,
|
||||||
RawConf#{
|
RawConf#{
|
||||||
id => Id,
|
id => Id,
|
||||||
node => node(),
|
node => node(),
|
||||||
bridge_type => emqx_bridge:bridge_type(Mod),
|
bridge_type => emqx_bridge:bridge_type(Mod),
|
||||||
status => IsConnected(Status),
|
status => IsConnected(Status),
|
||||||
metrics => ?METRICS(0,0,0,0,0)
|
metrics => Metrics
|
||||||
}.
|
}.
|
||||||
|
|
||||||
rpc_multicall(Func, Args) ->
|
rpc_multicall(Func, Args) ->
|
||||||
|
|
|
@ -127,10 +127,11 @@ on_stop(_InstId, #{name := InstanceId}) ->
|
||||||
connector => InstanceId, reason => Reason})
|
connector => InstanceId, reason => Reason})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_query(_InstId, {send_message, Msg}, _AfterQuery, #{name := InstanceId}) ->
|
on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) ->
|
||||||
?SLOG(debug, #{msg => "send msg to remote node", message => Msg,
|
?SLOG(debug, #{msg => "send msg to remote node", message => Msg,
|
||||||
connector => InstanceId}),
|
connector => InstanceId}),
|
||||||
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg).
|
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
|
||||||
|
emqx_resource:query_success(AfterQuery).
|
||||||
|
|
||||||
on_health_check(_InstId, #{name := InstanceId} = State) ->
|
on_health_check(_InstId, #{name := InstanceId} = State) ->
|
||||||
case emqx_connector_mqtt_worker:ping(InstanceId) of
|
case emqx_connector_mqtt_worker:ping(InstanceId) of
|
||||||
|
|
|
@ -25,7 +25,8 @@
|
||||||
-define(CONF_DEFAULT, <<"connectors: {}">>).
|
-define(CONF_DEFAULT, <<"connectors: {}">>).
|
||||||
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
|
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
|
||||||
-define(CONNECTR_ID, <<"mqtt:test_connector">>).
|
-define(CONNECTR_ID, <<"mqtt:test_connector">>).
|
||||||
-define(BRIDGE_ID, <<"mqtt:test_bridge">>).
|
-define(BRIDGE_ID_INGRESS, <<"mqtt:ingress_test_bridge">>).
|
||||||
|
-define(BRIDGE_ID_EGRESS, <<"mqtt:egress_test_bridge">>).
|
||||||
-define(MQTT_CONNECOTR(Username),
|
-define(MQTT_CONNECOTR(Username),
|
||||||
#{
|
#{
|
||||||
<<"server">> => <<"127.0.0.1:1883">>,
|
<<"server">> => <<"127.0.0.1:1883">>,
|
||||||
|
@ -37,7 +38,7 @@
|
||||||
-define(MQTT_CONNECOTR2(Server),
|
-define(MQTT_CONNECOTR2(Server),
|
||||||
?MQTT_CONNECOTR(<<"user1">>)#{<<"server">> => Server}).
|
?MQTT_CONNECOTR(<<"user1">>)#{<<"server">> => Server}).
|
||||||
|
|
||||||
-define(MQTT_BRIDGE(ID),
|
-define(MQTT_BRIDGE_INGRESS(ID),
|
||||||
#{
|
#{
|
||||||
<<"connector">> => ID,
|
<<"connector">> => ID,
|
||||||
<<"direction">> => <<"ingress">>,
|
<<"direction">> => <<"ingress">>,
|
||||||
|
@ -49,6 +50,22 @@
|
||||||
<<"retain">> => <<"${retain}">>
|
<<"retain">> => <<"${retain}">>
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-define(MQTT_BRIDGE_EGRESS(ID),
|
||||||
|
#{
|
||||||
|
<<"connector">> => ID,
|
||||||
|
<<"direction">> => <<"egress">>,
|
||||||
|
<<"from_local_topic">> => <<"local_topic/#">>,
|
||||||
|
<<"to_remote_topic">> => <<"remote_topic/${topic}">>,
|
||||||
|
<<"payload">> => <<"${payload}">>,
|
||||||
|
<<"qos">> => <<"${qos}">>,
|
||||||
|
<<"retain">> => <<"${retain}">>
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX),
|
||||||
|
#{<<"matched">> := MATCH, <<"success">> := SUCC,
|
||||||
|
<<"failed">> := FAILED, <<"speed">> := SPEED,
|
||||||
|
<<"speed_last5m">> := SPEED5M, <<"speed_max">> := SPEEDMAX}).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
@ -162,7 +179,7 @@ t_mqtt_crud_apis(_) ->
|
||||||
}, jsx:decode(ErrMsg2)),
|
}, jsx:decode(ErrMsg2)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_mqtt_conn_bridge(_) ->
|
t_mqtt_conn_bridge_ingress(_) ->
|
||||||
%% assert we there's no connectors and no bridges at first
|
%% assert we there's no connectors and no bridges at first
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||||
|
@ -184,10 +201,10 @@ t_mqtt_conn_bridge(_) ->
|
||||||
%% ... and a MQTT bridge, using POST
|
%% ... and a MQTT bridge, using POST
|
||||||
%% we bind this bridge to the connector created just now
|
%% we bind this bridge to the connector created just now
|
||||||
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
||||||
?MQTT_BRIDGE(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID}),
|
?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_INGRESS}),
|
||||||
|
|
||||||
%ct:pal("---bridge: ~p", [Bridge]),
|
%ct:pal("---bridge: ~p", [Bridge]),
|
||||||
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
|
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS
|
||||||
, <<"bridge_type">> := <<"mqtt">>
|
, <<"bridge_type">> := <<"mqtt">>
|
||||||
, <<"status">> := <<"connected">>
|
, <<"status">> := <<"connected">>
|
||||||
, <<"connector">> := ?CONNECTR_ID
|
, <<"connector">> := ?CONNECTR_ID
|
||||||
|
@ -217,7 +234,77 @@ t_mqtt_conn_bridge(_) ->
|
||||||
end),
|
end),
|
||||||
|
|
||||||
%% delete the bridge
|
%% delete the bridge
|
||||||
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
|
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_INGRESS]), []),
|
||||||
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||||
|
|
||||||
|
%% delete the connector
|
||||||
|
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
|
||||||
|
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_mqtt_conn_bridge_egress(_) ->
|
||||||
|
%% assert we there's no connectors and no bridges at first
|
||||||
|
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
||||||
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||||
|
|
||||||
|
%% then we add a mqtt connector, using POST
|
||||||
|
User1 = <<"user1">>,
|
||||||
|
{ok, 201, Connector} = request(post, uri(["connectors"]),
|
||||||
|
?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
|
||||||
|
|
||||||
|
%ct:pal("---connector: ~p", [Connector]),
|
||||||
|
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
|
||||||
|
, <<"server">> := <<"127.0.0.1:1883">>
|
||||||
|
, <<"username">> := User1
|
||||||
|
, <<"password">> := <<"">>
|
||||||
|
, <<"proto_ver">> := <<"v4">>
|
||||||
|
, <<"ssl">> := #{<<"enable">> := false}
|
||||||
|
}, jsx:decode(Connector)),
|
||||||
|
|
||||||
|
%% ... and a MQTT bridge, using POST
|
||||||
|
%% we bind this bridge to the connector created just now
|
||||||
|
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
||||||
|
?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_EGRESS}),
|
||||||
|
|
||||||
|
%ct:pal("---bridge: ~p", [Bridge]),
|
||||||
|
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
|
||||||
|
, <<"bridge_type">> := <<"mqtt">>
|
||||||
|
, <<"status">> := <<"connected">>
|
||||||
|
, <<"connector">> := ?CONNECTR_ID
|
||||||
|
}, jsx:decode(Bridge)),
|
||||||
|
|
||||||
|
%% we now test if the bridge works as expected
|
||||||
|
LocalTopic = <<"local_topic/1">>,
|
||||||
|
RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
emqx:subscribe(RemoteTopic),
|
||||||
|
%% PUBLISH a message to the 'local' broker, as we have only one broker,
|
||||||
|
%% the remote broker is also the local one.
|
||||||
|
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
||||||
|
|
||||||
|
%% we should receive a message on the "remote" broker, with specified topic
|
||||||
|
?assert(
|
||||||
|
receive
|
||||||
|
{deliver, RemoteTopic, #message{payload = Payload}} ->
|
||||||
|
ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
|
||||||
|
true;
|
||||||
|
Msg ->
|
||||||
|
ct:pal("Msg: ~p", [Msg]),
|
||||||
|
false
|
||||||
|
after 100 ->
|
||||||
|
false
|
||||||
|
end),
|
||||||
|
|
||||||
|
%% verify the metrics of the bridge
|
||||||
|
{ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
|
||||||
|
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
|
||||||
|
, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)
|
||||||
|
, <<"node_metrics">> :=
|
||||||
|
[#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}]
|
||||||
|
}, jsx:decode(BridgeStr)),
|
||||||
|
|
||||||
|
%% delete the bridge
|
||||||
|
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||||
|
|
||||||
%% delete the connector
|
%% delete the connector
|
||||||
|
@ -245,8 +332,8 @@ t_mqtt_conn_update(_) ->
|
||||||
%% ... and a MQTT bridge, using POST
|
%% ... and a MQTT bridge, using POST
|
||||||
%% we bind this bridge to the connector created just now
|
%% we bind this bridge to the connector created just now
|
||||||
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
||||||
?MQTT_BRIDGE(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID}),
|
?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_EGRESS}),
|
||||||
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
|
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
|
||||||
, <<"bridge_type">> := <<"mqtt">>
|
, <<"bridge_type">> := <<"mqtt">>
|
||||||
, <<"status">> := <<"connected">>
|
, <<"status">> := <<"connected">>
|
||||||
, <<"connector">> := ?CONNECTR_ID
|
, <<"connector">> := ?CONNECTR_ID
|
||||||
|
@ -260,7 +347,7 @@ t_mqtt_conn_update(_) ->
|
||||||
{ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]),
|
{ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]),
|
||||||
?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)),
|
?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)),
|
||||||
%% delete the bridge
|
%% delete the bridge
|
||||||
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
|
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||||
|
|
||||||
%% delete the connector
|
%% delete the connector
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
status => started | stopped
|
status => started | stopped
|
||||||
}.
|
}.
|
||||||
-type resource_group() :: binary().
|
-type resource_group() :: binary().
|
||||||
-type after_query() :: {OnSuccess :: after_query_fun(), OnFailed :: after_query_fun()} |
|
-type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} |
|
||||||
undefined.
|
undefined.
|
||||||
|
|
||||||
%% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback
|
%% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback
|
||||||
|
|
|
@ -122,13 +122,18 @@ is_resource_mod(Module) ->
|
||||||
|
|
||||||
-spec query_success(after_query()) -> ok.
|
-spec query_success(after_query()) -> ok.
|
||||||
query_success(undefined) -> ok;
|
query_success(undefined) -> ok;
|
||||||
query_success({{OnSucc, Args}, _}) ->
|
query_success({OnSucc, _}) ->
|
||||||
safe_apply(OnSucc, Args).
|
apply_query_after_calls(OnSucc).
|
||||||
|
|
||||||
-spec query_failed(after_query()) -> ok.
|
-spec query_failed(after_query()) -> ok.
|
||||||
query_failed(undefined) -> ok;
|
query_failed(undefined) -> ok;
|
||||||
query_failed({_, {OnFailed, Args}}) ->
|
query_failed({_, OnFailed}) ->
|
||||||
safe_apply(OnFailed, Args).
|
apply_query_after_calls(OnFailed).
|
||||||
|
|
||||||
|
apply_query_after_calls(Funcs) ->
|
||||||
|
lists:foreach(fun({Fun, Args}) ->
|
||||||
|
safe_apply(Fun, Args)
|
||||||
|
end, Funcs).
|
||||||
|
|
||||||
%% =================================================================================
|
%% =================================================================================
|
||||||
%% APIs for resource instances
|
%% APIs for resource instances
|
||||||
|
@ -175,7 +180,7 @@ remove_local(InstId) ->
|
||||||
%% =================================================================================
|
%% =================================================================================
|
||||||
-spec query(instance_id(), Request :: term()) -> Result :: term().
|
-spec query(instance_id(), Request :: term()) -> Result :: term().
|
||||||
query(InstId, Request) ->
|
query(InstId, Request) ->
|
||||||
query(InstId, Request, undefined).
|
query(InstId, Request, inc_metrics_funcs(InstId)).
|
||||||
|
|
||||||
%% same to above, also defines what to do when the Module:on_query success or failed
|
%% same to above, also defines what to do when the Module:on_query success or failed
|
||||||
%% it is the duty of the Module to apply the `after_query()` functions.
|
%% it is the duty of the Module to apply the `after_query()` functions.
|
||||||
|
@ -321,6 +326,13 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
|
||||||
filter_instances(Filter) ->
|
filter_instances(Filter) ->
|
||||||
[Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
|
[Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
|
||||||
|
|
||||||
|
inc_metrics_funcs(InstId) ->
|
||||||
|
OnFailed = [{fun emqx_plugin_libs_metrics:inc_failed/2, [resource_metrics, InstId]}],
|
||||||
|
OnSucc = [ {fun emqx_plugin_libs_metrics:inc_matched/2, [resource_metrics, InstId]}
|
||||||
|
, {fun emqx_plugin_libs_metrics:inc_success/2, [resource_metrics, InstId]}
|
||||||
|
],
|
||||||
|
{OnSucc, OnFailed}.
|
||||||
|
|
||||||
call_instance(InstId, Query) ->
|
call_instance(InstId, Query) ->
|
||||||
emqx_resource_instance:hash_call(InstId, Query).
|
emqx_resource_instance:hash_call(InstId, Query).
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
|
|
||||||
%% load resource instances from *.conf files
|
%% load resource instances from *.conf files
|
||||||
-export([ lookup/1
|
-export([ lookup/1
|
||||||
|
, get_metrics/1
|
||||||
, list_all/0
|
, list_all/0
|
||||||
, create_local/3
|
, create_local/3
|
||||||
]).
|
]).
|
||||||
|
@ -65,9 +66,13 @@ hash_call(InstId, Request, Timeout) ->
|
||||||
lookup(InstId) ->
|
lookup(InstId) ->
|
||||||
case ets:lookup(emqx_resource_instance, InstId) of
|
case ets:lookup(emqx_resource_instance, InstId) of
|
||||||
[] -> {error, not_found};
|
[] -> {error, not_found};
|
||||||
[{_, Data}] -> {ok, Data#{id => InstId}}
|
[{_, Data}] ->
|
||||||
|
{ok, Data#{id => InstId, metrics => get_metrics(InstId)}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
get_metrics(InstId) ->
|
||||||
|
emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId).
|
||||||
|
|
||||||
force_lookup(InstId) ->
|
force_lookup(InstId) ->
|
||||||
{ok, Data} = lookup(InstId),
|
{ok, Data} = lookup(InstId),
|
||||||
Data.
|
Data.
|
||||||
|
@ -174,6 +179,7 @@ do_create(InstId, ResourceType, Config) ->
|
||||||
#{mod => ResourceType, config => Config,
|
#{mod => ResourceType, config => Config,
|
||||||
state => ResourceState, status => stopped}}),
|
state => ResourceState, status => stopped}}),
|
||||||
_ = do_health_check(InstId),
|
_ = do_health_check(InstId),
|
||||||
|
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId),
|
||||||
{ok, force_lookup(InstId)};
|
{ok, force_lookup(InstId)};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
logger:error("start ~ts resource ~ts failed: ~p",
|
logger:error("start ~ts resource ~ts failed: ~p",
|
||||||
|
@ -207,6 +213,7 @@ do_remove(InstId) ->
|
||||||
do_remove(Mod, InstId, ResourceState) ->
|
do_remove(Mod, InstId, ResourceState) ->
|
||||||
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
|
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
|
||||||
ets:delete(emqx_resource_instance, InstId),
|
ets:delete(emqx_resource_instance, InstId),
|
||||||
|
ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
do_restart(InstId) ->
|
do_restart(InstId) ->
|
||||||
|
|
|
@ -32,17 +32,20 @@ init([]) ->
|
||||||
_ = ets:new(emqx_resource_instance, TabOpts),
|
_ = ets:new(emqx_resource_instance, TabOpts),
|
||||||
|
|
||||||
SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
|
SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
|
||||||
|
Metrics = emqx_plugin_libs_metrics:child_spec(resource_metrics),
|
||||||
|
|
||||||
Pool = ?RESOURCE_INST_MOD,
|
Pool = ?RESOURCE_INST_MOD,
|
||||||
Mod = ?RESOURCE_INST_MOD,
|
Mod = ?RESOURCE_INST_MOD,
|
||||||
ensure_pool(Pool, hash, [{size, ?POOL_SIZE}]),
|
ensure_pool(Pool, hash, [{size, ?POOL_SIZE}]),
|
||||||
{ok, {SupFlags, [
|
ResourceInsts = [
|
||||||
begin
|
begin
|
||||||
ensure_pool_worker(Pool, {Pool, Idx}, Idx),
|
ensure_pool_worker(Pool, {Pool, Idx}, Idx),
|
||||||
#{id => {Mod, Idx},
|
#{id => {Mod, Idx},
|
||||||
start => {Mod, start_link, [Pool, Idx]},
|
start => {Mod, start_link, [Pool, Idx]},
|
||||||
restart => transient,
|
restart => transient,
|
||||||
shutdown => 5000, type => worker, modules => [Mod]}
|
shutdown => 5000, type => worker, modules => [Mod]}
|
||||||
end || Idx <- lists:seq(1, ?POOL_SIZE)]}}.
|
end || Idx <- lists:seq(1, ?POOL_SIZE)],
|
||||||
|
{ok, {SupFlags, [Metrics | ResourceInsts]}}.
|
||||||
|
|
||||||
%% internal functions
|
%% internal functions
|
||||||
ensure_pool(Pool, Type, Opts) ->
|
ensure_pool(Pool, Type, Opts) ->
|
||||||
|
|
Loading…
Reference in New Issue