本文参考了这篇文章,非常感谢:https://www.jianshu.com/p/31655775b040
这两天研究从hdfs里读数据,写进sparkStreaming,不使用kafka,直接发送给streaming,于是想到了socketTextStreaming这个接收器。
用nc工具 没问题,
自己用scala/java写了socket程序,给streaming发数据,streaming没报错,但是也收不到数据?!诡异!
我自己写的发送程序如下:
import java.io.{BufferedWriter, OutputStreamWriter} import java.net.{ServerSocket, Socket} object TestSocketSend { def main(args: Array[String]): Unit = { println("启动 server ....") val ss = new ServerSocket(4447) val s: Socket = ss.accept() // val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(s.getOutputStream)) val os=s.getOutputStream var i=0 while (true) { i+=1 val msg="erha jinmao fadou "+i os.write(msg.getBytes()) Thread.sleep(1000) println(msg) } } //main ends }
我想莫非是因为写出流太抽象啦?没有flush? 尝试后 都没用。直到我看到了开篇提到的这篇文章,它的代码如下:
import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.Random; public class TestSocketSend2 { public static void main(String[] args) { try { ServerSocket ss = new ServerSocket(4447); System.out.println("启动 server ...."); Socket s = ss.accept(); BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(s.getOutputStream())); String response = "java,1,2"; //每 2s 发送一次消息 int i = 0; Random r=new Random(); //不传入种子 String[] lang = {"flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"}; while(true){ response= lang[r.nextInt(lang.length)]+ i + "," + i + "," + i+"\n"; System.out.println(response); try{ bw.write(response); bw.flush(); i++; }catch (Exception ex){ System.out.println(ex.getMessage()); } Thread.sleep(1000 ); } } catch (Exception e) { e.printStackTrace(); } } }
以上代码发送的数据,streaming接收成功!
我挨行对比,一步步测试,终于找到了关键所在: \n !! 对就是这个换行符!
原来写入换行符之前,streaming一直给我存着呢!认为我还没写完一行,一直在等待! 看看streaming的源码!
你看,淫家等着“\n”来结束这一行呢!!
大家一定记得,给steaming的socketTextStream发数据的时候,末尾要加“\n”呐!
原文地址:http://www.cnblogs.com/qiandaohu27/p/16791354.html
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,请务用于商业用途!
3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!
7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员!
8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载
声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性