ingest_util.c

Go to the documentation of this file.
00001 /*******************************************************************************
00002 *
00003 *  COPYRIGHT (C) 2006 Battelle Memorial Institute.  All Rights Reserved.
00004 *
00005 ********************************************************************************
00006 *
00007 *  Author:
00008 *     name:  Brian Ermold
00009 *     phone: (509) 375-2277
00010 *     email: brian.ermold@pnl.gov
00011 *
00012 ********************************************************************************
00013 *
00014 *  RCS INFORMATION:
00015 *    $RCSfile: ingest_util.c,v $
00016 *    $Revision: 1.20 $
00017 *    $Locker:  $
00018 *    $Date: 2006/09/05 19:05:55 $
00019 *    $State: Exp $
00020 *
00021 *******************************************************************************/
00022 
#include <ctype.h>
#include <dirent.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/resource.h>
00030 
00031 #include "ingest_lib/ingest_util.h"
00032 #include "ingest_lib/ingest_stats.h"
00033 #include "ingest_lib/sds_util.h"
00034 #include "ingest_lib/sds_log.h"
00035 #include "ingest_lib/sds_msg.h"
00036 #include "ingest_lib/sds_status.h"
00037 #include "ingest_lib/sds_tdb.h"
00038 
00039 #include "message.h"
00040 #include "timer.h"
00041 
00042 /*******************************************************************************
00043 *  Private Variables:
00044 */
00045 
00046 static char *gRCSId;
00047 static char *gRCSState;
00048 
00049 static char gTDBInstBase[MAXPLATNAME];
00050 static char gTDBInstName[MAXPLATNAME];
00051 static int  gTDBInstNum;
00052 
00053 static ProcLoc  *gIngestLocation;
00054 static char      gIngestInDir[MAXPATHNAME];
00055 static DSClass **gIngestOutputs;
00056 
00057 static int   gIngestTimeLimit;
00058 static char *gFileExtensionString;
00059 
00060 static char *gIngestCustodian;
00061 static char *gMentorCustodian;
00062 
00063 /* End Private Variables */
00064 
00065 const char *get_ingest_rcsid(void) { return((const char *)gRCSId); }
00066 const char *get_ingest_rcsstate(void) { return((const char *)gRCSState); }
00067 
00068 const char *get_tdb_inst_name(void) { return((const char *)gTDBInstName); }
00069 const char *get_tdb_inst_base(void) { return((const char *)gTDBInstBase); }
00070 int         get_tdb_inst_num(void)  { return(gTDBInstNum); }
00071 
00072 ProcLoc     *get_ingest_location(void) { return(gIngestLocation); }
00073 const char  *get_ingest_in_dir(void)   { return(gIngestInDir); }
00074 DSClass    **get_ingest_outputs(void)  { return(gIngestOutputs); }
00075 
00076 int   get_time_limit(void)       { return(gIngestTimeLimit); }
00077 char *get_extension_string(void) { return(gFileExtensionString); }
00078 
00079 char *get_ingest_custodian(void) { return(gIngestCustodian); }
00080 char *get_mentor_custodian(void) { return(gMentorCustodian); }
00081 
00082 /*******************************************************************************
00083 *  Private Functions:
00084 */
00085 
00086 static int get_raw_data_in_dir()
00087 {
00088     char       *collection_home;
00089     DSClass   **ds_classes;
00090     const char *site;
00091     const char *facility;
00092 
00093     collection_home = get_collection_home();
00094     if (!collection_home) {
00095         return(0);
00096     }
00097 
00098     ds_classes = get_process_inputs();
00099     if (!ds_classes) {
00100         return(0);
00101     }
00102 
00103     site     = get_process_site();
00104     facility = get_process_facility();
00105 
00106     sprintf(gIngestInDir, "%s/%s/%s%s%s.%s",
00107         collection_home, site,
00108         site,
00109         ds_classes[0]->name,
00110         facility,
00111         ds_classes[0]->level);
00112 
00113     print_debug(__FILE__, __LINE__,
00114         "Raw Data Input Directory: %s\n", gIngestInDir);
00115 
00116     dsdb_free_ds_classes(ds_classes);
00117 
00118     return(1);
00119 }
00120 
00121 /* End Private Functions */
00122 
00123 int validate_ingest_output_dsc(char *dsc_name, char *dsc_level)
00124 {
00125     int row;
00126 
00127     for (row = 0; gIngestOutputs[row]; row++) {
00128         if ((strcmp(gIngestOutputs[row]->name,  dsc_name)  == 0) &&
00129             (strcmp(gIngestOutputs[row]->level, dsc_level) == 0) ) {
00130             return(1);
00131         }
00132     }
00133 
00134     current_status(STATUS_BADOUTDSC);
00135     append_log_msg(__FILE__, __LINE__,
00136         "Invalid output datastream class: ['%s' '%s']\n", dsc_name, dsc_level);
00137 
00138     return(0);
00139 }
00140 
00141 /*******************************************************************************
00142 *
00143 * Description:
00144 *   The incoming function is used to hook into ZEBRA's message handler.
00145 *
00146 * Inputs:
00147 *   msg: The msg to go to the ZEBRA message handler..
00148 *
00149 * Outputs:
00150 *   None.
00151 *
00152 * Returns:
00153 *   0
00154 *
00155 *******************************************************************************/
00156 static int incoming(struct message *msg)
00157 {
00158     switch (msg->m_proto) {
00159         case MT_TIMER:
00160             tl_DispatchEvent((struct tm_time *) msg->m_data);
00161             break;
00162     }
00163     return(0);
00164 }
00165 
00166 /*******************************************************************************
00167 *
00168 * Description:
00169 *   The init_DS function will hook an ingest module into the ZEBRA datastore
00170 *   and message handler.
00171 *
00172 * Inputs:
00173 *   platbase: The name of the software hooking into ZEB.
00174 *
00175 * Outputs:
00176 *   None.
00177 *
00178 * Returns:
00179 *   1 = success, 0 = failure
00180 *
00181 *******************************************************************************/
00182 int init_DS(char *site, char *facility, char *proc_name)
00183 {
00184     char id_name[SHORTSTRLEN];
00185 
00186     sprintf(id_name, "%s.%s.%s.ingest.%d", site, facility, proc_name, (int)getpid());
00187 
00188     usy_init();
00189 
00190     if (!msg_connect(incoming, id_name)) {
00191         print_debug(__FILE__, __LINE__, "msg_connect failed.\n");
00192         return(0);
00193     }
00194 
00195     if (!ds_Initialize()) {
00196         print_debug(__FILE__, __LINE__, "ds_Initialize failed.\n");
00197         return(0);
00198     }
00199 
00200     return(1);
00201 }
00202 
00203 
00204 /*******************************************************************************
00205 *
00206 * Description:
00207 *   The init_ingest function will initialize an ingest.
00208 *   It is responsible for:  <blockquote><p>
00209 *
00210 *       opening the databases                          <br>
00211 *       opening the log file                           <br>
00212 *       hooking into the zebra data store              <br>
00213 *       setting up the mail messages                   <br>
00214 *       updating the history's last time started field <br>
00215 *       reseting the file and data stats structures.   </p></blockquote>
00216 *
00217 *   Note that when an ingest produces multiple datastreams you should only
00218 *   call init_ingest once with the base platform name.  For example, the
00219 *   smos_ingest produces 3 datastreams: 1smos 30smos and 1440smos. In this
00220 *   case init_ingest should be called with the base platform name of "smos".
00221 *
00222 * Inputs:
00223 *   site:      The site name (i.e. sgp)
00224 *
00225 *   facility:  The facility name (i.e. C1)
00226 *
00227 *   proc_name: The process name (i.e. vceil25k)
00228 *
00229 * Outputs:
00230 *   None.
00231 *
00232 * Returns:
00233 *   None.
00234 *
00235 *******************************************************************************/
00236 void init_ingest(
00237     char *rcsid,
00238     char *rcsstate,
00239     char *site,
00240     char *facility,
00241     char *proc_name)
00242 {
00243     int   db_attempts;
00244     char  mail_from[64];
00245     char  mail_subject[64];
00246     char  si;
00247 
00248     print_debug(__FILE__, __LINE__,
00249         "Initializing ingest for: ['%s' '%s' '%s']\n"
00250         "RCS Id:    %s\n"
00251         "RCS State: %s\n",
00252         site, facility, proc_name, rcsid, rcsstate);
00253 
00254     gRCSId    = rcsid;
00255     gRCSState = rcsstate;
00256 
00257     /*  Initialize the database connection.
00258      */
00259 
00260     db_attempts = db_connect("ingest");
00261     if (!db_attempts) {
00262         fprintf(stderr,
00263             "Ingest ['%s' '%s' '%s'] could not connect to DSDB using db_alias 'ingest'.\n",
00264              __FILE__, __LINE__, site, facility, proc_name);
00265         exit(1);
00266     }
00267 
00268     /*  Initialize the database "methods".
00269      */
00270 
00271     init_process_methods(site, facility, "Ingest", proc_name);
00272     init_tdb_methods(site);
00273 
00274     /*  Initialize the mail messages
00275      */
00276 
00277     sprintf(mail_from, "%s.%s.%s", site, facility, proc_name);
00278     for (si = 0; mail_from[si] != '\0'; si++) {
00279         mail_from[si] = toupper(mail_from[si]);
00280     }
00281     mail_from[si] = '\0';
00282 
00283     sprintf(mail_subject, "Ingest");
00284 
00285     gIngestCustodian = get_tdb_ingest_custodian();
00286     if (!gIngestCustodian) {
00287         if (is_db_error()) {
00288             db_finish();
00289             exit(1);
00290         }
00291         else {
00292             print_debug(__FILE__, __LINE__,
00293                 "Ingest Mail Disabled: No ingest_custodian found in TDB Tables.\n");
00294         }
00295     }
00296 
00297     if (gIngestCustodian) {
00298         if (!init_mail_message(INGEST_MSG, mail_from, mail_subject)) {
00299             db_finish();
00300             exit(1);
00301         }
00302     }
00303 
00304     /*  From this point on we can call exit_ingest if a fatal error occurs.
00305      *  This will at least send the ingest mail and update the status in the
00306      *  DB even if the log file has not been opened yet.
00307      */
00308 
00309     gMentorCustodian = get_tdb_mentor_custodian(proc_name);
00310     if (!gMentorCustodian) {
00311         if (is_db_error()) {
00312             db_disconnect();
00313             exit_ingest(FAILURE);
00314         }
00315         else {
00316             print_debug(__FILE__, __LINE__,
00317                 "Mentor Mail Disabled: No %s_mentor_custodian was found in the TDB Tables.\n",
00318                 proc_name);
00319         }
00320     }
00321 
00322     if (gMentorCustodian) {
00323         if (!init_mail_message(MENTOR_MSG, mail_from, mail_subject)) {
00324             db_disconnect();
00325             exit_ingest(FAILURE);
00326         }
00327     }
00328 
00329     /*  Open the log file.
00330      */
00331 
00332     if (!open_instrlog(proc_name, INGESTLOG_EXTENSION)) {
00333         current_status(STATUS_NOLOGOPEN);
00334         db_disconnect();
00335         exit_ingest(FAILURE);
00336     }
00337 
00338     /*  Initialize the signal handlers
00339      */
00340 
00341     if (!init_signal_handlers()) {
00342         db_disconnect();
00343         exit_ingest(FAILURE);
00344     }
00345 
00346     /*  Log the number of attempts it took to connect to the database
00347      *  if it was more than once.
00348      */
00349 
00350     if (db_attempts > 1) {
00351         append_instrlog(
00352             "DB_ATTEPTS: It took %d attempts to connect to the database.",
00353             db_attempts);
00354     }
00355 
00356     /*  Update the last started time
00357      */
00358 
00359     if (!update_process_started()) {
00360         db_disconnect();
00361         exit_ingest(FAILURE);
00362     }
00363 
00364     /*  Get the tdb instrument number from the TDB tables and store the
00365      *  tdb_inst_base tdb_inst_num and tdb_inst_name in global space.
00366      */
00367 
00368     strcpy(gTDBInstBase, proc_name);
00369 
00370     gTDBInstNum = get_tdb_instrument_number(gTDBInstBase, facility);
00371 
00372     if (!gTDBInstNum) {
00373         if (!is_db_error()) {
00374             append_log_msg(__FILE__, __LINE__,
00375                 "Could not find instrument number in TDB Tables\n",
00376                  gTDBInstBase, facility);
00377             current_status(STATUS_NOINSTNUM);
00378         }
00379         exit_ingest(FAILURE);
00380     }
00381 
00382     sprintf(gTDBInstName, "%s%d", gTDBInstBase, gTDBInstNum);
00383 
00384     /*  Pre-load required information from the database
00385      */
00386 
00387     gIngestLocation = get_process_location();
00388     if (!gIngestLocation) {
00389         db_disconnect();
00390         exit_ingest(FAILURE);
00391     }
00392 
00393     if (!get_raw_data_in_dir()) {
00394         db_disconnect();
00395         exit_ingest(FAILURE);
00396     }
00397 
00398     gIngestOutputs = get_process_outputs();
00399     if (!gIngestOutputs) {
00400         db_disconnect();
00401         exit_ingest(FAILURE);
00402     }
00403 
00404     gIngestTimeLimit = get_tdb_ingest_time_limit(gTDBInstBase, gTDBInstNum);
00405     if (!gIngestTimeLimit) {
00406         if (!is_db_error()) {
00407             append_log_msg(__FILE__, __LINE__,
00408                 "Could not find Ingest time limit in TDB Tables\n");
00409             current_status(STATUS_NOTIMELIMIT);
00410         }
00411         db_disconnect();
00412         exit_ingest(FAILURE);
00413     }
00414 
00415     gFileExtensionString = get_tdb_ingest_file_extension(gTDBInstBase, gTDBInstNum);
00416     if (!gFileExtensionString) {
00417         if (!is_db_error()) {
00418             append_log_msg(__FILE__, __LINE__,
00419                 "Could not find ingest file extension in TDB Tables\n");
00420             current_status(STATUS_NOFILEEXT);
00421         }
00422         db_disconnect();
00423         exit_ingest(FAILURE);
00424     }
00425 
00426     /*  Disconnect from the database until we need it again
00427      */
00428 
00429     db_disconnect();
00430 
00431     /*  Hook into ZEB
00432      */
00433 
00434     if (!init_DS(site, facility, proc_name)) {
00435         current_status(STATUS_INITDS);
00436         append_log_msg(__FILE__, __LINE__,
00437             "Unable to initialize the zebra data store\n");
00438         exit_ingest(FAILURE);
00439     }
00440 
00441     /*  Initialize the stats structures
00442      */
00443 
00444     reset_dc_stats();
00445     reset_file_stats();
00446 }
00447 
00448 /*******************************************************************************
00449 *
00450 * Description:
00451 *   The build_extlist function will build the list of file extensions for the
00452 *   ingest specified in the init_ingest function. The last element of the list
00453 *   will be NULL.
00454 *
00455 * Inputs:
00456 *   None.
00457 *
00458 * Outputs:
00459 *   numext: The number of extensions in the list
00460 *
00461 * Returns:
00462 *   A pointer to the list of extensions found.
00463 *
00464 *******************************************************************************/
00465 char **build_extlist(int *numext)
00466 {
00467     static char *ext_list[MAXEXTENSIONS];
00468     static char  ext_str[SHORTSTRLEN];
00469 
00470     int   ext_strlen;
00471     char *chrp;
00472 
00473     *numext = 0;
00474 
00475     strcpy(ext_str, gFileExtensionString);
00476     ext_strlen = strlen(gFileExtensionString);
00477 
00478     chrp = ext_str;
00479 
00480     while(chrp < ext_str + ext_strlen) {
00481 
00482         while((!isalnum(*chrp)) && (*chrp != '\0')) {
00483             chrp++;
00484         }
00485 
00486         if (*chrp == '\0') {
00487             break;
00488         }
00489         else {
00490             ext_list[*numext] = chrp;
00491             while(isalnum(*++chrp));
00492             *chrp = '\0';
00493             print_debug(__FILE__, __LINE__,
00494                 "Added %s to the file extensions list\n", ext_list[*numext]);
00495             chrp++;
00496             (*numext)++;
00497         }
00498     }
00499 
00500     if (*numext == 0) {
00501         ext_str[0] = '\0';
00502         ext_list[*numext] = ext_str;
00503         (*numext)++;
00504     }
00505 
00506     ext_list[*numext] = (char *)NULL;
00507     return(ext_list);
00508 }
00509 
00510 /*******************************************************************************
00511 *
00512 * Description:
00513 *   The build_filelist function will obtain the list of files to ingest for the
00514 *   process specified in init_ingest. The last element of the list will be NULL.
00515 *
00516 * Inputs:
00517 *   None.
00518 *
00519 * Outputs:
00520 *   nfiles:   The number of files returned, -1 on failure.
00521 *
00522 *   dirname:  The name of the directory that was searched.
00523 *
00524 * Returns:
00525 *   A pointer to the list of files found.
00526 *
00527 *******************************************************************************/
00528 char **build_filelist(int *nfiles, char *dirname)
00529 {
00530     int             increment_size = 128;
00531     char          **file_list;
00532     char          **ext_list;
00533     int             numext;
00534     DIR            *dirp;
00535     struct dirent  *direntp;
00536     char          **extpp;
00537     char           *file_ext;
00538     int             list_size;
00539 
00540     print_debug(__FILE__, __LINE__,
00541         "Building the file list for: %s\n", gTDBInstName);
00542 
00543     /*  Allocate the space for the file_list
00544      */
00545 
00546     list_size = increment_size;
00547     file_list = (char **)calloc((size_t)list_size, sizeof(char *));
00548 
00549     if (!file_list) {
00550         append_log_msg(__FILE__, __LINE__,
00551             "Could not allocate memory for the file list!\n");
00552         current_status(STATUS_NOMEM);
00553         *nfiles = -1;
00554         return((char **)NULL);
00555     }
00556     file_list[0] = NULL;
00557 
00558     /*  Copy gIngestInDir to dirname if it is not NULL.
00559      */
00560 
00561     if (dirname) {
00562         strcpy(dirname, gIngestInDir);
00563     }
00564 
00565     /*  Build the list of file extensions we are looking for
00566      */
00567 
00568     ext_list = build_extlist(&numext);
00569 
00570     /*  Open the directory and build our file list
00571      */
00572 
00573     print_debug(__FILE__, __LINE__,
00574         "Searching for files in directory %s\n", gIngestInDir);
00575 
00576     dirp = opendir(gIngestInDir);
00577     if (!dirp) {
00578         append_log_msg(__FILE__, __LINE__,
00579             "Error #%i opening directory %s\n -> %s\n",
00580             errno, gIngestInDir, strerror(errno));
00581         free(file_list);
00582         current_status(STATUS_NODIROPEN);
00583         *nfiles = -1;
00584         return((char **)NULL);
00585     }
00586 
00587     *nfiles = 0;
00588     while ((direntp = readdir(dirp)) != NULL ) {
00589 
00590         /*  Check if this file has an extension in our extension list
00591          */
00592 
00593         for (extpp = ext_list; *extpp != NULL; extpp++) {
00594 
00595             if (strlen(direntp->d_name) > strlen(*extpp)) {
00596 
00597                 file_ext = direntp->d_name + strlen(direntp->d_name) - strlen(*extpp);
00598 
00599                 if (strcmp(*extpp, file_ext) == 0) {
00600 
00601                     /*  This file has the proper extension so make sure
00602                      *  our file list has room to add it.
00603                      */
00604 
00605                     if (*nfiles > list_size - 1) {
00606 
00607                         list_size += increment_size;
00608 
00609                         file_list = (char **)realloc(file_list, list_size * sizeof(char *));
00610 
00611                         if (!file_list) {
00612                             append_log_msg(__FILE__, __LINE__,
00613                                 "Could not reallocate memory for file list!\n");
00614                             closedir(dirp);
00615                             current_status(STATUS_NOMEM);
00616                             *nfiles = -1;
00617                             return((char **)NULL);
00618                         }
00619                     }
00620 
00621                     /*  Now we can store it in the file list.
00622                      */
00623 
00624                     file_list[*nfiles] = (char *)malloc((strlen(direntp->d_name) + 1) * sizeof(char));
00625 
00626                     if (!file_list[*nfiles]) {
00627                         append_log_msg(__FILE__, __LINE__,
00628                             "Could not allocate memory for file name: %s\n", direntp->d_name);
00629                         closedir(dirp);
00630                         current_status(STATUS_NOMEM);
00631                         *nfiles = -1;
00632                         return((char **)NULL);
00633                     }
00634 
00635                     strcpy(file_list[*nfiles], direntp->d_name);
00636                     (*nfiles)++;
00637 
00638                     print_debug(__FILE__, __LINE__,
00639                         "Added %s to the file list\n", direntp->d_name);
00640 
00641                     break;
00642                 }
00643             }
00644         }
00645     }
00646 
00647     closedir(dirp);
00648 
00649     /*  Set the last element of file_list to NULL
00650      */
00651 
00652     file_list[*nfiles] = (char *)NULL;
00653 
00654     /*  Sort the file_list
00655      */
00656 
00657     qsort(file_list, *nfiles, sizeof(char *), namecompare);
00658 
00659     return(file_list);
00660 }
00661 
00662 /*******************************************************************************
00663 *
00664 * Description:
00665 *   The free_filelist function will free the filelist created by build_filelist.
00666 *
00667 * Inputs:
00668 *   filelist: The filelist created by build_filelist.
00669 *
00670 * Outputs:
00671 *   None.
00672 *
00673 * Returns:
00674 *   None.
00675 *
00676 *******************************************************************************/
00677 void free_filelist(char **file_list)
00678 {
00679     int i;
00680 
00681     if (file_list) {
00682 
00683         for (i = 0; file_list[i]; i++) {
00684             free(file_list[i]);
00685         }
00686         free(file_list);
00687     }
00688 }
00689 
00690 /*******************************************************************************
00691 *
00692 * Description:
00693 *   The exit_ingest function cleans up after an ingest and exits. It is
00694 *   responsible for: <blockquote><p>
00695 *
00696 *       updating the ingest status and history information <br>
00697 *       writing the file and data stats to the log file    <br>
00698 *       closing the log file                               <br>
00699 *       mailing the messages if any were generated         <br>
00700 *       closing the databases.                             </p></blockquote>
00701 *
00702 * Inputs:
00703 *   platname:  The platform name we were working on.
00704 *
00705 *   status:    The current status of the ingest (1 = success, 0 = failure).
00706 *
00707 * Outputs:
00708 *   None.
00709 *
00710 * Returns:
00711 *   None.
00712 *
00713 *******************************************************************************/
00714 void exit_ingest(int status)
00715 {
00716     ProcStatus      *proc_status;
00717     time_t           last_started;
00718     time_t           last_successful;
00719     time_t           inst_exp_int;
00720     const char      *status_text;
00721     const char      *status_name;
00722     const char      *site;
00723     const char      *facility;
00724     const char      *proc_name;
00725     char            *hostname;
00726     char            *tdbvalue;
00727     int              disable_status_changed_mail;
00728     char             current_status_msg[512];
00729     char             mail_msg[512];
00730 
00731     char             uc_proc_name[32];
00732     char             uc_site[8];
00733     int              si;
00734 
00735     int found_files = 1;
00736     int db_connected;
00737 
00738     /*  Reconnect to the Database
00739      */
00740 
00741     db_connected = db_reconnect();
00742 
00743     if (db_connected) {
00744 
00745         /*  Get the process status from the database
00746          */
00747 
00748         proc_status = get_process_status();
00749         if (!proc_status || !proc_status->text) {
00750 
00751             print_debug(__FILE__, __LINE__,
00752                 "No status for %s has ever been recorded\n", gTDBInstName);
00753 
00754             proc_status     = (ProcStatus *)NULL;
00755             last_started    = 0;
00756             last_successful = 0;
00757         }
00758         else {
00759 
00760             print_debug(__FILE__, __LINE__,
00761                 "Previous Ingest Status: \"%s\"\n", proc_status->text);
00762 
00763             if (proc_status->last_started) {
00764                 last_started = mktime(proc_status->last_started);
00765             }
00766             else {
00767                 last_started = 0;
00768             }
00769 
00770             if (proc_status->last_successful) {
00771                 last_successful = mktime(proc_status->last_successful);
00772             }
00773             else {
00774                 last_successful = 0;
00775             }
00776         }
00777     }
00778 
00779     /*  Change the status to success if we were successful.
00780      */
00781 
00782     status_text = current_status(NULL);
00783 
00784     if (status == SUCCESS) {
00785 
00786         /*  Change current status to "good" unless it is one of the
00787          *  move_ingested_raw function errors.
00788          */
00789 
00790         if ((strcmp(status_text, STATUS_NULLDC)    != 0)
00791          && (strcmp(status_text, STATUS_NOMOVERAW) != 0)
00792          && (strcmp(status_text, STATUS_NOUTIME)   != 0)) {
00793 
00794             status_text = current_status(STATUS_GOOD);
00795         }
00796     }
00797     else if (db_connected) {
00798 
00799         /*  If the current status is "No files found" and the instrument
00800          *  expectation interval is greater than the difference between the
00801          *  last started time and the last successful we do not want to
00802          *  update the status.
00803          */
00804 
00805         if (strcmp(status_text, STATUS_NOFILES) == 0) {
00806 
00807             inst_exp_int = get_tdb_inst_exp_int(gTDBInstName);
00808 
00809             if (inst_exp_int &&
00810                 ((last_started - last_successful) <= inst_exp_int)) {
00811 
00812                 append_instrlog(
00813                     "No new files found but we are within the instrument expectation interval: %d seconds\n",
00814                     inst_exp_int);
00815 
00816                 update_process_completed();
00817 
00818                 found_files = 0;
00819             }
00820         }
00821     }
00822 
00823     /*  Build the current status message
00824      */
00825 
00826     site = get_process_site();
00827     for (si = 0; site[si] != '\0'; si++) {
00828         uc_site[si] = toupper(site[si]);
00829     }
00830     uc_site[si] = '\0';
00831 
00832     facility = get_process_facility();
00833 
00834     proc_name = get_process_name();
00835     for (si = 0; proc_name[si] != '\0'; si++) {
00836         uc_proc_name[si] = toupper(proc_name[si]);
00837     }
00838     uc_proc_name[si] = '\0';
00839 
00840     hostname = get_hostname();
00841     if (!hostname) {
00842         hostname = "unknown";
00843     }
00844 
00845     sprintf(current_status_msg,
00846         "Current Status:\n"
00847         "Proc:   %s.%s.%s\n"
00848         "Host:   %s\n"
00849         "Status: %s\n",
00850         uc_site, uc_proc_name, facility,
00851         hostname,
00852         status_text);
00853 
00854     if (db_connected) {
00855 
00856         /*  Send mail if the ingest status has changed.
00857          */
00858 
00859         disable_status_changed_mail = 0;
00860 
00861         tdbvalue = shared_tdb_fetch(DS_TDB, "disable_status_changed_mail");
00862         if (tdbvalue && (strcmp(tdbvalue, "off") != 0)) {
00863             disable_status_changed_mail = 1;
00864         }
00865 
00866         if (!proc_status || (strcmp(status_text, proc_status->text) != 0)) {
00867 
00868             if (found_files) {
00869                 print_debug(__FILE__, __LINE__,
00870                     "Status for %s has changed\n", gTDBInstName);
00871 
00872                 if (!disable_status_changed_mail) {
00873 
00874                     sprintf(mail_msg, "%s\nLast successful ingest completed at %s",
00875                         current_status_msg, ctime(&last_successful));
00876 
00877                     append_to_msg(INGEST_MSG, mail_msg);
00878                 }
00879             }
00880         }
00881         else {
00882             print_debug(__FILE__, __LINE__,
00883                 "Status for %s has not changed\n", gTDBInstName);
00884         }
00885 
00886         /*  Update the process status in the database
00887          */
00888 
00889         if (found_files) {
00890 
00891             if (strcmp(status_text, STATUS_GOOD) == 0) {
00892                 status_name = "Success";
00893             }
00894             else {
00895                 status_name = "Failure";
00896             }
00897 
00898             update_process_status(status_name, status_text);
00899         }
00900     }
00901 
00902     /*  Write stats to log file.
00903      */
00904 
00905     write_dc_stats();
00906     write_file_stats();
00907 
00908     /*  We are done with the database so clean up the connection.
00909      */
00910 
00911     db_finish();
00912 
00913     dsdb_free_process_location(gIngestLocation);
00914     dsdb_free_ds_classes(gIngestOutputs);
00915 
00916     print_debug(__FILE__, __LINE__,
00917         "Closed the database\n");
00918 
00919     /*  Mail the ingest and mentor messages.
00920      */
00921 
00922     if (gIngestCustodian) {
00923         mail_message(INGEST_MSG, gIngestCustodian);
00924     }
00925 
00926     if (gMentorCustodian) {
00927         mail_message(MENTOR_MSG, gMentorCustodian);
00928     }
00929 
00930     /*  Append the current status message to the log file and close it.
00931      */
00932 
00933     append_instrlog("\n%s", current_status_msg);
00934     close_instrlog();
00935 
00936     if (status == FAILURE) {
00937         exit(1);
00938     }
00939 
00940     exit(0);
00941 }
00942 
00943 /*****************************************************************************
00944 *
00945 * Description:
00946 *   This function will be called anytime a signal which has been set up by
00947 *   init_signal_handlers is caught. This routine will set the status and
00948 *   call exit_ingest if it is possible to close and re-open the TDB. If the
00949 *   TDB can not be re-opened it will log a message to the log file but it will
00950 *   be impossible to update the status in the TDB.
00951 *
00952 * Inputs:
00953 *   signal: The signal type.
00954 *
00955 *   si:     structure  containing the  reason why the signal was generated
00956 *           (see siginfo(5)).
00957 *
00958 *   uc:     the  receiving process's  context when the signal was delivered
00959 *           (see ucontext(5)).
00960 *
00961 * Outputs:
00962 *   None.
00963 *
00964 * Returns:
00965 *   None.
00966 *
00967 ****************************************************************************/
00968 void catch_sig(int sig, siginfo_t *si, void *uc)
00969 {
00970     char msg[128];
00971 
00972     switch (sig) {
00973 
00974         case SIGQUIT:
00975             strcpy(msg, "SIGQUIT: Quit (see termio(7I))\n");
00976             current_status(STATUS_COREDUMP);
00977             break;
00978         case SIGILL:
00979             strcpy(msg, "SIGILL: Illegal Instruction\n");
00980             current_status(STATUS_COREDUMP);
00981             break;
00982         case SIGTRAP:
00983             strcpy(msg, "SIGTRAP: Trace or Breakpoint Trap\n");
00984             current_status(STATUS_COREDUMP);
00985             break;
00986         case SIGABRT:
00987             strcpy(msg, "SIGABRT: Abort\n");
00988             current_status(STATUS_COREDUMP);
00989             break;
00990         case SIGEMT:
00991             strcpy(msg, "SIGEMT: Emulation Trap\n");
00992             current_status(STATUS_COREDUMP);
00993             break;
00994         case SIGFPE:
00995             strcpy(msg, "SIGFPE: Arithmetic Exception\n");
00996             current_status(STATUS_COREDUMP);
00997             break;
00998         case SIGBUS:
00999             strcpy(msg, "SIGBUS: Bus Error\n");
01000             current_status(STATUS_COREDUMP);
01001             break;
01002         case SIGSEGV:
01003             strcpy(msg, "SIGSEGV: Segmentation Fault\n");
01004             current_status(STATUS_COREDUMP);
01005             break;
01006         case SIGSYS:
01007             strcpy(msg, "SIGSYS: Bad System Call\n");
01008             current_status(STATUS_COREDUMP);
01009             break;
01010         case SIGHUP:
01011             strcpy(msg, "SIGHUP: Hangup (see termio(7I))\n");
01012             current_status(STATUS_SIGNAL);
01013             break;
01014         case SIGINT:
01015             strcpy(msg, "SIGINT: Interrupt (see termio(7I))\n");
01016             current_status(STATUS_SIGNAL);
01017             break;
01018         case SIGPIPE:
01019             strcpy(msg, "SIGPIPE: Broken Pipe\n");
01020             current_status(STATUS_SIGNAL);
01021             break;
01022         case SIGALRM:
01023             strcpy(msg, "SIGALRM: Alarm Clock\n");
01024             current_status(STATUS_SIGNAL);
01025             break;
01026         case SIGTERM:
01027             strcpy(msg, "SIGTERM: Terminated\n");
01028             current_status(STATUS_SIGNAL);
01029             break;
01030         default:
01031             strcpy(msg, "Unknown Signal Type\n");
01032             current_status(STATUS_SIGNAL);
01033     }
01034 
01035     append_log_msg(__FILE__, __LINE__,
01036         "Received Signal %s\n", msg);
01037 
01038     exit_ingest(0);
01039 }
01040 
01041 /*****************************************************************************
01042 *
01043 * Description:
01044 *   This function will limit the codedumpsize and setup the catch_sig function
01045 *   to be the signal handler for the following signals: <blockquote><p>
01046 *
01047 *       SIGHUP  -> Hangup (see termio(7I))    <br>
01048 *       SIGINT  -> Interrupt (see termio(7I)) <br>
01049 *       SIGQUIT -> Quit (see termio(7I))      <br>
01050 *       SIGILL  -> Illegal Instruction        <br>
01051 *       SIGTRAP -> Trace or Breakpoint Trap   <br>
01052 *       SIGABRT -> Abort                      <br>
01053 *       SIGEMT  -> Emulation Trap             <br>
01054 *       SIGFPE  -> Arithmetic Exception       <br>
01055 *       SIGBUS  -> Bus Error                  <br>
01056 *       SIGSEGV -> Segmentation Fault         <br>
01057 *       SIGSYS  -> Bad System Call            <br>
01058 *       SIGPIPE -> Broken Pipe                <br>
01059 *       SIGALRM -> Alarm Clock                <br>
01060 *       SIGTERM -> Terminated                 </p></blockquote>
01061 *
01062 * Inputs:
01063 *   None.
01064 *
01065 * Outputs:
01066 *   None.
01067 *
01068 * Returns:
01069 *   None.
01070 *
01071 ****************************************************************************/
01072 int init_signal_handlers(void)
01073 {
01074     static int initialized;
01075 
01076     struct rlimit   rl;
01077     struct sigaction act;
01078 
01079     if (initialized) {
01080         return(1);
01081     }
01082     initialized = 1;
01083 
01084     act.sa_handler = 0;
01085 
01086   /*  act.sa_mask = sigset_t(0); */
01087     act.sa_flags = (SA_SIGINFO);
01088     act.sa_sigaction = catch_sig;
01089 
01090     if (sigaction(SIGHUP,  &act, 0) != 0 ||   /* Hangup (see termio(7I))    */
01091         sigaction(SIGINT,  &act, 0) != 0 ||   /* Interrupt (see termio(7I)) */
01092         sigaction(SIGQUIT, &act, 0) != 0 ||   /* Quit (see termio(7I))      */
01093         sigaction(SIGILL,  &act, 0) != 0 ||   /* Illegal Instruction        */
01094         sigaction(SIGTRAP, &act, 0) != 0 ||   /* Trace or Breakpoint Trap   */
01095         sigaction(SIGABRT, &act, 0) != 0 ||   /* Abort                      */
01096         sigaction(SIGEMT,  &act, 0) != 0 ||   /* Emulation Trap             */
01097         sigaction(SIGFPE,  &act, 0) != 0 ||   /* Arithmetic Exception       */
01098         sigaction(SIGBUS,  &act, 0) != 0 ||   /* Bus Error                  */
01099         sigaction(SIGSEGV, &act, 0) != 0 ||   /* Segmentation Fault         */
01100         sigaction(SIGSYS,  &act, 0) != 0 ||   /* Bad System Call            */
01101         sigaction(SIGPIPE, &act, 0) != 0 ||   /* Broken Pipe                */
01102         sigaction(SIGALRM, &act, 0) != 0 ||   /* Alarm Clock                */
01103         sigaction(SIGTERM, &act, 0) != 0      /* Terminated                 */
01104        ) {
01105 
01106         append_log_msg(__FILE__, __LINE__,
01107             "Calling sigaction failed, error #%i: %s\n", errno, strerror(errno));
01108         current_status(STATUS_SIGNAL);
01109         return(0);
01110     }
01111 
01112       /*  set core limit size
01113    */
01114     rl.rlim_cur = COREDUMPSIZE;
01115     rl.rlim_max = COREDUMPSIZE;
01116     if (setrlimit(RLIMIT_CORE, &rl) == -1) {
01117 
01118         append_log_msg(__FILE__, __LINE__,
01119             "Calling setrlimit failed, error #%i: %s\n", errno, strerror(errno));
01120         current_status(STATUS_SIGNAL);
01121         return(0);
01122     }
01123 
01124     return(1);
01125 }
01126 
01127 /*******************************************************************************
01128 *
01129 * Description:
01130 *   The bad_line function is used to report a bad line in the specified file.
01131 *   It will count various types of bad data lines, depending on the linetype
01132 *   passed.  If linetype is a REPORT type, this function will append a bad line
01133 *   report to the INGEST_MSG.
01134 *
01135 * Inputs:
01136 *  line:     The line number which was bad.
01137 *
01138 *  filename: The name of the file with a bad line.
01139 *
01140 *  linetype: The type of bad line which was encountered.
01141 *
01142 * Outputs:
01143 *   None.
01144 *
01145 * Returns:
01146 *   None.
01147 *
01148 *******************************************************************************/
01149 void bad_line(int line, char *filename, int linetype)
01150 {
01151     static int badline[BADLINE_MAX];
01152     char       errorstr[MAXSTRLEN];
01153     char       badline_report[MAXSTRLEN*64];
01154     int        badline_total = 0;
01155     int        i;
01156 
01157     if (linetype == BADLINE_REPORT) {
01158         for(i = 0; i < BADLINE_MAX; i++) {
01159 
01160           /*  Add all lines EXCEPT BADLINE_COMMENT
01161            */
01162             if (i != BADLINE_COMMENT) {
01163                 badline_total += badline[i];
01164             }
01165         }
01166 
01167         if (badline_total) {
01168 
01169             sprintf(badline_report, "Bad data lines in file: %s\n", filename);
01170             sprintf(errorstr, "Bad number of elements: %d\n", badline[BADLINE_ELE]);
01171             strcat(badline_report,errorstr);
01172             sprintf(errorstr, "        Bad characters: %d\n", badline[BADLINE_ALPHA]);
01173             strcat(badline_report,errorstr);
01174             sprintf(errorstr, "              Bad date: %d\n", badline[BADLINE_DATE]);
01175             strcat(badline_report,errorstr);
01176             sprintf(errorstr, "       Bad Facility ID: %d\n", badline[BADLINE_FACID]);
01177             strcat(badline_report,errorstr);
01178 
01179           /*
01180            * We will not put the Comment Line count in our report for now
01181            *
01182            * sprintf(errorstr,"         Comment Lines: %d\n",badline[BADLINE_COMMENT]);
01183            * strcat(badline_report,errorstr);
01184            */
01185 
01186             sprintf(errorstr, "       TOTAL BAD LINES: %d\n", badline_total);
01187             strcat(badline_report, errorstr);
01188             append_log_msg(__FILE__, __LINE__, "%s", badline_report);
01189 
01190           /*  Reset the info
01191            */
01192             for(i = 0; i < BADLINE_MAX; i++) {
01193                 badline[i] = 0;
01194             }
01195         }
01196     }
01197     else {
01198         sprintf(errorstr, "Improper DataLine (line %d) in %s", line, filename);
01199         append_to_msg(INGEST_MSG, errorstr);
01200         print_debug(__FILE__, __LINE__, "%s\n", errorstr);
01201         badline[linetype]++;
01202     }
01203 }
01204 
01205 /*******************************************************************************
01206 *
01207 * Description:
01208 *   The get_full_raw_data_file_path function appends the given filename to the
01209 *   raw data input directory.
01210 *
01211 * Inputs:
01212 *   filename:  The name of the raw data file.
01213 *
01214 *   full_path: The pointer to the space to store the full path to the raw file.
01215 *
01216 * Outputs:
01217 *   full_path: The full path to the raw data file.
01218 *
01219 * Returns:
01220 *   1 = success, 0 = failure
01221 *
01222 *******************************************************************************/
01223 int get_full_raw_data_file_path(char *filename, char *full_path)
01224 {
01225     sprintf(full_path, "%s/%s", gIngestInDir, filename);
01226     return(1);
01227 }
01228 
01229 /*******************************************************************************
01230 *
01231 * Description:
01232 *   The open_data_file function opens a file and gets a file pointer.
01233 *
01234 * Inputs:
01235 *   filename: The name of the data file to open.
01236 *
01237 * Outputs:
01238 *   fp:      The pointer to the open file.
01239 *
01240 * Returns:
01241 *   1 = success, 0 = failure
01242 *
01243 *******************************************************************************/
01244 int open_data_file(char *filename, FILE **fp)
01245 {
01246     if ((*fp = fopen(filename, "r")) == NULL ) {
01247         current_status(STATUS_FOPEN);
01248         append_log_msg(__FILE__, __LINE__,
01249             "Could not open file: %s\n", filename);
01250         return(0);
01251     }
01252 
01253     return(1);
01254 }

Generated on Tue Sep 12 20:12:37 2006 for DSUTIL-INGEST_LIB by doxygen 1.3.5