Changeset 3543


Ignore:
Timestamp:
Apr 29, 2010 2:30:27 PM (5 years ago)
Author:
baud
Message:

correct rollback of the dpm-drain multi-threaded feature

Location:
lcg-dm/trunk
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • lcg-dm/trunk/dpm/dpm-drain.c

    • Property svn:keywords set to Id Date Author Revision
    r2935 r3543  
    55
    66#ifndef lint
    7 static char sccsid[] = "@(#)$RCSfile: dpm-drain.c,v $ $Revision: 1.22 $ $Date: 2009/12/14 10:07:40 $ CERN IT-GD/SC Jean-Philippe Baud";
     7static char sccsid[] = "@(#)$RCSfile: dpm-drain.c,v $ $Revision$ $Date$ CERN IT-GD/SC Jean-Philippe Baud";
    88#endif /* not lint */
    99
     
    1919#include <unistd.h>
    2020#include <errno.h>
    21 #include <stdarg.h>
    2221#include "Cgetopt.h"
    2322#include "dpm_api.h"
     
    2524#include "serrno.h"
    2625#include "u64subr.h"
    27 #include "Cthread_api.h"
    28 #include "Cpool_api.h"
    2926int help_flag = 0;
    3027volatile sig_atomic_t sig_int_received = 0;
    31 
    32 struct dpm_drain_global_s;
    33 
    34 struct dpm_drain_thread_info_s {
    35         struct Cns_filereplica lp;
    36         u_signed64 filesize;
    37         int rc;
    38         int good;
    39         int hardrc;
    40         int idx;
    41         struct dpm_drain_global_s *global;
    42         char logbuf[2048];
    43         size_t loglen;
    44 };
    45 
    46 struct dpm_drain_global_s {
    47         struct dpm_drain_thread_info_s *thip;
    48         int nbgids;
    49         gid_t *gid_list;       
    50         u_signed64 min_size;
    51 
    52         int condvar;
    53 
    54         int rc;
    55         int nbthreads;
    56         int pass_good;
    57         int pass_rc;
    58 
    59         /* access only by main thread */
    60         u_signed64 current_size;
    61 
    62         /* can have concurrent access between threads, require lock */
    63         enum states { FINISHED, NOTFINISHED } *thip_state;
    64         int nbfree;
    65 };
    66 
    67 thread_logit(struct dpm_drain_thread_info_s *thip, FILE *stream, char *msg, ...)
    68 {
    69         va_list args;
    70         time_t current_time;
    71         int has_newline = 0;
    72         struct dpm_drain_thread_info_s lthip;
    73         int Tid = 0;
    74         struct tm tres;
    75 
    76         va_start (args, msg);
    77 
    78         if (strlen (msg) && msg[strlen (msg)-1]=='\n')
    79                 has_newline = 1;
    80 
    81         if (!thip) {
    82                 memset (&lthip, '\0', sizeof(lthip));
    83                 thip = &lthip;
    84         }
    85 
    86         if (thip->loglen == 0) {
    87                 current_time = time (0);
    88                 Cglobals_getTid (&Tid);
    89                 if (!strftime (thip->logbuf, sizeof(thip->logbuf), "%a %b %d %H:%M:%S %Y",
    90                     localtime_r (&current_time, &tres)))
    91                         *thip->logbuf = '\0';
    92                 if (Tid<0)
    93                         Csnprintf (thip->logbuf + strlen (thip->logbuf),
    94                             sizeof(thip->logbuf) - strlen (thip->logbuf), " %5d: ", getpid ());
    95                 else
    96                         Csnprintf (thip->logbuf + strlen (thip->logbuf),
    97                             sizeof(thip->logbuf) - strlen (thip->logbuf), " %5d,%d: ", getpid (), Tid);
    98                 thip->loglen = strlen (thip->logbuf);
    99         }
    100 
    101         Cvsnprintf (thip->logbuf+thip->loglen, sizeof(thip->logbuf) - thip->loglen, msg, args);
    102         thip->loglen = strlen (thip->logbuf);
    103 
    104         if (has_newline) {
    105                 fprintf (stdout, "%s", thip->logbuf);
    106                 fflush (stdout);
    107                 thip->logbuf[0] = '\0';
    108                 thip->loglen = 0;
    109         }
    110 
    111         va_end (args);
    112         return (0);
    113 }
    114 
    115 void collect_finished_and_reinit(s)
    116 struct dpm_drain_global_s *s;
    117 {
    118         int i;
    119         for (i=0;i < s->nbthreads;++i) {
    120                 if (s->thip_state[i] == FINISHED) {
    121                         if (s->min_size != 0 && s->thip[i].good)
    122                                 s->current_size += s->thip[i].filesize;
    123                         if (s->thip[i].hardrc)
    124                                 s->rc = s->thip[i].hardrc;
    125                         if (s->thip[i].good)
    126                                 s->pass_good++;
    127                         if (s->thip[i].rc)
    128                                 s->pass_rc = s->thip[i].rc;
    129                         memset (&s->thip[i], '\0', sizeof(struct dpm_drain_thread_info_s));
    130                         s->thip[i].global = s;
    131                         s->thip[i].idx = i;
    132                         s->thip_state[i] = NOTFINISHED;
    133                 }
    134         }
    135 }
    13628
    13729void sig_handler(signum)
     
    15446}
    15547
    156 void *handle_one_file(arg)
    157 void *arg;
    158 {
    159         struct dpm_drain_thread_info_s *thip = (struct dpm_drain_thread_info_s *)arg;
    160         struct Cns_filestatg statbuf;
    161         struct Cns_filereplica *lp = &thip->lp;
    162         struct dpm_drain_global_s *s = thip->global;
    163         time_t current_time;
    164         char path[CA_MAXPATHLEN+1];
    165         char u64buf[21];
    166         int nbentries;
    167         struct dpns_filereplicax *rep_entries = NULL;
    168         int i;
    169         int j;
    170         char buf[256];
    171         struct tm tres;
    172         struct dpm_space_metadata *spacemd = NULL;
    173         int replicate;
    174         time_t f_lifetime;
    175         char pfn[CA_MAXSFNLEN+1];
    176         char *p;
    177 
    178         if (s->nbgids != 0 || s->min_size != 0) {
    179                 if (Cns_statr (lp->sfn, &statbuf) < 0) {
    180                         thread_logit (thip, stderr, "Cns_statr %s: %s\n", lp->sfn, sstrerror (serrno));
    181                         thip->rc = 1;
    182                         goto finish;
    183                 }
    184                 thip->filesize = statbuf.filesize;
    185         }
    186         if (s->nbgids != 0) {
    187                 for (i=0; i < s->nbgids; i++) {
    188                         if (statbuf.gid == s->gid_list[i])
    189                                 break;
    190                 }
    191                 if (i >= s->nbgids)
    192                         goto finish;
    193         }
    194         if (lp->status != '-') {        /* file is being populated/deleted */
    195                 if (lp->status == 'P') {
    196                         thread_logit (thip, stdout, "The file %s is in the process of being uploaded, ignoring\n", lp->sfn);
    197                 } else {
    198                         thread_logit (thip, stdout, "The file %s is recorded as being in the process of being deleted, ignoring it during drain\n", lp->sfn);
    199                 }
    200                 if (s->min_size == 0)
    201                         thip->rc = 1;
    202                 goto finish;
    203         }
    204         current_time = time (0);
    205         if (lp->ptime > current_time) { /* file is pinned */
    206                 if (!strftime (buf, sizeof(buf), "%a %b %d %H:%M:%S %Y",
    207                     localtime_r (&lp->ptime, &tres)))
    208                         *buf = '\0';
    209                 thread_logit (thip, stdout, "%s pinned until %s\n", lp->sfn, buf);
    210                 if (s->min_size == 0)
    211                         thip->rc = 1;
    212                 goto finish;
    213         }
    214 
    215         if (dpns_getpath (NULL, lp->fileid, path) < 0) {
    216                 thread_logit (thip, stderr,
    217                         "dpns_getpath: %s (%s): %s\n", lp->sfn,
    218                         u64tostr (lp->fileid, u64buf, 0), sstrerror (serrno));
    219                 thip->rc = 1;
    220                 goto finish;
    221         }
    222 
    223         if (dpns_getreplicax (path, NULL, NULL, &nbentries, &rep_entries) < 0) {
    224                 thread_logit (thip, stderr, "dpns_getreplicax: %s: %s\n", path, sstrerror (serrno));
    225                 thip->rc = 1;
    226                 goto finish;
    227         }
    228         for (i=0; i < nbentries; i++) {
    229                 if (!strcmp (lp->sfn, rep_entries[i].sfn))
    230                         break;
    231         }
    232         if (i >= nbentries) {
    233                 thread_logit (thip, stderr, "could not find replica of %s with pfn %s\n", path, lp->sfn);
    234                 thip->rc = 1;
    235                 goto finish;
    236         }
    237         thread_logit (thip, stdout, "File:\t\t%s\n", path);
    238         thread_logit (thip, stdout, "pfn:\t\t%s (of %d)\n", lp->sfn, nbentries);
    239         thread_logit (thip, stdout, "replica type:\t%s\n", (rep_entries[i].r_type == 'P') ? "primary" : "secondary");
    240         switch(lp->f_type) {
    241                 case 'V':
    242                         strcpy (buf,"volatile");
    243                         break;
    244                 case 'D':
    245                         strcpy (buf,"durable");
    246                         break;
    247                 case 'P':
    248                         strcpy (buf,"permanent");
    249                         break;
    250                 default:
    251                         sprintf (buf,"'%c'",lp->f_type);
    252                         break;
    253         }
    254         thread_logit (thip, stdout, "file type:\t%s", buf);
    255         if (lp->f_type == 'P')
    256                 thread_logit (thip, stdout, " (does not expire)\n");
    257         else {
    258                 if (!strftime (buf, sizeof(buf), "%a %b %d %H:%M:%S %Y",
    259                     localtime_r (&rep_entries[i].ltime, &tres)))
    260                         *buf = '\0';
    261                 thread_logit (thip, stdout, " (%s on %s)\n",
    262                     (rep_entries[i].ltime <= current_time) ? "expired" : "expires", buf);
    263         }
    264         if (s->min_size != 0)
    265                 thread_logit (thip, stdout, "size:\t\t%s\n", u64tostru (statbuf.filesize, u64buf, 0));
    266         if (rep_entries[i].setname == NULL || *rep_entries[i].setname == '\0')
    267                 thread_logit (thip, stdout, "space:\t\tnot in any space\n");
    268         else {
    269                 p = rep_entries[i].setname;
    270                 thread_logit (thip, stdout, "space:\t\t%s", p);
    271                 j = 0;
    272                 if (dpm_getspacemd (1, &p, &j, &spacemd) < 0) {
    273                         if (serrno == EINVAL) {
    274                                 thread_logit (thip, stdout, " (invalid space)\n");
    275                                 *p = '\0';
    276                         } else
    277                                 thread_logit (thip, stdout, "\n");
    278                 } else if (j == 1 && spacemd)
    279                         thread_logit (thip, stdout, " (%s)\n", spacemd[0].u_token);
    280                 else
    281                         thread_logit (thip, stdout, "\n");
    282                 free (spacemd);
    283                 spacemd = NULL;
    284         }
    285         replicate = 1;
    286         if (lp->f_type == 'V' && rep_entries[i].ltime <= current_time)
    287                 replicate = 0;
    288         if (replicate) {
    289                 thread_logit (thip, stdout, "replicating...\n");
    290                 f_lifetime = rep_entries[i].ltime;
    291                 if (dpm_replicatex (lp->sfn, lp->f_type, rep_entries[i].setname, f_lifetime, pfn) < 0) {
    292                         thread_logit (thip, stdout, "failed\n");
    293                         thread_logit (thip, stderr, "dpm_replicatex %s: %s\n", lp->sfn, sstrerror (serrno));
    294                         thip->rc = 1;
    295                         goto finish;
    296                 }
    297                 if (lp->f_type != 'P') {
    298                         free (rep_entries);
    299                         rep_entries = NULL;
    300                         if (dpns_getreplicax (path, NULL, NULL, &nbentries, &rep_entries) < 0) {
    301                                 thread_logit (thip, stdout, "failed\n");
    302                                 thread_logit (thip, stderr, "dpns_getreplicax: %s: %s\n",
    303                                     path, sstrerror (serrno));
    304                                 if (dpm_delreplica (pfn) < 0)
    305                                         thip->hardrc = 1;
    306                                 thip->rc = 1;
    307                                 goto finish;
    308                         }
    309                         for (i=0; i < nbentries; i++) {
    310                                 if (!strcmp (pfn, rep_entries[i].sfn))
    311                                         break;
    312                         }
    313                         if (i>=nbentries) {
    314                                 thread_logit (thip, stdout, "failed\n");
    315                                 thread_logit (thip, stderr, "could not find new replica\n");
    316                                 if (dpm_delreplica (pfn) < 0)
    317                                         thip->hardrc = 1;
    318                                 thip->rc = 1;
    319                                 goto finish;
    320                         }
    321                         if (rep_entries[i].ltime < f_lifetime) {
    322                                 thread_logit (thip, stdout, "failed\n");
    323                                 thread_logit (thip, stderr, "could not replicate to a new file with sufficient lifetime\n");
    324                                 if (dpm_delreplica (pfn) < 0)
    325                                         thip->hardrc = 1;
    326                                 thip->rc = 1;
    327                                 goto finish;
    328                         }
    329                 }
    330         }
    331         thread_logit (thip, stdout, "deleting %s\n",lp->sfn);
    332         if (dpm_delreplica (lp->sfn) < 0) {
    333                 thread_logit (thip, stderr, "dpm_delreplica %s: %s\n", lp->sfn, sstrerror (serrno));
    334                 thip->rc = 1;
    335                 thip->hardrc = 1;
    336                 goto finish;
    337         }
    338         thip->good = 1;
    339 
    340 finish:
    341         free (rep_entries);
    342         rep_entries = NULL;
    343         if (Cthread_mutex_lock (&s->condvar) < 0) {
    344                 fprintf (stderr, "Cthread_mutex_lock", sstrerror (serrno));
    345                 exit (SYERR);
    346         }
    347         s->thip_state[thip->idx] = FINISHED;
    348         s->nbfree++;
    349         if (Cthread_cond_signal (&s->condvar) < 0) {
    350                 fprintf (stderr, "Cthread_cond_signal", sstrerror (serrno));
    351                 exit (SYERR);
    352         }
    353         if (Cthread_mutex_unlock (&s->condvar) < 0) {
    354                 fprintf (stderr, "Cthread_mutex_unlock", sstrerror (serrno));
    355                 exit (SYERR);
    356         }
    357         return (NULL);
    358 }
    359 
    36048int main(argc, argv)
    36149int argc;
    36250char **argv;
    36351{
     52        char buf[256];
    36453        int c;
     54        u_signed64 current_size = 0;
     55        time_t current_time;
    36556        char *dp;
    36657        struct dpm_fs *dpm_fs;
    36758        struct dpm_pool *dpm_pools;
    36859        int errflg = 0;
     60        time_t f_lifetime;
    36961        int flags;
    370         int found_fs;
     62        int found_fs = 0;
    37163        char *fs = NULL;
    37264        struct fs_list_s {
     
    37567        } *fs_list = NULL;
    37668        int fs_status;
     69        gid_t *gid_list = NULL;
    37770#ifndef VIRTUAL_ID
    37871        struct group *gr;
     
    38073        char groupname[256];
    38174        int i = 0;
    382         int ipool;
    38375        int j = 0;
    38476        Cns_list list;
     
    39183                {"poolname", REQUIRED_ARGUMENT, 0, OPT_POOL_NAME},
    39284                {"server", REQUIRED_ARGUMENT, 0, OPT_FS_SERVER},
    393                 {"threads", REQUIRED_ARGUMENT, 0, OPT_NBTHREADS},
    39485                {0, 0, 0, 0}
    39586        };
    39687        struct Cns_filereplica *lp;
     88        u_signed64 min_size = 0;
     89        int nbentries;
    39790        int nbfs;       
     91        int nbgids = 0;
    39892        int nbpools;   
    39993        char *p = NULL;
     94        int pass_good;
     95        int pass_rc;
     96        char path[CA_MAXPATHLEN+1];
     97        char pfn[CA_MAXSFNLEN+1];
    40098        char *poolname = NULL;
    401         struct dpm_drain_global_s s;
     99        int rc = 0;
     100        int replicate;
     101        struct dpns_filereplicax *rep_entries = NULL;
    402102        int save_serrno;
    403103        struct sigaction sigact;
    404104        char *server = NULL;
     105        struct dpm_space_metadata *spacemd = NULL;
    405106        int target_fs_status;
    406         int thread_index;
     107        struct Cns_filestatg statbuf;
    407108        char u64buf[21];
    408         int test_tid[20];
    409 
    410         (void) dpm_copyfile_activate ();
    411 
    412         memset(&s, '\0', sizeof(struct dpm_drain_global_s));
    413         s.nbthreads = 1;
     109
    414110        Copterr = 1;
    415111        Coptind = 1;
     
    441137                                errflg++;
    442138                        } else
    443                                 s.min_size = strutou64 (Coptarg);
    444                         break;
    445                 case OPT_NBTHREADS:
    446                         p = Coptarg;
    447                         while (*p >= '0' && *p <= '9') p++;
    448                         if (*p != '\0') {
    449                                 fprintf (stderr,
    450                                     "invalid number of threads %s\n", Coptarg);
    451                                 errflg++;
    452                         } else {
    453                                 s.nbthreads = atoi (Coptarg);
    454                                 if (s.nbthreads <= 0) {
    455                                         fprintf (stderr,
    456                                             "invalid number of threads %s\n", Coptarg);
    457                                         errflg++;
    458                                 }
    459                         }
     139                                min_size = strutou64 (Coptarg);
    460140                        break;
    461141                case OPT_POOL_NAME:
     
    470150                        p = Coptarg;
    471151                        //check that the user didn't provide already a list of gid
    472                         if (s.nbgids == 0) i = 0;
     152                        if (nbgids == 0) i = 0;
    473153                        while (*p) {
    474                                 if (*p == ',') s.nbgids++;
     154                                if (*p == ',') nbgids++;
    475155                                p++;
    476156                        }
    477                         s.nbgids++;
    478                         if ((s.gid_list = (gid_t *) realloc (s.gid_list, s.nbgids * sizeof(gid_t))) == NULL) {
     157                        nbgids++;
     158                        if ((gid_list = (gid_t *) realloc (gid_list, nbgids * sizeof(gid_t))) == NULL) {
    479159                                fprintf (stderr, "Could not allocate memory for gids\n");
    480160                                exit (USERR);
     
    482162                        p = strtok (Coptarg, ",");
    483163                        while (p) {
    484                                 if ((s.gid_list[i] = strtol (p, &dp, 10)) < 0 || *dp != '\0') {
     164                                if ((gid_list[i] = strtol (p, &dp, 10)) < 0 || *dp != '\0') {
    485165                                        fprintf (stderr, "Invalid gid %s \n",p);
    486166                                        errflg++;
     
    488168                                               
    489169#ifdef VIRTUAL_ID
    490                                         if (s.gid_list[i] > 0 && Cns_getgrpbygid (s.gid_list[i], groupname) < 0) {
     170                                        if (gid_list[i] > 0 && Cns_getgrpbygid (gid_list[i], groupname) < 0) {
    491171#else
    492                                         if (s.gid_list[i] > 0 && ! getgrgid (s.gid_list[i])) {
     172                                        if (gid_list[i] > 0 && ! getgrgid (gid_list[i])) {
    493173#endif
    494174                                                fprintf (stderr, "Invalid gid %s \n", p);
     
    506186                                break;
    507187                        /* check that the user didn't provide already a list of gid */
    508                         if (s.nbgids == 0) i = 0;
     188                        if (nbgids == 0) i = 0;
    509189                        p = Coptarg;
    510190                        while (*p) {
    511                                 if (*p == ',') s.nbgids++;
     191                                if (*p == ',') nbgids++;
    512192                                p++;
    513193                        }
    514                         s.nbgids++;
    515                         if ((s.gid_list = (gid_t *) realloc (s.gid_list, s.nbgids * sizeof(gid_t))) == NULL) {
     194                        nbgids++;
     195                        if ((gid_list = (gid_t *) realloc (gid_list, nbgids * sizeof(gid_t))) == NULL) {
    516196                                fprintf (stderr, "Could not allocate memory for gids\n");
    517197                                exit (USERR);
     
    521201#ifdef VIRTUAL_ID
    522202                                if (strcmp (p, "root") == 0)
    523                                         s.gid_list[i] = 0;
    524                                 else if (Cns_getgrpbynam (p, &s.gid_list[i]) < 0) {
     203                                        gid_list[i] = 0;
     204                                else if (Cns_getgrpbynam (p, &gid_list[i]) < 0) {
    525205#else
    526206                                if ((gr = getgrnam (p)))
    527                                         s.gid_list[i] = gr->gr_gid;
     207                                        gid_list[i] = gr->gr_gid;
    528208                                else {
    529209#endif
     
    562242
    563243        /* set status to FS_RDONLY unless a specific server & fs is specified along with a limit */
    564         target_fs_status = ((s.nbgids == 0 && s.min_size == 0) || !(server && fs)) ? FS_RDONLY : -1;
     244        target_fs_status = ((nbgids == 0 && min_size == 0) || !(server && fs)) ? FS_RDONLY : -1;
    565245
    566246        if (dpm_getpools (&nbpools, &dpm_pools) < 0) {
     
    568248                exit (USERR);
    569249        }
    570 
    571250        for (i = 0; i < nbpools; i++) {
    572251                if (poolname && strcmp ((dpm_pools + i)->poolname, poolname))
     
    614293        sigaction(SIGINT, &sigact, (struct sigaction *)NULL);
    615294
    616         s.thip = (struct dpm_drain_thread_info_s *)
    617             calloc (s.nbthreads, sizeof(struct dpm_drain_thread_info_s));
    618 
    619         s.thip_state = (enum states *)
    620             calloc (s.nbthreads, sizeof(enum states));
    621 
    622         s.nbfree = s.nbthreads;
    623 
    624         /* moves all the newly created thread specific states to NOTFINISHED */
    625         collect_finished_and_reinit(&s);
    626 
    627         ipool = Cpool_create (s.nbthreads, NULL);
    628         if (ipool < 0) {
    629                 fprintf (stderr, "Could not create thread pool\n");
    630                 exit (SYERR);
    631         }
    632 
    633         if (Cthread_mutex_lock (&s.condvar) < 0) {
    634                 fprintf (stderr, "Cthread_mutex_lock", sstrerror (serrno));
    635                 exit (SYERR);
    636         }
    637 
    638         thread_logit (NULL, stdout, "Starting to drain with %d threads\n", s.nbthreads);
    639 
    640295        while (1) {
     296                pass_rc = 0;
     297                pass_good = 0;
    641298                flags = CNS_LIST_BEGIN;
    642                 s.pass_good = 0;
    643                 s.pass_rc = 0;
    644299                while (!sig_int_received && (serrno=0, lp = Cns_listreplicax (poolname, server, fs, flags, &list)) != NULL) {
    645300                        if (flags != CNS_LIST_CONTINUE)
    646301                                flags = CNS_LIST_CONTINUE;
     302
    647303                        for (i=0; i < found_fs; i++) {
    648304                                if (!strcmp (fs_list[i].server, lp->host) &&
     
    652308                        if (i >= found_fs)
    653309                                continue;
    654                         while (!s.nbfree) {
    655                                 if (Cthread_cond_wait (&s.condvar) < 0) {
    656                                         fprintf (stderr, "Error in Cthread_cond_wait: %s\n", sstrerror (serrno));
    657                                         exit (SYERR);
    658                                 }
    659                         }
    660                         collect_finished_and_reinit(&s);
    661 
    662                         if (s.min_size != 0 && s.current_size >= s.min_size)
    663                                 break;
    664 
    665                         if (sig_int_received)
    666                                 break;
    667 
    668                         if ((thread_index = Cpool_next_index (ipool)) < 0) {
    669                                 fprintf (stderr, "Error in Cpool_next_index: %s\n", sstrerror (serrno));
    670                                 exit (SYERR);
    671                         }
    672                         memcpy (&s.thip[thread_index].lp, lp, sizeof(struct Cns_filereplica));
    673                         if (Cpool_assign (ipool, &handle_one_file, &s.thip[thread_index], -1) < 0) {
    674                                 fprintf (stderr, "Error in Cpool_assign: %s\n", sstrerror (serrno));
    675                                 exit (SYERR);
    676                         }
    677                         s.nbfree--;
     310
     311                        printf ("\n");
     312
     313                        if (nbgids != 0 || min_size != 0) {
     314                                if (Cns_statr (lp->sfn, &statbuf) < 0) {
     315                                        fprintf (stderr, "Cns_statr %s: %s\n", lp->sfn, sstrerror (serrno));
     316                                        pass_rc = 1;
     317                                        continue;
     318                                }
     319                        }
     320                        if (nbgids != 0) {
     321                                for (i=0; i < nbgids; i++) {
     322                                        if (statbuf.gid == gid_list[i])
     323                                                break;
     324                                }
     325                                if (i >= nbgids)
     326                                        continue;
     327                        }
     328                        if (lp->status != '-') {        /* file is being populated/deleted */
     329                                if (lp->status == 'P') {
     330                                        printf ("The file %s is in the process of being uploaded, ignoring\n", lp->sfn);
     331                                } else {
     332                                        printf ("The file %s is recorded as being in the process of being deleted, ignoring it during drain\n", lp->sfn);
     333                                }
     334                                fflush (stdout);
     335                                if (min_size == 0)
     336                                        pass_rc = 1;
     337                                continue;
     338                        }
     339                        current_time = time (0);
     340                        if (lp->ptime > current_time) { /* file is pinned */
     341                                printf ("%s pinned until %s", lp->sfn, ctime (&lp->ptime));
     342                                fflush (stdout);
     343                                if (min_size == 0)
     344                                        pass_rc = 1;
     345                                continue;
     346                        }
     347
     348                        if (dpns_getpath (NULL, lp->fileid, path) < 0) {
     349                                fprintf (stderr,
     350                                        "dpns_getpath: %s (%s): %s\n", lp->sfn,
     351                                        u64tostr (lp->fileid, u64buf, 0), sstrerror (serrno));
     352                                pass_rc = 1;
     353                                continue;
     354                        }
     355
     356                        free (rep_entries);
     357                        rep_entries = NULL;
     358
     359                        if (dpns_getreplicax (path, NULL, NULL, &nbentries, &rep_entries) < 0) {
     360                                fprintf (stderr, "dpns_getreplicax: %s: %s\n",
     361                                        path, sstrerror (serrno));
     362                                pass_rc = 1;
     363                                continue;
     364                        }
     365                        for (i=0; i < nbentries; i++) {
     366                                if (!strcmp (lp->sfn, rep_entries[i].sfn))
     367                                        break;
     368                        }
     369                        if (i >= nbentries) {
     370                                fprintf (stderr, "could not find replica of %s with pfn %s\n", path, lp->sfn);
     371                                pass_rc = 1;
     372                                continue;
     373                        }
     374                        printf ("File:\t\t%s\n", path);
     375                        printf ("pfn:\t\t%s (of %d)\n", lp->sfn, nbentries);
     376                        printf ("replica type:\t%s\n",
     377                            (rep_entries[i].r_type == 'P') ? "primary" : "secondary");
     378                        switch(lp->f_type) {
     379                                case 'V':
     380                                        strcpy (buf,"volatile");
     381                                        break;
     382                                case 'D':
     383                                        strcpy (buf,"durable");
     384                                        break;
     385                                case 'P':
     386                                        strcpy (buf,"permanent");
     387                                        break;
     388                                default:
     389                                        sprintf (buf,"'%c'",lp->f_type);
     390                                        break;
     391                        }
     392                        printf ("file type:\t%s", buf);
     393                        if (lp->f_type == 'P')
     394                                printf (" (does not expire)\n");
     395                        else {
     396                                buf[0] = '\0';
     397                                if (p=ctime (&rep_entries[i].ltime))
     398                                        strcpy (buf, p);
     399                                if (p=strchr (buf, '\n'))
     400                                        *p = '\0';
     401                                printf (" (%s on %s)\n",
     402                                    (rep_entries[i].ltime <= current_time) ? "expired" : "expires", buf);
     403                        }
     404                        if (min_size != 0)
     405                                printf ("size:\t\t%s\n", u64tostru (statbuf.filesize, u64buf, 0));
     406                        if (rep_entries[i].setname == NULL || *rep_entries[i].setname == '\0')
     407                                printf ("space:\t\tnot in any space\n");
     408                        else {
     409                                p = rep_entries[i].setname;
     410                                printf ("space:\t\t%s", p);
     411                                j = 0;
     412                                if (dpm_getspacemd (1, &p, &j, &spacemd) < 0) {
     413                                        if (serrno == EINVAL) {
     414                                                printf (" (invalid space)\n");
     415                                                *p = '\0';
     416                                        } else
     417                                                printf ("\n");
     418                                } else if (j == 1 && spacemd)
     419                                        printf (" (%s)\n", spacemd[0].u_token);
     420                                else
     421                                        printf ("\n");
     422                                free (spacemd);
     423                                spacemd = NULL;
     424                        }
     425                        replicate = 1;
     426                        if (lp->f_type == 'V' && rep_entries[i].ltime <= current_time)
     427                                replicate = 0;
     428                        if (replicate) {
     429                                printf ("replicating... ");
     430                                fflush (stdout);
     431                                f_lifetime = rep_entries[i].ltime;
     432                                if (dpm_replicatex (lp->sfn, lp->f_type, rep_entries[i].setname, f_lifetime, pfn) < 0) {
     433                                        printf ("failed\n");
     434                                        fprintf (stderr, "dpm_replicatex %s: %s\n", lp->sfn, sstrerror (serrno));
     435                                        pass_rc = 1;
     436                                        continue;
     437                                }
     438                                if (lp->f_type != 'P') {
     439                                        free (rep_entries);
     440                                        rep_entries = NULL;
     441                                        if (dpns_getreplicax (path, NULL, NULL, &nbentries, &rep_entries) < 0) {
     442                                                printf ("failed\n");
     443                                                fprintf (stderr, "dpns_getreplicax: %s: %s\n",
     444                                                    path, sstrerror (serrno));
     445                                                if (dpm_delreplica (pfn) < 0)
     446                                                        rc = 1;
     447                                                pass_rc = 1;
     448                                                continue;
     449                                        }
     450                                        for (i=0; i < nbentries; i++) {
     451                                                if (!strcmp (pfn, rep_entries[i].sfn))
     452                                                        break;
     453                                        }
     454                                        if (i>=nbentries) {
     455                                                printf ("failed\n");
     456                                                fprintf (stderr, "could not find new replica\n");
     457                                                if (dpm_delreplica (pfn) < 0)
     458                                                        rc = 1;
     459                                                pass_rc = 1;
     460                                                continue;
     461                                        }
     462                                        if (rep_entries[i].ltime < f_lifetime) {
     463                                                printf ("failed\n");
     464                                                fprintf (stderr, "could not replicate to a new file with sufficient lifetime\n");
     465                                                if (dpm_delreplica (pfn) < 0)
     466                                                        rc = 1;
     467                                                pass_rc = 1;
     468                                                continue;
     469                                        }
     470                                }
     471                        }
     472                        printf ("deleting\n");
     473                        fflush (stdout);
     474                        if (dpm_delreplica (lp->sfn) < 0) {
     475                                fprintf (stderr, "dpm_delreplica %s: %s\n", lp->sfn, sstrerror (serrno));
     476                                pass_rc = 1;
     477                                rc = 1;
     478                                continue;
     479                        }
     480                        if (min_size != 0) {
     481                                current_size += statbuf.filesize;
     482                                if (current_size >= min_size)
     483                                        break;
     484                        }
     485                        pass_good++;
    678486                }
    679487                save_serrno = serrno;
    680488                (void) Cns_listreplicax (poolname, server, fs, CNS_LIST_END, &list);
    681                 while (s.nbfree<s.nbthreads) {
    682                         if (Cthread_cond_wait (&s.condvar) < 0) {
    683                                 fprintf (stderr, "Error in Cthread_cond_wait: %s\n", sstrerror (serrno));
    684                                 exit (SYERR);
    685                         }
    686                 }
    687                 collect_finished_and_reinit(&s);
    688 
    689                 if (sig_int_received || lp != NULL || s.rc || save_serrno != SETIMEDOUT || s.pass_good == 0) {
    690                         if (s.pass_rc)
    691                                 s.rc = 1;
     489                free (rep_entries);
     490                rep_entries = NULL;
     491                if (sig_int_received || lp != NULL || rc || save_serrno != SETIMEDOUT || pass_good == 0) {
     492                        if (pass_rc)
     493                                rc = 1;
    692494                        if (!sig_int_received && lp == NULL && save_serrno)
    693495                                fprintf (stderr, "Cns_listreplicax: %s\n", sstrerror (save_serrno));
    694496                        break;
    695497                }
    696         }
    697         if (Cthread_mutex_unlock (&s.condvar) < 0)
    698                 fprintf (stderr, "Cthread_mutex_unlock", sstrerror (serrno));
     498                printf ("\n");
     499        }
    699500        if (sig_int_received) {
    700                 thread_logit (NULL, stderr, "Finishing after user requested that the drain stop\n");
    701                 s.rc = 1;
    702         } else if (s.rc)
    703                 thread_logit (NULL, stderr, "There were some errors which prevented dpm-drain from completing fully\n");
    704         else
    705                 thread_logit (NULL, stderr, "Finished draining\n");
    706         if (s.min_size != 0) {
    707                 thread_logit (NULL, stdout, "number of bytes drained %s\n", u64tostru (s.current_size, u64buf, 0));
    708                 if (s.current_size < s.min_size)
    709                         s.rc = 1;
    710         }
    711         return (s.rc);
     501                fprintf (stderr, "\nFinishing after interrupt\n");
     502                rc = 1;
     503        } else if (rc)
     504                fprintf (stderr, "\nThere were some errors which prevented dpm-drain from completing fully\n");
     505        if (min_size != 0) {
     506                printf ("\nnumber of bytes drained %s\n", u64tostru (current_size, u64buf, 0));
     507                fflush (stdout);
     508                if (current_size < min_size)
     509                        rc = 1;
     510        }
     511        exit (rc);
    712512}
  • lcg-dm/trunk/dpm/dpm_copyfile.c

    • Property svn:keywords set to Id Date Author Revision
    r2935 r3543  
    55
    66#ifndef lint
    7 static char sccsid[] = "@(#)$RCSfile: dpm_copyfile.c,v $ $Revision: 1.6 $ $Date: 2009/12/14 10:08:22 $ CERN Jean-Philippe Baud";
     7static char sccsid[] = "@(#)$RCSfile: dpm_copyfile.c,v $ $Revision$ $Date$ CERN Jean-Philippe Baud";
    88#endif /* not lint */
    99
     
    2727        monitor_t *mt;
    2828        volatile globus_bool_t done;
    29         int istimeout;
    3029}  check_t;
    3130
    3231static int ggc_interrupted;
     32static int ggc_interrupt_processed;
     33static int istimeout;
    3334
    3435static volatile int globus_activated = 0;
    3536
    36 int
    37 dpm_copyfile_activate(void)
     37static int
     38do_globus_activates(void)
    3839{
    3940        /* not thread safe */
    4041        if (!globus_activated) {
    4142                globus_activated = 1;
    42                 ggc_interrupted = 0;
    4343                (void) globus_module_activate (GLOBUS_GASS_COPY_MODULE);
    4444                (void) globus_module_activate (GLOBUS_FTP_CLIENT_MODULE);
     
    173173        {
    174174                GlobusTimeReltimeSet (delay, 1, 0);
    175                 if (!monitor->done && !check->istimeout && !ggc_interrupted){
     175                if (!monitor->done && !istimeout && !ggc_interrupted){
    176176                        globus_callback_register_oneshot (
    177177                                NULL,
     
    202202        globus_gass_copy_attr_t ggc_srcattr;
    203203        globus_gass_copy_handle_t ggc_handle;
    204         int ggc_interrupt_processed;
    205204        globus_result_t gresult;
    206205        monitor_t monitor;
     
    214213        int save_errno = 0;
    215214
     215        ggc_interrupted = 0;
    216216        ggc_interrupt_processed = 0;
    217         check.istimeout = 0;
    218         rc = dpm_copyfile_activate();
     217        istimeout = 0;
     218        rc = do_globus_activates();
    219219        globus_mutex_init (&monitor.mutex, NULL);
    220220        globus_cond_init (&monitor.cond, NULL);
     
    319319                ts.tv_sec = time (0) + timeout;
    320320                ts.tv_nsec = 0;
    321                 while ( (! monitor.done) && (!check.istimeout) && (!save_errno)) {
     321                while ( (! monitor.done) && (!istimeout) && (!save_errno)) {
    322322                        if (ggc_interrupted && ! ggc_interrupt_processed) {
    323323                                globus_gass_copy_cancel (&ggc_handle,
     
    330330                }
    331331        } else
    332                 while ( (! monitor.done) && (!check.istimeout) && (!save_errno)) {
     332                while ( (! monitor.done) && (!istimeout) && (!save_errno)) {
    333333                        if (ggc_interrupted && ! ggc_interrupt_processed) {
    334334                                globus_gass_copy_cancel (&ggc_handle,
     
    341341                }
    342342        if (save_errno == ETIMEDOUT)
    343                 check.istimeout = 1;
     343                istimeout = 1;
    344344        if (!monitor.done){
    345345                int ret;
     
    371371        if (ggc_interrupt_processed == 1) {
    372372                save_errno = ECANCELED;
    373         } else if (check.istimeout == 1) {
     373        } else if (istimeout == 1) {
    374374                save_errno = ETIMEDOUT;
    375375        }
  • lcg-dm/trunk/h/dpm_api.h

    • Property svn:keywords set to Id Date Author Revision
    r2935 r3543  
    11/*
    2  * $Id: dpm_api.h,v 1.21 2009/12/14 10:07:40 dhsmith Exp $
     2 * $Id$
    33 */
    44
     
    99
    1010/*
    11  * @(#)$RCSfile: dpm_api.h,v $ $Revision: 1.21 $ $Date: 2009/12/14 10:07:40 $ CERN IT-GD/CT Jean-Philippe Baud
     11 * @(#)$RCSfile: dpm_api.h,v $ $Revision$ $Date$ CERN IT-GD/CT Jean-Philippe Baud
    1212 */
    1313
     
    157157#ifndef SWIG
    158158EXTERN_C int DLL_DECL dpm_copyfile _PROTO((char *, char *, int, int));
    159 EXTERN_C int DLL_DECL dpm_copyfile_activate _PROTO(());
    160159#endif
    161160EXTERN_C int DLL_DECL dpm_delreplica _PROTO((char *));
Note: See TracChangeset for help on using the changeset viewer.