> content >> migrate"
* for analyzing data from various sources and importing them into Drupal tables.
*/
/**
* Call a migrate hook
*/
function migrate_invoke_all($hook) {
// Let modules do any one-time initialization (e.g., including migration support files)
module_invoke_all('migrate_init');
$args = func_get_args();
$hookfunc = "migrate" . "_$hook";
unset($args[0]);
$return = array();
$modulelist = module_implements($hookfunc);
foreach ($modulelist as $module) {
$function = $module . '_' . $hookfunc;
$result = call_user_func_array($function, $args);
if (isset($result) && is_array($result)) {
$return = array_merge_recursive($return, $result);
}
elseif (isset($result)) {
$return[] = $result;
}
}
return $return;
}
/**
* Call a destination hook (e.g., hook_migrate_prepare_node). Use this version
* for hooks with the precise signature below, so that the object can be passed by
* reference.
*
* @param $hook
* @param $object
* @param $tblinfo
* @param $row
* @return
*/
function migrate_destination_invoke_all($hook, &$object, $tblinfo, $row) {
// We could have used module_invoke_all, but unfortunately
// module_invoke_all passes all arguments by value.
$errors = array();
$hook = 'migrate_' . $hook;
foreach (module_implements($hook) as $module_name) {
$function = $module_name . '_' . $hook;
if (function_exists($function)) {
timer_start($function);
$errors = array_merge($errors, (array)$function($object, $tblinfo, $row));
timer_stop($function);
}
}
return $errors;
}
/**
* Save a new or updated content set
*
* @param $content_set
* An array or object representing the content set. This is passed by reference (so
* when adding a new content set the ID can be set)
* @param $options
* Array of additional options for saving the content set. Currently:
* base_table: The base table of the view - if provided, we don't need
* to load the view.
* base_database: The database of the base table - if base_table is present
* and base_database omitted, it defaults to 'default'
* @return
* The ID of the content set that was saved, or NULL if nothing was saved
*/
function migrate_save_content_set(&$content_set, $options = array()) {
// Deal with objects internally (but remember if we need to put the parameter
// back to an array)
if (is_array($content_set)) {
$was_array = TRUE;
$content_set = (object) $content_set;
}
else {
$was_array = FALSE;
}
// Update or insert the content set record as appropriate
if (isset($content_set->mcsid)) {
drupal_write_record('migrate_content_sets', $content_set, 'mcsid');
}
else {
drupal_write_record('migrate_content_sets', $content_set);
}
// Create or modify map and message tables
$maptablename = migrate_map_table_name($content_set->mcsid);
$msgtablename = migrate_message_table_name($content_set->mcsid);
// TODO: For now, PK must be in base_table
// If the caller tells us the base table of the view, we don't need
// to load the view (which would not work when called from hook_install())
if (isset($options['base_table'])) {
$tablename = $options['base_table'];
if (isset($options['base_database'])) {
$tabledb = $options['base_database'];
}
else {
$tabledb = 'default';
}
}
else {
// Get the proper field definition for the sourcekey
$view = views_get_view($content_set->view_name);
if (!$view) {
drupal_set_message(t('View !view does not exist - either (re)create this view, or
remove the content set using it.', array('!view' => $content_set->view_name)));
return NULL;
}
// Must do this to load the database
$view->init_query();
$tabledb = $view->base_database;
$tablename = $view->base_table;
}
db_set_active($tabledb);
$inspect = schema_invoke('inspect');
db_set_active('default');
$sourceschema = $inspect[$tablename];
// If the PK of the content set is defined, make sure we have a mapping table
if ($content_set->sourcekey) {
$sourcefield = $sourceschema['fields'][$content_set->sourcekey];
// The field name might be
_...
if (!$sourcefield) {
$sourcekey = drupal_substr($content_set->sourcekey, drupal_strlen($tablename) + 1);
$sourcefield = $sourceschema['fields'][$sourcekey];
}
// But - we don't want serial fields to behave serially, so change to int
if ($sourcefield['type'] == 'serial') {
$sourcefield['type'] = 'int';
}
$schema_change = FALSE;
if (!db_table_exists($maptablename)) {
$schema = _migrate_map_table_schema($sourcefield);
db_create_table($ret, $maptablename, $schema);
// Expose map table to views
tw_add_tables(array($maptablename));
tw_add_fk($maptablename, 'destid');
$schema = _migrate_message_table_schema($sourcefield);
db_create_table($ret, $msgtablename, $schema);
// Expose messages table to views
tw_add_tables(array($msgtablename));
tw_add_fk($msgtablename, 'sourceid');
$schema_change = TRUE;
}
else {
// TODO: Deal with varchar->int case where there is existing non-int data
$desired_schema = _migrate_map_table_schema($sourcefield);
$actual_schema = $inspect[$maptablename];
if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) {
$ret = array();
db_drop_primary_key($ret, $maptablename);
db_change_field($ret, $maptablename, 'sourceid', 'sourceid',
$sourcefield, array('primary key' => array('sourceid')));
tw_perform_analysis($maptablename);
$schema_change = TRUE;
}
$desired_schema = _migrate_message_table_schema($sourcefield);
$actual_schema = $inspect[$msgtablename];
if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) {
$ret = array();
db_drop_index($ret, $msgtablename, 'sourceid');
db_change_field($ret, $msgtablename, 'sourceid', 'sourceid',
$sourcefield, array('indexes' => array('sourceid' => array('sourceid'))));
tw_perform_analysis($maptablename);
$schema_change = TRUE;
}
}
// Make sure the schema gets updated to reflect changes
if ($schema_change) {
cache_clear_all('schema', 'cache');
}
}
if ($was_array) {
$content_set = (array)$content_set;
return $content_set['mcsid'];
}
else {
return $content_set->mcsid;
}
}
function migrate_save_content_mapping(&$mapping) {
if ($mapping->mcmid) {
drupal_write_record('migrate_content_mappings', $mapping, 'mcmid');
}
else {
drupal_write_record('migrate_content_mappings', $mapping);
}
}
function migrate_delete_content_set($mcsid) {
// First, remove the map and message tables from the Table Wizard, and drop them
$ret = array();
$maptable = migrate_map_table_name($mcsid);
$msgtable = migrate_message_table_name($mcsid);
if (db_table_exists($maptable)) {
tw_remove_tables(array($maptable, $msgtable));
db_drop_table($ret, $maptable);
db_drop_table($ret, $msgtable);
}
// Then, delete the content set data
$sql = "DELETE FROM {migrate_content_mappings} WHERE mcsid=%d";
db_query($sql, $mcsid);
$sql = "DELETE FROM {migrate_content_sets} WHERE mcsid=%d";
db_query($sql, $mcsid);
}
function migrate_delete_content_mapping($mcmid) {
$sql = "DELETE FROM {migrate_content_mappings} WHERE mcmid=%d";
db_query($sql, $mcmid);
}
/**
* Convenience function for generating a message array
*
* @param $message
* Text describing the error condition
* @param $type
* One of the MIGRATE_MESSAGE constants, identifying the level of error
* @return
* Structured array suitable for return from an import hook
*/
function migrate_message($message, $type = MIGRATE_MESSAGE_ERROR) {
$error = array(
'level' => $type,
'message' => $message,
);
return $error;
}
/**
* Add a mapping from source ID to destination ID for the specified content set
*
* @param $mcsid
* ID of the content set being processed
* @param $sourceid
* Primary key value from the source
* @param $destid
* Primary key value from the destination
*/
function migrate_add_mapping($mcsid, $sourceid, $destid) {
static $maptables = array();
if (!isset($maptables[$mcsid])) {
$maptables[$mcsid] = migrate_map_table_name($mcsid);
}
$mapping = new stdClass;
$mapping->sourceid = $sourceid;
$mapping->destid = $destid;
drupal_write_record($maptables[$mcsid], $mapping);
}
/**
* Clear migrated objects from the specified content set
*
* @param $mcsid
* ID of the content set to clear
* @param $messages
* Array of messages to (ultimately) be displayed by the caller.
* @param $options
* Keyed array of optional options:
* itemlimit - Maximum number of items to process
* timelimit - Unix timestamp after which to stop processing
* idlist - Comma-separated list of source IDs to process, instead of proceeding through
* all unmigrated rows
* feedback - Keyed array controlling status feedback to the caller
* function - PHP function to call, passing a message to be displayed
* frequency - How often to call the function
* frequency_unit - How to interpret frequency (items or seconds)
*
* @return
* Status of the migration process:
*/
function migrate_content_process_clear($mcsid, &$messages = array(), &$options = array()) {
$itemlimit = $options['itemlimit'];
$timelimit = $options['timelimit'];
$idlist = $options['idlist'];
$lastfeedback = time();
if (isset($options['feedback'])) {
$feedback = $options['feedback']['function'];
$frequency = $options['feedback']['frequency'];
$frequency_unit = $options['feedback']['frequency_unit'];
}
$result = db_query("SELECT *
FROM {migrate_content_sets}
WHERE mcsid=%d", $mcsid);
$tblinfo = db_fetch_object($result);
$desttype = $tblinfo->desttype;
$view_name = $tblinfo->view_name;
$description = $tblinfo->description;
$contenttype = $tblinfo->contenttype;
$sourcekey = $tblinfo->sourcekey;
$maptable = migrate_map_table_name($mcsid);
$msgtablename = migrate_message_table_name($mcsid);
$processstart = microtime(TRUE);
$status = MIGRATE_STATUS_IN_PROGRESS;
// If we're being called on a content set that isn't flagged for clearing, temporarily flag it
$original_clearing = $tblinfo->clearing;
if (!$original_clearing) {
$sql = "UPDATE {migrate_content_sets} SET clearing=1 WHERE mcsid=%d";
db_query($sql, $mcsid);
}
$deleted = 0;
if ($idlist) {
$sql = "SELECT sourceid,destid FROM {" . $maptable . "} WHERE sourceid IN ($idlist)";
}
else {
$sql = "SELECT sourceid,destid FROM {" . $maptable . "}";
}
timer_start('delete query');
if ($itemlimit) {
$deletelist = db_query_range($sql, 0, $itemlimit);
}
else {
$deletelist = db_query($sql);
}
timer_stop('delete query');
while ($row = db_fetch_object($deletelist)) {
// Recheck clearing flag - permits dynamic interruption of jobs
$sql = "SELECT clearing FROM {migrate_content_sets} WHERE mcsid=%d";
$clearing = db_result(db_query($sql, $mcsid));
if (!$clearing) {
$status = MIGRATE_STATUS_CANCELLED;
break;
}
// Check for time out if there is time info present
if (isset($timelimit) && time() >= $timelimit) {
$status = MIGRATE_STATUS_TIMEDOUT;
break;
}
if (isset($feedback)) {
if (($frequency_unit == 'seconds' && time()-$lastfeedback >= $frequency) ||
($frequency_unit == 'items' && $deleted >= $frequency)) {
$message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, $status);
$feedback($message);
$lastfeedback = time();
$deleted = 0;
}
}
// @TODO: Should return success/failure. Problem: node_delete doesn't return anything...
migrate_invoke_all("delete_$contenttype", $row->destid);
timer_start('clear map/msg');
db_query("DELETE FROM {" . $maptable . "} WHERE sourceid=%d", $row->sourceid);
db_query("DELETE FROM {" . $msgtablename . "} WHERE sourceid=%d AND level=%d",
$row->sourceid, MIGRATE_MESSAGE_INFORMATIONAL);
timer_stop('clear map/msg');
$deleted++;
}
if ($status == MIGRATE_STATUS_IN_PROGRESS) {
$status = MIGRATE_STATUS_SUCCESS;
}
$message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, $status);
if ($status == MIGRATE_STATUS_SUCCESS) {
// Mark that we're done
$tblinfo->clearing = 0;
migrate_save_content_set($tblinfo);
// Remove old messages before beginning new import process
db_query("DELETE FROM {" . $msgtablename . "} WHERE level <> %d", MIGRATE_MESSAGE_INFORMATIONAL);
}
if (isset($feedback)) {
$feedback($message);
}
else {
$messages[] = $message;
}
watchdog('migrate', $message);
if (!$original_clearing) {
$sql = "UPDATE {migrate_content_sets} SET clearing=0 WHERE mcsid=%d";
db_query($sql, $mcsid);
}
return $status;
}
/**
* Import objects from the specified content set
*
* @param $mcsid
* ID of the content set to clear
* @param $messages
* Array of messages to (ultimately) be displayed by the caller.
* @param $options
* Keyed array of optional options:
* itemlimit - Maximum number of items to process
* timelimit - Unix timestamp after which to stop processing
* idlist - Comma-separated list of source IDs to process, instead of proceeding through
* all unmigrated rows
* feedback - Keyed array controlling status feedback to the caller
* function - PHP function to call, passing a message to be displayed
* frequency - How often to call the function
* frequency_unit - How to interpret frequency (items or seconds)
*
* @return
* Status of the migration process:
*/
function migrate_content_process_import($mcsid, &$messages = array(), &$options = array()) {
$itemlimit = $options['itemlimit'];
$timelimit = $options['timelimit'];
$idlist = $options['idlist'];
$lastfeedback = time();
if (isset($options['feedback'])) {
$feedback = $options['feedback']['function'];
$frequency = $options['feedback']['frequency'];
$frequency_unit = $options['feedback']['frequency_unit'];
}
$result = db_query("SELECT *
FROM {migrate_content_sets}
WHERE mcsid=%d", $mcsid);
$tblinfo = db_fetch_object($result);
$desttype = $tblinfo->desttype;
$view_name = $tblinfo->view_name;
$description = $tblinfo->description;
$contenttype = $tblinfo->contenttype;
$sourcekey = $tblinfo->sourcekey;
$maptable = migrate_map_table_name($mcsid);
$msgtablename = migrate_message_table_name($mcsid);
$processstart = microtime(TRUE);
$status = MIGRATE_STATUS_IN_PROGRESS;
// If we're being called on a content set that isn't flagged for importing, temporarily flag it
$original_importing = $tblinfo->importing || $tblinfo->scanning;
if (!$original_importing) {
$sql = "UPDATE {migrate_content_sets} SET importing=1 WHERE mcsid=%d";
db_query($sql, $mcsid);
}
$collist = db_query("SELECT srcfield, destfield, default_value
FROM {migrate_content_mappings}
WHERE mcsid=%d AND (srcfield <> '' OR default_value <> '')
ORDER BY mcmid",
$mcsid);
$fields = array();
while ($row = db_fetch_object($collist)) {
$fields[$row->destfield]['srcfield'] = $row->srcfield;
$fields[$row->destfield]['default_value'] = $row->default_value;
}
$tblinfo->fields = $fields;
$tblinfo->maptable = $maptable;
// We pick up everything in the input view that is not already imported, and
// not already errored out
// Emulate views execute(), so we can scroll through the results ourselves
$view = views_get_view($view_name);
if (!$view) {
$messages[] = t('View !view does not exist - either (re)create this view, or
remove the content set using it.', array('!view' => $view_name));
return MIGRATE_STATUS_FAILURE;
}
$view->build();
// Let modules modify the view just prior to executing it.
foreach (module_implements('views_pre_execute') as $module) {
$function = $module . '_views_pre_execute';
$function($view);
}
$viewdb = $view->base_database;
// Add a left join to the map table, and only include rows not in the map
$join = new views_join;
// Views prepends _ to column names other than the base table's
// primary key - we need to strip that here for the join to work. But, it's
// common for tables to have the tablename beginning field names (e.g.,
// table cms with PK cms_id). Deal with that as well...
$baselen = drupal_strlen($view->base_table);
if (!strncasecmp($sourcekey, $view->base_table . '_', $baselen + 1)) {
// So, which case is it? Ask the schema module...
db_set_active($viewdb);
$inspect = schema_invoke('inspect', db_prefix_tables('{'. $view->base_table .'}'));
db_set_active('default');
$tableschema = $inspect[$view->base_table];
$sourcefield = $tableschema['fields'][$sourcekey];
if (!$sourcefield) {
$joinkey = drupal_substr($sourcekey, $baselen + 1);
$sourcefield = $tableschema['fields'][$joinkey];
if (!$sourcefield) {
$messages[] = t("In view !view, can't find key !key for table !table",
array('!view' => $view_name, '!key' => $sourcekey, '!table' => $view->base_table));
return MIGRATE_STATUS_FAILURE;
}
}
else {
$joinkey = $sourcekey;
}
}
else {
$joinkey = $sourcekey;
}
$join->construct($maptable, $view->base_table, $joinkey, 'sourceid');
$view->query->add_relationship($maptable, $join, $view->base_table);
$view->query->add_where(0, "$maptable.sourceid IS NULL", $view->base_table);
// Ditto for the errors table
$join = new views_join;
$join->construct($msgtablename, $view->base_table, $joinkey, 'sourceid');
$view->query->add_relationship($msgtablename, $join, $view->base_table);
$view->query->add_where(0, "$msgtablename.sourceid IS NULL", $view->base_table);
// If running over a selected list of IDs, pass those in to the query
if ($idlist) {
$view->query->add_where($view->options['group'], $view->base_table . ".$sourcekey IN ($idlist)",
$view->base_table);
}
// We can't seem to get $view->build() to rebuild build_info, so go straight into the query object
$query = $view->query->query();
$query = db_rewrite_sql($query, $view->base_table, $view->base_field,
array('view' => &$view));
$args = $view->build_info['query_args'];
$replacements = module_invoke_all('views_query_substitutions', $view);
$query = str_replace(array_keys($replacements), $replacements, $query);
if (is_array($args)) {
foreach ($args as $id => $arg) {
$args[$id] = str_replace(array_keys($replacements), $replacements, $arg);
}
}
// Now, make the current db name explicit if content set is pulling tables from another DB
if ($viewdb <> 'default') {
global $db_url;
$url = parse_url(is_array($db_url) ? $db_url['default'] : $db_url);
$currdb = drupal_substr($url['path'], 1);
$query = str_replace('{' . $maptable . '}',
$currdb . '.' . '{' . $maptable . '}', $query);
$query = str_replace('{' . $msgtablename . '}',
$currdb . '.' . '{' . $msgtablename . '}', $query);
db_set_active($viewdb);
}
//drupal_set_message($query);
timer_start('execute view query');
if ($itemlimit) {
$importlist = db_query_range($query, $args, 0, $itemlimit);
}
else {
$importlist = db_query($query, $args);
}
timer_stop('execute view query');
if ($viewdb != 'default') {
db_set_active('default');
}
$imported = 0;
timer_start('db_fetch_object');
while ($row = db_fetch_object($importlist)) {
timer_stop('db_fetch_object');
// Recheck importing flag - permits dynamic interruption of cron jobs
$sql = "SELECT importing,scanning FROM {migrate_content_sets} WHERE mcsid=%d";
$checkrow = db_fetch_object(db_query($sql, $mcsid));
$importing = $checkrow->importing;
$scanning = $checkrow->scanning;
if (!($importing || $scanning)) {
$status = MIGRATE_STATUS_CANCELLED;
break;
}
// Check for time out if there is time info present
if (isset($timelimit) && time() >= $timelimit) {
$status = MIGRATE_STATUS_TIMEDOUT;
break;
}
if (isset($feedback)) {
if (($frequency_unit == 'seconds' && time()-$lastfeedback >= $frequency) ||
($frequency_unit == 'items' && $imported >= $frequency)) {
$message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, $status);
$feedback($message);
$lastfeedback = time();
$imported = 0;
}
}
timer_start('import hooks');
$errors = migrate_invoke_all("import_$contenttype", $tblinfo, $row);
timer_stop('import hooks');
// Ok, we're done. Preview the node or save it (if no errors).
if (count($errors)) {
$success = TRUE;
foreach ($errors as $error) {
if (!isset($error['level'])) {
$error['level'] = MIGRATE_MESSAGE_ERROR;
}
if ($error['level'] != MIGRATE_MESSAGE_INFORMATIONAL) {
$success = FALSE;
}
db_query("INSERT INTO {" . $msgtablename . "}
(sourceid, level, message)
VALUES('%s', %d, '%s')",
$row->$sourcekey, $error['level'], $error['message']);
}
if ($success) {
$imported++;
}
}
else {
$imported++;
}
timer_start('db_fetch_object');
}
timer_stop('db_fetch_object');
if ($status == MIGRATE_STATUS_IN_PROGRESS) {
$status = MIGRATE_STATUS_SUCCESS;
}
$message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, $status);
if ($status == MIGRATE_STATUS_SUCCESS) {
// Remember we're done
if ($importing) {
db_query("UPDATE {migrate_content_sets}
SET importing=0, lastimported=NOW()
WHERE mcsid=%d",
$mcsid);
}
else {
db_query("UPDATE {migrate_content_sets}
SET lastimported=NOW()
WHERE mcsid=%d",
$mcsid);
}
}
if (isset($feedback)) {
$feedback($message);
}
else {
$messages[] = $message;
}
watchdog('migrate', $message);
if (!$original_importing) {
$sql = "UPDATE {migrate_content_sets} SET importing=0 WHERE mcsid=%d";
db_query($sql, $mcsid);
}
return $status;
}
/* Revisit
function migrate_content_process_all_action(&$dummy, $action_context, $a1, $a2) {
migrate_content_process_all(time());
}
*/
function migrate_content_process_all_batch($starttime, $limit, $idlist, &$context) {
$messages = array();
// A zero max_execution_time means no limit - but let's set a reasonable
// limit anyway
$starttime = time();
$maxexectime = ini_get('max_execution_time');
if (!$maxexectime) {
$maxexectime = 240;
}
// Initialize the Batch API context
$context['finished'] = 0;
// The Batch API progress bar will reflect the number of operations being
// done (clearing/importing/scanning)
if (!isset($context['sandbox']['numops'])) {
$numops = 0;
$sql = "SELECT COUNT(*) FROM {migrate_content_sets} WHERE clearing=1";
$numops = db_result(db_query($sql));
$sql = "SELECT COUNT(*) FROM {migrate_content_sets} WHERE importing=1 OR scanning=1";
$numops += db_result(db_query($sql));
$context['sandbox']['numops'] = $numops;
$context['sandbox']['numopsdone'] = 0;
}
// For the timelimit, subtract more than enough time to clean up
$options = array(
'itemlimit' => $limit,
'timelimit' => $starttime + (($maxexectime < 20) ? $maxexectime : ($maxexectime - 20)),
'idlist' => $idlist,
);
$status = migrate_content_process_all($messages, $options);
foreach ($messages as $message) {
$context['sandbox']['message'] .= $message . '
';
$context['message'] = $context['sandbox']['message'];
$context['results'][] = $message;
$context['sandbox']['numopsdone'] += $options['opcount'];
}
// If we did not arrive via a timeout, we must have finished all operations
if ($status != MIGRATE_STATUS_TIMEDOUT) {
$context['finished'] = 1;
}
else {
// Not done, report what percentage done we are (in terms of number of operations)
$context['finished'] = $context['sandbox']['numopsdone']/$context['sandbox']['numops'];
}
// If requested save timers for eventual display
if (variable_get('migrate_display_timers', 0)) {
global $timers;
foreach ($timers as $name => $timerec) {
if (isset($timerec['time'])) {
$context['sandbox']['times'][$name] += $timerec['time']/1000;
}
}
// When all done, display the timers
if ($context['finished'] == 1 && isset($context['sandbox']['times'])) {
global $timers;
arsort($context['sandbox']['times']);
foreach ($context['sandbox']['times'] as $name => $total) {
drupal_set_message("$name: " . round($total, 2));
}
}
}
}
/**
* Process all enabled migration processes
*
* @param $messages
* Array of messages to (ultimately) be displayed by the caller.
* @param $options
* Keyed array of optional options:
* itemlimit - Maximum number of items to process
* timelimit - Unix timestamp after which to stop processing
* idlist - Comma-separated list of source IDs to process, instead of proceeding through
* all unmigrated rows
* opcount - Number of clearing or import operations performed
* feedback - Keyed array controlling status feedback to the caller
* function - PHP function to call, passing a message to be displayed
* frequency - How often to call the function
* frequency_unit - How to interpret frequency (items or seconds)
*
* @return
* Status of the migration process:
*/
function migrate_content_process_all(&$messages = array(), &$options = array()) {
if (variable_get('migrate_semaphore', FALSE)) {
drupal_set_message('There is an import process already in progress');
return 0;
}
variable_set('migrate_semaphore', TRUE);
// First, perform any clearing actions in reverse order
$result = db_query("SELECT mcsid
FROM {migrate_content_sets}
WHERE clearing=1
ORDER BY weight DESC");
$context['sandbox']['timedout'] = FALSE;
while ($row = db_fetch_object($result)) {
$status = migrate_content_process_clear($row->mcsid, $messages, $options);
if ($status != MIGRATE_STATUS_SUCCESS) {
break;
}
$options['opcount']++;
}
// Then, any import actions going forward
$result = db_query("SELECT mcsid
FROM {migrate_content_sets}
WHERE importing=1 OR scanning=1
ORDER BY weight");
while ($row = db_fetch_object($result)) {
$status = migrate_content_process_import($row->mcsid, $messages, $options);
if ($status != MIGRATE_STATUS_SUCCESS) {
break;
}
$options['opcount']++;
}
variable_del('migrate_semaphore');
return $status;
}
function _migrate_progress_message($starttime, $numitems, $description, $import = TRUE, $status = MIGRATE_STATUS_SUCCESS) {
$time = (microtime(TRUE) - $starttime);
if ($time > 0) {
$perminute = round(60*$numitems/$time);
$time = round($time, 1);
}
else {
$perminute = '?';
}
if ($import) {
switch ($status) {
case MIGRATE_STATUS_SUCCESS:
$basetext = "!numitems items imported in !time seconds (!perminute/min) - done importing '!description'";;
break;
case MIGRATE_STATUS_FAILURE:
$basetext = "!numitems items imported in !time seconds (!perminute/min) - failure importing '!description'";;
break;
case MIGRATE_STATUS_TIMEDOUT:
case MIGRATE_STATUS_IN_PROGRESS:
$basetext = "!numitems items imported in !time seconds (!perminute/min) - continuing importing '!description'";
break;
case MIGRATE_STATUS_CANCELLED:
$basetext = "!numitems items imported in !time seconds (!perminute/min) - cancelled importing '!description'";
break;
}
}
else {
switch ($status) {
case MIGRATE_STATUS_SUCCESS:
$basetext = "!numitems previously-imported items deleted in !time seconds (!perminute/min) - done clearing '!description'";;
break;
case MIGRATE_STATUS_FAILURE:
$basetext = "!numitems previously-imported items deleted in !time seconds (!perminute/min) - failure clearing '!description'";;
break;
case MIGRATE_STATUS_TIMEDOUT:
case MIGRATE_STATUS_IN_PROGRESS:
$basetext = "!numitems previously-imported items deleted in !time seconds (!perminute/min) - continuing clearing '!description'";
break;
case MIGRATE_STATUS_CANCELLED:
$basetext = "!numitems previously-imported items deleted in !time seconds (!perminute/min) - cancelled clearing '!description'";
break;
}
}
$message = t($basetext,
array('!numitems' => $numitems, '!time' => $time, '!perminute' => $perminute,
'!description' => $description));
return $message;
}
/*
* Implementation of hook_init().
*/
function migrate_init() {
// Loads the hooks for the supported modules.
// TODO: Be more lazy - only load when really needed
$path = drupal_get_path('module', 'migrate') .'/modules';
$files = drupal_system_listing('.*\.inc$', $path, 'name', 0);
foreach ($files as $module_name => $file) {
if (module_exists($module_name)) {
include_once($file->filename);
}
}
// Add main CSS functionality.
drupal_add_css(drupal_get_path('module', 'migrate') .'/migrate.css');
}
/**
* Implementation of hook_action_info().
*/
/* Revisit
function migrate_action_info() {
$info['migrate_content_process_clear'] = array(
'type' => 'migrate',
'description' => t('Clear a migration content set'),
'configurable' => FALSE,
'hooks' => array(
'cron' => array('run'),
),
);
$info['migrate_content_process_import'] = array(
'type' => 'migrate',
'description' => t('Import a migration content set'),
'configurable' => FALSE,
'hooks' => array(
'cron' => array('run'),
),
);
$info['migrate_content_process_all_action'] = array(
'type' => 'migrate',
'description' => t('Perform all active migration processes'),
'configurable' => FALSE,
'hooks' => array(
'cron' => array('run'),
),
);
return $info;
}
*/
/**
* Implementation of hook_cron().
*
*/
function migrate_cron() {
$path = drupal_get_path('module', 'migrate') . '/migrate_pages.inc';
include_once($path);
// Elevate privileges so node deletion/creation works in cron
session_save_session(FALSE);
global $user;
$saveuser = $user;
$user = user_load(array('uid' => 1));
$messages = array();
// A zero max_execution_time means no limit - but let's set a reasonable
// limit anyway
$starttime = variable_get('cron_semaphore', 0);
$maxexectime = ini_get('max_execution_time');
if (!$maxexectime) {
$maxexectime = 240;
}
$options = array('timelimit' => $starttime + (($maxexectime < 20) ? $maxexectime : ($maxexectime - 20)));
migrate_content_process_all($messages, $options);
$user = $saveuser;
session_save_session(TRUE);
}
/**
* Implementation of hook_perm().
*/
function migrate_perm() {
return array(MIGRATE_ACCESS_BASIC, MIGRATE_ACCESS_ADVANCED);
}
/**
* Implementation of hook_help().
*/
function migrate_help($page, $arg) {
switch ($page) {
case 'admin/content/migrate':
return theme('advanced_help_topic', 'migrate', 'about', 'icon') . t('Click the question marks like this one to read the migrate module help topics.');
case 'admin/content/migrate/content_sets':
return t('Define sets of mappings from imported tables to Drupal content. These are the migrations which are later processed.');
case 'admin/content/migrate/process':
return t('View and manage import processes here. Processes that are in progress are checked - they can be cancelled by unchecking, or new processes begun by checking, then clicking Submit. Any checked process will run in the background (via cron) automatically - you may also run them interactively.');
case 'admin/content/migrate/tools':
return t('Besides content that is migrated into a new site, nodes may be manually
created during the testing process. Typically you will want to clear these before the
final migration - if you are absolutely positive that all nodes of a
given type should be deleted, you may do so here.');
}
}
/**
* Implementation of hook_menu().
*/
function migrate_menu() {
$items = array();
$items['admin/content/migrate'] = array(
'title' => 'Migrate',
'description' => 'Manage data migration from external sources',
'page callback' => 'migrate_front',
'access arguments' => array(MIGRATE_ACCESS_BASIC),
'file' => 'migrate_pages.inc',
);
$items['admin/content/migrate/content_sets'] = array(
'title' => 'Content sets',
'description' => 'Manage content sets: mappings of source data to Drupal content',
'weight' => 2,
'page callback' => 'migrate_content_sets',
'access arguments' => array(MIGRATE_ACCESS_ADVANCED),
'file' => 'migrate_pages.inc',
);
$items['admin/content/migrate/process'] = array(
'title' => 'Process',
'description' => 'Perform and monitor the creation of Drupal content from source data',
'weight' => 3,
'page callback' => 'migrate_dashboard',
'access arguments' => array(MIGRATE_ACCESS_BASIC),
'file' => 'migrate_pages.inc',
);
$items['admin/content/migrate/tools'] = array(
'title' => 'Tools',
'description' => 'Additional tools for managing migration',
'weight' => 4,
'page callback' => 'migrate_tools',
'access arguments' => array(MIGRATE_ACCESS_ADVANCED),
'file' => 'migrate_pages.inc',
);
$items['admin/content/migrate/settings'] = array(
'title' => 'Settings',
'description' => 'Migrate module settings',
'weight' => 5,
'page callback' => 'migrate_settings',
'access arguments' => array(MIGRATE_ACCESS_ADVANCED),
'file' => 'migrate_pages.inc',
);
$items['admin/content/migrate/content_sets/%'] = array(
'title' => 'Content set',
'page callback' => 'drupal_get_form',
'page arguments' => array('migrate_content_set_mappings', 4),
'access arguments' => array(MIGRATE_ACCESS_ADVANCED),
'type' => MENU_CALLBACK,
'file' => 'migrate_pages.inc',
);
$items['migrate/xlat/%'] = array(
'page callback' => 'migrate_xlat',
'access arguments' => array('access content'),
'page arguments' => array(2),
'type' => MENU_CALLBACK,
);
return $items;
}
/**
* Implementation of hook_schema_alter().
* @param $schema
*/
function migrate_schema_alter(&$schema) {
// Check for table existence - at install time, hook_schema_alter() may be called
// before our install hook.
if (db_table_exists('migrate_content_sets')) {
$result = db_query("SELECT * FROM {migrate_content_sets}");
while ($content_set = db_fetch_object($result)) {
$maptablename = migrate_map_table_name($content_set->mcsid);
$msgtablename = migrate_message_table_name($content_set->mcsid);
// Get the proper field definition for the sourcekey
$view = views_get_view($content_set->view_name);
if (!$view) {
drupal_set_message(t('View !view does not exist - either (re)create this view, or
remove the migrate content set using it.', array('!view' => $content_set->view_name)));
continue;
}
// Must do this to load the database
$view->init_query();
// TODO: For now, PK must be in base_table
$tabledb = $view->base_database;
$tablename = $view->base_table;
db_set_active($tabledb);
$inspect = schema_invoke('inspect');
db_set_active('default');
$sourceschema = $inspect[$tablename];
// If the PK of the content set is defined, make sure we have a mapping table
if ($sourcekey = $content_set->sourcekey) {
$sourcefield = $sourceschema['fields'][$sourcekey];
if (!$sourcefield) {
// strip base table name if views prepended it
$baselen = drupal_strlen($tablename);
if (!strncasecmp($sourcekey, $tablename . '_', $baselen + 1)) {
$sourcekey = drupal_substr($sourcekey, $baselen + 1);
}
$sourcefield = $sourceschema['fields'][$sourcekey];
}
// We don't want serial fields to behave serially, so change to int
if ($sourcefield['type'] == 'serial') {
$sourcefield['type'] = 'int';
}
$schema[$maptablename] = _migrate_map_table_schema($sourcefield);
$schema[$msgtablename] = _migrate_message_table_schema($sourcefield);
}
}
}
}
/*
* Translate URIs from an old site to the new one
* Requires adding RewriteRules to .htaccess. For example, if the URLs
* for news articles had the form
* http://example.com/issues/news/[OldID].html, use this rule:
*
* RewriteRule ^issues/news/([0-9]+).html$ /migrate/xlat/node/$1 [L]
*/
function migrate_xlat($contenttype=NULL, $oldid=NULL) {
$uri = '';
if ($contenttype && $oldid) {
$newid = _migrate_xlat_get_new_id($contenttype, $oldid);
if ($newid) {
$uri = migrate_invoke_all("xlat_$contenttype", $newid);
drupal_goto($uri[0], NULL, NULL, 301);
}
}
}
/*
* Helper function to translate an ID from a source file to the corresponding
* Drupal-side ID (nid, uid, etc.)
*
* TODO: Update to new world (content sets as the basis of migration)
*/
function _migrate_xlat_get_new_id($contenttype, $oldid) {
$result = db_query("SELECT DISTINCT mf.importtable, mc.colname
FROM {migrate_content_sets} mcs
INNER JOIN {migrate_files} mf ON mcs.mfid=mf.mfid
INNER JOIN {migrate_columns} mc ON mf.mfid=mc.mfid AND chosenpk=1
WHERE mcs.contenttype='%s'",
$contenttype);
while ($row = db_fetch_object($result)) {
$table = migrate_map_table_name($row->mcsid);
$pkcol = $row->colname;
$id = db_result(db_query("SELECT destid
FROM {$table} WHERE sourceid=%d", $oldid));
if ($id) {
return $id;
}
}
return NULL;
}
/**
* Implementation of hook_theme().
*
* Registers all theme functions used in this module.
*/
function migrate_theme() {
return array(
'migrate_mapping_table' => array('arguments' => array('form')),
'_migrate_dashboard_form' => array(
'arguments' => array('form' => NULL),
'function' => 'theme_migrate_dashboard',
),
'_migrate_tools_form' => array(
'arguments' => array('form' => NULL),
'function' => 'theme_migrate_tools',
),
'_migrate_settings_form' => array(
'arguments' => array('form' => NULL),
'function' => 'theme_migrate_settings',
),
'migrate_content_set_mappings' => array(
'arguments' => array('form' => NULL),
'function' => 'theme_migrate_content_set_mappings',
),
);
}
function migrate_map_table_name($mcsid) {
return "migrate_map_$mcsid";
}
function migrate_message_table_name($mcsid) {
return "migrate_msgs_$mcsid";
}
function _migrate_map_table_name($mcsid) {
return migrate_map_table_name($mcsid);
}
function _migrate_message_table_name($mcsid) {
return migrate_message_table_name($mcsid);
}
function _migrate_map_table_schema($sourcefield) {
$schema = array(
'description' => t('Mappings from source key to destination key'),
'fields' => array(
'sourceid' => $sourcefield,
// @TODO: Assumes destination key is unsigned int
'destid' => array(
'type' => 'int',
'unsigned' => TRUE,
'not null' => TRUE,
),
),
'primary key' => array('sourceid'),
'indexes' => array(
'idkey' => array('destid'),
),
);
return $schema;
}
function _migrate_message_table_schema($sourcefield) {
$schema = array(
'description' => t('Import errors'),
'fields' => array(
'mceid' => array(
'type' => 'serial',
'unsigned' => TRUE,
'not null' => TRUE,
),
'sourceid' => $sourcefield,
'level' => array(
'type' => 'int',
'unsigned' => TRUE,
'not null' => TRUE,
'default' => 1,
),
'message' => array(
'type' => 'text',
'size' => 'medium',
'not null' => TRUE,
),
),
'primary key' => array('mceid'),
'indexes' => array(
'sourceid' => array('sourceid'),
),
);
return $schema;
}
function migrate_views_api() {
return array('api' => '2.0');
}
/**
* Check to see if the advanced help module is installed, and if not put up
* a message.
*
* Only call this function if the user is already in a position for this to
* be useful.
*/
function migrate_check_advanced_help() {
if (variable_get('migrate_hide_help_message', FALSE)) {
return;
}
if (!module_exists('advanced_help')) {
$filename = db_result(db_query("SELECT filename FROM {system} WHERE type = 'module' AND name = 'advanced_help'"));
if ($filename && file_exists($filename)) {
drupal_set_message(t('If you enable the advanced help module,
Migrate will provide more and better help. Hide this message.',
array('@modules' => url('admin/build/modules'),
'@hide' => url('admin/build/views/tools'))));
}
else {
drupal_set_message(t('If you install the advanced help module from !href, Migrate will provide more and better help. Hide this message.', array('!href' => l('http://drupal.org/project/advanced_help', 'http://drupal.org/project/advanced_help'), '@hide' => url('admin/content/migrate/settings'))));
}
}
}
/**
* Check if a date is valid and return the correct
* timestamp to use. Returns -1 if the date is not
* considered valid.
*/
function _migrate_valid_date($date) {
//TODO: really check whether the date is valid!!
if (empty($date)) {
return -1;
}
if (is_numeric($date) && $date > -1) {
return $date;
}
// strtotime() doesn't recognize dashes as separators, change to slashes
$date = str_replace('-', '/', $date);
$time = strtotime($date);
if ($time < 0 || !$time) {
// Handles form YYYY-MM-DD HH:MM:SS.garbage
if (drupal_strlen($date) > 19) {
$time = strtotime(drupal_substr($date, 0, 19));
if ($time < 0 || !$time) {
return -1;
}
}
}
return $time;
}