XRootD
Loading...
Searching...
No Matches
XrdSsiSessReal.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d S s i S e s s R e a l . c c */
4/* */
5/* (c) 2013 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* Produced by Andrew Hanushevsky for Stanford University under contract */
7/* DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30#include <cerrno>
31#include <cinttypes>
32#include <cstdio>
33#include <cstdlib>
34#include <cstring>
35#include <string>
36#include <sys/types.h>
37#include <netinet/in.h>
38
43#include "XrdSsi/XrdSsiScale.hh"
47#include "XrdSsi/XrdSsiTrace.hh"
48#include "XrdSsi/XrdSsiUtils.hh"
49
50#include "XrdSys/XrdSysError.hh"
52#include "Xrd/XrdScheduler.hh"
53
54using namespace XrdSsi;
55
56/******************************************************************************/
57/* L o c a l D e f i n e s */
58/******************************************************************************/
59
// Helpers for the circular doubly linked task list. "dlvar" names the link
// member (an object with "next" and "prev" pointers) inside each item.
//
// Every macro argument is parenthesized so that expressions such as "&item"
// expand correctly, and the multi-statement macros are wrapped in
// do {...} while(0) so that each behaves as a single statement, even in an
// unbraced if/else. Arguments are evaluated more than once; do not pass
// expressions that have side effects.

// True when theitem is the only element of its list (it links to itself).
//
#define SINGLETON(dlvar, theitem) \
        ((theitem)->dlvar.next == (theitem))

// Link newitem into the list immediately before curitem.
//
#define INSERT(dlvar, curitem, newitem) \
        do {(newitem)->dlvar.next = (curitem); \
            (newitem)->dlvar.prev = (curitem)->dlvar.prev; \
            (curitem)->dlvar.prev->dlvar.next = (newitem); \
            (curitem)->dlvar.prev = (newitem); \
           } while(0)

// Unlink curitem from its list and leave it linked to itself. When curitem is
// the list base, dlbase is advanced to the next item or set to 0 if curitem
// was the only element.
//
#define REMOVE(dlbase, dlvar, curitem) \
        do {if ((dlbase) == (curitem)) \
               (dlbase) = (SINGLETON(dlvar, curitem) \
                        ? 0 : (curitem)->dlvar.next); \
            (curitem)->dlvar.prev->dlvar.next = (curitem)->dlvar.next; \
            (curitem)->dlvar.next->dlvar.prev = (curitem)->dlvar.prev; \
            (curitem)->dlvar.next = (curitem); \
            (curitem)->dlvar.prev = (curitem); \
           } while(0)
76
77/******************************************************************************/
78/* L o c a l S t a t i c s */
79/******************************************************************************/
80
81namespace
82{
83 std::string dsProperty("DataServer");
84 XrdSsiMutex sidMutex;
85
86 Atomic(uint32_t) sidVal(0);
87}
88
89/******************************************************************************/
90/* G l o b a l s */
91/******************************************************************************/
92
93namespace XrdSsi
94{
95extern XrdScheduler *schedP;
96
97extern XrdSysError Log;
99}
100
101/******************************************************************************/
102/* L o c a l C l a s s e s */
103/******************************************************************************/
104
105namespace
106{
107class CleanUp : public XrdJob
108{
109public:
110
111void DoIt() {sessP->Lock();
112 sessP->Unprovision();
113 delete this;
114 }
115
116 CleanUp(XrdSsiSessReal *sP) : sessP(sP) {}
117 ~CleanUp() {}
118
119private:
120XrdSsiSessReal *sessP;
121};
122}
123
124/******************************************************************************/
125/* D e s t r u c t o r */
126/******************************************************************************/
127
129{
130 XrdSsiTaskReal *tP;
131
132 if (resKey) free(resKey);
133 if (sessName) free(sessName);
134 if (sessNode) free(sessNode);
135
136 while((tP = freeTask)) {freeTask = tP->attList.next; delete tP;}
137}
138
139/******************************************************************************/
140/* I n i t S e s s i o n */
141/******************************************************************************/
142
143void XrdSsiSessReal::InitSession(XrdSsiServReal *servP, const char *sName,
144 int uent, bool hold, bool newSID)
145{
146 EPNAME("InitSession");
147 requestP = 0;
148 uEnt = uent;
149 attBase = 0;
150 freeTask = 0;
151 myService = servP;
152 nextTID = 0;
153 alocLeft = XrdSsiRRInfo::idMax;
154 isHeld = hold;
155 inOpen = false;
156 noReuse = false;
157 if (resKey) {free(resKey); resKey = 0;}
158 if (sessName) free(sessName);
159 sessName = (sName ? strdup(sName) : 0);
160 if (sessNode) free(sessNode);
161 sessNode = 0;
162 if (newSID)
163 {if (servP == 0) sessID = 0xffffffff;
164 else {Atomic_BEG(sidMutex);
165 sessID = Atomic_INC(sidVal);
166 Atomic_END(sidMutex);
167 snprintf(tident, sizeof(tident), "S %u#", sessID);
168 DEBUG("new sess for "<<sName<<" uent="<<uent<<" hold="<<hold);
169 }
170 } else {
171 DEBUG("reuse sess for "<<sName<<" uent="<<uent<<" hold="<<hold);
172 }
173}
174
175/******************************************************************************/
176/* Private: N e w T a s k */
177/******************************************************************************/
178
179// Must be called with sessMutex locked!
180
181XrdSsiTaskReal *XrdSsiSessReal::NewTask(XrdSsiRequest *reqP)
182{
183 EPNAME("NewTask");
184 XrdSsiTaskReal *ptP, *tP;
185
186// Allocate a task object for this request
187//
188 if ((tP = freeTask)) freeTask = tP->attList.next;
189 else {if (!alocLeft || !(tP = new XrdSsiTaskReal(this)))
190 {XrdSsiUtils::RetErr(*reqP, "Too many active requests.", EMLINK);
191 return 0;
192 }
193 alocLeft--;
194 }
195
196// We always set a new task ID to avoid ID collisions. This is good for over
197// 194 days if we have 1 request/second. In practice. this will work for a
198// couple of years before wrapping. By then the ID's should be free.
199//
200 tP->SetTaskID(nextTID++, sessID);
201 nextTID &= XrdSsiRRInfo::idMax;
202
203// Initialize the task and return its pointer
204//
205 tP->Init(reqP, reqP->GetTimeOut());
206 DEBUG("New task=" <<tP <<" id=" <<tP->ID());
207
208// Insert the task into our list of tasks
209//
210 if ((ptP = attBase)) {INSERT(attList, ptP, tP);}
211 else attBase = tP;
212
213// We will be using the session mutex for serialization. Afterwards, bind the
214// task to the request and return the task pointer.
215//
216 XrdSsiRRAgent::SetMutex(reqP, &sessMutex);
217 tP->BindRequest(*reqP);
218 return tP;
219}
220
221/******************************************************************************/
222/* P r o v i s i o n */
223/******************************************************************************/
224
225bool XrdSsiSessReal::Provision(XrdSsiRequest *reqP, const char *epURL)
226{
227 EPNAME("Provision");
228 XrdCl::XRootDStatus epStatus;
229 XrdSsiMutexMon rHelp(&sessMutex);
231
232// Set retry flag as appropriate
233//
234 if (XrdSsiRRAgent::isaRetry(reqP, true)) oFlags |= XrdCl::OpenFlags::Refresh;
235
236// Issue the open and if the open was started, return success.
237//
238 DEBUG("Provisioning " <<epURL);
239 epStatus = epFile.Open((const std::string)epURL, oFlags,
242 reqP->GetTimeOut());
243
244// If there was an error, scuttle the request. Note that errors will be returned
245// on a separate thread to avoid hangs here.
246//
247 if (!epStatus.IsOK())
248 {std::string eTxt;
249 int eNum = XrdSsiUtils::GetErr(epStatus, eTxt);
250 XrdSsiUtils::RetErr(*reqP, eTxt.c_str(), eNum);
252 return false;
253 }
254
255// Queue a new task and indicate our state
256//
257 NewTask(reqP);
258 inOpen = true;
259 return true;
260}
261
262/******************************************************************************/
263/* Private: R e l T a s k */
264/******************************************************************************/
265
266void XrdSsiSessReal::RelTask(XrdSsiTaskReal *tP) // sessMutex locked!
267{
268 EPNAME("RelTask");
269
270// Do some debugging here
271//
272 DEBUG((isHeld ? "Recycling":"Deleting")<<" task="<<tP<<" id=" <<tP->ID());
273
274// Delete this task or place it on the free list
275//
276 if (!isHeld) delete tP;
277 else {tP->ClrEvent();
278 tP->attList.next = freeTask;
279 freeTask = tP;
280 }
281}
282
283/******************************************************************************/
284/* R u n */
285/******************************************************************************/
286
288{
289 XrdSsiMutexMon sessMon(sessMutex);
290 XrdSsiTaskReal *tP;
291
292// If we are not allowed to be reused, return to indicated try someone else
293//
294 if (noReuse) return false;
295
296// Reserve a stream ID. If we cannot then indicate we cannot be reused
297//
298 if (!XrdSsi::sidScale.rsvEnt(uEnt)) return false;
299
300// Queue a new task
301//
302 tP = NewTask(reqP);
303
304// If we are already open and we have a task, send off the request
305//
306 if (!inOpen && tP && !tP->SendRequest(sessNode)) noReuse = true;
307 return true;
308}
309
310/******************************************************************************/
311/* Private: S h u t d o w n */
312/******************************************************************************/
313
314// Called with sessMutex locked and return with it unlocked
315
316void XrdSsiSessReal::Shutdown(XrdCl::XRootDStatus &epStatus, bool onClose)
317{
318 XrdSsiTaskReal *tP, *ntP = freeTask;
319
320// Delete all acccumulated tasks
321//
322 while((tP = ntP)) {ntP = tP->attList.next; delete tP;}
323 freeTask = 0;
324
325// If the close failed then we cannot recycle this object as it is not reusable
326//
327 if (onClose && !epStatus.IsOK())
328 {std::string eText;
329 int eNum = XrdSsiUtils::GetErr(epStatus, eText);
330 char mBuff[1024];
331 snprintf(mBuff, sizeof(mBuff), "Unprovision: %s@%s error; %d",
332 sessName, sessNode, eNum);
333 Log.Emsg("Shutdown", mBuff, eText.c_str());
334 sessMutex.UnLock();
335 myService->Recycle(this, false);
336 } else {
337 if (sessName) {free(sessName); sessName = 0;}
338 if (sessNode) {free(sessNode); sessNode = 0;}
339 sessMutex.UnLock();
340 myService->Recycle(this, !noReuse);
341 }
342}
343
344/******************************************************************************/
345/* T a s k F i n i s h e d */
346/******************************************************************************/
347
349{
350 EPNAME("TaskFin");
351// Lock our mutex
352//
353 sessMutex.Lock();
354
355// Remove task from the task list if it's in it and release the task object.
356//
357 if (tP == attBase || tP->attList.next != tP)
358 {REMOVE(attBase, attList, tP);}
359 RelTask(tP);
360
361// Return the request entry number
362//
364
365// If we are waiting for a provision to finish, simply exit as the event
366// handler will notice that there is no task and will unprovision. Otherwise
367//
368
369
370// If we can shutdown, then unprovision which will drive a shutdown. Note
371// that Unprovision() returns without the sessMutex, otherwise we must
372// unlock it before we return. A shutdown invalidates this object!
373//
374 if (!inOpen)
375 {if (!isHeld && !attBase) Unprovision();
376 else sessMutex.UnLock();
377 } else {
378 DEBUG("Unprovision deferred for " <<sessName);
379 sessMutex.UnLock();
380 }
381}
382
383/******************************************************************************/
384/* U n H o l d */
385/******************************************************************************/
386
387void XrdSsiSessReal::UnHold(bool cleanup)
388{
389 XrdSsiMutexMon sessMon(sessMutex);
390
391// Immediately stopo reuse of this object
392//
393 if (isHeld && resKey && myService) myService->StopReuse(resKey);
394
395// Turn off the hold flag and if we have no attached tasks, schedule shutdown
396//
397 isHeld = false;
398 if (cleanup && !attBase) XrdSsi::schedP->Schedule(new CleanUp(this));
399}
400
401/******************************************************************************/
402/* Private: U n p r o v i s i o n */
403/******************************************************************************/
404
405// Called with sessMutex locked and returns with it unlocked
406// Returns false if a shutdown occurred (i.e. session object no longer valid)
407
408bool XrdSsiSessReal::Unprovision() // Called with sessMutex locked!
409{
410 EPNAME("Unprovision");
412
413// Clear any pending events
414//
415 DEBUG("Closing " <<sessName);
416
417// If the file is not open (it might be due to an open error) then do a
418// shutdown right away. Otherwise, try to close if successful the event
419// handler will do the shutdown, Otherwise, we do a Futterwacken dance.
420//
421 if (!epFile.IsOpen()) {Shutdown(uStat, false); return false;}
422 else {uStat = epFile.Close((XrdCl::ResponseHandler *)this);
423 if (!uStat.IsOK()) {Shutdown(uStat, true); return false;}
424 else sessMutex.UnLock();
425 }
426 return true;
427}
428
429/******************************************************************************/
430/* X e q E v e n t */
431/******************************************************************************/
432
434 XrdCl::AnyObject **respP)
435{
436// Lock out mutex. Note that events like shutdown unlock the mutex. The only
437// events handled here are open() and close().
438//
439 sessMutex.Lock();
440 XrdSsiTaskReal *ztP, *ntP, *tP = attBase;
441
442// If we are not in the open phase then this is due to a close event. Simply
443// do a shutdown and return to stop event processing.
444//
445 if (!inOpen)
446 {Shutdown(*status, true); // sessMutex gets unlocked!
447 return -1; // This object no longer valid!
448 }
449
450// We are no longer in open. However, if open encounetered an error then this
451// session cannot be reused because the file object is in a bad state.
452//
453 inOpen = false;
454 noReuse = !status->IsOK();
455
456// If we have no requests then we may want to simply shoutdown.
457// Note that shutdown and unprovision unlock the sessMutex.
458//
459 if (!tP)
460 {if (isHeld)
461 {sessMutex.UnLock();
462 return 1;
463 }
464 if (!status->IsOK()) Shutdown(*status, false);
465 else {if (!isHeld) return (Unprovision() ? 1 : -1);
466 else sessMutex.UnLock();
467 }
468 return 1; // Flush events and continue
469 }
470
471// We are here because the open finally completed. If the open failed, then
472// schedule an error for all pending tasks. The Finish() call on each will
473// drive the cleanup of this session.
474//
475 if (!status->IsOK())
476 {XrdSsiErrInfo eInfo;
477 XrdSsiUtils::SetErr(*status, eInfo);
478 do {tP->SchedError(&eInfo); tP = tP->attList.next;}
479 while(tP != attBase);
480 sessMutex.UnLock();
481 return 1;
482 }
483
484// Obtain the endpoint name
485//
486 std::string currNode;
487 if (epFile.GetProperty(dsProperty, currNode))
488 {if (sessNode) free(sessNode);
489 sessNode = strdup(currNode.c_str());
490 } else sessNode = strdup("Unknown!");
491
492// Execute each pending request. Make sure not to reference the task object
493// chain pointer after invoking SendRequest() as it may become invalid.
494//
495 ztP = attBase;
496 do {ntP = tP->attList.next;
497 if (!tP->SendRequest(sessNode)) noReuse = true;
498 tP = ntP;
499 } while(tP != ztP);
500
501// We are done, field the next event
502//
503 sessMutex.UnLock();
504 return 0;
505}
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
#define DEBUG(x)
#define EPNAME(x)
#define Atomic(type)
#define Atomic_INC(x)
#define Atomic_END(x)
#define Atomic_BEG(x)
#define REMOVE(dlbase, dlvar, curitem)
#define INSERT(dlvar, curitem, newitem)
XRootDStatus Close(ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:151
bool IsOpen() const
Check if the file is open.
Definition XrdClFile.cc:846
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:99
bool GetProperty(const std::string &name, std::string &value) const
Definition XrdClFile.cc:878
Handle an async response.
void Schedule(XrdJob *jp)
void ClrEvent()
char tident[24]
static void SetMutex(XrdSsiRequest *rP, XrdSsiMutex *mP)
static bool isaRetry(XrdSsiRequest *rP, bool reset=false)
static const unsigned int idMax
uint16_t GetTimeOut()
void BindRequest(XrdSsiRequest &rqstR)
void retEnt(int xEnt)
void StopReuse(const char *resKey)
void Recycle(XrdSsiSessReal *sObj, bool reuse)
void InitSession(XrdSsiServReal *servP, const char *sName, int uent, bool hold, bool newSID=false)
bool Provision(XrdSsiRequest *reqP, const char *epURL)
bool Run(XrdSsiRequest *reqP)
int XeqEvent(XrdCl::XRootDStatus *status, XrdCl::AnyObject **respP)
XrdCl::File epFile
void UnHold(bool cleanup=true)
void TaskFinished(XrdSsiTaskReal *tP)
void Init(XrdSsiRequest *rP, unsigned short tmo=0)
XrdSsiTaskReal * next
void SchedError(XrdSsiErrInfo *eInfo=0)
bool SendRequest(const char *node)
void SetTaskID(uint32_t tid, uint32_t sid)
static int GetErr(XrdCl::XRootDStatus &Status, std::string &eText)
static void RetErr(XrdSsiRequest &reqP, const char *eTxt, int eNum)
static void SetErr(XrdCl::XRootDStatus &Status, XrdSsiErrInfo &eInfo)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdScheduler * schedP
XrdSsiScale sidScale
XrdSysError Log
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
bool IsOK() const
We're fine.