commit
ab57c19e7a
|
@ -128,8 +128,7 @@ add_clientid(_Bindings, Params) ->
|
||||||
Re = do_add_clientid(Params),
|
Re = do_add_clientid(Params),
|
||||||
case Re of
|
case Re of
|
||||||
ok -> return(ok);
|
ok -> return(ok);
|
||||||
<<"ok">> -> return(ok);
|
{error, Error} -> {error, format_msg(Error)}
|
||||||
_ -> return({error, format_msg(Re)})
|
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -178,8 +177,7 @@ add_username(_Bindings, Params) ->
|
||||||
false ->
|
false ->
|
||||||
case do_add_username(Params) of
|
case do_add_username(Params) of
|
||||||
ok -> return(ok);
|
ok -> return(ok);
|
||||||
<<"ok">> -> return(ok);
|
{error, Error} -> {error, format_msg(Error)}
|
||||||
Error -> return({error, format_msg(Error)})
|
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -310,4 +308,3 @@ urldecode(S) ->
|
||||||
urldecode(S) ->
|
urldecode(S) ->
|
||||||
http_uri:decode(S).
|
http_uri:decode(S).
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
|
|
|
@ -199,7 +199,7 @@ on_action_republish(Selected, _Envs = #{
|
||||||
}).
|
}).
|
||||||
|
|
||||||
increase_and_publish(ActId, Msg) ->
|
increase_and_publish(ActId, Msg) ->
|
||||||
emqx_broker:safe_publish(Msg),
|
_ = emqx_broker:safe_publish(Msg),
|
||||||
emqx_rule_metrics:inc_actions_success(ActId),
|
emqx_rule_metrics:inc_actions_success(ActId),
|
||||||
emqx_metrics:inc_msg(Msg).
|
emqx_metrics:inc_msg(Msg).
|
||||||
|
|
||||||
|
|
|
@ -122,10 +122,12 @@ proc_cmd(Tokens, Data, Opts) ->
|
||||||
|
|
||||||
%% preprocess SQL with place holders
|
%% preprocess SQL with place holders
|
||||||
-spec(preproc_sql(Sql::binary()) -> {prepare_statement(), prepare_params()}).
|
-spec(preproc_sql(Sql::binary()) -> {prepare_statement(), prepare_params()}).
|
||||||
|
-dialyzer({nowarn_function,preproc_sql/1}).
|
||||||
preproc_sql(Sql) ->
|
preproc_sql(Sql) ->
|
||||||
preproc_sql(Sql, '?').
|
preproc_sql(Sql, '?').
|
||||||
|
|
||||||
-spec(preproc_sql(Sql::binary(), ReplaceWith :: '?' | '$n') -> {prepare_statement(), prepare_params()}).
|
-spec(preproc_sql(Sql::binary(), ReplaceWith :: '?' | '$n') -> {prepare_statement(), prepare_params()}).
|
||||||
|
-dialyzer({nowarn_function,preproc_sql/2}).
|
||||||
preproc_sql(Sql, ReplaceWith) ->
|
preproc_sql(Sql, ReplaceWith) ->
|
||||||
case re:run(Sql, ?EX_PLACE_HOLDER, [{capture, all_but_first, binary}, global]) of
|
case re:run(Sql, ?EX_PLACE_HOLDER, [{capture, all_but_first, binary}, global]) of
|
||||||
{match, PlaceHolders} ->
|
{match, PlaceHolders} ->
|
||||||
|
|
|
@ -396,20 +396,20 @@ receipt_id(Headers) ->
|
||||||
|
|
||||||
handle_recv_send_frame(#stomp_frame{command = <<"SEND">>, headers = Headers, body = Body}, State) ->
|
handle_recv_send_frame(#stomp_frame{command = <<"SEND">>, headers = Headers, body = Body}, State) ->
|
||||||
Topic = header(<<"destination">>, Headers),
|
Topic = header(<<"destination">>, Headers),
|
||||||
maybe_send_receipt(receipt_id(Headers), State),
|
_ = maybe_send_receipt(receipt_id(Headers), State),
|
||||||
emqx_broker:publish(
|
_ = emqx_broker:publish(
|
||||||
make_mqtt_message(Topic, Headers, iolist_to_binary(Body))
|
make_mqtt_message(Topic, Headers, iolist_to_binary(Body))
|
||||||
),
|
),
|
||||||
State.
|
State.
|
||||||
|
|
||||||
handle_recv_ack_frame(#stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
|
handle_recv_ack_frame(#stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
|
||||||
Id = header(<<"id">>, Headers),
|
Id = header(<<"id">>, Headers),
|
||||||
maybe_send_receipt(receipt_id(Headers), State),
|
_ = maybe_send_receipt(receipt_id(Headers), State),
|
||||||
ack(Id, State).
|
ack(Id, State).
|
||||||
|
|
||||||
handle_recv_nack_frame(#stomp_frame{command = <<"NACK">>, headers = Headers}, State) ->
|
handle_recv_nack_frame(#stomp_frame{command = <<"NACK">>, headers = Headers}, State) ->
|
||||||
Id = header(<<"id">>, Headers),
|
Id = header(<<"id">>, Headers),
|
||||||
maybe_send_receipt(receipt_id(Headers), State),
|
_ = maybe_send_receipt(receipt_id(Headers), State),
|
||||||
nack(Id, State).
|
nack(Id, State).
|
||||||
|
|
||||||
ensure_clean_trans_timer(State = #pstate{transaction = Trans}) ->
|
ensure_clean_trans_timer(State = #pstate{transaction = Trans}) ->
|
||||||
|
@ -468,4 +468,4 @@ interval(incoming_timer, #pstate{heart_beats = HrtBt}) ->
|
||||||
interval(outgoing_timer, #pstate{heart_beats = HrtBt}) ->
|
interval(outgoing_timer, #pstate{heart_beats = HrtBt}) ->
|
||||||
emqx_stomp_heartbeat:interval(outgoing, HrtBt);
|
emqx_stomp_heartbeat:interval(outgoing, HrtBt);
|
||||||
interval(clean_trans_timer, _) ->
|
interval(clean_trans_timer, _) ->
|
||||||
?TRANS_TIMEOUT.
|
?TRANS_TIMEOUT.
|
||||||
|
|
Loading…
Reference in New Issue