diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index a1aa70a4d..bd7314268 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -378,6 +378,35 @@ def _query_requeue_dead_jobs(self): RETURNING uuid """ + def _query_requeue_orphaned_jobs(self): + """Query to requeue jobs stuck in 'enqueued' state without a lock. + + This handles the edge case where the runner marks a job as 'enqueued' + but the HTTP request to start the job never reaches the Odoo server + (e.g., due to server shutdown/crash between setting enqueued and + the controller receiving the request). These jobs have no lock record + because set_started() was never called, so they are invisible to + _query_requeue_dead_jobs(). + """ + return """ + UPDATE + queue_job + SET + state='pending' + WHERE + state = 'enqueued' + AND date_enqueued < (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') + AND NOT EXISTS ( + SELECT + 1 + FROM + queue_job_lock + WHERE + queue_job_id = queue_job.id + ) + RETURNING uuid + """ + def requeue_dead_jobs(self): """ Set started and enqueued jobs but not locked to pending @@ -406,6 +435,14 @@ def requeue_dead_jobs(self): for (uuid,) in cr.fetchall(): _logger.warning("Re-queued dead job with uuid: %s", uuid) + # Requeue orphaned jobs (enqueued but never started, no lock) + query = self._query_requeue_orphaned_jobs() + cr.execute(query) + for (uuid,) in cr.fetchall(): + _logger.warning( + "Re-queued orphaned job (enqueued without lock) with uuid: %s", uuid + ) + class QueueJobRunner: def __init__( diff --git a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py index a6328fed7..c90feaa97 100644 --- a/test_queue_job/tests/test_requeue_dead_job.py +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -99,3 +99,25 @@ def test_requeue_dead_jobs(self): uuids_requeued = self.env.cr.fetchall() self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued) + + def test_requeue_orphaned_jobs(self): + queue_job = self._get_demo_job("test_enqueued_job") + job_obj = Job.load(self.env, queue_job.uuid) + + # Only enqueued job, don't set it to started to simulate the scenario + # that system shutdown before job is starting + job_obj.set_enqueued() + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) + job_obj.store() + + # job ins't actually picked up by the first requeue attempt + query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() + self.env.cr.execute(query) + uuids_requeued = self.env.cr.fetchall() + self.assertFalse(uuids_requeued) + + # job is picked up by the 2nd requeue attempt + query = Database(self.env.cr.dbname)._query_requeue_orphaned_jobs() + self.env.cr.execute(query) + uuids_requeued = self.env.cr.fetchall() + self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued)