HAPI-FHIR Database 轉置資料 (二) ⇒ 實作

Posted on Tue, Jan 18, 2022 HAPI-FHIR FHIR SQL TS

如果不太了解HAPI-FHIR的資料庫結構可以看看上一篇[HAPI-FHIR Database (一) ⇒ Database結構]

此篇將用Typescript實作從HAPI FHIR資料庫轉置資料到另一個FHIR Server。

使用的程式語言:TypeScript(Node.js)

使用到的套件

npm i axios pg sequelize
npm i -D typescript @types/node

實作

第一步:初始化Typescript專案

npm init -y
npx tsc --init

ℹ️ 整體資料夾結構與內容

第二步:撰寫設定檔

export const config = {
    db : {
        service : "postgres" , // Sequelize dialect; only PostgreSQL is supported for now
        username : "username", // PostgreSQL user name
        password : "password", // PostgreSQL password
        database : "hapi" , // name of the HAPI-FHIR database to read from
        hostName : "localhost" , // PostgreSQL host name
    },
    sync: {
        limit: 100, // page size: number of hfj_resource rows read per query
        totalWorker: 5, // number of pages processed concurrently
        FHIRBaseURL: "http://localhost:8088/fhir/" , // base URL of the target FHIR server; keep the trailing slash, the resource type is appended directly
        haveAuthToken : true, // set to true when the target FHIR server requires a Bearer token
        token : "token", // Bearer token sent in the Authorization header when haveAuthToken is true
        method: "put" // key into doSync: "put" = FHIR update (target id = HAPI res_id), "post" = FHIR create (target server assigns the id)
    }
}

第三步:定義SQL資料表

資料夾結構與內容

models/sql/hfj_res_sync.ts

import { Sequelize, DataTypes, Model } from 'sequelize';

/**
 * Registers the `hfj_res_sync` bookkeeping model on the given Sequelize instance.
 *
 * One row is kept per HAPI-FHIR resource that has been pushed to the target
 * FHIR server: which resource it was, which version was pushed, and the id the
 * target server answered with. No attribute is marked `primaryKey`, so
 * Sequelize adds its default auto-increment `id` column.
 *
 * @param sequelize connection to register the model on
 * @returns the model class produced by `sequelize.define`
 */
module.exports = (sequelize: Sequelize) => {
    const attributes = {
        // unique resource ID (RES_ID) of the resource inside HAPI-FHIR
        res_id: { type: DataTypes.BIGINT },
        // FHIR resource type, e.g. Patient
        res_type: { type: DataTypes.STRING },
        // HAPI-FHIR resource version that was synced
        res_ver: { type: DataTypes.BIGINT },
        // id returned by the target FHIR server for the created/updated resource
        sync_id: { type: DataTypes.STRING },
    };
    const options = {
        timestamps: false,
        freezeTableName: true,
    };
    return sequelize.define('hfj_res_sync', attributes, options);
}

models/sql/index.ts

import { Sequelize, Op, Dialect } from 'sequelize';
import { config } from '../../config/config';

// Single shared Sequelize connection built from config.db.
const sequelize = new Sequelize(config.db.database, config.db.username, config.db.password, {
    host: config.db.hostName,
    dialect: config.db.service as Dialect, // e.g. mssql
    // logging : false
});

// Register the hfj_res_sync model on this connection.
require('./hfj_res_sync')(sequelize);

/**
 * Verifies the database connection and creates the hfj_res_sync table if it
 * does not exist yet. Logs and terminates the process (exit code 1) on failure.
 */
async function initialize() {
    try {
        await sequelize.authenticate();
        await sequelize.models['hfj_res_sync'].sync();
        console.log('Connection has been established successfully.');
        return sequelize;
    } catch (error) {
        console.error('Unable to connect to the database:', error);
        process.exit(1);
    }
}

/**
 * Promise that resolves to the initialized Sequelize instance; consumers use
 * `await require('../models/sql/')`.
 */
module.exports = initialize();

models/resource.ts

/**
 * One row of HAPI-FHIR's `hfj_resource` table as returned by `getResources`.
 *
 * NOTE(review): `res_id` / `res_ver` are BIGINT columns; node-postgres returns
 * BIGINT (int8) values as strings by default, so at runtime these may be
 * strings despite the `number` type — compare them accordingly.
 */
export interface IResource {
    // HAPI-FHIR resource id (RES_ID)
    res_id : number;
    // latest version of the resource (RES_VER)
    res_ver: number;
    // FHIR resource type (RES_TYPE), e.g. "Patient"; selected by getResources and
    // used as the resource type on the target server. Optional so rows built
    // without it still type-check.
    res_type?: string;
}

models/resource_ver.ts

從HAPI-FHIR hfj_res_ver 取回資料之interface,這邊我們只會用到 res_id、res_ver、lo_get 三個欄位

/**
 * One row read from HAPI-FHIR's `hfj_res_ver` table; only the three columns
 * this tool needs are modelled.
 */
export interface IResourceVer {
    /** Bytes of the stored resource body as returned by `lo_get(res_text)`; convertLOBToJson gunzips them. */
    lo_get: Buffer;
    /** Id of the resource this version belongs to. */
    res_id: number;
    /** Version number of this row. */
    res_ver: number;
}

第四步:資料操作程式

資料夾結構與內容

utils/lob.ts

import stream from 'stream';
import zlib from 'zlib';

//Gunzip file : https://stackoverflow.com/questions/12148948/how-do-i-ungzip-decompress-a-nodejs-requests-module-gzip-response-body
/**
 * Gunzips the resource body read from `hfj_res_ver` and parses it as JSON.
 *
 * The whole payload is decompressed into one Buffer and decoded as UTF-8 once.
 * Decoding per stream chunk would corrupt multi-byte characters (e.g. CJK
 * text) that straddle a chunk boundary.
 *
 * @param iLOB row whose `lo_get` property holds the gzip-compressed JSON bytes
 * @returns the parsed resource JSON
 * @throws (rejects) when `lo_get` is missing, the data is not valid gzip, or
 *         the decompressed text is not valid JSON
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- rows come untyped from raw Sequelize queries
export const convertLOBToJson = (iLOB: any): Promise<unknown> => {
    return new Promise((resolve, reject) => {
        const raw = iLOB?.lo_get;
        if (raw == null) {
            reject(new Error('convertLOBToJson: row has no lo_get data'));
            return;
        }
        zlib.gunzip(Buffer.from(raw), (err, decompressed) => {
            if (err) {
                reject(err);
                return;
            }
            // JSON.parse may throw; turn that into a rejection instead of an uncaught exception.
            try {
                resolve(JSON.parse(decompressed.toString('utf8')));
            } catch (parseError) {
                reject(parseError);
            }
        });
    });
}

utils/res.ts

import { QueryTypes } from "sequelize";
import { IResource } from "../models/resource";

/**
 * Reads one page of `hfj_resource` rows: resource id, resource type and the
 * latest `res_ver`.
 *
 * Rows are ordered by `res_id` so LIMIT/OFFSET pagination is stable. Without an
 * ORDER BY, PostgreSQL does not guarantee the same row order between queries,
 * so consecutive pages could skip or repeat resources.
 *
 * @param limit SQL query data limit count (page size)
 * @param offset SQL query data offset (rows to skip)
 * @returns the page of rows; an empty array once past the last row
 */
export const getResources = async (limit: number, offset: number): Promise<Array<IResource>> => {
    const sequelize = await require('../models/sql/');
    // Use Sequelize replacements instead of string interpolation so values are escaped.
    const rows = await sequelize.query(
        'SELECT res_type, res_id, res_ver FROM hfj_resource ORDER BY res_id LIMIT :limit OFFSET :offset;',
        {
            replacements: { limit, offset },
            type: QueryTypes.SELECT,
        },
    );
    return rows;
}

/**
 * Reads a single version row from `hfj_res_ver`, including the stored resource
 * body fetched with `lo_get(res_text)`.
 *
 * NOTE(review): `res_text` is read as a large object here; if the HAPI-FHIR
 * version in use stores some bodies elsewhere, `lo_get` may be NULL for those rows — confirm.
 *
 * @param resId Resource id
 * @param resVer Resource Version
 * @returns the row (`lo_get`, `res_id`, `res_ver`), or undefined when no such version exists
 */
export const getResource = async (resId: number, resVer: number) => {
    const sequelize = await require('../models/sql/');
    // Explicit alias keeps the column name `lo_get`, which convertLOBToJson reads.
    const rows = await sequelize.query(
        'SELECT lo_get(res_text) AS lo_get, res_id, res_ver FROM hfj_res_ver WHERE res_id = :resId AND res_ver = :resVer;',
        {
            replacements: { resId, resVer },
            type: QueryTypes.SELECT,
        },
    );
    return rows.pop();
}

/**
 * Counts rows in `hfj_resource`.
 *
 * @param resourceType optional FHIR resource type (e.g. "Patient"); when given,
 *        only resources of that type are counted, otherwise all resources are
 * @returns the number of matching resources
 */
export const getResourcesCountByResourceType = async (resourceType?: string): Promise<number> => {
    const sequelize = await require('../models/sql/');
    const filterByType = resourceType !== undefined;
    const rows = await sequelize.query(
        `SELECT COUNT(*) AS count FROM hfj_resource${filterByType ? ' WHERE res_type = :resourceType' : ''};`,
        filterByType
            ? { replacements: { resourceType }, type: QueryTypes.SELECT }
            : { type: QueryTypes.SELECT },
    );
    // COUNT(*) is a BIGINT, which the pg driver returns as a string by default.
    return Number(rows.pop().count);
}

utils/res_sync.ts

import * as axios from "axios";
import { QueryTypes, Sequelize } from "sequelize";
import { config } from "../config/config";
import { IResource } from "../models/resource";
import { log } from '../utils/log';

/**
 * Looks up what has already been synced for a resource in `hfj_res_sync`.
 *
 * @param resId Resource id
 * @returns matching rows (`res_id`, `res_ver`); an empty array [] when the resource was never synced
 */
export const getSyncedResourceById = async (resId: number)=> {
    const sequelize = await require('../models/sql/');
    // Parameterized via replacements rather than interpolated into the SQL text.
    const rows = await sequelize.query(
        'SELECT res_id, res_ver FROM hfj_res_sync WHERE res_id = :resId;',
        {
            replacements: { resId },
            type: QueryTypes.SELECT,
        },
    );
    return rows;
}

/**
 * Records (inserts or updates) that a resource has been synced in `hfj_res_sync`.
 *
 * The caller's `resource` object is no longer mutated; a copy with `res_type`
 * and `sync_id` added is written instead.
 *
 * @param resourceType resource type e.g. Patient
 * @param syncId The target FHIR Server response id
 * @param resource hfj_resource content (at least res_id and res_ver)
 * @returns `{ status: true, data }` with the written values, or
 *          `{ status: false, data: error }` after logging the error
 */
export const addSyncResource = async (resourceType: string, syncId: string, resource: IResource) => {
    const syncResource: any = { ...resource, res_type: resourceType, sync_id: syncId };
    try {
        const sequelize: Sequelize = await require('../models/sql/');
        const syncModel = sequelize.models["hfj_res_sync"];
        const where = { res_id: resource.res_id };
        // hfj_res_sync has no unique constraint on res_id, so do find-then-update/create.
        const existing = await syncModel.findOne({ where });
        if (existing) {
            await syncModel.update(syncResource, { where });
        } else {
            await syncModel.create(syncResource);
        }
        return {
            status: true,
            data: syncResource
        };
    } catch (e) {
        log.error(e);
        return {
            status: false,
            data: e
        };
    }
}

/**
 * Builds the axios request config shared by every sync request. It adds a Bearer
 * token header when `config.sync.haveAuthToken` is set.
 */
const buildSyncRequestConfig = (): any => {
    const requestConfig: any = {};
    if (config.sync.haveAuthToken) {
        // Plain record rather than axios.AxiosRequestHeaders: in axios 1.x that type
        // includes the AxiosHeaders class, which a bare object literal does not satisfy.
        const headers: Record<string, string> = {
            Authorization: `Bearer ${config.sync.token}`
        };
        requestConfig["headers"] = headers;
    }
    return requestConfig;
};

/**
 * Sync strategies keyed by `config.sync.method`. Each returns the target
 * server's response body, or `false` after logging when the request fails.
 */
export const doSync: any = {
    /** Using PUT `update` Web API to create/overwrite the Resource on the target FHIR Server
     * under the same id as in HAPI-FHIR.
     * NOTE(review): resId is HAPI's RES_ID; if the source used client-assigned
     * logical ids, references between resources may not line up — confirm.
     * @param resourceType resource type e.g. Patient
     * @param resId Resource id (used as the target id)
     * @param resJson Resource JSON content
     */
    "put": async (resourceType: string, resId: number, resJson: any) => {
        try {
            // FHIR update requires the body's id to equal the id in the URL.
            const body = { ...resJson, id: String(resId) };
            const res = await axios.default.put(`${config.sync.FHIRBaseURL}${resourceType}/${resId}`, body,
                buildSyncRequestConfig());
            return res.data;
        } catch (e) {
            log.error(e);
            return false;
        }
    },
    /** Using POST `create` Web API to create Resource on target FHIR Server (server assigns the id)
     * @param resourceType resource type e.g. Patient
     * @param resId Resource id (unused; kept so both strategies share one signature)
     * @param resJson Resource JSON content
     */
    "post": async (resourceType: string, resId: number, resJson: any) => {
        try {
            const res = await axios.default.post(`${config.sync.FHIRBaseURL}${resourceType}`, resJson,
                buildSyncRequestConfig());
            return res.data;
        } catch (e) {
            log.error(e);
            return false;
        }
    }
}

第五步:程式主體

index.ts

import { convertLOBToJson } from './utils/lob';
import { getResources, getResourcesCountByResourceType, getResource } from './utils/res';
import { getSyncedResourceById, addSyncResource, doSync } from './utils/res_sync'
import { config } from './config/config';
import { IResource } from './models/resource';
import { IResourceVer } from './models/resource_ver';

/**
 * Decides whether a hfj_resource row still has to be pushed to the target FHIR server.
 *
 * A resource needs syncing when it was never synced (no hfj_res_sync row) or when
 * the version recorded in hfj_res_sync differs from its current res_ver. The
 * original comparison was inverted: it re-synced up-to-date resources and skipped
 * changed ones.
 *
 * @param resource hfj_resource row (res_id and current res_ver)
 * @returns true if the resource must be (re)synced, false if the synced copy is current
 */
async function isNeedSync(resource: IResource): Promise<boolean> {
    const syncedResource = await getSyncedResourceById(resource.res_id);
    if (syncedResource.length === 0) {
        return true;
    }
    // BIGINT columns may arrive from pg as strings; compare textual forms so a
    // number/string mismatch is not mistaken for a version change.
    return String(syncedResource[0].res_ver) !== String(resource.res_ver);
}

/**
 * Runs the queued promise functions in batches of `config.sync.totalWorker`,
 * waiting for each batch to settle before starting the next. `workerList` is
 * drained in place.
 *
 * Failed workers are logged instead of being silently dropped by `allSettled`.
 *
 * @param workerList queue of zero-argument async functions; emptied by this call
 */
async function doWorks(workerList: Array<any>) {
    // A non-positive totalWorker would splice nothing and loop forever.
    const batchSize = Math.max(1, config.sync.totalWorker);
    while (workerList.length > 0) {
        const batch = workerList.splice(0, batchSize);
        const results = await Promise.allSettled(batch.map((work) => work()));
        for (const result of results) {
            if (result.status === 'rejected') {
                console.error('sync worker failed:', result.reason);
            }
        }
    }
}

/**
 * Syncs a single hfj_resource row to the target FHIR server.
 * It reads and decompresses the stored body only when the row actually needs syncing.
 * @returns true when the resource is synced (or already up to date), false when the push or bookkeeping failed
 */
async function syncResourceItem(resourceItem: any): Promise<boolean> {
    if (!(await isNeedSync(resourceItem))) {
        return true;
    }
    const syncResourceType = resourceItem.res_type;
    const resource = await getResource(resourceItem.res_id, resourceItem.res_ver);
    const lobJson = await convertLOBToJson(resource);
    const syncResData = await doSync[config.sync.method](syncResourceType, resourceItem.res_id, lobJson);
    if (!syncResData) {
        return false;
    }
    const addResult = await addSyncResource(syncResourceType, syncResData.id, resource);
    return addResult.status;
}

/**
 * Queues a promise function that syncs one page of hfj_resource rows and logs
 * how many succeeded.
 * @param worker queue of pending promise functions (appended to)
 * @param resourceItemList current page of hfj_resource rows
 * @param limit SQL query data limit
 * @param offset SQL query data offset
 */
function addWork(worker: Array<any>, resourceItemList: Array<any>, limit: number, offset: number) {
    worker.push(async () => {
        let successCount = 0;
        for (const resourceItem of resourceItemList) {
            try {
                if (await syncResourceItem(resourceItem)) {
                    successCount++;
                }
            } catch (e) {
                // One bad resource (missing version row, corrupt LOB, invalid JSON)
                // must not abort the rest of the page.
                console.error(`sync res_id ${resourceItem.res_id} failed:`, e);
            }
        }
        console.log(`convert ${offset}~${offset+limit} successfully ${successCount}/${resourceItemList.length}`);
    });
}

/**
 * Entry point: pages through hfj_resource and queues one sync job per page.
 * Once `totalWorker` jobs are queued, it runs them before queueing more, then
 * flushes whatever is left at the end.
 */
(async () => {
    const pending: Array<any> = [];
    const pageSize = config.sync.limit;
    let pageOffset = 0;
    // Stop once a page comes back empty. The next page is fetched only after the offset advances.
    for (
        let page = await getResources(pageSize, pageOffset);
        page.length > 0;
        page = await getResources(pageSize, pageOffset)
    ) {
        // Queue is full: run the queued jobs before adding another.
        if (pending.length === config.sync.totalWorker) {
            await doWorks(pending);
        }
        addWork(pending, page, pageSize, pageOffset);
        // e.g. pageSize 100: first page covers 0~100, next page starts at 100
        pageOffset += pageSize;
    }
    // Run whatever jobs are still queued.
    await doWorks(pending);
})();

整體完整程式碼

參考資料