> content >> migrate" * for analyzing data from various sources and importing them into Drupal tables. */ /** * Call a migrate hook */ function migrate_invoke_all($hook) { // Let modules do any one-time initialization (e.g., including migration support files) module_invoke_all('migrate_init'); $args = func_get_args(); $hookfunc = "migrate" . "_$hook"; unset($args[0]); $return = array(); $modulelist = module_implements($hookfunc); foreach ($modulelist as $module) { $function = $module . '_' . $hookfunc; $result = call_user_func_array($function, $args); if (isset($result) && is_array($result)) { $return = array_merge_recursive($return, $result); } elseif (isset($result)) { $return[] = $result; } } return $return; } /** * Call a destination hook (e.g., hook_migrate_prepare_node). Use this version * for hooks with the precise signature below, so that the object can be passed by * reference. * * @param $hook * @param $object * @param $tblinfo * @param $row * @return */ function migrate_destination_invoke_all($hook, &$object, $tblinfo, $row) { // We could have used module_invoke_all, but unfortunately // module_invoke_all passes all arguments by value. $errors = array(); $hook = 'migrate_' . $hook; foreach (module_implements($hook) as $module_name) { $function = $module_name . '_' . $hook; if (function_exists($function)) { timer_start($function); $errors = array_merge($errors, (array)$function($object, $tblinfo, $row)); timer_stop($function); } } return $errors; } /** * Save a new or updated content set * * @param $content_set * An array or object representing the content set. This is passed by reference (so * when adding a new content set the ID can be set) * @param $options * Array of additional options for saving the content set. Currently: * base_table: The base table of the view - if provided, we don't need * to load the view. * base_database: The database of the base table - if base_table is present * and base_database omitted, it defaults to 'default' * @return * The ID of the content set that was saved, or NULL if nothing was saved */ function migrate_save_content_set(&$content_set, $options = array()) { // Deal with objects internally (but remember if we need to put the parameter // back to an array) if (is_array($content_set)) { $was_array = TRUE; $content_set = (object) $content_set; } else { $was_array = FALSE; } // Update or insert the content set record as appropriate if (isset($content_set->mcsid)) { drupal_write_record('migrate_content_sets', $content_set, 'mcsid'); } else { drupal_write_record('migrate_content_sets', $content_set); } // Create or modify map and message tables $maptablename = migrate_map_table_name($content_set->mcsid); $msgtablename = migrate_message_table_name($content_set->mcsid); // TODO: For now, PK must be in base_table // If the caller tells us the base table of the view, we don't need // to load the view (which would not work when called from hook_install()) if (isset($options['base_table'])) { $tablename = $options['base_table']; if (isset($options['base_database'])) { $tabledb = $options['base_database']; } else { $tabledb = 'default'; } } else { // Get the proper field definition for the sourcekey $view = views_get_view($content_set->view_name); if (!$view) { drupal_set_message(t('View !view does not exist - either (re)create this view, or remove the content set using it.', array('!view' => $content_set->view_name))); return NULL; } // Must do this to load the database $view->init_query(); $tabledb = $view->base_database; $tablename = $view->base_table; } db_set_active($tabledb); $inspect = schema_invoke('inspect'); db_set_active('default'); $sourceschema = $inspect[$tablename]; // If the PK of the content set is defined, make sure we have a mapping table if ($content_set->sourcekey) { $sourcefield = $sourceschema['fields'][$content_set->sourcekey]; // The field name might be _... if (!$sourcefield) { $sourcekey = drupal_substr($content_set->sourcekey, drupal_strlen($tablename) + 1); $sourcefield = $sourceschema['fields'][$sourcekey]; } // But - we don't want serial fields to behave serially, so change to int if ($sourcefield['type'] == 'serial') { $sourcefield['type'] = 'int'; } $schema_change = FALSE; if (!db_table_exists($maptablename)) { $schema = _migrate_map_table_schema($sourcefield); db_create_table($ret, $maptablename, $schema); // Expose map table to views tw_add_tables(array($maptablename)); tw_add_fk($maptablename, 'destid'); $schema = _migrate_message_table_schema($sourcefield); db_create_table($ret, $msgtablename, $schema); // Expose messages table to views tw_add_tables(array($msgtablename)); tw_add_fk($msgtablename, 'sourceid'); $schema_change = TRUE; } else { // TODO: Deal with varchar->int case where there is existing non-int data $desired_schema = _migrate_map_table_schema($sourcefield); $actual_schema = $inspect[$maptablename]; if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) { $ret = array(); db_drop_primary_key($ret, $maptablename); db_change_field($ret, $maptablename, 'sourceid', 'sourceid', $sourcefield, array('primary key' => array('sourceid'))); tw_perform_analysis($maptablename); $schema_change = TRUE; } $desired_schema = _migrate_message_table_schema($sourcefield); $actual_schema = $inspect[$msgtablename]; if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) { $ret = array(); db_drop_index($ret, $msgtablename, 'sourceid'); db_change_field($ret, $msgtablename, 'sourceid', 'sourceid', $sourcefield, array('indexes' => array('sourceid' => array('sourceid')))); tw_perform_analysis($maptablename); $schema_change = TRUE; } } // Make sure the schema gets updated to reflect changes if ($schema_change) { cache_clear_all('schema', 'cache'); } } if ($was_array) { $content_set = (array)$content_set; return $content_set['mcsid']; } else { return $content_set->mcsid; } } function migrate_save_content_mapping(&$mapping) { if ($mapping->mcmid) { drupal_write_record('migrate_content_mappings', $mapping, 'mcmid'); } else { drupal_write_record('migrate_content_mappings', $mapping); } } function migrate_delete_content_set($mcsid) { // First, remove the map and message tables from the Table Wizard, and drop them $ret = array(); $maptable = migrate_map_table_name($mcsid); $msgtable = migrate_message_table_name($mcsid); if (db_table_exists($maptable)) { tw_remove_tables(array($maptable, $msgtable)); db_drop_table($ret, $maptable); db_drop_table($ret, $msgtable); } // Then, delete the content set data $sql = "DELETE FROM {migrate_content_mappings} WHERE mcsid=%d"; db_query($sql, $mcsid); $sql = "DELETE FROM {migrate_content_sets} WHERE mcsid=%d"; db_query($sql, $mcsid); } function migrate_delete_content_mapping($mcmid) { $sql = "DELETE FROM {migrate_content_mappings} WHERE mcmid=%d"; db_query($sql, $mcmid); } /** * Convenience function for generating a message array * * @param $message * Text describing the error condition * @param $type * One of the MIGRATE_MESSAGE constants, identifying the level of error * @return * Structured array suitable for return from an import hook */ function migrate_message($message, $type = MIGRATE_MESSAGE_ERROR) { $error = array( 'level' => $type, 'message' => $message, ); return $error; } /** * Add a mapping from source ID to destination ID for the specified content set * * @param $mcsid * ID of the content set being processed * @param $sourceid * Primary key value from the source * @param $destid * Primary key value from the destination */ function migrate_add_mapping($mcsid, $sourceid, $destid) { static $maptables = array(); if (!isset($maptables[$mcsid])) { $maptables[$mcsid] = migrate_map_table_name($mcsid); } $mapping = new stdClass; $mapping->sourceid = $sourceid; $mapping->destid = $destid; drupal_write_record($maptables[$mcsid], $mapping); } /** * Clear migrated objects from the specified content set * * @param $mcsid * ID of the content set to clear * @param $messages * Array of messages to (ultimately) be displayed by the caller. * @param $options * Keyed array of optional options: * itemlimit - Maximum number of items to process * timelimit - Unix timestamp after which to stop processing * idlist - Comma-separated list of source IDs to process, instead of proceeding through * all unmigrated rows * feedback - Keyed array controlling status feedback to the caller * function - PHP function to call, passing a message to be displayed * frequency - How often to call the function * frequency_unit - How to interpret frequency (items or seconds) * * @return * Status of the migration process: */ function migrate_content_process_clear($mcsid, &$messages = array(), &$options = array()) { $itemlimit = $options['itemlimit']; $timelimit = $options['timelimit']; $idlist = $options['idlist']; $lastfeedback = time(); if (isset($options['feedback'])) { $feedback = $options['feedback']['function']; $frequency = $options['feedback']['frequency']; $frequency_unit = $options['feedback']['frequency_unit']; } $result = db_query("SELECT * FROM {migrate_content_sets} WHERE mcsid=%d", $mcsid); $tblinfo = db_fetch_object($result); $desttype = $tblinfo->desttype; $view_name = $tblinfo->view_name; $description = $tblinfo->description; $contenttype = $tblinfo->contenttype; $sourcekey = $tblinfo->sourcekey; $maptable = migrate_map_table_name($mcsid); $msgtablename = migrate_message_table_name($mcsid); $processstart = microtime(TRUE); $status = MIGRATE_STATUS_IN_PROGRESS; // If we're being called on a content set that isn't flagged for clearing, temporarily flag it $original_clearing = $tblinfo->clearing; if (!$original_clearing) { $sql = "UPDATE {migrate_content_sets} SET clearing=1 WHERE mcsid=%d"; db_query($sql, $mcsid); } $deleted = 0; if ($idlist) { $sql = "SELECT sourceid,destid FROM {" . $maptable . "} WHERE sourceid IN ($idlist)"; } else { $sql = "SELECT sourceid,destid FROM {" . $maptable . "}"; } timer_start('delete query'); if ($itemlimit) { $deletelist = db_query_range($sql, 0, $itemlimit); } else { $deletelist = db_query($sql); } timer_stop('delete query'); while ($row = db_fetch_object($deletelist)) { // Recheck clearing flag - permits dynamic interruption of jobs $sql = "SELECT clearing FROM {migrate_content_sets} WHERE mcsid=%d"; $clearing = db_result(db_query($sql, $mcsid)); if (!$clearing) { $status = MIGRATE_STATUS_CANCELLED; break; } // Check for time out if there is time info present if (isset($timelimit) && time() >= $timelimit) { $status = MIGRATE_STATUS_TIMEDOUT; break; } if (isset($feedback)) { if (($frequency_unit == 'seconds' && time()-$lastfeedback >= $frequency) || ($frequency_unit == 'items' && $deleted >= $frequency)) { $message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, $status); $feedback($message); $lastfeedback = time(); $deleted = 0; } } // @TODO: Should return success/failure. Problem: node_delete doesn't return anything... migrate_invoke_all("delete_$contenttype", $row->destid); timer_start('clear map/msg'); db_query("DELETE FROM {" . $maptable . "} WHERE sourceid=%d", $row->sourceid); db_query("DELETE FROM {" . $msgtablename . "} WHERE sourceid=%d AND level=%d", $row->sourceid, MIGRATE_MESSAGE_INFORMATIONAL); timer_stop('clear map/msg'); $deleted++; } if ($status == MIGRATE_STATUS_IN_PROGRESS) { $status = MIGRATE_STATUS_SUCCESS; } $message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, $status); if ($status == MIGRATE_STATUS_SUCCESS) { // Mark that we're done $tblinfo->clearing = 0; migrate_save_content_set($tblinfo); // Remove old messages before beginning new import process db_query("DELETE FROM {" . $msgtablename . "} WHERE level <> %d", MIGRATE_MESSAGE_INFORMATIONAL); } if (isset($feedback)) { $feedback($message); } else { $messages[] = $message; } watchdog('migrate', $message); if (!$original_clearing) { $sql = "UPDATE {migrate_content_sets} SET clearing=0 WHERE mcsid=%d"; db_query($sql, $mcsid); } return $status; } /** * Import objects from the specified content set * * @param $mcsid * ID of the content set to clear * @param $messages * Array of messages to (ultimately) be displayed by the caller. * @param $options * Keyed array of optional options: * itemlimit - Maximum number of items to process * timelimit - Unix timestamp after which to stop processing * idlist - Comma-separated list of source IDs to process, instead of proceeding through * all unmigrated rows * feedback - Keyed array controlling status feedback to the caller * function - PHP function to call, passing a message to be displayed * frequency - How often to call the function * frequency_unit - How to interpret frequency (items or seconds) * * @return * Status of the migration process: */ function migrate_content_process_import($mcsid, &$messages = array(), &$options = array()) { $itemlimit = $options['itemlimit']; $timelimit = $options['timelimit']; $idlist = $options['idlist']; $lastfeedback = time(); if (isset($options['feedback'])) { $feedback = $options['feedback']['function']; $frequency = $options['feedback']['frequency']; $frequency_unit = $options['feedback']['frequency_unit']; } $result = db_query("SELECT * FROM {migrate_content_sets} WHERE mcsid=%d", $mcsid); $tblinfo = db_fetch_object($result); $desttype = $tblinfo->desttype; $view_name = $tblinfo->view_name; $description = $tblinfo->description; $contenttype = $tblinfo->contenttype; $sourcekey = $tblinfo->sourcekey; $maptable = migrate_map_table_name($mcsid); $msgtablename = migrate_message_table_name($mcsid); $processstart = microtime(TRUE); $status = MIGRATE_STATUS_IN_PROGRESS; // If we're being called on a content set that isn't flagged for importing, temporarily flag it $original_importing = $tblinfo->importing || $tblinfo->scanning; if (!$original_importing) { $sql = "UPDATE {migrate_content_sets} SET importing=1 WHERE mcsid=%d"; db_query($sql, $mcsid); } $collist = db_query("SELECT srcfield, destfield, default_value FROM {migrate_content_mappings} WHERE mcsid=%d AND (srcfield <> '' OR default_value <> '') ORDER BY mcmid", $mcsid); $fields = array(); while ($row = db_fetch_object($collist)) { $fields[$row->destfield]['srcfield'] = $row->srcfield; $fields[$row->destfield]['default_value'] = $row->default_value; } $tblinfo->fields = $fields; $tblinfo->maptable = $maptable; // We pick up everything in the input view that is not already imported, and // not already errored out // Emulate views execute(), so we can scroll through the results ourselves $view = views_get_view($view_name); if (!$view) { $messages[] = t('View !view does not exist - either (re)create this view, or remove the content set using it.', array('!view' => $view_name)); return MIGRATE_STATUS_FAILURE; } $view->build(); // Let modules modify the view just prior to executing it. foreach (module_implements('views_pre_execute') as $module) { $function = $module . '_views_pre_execute'; $function($view); } $viewdb = $view->base_database; // Add a left join to the map table, and only include rows not in the map $join = new views_join; // Views prepends _ to column names other than the base table's // primary key - we need to strip that here for the join to work. But, it's // common for tables to have the tablename beginning field names (e.g., // table cms with PK cms_id). Deal with that as well... $baselen = drupal_strlen($view->base_table); if (!strncasecmp($sourcekey, $view->base_table . '_', $baselen + 1)) { // So, which case is it? Ask the schema module... db_set_active($viewdb); $inspect = schema_invoke('inspect', db_prefix_tables('{'. $view->base_table .'}')); db_set_active('default'); $tableschema = $inspect[$view->base_table]; $sourcefield = $tableschema['fields'][$sourcekey]; if (!$sourcefield) { $joinkey = drupal_substr($sourcekey, $baselen + 1); $sourcefield = $tableschema['fields'][$joinkey]; if (!$sourcefield) { $messages[] = t("In view !view, can't find key !key for table !table", array('!view' => $view_name, '!key' => $sourcekey, '!table' => $view->base_table)); return MIGRATE_STATUS_FAILURE; } } else { $joinkey = $sourcekey; } } else { $joinkey = $sourcekey; } $join->construct($maptable, $view->base_table, $joinkey, 'sourceid'); $view->query->add_relationship($maptable, $join, $view->base_table); $view->query->add_where(0, "$maptable.sourceid IS NULL", $view->base_table); // Ditto for the errors table $join = new views_join; $join->construct($msgtablename, $view->base_table, $joinkey, 'sourceid'); $view->query->add_relationship($msgtablename, $join, $view->base_table); $view->query->add_where(0, "$msgtablename.sourceid IS NULL", $view->base_table); // If running over a selected list of IDs, pass those in to the query if ($idlist) { $view->query->add_where($view->options['group'], $view->base_table . ".$sourcekey IN ($idlist)", $view->base_table); } // We can't seem to get $view->build() to rebuild build_info, so go straight into the query object $query = $view->query->query(); $query = db_rewrite_sql($query, $view->base_table, $view->base_field, array('view' => &$view)); $args = $view->build_info['query_args']; $replacements = module_invoke_all('views_query_substitutions', $view); $query = str_replace(array_keys($replacements), $replacements, $query); if (is_array($args)) { foreach ($args as $id => $arg) { $args[$id] = str_replace(array_keys($replacements), $replacements, $arg); } } // Now, make the current db name explicit if content set is pulling tables from another DB if ($viewdb <> 'default') { global $db_url; $url = parse_url(is_array($db_url) ? $db_url['default'] : $db_url); $currdb = drupal_substr($url['path'], 1); $query = str_replace('{' . $maptable . '}', $currdb . '.' . '{' . $maptable . '}', $query); $query = str_replace('{' . $msgtablename . '}', $currdb . '.' . '{' . $msgtablename . '}', $query); db_set_active($viewdb); } //drupal_set_message($query); timer_start('execute view query'); if ($itemlimit) { $importlist = db_query_range($query, $args, 0, $itemlimit); } else { $importlist = db_query($query, $args); } timer_stop('execute view query'); if ($viewdb != 'default') { db_set_active('default'); } $imported = 0; timer_start('db_fetch_object'); while ($row = db_fetch_object($importlist)) { timer_stop('db_fetch_object'); // Recheck importing flag - permits dynamic interruption of cron jobs $sql = "SELECT importing,scanning FROM {migrate_content_sets} WHERE mcsid=%d"; $checkrow = db_fetch_object(db_query($sql, $mcsid)); $importing = $checkrow->importing; $scanning = $checkrow->scanning; if (!($importing || $scanning)) { $status = MIGRATE_STATUS_CANCELLED; break; } // Check for time out if there is time info present if (isset($timelimit) && time() >= $timelimit) { $status = MIGRATE_STATUS_TIMEDOUT; break; } if (isset($feedback)) { if (($frequency_unit == 'seconds' && time()-$lastfeedback >= $frequency) || ($frequency_unit == 'items' && $imported >= $frequency)) { $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, $status); $feedback($message); $lastfeedback = time(); $imported = 0; } } timer_start('import hooks'); $errors = migrate_invoke_all("import_$contenttype", $tblinfo, $row); timer_stop('import hooks'); // Ok, we're done. Preview the node or save it (if no errors). if (count($errors)) { $success = TRUE; foreach ($errors as $error) { if (!isset($error['level'])) { $error['level'] = MIGRATE_MESSAGE_ERROR; } if ($error['level'] != MIGRATE_MESSAGE_INFORMATIONAL) { $success = FALSE; } db_query("INSERT INTO {" . $msgtablename . "} (sourceid, level, message) VALUES('%s', %d, '%s')", $row->$sourcekey, $error['level'], $error['message']); } if ($success) { $imported++; } } else { $imported++; } timer_start('db_fetch_object'); } timer_stop('db_fetch_object'); if ($status == MIGRATE_STATUS_IN_PROGRESS) { $status = MIGRATE_STATUS_SUCCESS; } $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, $status); if ($status == MIGRATE_STATUS_SUCCESS) { // Remember we're done if ($importing) { db_query("UPDATE {migrate_content_sets} SET importing=0, lastimported=NOW() WHERE mcsid=%d", $mcsid); } else { db_query("UPDATE {migrate_content_sets} SET lastimported=NOW() WHERE mcsid=%d", $mcsid); } } if (isset($feedback)) { $feedback($message); } else { $messages[] = $message; } watchdog('migrate', $message); if (!$original_importing) { $sql = "UPDATE {migrate_content_sets} SET importing=0 WHERE mcsid=%d"; db_query($sql, $mcsid); } return $status; } /* Revisit function migrate_content_process_all_action(&$dummy, $action_context, $a1, $a2) { migrate_content_process_all(time()); } */ function migrate_content_process_all_batch($starttime, $limit, $idlist, &$context) { $messages = array(); // A zero max_execution_time means no limit - but let's set a reasonable // limit anyway $starttime = time(); $maxexectime = ini_get('max_execution_time'); if (!$maxexectime) { $maxexectime = 240; } // Initialize the Batch API context $context['finished'] = 0; // The Batch API progress bar will reflect the number of operations being // done (clearing/importing/scanning) if (!isset($context['sandbox']['numops'])) { $numops = 0; $sql = "SELECT COUNT(*) FROM {migrate_content_sets} WHERE clearing=1"; $numops = db_result(db_query($sql)); $sql = "SELECT COUNT(*) FROM {migrate_content_sets} WHERE importing=1 OR scanning=1"; $numops += db_result(db_query($sql)); $context['sandbox']['numops'] = $numops; $context['sandbox']['numopsdone'] = 0; } // For the timelimit, subtract more than enough time to clean up $options = array( 'itemlimit' => $limit, 'timelimit' => $starttime + (($maxexectime < 20) ? $maxexectime : ($maxexectime - 20)), 'idlist' => $idlist, ); $status = migrate_content_process_all($messages, $options); foreach ($messages as $message) { $context['sandbox']['message'] .= $message . '
'; $context['message'] = $context['sandbox']['message']; $context['results'][] = $message; $context['sandbox']['numopsdone'] += $options['opcount']; } // If we did not arrive via a timeout, we must have finished all operations if ($status != MIGRATE_STATUS_TIMEDOUT) { $context['finished'] = 1; } else { // Not done, report what percentage done we are (in terms of number of operations) $context['finished'] = $context['sandbox']['numopsdone']/$context['sandbox']['numops']; } // If requested save timers for eventual display if (variable_get('migrate_display_timers', 0)) { global $timers; foreach ($timers as $name => $timerec) { if (isset($timerec['time'])) { $context['sandbox']['times'][$name] += $timerec['time']/1000; } } // When all done, display the timers if ($context['finished'] == 1 && isset($context['sandbox']['times'])) { global $timers; arsort($context['sandbox']['times']); foreach ($context['sandbox']['times'] as $name => $total) { drupal_set_message("$name: " . round($total, 2)); } } } } /** * Process all enabled migration processes * * @param $messages * Array of messages to (ultimately) be displayed by the caller. * @param $options * Keyed array of optional options: * itemlimit - Maximum number of items to process * timelimit - Unix timestamp after which to stop processing * idlist - Comma-separated list of source IDs to process, instead of proceeding through * all unmigrated rows * opcount - Number of clearing or import operations performed * feedback - Keyed array controlling status feedback to the caller * function - PHP function to call, passing a message to be displayed * frequency - How often to call the function * frequency_unit - How to interpret frequency (items or seconds) * * @return * Status of the migration process: */ function migrate_content_process_all(&$messages = array(), &$options = array()) { if (variable_get('migrate_semaphore', FALSE)) { drupal_set_message('There is an import process already in progress'); return 0; } variable_set('migrate_semaphore', TRUE); // First, perform any clearing actions in reverse order $result = db_query("SELECT mcsid FROM {migrate_content_sets} WHERE clearing=1 ORDER BY weight DESC"); $context['sandbox']['timedout'] = FALSE; while ($row = db_fetch_object($result)) { $status = migrate_content_process_clear($row->mcsid, $messages, $options); if ($status != MIGRATE_STATUS_SUCCESS) { break; } $options['opcount']++; } // Then, any import actions going forward $result = db_query("SELECT mcsid FROM {migrate_content_sets} WHERE importing=1 OR scanning=1 ORDER BY weight"); while ($row = db_fetch_object($result)) { $status = migrate_content_process_import($row->mcsid, $messages, $options); if ($status != MIGRATE_STATUS_SUCCESS) { break; } $options['opcount']++; } variable_del('migrate_semaphore'); return $status; } function _migrate_progress_message($starttime, $numitems, $description, $import = TRUE, $status = MIGRATE_STATUS_SUCCESS) { $time = (microtime(TRUE) - $starttime); if ($time > 0) { $perminute = round(60*$numitems/$time); $time = round($time, 1); } else { $perminute = '?'; } if ($import) { switch ($status) { case MIGRATE_STATUS_SUCCESS: $basetext = "!numitems items imported in !time seconds (!perminute/min) - done importing '!description'";; break; case MIGRATE_STATUS_FAILURE: $basetext = "!numitems items imported in !time seconds (!perminute/min) - failure importing '!description'";; break; case MIGRATE_STATUS_TIMEDOUT: case MIGRATE_STATUS_IN_PROGRESS: $basetext = "!numitems items imported in !time seconds (!perminute/min) - continuing importing '!description'"; break; case MIGRATE_STATUS_CANCELLED: $basetext = "!numitems items imported in !time seconds (!perminute/min) - cancelled importing '!description'"; break; } } else { switch ($status) { case MIGRATE_STATUS_SUCCESS: $basetext = "!numitems previously-imported items deleted in !time seconds (!perminute/min) - done clearing '!description'";; break; case MIGRATE_STATUS_FAILURE: $basetext = "!numitems previously-imported items deleted in !time seconds (!perminute/min) - failure clearing '!description'";; break; case MIGRATE_STATUS_TIMEDOUT: case MIGRATE_STATUS_IN_PROGRESS: $basetext = "!numitems previously-imported items deleted in !time seconds (!perminute/min) - continuing clearing '!description'"; break; case MIGRATE_STATUS_CANCELLED: $basetext = "!numitems previously-imported items deleted in !time seconds (!perminute/min) - cancelled clearing '!description'"; break; } } $message = t($basetext, array('!numitems' => $numitems, '!time' => $time, '!perminute' => $perminute, '!description' => $description)); return $message; } /* * Implementation of hook_init(). */ function migrate_init() { // Loads the hooks for the supported modules. // TODO: Be more lazy - only load when really needed $path = drupal_get_path('module', 'migrate') .'/modules'; $files = drupal_system_listing('.*\.inc$', $path, 'name', 0); foreach ($files as $module_name => $file) { if (module_exists($module_name)) { include_once($file->filename); } } // Add main CSS functionality. drupal_add_css(drupal_get_path('module', 'migrate') .'/migrate.css'); } /** * Implementation of hook_action_info(). */ /* Revisit function migrate_action_info() { $info['migrate_content_process_clear'] = array( 'type' => 'migrate', 'description' => t('Clear a migration content set'), 'configurable' => FALSE, 'hooks' => array( 'cron' => array('run'), ), ); $info['migrate_content_process_import'] = array( 'type' => 'migrate', 'description' => t('Import a migration content set'), 'configurable' => FALSE, 'hooks' => array( 'cron' => array('run'), ), ); $info['migrate_content_process_all_action'] = array( 'type' => 'migrate', 'description' => t('Perform all active migration processes'), 'configurable' => FALSE, 'hooks' => array( 'cron' => array('run'), ), ); return $info; } */ /** * Implementation of hook_cron(). * */ function migrate_cron() { $path = drupal_get_path('module', 'migrate') . '/migrate_pages.inc'; include_once($path); // Elevate privileges so node deletion/creation works in cron session_save_session(FALSE); global $user; $saveuser = $user; $user = user_load(array('uid' => 1)); $messages = array(); // A zero max_execution_time means no limit - but let's set a reasonable // limit anyway $starttime = variable_get('cron_semaphore', 0); $maxexectime = ini_get('max_execution_time'); if (!$maxexectime) { $maxexectime = 240; } $options = array('timelimit' => $starttime + (($maxexectime < 20) ? $maxexectime : ($maxexectime - 20))); migrate_content_process_all($messages, $options); $user = $saveuser; session_save_session(TRUE); } /** * Implementation of hook_perm(). */ function migrate_perm() { return array(MIGRATE_ACCESS_BASIC, MIGRATE_ACCESS_ADVANCED); } /** * Implementation of hook_help(). */ function migrate_help($page, $arg) { switch ($page) { case 'admin/content/migrate': return theme('advanced_help_topic', 'migrate', 'about', 'icon') . t('Click the question marks like this one to read the migrate module help topics.'); case 'admin/content/migrate/content_sets': return t('Define sets of mappings from imported tables to Drupal content. These are the migrations which are later processed.'); case 'admin/content/migrate/process': return t('View and manage import processes here. Processes that are in progress are checked - they can be cancelled by unchecking, or new processes begun by checking, then clicking Submit. Any checked process will run in the background (via cron) automatically - you may also run them interactively.'); case 'admin/content/migrate/tools': return t('Besides content that is migrated into a new site, nodes may be manually created during the testing process. Typically you will want to clear these before the final migration - if you are absolutely positive that all nodes of a given type should be deleted, you may do so here.'); } } /** * Implementation of hook_menu(). */ function migrate_menu() { $items = array(); $items['admin/content/migrate'] = array( 'title' => 'Migrate', 'description' => 'Manage data migration from external sources', 'page callback' => 'migrate_front', 'access arguments' => array(MIGRATE_ACCESS_BASIC), 'file' => 'migrate_pages.inc', ); $items['admin/content/migrate/content_sets'] = array( 'title' => 'Content sets', 'description' => 'Manage content sets: mappings of source data to Drupal content', 'weight' => 2, 'page callback' => 'migrate_content_sets', 'access arguments' => array(MIGRATE_ACCESS_ADVANCED), 'file' => 'migrate_pages.inc', ); $items['admin/content/migrate/process'] = array( 'title' => 'Process', 'description' => 'Perform and monitor the creation of Drupal content from source data', 'weight' => 3, 'page callback' => 'migrate_dashboard', 'access arguments' => array(MIGRATE_ACCESS_BASIC), 'file' => 'migrate_pages.inc', ); $items['admin/content/migrate/tools'] = array( 'title' => 'Tools', 'description' => 'Additional tools for managing migration', 'weight' => 4, 'page callback' => 'migrate_tools', 'access arguments' => array(MIGRATE_ACCESS_ADVANCED), 'file' => 'migrate_pages.inc', ); $items['admin/content/migrate/settings'] = array( 'title' => 'Settings', 'description' => 'Migrate module settings', 'weight' => 5, 'page callback' => 'migrate_settings', 'access arguments' => array(MIGRATE_ACCESS_ADVANCED), 'file' => 'migrate_pages.inc', ); $items['admin/content/migrate/content_sets/%'] = array( 'title' => 'Content set', 'page callback' => 'drupal_get_form', 'page arguments' => array('migrate_content_set_mappings', 4), 'access arguments' => array(MIGRATE_ACCESS_ADVANCED), 'type' => MENU_CALLBACK, 'file' => 'migrate_pages.inc', ); $items['migrate/xlat/%'] = array( 'page callback' => 'migrate_xlat', 'access arguments' => array('access content'), 'page arguments' => array(2), 'type' => MENU_CALLBACK, ); return $items; } /** * Implementation of hook_schema_alter(). * @param $schema */ function migrate_schema_alter(&$schema) { // Check for table existence - at install time, hook_schema_alter() may be called // before our install hook. if (db_table_exists('migrate_content_sets')) { $result = db_query("SELECT * FROM {migrate_content_sets}"); while ($content_set = db_fetch_object($result)) { $maptablename = migrate_map_table_name($content_set->mcsid); $msgtablename = migrate_message_table_name($content_set->mcsid); // Get the proper field definition for the sourcekey $view = views_get_view($content_set->view_name); if (!$view) { drupal_set_message(t('View !view does not exist - either (re)create this view, or remove the migrate content set using it.', array('!view' => $content_set->view_name))); continue; } // Must do this to load the database $view->init_query(); // TODO: For now, PK must be in base_table $tabledb = $view->base_database; $tablename = $view->base_table; db_set_active($tabledb); $inspect = schema_invoke('inspect'); db_set_active('default'); $sourceschema = $inspect[$tablename]; // If the PK of the content set is defined, make sure we have a mapping table if ($sourcekey = $content_set->sourcekey) { $sourcefield = $sourceschema['fields'][$sourcekey]; if (!$sourcefield) { // strip base table name if views prepended it $baselen = drupal_strlen($tablename); if (!strncasecmp($sourcekey, $tablename . '_', $baselen + 1)) { $sourcekey = drupal_substr($sourcekey, $baselen + 1); } $sourcefield = $sourceschema['fields'][$sourcekey]; } // We don't want serial fields to behave serially, so change to int if ($sourcefield['type'] == 'serial') { $sourcefield['type'] = 'int'; } $schema[$maptablename] = _migrate_map_table_schema($sourcefield); $schema[$msgtablename] = _migrate_message_table_schema($sourcefield); } } } } /* * Translate URIs from an old site to the new one * Requires adding RewriteRules to .htaccess. For example, if the URLs * for news articles had the form * http://example.com/issues/news/[OldID].html, use this rule: * * RewriteRule ^issues/news/([0-9]+).html$ /migrate/xlat/node/$1 [L] */ function migrate_xlat($contenttype=NULL, $oldid=NULL) { $uri = ''; if ($contenttype && $oldid) { $newid = _migrate_xlat_get_new_id($contenttype, $oldid); if ($newid) { $uri = migrate_invoke_all("xlat_$contenttype", $newid); drupal_goto($uri[0], NULL, NULL, 301); } } } /* * Helper function to translate an ID from a source file to the corresponding * Drupal-side ID (nid, uid, etc.) * * TODO: Update to new world (content sets as the basis of migration) */ function _migrate_xlat_get_new_id($contenttype, $oldid) { $result = db_query("SELECT DISTINCT mf.importtable, mc.colname FROM {migrate_content_sets} mcs INNER JOIN {migrate_files} mf ON mcs.mfid=mf.mfid INNER JOIN {migrate_columns} mc ON mf.mfid=mc.mfid AND chosenpk=1 WHERE mcs.contenttype='%s'", $contenttype); while ($row = db_fetch_object($result)) { $table = migrate_map_table_name($row->mcsid); $pkcol = $row->colname; $id = db_result(db_query("SELECT destid FROM {$table} WHERE sourceid=%d", $oldid)); if ($id) { return $id; } } return NULL; } /** * Implementation of hook_theme(). * * Registers all theme functions used in this module. */ function migrate_theme() { return array( 'migrate_mapping_table' => array('arguments' => array('form')), '_migrate_dashboard_form' => array( 'arguments' => array('form' => NULL), 'function' => 'theme_migrate_dashboard', ), '_migrate_tools_form' => array( 'arguments' => array('form' => NULL), 'function' => 'theme_migrate_tools', ), '_migrate_settings_form' => array( 'arguments' => array('form' => NULL), 'function' => 'theme_migrate_settings', ), 'migrate_content_set_mappings' => array( 'arguments' => array('form' => NULL), 'function' => 'theme_migrate_content_set_mappings', ), ); } function migrate_map_table_name($mcsid) { return "migrate_map_$mcsid"; } function migrate_message_table_name($mcsid) { return "migrate_msgs_$mcsid"; } function _migrate_map_table_name($mcsid) { return migrate_map_table_name($mcsid); } function _migrate_message_table_name($mcsid) { return migrate_message_table_name($mcsid); } function _migrate_map_table_schema($sourcefield) { $schema = array( 'description' => t('Mappings from source key to destination key'), 'fields' => array( 'sourceid' => $sourcefield, // @TODO: Assumes destination key is unsigned int 'destid' => array( 'type' => 'int', 'unsigned' => TRUE, 'not null' => TRUE, ), ), 'primary key' => array('sourceid'), 'indexes' => array( 'idkey' => array('destid'), ), ); return $schema; } function _migrate_message_table_schema($sourcefield) { $schema = array( 'description' => t('Import errors'), 'fields' => array( 'mceid' => array( 'type' => 'serial', 'unsigned' => TRUE, 'not null' => TRUE, ), 'sourceid' => $sourcefield, 'level' => array( 'type' => 'int', 'unsigned' => TRUE, 'not null' => TRUE, 'default' => 1, ), 'message' => array( 'type' => 'text', 'size' => 'medium', 'not null' => TRUE, ), ), 'primary key' => array('mceid'), 'indexes' => array( 'sourceid' => array('sourceid'), ), ); return $schema; } function migrate_views_api() { return array('api' => '2.0'); } /** * Check to see if the advanced help module is installed, and if not put up * a message. * * Only call this function if the user is already in a position for this to * be useful. */ function migrate_check_advanced_help() { if (variable_get('migrate_hide_help_message', FALSE)) { return; } if (!module_exists('advanced_help')) { $filename = db_result(db_query("SELECT filename FROM {system} WHERE type = 'module' AND name = 'advanced_help'")); if ($filename && file_exists($filename)) { drupal_set_message(t('If you enable the advanced help module, Migrate will provide more and better help. Hide this message.', array('@modules' => url('admin/build/modules'), '@hide' => url('admin/build/views/tools')))); } else { drupal_set_message(t('If you install the advanced help module from !href, Migrate will provide more and better help. Hide this message.', array('!href' => l('http://drupal.org/project/advanced_help', 'http://drupal.org/project/advanced_help'), '@hide' => url('admin/content/migrate/settings')))); } } } /** * Check if a date is valid and return the correct * timestamp to use. Returns -1 if the date is not * considered valid. */ function _migrate_valid_date($date) { //TODO: really check whether the date is valid!! if (empty($date)) { return -1; } if (is_numeric($date) && $date > -1) { return $date; } // strtotime() doesn't recognize dashes as separators, change to slashes $date = str_replace('-', '/', $date); $time = strtotime($date); if ($time < 0 || !$time) { // Handles form YYYY-MM-DD HH:MM:SS.garbage if (drupal_strlen($date) > 19) { $time = strtotime(drupal_substr($date, 0, 19)); if ($time < 0 || !$time) { return -1; } } } return $time; }