> content >> migrate"
* for analyzing data from various sources and importing them into Drupal tables.
*/
/**
 * Invoke hook_migrate_<hook>() in every module that implements it.
 *
 * Works like module_invoke_all(), except that "migrate_" is prefixed to the
 * hook name and, on the first call in a request, hook_migrate_init() is
 * invoked so modules can do one-time setup before any other migrate hook runs.
 *
 * @param $hook
 *   Hook to invoke, without the "migrate_" prefix (e.g., 'types',
 *   'fields_node', etc.). Any further arguments are passed on to the
 *   implementations.
 * @return
 *   Merged array of results: array results are merged recursively, any other
 *   non-NULL result is appended.
 */
function migrate_invoke_all($hook) {
  // One-time initialization (e.g., including migration support files)
  global $_migrate_inited;
  if (!isset($_migrate_inited)) {
    module_invoke_all('migrate_init');
    $_migrate_inited = TRUE;
  }

  // Everything after the hook name is forwarded to the implementations
  $hook_args = func_get_args();
  unset($hook_args[0]);

  $full_hook = 'migrate_' . $hook;
  $merged = array();
  foreach (module_implements($full_hook) as $implementor) {
    $outcome = call_user_func_array($implementor . '_' . $full_hook, $hook_args);
    if (!isset($outcome)) {
      continue;
    }
    if (is_array($outcome)) {
      $merged = array_merge_recursive($merged, $outcome);
    }
    else {
      $merged[] = $outcome;
    }
  }
  return $merged;
}
/**
 * Invoke a destination hook (e.g., hook_migrate_prepare_node).
 *
 * Use this instead of migrate_invoke_all() for hooks with exactly the
 * signature below, so that the destination object reaches the implementations
 * by reference (module_invoke_all() passes everything by value).
 *
 * @param $hook
 *   Hook to invoke, without the "migrate_" prefix.
 * @param $object
 *   Destination object being built - passed by reference, so hooks can modify it
 * @param $tblinfo
 *   Metadata about the content set
 * @param $row
 *   The raw source row.
 * @return
 *   Merged array of whatever the implementations returned (each return value
 *   is cast to an array before merging).
 */
function migrate_destination_invoke_all($hook, &$object, $tblinfo, $row) {
  // One-time initialization (e.g., including migration support files)
  global $_migrate_inited;
  if (!isset($_migrate_inited)) {
    module_invoke_all('migrate_init');
    $_migrate_inited = TRUE;
  }

  $full_hook = 'migrate_' . $hook;
  $collected = array();
  foreach (module_implements($full_hook) as $implementor) {
    $callback = $implementor . '_' . $full_hook;
    if (!function_exists($callback)) {
      continue;
    }
    // Each implementation gets its own timer, named after the function
    timer_start($callback);
    $reported = (array) $callback($object, $tblinfo, $row);
    $collected = array_merge($collected, $reported);
    timer_stop($callback);
  }
  return $collected;
}
/**
 * Save a new or updated content set, and create or adjust its map and
 * message tables so their sourceid column matches the source key.
 *
 * @param $content_set
 *   An array or object representing the content set. This is passed by reference (so
 *   when adding a new content set the ID can be set). It is handed back in the
 *   same form (array or object) in which it was supplied.
 * @param $options
 *   Array of additional options for saving the content set. Currently:
 *    base_table: The base table of the view - if provided, we don't need
 *      to load the view.
 *    base_database: The database of the base table - if base_table is present
 *      and base_database omitted, it defaults to 'default'
 * @return
 *   The ID of the content set that was saved, or NULL if the view backing the
 *   content set could not be loaded.
 */
function migrate_save_content_set(&$content_set, $options = array()) {
  // Deal with objects internally (but remember if we need to put the parameter
  // back to an array)
  $was_array = is_array($content_set);
  if ($was_array) {
    $content_set = (object) $content_set;
  }

  // Update or insert the content set record as appropriate
  if (isset($content_set->mcsid)) {
    drupal_write_record('migrate_content_sets', $content_set, 'mcsid');
  }
  else {
    drupal_write_record('migrate_content_sets', $content_set);
  }

  // Create or modify map and message tables
  $maptablename = migrate_map_table_name($content_set->mcsid);
  $msgtablename = migrate_message_table_name($content_set->mcsid);

  // TODO: For now, PK must be in base_table

  // If the caller tells us the base table of the view, we don't need
  // to load the view (which would not work when called from hook_install())
  if (isset($options['base_table'])) {
    $tablename = $options['base_table'];
    $tabledb = isset($options['base_database']) ? $options['base_database'] : 'default';
  }
  else {
    // Get the proper field definition for the sourcekey
    $view = views_get_view($content_set->view_name);
    if (!$view) {
      drupal_set_message(t('View !view does not exist - either (re)create this view, or
remove the content set using it.', array('!view' => $content_set->view_name)));
      // Hand the parameter back in the form the caller supplied it
      if ($was_array) {
        $content_set = (array) $content_set;
      }
      return NULL;
    }
    // Must do this to load the database
    $view->init_query();
    $tabledb = isset($view->base_database) ? $view->base_database : 'default';
    $tablename = $view->base_table;
  }

  $sourceschema = _migrate_inspect_schema($tablename, $tabledb);

  // If the PK of the content set is defined, make sure we have a mapping table
  if (isset($content_set->sourcekey) && $content_set->sourcekey) {
    $sourcefield = isset($sourceschema['fields'][$content_set->sourcekey]) ?
      $sourceschema['fields'][$content_set->sourcekey] : NULL;
    // The source key might be of the form <table>_<column>; if the full name
    // is not a column of the table, strip the table prefix and look again.
    if (!$sourcefield) {
      $sourcekey = drupal_substr($content_set->sourcekey, drupal_strlen($tablename) + 1);
      $sourcefield = isset($sourceschema['fields'][$sourcekey]) ?
        $sourceschema['fields'][$sourcekey] : NULL;
    }
    // But - we don't want serial fields to behave serially, so change to int
    if ($sourcefield['type'] == 'serial') {
      $sourcefield['type'] = 'int';
    }

    $schema_change = FALSE;
    if (!db_table_exists($maptablename)) {
      $ret = array();
      $schema = _migrate_map_table_schema($sourcefield);
      db_create_table($ret, $maptablename, $schema);
      // Expose map table to views
      tw_add_tables(array($maptablename));
      tw_add_fk($maptablename, 'destid');

      $schema = _migrate_message_table_schema($sourcefield);
      db_create_table($ret, $msgtablename, $schema);
      // Expose messages table to views
      tw_add_tables(array($msgtablename));
      tw_add_fk($msgtablename, 'sourceid');
      $schema_change = TRUE;
    }
    else {
      // TODO: Deal with varchar->int case where there is existing non-int data
      // Compare the sourceid column we want against what is actually in the
      // database, and only alter the table when they differ.
      // NOTE(review): assumes _migrate_inspect_schema() returns the schema of
      // a single table, as it is used for the source table above - confirm.
      $desired_schema = _migrate_map_table_schema($sourcefield);
      $actual_schema = _migrate_inspect_schema($maptablename, 'default');
      if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) {
        $ret = array();
        db_drop_primary_key($ret, $maptablename);
        db_change_field($ret, $maptablename, 'sourceid', 'sourceid',
          $sourcefield, array('primary key' => array('sourceid')));
        tw_perform_analysis($maptablename);
        $schema_change = TRUE;
      }

      $desired_schema = _migrate_message_table_schema($sourcefield);
      $actual_schema = _migrate_inspect_schema($msgtablename, 'default');
      if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) {
        $ret = array();
        db_drop_index($ret, $msgtablename, 'sourceid');
        db_change_field($ret, $msgtablename, 'sourceid', 'sourceid',
          $sourcefield, array('indexes' => array('sourceid' => array('sourceid'))));
        // Re-analyze the table that was just changed (the message table)
        tw_perform_analysis($msgtablename);
        $schema_change = TRUE;
      }
    }

    // Make sure the schema gets updated to reflect changes
    if ($schema_change) {
      cache_clear_all('schema', 'cache');
    }
  }

  if ($was_array) {
    $content_set = (array) $content_set;
    return $content_set['mcsid'];
  }
  return $content_set->mcsid;
}
/**
 * Save a new or updated content mapping
 *
 * @param $mapping
 *   An object representing the mapping. This is passed by reference (so
 *   when adding a new mapping the ID can be set)
 * @return
 *   The value of $mapping->mcmid after the record has been written.
 */
function migrate_save_content_mapping(&$mapping) {
  // A mapping with a non-empty mcmid already exists and is updated; otherwise
  // a new record is inserted. empty() avoids a notice when a new mapping
  // object has no mcmid property at all.
  if (!empty($mapping->mcmid)) {
    drupal_write_record('migrate_content_mappings', $mapping, 'mcmid');
  }
  else {
    drupal_write_record('migrate_content_mappings', $mapping);
  }
  return $mapping->mcmid;
}
/**
 * Delete a content set together with its mappings and its map/message tables.
 *
 * @param $mcsid
 *   Unique identifier of the content set to delete.
 */
function migrate_delete_content_set($mcsid) {
  $ret = array();
  $map_table = migrate_map_table_name($mcsid);
  $message_table = migrate_message_table_name($mcsid);

  // Unregister both tables from the Table Wizard and drop them - but only
  // when the map table is actually present.
  if (db_table_exists($map_table)) {
    tw_remove_tables(array($map_table, $message_table));
    db_drop_table($ret, $map_table);
    db_drop_table($ret, $message_table);
  }

  // Remove the field mappings first, then the content set record itself
  db_query("DELETE FROM {migrate_content_mappings} WHERE mcsid=%d", $mcsid);
  db_query("DELETE FROM {migrate_content_sets} WHERE mcsid=%d", $mcsid);
}
/**
 * Remove a single content mapping record.
 *
 * @param $mcmid
 *   Unique identifier of the mapping to delete.
 */
function migrate_delete_content_mapping($mcmid) {
  db_query("DELETE FROM {migrate_content_mappings} WHERE mcmid=%d", $mcmid);
}
/**
 * Build a message array pairing a severity level with its text.
 *
 * @param $message
 *   Text describing the error condition
 * @param $type
 *   One of the MIGRATE_MESSAGE constants, identifying the level of error
 * @return
 *   Array with keys 'level' and 'message', suitable for return from an
 *   import hook
 */
function migrate_message($message, $type = MIGRATE_MESSAGE_ERROR) {
  return array(
    'level' => $type,
    'message' => $message,
  );
}
/**
 * Add a mapping from source ID to destination ID for the specified content set
 *
 * If the source ID has no row in the map table yet, one is inserted. If it
 * has a row flagged needs_update, the flag is cleared. A row that is already
 * up to date is left untouched.
 *
 * @param $mcsid
 *   ID of the content set being processed
 * @param $sourceid
 *   Primary key value from the source
 * @param $destid
 *   Primary key value from the destination
 */
function migrate_add_mapping($mcsid, $sourceid, $destid) {
  // Remember map table names across calls within this request
  static $maptables = array();
  if (!isset($maptables[$mcsid])) {
    $maptables[$mcsid] = migrate_map_table_name($mcsid);
  }
  $maptable = $maptables[$mcsid];

  // db_result() yields FALSE when there is no matching row, otherwise the
  // column value as returned by the database driver (a string such as '0' or
  // '1'). Comparing with "!== 0" would therefore treat an up-to-date row as
  // missing and attempt a duplicate insert.
  $needs_update = db_result(db_query(
    'SELECT needs_update FROM {' . $maptable . "} WHERE sourceid='%s'",
    $sourceid));

  if ($needs_update === FALSE) {
    db_query('INSERT INTO {' . $maptable . "} (sourceid, destid, needs_update) VALUES('%s', %d, 0)",
      $sourceid, $destid);
  }
  elseif ($needs_update == 1) {
    db_query('UPDATE {' . $maptable . "} SET needs_update=0 WHERE sourceid='%s'",
      $sourceid);
  }
}
/**
 * Clear migrated objects from the specified content set
 *
 * @param $mcsid
 *   ID of the content set to clear
 * @param $options
 *   Keyed array of optional options:
 *    itemlimit - Maximum number of items to process
 *    timelimit - Unix timestamp after which to stop processing
 *    idlist - Comma-separated list of source IDs to process, instead of proceeding through
 *      all unmigrated rows
 *    feedback - Keyed array controlling status feedback to the caller
 *      function - PHP function to call, passing a message to be displayed
 *      frequency - How often to call the function
 *      frequency_unit - How to interpret frequency (items or seconds)
 *
 * @return
 *   Status of the migration process:
 *    MIGRATE_RESULT_FAILED - the content set does not exist
 *    MIGRATE_RESULT_IN_PROGRESS - the content set is not idle, nothing was done
 *    MIGRATE_RESULT_STOPPED - the content set's status was changed while running
 *    MIGRATE_RESULT_INCOMPLETE - the time or memory limit was reached
 *    MIGRATE_RESULT_COMPLETED - all selected rows were processed
 */
function migrate_content_process_clear($mcsid, &$options = array()) {
  $itemlimit = isset($options['itemlimit']) ? $options['itemlimit'] : NULL;
  $timelimit = isset($options['timelimit']) ? $options['timelimit'] : NULL;
  $idlist = isset($options['idlist']) ? $options['idlist'] : NULL;
  $lastfeedback = time();
  if (isset($options['feedback'])) {
    $feedback = $options['feedback']['function'];
    $frequency = isset($options['feedback']['frequency']) ?
      $options['feedback']['frequency'] : NULL;
    $frequency_unit = isset($options['feedback']['frequency_unit']) ?
      $options['feedback']['frequency_unit'] : NULL;
  }

  $result = db_query("SELECT * FROM {migrate_content_sets} WHERE mcsid=%d", $mcsid);
  $tblinfo = db_fetch_object($result);
  // Nothing to do (and nothing safe to dereference) for an unknown content set
  if (!$tblinfo) {
    return MIGRATE_RESULT_FAILED;
  }
  $description = $tblinfo->description;

  // Only one process at a time may work on a content set
  if ($tblinfo->status != MIGRATE_STATUS_IDLE) {
    return MIGRATE_RESULT_IN_PROGRESS;
  }
  db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d",
    MIGRATE_STATUS_CLEARING, $mcsid);

  $contenttype = $tblinfo->contenttype;
  $maptable = migrate_map_table_name($mcsid);
  $msgtablename = migrate_message_table_name($mcsid);
  $memory_limit = _migrate_memory_limit();

  // If this content set is set up to update existing content, we don't
  // want to delete the content on clear, just the map/message tables
  $sql = "SELECT srcfield FROM {migrate_content_mappings} WHERE mcsid=%d AND primary_key=1";
  $srcfield = db_result(db_query($sql, $mcsid));
  $full_clear = !$srcfield;

  // Assume success until proven otherwise
  $return = MIGRATE_RESULT_COMPLETED;
  $deleted = 0;

  // NOTE(review): $idlist is interpolated into the SQL as-is, so callers must
  // only pass a trusted, already-escaped list of IDs.
  if ($idlist) {
    $sql = "SELECT sourceid,destid FROM {" . $maptable . "} WHERE sourceid IN ($idlist)";
  }
  else {
    $sql = "SELECT sourceid,destid FROM {" . $maptable . "}";
  }

  timer_start('delete query');
  if ($itemlimit) {
    $deletelist = db_query_range($sql, 0, $itemlimit);
  }
  else {
    $deletelist = db_query($sql);
  }
  timer_stop('delete query');

  while ($row = db_fetch_object($deletelist)) {
    // Recheck status - permits dynamic interruption of jobs
    $sql = "SELECT status FROM {migrate_content_sets} WHERE mcsid=%d";
    $status = db_result(db_query($sql, $mcsid));
    if ($status != MIGRATE_STATUS_CLEARING) {
      $return = MIGRATE_RESULT_STOPPED;
      break;
    }

    // Check for time out if there is time info present
    if (isset($timelimit) && time() >= $timelimit) {
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Check for closeness to memory limit
    $usage = memory_get_usage();
    $pct_memory = $usage/$memory_limit;
    if ($pct_memory > MIGRATE_MEMORY_THRESHOLD) {
      if (isset($feedback)) {
        $feedback(t('Memory usage is !usage (!pct% of limit !limit), starting new batch',
          array('!pct' => round($pct_memory*100), '!usage' => $usage, '!limit' => $memory_limit)));
      }
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Periodic progress report; the item counter restarts after each report
    if (isset($feedback)) {
      if (($frequency_unit == 'seconds' && time()-$lastfeedback >= $frequency) ||
          ($frequency_unit == 'items' && $deleted >= $frequency)) {
        $message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, MIGRATE_RESULT_INCOMPLETE);
        $feedback($message);
        $lastfeedback = time();
        $deleted = 0;
      }
    }

    // @TODO: Should return success/failure. Problem: node_delete doesn't return anything...
    if ($full_clear) {
      timer_start('delete hooks');
      migrate_invoke_all("delete_$contenttype", $row->destid);
      timer_stop('delete hooks');
    }

    timer_start('clear map/msg');
    db_query("DELETE FROM {" . $maptable . "} WHERE sourceid='%s'", $row->sourceid);
    db_query("DELETE FROM {" . $msgtablename . "} WHERE sourceid='%s' AND level=%d",
      $row->sourceid, MIGRATE_MESSAGE_INFORMATIONAL);
    timer_stop('clear map/msg');
    $deleted++;
  }

  // Mark that we're done
  $sql = "UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d";
  db_query($sql, MIGRATE_STATUS_IDLE, $mcsid);

  // If we've completed a total clear, make sure all messages are gone
  if ($return == MIGRATE_RESULT_COMPLETED && !$idlist && !$itemlimit) {
    db_query('TRUNCATE TABLE {' . $msgtablename . '}');
  }
  // In other cases (except when we're still in the middle of a process), keep
  // informationals, which should still be attached to uncleared items
  else if ($return != MIGRATE_RESULT_INCOMPLETE) {
    // Remove old messages before beginning new import process
    db_query("DELETE FROM {" . $msgtablename . "} WHERE level <> %d", MIGRATE_MESSAGE_INFORMATIONAL);
  }

  $message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, $return);
  if (isset($feedback)) {
    $feedback($message);
  }
  return $return;
}
/**
 * Import objects from the specified content set
 *
 * @param $mcsid
 *   ID of the content set to import
 * @param $options
 *   Keyed array of optional options:
 *    itemlimit - Maximum number of items to process
 *    timelimit - Unix timestamp after which to stop processing
 *    idlist - Comma-separated list of source IDs to process, instead of proceeding through
 *      all unmigrated rows
 *    feedback - Keyed array controlling status feedback to the caller
 *      function - PHP function to call, passing a message to be displayed
 *      frequency - How often to call the function
 *      frequency_unit - How to interpret frequency (items or seconds)
 *
 * @return
 *   Status of the migration process:
 *    MIGRATE_RESULT_FAILED - the content set, its view, or its key was not found
 *    MIGRATE_RESULT_IN_PROGRESS - the content set is not idle, nothing was done
 *    MIGRATE_RESULT_STOPPED - the content set's status was changed while running
 *    MIGRATE_RESULT_INCOMPLETE - the time or memory limit was reached
 *    MIGRATE_RESULT_COMPLETED - all selected rows were processed
 */
function migrate_content_process_import($mcsid, &$options = array()) {
  $tblinfo = db_fetch_object(db_query(
    "SELECT * FROM {migrate_content_sets} WHERE mcsid=%d", $mcsid));
  // Nothing to do (and nothing safe to dereference) for an unknown content set
  if (!$tblinfo) {
    return MIGRATE_RESULT_FAILED;
  }

  // Only one process at a time may work on a content set
  if ($tblinfo->status != MIGRATE_STATUS_IDLE) {
    return MIGRATE_RESULT_IN_PROGRESS;
  }
  db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d",
    MIGRATE_STATUS_IMPORTING, $mcsid);

  $itemlimit = isset($options['itemlimit']) ? $options['itemlimit'] : NULL;
  $timelimit = isset($options['timelimit']) ? $options['timelimit'] : NULL;
  $idlist = isset($options['idlist']) ? $options['idlist'] : NULL;
  $lastfeedback = time();
  if (isset($options['feedback'])) {
    $feedback = $options['feedback']['function'];
    $frequency = isset($options['feedback']['frequency']) ?
      $options['feedback']['frequency'] : NULL;
    $frequency_unit = isset($options['feedback']['frequency_unit']) ?
      $options['feedback']['frequency_unit'] : NULL;
  }

  $description = $tblinfo->description;
  $view_name = $tblinfo->view_name;
  $view_args = $tblinfo->view_args;
  $contenttype = $tblinfo->contenttype;
  $sourcekey = $tblinfo->sourcekey;
  $maptable = migrate_map_table_name($mcsid);
  $msgtablename = migrate_message_table_name($mcsid);
  $memory_limit = _migrate_memory_limit();

  // Assume success until proven otherwise
  $return = MIGRATE_RESULT_COMPLETED;

  // Collect the field mappings (destination field => source field/default)
  $collist = db_query("SELECT srcfield, destfield, default_value
FROM {migrate_content_mappings}
WHERE mcsid=%d AND (srcfield <> '' OR default_value <> '')
ORDER BY mcmid",
    $mcsid);
  $fields = array();
  while ($row = db_fetch_object($collist)) {
    $fields[$row->destfield]['srcfield'] = $row->srcfield;
    $fields[$row->destfield]['default_value'] = $row->default_value;
  }
  $tblinfo->fields = $fields;
  $tblinfo->maptable = $maptable;

  // We pick up everything in the input view that is not already imported, and
  // not already errored out
  // Emulate views execute(), so we can scroll through the results ourselves
  $view = views_get_view($view_name);
  if (!$view) {
    if (isset($feedback)) {
      $feedback(t('View !view does not exist - either (re)create this view, or
remove the content set using it.', array('!view' => $view_name)));
    }
    // Release the content set, otherwise it would stay marked as importing
    // and every later run would report it as in progress.
    db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d",
      MIGRATE_STATUS_IDLE, $mcsid);
    return MIGRATE_RESULT_FAILED;
  }

  // Identify the content set being processed. Simplifies $view alterations.
  $view->migrate_content_set = $tblinfo;
  $view->is_cacheable = FALSE;
  if ($view_args) {
    $view->set_arguments(explode('/', $view_args));
  }
  $view->build();

  // Let modules modify the view just prior to executing it.
  foreach (module_implements('views_pre_execute') as $module) {
    $function = $module . '_views_pre_execute';
    $function($view);
  }

  if (isset($view->base_database)) {
    $viewdb = $view->base_database;
  }
  else {
    $viewdb = 'default';
  }

  // Add a left join to the map table, and only include rows not in the map
  $join = new views_join;

  // Views prepends <base_table>_ to column names other than the base table's
  // primary key - we need to strip that here for the join to work. But, it's
  // common for tables to have the tablename beginning field names (e.g.,
  // table cms with PK cms_id). Deal with that as well...
  $baselen = drupal_strlen($view->base_table);
  if (!strncasecmp($sourcekey, $view->base_table . '_', $baselen + 1)) {
    // So, which case is it? Ask the schema module...
    db_set_active($viewdb);
    $inspect = schema_invoke('inspect', db_prefix_tables('{' . $view->base_table . '}'));
    db_set_active('default');
    $tableschema = $inspect[$view->base_table];
    if (!empty($tableschema['fields'][$sourcekey])) {
      // The full key name is a real column
      $joinkey = $sourcekey;
    }
    else {
      // Otherwise the column is the key name minus the table prefix
      $joinkey = drupal_substr($sourcekey, $baselen + 1);
      if (empty($tableschema['fields'][$joinkey])) {
        if (isset($feedback)) {
          $feedback(t("In view !view, can't find key !key for table !table",
            array('!view' => $view_name, '!key' => $sourcekey, '!table' => $view->base_table)));
        }
        // Release the content set before bailing out (see above)
        db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d",
          MIGRATE_STATUS_IDLE, $mcsid);
        return MIGRATE_RESULT_FAILED;
      }
    }
  }
  else {
    $joinkey = $sourcekey;
  }
  $join->construct($maptable, $view->base_table, $joinkey, 'sourceid');
  $view->query->add_relationship($maptable, $join, $view->base_table);
  // We want both unimported and unupdated content
  $where = "$maptable.sourceid IS NULL OR $maptable.needs_update = 1";
  // And as long as we have the map table, get the destination ID, the
  // import hook will need it to identify the existing destination object
  $view->query->add_field($maptable, 'destid', 'destid');
  $view->query->add_where(0, $where, $view->base_table);

  // Ditto for the errors table
  $join = new views_join;
  $join->construct($msgtablename, $view->base_table, $joinkey, 'sourceid');
  $view->query->add_relationship($msgtablename, $join, $view->base_table);
  $view->query->add_where(0, "$msgtablename.sourceid IS NULL", $view->base_table);

  // If running over a selected list of IDs, pass those in to the query
  // NOTE(review): $idlist is interpolated into the SQL as-is, so callers must
  // only pass a trusted, already-escaped list of IDs.
  if ($idlist) {
    $view->query->add_where($view->options['group'], $view->base_table . ".$joinkey IN ($idlist)",
      $view->base_table);
  }

  // We can't seem to get $view->build() to rebuild build_info, so go straight into the query object
  $query = $view->query->query();
  $query = db_rewrite_sql($query, $view->base_table, $view->base_field,
    array('view' => &$view));
  $args = $view->build_info['query_args'];
  $replacements = module_invoke_all('views_query_substitutions', $view);
  $query = str_replace(array_keys($replacements), $replacements, $query);
  if (is_array($args)) {
    foreach ($args as $id => $arg) {
      $args[$id] = str_replace(array_keys($replacements), $replacements, $arg);
    }
  }

  // Now, make the current db name explicit if content set is pulling tables from another DB
  if ($viewdb <> 'default') {
    global $db_url;
    $url = parse_url(is_array($db_url) ? $db_url['default'] : $db_url);
    $currdb = drupal_substr($url['path'], 1);
    $query = str_replace('{' . $maptable . '}',
      $currdb . '.' . '{' . $maptable . '}', $query);
    $query = str_replace('{' . $msgtablename . '}',
      $currdb . '.' . '{' . $msgtablename . '}', $query);
    db_set_active($viewdb);
  }

  //drupal_set_message($query);
  timer_start('execute view query');
  if ($itemlimit) {
    $importlist = db_query_range($query, $args, 0, $itemlimit);
  }
  else {
    $importlist = db_query($query, $args);
  }
  timer_stop('execute view query');

  if ($viewdb != 'default') {
    db_set_active('default');
  }

  $imported = 0;
  // The 'db_fetch_object' timer brackets only the fetch itself: it is started
  // before each fetch and stopped as soon as a row (or the end) is reached.
  timer_start('db_fetch_object');
  while ($row = db_fetch_object($importlist)) {
    timer_stop('db_fetch_object');
    // Recheck status - permits dynamic interruption of cron jobs
    $sql = "SELECT status FROM {migrate_content_sets} WHERE mcsid=%d";
    $status = db_result(db_query($sql, $mcsid));
    if ($status != MIGRATE_STATUS_IMPORTING) {
      $return = MIGRATE_RESULT_STOPPED;
      break;
    }

    // Check for time out if there is time info present
    if (isset($timelimit) && time() >= $timelimit) {
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Check for closeness to memory limit
    $usage = memory_get_usage();
    $pct_memory = $usage/$memory_limit;
    if ($pct_memory > MIGRATE_MEMORY_THRESHOLD) {
      if (isset($feedback)) {
        $feedback(t('Memory usage is !usage (!pct% of limit !limit), starting new batch',
          array('!pct' => round($pct_memory*100), '!usage' => $usage, '!limit' => $memory_limit)));
      }
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Periodic progress report; the item counter restarts after each report
    if (isset($feedback)) {
      if (($frequency_unit == 'seconds' && time()-$lastfeedback >= $frequency) ||
          ($frequency_unit == 'items' && $imported >= $frequency)) {
        $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, MIGRATE_RESULT_INCOMPLETE);
        $feedback($message);
        $lastfeedback = time();
        $imported = 0;
      }
    }

    timer_start('import hooks');
    $errors = migrate_invoke_all("import_$contenttype", $tblinfo, $row);
    timer_stop('import hooks');

    // Record every message the hooks returned. The row counts as imported
    // unless at least one message is more severe than informational.
    if (count($errors)) {
      $success = TRUE;
      foreach ($errors as $error) {
        if (!isset($error['level'])) {
          $error['level'] = MIGRATE_MESSAGE_ERROR;
        }
        if ($error['level'] != MIGRATE_MESSAGE_INFORMATIONAL) {
          $success = FALSE;
        }
        db_query("INSERT INTO {" . $msgtablename . "}
(sourceid, level, message)
VALUES('%s', %d, '%s')",
          $row->$sourcekey, $error['level'], $error['message']);
      }
      if ($success) {
        $imported++;
      }
    }
    else {
      $imported++;
    }
    timer_start('db_fetch_object');
  }
  timer_stop('db_fetch_object');

  $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, $return);

  // Remember we're done
  $tblinfo->status = MIGRATE_STATUS_IDLE;
  if ($return == MIGRATE_RESULT_COMPLETED) {
    $tblinfo->lastimported = date('Y-m-d H:i:s');
  }
  if (isset($feedback)) {
    $feedback($message);
  }
  watchdog('migrate', $message);
  drupal_write_record('migrate_content_sets', $tblinfo, 'mcsid');
  return $return;
}
/* Revisit
function migrate_content_process_all_action(&$dummy, $action_context, $a1, $a2) {
migrate_content_process_all(time());
}
*/
/**
 * Process all enabled migration processes in a browser, using the Batch API
 * to break it into manageable chunks.
 *
 * Each call works on a single content set: the pending clearing operation
 * with the highest weight if there is one, otherwise the pending import
 * operation with the lowest weight.
 *
 * @param $clearing
 *   Array of content set ids (keyed by content set id) to clear
 * @param $importing
 *   Array of content set ids (keyed by content set id) to import
 * @param $limit
 *   Maximum number of items to process
 * @param $idlist
 *   Comma-separated list of source IDs to process, instead of proceeding through
 *   all unmigrated rows
 * @param $context
 *   Batch API context structure
 */
function migrate_content_process_batch($clearing, $importing, $limit, $idlist, &$context) {
  // A zero max_execution_time means no limit - but let's set a reasonable
  // limit anyway
  $starttime = time();
  $maxexectime = ini_get('max_execution_time');
  if (!$maxexectime) {
    $maxexectime = 240;
  }

  // Initialize the Batch API context
  $context['finished'] = 0;

  // The Batch API progress bar will reflect the number of operations being
  // done (clearing/importing)
  if (!isset($context['sandbox']['numops'])) {
    $context['sandbox']['numops'] = count($clearing) + count($importing);
    $context['sandbox']['numopsdone'] = 0;
    $context['sandbox']['clearing'] = $clearing;
    $context['sandbox']['importing'] = $importing;
    $context['sandbox']['message'] = '';
    $context['sandbox']['times'] = array();
  }

  // For the timelimit, subtract more than enough time to clean up
  $options = array(
    'itemlimit' => $limit,
    'timelimit' => $starttime + (($maxexectime < 5) ? $maxexectime : ($maxexectime - 5)),
    'idlist' => $idlist,
    'feedback' => array('function' => '_migrate_process_message'),
  );

  global $_migrate_messages;
  if (!isset($_migrate_messages)) {
    $_migrate_messages = array();
  }

  // No operation has run yet in this call; stays NULL when there is nothing
  // left to do.
  $status = NULL;

  // Work on the last clearing op (if any)
  if (count($context['sandbox']['clearing'])) {
    $row = db_fetch_object(db_query(
      "SELECT mcsid,description FROM {migrate_content_sets}
WHERE mcsid IN (%s)
ORDER BY weight DESC
LIMIT 1",
      implode(',', $context['sandbox']['clearing'])));
    $status = migrate_content_process_clear($row->mcsid, $options);
    if ($status != MIGRATE_RESULT_INCOMPLETE) {
      unset($context['sandbox']['clearing'][$row->mcsid]);
    }
  }
  // If not, work on the first importing op
  elseif (count($context['sandbox']['importing'])) {
    $row = db_fetch_object(db_query(
      "SELECT mcsid,description FROM {migrate_content_sets}
WHERE mcsid IN (%s)
ORDER BY weight ASC
LIMIT 1",
      implode(',', $context['sandbox']['importing'])));
    if (variable_get('migrate_update', 0)) {
      migrate_content_set_update($row->mcsid);
    }
    $status = migrate_content_process_import($row->mcsid, $options);
    if ($status != MIGRATE_RESULT_INCOMPLETE) {
      unset($context['sandbox']['importing'][$row->mcsid]);
    }
  }
  // If not, nothing to do
  else {
    $context['finished'] = 1;
  }

  // Make sure the entire process stops if requested
  if ($status == MIGRATE_RESULT_STOPPED) {
    $context['finished'] = 1;
  }

  // Progress is the fraction of operations that have run to an end; an
  // incomplete operation will be picked up again on the next call.
  if ($context['finished'] != 1) {
    if ($status != MIGRATE_RESULT_INCOMPLETE) {
      $context['sandbox']['numopsdone']++;
    }
    $context['finished'] = $context['sandbox']['numopsdone']/$context['sandbox']['numops'];
  }

  // Accumulate the messages captured by _migrate_process_message(), one per
  // line, and also record each of them in the batch results.
  foreach ($_migrate_messages as $message) {
    if (!isset($context['sandbox']['message'])) {
      $context['sandbox']['message'] = $message . '
';
    }
    else {
      $context['sandbox']['message'] .= $message . '
';
    }
    $context['message'] = $context['sandbox']['message'];
    $context['results'][] = $message;
  }
  $context['message'] = $context['sandbox']['message'];

  // If requested save timers for eventual display
  if (variable_get('migrate_display_timers', 0)) {
    global $timers;
    foreach ($timers as $name => $timerec) {
      if (isset($timerec['time'])) {
        // Timer values are divided by 1000 before being accumulated
        if (isset($context['sandbox']['times'][$name])) {
          $context['sandbox']['times'][$name] += $timerec['time']/1000;
        }
        else {
          $context['sandbox']['times'][$name] = $timerec['time']/1000;
        }
      }
    }

    // When all done, display the timers
    if ($context['finished'] == 1 && isset($context['sandbox']['times'])) {
      arsort($context['sandbox']['times']);
      foreach ($context['sandbox']['times'] as $name => $total) {
        drupal_set_message("$name: " . round($total, 2));
      }
    }
  }
}
/**
 * Feedback callback: append a message generated during an import or clear
 * process to the global $_migrate_messages list.
 *
 * @param $message
 *   The message to record.
 */
function _migrate_process_message($message) {
  $GLOBALS['_migrate_messages'][] = $message;
}
/**
 * Prepare a content set for updating of existing items: flag every row of
 * its map table as needing an update and empty its message table.
 *
 * @param $mcsid
 *   ID of the content set to update
 */
function migrate_content_set_update($mcsid) {
  $map_table = migrate_map_table_name($mcsid);
  db_query('UPDATE {' . $map_table . '} SET needs_update=1');

  $message_table = migrate_message_table_name($mcsid);
  db_query('TRUNCATE TABLE {' . $message_table . '}');
}
/**
 * Generate a progress message
 *
 * @param $starttime
 *   Time at which this piece of work began; it is subtracted from
 *   microtime(TRUE) to get the elapsed seconds
 * @param $numitems
 *   Number of items that were processed
 * @param $description
 *   Name (description) of the content set being processed
 * @param $import
 *   TRUE for an import process, FALSE for a clearing process
 * @param $result
 *   Result of the work (one of the MIGRATE_RESULT constants)
 * @return
 *   Formatted message
 */
function _migrate_progress_message($starttime, $numitems, $description, $import = TRUE, $result = MIGRATE_RESULT_COMPLETED) {
  // Throughput can only be computed when some time has actually elapsed
  $elapsed = microtime(TRUE) - $starttime;
  if ($elapsed > 0) {
    $rate = round(60 * $numitems / $elapsed);
    $elapsed = round($elapsed, 1);
  }
  else {
    $rate = '?';
  }

  // Pick the wording by result first, then by kind of process
  switch ($result) {
    case MIGRATE_RESULT_COMPLETED:
      $basetext = $import
        ? "Imported !numitems in !time sec (!perminute/min) - done with '!description'"
        : "Deleted !numitems in !time sec (!perminute/min) - done with '!description'";
      break;
    case MIGRATE_RESULT_FAILED:
      $basetext = $import
        ? "Imported !numitems in !time sec (!perminute/min) - failure with '!description'"
        : "Deleted !numitems in !time sec (!perminute/min) - failure with '!description'";
      break;
    case MIGRATE_RESULT_INCOMPLETE:
      $basetext = $import
        ? "Imported !numitems in !time sec (!perminute/min) - continuing with '!description'"
        : "Deleted !numitems in !time sec (!perminute/min) - continuing with '!description'";
      break;
    case MIGRATE_RESULT_STOPPED:
      $basetext = $import
        ? "Imported !numitems in !time sec (!perminute/min) - stopped '!description'"
        : "Deleted !numitems in !time sec (!perminute/min) - stopped '!description'";
      break;
  }

  return t($basetext, array(
    '!numitems' => $numitems,
    '!time' => $elapsed,
    '!perminute' => $rate,
    '!description' => $description,
  ));
}
/**
 * Implementation of hook_init().
 *
 * Includes the .inc support file of every enabled module that has one in
 * this module's "modules" directory, and adds the module's stylesheet.
 */
function migrate_init() {
  // TODO: Be more lazy - only load when really needed
  $module_path = drupal_get_path('module', 'migrate');
  $candidates = drupal_system_listing('.*\.inc$', $module_path . '/modules', 'name', 0);
  foreach ($candidates as $supported_module => $include) {
    // Skip support files for modules that are not enabled
    if (!module_exists($supported_module)) {
      continue;
    }
    include_once($include->filename);
  }

  // Add main CSS functionality.
  drupal_add_css($module_path . '/migrate.css');
}
/**
* Implementation of hook_action_info().
*/
/* Revisit
function migrate_action_info() {
$info['migrate_content_process_clear'] = array(
'type' => 'migrate',
'description' => t('Clear a migration content set'),
'configurable' => FALSE,
'hooks' => array(
'cron' => array('run'),
),
);
$info['migrate_content_process_import'] = array(
'type' => 'migrate',
'description' => t('Import a migration content set'),
'configurable' => FALSE,
'hooks' => array(
'cron' => array('run'),
),
);
$info['migrate_content_process_all_action'] = array(
'type' => 'migrate',
'description' => t('Perform all active migration processes'),
'configurable' => FALSE,
'hooks' => array(
'cron' => array('run'),
),
);
return $info;
}
*/
/**
 * Implementation of hook_perm().
 *
 * @return
 *   The two permissions defined by this module.
 */
function migrate_perm() {
  $permissions = array(
    MIGRATE_ACCESS_BASIC,
    MIGRATE_ACCESS_ADVANCED,
  );
  return $permissions;
}
/**
 * Implementation of hook_help().
 *
 * Provides help text for the admin/content/migrate page only; nothing is
 * returned for any other path.
 */
function migrate_help($page, $arg) {
  if ($page == 'admin/content/migrate') {
    return t('Defined content sets are listed here. New content sets may be added
below, and tasks may be executed directly in the browser. A process that is
actively running will be highlighted.
');
  }
}
/**
 * Implementation of hook_menu().
 *
 * @return
 *   Menu items for the dashboard (with its default local task), the settings
 *   tab, the per-content-set mapping form, and the migrate/xlat callback.
 */
function migrate_menu() {
  $admin_root = 'admin/content/migrate';
  $pages_file = 'migrate_pages.inc';
  $advanced_access = array(MIGRATE_ACCESS_ADVANCED);

  return array(
    $admin_root => array(
      'title' => 'Migrate',
      'description' => 'Monitor and control the creation of Drupal content from source data',
      'weight' => 3,
      'page callback' => 'migrate_dashboard',
      'access arguments' => array(MIGRATE_ACCESS_BASIC),
      'file' => $pages_file,
    ),
    $admin_root . '/dashboard' => array(
      'title' => 'Dashboard',
      'type' => MENU_DEFAULT_LOCAL_TASK,
      'weight' => -10,
    ),
    $admin_root . '/settings' => array(
      'title' => 'Settings',
      'description' => 'Migrate module settings',
      'weight' => 5,
      'page callback' => 'migrate_settings',
      'access arguments' => $advanced_access,
      'file' => $pages_file,
      'type' => MENU_LOCAL_TASK,
    ),
    // The wildcard (path position 3) is handed to the mapping form builder.
    $admin_root . '/%' => array(
      'title' => 'Content set',
      'page callback' => 'drupal_get_form',
      'page arguments' => array('migrate_content_set_mappings', 3),
      'access arguments' => $advanced_access,
      'type' => MENU_CALLBACK,
      'file' => $pages_file,
    ),
    'migrate/xlat/%' => array(
      'page callback' => 'migrate_xlat',
      'access arguments' => array('access content'),
      'page arguments' => array(2),
      'type' => MENU_CALLBACK,
    ),
  );
}
/**
 * Implementation of hook_schema_alter().
 *
 * For every content set with a source key, adds the definitions of its map
 * table and its message table to the schema. The source key column definition
 * is taken from the inspected schema of the base table of the content set's
 * view.
 *
 * @param $schema
 *   Schema array, modified in place.
 */
function migrate_schema_alter(&$schema) {
  // Check for table existence - at install time, hook_schema_alter() may be
  // called before our install hook.
  if (!db_table_exists('migrate_content_sets')) {
    return;
  }

  $content_sets = db_query("SELECT * FROM {migrate_content_sets}");
  while ($set = db_fetch_object($content_sets)) {
    $map_table = migrate_map_table_name($set->mcsid);
    $message_table = migrate_message_table_name($set->mcsid);

    // Get the proper field definition for the sourcekey
    $view = views_get_view($set->view_name);
    if (!$view) {
      drupal_set_message(t('View !view does not exist - either (re)create this view, or
remove the migrate content set using it.', array('!view' => $set->view_name)));
      continue;
    }

    // Must do this to load the database
    $view->init_query();

    // TODO: For now, PK must be in base_table
    $database = isset($view->base_database) ? $view->base_database : 'default';
    $base_table = $view->base_table;
    $source_schema = _migrate_inspect_schema($base_table, $database);

    // Only content sets with a defined PK get map and message tables.
    $key = $set->sourcekey;
    if (!$key) {
      continue;
    }

    $key_field = $source_schema['fields'][$key];
    if (!$key_field) {
      // strip base table name if views prepended it
      $prefix_length = drupal_strlen($base_table) + 1;
      if (strncasecmp($key, $base_table . '_', $prefix_length) == 0) {
        $key = drupal_substr($key, $prefix_length);
      }
      $key_field = $source_schema['fields'][$key];
    }

    // We don't want serial fields to behave serially, so change to int
    if ($key_field['type'] == 'serial') {
      $key_field['type'] = 'int';
    }

    $schema[$map_table] = _migrate_map_table_schema($key_field);
    $schema[$map_table]['name'] = $map_table;
    $schema[$message_table] = _migrate_message_table_schema($key_field);
    $schema[$message_table]['name'] = $message_table;
  }
}
/**
 * Translate URIs from an old site to the new one.
 *
 * Requires adding RewriteRules to .htaccess. For example, if the URLs
 * for news articles had the form
 * http://example.com/issues/news/[OldID].html, use this rule:
 *
 * RewriteRule ^issues/news/([0-9]+).html$ /migrate/xlat/node/$1 [L]
 *
 * Issues a 301 redirect to the first URI returned by the
 * "migrate_xlat_<contenttype>" hook implementations. Does nothing when either
 * argument is empty, when the old ID has no mapping, or when no hook
 * implementation returns a URI.
 *
 * @param $contenttype
 *   Content type to translate (e.g., 'node', 'user', etc.)
 * @param $oldid
 *   Primary key from input view
 */
function migrate_xlat($contenttype, $oldid) {
  if (!$contenttype || !$oldid) {
    return;
  }

  $newid = _migrate_xlat_get_new_id($contenttype, $oldid);
  if (!$newid) {
    return;
  }

  $uri = migrate_invoke_all("xlat_$contenttype", $newid);
  // If no module implements the hook for this content type the result is
  // empty. Redirecting anyway would send a permanent (301) redirect for the
  // old URL to an empty path, so only redirect when a URI was produced.
  if (!empty($uri[0])) {
    drupal_goto($uri[0], NULL, NULL, 301);
  }
}
/**
 * Helper function to translate an ID from a source file to the corresponding
 * Drupal-side ID (nid, uid, etc.)
 *
 * Each content set of the given type is checked in turn and the first
 * non-empty match in its map table wins. Note that the result may be
 * ambiguous - for example, if you are importing nodes from different content
 * sets, they might have overlapping source IDs.
 *
 * @param $contenttype
 *   Content type to translate (e.g., 'node', 'user', etc.)
 * @param $oldid
 *   Primary key from input view
 * @return
 *   Drupal-side ID of the object, or NULL if no map table has a match.
 */
function _migrate_xlat_get_new_id($contenttype, $oldid) {
  // Map table names, remembered across calls and keyed by content set ID.
  static $maptables = array();

  $content_sets = db_query("SELECT mcsid
FROM {migrate_content_sets}
WHERE contenttype='%s'",
    $contenttype);

  while ($set = db_fetch_object($content_sets)) {
    $mcsid = $set->mcsid;
    if (!isset($maptables[$mcsid])) {
      $maptables[$mcsid] = migrate_map_table_name($mcsid);
    }

    $lookup = "SELECT destid
FROM {" . $maptables[$mcsid] . "}
WHERE sourceid='%s'";
    $destid = db_result(db_query($lookup, $oldid));
    if ($destid) {
      return $destid;
    }
  }

  return NULL;
}
/**
 * Implementation of hook_theme().
 *
 * @return
 *   Theme hook definitions for the mapping table and for the dashboard,
 *   settings and content set mapping forms.
 */
function migrate_theme() {
  // The three form hooks all take a single 'form' argument defaulting to NULL.
  $form_argument = array('form' => NULL);

  $hooks = array();
  $hooks['migrate_mapping_table'] = array(
    'arguments' => array('form'),
  );
  $hooks['_migrate_dashboard_form'] = array(
    'arguments' => $form_argument,
    'function' => 'theme_migrate_dashboard',
  );
  $hooks['_migrate_settings_form'] = array(
    'arguments' => $form_argument,
    'function' => 'theme_migrate_settings',
  );
  $hooks['migrate_content_set_mappings'] = array(
    'arguments' => $form_argument,
    'function' => 'theme_migrate_content_set_mappings',
  );
  return $hooks;
}
/**
 * Build the name of the map table belonging to a content set.
 *
 * @param $mcsid
 *   Unique identifier of the content set
 * @return
 *   The map table name: "migrate_map_" followed by the identifier.
 */
function migrate_map_table_name($mcsid) {
  return 'migrate_map_' . $mcsid;
}
/**
 * Build the name of the message table belonging to a content set.
 *
 * @param $mcsid
 *   Unique identifier of the content set
 * @return
 *   The message table name: "migrate_msgs_" followed by the identifier.
 */
function migrate_message_table_name($mcsid) {
  return 'migrate_msgs_' . $mcsid;
}
/**
 * Build the schema definition of a map table from the source PK definition.
 *
 * @param $sourcefield
 *   Schema definition for the content set source's PK; used unchanged as the
 *   definition of the sourceid column.
 * @return
 *   Schema structure for a map table, with sourceid as primary key and an
 *   index on destid.
 */
function _migrate_map_table_schema($sourcefield) {
  // @TODO: Assumes destination key is unsigned int
  $destid = array(
    'type' => 'int',
    'unsigned' => TRUE,
    'not null' => TRUE,
  );

  // Tiny integer column defaulting to 0.
  $needs_update = array(
    'type' => 'int',
    'size' => 'tiny',
    'unsigned' => TRUE,
    'not null' => TRUE,
    'default' => 0,
  );

  $fields = array(
    'sourceid' => $sourcefield,
    'destid' => $destid,
    'needs_update' => $needs_update,
  );

  return array(
    'description' => t('Mappings from source key to destination key'),
    'fields' => $fields,
    'primary key' => array('sourceid'),
    'indexes' => array(
      'idkey' => array('destid'),
    ),
  );
}
/**
 * Build the schema definition of a message table from the source PK
 * definition.
 *
 * @param $sourcefield
 *   Schema definition for the content set source's PK; used unchanged as the
 *   definition of the sourceid column.
 * @return
 *   Schema structure for a message table, with the serial mceid as primary
 *   key and an index on sourceid.
 */
function _migrate_message_table_schema($sourcefield) {
  $fields = array();
  $fields['mceid'] = array(
    'type' => 'serial',
    'unsigned' => TRUE,
    'not null' => TRUE,
  );
  $fields['sourceid'] = $sourcefield;
  $fields['level'] = array(
    'type' => 'int',
    'unsigned' => TRUE,
    'not null' => TRUE,
    'default' => 1,
  );
  $fields['message'] = array(
    'type' => 'text',
    'size' => 'medium',
    'not null' => TRUE,
  );

  return array(
    'description' => t('Import errors'),
    'fields' => $fields,
    'primary key' => array('mceid'),
    'indexes' => array(
      'sourceid' => array('sourceid'),
    ),
  );
}
/**
 * Implementation of hook_views_api().
 *
 * @return
 *   Array declaring Views API version '2.0'.
 */
function migrate_views_api() {
  $api_info = array();
  $api_info['api'] = '2.0';
  return $api_info;
}
/**
 * Check to see if the advanced help module is installed, and if not put up
 * a message.
 *
 * Nothing is shown when the 'migrate_hide_help_message' variable is set or
 * when advanced_help is already enabled. Otherwise the message either links
 * to the modules page (module present on disk but not enabled) or to the
 * project page (module not found). Both messages link to the Migrate settings
 * page so the message can be hidden.
 *
 * Only call this function if the user is already in a position for this to
 * be useful.
 */
function migrate_check_advanced_help() {
  if (variable_get('migrate_hide_help_message', FALSE)) {
    return;
  }
  if (module_exists('advanced_help')) {
    return;
  }

  // Both messages offer the same "hide" link. It points at the Migrate
  // settings page rather than at another module's administration page.
  $hide_url = url('admin/content/migrate/settings');

  $filename = db_result(db_query("SELECT filename FROM {system} WHERE type = 'module' AND name = 'advanced_help'"));
  if ($filename && file_exists($filename)) {
    // The @modules and @hide placeholders must appear in the text, otherwise
    // the URLs passed to t() are silently dropped and no links are rendered.
    drupal_set_message(t('If you <a href="@modules">enable the advanced help module</a>, Migrate will provide more and better help. <a href="@hide">Hide this message.</a>',
      array(
        '@modules' => url('admin/build/modules'),
        '@hide' => $hide_url,
      )));
  }
  else {
    drupal_set_message(t('If you install the advanced help module from !href, Migrate will provide more and better help. <a href="@hide">Hide this message.</a>',
      array(
        '!href' => l('http://drupal.org/project/advanced_help', 'http://drupal.org/project/advanced_help'),
        '@hide' => $hide_url,
      )));
  }
}
/**
 * Check if a date is valid and return the correct timestamp to use.
 *
 * @param $date
 *   Either a non-negative numeric timestamp, which is returned unchanged, or
 *   a date string to be parsed with strtotime().
 * @return
 *   The timestamp, or -1 if the value is empty or cannot be parsed.
 */
function _migrate_valid_date($date) {
  if (empty($date)) {
    return -1;
  }
  if (is_numeric($date) && $date > -1) {
    return $date;
  }

  // Dashes are swapped for slashes before the string is handed to strtotime().
  $date = str_replace('-', '/', $date);
  $time = strtotime($date);
  if ($time < 0 || !$time) {
    if (drupal_strlen($date) > 19) {
      // Handles form YYYY-MM-DD HH:MM:SS.garbage
      $time = strtotime(drupal_substr($date, 0, 19));
      if ($time < 0 || !$time) {
        return -1;
      }
    }
    elseif ($time === FALSE) {
      // strtotime() reports a parse failure as FALSE; callers are promised -1
      // for an invalid date, so do not let FALSE leak out for short strings.
      return -1;
    }
  }
  return $time;
}
/**
 * Get the row count for a view, possibly filtered by arguments
 *
 * @param $view
 *   Name of the view to query, or an already loaded view object.
 * @param $args
 *   Optional arguments to the view, separated by '/'
 * @return
 *   Number of rows in the view, or 0 if the view could not be loaded.
 */
function _migrate_get_view_count($view, $args = NULL) {
  if (is_string($view)) {
    $view = views_get_view($view);
  }
  // A content set may reference a view that no longer exists. Without this
  // check the method calls below would be made on a non-object.
  if (!is_object($view)) {
    return 0;
  }

  // Force execution of count query, with minimal unnecessary results returned
  // @TODO: Find way to execute ONLY count query
  $view->pager['items_per_page'] = 1;
  $view->get_total_rows = TRUE;
  if ($args) {
    $view->set_arguments(explode('/', $args));
  }
  $view->execute();
  $rows = $view->total_rows;

  // TODO: Now, that's the total rows in the current source. However, it may be
  // (particularly with a content set that updates existing objects) that
  // previously-migrated content is no longer in the source, so the stats
  // (particularly Unimported) may be misleading. So, we would want to add in
  // anything in the map table not in the source. This is tricky, and may not
  // really be worth the performance impact, so we'll leave it alone for now.
  return $rows;
}
/**
 * Get the PHP memory_limit value in bytes
 *
 * Understands the K, M and G shorthand suffixes (case-insensitive). A value
 * without a suffix, such as "-1", is returned as the integer it contains.
 *
 * @return
 *   The memory limit as an integer number of bytes; 0 if the setting is empty.
 */
function _migrate_memory_limit() {
  $setting = trim(ini_get('memory_limit'));

  // Take the numeric part up front so the arithmetic below never operates on
  // a string such as "128M".
  $bytes = (int) $setting;

  // substr() copes with an empty setting, unlike indexing the last character.
  $unit = strtolower(substr($setting, -1));
  switch ($unit) {
    // Each case deliberately falls through to apply the remaining factors.
    case 'g':
      $bytes *= 1024;
    case 'm':
      $bytes *= 1024;
    case 'k':
      $bytes *= 1024;
  }
  return $bytes;
}
/**
 * Wrapper around schema_invoke('inspect'), to handle views tablenames
 * hacked to include "dbname.".
 *
 * The inspected schema of each database is kept in a static cache, so a
 * database is inspected at most once per request.
 *
 * @param $tablename
 *   Table to return the schema for, optionally prefixed with "dbname.". When
 *   empty, the whole inspected schema of the database is returned.
 * @param $tabledb
 *   Key of the database connection to inspect. Replaced by the prefix of
 *   $tablename when one is present.
 * @return
 *   The schema of the requested table (NULL if the table is not part of the
 *   inspected schema), or the whole schema if no table name was given.
 */
function _migrate_inspect_schema($tablename = '', $tabledb = 'default') {
  static $inspect = array();

  // TW can treat external MySQL tables as internal, but we can revert
  // if we find a period in the tablename. Todo: might be better to build
  // this check directly into schema_mysql.inc
  if (strpos($tablename, '.')) {
    list($tabledb, $tablename) = explode('.', $tablename);
    // Map the database name onto the key of the connection that uses it.
    $dbinfo = tw_get_dbinfo();
    foreach ($dbinfo as $dbkey => $db_detail) {
      if ($db_detail['name'] == $tabledb) {
        $tabledb = $dbkey;
        break;
      }
    }
  }

  if (!isset($inspect[$tabledb])) {
    // db_set_active() hands back the previously active connection. Restore
    // that one afterwards instead of assuming the caller was on 'default'.
    $previous_db = db_set_active($tabledb);
    // TODO: schema_mysql_inspect($name = NULL) takes $name [tablename] which
    // limits the query to only get that table, which is probably a good idea.
    $inspect[$tabledb] = schema_invoke('inspect');
    db_set_active($previous_db ? $previous_db : 'default');
  }

  // If a tablename was not given, return the whole schema
  if ($tablename == '') {
    return $inspect[$tabledb];
  }

  // Return the schema for just this table.
  return isset($inspect[$tabledb][$tablename]) ? $inspect[$tabledb][$tablename] : NULL;
}