能力中心 > 论坛 能力高级开发指南 >
海量大数据分析平台 帮助 能力高级开发指南

开发者论坛

海量大数据分析平台

 找回密码
 立即注册
搜索
海量大数据分析平台 帮助 能力高级开发指南

能力高级开发指南

日志篇

[ 展开 ]

常见开发问题

[ 展开 ]

  1. 经常有程序在onTrigger()里一次仅处理一个flowFile,这样效率较低,建议批量处理的个数控制在百千级别(根据能力实际处理复杂度决定)。
    public int onTrigger(ProcessContext context, ProcessSession session) {
        int count = 0;
        for (int i = 0; i < 100; i++) {
            FlowFile flowFile = session.get();
            if (flowFile == null) {
                break;
            }
           .....
        }
   }
  1. 针对onTrigger中使用database连接池的情况,尽量引用nifi平台上的 service dbcp,

减少每个能力,都自己load 一套连接池驱动jdbc等n多类。

  1. onTrigger()中,尽量快速的完成任务,如果需要等待io等操作,建议使用 内部队列,来实现异

步,尽量把 cpu 释放给其他 兄弟能力,避免多吃多占,浪费现象。如果onTrigger时没有啥处

理,建议通过 context.yield() 方法,减慢onTrigger的频度,可以降低到每秒一次

onTrigger()。

  1. 跟踪错误时,可以通过右上角的日志输出进行快速定位。

  2. 针对类似rocketmq consumer的实例,可以初始化到成员变量中,通过阻塞队列的办

法来实现数据 在onTrigger逻辑中的拉取。



    /**
     * 处理flow file
     *
     * @param context context
     * @param session session
     */
    public int onTrigger(ProcessContext context, ProcessSession session) {

     for (int i = 0; i < processCount; i++) {
            MessageExt messageExt = queue.poll();
            if (messageExt == null) {
                break;
            }

     }
   }




    /**
     *  订阅消息
     *
     */
  public void beginConsumer() {
 
            DefaultMQPushConsumer myConsumer = new DefaultMQPushConsumer(consumerGroup);
            myConsumer.setNamesrvAddr(nameServer);
            myConsumer.subscribe(topic, tag);
            myConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) ->
            {
                for (MessageExt messageExt : msgs) {
                         queue.put(messageExt);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });

  }



  1. 针对flowFile属性修改时,容易忽略的赋值问题。
错误代码如下:
session.setAttribute(fileflow, map)


正确的代码如下:
fileflow = session.setAttribute(fileflow, map)

  1. 下图问题就是 能力启动时间过久超过了30s了。

输入图片说明

针对上述问题,优化的办法是把 耗时的启动从 OnScheduled()转移到 onTrigger()中,可以避免 30s启动的失败问题。

常见错误原因排查

[ 展开 ]

  1. onTrigger() 运行一段时候后,失效了,啥原因?
 答:当下游队列满导致无法写入数据时,默认停止onTrigger的运行,队列默认长度10000,测试时可调整大小。
  1. oom 内存溢出了?
答:一般是由队列太大或者map表类引起的,请检查一下onTrigger中对成员变量的引用。
  1. throws FlowFileHandlingException: not the most recent version of this FlowFile within this session
答:如果你的代码是这样的:
    session.setAttribute(fileflow, map);
    就会出现上面的问题,参考正确的代码如下:
     fileflow = session.setAttribute(fileflow, map);

平台额外常用功能

[ 展开 ]

  1. 在能力内获取使用能力的userId和projectId

在nifi-XXX-nar下的pom文件里添加以下依赖

        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-framework-nar</artifactId>
            <type>nar</type>
        </dependency>

在nifi-XXX-processor的pom文件里添加以下依赖

        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-framework-core</artifactId>
            <version>1.2.0</version>
            <scope>provided</scope>
        </dependency>

在程序代码里如下获取

import org.apache.nifi.util.LogAppendInfoUtil;

       String userId = LogAppendInfoUtil.userId();
       String projectId = LogAppendInfoUtil.projectId();

PS:LogAppendInfoUtil类在nifi-framework-core-1.2.0.jar里,使用前需要删除本地仓库中已有的jar,从平台maven仓库中重新导入

2、在能力内部获取注册到平台的能力ID

@Override
protected void init(final ProcessorInitializationContext context) {
        String procesorId = "0";
        if (context.getExtendProperties() != null
				&& !StringUtils.isBlank(context.getExtendProperties().get("hidden_processor_id"))) {
     procesorId = context.getExtendProperties().get("hidden_processor_id");
}
}

3、在onStop方法里获取能力是手工停止还是nifi重启导致的停止

public void stop(final ProcessContext context) throws IOException {
    if(!context.isAutoStopping()){
        //do something
    }
}

4、将能力上次运行状态信息保存起来,重新打开画布后能获取到

在能够获取ProcessContext context对象的方法里
使用context.putExtendProperty(processorId + extendPropertySuffix, lastJobInf("新创建", ""));保存信息
使用context.getExtendProperty(processorId + extendPropertySuffix);//获取信息

实际信息保存到画布的xml里,信息不要太大,为了保证key不冲突,建议key里包含processorId

交互篇

[ 展开 ]

编译部署篇

[ 展开 ]

开发篇

[ 展开 ]

海量大数据分析平台  

GMT+8, 2024-5-20 07:45 , Processed in 0.032926 second(s), 9 queries .

返回顶部