[ 展开 ]
[ 展开 ]
public int onTrigger(ProcessContext context, ProcessSession session) {
int count = 0;
for (int i = 0; i < 100; i++) {
FlowFile flowFile = session.get();
if (flowFile == null) {
break;
}
.....
}
}
减少每个能力,都自己load 一套连接池驱动jdbc等n多类。
步,尽量把 cpu 释放给其他 兄弟能力,避免多吃多占,浪费现象。如果onTrigger时没有啥处
理,建议通过 context.yield() 方法,减慢onTrigger的频度,可以降低到每秒一次
onTrigger()。
跟踪错误时,可以通过右上角的日志输出进行快速定位。
针对类似rocketmq consumer的实例,可以初始化到成员变量中,通过阻塞队列的办
法来实现数据 在onTrigger逻辑中的拉取。
/**
* 处理flow file
*
* @param context context
* @param session session
*/
public int onTrigger(ProcessContext context, ProcessSession session) {
for (int i = 0; i < processCount; i++) {
MessageExt messageExt = queue.poll();
if (messageExt == null) {
break;
}
}
}
/**
* 订阅消息
*
*/
public void beginConsumer() {
DefaultMQPushConsumer myConsumer = new DefaultMQPushConsumer(consumerGroup);
myConsumer.setNamesrvAddr(nameServer);
myConsumer.subscribe(topic, tag);
myConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) ->
{
for (MessageExt messageExt : msgs) {
queue.put(messageExt);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
}
错误代码如下:
session.setAttribute(fileflow, map)
正确的代码如下:
fileflow = session.setAttribute(fileflow, map)
针对上述问题,优化的办法是把 耗时的启动从 OnScheduled()转移到 onTrigger()中,可以避免 30s启动的失败问题。
[ 展开 ]
答:当下游队列满导致无法写入数据时,默认停止onTrigger的运行,队列默认长度10000,测试时可调整大小。
答:一般是由队列太大或者map表类引起的,请检查一下onTrigger中对成员变量的引用。
答:如果你的代码是这样的:
session.setAttribute(fileflow, map);
就会出现上面的问题,参考正确的代码如下:
fileflow = session.setAttribute(fileflow, map);
[ 展开 ]
在nifi-XXX-nar下的pom文件里添加以下依赖
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-nar</artifactId>
<type>nar</type>
</dependency>
在nifi-XXX-processor的pom文件里添加以下依赖
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-core</artifactId>
<version>1.2.0</version>
<scope>provided</scope>
</dependency>
在程序代码里如下获取
import org.apache.nifi.util.LogAppendInfoUtil;
String userId = LogAppendInfoUtil.userId();
String projectId = LogAppendInfoUtil.projectId();
PS:LogAppendInfoUtil类在nifi-framework-core-1.2.0.jar里,使用前需要删除本地仓库中已有的jar,从平台maven仓库中重新导入
2、在能力内部获取注册到平台的能力ID
@Override
protected void init(final ProcessorInitializationContext context) {
String procesorId = "0";
if (context.getExtendProperties() != null
&& !StringUtils.isBlank(context.getExtendProperties().get("hidden_processor_id"))) {
procesorId = context.getExtendProperties().get("hidden_processor_id");
}
}
3、在onStop方法里获取能力是手工停止还是nifi重启导致的停止
public void stop(final ProcessContext context) throws IOException {
if(!context.isAutoStopping()){
//do something
}
}
4、将能力上次运行状态信息保存起来,重新打开画布后能获取到
在能够获取ProcessContext context对象的方法里
使用context.putExtendProperty(processorId + extendPropertySuffix, lastJobInf("新创建", ""));保存信息
使用context.getExtendProperty(processorId + extendPropertySuffix);//获取信息
实际信息保存到画布的xml里,信息不要太大,为了保证key不冲突,建议key里包含processorId
[ 展开 ]
[ 展开 ]
[ 展开 ]
GMT+8, 2024-5-20 10:29 , Processed in 0.035262 second(s), 9 queries .