Line data Source code
1 : /*
2 : *
3 : * Copyright (c) 2021 Project CHIP Authors
4 : * All rights reserved.
5 : *
6 : * Licensed under the Apache License, Version 2.0 (the "License");
7 : * you may not use this file except in compliance with the License.
8 : * You may obtain a copy of the License at
9 : *
10 : * http://www.apache.org/licenses/LICENSE-2.0
11 : *
12 : * Unless required by applicable law or agreed to in writing, software
13 : * distributed under the License is distributed on an "AS IS" BASIS,
14 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 : * See the License for the specific language governing permissions and
16 : * limitations under the License.
17 : */
18 :
19 : /**
20 : * @file
21 : * This file defines the initiator side of a CHIP Read Interaction.
22 : *
23 : */
24 :
25 : #include <app/AppConfig.h>
26 : #include <app/InteractionModelEngine.h>
27 : #include <app/InteractionModelHelper.h>
28 : #include <app/ReadClient.h>
29 : #include <app/StatusResponse.h>
30 : #include <assert.h>
31 : #include <lib/core/TLVTypes.h>
32 : #include <lib/support/FibonacciUtils.h>
33 : #include <messaging/ReliableMessageMgr.h>
34 : #include <messaging/ReliableMessageProtocolConfig.h>
35 : #include <platform/LockTracker.h>
36 :
37 : #include <app-common/zap-generated/cluster-objects.h>
38 : #include <app-common/zap-generated/ids/Attributes.h>
39 : #include <app-common/zap-generated/ids/Clusters.h>
40 :
41 : namespace chip {
42 : namespace app {
43 :
44 : using Status = Protocols::InteractionModel::Status;
45 :
46 982 : ReadClient::ReadClient(InteractionModelEngine * apImEngine, Messaging::ExchangeManager * apExchangeMgr, Callback & apCallback,
47 982 : InteractionType aInteractionType) :
48 982 : mExchange(*this),
49 1964 : mpCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
50 982 : mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
51 : {
52 982 : assertChipStackLockedByCurrentThread();
53 :
54 982 : mpExchangeMgr = apExchangeMgr;
55 982 : mInteractionType = aInteractionType;
56 :
57 982 : mpImEngine = apImEngine;
58 :
59 982 : if (aInteractionType == InteractionType::Subscribe)
60 : {
61 247 : mpImEngine->AddReadClient(this);
62 : }
63 982 : }
64 :
65 214 : void ReadClient::ClearActiveSubscriptionState()
66 : {
67 214 : mIsReporting = false;
68 214 : mWaitingForFirstPrimingReport = true;
69 214 : mPendingMoreChunks = false;
70 214 : mMinIntervalFloorSeconds = 0;
71 214 : mMaxInterval = 0;
72 214 : mSubscriptionId = 0;
73 214 : mIsResubscriptionScheduled = false;
74 :
75 214 : MoveToState(ClientState::Idle);
76 214 : }
77 :
78 450 : void ReadClient::StopResubscription()
79 : {
80 450 : CancelLivenessCheckTimer();
81 450 : CancelResubscribeTimer();
82 :
83 : // Only deallocate the paths if they are not already deallocated.
84 450 : if (mReadPrepareParams.mpAttributePathParamsList != nullptr || mReadPrepareParams.mpEventPathParamsList != nullptr ||
85 293 : mReadPrepareParams.mpDataVersionFilterList != nullptr)
86 : {
87 157 : mpCallback.OnDeallocatePaths(std::move(mReadPrepareParams));
88 : // Make sure we will never try to free those pointers again.
89 157 : mReadPrepareParams.mpAttributePathParamsList = nullptr;
90 157 : mReadPrepareParams.mAttributePathParamsListSize = 0;
91 157 : mReadPrepareParams.mpEventPathParamsList = nullptr;
92 157 : mReadPrepareParams.mEventPathParamsListSize = 0;
93 157 : mReadPrepareParams.mpDataVersionFilterList = nullptr;
94 157 : mReadPrepareParams.mDataVersionFilterListSize = 0;
95 : }
96 450 : }
97 :
98 1070 : ReadClient::~ReadClient()
99 : {
100 982 : assertChipStackLockedByCurrentThread();
101 :
102 982 : if (IsSubscriptionType())
103 : {
104 247 : StopResubscription();
105 :
106 : // Only remove ourselves from the engine's tracker list if we still continue to have a valid pointer to it.
107 : // This won't be the case if the engine shut down before this destructor was called (in which case, mpImEngine
108 : // will point to null)
109 : //
110 247 : if (mpImEngine)
111 : {
112 82 : mpImEngine->RemoveReadClient(this);
113 : }
114 : }
115 1070 : }
116 :
117 10 : uint32_t ReadClient::ComputeTimeTillNextSubscription()
118 : {
119 10 : uint32_t maxWaitTimeInMsec = 0;
120 10 : uint32_t waitTimeInMsec = 0;
121 10 : uint32_t minWaitTimeInMsec = 0;
122 :
123 10 : if (mNumRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX)
124 : {
125 10 : maxWaitTimeInMsec = GetFibonacciForIndex(mNumRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS;
126 : }
127 : else
128 : {
129 0 : maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS;
130 : }
131 :
132 10 : if (maxWaitTimeInMsec != 0)
133 : {
134 3 : minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100;
135 3 : waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec));
136 : }
137 :
138 10 : return waitTimeInMsec;
139 : }
140 :
141 10 : CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional<SessionHandle> aNewSessionHandle,
142 : bool aReestablishCASE)
143 : {
144 10 : VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);
145 :
146 : //
147 : // If we're establishing CASE, make sure we are not provided a new SessionHandle as well.
148 : //
149 10 : VerifyOrReturnError(!aReestablishCASE || !aNewSessionHandle.HasValue(), CHIP_ERROR_INVALID_ARGUMENT);
150 :
151 10 : if (aNewSessionHandle.HasValue())
152 : {
153 0 : mReadPrepareParams.mSessionHolder.Grab(aNewSessionHandle.Value());
154 : }
155 :
156 10 : mForceCaseOnNextResub = aReestablishCASE;
157 10 : if (mForceCaseOnNextResub && mReadPrepareParams.mSessionHolder)
158 : {
159 : // Mark our existing session defunct, so that we will try to
160 : // re-establish it when the timer fires (unless something re-establishes
161 : // before then).
162 0 : mReadPrepareParams.mSessionHolder->AsSecureSession()->MarkAsDefunct();
163 : }
164 :
165 10 : ReturnErrorOnFailure(
166 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
167 : System::Clock::Milliseconds32(aTimeTillNextResubscriptionMs), OnResubscribeTimerCallback, this));
168 10 : mIsResubscriptionScheduled = true;
169 :
170 10 : return CHIP_NO_ERROR;
171 : }
172 :
173 885 : void ReadClient::Close(CHIP_ERROR aError, bool allowResubscription)
174 : {
175 885 : if (IsReadType())
176 : {
177 671 : if (aError != CHIP_NO_ERROR)
178 : {
179 49 : mpCallback.OnError(aError);
180 : }
181 : }
182 : else
183 : {
184 214 : ClearActiveSubscriptionState();
185 214 : if (aError != CHIP_NO_ERROR)
186 : {
187 : //
188 : // We infer that re-subscription was requested by virtue of having a non-zero list of event OR attribute paths present
189 : // in mReadPrepareParams. This would only be the case if an application called SendAutoResubscribeRequest which
190 : // populates mReadPrepareParams with the values provided by the application.
191 : //
192 47 : if (allowResubscription &&
193 45 : (mReadPrepareParams.mEventPathParamsListSize != 0 || mReadPrepareParams.mAttributePathParamsListSize != 0))
194 : {
195 12 : CHIP_ERROR originalReason = aError;
196 :
197 12 : aError = mpCallback.OnResubscriptionNeeded(this, aError);
198 12 : if (aError == CHIP_NO_ERROR)
199 : {
200 12 : return;
201 : }
202 2 : if (aError == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT)
203 : {
204 2 : VerifyOrDie(originalReason == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT);
205 2 : ChipLogProgress(DataManagement, "ICD device is inactive mark subscription as InactiveICDSubscription");
206 2 : MoveToState(ClientState::InactiveICDSubscription);
207 2 : return;
208 : }
209 : }
210 :
211 : //
212 : // Either something bad happened when requesting resubscription or the application has decided to not
213 : // continue by returning an error. Let's convey the error back up to the application
214 : // and shut everything down.
215 : //
216 35 : mpCallback.OnError(aError);
217 : }
218 :
219 202 : StopResubscription();
220 : }
221 :
222 873 : mExchange.Release();
223 :
224 873 : mpCallback.OnDone(this);
225 : }
226 :
227 1610 : const char * ReadClient::GetStateStr() const
228 : {
229 : #if CHIP_DETAIL_LOGGING
230 1610 : switch (mState)
231 : {
232 214 : case ClientState::Idle:
233 214 : return "Idle";
234 970 : case ClientState::AwaitingInitialReport:
235 970 : return "AwaitingInitialReport";
236 215 : case ClientState::AwaitingSubscribeResponse:
237 215 : return "AwaitingSubscribeResponse";
238 209 : case ClientState::SubscriptionActive:
239 209 : return "SubscriptionActive";
240 2 : case ClientState::InactiveICDSubscription:
241 2 : return "InactiveICDSubscription";
242 : }
243 : #endif // CHIP_DETAIL_LOGGING
244 0 : return "N/A";
245 : }
246 :
247 1610 : void ReadClient::MoveToState(const ClientState aTargetState)
248 : {
249 1610 : mState = aTargetState;
250 1610 : ChipLogDetail(DataManagement, "%s ReadClient[%p]: Moving to [%10.10s]", __func__, this, GetStateStr());
251 1610 : }
252 :
253 799 : CHIP_ERROR ReadClient::SendRequest(ReadPrepareParams & aReadPrepareParams)
254 : {
255 799 : if (mInteractionType == InteractionType::Read)
256 : {
257 723 : return SendReadRequest(aReadPrepareParams);
258 : }
259 :
260 76 : if (mInteractionType == InteractionType::Subscribe)
261 : {
262 76 : return SendSubscribeRequest(aReadPrepareParams);
263 : }
264 :
265 0 : return CHIP_ERROR_INVALID_ARGUMENT;
266 : }
267 :
268 723 : CHIP_ERROR ReadClient::SendReadRequest(ReadPrepareParams & aReadPrepareParams)
269 : {
270 723 : CHIP_ERROR err = CHIP_NO_ERROR;
271 :
272 723 : ChipLogDetail(DataManagement, "%s ReadClient[%p]: Sending Read Request", __func__, this);
273 :
274 723 : VerifyOrReturnError(ClientState::Idle == mState, err = CHIP_ERROR_INCORRECT_STATE);
275 :
276 : Span<AttributePathParams> attributePaths(aReadPrepareParams.mpAttributePathParamsList,
277 723 : aReadPrepareParams.mAttributePathParamsListSize);
278 723 : Span<EventPathParams> eventPaths(aReadPrepareParams.mpEventPathParamsList, aReadPrepareParams.mEventPathParamsListSize);
279 : Span<DataVersionFilter> dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList,
280 723 : aReadPrepareParams.mDataVersionFilterListSize);
281 :
282 723 : System::PacketBufferHandle msgBuf;
283 723 : ReadRequestMessage::Builder request;
284 723 : System::PacketBufferTLVWriter writer;
285 :
286 723 : InitWriterWithSpaceReserved(writer, kReservedSizeForTLVEncodingOverhead);
287 723 : ReturnErrorOnFailure(request.Init(&writer));
288 :
289 723 : if (!attributePaths.empty())
290 : {
291 604 : AttributePathIBs::Builder & attributePathListBuilder = request.CreateAttributeRequests();
292 604 : ReturnErrorOnFailure(err = request.GetError());
293 604 : ReturnErrorOnFailure(GenerateAttributePaths(attributePathListBuilder, attributePaths));
294 : }
295 :
296 723 : if (!eventPaths.empty())
297 : {
298 317 : EventPathIBs::Builder & eventPathListBuilder = request.CreateEventRequests();
299 317 : ReturnErrorOnFailure(err = request.GetError());
300 :
301 317 : ReturnErrorOnFailure(GenerateEventPaths(eventPathListBuilder, eventPaths));
302 :
303 317 : Optional<EventNumber> eventMin;
304 317 : ReturnErrorOnFailure(GetMinEventNumber(aReadPrepareParams, eventMin));
305 317 : if (eventMin.HasValue())
306 : {
307 314 : EventFilterIBs::Builder & eventFilters = request.CreateEventFilters();
308 314 : ReturnErrorOnFailure(err = request.GetError());
309 314 : ReturnErrorOnFailure(eventFilters.GenerateEventFilter(eventMin.Value()));
310 : }
311 317 : }
312 :
313 723 : ReturnErrorOnFailure(request.IsFabricFiltered(aReadPrepareParams.mIsFabricFiltered).GetError());
314 :
315 723 : bool encodedDataVersionList = false;
316 723 : TLV::TLVWriter backup;
317 723 : request.Checkpoint(backup);
318 723 : DataVersionFilterIBs::Builder & dataVersionFilterListBuilder = request.CreateDataVersionFilters();
319 723 : ReturnErrorOnFailure(request.GetError());
320 723 : if (!attributePaths.empty())
321 : {
322 604 : ReturnErrorOnFailure(GenerateDataVersionFilterList(dataVersionFilterListBuilder, attributePaths, dataVersionFilters,
323 : encodedDataVersionList));
324 : }
325 723 : ReturnErrorOnFailure(dataVersionFilterListBuilder.GetWriter()->UnreserveBuffer(kReservedSizeForTLVEncodingOverhead));
326 723 : if (encodedDataVersionList)
327 : {
328 11 : ReturnErrorOnFailure(dataVersionFilterListBuilder.EndOfDataVersionFilterIBs());
329 : }
330 : else
331 : {
332 712 : request.Rollback(backup);
333 : }
334 :
335 723 : ReturnErrorOnFailure(request.EndOfReadRequestMessage());
336 723 : ReturnErrorOnFailure(writer.Finalize(&msgBuf));
337 :
338 723 : VerifyOrReturnError(aReadPrepareParams.mSessionHolder, CHIP_ERROR_MISSING_SECURE_SESSION);
339 :
340 723 : auto exchange = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this);
341 723 : VerifyOrReturnError(exchange != nullptr, err = CHIP_ERROR_NO_MEMORY);
342 :
343 723 : mExchange.Grab(exchange);
344 :
345 723 : if (aReadPrepareParams.mTimeout == System::Clock::kZero)
346 : {
347 723 : mExchange->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
348 : }
349 : else
350 : {
351 0 : mExchange->SetResponseTimeout(aReadPrepareParams.mTimeout);
352 : }
353 :
354 723 : ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::ReadRequest, std::move(msgBuf),
355 : Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse)));
356 :
357 723 : mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer();
358 723 : MoveToState(ClientState::AwaitingInitialReport);
359 :
360 723 : return CHIP_NO_ERROR;
361 723 : }
362 :
363 343 : CHIP_ERROR ReadClient::GenerateEventPaths(EventPathIBs::Builder & aEventPathsBuilder, const Span<EventPathParams> & aEventPaths)
364 : {
365 854 : for (auto & event : aEventPaths)
366 : {
367 511 : VerifyOrReturnError(event.IsValidEventPath(), CHIP_ERROR_IM_MALFORMED_EVENT_PATH_IB);
368 511 : EventPathIB::Builder & path = aEventPathsBuilder.CreatePath();
369 511 : ReturnErrorOnFailure(aEventPathsBuilder.GetError());
370 511 : ReturnErrorOnFailure(path.Encode(event));
371 : }
372 :
373 343 : return aEventPathsBuilder.EndOfEventPaths();
374 : }
375 :
376 840 : CHIP_ERROR ReadClient::GenerateAttributePaths(AttributePathIBs::Builder & aAttributePathIBsBuilder,
377 : const Span<AttributePathParams> & aAttributePaths)
378 : {
379 2694 : for (auto & attribute : aAttributePaths)
380 : {
381 1856 : VerifyOrReturnError(attribute.IsValidAttributePath(), CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH_IB);
382 1854 : AttributePathIB::Builder & path = aAttributePathIBsBuilder.CreatePath();
383 1854 : ReturnErrorOnFailure(aAttributePathIBsBuilder.GetError());
384 1854 : ReturnErrorOnFailure(path.Encode(attribute));
385 : }
386 :
387 838 : return aAttributePathIBsBuilder.EndOfAttributePathIBs();
388 : }
389 :
390 73 : CHIP_ERROR ReadClient::BuildDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder,
391 : const Span<AttributePathParams> & aAttributePaths,
392 : const Span<DataVersionFilter> & aDataVersionFilters,
393 : bool & aEncodedDataVersionList)
394 : {
395 152 : for (auto & filter : aDataVersionFilters)
396 : {
397 79 : VerifyOrReturnError(filter.IsValidDataVersionFilter(), CHIP_ERROR_INVALID_ARGUMENT);
398 :
399 : // If data version filter is for some cluster none of whose attributes are included in our paths, discard this filter.
400 79 : bool intersected = false;
401 91 : for (auto & path : aAttributePaths)
402 : {
403 85 : if (path.IncludesAttributesInCluster(filter))
404 : {
405 73 : intersected = true;
406 73 : break;
407 : }
408 : }
409 :
410 79 : if (!intersected)
411 : {
412 6 : continue;
413 : }
414 :
415 73 : DataVersionFilterIB::Builder & filterIB = aDataVersionFilterIBsBuilder.CreateDataVersionFilter();
416 73 : ReturnErrorOnFailure(aDataVersionFilterIBsBuilder.GetError());
417 73 : ClusterPathIB::Builder & path = filterIB.CreatePath();
418 73 : ReturnErrorOnFailure(filterIB.GetError());
419 73 : ReturnErrorOnFailure(path.Endpoint(filter.mEndpointId).Cluster(filter.mClusterId).EndOfClusterPathIB());
420 73 : VerifyOrReturnError(filter.mDataVersion.HasValue(), CHIP_ERROR_INVALID_ARGUMENT);
421 73 : ReturnErrorOnFailure(filterIB.DataVersion(filter.mDataVersion.Value()).EndOfDataVersionFilterIB());
422 73 : aEncodedDataVersionList = true;
423 : }
424 73 : return CHIP_NO_ERROR;
425 : }
426 :
427 836 : CHIP_ERROR ReadClient::GenerateDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder,
428 : const Span<AttributePathParams> & aAttributePaths,
429 : const Span<DataVersionFilter> & aDataVersionFilters,
430 : bool & aEncodedDataVersionList)
431 : {
432 836 : if (!aDataVersionFilters.empty())
433 : {
434 73 : ReturnErrorOnFailure(BuildDataVersionFilterList(aDataVersionFilterIBsBuilder, aAttributePaths, aDataVersionFilters,
435 : aEncodedDataVersionList));
436 : }
437 : else
438 : {
439 763 : ReturnErrorOnFailure(
440 : mpCallback.OnUpdateDataVersionFilterList(aDataVersionFilterIBsBuilder, aAttributePaths, aEncodedDataVersionList));
441 : }
442 :
443 836 : return CHIP_NO_ERROR;
444 : }
445 :
446 3 : void ReadClient::OnActiveModeNotification()
447 : {
448 : // This function just tries to complete the deferred resubscription logic in `OnLivenessTimeoutCallback`.
449 3 : VerifyOrDie(mpImEngine->InActiveReadClientList(this));
450 : // If we are not in InactiveICDSubscription state, that means the liveness timeout has not been reached. Simply do nothing.
451 3 : VerifyOrReturn(IsInactiveICDSubscription());
452 :
453 : // When we reach here, the subscription definitely exceeded the liveness timeout. Just continue the unfinished resubscription
454 : // logic in `OnLivenessTimeoutCallback`.
455 1 : TriggerResubscriptionForLivenessTimeout(CHIP_ERROR_TIMEOUT);
456 : }
457 :
458 3 : void ReadClient::OnPeerTypeChange(PeerType aType)
459 : {
460 3 : VerifyOrDie(mpImEngine->InActiveReadClientList(this));
461 :
462 3 : mIsPeerLIT = (aType == PeerType::kLITICD);
463 :
464 3 : ChipLogProgress(DataManagement, "Peer is now %s LIT ICD.", mIsPeerLIT ? "a" : "not a");
465 :
466 : // If the peer is no longer LIT, try to wake up the subscription and do resubscribe when necessary.
467 3 : if (!mIsPeerLIT)
468 : {
469 2 : OnActiveModeNotification();
470 : }
471 3 : }
472 :
473 1969 : CHIP_ERROR ReadClient::OnMessageReceived(Messaging::ExchangeContext * apExchangeContext, const PayloadHeader & aPayloadHeader,
474 : System::PacketBufferHandle && aPayload)
475 : {
476 1969 : CHIP_ERROR err = CHIP_NO_ERROR;
477 1969 : Status status = Status::InvalidAction;
478 1969 : VerifyOrExit(!IsIdle() && !IsInactiveICDSubscription(), err = CHIP_ERROR_INCORRECT_STATE);
479 :
480 1969 : if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::ReportData))
481 : {
482 1688 : err = ProcessReportData(std::move(aPayload), ReportType::kContinuingTransaction);
483 : }
484 281 : else if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::SubscribeResponse))
485 : {
486 211 : ChipLogProgress(DataManagement, "SubscribeResponse is received");
487 211 : VerifyOrExit(apExchangeContext == mExchange.Get(), err = CHIP_ERROR_INCORRECT_STATE);
488 211 : err = ProcessSubscribeResponse(std::move(aPayload));
489 : }
490 70 : else if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::StatusResponse))
491 : {
492 138 : VerifyOrExit(apExchangeContext == mExchange.Get(), err = CHIP_ERROR_INCORRECT_STATE);
493 70 : CHIP_ERROR statusError = CHIP_NO_ERROR;
494 70 : SuccessOrExit(err = StatusResponse::ProcessStatusResponse(std::move(aPayload), statusError));
495 70 : SuccessOrExit(err = statusError);
496 2 : err = CHIP_ERROR_INVALID_MESSAGE_TYPE;
497 : }
498 : else
499 : {
500 0 : err = CHIP_ERROR_INVALID_MESSAGE_TYPE;
501 : }
502 :
503 1969 : exit:
504 1969 : if (err != CHIP_NO_ERROR)
505 : {
506 76 : if (err == CHIP_ERROR_INVALID_SUBSCRIPTION)
507 : {
508 4 : status = Status::InvalidSubscription;
509 : }
510 76 : StatusResponse::Send(status, apExchangeContext, false /*aExpectResponse*/);
511 : }
512 :
513 1969 : if ((!IsSubscriptionType() && !mPendingMoreChunks) || err != CHIP_NO_ERROR)
514 : {
515 698 : Close(err);
516 : }
517 :
518 1969 : return err;
519 : }
520 :
521 82 : void ReadClient::OnUnsolicitedReportData(Messaging::ExchangeContext * apExchangeContext, System::PacketBufferHandle && aPayload)
522 : {
523 82 : Status status = Status::Success;
524 82 : mExchange.Grab(apExchangeContext);
525 :
526 : //
527 : // Let's update the session we're tracking in our SessionHolder to that associated with the message that was just received.
528 : // This CAN be different from the one we were tracking before, since the server is permitted to send exchanges on any valid
529 : // session to us, of which there could be multiple.
530 : //
531 : // Since receipt of a message is proof of a working session on the peer, it's always best to update to that if possible
532 : // to maximize our chances of success later.
533 : //
534 82 : mReadPrepareParams.mSessionHolder.Grab(mExchange->GetSessionHandle());
535 :
536 82 : CHIP_ERROR err = ProcessReportData(std::move(aPayload), ReportType::kUnsolicited);
537 82 : if (err != CHIP_NO_ERROR)
538 : {
539 0 : if (err == CHIP_ERROR_INVALID_SUBSCRIPTION)
540 : {
541 0 : status = Status::InvalidSubscription;
542 : }
543 : else
544 : {
545 0 : status = Status::InvalidAction;
546 : }
547 :
548 0 : StatusResponse::Send(status, mExchange.Get(), false /*aExpectResponse*/);
549 0 : Close(err);
550 : }
551 82 : }
552 :
553 1778 : CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload, ReportType aReportType)
554 : {
555 1778 : CHIP_ERROR err = CHIP_NO_ERROR;
556 1778 : ReportDataMessage::Parser report;
557 1778 : bool suppressResponse = true;
558 1778 : SubscriptionId subscriptionId = 0;
559 1778 : EventReportIBs::Parser eventReportIBs;
560 1778 : AttributeReportIBs::Parser attributeReportIBs;
561 1778 : System::PacketBufferTLVReader reader;
562 1778 : reader.Init(std::move(aPayload));
563 1778 : err = report.Init(reader);
564 1778 : SuccessOrExit(err);
565 :
566 : #if CHIP_CONFIG_IM_PRETTY_PRINT
567 1776 : if (aReportType != ReportType::kUnsolicited)
568 : {
569 1694 : report.PrettyPrint();
570 : }
571 : #endif
572 :
573 1776 : err = report.GetSuppressResponse(&suppressResponse);
574 1776 : if (CHIP_END_OF_TLV == err)
575 : {
576 1146 : suppressResponse = false;
577 1146 : err = CHIP_NO_ERROR;
578 : }
579 1776 : SuccessOrExit(err);
580 :
581 1776 : err = report.GetSubscriptionId(&subscriptionId);
582 1776 : if (CHIP_NO_ERROR == err)
583 : {
584 345 : VerifyOrExit(IsSubscriptionType(), err = CHIP_ERROR_INVALID_ARGUMENT);
585 343 : if (mWaitingForFirstPrimingReport)
586 : {
587 213 : mSubscriptionId = subscriptionId;
588 : }
589 130 : else if (!IsMatchingSubscriptionId(subscriptionId))
590 : {
591 2 : err = CHIP_ERROR_INVALID_SUBSCRIPTION;
592 : }
593 : }
594 1431 : else if (CHIP_END_OF_TLV == err)
595 : {
596 1431 : if (IsSubscriptionType())
597 : {
598 0 : err = CHIP_ERROR_INVALID_ARGUMENT;
599 : }
600 : else
601 : {
602 1431 : err = CHIP_NO_ERROR;
603 : }
604 : }
605 1774 : SuccessOrExit(err);
606 :
607 1772 : err = report.GetMoreChunkedMessages(&mPendingMoreChunks);
608 1772 : if (CHIP_END_OF_TLV == err)
609 : {
610 911 : mPendingMoreChunks = false;
611 911 : err = CHIP_NO_ERROR;
612 : }
613 1772 : SuccessOrExit(err);
614 :
615 1772 : err = report.GetEventReports(&eventReportIBs);
616 1772 : if (err == CHIP_END_OF_TLV)
617 : {
618 1132 : err = CHIP_NO_ERROR;
619 : }
620 640 : else if (err == CHIP_NO_ERROR)
621 : {
622 : chip::TLV::TLVReader EventReportsReader;
623 640 : eventReportIBs.GetReader(&EventReportsReader);
624 640 : err = ProcessEventReportIBs(EventReportsReader);
625 : }
626 1772 : SuccessOrExit(err);
627 :
628 1772 : err = report.GetAttributeReportIBs(&attributeReportIBs);
629 1772 : if (err == CHIP_END_OF_TLV)
630 : {
631 631 : err = CHIP_NO_ERROR;
632 : }
633 1141 : else if (err == CHIP_NO_ERROR)
634 : {
635 : TLV::TLVReader attributeReportIBsReader;
636 1141 : attributeReportIBs.GetReader(&attributeReportIBsReader);
637 1141 : err = ProcessAttributeReportIBs(attributeReportIBsReader);
638 : }
639 1772 : SuccessOrExit(err);
640 :
641 1770 : if (mIsReporting && !mPendingMoreChunks)
642 : {
643 880 : mpCallback.OnReportEnd();
644 880 : mIsReporting = false;
645 : }
646 :
647 1770 : SuccessOrExit(err = report.ExitContainer());
648 :
649 1778 : exit:
650 1778 : if (IsSubscriptionType())
651 : {
652 345 : if (IsAwaitingInitialReport())
653 : {
654 215 : MoveToState(ClientState::AwaitingSubscribeResponse);
655 : }
656 130 : else if (IsSubscriptionActive() && err == CHIP_NO_ERROR)
657 : {
658 : //
659 : // Only refresh the liveness check timer if we've successfully established
660 : // a subscription and have a valid value for mMaxInterval which the function
661 : // relies on.
662 : //
663 103 : mpCallback.NotifySubscriptionStillActive(*this);
664 103 : err = RefreshLivenessCheckTimer();
665 : }
666 : }
667 :
668 1778 : if (!suppressResponse && err == CHIP_NO_ERROR)
669 : {
670 1144 : bool noResponseExpected = IsSubscriptionActive() && !mPendingMoreChunks;
671 1144 : err = StatusResponse::Send(Status::Success, mExchange.Get(), !noResponseExpected);
672 : }
673 :
674 1778 : mWaitingForFirstPrimingReport = false;
675 1778 : return err;
676 1778 : }
677 :
678 11 : void ReadClient::OnResponseTimeout(Messaging::ExchangeContext * apExchangeContext)
679 : {
680 11 : ChipLogError(DataManagement, "Time out! failed to receive report data from Exchange: " ChipLogFormatExchange,
681 : ChipLogValueExchange(apExchangeContext));
682 11 : Close(CHIP_ERROR_TIMEOUT);
683 11 : }
684 :
685 3 : CHIP_ERROR ReadClient::ReadICDOperatingModeFromAttributeDataIB(TLV::TLVReader && aReader, PeerType & aType)
686 : {
687 : Clusters::IcdManagement::Attributes::OperatingMode::TypeInfo::DecodableType operatingMode;
688 :
689 3 : CHIP_ERROR err = DataModel::Decode(aReader, operatingMode);
690 3 : ReturnErrorOnFailure(err);
691 :
692 3 : switch (operatingMode)
693 : {
694 2 : case Clusters::IcdManagement::OperatingModeEnum::kSit:
695 2 : aType = PeerType::kNormal;
696 2 : break;
697 1 : case Clusters::IcdManagement::OperatingModeEnum::kLit:
698 1 : aType = PeerType::kLITICD;
699 1 : break;
700 0 : default:
701 0 : err = CHIP_ERROR_INVALID_ARGUMENT;
702 0 : break;
703 : }
704 :
705 3 : return err;
706 : }
707 :
708 3709 : CHIP_ERROR ReadClient::ProcessAttributePath(AttributePathIB::Parser & aAttributePathParser,
709 : ConcreteDataAttributePath & aAttributePath)
710 : {
711 : // The ReportData must contain a concrete attribute path. Don't validate ID
712 : // ranges here, so we can tell apart "malformed data" and "out of range
713 : // IDs".
714 3709 : CHIP_ERROR err = CHIP_NO_ERROR;
715 : // The ReportData must contain a concrete attribute path
716 3709 : err = aAttributePathParser.GetConcreteAttributePath(aAttributePath, AttributePathIB::ValidateIdRanges::kNo);
717 3709 : VerifyOrReturnError(err == CHIP_NO_ERROR, CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH_IB);
718 3707 : return CHIP_NO_ERROR;
719 : }
720 :
721 5282 : void ReadClient::NoteReportingData()
722 : {
723 5282 : if (!mIsReporting)
724 : {
725 934 : mpCallback.OnReportBegin();
726 934 : mIsReporting = true;
727 : }
728 5282 : }
729 :
730 1141 : CHIP_ERROR ReadClient::ProcessAttributeReportIBs(TLV::TLVReader & aAttributeReportIBsReader)
731 : {
732 1141 : CHIP_ERROR err = CHIP_NO_ERROR;
733 4848 : while (CHIP_NO_ERROR == (err = aAttributeReportIBsReader.Next()))
734 : {
735 : TLV::TLVReader dataReader;
736 3709 : AttributeReportIB::Parser report;
737 3709 : AttributeDataIB::Parser data;
738 3709 : AttributeStatusIB::Parser status;
739 3709 : AttributePathIB::Parser path;
740 3709 : ConcreteDataAttributePath attributePath;
741 3709 : StatusIB statusIB;
742 :
743 3709 : TLV::TLVReader reader = aAttributeReportIBsReader;
744 3709 : ReturnErrorOnFailure(report.Init(reader));
745 :
746 3709 : err = report.GetAttributeStatus(&status);
747 3709 : if (CHIP_NO_ERROR == err)
748 : {
749 274 : StatusIB::Parser errorStatus;
750 274 : ReturnErrorOnFailure(status.GetPath(&path));
751 274 : ReturnErrorOnFailure(ProcessAttributePath(path, attributePath));
752 274 : if (!attributePath.IsValid())
753 : {
754 : // Don't fail the entire read or subscription when there is an
755 : // out-of-range ID. Just skip that one AttributeReportIB.
756 0 : ChipLogError(DataManagement,
757 : "Skipping AttributeStatusIB with out-of-range IDs: (%d, " ChipLogFormatMEI ", " ChipLogFormatMEI ") ",
758 : attributePath.mEndpointId, ChipLogValueMEI(attributePath.mClusterId),
759 : ChipLogValueMEI(attributePath.mAttributeId));
760 0 : continue;
761 : }
762 :
763 274 : ReturnErrorOnFailure(status.GetErrorStatus(&errorStatus));
764 274 : ReturnErrorOnFailure(errorStatus.DecodeStatusIB(statusIB));
765 274 : NoteReportingData();
766 274 : mpCallback.OnAttributeData(attributePath, nullptr, statusIB);
767 : }
768 3435 : else if (CHIP_END_OF_TLV == err)
769 : {
770 3435 : ReturnErrorOnFailure(report.GetAttributeData(&data));
771 3435 : ReturnErrorOnFailure(data.GetPath(&path));
772 3435 : ReturnErrorOnFailure(ProcessAttributePath(path, attributePath));
773 3433 : if (!attributePath.IsValid())
774 : {
775 : // Don't fail the entire read or subscription when there is an
776 : // out-of-range ID. Just skip that one AttributeReportIB.
777 2 : ChipLogError(DataManagement,
778 : "Skipping AttributeDataIB with out-of-range IDs: (%d, " ChipLogFormatMEI ", " ChipLogFormatMEI ") ",
779 : attributePath.mEndpointId, ChipLogValueMEI(attributePath.mClusterId),
780 : ChipLogValueMEI(attributePath.mAttributeId));
781 2 : continue;
782 : }
783 :
784 3431 : DataVersion version = 0;
785 3431 : ReturnErrorOnFailure(data.GetDataVersion(&version));
786 3431 : attributePath.mDataVersion.SetValue(version);
787 :
788 3431 : if (mReadPrepareParams.mpDataVersionFilterList != nullptr)
789 : {
790 65 : UpdateDataVersionFilters(attributePath);
791 : }
792 :
793 3431 : ReturnErrorOnFailure(data.GetData(&dataReader));
794 :
795 : // The element in an array may be another array -- so we should only set the list operation when we are handling the
796 : // whole list.
797 3431 : if (!attributePath.IsListOperation() && dataReader.GetType() == TLV::kTLVType_Array)
798 : {
799 1287 : attributePath.mListOp = ConcreteDataAttributePath::ListOperation::ReplaceAll;
800 : }
801 :
802 3431 : if (attributePath ==
803 6862 : ConcreteDataAttributePath(kRootEndpointId, Clusters::IcdManagement::Id,
804 : Clusters::IcdManagement::Attributes::OperatingMode::Id))
805 : {
806 : PeerType peerType;
807 : TLV::TLVReader operatingModeTlvReader;
808 3 : operatingModeTlvReader.Init(dataReader);
809 3 : if (CHIP_NO_ERROR == ReadICDOperatingModeFromAttributeDataIB(std::move(operatingModeTlvReader), peerType))
810 : {
811 : // It is safe to call `OnPeerTypeChange` since we are in the middle of parsing the attribute data, And
812 : // the subscription should be active so `OnActiveModeNotification` is a no-op in this case.
813 3 : InteractionModelEngine::GetInstance()->OnPeerTypeChange(mPeer, peerType);
814 : }
815 : else
816 : {
817 0 : ChipLogError(DataManagement, "Failed to get ICD state from attribute data with error'%" CHIP_ERROR_FORMAT "'",
818 : err.Format());
819 : }
820 : }
821 :
822 3431 : NoteReportingData();
823 3431 : mpCallback.OnAttributeData(attributePath, &dataReader, statusIB);
824 : }
825 3713 : }
826 :
827 1139 : if (CHIP_END_OF_TLV == err)
828 : {
829 1139 : err = CHIP_NO_ERROR;
830 : }
831 :
832 1139 : return err;
833 : }
834 :
835 640 : CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsReader)
836 : {
837 640 : CHIP_ERROR err = CHIP_NO_ERROR;
838 2217 : while (CHIP_NO_ERROR == (err = aEventReportIBsReader.Next()))
839 : {
840 : TLV::TLVReader dataReader;
841 1577 : EventReportIB::Parser report;
842 1577 : EventDataIB::Parser data;
843 1577 : EventHeader header;
844 1577 : StatusIB statusIB; // Default value for statusIB is success.
845 :
846 1577 : TLV::TLVReader reader = aEventReportIBsReader;
847 1577 : ReturnErrorOnFailure(report.Init(reader));
848 :
849 1577 : err = report.GetEventData(&data);
850 :
851 1577 : if (err == CHIP_NO_ERROR)
852 : {
853 1575 : header.mTimestamp = mEventTimestamp;
854 1575 : ReturnErrorOnFailure(data.DecodeEventHeader(header));
855 1575 : mEventTimestamp = header.mTimestamp;
856 :
857 1575 : ReturnErrorOnFailure(data.GetData(&dataReader));
858 :
859 : //
860 : // Update the event number being tracked in mReadPrepareParams in case
861 : // we want to send it in the next SubscribeRequest message to convey
862 : // the event number for which we have already received an event.
863 : //
864 1575 : mReadPrepareParams.mEventNumber.SetValue(header.mEventNumber + 1);
865 :
866 1575 : NoteReportingData();
867 1575 : mpCallback.OnEventData(header, &dataReader, nullptr);
868 : }
869 2 : else if (err == CHIP_END_OF_TLV)
870 : {
871 2 : EventStatusIB::Parser status;
872 2 : EventPathIB::Parser pathIB;
873 2 : StatusIB::Parser statusIBParser;
874 2 : ReturnErrorOnFailure(report.GetEventStatus(&status));
875 2 : ReturnErrorOnFailure(status.GetPath(&pathIB));
876 2 : ReturnErrorOnFailure(pathIB.GetEventPath(&header.mPath));
877 2 : ReturnErrorOnFailure(status.GetErrorStatus(&statusIBParser));
878 2 : ReturnErrorOnFailure(statusIBParser.DecodeStatusIB(statusIB));
879 :
880 2 : NoteReportingData();
881 2 : mpCallback.OnEventData(header, nullptr, &statusIB);
882 : }
883 1577 : }
884 :
885 640 : if (CHIP_END_OF_TLV == err)
886 : {
887 640 : err = CHIP_NO_ERROR;
888 : }
889 :
890 640 : return err;
891 : }
892 :
893 0 : void ReadClient::OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout)
894 : {
895 0 : mLivenessTimeoutOverride = aLivenessTimeout;
896 0 : auto err = RefreshLivenessCheckTimer();
897 0 : if (err != CHIP_NO_ERROR)
898 : {
899 0 : Close(err);
900 : }
901 0 : }
902 :
903 312 : CHIP_ERROR ReadClient::RefreshLivenessCheckTimer()
904 : {
905 312 : CHIP_ERROR err = CHIP_NO_ERROR;
906 :
907 312 : VerifyOrReturnError(IsSubscriptionActive(), CHIP_ERROR_INCORRECT_STATE);
908 :
909 312 : CancelLivenessCheckTimer();
910 :
911 : System::Clock::Timeout timeout;
912 312 : ReturnErrorOnFailure(ComputeLivenessCheckTimerTimeout(&timeout));
913 :
914 : // EFR32/MBED/INFINION/K32W's chrono count return long unsigned, but other platform returns unsigned
915 312 : ChipLogProgress(
916 : DataManagement,
917 : "Refresh LivenessCheckTime for %lu milliseconds with SubscriptionId = 0x%08" PRIx32 " Peer = %02x:" ChipLogFormatX64,
918 : static_cast<long unsigned>(timeout.count()), mSubscriptionId, GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()));
919 312 : err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
920 : timeout, OnLivenessTimeoutCallback, this);
921 :
922 312 : return err;
923 : }
924 :
925 312 : CHIP_ERROR ReadClient::ComputeLivenessCheckTimerTimeout(System::Clock::Timeout * aTimeout)
926 : {
927 312 : if (mLivenessTimeoutOverride != System::Clock::kZero)
928 : {
929 0 : *aTimeout = mLivenessTimeoutOverride;
930 0 : return CHIP_NO_ERROR;
931 : }
932 :
933 312 : VerifyOrReturnError(mReadPrepareParams.mSessionHolder, CHIP_ERROR_INCORRECT_STATE);
934 :
935 : //
936 : // To calculate the duration we're willing to wait for a report to come to us, we take into account the maximum interval of
937 : // the subscription AND the time it takes for the report to make it to us in the worst case. This latter bit involves
938 : // computing the Ack timeout from the publisher for the ReportData message being sent to us using our IDLE interval as the
939 : // basis for that computation.
940 : //
941 : // Make sure to use the retransmission computation that includes backoff. For purposes of that computation, treat us as
942 : // active now (since we are right now sending/receiving messages), and use the default "how long are we guaranteed to stay
943 : // active" threshold for now.
944 : //
945 : // TODO: We need to find a good home for this logic that will correctly compute this based on transport. For now, this will
946 : // suffice since we don't use TCP as a transport currently and subscriptions over BLE aren't really a thing.
947 : //
948 312 : const auto & localMRPConfig = GetLocalMRPConfig();
949 312 : const auto & defaultMRPConfig = GetDefaultMRPConfig();
950 312 : const auto & ourMrpConfig = localMRPConfig.ValueOr(defaultMRPConfig);
951 : auto publisherTransmissionTimeout =
952 312 : GetRetransmissionTimeout(ourMrpConfig.mActiveRetransTimeout, ourMrpConfig.mIdleRetransTimeout,
953 312 : System::SystemClock().GetMonotonicTimestamp(), ourMrpConfig.mActiveThresholdTime);
954 312 : *aTimeout = System::Clock::Seconds16(mMaxInterval) + publisherTransmissionTimeout;
955 312 : return CHIP_NO_ERROR;
956 312 : }
957 :
958 762 : void ReadClient::CancelLivenessCheckTimer()
959 : {
960 762 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
961 : OnLivenessTimeoutCallback, this);
962 762 : }
963 :
964 450 : void ReadClient::CancelResubscribeTimer()
965 : {
966 450 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
967 : OnResubscribeTimerCallback, this);
968 450 : mIsResubscriptionScheduled = false;
969 450 : }
970 :
971 6 : void ReadClient::OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * apAppState)
972 : {
973 6 : ReadClient * const _this = reinterpret_cast<ReadClient *>(apAppState);
974 :
975 : // TODO: add a more specific error here for liveness timeout failure to distinguish between other classes of timeouts (i.e
976 : // response timeouts).
977 6 : CHIP_ERROR subscriptionTerminationCause = CHIP_ERROR_TIMEOUT;
978 :
979 : //
980 : // Might as well try to see if this instance exists in the tracked list in the IM.
981 : // This might blow-up if either the client has since been free'ed (use-after-free), or if the engine has since
982 : // been shutdown at which point the client wouldn't exist in the active read client list.
983 : //
984 6 : VerifyOrDie(_this->mpImEngine->InActiveReadClientList(_this));
985 :
986 6 : ChipLogError(DataManagement,
987 : "Subscription Liveness timeout with SubscriptionID = 0x%08" PRIx32 ", Peer = %02x:" ChipLogFormatX64,
988 : _this->mSubscriptionId, _this->GetFabricIndex(), ChipLogValueX64(_this->GetPeerNodeId()));
989 :
990 6 : if (_this->mIsPeerLIT)
991 : {
992 3 : subscriptionTerminationCause = CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT;
993 : }
994 :
995 6 : _this->TriggerResubscriptionForLivenessTimeout(subscriptionTerminationCause);
996 6 : }
997 :
998 7 : void ReadClient::TriggerResubscriptionForLivenessTimeout(CHIP_ERROR aReason)
999 : {
1000 : // We didn't get a message from the server on time; it's possible that it no
1001 : // longer has a useful CASE session to us. Mark defunct all sessions that
1002 : // have not seen peer activity in at least as long as our session.
1003 7 : const auto & holder = mReadPrepareParams.mSessionHolder;
1004 7 : if (holder)
1005 : {
1006 7 : System::Clock::Timestamp lastPeerActivity = holder->AsSecureSession()->GetLastPeerActivityTime();
1007 7 : mpImEngine->GetExchangeManager()->GetSessionManager()->ForEachMatchingSession(mPeer, [&lastPeerActivity](auto * session) {
1008 7 : if (!session->IsCASESession())
1009 : {
1010 7 : return;
1011 : }
1012 :
1013 0 : if (session->GetLastPeerActivityTime() > lastPeerActivity)
1014 : {
1015 0 : return;
1016 : }
1017 :
1018 0 : session->MarkAsDefunct();
1019 : });
1020 : }
1021 :
1022 7 : Close(aReason);
1023 7 : }
1024 :
1025 211 : CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aPayload)
1026 : {
1027 211 : System::PacketBufferTLVReader reader;
1028 211 : reader.Init(std::move(aPayload));
1029 :
1030 211 : SubscribeResponseMessage::Parser subscribeResponse;
1031 211 : ReturnErrorOnFailure(subscribeResponse.Init(reader));
1032 :
1033 : #if CHIP_CONFIG_IM_PRETTY_PRINT
1034 211 : subscribeResponse.PrettyPrint();
1035 : #endif
1036 :
1037 211 : SubscriptionId subscriptionId = 0;
1038 211 : VerifyOrReturnError(subscribeResponse.GetSubscriptionId(&subscriptionId) == CHIP_NO_ERROR, CHIP_ERROR_INVALID_ARGUMENT);
1039 211 : VerifyOrReturnError(IsMatchingSubscriptionId(subscriptionId), CHIP_ERROR_INVALID_SUBSCRIPTION);
1040 209 : ReturnErrorOnFailure(subscribeResponse.GetMaxInterval(&mMaxInterval));
1041 :
1042 209 : ChipLogProgress(DataManagement,
1043 : "Subscription established with SubscriptionID = 0x%08" PRIx32 " MinInterval = %u"
1044 : "s MaxInterval = %us Peer = %02x:" ChipLogFormatX64,
1045 : mSubscriptionId, mMinIntervalFloorSeconds, mMaxInterval, GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()));
1046 :
1047 209 : ReturnErrorOnFailure(subscribeResponse.ExitContainer());
1048 :
1049 209 : MoveToState(ClientState::SubscriptionActive);
1050 :
1051 209 : mpCallback.OnSubscriptionEstablished(subscriptionId);
1052 :
1053 209 : mNumRetries = 0;
1054 :
1055 209 : ReturnErrorOnFailure(RefreshLivenessCheckTimer());
1056 :
1057 209 : return CHIP_NO_ERROR;
1058 211 : }
1059 :
1060 157 : CHIP_ERROR ReadClient::SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams)
1061 : {
1062 157 : mReadPrepareParams = std::move(aReadPrepareParams);
1063 157 : CHIP_ERROR err = SendSubscribeRequest(mReadPrepareParams);
1064 157 : if (err != CHIP_NO_ERROR)
1065 : {
1066 1 : StopResubscription();
1067 : }
1068 157 : return err;
1069 : }
1070 :
1071 0 : CHIP_ERROR ReadClient::SendAutoResubscribeRequest(const ScopedNodeId & aPublisherId, ReadPrepareParams && aReadPrepareParams)
1072 : {
1073 0 : mPeer = aPublisherId;
1074 0 : mReadPrepareParams = std::move(aReadPrepareParams);
1075 0 : CHIP_ERROR err = EstablishSessionToPeer();
1076 0 : if (err != CHIP_NO_ERROR)
1077 : {
1078 : // Make sure we call our callback's OnDeallocatePaths.
1079 0 : StopResubscription();
1080 : }
1081 0 : return err;
1082 : }
1083 :
1084 242 : CHIP_ERROR ReadClient::SendSubscribeRequest(const ReadPrepareParams & aReadPrepareParams)
1085 : {
1086 242 : VerifyOrReturnError(aReadPrepareParams.mMinIntervalFloorSeconds <= aReadPrepareParams.mMaxIntervalCeilingSeconds,
1087 : CHIP_ERROR_INVALID_ARGUMENT);
1088 :
1089 239 : return SendSubscribeRequestImpl(aReadPrepareParams);
1090 : }
1091 :
1092 239 : CHIP_ERROR ReadClient::SendSubscribeRequestImpl(const ReadPrepareParams & aReadPrepareParams)
1093 : {
1094 239 : VerifyOrReturnError(ClientState::Idle == mState, CHIP_ERROR_INCORRECT_STATE);
1095 :
1096 239 : if (&aReadPrepareParams != &mReadPrepareParams)
1097 : {
1098 76 : mReadPrepareParams.mSessionHolder = aReadPrepareParams.mSessionHolder;
1099 : }
1100 :
1101 239 : mIsPeerLIT = aReadPrepareParams.mIsPeerLIT;
1102 :
1103 239 : mMinIntervalFloorSeconds = aReadPrepareParams.mMinIntervalFloorSeconds;
1104 :
1105 : // Todo: Remove the below, Update span in ReadPrepareParams
1106 239 : Span<AttributePathParams> attributePaths(aReadPrepareParams.mpAttributePathParamsList,
1107 239 : aReadPrepareParams.mAttributePathParamsListSize);
1108 239 : Span<EventPathParams> eventPaths(aReadPrepareParams.mpEventPathParamsList, aReadPrepareParams.mEventPathParamsListSize);
1109 239 : Span<DataVersionFilter> dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList,
1110 239 : aReadPrepareParams.mDataVersionFilterListSize);
1111 :
1112 239 : System::PacketBufferHandle msgBuf;
1113 239 : System::PacketBufferTLVWriter writer;
1114 239 : SubscribeRequestMessage::Builder request;
1115 239 : InitWriterWithSpaceReserved(writer, kReservedSizeForTLVEncodingOverhead);
1116 :
1117 239 : ReturnErrorOnFailure(request.Init(&writer));
1118 :
1119 239 : request.KeepSubscriptions(aReadPrepareParams.mKeepSubscriptions)
1120 239 : .MinIntervalFloorSeconds(aReadPrepareParams.mMinIntervalFloorSeconds)
1121 239 : .MaxIntervalCeilingSeconds(aReadPrepareParams.mMaxIntervalCeilingSeconds);
1122 :
1123 239 : if (!attributePaths.empty())
1124 : {
1125 232 : AttributePathIBs::Builder & attributePathListBuilder = request.CreateAttributeRequests();
1126 232 : ReturnErrorOnFailure(attributePathListBuilder.GetError());
1127 232 : ReturnErrorOnFailure(GenerateAttributePaths(attributePathListBuilder, attributePaths));
1128 : }
1129 :
1130 239 : if (!eventPaths.empty())
1131 : {
1132 22 : EventPathIBs::Builder & eventPathListBuilder = request.CreateEventRequests();
1133 22 : ReturnErrorOnFailure(eventPathListBuilder.GetError());
1134 22 : ReturnErrorOnFailure(GenerateEventPaths(eventPathListBuilder, eventPaths));
1135 :
1136 22 : Optional<EventNumber> eventMin;
1137 22 : ReturnErrorOnFailure(GetMinEventNumber(aReadPrepareParams, eventMin));
1138 22 : if (eventMin.HasValue())
1139 : {
1140 0 : EventFilterIBs::Builder & eventFilters = request.CreateEventFilters();
1141 0 : ReturnErrorOnFailure(request.GetError());
1142 0 : ReturnErrorOnFailure(eventFilters.GenerateEventFilter(eventMin.Value()));
1143 : }
1144 22 : }
1145 :
1146 239 : ReturnErrorOnFailure(request.IsFabricFiltered(aReadPrepareParams.mIsFabricFiltered).GetError());
1147 :
1148 239 : bool encodedDataVersionList = false;
1149 239 : TLV::TLVWriter backup;
1150 239 : request.Checkpoint(backup);
1151 239 : DataVersionFilterIBs::Builder & dataVersionFilterListBuilder = request.CreateDataVersionFilters();
1152 239 : ReturnErrorOnFailure(request.GetError());
1153 239 : if (!attributePaths.empty())
1154 : {
1155 232 : ReturnErrorOnFailure(GenerateDataVersionFilterList(dataVersionFilterListBuilder, attributePaths, dataVersionFilters,
1156 : encodedDataVersionList));
1157 : }
1158 239 : ReturnErrorOnFailure(dataVersionFilterListBuilder.GetWriter()->UnreserveBuffer(kReservedSizeForTLVEncodingOverhead));
1159 239 : if (encodedDataVersionList)
1160 : {
1161 65 : ReturnErrorOnFailure(dataVersionFilterListBuilder.EndOfDataVersionFilterIBs());
1162 : }
1163 : else
1164 : {
1165 174 : request.Rollback(backup);
1166 : }
1167 :
1168 239 : ReturnErrorOnFailure(request.EndOfSubscribeRequestMessage());
1169 239 : ReturnErrorOnFailure(writer.Finalize(&msgBuf));
1170 :
1171 239 : VerifyOrReturnError(aReadPrepareParams.mSessionHolder, CHIP_ERROR_MISSING_SECURE_SESSION);
1172 :
1173 239 : auto exchange = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this);
1174 239 : if (exchange == nullptr)
1175 : {
1176 0 : if (aReadPrepareParams.mSessionHolder->AsSecureSession()->IsActiveSession())
1177 : {
1178 0 : return CHIP_ERROR_NO_MEMORY;
1179 : }
1180 :
1181 : // Trying to subscribe with a defunct session somehow.
1182 0 : return CHIP_ERROR_INCORRECT_STATE;
1183 : }
1184 :
1185 239 : mExchange.Grab(exchange);
1186 :
1187 239 : if (aReadPrepareParams.mTimeout == System::Clock::kZero)
1188 : {
1189 239 : mExchange->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
1190 : }
1191 : else
1192 : {
1193 0 : mExchange->SetResponseTimeout(aReadPrepareParams.mTimeout);
1194 : }
1195 :
1196 239 : ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::SubscribeRequest, std::move(msgBuf),
1197 : Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse)));
1198 :
1199 239 : mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer();
1200 239 : MoveToState(ClientState::AwaitingInitialReport);
1201 :
1202 239 : return CHIP_NO_ERROR;
1203 239 : }
1204 :
1205 6 : CHIP_ERROR ReadClient::DefaultResubscribePolicy(CHIP_ERROR aTerminationCause)
1206 : {
1207 6 : if (aTerminationCause == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT)
1208 : {
1209 0 : ChipLogProgress(DataManagement, "ICD device is inactive, skipping scheduling resubscribe within DefaultResubscribePolicy");
1210 0 : return CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT;
1211 : }
1212 :
1213 6 : VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);
1214 :
1215 6 : auto timeTillNextResubscription = ComputeTimeTillNextSubscription();
1216 6 : ChipLogProgress(DataManagement,
1217 : "Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32
1218 : "ms due to error %" CHIP_ERROR_FORMAT,
1219 : GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()), mNumRetries, timeTillNextResubscription,
1220 : aTerminationCause.Format());
1221 6 : return ScheduleResubscription(timeTillNextResubscription, NullOptional, aTerminationCause == CHIP_ERROR_TIMEOUT);
1222 : }
1223 :
1224 0 : void ReadClient::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
1225 : const SessionHandle & sessionHandle)
1226 : {
1227 0 : ReadClient * const _this = static_cast<ReadClient *>(context);
1228 0 : VerifyOrDie(_this != nullptr);
1229 :
1230 0 : ChipLogProgress(DataManagement, "HandleDeviceConnected");
1231 0 : _this->mReadPrepareParams.mSessionHolder.Grab(sessionHandle);
1232 0 : _this->mpExchangeMgr = &exchangeMgr;
1233 :
1234 0 : _this->mpCallback.OnCASESessionEstablished(sessionHandle, _this->mReadPrepareParams);
1235 :
1236 0 : auto err = _this->SendSubscribeRequest(_this->mReadPrepareParams);
1237 0 : if (err != CHIP_NO_ERROR)
1238 : {
1239 0 : _this->Close(err);
1240 : }
1241 0 : }
1242 :
1243 0 : void ReadClient::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err)
1244 : {
1245 0 : ReadClient * const _this = static_cast<ReadClient *>(context);
1246 0 : VerifyOrDie(_this != nullptr);
1247 :
1248 0 : ChipLogError(DataManagement, "Failed to establish CASE for re-subscription with error '%" CHIP_ERROR_FORMAT "'", err.Format());
1249 :
1250 0 : _this->Close(err);
1251 0 : }
1252 :
1253 7 : void ReadClient::OnResubscribeTimerCallback(System::Layer * /* If this starts being used, fix callers that pass nullptr */,
1254 : void * apAppState)
1255 : {
1256 7 : ReadClient * const _this = static_cast<ReadClient *>(apAppState);
1257 7 : VerifyOrDie(_this != nullptr);
1258 :
1259 7 : _this->mIsResubscriptionScheduled = false;
1260 :
1261 : CHIP_ERROR err;
1262 :
1263 7 : ChipLogProgress(DataManagement, "OnResubscribeTimerCallback: ForceCASE = %d", _this->mForceCaseOnNextResub);
1264 7 : _this->mNumRetries++;
1265 :
1266 7 : bool allowResubscribeOnError = true;
1267 14 : if (!_this->mReadPrepareParams.mSessionHolder ||
1268 7 : !_this->mReadPrepareParams.mSessionHolder->AsSecureSession()->IsActiveSession())
1269 : {
1270 : // We don't have an active CASE session. We need to go ahead and set
1271 : // one up, if we can.
1272 0 : if (_this->EstablishSessionToPeer() == CHIP_NO_ERROR)
1273 : {
1274 0 : return;
1275 : }
1276 :
1277 0 : if (_this->mForceCaseOnNextResub)
1278 : {
1279 : // Caller asked us to force CASE but we have no way to do CASE.
1280 : // Just stop trying.
1281 0 : allowResubscribeOnError = false;
1282 : }
1283 :
1284 : // No way to send our subscribe request.
1285 0 : err = CHIP_ERROR_INCORRECT_STATE;
1286 0 : ExitNow();
1287 : }
1288 :
1289 7 : err = _this->SendSubscribeRequest(_this->mReadPrepareParams);
1290 :
1291 7 : exit:
1292 7 : if (err != CHIP_NO_ERROR)
1293 : {
1294 : //
1295 : // Call Close (which should trigger re-subscription again) EXCEPT if we got here because we didn't have a valid
1296 : // CASESessionManager pointer when mForceCaseOnNextResub was true.
1297 : //
1298 : // In that case, don't permit re-subscription to occur.
1299 : //
1300 0 : _this->Close(err, allowResubscribeOnError);
1301 : }
1302 : }
1303 :
1304 65 : void ReadClient::UpdateDataVersionFilters(const ConcreteDataAttributePath & aPath)
1305 : {
1306 130 : for (size_t index = 0; index < mReadPrepareParams.mDataVersionFilterListSize; index++)
1307 : {
1308 65 : if (mReadPrepareParams.mpDataVersionFilterList[index].mEndpointId == aPath.mEndpointId &&
1309 65 : mReadPrepareParams.mpDataVersionFilterList[index].mClusterId == aPath.mClusterId)
1310 : {
1311 : // Now we know the current version for this cluster is aPath.mDataVersion.
1312 65 : mReadPrepareParams.mpDataVersionFilterList[index].mDataVersion = aPath.mDataVersion;
1313 : }
1314 : }
1315 65 : }
1316 :
1317 339 : CHIP_ERROR ReadClient::GetMinEventNumber(const ReadPrepareParams & aReadPrepareParams, Optional<EventNumber> & aEventMin)
1318 : {
1319 339 : if (aReadPrepareParams.mEventNumber.HasValue())
1320 : {
1321 312 : aEventMin = aReadPrepareParams.mEventNumber;
1322 : }
1323 : else
1324 : {
1325 27 : ReturnErrorOnFailure(mpCallback.GetHighestReceivedEventNumber(aEventMin));
1326 27 : if (aEventMin.HasValue())
1327 : {
1328 : // We want to start with the first event _after_ the last one we received.
1329 2 : aEventMin.SetValue(aEventMin.Value() + 1);
1330 : }
1331 : }
1332 339 : return CHIP_NO_ERROR;
1333 : }
1334 :
1335 195 : void ReadClient::TriggerResubscribeIfScheduled(const char * reason)
1336 : {
1337 195 : if (!mIsResubscriptionScheduled)
1338 : {
1339 195 : return;
1340 : }
1341 :
1342 0 : ChipLogDetail(DataManagement, "ReadClient[%p] triggering resubscribe, reason: %s", this, reason);
1343 0 : CancelResubscribeTimer();
1344 0 : OnResubscribeTimerCallback(nullptr, this);
1345 : }
1346 :
1347 0 : Optional<System::Clock::Timeout> ReadClient::GetSubscriptionTimeout()
1348 : {
1349 0 : if (!IsSubscriptionType() || !IsSubscriptionActive())
1350 : {
1351 0 : return NullOptional;
1352 : }
1353 :
1354 : System::Clock::Timeout timeout;
1355 0 : CHIP_ERROR err = ComputeLivenessCheckTimerTimeout(&timeout);
1356 0 : if (err != CHIP_NO_ERROR)
1357 : {
1358 0 : return NullOptional;
1359 : }
1360 :
1361 0 : return MakeOptional(timeout);
1362 : }
1363 :
1364 0 : CHIP_ERROR ReadClient::EstablishSessionToPeer()
1365 : {
1366 0 : ChipLogProgress(DataManagement, "Trying to establish a CASE session for subscription");
1367 0 : auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager();
1368 0 : VerifyOrReturnError(caseSessionManager != nullptr, CHIP_ERROR_INCORRECT_STATE);
1369 0 : caseSessionManager->FindOrEstablishSession(mPeer, &mOnConnectedCallback, &mOnConnectionFailureCallback);
1370 0 : return CHIP_NO_ERROR;
1371 : }
1372 :
1373 : } // namespace app
1374 : } // namespace chip
|