b6df4dbb92
MLS plugin fixes from this session: - Fix silent insert failures: location column NOT NULL was rejecting wpdb->insert calls, causing ~18k new properties since Dec 2025 to be lost. Inserts now build raw SQL with ST_PointFromText so the spatial column is populated atomically. - Auto-refresh expired media URLs in MLS_Media_Handler::fetch_and_cache(), guarded by a property-level GET_LOCK so concurrent fetches share one API refresh. - Normalize WP_Error to null in mls_get_property_image() so callers can rely on the documented string|null contract. - Support comma-separated property_type filters in MLS_Query and MLS_Cluster so the homepage "View All Commercial" link (?property_type=Commercial+Sale,Land,Farm) actually filters correctly. - Incremental sync now looks back 10 minutes past the latest modification timestamp as a safety margin against missed records. - Smart sync exits silently (info-level, not warning) when a full sync is in progress. Operational: - New cron: weekly full sync Sundays at 3 AM (/usr/local/bin/mls-full-sync). - New cron: hourly 2GB cap on mls-thumbnails/ and cache/transformed-images/ (/usr/local/bin/mls-image-cache-cap). - Logrotate config for wp-content/debug.log (2-day retention, daily rotation, delaycompress). Repo policy: - CLAUDE.md updated with explicit "commit everything except build artifacts" policy. - .gitignore: untrack runtime image caches and debug.log rotations. Other modifications in this snapshot are pre-existing in-flight theme/plugin/db_content_updates work. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1425 lines
53 KiB
PHP
Executable File
1425 lines
53 KiB
PHP
Executable File
<?php
|
|
/**
|
|
* MLS Sync Engine
|
|
*
|
|
* Handles synchronization of MLS data including:
|
|
* - Full initial import
|
|
* - Incremental updates
|
|
* - Delete handling (MlgCanView = false)
|
|
* - Sync state tracking for resume capability
|
|
*/
|
|
|
|
if (!defined('ABSPATH')) {
|
|
exit;
|
|
}
|
|
|
|
class MLS_Sync_Engine {
|
|
|
|
/**
|
|
* Sync types
|
|
*/
|
|
const TYPE_FULL = 'full';
|
|
const TYPE_INCREMENTAL = 'incremental';
|
|
const TYPE_MEDIA = 'media';
|
|
const TYPE_MEDIA_REFRESH = 'media_refresh';
|
|
|
|
/**
|
|
* Sync statuses
|
|
*/
|
|
const STATUS_PENDING = 'pending';
|
|
const STATUS_RUNNING = 'running';
|
|
const STATUS_COMPLETED = 'completed';
|
|
const STATUS_FAILED = 'failed';
|
|
const STATUS_PAUSED = 'paused';
|
|
|
|
/**
|
|
* Database instance
|
|
*/
|
|
private $db;
|
|
|
|
/**
|
|
* API client instance
|
|
*/
|
|
private $api_client;
|
|
|
|
/**
|
|
* Media handler instance
|
|
*/
|
|
private $media_handler;
|
|
|
|
/**
|
|
* Logger instance
|
|
*/
|
|
private $logger;
|
|
|
|
/**
|
|
* Current sync state ID
|
|
*/
|
|
private $sync_state_id = null;
|
|
|
|
/**
|
|
* Stats for current sync
|
|
*/
|
|
private $stats = array(
|
|
'processed' => 0,
|
|
'created' => 0,
|
|
'updated' => 0,
|
|
'deleted' => 0,
|
|
'errors' => 0,
|
|
'homeproz_media_downloaded' => 0,
|
|
'homeproz_media_skipped' => 0,
|
|
);
|
|
|
|
/**
|
|
* Constructor
|
|
*/
|
|
public function __construct(MLS_DB $db, MLS_API_Client $api_client, MLS_Media_Handler $media_handler, MLS_Logger $logger) {
|
|
$this->db = $db;
|
|
$this->api_client = $api_client;
|
|
$this->media_handler = $media_handler;
|
|
$this->logger = $logger;
|
|
}
|
|
|
|
/**
|
|
* Run full sync (Active/Pending properties only)
|
|
*
|
|
* Initial import fetches only Active and Pending properties.
|
|
* Use incremental sync (replication) for ongoing updates.
|
|
*
|
|
* @param bool $dry_run If true, don't make changes
|
|
* @param int|null $limit Max records to process
|
|
* @param callable|null $progress_callback Callback for progress updates
|
|
* @return array Sync results
|
|
*/
|
|
public function run_full_sync($dry_run = false, $limit = null, $progress_callback = null) {
|
|
$this->logger->info('Starting full sync (Active/Pending only)', array('dry_run' => $dry_run, 'limit' => $limit));
|
|
|
|
// Store progress callback for use in process_property
|
|
$this->progress_callback = $progress_callback;
|
|
|
|
// Create sync state record
|
|
if (!$dry_run) {
|
|
$this->sync_state_id = $this->create_sync_state(self::TYPE_FULL);
|
|
$this->logger->set_sync_state($this->sync_state_id);
|
|
}
|
|
|
|
$this->stats = array(
|
|
'processed' => 0,
|
|
'created' => 0,
|
|
'updated' => 0,
|
|
'deleted' => 0,
|
|
'errors' => 0,
|
|
);
|
|
|
|
try {
|
|
// Get first page of Active/Pending properties with media
|
|
$start_time = microtime(true);
|
|
$this->emit_progress('api_request', array(
|
|
'method' => 'GET',
|
|
'url' => 'Property',
|
|
'params' => array('type' => 'initial_sync', 'filter' => 'Active/Pending', 'limit' => $limit),
|
|
));
|
|
|
|
$response = $this->api_client->get_properties_for_initial_sync('Media', $limit ? min($limit, 1000) : null);
|
|
$elapsed = round((microtime(true) - $start_time) * 1000);
|
|
|
|
if (is_wp_error($response)) {
|
|
$this->emit_progress('api_response', array(
|
|
'success' => false,
|
|
'status_code' => 0,
|
|
'error' => $response->get_error_message(),
|
|
'record_count' => 0,
|
|
'response_time' => $elapsed,
|
|
));
|
|
throw new Exception($response->get_error_message());
|
|
}
|
|
|
|
$this->emit_progress('api_response', array(
|
|
'success' => true,
|
|
'status_code' => 200,
|
|
'record_count' => isset($response['value']) ? count($response['value']) : 0,
|
|
'response_time' => $elapsed,
|
|
'has_more' => isset($response['@odata.nextLink']),
|
|
));
|
|
|
|
// Process pages
|
|
$continue = true;
|
|
while ($continue && isset($response['value'])) {
|
|
foreach ($response['value'] as $property) {
|
|
if ($limit && $this->stats['processed'] >= $limit) {
|
|
$continue = false;
|
|
break;
|
|
}
|
|
|
|
$this->process_property($property, $dry_run);
|
|
}
|
|
|
|
// Emit page complete event
|
|
$this->emit_progress('page_complete', array('processed' => $this->stats['processed']));
|
|
|
|
// Check for next page
|
|
if ($continue && isset($response['@odata.nextLink'])) {
|
|
// Save progress
|
|
if (!$dry_run) {
|
|
$this->update_sync_state(array(
|
|
'last_next_link' => $response['@odata.nextLink'],
|
|
'records_processed' => $this->stats['processed'],
|
|
'records_created' => $this->stats['created'],
|
|
'records_updated' => $this->stats['updated'],
|
|
));
|
|
}
|
|
|
|
$start_time = microtime(true);
|
|
$this->emit_progress('api_request', array(
|
|
'method' => 'GET',
|
|
'url' => 'Property (next page)',
|
|
'params' => array('page' => 'next'),
|
|
));
|
|
|
|
$response = $this->api_client->get_next_page($response['@odata.nextLink']);
|
|
$elapsed = round((microtime(true) - $start_time) * 1000);
|
|
|
|
if (is_wp_error($response)) {
|
|
$this->emit_progress('api_response', array(
|
|
'success' => false,
|
|
'status_code' => 0,
|
|
'error' => $response->get_error_message(),
|
|
'record_count' => 0,
|
|
'response_time' => $elapsed,
|
|
));
|
|
throw new Exception($response->get_error_message());
|
|
}
|
|
|
|
$this->emit_progress('api_response', array(
|
|
'success' => true,
|
|
'status_code' => 200,
|
|
'record_count' => isset($response['value']) ? count($response['value']) : 0,
|
|
'response_time' => $elapsed,
|
|
'has_more' => isset($response['@odata.nextLink']),
|
|
));
|
|
} else {
|
|
$continue = false;
|
|
}
|
|
}
|
|
|
|
// Mark sync as completed
|
|
if (!$dry_run) {
|
|
$this->update_sync_state(array(
|
|
'status' => self::STATUS_COMPLETED,
|
|
'completed_at' => current_time('mysql'),
|
|
'records_processed' => $this->stats['processed'],
|
|
'records_created' => $this->stats['created'],
|
|
'records_updated' => $this->stats['updated'],
|
|
'records_deleted' => $this->stats['deleted'],
|
|
));
|
|
|
|
// Update last sync time
|
|
$options = mls_plugin()->get_options();
|
|
$options->update_last_sync('full');
|
|
}
|
|
|
|
$this->logger->info('Full sync completed', $this->stats);
|
|
|
|
} catch (Exception $e) {
|
|
$this->logger->error('Full sync failed', array('error' => $e->getMessage()));
|
|
|
|
if (!$dry_run) {
|
|
$this->update_sync_state(array(
|
|
'status' => self::STATUS_FAILED,
|
|
'last_error' => $e->getMessage(),
|
|
'error_count' => $this->stats['errors'] + 1,
|
|
));
|
|
}
|
|
|
|
return array(
|
|
'success' => false,
|
|
'error' => $e->getMessage(),
|
|
'stats' => $this->stats,
|
|
);
|
|
}
|
|
|
|
return array(
|
|
'success' => true,
|
|
'stats' => $this->stats,
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Run incremental sync (replication)
|
|
*
|
|
* Fetches all properties modified since last sync, including those that:
|
|
* - Became unavailable (MlgCanView=false)
|
|
* - Changed status (Active -> Sold)
|
|
*
|
|
* Properties are deleted from local DB if MlgCanView=false or status not Active/Pending.
|
|
*
|
|
* @param bool $dry_run If true, don't make changes
|
|
* @param callable|null $progress_callback Callback for progress updates
|
|
* @return array Sync results
|
|
*/
|
|
public function run_incremental_sync($dry_run = false, $progress_callback = null) {
|
|
// Get last modification timestamp
|
|
$last_timestamp = $this->get_last_modification_timestamp();
|
|
|
|
if (!$last_timestamp) {
|
|
$this->logger->info('No previous sync found, running full sync instead');
|
|
return $this->run_full_sync($dry_run, null, $progress_callback);
|
|
}
|
|
|
|
$this->logger->info('Starting replication sync', array(
|
|
'since' => $last_timestamp,
|
|
'dry_run' => $dry_run,
|
|
));
|
|
|
|
// Store progress callback for use in process_property
|
|
$this->progress_callback = $progress_callback;
|
|
|
|
if (!$dry_run) {
|
|
$this->sync_state_id = $this->create_sync_state(self::TYPE_INCREMENTAL);
|
|
$this->logger->set_sync_state($this->sync_state_id);
|
|
}
|
|
|
|
$this->stats = array(
|
|
'processed' => 0,
|
|
'created' => 0,
|
|
'updated' => 0,
|
|
'deleted' => 0,
|
|
'errors' => 0,
|
|
);
|
|
|
|
try {
|
|
// Get ALL modified properties (no MlgCanView or status filter for replication)
|
|
$start_time = microtime(true);
|
|
$this->emit_progress('api_request', array(
|
|
'method' => 'GET',
|
|
'url' => 'Property',
|
|
'params' => array('type' => 'replication', 'since' => $last_timestamp),
|
|
));
|
|
|
|
$response = $this->api_client->get_properties_for_replication($last_timestamp, 'Media');
|
|
$elapsed = round((microtime(true) - $start_time) * 1000);
|
|
|
|
if (is_wp_error($response)) {
|
|
$this->emit_progress('api_response', array(
|
|
'success' => false,
|
|
'status_code' => 0,
|
|
'error' => $response->get_error_message(),
|
|
'record_count' => 0,
|
|
'response_time' => $elapsed,
|
|
));
|
|
throw new Exception($response->get_error_message());
|
|
}
|
|
|
|
$this->emit_progress('api_response', array(
|
|
'success' => true,
|
|
'status_code' => 200,
|
|
'record_count' => isset($response['value']) ? count($response['value']) : 0,
|
|
'response_time' => $elapsed,
|
|
'has_more' => isset($response['@odata.nextLink']),
|
|
));
|
|
|
|
// Process pages
|
|
while (isset($response['value'])) {
|
|
foreach ($response['value'] as $property) {
|
|
$this->process_property($property, $dry_run);
|
|
}
|
|
|
|
// Emit page complete event
|
|
$this->emit_progress('page_complete', array('processed' => $this->stats['processed']));
|
|
|
|
// Check for next page
|
|
if (isset($response['@odata.nextLink'])) {
|
|
// Save progress for resume capability
|
|
if (!$dry_run) {
|
|
$this->update_sync_state(array(
|
|
'last_next_link' => $response['@odata.nextLink'],
|
|
'records_processed' => $this->stats['processed'],
|
|
'records_created' => $this->stats['created'],
|
|
'records_updated' => $this->stats['updated'],
|
|
'records_deleted' => $this->stats['deleted'],
|
|
));
|
|
}
|
|
|
|
$start_time = microtime(true);
|
|
$this->emit_progress('api_request', array(
|
|
'method' => 'GET',
|
|
'url' => 'Property (next page)',
|
|
'params' => array('page' => 'next'),
|
|
));
|
|
|
|
$response = $this->api_client->get_next_page($response['@odata.nextLink']);
|
|
$elapsed = round((microtime(true) - $start_time) * 1000);
|
|
|
|
if (is_wp_error($response)) {
|
|
$this->emit_progress('api_response', array(
|
|
'success' => false,
|
|
'status_code' => 0,
|
|
'error' => $response->get_error_message(),
|
|
'record_count' => 0,
|
|
'response_time' => $elapsed,
|
|
));
|
|
throw new Exception($response->get_error_message());
|
|
}
|
|
|
|
$this->emit_progress('api_response', array(
|
|
'success' => true,
|
|
'status_code' => 200,
|
|
'record_count' => isset($response['value']) ? count($response['value']) : 0,
|
|
'response_time' => $elapsed,
|
|
'has_more' => isset($response['@odata.nextLink']),
|
|
));
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Mark sync as completed
|
|
if (!$dry_run) {
|
|
$this->update_sync_state(array(
|
|
'status' => self::STATUS_COMPLETED,
|
|
'completed_at' => current_time('mysql'),
|
|
'records_processed' => $this->stats['processed'],
|
|
'records_created' => $this->stats['created'],
|
|
'records_updated' => $this->stats['updated'],
|
|
'records_deleted' => $this->stats['deleted'],
|
|
));
|
|
|
|
$options = mls_plugin()->get_options();
|
|
$options->update_last_sync('incremental');
|
|
}
|
|
|
|
$this->logger->info('Incremental sync completed', $this->stats);
|
|
|
|
// Download pending media for HomeProz properties
|
|
// This catches any HomeProz listings that have media records but images weren't downloaded
|
|
$media_stats = $this->download_pending_homeproz_media($dry_run);
|
|
$this->stats['homeproz_media_downloaded'] = $media_stats['downloaded'];
|
|
$this->stats['homeproz_media_skipped'] = $media_stats['skipped'];
|
|
|
|
} catch (Exception $e) {
|
|
$this->logger->error('Incremental sync failed', array('error' => $e->getMessage()));
|
|
|
|
if (!$dry_run) {
|
|
$this->update_sync_state(array(
|
|
'status' => self::STATUS_FAILED,
|
|
'last_error' => $e->getMessage(),
|
|
));
|
|
}
|
|
|
|
return array(
|
|
'success' => false,
|
|
'error' => $e->getMessage(),
|
|
'stats' => $this->stats,
|
|
);
|
|
}
|
|
|
|
return array(
|
|
'success' => true,
|
|
'stats' => $this->stats,
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Run media refresh sync for properties with expiring media URLs
|
|
*
|
|
* Fetches fresh data for properties whose media URLs will expire within
|
|
* the specified number of days. This prevents on-demand API calls when
|
|
* visitors try to view images with expired URLs.
|
|
*
|
|
* If a property is no longer listed (not Active/Pending or MlgCanView=false),
|
|
* it will be removed from the local database.
|
|
*
|
|
* @param int $days_ahead Number of days to look ahead for expiring media (default: 3)
|
|
* @param bool $dry_run If true, don't make changes
|
|
* @param callable|null $progress_callback Callback for progress updates
|
|
* @return array Sync results
|
|
*/
|
|
public function run_media_refresh_sync($days_ahead = 3, $dry_run = false, $progress_callback = null) {
|
|
global $wpdb;
|
|
|
|
$this->logger->info('Starting media refresh sync', array(
|
|
'days_ahead' => $days_ahead,
|
|
'dry_run' => $dry_run,
|
|
));
|
|
|
|
$this->progress_callback = $progress_callback;
|
|
|
|
if (!$dry_run) {
|
|
$this->sync_state_id = $this->create_sync_state(self::TYPE_MEDIA_REFRESH);
|
|
$this->logger->set_sync_state($this->sync_state_id);
|
|
}
|
|
|
|
$this->stats = array(
|
|
'processed' => 0,
|
|
'created' => 0,
|
|
'updated' => 0,
|
|
'deleted' => 0,
|
|
'errors' => 0,
|
|
);
|
|
|
|
try {
|
|
// Find properties with media expiring within X days
|
|
$expiry_threshold = gmdate('Y-m-d H:i:s', strtotime("+{$days_ahead} days"));
|
|
|
|
// Include Active/Pending properties, plus Closed HomeProz properties
|
|
// (HomeProz wants to keep sold property images for portfolio)
|
|
$properties = $wpdb->get_results($wpdb->prepare(
|
|
"SELECT listing_key, listing_id, media_expires_at
|
|
FROM {$this->db->properties_table()}
|
|
WHERE (media_expires_at IS NULL OR media_expires_at <= %s)
|
|
AND (standard_status IN ('Active', 'Pending') OR (standard_status = 'Closed' AND is_homeproz = 1))
|
|
ORDER BY media_expires_at ASC",
|
|
$expiry_threshold
|
|
));
|
|
|
|
$total = count($properties);
|
|
$this->logger->info("Found {$total} properties with expiring media");
|
|
|
|
$this->emit_progress('media_refresh_start', array(
|
|
'total' => $total,
|
|
'expiry_threshold' => $expiry_threshold,
|
|
));
|
|
|
|
// Process in batches of 25 (MLS Grid max with $expand)
|
|
$batch_size = 25;
|
|
$batches = array_chunk($properties, $batch_size);
|
|
$batch_num = 0;
|
|
|
|
foreach ($batches as $batch) {
|
|
$batch_num++;
|
|
|
|
// Build array of listing_ids for this batch
|
|
$listing_ids = array_map(function($prop) {
|
|
return $prop->listing_id;
|
|
}, $batch);
|
|
|
|
// Fetch batch from API
|
|
$start_time = microtime(true);
|
|
$this->emit_progress('api_request', array(
|
|
'method' => 'GET',
|
|
'url' => 'Property',
|
|
'params' => array('batch' => $batch_num, 'count' => count($listing_ids)),
|
|
));
|
|
|
|
$response = $this->api_client->get_properties_by_ids($listing_ids);
|
|
$elapsed = round((microtime(true) - $start_time) * 1000);
|
|
|
|
if (is_wp_error($response)) {
|
|
$this->emit_progress('api_response', array(
|
|
'success' => false,
|
|
'status_code' => 0,
|
|
'error' => $response->get_error_message(),
|
|
'response_time' => $elapsed,
|
|
));
|
|
|
|
// Mark all in batch as errors
|
|
foreach ($batch as $prop) {
|
|
$this->stats['processed']++;
|
|
$this->stats['errors']++;
|
|
}
|
|
$this->logger->warning('Failed to fetch batch for media refresh', array(
|
|
'batch' => $batch_num,
|
|
'error' => $response->get_error_message(),
|
|
));
|
|
continue;
|
|
}
|
|
|
|
$returned_count = isset($response['value']) ? count($response['value']) : 0;
|
|
$this->emit_progress('api_response', array(
|
|
'success' => true,
|
|
'status_code' => 200,
|
|
'response_time' => $elapsed,
|
|
'record_count' => $returned_count,
|
|
));
|
|
|
|
// Index returned properties by ListingId
|
|
$returned_by_id = array();
|
|
if (isset($response['value'])) {
|
|
foreach ($response['value'] as $property_data) {
|
|
if (isset($property_data['ListingId'])) {
|
|
$returned_by_id[$property_data['ListingId']] = $property_data;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process each property in the batch
|
|
foreach ($batch as $prop) {
|
|
$this->stats['processed']++;
|
|
|
|
if (isset($returned_by_id[$prop->listing_id])) {
|
|
// Property found - process_property handles its own progress events
|
|
$property_data = $returned_by_id[$prop->listing_id];
|
|
|
|
if (!$dry_run) {
|
|
$this->process_property($property_data, false);
|
|
} else {
|
|
$this->stats['updated']++;
|
|
$this->emit_progress('property_skipped', array(
|
|
'listing_key' => $prop->listing_key,
|
|
));
|
|
}
|
|
} else {
|
|
// Property not in API response - may have been removed
|
|
if (!$dry_run) {
|
|
$this->delete_property($prop->listing_key);
|
|
}
|
|
$this->stats['deleted']++;
|
|
$this->emit_progress('property_deleted', array(
|
|
'listing_key' => $prop->listing_key,
|
|
'reason' => 'Not found in API',
|
|
));
|
|
}
|
|
}
|
|
|
|
// Update sync state after each batch
|
|
if (!$dry_run) {
|
|
$this->update_sync_state(array(
|
|
'records_processed' => $this->stats['processed'],
|
|
'records_updated' => $this->stats['updated'],
|
|
'records_deleted' => $this->stats['deleted'],
|
|
));
|
|
}
|
|
|
|
// Emit batch/page complete
|
|
$this->emit_progress('page_complete', array('processed' => $this->stats['processed']));
|
|
}
|
|
|
|
// Mark sync as completed
|
|
if (!$dry_run) {
|
|
$this->update_sync_state(array(
|
|
'status' => self::STATUS_COMPLETED,
|
|
'completed_at' => current_time('mysql'),
|
|
'records_processed' => $this->stats['processed'],
|
|
'records_updated' => $this->stats['updated'],
|
|
'records_deleted' => $this->stats['deleted'],
|
|
));
|
|
}
|
|
|
|
$this->logger->info('Media refresh sync completed', $this->stats);
|
|
|
|
$this->emit_progress('media_refresh_complete', array(
|
|
'stats' => $this->stats,
|
|
));
|
|
|
|
} catch (Exception $e) {
|
|
$this->logger->error('Media refresh sync failed', array('error' => $e->getMessage()));
|
|
|
|
if (!$dry_run) {
|
|
$this->update_sync_state(array(
|
|
'status' => self::STATUS_FAILED,
|
|
'last_error' => $e->getMessage(),
|
|
));
|
|
}
|
|
|
|
return array(
|
|
'success' => false,
|
|
'error' => $e->getMessage(),
|
|
'stats' => $this->stats,
|
|
);
|
|
}
|
|
|
|
return array(
|
|
'success' => true,
|
|
'stats' => $this->stats,
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Resume an interrupted sync
|
|
*
|
|
* @param int $sync_state_id Sync state ID to resume
|
|
* @param callable|null $progress_callback Progress callback
|
|
* @return array Sync results
|
|
*/
|
|
public function resume_sync($sync_state_id, $progress_callback = null) {
|
|
global $wpdb;
|
|
|
|
$state = $wpdb->get_row($wpdb->prepare(
|
|
"SELECT * FROM {$this->db->sync_state_table()} WHERE id = %d",
|
|
$sync_state_id
|
|
));
|
|
|
|
if (!$state) {
|
|
return array(
|
|
'success' => false,
|
|
'error' => 'Sync state not found',
|
|
);
|
|
}
|
|
|
|
if ($state->status === self::STATUS_COMPLETED) {
|
|
return array(
|
|
'success' => false,
|
|
'error' => 'Sync already completed',
|
|
);
|
|
}
|
|
|
|
$this->sync_state_id = $sync_state_id;
|
|
$this->progress_callback = $progress_callback;
|
|
$this->logger->set_sync_state($sync_state_id);
|
|
$this->logger->info('Resuming sync', array('sync_state_id' => $sync_state_id));
|
|
|
|
// Load existing stats
|
|
$this->stats = array(
|
|
'processed' => (int) $state->records_processed,
|
|
'created' => (int) $state->records_created,
|
|
'updated' => (int) $state->records_updated,
|
|
'deleted' => (int) $state->records_deleted,
|
|
'errors' => (int) $state->error_count,
|
|
);
|
|
|
|
// Update status to running
|
|
$this->update_sync_state(array('status' => self::STATUS_RUNNING));
|
|
|
|
try {
|
|
// Resume from last next_link
|
|
if ($state->last_next_link) {
|
|
$response = $this->api_client->get_next_page($state->last_next_link);
|
|
} else {
|
|
// Start fresh
|
|
$response = $this->api_client->get_properties_for_sync(
|
|
$state->last_modification_timestamp,
|
|
'Media'
|
|
);
|
|
}
|
|
|
|
if (is_wp_error($response)) {
|
|
throw new Exception($response->get_error_message());
|
|
}
|
|
|
|
// Process remaining pages
|
|
while (isset($response['value'])) {
|
|
foreach ($response['value'] as $property) {
|
|
$this->process_property($property, false);
|
|
}
|
|
|
|
// Emit page complete event
|
|
$this->emit_progress('page_complete', array('processed' => $this->stats['processed']));
|
|
|
|
if (isset($response['@odata.nextLink'])) {
|
|
$this->update_sync_state(array(
|
|
'last_next_link' => $response['@odata.nextLink'],
|
|
'records_processed' => $this->stats['processed'],
|
|
));
|
|
|
|
$response = $this->api_client->get_next_page($response['@odata.nextLink']);
|
|
|
|
if (is_wp_error($response)) {
|
|
throw new Exception($response->get_error_message());
|
|
}
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
|
|
$this->update_sync_state(array(
|
|
'status' => self::STATUS_COMPLETED,
|
|
'completed_at' => current_time('mysql'),
|
|
'records_processed' => $this->stats['processed'],
|
|
'records_created' => $this->stats['created'],
|
|
'records_updated' => $this->stats['updated'],
|
|
'records_deleted' => $this->stats['deleted'],
|
|
));
|
|
|
|
$this->logger->info('Resume sync completed', $this->stats);
|
|
|
|
} catch (Exception $e) {
|
|
$this->logger->error('Resume sync failed', array('error' => $e->getMessage()));
|
|
|
|
$this->update_sync_state(array(
|
|
'status' => self::STATUS_FAILED,
|
|
'last_error' => $e->getMessage(),
|
|
));
|
|
|
|
return array(
|
|
'success' => false,
|
|
'error' => $e->getMessage(),
|
|
'stats' => $this->stats,
|
|
);
|
|
}
|
|
|
|
return array(
|
|
'success' => true,
|
|
'stats' => $this->stats,
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Progress callback reference
|
|
*/
|
|
private $progress_callback = null;
|
|
|
|
/**
|
|
* Allowed statuses for non-HomeProz listings (Active/Pending only)
|
|
*/
|
|
const ALLOWED_STATUSES = array('Active', 'Pending');
|
|
|
|
/**
|
|
* Allowed statuses for HomeProz listings (includes Closed for historical records)
|
|
*/
|
|
const HOMEPROZ_ALLOWED_STATUSES = array('Active', 'Pending', 'Closed');
|
|
|
|
/**
|
|
* Process a single property record
|
|
*
|
|
* During replication, properties are deleted if:
|
|
* - MlgCanView = false (removed from feed)
|
|
* - StandardStatus not in allowed list (varies by HomeProz status)
|
|
*
|
|
* HomeProz listings are retained even when Closed (sold) for historical viewing.
|
|
* Non-HomeProz listings are deleted when status is not Active/Pending.
|
|
*
|
|
* @param array $property Property data from API
|
|
* @param bool $dry_run If true, don't make changes
|
|
*/
|
|
private function process_property($property, $dry_run = false) {
|
|
global $wpdb;
|
|
|
|
$this->stats['processed']++;
|
|
|
|
$listing_key = $property['ListingKey'] ?? null;
|
|
if (!$listing_key) {
|
|
$this->stats['errors']++;
|
|
$this->logger->warning('Property missing ListingKey', array('property' => $property));
|
|
return;
|
|
}
|
|
|
|
// Check MlgCanView and StandardStatus
|
|
$can_view = $property['MlgCanView'] ?? true;
|
|
$status = $property['StandardStatus'] ?? null;
|
|
|
|
// Check if this is a HomeProz listing (by office ID or override list)
|
|
$listing_id = $property['ListingId'] ?? '';
|
|
$is_homeproz = (($property['ListOfficeMlsId'] ?? '') === MLS_HOMEPROZ_OFFICE_ID)
|
|
|| (defined('MLS_HOMEPROZ_OVERRIDE_LISTINGS') && in_array($listing_id, MLS_HOMEPROZ_OVERRIDE_LISTINGS));
|
|
|
|
// Determine allowed statuses based on whether it's a HomeProz listing
|
|
$allowed_statuses = $is_homeproz ? self::HOMEPROZ_ALLOWED_STATUSES : self::ALLOWED_STATUSES;
|
|
|
|
// Delete if: not viewable OR status is not in allowed list
|
|
// HomeProz listings are retained even when Closed (sold)
|
|
$should_delete = !$can_view || !in_array($status, $allowed_statuses);
|
|
|
|
if ($should_delete) {
|
|
// Check if we have this record locally before attempting delete
|
|
$exists_locally = $wpdb->get_var($wpdb->prepare(
|
|
"SELECT id FROM {$this->db->properties_table()} WHERE listing_key = %s",
|
|
$listing_key
|
|
));
|
|
|
|
if ($exists_locally) {
|
|
if (!$dry_run) {
|
|
$this->delete_property($listing_key);
|
|
}
|
|
$this->stats['deleted']++;
|
|
$this->emit_progress('property_deleted', array(
|
|
'listing_key' => $listing_key,
|
|
'reason' => !$can_view ? 'MlgCanView=false' : "Status={$status}",
|
|
));
|
|
}
|
|
// If not in our DB, just skip silently (e.g., Sold property we never had)
|
|
return;
|
|
}
|
|
|
|
// Check if property exists
|
|
$existing = $wpdb->get_var($wpdb->prepare(
|
|
"SELECT id FROM {$this->db->properties_table()} WHERE listing_key = %s",
|
|
$listing_key
|
|
));
|
|
|
|
// Prepare data for insert/update
|
|
$data = $this->map_property_data($property);
|
|
|
|
if ($dry_run) {
|
|
if ($existing) {
|
|
$this->stats['updated']++;
|
|
$this->emit_progress('property_skipped', array('listing_key' => $listing_key));
|
|
} else {
|
|
$this->stats['created']++;
|
|
$this->emit_progress('property_skipped', array('listing_key' => $listing_key));
|
|
}
|
|
return;
|
|
}
|
|
|
|
// Build spatial location value for the NOT NULL location column
|
|
$lat = $property['Latitude'] ?? null;
|
|
$lng = $property['Longitude'] ?? null;
|
|
$has_coords = ($lat !== null && $lng !== null);
|
|
$point_lat = $has_coords ? (float) $lat : 0.0;
|
|
$point_lng = $has_coords ? (float) $lng : 0.0;
|
|
|
|
if ($existing) {
|
|
// Update existing
|
|
$wpdb->update(
|
|
$this->db->properties_table(),
|
|
$data,
|
|
array('listing_key' => $listing_key)
|
|
);
|
|
$this->stats['updated']++;
|
|
$this->emit_progress('property_updated', array('listing_key' => $listing_key));
|
|
|
|
// Update spatial location column (wpdb can't handle ST_PointFromText directly)
|
|
$wpdb->query($wpdb->prepare(
|
|
"UPDATE {$this->db->properties_table()} SET location = ST_PointFromText(CONCAT('POINT(', %f, ' ', %f, ')'), 4326) WHERE listing_key = %s",
|
|
$point_lat,
|
|
$point_lng,
|
|
$listing_key
|
|
));
|
|
} else {
|
|
// Insert new -- must use raw SQL to include the NOT NULL spatial location column
|
|
$data['listing_key'] = $listing_key;
|
|
$data['created_at'] = current_time('mysql');
|
|
|
|
$columns = array();
|
|
$placeholders = array();
|
|
$values = array();
|
|
foreach ($data as $col => $val) {
|
|
$columns[] = "`{$col}`";
|
|
if ($val === null) {
|
|
$placeholders[] = 'NULL';
|
|
} elseif (is_int($val) || is_float($val)) {
|
|
$placeholders[] = is_int($val) ? '%d' : '%f';
|
|
$values[] = $val;
|
|
} else {
|
|
$placeholders[] = '%s';
|
|
$values[] = $val;
|
|
}
|
|
}
|
|
|
|
// Append spatial location column
|
|
$columns[] = '`location`';
|
|
$placeholders[] = "ST_PointFromText(CONCAT('POINT(', %f, ' ', %f, ')'), 4326)";
|
|
$values[] = $point_lat;
|
|
$values[] = $point_lng;
|
|
|
|
$sql = "INSERT INTO {$this->db->properties_table()} (" . implode(', ', $columns) . ") VALUES (" . implode(', ', $placeholders) . ")";
|
|
$wpdb->query($wpdb->prepare($sql, $values));
|
|
|
|
$this->stats['created']++;
|
|
$this->emit_progress('property_created', array('listing_key' => $listing_key));
|
|
}
|
|
|
|
// Process media if present
|
|
if (isset($property['Media']) && is_array($property['Media'])) {
|
|
$this->media_handler->sync_property_media($listing_key, $property['Media'], false, $this->progress_callback);
|
|
|
|
// Auto-download and cache all images for HomeProz listings
|
|
// These images are stored in persistent cache and never garbage collected
|
|
if ($is_homeproz) {
|
|
$this->media_handler->download_homeproz_images($listing_key, $this->progress_callback);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Download pending media for all HomeProz properties
|
|
*
|
|
* Finds HomeProz properties that have media records with pending download status
|
|
* and downloads them. This ensures HomeProz images are always cached locally.
|
|
*
|
|
* @param bool $dry_run If true, don't download
|
|
* @return array Stats with 'downloaded' and 'skipped' counts
|
|
*/
|
|
private function download_pending_homeproz_media($dry_run = false) {
|
|
global $wpdb;
|
|
|
|
$stats = array('downloaded' => 0, 'skipped' => 0, 'properties' => 0);
|
|
|
|
// Find HomeProz properties with pending media downloads
|
|
$properties_table = $this->db->properties_table();
|
|
$media_table = $this->db->media_table();
|
|
|
|
$homeproz_with_pending = $wpdb->get_results(
|
|
"SELECT DISTINCT p.listing_key
|
|
FROM {$properties_table} p
|
|
INNER JOIN {$media_table} m ON p.listing_key = m.listing_key
|
|
WHERE p.is_homeproz = 1
|
|
AND m.download_status = 'pending'
|
|
AND m.media_url IS NOT NULL
|
|
ORDER BY p.modification_timestamp DESC"
|
|
);
|
|
|
|
if (empty($homeproz_with_pending)) {
|
|
$this->logger->info('No HomeProz properties with pending media downloads');
|
|
return $stats;
|
|
}
|
|
|
|
$this->logger->info('Found HomeProz properties with pending media', array(
|
|
'count' => count($homeproz_with_pending),
|
|
));
|
|
|
|
$this->emit_progress('homeproz_media_start', array(
|
|
'total_properties' => count($homeproz_with_pending),
|
|
));
|
|
|
|
$property_count = count($homeproz_with_pending);
|
|
$current = 0;
|
|
|
|
foreach ($homeproz_with_pending as $row) {
|
|
$current++;
|
|
|
|
if ($dry_run) {
|
|
$stats['properties']++;
|
|
continue;
|
|
}
|
|
|
|
$this->logger->info('Downloading HomeProz media', array(
|
|
'listing_key' => $row->listing_key,
|
|
'progress' => "{$current}/{$property_count}",
|
|
));
|
|
|
|
// Download with 10-second delay between each image to respect MLS API limits
|
|
$result = $this->media_handler->download_homeproz_images(
|
|
$row->listing_key,
|
|
$this->progress_callback,
|
|
10 // delay_seconds between each image
|
|
);
|
|
|
|
$stats['downloaded'] += $result['downloaded'];
|
|
$stats['skipped'] += $result['skipped'];
|
|
$stats['properties']++;
|
|
}
|
|
|
|
$this->emit_progress('homeproz_media_complete', $stats);
|
|
|
|
$this->logger->info('HomeProz media download completed', $stats);
|
|
|
|
return $stats;
|
|
}
|
|
|
|
/**
|
|
* Emit progress event
|
|
*
|
|
* @param string $event Event name
|
|
* @param array $data Event data
|
|
*/
|
|
private function emit_progress($event, $data = array()) {
|
|
if ($this->progress_callback) {
|
|
call_user_func($this->progress_callback, $event, $data);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Map API property data to database columns
|
|
*
|
|
* @param array $property API property data
|
|
* @return array Mapped data for database
|
|
*/
|
|
private function map_property_data($property) {
|
|
// Validate coordinates against state boundaries
|
|
$coordinates_invalid = MLS_Geo_Validator::validate_coordinates(
|
|
$property['Latitude'] ?? null,
|
|
$property['Longitude'] ?? null,
|
|
$property['StateOrProvince'] ?? 'MN'
|
|
) ? 0 : 1;
|
|
|
|
return array(
|
|
'listing_id' => $property['ListingId'] ?? null,
|
|
'originating_system' => $property['OriginatingSystemName'] ?? 'northstar',
|
|
'standard_status' => $property['StandardStatus'] ?? null,
|
|
'mls_status' => $property['MlsStatus'] ?? null,
|
|
'mlg_can_view' => isset($property['MlgCanView']) ? ($property['MlgCanView'] ? 1 : 0) : 1,
|
|
|
|
'list_price' => $property['ListPrice'] ?? null,
|
|
'original_list_price' => $property['OriginalListPrice'] ?? null,
|
|
'close_price' => $property['ClosePrice'] ?? null,
|
|
|
|
'street_number' => $property['StreetNumber'] ?? null,
|
|
'street_name' => $property['StreetName'] ?? null,
|
|
'street_suffix' => $property['StreetSuffix'] ?? null,
|
|
'unit_number' => $property['UnitNumber'] ?? null,
|
|
'city' => $property['City'] ?? null,
|
|
'state_or_province' => $property['StateOrProvince'] ?? 'MN',
|
|
'postal_code' => $property['PostalCode'] ?? null,
|
|
'county' => $property['CountyOrParish'] ?? null,
|
|
'latitude' => $property['Latitude'] ?? null,
|
|
'longitude' => $property['Longitude'] ?? null,
|
|
'coordinates_invalid' => $coordinates_invalid,
|
|
|
|
'property_type' => $property['PropertyType'] ?? null,
|
|
'property_sub_type' => $property['PropertySubType'] ?? null,
|
|
'bedrooms_total' => $property['BedroomsTotal'] ?? null,
|
|
'bathrooms_total' => $property['BathroomsTotalInteger'] ?? null,
|
|
'bathrooms_full' => $property['BathroomsFull'] ?? null,
|
|
'bathrooms_half' => $property['BathroomsHalf'] ?? null,
|
|
'living_area' => $property['LivingArea'] ?? null,
|
|
'lot_size_area' => $property['LotSizeArea'] ?? null,
|
|
'lot_size_units' => $property['LotSizeUnits'] ?? null,
|
|
'year_built' => $property['YearBuilt'] ?? null,
|
|
'garage_spaces' => $property['GarageSpaces'] ?? null,
|
|
|
|
'public_remarks' => $property['PublicRemarks'] ?? null,
|
|
'directions' => $property['Directions'] ?? null,
|
|
|
|
'list_agent_key' => $property['ListAgentKey'] ?? null,
|
|
'list_agent_mls_id' => $property['ListAgentMlsId'] ?? null,
|
|
'list_office_key' => $property['ListOfficeKey'] ?? null,
|
|
'list_office_mls_id' => $property['ListOfficeMlsId'] ?? null,
|
|
'list_office_name' => $property['ListOfficeName'] ?? null,
|
|
'is_homeproz' => (
|
|
(($property['ListOfficeMlsId'] ?? '') === MLS_HOMEPROZ_OFFICE_ID)
|
|
|| (defined('MLS_HOMEPROZ_OVERRIDE_LISTINGS') && in_array($property['ListingId'] ?? '', MLS_HOMEPROZ_OVERRIDE_LISTINGS))
|
|
) ? 1 : 0,
|
|
|
|
'photos_count' => $property['PhotosCount'] ?? 0,
|
|
'modification_timestamp' => $this->format_timestamp($property['ModificationTimestamp'] ?? null),
|
|
'photos_change_timestamp' => $this->format_timestamp($property['PhotosChangeTimestamp'] ?? null),
|
|
'listing_contract_date' => $this->format_date($property['ListingContractDate'] ?? null),
|
|
'close_date' => $this->format_date($property['CloseDate'] ?? null),
|
|
'days_on_market' => $property['DaysOnMarket'] ?? null,
|
|
|
|
'raw_data' => wp_json_encode($property),
|
|
'updated_at' => current_time('mysql'),
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Format ISO 8601 timestamp to MySQL datetime
|
|
*
|
|
* @param string|null $timestamp ISO 8601 timestamp
|
|
* @return string|null MySQL datetime
|
|
*/
|
|
private function format_timestamp($timestamp) {
|
|
if (!$timestamp) {
|
|
return null;
|
|
}
|
|
$dt = new DateTime($timestamp);
|
|
return $dt->format('Y-m-d H:i:s');
|
|
}
|
|
|
|
/**
|
|
* Format date string to MySQL date
|
|
*
|
|
* @param string|null $date Date string
|
|
* @return string|null MySQL date
|
|
*/
|
|
private function format_date($date) {
|
|
if (!$date) {
|
|
return null;
|
|
}
|
|
return date('Y-m-d', strtotime($date));
|
|
}
|
|
|
|
/**
|
|
* Delete a property and its media
|
|
*
|
|
* @param string $listing_key Listing key
|
|
*/
|
|
private function delete_property($listing_key) {
|
|
global $wpdb;
|
|
|
|
// Delete media files
|
|
$this->media_handler->delete_property_media($listing_key);
|
|
|
|
// Delete from database
|
|
$wpdb->delete(
|
|
$this->db->properties_table(),
|
|
array('listing_key' => $listing_key)
|
|
);
|
|
|
|
$this->logger->debug('Deleted property', array('listing_key' => $listing_key));
|
|
}
|
|
|
|
/**
|
|
* Create a sync state record
|
|
*
|
|
* @param string $type Sync type
|
|
* @return int Sync state ID
|
|
*/
|
|
private function create_sync_state($type) {
|
|
global $wpdb;
|
|
|
|
$wpdb->insert(
|
|
$this->db->sync_state_table(),
|
|
array(
|
|
'sync_type' => $type,
|
|
'entity_type' => 'Property',
|
|
'status' => self::STATUS_RUNNING,
|
|
'started_at' => current_time('mysql'),
|
|
'created_at' => current_time('mysql'),
|
|
)
|
|
);
|
|
|
|
return $wpdb->insert_id;
|
|
}
|
|
|
|
/**
|
|
* Update sync state record
|
|
*
|
|
* @param array $data Data to update
|
|
*/
|
|
private function update_sync_state($data) {
|
|
global $wpdb;
|
|
|
|
if (!$this->sync_state_id) {
|
|
return;
|
|
}
|
|
|
|
$data['updated_at'] = current_time('mysql');
|
|
|
|
$wpdb->update(
|
|
$this->db->sync_state_table(),
|
|
$data,
|
|
array('id' => $this->sync_state_id)
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Get last modification timestamp from synced data
|
|
*
|
|
* @return string|null ISO 8601 timestamp
|
|
*/
|
|
private function get_last_modification_timestamp() {
|
|
global $wpdb;
|
|
|
|
$timestamp = $wpdb->get_var(
|
|
"SELECT MAX(modification_timestamp) FROM {$this->db->properties_table()}"
|
|
);
|
|
|
|
if ($timestamp) {
|
|
// Look back 10 minutes past the latest timestamp as a safety margin
|
|
// to catch any records that may have been missed due to race conditions
|
|
// or clock skew between our DB and the MLS API
|
|
$dt = new DateTime($timestamp);
|
|
$dt->modify('-10 minutes');
|
|
return $dt->format('Y-m-d\TH:i:s.v\Z');
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* Get sync status
|
|
*
|
|
* @return array Sync status
|
|
*/
|
|
public function get_status() {
|
|
global $wpdb;
|
|
|
|
$last_sync = $wpdb->get_row(
|
|
"SELECT * FROM {$this->db->sync_state_table()}
|
|
WHERE status = 'completed'
|
|
ORDER BY completed_at DESC
|
|
LIMIT 1"
|
|
);
|
|
|
|
$running_sync = $wpdb->get_row(
|
|
"SELECT * FROM {$this->db->sync_state_table()}
|
|
WHERE status = 'running'
|
|
ORDER BY started_at DESC
|
|
LIMIT 1"
|
|
);
|
|
|
|
$failed_sync = $wpdb->get_row(
|
|
"SELECT * FROM {$this->db->sync_state_table()}
|
|
WHERE status = 'failed'
|
|
ORDER BY updated_at DESC
|
|
LIMIT 1"
|
|
);
|
|
|
|
return array(
|
|
'last_sync' => $last_sync,
|
|
'running_sync' => $running_sync,
|
|
'last_failed' => $failed_sync,
|
|
'stats' => $this->db->get_stats(),
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Get resumable (failed or interrupted) syncs
|
|
*
|
|
* @return array List of resumable sync states
|
|
*/
|
|
public function get_resumable_syncs() {
|
|
global $wpdb;
|
|
|
|
// Find failed syncs that have a next_link (can be resumed)
|
|
// Also find "running" syncs older than 1 hour (likely interrupted)
|
|
$one_hour_ago = date('Y-m-d H:i:s', strtotime('-1 hour'));
|
|
|
|
return $wpdb->get_results($wpdb->prepare(
|
|
"SELECT * FROM {$this->db->sync_state_table()}
|
|
WHERE (status = 'failed' AND last_next_link IS NOT NULL)
|
|
OR (status = 'running' AND updated_at < %s)
|
|
ORDER BY updated_at DESC",
|
|
$one_hour_ago
|
|
));
|
|
}
|
|
|
|
/**
|
|
* Get the most recent resumable sync
|
|
*
|
|
* @return object|null Sync state or null
|
|
*/
|
|
public function get_latest_resumable() {
|
|
$resumable = $this->get_resumable_syncs();
|
|
return !empty($resumable) ? $resumable[0] : null;
|
|
}
|
|
|
|
/**
|
|
* Mark stale running syncs as failed
|
|
* Call this on startup to clean up interrupted syncs
|
|
*
|
|
* @param int $hours_threshold Hours after which running sync is considered stale
|
|
* @return int Number of syncs marked as failed
|
|
*/
|
|
public function cleanup_stale_syncs($hours_threshold = 1) {
|
|
global $wpdb;
|
|
|
|
$threshold_time = date('Y-m-d H:i:s', strtotime("-{$hours_threshold} hour"));
|
|
|
|
$updated = $wpdb->query($wpdb->prepare(
|
|
"UPDATE {$this->db->sync_state_table()}
|
|
SET status = 'failed', last_error = 'Sync interrupted (stale)'
|
|
WHERE status = 'running' AND updated_at < %s",
|
|
$threshold_time
|
|
));
|
|
|
|
if ($updated > 0) {
|
|
$this->logger->info('Cleaned up stale syncs', array('count' => $updated));
|
|
}
|
|
|
|
return $updated;
|
|
}
|
|
|
|
/**
|
|
* Auto-resume the most recent failed/interrupted sync
|
|
*
|
|
* @param callable|null $progress_callback Progress callback
|
|
* @return array|null Sync results or null if nothing to resume
|
|
*/
|
|
public function auto_resume($progress_callback = null) {
|
|
// First clean up any stale syncs
|
|
$this->cleanup_stale_syncs();
|
|
|
|
// Find the most recent resumable sync
|
|
$resumable = $this->get_latest_resumable();
|
|
|
|
if (!$resumable) {
|
|
return null;
|
|
}
|
|
|
|
$this->logger->info('Auto-resuming sync', array(
|
|
'sync_id' => $resumable->id,
|
|
'type' => $resumable->sync_type,
|
|
'processed' => $resumable->records_processed,
|
|
));
|
|
|
|
return $this->resume_sync($resumable->id, $progress_callback);
|
|
}
|
|
|
|
/**
|
|
* Smart sync - autonomous self-healing sync that handles all scenarios
|
|
*
|
|
* Decision logic:
|
|
* 1. If a sync is currently running (and not stale), abort
|
|
* 2. If there's a resumable failed/interrupted sync, resume it
|
|
* 3. If no data exists, run full sync
|
|
* 4. Otherwise, run incremental sync
|
|
*
|
|
* On failure, the sync state is preserved for future resume.
|
|
*
|
|
* @param callable|null $progress_callback Progress callback
|
|
* @param callable|null $status_callback Callback for status messages: function(string $message, string $level)
|
|
* @return array Sync results with 'action' key indicating what was done
|
|
*/
|
|
public function smart_sync($progress_callback = null, $status_callback = null) {
|
|
// Helper to emit status messages
|
|
$status = function($message, $level = 'info') use ($status_callback) {
|
|
if ($status_callback) {
|
|
call_user_func($status_callback, $message, $level);
|
|
}
|
|
$this->logger->log($level, $message);
|
|
};
|
|
|
|
// Step 1: Clean up stale syncs (running > 1 hour = probably dead)
|
|
$stale_cleaned = $this->cleanup_stale_syncs();
|
|
if ($stale_cleaned > 0) {
|
|
$status("Cleaned up {$stale_cleaned} stale sync(s)", 'info');
|
|
}
|
|
|
|
// Step 2: Check if a sync is actively running
|
|
$running = $this->get_running_sync();
|
|
if ($running) {
|
|
// If a full sync is in progress, exit silently so cron incremental
|
|
// syncs don't log warnings while the weekly full sync runs
|
|
if ($running->sync_type === 'full') {
|
|
$status("Full sync #{$running->id} in progress (started {$running->started_at}), skipping", 'info');
|
|
} else {
|
|
$status("Sync #{$running->id} is already running (started {$running->started_at})", 'warning');
|
|
}
|
|
return array(
|
|
'success' => false,
|
|
'action' => 'aborted',
|
|
'reason' => 'Sync already running',
|
|
'running_sync' => $running,
|
|
);
|
|
}
|
|
|
|
// Step 3: Check for resumable syncs
|
|
$resumable = $this->get_latest_resumable();
|
|
if ($resumable) {
|
|
$status("Found resumable sync #{$resumable->id} ({$resumable->sync_type}), processed {$resumable->records_processed} records", 'info');
|
|
$status("Resuming...", 'info');
|
|
|
|
$result = $this->resume_sync($resumable->id, $progress_callback);
|
|
$result['action'] = 'resumed';
|
|
$result['resumed_sync_id'] = $resumable->id;
|
|
return $result;
|
|
}
|
|
|
|
// Step 4: Check if we have any data
|
|
$has_data = $this->has_synced_data();
|
|
|
|
if (!$has_data) {
|
|
// No data - need full sync
|
|
$status("No existing data found, starting full sync", 'info');
|
|
|
|
$result = $this->run_full_sync(false, null, $progress_callback);
|
|
$result['action'] = 'full';
|
|
return $result;
|
|
}
|
|
|
|
// Step 5: We have data - run incremental sync
|
|
$last_timestamp = $this->get_last_modification_timestamp();
|
|
$status("Running incremental sync (changes since {$last_timestamp})", 'info');
|
|
|
|
$result = $this->run_incremental_sync(false, $progress_callback);
|
|
$result['action'] = 'incremental';
|
|
return $result;
|
|
}
|
|
|
|
/**
|
|
* Check if there's a currently running sync (not stale)
|
|
*
|
|
* @return object|null Running sync state or null
|
|
*/
|
|
public function get_running_sync() {
|
|
global $wpdb;
|
|
|
|
$one_hour_ago = date('Y-m-d H:i:s', strtotime('-1 hour'));
|
|
|
|
return $wpdb->get_row($wpdb->prepare(
|
|
"SELECT * FROM {$this->db->sync_state_table()}
|
|
WHERE status = 'running' AND updated_at >= %s
|
|
ORDER BY started_at DESC
|
|
LIMIT 1",
|
|
$one_hour_ago
|
|
));
|
|
}
|
|
|
|
/**
|
|
* Check if we have any synced property data
|
|
*
|
|
* @return bool
|
|
*/
|
|
public function has_synced_data() {
|
|
global $wpdb;
|
|
|
|
$count = $wpdb->get_var(
|
|
"SELECT COUNT(*) FROM {$this->db->properties_table()} WHERE mlg_can_view = 1"
|
|
);
|
|
|
|
return (int) $count > 0;
|
|
}
|
|
}
|