XRootD
XrdPfc.cc
Go to the documentation of this file.
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 #include <fcntl.h>
20 #include <sstream>
21 #include <algorithm>
22 #include <sys/statvfs.h>
23 
24 #include "XrdCl/XrdClConstants.hh"
25 #include "XrdCl/XrdClURL.hh"
26 
27 #include "XrdOuc/XrdOucEnv.hh"
28 #include "XrdOuc/XrdOucUtils.hh"
29 
30 #include "XrdSys/XrdSysPthread.hh"
31 #include "XrdSys/XrdSysTimer.hh"
32 #include "XrdSys/XrdSysTrace.hh"
33 
35 
36 #include "XrdOss/XrdOss.hh"
37 
38 #include "XrdPfc.hh"
39 #include "XrdPfcTrace.hh"
40 #include "XrdPfcFSctl.hh"
41 #include "XrdPfcInfo.hh"
42 #include "XrdPfcIOFile.hh"
43 #include "XrdPfcIOFileBlock.hh"
44 
45 using namespace XrdPfc;
46 
47 Cache * Cache::m_instance = 0;
48 
50 
51 
53 {
55  return 0;
56 }
57 
// Thread entry point; XrdOucGetCache() starts it through XrdSysThread::Run
// under the thread name "XrdPfc Purge". The argument is unused.
// NOTE(review): listing line 60 is absent from this extract, so the body shown
// here only returns 0. Presumably the dropped statement is the call into the
// cache's purge loop (Cache::Purge()) -- confirm against the upstream file.
58 void *PurgeThread(void*)
59 {
61  return 0;
62 }
63 
65 {
67  return 0;
68 }
69 
// Thread entry point; XrdOucGetCache() starts it through XrdSysThread::Run
// under the thread name "XrdPfc Prefetch " when m_prefetch_max_blocks > 0.
// The argument is unused.
// NOTE(review): listing line 72 is absent from this extract, so the body shown
// here only returns 0. Presumably the dropped statement is the call into
// Cache::Prefetch() -- confirm against the upstream file.
70 void *PrefetchThread(void*)
71 {
73  return 0;
74 }
75 
76 //==============================================================================
77 
78 extern "C"
79 {
81  const char *config_filename,
82  const char *parameters,
83  XrdOucEnv *env)
84 {
85  XrdSysError err(logger, "");
86  err.Say("++++++ Proxy file cache initialization started.");
87 
88  if ( ! env ||
89  ! (XrdPfc::Cache::schedP = (XrdScheduler*) env->GetPtr("XrdScheduler*")))
90  {
93  }
94 
95  Cache &instance = Cache::CreateInstance(logger, env);
96 
97  if (! instance.Config(config_filename, parameters))
98  {
99  err.Say("Config Proxy file cache initialization failed.");
100  return 0;
101  }
102  err.Say("------ Proxy file cache initialization completed.");
103 
104  {
105  pthread_t tid;
106 
107  for (int wti = 0; wti < instance.RefConfiguration().m_wqueue_threads; ++wti)
108  {
109  XrdSysThread::Run(&tid, ProcessWriteTaskThread, 0, 0, "XrdPfc WriteTasks ");
110  }
111 
112  if (instance.RefConfiguration().m_prefetch_max_blocks > 0)
113  {
114  XrdSysThread::Run(&tid, PrefetchThread, 0, 0, "XrdPfc Prefetch ");
115  }
116 
117  XrdSysThread::Run(&tid, ResourceMonitorHeartBeatThread, 0, 0, "XrdPfc ResourceMonitorHeartBeat");
118 
119  XrdSysThread::Run(&tid, PurgeThread, 0, 0, "XrdPfc Purge");
120  }
121 
122  XrdPfcFSctl* pfcFSctl = new XrdPfcFSctl(instance, logger);
123  env->PutPtr("XrdFSCtl_PC*", pfcFSctl);
124 
125  return &instance;
126 }
127 }
128 
129 //==============================================================================
130 
131 void Configuration::calculate_fractional_usages(long long du, long long fu,
132  double &frac_du, double &frac_fu)
133 {
134  // Calculate fractional disk / file usage and clamp them to [0, 1].
135 
136  // Fractional total usage above LWM:
137  // - can be > 1 if usage is above HWM;
138  // - can be < 0 if triggered via age-based-purging.
139  frac_du = (double) (du - m_diskUsageLWM) / (m_diskUsageHWM - m_diskUsageLWM);
140 
141  // Fractional file usage above baseline.
142  // - can be > 1 if file usage is above max;
143  // - can be < 0 if file usage is below baseline.
144  frac_fu = (double) (fu - m_fileUsageBaseline) / (m_fileUsageMax - m_fileUsageBaseline);
145 
146  frac_du = std::min( std::max( frac_du, 0.0), 1.0 );
147  frac_fu = std::min( std::max( frac_fu, 0.0), 1.0 );
148 }
149 
150 //==============================================================================
151 
153 {
154  assert (m_instance == 0);
155  m_instance = new Cache(logger, env);
156  return *m_instance;
157 }
158 
159  Cache& Cache::GetInstance() { return *m_instance; }
160 const Cache& Cache::TheOne() { return *m_instance; }
161 const Configuration& Cache::Conf() { return m_instance->RefConfiguration(); }
162 
164 {
165  if (! m_decisionpoints.empty())
166  {
167  XrdCl::URL url(io->Path());
168  std::string filename = url.GetPath();
169  std::vector<Decision*>::const_iterator it;
170  for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
171  {
172  XrdPfc::Decision *d = *it;
173  if (! d) continue;
174  if (! d->Decide(filename, *m_oss))
175  {
176  return false;
177  }
178  }
179  }
180 
181  return true;
182 }
183 
185  XrdOucCache("pfc"),
186  m_env(env),
187  m_log(logger, "XrdPfc_"),
188  m_trace(new XrdSysTrace("XrdPfc", logger)),
189  m_traceID("Cache"),
190  m_oss(0),
191  m_gstream(0),
192  m_prefetch_condVar(0),
193  m_prefetch_enabled(false),
194  m_RAM_used(0),
195  m_RAM_write_queue(0),
196  m_RAM_std_size(0),
197  m_isClient(false),
198  m_in_purge(false),
199  m_active_cond(0),
200  m_stats_n_purge_cond(0),
201  m_fs_state(0),
202  m_last_scan_duration(0),
203  m_last_purge_duration(0),
204  m_spt_state(SPTS_Idle)
205 {
206  // Default log level is Warning.
207  m_trace->What = 2;
208 }
209 
211 {
212  const char* tpfx = "Attach() ";
213 
214  if (Cache::GetInstance().Decide(io))
215  {
216  TRACE(Info, tpfx << io->Path());
217 
218  IO *cio;
219 
220  if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
221  {
222  cio = new IOFileBlock(io, *this);
223  }
224  else
225  {
226  IOFile *iof = new IOFile(io, *this);
227 
228  if ( ! iof->HasFile())
229  {
230  delete iof;
231  // TODO - redirect instead. But this is kind of an awkward place for it.
232  // errno is set during IOFile construction.
233  TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
234  return io;
235  }
236 
237  cio = iof;
238  }
239 
240  TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
241  ((loc && loc[0] != 0) ? loc : "<deferred open>"));
242 
243  return cio;
244  }
245  else
246  {
247  TRACE(Info, tpfx << "decision decline " << io->Path());
248  }
249  return io;
250 }
251 
252 void Cache::AddWriteTask(Block* b, bool fromRead)
253 {
254  TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
255 
256  {
257  XrdSysMutexHelper lock(&m_RAM_mutex);
258  m_RAM_write_queue += b->get_size();
259  }
260 
261  m_writeQ.condVar.Lock();
262  if (fromRead)
263  m_writeQ.queue.push_back(b);
264  else
265  m_writeQ.queue.push_front(b);
266  m_writeQ.size++;
267  m_writeQ.condVar.Signal();
268  m_writeQ.condVar.UnLock();
269 }
270 
272 {
273  std::list<Block*> removed_blocks;
274  long long sum_size = 0;
275 
276  m_writeQ.condVar.Lock();
277  std::list<Block*>::iterator i = m_writeQ.queue.begin();
278  while (i != m_writeQ.queue.end())
279  {
280  if ((*i)->m_file == file)
281  {
282  TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
283  std::list<Block*>::iterator j = i++;
284  removed_blocks.push_back(*j);
285  sum_size += (*j)->get_size();
286  m_writeQ.queue.erase(j);
287  --m_writeQ.size;
288  }
289  else
290  {
291  ++i;
292  }
293  }
294  m_writeQ.condVar.UnLock();
295 
296  {
297  XrdSysMutexHelper lock(&m_RAM_mutex);
298  m_RAM_write_queue -= sum_size;
299  }
300 
301  file->BlocksRemovedFromWriteQ(removed_blocks);
302 }
303 
305 {
306  std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
307 
308  while (true)
309  {
310  m_writeQ.condVar.Lock();
311  while (m_writeQ.size == 0)
312  {
313  m_writeQ.condVar.Wait();
314  }
315 
316  // MT -- optimize to pop several blocks if they are available (or swap the list).
317  // This makes sense especially for smallish block sizes.
318 
319  int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
320  long long sum_size = 0;
321 
322  for (int bi = 0; bi < n_pushed; ++bi)
323  {
324  Block* block = m_writeQ.queue.front();
325  m_writeQ.queue.pop_front();
326  m_writeQ.writes_between_purges += block->get_size();
327  sum_size += block->get_size();
328 
329  blks_to_write[bi] = block;
330 
331  TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
332  }
333  m_writeQ.size -= n_pushed;
334 
335  m_writeQ.condVar.UnLock();
336 
337  {
338  XrdSysMutexHelper lock(&m_RAM_mutex);
339  m_RAM_write_queue -= sum_size;
340  }
341 
342  for (int bi = 0; bi < n_pushed; ++bi)
343  {
344  Block* block = blks_to_write[bi];
345 
346  block->m_file->WriteBlockToDisk(block);
347  }
348  }
349 }
350 
351 //==============================================================================
352 
353 char* Cache::RequestRAM(long long size)
354 {
355  static const size_t s_block_align = sysconf(_SC_PAGESIZE);
356 
357  bool std_size = (size == m_configuration.m_bufferSize);
358 
359  m_RAM_mutex.Lock();
360 
361  long long total = m_RAM_used + size;
362 
363  if (total <= m_configuration.m_RamAbsAvailable)
364  {
365  m_RAM_used = total;
366  if (std_size && m_RAM_std_size > 0)
367  {
368  char *buf = m_RAM_std_blocks.back();
369  m_RAM_std_blocks.pop_back();
370  --m_RAM_std_size;
371 
372  m_RAM_mutex.UnLock();
373 
374  return buf;
375  }
376  else
377  {
378  m_RAM_mutex.UnLock();
379  char *buf;
380  if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
381  {
382  // Report out of mem? Probably should report it at least the first time,
383  // then periodically.
384  return 0;
385  }
386  return buf;
387  }
388  }
389  m_RAM_mutex.UnLock();
390  return 0;
391 }
392 
393 void Cache::ReleaseRAM(char* buf, long long size)
394 {
395  bool std_size = (size == m_configuration.m_bufferSize);
396  {
397  XrdSysMutexHelper lock(&m_RAM_mutex);
398 
399  m_RAM_used -= size;
400 
401  if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
402  {
403  m_RAM_std_blocks.push_back(buf);
404  ++m_RAM_std_size;
405  return;
406  }
407  }
408  free(buf);
409 }
410 
411 File* Cache::GetFile(const std::string& path, IO* io, long long off, long long filesize)
412 {
413  // Called from virtual IO::Attach
414 
415  TRACE(Debug, "GetFile " << path << ", io " << io);
416 
417  ActiveMap_i it;
418 
419  {
420  XrdSysCondVarHelper lock(&m_active_cond);
421 
422  while (true)
423  {
424  it = m_active.find(path);
425 
426  // File is not open or being opened. Mark it as being opened and
427  // proceed to opening it outside of while loop.
428  if (it == m_active.end())
429  {
430  it = m_active.insert(std::make_pair(path, (File*) 0)).first;
431  break;
432  }
433 
434  if (it->second != 0)
435  {
436  it->second->AddIO(io);
437  inc_ref_cnt(it->second, false, true);
438 
439  return it->second;
440  }
441  else
442  {
443  // Wait for some change in m_active, then recheck.
444  m_active_cond.Wait();
445  }
446  }
447  }
448 
449  if (filesize == 0)
450  {
451  struct stat st;
452  int res = io->Fstat(st);
453  if (res < 0) {
454  errno = res;
455  TRACE(Error, "GetFile, could not get valid stat");
456  } else if (res > 0) {
457  errno = ENOTSUP;
458  TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
459  } else {
460  filesize = st.st_size;
461  }
462  }
463 
464  File *file = 0;
465 
466  if (filesize >= 0)
467  {
468  file = File::FileOpen(path, off, filesize);
469  }
470 
471  {
472  XrdSysCondVarHelper lock(&m_active_cond);
473 
474  if (file)
475  {
476  inc_ref_cnt(file, false, true);
477  it->second = file;
478 
479  file->AddIO(io);
480  }
481  else
482  {
483  m_active.erase(it);
484  }
485 
486  m_active_cond.Broadcast();
487  }
488 
489  return file;
490 }
491 
493 {
494  // Called from virtual IO::DetachFinalize.
495 
496  TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
497 
498  {
499  XrdSysCondVarHelper lock(&m_active_cond);
500 
501  f->RemoveIO(io);
502  }
503  dec_ref_cnt(f, true);
504 }
505 
506 
507 namespace
508 {
509 
510 class DiskSyncer : public XrdJob
511 {
512 private:
513  File *m_file;
514  bool m_high_debug;
515 
516 public:
517  DiskSyncer(File *f, bool high_debug, const char *desc = "") :
518  XrdJob(desc),
519  m_file(f),
520  m_high_debug(high_debug)
521  {}
522 
523  void DoIt()
524  {
525  m_file->Sync();
526  Cache::GetInstance().FileSyncDone(m_file, m_high_debug);
527  delete this;
528  }
529 };
530 
531 
532 class CommandExecutor : public XrdJob
533 {
534 private:
535  std::string m_command_url;
536 
537 public:
538  CommandExecutor(const std::string& command, const char *desc = "") :
539  XrdJob(desc),
540  m_command_url(command)
541  {}
542 
543  void DoIt()
544  {
545  Cache::GetInstance().ExecuteCommandUrl(m_command_url);
546  delete this;
547  }
548 };
549 
550 }
551 
552 //==============================================================================
553 
554 void Cache::schedule_file_sync(File* f, bool ref_cnt_already_set, bool high_debug)
555 {
556  DiskSyncer* ds = new DiskSyncer(f, high_debug);
557 
558  if ( ! ref_cnt_already_set) inc_ref_cnt(f, true, high_debug);
559 
560  schedP->Schedule(ds);
561 }
562 
563 void Cache::FileSyncDone(File* f, bool high_debug)
564 {
565  dec_ref_cnt(f, high_debug);
566 }
567 
568 void Cache::inc_ref_cnt(File* f, bool lock, bool high_debug)
569 {
570  // called from GetFile() or SheduleFileSync();
571 
572  int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
573 
574  if (lock) m_active_cond.Lock();
575  int rc = f->inc_ref_cnt();
576  if (lock) m_active_cond.UnLock();
577 
578  TRACE_INT(tlvl, "inc_ref_cnt " << f->GetLocalPath() << ", cnt at exit = " << rc);
579 }
580 
// Release one reference on File f.
//
//   f          - file whose reference is released
//   high_debug - selects TRACE_Debug (true) or TRACE_Dump (false) for the traces
//
// Three outcomes are visible below:
//  - f is in emergency shutdown: f is deleted when the count is 1, otherwise
//    nothing is changed; the function returns in both cases.
//  - the count is 1 and f->FinalizeSyncBeforeExit() is true: a final sync is
//    scheduled that takes over the existing reference, and the function returns.
//  - otherwise the count is decremented; when it reaches 0 the file is removed
//    from m_active, its stats go to m_closed_files_stats, a "file_close" record
//    is sent to m_gstream (when set) and f is deleted.
//
// NOTE(review): listing line 655 (the last snprintf arguments, between the
// GetRemoteLocations() line and the closing parenthesis) is absent from this
// extract; the format string expects four more values -- restore from upstream.
581 void Cache::dec_ref_cnt(File* f, bool high_debug)
582 {
583  // Called from ReleaseFile() or DiskSync callback.
584 
585  int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
586  int cnt;
587 
     // First locked section: read the count and handle the shutdown case.
588  {
589  XrdSysCondVarHelper lock(&m_active_cond);
590 
591  cnt = f->get_ref_cnt();
592 
593  if (f->is_in_emergency_shutdown())
594  {
595  // In this case file has been already removed from m_active map and
596  // does not need to be synced.
597 
598  if (cnt == 1)
599  {
600  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
601  << " -- deleting File object without further ado");
602  delete f;
603  }
604  else
605  {
     // NOTE(review): the count is not decremented on this path, so it is not
     // evident from this function how it ever gets down to 1 -- verify.
606  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
607  << " -- waiting");
608  }
609 
610  return;
611  }
612  }
613 
     // The lock is not held from here until the second locked section below.
614  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt at entry = " << cnt);
615 
616  if (cnt == 1)
617  {
618  if (f->FinalizeSyncBeforeExit())
619  {
620  // Note, here we "reuse" the existing reference count for the
621  // final sync.
622 
623  TRACE(Debug, "dec_ref_cnt " << f->GetLocalPath() << ", scheduling final sync");
     // ref_cnt_already_set = true: the sync job inherits this caller's
     // reference and releases it through FileSyncDone().
624  schedule_file_sync(f, true, true);
625  return;
626  }
627  }
628 
     // Second locked section: the actual decrement and, at zero, the teardown.
629  {
630  XrdSysCondVarHelper lock(&m_active_cond);
631 
632  cnt = f->dec_ref_cnt();
633  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt after sync_check and dec_ref_cnt = " << cnt);
634  if (cnt == 0)
635  {
     // NOTE(review): the result of find() is erased without a check against
     // m_active.end() -- this relies on f still being registered here.
636  ActiveMap_i it = m_active.find(f->GetLocalPath());
637  m_active.erase(it);
638 
639  m_closed_files_stats.insert(std::make_pair(f->GetLocalPath(), f->DeltaStatsFromLastCall()));
640 
641  if (m_gstream)
642  {
643  const Stats &st = f->RefStats();
644  const Info::AStat *as = f->GetLastAccessStats();
645 
     // Format the file_close record as a JSON object into a fixed buffer.
646  char buf[4096];
647  int len = snprintf(buf, 4096, "{\"event\":\"file_close\","
648  "\"lfn\":\"%s\",\"size\":%lld,\"blk_size\":%d,\"n_blks\":%d,\"n_blks_done\":%d,"
649  "\"access_cnt\":%lu,\"attach_t\":%lld,\"detach_t\":%lld,\"remotes\":%s,"
650  "\"b_hit\":%lld,\"b_miss\":%lld,\"b_bypass\":%lld,\"n_cks_errs\":%d}",
651  f->GetLocalPath().c_str(), f->GetFileSize(), f->GetBlockSize(),
652  f->GetNBlocks(), f->GetNDownloadedBlocks(),
653  (unsigned long) f->GetAccessCnt(), (long long) as->AttachTime, (long long) as->DetachTime,
654  f->GetRemoteLocations().c_str(),
656  );
657  bool suc = false;
     // Only insert when the record fit into buf; len + 1 includes the
     // terminating null byte.
658  if (len < 4096)
659  {
660  suc = m_gstream->Insert(buf, len + 1);
661  }
662  if ( ! suc)
663  {
664  TRACE(Error, "Failed g-stream insertion of file_close record, len=" << len);
665  }
666  }
667 
668  delete f;
669  }
670  }
671 }
672 
673 bool Cache::IsFileActiveOrPurgeProtected(const std::string& path)
674 {
675  XrdSysCondVarHelper lock(&m_active_cond);
676 
677  return m_active.find(path) != m_active.end() ||
678  m_purge_delay_set.find(path) != m_purge_delay_set.end();
679 }
680 
681 
682 //==============================================================================
683 //=== PREFETCH
684 //==============================================================================
685 
687 {
688  // Can be called with other locks held.
689 
690  if ( ! m_prefetch_enabled)
691  {
692  return;
693  }
694 
695  m_prefetch_condVar.Lock();
696  m_prefetchList.push_back(file);
697  m_prefetch_condVar.Signal();
698  m_prefetch_condVar.UnLock();
699 }
700 
701 
703 {
704  // Can be called with other locks held.
705 
706  if ( ! m_prefetch_enabled)
707  {
708  return;
709  }
710 
711  m_prefetch_condVar.Lock();
712  for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
713  {
714  if (*it == file)
715  {
716  m_prefetchList.erase(it);
717  break;
718  }
719  }
720  m_prefetch_condVar.UnLock();
721 }
722 
723 
725 {
726  m_prefetch_condVar.Lock();
727  while (m_prefetchList.empty())
728  {
729  m_prefetch_condVar.Wait();
730  }
731 
732  // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
733 
734  size_t l = m_prefetchList.size();
735  int idx = rand() % l;
736  File* f = m_prefetchList[idx];
737 
738  m_prefetch_condVar.UnLock();
739  return f;
740 }
741 
742 
744 {
745  const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
746 
747  while (true)
748  {
749  m_RAM_mutex.Lock();
750  bool doPrefetch = (m_RAM_used < limit_RAM);
751  m_RAM_mutex.UnLock();
752 
753  if (doPrefetch)
754  {
756  f->Prefetch();
757  }
758  else
759  {
761  }
762  }
763 }
764 
765 
766 //==============================================================================
767 //=== Virtuals from XrdOucCache
768 //==============================================================================
769 
770 //------------------------------------------------------------------------------
784 
785 int Cache::LocalFilePath(const char *curl, char *buff, int blen,
786  LFP_Reason why, bool forall)
787 {
788  static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
789  static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
790  static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
791 
792  TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
793 
794  if (buff && blen > 0) buff[0] = 0;
795 
796  XrdCl::URL url(curl);
797  std::string f_name = url.GetPath();
798  std::string i_name = f_name + Info::s_infoExtension;
799 
800  if (why == ForPath)
801  {
802  int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
803  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
804  return ret;
805  }
806 
807  {
808  XrdSysCondVarHelper lock(&m_active_cond);
809  m_purge_delay_set.insert(f_name);
810  }
811 
812  struct stat sbuff, sbuff2;
813  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
814  m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
815  {
816  if (S_ISDIR(sbuff.st_mode))
817  {
818  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
819  return -EISDIR;
820  }
821  else
822  {
823  bool read_ok = false;
824  bool is_complete = false;
825 
826  // Lock and check if the file is active. If NOT, keep the lock
827  // and add dummy access after successful reading of info file.
828  // If it IS active, just release the lock, this ongoing access will
829  // assure the file continues to exist.
830 
831  // XXXX How can I just loop over the cinfo file when active?
832  // Can I not get is_complete from the existing file?
833  // Do I still want to inject access record?
834  // Oh, it writes only if not active .... still let's try to use existing File.
835 
836  m_active_cond.Lock();
837 
838  bool is_active = m_active.find(f_name) != m_active.end();
839 
840  if (is_active) m_active_cond.UnLock();
841 
842  XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
843  XrdOucEnv myEnv;
844  int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
845  if (res >= 0)
846  {
847  Info info(m_trace, 0);
848  if (info.Read(infoFile, i_name.c_str()))
849  {
850  read_ok = true;
851 
852  is_complete = info.IsComplete();
853 
854  // Add full-size access if reason is for access.
855  if ( ! is_active && is_complete && why == ForAccess)
856  {
857  info.WriteIOStatSingle(info.GetFileSize());
858  info.Write(infoFile, i_name.c_str());
859  }
860  }
861  infoFile->Close();
862  }
863  delete infoFile;
864 
865  if ( ! is_active) m_active_cond.UnLock();
866 
867  if (read_ok)
868  {
869  if ((is_complete || why == ForInfo) && buff != 0)
870  {
871  int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
872  if (res2 < 0)
873  return res2;
874 
875  // Normally, files are owned by us but when direct cache access
876  // is wanted and possible, make sure the file is world readable.
877  if (why == ForAccess)
878  {mode_t mode = (forall ? worldReadable : groupReadable);
879  if (((sbuff.st_mode & worldReadable) != mode)
880  && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
881  {is_complete = false;
882  *buff = 0;
883  }
884  }
885  }
886 
887  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
888  (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
889 
890  return is_complete ? 0 : -EREMOTE;
891  }
892  }
893  }
894 
895  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
896  return -ENOENT;
897 }
898 
899 //______________________________________________________________________________
900 // Check if the file is cached including m_onlyIfCachedMinSize and m_onlyIfCachedMinFrac
901 // pfc configuration parameters. The logic of accessing the Info file is the same
902 // as in Cache::LocalFilePath.
912 //------------------------------------------------------------------------------
913 int Cache::ConsiderCached(const char *curl)
914 {
915  TRACE(Debug, "ConsiderFileCached '" << curl << "'" );
916 
917  XrdCl::URL url(curl);
918  std::string f_name = url.GetPath();
919  std::string i_name = f_name + Info::s_infoExtension;
920 
921  {
922  XrdSysCondVarHelper lock(&m_active_cond);
923  m_purge_delay_set.insert(f_name);
924  }
925 
926  struct stat sbuff, sbuff2;
927  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
928  m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
929  {
930  if (S_ISDIR(sbuff.st_mode))
931  {
932  TRACE(Info, "ConsiderCached '" << curl << ", why=ForInfo" << " -> EISDIR");
933  return -EISDIR;
934  }
935  else
936  {
937  bool read_ok = false;
938  bool is_cached = false;
939 
940  // Lock and check if the file is active. If NOT, keep the lock
941  // and add dummy access after successful reading of info file.
942  // If it IS active, just release the lock, this ongoing access will
943  // assure the file continues to exist.
944 
945  // XXXX How can I just loop over the cinfo file when active?
946  // Can I not get is_complete from the existing file?
947  // Do I still want to inject access record?
948  // Oh, it writes only if not active .... still let's try to use existing File.
949 
950  m_active_cond.Lock();
951 
952  bool is_active = m_active.find(f_name) != m_active.end();
953 
954  if (is_active)
955  m_active_cond.UnLock();
956 
957  XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
958  XrdOucEnv myEnv;
959  int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
960  if (res >= 0)
961  {
962  Info info(m_trace, 0);
963  if (info.Read(infoFile, i_name.c_str()))
964  {
965  read_ok = true;
966 
967  if (info.IsComplete())
968  {
969  is_cached = true;
970  }
971  else if (info.GetFileSize() == 0)
972  {
973  is_cached = true;
974  }
975  else
976  {
977  long long fileSize = info.GetFileSize();
978  long long bytesRead = info.GetNDownloadedBytes();
979 
980  if (fileSize < m_configuration.m_onlyIfCachedMinSize)
981  {
982  if ((float)bytesRead / fileSize > m_configuration.m_onlyIfCachedMinFrac)
983  is_cached = true;
984  }
985  else
986  {
987  if (bytesRead > m_configuration.m_onlyIfCachedMinSize &&
988  (float)bytesRead / fileSize > m_configuration.m_onlyIfCachedMinFrac)
989  is_cached = true;
990  }
991  }
992  }
993  infoFile->Close();
994  }
995  delete infoFile;
996 
997  if (!is_active) m_active_cond.UnLock();
998 
999  if (read_ok)
1000  {
1001  TRACE(Info, "ConsiderCached '" << curl << "', why=ForInfo" << (is_cached ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
1002  return is_cached ? 0 : -EREMOTE;
1003  }
1004  }
1005  }
1006 
1007  TRACE(Info, "ConsiderCached '" << curl << "', why=ForInfo" << " -> ENOENT");
1008  return -ENOENT;
1009 }
1010 
1011 //______________________________________________________________________________
1019 //------------------------------------------------------------------------------
1020 
1021 int Cache::Prepare(const char *curl, int oflags, mode_t mode)
1022 {
1023  XrdCl::URL url(curl);
1024  std::string f_name = url.GetPath();
1025  std::string i_name = f_name + Info::s_infoExtension;
1026 
1027  // Do not allow write access.
1028  if (oflags & (O_WRONLY | O_RDWR | O_APPEND | O_CREAT))
1029  {
1030  TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1031  return -EROFS;
1032  }
1033 
1034  // Intercept xrdpfc_command requests.
1035  if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1036  {
1037  // Schedule a job to process command request.
1038  {
1039  CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1040 
1041  schedP->Schedule(ce);
1042  }
1043 
1044  return -EAGAIN;
1045  }
1046 
1047  {
1048  XrdSysCondVarHelper lock(&m_active_cond);
1049  m_purge_delay_set.insert(f_name);
1050  }
1051 
1052  struct stat sbuff;
1053  int res = m_oss->Stat(i_name.c_str(), &sbuff);
1054  if (res == 0)
1055  {
1056  TRACE(Dump, "Prepare defer open " << f_name);
1057  return 1;
1058  }
1059  else
1060  {
1061  return 0;
1062  }
1063 }
1064 
1065 //______________________________________________________________________________
1066 // virtual method of XrdOucCache.
1071 //------------------------------------------------------------------------------
1072 
1073 int Cache::Stat(const char *curl, struct stat &sbuff)
1074 {
1075  XrdCl::URL url(curl);
1076  std::string f_name = url.GetPath();
1077 
1078  {
1079  XrdSysCondVarHelper lock(&m_active_cond);
1080  m_purge_delay_set.insert(f_name);
1081  }
1082 
1083  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK)
1084  {
1085  if (S_ISDIR(sbuff.st_mode))
1086  {
1087  return 0;
1088  }
1089  else
1090  {
1091  bool success = false;
1092  XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
1093  XrdOucEnv myEnv;
1094 
1095  f_name += Info::s_infoExtension;
1096  int res = infoFile->Open(f_name.c_str(), O_RDONLY, 0600, myEnv);
1097  if (res >= 0)
1098  {
1099  Info info(m_trace, 0);
1100  if (info.Read(infoFile, f_name.c_str()))
1101  {
1102  sbuff.st_size = info.GetFileSize();
1103  success = true;
1104  }
1105  }
1106  infoFile->Close();
1107  delete infoFile;
1108  return success ? 0 : 1;
1109  }
1110  }
1111 
1112  return 1;
1113 }
1114 
1115 //______________________________________________________________________________
1116 // virtual method of XrdOucCache.
1120 //------------------------------------------------------------------------------
1121 
1122 int Cache::Unlink(const char *curl)
1123 {
1124  XrdCl::URL url(curl);
1125  std::string f_name = url.GetPath();
1126 
1127  // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1128 
1129  return UnlinkFile(f_name, false);
1130 }
1131 
// Remove the data file f_name and its info file from the cache storage.
//
//   f_name       - path of the file, used as key into m_active
//   fail_if_open - when true, refuse with -EBUSY if f_name has an entry in m_active
//
// Returns -EBUSY or -EAGAIN when the request is refused, otherwise the smaller
// of the two m_oss->Unlink() results (data file, info file).
//
// NOTE(review): listing line 1158 (between "file = it->second;" and
// "it->second = 0;") is absent from this extract. Presumably the dropped
// statement puts the File into the emergency-shutdown state that
// dec_ref_cnt() tests for -- confirm against the upstream file.
1132 int Cache::UnlinkFile(const std::string& f_name, bool fail_if_open)
1133 {
1134  ActiveMap_i it;
1135  File *file = 0;
1136  {
1137  XrdSysCondVarHelper lock(&m_active_cond);
1138 
1139  it = m_active.find(f_name);
1140 
1141  if (it != m_active.end())
1142  {
1143  if (fail_if_open)
1144  {
1145  TRACE(Info, "UnlinkCommon " << f_name << ", file currently open and force not requested - denying request");
1146  return -EBUSY;
1147  }
1148 
1149  // Null File* in m_active map means an operation is ongoing, probably
1150  // Attach() with possible File::Open(). Ask for retry.
1151  if (it->second == 0)
1152  {
1153  TRACE(Info, "UnlinkCommon " << f_name << ", an operation on this file is ongoing - denying request");
1154  return -EAGAIN;
1155  }
1156 
      // Take the File out of the map entry; the entry itself stays as a
      // null placeholder until the unlink below is done.
1157  file = it->second;
1159  it->second = 0;
1160  }
1161  else
1162  {
      // Not active: insert a null placeholder for the duration of the unlink.
1163  it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1164  }
1165  }
1166 
      // Blocks of this file still waiting in the write queue are dropped first.
1167  if (file)
1168  {
1169  RemoveWriteQEntriesFor(file);
1170  }
1171 
1172  std::string i_name = f_name + Info::s_infoExtension;
1173 
1174  // Unlink file & cinfo
1175  int f_ret = m_oss->Unlink(f_name.c_str());
1176  int i_ret = m_oss->Unlink(i_name.c_str());
1177 
1178  TRACE(Debug, "UnlinkCommon " << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1179 
      // Remove the placeholder entry again.
      // NOTE(review): no Broadcast() on m_active_cond follows the erase, while
      // GetFile() waits on that condition for null entries -- verify.
1180  {
1181  XrdSysCondVarHelper lock(&m_active_cond);
1182 
1183  m_active.erase(it);
1184  }
1185 
1186  return std::min(f_ret, i_ret);
1187 }
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
Definition: XrdAccTest.cc:262
#define TRACE_Debug
Definition: XrdCmsTrace.hh:37
#define XrdOssOK
Definition: XrdOss.hh:50
#define TRACE_Dump
Definition: XrdPfcTrace.hh:11
#define TRACE_PC(act, pre_code, x)
Definition: XrdPfcTrace.hh:54
#define TRACE_INT(act, x)
Definition: XrdPfcTrace.hh:46
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition: XrdPfc.cc:80
void * PrefetchThread(void *)
Definition: XrdPfc.cc:70
void * PurgeThread(void *)
Definition: XrdPfc.cc:58
void * ResourceMonitorHeartBeatThread(void *)
Definition: XrdPfc.cc:52
void * ProcessWriteTaskThread(void *)
Definition: XrdPfc.cc:64
int stat(const char *path, struct stat *buf)
bool Debug
@ Error
#define TRACE(act, x)
Definition: XrdTrace.hh:63
URL representation.
Definition: XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:212
Definition: XrdJob.hh:43
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition: XrdOss.hh:873
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
virtual const char * Path()=0
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:148
virtual const char * Location(bool refresh=false)
Definition: XrdOucCache.hh:161
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:263
void PutPtr(const char *varname, void *value)
Definition: XrdOucEnv.cc:298
int get_size() const
Definition: XrdPfcFile.hh:146
File * get_file() const
Definition: XrdPfcFile.hh:150
long long m_offset
Definition: XrdPfcFile.hh:124
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:267
void FileSyncDone(File *, bool high_debug)
Definition: XrdPfc.cc:563
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition: XrdPfc.cc:411
static const Configuration & Conf()
Definition: XrdPfc.cc:161
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
Definition: XrdPfc.cc:785
virtual int Stat(const char *url, struct stat &sbuff)
Definition: XrdPfc.cc:1073
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:315
void Purge()
Thread function invoked to scan and purge files from disk when needed.
Definition: XrdPfcPurge.cc:698
void ReleaseRAM(char *buf, long long size)
Definition: XrdPfc.cc:393
virtual int ConsiderCached(const char *url)
Definition: XrdPfc.cc:913
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:159
void ResourceMonitorHeartBeat()
Thread function checking resource usage periodically.
Definition: XrdPfcPurge.cc:606
void DeRegisterPrefetchFile(File *)
Definition: XrdPfc.cc:702
void ExecuteCommandUrl(const std::string &command_url)
void RegisterPrefetchFile(File *)
Definition: XrdPfc.cc:686
void Prefetch()
Definition: XrdPfc.cc:743
void ReleaseFile(File *, IO *)
Definition: XrdPfc.cc:492
void AddWriteTask(Block *b, bool from_read)
Add downloaded block to the write queue.
Definition: XrdPfc.cc:252
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition: XrdPfc.cc:184
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition: XrdPfc.cc:163
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1132
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *ioP, int opts=0)=0
Obtain a new IO object that fronts existing XrdOucCacheIO.
static XrdScheduler * schedP
Definition: XrdPfc.hh:404
bool IsFileActiveOrPurgeProtected(const std::string &)
Definition: XrdPfc.cc:673
File * GetNextFileToPrefetch()
Definition: XrdPfc.cc:724
void ProcessWriteTasks()
Separate task which writes blocks from ram to disk.
Definition: XrdPfc.cc:304
virtual int Unlink(const char *url)
Definition: XrdPfc.cc:1122
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition: XrdPfc.cc:271
static const Cache & TheOne()
Definition: XrdPfc.cc:160
char * RequestRAM(long long size)
Definition: XrdPfc.cc:353
virtual int Prepare(const char *url, int oflags, mode_t mode)
Definition: XrdPfc.cc:1021
static Cache & CreateInstance(XrdSysLogger *logger, XrdOucEnv *env)
Singleton creation.
Definition: XrdPfc.cc:152
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
Definition: XrdPfcFile.cc:271
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1392
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:939
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:99
int GetNBlocks() const
Definition: XrdPfcFile.hh:292
void Prefetch()
Definition: XrdPfcFile.cc:1407
std::string GetRemoteLocations() const
Definition: XrdPfcFile.cc:1504
const Info::AStat * GetLastAccessStats() const
Definition: XrdPfcFile.hh:289
size_t GetAccessCnt() const
Definition: XrdPfcFile.hh:290
void AddIO(IO *io)
Definition: XrdPfcFile.cc:295
int GetBlockSize() const
Definition: XrdPfcFile.hh:291
int GetNDownloadedBlocks() const
Definition: XrdPfcFile.hh:293
long long GetFileSize()
Definition: XrdPfcFile.hh:279
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
Definition: XrdPfcFile.cc:165
void initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:112
int inc_ref_cnt()
Definition: XrdPfcFile.hh:298
void Sync()
Sync file cache info and output data with disk.
Definition: XrdPfcFile.cc:1022
int dec_ref_cnt()
Definition: XrdPfcFile.hh:299
int get_ref_cnt()
Definition: XrdPfcFile.hh:297
const Stats & RefStats() const
Definition: XrdPfcFile.hh:294
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:332
std::string & GetLocalPath()
Definition: XrdPfcFile.hh:274
Stats DeltaStatsFromLastCall()
Definition: XrdPfcFile.cc:142
bool is_in_emergency_shutdown()
Definition: XrdPfcFile.hh:302
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
Definition: XrdPfcIOFile.hh:40
bool HasFile() const
Check if File was opened successfully.
Definition: XrdPfcIOFile.hh:49
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:18
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:45
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:313
void WriteIOStatSingle(long long bytes_disk)
Write single open/close time for given bytes read from disk.
Definition: XrdPfcInfo.cc:446
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
Definition: XrdPfcInfo.cc:268
long long GetNDownloadedBytes() const
Get number of downloaded bytes.
Definition: XrdPfcInfo.hh:411
bool IsComplete() const
Get complete status.
Definition: XrdPfcInfo.hh:451
long long GetFileSize() const
Get file size.
Definition: XrdPfcInfo.hh:446
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Definition: XrdPfcInfo.cc:296
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:31
int m_NCksumErrors
number of checksum errors while getting data from remote
Definition: XrdPfcStats.hh:39
void Schedule(XrdJob *jp)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227
bool Insert(const char *data, int dlen)
Definition: XrdPfc.hh:41
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:56
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:102
long long m_fileUsageMax
cache purge - files usage maximum
Definition: XrdPfc.hh:90
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition: XrdPfc.hh:88
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition: XrdPfc.hh:79
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition: XrdPfc.hh:87
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:106
void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu)
Definition: XrdPfc.cc:131
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition: XrdPfc.hh:86
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition: XrdPfc.hh:103
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:101
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition: XrdPfc.hh:104
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:81
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:116
long long m_onlyIfCachedMinSize
minimum size of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:115
Access statistics.
Definition: XrdPfcInfo.hh:61
long long BytesHit
read from cache
Definition: XrdPfcInfo.hh:68
long long BytesBypassed
read from remote and dropped
Definition: XrdPfcInfo.hh:70
time_t DetachTime
close time
Definition: XrdPfcInfo.hh:63
long long BytesMissed
read from remote and cached
Definition: XrdPfcInfo.hh:69
time_t AttachTime
open time
Definition: XrdPfcInfo.hh:62