...

среда, 1 апреля 2015 г.

Простой DICOM клиент на GO с балансировщиком задач и веб-интерфейсом



Привет Хабр! В последнее время я очень сильно увлекся разработкой на языке GO. Изящный и выразительный язык программирования. Мне давно хотелось сделать что-нибудь полезное. По специфике своей работы мне приходится работать с медицинскими архивами DICOM-изображений PACS.



Я решил, что пришло время создать свой dicom-клиент с (блэкджеком..) веб-интерфейсом, который может выполнять следующие стандартные операции:


  • Dicom ping;

  • Cкачивание исследований;

  • Загрузка исследования,

  • А также поиск по реквизитам




(c-echo, c-move,c-store,c-find соотвествено).

В качестве dicom-библиотеки была выбрана библиотека GrassRoot SDK. Наш клиент будет распараллеливать задачи. Язык go для этого хорошо адаптирован

Похожий сценарий работы был описан http://ift.tt/H6i8DG.

Наш сценарий несколько отличается:

У нас есть некий балансировщик задач, который получает задания dicom-сервиса, проверяет возможность выполнения и асинхронно их выполняет. Для того чтобы не было ситуации, когда параллельно выполняется 1000 задач, мы реализуем очередь задач таким образом, чтобы были активные задачи и те, которые находятся в спящем состоянии. По умолчанию только 10 задач будут активными. В противном случае мы могли бы обойтись без баллансировщика вообще, тупо параллельно выполнить 1000 задач параллельно без какого либо контроля.

Весь код балансировщика находится в файле job_ballancer.go.


В начале идет описание интерфейсов обработчиков. В случае если работа была выполнена успешно, в случае если вернулась ошибка и сам процесс обработки задачи.



type JobDispatcher interface {
Dispatch(interface{}) (interface{}, error)
}
type ErrDispatcher interface {
DispatchError(FaJob) error
}
type CompDispatcher interface {
DispatchSuccess(CompJob) error
}


Когда мы создаем экземпляр диспетчера, мы его инициализируем соответствующими обработчиками.



srv.jbBal.Init(&srv.dDisp, srv, srv)


//Сама структура балансировщика



<source lang="go">
type JobBallancer struct {
jChan chan interface{} //канал в который мы передаем задания
acJob map[string]Job //список активных работ
slJob map[string]Job //список неактивных работ
errDisp ErrDispatcher //обработчик работ завершившихся error
jobDisp JobDispatcher //обработчик заданий
compDisp CompDispatcher //обработчик успешно выполненных работ
JbDone sync.WaitGroup //ожидаем завершения всех работ
aJobC int //количество параллельных (активных)
}

//инициализация балансировщика
func (jbal *JobBallancer) Init(jdis JobDispatcher, cmd CompDispatcher, erd ErrDispatcher) {
jbal.errDisp = erd
jbal.jobDisp = jdis
jbal.compDisp = cmd
jbal.acJob = make(map[string]Job)
jbal.slJob = make(map[string]Job)
jbal.aJobC = 10
jbal.jChan = make(chan interface{})
go jbal.takeJob() //запускаем поток в котором осуществляем //балансировку работ
log.Println("info: job ballancer inited")
}
добавление новой работы в очередь. Операция асинхронная, т.е. работа отсылается в канал, а функция takeJob подбирает ее от туда.
<source lang="go">
func (jbal *JobBallancer) PushJob(jdat interface{}) error {
if jbal.checkInit() {
return errors.New("error: JobChan is not inited")
}
uid := genUid()
job := Job{JobId: uid, Data: jdat}
jbal.jChan <- job
return nil

}



func (jbal *JobBallancer) takeJob() {
for {
//извлекаем работу из канала
recivedTask := <-jbal.jChan
log.Println("info: job taken")
switch job := recivedTask.(type) {
case TermJob:
//если мы получаем сигнал на завершение выходим из функции
log.Println("info: recive terminate dispatch singal")
return
case Job:
//обычная обработка (если все слоты активных работ заняты то работа записывается в список не активных работ)
if len(jbal.acJob) < jbal.aJobC {
jbal.JbDone.Add(1)
jbal.addActiveJob(job)
go jbal.startJob(job)
log.Println("info: normal dispatch")
} else {
jbal.addSleepJob(job)
jbal.JbDone.Add(1)
log.Println("info: attend maximum active job")
}
case CompJob:
//работа завершилась успехом
if err := jbal.compDisp.DispatchSuccess(job); err != nil {
log.Println("error: failed dispatch success" + job.Job.JobId)
}
//удачно завершившуюся работу можно удалить из списка
jbal.removeJob(job.Job.JobId)
jbal.JbDone.Done()
jbal.resumeJobs()
case FaJob:
//работа завершилась ошибкой
if err := jbal.errDisp.DispatchError(job); err != nil {
log.Println("error: failed dispatch error" + job.Job.JobId)
}
//завершившуюся работу можно удалить из списка
jbal.removeJob(job.Job.JobId)
jbal.JbDone.Done()
jbal.resumeJobs()
default:
log.Fatalln("error: unknown job type")
jbal.JbDone.Done()
}
}
}
//функция удаления работы
func (jbal *JobBallancer) removeJob(jid string) error {
if _, isFind := jbal.acJob[jid]; isFind {
delete(jbal.acJob, jid)
} else {
return errors.New("error: can't remove job because job with id not found")
}
return nil
}


//функция позволяющая правильно завершить работу балансировщика, в случае если есть работы которые не завершены, функция будет ожидать их завершения
func (jbal *JobBallancer) TerminateTakeJob() error {
if jbal.checkInit() {
return errors.New("error: is not inited")
}
jbal.JbDone.Wait()
jbal.jChan <- TermJob{}
close(jbal.jChan)
if len(jbal.acJob) > 0 {
return errors.New("error: list job is not empty")
}
log.Println("info: greacefully terminate take job")
return nil
}


Остальные вспомогательные функции мы не будем рассматривать. полный код можно посмотреть

http://ift.tt/1OYjv5E


Не смотря, что код не сложный и я долго обдумывал его. Но все равно для проверки надежности я реализовал нагрузочный тест на десятки задач:



testJobDispatcher := TestJobDispatcher{}
testErrorDispatcher := TestErrorDispatcher{}
testSuccessDispatcher := TestCompletedDispatcher{}
jobBallancer := JobBallancer{}
jobBallancer.Init(&testJobDispatcher, &testSuccessDispatcher, &testErrorDispatcher)
for i := 0; i < 40; i++ {
jobBallancer.PushJob("data: " + strconv.Itoa(i))
}
jobBallancer.TerminateTakeJob()




Он отработал нормально. Все задачи были выполнены, а функция TerminateTakeJob завершилась тогда, когда все задачи были выполненны. Для контроля отработаных задач используется объект синхронизации sync.WaitGroup JbDone, который ведет подсчет количества выполненных работ. Как я уже отмечал выше, код балансировщика является универсальным и для того чтобы наш балансировщик работал по-другому, нам достаточно проинстанцировать его соотвествующими обработчиками.

Как и в прошлой своей поделке) (http://ift.tt/14QS22C) интерефейс приложения я реализовал в виде веб-интерфейса.


Для теста я использовал публичный dicom-архив 213.165.94.158:11112. С него можно скачивать исследования, если есть прямой айпи и если на стороне клиента открыт порт 11112. Так же я проверил работу на свободном dicom-архиве dcm4che http://ift.tt/1OYjvlS.

Мне удалось собрать рабочую версию для Linux, к сожалению собрать под Widows мне не удалось. Библиотека grassroot успешно собралась, но ошибка возникает при линковке самого приложения.

cmd/ld: Malformed PE file: Unexpected flags for PE section.


Об этой ошибке много написано тут: http://ift.tt/1OYjvlU.

К сожалению я не настолько знаком с тонкостями сборки и поэтому получилась версия только под Linux. Может «хабра-эфект» сдвинет с мертвой точки и эту проблему. Для пользователей Windows, которые хотят проверить и посмотреть как это работает, я подготовил виртуальную машину на базе CoreOS (http://ift.tt/1F1L4mq). В демо машине наш dicom-клиент работает как systemd-сервис.

При наличии желания, можно например реализовать сервис, который выкачивает исследования с различных dicom-узлов, и выкладывает в zip-архиве для скачивания. Для управления сервисом можно использовать json-сообщения, так же, как делает наш GUI.

А можно поступить, как поступил я: прикрутить в наше приложение какой-нибудь веб-просмотровщик на базе html5.




Github: http://ift.tt/1F1L4mu

Версия Linux-amd64: http://ift.tt/1F1L4CI


This entry passed through the Full-Text RSS service - if this is your content and you're reading it on someone else's site, please read the FAQ at http://ift.tt/jcXqJW.


Комментариев нет:

Отправить комментарий