Changeset 3543

Show
Ignore:
Timestamp:
29/04/10 14:30:27 (5 years ago)
Author:
baud
Message:

correct rollback of the dpm-drain multi-threaded feature

Location:
lcg-dm/trunk
Files:
3 modified

Legend:

Unmodified
Added
Removed
  • lcg-dm/trunk/dpm/dpm-drain.c

    • Property svn:keywords set to Id Date Author Revision
    r2935 r3543  
    55 
    66#ifndef lint 
    7 static char sccsid[] = "@(#)$RCSfile: dpm-drain.c,v $ $Revision: 1.22 $ $Date: 2009/12/14 10:07:40 $ CERN IT-GD/SC Jean-Philippe Baud"; 
     7static char sccsid[] = "@(#)$RCSfile: dpm-drain.c,v $ $Revision$ $Date$ CERN IT-GD/SC Jean-Philippe Baud"; 
    88#endif /* not lint */ 
    99 
     
    1919#include <unistd.h> 
    2020#include <errno.h> 
    21 #include <stdarg.h> 
    2221#include "Cgetopt.h" 
    2322#include "dpm_api.h" 
     
    2524#include "serrno.h" 
    2625#include "u64subr.h" 
    27 #include "Cthread_api.h" 
    28 #include "Cpool_api.h" 
    2926int help_flag = 0; 
    3027volatile sig_atomic_t sig_int_received = 0; 
    31  
    32 struct dpm_drain_global_s; 
    33  
    34 struct dpm_drain_thread_info_s { 
    35         struct Cns_filereplica lp; 
    36         u_signed64 filesize; 
    37         int rc; 
    38         int good; 
    39         int hardrc; 
    40         int idx; 
    41         struct dpm_drain_global_s *global; 
    42         char logbuf[2048]; 
    43         size_t loglen; 
    44 }; 
    45  
    46 struct dpm_drain_global_s { 
    47         struct dpm_drain_thread_info_s *thip; 
    48         int nbgids; 
    49         gid_t *gid_list;         
    50         u_signed64 min_size; 
    51  
    52         int condvar; 
    53  
    54         int rc; 
    55         int nbthreads; 
    56         int pass_good; 
    57         int pass_rc; 
    58  
    59         /* access only by main thread */ 
    60         u_signed64 current_size; 
    61  
    62         /* can have concurrent access between threads, require lock */ 
    63         enum states { FINISHED, NOTFINISHED } *thip_state; 
    64         int nbfree; 
    65 }; 
    66  
    67 thread_logit(struct dpm_drain_thread_info_s *thip, FILE *stream, char *msg, ...) 
    68 { 
    69         va_list args; 
    70         time_t current_time; 
    71         int has_newline = 0; 
    72         struct dpm_drain_thread_info_s lthip; 
    73         int Tid = 0; 
    74         struct tm tres; 
    75  
    76         va_start (args, msg); 
    77  
    78         if (strlen (msg) && msg[strlen (msg)-1]=='\n') 
    79                 has_newline = 1; 
    80  
    81         if (!thip) { 
    82                 memset (&lthip, '\0', sizeof(lthip)); 
    83                 thip = &lthip; 
    84         } 
    85  
    86         if (thip->loglen == 0) { 
    87                 current_time = time (0); 
    88                 Cglobals_getTid (&Tid); 
    89                 if (!strftime (thip->logbuf, sizeof(thip->logbuf), "%a %b %d %H:%M:%S %Y", 
    90                     localtime_r (&current_time, &tres))) 
    91                         *thip->logbuf = '\0'; 
    92                 if (Tid<0) 
    93                         Csnprintf (thip->logbuf + strlen (thip->logbuf), 
    94                             sizeof(thip->logbuf) - strlen (thip->logbuf), " %5d: ", getpid ()); 
    95                 else 
    96                         Csnprintf (thip->logbuf + strlen (thip->logbuf), 
    97                             sizeof(thip->logbuf) - strlen (thip->logbuf), " %5d,%d: ", getpid (), Tid); 
    98                 thip->loglen = strlen (thip->logbuf); 
    99         } 
    100  
    101         Cvsnprintf (thip->logbuf+thip->loglen, sizeof(thip->logbuf) - thip->loglen, msg, args); 
    102         thip->loglen = strlen (thip->logbuf); 
    103  
    104         if (has_newline) { 
    105                 fprintf (stdout, "%s", thip->logbuf); 
    106                 fflush (stdout); 
    107                 thip->logbuf[0] = '\0'; 
    108                 thip->loglen = 0; 
    109         } 
    110  
    111         va_end (args); 
    112         return (0); 
    113 } 
    114  
    115 void collect_finished_and_reinit(s) 
    116 struct dpm_drain_global_s *s; 
    117 { 
    118         int i; 
    119         for (i=0;i < s->nbthreads;++i) { 
    120                 if (s->thip_state[i] == FINISHED) { 
    121                         if (s->min_size != 0 && s->thip[i].good) 
    122                                 s->current_size += s->thip[i].filesize; 
    123                         if (s->thip[i].hardrc) 
    124                                 s->rc = s->thip[i].hardrc; 
    125                         if (s->thip[i].good) 
    126                                 s->pass_good++; 
    127                         if (s->thip[i].rc) 
    128                                 s->pass_rc = s->thip[i].rc; 
    129                         memset (&s->thip[i], '\0', sizeof(struct dpm_drain_thread_info_s)); 
    130                         s->thip[i].global = s; 
    131                         s->thip[i].idx = i; 
    132                         s->thip_state[i] = NOTFINISHED; 
    133                 } 
    134         } 
    135 } 
    13628 
    13729void sig_handler(signum) 
     
    15446} 
    15547 
    156 void *handle_one_file(arg) 
    157 void *arg; 
    158 { 
    159         struct dpm_drain_thread_info_s *thip = (struct dpm_drain_thread_info_s *)arg; 
    160         struct Cns_filestatg statbuf; 
    161         struct Cns_filereplica *lp = &thip->lp; 
    162         struct dpm_drain_global_s *s = thip->global; 
    163         time_t current_time; 
    164         char path[CA_MAXPATHLEN+1]; 
    165         char u64buf[21]; 
    166         int nbentries; 
    167         struct dpns_filereplicax *rep_entries = NULL; 
    168         int i; 
    169         int j; 
    170         char buf[256]; 
    171         struct tm tres; 
    172         struct dpm_space_metadata *spacemd = NULL; 
    173         int replicate; 
    174         time_t f_lifetime; 
    175         char pfn[CA_MAXSFNLEN+1]; 
    176         char *p; 
    177  
    178         if (s->nbgids != 0 || s->min_size != 0) { 
    179                 if (Cns_statr (lp->sfn, &statbuf) < 0) { 
    180                         thread_logit (thip, stderr, "Cns_statr %s: %s\n", lp->sfn, sstrerror (serrno)); 
    181                         thip->rc = 1; 
    182                         goto finish; 
    183                 } 
    184                 thip->filesize = statbuf.filesize; 
    185         } 
    186         if (s->nbgids != 0) { 
    187                 for (i=0; i < s->nbgids; i++) { 
    188                         if (statbuf.gid == s->gid_list[i]) 
    189                                 break; 
    190                 } 
    191                 if (i >= s->nbgids) 
    192                         goto finish; 
    193         } 
    194         if (lp->status != '-') {        /* file is being populated/deleted */ 
    195                 if (lp->status == 'P') { 
    196                         thread_logit (thip, stdout, "The file %s is in the process of being uploaded, ignoring\n", lp->sfn); 
    197                 } else { 
    198                         thread_logit (thip, stdout, "The file %s is recorded as being in the process of being deleted, ignoring it during drain\n", lp->sfn); 
    199                 } 
    200                 if (s->min_size == 0) 
    201                         thip->rc = 1; 
    202                 goto finish; 
    203         } 
    204         current_time = time (0); 
    205         if (lp->ptime > current_time) { /* file is pinned */ 
    206                 if (!strftime (buf, sizeof(buf), "%a %b %d %H:%M:%S %Y", 
    207                     localtime_r (&lp->ptime, &tres))) 
    208                         *buf = '\0'; 
    209                 thread_logit (thip, stdout, "%s pinned until %s\n", lp->sfn, buf); 
    210                 if (s->min_size == 0) 
    211                         thip->rc = 1; 
    212                 goto finish; 
    213         } 
    214  
    215         if (dpns_getpath (NULL, lp->fileid, path) < 0) { 
    216                 thread_logit (thip, stderr,  
    217                         "dpns_getpath: %s (%s): %s\n", lp->sfn, 
    218                         u64tostr (lp->fileid, u64buf, 0), sstrerror (serrno)); 
    219                 thip->rc = 1; 
    220                 goto finish; 
    221         } 
    222  
    223         if (dpns_getreplicax (path, NULL, NULL, &nbentries, &rep_entries) < 0) { 
    224                 thread_logit (thip, stderr, "dpns_getreplicax: %s: %s\n", path, sstrerror (serrno)); 
    225                 thip->rc = 1; 
    226                 goto finish; 
    227         } 
    228         for (i=0; i < nbentries; i++) { 
    229                 if (!strcmp (lp->sfn, rep_entries[i].sfn)) 
    230                         break; 
    231         } 
    232         if (i >= nbentries) { 
    233                 thread_logit (thip, stderr, "could not find replica of %s with pfn %s\n", path, lp->sfn); 
    234                 thip->rc = 1; 
    235                 goto finish; 
    236         } 
    237         thread_logit (thip, stdout, "File:\t\t%s\n", path); 
    238         thread_logit (thip, stdout, "pfn:\t\t%s (of %d)\n", lp->sfn, nbentries); 
    239         thread_logit (thip, stdout, "replica type:\t%s\n", (rep_entries[i].r_type == 'P') ? "primary" : "secondary"); 
    240         switch(lp->f_type) { 
    241                 case 'V': 
    242                         strcpy (buf,"volatile"); 
    243                         break; 
    244                 case 'D': 
    245                         strcpy (buf,"durable"); 
    246                         break; 
    247                 case 'P': 
    248                         strcpy (buf,"permanent"); 
    249                         break; 
    250                 default: 
    251                         sprintf (buf,"'%c'",lp->f_type); 
    252                         break; 
    253         } 
    254         thread_logit (thip, stdout, "file type:\t%s", buf); 
    255         if (lp->f_type == 'P') 
    256                 thread_logit (thip, stdout, " (does not expire)\n"); 
    257         else { 
    258                 if (!strftime (buf, sizeof(buf), "%a %b %d %H:%M:%S %Y", 
    259                     localtime_r (&rep_entries[i].ltime, &tres))) 
    260                         *buf = '\0'; 
    261                 thread_logit (thip, stdout, " (%s on %s)\n", 
    262                     (rep_entries[i].ltime <= current_time) ? "expired" : "expires", buf); 
    263         } 
    264         if (s->min_size != 0) 
    265                 thread_logit (thip, stdout, "size:\t\t%s\n", u64tostru (statbuf.filesize, u64buf, 0)); 
    266         if (rep_entries[i].setname == NULL || *rep_entries[i].setname == '\0') 
    267                 thread_logit (thip, stdout, "space:\t\tnot in any space\n"); 
    268         else { 
    269                 p = rep_entries[i].setname; 
    270                 thread_logit (thip, stdout, "space:\t\t%s", p); 
    271                 j = 0; 
    272                 if (dpm_getspacemd (1, &p, &j, &spacemd) < 0) { 
    273                         if (serrno == EINVAL) { 
    274                                 thread_logit (thip, stdout, " (invalid space)\n"); 
    275                                 *p = '\0'; 
    276                         } else 
    277                                 thread_logit (thip, stdout, "\n"); 
    278                 } else if (j == 1 && spacemd) 
    279                         thread_logit (thip, stdout, " (%s)\n", spacemd[0].u_token); 
    280                 else 
    281                         thread_logit (thip, stdout, "\n"); 
    282                 free (spacemd); 
    283                 spacemd = NULL; 
    284         } 
    285         replicate = 1; 
    286         if (lp->f_type == 'V' && rep_entries[i].ltime <= current_time) 
    287                 replicate = 0; 
    288         if (replicate) { 
    289                 thread_logit (thip, stdout, "replicating...\n"); 
    290                 f_lifetime = rep_entries[i].ltime; 
    291                 if (dpm_replicatex (lp->sfn, lp->f_type, rep_entries[i].setname, f_lifetime, pfn) < 0) { 
    292                         thread_logit (thip, stdout, "failed\n"); 
    293                         thread_logit (thip, stderr, "dpm_replicatex %s: %s\n", lp->sfn, sstrerror (serrno)); 
    294                         thip->rc = 1; 
    295                         goto finish; 
    296                 } 
    297                 if (lp->f_type != 'P') { 
    298                         free (rep_entries); 
    299                         rep_entries = NULL; 
    300                         if (dpns_getreplicax (path, NULL, NULL, &nbentries, &rep_entries) < 0) { 
    301                                 thread_logit (thip, stdout, "failed\n"); 
    302                                 thread_logit (thip, stderr, "dpns_getreplicax: %s: %s\n", 
    303                                     path, sstrerror (serrno)); 
    304                                 if (dpm_delreplica (pfn) < 0) 
    305                                         thip->hardrc = 1; 
    306                                 thip->rc = 1; 
    307                                 goto finish; 
    308                         } 
    309                         for (i=0; i < nbentries; i++) { 
    310                                 if (!strcmp (pfn, rep_entries[i].sfn)) 
    311                                         break; 
    312                         } 
    313                         if (i>=nbentries) { 
    314                                 thread_logit (thip, stdout, "failed\n"); 
    315                                 thread_logit (thip, stderr, "could not find new replica\n"); 
    316                                 if (dpm_delreplica (pfn) < 0) 
    317                                         thip->hardrc = 1; 
    318                                 thip->rc = 1; 
    319                                 goto finish; 
    320                         } 
    321                         if (rep_entries[i].ltime < f_lifetime) { 
    322                                 thread_logit (thip, stdout, "failed\n"); 
    323                                 thread_logit (thip, stderr, "could not replicate to a new file with sufficient lifetime\n"); 
    324                                 if (dpm_delreplica (pfn) < 0) 
    325                                         thip->hardrc = 1; 
    326                                 thip->rc = 1; 
    327                                 goto finish; 
    328                         } 
    329                 } 
    330         } 
    331         thread_logit (thip, stdout, "deleting %s\n",lp->sfn); 
    332         if (dpm_delreplica (lp->sfn) < 0) { 
    333                 thread_logit (thip, stderr, "dpm_delreplica %s: %s\n", lp->sfn, sstrerror (serrno)); 
    334                 thip->rc = 1; 
    335                 thip->hardrc = 1; 
    336                 goto finish; 
    337         } 
    338         thip->good = 1; 
    339  
    340 finish: 
    341         free (rep_entries); 
    342         rep_entries = NULL; 
    343         if (Cthread_mutex_lock (&s->condvar) < 0) { 
    344                 fprintf (stderr, "Cthread_mutex_lock", sstrerror (serrno)); 
    345                 exit (SYERR); 
    346         } 
    347         s->thip_state[thip->idx] = FINISHED; 
    348         s->nbfree++; 
    349         if (Cthread_cond_signal (&s->condvar) < 0) { 
    350                 fprintf (stderr, "Cthread_cond_signal", sstrerror (serrno)); 
    351                 exit (SYERR); 
    352         } 
    353         if (Cthread_mutex_unlock (&s->condvar) < 0) { 
    354                 fprintf (stderr, "Cthread_mutex_unlock", sstrerror (serrno)); 
    355                 exit (SYERR); 
    356         } 
    357         return (NULL); 
    358 } 
    359  
    36048int main(argc, argv) 
    36149int argc; 
    36250char **argv; 
    36351{ 
     52        char buf[256]; 
    36453        int c; 
     54        u_signed64 current_size = 0; 
     55        time_t current_time; 
    36556        char *dp; 
    36657        struct dpm_fs *dpm_fs; 
    36758        struct dpm_pool *dpm_pools; 
    36859        int errflg = 0; 
     60        time_t f_lifetime; 
    36961        int flags; 
    370         int found_fs; 
     62        int found_fs = 0; 
    37163        char *fs = NULL; 
    37264        struct fs_list_s { 
     
    37567        } *fs_list = NULL; 
    37668        int fs_status; 
     69        gid_t *gid_list = NULL;  
    37770#ifndef VIRTUAL_ID 
    37871        struct group *gr; 
     
    38073        char groupname[256]; 
    38174        int i = 0; 
    382         int ipool; 
    38375        int j = 0; 
    38476        Cns_list list; 
     
    39183                {"poolname", REQUIRED_ARGUMENT, 0, OPT_POOL_NAME}, 
    39284                {"server", REQUIRED_ARGUMENT, 0, OPT_FS_SERVER}, 
    393                 {"threads", REQUIRED_ARGUMENT, 0, OPT_NBTHREADS}, 
    39485                {0, 0, 0, 0} 
    39586        }; 
    39687        struct Cns_filereplica *lp; 
     88        u_signed64 min_size = 0; 
     89        int nbentries; 
    39790        int nbfs;        
     91        int nbgids = 0; 
    39892        int nbpools;     
    39993        char *p = NULL; 
     94        int pass_good; 
     95        int pass_rc; 
     96        char path[CA_MAXPATHLEN+1]; 
     97        char pfn[CA_MAXSFNLEN+1]; 
    40098        char *poolname = NULL; 
    401         struct dpm_drain_global_s s; 
     99        int rc = 0; 
     100        int replicate; 
     101        struct dpns_filereplicax *rep_entries = NULL; 
    402102        int save_serrno; 
    403103        struct sigaction sigact; 
    404104        char *server = NULL; 
     105        struct dpm_space_metadata *spacemd = NULL; 
    405106        int target_fs_status; 
    406         int thread_index; 
     107        struct Cns_filestatg statbuf; 
    407108        char u64buf[21]; 
    408         int test_tid[20]; 
    409  
    410         (void) dpm_copyfile_activate (); 
    411  
    412         memset(&s, '\0', sizeof(struct dpm_drain_global_s)); 
    413         s.nbthreads = 1; 
     109 
    414110        Copterr = 1; 
    415111        Coptind = 1; 
     
    441137                                errflg++; 
    442138                        } else 
    443                                 s.min_size = strutou64 (Coptarg); 
    444                         break; 
    445                 case OPT_NBTHREADS: 
    446                         p = Coptarg; 
    447                         while (*p >= '0' && *p <= '9') p++; 
    448                         if (*p != '\0') { 
    449                                 fprintf (stderr, 
    450                                     "invalid number of threads %s\n", Coptarg); 
    451                                 errflg++; 
    452                         } else { 
    453                                 s.nbthreads = atoi (Coptarg); 
    454                                 if (s.nbthreads <= 0) { 
    455                                         fprintf (stderr, 
    456                                             "invalid number of threads %s\n", Coptarg); 
    457                                         errflg++; 
    458                                 } 
    459                         } 
     139                                min_size = strutou64 (Coptarg); 
    460140                        break; 
    461141                case OPT_POOL_NAME: 
     
    470150                        p = Coptarg; 
    471151                        //check that the user didn't provide already a list of gid 
    472                         if (s.nbgids == 0) i = 0; 
     152                        if (nbgids == 0) i = 0; 
    473153                        while (*p) { 
    474                                 if (*p == ',') s.nbgids++; 
     154                                if (*p == ',') nbgids++; 
    475155                                p++; 
    476156                        } 
    477                         s.nbgids++; 
    478                         if ((s.gid_list = (gid_t *) realloc (s.gid_list, s.nbgids * sizeof(gid_t))) == NULL) { 
     157                        nbgids++; 
     158                        if ((gid_list = (gid_t *) realloc (gid_list, nbgids * sizeof(gid_t))) == NULL) { 
    479159                                fprintf (stderr, "Could not allocate memory for gids\n"); 
    480160                                exit (USERR); 
     
    482162                        p = strtok (Coptarg, ","); 
    483163                        while (p) { 
    484                                 if ((s.gid_list[i] = strtol (p, &dp, 10)) < 0 || *dp != '\0') { 
     164                                if ((gid_list[i] = strtol (p, &dp, 10)) < 0 || *dp != '\0') { 
    485165                                        fprintf (stderr, "Invalid gid %s \n",p); 
    486166                                        errflg++; 
     
    488168                                                 
    489169#ifdef VIRTUAL_ID 
    490                                         if (s.gid_list[i] > 0 && Cns_getgrpbygid (s.gid_list[i], groupname) < 0) { 
     170                                        if (gid_list[i] > 0 && Cns_getgrpbygid (gid_list[i], groupname) < 0) { 
    491171#else 
    492                                         if (s.gid_list[i] > 0 && ! getgrgid (s.gid_list[i])) { 
     172                                        if (gid_list[i] > 0 && ! getgrgid (gid_list[i])) { 
    493173#endif 
    494174                                                fprintf (stderr, "Invalid gid %s \n", p); 
     
    506186                                break; 
    507187                        /* check that the user didn't provide already a list of gid */ 
    508                         if (s.nbgids == 0) i = 0; 
     188                        if (nbgids == 0) i = 0; 
    509189                        p = Coptarg; 
    510190                        while (*p) { 
    511                                 if (*p == ',') s.nbgids++; 
     191                                if (*p == ',') nbgids++; 
    512192                                p++; 
    513193                        } 
    514                         s.nbgids++; 
    515                         if ((s.gid_list = (gid_t *) realloc (s.gid_list, s.nbgids * sizeof(gid_t))) == NULL) { 
     194                        nbgids++; 
     195                        if ((gid_list = (gid_t *) realloc (gid_list, nbgids * sizeof(gid_t))) == NULL) { 
    516196                                fprintf (stderr, "Could not allocate memory for gids\n"); 
    517197                                exit (USERR); 
     
    521201#ifdef VIRTUAL_ID 
    522202                                if (strcmp (p, "root") == 0) 
    523                                         s.gid_list[i] = 0; 
    524                                 else if (Cns_getgrpbynam (p, &s.gid_list[i]) < 0) { 
     203                                        gid_list[i] = 0; 
     204                                else if (Cns_getgrpbynam (p, &gid_list[i]) < 0) { 
    525205#else 
    526206                                if ((gr = getgrnam (p))) 
    527                                         s.gid_list[i] = gr->gr_gid; 
     207                                        gid_list[i] = gr->gr_gid; 
    528208                                else { 
    529209#endif 
     
    562242 
    563243        /* set status to FS_RDONLY unless a specific server & fs is specified along with a limit */ 
    564         target_fs_status = ((s.nbgids == 0 && s.min_size == 0) || !(server && fs)) ? FS_RDONLY : -1; 
     244        target_fs_status = ((nbgids == 0 && min_size == 0) || !(server && fs)) ? FS_RDONLY : -1; 
    565245 
    566246        if (dpm_getpools (&nbpools, &dpm_pools) < 0) { 
     
    568248                exit (USERR); 
    569249        } 
    570  
    571250        for (i = 0; i < nbpools; i++) { 
    572251                if (poolname && strcmp ((dpm_pools + i)->poolname, poolname)) 
     
    614293        sigaction(SIGINT, &sigact, (struct sigaction *)NULL); 
    615294 
    616         s.thip = (struct dpm_drain_thread_info_s *) 
    617             calloc (s.nbthreads, sizeof(struct dpm_drain_thread_info_s)); 
    618  
    619         s.thip_state = (enum states *) 
    620             calloc (s.nbthreads, sizeof(enum states)); 
    621  
    622         s.nbfree = s.nbthreads; 
    623  
    624         /* moves all the newly created thread specific states to NOTFINISHED */ 
    625         collect_finished_and_reinit(&s); 
    626  
    627         ipool = Cpool_create (s.nbthreads, NULL); 
    628         if (ipool < 0) { 
    629                 fprintf (stderr, "Could not create thread pool\n"); 
    630                 exit (SYERR); 
    631         } 
    632  
    633         if (Cthread_mutex_lock (&s.condvar) < 0) { 
    634                 fprintf (stderr, "Cthread_mutex_lock", sstrerror (serrno)); 
    635                 exit (SYERR); 
    636         } 
    637  
    638         thread_logit (NULL, stdout, "Starting to drain with %d threads\n", s.nbthreads); 
    639  
    640295        while (1) { 
     296                pass_rc = 0; 
     297                pass_good = 0; 
    641298                flags = CNS_LIST_BEGIN; 
    642                 s.pass_good = 0; 
    643                 s.pass_rc = 0; 
    644299                while (!sig_int_received && (serrno=0, lp = Cns_listreplicax (poolname, server, fs, flags, &list)) != NULL) { 
    645300                        if (flags != CNS_LIST_CONTINUE) 
    646301                                flags = CNS_LIST_CONTINUE; 
     302 
    647303                        for (i=0; i < found_fs; i++) { 
    648304                                if (!strcmp (fs_list[i].server, lp->host) && 
     
    652308                        if (i >= found_fs) 
    653309                                continue; 
    654                         while (!s.nbfree) { 
    655                                 if (Cthread_cond_wait (&s.condvar) < 0) { 
    656                                         fprintf (stderr, "Error in Cthread_cond_wait: %s\n", sstrerror (serrno)); 
    657                                         exit (SYERR); 
    658                                 } 
    659                         } 
    660                         collect_finished_and_reinit(&s); 
    661  
    662                         if (s.min_size != 0 && s.current_size >= s.min_size) 
    663                                 break; 
    664  
    665                         if (sig_int_received) 
    666                                 break; 
    667  
    668                         if ((thread_index = Cpool_next_index (ipool)) < 0) { 
    669                                 fprintf (stderr, "Error in Cpool_next_index: %s\n", sstrerror (serrno)); 
    670                                 exit (SYERR); 
    671                         } 
    672                         memcpy (&s.thip[thread_index].lp, lp, sizeof(struct Cns_filereplica)); 
    673                         if (Cpool_assign (ipool, &handle_one_file, &s.thip[thread_index], -1) < 0) { 
    674                                 fprintf (stderr, "Error in Cpool_assign: %s\n", sstrerror (serrno)); 
    675                                 exit (SYERR); 
    676                         } 
    677                         s.nbfree--; 
     310 
     311                        printf ("\n"); 
     312 
     313                        if (nbgids != 0 || min_size != 0) { 
     314                                if (Cns_statr (lp->sfn, &statbuf) < 0) { 
     315                                        fprintf (stderr, "Cns_statr %s: %s\n", lp->sfn, sstrerror (serrno)); 
     316                                        pass_rc = 1; 
     317                                        continue; 
     318                                } 
     319                        } 
     320                        if (nbgids != 0) { 
     321                                for (i=0; i < nbgids; i++) { 
     322                                        if (statbuf.gid == gid_list[i])  
     323                                                break; 
     324                                } 
     325                                if (i >= nbgids) 
     326                                        continue; 
     327                        } 
     328                        if (lp->status != '-') {        /* file is being populated/deleted */ 
     329                                if (lp->status == 'P') { 
     330                                        printf ("The file %s is in the process of being uploaded, ignoring\n", lp->sfn); 
     331                                } else { 
     332                                        printf ("The file %s is recorded as being in the process of being deleted, ignoring it during drain\n", lp->sfn); 
     333                                } 
     334                                fflush (stdout); 
     335                                if (min_size == 0) 
     336                                        pass_rc = 1; 
     337                                continue; 
     338                        } 
     339                        current_time = time (0); 
     340                        if (lp->ptime > current_time) { /* file is pinned */ 
     341                                printf ("%s pinned until %s", lp->sfn, ctime (&lp->ptime)); 
     342                                fflush (stdout); 
     343                                if (min_size == 0) 
     344                                        pass_rc = 1; 
     345                                continue; 
     346                        } 
     347 
     348                        if (dpns_getpath (NULL, lp->fileid, path) < 0) { 
     349                                fprintf (stderr,  
     350                                        "dpns_getpath: %s (%s): %s\n", lp->sfn, 
     351                                        u64tostr (lp->fileid, u64buf, 0), sstrerror (serrno)); 
     352                                pass_rc = 1; 
     353                                continue; 
     354                        } 
     355 
     356                        free (rep_entries); 
     357                        rep_entries = NULL; 
     358 
     359                        if (dpns_getreplicax (path, NULL, NULL, &nbentries, &rep_entries) < 0) { 
     360                                fprintf (stderr, "dpns_getreplicax: %s: %s\n", 
     361                                        path, sstrerror (serrno)); 
     362                                pass_rc = 1; 
     363                                continue; 
     364                        } 
     365                        for (i=0; i < nbentries; i++) { 
     366                                if (!strcmp (lp->sfn, rep_entries[i].sfn)) 
     367                                        break; 
     368                        } 
     369                        if (i >= nbentries) { 
     370                                fprintf (stderr, "could not find replica of %s with pfn %s\n", path, lp->sfn); 
     371                                pass_rc = 1; 
     372                                continue; 
     373                        } 
     374                        printf ("File:\t\t%s\n", path); 
     375                        printf ("pfn:\t\t%s (of %d)\n", lp->sfn, nbentries); 
     376                        printf ("replica type:\t%s\n", 
     377                            (rep_entries[i].r_type == 'P') ? "primary" : "secondary"); 
     378                        switch(lp->f_type) { 
     379                                case 'V': 
     380                                        strcpy (buf,"volatile"); 
     381                                        break; 
     382                                case 'D': 
     383                                        strcpy (buf,"durable"); 
     384                                        break; 
     385                                case 'P': 
     386                                        strcpy (buf,"permanent"); 
     387                                        break; 
     388                                default: 
     389                                        sprintf (buf,"'%c'",lp->f_type); 
     390                                        break; 
     391                        } 
     392                        printf ("file type:\t%s", buf); 
     393                        if (lp->f_type == 'P') 
     394                                printf (" (does not expire)\n"); 
     395                        else { 
     396                                buf[0] = '\0'; 
     397                                if (p=ctime (&rep_entries[i].ltime)) 
     398                                        strcpy (buf, p); 
     399                                if (p=strchr (buf, '\n')) 
     400                                        *p = '\0'; 
     401                                printf (" (%s on %s)\n", 
     402                                    (rep_entries[i].ltime <= current_time) ? "expired" : "expires", buf); 
     403                        } 
     404                        if (min_size != 0) 
     405                                printf ("size:\t\t%s\n", u64tostru (statbuf.filesize, u64buf, 0)); 
     406                        if (rep_entries[i].setname == NULL || *rep_entries[i].setname == '\0') 
     407                                printf ("space:\t\tnot in any space\n"); 
     408                        else { 
     409                                p = rep_entries[i].setname; 
     410                                printf ("space:\t\t%s", p); 
     411                                j = 0; 
     412                                if (dpm_getspacemd (1, &p, &j, &spacemd) < 0) { 
     413                                        if (serrno == EINVAL) { 
     414                                                printf (" (invalid space)\n"); 
     415                                                *p = '\0'; 
     416                                        } else 
     417                                                printf ("\n"); 
     418                                } else if (j == 1 && spacemd) 
     419                                        printf (" (%s)\n", spacemd[0].u_token); 
     420                                else 
     421                                        printf ("\n"); 
     422                                free (spacemd); 
     423                                spacemd = NULL; 
     424                        } 
     425                        replicate = 1; 
     426                        if (lp->f_type == 'V' && rep_entries[i].ltime <= current_time) 
     427                                replicate = 0; 
     428                        if (replicate) { 
     429                                printf ("replicating... "); 
     430                                fflush (stdout); 
     431                                f_lifetime = rep_entries[i].ltime; 
     432                                if (dpm_replicatex (lp->sfn, lp->f_type, rep_entries[i].setname, f_lifetime, pfn) < 0) { 
     433                                        printf ("failed\n"); 
     434                                        fprintf (stderr, "dpm_replicatex %s: %s\n", lp->sfn, sstrerror (serrno)); 
     435                                        pass_rc = 1; 
     436                                        continue; 
     437                                } 
     438                                if (lp->f_type != 'P') { 
     439                                        free (rep_entries); 
     440                                        rep_entries = NULL; 
     441                                        if (dpns_getreplicax (path, NULL, NULL, &nbentries, &rep_entries) < 0) { 
     442                                                printf ("failed\n"); 
     443                                                fprintf (stderr, "dpns_getreplicax: %s: %s\n", 
     444                                                    path, sstrerror (serrno)); 
     445                                                if (dpm_delreplica (pfn) < 0) 
     446                                                        rc = 1; 
     447                                                pass_rc = 1; 
     448                                                continue; 
     449                                        } 
     450                                        for (i=0; i < nbentries; i++) { 
     451                                                if (!strcmp (pfn, rep_entries[i].sfn)) 
     452                                                        break; 
     453                                        } 
     454                                        if (i>=nbentries) { 
     455                                                printf ("failed\n"); 
     456                                                fprintf (stderr, "could not find new replica\n"); 
     457                                                if (dpm_delreplica (pfn) < 0) 
     458                                                        rc = 1; 
     459                                                pass_rc = 1; 
     460                                                continue; 
     461                                        } 
     462                                        if (rep_entries[i].ltime < f_lifetime) { 
     463                                                printf ("failed\n"); 
     464                                                fprintf (stderr, "could not replicate to a new file with sufficient lifetime\n"); 
     465                                                if (dpm_delreplica (pfn) < 0) 
     466                                                        rc = 1; 
     467                                                pass_rc = 1; 
     468                                                continue; 
     469                                        } 
     470                                } 
     471                        } 
     472                        printf ("deleting\n"); 
     473                        fflush (stdout); 
     474                        if (dpm_delreplica (lp->sfn) < 0) { 
     475                                fprintf (stderr, "dpm_delreplica %s: %s\n", lp->sfn, sstrerror (serrno)); 
     476                                pass_rc = 1; 
     477                                rc = 1; 
     478                                continue; 
     479                        } 
     480                        if (min_size != 0) { 
     481                                current_size += statbuf.filesize; 
     482                                if (current_size >= min_size) 
     483                                        break; 
     484                        } 
     485                        pass_good++; 
    678486                } 
    679487                save_serrno = serrno; 
    680488                (void) Cns_listreplicax (poolname, server, fs, CNS_LIST_END, &list); 
    681                 while (s.nbfree<s.nbthreads) { 
    682                         if (Cthread_cond_wait (&s.condvar) < 0) { 
    683                                 fprintf (stderr, "Error in Cthread_cond_wait: %s\n", sstrerror (serrno)); 
    684                                 exit (SYERR); 
    685                         } 
    686                 } 
    687                 collect_finished_and_reinit(&s); 
    688  
    689                 if (sig_int_received || lp != NULL || s.rc || save_serrno != SETIMEDOUT || s.pass_good == 0) { 
    690                         if (s.pass_rc) 
    691                                 s.rc = 1; 
     489                free (rep_entries); 
     490                rep_entries = NULL; 
     491                if (sig_int_received || lp != NULL || rc || save_serrno != SETIMEDOUT || pass_good == 0) { 
     492                        if (pass_rc) 
     493                                rc = 1; 
    692494                        if (!sig_int_received && lp == NULL && save_serrno) 
    693495                                fprintf (stderr, "Cns_listreplicax: %s\n", sstrerror (save_serrno)); 
    694496                        break; 
    695497                } 
    696         } 
    697         if (Cthread_mutex_unlock (&s.condvar) < 0) 
    698                 fprintf (stderr, "Cthread_mutex_unlock", sstrerror (serrno)); 
     498                printf ("\n"); 
     499        } 
    699500        if (sig_int_received) { 
    700                 thread_logit (NULL, stderr, "Finishing after user requested that the drain stop\n"); 
    701                 s.rc = 1; 
    702         } else if (s.rc) 
    703                 thread_logit (NULL, stderr, "There were some errors which prevented dpm-drain from completing fully\n"); 
    704         else 
    705                 thread_logit (NULL, stderr, "Finished draining\n"); 
    706         if (s.min_size != 0) { 
    707                 thread_logit (NULL, stdout, "number of bytes drained %s\n", u64tostru (s.current_size, u64buf, 0)); 
    708                 if (s.current_size < s.min_size) 
    709                         s.rc = 1; 
    710         } 
    711         return (s.rc); 
     501                fprintf (stderr, "\nFinishing after interrupt\n"); 
     502                rc = 1; 
     503        } else if (rc) 
     504                fprintf (stderr, "\nThere were some errors which prevented dpm-drain from completing fully\n"); 
     505        if (min_size != 0) { 
     506                printf ("\nnumber of bytes drained %s\n", u64tostru (current_size, u64buf, 0)); 
     507                fflush (stdout); 
     508                if (current_size < min_size) 
     509                        rc = 1; 
     510        } 
     511        exit (rc); 
    712512} 
  • lcg-dm/trunk/dpm/dpm_copyfile.c

    • Property svn:keywords set to Id Date Author Revision
    r2935 r3543  
    55 
    66#ifndef lint 
    7 static char sccsid[] = "@(#)$RCSfile: dpm_copyfile.c,v $ $Revision: 1.6 $ $Date: 2009/12/14 10:08:22 $ CERN Jean-Philippe Baud"; 
     7static char sccsid[] = "@(#)$RCSfile: dpm_copyfile.c,v $ $Revision$ $Date$ CERN Jean-Philippe Baud"; 
    88#endif /* not lint */ 
    99 
     
    2727        monitor_t *mt; 
    2828        volatile globus_bool_t done; 
    29         int istimeout; 
    3029}  check_t; 
    3130 
    3231static int ggc_interrupted; 
     32static int ggc_interrupt_processed; 
     33static int istimeout; 
    3334 
    3435static volatile int globus_activated = 0; 
    3536 
    36 int 
    37 dpm_copyfile_activate(void) 
     37static int 
     38do_globus_activates(void) 
    3839{ 
    3940        /* not thread safe */ 
    4041        if (!globus_activated) { 
    4142                globus_activated = 1; 
    42                 ggc_interrupted = 0; 
    4343                (void) globus_module_activate (GLOBUS_GASS_COPY_MODULE); 
    4444                (void) globus_module_activate (GLOBUS_FTP_CLIENT_MODULE); 
     
    173173        { 
    174174                GlobusTimeReltimeSet (delay, 1, 0); 
    175                 if (!monitor->done && !check->istimeout && !ggc_interrupted){ 
     175                if (!monitor->done && !istimeout && !ggc_interrupted){ 
    176176                        globus_callback_register_oneshot ( 
    177177                                NULL, 
     
    202202        globus_gass_copy_attr_t ggc_srcattr; 
    203203        globus_gass_copy_handle_t ggc_handle; 
    204         int ggc_interrupt_processed; 
    205204        globus_result_t gresult; 
    206205        monitor_t monitor; 
     
    214213        int save_errno = 0; 
    215214 
     215        ggc_interrupted = 0; 
    216216        ggc_interrupt_processed = 0; 
    217         check.istimeout = 0; 
    218         rc = dpm_copyfile_activate(); 
     217        istimeout = 0; 
     218        rc = do_globus_activates(); 
    219219        globus_mutex_init (&monitor.mutex, NULL); 
    220220        globus_cond_init (&monitor.cond, NULL); 
     
    319319                ts.tv_sec = time (0) + timeout; 
    320320                ts.tv_nsec = 0; 
    321                 while ( (! monitor.done) && (!check.istimeout) && (!save_errno)) { 
     321                while ( (! monitor.done) && (!istimeout) && (!save_errno)) { 
    322322                        if (ggc_interrupted && ! ggc_interrupt_processed) { 
    323323                                globus_gass_copy_cancel (&ggc_handle, 
     
    330330                } 
    331331        } else 
    332                 while ( (! monitor.done) && (!check.istimeout) && (!save_errno)) { 
     332                while ( (! monitor.done) && (!istimeout) && (!save_errno)) { 
    333333                        if (ggc_interrupted && ! ggc_interrupt_processed) { 
    334334                                globus_gass_copy_cancel (&ggc_handle, 
     
    341341                } 
    342342        if (save_errno == ETIMEDOUT) 
    343                 check.istimeout = 1; 
     343                istimeout = 1; 
    344344        if (!monitor.done){ 
    345345                int ret; 
     
    371371        if (ggc_interrupt_processed == 1) { 
    372372                save_errno = ECANCELED; 
    373         } else if (check.istimeout == 1) { 
     373        } else if (istimeout == 1) { 
    374374                save_errno = ETIMEDOUT; 
    375375        } 
  • lcg-dm/trunk/h/dpm_api.h

    • Property svn:keywords set to Id Date Author Revision
    r2935 r3543  
    11/* 
    2  * $Id: dpm_api.h,v 1.21 2009/12/14 10:07:40 dhsmith Exp $ 
     2 * $Id$ 
    33 */ 
    44 
     
    99 
    1010/* 
    11  * @(#)$RCSfile: dpm_api.h,v $ $Revision: 1.21 $ $Date: 2009/12/14 10:07:40 $ CERN IT-GD/CT Jean-Philippe Baud 
     11 * @(#)$RCSfile: dpm_api.h,v $ $Revision$ $Date$ CERN IT-GD/CT Jean-Philippe Baud 
    1212 */ 
    1313 
     
    157157#ifndef SWIG 
    158158EXTERN_C int DLL_DECL dpm_copyfile _PROTO((char *, char *, int, int)); 
    159 EXTERN_C int DLL_DECL dpm_copyfile_activate _PROTO(()); 
    160159#endif 
    161160EXTERN_C int DLL_DECL dpm_delreplica _PROTO((char *));