diff --git a/docker-compose.yml b/docker-compose.yml index 2b6ffdc3..6156ae15 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,7 +18,7 @@ services: image: mysql:8.3.0 restart: unless-stopped ports: - - "3308:3306" + - "3306:3306" environment: MYSQL_DATABASE: test_db MYSQL_ROOT_PASSWORD: password @@ -73,7 +73,7 @@ services: restart: unless-stopped ports: - "1025:1025" - - "1080:8025" + - "1080:1080" networks: - bowphp_network healthcheck: diff --git a/src/Http/Client/Response.php b/src/Http/Client/Response.php index 48430686..ddbfd5fd 100644 --- a/src/Http/Client/Response.php +++ b/src/Http/Client/Response.php @@ -133,9 +133,9 @@ public function isSuccessful(): bool /** * Get the response executing time * - * @return ?int + * @return mixed */ - public function getExecutionTime(): ?int + public function getExecutionTime(): mixed { return $this->headers['total_time'] ?? null; } diff --git a/src/Mail/MailQueueTask.php b/src/Mail/MailQueueTask.php index b6cc2102..112a40e6 100644 --- a/src/Mail/MailQueueTask.php +++ b/src/Mail/MailQueueTask.php @@ -60,6 +60,6 @@ public function process(): void */ public function onException(Throwable $e): void { - $this->deleteJob(); + $this->deleteTask(); } } diff --git a/src/Notifier/Adapters/TelegramChannelAdapter.php b/src/Notifier/Adapters/TelegramChannelAdapter.php index 5a6aa3d6..0f8f1cbb 100644 --- a/src/Notifier/Adapters/TelegramChannelAdapter.php +++ b/src/Notifier/Adapters/TelegramChannelAdapter.php @@ -15,7 +15,7 @@ class TelegramChannelAdapter implements ChannelAdapterInterface /** * @var string */ - private string $botToken; + private ?string $botToken; /** * Constructor @@ -25,10 +25,6 @@ class TelegramChannelAdapter implements ChannelAdapterInterface public function __construct() { $this->botToken = config('messaging.telegram.bot_token'); - - if (!$this->botToken) { - throw new InvalidArgumentException('The Telegram bot token is required'); - } } /** @@ -51,6 +47,10 @@ public function send(Model $context, Notifier $notifier): void throw new InvalidArgumentException('The chat ID and message are required for Telegram'); } + if (!$this->botToken) { + throw new InvalidArgumentException('The Telegram bot token is required'); + } + $client = new HttpClient(); $endpoint = "https://api.telegram.org/bot{$this->botToken}/sendMessage"; diff --git a/src/Notifier/NotifierQueueTask.php b/src/Notifier/NotifierQueueTask.php index 7cf629d4..3547d694 100644 --- a/src/Notifier/NotifierQueueTask.php +++ b/src/Notifier/NotifierQueueTask.php @@ -52,6 +52,6 @@ public function process(): void */ public function onException(Throwable $e): void { - $this->deleteJob(); + $this->deleteTask(); } } diff --git a/src/Notifier/NotifierShouldQueue.php b/src/Notifier/NotifierShouldQueue.php index 73788f3d..01c0d2b3 100644 --- a/src/Notifier/NotifierShouldQueue.php +++ b/src/Notifier/NotifierShouldQueue.php @@ -4,4 +4,4 @@ interface NotifierShouldQueue { -} \ No newline at end of file +} diff --git a/src/Queue/Adapters/BeanstalkdAdapter.php b/src/Queue/Adapters/BeanstalkdAdapter.php index 662be05b..772bf665 100644 --- a/src/Queue/Adapters/BeanstalkdAdapter.php +++ b/src/Queue/Adapters/BeanstalkdAdapter.php @@ -6,27 +6,37 @@ use Bow\Queue\QueueTask; use Pheanstalk\Contract\PheanstalkPublisherInterface; +use Pheanstalk\Contract\JobIdInterface; use Pheanstalk\Pheanstalk; use Pheanstalk\Values\Timeout; use Pheanstalk\Values\TubeName; use RuntimeException; use Throwable; -use ErrorException; class BeanstalkdAdapter extends QueueAdapter { /** - * Define the instance Pheanstalk + * Maximum priority value for Beanstalkd + */ + private const MAX_PRIORITY = 4294967295; + + /** + * Cache key for storing queue names + */ + private const QUEUE_CACHE_KEY = "beanstalkd:queues"; + + /** + * The Pheanstalk client instance * * @var Pheanstalk */ private Pheanstalk $pheanstalk; /** - * Configure Beanstalkd driver + * Configure the Beanstalkd queue adapter * * @param array $config - * @return mixed + * @return BeanstalkdAdapter */ public function configure(array $config): BeanstalkdAdapter { @@ -34,10 +44,14 @@ public function configure(array $config): BeanstalkdAdapter throw new RuntimeException("Please install the pda/pheanstalk package"); } + $timeout = isset($config["timeout"]) && $config["timeout"] + ? new Timeout($config["timeout"]) + : null; + $this->pheanstalk = Pheanstalk::create( $config["hostname"], $config["port"], - $config["timeout"] ? new Timeout($config["timeout"]) : null, + $timeout, ); if (isset($config["queue"])) { @@ -48,36 +62,29 @@ public function configure(array $config): BeanstalkdAdapter } /** - * Get the size of the queue. + * Get the size of the queue * * @param string|null $queue * @return int */ public function size(?string $queue = null): int { - $queue = new TubeName($this->getQueue($queue)); + $tubeName = new TubeName($this->getQueue($queue)); - return (int)$this->pheanstalk->statsTube($queue)->currentJobsReady; + return (int) $this->pheanstalk->statsTube($tubeName)->currentJobsReady; } /** - * Queue a job + * Push a job onto the queue * * @param QueueTask $producer * @return bool - * @throws ErrorException */ public function push(QueueTask $producer): bool { - $queues = (array) cache("beanstalkd:queues"); - - if (!in_array($producer->getQueue(), $queues)) { - $queues[] = $producer->getQueue(); - cache("beanstalkd:queues", $queues); - } + $this->registerQueueName($producer->getQueue()); - $this->pheanstalk - ->useTube(new TubeName($producer->getQueue())); + $this->pheanstalk->useTube(new TubeName($producer->getQueue())); $this->pheanstalk->put( $this->serializeProducer($producer), @@ -90,101 +97,189 @@ public function push(QueueTask $producer): bool } /** - * Get the priority + * Register a queue name in cache for later reference + * + * @param string $queueName + * @return void + */ + private function registerQueueName(string $queueName): void + { + $queues = (array) cache(self::QUEUE_CACHE_KEY); + + if (!in_array($queueName, $queues)) { + $queues[] = $queueName; + cache(self::QUEUE_CACHE_KEY, $queues); + } + } + + /** + * Convert priority level to Beanstalkd priority value + * + * Priority mapping: + * - 0: Highest priority (urgent) + * - 1: Default priority (normal) + * - 2: Default priority (normal) + * - 3+: Lowest priority (bulk/background) * * @param int $priority * @return int */ public function getPriority(int $priority): int { - return match ($priority) { - $priority > 2 => 4294967295, - 1 => PheanstalkPublisherInterface::DEFAULT_PRIORITY, - 0 => 0, + return match (true) { + $priority <= 0 => 0, + $priority > 2 => self::MAX_PRIORITY, default => PheanstalkPublisherInterface::DEFAULT_PRIORITY, }; } /** - * Run the worker + * Run the queue worker * * @param string|null $queue * @return void - * @throws ErrorException */ public function run(?string $queue = null): void { - // we want jobs from define queue only. - $queue = $this->getQueue($queue); - $this->pheanstalk->watch(new TubeName($queue)); + $queueName = $this->getQueue($queue); + $this->pheanstalk->watch(new TubeName($queueName)); + $job = null; + $producer = null; + try { - // This hangs until a Job is produced. $job = $this->pheanstalk->reserve(); - $payload = $job->getData(); - $producer = $this->unserializeProducer($payload); - call_user_func([$producer, "process"]); + $producer = $this->unserializeProducer($job->getData()); + + $this->executeTask($producer); $this->pheanstalk->touch($job); $this->pheanstalk->delete($job); $this->updateProcessingTimeout(); } catch (Throwable $e) { - // Write the error log - error_log($e->getMessage()); - - try { - logger()->error($e->getMessage(), $e->getTrace()); - } catch (Throwable $loggerException) { - // Logger not available, already logged to error_log - } - - if (!$job) { - return; - } - - cache("job:failed:" . $job->getId(), $job->getData()); - - // Check if producer has been loaded - if (!isset($producer)) { - $this->pheanstalk->delete($job); - return; - } - - // Execute the onException method for notify the producer - // and let developer decide if the job should be deleted - $producer->onException($e); - - // Check if the job should be deleted - if ($producer->jobShouldBeDelete()) { - $this->pheanstalk->delete($job); - } else { - $this->pheanstalk->release($job, $this->getPriority($producer->getPriority()), $producer->getDelay()); - } - - $this->sleep(1); + $this->handleJobFailure($job, $producer, $e); + } + } + + /** + * Execute the task + * + * @param QueueTask $producer + * @return void + */ + private function executeTask(QueueTask $producer): void + { + call_user_func([$producer, "process"]); + } + + /** + * Handle job failure + * + * @param JobIdInterface|null $job + * @param QueueTask|null $producer + * @param Throwable $exception + * @return void + */ + private function handleJobFailure(?JobIdInterface $job, ?QueueTask $producer, Throwable $exception): void + { + $this->logError($exception); + + if (is_null($job)) { + return; + } + + cache("job:failed:" . $job->getId(), $job->getData()); + + if (is_null($producer)) { + $this->pheanstalk->delete($job); + return; + } + + $producer->onException($exception); + + if ($producer->taskShouldBeDelete()) { + $this->pheanstalk->delete($job); + } else { + $this->releaseJob($job, $producer); + } + + $this->sleep(1); + } + + /** + * Release the job back to the queue for retry + * + * @param JobIdInterface $job + * @param QueueTask $producer + * @return void + */ + private function releaseJob(JobIdInterface $job, QueueTask $producer): void + { + $this->pheanstalk->release( + $job, + $this->getPriority($producer->getPriority()), + $producer->getDelay() + ); + } + + /** + * Log an error + * + * @param Throwable $exception + * @return void + */ + private function logError(Throwable $exception): void + { + error_log($exception->getMessage()); + + try { + logger()->error($exception->getMessage(), $exception->getTrace()); + } catch (Throwable $loggerException) { + // Logger not available, already logged to error_log } } /** - * Flush the queue + * Flush all jobs from the queue * * @param string|null $queue * @return void - * @throws ErrorException */ public function flush(?string $queue = null): void { - $queues = (array)$queue; + $queues = $this->getQueuesToFlush($queue); - if (count($queues) == 0) { - $queues = cache("beanstalkd:queues"); + foreach ($queues as $queueName) { + $this->flushQueue($queueName); } + } - foreach ($queues as $queue) { - $this->pheanstalk->useTube($queue); + /** + * Get the list of queues to flush + * + * @param string|null $queue + * @return array + */ + private function getQueuesToFlush(?string $queue): array + { + if (!is_null($queue)) { + return [$queue]; + } + + return (array) cache(self::QUEUE_CACHE_KEY) ?: []; + } + + /** + * Flush all jobs from a specific queue + * + * @param string $queueName + * @return void + */ + private function flushQueue(string $queueName): void + { + $this->pheanstalk->useTube(new TubeName($queueName)); - while ($job = $this->pheanstalk->reserve()) { - $this->pheanstalk->delete($job); - } + while ($job = $this->pheanstalk->reserveWithTimeout(0)) { + $this->pheanstalk->delete($job); } } } diff --git a/src/Queue/Adapters/DatabaseAdapter.php b/src/Queue/Adapters/DatabaseAdapter.php index ffc96cce..fe20e486 100644 --- a/src/Queue/Adapters/DatabaseAdapter.php +++ b/src/Queue/Adapters/DatabaseAdapter.php @@ -1,5 +1,7 @@ $this->generateId(), "queue" => $this->getQueue(), "payload" => base64_encode($this->serializeProducer($job)), "attempts" => $this->tries, - "status" => "waiting", + "status" => self::STATUS_WAITING, "available_at" => date("Y-m-d H:i:s", time() + $job->getDelay()), "reserved_at" => null, "created_at" => date("Y-m-d H:i:s"), ]; - $count = $this->table->insert($value); - - return $count > 0; + return $this->table->insert($payload) > 0; } /** - * Run the worker + * Run the queue worker * * @param string|null $queue * @return void @@ -79,85 +89,188 @@ public function push(QueueTask $job): bool */ public function run(?string $queue = null): void { - // we want jobs from define queue only. - $queue = $this->getQueue($queue); - $queues = $this->table - ->where("queue", $queue) - ->whereIn("status", ["waiting", "reserved"]) - ->get(); + $queueName = $this->getQueue($queue); + $jobs = $this->fetchPendingJobs($queueName); - if (count($queues) == 0) { - $this->sleep($this->sleep ?? 5); + if (count($jobs) === 0) { + $this->sleep($this->sleep); return; } - foreach ($queues as $queue) { - try { - $producer = $this->unserializeProducer(base64_decode($queue->payload)); - if (strtotime($queue->available_at) >= time()) { - if (!is_null($queue->reserved_at) && strtotime($queue->reserved_at) < time()) { - continue; - } - $this->table->where("id", $queue->id)->update(["status" => "processing"]); - $this->execute($producer, $queue); - continue; - } - } catch (Exception $e) { - // Write the error log - error_log($e->getMessage()); - app('logger')->error($e->getMessage(), $e->getTrace()); - cache("job:failed:" . $queue->id, $queue->payload); - - // Check if producer has been loaded - if (!isset($producer)) { - $this->sleep(1); - continue; - } - - // Execute the onException method for notify the producer - // and let developer decide if the job should be deleted - $producer->onException($e); - - // Check if the job should be deleted - if ($producer->jobShouldBeDelete() || $queue->attempts <= 0) { - $this->table->where("id", $queue->id)->update([ - "status" => "failed", - ]); - $this->sleep(1); - continue; - } - - // Check if the job should be retried - $this->table->where("id", $queue->id)->update([ - "status" => "reserved", - "attempts" => $queue->attempts - 1, - "available_at" => date("Y-m-d H:i:s", time() + $producer->getDelay()), - "reserved_at" => date("Y-m-d H:i:s", time() + $producer->getRetry()) - ]); - - $this->sleep(1); + foreach ($jobs as $job) { + $this->processJob($job); + } + } + + /** + * Fetch pending jobs from the queue + * + * @param string $queueName + * @return array + * @throws QueryBuilderException + */ + private function fetchPendingJobs(string $queueName): array + { + return $this->table + ->where("queue", $queueName) + ->whereIn("status", [self::STATUS_WAITING, self::STATUS_RESERVED]) + ->get(); + } + + /** + * Process a single job from the queue + * + * @param stdClass $job + * @return void + */ + private function processJob(stdClass $job): void + { + $producer = null; + + try { + $producer = $this->unserializeProducer(base64_decode($job->payload)); + + if (!$this->isJobReady($job)) { + return; } + + $this->markJobAs($job->id, self::STATUS_PROCESSING); + $this->executeTask($producer, $job); + } catch (Throwable $e) { + $this->handleJobFailure($job, $producer, $e); } } /** - * Process the next job on the queue. + * Check if the job is ready to be processed * - * @param QueueTask $job - * @param mixed $queue + * @param stdClass $job + * @return bool + */ + private function isJobReady(stdClass $job): bool + { + // Check if the job is available for processing + if (strtotime($job->available_at) > time()) { + return false; + } + + // Skip if the job is still reserved + if (!is_null($job->reserved_at) && strtotime($job->reserved_at) > time()) { + return false; + } + + return true; + } + + /** + * Execute the task + * + * @param QueueTask $producer + * @param stdClass $job + * @return void + * @throws QueryBuilderException + */ + private function executeTask(QueueTask $producer, stdClass $job): void + { + call_user_func([$producer, "process"]); + $this->markJobAs($job->id, self::STATUS_DONE); + $this->sleep($this->sleep); + } + + /** + * Handle job failure + * + * @param stdClass $job + * @param QueueTask|null $producer + * @param Throwable $exception + * @return void + */ + private function handleJobFailure(stdClass $job, ?QueueTask $producer, Throwable $exception): void + { + $this->logError($exception); + cache("job:failed:" . $job->id, $job->payload); + + if (is_null($producer)) { + $this->sleep(1); + return; + } + + $producer->onException($exception); + + if ($this->shouldMarkJobAsFailed($producer, $job)) { + $this->markJobAs($job->id, self::STATUS_FAILED); + $this->sleep(1); + return; + } + + $this->scheduleJobRetry($job, $producer); + $this->sleep(1); + } + + /** + * Log an error + * + * @param Throwable $exception + * @return void + */ + private function logError(Throwable $exception): void + { + error_log($exception->getMessage()); + + try { + logger()->error($exception->getMessage(), $exception->getTrace()); + } catch (Throwable $loggerException) { + // Logger not available, already logged to error_log + } + } + + /** + * Determine if the job should be marked as failed + * + * @param QueueTask $producer + * @param stdClass $job + * @return bool + */ + private function shouldMarkJobAsFailed(QueueTask $producer, stdClass $job): bool + { + return $producer->taskShouldBeDelete() || $job->attempts <= 0; + } + + /** + * Schedule a job for retry + * + * @param stdClass $job + * @param QueueTask $producer + * @return void * @throws QueryBuilderException */ - private function execute(QueueTask $job, mixed $queue): void + private function scheduleJobRetry(stdClass $job, QueueTask $producer): void { - call_user_func([$job, "process"]); - $this->table->where("id", $queue->id)->update(["status" => "done"]); - $this->sleep($this->sleep ?? 5); + $this->table->where("id", $job->id)->update([ + "status" => self::STATUS_RESERVED, + "attempts" => $job->attempts - 1, + "available_at" => date("Y-m-d H:i:s", time() + $producer->getDelay()), + "reserved_at" => date("Y-m-d H:i:s", time() + $producer->getRetry()), + ]); + } + + /** + * Update job status + * + * @param string $jobId + * @param string $status + * @return void + * @throws QueryBuilderException + */ + private function markJobAs(string $jobId, string $status): void + { + $this->table->where("id", $jobId)->update(["status" => $status]); } /** * Flush the queue table * - * @param ?string $queue + * @param string|null $queue * @return void * @throws QueryBuilderException */ @@ -165,8 +278,9 @@ public function flush(?string $queue = null): void { if (is_null($queue)) { $this->table->truncate(); - } else { - $this->table->where("queue", $queue)->delete(); + return; } + + $this->table->where("queue", $queue)->delete(); } } diff --git a/src/Queue/Adapters/QueueAdapter.php b/src/Queue/Adapters/QueueAdapter.php index 6d1447f5..0d921e5a 100644 --- a/src/Queue/Adapters/QueueAdapter.php +++ b/src/Queue/Adapters/QueueAdapter.php @@ -45,7 +45,7 @@ abstract class QueueAdapter * * @var int */ - protected int $sleep = 5; + protected int $sleep = 0; /** * Make adapter configuration @@ -270,10 +270,10 @@ public function setQueue(string $queue): void /** * Get the queue size * - * @param string $queue + * @param ?string $queue * @return int */ - public function size(string $queue): int + public function size(?string $queue = null): int { return 0; } diff --git a/src/Queue/Adapters/RedisAdapter.php b/src/Queue/Adapters/RedisAdapter.php new file mode 100644 index 00000000..16a28bce --- /dev/null +++ b/src/Queue/Adapters/RedisAdapter.php @@ -0,0 +1,394 @@ +config = $config; + $this->redis = RedisStore::getClient(); + + if (isset($config["database"])) { + $this->redis->select($config["database"]); + } + + if (isset($config["queue"])) { + $this->setQueue($config["queue"]); + } + + return $this; + } + + /** + * Get the size of the queue + * + * @param string|null $queue + * @return int + */ + public function size(?string $queue = null): int + { + return (int) $this->redis->lLen($this->getQueueKey($queue)); + } + + /** + * Push a job onto the queue + * + * @param QueueTask $job + * @return bool + */ + public function push(QueueTask $job): bool + { + $payload = $this->buildPayload($job); + + $result = $this->redis->rPush( + $this->getQueueKey($job->getQueue()), + json_encode($payload) + ); + + return $result !== false; + } + + /** + * Build the job payload + * + * @param QueueTask $job + * @return array + */ + private function buildPayload(QueueTask $job): array + { + return [ + "id" => $this->generateId(), + "queue" => $this->getQueue($job->getQueue()), + "payload" => base64_encode($this->serializeProducer($job)), + "attempts" => $this->tries, + "delay" => $job->getDelay(), + "retry" => $job->getRetry(), + "available_at" => time() + $job->getDelay(), + "created_at" => time(), + ]; + } + + /** + * Run the queue worker + * + * @param string|null $queue + * @return void + */ + public function run(?string $queue = null): void + { + $queueKey = $this->getQueueKey($queue); + $processingKey = $queueKey . self::PROCESSING_SUFFIX; + + // Move job from queue to processing list (atomic operation) + $rawPayload = $this->redis->brPopLPush( + $queueKey, + $processingKey, + $this->config["block_timeout"] ?? 5 + ); + + if ($rawPayload === false) { + $this->sleep($this->sleep); + return; + } + + $this->processJob($rawPayload, $processingKey); + } + + /** + * Process a job from the queue + * + * @param string $rawPayload + * @param string $processingKey + * @return void + */ + private function processJob(string $rawPayload, string $processingKey): void + { + $jobData = json_decode($rawPayload, true); + $producer = null; + + try { + // Check if job is available for processing + if (!$this->isJobReady($jobData)) { + $this->requeue($rawPayload, $processingKey); + return; + } + + $producer = $this->unserializeProducer(base64_decode($jobData["payload"])); + + $this->executeTask($producer); + $this->removeFromProcessing($rawPayload, $processingKey); + $this->updateProcessingTimeout(); + } catch (Throwable $e) { + $this->handleJobFailure($rawPayload, $jobData, $producer, $processingKey, $e); + } + } + + /** + * Check if the job is ready to be processed + * + * @param array $jobData + * @return bool + */ + private function isJobReady(array $jobData): bool + { + return $jobData["available_at"] <= time(); + } + + /** + * Execute the task + * + * @param QueueTask $producer + * @return void + */ + private function executeTask(QueueTask $producer): void + { + call_user_func([$producer, "process"]); + } + + /** + * Handle job failure + * + * @param string $rawPayload + * @param array $jobData + * @param QueueTask|null $producer + * @param string $processingKey + * @param Throwable $exception + * @return void + */ + private function handleJobFailure( + string $rawPayload, + array $jobData, + ?QueueTask $producer, + string $processingKey, + Throwable $exception + ): void { + $this->logError($exception); + + // Store failed job info + $failedKey = $this->getQueueKey($jobData["queue"]) . self::FAILED_SUFFIX; + $this->redis->hSet($failedKey, $jobData["id"], $rawPayload); + + if (is_null($producer)) { + $this->removeFromProcessing($rawPayload, $processingKey); + $this->sleep(1); + return; + } + + $producer->onException($exception); + + if ($this->shouldMarkJobAsFailed($producer, $jobData)) { + $this->removeFromProcessing($rawPayload, $processingKey); + $this->sleep(1); + return; + } + + // Retry the job + $this->scheduleJobRetry($jobData, $producer, $processingKey); + $this->sleep(1); + } + + /** + * Determine if the job should be marked as failed + * + * @param QueueTask $producer + * @param array $jobData + * @return bool + */ + private function shouldMarkJobAsFailed(QueueTask $producer, array $jobData): bool + { + return $producer->taskShouldBeDelete() || $jobData["attempts"] <= 0; + } + + /** + * Schedule a job for retry + * + * @param array $jobData + * @param QueueTask $producer + * @param string $processingKey + * @return void + */ + private function scheduleJobRetry(array $jobData, QueueTask $producer, string $processingKey): void + { + // Update job data for retry + $jobData["attempts"] = $jobData["attempts"] - 1; + $jobData["available_at"] = time() + $producer->getDelay(); + + $newPayload = json_encode($jobData); + + // Remove from processing and add back to queue + $this->redis->lRem($processingKey, $newPayload, 0); + $this->redis->rPush($this->getQueueKey($jobData["queue"]), $newPayload); + } + + /** + * Requeue a job that is not yet ready + * + * @param string $rawPayload + * @param string $processingKey + * @return void + */ + private function requeue(string $rawPayload, string $processingKey): void + { + $jobData = json_decode($rawPayload, true); + + $this->redis->lRem($processingKey, $rawPayload, 0); + $this->redis->rPush($this->getQueueKey($jobData["queue"]), $rawPayload); + + $this->sleep(1); + } + + /** + * Remove a job from the processing list + * + * @param string $rawPayload + * @param string $processingKey + * @return void + */ + private function removeFromProcessing(string $rawPayload, string $processingKey): void + { + $this->redis->lRem($processingKey, $rawPayload, 0); + } + + /** + * Get the Redis key for a queue + * + * @param string|null $queue + * @return string + */ + private function getQueueKey(?string $queue = null): string + { + return self::QUEUE_PREFIX . $this->getQueue($queue); + } + + /** + * Log an error + * + * @param Throwable $exception + * @return void + */ + private function logError(Throwable $exception): void + { + error_log($exception->getMessage()); + + try { + logger()->error($exception->getMessage(), $exception->getTrace()); + } catch (Throwable $loggerException) { + // Logger not available, already logged to error_log + } + } + + /** + * Flush all jobs from the queue + * + * @param string|null $queue + * @return void + */ + public function flush(?string $queue = null): void + { + $queueKey = $this->getQueueKey($queue); + + $this->redis->del($queueKey); + $this->redis->del($queueKey . self::PROCESSING_SUFFIX); + $this->redis->del($queueKey . self::FAILED_SUFFIX); + } + + /** + * Get failed jobs for a queue + * + * @param string|null $queue + * @return array + */ + public function getFailedJobs(?string $queue = null): array + { + $failedKey = $this->getQueueKey($queue) . self::FAILED_SUFFIX; + + return $this->redis->hGetAll($failedKey); + } + + /** + * Retry a failed job + * + * @param string $jobId + * @param string|null $queue + * @return bool + */ + public function retryFailedJob(string $jobId, ?string $queue = null): bool + { + $failedKey = $this->getQueueKey($queue) . self::FAILED_SUFFIX; + $rawPayload = $this->redis->hGet($failedKey, $jobId); + + if ($rawPayload === false) { + return false; + } + + $jobData = json_decode($rawPayload, true); + $jobData["attempts"] = $this->tries; + $jobData["available_at"] = time(); + + $this->redis->rPush($this->getQueueKey($queue), json_encode($jobData)); + $this->redis->hDel($failedKey, $jobId); + + return true; + } + + /** + * Clear all failed jobs for a queue + * + * @param string|null $queue + * @return void + */ + public function clearFailedJobs(?string $queue = null): void + { + $this->redis->del($this->getQueueKey($queue) . self::FAILED_SUFFIX); + } +} diff --git a/src/Queue/Adapters/SQSAdapter.php b/src/Queue/Adapters/SQSAdapter.php index d540d509..f1e9e2ca 100644 --- a/src/Queue/Adapters/SQSAdapter.php +++ b/src/Queue/Adapters/SQSAdapter.php @@ -1,36 +1,43 @@ config = $config; - $this->sqs = new SqsClient($config); return $this; } /** - * Push a job onto the queue. + * Push a job onto the queue * * @param QueueTask $job * @return bool @@ -54,122 +60,206 @@ public function configure(array $config): QueueAdapter public function push(QueueTask $job): bool { $params = [ - 'DelaySeconds' => $job->getDelay(), - 'MessageAttributes' => [ - "Title" => [ - 'DataType' => "String", - 'StringValue' => get_class($job) - ], - "Id" => [ - "DataType" => "String", - "StringValue" => $this->generateId(), - ] - ], - 'MessageBody' => base64_encode($this->serializeProducer($job)), - 'QueueUrl' => $this->config["url"] + "DelaySeconds" => $job->getDelay(), + "MessageAttributes" => $this->buildMessageAttributes($job), + "MessageBody" => base64_encode($this->serializeProducer($job)), + "QueueUrl" => $this->getQueueUrl(), ]; try { $this->sqs->sendMessage($params); return true; } catch (AwsException $e) { - error_log($e->getMessage()); + $this->logError($e); return false; } } /** - * Get the size of the queue. + * Build message attributes for SQS + * + * @param QueueTask $job + * @return array + */ + private function buildMessageAttributes(QueueTask $job): array + { + return [ + "Title" => [ + "DataType" => "String", + "StringValue" => get_class($job), + ], + "Id" => [ + "DataType" => "String", + "StringValue" => $this->generateId(), + ], + ]; + } + + /** + * Get the size of the queue * - * @param string $queue + * @param string|null $queue * @return int */ - public function size(string $queue): int + public function size(?string $queue = null): int { $response = $this->sqs->getQueueAttributes([ - 'QueueUrl' => $this->getQueue($queue), - 'AttributeNames' => ['ApproximateNumberOfMessages'], + "QueueUrl" => $this->getQueue($queue), + "AttributeNames" => ["ApproximateNumberOfMessages"], ]); - $attributes = $response->get('Attributes'); + $attributes = $response->get("Attributes"); - return (int)$attributes['ApproximateNumberOfMessages']; + return (int) $attributes["ApproximateNumberOfMessages"]; } /** - * Process the next job on the queue. + * Process the next job on the queue * - * @param ?string $queue + * @param string|null $queue * @return void - * @throws ErrorException */ public function run(?string $queue = null): void { - $this->sleep($this->sleep ?? 5); - $message = null; - $delay = 5; + $this->sleep($this->sleep); + + $message = $this->receiveMessage(); + + if (is_null($message)) { + $this->sleep(1); + return; + } + + $this->processMessage($message); + } + + /** + * Receive a message from the queue + * + * @return array|null + */ + private function receiveMessage(): ?array + { + $result = $this->sqs->receiveMessage([ + "AttributeNames" => ["SentTimestamp"], + "MaxNumberOfMessages" => 1, + "MessageAttributeNames" => ["All"], + "QueueUrl" => $this->getQueueUrl(), + "WaitTimeSeconds" => self::WAIT_TIME_SECONDS, + ]); + + $messages = $result->get("Messages"); + + return empty($messages) ? null : $messages[0]; + } + + /** + * Process a single message from the queue + * + * @param array $message + * @return void + */ + private function processMessage(array $message): void + { + $job = null; try { - $result = $this->sqs->receiveMessage([ - 'AttributeNames' => ['SentTimestamp'], - 'MaxNumberOfMessages' => 1, - 'MessageAttributeNames' => ['All'], - 'QueueUrl' => $this->config["url"], - 'WaitTimeSeconds' => 20, - ]); - $messages = $result->get('Messages'); - if (empty($messages)) { - $this->sleep(1); - return; - } - $message = $result->get('Messages')[0]; $job = $this->unserializeProducer(base64_decode($message["Body"])); - $delay = $job->getDelay(); call_user_func([$job, "process"]); - $result = $this->sqs->deleteMessage([ - 'QueueUrl' => $this->config["url"], - 'ReceiptHandle' => $message['ReceiptHandle'] - ]); - } catch (AwsException $e) { - // Write the error log - error_log($e->getMessage()); - app('logger')->error($e->getMessage(), $e->getTrace()); - - if (!$message) { - $this->sleep(1); - return; - } - - cache("job:failed:" . $message["ReceiptHandle"], $message["Body"]); - - // Check if job has been loaded - if (!isset($job)) { - $this->sleep(1); - return; - } - - // Execute the onException method for notify the job - // and let developer decide if the job should be deleted - $job->onException($e); - - // Check if the job should be deleted - if ($job->jobShouldBeDelete()) { - $this->sqs->deleteMessage([ - 'QueueUrl' => $this->config["url"], - 'ReceiptHandle' => $message['ReceiptHandle'] - ]); - } else { - $this->sqs->changeMessageVisibilityBatch([ - 'QueueUrl' => $this->config["url"], - 'Entries' => [ - 'Id' => $job->getId(), - 'ReceiptHandle' => $message['ReceiptHandle'], - 'VisibilityTimeout' => $delay - ], - ]); - } + $this->deleteMessage($message); + } catch (Throwable $e) { + $this->handleMessageFailure($message, $job, $e); + } + } + + /** + * Handle message processing failure + * + * @param array $message + * @param QueueTask|null $job + * @param Throwable $exception + * @return void + */ + private function handleMessageFailure(array $message, ?QueueTask $job, Throwable $exception): void + { + $this->logError($exception); + cache("job:failed:" . $message["ReceiptHandle"], $message["Body"]); + if (is_null($job)) { $this->sleep(1); + return; + } + + $job->onException($exception); + + if ($job->taskShouldBeDelete()) { + $this->deleteMessage($message); + } else { + $this->changeMessageVisibility($message, $job); + } + + $this->sleep(1); + } + + /** + * Delete a message from the queue + * + * @param array $message + * @return void + */ + private function deleteMessage(array $message): void + { + $this->sqs->deleteMessage([ + "QueueUrl" => $this->getQueueUrl(), + "ReceiptHandle" => $message["ReceiptHandle"], + ]); + } + + /** + * Change message visibility for retry + * + * @param array $message + * @param QueueTask $job + * @return void + */ + private function changeMessageVisibility(array $message, QueueTask $job): void + { + $this->sqs->changeMessageVisibilityBatch([ + "QueueUrl" => $this->getQueueUrl(), + "Entries" => [ + [ + "Id" => $job->getId(), + "ReceiptHandle" => $message["ReceiptHandle"], + "VisibilityTimeout" => $job->getDelay(), + ], + ], + ]); + } + + /** + * Get the queue URL from configuration + * + * @return string + */ + private function getQueueUrl(): string + { + return $this->config["url"]; + } + + /** + * Log an error + * + * @param Throwable $exception + * @return void + */ + private function logError(Throwable $exception): void + { + error_log($exception->getMessage()); + + try { + logger()->error($exception->getMessage(), $exception->getTrace()); + } catch (Throwable $loggerException) { + // Logger not available, already logged to error_log } } } diff --git a/src/Queue/Connection.php b/src/Queue/Connection.php index 438a0834..ac37d773 100644 --- a/src/Queue/Connection.php +++ b/src/Queue/Connection.php @@ -4,11 +4,12 @@ namespace Bow\Queue; -use Bow\Queue\Adapters\BeanstalkdAdapter; -use Bow\Queue\Adapters\DatabaseAdapter; -use Bow\Queue\Adapters\QueueAdapter; use Bow\Queue\Adapters\SQSAdapter; use Bow\Queue\Adapters\SyncAdapter; +use Bow\Queue\Adapters\QueueAdapter; +use Bow\Queue\Adapters\RedisAdapter; +use Bow\Queue\Adapters\DatabaseAdapter; +use Bow\Queue\Adapters\BeanstalkdAdapter; use Bow\Queue\Exceptions\ConnexionException; use Bow\Queue\Exceptions\MethodCallException; @@ -24,6 +25,7 @@ class Connection "sqs" => SQSAdapter::class, "database" => DatabaseAdapter::class, "sync" => SyncAdapter::class, + "redis" => RedisAdapter::class, ]; /** * The configuration array diff --git a/src/Queue/QueueTask.php b/src/Queue/QueueTask.php index 19a51b1f..bfcb6034 100644 --- a/src/Queue/QueueTask.php +++ b/src/Queue/QueueTask.php @@ -23,14 +23,14 @@ abstract class QueueTask * * @var int */ - protected int $delay = 30; + protected int $delay = 0; /** * Define the time of retry * * @var int */ - protected int $retry = 60; + protected int $retry = 30; /** * Define the priority @@ -193,6 +193,16 @@ public function taskShouldBeDelete(): bool return $this->delete; } + /** + * Delete the job from queue. + * + * @return bool + */ + public function jobShouldBeDelete() + { + return $this->delete; + } + /** * Get the task error * diff --git a/src/Support/Env.php b/src/Support/Env.php index 7ed992b1..a3022853 100644 --- a/src/Support/Env.php +++ b/src/Support/Env.php @@ -45,13 +45,17 @@ class Env * * @throws */ - public function __construct(string $filename) + public function __construct(?string $filename = null) { if ($this->isLoaded()) { return; } - $this->envs = json_decode(file_get_contents($filename), true, 512, JSON_THROW_ON_ERROR); + if ($filename === null || !file_exists($filename)) { + $this->envs = []; + } else { + $this->envs = json_decode(file_get_contents($filename), true, 512, JSON_THROW_ON_ERROR); + } $this->envs = $this->bindVariables($this->envs); diff --git a/tests/Config/stubs/config/queue.php b/tests/Config/stubs/config/queue.php index 85f39dbb..7fc84cf1 100644 --- a/tests/Config/stubs/config/queue.php +++ b/tests/Config/stubs/config/queue.php @@ -26,6 +26,14 @@ "timeout" => 10, ], + /** + * The redis connexion + */ + "redis" => [ + "database" => 1, + "block_timeout" => 5, + ], + /** * The sqs connexion */ @@ -41,7 +49,7 @@ ], /** - * The sqs connexion + * The database connexion */ "database" => [ 'table' => "queues", diff --git a/tests/Queue/QueueTest.php b/tests/Queue/QueueTest.php index 4912796f..909f355e 100644 --- a/tests/Queue/QueueTest.php +++ b/tests/Queue/QueueTest.php @@ -2,7 +2,6 @@ namespace Bow\Tests\Queue; -use Bow\Cache\Adapters\RedisAdapter; use Bow\Cache\CacheConfiguration; use Bow\Configuration\EnvConfiguration; use Bow\Configuration\LoggerConfiguration; @@ -11,6 +10,7 @@ use Bow\Mail\Mail; use Bow\Queue\Adapters\BeanstalkdAdapter; use Bow\Queue\Adapters\DatabaseAdapter; +use Bow\Queue\Adapters\RedisAdapter; use Bow\Queue\Adapters\SQSAdapter; use Bow\Queue\Adapters\SyncAdapter; use Bow\Queue\Connection as QueueConnection; @@ -185,6 +185,9 @@ public function test_can_switch_between_connections(): void $beanstalkdAdapter = $this->getAdapter("beanstalkd"); $this->assertInstanceOf(BeanstalkdAdapter::class, $beanstalkdAdapter); + + $redisAdapter = $this->getAdapter("redis"); + $this->assertInstanceOf(RedisAdapter::class, $redisAdapter); } public function test_connection_returns_same_instance_for_same_adapter(): void @@ -210,11 +213,6 @@ public function test_can_get_current_connection_name(): void */ public function test_push_service_adapter(string $connection): void { - // Skip database adapter due to UUID collision bug - if ($connection === 'database') { - $this->markTestSkipped('Skipped: Str::uuid() generates duplicate UUIDs causing PRIMARY KEY violations'); - } - $adapter = $this->getAdapter($connection); $filename = $this->getProducerFilePath($connection); @@ -251,11 +249,6 @@ public function test_push_service_adapter(string $connection): void */ public function test_push_service_adapter_with_model(string $connection): void { - // Skip database adapter due to UUID collision bug - if ($connection === 'database') { - $this->markTestSkipped('Skipped: Str::uuid() generates duplicate UUIDs causing PRIMARY KEY violations'); - } - // Recreate table to reset auto-increment and avoid test pollution $this->recreatePetsTable(); @@ -529,6 +522,144 @@ public function test_beanstalkd_adapter_respects_queue_configuration(): void } } + public function test_redis_adapter_is_correct_instance(): void + { + try { + $adapter = $this->getAdapter("redis"); + $this->assertInstanceOf(RedisAdapter::class, $adapter); + } catch (\Exception $e) { + $this->markTestSkipped('Redis service is not available: ' . $e->getMessage()); + } + } + + /** + * @group integration + */ + public function test_redis_adapter_can_push_job(): void + { + $filename = $this->getProducerFilePath("redis"); + $this->cleanupFiles([$filename]); + + try { + $adapter = $this->getAdapter("redis"); + $producer = $this->createBasicJob("redis"); + + $result = $adapter->push($producer); + $this->assertTrue($result); + + // Verify queue size increased + $size = $adapter->size(); + $this->assertGreaterThanOrEqual(1, $size); + } catch (\Exception $e) { + $this->markTestSkipped('Redis service is not available: ' . $e->getMessage()); + } finally { + $this->cleanupFiles([$filename]); + } + } + + /** + * @group integration + */ + public function test_redis_adapter_can_process_queued_jobs(): void + { + $filename = $this->getProducerFilePath("redis"); + $this->cleanupFiles([$filename]); + + try { + $adapter = $this->getAdapter("redis"); + + // Flush the queue first to ensure clean state + $adapter->flush(); + + $producer = $this->createBasicJob("redis"); + $adapter->push($producer); + $adapter->run(); + + $this->assertFileExists($filename); + $this->assertEquals(BasicQueueTaskStub::class, file_get_contents($filename)); + } catch (\Exception $e) { + $this->markTestSkipped('Redis service is not available: ' . $e->getMessage()); + } finally { + $this->cleanupFiles([$filename]); + } + } + + /** + * @group integration + */ + public function test_redis_adapter_respects_queue_configuration(): void + { + $filename = $this->getProducerFilePath("redis"); + $this->cleanupFiles([$filename]); + + try { + $adapter = $this->getAdapter("redis"); + $adapter->setQueue("custom-redis-queue"); + $adapter->setTries(2); + $adapter->setSleep(1); + + $producer = $this->createBasicJob("redis"); + $result = $adapter->push($producer); + + $this->assertTrue($result); + + // Cleanup + $adapter->flush("custom-redis-queue"); + } catch (\Exception $e) { + $this->markTestSkipped('Redis service is not available: ' . $e->getMessage()); + } finally { + $this->cleanupFiles([$filename]); + } + } + + /** + * @group integration + */ + public function test_redis_adapter_can_get_queue_size(): void + { + try { + $adapter = $this->getAdapter("redis"); + + // Flush first + $adapter->flush(); + + $initialSize = $adapter->size(); + $this->assertEquals(0, $initialSize); + + $producer = $this->createBasicJob("redis"); + $adapter->push($producer); + + $newSize = $adapter->size(); + $this->assertEquals(1, $newSize); + + // Cleanup + $adapter->flush(); + } catch (\Exception $e) { + $this->markTestSkipped('Redis service is not available: ' . $e->getMessage()); + } + } + + /** + * @group integration + */ + public function test_redis_adapter_can_flush_queue(): void + { + try { + $adapter = $this->getAdapter("redis"); + + $producer = $this->createBasicJob("redis"); + $adapter->push($producer); + + $this->assertGreaterThanOrEqual(1, $adapter->size()); + + $adapter->flush(); + + $this->assertEquals(0, $adapter->size()); + } catch (\Exception $e) { + $this->markTestSkipped('Redis service is not available: ' . $e->getMessage()); + } + } + public function test_can_set_queue_name(): void { $adapter = $this->getAdapter("sync"); @@ -622,6 +753,7 @@ public function test_sync_adapter_executes_without_delay(): void $startTime = microtime(true); $producer = $this->createBasicJob("sync"); + $producer->setDelay(0); $adapter->push($producer); $endTime = microtime(true); @@ -747,6 +879,7 @@ public function getConnection(): array $data = [ ["beanstalkd"], ["database"], + ["redis"], ["sync"], ]; diff --git a/tests/Queue/Stubs/ModelJobStub.php b/tests/Queue/Stubs/ModelQueueTaskStub.php similarity index 94% rename from tests/Queue/Stubs/ModelJobStub.php rename to tests/Queue/Stubs/ModelQueueTaskStub.php index 0ad9e9d3..af540647 100644 --- a/tests/Queue/Stubs/ModelJobStub.php +++ b/tests/Queue/Stubs/ModelQueueTaskStub.php @@ -20,6 +20,6 @@ public function process(): void file_put_contents(TESTING_RESOURCE_BASE_DIRECTORY . "/{$this->connection}_queue_pet_model_stub.txt", $this->pet->toJson()); - $this->deleteJob(); + $this->deleteTask(); } }