If you are not familiar with the HAPI-FHIR database structure, see the previous post [HAPI-FHIR Database (一) ⇒ Database結構].
This post implements, in TypeScript, a tool that transfers data from a HAPI-FHIR database to another FHIR server.
Language used
- TypeScript
Packages used
- axios (for API requests)
- pg
- sequelize
npm i axios pg sequelize
Implementation
Step 1: Initialize the TypeScript project
npm init -y
tsc --init
ℹ️ Overall folder structure and contents
- config: configuration files
- models: data models (Sequelize table definitions and interfaces)
- utils: shared utilities (data CRUD operations, etc.)
Step 2: Write the configuration file
- Path: config/config.ts
- Holds the SQL, FHIR, and data-sync settings
export const config = {
db : {
service : "postgres", //only postgres is supported for now
username : "username", //postgresql username
password : "password", //postgresql password
database : "hapi", //name of the database to connect to
hostName : "localhost", //postgresql hostname
},
sync: {
limit: 100, //pagination limit
totalWorker: 5, //number of concurrent workers
FHIRBaseURL: "http://localhost:8088/fhir/", //the FHIR base URL
haveAuthToken: true, //set to true if the FHIR server requires authorization
token: "token", //the authorization token
method: "put" //"put" (update) or "post" (create), matching the keys of doSync
}
}
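A typed sketch of the same settings can catch bad values at compile time (the interface name below is my own, not part of the project): narrowing `method` to the two keys that `doSync` actually defines turns a typo like `"update"` into a compiler error instead of a runtime crash.

```typescript
// Hypothetical type for the `sync` section of config/config.ts.
interface SyncConfig {
  limit: number;        // pagination limit
  totalWorker: number;  // number of concurrent workers
  FHIRBaseURL: string;  // target FHIR base URL
  haveAuthToken: boolean;
  token: string;
  method: 'put' | 'post'; // only the two doSync keys are valid
}

const sync: SyncConfig = {
  limit: 100,
  totalWorker: 5,
  FHIRBaseURL: 'http://localhost:8088/fhir/',
  haveAuthToken: true,
  token: 'token',
  method: 'put', // method: 'update' would fail to compile
};
console.log(sync.method); // put
```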
Step 3: Define the SQL tables
Folder structure and contents
models/sql/hfj_res_sync.ts
- This file is the Sequelize definition of the hfj_res_sync table. hfj_res_sync records each synced resource's source ID, resource type, resource version, and the ID assigned by the target server.
import { Sequelize, DataTypes, Model } from 'sequelize';
/**
*
* @param {Sequelize} sequelize
* @returns {Model}
*/
module.exports = (sequelize: Sequelize) => {
const hfj_res_sync = sequelize.define('hfj_res_sync',
{
//the unique ID recorded in HAPI-FHIR, i.e. the resource ID
res_id: {
type: DataTypes.BIGINT
},
//Resource Type
res_type: {
type: DataTypes.STRING
},
//Resource Version
res_ver: {
type: DataTypes.BIGINT
},
//the ID returned by the target FHIR server when the resource was created
sync_id : {
type: DataTypes.STRING
}
},
{
timestamps : false,
freezeTableName: true
});
return hfj_res_sync;
}
models/sql/index.ts
- Exports the main Sequelize instance used for table CRUD operations.
- Handles connecting to the database and creating the tables.
import { Sequelize, Op, Dialect } from 'sequelize';
import { config } from '../../config/config';
const sequelize = new Sequelize(config.db.database , config.db.username , config.db.password , {
host: config.db.hostName,
dialect: config.db.service as Dialect,
// logging : false
});
require('./hfj_res_sync')(sequelize);
//this async IIFE runs once when the module is first required
/**
* @type {Sequelize}
*/
module.exports = (async function () {
try {
await sequelize.authenticate();
await sequelize.models['hfj_res_sync'].sync();
console.log('Connection has been established successfully.');
return sequelize;
} catch (error) {
console.error('Unable to connect to the database:', error);
process.exit(1);
}
})();
models/resource.ts
export interface IResource {
res_id: number;
res_type: string;
res_ver: number;
}
models/resource_ver.ts
Interface for rows returned from the HAPI-FHIR hfj_res_ver table; only res_id, res_ver, and lo_get are used here.
export interface IResourceVer {
res_id: number;
res_ver: number;
lo_get: Buffer;
}
Step 4: Data access code
Folder structure and contents
utils/lob.ts
- Converts the res_text LOB column of the HAPI-FHIR hfj_res_ver table into JSON.
- The LOB content is gzip-compressed, so a Gunzip step is included.
import stream from 'stream';
import zlib from 'zlib';
//Gunzip file : https://stackoverflow.com/questions/12148948/how-do-i-ungzip-decompress-a-nodejs-requests-module-gzip-response-body
export const convertLOBToJson = (iLOB: any) => {
return new Promise((resolve, reject)=> {
let buffer: any[] = [];
let bufferStream = new stream.PassThrough();
// Write your buffer
bufferStream.end(Buffer.from(iLOB.lo_get));
// Pipe the buffer through a Gunzip stream to decompress the LOB bytes
let gunzip = zlib.createGunzip();
bufferStream.pipe(gunzip);
gunzip.on('data', function (data) {
// decompression chunk ready, add it to the buffer
buffer.push(data.toString());
}).on("end", function () {
// decompression complete: join the chunks and parse as JSON
let bufferStr = buffer.join("");
let jsonItem = JSON.parse(bufferStr);
resolve(jsonItem);
}).on("error", function (e) {
reject(e);
});
})
}
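A quick way to sanity-check the Gunzip step without a database: gzip a sample resource the way `res_text` is stored, then reverse it with `zlib`. This is a self-contained sketch (the sample Patient is made up), using the synchronous `zlib` APIs rather than the stream pipeline in `convertLOBToJson`.

```typescript
import zlib from 'zlib';

// Simulate what `SELECT lo_get(res_text) ...` returns: gzip-compressed resource JSON.
const sample = { resourceType: 'Patient', id: '1' };
const lob = zlib.gzipSync(Buffer.from(JSON.stringify(sample)));

// Reverse it, as convertLOBToJson does with its Gunzip stream.
const decoded = JSON.parse(zlib.gunzipSync(lob).toString());
console.log(decoded.resourceType); // Patient
```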
utils/res.ts
- Fetch rows from hfj_resource
- Fetch a single resource's row from hfj_res_ver
import { QueryTypes } from "sequelize";
import { IResource } from "../models/resource";
/**
* Fetch the id, resource type, and latest res_ver from hfj_resource
* @param limit SQL query data limit count
* @param offset SQL query data offset
*/
export const getResources = async (limit: number, offset: number): Promise<Array<IResource>> => {
const sequelize = await require('../models/sql/');
let item = await sequelize.query(`SELECT res_type, res_id, res_ver FROM hfj_resource LIMIT ${limit} OFFSET ${offset};`, {
type: QueryTypes.SELECT
});
return item;
}
/**
* Fetch a single resource's row from hfj_res_ver
* @param resId Resource id
* @param resVer Resource Version
*/
export const getResource = async (resId: number, resVer: number) => {
const sequelize = await require('../models/sql/');
let item = await sequelize.query(`SELECT lo_get(res_text), res_id, res_ver FROM hfj_res_ver WHERE res_id=${resId} and res_ver=${resVer};`, {
type: QueryTypes.SELECT
});
return item.pop();
}
export const getResourcesCountByResourceType= async (): Promise<number> => {
const sequelize = await require('../models/sql/');
let count = await sequelize.query(`SELECT COUNT(*) FROM hfj_resource`, {
type: QueryTypes.SELECT
});
return Number(count.pop().count);
}
utils/res_sync.ts
- Fetch the already-synced record for a resource id from hfj_res_sync
- Insert or update synced records in hfj_res_sync
import * as axios from "axios";
import { QueryTypes, Sequelize } from "sequelize";
import { config } from "../config/config";
import { IResource } from "../models/resource";
import { log } from '../utils/log';
/**
* Fetch the synced record for a resource id from hfj_res_sync; returns an empty array [] if none exists
* @param resId Resource id
*/
export const getSyncedResourceById = async (resId: number)=> {
const sequelize = await require('../models/sql/');
let item = await sequelize.query(`SELECT res_id, res_ver FROM hfj_res_sync WHERE res_id=${resId};`, {
type: QueryTypes.SELECT
});
return item;
}
/**
* Insert or update a synced record in hfj_res_sync
* @param resourceType resource type e.g. Patient
* @param syncId The target FHIR Server response id
* @param resource hfj_resource content
*/
export const addSyncResource = async (resourceType: string, syncId: string, resource: IResource) => {
let syncResource: any = resource;
try {
syncResource["res_type"] = resourceType;
syncResource["sync_id"] = syncId;
const sequelize: Sequelize = await require('../models/sql/');
let syncedResource = await sequelize.models["hfj_res_sync"].findOne({
where: {
res_id : resource.res_id
}
});
if (syncedResource) {
await sequelize.models["hfj_res_sync"].update(syncResource , {
where: {
res_id : resource.res_id
}
});
} else {
await sequelize.models["hfj_res_sync"].create(syncResource);
}
return {
status: true,
data: syncResource
};
} catch(e) {
log.error(e);
return {
status: false,
data: e
}
}
}
export const doSync: any = {
/** Using PUT `update` Web API to create Resource on target FHIR Server
* @param resourceType resource type e.g. Patient
* @param resId Resource id
* @param resJson Resource JSON content
*/
"put": async (resourceType:string, resId: number, resJson: any)=> {
try {
let requestConfig: any = {};
if (config.sync.haveAuthToken) {
let headers: axios.AxiosRequestHeaders = {
Authorization: `Bearer ${config.sync.token}`
}
requestConfig["headers"] = headers;
}
let res = await axios.default.put(`${config.sync.FHIRBaseURL}${resourceType}/${resId}`, resJson,
requestConfig);
return res.data;
} catch(e) {
log.error(e);
return false;
}
},
/** Using POST `create` Web API to create Resource on target FHIR Server
* @param resourceType resource type e.g. Patient
* @param resId Resource id
* @param resJson Resource JSON content
*/
"post": async (resourceType:string, resId: number, resJson: any) => {
try {
let requestConfig: any = {};
if (config.sync.haveAuthToken) {
let headers: axios.AxiosRequestHeaders = {
Authorization: `Bearer ${config.sync.token}`
}
requestConfig["headers"] = headers;
}
let res = await axios.default.post(`${config.sync.FHIRBaseURL}${resourceType}`, resJson,
requestConfig);
return res.data;
} catch(e) {
log.error(e);
return false;
}
}
}
Step 5: Main program
index.ts
import { convertLOBToJson } from './utils/lob';
import { getResources, getResourcesCountByResourceType, getResource } from './utils/res';
import { getSyncedResourceById, addSyncResource, doSync } from './utils/res_sync'
import { config } from './config/config';
import { IResource } from './models/resource';
import { IResourceVer } from './models/resource_ver';
/**
* Check whether a source resource needs to be synced to the target FHIR Server.
* @param resource
*/
async function isNeedSync(resource:IResource): Promise<boolean> {
let syncedResource = await getSyncedResourceById(resource.res_id);
if (syncedResource.length !== 0 ) {
//sync again only when the stored version differs from the current version
return syncedResource[0].res_ver !== resource.res_ver;
}
return true;
}
/**
* Run the queued promise functions in batches
* @param workerList
*/
async function doWorks(workerList: Array<any>) {
let doWorkList = [];
while(workerList.length > 0) {
doWorkList = workerList.slice(0,config.sync.totalWorker);
workerList.splice(0,config.sync.totalWorker);
await Promise.allSettled(doWorkList.map(f=> f()))
}
}
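The batching pattern in `doWorks` can be seen in isolation in this self-contained sketch (the names are mine): tasks are drained from the queue `totalWorker` at a time, and `Promise.allSettled` makes each batch finish, successes and failures alike, before the next batch starts.

```typescript
// Drain a queue of async tasks, running at most `batchSize` of them concurrently.
async function runInBatches<T>(tasks: Array<() => Promise<T>>, batchSize: number): Promise<T[]> {
  const results: T[] = [];
  while (tasks.length > 0) {
    // take (and remove) the next batch from the queue, as doWorks does with slice/splice
    const batch = tasks.splice(0, batchSize);
    const settled = await Promise.allSettled(batch.map(fn => fn()));
    for (const s of settled) {
      if (s.status === 'fulfilled') results.push(s.value);
    }
  }
  return results;
}

// Five trivial tasks, run two at a time.
const tasks = [1, 2, 3, 4, 5].map(n => async () => n * 10);
runInBatches(tasks, 2).then(results => console.log(results)); // [10, 20, 30, 40, 50]
```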
/**
* Queue a function that syncs one page of data
* @param worker array of pending promise functions
* @param resourceItemList current page of hfj_resource rows
* @param limit SQL query data limit
* @param offset SQL query data offset
*/
function addWork(worker: Array<any>, resourceItemList: Array<any>, limit: number, offset:number) {
worker.push(async ()=> {
let successCount = 0;
for (let i = 0 ; i < resourceItemList.length ; i++) {
let resourceItem = resourceItemList[i];
let resource = await getResource(resourceItem.res_id, resourceItem.res_ver);
let syncResourceType = resourceItem.res_type;
let needSync = await isNeedSync(resourceItem);
if (needSync) {
let res_id = resourceItem.res_id;
let lobJson = await convertLOBToJson(resource);
let syncResData = await doSync[config.sync.method](syncResourceType, res_id, lobJson);
if (syncResData) {
let addResult = await addSyncResource(syncResourceType, syncResData.id, resource);
if (addResult.status) {
successCount++;
}
}
} else {
successCount++;
}
}
console.log(`convert ${offset}~${offset+limit} successfully ${successCount}/${resourceItemList.length}`);
});
}
/**
* Main entry point
*/
(async () => {
let worker:Array<any> = [];
let limit = config.sync.limit;
let offset = 0;
//fetch the first page of hfj_resource rows
let resourceItemList = await getResources(limit , offset);
//loop until a page of hfj_resource rows comes back empty
while(resourceItemList.length > 0) {
//when the pending queue reaches totalWorker, run all queued promise functions concurrently
if (worker.length == config.sync.totalWorker) await doWorks(worker);
addWork(worker, resourceItemList, limit, offset);
//with offset 0 and limit 100 this page covers 0~100; the next iteration uses offset 100 for 100~200
offset += limit;
//get next page `hfj_resource` data
resourceItemList = await getResources(limit , offset);
}
//do remain works
await doWorks(worker);
})();
Complete source code
- GitHub: hapi-sync-mediator-ts