Merge pull request #9264 from terry-xiaoyu/message_acked_hook_for_coap

fix: run the 'message.acked' hook when ACK received from CoAP devices
This commit is contained in:
Xinyu Liu 2022-10-31 15:20:11 +08:00 committed by GitHub
commit 91bc2403a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 91 additions and 23 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_coap, {application, emqx_coap,
[{description, "EMQ X CoAP Gateway"}, [{description, "EMQ X CoAP Gateway"},
{vsn, "4.3.1"}, % strict semver, bump manually! {vsn, "4.3.2"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel,stdlib,gen_coap]}, {applications, [kernel,stdlib,gen_coap]},

View File

@ -1,9 +1,15 @@
%% -*-: erlang -*- %% -*-: erlang -*-
{VSN, {VSN,
[{"4.3.0",[ [{<<"4\\.3\\.[0-1]">>,[
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]}, {load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []},
{load_module, emqx_coap_pubsub_resource, brutal_purge, soft_purge, []},
{load_module, emqx_coap_resource, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}], {<<".*">>, []}],
[{"4.3.0",[ [{<<"4\\.3\\.[0-1]">>,[
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]}, {load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []},
{load_module, emqx_coap_pubsub_resource, brutal_purge, soft_purge, []},
{load_module, emqx_coap_resource, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}] {<<".*">>, []}]
}. }.

View File

@ -31,6 +31,9 @@
-export([ subscribe/2 -export([ subscribe/2
, unsubscribe/2 , unsubscribe/2
, publish/3 , publish/3
, received_puback/2
, message_payload/1
, message_topic/1
]). ]).
-export([ client_pid/4 -export([ client_pid/4
@ -95,6 +98,15 @@ unsubscribe(Pid, Topic) ->
publish(Pid, Topic, Payload) -> publish(Pid, Topic, Payload) ->
gen_server:call(Pid, {publish, Topic, Payload}). gen_server:call(Pid, {publish, Topic, Payload}).
received_puback(Pid, Msg) ->
gen_server:cast(Pid, {received_puback, Msg}).
message_payload(#message{payload = Payload}) ->
Payload.
message_topic(#message{topic = Topic}) ->
Topic.
%% For emqx_management plugin %% For emqx_management plugin
call(Pid, Msg) -> call(Pid, Msg) ->
call(Pid, Msg, infinity). call(Pid, Msg, infinity).
@ -172,13 +184,19 @@ handle_call(Request, _From, State) ->
?LOG(error, "adapter unexpected call ~p", [Request]), ?LOG(error, "adapter unexpected call ~p", [Request]),
{reply, ignored, State, hibernate}. {reply, ignored, State, hibernate}.
handle_cast({received_puback, Msg}, State) ->
%% NOTE: the counter named 'messages.acked', but the hook named 'message.acked'!
ok = emqx_metrics:inc('messages.acked'),
_ = emqx_hooks:run('message.acked', [conninfo(State), Msg]),
{noreply, State, hibernate};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?LOG(error, "broker_api unexpected cast ~p", [Msg]), ?LOG(error, "broker_api unexpected cast ~p", [Msg]),
{noreply, State, hibernate}. {noreply, State, hibernate}.
handle_info({deliver, _Topic, #message{topic = Topic, payload = Payload}}, handle_info({deliver, _Topic, #message{} = Msg},
State = #state{sub_topics = Subscribers}) -> State = #state{sub_topics = Subscribers}) ->
deliver([{Topic, Payload}], Subscribers), deliver([Msg], Subscribers),
{noreply, State, hibernate}; {noreply, State, hibernate};
handle_info(check_alive, State = #state{sub_topics = []}) -> handle_info(check_alive, State = #state{sub_topics = []}) ->
@ -271,27 +289,25 @@ packet_to_message(Topic, Payload,
%% Deliver %% Deliver
deliver([], _) -> ok; deliver([], _) -> ok;
deliver([Pub | More], Subscribers) -> deliver([Msg | More], Subscribers) ->
ok = do_deliver(Pub, Subscribers), ok = do_deliver(Msg, Subscribers),
deliver(More, Subscribers). deliver(More, Subscribers).
do_deliver({Topic, Payload}, Subscribers) -> do_deliver(Msg, Subscribers) ->
%% handle PUBLISH packet from broker %% handle PUBLISH packet from broker
?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]), ?LOG(debug, "deliver message from broker, msg: ~p", [Msg]),
deliver_to_coap(Topic, Payload, Subscribers), deliver_to_coap(Msg, Subscribers),
ok. ok.
deliver_to_coap(_TopicName, _Payload, []) -> deliver_to_coap(_Msg, []) ->
ok; ok;
deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}} | T]) -> deliver_to_coap(#message{topic = TopicName} = Msg, [{TopicFilter, {IsWild, CoapPid}} | T]) ->
Matched = case IsWild of Matched = case IsWild of
true -> emqx_topic:match(TopicName, TopicFilter); true -> emqx_topic:match(TopicName, TopicFilter);
false -> TopicName =:= TopicFilter false -> TopicName =:= TopicFilter
end, end,
%?LOG(debug, "deliver_to_coap Matched=~p, CoapPid=~p, TopicName=~p, Payload=~p, T=~p", Matched andalso (CoapPid ! {dispatch, Msg}),
% [Matched, CoapPid, TopicName, Payload, T]), deliver_to_coap(Msg, T).
Matched andalso (CoapPid ! {dispatch, TopicName, Payload}),
deliver_to_coap(TopicName, Payload, T).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper funcs %% Helper funcs
@ -328,12 +344,13 @@ chann_info(State) ->
will_msg => undefined will_msg => undefined
}. }.
conninfo(#state{peername = Peername, conninfo(#state{peername = {PeerHost, _} = Peername,
clientid = ClientId, clientid = ClientId,
connected_at = ConnectedAt}) -> connected_at = ConnectedAt}) ->
#{socktype => udp, #{socktype => udp,
sockname => {{127, 0, 0, 1}, 5683}, sockname => {{127, 0, 0, 1}, 5683},
peername => Peername, peername => Peername,
peerhost => PeerHost,
peercert => nossl, %% TODO: dtls peercert => nossl, %% TODO: dtls
conn_mod => ?MODULE, conn_mod => ?MODULE,
proto_name => <<"CoAP">>, proto_name => <<"CoAP">>,

View File

@ -138,16 +138,32 @@ coap_unobserve({state, ChId, Prefix, TopicPath}) ->
ok. ok.
handle_info({dispatch, Topic, Payload}, State) -> handle_info({dispatch, Topic, Payload}, State) ->
%% This clause should never be matched any more. We keep it here to handle
%% the old format messages during the release upgrade.
%% In this case the second function clause of `coap_ack/2` will be called,
%% and the ACKs is discarded.
?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]), ?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
{ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload), {ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload),
?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]), ?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]),
{notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State}; {notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info({dispatch, Msg}, State) ->
Payload = emqx_coap_mqtt_adapter:message_payload(Msg),
Topic = emqx_coap_mqtt_adapter:message_topic(Msg),
{ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload),
?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]),
{notify, {pub, Msg}, #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info(Message, State) -> handle_info(Message, State) ->
?LOG(error, "Unknown Message ~p", [Message]), ?LOG(error, "Unknown Message ~p", [Message]),
{noreply, State}. {noreply, State}.
coap_ack(_Ref, State) -> {ok, State}. coap_ack({pub, Msg}, State) ->
?LOG(debug, "received coap ack for publish msg: ~p", [Msg]),
Pid = get(mqtt_client_pid),
emqx_coap_mqtt_adapter:received_puback(Pid, Msg),
{ok, State};
coap_ack(_Ref, State) ->
?LOG(debug, "received coap ack: ~p", [_Ref]),
{ok, State}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal Functions %% Internal Functions

View File

@ -104,12 +104,26 @@ coap_unobserve({state, ChId, Prefix, Topic}) ->
ok. ok.
handle_info({dispatch, Topic, Payload}, State) -> handle_info({dispatch, Topic, Payload}, State) ->
%% This clause should never be matched any more. We keep it here to handle
%% the old format messages during the release upgrade.
%% In this case the second function clause of `coap_ack/2` will be called,
%% and the ACKs is discarded.
?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]), ?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
{notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State}; {notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info({dispatch, Msg}, State) ->
Payload = emqx_coap_mqtt_adapter:message_payload(Msg),
{notify, {pub, Msg}, #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info(Message, State) -> handle_info(Message, State) ->
emqx_coap_mqtt_adapter:handle_info(Message, State). emqx_coap_mqtt_adapter:handle_info(Message, State).
coap_ack(_Ref, State) -> {ok, State}. coap_ack({pub, Msg}, State) ->
?LOG(debug, "received coap ack for publish msg: ~p", [Msg]),
Pid = get(mqtt_client_pid),
emqx_coap_mqtt_adapter:received_puback(Pid, Msg),
{ok, State};
coap_ack(_Ref, State) ->
?LOG(debug, "received coap ack: ~p", [_Ref]),
{ok, State}.
get_auth(Query) -> get_auth(Query) ->
get_auth(Query, #coap_mqtt_auth{}). get_auth(Query, #coap_mqtt_auth{}).

View File

@ -91,7 +91,7 @@ t_observe(_Config) ->
Topic = <<"abc">>, TopicStr = binary_to_list(Topic), Topic = <<"abc">>, TopicStr = binary_to_list(Topic),
Payload = <<"123">>, Payload = <<"123">>,
Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret", Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret",
{ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri), {ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri),
?LOGT("observer Pid=~p, N=~p, Code=~p, Content=~p", [Pid, N, Code, Content]), ?LOGT("observer Pid=~p, N=~p, Code=~p, Content=~p", [Pid, N, Code, Content]),
[SubPid] = emqx:subscribers(Topic), [SubPid] = emqx:subscribers(Topic),
@ -195,12 +195,16 @@ t_one_clientid_sub_2_topics(_Config) ->
[SubPid] = emqx:subscribers(Topic2), [SubPid] = emqx:subscribers(Topic2),
?assert(is_pid(SubPid)), ?assert(is_pid(SubPid)),
CntrAcked1 = emqx_metrics:val('messages.acked'),
emqx:publish(emqx_message:make(Topic1, Payload1)), emqx:publish(emqx_message:make(Topic1, Payload1)),
Notif1 = receive_notification(), Notif1 = receive_notification(),
?LOGT("observer 1 get Notif=~p", [Notif1]), ?LOGT("observer 1 get Notif=~p", [Notif1]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv1}} = Notif1, {coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv1}} = Notif1,
?assertEqual(Payload1, PayloadRecv1), ?assertEqual(Payload1, PayloadRecv1),
timer:sleep(100),
CntrAcked2 = emqx_metrics:val('messages.acked'),
?assertEqual(CntrAcked2, CntrAcked1 + 1),
emqx:publish(emqx_message:make(Topic2, Payload2)), emqx:publish(emqx_message:make(Topic2, Payload2)),
@ -208,6 +212,9 @@ t_one_clientid_sub_2_topics(_Config) ->
?LOGT("observer 2 get Notif=~p", [Notif2]), ?LOGT("observer 2 get Notif=~p", [Notif2]),
{coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv2}} = Notif2, {coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv2}} = Notif2,
?assertEqual(Payload2, PayloadRecv2), ?assertEqual(Payload2, PayloadRecv2),
timer:sleep(100),
CntrAcked3 = emqx_metrics:val('messages.acked'),
?assertEqual(CntrAcked3, CntrAcked2 + 1),
er_coap_observer:stop(Pid1), er_coap_observer:stop(Pid1),
er_coap_observer:stop(Pid2). er_coap_observer:stop(Pid2).

View File

@ -2,6 +2,11 @@
## Enhancements ## Enhancements
- We now trigger the `'message.acked'` hook after the CoAP gateway sends a message to the device and receives the ACK from the device [#9264](https://github.com/emqx/emqx/pull/9264).
With this change, the CoAP gateway can be combined with the offline message caching function (in the
emqx enterprise), so that CoAP devices are able to read the missed messages from the database when
it is online again.
- Support to use placeholders like `${var}` in the HTTP `Headers` of rule-engine's Webhook actions [#9239](https://github.com/emqx/emqx/pull/9239). - Support to use placeholders like `${var}` in the HTTP `Headers` of rule-engine's Webhook actions [#9239](https://github.com/emqx/emqx/pull/9239).
- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199). - Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199).

View File

@ -2,6 +2,9 @@
## 增强 ## 增强
- 当 CoAP 网关给设备投递消息并收到设备发来的确认之后,回调 `'message.acked'` 钩子 [#9264](https://github.com/emqx/emqx/pull/9264)。
有了这个改动CoAP 网关可以配合 EMQX (企业版)的离线消息缓存功能,让 CoAP 设备重新上线之后,从数据库读取其离线状态下错过的消息。
- 支持在规则引擎的 Webhook 动作的 HTTP Headers 里使用 `${var}` 格式的占位符 [#9239](https://github.com/emqx/emqx/pull/9239)。 - 支持在规则引擎的 Webhook 动作的 HTTP Headers 里使用 `${var}` 格式的占位符 [#9239](https://github.com/emqx/emqx/pull/9239)。
- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。 - 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。