本文参考了这篇文章,非常感谢:https://www.jianshu.com/p/31655775b040

这两天研究从hdfs里读数据,写进sparkStreaming,不使用kafka,直接发送给streaming,于是想到了socketTextStreaming这个接收器。

用nc工具 没问题,

自己用scala/java写了socket程序,给streaming发数据,streaming没报错,但是也收不到数据?!诡异!

我自己写的发送程序如下:

import java.io.{BufferedWriter, OutputStreamWriter}
import java.net.{ServerSocket, Socket}

object TestSocketSend {

  def main(args: Array[String]): Unit = {

    println("启动 server ....")
    val ss = new ServerSocket(4447)
    val s: Socket = ss.accept()
//    val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(s.getOutputStream))
    val os=s.getOutputStream
    
    var i=0
    while (true) {
      i+=1
      val msg="erha jinmao fadou "+i
      os.write(msg.getBytes())
      Thread.sleep(1000)
      println(msg)
    }

    
  } //main ends
  
  
}

我想莫非是因为写出流太抽象啦?没有flush? 尝试后 都没用。直到我看到了开篇提到的这篇文章,它的代码如下:

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class TestSocketSend2 {

    public static void main(String[] args) {
        try {
            ServerSocket ss = new ServerSocket(4447);
            System.out.println("启动 server ....");
            Socket s = ss.accept();
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
            String response = "java,1,2";

            //每 2s 发送一次消息
            int i = 0;
            Random r=new Random();   //不传入种子
            String[] lang = {"flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"};

            while(true){
                response= lang[r.nextInt(lang.length)]+ i + "," + i + "," + i+"\n";
                System.out.println(response);
                try{
                    bw.write(response);
                    bw.flush();
                    i++;
                }catch (Exception ex){
                    System.out.println(ex.getMessage());
                }
                Thread.sleep(1000 );
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}

以上代码发送的数据,streaming接收成功!

我挨行对比,一步步测试,终于找到了关键所在: \n  !!  对就是这个换行符!

原来写入换行符之前,streaming一直给我存着呢!认为我还没写完一行,一直在等待! 看看streaming的源码!

你看,淫家等着“\n”来结束这一行呢!!

大家一定记得,给steaming的socketTextStream发数据的时候,末尾要加“\n”呐!

 

原文地址:http://www.cnblogs.com/qiandaohu27/p/16791354.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性