diff --git a/apps/emqx_coap/src/emqx_coap.app.src b/apps/emqx_coap/src/emqx_coap.app.src index dce5ef4f4..b4ea07898 100644 --- a/apps/emqx_coap/src/emqx_coap.app.src +++ b/apps/emqx_coap/src/emqx_coap.app.src @@ -1,6 +1,6 @@ {application, emqx_coap, [{description, "EMQ X CoAP Gateway"}, - {vsn, "4.3.1"}, % strict semver, bump manually! + {vsn, "4.3.2"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,gen_coap]}, diff --git a/apps/emqx_coap/src/emqx_coap.appup.src b/apps/emqx_coap/src/emqx_coap.appup.src index b73e26be6..1e766c45b 100644 --- a/apps/emqx_coap/src/emqx_coap.appup.src +++ b/apps/emqx_coap/src/emqx_coap.appup.src @@ -1,9 +1,15 @@ %% -*-: erlang -*- {VSN, - [{"4.3.0",[ - {load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]}, + [{<<"4\\.3\\.[0-1]">>,[ + {load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}, + {load_module, emqx_coap_pubsub_resource, brutal_purge, soft_purge, []}, + {load_module, emqx_coap_resource, brutal_purge, soft_purge, []} + ]}, {<<".*">>, []}], - [{"4.3.0",[ - {load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]}, + [{<<"4\\.3\\.[0-1]">>,[ + {load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}, + {load_module, emqx_coap_pubsub_resource, brutal_purge, soft_purge, []}, + {load_module, emqx_coap_resource, brutal_purge, soft_purge, []} + ]}, {<<".*">>, []}] }. diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index 63647de5d..c10efd655 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -31,6 +31,9 @@ -export([ subscribe/2 , unsubscribe/2 , publish/3 + , received_puback/2 + , message_payload/1 + , message_topic/1 ]). -export([ client_pid/4 @@ -95,6 +98,15 @@ unsubscribe(Pid, Topic) -> publish(Pid, Topic, Payload) -> gen_server:call(Pid, {publish, Topic, Payload}). +received_puback(Pid, Msg) -> + gen_server:cast(Pid, {received_puback, Msg}). + +message_payload(#message{payload = Payload}) -> + Payload. + +message_topic(#message{topic = Topic}) -> + Topic. + %% For emqx_management plugin call(Pid, Msg) -> call(Pid, Msg, infinity). @@ -172,13 +184,19 @@ handle_call(Request, _From, State) -> ?LOG(error, "adapter unexpected call ~p", [Request]), {reply, ignored, State, hibernate}. +handle_cast({received_puback, Msg}, State) -> + %% NOTE: the counter named 'messages.acked', but the hook named 'message.acked'! + ok = emqx_metrics:inc('messages.acked'), + _ = emqx_hooks:run('message.acked', [conninfo(State), Msg]), + {noreply, State, hibernate}; + handle_cast(Msg, State) -> ?LOG(error, "broker_api unexpected cast ~p", [Msg]), {noreply, State, hibernate}. -handle_info({deliver, _Topic, #message{topic = Topic, payload = Payload}}, +handle_info({deliver, _Topic, #message{} = Msg}, State = #state{sub_topics = Subscribers}) -> - deliver([{Topic, Payload}], Subscribers), + deliver([Msg], Subscribers), {noreply, State, hibernate}; handle_info(check_alive, State = #state{sub_topics = []}) -> @@ -271,27 +289,25 @@ packet_to_message(Topic, Payload, %% Deliver deliver([], _) -> ok; -deliver([Pub | More], Subscribers) -> - ok = do_deliver(Pub, Subscribers), +deliver([Msg | More], Subscribers) -> + ok = do_deliver(Msg, Subscribers), deliver(More, Subscribers). -do_deliver({Topic, Payload}, Subscribers) -> +do_deliver(Msg, Subscribers) -> %% handle PUBLISH packet from broker - ?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]), - deliver_to_coap(Topic, Payload, Subscribers), + ?LOG(debug, "deliver message from broker, msg: ~p", [Msg]), + deliver_to_coap(Msg, Subscribers), ok. -deliver_to_coap(_TopicName, _Payload, []) -> +deliver_to_coap(_Msg, []) -> ok; -deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}} | T]) -> +deliver_to_coap(#message{topic = TopicName} = Msg, [{TopicFilter, {IsWild, CoapPid}} | T]) -> Matched = case IsWild of true -> emqx_topic:match(TopicName, TopicFilter); false -> TopicName =:= TopicFilter end, - %?LOG(debug, "deliver_to_coap Matched=~p, CoapPid=~p, TopicName=~p, Payload=~p, T=~p", - % [Matched, CoapPid, TopicName, Payload, T]), - Matched andalso (CoapPid ! {dispatch, TopicName, Payload}), - deliver_to_coap(TopicName, Payload, T). + Matched andalso (CoapPid ! {dispatch, Msg}), + deliver_to_coap(Msg, T). %%-------------------------------------------------------------------- %% Helper funcs @@ -328,12 +344,13 @@ chann_info(State) -> will_msg => undefined }. -conninfo(#state{peername = Peername, +conninfo(#state{peername = {PeerHost, _} = Peername, clientid = ClientId, connected_at = ConnectedAt}) -> #{socktype => udp, sockname => {{127, 0, 0, 1}, 5683}, peername => Peername, + peerhost => PeerHost, peercert => nossl, %% TODO: dtls conn_mod => ?MODULE, proto_name => <<"CoAP">>, diff --git a/apps/emqx_coap/src/emqx_coap_pubsub_resource.erl b/apps/emqx_coap/src/emqx_coap_pubsub_resource.erl index 8ab33628b..445c8b708 100644 --- a/apps/emqx_coap/src/emqx_coap_pubsub_resource.erl +++ b/apps/emqx_coap/src/emqx_coap_pubsub_resource.erl @@ -138,16 +138,32 @@ coap_unobserve({state, ChId, Prefix, TopicPath}) -> ok. handle_info({dispatch, Topic, Payload}, State) -> + %% This clause should never be matched any more. We keep it here to handle + %% the old format messages during the release upgrade. + %% In this case the second function clause of `coap_ack/2` will be called, + %% and the ACKs is discarded. ?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]), {ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload), ?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]), {notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State}; +handle_info({dispatch, Msg}, State) -> + Payload = emqx_coap_mqtt_adapter:message_payload(Msg), + Topic = emqx_coap_mqtt_adapter:message_topic(Msg), + {ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload), + ?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]), + {notify, {pub, Msg}, #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State}; handle_info(Message, State) -> ?LOG(error, "Unknown Message ~p", [Message]), {noreply, State}. -coap_ack(_Ref, State) -> {ok, State}. - +coap_ack({pub, Msg}, State) -> + ?LOG(debug, "received coap ack for publish msg: ~p", [Msg]), + Pid = get(mqtt_client_pid), + emqx_coap_mqtt_adapter:received_puback(Pid, Msg), + {ok, State}; +coap_ack(_Ref, State) -> + ?LOG(debug, "received coap ack: ~p", [_Ref]), + {ok, State}. %%-------------------------------------------------------------------- %% Internal Functions diff --git a/apps/emqx_coap/src/emqx_coap_resource.erl b/apps/emqx_coap/src/emqx_coap_resource.erl index ab5d31a63..daa536540 100644 --- a/apps/emqx_coap/src/emqx_coap_resource.erl +++ b/apps/emqx_coap/src/emqx_coap_resource.erl @@ -104,12 +104,26 @@ coap_unobserve({state, ChId, Prefix, Topic}) -> ok. handle_info({dispatch, Topic, Payload}, State) -> + %% This clause should never be matched any more. We keep it here to handle + %% the old format messages during the release upgrade. + %% In this case the second function clause of `coap_ack/2` will be called, + %% and the ACKs is discarded. ?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]), {notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State}; +handle_info({dispatch, Msg}, State) -> + Payload = emqx_coap_mqtt_adapter:message_payload(Msg), + {notify, {pub, Msg}, #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State}; handle_info(Message, State) -> emqx_coap_mqtt_adapter:handle_info(Message, State). -coap_ack(_Ref, State) -> {ok, State}. +coap_ack({pub, Msg}, State) -> + ?LOG(debug, "received coap ack for publish msg: ~p", [Msg]), + Pid = get(mqtt_client_pid), + emqx_coap_mqtt_adapter:received_puback(Pid, Msg), + {ok, State}; +coap_ack(_Ref, State) -> + ?LOG(debug, "received coap ack: ~p", [_Ref]), + {ok, State}. get_auth(Query) -> get_auth(Query, #coap_mqtt_auth{}). diff --git a/apps/emqx_coap/test/emqx_coap_SUITE.erl b/apps/emqx_coap/test/emqx_coap_SUITE.erl index 6d8294ef5..0973382c6 100644 --- a/apps/emqx_coap/test/emqx_coap_SUITE.erl +++ b/apps/emqx_coap/test/emqx_coap_SUITE.erl @@ -91,7 +91,7 @@ t_observe(_Config) -> Topic = <<"abc">>, TopicStr = binary_to_list(Topic), Payload = <<"123">>, Uri = "coap://127.0.0.1/mqtt/"++TopicStr++"?c=client1&u=tom&p=secret", - {ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri), + {ok, Pid, N, Code, Content} = er_coap_observer:observe(Uri), ?LOGT("observer Pid=~p, N=~p, Code=~p, Content=~p", [Pid, N, Code, Content]), [SubPid] = emqx:subscribers(Topic), @@ -195,12 +195,16 @@ t_one_clientid_sub_2_topics(_Config) -> [SubPid] = emqx:subscribers(Topic2), ?assert(is_pid(SubPid)), + CntrAcked1 = emqx_metrics:val('messages.acked'), emqx:publish(emqx_message:make(Topic1, Payload1)), Notif1 = receive_notification(), ?LOGT("observer 1 get Notif=~p", [Notif1]), {coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv1}} = Notif1, ?assertEqual(Payload1, PayloadRecv1), + timer:sleep(100), + CntrAcked2 = emqx_metrics:val('messages.acked'), + ?assertEqual(CntrAcked2, CntrAcked1 + 1), emqx:publish(emqx_message:make(Topic2, Payload2)), @@ -208,6 +212,9 @@ t_one_clientid_sub_2_topics(_Config) -> ?LOGT("observer 2 get Notif=~p", [Notif2]), {coap_notify, _, _, {ok,content}, #coap_content{payload = PayloadRecv2}} = Notif2, ?assertEqual(Payload2, PayloadRecv2), + timer:sleep(100), + CntrAcked3 = emqx_metrics:val('messages.acked'), + ?assertEqual(CntrAcked3, CntrAcked2 + 1), er_coap_observer:stop(Pid1), er_coap_observer:stop(Pid2). diff --git a/changes/v4.3.22-en.md b/changes/v4.3.22-en.md index bac959ca9..98b0073d0 100644 --- a/changes/v4.3.22-en.md +++ b/changes/v4.3.22-en.md @@ -2,6 +2,11 @@ ## Enhancements +- We now trigger the `'message.acked'` hook after the CoAP gateway sends a message to the device and receives the ACK from the device [#9264](https://github.com/emqx/emqx/pull/9264). + With this change, the CoAP gateway can be combined with the offline message caching function (in the + emqx enterprise), so that CoAP devices are able to read the missed messages from the database when + it is online again. + - Support to use placeholders like `${var}` in the HTTP `Headers` of rule-engine's Webhook actions [#9239](https://github.com/emqx/emqx/pull/9239). - Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199). diff --git a/changes/v4.3.22-zh.md b/changes/v4.3.22-zh.md index 286b2a2f0..f253fe4b5 100644 --- a/changes/v4.3.22-zh.md +++ b/changes/v4.3.22-zh.md @@ -2,6 +2,9 @@ ## 增强 +- 当 CoAP 网关给设备投递消息并收到设备发来的确认之后,回调 `'message.acked'` 钩子 [#9264](https://github.com/emqx/emqx/pull/9264)。 + 有了这个改动,CoAP 网关可以配合 EMQX (企业版)的离线消息缓存功能,让 CoAP 设备重新上线之后,从数据库读取其离线状态下错过的消息。 + - 支持在规则引擎的 Webhook 动作的 HTTP Headers 里使用 `${var}` 格式的占位符 [#9239](https://github.com/emqx/emqx/pull/9239)。 - 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。