自定义函数
Node.JS 版本:v10.23.1(安装 NPM 包时请注意版本)
参考文档
函数节点快速入门
函数快速入门
Public API 快速入门
自定义函数开发指南
数据读、写、校验
部门停用校验
停用条件
- 不能有启用中的下级部门
- 不能有在职员工
核心接口
- 数据查询
- 错误返回
- 数据更新
// 通过 NPM dependencies 成功安装 NPM 包后此处可引入使用
// 如安装 linq 包后就可以引入并使用这个包
// const linq = require('linq');
/**
* @param params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`函数开始执行,上下文信息如下:${JSON.stringify(context)}`);
// 获取要停用的目标部门实体信息
const record = context.targetRecord.original;
// 获取部门对象 DBContext
// const depObj = context.db.object('_department');
const deptObj = context.db.currentObject();
// 查询部门对象下符合条件的任一条记录
const dept = await deptObj.where({
'_status': '_active',
'_superior': record._id
}).findOne();
// 返回业务验证错误
if (dept) {
return new ValidationError(new kunlun.type.Multilingual({
'cn': '停用部门失败,因为该部门仍有启用中的下级部门',
'en': 'Failed to deactivate this department as it still has active sub-departments'
}))
}
// 检查对应部门下是否存在启用中的用户
const checkUser = await context.db.object('_user').where({
'_active': true,
'moon__department': record._id
}).findOne();
// 返回验证错误
if (checkUser) {
return new ValidationError(new kunlun.type.Multilingual({
'cn': '停用部门失败,因为该部门仍有在职员工',
'en': 'Failed to deactivate this department as it still has active employees'
}))
}
// 更新部门的状态为 _inactive 并实时生效
await deptObj.update({
'_id': record._id,
'_status': '_inactive'
});
}
自动化流程中进行数据创建和更新
const record = await context.db.object('test').create({
_name: new kunlun.type.Multilingual({
zh: context.trigger.new._name
})
});
logger.info(record);
const record1 = await context.db.object('test').orderBy('_createdAt').findOne();
if (record1) {
await context.db.object('test').update({
_id: record1._id,
_name: new kunlun.type.Multilingual({zh: `${new Date().getTime()}`})
})
}
为 File 字段赋值(文件上传)
const axios = require('axios');
// 获取网络文件流
const resp = await axios({
method: 'get',
responseType: 'stream'
});
// 上传文件获取文件 token
const file_info = await context.resources.file.upload(resp.data);
console.log(file_info);
/*
返回示例
{
id: 'a4879a6d2e8e43b98781ba6f5b261071',
mime_type: 'jpg',
name: '328648-14011213253758.jpg',
size: 143592,
token: '57b70c0acf244e62887a2dcf9bf938c1'
}
*/
const record = await context.db.currentObject().create({
_name: `${new Date().getTime()}`,
moon__singleFile: [{ ...file_info }] // 文件类型字段赋值
});
console.log(record);
为 Avatar 字段设置图片
数据量比较大,需控制并发(建议)/**
* @param params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date()} 函数开始执行`);
// 为 avatar 字段设置值
await context.db.currentObject().create({
_name: '123',
// avatar 字段
avatar: {
image: {
},
source: 'image'
}
})
}
数据量特别大时,不控制并发,可能触发频控
// 安装 p-limit 3.1.0
const pLimit = require('p-limit');
// 设置并发为 10,可根据需要适当增加
const limit = pLimit(10);
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date()} 函数开始执行`);
const T = context.db.object('bigObj');
const filter = {};
const count = await T.where(filter).count();
console.log(count);
let ps = [];
for (let i = 0; i < Math.ceil(count / 200); i++) {
ps.push(limit(() => {
return T
.where(filter)
.select('_id')
.offset(i * 200)
.orderBy('_id').find()
}));
}
const result = await Promise.all(ps);
const res = [].concat(...result);
console.log(res.length);
}
GroupBy 等聚合操作(小数据量)
只适用于记录数小于 5 万场景
原理:把数据读到函数内存,利用 linq 库进行内存操作
示例一 单属性聚合
// 通过 NPM dependencies 成功安装 NPM 包后此处可引入使用
// 如安装 linq 包后就可以引入并使用这个包
const linq = require('linq');
// 安装 p-limit 3.1.0
const pLimit = require('p-limit');
/**
* @param {Params} params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date()} 函数开始执行`);
// 获取所有数据(4.12.x 版本的 kldx/core 包已支持 findAll 接口)
// 这里尽可能使用 select 只选择需要的字段
const records = await context.db.object('groupByTest')
.where().select('_id', 'lookup_28jx8bq').orderBy('_id').findAll();
let ps = [];
const T = context.db.object('groupByTest');
const count = await T.count();
logger.info(count);
// 查询所有数据记录,数据量太大可能会导致内存溢出等问题
const limit = pLimit(3); // 设置并发度,建议不超过 5
for (let i = 0; i < Math.ceil(count / 200); i++) {
ps.push(limit(() => {
return T.where().select('_id', 'lookup_28jx8bq').offset(i * 200).orderBy('_id').find();
}));
}
const result = await Promise.all(ps);
const records = [].concat(...result);
logger.info(records.length);
// 使用 linq 库进行聚合操作
const groupByResult = linq.from(records)
.select(x => { return { id: x.key(), count: x.count() } })
.orderByDescending(x => x.count)
.toArray();
logger.info(groupByResult);
}
250px|700px|reset

250px|700px|reset

对象模型 和 运行结果
示例二 多属性聚合
const linq = require('linq');
/**
* @param {Params} params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
const data = [{
country: 'China',
provinve: 'Beijing',
city: 'Haidian',
count: 1
}, {
country: 'China',
provinve: 'Beijing',
city: 'Changping',
count: 1
}, {
country: 'China',
provinve: 'Henan',
city: 'Zhengzhou',
count: 1
}, {
country: 'China',
provinve: 'Beijing',
city: 'Haidian',
count: 10
}]
const result = linq.from(data)
.groupBy(x => {
return {
country: x.country,
provinve: x.provinve,
city: x.city,
}
},
null,
null, x => `${x.country}|${x.provinve}|${x.city}`)
.select(x => { return { ...x.key(), count: x.sum(y => y.count) } })
.toArray();
logger.info(result);
}
250px|700px|reset

调飞书接口
新增用户后同步在飞书创建飞书账号
场景:
- 用户填写创建用户表单
- 调用飞书接口创建飞书账号并获取飞书用户 OpenId
- 在飞书低代码平台保存新用户信息并关联飞书用户 OpenId
核心接口
获取 workflow 变量
更新 workflow 变量
返回错误
使用 redis 服务
发起 HTTP 请求,调用飞书 Open Api
// 引用第三方 axios 和 linq 包
const axios = require('axios');
const linq = require('linq');
/**
* @param params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`函数开始执行,上下文信息如下:${JSON.stringify(context)}`);
// 获取用户填写的表单信息
// create 类型的 workflow,会默认创建 _initialRecord 变量,用户填写表单后,相关信息
// 会暂存到此变量中
const newUserInfo = context.workflow.variables['_initialRecord'].value;
// 调用飞书 Open Api 去创建用户
const response = await addFeishuUser(newUserInfo);
if (response.code !== 0) {
return new FunctionError(new kunlun.type.Multilingual({
'cn': '调用飞书接口失败',
'en': 'Call Feishu API Error'
}))
}
// 更新 workflow 中 apiname 为 feishuOpenId 的变量的值
return {
'feishuOpenId': response.open_id
}
}
// 飞书 Open Api - 鉴权接口参考
async function getFeishuAuth() {
const app_id = 'cli_9f310635b274900e', app_secret = 'EhN********', url = 'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/';
const token_key = `feishu_open_api_token_${app_id}`;
// 先从 redis 取 token
// 飞书验证接口返回的 token 是有一定时间有效期的,有效期内不必每次重新申请 token
// 这里演示了如何使用函数自带的 redis 服务来实现 token 的缓存策略
const token = JSON.parse(await kunlun.redis.get(token_key));
if (token && new Date().getTime() <= token.expireTime - 600 * 1000) {
return token.tenant_access_token;
}
try {
// 使用 axios 库来发起 HTTP 请求
// 调用飞书 AUTH 接口来申请 token
const response = await axios({
method: 'post',
url: url,
headers: {
'Content-Type': 'application/json'
},
data: {
"app_id": app_id,
"app_secret": app_secret
}
});
if (response.data.code === 0) {
// 将 token 缓存到 redis 中
await kunlun.redis.set(token_key, JSON.stringify({
tenant_access_token: response.data.tenant_access_token,
expireTime: new Date().getTime() + response.data.expire * 1000
}));
// 返回 token
return response.data.tenant_access_token;
}
}
catch (e) {
// 发生了错误,记录日志并返回错误码 -1
logger.info(e);
return -1;
}
}
// 飞书 Open Api - 新增用户接口参考
async function addFeishuUser(userInfo) {
const tenant_access_token = await getFeishuAuth();
if (tenant_access_token === -1) {
return {'code': -1};
}
// 构造请求体
// 因为用户信息中的姓名是多语字段 [{'language_code':2052, 'text': ''}]
// 这里使用了 linq 这个库来快速找到中文名字
const requestBody = {
"name": linq.from(userInfo._name).where(l => l.language_code === 2052).toArray()[0].text,
"mobile": `${userInfo._phoneNumber.code}${userInfo._phoneNumber.number}`,
"open_department_ids": ["od-f4a3200e0885e52afaf*********"]
};
// 使用 axios 发起 HTTP 调用
// 调用飞书新增用户接口
const response = await axios({
method: 'post',
url: url,
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${tenant_access_token}`
},
data: requestBody
});
// 根据情况返回对应信息或错误码
if (response.data.code === 0) {
return {'code': 0, 'open_id': response.data.data.user_info.open_id};
}
return {'code': -1, 'data': response.data};
}
发送飞书图片消息
上传图片到飞书服务器示例(发送图片消息需要先上传图片获取图片 key)
如下代码演示了如何在函数节点中获取网络图片并调用飞书上传图片接口上传图片
// 通过 NPM dependencies 成功安装 NPM 包后此处可引入使用
const axios = require('axios');
const FormData = require('form-data');
/**
* @param params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
* 快速入门:Workflow 自定义函数节点快速入门
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date()} 函数开始执行`);
const response = await axios({
method: 'get',
responseType: 'stream'
})
let formData = new FormData();
formData.append('image', response.data);
formData.append('image_type', 'message');
const res = await axios({
method: 'post',
headers: {
'Authorization': `Bearer t-07609583e7f9582c560588f**********`,
...await getHeaders(formData)
},
data: formData
})
console.log(res.data.data.image_key)
}
/**
* @param {FormData} form
*/
async function getHeaders(form) {
return new Promise((resolve, reject) => {
form.getLength((err, length) => {
if (err) reject(err);
const headers = Object.assign({ 'Content-Length': length }, form.getHeaders());
resolve(headers);
});
});
}
使用 Feishu Node.JS SDK 调飞书接口
const lark = require('@larksuiteoapi/allcore');
const appSettings = lark.newInternalAppSettings({
appID: 'cli_a144607************',
appSecret: 'AqIl7Ods3NjRj*******'
});
const conf = lark.newConfig(lark.Domain.FeiShu, appSettings, {
loggerLevel: lark.LoggerLevel.ERROR
});
/**
* @param {Params} params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date().toISOString()} 函数开始执行`);
// 在这里补充业务代码
const body = {
msg_type: 'text',
content: {
text: 'test send msg'
}
}
const req = lark.api.newRequest('/open-apis/message/v4/send', 'POST', lark.api.AccessTokenType.Tenant, body);
const resp = await lark.api.sendRequest(conf, req);
logger.info(resp.data);
}
250px|700px|reset

250px|700px|reset

通用
日期加减
const moment = require('moment');
/**
* @param {Params} params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
let date = new Date();
let t = moment(date);
t.add(1, 'days');
logger.info(`add 1 day: ${t.toISOString()}`);
t.add(1, 'month');
logger.info(`add 1 month: ${t.toISOString()}`);
t.add(1, 'y');
logger.info(`add 1 year: ${t.toISOString()}`);
// 链式调用
t.add(1, 'h').add(1, 'm').add(1, 's');
logger.info(`add 1h 1m 1s: ${t.toISOString()}`);
}
/*
years(y):年
quarters(Q):季度
months(M):月
weeks(w):周
days(d):日
hours(h):时
minutes(m):分
seconds(s):秒
milliseconds(ms):毫秒
*/
250px|700px|reset

生成二维码
const qr = require('qr-image');
const fs = require('fs');
/**
* @param params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date()} 函数开始执行`);
// 在这里补充业务代码
// 生成 png 文件
const buffer = qr.imageSync('二维码内容', { type: 'png' });
const file = `/tmp/qrcode-${Date.now()}.png`;
fs.writeFileSync(file, buffer);
// 用完之后可以删除
// fs.unlinkSync(f);
// 生成 svg
const svg = qr.imageSync('二维码内容', {type: 'svg'});
}
使用 MongoDB 存储自定义数据
const T = kunlun.mongodb.table('my_test_table');
// 存储自定义 schema 的数据
const record = await T.save({
time: new Date().getTime(),
name: '王一一',
age: 20,
params: {
a: 1
}
});
console.log(record);
// 查询自定义数据
const records = await T.where('age').greaterThanOrEquals(20).find();
console.log(records);
通过分布式任务批量处理大量数据
场景:有大量数据需要处理,但单个函数运行时长限制 5 min。
思路:通过分布式任务将大数据切分为若干个小数据量的子任务来并发处理
250px|700px|reset

启动分布式任务
/**
* @param {Params} params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date().toISOString()} 函数开始执行`);
logger.info({ context });
// 这个主要为了后面推送小铃铛消息用的
const c_user = await context.db.object('_user')
.select('_id').findOne();
let dataset = [
{ _id: 1671710269480973, _name: "zhangsan", age: "18" }
];
// 随机造一些测试数据
const r_count = Math.ceil(Math.random() * 100);
for (let i = 0; i < r_count; i++) {
dataset.push({
_id: new Date().getTime(),
_name: `${i}`,
age: `${Math.floor(Math.random() * 20)}`
})
}
logger.info(`dataset.length=${dataset.length}`);
// 创建分布式任务,获取任务 ID
const taskId = await context.tasks.createDistributedTask(
dataset,
"handlerFunc", // 逻辑处理函数
"progressCallbackFunc", // 进度变化处理函数
"completedCallbackFunc", // 任务完成处理函数
{
concurrency: 5, // 并发数量
maxSliceNumber: 20, // 单个子任务的最大数据量
progressCallbackStep: 2 // 进度变化步长
});
logger.info({ taskId })
// 创建消息中心消息,拿到消息 ID
const msgId = await context.msg.notifyCenter.create({
target_users: [c_user._id],
title: new kunlun.type.Multilingual({ zh: '任务开始' }),
detail: new kunlun.type.Multilingual({ zh: `数据更新中...` }),
icon: 'progress',
percent: 0
});
// redis 中存储任务和消息的关联
await kunlun.redis.set(`distri-${taskId}`, msgId);
return { msgId }
}
250px|700px|reset

函数 1 - 任务处理函数 handlerFunc
这个函数是业务逻辑处理函数,即要怎么处理每条数据
分布式任务引擎会在运行时传入切片后的 data
/**
* @param {Params} params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
logger.info(`${new Date().toISOString()} handlerFunc 开始运行`);
// 获取当前子任务数据
const subTaskData = params.data;
logger.info({
dataLength: params.data.length
});
for (let i = 0; i < subTaskData.length; i++) {
logger.info({
i,
data: subTaskData[i]
});
await sleep(2000);// 这里可以补充具体的业务逻辑。这里为了模拟,只是 sleep 2s
}
}
async function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
250px|700px|reset

250px|700px|reset

函数 2 - 进度变化回调函数 progressCallbackFunc
可以通过这个函数来监控任务的进度
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date().toISOString()} progressCallbackFunc 函数开始执行`);
// 在这里补充业务代码
logger.info({ params })
// 获取分布式任务的 ID 和当前整体进度
const taskId = params.taskId;
const percent = params.percent;
// 从 redis 获取和当前任务关联的 msgId
const msgId = await kunlun.redis.get(`distri-${taskId}`);
logger.info({ msgId })
if (msgId && parseInt(msgId)) {
// 更新进度
await context.msg.notifyCenter.update(parseInt(msgId), {
percent: percent,
icon: 'progress'
})
}
}
250px|700px|reset

函数 3 - 任务完成回调函数 completedCallbackFunc
可以通过这个函数来监控分布式任务的完成事件
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date().toISOString()} completedCallbackFunc 函数开始执行`);
logger.info({ params })
// 获取分布式任务的 ID
const taskId = params.taskId;
// 从 redis 获取和当前任务关联的 msgId
const msgId = await kunlun.redis.get(`distri-${taskId}`);
logger.info({ msgId });
if (msgId) {
// 更新消息中心状态
await context.msg.notifyCenter.update(parseInt(msgId), {
title: new kunlun.type.Multilingual({
zh: '演示任务成功'
}),
detail: new kunlun.type.Multilingual({
zh: `任务完成了 - ${new Date().toISOString()}`
}),
icon: 'success'
})
}
}
250px|700px|reset

效果演示
250px|700px|reset

250px|700px|reset

250px|700px|reset

云函数中在临时目录读写文件
// 通过 NPM dependencies 成功安装 NPM 包后此处可引入使用
// 如安装 linq 包后就可以引入并使用这个包
const axios = require('axios');
const fs = require('fs');
/**
* @param params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date()} 函数开始执行`);
// 在这里补充业务代码
const resp = await axios({
method: 'get',
responseType: 'arraybuffer'
});
// 只能读写 /tmp 目录
// 注意不能将此目录作为永久存储,目录里的文件会随着底层实例的释放而删除
const file = `/tmp/${new Date().getTime()}.jpg`
fs.writeFileSync(file, resp.data);
// 上传文件获取文件 token
const file_info = await context.resources.file.upload(fs.createReadStream(file));
console.log(file_info);
// 删除临时文件
fs.unlinkSync(file)
}
从网络下载文件并转为 base64
// 通过 NPM dependencies 成功安装 NPM 包后此处可引入使用
// 如安装 linq 包后就可以引入并使用这个包
const axios = require('axios');
/**
* @param params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date().toISOString()} 函数开始执行`);
const resp = await axios({
method: 'get',
responseType: 'arraybuffer'
});
const b64 = Buffer.from(resp.data).toString('base64');
return b64;
}
multipart/form-data 上传大文件
// 通过 NPM dependencies 成功安装 NPM 包后此处可引入使用
// 如安装 linq 包后就可以引入并使用这个包
const axios = require('axios');
const fs = require('fs');
const FormData = require('form-data');
/**
* @param params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date().toISOString()} 函数开始执行`);
// 在这里补充业务代码
// 获取用户新添加的 record
const newRecord = context.trigger.new;
logger.info(newRecord);
const record = await context.db.object('files').where({
_id: newRecord._id
}).findOne();
logger.info(record);
if (!record.files || record.files.length === 0) {
return;
}
const files = record.files;
for (let i = 0; i < files.length; i++) {
const file = files[i];
logger.info(file);
if (file.size > 50 * 1024 * 1000) {
logger.error(`${file.name} 的体积(${file.size / 1024 / 1000}MB) 大于 50MB`);
continue;
}
//将文件保存在临时目录
const filepath = `/tmp/${new Date().getTime()}_${file.name}`;
await context.resources.file.download(file.token, filepath);
if (fs.existsSync(filepath)) {
logger.info(`文件成功保存在 ${filepath}`);
// 调用三方接口上传文件
await uploadToExternalSystem(filepath, logger);
// 上传后要及时删除临时文件
fs.unlinkSync(filepath);
}
}
}
/**
* 这里演示怎么通过 multipart/form-data 的形式上传文件到三方接口,这里按照图虫的接口实现对应的逻辑就行
* @param {String} filePath 文件路径
* @param {Logger} logger
*/
async function uploadToExternalSystem(filePath, logger) {
let formData = new FormData();
formData.append('image', fs.createReadStream(filePath));
const res = await axios({
method: 'post',
headers: {
...await getHeaders(formData)
},
maxContentLength: Infinity,
maxBodyLength: Infinity,
data: formData
})
logger.info(res.data)
}
/**
* @param {FormData} form
*/
async function getHeaders(form) {
return new Promise((resolve, reject) => {
form.getLength((err, length) => {
if (err) reject(err);
const headers = Object.assign({ 'Content-Length': length }, form.getHeaders());
resolve(headers);
});
});
}
发送邮件(SMTP)
// 通过 NPM dependencies 成功安装 NPM 包后此处可引入使用
/**
* @param {Params} params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date()} 函数开始执行`);
const client = new SMTPClient({
password: 'your password',
ssl: true,
port: 465
});
const message = {
subject: '邮件发送 demo',
text: 'test by zhong',
}
// send the message
const resp = await client.sendAsync(message);
logger.info(resp);
}
250px|700px|reset

250px|700px|reset

解析 Excel
不适合处理特别大的 Excel 文件
// 通过 NPM dependencies 成功安装 NPM 包后此处可引入使用
// 如安装 linq 包后就可以引入并使用这个包
const axios = require('axios');
const fs = require('fs');
const ExcelJS = require('exceljs');
/**
* @param {Params} params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date().toISOString()} 函数开始执行`);
// 只能在 /tmp 目录读写文件
const path = `/tmp/${new Date().getTime()}.xlsx`;
// 下载文件
const resp = await axios({
method: 'get',
responseType: 'arraybuffer'
});
await fs.writeFileSync(path, resp.data);
if (fs.existsSync(path)) {
const workbook = new ExcelJS.Workbook();
// 读取 excel 文件
const wb = await workbook.xlsx.readFile(path);
// 获取第一张 sheet
const sheet = wb.worksheets[0];
logger.info({
sheet_name: sheet.name, // sheet 的名字
firstRow: sheet.getRow(1) // 获取第一行
})
//delete tmp file
fs.unlinkSync(path)
}
}
250px|700px|reset

自动重试
网络编程中,默认网络是不可靠的,所以有时需要自动重试来增加程序的健壮性,sdk 提供了重试方法,方便开发者来进行重试操作
250px|700px|reset

module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date()} 函数开始执行`);
// 在这里补充业务代码
const resp = await kunlun.tool.retry(async () => {
return await axios({
url: ''
})
})
}
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date()} 函数开始执行`);
// 在这里补充业务代码
const resp = await kunlun.tool.retry(async () => {
return await context.db.object('obj1').find()
})
}
Redis 锁(解决并发场景下数据一致性问题)
场景描述:
对象 A 上有个字段叫投票人,多值 Lookup User,有 1 个操作叫 投票,会向这个字段写入值。
假如某时刻这个字段的值是 2、4、5, User 1 和 User 6 同时点了投票,期望的结果是字段值变为 1、2、4、5、6,但因为并发读写等因素,结果可能是 1、2、4、5 或 2、4、5、6
250px|700px|reset

2 个操作有时空重合,导致最终结果不符合预期
解决这个问题,就要引入锁的概念,确保 1 个操作没完成前,另一个操作排队等候,避免操作之间出现时空重合
250px|700px|reset

/**
* @param params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date().toISOString()} 函数开始执行`);
const lockKey = "vote_lock_key_asdfghjkl"
const lockSeconds = 60;//默认锁最长时间
//尝试获取锁
while (true) {
const lock = await kunlun.tool.retry(
async () => {
//不存在就设置一个60秒自动清除掉值 返回OK,如果存在则设置失败返回空
return await kunlun.redis.set(lockKey, 1, "ex", lockSeconds, "nx");
},
{ retryCount: 5, retryDelay: 500 },
);
if (lock !== "OK") {
//如果直接 throw new Error("获取锁失败")就是乐观锁 程序不自旋 可以处理表单防重复提交。
//没有拿到锁 请等待
await sleep(100);
} else {
//获取到锁了
break;
}
}
//做业务的事情,不要超过lockSeconds秒 否则就会有其他线程抢占锁
...
//释放锁
await kunlun.tool.retry(
async () => {
await kunlun.redis.del(lockKey);
},
{ retryCount: 5, retryDelay: 500 },
);
//如果直接 throw new Error("获取锁失败")就是乐观锁 程序不自旋 可以处理表单并发提交。
/**
* @param {Number} ms sleep 的毫秒数
*/
async function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
发起 rpc 调用(仅字节可用)
联系 oncall 安装内部包:@byted-service/rpc
const fs = require('fs');
const { createClient } = require('@byted-service/rpc');
/**
* @param {Params} params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date()} 函数开始执行`);
// 在这里补充业务代码
const thrift_file = `/tmp/Translation.thrift`;
if (!fs.existsSync(thrift_file)) {
logger.info('thrift file not exists');
fs.writeFileSync(thrift_file, await context.function('thrift').invoke());
}
const rpc = createClient({
idl: '/tmp/',
services: {
MetaTranslation: {
filename: 'Translation.thrift',
}
}
});
const resp = await rpc.MetaTranslation.translation({
text_list: ['你好'],
trg_lang: 'en',
id: 'test',
key: '7f30e9358adb2be77b6************'
})
logger.info(resp)
}
// 这里通过一个全局函数来返回 thrift 文件内容,可以把内容放到 redis、MongoDB、函数等各种介质中
// thrift.js 代码
module.exports = async function (params, context, logger) {
return `namespace py MetaTranslationThirft
# batch
typedef list<string> StringList
typedef list<double> DoubleList
typedef list<i32> IntList
// the string in one request should be the same language
struct TranslationRequest{
1: required StringList text_list; // should be string list
2: required string trg_lang; // must provide
3: required string id; // must provide, like p.s.m
4: required string key; // must provide
16: optional string src_lang; // better to provide
17: optional StringList api_list; // no need to provide, for test
}
// translation result for each text in text_list of request
struct TranslationResponsePerText{
1: string translated_text; // translated string list
2: string text; // original text
3: string provider; // translation provider, lab, cache, google or microsoft
4: string src_lang; // detected src language
5: double score; // the score evaluated
6: string msg; // some msg
7: i32 code; // success (0) or failed (not 0),
}
// translation result for text_list of request
struct TranslationResponse{
1: StringList translated_text_list; // translated string list
2: StringList provider; // translation provider, lab, cache, google or microsoft
3: DoubleList score_list; // the score evaluated
4: StringList text_list; // origin text
5: IntList code; // success (0) or failed (not 0),
}
// for better translation
typedef list<TranslationResponse> TranslationResponseList
// for translation
typedef list<TranslationResponsePerText> TranslationResponsePerTextList
service MetaTranslation{
TranslationResponsePerTextList translation(1:TranslationRequest req), // use this, cheap and fast
TranslationResponse faster_translation(1:TranslationRequest req), // don't use, expensive
TranslationResponseList better_translation(1:TranslationRequest req), // don't use, expensive
StringList available_api(1:string src_lang, 2:string trg_lang), // no need to use, for test
}`;
}
效果
250px|700px|reset

生成 PDF
const pdf = require('pdfjs');
const axios = require('axios');
/**
* @param {Params} params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date()} 函数开始执行`);
const doc = new pdf.Document({ font: require('pdfjs/font/Helvetica') });
const resp = await axios({
responseType: 'arraybuffer'
})
header.cell().text({ textAlign: 'right' })
.add('A Portable Document Format (PDF) generation library targeting both the server- and client-side.')
underline: true,
color: 0x569cd6
})
doc.footer()
.pageNumber(function (curr, total) { return curr + ' / ' + total }, { textAlign: 'center' })
cell.text('Features:', { fontSize: 16, font: require('pdfjs/font/Helvetica-Bold') })
cell.text({ fontSize: 14, lineHeight: 1.35 })
.add('-')
.add('different', { color: 0xf8dc3f })
.add('font', { font: require('pdfjs/font/Times-Roman') })
.add('styling', { underline: true })
.add('options', { fontSize: 9 })
cell.text('- Images (JPEGs, other PDFs)')
cell.text('- Tables (fixed layout, header row)')
cell.text('- AFM fonts and')
cell.text('- OTF font embedding (as CID fonts, i.e., support for fonts with large character sets)', {
font: require('pdfjs/font/Times-Italic')
})
cell.text('- Add existing PDFs (merge them or add them as page templates)')
.add('For more information visit the')
.add('Documentation', {
underline: true,
color: 0x569cd6
})
var table = doc.table({
borderHorizontalWidths: function (i) { return i < 2 ? 1 : 0.1 },
padding: 5
})
var tr = table.header({ font: require('pdfjs/font/Helvetica'), borderBottomWidth: 1.5 })
tr.cell('#')
tr.cell('Unit')
tr.cell('Subject')
tr.cell('Price', { textAlign: 'right' })
tr.cell('Total', { textAlign: 'right' })
const lorem = `Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cum id
fugiunt, re eadem quae Peripatetici, verba. Tenesne igitur, inquam,
Hieronymus Rhodius quid dicat esse summum bonum, quo putet
omnia referri oportere? Quia nec honesto quic quam honestius nec
turpi turpius`;
addRow(table, 0, 'Article A', lorem, 500)
addRow(table, 1, 'Article B', lorem, 250)
addRow(table, 2, 'Article C', lorem, 330)
addRow(table, 3, 'Article D', lorem, 1220)
addRow(table, 4, 'Article E', lorem, 120)
addRow(table, 5, 'Article F', lorem, 50)
const buffer = await doc.asBuffer();
const r = await kunlun.oss.upload('pdfjs_demo.pdf', buffer, { region: 'cn' });
logger.info(r.url)
}
function addRow(table, qty, name, desc, price) {
var tr = table.row()
tr.cell(qty.toString())
tr.cell('pc.')
var article = tr.cell().text()
article.add(name, { font: require('pdfjs/font/Helvetica-Bold') })
.br()
.add(desc, { fontSize: 10, textAlign: 'justify' })
tr.cell(price.toFixed(2) + ' €', { textAlign: 'right' })
tr.cell((price * qty).toFixed(2) + ' €', { textAlign: 'right' })
}
250px|700px|reset

多媒体
音频播放
详情页拖入 HTML 组件
250px|700px|reset

云函数返回 base64 音频内容
// 通过 NPM dependencies 成功安装 NPM 包后此处可引入使用
// 如安装 linq 包后就可以引入并使用这个包
const axios = require('axios');
/**
* @param params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date().toISOString()} 函数开始执行`);
// 在这里补充业务代码
const resp = await axios({
method: 'get',
responseType: 'arraybuffer'
});
const b64 = Buffer.from(resp.data).toString('base64');
return `data:audio/x-wav;base64,${b64}`;
}
效果
250px|700px|reset

视频播放
效果
上传附件,自动转为 URL,打开详情页时自动播放视频(针对视频体积小于 30MB 的视频)
如果 URL 是自行处理,则没有 30MB 的限制
250px|700px|reset

实现
250px|700px|reset

对象模型
250px|700px|reset

页面
250px|700px|reset

250px|700px|reset

250px|700px|reset

在详情页中增加 HTML 组件
250px|700px|reset

HTML 组件代码
逻辑:获取上下文中的 recordId,调用云函数通过 recordId 获取对应的视频 URL
自定义函数
函数用来在 HTML 组件中调用以根据 recordId 获取存在记录上的视频 URL
250px|700px|reset

// 通过 NPM dependencies 成功安装 NPM 包后此处可引入使用
// 如安装 linq 包后就可以引入并使用这个包
// const linq = require('linq');
/**
* @param {Params} params 自定义参数
* @param {Context} context 上下文参数,可通过此参数下钻获取上下文变量信息
* @param {Logger} logger 日志记录器
*
* @return 函数的返回数据
*/
module.exports = async function (params, context, logger) {
// 日志功能
logger.info(`${new Date()} 函数开始执行`);
// 在这里补充业务代码
if (!params || !params.recordId) {
return {
code: 999
}
}
const record = await context.db.object('videos')
.where({ _id: parseInt(params.recordId) })
.select('video_url').findOne();
return {
code: 0,
data: record
}
}
技巧
复用代码模块
- 在自定义代码 -> 函数下新建函数
- 在流程中的函数节点或者其他函数中通过 await context.function('func_apiname').invoke(params) 来本地调用
250px|700px|reset

250px|700px|reset

新建一个 parseDiff 的通用函数来对比 2 个 obj
250px|700px|reset

函数中直接调用 parseDiff 这个通用函数
一些好用的第三方包
- axios - 非常方便的发起 HTTP 请求 (简单的网络请求建议使用 kunlun.tool.http 工具)
- form-data - 配合 axios 在 Node.JS 中便捷发起 "multipart/form-data" 请求
- exceljs - 读写 excel 文件
- image-size - 获取图片的尺寸等信息
- qr-image - 生成二维码
- deep-diff - diff 2 个 JSON 对象
- pdfjs - 生成 PDF 文件
- moment - 好用的日期加减、格式化库
- p-limit - 使用 Promise.all 时控制并发