diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 0957e3c18..66682caeb 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -133,9 +133,9 @@ drop_bridge(Name) -> %% =================================================================== %% When use this bridge as a data source, ?MODULE:on_message_received will be called %% if the bridge received msgs from the remote broker. -on_message_received(Msg, HookPoint, InstId) -> - emqx_resource:inc_matched(InstId), - emqx_resource:inc_success(InstId), +on_message_received(Msg, HookPoint, ResId) -> + emqx_resource:inc_matched(ResId), + emqx_resource:inc_success(ResId), emqx:run_hook(HookPoint, [Msg]). %% =================================================================== @@ -206,11 +206,12 @@ make_sub_confs(EmptyMap, _) when map_size(EmptyMap) == 0 -> make_sub_confs(undefined, _) -> undefined; make_sub_confs(SubRemoteConf, InstId) -> + ResId = emqx_resource_manager:manager_id_to_resource_id(InstId), case maps:take(hookpoint, SubRemoteConf) of error -> SubRemoteConf; {HookPoint, SubConf} -> - MFA = {?MODULE, on_message_received, [HookPoint, InstId]}, + MFA = {?MODULE, on_message_received, [HookPoint, ResId]}, SubConf#{on_message_received => MFA} end. diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index b5bcbd330..7dad85d5b 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -38,8 +38,12 @@ list_group/1, ets_lookup/1, get_metrics/1, - reset_metrics/1, - set_resource_status_connecting/1 + reset_metrics/1 +]). + +-export([ + set_resource_status_connecting/1, + manager_id_to_resource_id/1 ]). % Server @@ -64,6 +68,13 @@ %% API %%------------------------------------------------------------------------------ +make_manager_id(ResId) -> + emqx_resource:generate_id(ResId). + +manager_id_to_resource_id(MgrId) -> + [ResId, _Index] = string:split(MgrId, ":", trailing), + ResId. + %% @doc Called from emqx_resource when starting a resource instance. %% %% Triggers the emqx_resource_manager_sup supervisor to actually create @@ -455,9 +466,6 @@ stop_resource(Data) -> _ = maybe_clear_alarm(Data#data.id), ok. -make_manager_id(ResId) -> - emqx_resource:generate_id(ResId). - make_test_id() -> RandId = iolist_to_binary(emqx_misc:gen_id(16)), <>.