您现在的位置是:首页 > 正文

2021-06-03

2024-02-29 14:45:37阅读 1

Streamset消费kafka数据到hbase(复合rowkey):

管道创建及配置:

kafka:

js:

json二次开发:

一,标准的json
解析之前:
{“tagname”:“H_ZG_DE_Per1_pExtr2HPH2Abs”,“time”:“2021-05-19 11:27:46”,“value”:“0.000”,“status”:“0”}

解析之后:
{“rowkey”:“H_ZG_DE_Per1_pExtr2HPH2Abs_2021-05-19 11:27:46”,“value”:“0.000”,“status”:“0”}

主键拼接:

var records = sdc.records;
for(var i = 0; i < records.length; i++) {
try {
var newMap = sdc.createMap(false);
// 根据原先的record创建一个新的record对象
var newRecord = sdc.createRecord(records[i].sourceId + ‘:newRecordId’);
// 将这个对象的value属性置空
newRecord.value = {};
// apply_id;car_no;car_id;date_created;date_end
// 把记录中需要的字段提取出来封装到新map中
newMap[‘rowkey’]=records[i].value[‘tagname’]+’_’+records[i].value[‘time’];
newMap[‘value’]=records[i].value[‘value’];
newMap[‘status’]=records[i].value[‘status’];
newRecord.value = newMap
sdc.output.write(newRecord);
} catch (e) {
// 这里可以进行异常处理
// Send record to error
sdc.error.write(records[i], e);
}
}

hbase:

最后hbase数据:

streamset状态:

二,标准的jsonarray

[{“tagname”:“MonthlyPowerGeneration”,“time”:“2021-05-30 10:55:51”,“status”:0,“value”:“0.005”},{“tagname”:“Calculation”,“time”:“2021-05-30 10:55:51”,“status”:0,“value”:“4.496”},{“tagname”:“Calculation”,“time”:“2021-05-30 10:55:51”,“status”:0,“value”:“0.48”}]

转换后:

一行变多行:
{“tagname”:“MonthlyPowerGeneration”,“time”:“2021-05-30 10:55:51”,“status”:0,“value”:“0.005”}
{“tagname”:“Calculation”,“time”:“2021-05-30 10:55:51”,“status”:0,“value”:“4.496”}
{“tagname”:“Calculation”,“time”:“2021-05-30 10:55:51”,“status”:0,“value”:“0.48”}

StreamSets管道配置:

Kafka:

JavaScripe:

js解析代码:
var records=sdc.records;
for(var i = 0; i < records.length; i++) {
try {
var result = records[i].value;

for(var j = 0; j < result.length; j++) {
var newRecord = sdcFunctions.createRecord(true);
var resultMap = sdcFunctions.createMap(true);

resultMap.rowkey = result[j].tagname+'_'+result[j].time;

 resultMap.time = result[j].time;
 resultMap.status = result[j].status;
 resultMap.value = result[j].value;

   newRecord.value=resultMap;

log.info("-------------" + newRecord.value[‘rowkey’]);
log.info("-------------" + result[j].tagname);
log.info("-------------" + result[j].time);
log.info("-------------" + result[j].status);
log.info("-------------" + result[j].value);
output.write(newRecord);

}
} catch (e) {
  error.write(records[i], e);
}

}

Hbase:

指定rowkey

最后导入kafka测试数据:
[{“tagname”:“MonthlyPowerGeneration”,“time”:“2021-05-30 10:55:51”,“status”:0,“value”:“0.005”},{“tagname”:“Calculation”,“time”:“2021-05-30 10:55:51”,“status”:0,“value”:“4.496”},{“tagname”:“Calculation”,“time”:“2021-05-30 10:55:51”,“status”:0,“value”:“0.48”}]

查看日志

查看hbase:

查看StreamSets状态:

网站文章

  • 安卓入门--Spinner控件与二级联动详解

    安卓入门--Spinner控件与二级联动详解

    SPinner 父类:AbsSpinner–AdapterView 定义:下拉菜单控件 常用方法: android:spinnerMode:1. dialog---对话框模式 2. dropdown-...

    2024-02-29 14:45:31
  • NIST RBAC 模型 —— 向统一标准化的努力(3)

    [b]第3节 Flat RBAC[/b] Flat RBAC如图1所示。Flat RBAC所要求的特性对于任何一个RBAC实现来说是必不可少,并且显而易见的。对于Flat RBAC来说,讨论的重点仿佛应当是那些被排除在外的特性了。 Flat RBAC 主要从传统操作系统中广泛使用的基于组的访问控制中,以新一个角度提取了相关特性。有的人可能觉得这些特性不足以诠释RBAC这个概念。NIST...

    2024-02-29 14:45:04
  • python中win32模块的安装及使用方法

    (3) 执行以下命令 python pywin32_postinstall.py -install。网上也有说pip3 install pipywin32 的,这个只能安装win32com模块。win...

    2024-02-29 14:44:57
  • 【前端面试指南】JS-10-开发环境

    关于开发环境面试官想通过开发环境了解候选人的实际工作情况开发环境的工具,能体现工作产出的效率会以聊天形式为主,不会问具体的问题git最常用的代码版本管理工具大型项目需要多人协作开发,必须熟用git如果...

    2024-02-29 14:44:49
  • g++编译——动态链接库问题

    编译时找不到&quot;.so&quot;文件 -L 添加路径 -l添加库文件,例: g++ test.cpp -o test -lopencv_core -lopencv_imgproc -lope...

    2024-02-29 14:44:22
  • 六.Redis极简入门-Redis实现分布式锁原理

    六.Redis极简入门-Redis实现分布式锁原理

    老鸟飞过,学习使用,欢迎交流 理解分布式锁 为什么要分布式锁 在并发场景中,我们可以使用加锁的手段来保证业务方法或代码的原子性操作,从而防止数据被并发修改引发安全问题,在单体应用中我们可以使用互斥锁如...

    2024-02-29 14:44:18
  • 离职前的最后三周

    倒数第二十一天 2022.03.10 每天总得学点东西吧... 学习了Mybatis源码架构、设计模式(装饰器模式、适配器模式与桥接模式): 【设计模式】装饰器模式 【设计模式】适配器模式与桥接模式 ...

    2024-02-29 14:44:12
  • 什么是哈希表?

    什么是哈希表?

    我们在这篇文章将要学习最有用的数据结构之一—哈希表,哈希表的英文叫 Hash Table,也可以称为散列表或者 Hash 表。 哈希表用的是数组支持按照下标随机访问数据的特性,所以哈希表其实就是数组的一种扩展,由数组演化而来。可以说,如果没有数组,就没有散列表。 哈希表存储的是由键(key)和值(value)组成的数据。 例如,我们将每个人的性别作为数据进行存储,键为人名,值为对应的性...

    2024-02-29 14:43:43
  • Properties集合基本介绍和使用

    Properties集合基本介绍和使用

    Map接口实现类Properties【重点】 基本介绍: Properties类继承自Hashtable类并且实现了Map接口,也是使用一种键值对的形式来保存数据 它使用特点和Hashtable类似 ...

    2024-02-29 14:43:35
  • 位置不可用–无法访问X:.–文件或目录损坏且无法读取

    位置不可用–无法访问X:.–文件或目录损坏且无法读取

    位置不可用–无法访问X:.–文件或目录损坏且无法读取 正当打开我心爱的移动硬盘时出现如下: 当时我的内心慌得一批,毕竟。。。 网上搜索后发现如下办法,抱着活马当死马医的想法试了试 打开CMD–&gt;输入 chkdsk i: /f (i:为修复的盘符) 等待片刻后。。。 OK!! ...

    2024-02-29 14:43:28