Add queue-based media download system with rate limiting

- Add download_status, retry_after, queued_at columns to mls_media table
- Add mls_media_log table for download attempt tracking
- Rewrite media handler to queue downloads instead of immediate download
- Add 700ms delay between downloads (40% longer than the 500ms minimum implied by the 2/sec limit)
- Add 3-hour backoff for rate-limited (429) responses
- Add max 5 attempts before marking as permanently failed
- Add wp mls media command: status, process, reset, logs
- Deprecate wp mls sync media in favor of wp mls media process
- Update documentation with queue system details and cron examples

Media downloads are now separate from property sync:
1. wp mls sync full/incremental - syncs properties, queues media
2. wp mls media process - downloads queued media with rate limiting

Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Hanson.xyz Dev
2025-12-14 22:52:58 -06:00
parent b62867d834
commit 6eadf3d266
5 changed files with 930 additions and 334 deletions
@@ -3,6 +3,10 @@
* MLS Media Handler
*
* Handles downloading and managing media files from MLS listings
* Uses a queue-based system with rate limiting to comply with API limits
*
* Rate limits: 2 requests/second (500ms minimum between requests)
* We use 700ms between requests (40% longer than the minimum, roughly 1.4 requests/second)
*/
if (!defined('ABSPATH')) {
@@ -16,6 +20,21 @@ class MLS_Media_Handler {
*/
const UPLOAD_SUBDIR = 'mls-listings';
/**
* Minimum delay between media downloads, in milliseconds.
*
* The API limit is 2 requests/second (500ms minimum spacing). 700ms is 40%
* longer than that minimum, i.e. roughly 1.4 requests/second.
*/
const DOWNLOAD_DELAY_MS = 700;
/**
* Retry backoff, in hours, applied when a download fails with a retryable
* HTTP status (429, 500, 502, 503 or 504).
*/
const RETRY_BACKOFF_HOURS = 3;
/**
* Maximum download attempts per media record. Once a record reaches this
* count it is marked 'failed' and is no longer selected from the queue.
*/
const MAX_ATTEMPTS = 5;
/**
* Database instance
*/
@@ -67,14 +86,13 @@ class MLS_Media_Handler {
}
/**
* Sync media for a property
* Queue media for a property (does NOT download immediately)
*
* @param string $listing_key Listing key
* @param array $media_array Media array from API
* @param bool $force Force re-download all media
* @param callable|null $progress_callback Callback for progress updates
*/
public function sync_property_media($listing_key, $media_array, $force = false, $progress_callback = null) {
public function queue_property_media($listing_key, $media_array, $progress_callback = null) {
global $wpdb;
if (empty($media_array)) {
@@ -82,6 +100,8 @@ class MLS_Media_Handler {
}
$received_keys = array();
$queued_count = 0;
$skipped_count = 0;
foreach ($media_array as $media) {
$media_key = $media['MediaKey'] ?? null;
@@ -121,37 +141,28 @@ class MLS_Media_Handler {
array('id' => $existing->id)
);
// Check if we need to re-download
if ($force || $this->needs_download($existing, $media)) {
$result = $this->download_media($existing->id);
// Check if we need to re-download (queue it)
if ($this->needs_download($existing, $media)) {
$this->mark_for_download($existing->id);
$queued_count++;
if ($progress_callback) {
if ($result) {
call_user_func($progress_callback, 'media_downloaded', array('media_key' => $media_key));
} else {
$error = $this->get_last_download_error($existing->id);
call_user_func($progress_callback, 'media_error', array('media_key' => $media_key, 'error' => $error));
}
call_user_func($progress_callback, 'media_queued', array('media_key' => $media_key));
}
} else {
$skipped_count++;
if ($progress_callback) {
call_user_func($progress_callback, 'media_skipped', array('media_key' => $media_key));
}
}
} else {
// Insert new record
// Insert new record - queued for download
$data['created_at'] = current_time('mysql');
$data['queued_at'] = current_time('mysql');
$data['download_status'] = 'pending';
$wpdb->insert($this->db->media_table(), $data);
$new_id = $wpdb->insert_id;
// Queue download
$result = $this->download_media($new_id);
$queued_count++;
if ($progress_callback) {
if ($result) {
call_user_func($progress_callback, 'media_downloaded', array('media_key' => $media_key));
} else {
$error = $this->get_last_download_error($new_id);
call_user_func($progress_callback, 'media_error', array('media_key' => $media_key, 'error' => $error));
}
call_user_func($progress_callback, 'media_queued', array('media_key' => $media_key));
}
}
}
@@ -180,20 +191,432 @@ class MLS_Media_Handler {
$wpdb->delete($this->db->media_table(), array('id' => $record->id));
}
}
return array(
'queued' => $queued_count,
'skipped' => $skipped_count,
);
}
/**
* Get the last download error for a media record
* Mark a media record for download
*
* @param int $media_id Media ID
* @return string|null Error message
*/
private function get_last_download_error($media_id) {
private function mark_for_download($media_id) {
global $wpdb;
return $wpdb->get_var($wpdb->prepare(
"SELECT download_error FROM {$this->db->media_table()} WHERE id = %d",
$wpdb->update(
$this->db->media_table(),
array(
'download_status' => 'pending',
'queued_at' => current_time('mysql'),
'local_path' => null,
'local_url' => null,
'downloaded_at' => null,
'download_error' => null,
),
array('id' => $media_id)
);
}
/**
 * Decide whether a stored media record has to be (re)queued for download.
 *
 * A download is needed when the record has no local path, when the local
 * file is missing from disk, or when the API reports a different MediaURL
 * than the one stored on the record.
 *
 * @param object $existing Existing media record
 * @param array $new_data New media data from API
 * @return bool True when the media should be downloaded again
 */
private function needs_download($existing, $new_data) {
    // Nothing has been stored locally yet.
    if (empty($existing->local_path)) {
        return true;
    }
    $absolute_path = $this->get_upload_dir() . '/' . $existing->local_path;
    // Either the file vanished from disk, or the source URL changed upstream.
    return !file_exists($absolute_path)
        || $existing->media_url !== ($new_data['MediaURL'] ?? null);
}
/**
 * Fetch the oldest queued media record that is eligible for a download attempt.
 *
 * Eligible means: status 'pending', a non-NULL media_url, fewer than
 * MAX_ATTEMPTS attempts so far, and a retry_after that is either unset or
 * not later than current_time('mysql').
 *
 * @return object|null Media record or null if nothing is eligible
 */
public function get_next_queued() {
    global $wpdb;
    $table = $this->db->media_table();
    $query = $wpdb->prepare(
        "SELECT * FROM {$table}
WHERE download_status = 'pending'
AND media_url IS NOT NULL
AND download_attempts < %d
AND (retry_after IS NULL OR retry_after <= %s)
ORDER BY queued_at ASC
LIMIT 1",
        self::MAX_ATTEMPTS,
        current_time('mysql')
    );
    return $wpdb->get_row($query);
}
/**
 * Collect row counts describing the state of the download queue.
 *
 * Keys of the returned array:
 *  - pending:    rows with status 'pending'
 *  - ready:      pending rows that get_next_queued() could select right now
 *  - in_backoff: pending rows whose retry_after is still in the future
 *  - failed:     rows with status 'failed' or with MAX_ATTEMPTS or more attempts
 *  - completed:  rows with status 'completed'
 *
 * @return array Queue stats (all values are integers)
 */
public function get_queue_stats() {
    global $wpdb;
    $timestamp = current_time('mysql');
    $table = $this->db->media_table();
    $attempt_cap = self::MAX_ATTEMPTS;

    $pending = $wpdb->get_var(
        "SELECT COUNT(*) FROM {$table}
WHERE download_status = 'pending'"
    );
    $ready = $wpdb->get_var($wpdb->prepare(
        "SELECT COUNT(*) FROM {$table}
WHERE download_status = 'pending'
AND media_url IS NOT NULL
AND download_attempts < %d
AND (retry_after IS NULL OR retry_after <= %s)",
        $attempt_cap,
        $timestamp
    ));
    $in_backoff = $wpdb->get_var($wpdb->prepare(
        "SELECT COUNT(*) FROM {$table}
WHERE download_status = 'pending'
AND retry_after > %s",
        $timestamp
    ));
    $failed = $wpdb->get_var($wpdb->prepare(
        "SELECT COUNT(*) FROM {$table}
WHERE download_status = 'failed'
OR download_attempts >= %d",
        $attempt_cap
    ));
    $completed = $wpdb->get_var(
        "SELECT COUNT(*) FROM {$table}
WHERE download_status = 'completed'"
    );

    return array(
        'pending' => (int) $pending,
        'ready' => (int) $ready,
        'in_backoff' => (int) $in_backoff,
        'failed' => (int) $failed,
        'completed' => (int) $completed,
    );
}
/**
 * Work through the download queue, spacing requests DOWNLOAD_DELAY_MS apart.
 *
 * Stops when $limit items have been attempted or when get_next_queued()
 * reports that nothing is eligible. After each attempt the progress callback
 * (if any) receives one of 'media_downloaded', 'media_backoff' or
 * 'media_error' together with the media and listing keys.
 *
 * @param int $limit Max items to process
 * @param callable|null $progress_callback Callback for progress updates
 * @return array Processing stats: processed, success, failed, skipped
 */
public function process_queue($limit = 100, $progress_callback = null) {
    $stats = array(
        'processed' => 0,
        'success' => 0,
        'failed' => 0,
        'skipped' => 0,
    );
    // Millisecond timestamp of the previous download's completion (0 = none yet).
    $previous_finish_ms = 0;

    while ($stats['processed'] < $limit) {
        $item = $this->get_next_queued();
        if (!$item) {
            // Nothing left that is eligible right now.
            break;
        }

        // Rate limiting: sleep off whatever remains of the minimum gap.
        if ($previous_finish_ms > 0) {
            $remaining_ms = self::DOWNLOAD_DELAY_MS - (microtime(true) * 1000 - $previous_finish_ms);
            if ($remaining_ms > 0) {
                usleep(((int) $remaining_ms) * 1000);
            }
        }

        $outcome = $this->download_media($item->id);
        $previous_finish_ms = microtime(true) * 1000;
        $stats['processed']++;

        $payload = array(
            'media_key' => $item->media_key,
            'listing_key' => $item->listing_key,
        );
        if ($outcome === true) {
            $stats['success']++;
            $event = 'media_downloaded';
        } elseif ($outcome === 'backoff') {
            // Retryable failure; the record has been scheduled for a later retry.
            $stats['skipped']++;
            $event = 'media_backoff';
        } else {
            // Any other return value is an error message.
            $stats['failed']++;
            $event = 'media_error';
            $payload['error'] = $outcome;
        }

        if ($progress_callback) {
            call_user_func($progress_callback, $event, $payload);
        }
    }

    return $stats;
}
/**
 * Download a single media file and record the outcome.
 *
 * Every call that reaches the HTTP request increments download_attempts and
 * writes one or more rows to the media log. Retryable HTTP statuses (429,
 * 500, 502, 503, 504) schedule a backoff; every other failure is recorded
 * through handle_download_failure() without a backoff.
 *
 * A record whose media_url is empty can never succeed, so it is marked
 * 'failed' immediately. Otherwise an empty-string URL (which passes the
 * queue's "media_url IS NOT NULL" filter) would be handed back by
 * get_next_queued() on every iteration and block the rest of the queue.
 *
 * @param int $media_id Media record ID
 * @return bool|string True on success, 'backoff' if set for retry, error message on failure
 */
public function download_media($media_id) {
    global $wpdb;
    $table = $this->db->media_table();

    $media = $wpdb->get_row($wpdb->prepare(
        "SELECT * FROM {$table} WHERE id = %d",
        $media_id
    ));
    if (!$media) {
        return 'Media record not found';
    }
    if (empty($media->media_url)) {
        $error_msg = 'No media URL';
        // Take the record out of the queue so it cannot block other downloads.
        $wpdb->update(
            $table,
            array(
                'download_status' => 'failed',
                'download_error' => $error_msg,
            ),
            array('id' => $media_id)
        );
        return $error_msg;
    }

    // Increment attempt counter before the request so a crash still counts.
    $wpdb->update(
        $table,
        array('download_attempts' => $media->download_attempts + 1),
        array('id' => $media_id)
    );

    // Make the request and time it for the log.
    $start_time = microtime(true);
    $response = wp_remote_get($media->media_url, array(
        'timeout' => 60,
        'stream' => false,
    ));
    $response_time_ms = (int) ((microtime(true) - $start_time) * 1000);

    // Transport-level failure (DNS, timeout, ...): no HTTP status available.
    if (is_wp_error($response)) {
        $error_msg = $response->get_error_message();
        $this->log_download($media, 'error', null, $response_time_ms, $error_msg);
        $this->handle_download_failure($media_id, $error_msg, false);
        return $error_msg;
    }

    $status_code = wp_remote_retrieve_response_code($response);
    $this->log_download($media, 'attempt', $status_code, $response_time_ms, null);

    // Success
    if ($status_code === 200) {
        $body = wp_remote_retrieve_body($response);
        if (empty($body)) {
            $error_msg = 'Empty response body';
            $this->log_download($media, 'error', $status_code, $response_time_ms, $error_msg);
            $this->handle_download_failure($media_id, $error_msg, false);
            return $error_msg;
        }

        // Save the file; anything other than true is an error message.
        $save_result = $this->save_media_file($media, $body, $response);
        if ($save_result !== true) {
            $this->log_download($media, 'error', $status_code, $response_time_ms, $save_result);
            $this->handle_download_failure($media_id, $save_result, false);
            return $save_result;
        }

        $this->log_download($media, 'success', $status_code, $response_time_ms, null);
        return true;
    }

    $error_msg = "HTTP {$status_code}";

    // Rate limited (429) or server error (5xx) - set backoff
    if (in_array($status_code, array(429, 500, 502, 503, 504), true)) {
        $this->log_download($media, 'rate_limited', $status_code, $response_time_ms, $error_msg);
        $this->handle_download_failure($media_id, $error_msg, true);
        return 'backoff';
    }

    // Any other status (404, 403, etc.) is logged as a permanent error.
    $this->log_download($media, 'permanent_error', $status_code, $response_time_ms, $error_msg);
    $this->handle_download_failure($media_id, $error_msg, false);
    return $error_msg;
}
/**
 * Record a failed download attempt on the media row.
 *
 * Always stores the error message. Optionally schedules a retry
 * RETRY_BACKOFF_HOURS from now, and marks the row 'failed' once its attempt
 * counter has reached MAX_ATTEMPTS.
 *
 * retry_after is written in the same clock as current_time('mysql'), which
 * is what the queue queries compare it against. Building it from date() and
 * strtotime() would produce a UTC value under WordPress and skew the backoff
 * by the site's GMT offset.
 *
 * @param int $media_id Media ID
 * @param string $error Error message
 * @param bool $set_backoff Whether to set retry backoff
 */
private function handle_download_failure($media_id, $error, $set_backoff) {
    global $wpdb;
    $table = $this->db->media_table();

    // Re-read the row so the attempt counter reflects the increment made
    // at the start of the download.
    $media = $wpdb->get_row($wpdb->prepare(
        "SELECT * FROM {$table} WHERE id = %d",
        $media_id
    ));

    $update_data = array(
        'download_error' => $error,
    );

    if ($set_backoff) {
        // current_time('timestamp') is the site-local "timestamp"; formatting it
        // with gmdate() yields site-local wall-clock time, matching current_time('mysql').
        $retry_at = current_time('timestamp') + self::RETRY_BACKOFF_HOURS * 3600;
        $update_data['retry_after'] = gmdate('Y-m-d H:i:s', $retry_at);
    }

    // Check if max attempts reached
    if ($media && $media->download_attempts >= self::MAX_ATTEMPTS) {
        $update_data['download_status'] = 'failed';
        $this->log_missing_media($media, $error);
    }

    $wpdb->update(
        $table,
        $update_data,
        array('id' => $media_id)
    );
}
/**
 * Write a downloaded media body to disk and mark the record completed.
 *
 * The file is stored as "<media_order>.<extension>" inside the listing's
 * directory. On success the record receives its local path/URL, size, MIME
 * type and download time, and its error and retry_after fields are cleared.
 *
 * @param object $media Media record
 * @param string $body File contents
 * @param array $response HTTP response
 * @return bool|string True on success, error message on failure
 */
private function save_media_file($media, $body, $response) {
    global $wpdb;

    // wp_remote_retrieve_header() returns an array when the header is repeated;
    // keep the first value so it can be parsed and stored as a string.
    $content_type = wp_remote_retrieve_header($response, 'content-type');
    if (is_array($content_type)) {
        $content_type = (string) reset($content_type);
    }

    // Determine file extension from content type or URL
    $extension = $this->get_extension_from_content_type($content_type, $media->media_url);

    // Create directory; report the failure instead of letting the write below fail.
    $listing_dir = $this->get_listing_dir($media->listing_key);
    if (!file_exists($listing_dir) && !wp_mkdir_p($listing_dir)) {
        return 'Failed to create directory';
    }

    // Save file
    $filename = $media->media_order . '.' . $extension;
    $file_path = $listing_dir . '/' . $filename;
    if (file_put_contents($file_path, $body) === false) {
        return 'Failed to write file';
    }

    // Calculate relative path: "<first two chars of listing key>/<listing key>/<file>".
    // NOTE(review): presumably mirrors the layout produced by get_listing_dir() - confirm.
    $prefix = substr($media->listing_key, 0, 2);
    $relative_path = $prefix . '/' . $media->listing_key . '/' . $filename;
    $local_url = $this->get_upload_url() . '/' . $relative_path;

    // Update record
    $wpdb->update(
        $this->db->media_table(),
        array(
            'local_path' => $relative_path,
            'local_url' => $local_url,
            'file_size' => strlen($body),
            'mime_type' => $content_type,
            'downloaded_at' => current_time('mysql'),
            'download_error' => null,
            'download_status' => 'completed',
            'retry_after' => null,
        ),
        array('id' => $media->id)
    );

    return true;
}
/**
 * Append one row to the media download log.
 *
 * @param object $media Media record the entry refers to
 * @param string $action Action type (attempt, success, error, rate_limited, permanent_error)
 * @param int|null $status_code HTTP status code, or null when no response was received
 * @param int $response_time_ms Response time in milliseconds
 * @param string|null $error Error message, or null when there was none
 */
private function log_download($media, $action, $status_code, $response_time_ms, $error) {
    global $wpdb;

    // Snapshot of the media identifiers and URL at the time of the attempt.
    $entry = array(
        'media_id' => $media->id,
        'listing_key' => $media->listing_key,
        'media_key' => $media->media_key,
        'action' => $action,
        'status_code' => $status_code,
        'response_time_ms' => $response_time_ms,
        'error_message' => $error,
        'url' => $media->media_url,
        'created_at' => current_time('mysql'),
    );

    $wpdb->insert($this->db->media_log_table(), $entry);
}
/**
 * Choose a file extension for a downloaded media file.
 *
 * The Content-Type header is preferred. When it is missing or not a known
 * image type, the extension of the URL path is used, but only if it is on an
 * allow-list of media extensions. The URL comes from a remote feed and the
 * file is written into the uploads directory, so an unchecked extension such
 * as "php" would let the feed place an executable file there. Anything not
 * on the allow-list falls back to 'jpg'.
 *
 * @param string|array $content_type Content type header (an array when the header was repeated)
 * @param string $url Original URL as fallback
 * @return string File extension, lowercase, without the leading dot
 */
private function get_extension_from_content_type($content_type, $url) {
    // A repeated header arrives as an array; use its first value.
    if (is_array($content_type)) {
        $content_type = reset($content_type);
    }

    // Extract main type from content-type header, dropping parameters such as charset.
    $content_type = strtolower(trim(explode(';', (string) $content_type)[0]));
    $map = array(
        'image/jpeg' => 'jpg',
        'image/jpg' => 'jpg',
        'image/png' => 'png',
        'image/gif' => 'gif',
        'image/webp' => 'webp',
    );
    if (isset($map[$content_type])) {
        return $map[$content_type];
    }

    // Fallback to URL extension; parse_url() may return null/false for odd URLs.
    $path = parse_url((string) $url, PHP_URL_PATH);
    $ext = is_string($path) ? strtolower(pathinfo($path, PATHINFO_EXTENSION)) : '';

    // Only media extensions may be written to disk.
    $allowed = array(
        'jpg', 'jpeg', 'png', 'gif', 'webp', 'bmp', 'tif', 'tiff', 'avif', 'heic',
        'pdf', 'mp4', 'mov', 'webm',
    );
    return in_array($ext, $allowed, true) ? $ext : 'jpg';
}
/**
@@ -250,227 +673,6 @@ class MLS_Media_Handler {
return substr_count($content, "\n");
}
/**
 * Decide whether a stored media record has to be (re)queued for download.
 *
 * A download is needed when the record has no local path, when the local
 * file is missing from disk, or when the API reports a different MediaURL
 * than the one stored on the record.
 *
 * @param object $existing Existing media record
 * @param array $new_data New media data from API
 * @return bool True when the media should be downloaded again
 */
private function needs_download($existing, $new_data) {
    // Nothing has been stored locally yet.
    if (empty($existing->local_path)) {
        return true;
    }
    $absolute_path = $this->get_upload_dir() . '/' . $existing->local_path;
    // Either the file vanished from disk, or the source URL changed upstream.
    return !file_exists($absolute_path)
        || $existing->media_url !== ($new_data['MediaURL'] ?? null);
}
/**
* Download a media file, retrying in-process on transient failures.
*
* Makes up to five HTTP attempts within a single call, sleeping between
* them, then writes the body to the listing directory and updates the
* media record. The record's download_attempts counter is incremented once
* per call, not once per HTTP attempt.
*
* @param int $media_id Media record ID
* @return bool True when the file was saved, false on any failure
*/
public function download_media($media_id) {
global $wpdb;
$media = $wpdb->get_row($wpdb->prepare(
"SELECT * FROM {$this->db->media_table()} WHERE id = %d",
$media_id
));
// Nothing to do without a record or a source URL.
if (!$media || empty($media->media_url)) {
return false;
}
// Increment attempt counter
$wpdb->update(
$this->db->media_table(),
array('download_attempts' => $media->download_attempts + 1),
array('id' => $media_id)
);
// Download with exponential backoff for rate limits
$max_retries = 5;
$response = null;
$status_code = 0;
$base_delay = 1; // Start with 1 second
for ($retry = 0; $retry < $max_retries; $retry++) {
// Exponential backoff before retries 1-4: 1s, 2s, 4s, 8s
// (the first attempt, $retry = 0, is made without a delay).
if ($retry > 0) {
$delay = $base_delay * pow(2, $retry - 1);
$this->logger->debug('Media download retry', array(
'media_id' => $media_id,
'retry' => $retry,
'delay' => $delay,
));
sleep($delay);
}
$response = wp_remote_get($media->media_url, array(
'timeout' => 60,
'stream' => false,
));
// Transport-level failure: retry unless this was the last attempt.
if (is_wp_error($response)) {
$error_msg = $response->get_error_message();
$this->logger->warning('Media download failed', array(
'media_id' => $media_id,
'error' => $error_msg,
'retry' => $retry,
));
if ($retry === $max_retries - 1) {
$wpdb->update(
$this->db->media_table(),
array('download_error' => $error_msg),
array('id' => $media_id)
);
$this->log_missing_media($media, $error_msg);
return false;
}
continue;
}
$status_code = wp_remote_retrieve_response_code($response);
// Success
if ($status_code === 200) {
break;
}
// Retryable errors: 429 (rate limit), 500, 502, 503, 504 (server errors)
$retryable = in_array($status_code, array(429, 500, 502, 503, 504));
if ($retryable && $retry < $max_retries - 1) {
$this->logger->debug('Media download retryable error', array(
'media_id' => $media_id,
'status_code' => $status_code,
'retry' => $retry,
));
continue;
}
// Non-retryable or exhausted retries - record and fail
$error_msg = "HTTP {$status_code}";
$wpdb->update(
$this->db->media_table(),
array('download_error' => $error_msg),
array('id' => $media_id)
);
$this->log_missing_media($media, $error_msg);
return false;
}
// Defensive check: every non-200 path inside the loop above either
// continues or returns, so this is only expected to see a 200 here.
if ($status_code !== 200) {
$error_msg = "HTTP {$status_code}";
$wpdb->update(
$this->db->media_table(),
array('download_error' => $error_msg),
array('id' => $media_id)
);
$this->log_missing_media($media, $error_msg);
return false;
}
$body = wp_remote_retrieve_body($response);
// A 200 with no body is recorded as an error (not passed to log_missing_media).
if (empty($body)) {
$wpdb->update(
$this->db->media_table(),
array('download_error' => 'Empty response'),
array('id' => $media_id)
);
return false;
}
// Determine file extension from content type or URL
$content_type = wp_remote_retrieve_header($response, 'content-type');
$extension = $this->get_extension_from_content_type($content_type, $media->media_url);
// Create directory
$listing_dir = $this->get_listing_dir($media->listing_key);
if (!file_exists($listing_dir)) {
wp_mkdir_p($listing_dir);
}
// Save file as "<media_order>.<extension>" inside the listing directory
$filename = $media->media_order . '.' . $extension;
$file_path = $listing_dir . '/' . $filename;
if (file_put_contents($file_path, $body) === false) {
$wpdb->update(
$this->db->media_table(),
array('download_error' => 'Failed to write file'),
array('id' => $media_id)
);
return false;
}
// Calculate relative path: "<first two chars of listing key>/<listing key>/<file>"
$prefix = substr($media->listing_key, 0, 2);
$relative_path = $prefix . '/' . $media->listing_key . '/' . $filename;
$local_url = $this->get_upload_url() . '/' . $relative_path;
// Update record with the stored file's details and clear any previous error
$wpdb->update(
$this->db->media_table(),
array(
'local_path' => $relative_path,
'local_url' => $local_url,
'file_size' => strlen($body),
'mime_type' => $content_type,
'downloaded_at' => current_time('mysql'),
'download_error' => null,
),
array('id' => $media_id)
);
return true;
}
/**
 * Choose a file extension for a downloaded media file.
 *
 * The Content-Type header is preferred. When it is missing or not a known
 * image type, the extension of the URL path is used, but only if it is on an
 * allow-list of media extensions. The URL comes from a remote feed and the
 * file is written into the uploads directory, so an unchecked extension such
 * as "php" would let the feed place an executable file there. Anything not
 * on the allow-list falls back to 'jpg'.
 *
 * @param string|array $content_type Content type header (an array when the header was repeated)
 * @param string $url Original URL as fallback
 * @return string File extension, lowercase, without the leading dot
 */
private function get_extension_from_content_type($content_type, $url) {
    // A repeated header arrives as an array; use its first value.
    if (is_array($content_type)) {
        $content_type = reset($content_type);
    }

    // Extract main type from content-type header, dropping parameters such as charset.
    $content_type = strtolower(trim(explode(';', (string) $content_type)[0]));
    $map = array(
        'image/jpeg' => 'jpg',
        'image/jpg' => 'jpg',
        'image/png' => 'png',
        'image/gif' => 'gif',
        'image/webp' => 'webp',
    );
    if (isset($map[$content_type])) {
        return $map[$content_type];
    }

    // Fallback to URL extension; parse_url() may return null/false for odd URLs.
    $path = parse_url((string) $url, PHP_URL_PATH);
    $ext = is_string($path) ? strtolower(pathinfo($path, PATHINFO_EXTENSION)) : '';

    // Only media extensions may be written to disk.
    $allowed = array(
        'jpg', 'jpeg', 'png', 'gif', 'webp', 'bmp', 'tif', 'tiff', 'avif', 'heic',
        'pdf', 'mp4', 'mov', 'webm',
    );
    return in_array($ext, $allowed, true) ? $ext : 'jpg';
}
/**
* Delete all media for a property
*
@@ -558,44 +760,48 @@ class MLS_Media_Handler {
}
/**
* Download pending media (for batch processing)
* Reset failed downloads for retry
*
* @param int $limit Max media to download
* @param callable|null $progress_callback Callback for progress updates
* @return array Stats
* @param string|null $listing_key Optional listing key to filter
* @return int Number of records reset
*/
public function download_pending($limit = 100, $progress_callback = null) {
public function reset_failed_downloads($listing_key = null) {
global $wpdb;
$pending = $wpdb->get_results($wpdb->prepare(
"SELECT id, media_key FROM {$this->db->media_table()}
WHERE local_path IS NULL AND media_url IS NOT NULL
AND download_attempts < 3
LIMIT %d",
$limit
));
$where = "download_status = 'failed' OR download_attempts >= " . self::MAX_ATTEMPTS;
$values = array();
$stats = array(
'total' => count($pending),
'success' => 0,
'failed' => 0,
);
foreach ($pending as $media) {
if ($this->download_media($media->id)) {
$stats['success']++;
if ($progress_callback) {
call_user_func($progress_callback, 'media_downloaded', array('media_key' => $media->media_key));
}
} else {
$stats['failed']++;
if ($progress_callback) {
call_user_func($progress_callback, 'media_error', array('media_key' => $media->media_key));
}
}
if ($listing_key) {
    // AND binds tighter than OR in SQL. Without the parentheses the listing
    // filter would apply only to the "download_attempts >= N" half, and every
    // row with status 'failed' would be reset regardless of its listing.
    $where = "({$where}) AND listing_key = %s";
    $values[] = $listing_key;
}
return $stats;
if (!empty($values)) {
$sql = $wpdb->prepare(
"UPDATE {$this->db->media_table()}
SET download_status = 'pending',
download_attempts = 0,
download_error = NULL,
retry_after = NULL,
queued_at = %s
WHERE {$where}",
array_merge(array(current_time('mysql')), $values)
);
} else {
$sql = $wpdb->prepare(
"UPDATE {$this->db->media_table()}
SET download_status = 'pending',
download_attempts = 0,
download_error = NULL,
retry_after = NULL,
queued_at = %s
WHERE {$where}",
current_time('mysql')
);
}
$wpdb->query($sql);
return $wpdb->rows_affected;
}
/**
@@ -646,4 +852,78 @@ class MLS_Media_Handler {
return $deleted;
}
/**
 * Read the most recent entries from the media download log.
 *
 * Entries are ordered newest first by created_at. When $action is given,
 * only entries with exactly that action value are returned.
 *
 * @param int $limit Number of entries to return
 * @param string|null $action Optional action filter
 * @return array Log entries
 */
public function get_download_logs($limit = 100, $action = null) {
    global $wpdb;

    // Placeholders are filled positionally: optional action first, then the limit.
    $params = array();
    $where = '';
    if ($action) {
        $params[] = $action;
        $where = "WHERE action = %s";
    }
    $params[] = $limit;

    $query = $wpdb->prepare(
        "SELECT * FROM {$this->db->media_log_table()}
{$where}
ORDER BY created_at DESC
LIMIT %d",
        $params
    );

    return $wpdb->get_results($query);
}
/**
 * Delete download log entries older than a number of days.
 *
 * Log rows are stamped with current_time('mysql') (site-local time), so the
 * cutoff is computed in that same clock. A cutoff built from date() and
 * strtotime() would be in UTC under WordPress and shift the window by the
 * site's GMT offset.
 *
 * @param int $days_old Delete logs older than this many days
 * @return int Number of entries deleted (0 when $days_old is negative)
 */
public function clear_old_logs($days_old = 7) {
    global $wpdb;

    // Work with an integer day count; a negative age is meaningless, so delete nothing.
    $days = (int) $days_old;
    if ($days < 0) {
        return 0;
    }

    // current_time('timestamp') formatted with gmdate() gives site-local wall-clock time.
    $cutoff = gmdate('Y-m-d H:i:s', current_time('timestamp') - $days * 86400);

    $wpdb->query($wpdb->prepare(
        "DELETE FROM {$this->db->media_log_table()} WHERE created_at < %s",
        $cutoff
    ));

    return $wpdb->rows_affected;
}
/**
 * Legacy entry point, kept so existing callers keep working.
 *
 * Media is no longer downloaded here: the call is forwarded to
 * queue_property_media() and the files are fetched later by process_queue().
 *
 * @param string $listing_key Listing key
 * @param array $media_array Media array from API
 * @param bool $force Accepted for signature compatibility only; it is not forwarded
 * @param callable|null $progress_callback Callback for progress updates
 * @return mixed Whatever queue_property_media() returns
 */
public function sync_property_media($listing_key, $media_array, $force = false, $progress_callback = null) {
    // Queue only; $force has no effect in the queue-based flow.
    $queue_result = $this->queue_property_media($listing_key, $media_array, $progress_callback);
    return $queue_result;
}
/**
 * Legacy batch-download entry point, kept so existing callers keep working.
 *
 * Delegates to process_queue(). The previous implementation returned the
 * keys 'total', 'success' and 'failed'; process_queue() reports 'processed'
 * instead of 'total', so 'total' is added back for callers that still read it.
 *
 * @param int $limit Max media to download
 * @param callable|null $progress_callback Callback for progress updates
 * @return array Stats: total, processed, success, failed, skipped
 */
public function download_pending($limit = 100, $progress_callback = null) {
    $stats = $this->process_queue($limit, $progress_callback);
    // Backward compatibility: 'total' was the number of items attempted.
    $stats['total'] = $stats['processed'];
    return $stats;
}
}