[ 展开 ]
[ 展开 ]
public int onTrigger(ProcessContext context, ProcessSession session) {
int count = 0;
for (int i = 0; i < 100; i++) {
FlowFile flowFile = session.get();
if (flowFile == null) {
break;
}
.....
}
}
减少每个能力,都自己load 一套连接池驱动jdbc等n多类。
步,尽量把 cpu 释放给其他 兄弟能力,避免多吃多占,浪费现象。如果onTrigger时没有啥处
理,建议通过 context.yield() 方法,减慢onTrigger的频度,可以降低到每秒一次
onTrigger()。
跟踪错误时,可以通过右上角的日志输出进行快速定位。
针对类似rocketmq consumer的实例,可以初始化到成员变量中,通过阻塞队列的办
法来实现数据 在onTrigger逻辑中的拉取。
/**
* 处理flow file
*
* @param context context
* @param session session
*/
public int onTrigger(ProcessContext context, ProcessSession session) {
for (int i = 0; i < processCount; i++) {
MessageExt messageExt = queue.poll();
if (messageExt == null) {
break;
}
}
}
/**
* 订阅消息
*
*/
public void beginConsumer() {
DefaultMQPushConsumer myConsumer = new DefaultMQPushConsumer(consumerGroup);
myConsumer.setNamesrvAddr(nameServer);
myConsumer.subscribe(topic, tag);
myConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) ->
{
for (MessageExt messageExt : msgs) {
queue.put(messageExt);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
}
错误代码如下:
session.setAttribute(fileflow, map)
正确的代码如下:
fileflow = session.setAttribute(fileflow, map)
针对上述问题,优化的办法是把 耗时的启动从 OnScheduled()转移到 onTrigger()中,可以避免 30s启动的失败问题。
[ 展开 ]
[ 展开 ]
[ 展开 ]
[ 展开 ]
[ 展开 ]
GMT+8, 2024-5-20 07:45 , Processed in 0.026407 second(s), 9 queries .