作者:段红亮
推荐理由
本文从团队工作中遇到的搜索过程中存在效率痛点出发,利用飞书搜索相关的 OpenAPI 实现了在飞书搜索入口可以搜索BI报表,一方面让公司全体员工能对公司全局数据概念,另一方面缩短访问路径,提升工作效率。 文章重点介绍了开发的整体过程以及相关 Tips,供大家参考、交流和讨论。
一、关于我
大家好,我是今晚打老斧先森,来自字节跳动
日常开发工作中不仅需要关注系统,更需要不断反思工作流,改善工作方式,高效运转。
本 case 是在总结了日常工作流中提炼出的一个小开发,让系统帮助提效,case 虽简单,但提效显著。
二、需求分析
随着信息量的不断增加,高效地获取和管理数据变得愈发重要。在这样的背景下,将数据接入飞书搜索的需求逐渐浮现。本调研报告旨在分析将数据接入飞书搜索的需求,并提供相关建议和解决方案。
(1)数据集成需求:用户希望能够将数据库作为数据源无缝地接入飞书搜索,期望能够通过飞书搜索统一搜索和访问这些数据,提高工作效率。
(2)离线数据同步:用户数据源中的数据都是t+1新增、更新和删除操作同步到飞书搜索中。所以并不需要及时获取最新的数据搜索结果。
(3)灵活的数据检索(长期需求):用户希望能够通过飞书搜索进行灵活的数据检索,例如根据关键词、时间范围、属性过滤等条件进行搜索。他们需要高效地找到所需数据,并支持多个维度进行搜索和筛选。
(4)安全与权限控制(长期需求):用户对数据的安全性和权限控制非常关注。他们希望能够通过飞书搜索实现对数据的访问权限控制,确保只有合法的用户能够搜索和访问特定的数据。
(5)性能和可扩展性(长期需求):用户希望飞书搜索具有良好的性能和可扩展性,能够处理大规模数据和高并发查询。他们期望搜索结果能够在短时间内返回,并支持水平扩展以满足未来的业务增长。
三、方案调研
搜索连接器提供了一组RESTful API,帮助企业快速实现飞书之外的各类信息的搜索能力,在飞书内即可实现一站式的信息检索和获取,有效提升工作和协同效率。开发者使用数据源API建立数据源,再通过数据API将数据推送到该数据源,就完成了飞书搜索能力的构建。企业员工可以在飞书搜索框中输入 "/" 进入该数据源的搜索,或者将该数据源的搜索自定义为飞书搜索的垂类(在飞书搜索-更多-设置中进行拖动)。
实现的模型样例:
四、开发流程
1. 新建应用
250px|700px|reset
创建应用主要拿到的信息是应用凭证,这个凭证相当于你访问后台的一个钥匙,两小时刷新一次,所以需要不断的获取。
2. 申请权限
为什么要申请权限?不同的操作需要不同的权限,有些权限是敏感信息,根据权限最小原则,尽量不要申请敏感权限。在本需求中在线创建完飞书搜索后,在应用的“权限管理”中添加“查询、创建、修改和删除自定义搜索数据源、数据范式或数据项”和“查询自定义搜索数据源、数据范式或数据项”两个权限;
250px|700px|reset
3. 设计最终产出的搜索样式
数据范式:数据范式用来描述数据项中结构化字段(即structured_data中的数据)的字段类型、字段属性等。其决定了“飞书搜索”会以何种方式使用外部数据,并提供不同的搜索体验。用户必须在创建数据源时关联对应的数据范式。
数据源:数据源是逻辑上的数据容器,用来存储添加到“飞书搜索”中的外部数据记录。是在“飞书搜索”中创建数据索引的前提条件。
数据项:数据项表示支持检索的资源对象,即一条数据记录。例如一个用户信息、一个审批单、一条数据库记录。
4. API 调试,并借助 API 能力进行实际的后端开发。
飞书的开发文档给出了api-http的请求结构,可先行采用 http 走一遍全流程,但是并不需要自己去写详细的 http,开放平台提供了调试工具 Postman 的模板库(含有所有的环境变量配置以及请求结构),导入即可,不需自己一个个去写,改改参数就行。
大家在后端完整接入之前,可以采用 postman 串一遍流程。
在开发过程中,不能避免会和访问凭证打交道,而不同的请求类型具有不同的凭证需求(告知飞书服务器“谁在调这个接口”),大家一般采用 tenant_access_token 可满足要求。
5.进入开发
话不多说,直接上才艺!
1.搜索所需的数据源
# -*- coding: utf-8 -*-
# ********************************************************************
# Author: 今晚打老斧先森
# Create by: 2023-09-08 16:01:13.000
# Description:
# Update: Task Update Description
# ********************************************************************
import os
import datetime
import requests
import json
# 获取token
def get_token():
payload = json.dumps({
"app_id": "xx"
, "app_secret": "xx"
})
headers = {
'Content-Type': 'application/json;charset=utf-8'
}
response = requests.request("POST", url, headers=headers, data=payload)
data = json.loads(response.text)
print(data)
if 'tenant_access_token' in data and data['tenant_access_token'] != '':
# print('data:%s' % data['data'])
return data['tenant_access_token']
else:
raise Exception('获取data异常!')
# 第一步创建数据范式(只运行一次)
def post_schemas():
headers = {
"Authorization": "Bearer " + get_token(),
'Content-Type': 'application/json;charset=utf-8',
}
payload = json.dumps(
{
"display": {
"card_key": "search_common_card",
"fields_mapping": [
{
"display_field": "summary",
"data_field": "summary: ${summary}"
},
{
"display_field": "footer",
"data_field": "footer ${footer}"
}
]
},
"properties": [
{
"is_searchable": 'true',
"search_options": {
"enable_number_suffix_match": 'false',
"enable_semantic_match": 'true',
"enable_camel_match": 'false',
"enable_exact_match": 'false',
"enable_prefix_match": 'false'
},
"name": "summary",
"type_definitions": {},
"type": "text",
"is_returnable": 'true'
}, {
"is_searchable": 'true',
"search_options": {
"enable_number_suffix_match": 'false',
"enable_semantic_match": 'true',
"enable_camel_match": 'false',
"enable_exact_match": 'false',
"enable_prefix_match": 'false'
},
"name": "footer",
"type_definitions": {},
"type": "text",
"is_returnable": 'true'
}
],
"schema_id": "xx"
}
)
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
# 第二步根据数据范式创建数据源(只运行一次)
def post_data_sources():
headers = {
"Authorization": "Bearer " + get_token(),
'Content-Type': 'application/json;charset=utf-8',
}
# 根据数据范式,设置数据源,就相当于搜索中的分类
payload = json.dumps({
"name": "xx",
"description": "",
"icon_url": "",
"schema_id": "xx",
"template": "",
"state": 0
})
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
# 第三步根据数据源往数据源中写数据
def post_items(项目空间,名称,owner,url,timestamp,id):
# 这里的xx是从第二步返回的id得到,因为第一步第二步只需要执行一次,所以就在这里将id直接放在这里
headers = {
"Authorization": "Bearer " + get_token(),
'Content-Type': 'application/json;charset=utf-8',
}
body = json.dumps(
{
"id": id, #id可传,跟项目空间保持一对一
"acl": [
{
"access": "allow",
"value": "everyone",
"type": "user"
}
],
"metadata": {
"source_url": url,
# 这个就传url吧,没啥说的
"title": 名称,
# 传项目空间
"update_time": timestamp
#时间就传时间
},
"structured_data": "{\"summary\":\""+"owner:"+owner+"\",\"footer\":\""+"所在项目:"+项目空间+"\"}"
#summary传的这个空间的标题,然后fotter相当于介绍
})
response = requests.request("POST", url, headers=headers, data=body)
print(response.text)
def delete_item(idr):
headers = {
"Authorization":"Bearer "+get_token()
}
payload={}
response = requests.request("DELETE", url, headers=headers, data=payload)
print(response.text)
# 获取hive任务
def exe_sql(sql):
cmd = '/hive -e "%s"' % sql
command = (cmd)
r = os.popen(command)
rsp = r.readlines()
return rsp
def select_data():
sql = "SELECT concat(id_app, '##&##',name_app, '##&##',item_key, '##&##', item_name, '##&##', owner, '##&##',item_type, '##&##', create_time,'##&##', url, '##&##', row_id) FROM database.table where p_date = '2023-09-07' ;"
rsp = exe_sql(sql)
bi_operation_details = []
for data in rsp:
item = data.split("##&##")
bi_operation_details.append(item)
return bi_operation_details
def search_demo():
# 查询具体的值
result_list = select_data()
result_list = result_list[:-4]
for array in result_list:
print(array)
项目空间 = array[1]
名称=array[3]
对象类型=array[5]
owner=array[4]
url=array[7]
timestamp = int(datetime.datetime.strptime(array[6], '%Y-%m-%d %H:%M:%S').timestamp())
id=array[0]
post_items(项目空间,名称,owner,url,timestamp,id)
if __name__ == "__main__":
search_demo()
print("完成 bye~")
2.将第一步获取的data_source_id放进最终代码
# -*- coding: utf-8 -*-
# ********************************************************************
# Author: 今晚打老斧先森
# Create by: 2023-09-08 10:48:53.000
# Description:
# Update: Task Update Description
# ********************************************************************
import os
import requests
import json
import datetime
# 获取token
def get_token():
payload = json.dumps({
"app_id":"xx"
,"app_secret":"xx"
})
headers = {
'Content-Type': 'application/json;charset=utf-8'
}
response = requests.request("POST", url, headers=headers, data=payload)
data = json.loads(response.text)
print(data)
if 'tenant_access_token' in data and data['tenant_access_token'] != '':
# print('data:%s' % data['data'])
return data['tenant_access_token']
else:
raise Exception('获取data异常!')
#第一步创建数据范式
def post_schemas():
headers = {
"Authorization":"Bearer "+get_token(),
'Content-Type': 'application/json;charset=utf-8',
}
body=json.dumps(
{
"display": {
"card_key": "search_common_card",
"fields_mapping": [
{
"display_field": "summary",
"data_field": "所在项目: ${summary}"
},
{
"display_field": "footer",
"data_field": "owner : ${owner_name} ${assign_time}"
},
{
"display_field": "tag1",
"data_field": "${result}"
}
]
},
"properties": [
{
"is_searchable": false,
"name": "meeting",
"type_definitions": {},
"type": "text",
"is_returnable": true,
"is_filterable": true,
"filter_options": {
"display_name": "所在项目空间",
"filter_type": "searchable",
"reference_datasource_id": "${reference_datasource_id}"
}
},
{
"is_searchable": true,
"search_options": {
"enable_number_suffix_match": true,
"enable_semantic_match": false,
"enable_camel_match": true,
"enable_exact_match": true,
"enable_prefix_match": true
},
"name": "owner_name",
"type_definitions": {},
"is_returnable": true,
"type": "tinytext"
},
{
"is_searchable": true,
"search_options": {
"enable_number_suffix_match": false,
"enable_semantic_match": true,
"enable_camel_match": false,
"enable_exact_match": false,
"enable_prefix_match": false
},
"name": "summary",
"type_definitions": {},
"type": "text",
"is_returnable": true
},
{
"is_searchable": false,
"name": "assign_time",
"type_definitions": {},
"type": "timestamp",
"is_returnable": true,
"is_filterable": true,
"filter_options": {
"display_name": "执行时间",
"associated_smart_filter": "date",
"filter_type": "time"
}
},
{
"name": "result",
"type_definitions": {
"tag": [
{
"color": "blue",
"name": "win",
"text": "数据集"
},
{
"color": "blue",
"name": "lose",
"text": "仪表盘"
}
]
},
"type": "tag",
"is_returnable": true,
"is_filterable": true,
"filter_options": {
"display_name": "对象类型",
"filter_type": "predefine_enum",
"predefine_enum_values": [
{
"name": "win",
"text": "数据集"
},
{
"name": "lose",
"text": "仪表盘"
}
]
}
}
],
"schema_id": "datawind_schema_id"
}
)
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
#第二步根据数据范式创建数据源
def post_data_sources():
headers = {
"Authorization":"Bearer "+get_token(),
'Content-Type': 'application/json;charset=utf-8',
}
# 根据数据范式,设置数据源,就相当于搜索中的分类
body=json.dumps({
"name": "xx",
"description": "",
"icon_url": "",
"schema_id": "xx",
"template": "search_common_card",
"state": 0
})
response = requests.request("POST", url, headers=headers, data=body)
print(response.text)
# 第三步根据数据源往数据源中写数据
def post_items(项目空间,名称,owner,timestamp,urls,searid,idr):
# 这里的xx是从第二步返回的id得到,因为第一步第二步只需要执行一次,所以就在这里将id直接放在这里
headers = {
"Authorization": "Bearer " + get_token(),
'Content-Type': 'application/json;charset=utf-8',
}
body = json.dumps(
{
"id": idr,
"acl": [
{
"access": "allow",
"value": "everyone",
"type": "user"
}
],
"metadata": {
"source_url": urls,
"title": 名称,
"update_time":timestamp
},
"structured_data": "{\"owner_name\":\""+owner+"\",\"summary\":\""+项目空间+"\",\"assign_time\":\""+timestamp+"\",\"result\":\"lose\", \"meeting\":\""+searid+"\"}"
})
response = requests.request("POST", url, headers=headers, data=body)
print(body)
print(response.text)
def delete_item(idr):
headers = {
"Authorization":"Bearer "+get_token()
}
payload={}
response = requests.request("DELETE", url, headers=headers, data=payload)
print(response.text)
# 获取hive任务
def exe_sql(sql):
cmd='/hive -e "%s"' %sql
command = (cmd)
r=os.popen(command)
rsp=r.readlines()
return rsp
def select_data():
sql = "SELECT concat(id_app, '##&##',name_app, '##&##',item_key, '##&##', item_name, '##&##', owner, '##&##',item_type, '##&##', create_time,'##&##', url, '##&##', row_id) FROM dataleap_operation.ods_bi_operation_details_df where p_date = '2023-09-07' ;"
rsp = exe_sql(sql)
bi_operation_details=[]
for data in rsp:
item=data.split("##&##")
bi_operation_details.append(item)
return bi_operation_details
# result_list = select_data()
# result_list = result_list[:-4]
# for array in result_list:
# print(array)
# print(array[1])
def search_demo():
# 查询具体的值
result_list = select_data()
result_list = result_list[:-4]
for array in result_list:
print(array)
项目空间 = array[1]
名称=array[3]
对象类型=array[5]
owner=array[4]
url=array[7]
timestamp = str(int(datetime.datetime.strptime(array[6], '%Y-%m-%d %H:%M:%S').timestamp()))
searid=str(array[0])
idr=str(array[8])
print(项目空间,名称,owner,timestamp,url,searid,idr)
post_items(项目空间,名称,owner,timestamp,url,searid,idr)
if __name__ == "__main__":
search_demo()
五、更多相关开发心得
1.可能有开发者会问,调用飞书的API,为什么需要创建一个应用呢?
首先这是飞书的应用开发流程要求的。我个人理解,这主要还是和权限&安全相关,如果开放 API 给任何人用,任何人都可以操作其他人的数据,这不就乱套了嘛。创建一个应用,收敛权限为:只有管理员和协作者有操作权限,若不申请相关权限,是不能获取企业内他人信息的。
2.那么应用如何解决个人的权限隔离问题?
就调用 API 来自动创建文档需求而言,创建的文档在应用本身自己的空间中(后续若让其他人有操作权限,需通过 api 添加权限或转移所有权)。或者讲:自己的空间自己操作。
3.如何理解开发者调用api的行为与应用之间的关系?
应用本身有自己的空间,一种好的类比理解是:新建应用(注册一个飞书账号)--通过 API 操作应用(在账号内部自己正常做自己的事情,区别只在于一个通过 api,一个自己去点界面而已)。则其产出的文档肯定在自己的空间中,就好比在飞书自己的空间中创建的文档,如果没有授权,其他人是没有访问权限的。
4.企业自建应用 和 商店应用 应该建哪一个?该需求创建 商店应用 可以不可以?
5.关于 token 刷新问题:
如果采用 Postman 进行调试,一个明显的感知是 token 会自动过期,需要一直刷新,在这一点上,后端 api 获取的操作对象会自动进行刷新,比较方便。
6.如果业务需求过于复杂,在开发的过程中,并不能实现,怎么解决?
思路:重新评估需求->拆分需求->寻求帮助和建议->使用现有解决方案或工具->与利益相关者沟通->持续学习和提升技能
关键在于始终保持一颗学习的心,不要过早妄下结论
六、收益评估&写在最后
整体来看,本需求的成功实施,减少了无效人力的浪费,因为属于长期收益,随着时间的推进,团队收益越大。
开发的工作量上,本次需求研发整体花费 4天(包含调研 1 天➕设计方案 1 天➕开发调试 2 天),相对于需求收益来说,可基本持平。而且在调研阶段充分认识了飞书开放后台的此种能力以及其他包括(机器人等)的能力,在后续的工作中,该种能力得到复用,整体收益还是比较大的。
本需求其实属于一个比较小的优化点,其关键技术较少,但收益可量化、可评估。在平时的开发过程中,及时解决痛点很重要,小的痛点可随着时间放大为大的痛点。
飞书套件中提供的日历、通讯录、云文档、会议室、任务、考试等功能,其实基本都有相关 API 提供,大家可以探索研究下飞书开放能力,研发更多提升工作效率的玩法。