Skip to content

Commit

Permalink
Merge pull request #600 from jbernal0019/master
Browse files Browse the repository at this point in the history
Implement async scheduling of PACS query operation
  • Loading branch information
jbernal0019 authored Dec 13, 2024
2 parents 779acdc + eca2c9c commit df2b666
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 93 deletions.
1 change: 1 addition & 0 deletions chris_backend/core/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
{'queue': 'periodic'},
'plugininstances.tasks.cancel_waiting_plugin_instances':
{'queue': 'periodic'},
'pacsfiles.tasks.send_pacs_query': {'queue': 'main2'},
'pacsfiles.tasks.register_pacs_series': {'queue': 'main2'}
}
app.conf.update(task_routes=task_routes)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 4.2.5 on 2024-12-07 00:55

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('pacsfiles', '0004_pacsretrieve'),
]

operations = [
migrations.AddField(
model_name='pacsquery',
name='status',
field=models.CharField(choices=[('created', 'Default initial'), ('sent', 'Sent to PACS'), ('succeeded', 'Finished successfully'), ('errored', 'Finished with error')], default='created', max_length=10),
),
migrations.AddField(
model_name='pacsretrieve',
name='status',
field=models.CharField(choices=[('created', 'Default initial'), ('sent', 'Sent to PACS'), ('succeeded', 'Finished successfully'), ('errored', 'Finished with error')], default='created', max_length=10),
),
]
68 changes: 63 additions & 5 deletions chris_backend/pacsfiles/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
from django_filters.rest_framework import FilterSet

from core.models import ChrisFolder, ChrisFile
from core.utils import filter_files_by_n_slashes
from core.utils import filter_files_by_n_slashes, json_zip2str
from core.storage import connect_storage
from .services import PfdcmClient


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -43,12 +44,20 @@ class Meta:
fields = ['id', 'identifier', 'active']


PACS_QUERY_STATUS_CHOICES = [("created", "Default initial"),
("sent", "Sent to PACS"),
("succeeded", "Finished successfully"),
("errored", "Finished with error")]


class PACSQuery(models.Model):
creation_date = models.DateTimeField(auto_now_add=True)
title = models.CharField(max_length=300, db_index=True)
query = models.JSONField()
description = models.CharField(max_length=700, blank=True)
result = models.TextField(blank=True)
status = models.CharField(max_length=10, choices=PACS_QUERY_STATUS_CHOICES,
default='created')
pacs = models.ForeignKey(PACS, on_delete=models.CASCADE, related_name='query_list')
owner = models.ForeignKey('auth.User', on_delete=models.CASCADE)

Expand All @@ -57,7 +66,28 @@ class Meta:
unique_together = ('pacs', 'owner', 'title',)

def __str__(self):
return self.query
return self.title

def send(self):
"""
Custom method to send the query request to pfdcm service.
"""
pacs_name = str(self.pacs.identifier)
query = self.query
pfdcm_cl = PfdcmClient()

self.status = 'sent'
self.save()
try:
result = pfdcm_cl.query(pacs_name, query)
except Exception:
self.status = 'errored'
self.save()
else:
if result:
self.result = json_zip2str(result)
self.status = 'succeeded'
self.save()


class PACSQueryFilter(FilterSet):
Expand All @@ -78,12 +108,18 @@ class PACSQueryFilter(FilterSet):
class Meta:
model = PACSQuery
fields = ['id', 'min_creation_date', 'max_creation_date', 'title_exact',
'title', 'description', 'pacs_id', 'pacs_identifier', 'owner_username']
'title', 'status', 'description', 'pacs_id', 'pacs_identifier',
'owner_username']


PACS_RETRIEVE_STATUS_CHOICES = PACS_QUERY_STATUS_CHOICES


class PACSRetrieve(models.Model):
creation_date = models.DateTimeField(auto_now_add=True)
result = models.TextField(blank=True)
status = models.CharField(max_length=10, choices=PACS_RETRIEVE_STATUS_CHOICES,
default='created')
pacs_query = models.ForeignKey(PACSQuery, on_delete=models.CASCADE,
related_name='retrieve_list')
owner = models.ForeignKey('auth.User', on_delete=models.CASCADE)
Expand All @@ -92,7 +128,28 @@ class Meta:
ordering = ('pacs_query', '-creation_date',)

def __str__(self):
return self.pacs_query.query
return self.pacs_query.title

def send(self):
"""
Custom method to send the retrieve request to pfdcm service.
"""
pacs_query = self.pacs_query
pacs_name = str(pacs_query.pacs.identifier)
query = pacs_query.query
pfdcm_cl = PfdcmClient()

self.status = 'sent'
self.save()
try:
result = pfdcm_cl.retrieve(pacs_name, query)
except Exception:
self.status = 'errored'
self.save()
else:
if result:
self.result = json_zip2str(result)
self.save()


class PACSRetrieveFilter(FilterSet):
Expand All @@ -105,7 +162,8 @@ class PACSRetrieveFilter(FilterSet):

class Meta:
model = PACSRetrieve
fields = ['id', 'min_creation_date', 'max_creation_date', 'owner_username']
fields = ['id', 'min_creation_date', 'max_creation_date', 'status',
'owner_username']


class PACSSeries(models.Model):
Expand Down
43 changes: 8 additions & 35 deletions chris_backend/pacsfiles/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
from core.models import ChrisFolder
from core.storage import connect_storage
from core.serializers import ChrisFileSerializer
from core.utils import json_zip2str

from .models import PACS, PACSQuery, PACSRetrieve, PACSSeries, PACSFile
from .services import PfdcmClient


logger = logging.getLogger(__name__)
Expand All @@ -39,38 +36,30 @@ class PACSQuerySerializer(serializers.HyperlinkedModelSerializer):
pacs_identifier = serializers.ReadOnlyField(source='pacs.identifier')
owner_username = serializers.ReadOnlyField(source='owner.username')
result = serializers.ReadOnlyField()
status = serializers.ReadOnlyField()
retrieve_list = serializers.HyperlinkedIdentityField(view_name='pacsretrieve-list')

class Meta:
model = PACSQuery
fields = ('url', 'id', 'creation_date', 'title', 'query', 'description',
'pacs_identifier', 'owner_username', 'result', 'retrieve_list')
'status', 'pacs_identifier', 'owner_username', 'result',
'retrieve_list')

def create(self, validated_data):
"""
Overriden to rise a serializer error when attempting to create a PACSQuery
object that results in a DB conflict. Then a PACS query operation is requested
to the PFDCM service.
object that results in a DB conflict.
"""
title = validated_data['title']
query = validated_data['query']
pacs_name = validated_data['pacs'].identifier

try:
pacs_query = super(PACSQuerySerializer, self).create(validated_data)
return super(PACSQuerySerializer, self).create(validated_data)
except IntegrityError:
error_msg = (f'You have already registered a PACS query with title={title} '
f'for pacs {pacs_name}')
raise serializers.ValidationError([error_msg])

pfdcm_cl = PfdcmClient()
result = pfdcm_cl.query(pacs_name, query)

if result:
pacs_query.result = json_zip2str(result)
pacs_query.save()
return pacs_query

def update(self, instance, validated_data):
"""
Overriden to rise a serializer error when attempting to update a PACSQuery
Expand Down Expand Up @@ -106,31 +95,15 @@ class PACSRetrieveSerializer(serializers.HyperlinkedModelSerializer):
pacs_identifier = serializers.ReadOnlyField(source='pacs_query.pacs.identifier')
owner_username = serializers.ReadOnlyField(source='owner.username')
result = serializers.ReadOnlyField()
status = serializers.ReadOnlyField()
pacs_query = serializers.HyperlinkedRelatedField(view_name='pacsquery-detail',
read_only=True)

class Meta:
model = PACSRetrieve
fields = ('url', 'id', 'creation_date', 'pacs_query_id', 'pacs_query_title',
'query', 'pacs_identifier', 'owner_username', 'result', 'pacs_query')

def create(self, validated_data):
"""
Overriden to request a PACS retrieve operation to the PFDCM service.
"""
pacs_query = validated_data['pacs_query']
query = pacs_query.query
pacs_name = pacs_query.pacs.identifier

pacs_retrieve = super(PACSRetrieveSerializer, self).create(validated_data)

pfdcm_cl = PfdcmClient()
result = pfdcm_cl.retrieve(pacs_name, query)

if result:
pacs_retrieve.result = json_zip2str(result)
pacs_retrieve.save()
return pacs_retrieve
'query', 'pacs_identifier', 'status', 'owner_username', 'result',
'pacs_query')


class PACSSeriesSerializer(serializers.HyperlinkedModelSerializer):
Expand Down
15 changes: 14 additions & 1 deletion chris_backend/pacsfiles/tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@

from typing import Optional
from celery import shared_task

from django.contrib.auth.models import User

from celery import shared_task
from .models import PACSQuery
from .serializers import PACSSeriesSerializer


@shared_task
def send_pacs_query(pacs_query_id):
"""
Send PACS query.
"""
#from celery.contrib import rdb;rdb.set_trace()
pacs_query = PACSQuery.objects.get(pk=pacs_query_id)
pacs_query.send()


@shared_task
def register_pacs_series(
PatientID: str,
Expand Down
Loading

0 comments on commit df2b666

Please sign in to comment.