Android下UDP socket收发(含>64K字节大包)
下面是一个实现UDP收发的例子。在Android 3.x及以上版本中,不能直接在主线程中收发网络数据(否则会抛出NetworkOnMainThreadException),以免阻塞UI线程,因此收发都放在独立的线程中进行。
这个方式效率不高,一方面还得减少拷贝次数,另一方面最好还是以c实现,封装成so库,然后jniLib方式来调用。
因为单个UDP数据报的载荷最大约为64KB(IPv4下为65507字节),所以更大的数据在发送前需要拆分成多个子包,接收时再按序列号合并。
代码很简单,不做过多解释。
发送类:
public class UdpSendThread extends Thread {
private static final String TAG = “UdpSendThread”;
boolean isRunning = true;
DatagramSocket mSocket;
private static final int UDP_PACKET_USER_MAX_SIZE = 65506; //UDP包长65535 – 20Bytes IP header – 8Bytes UDP header – 1Byte自定义的头序列号
ArrayList<SendMsg> mArrayListSnd = new ArrayList<SendMsg>();
boolean mArrayListSndSync = true;
public class SendMsg{
byte[] bytes;
int length;
int delayMs;
String address;
int port;
}
public UdpSendThread() {
isRunning = true;
mArrayListSnd.clear();
}
public void Stop() {
isRunning = false;
if(mSocket != null) {
mSocket.close();
mSocket = null;
mArrayListSnd.clear();
}
}
@Override
public void run() {
super.run();
try {
mSocket = new DatagramSocket();
}catch (SocketException e){
e.printStackTrace();
Log.e(TAG, “DatagramSocket error !!!”);
}
while (true) {
if (!isRunning) {
break;
}
if(mArrayListSnd.isEmpty()){
try {
Thread.sleep(10);
}catch (Exception e){
e.printStackTrace();
Log.e(TAG, “Thread.sleep error !!!”);
}
continue;
}
try {
SendMsg sendMsg = mArrayListSnd.get(0);
/*******************发送数据***********************/
InetAddress address = InetAddress.getByName(sendMsg.address);
//1.构造数据包
DatagramPacket packet = new DatagramPacket(sendMsg.bytes,
sendMsg.length, address, sendMsg.port);
//3.从此套接字发送数据报包。
mSocket.send(packet);
int delay = sendMsg.delayMs;
mArrayListSnd.remove(0);
Thread.sleep(delay); //防止丢包,但是发送太快还是会丢包,看来还是得从linux层封装成so来用。
} catch (UnknownHostException e) {
e.printStackTrace();
Log.e(TAG, “run: socket.send error !!!”);
} catch (SocketException e) {
e.printStackTrace();
Log.e(TAG, “run: socket.send error !!!”);
} catch (IOException e) {
e.printStackTrace();
Log.e(TAG, “run: DatagramSocket error !!!”);
} catch (InterruptedException e){
Log.e(TAG, “run: Thread.sleep interrupted!”);
}
}
if(mSocket != null) {
mSocket.close();
mSocket = null;
mArrayListSnd.clear();
}
}
/**
* 发送消息
*/
public boolean SendUdpMsg(byte[] bytes, int length, String address, int port) {
//下面 UDP_PACKET_USER_MAX_SIZE + 1 的1 表示保留一字节消息头,即序列号,最大支持上层length大小为UDP包最大的64kBytes(实际中还要去除头)乘以254。
//序列号: 255表示该包是一个完整的上层数据包,无须组装。
// 0 表示该包的上层数据包太大,该包是它的一个起始子包。
// 1,2,3…250 表示继续上面0子包的数据包,该包是它的中间子包。
// 254 表示上面一系列子包所表示的上层子包结束,该包是最后一个子包。
// 如果中间序号断了,表示中间有丢包,无法组装给上层,收到的其他中间子包直接丢弃。
if ((length > (UDP_PACKET_USER_MAX_SIZE * 254)) || length == 0) {
Log.e(TAG, “SendUdpMsg: length is too big or zero, can’t support: ” + length);
return false;
}
int userBytesOffset = 0;
int userBytesRemainLen = length;
if (length > UDP_PACKET_USER_MAX_SIZE) { //需拆包
int packetNum = length / UDP_PACKET_USER_MAX_SIZE;
int packetM = length % UDP_PACKET_USER_MAX_SIZE;
if (packetM == 0) { //刚好整除,则循环里面少发一个包,因为最后一个包的header serial不一样
packetNum -= 1;
}
int i;
for (i = 0; i < packetNum; i++) { //发送前面的子包
SendMsg msg = new SendMsg();
msg.length = UDP_PACKET_USER_MAX_SIZE + 1;
msg.bytes = new byte[msg.length];
msg.bytes[0] = (byte) i;
System.arraycopy(bytes, i * UDP_PACKET_USER_MAX_SIZE, msg.bytes, 1, UDP_PACKET_USER_MAX_SIZE);
msg.address = new String(address);
msg.port = port;
msg.delayMs = 20;
mArrayListSnd.add(msg);
}
userBytesOffset = i * UDP_PACKET_USER_MAX_SIZE;
userBytesRemainLen = length – userBytesOffset;
}
//发送最后一个子包,或者整个上层包一次发送
SendMsg msg = new SendMsg();
msg.length = userBytesRemainLen + 1;
msg.bytes = new byte[msg.length];
if (userBytesOffset == 0) { //上层长度可以在一个UDP包中发送,不拆包
msg.bytes[0] = (byte) 255;
msg.delayMs = 10;
} else { //最后一个子包
msg.bytes[0] = (byte) 254;
msg.delayMs = 10;
}
System.arraycopy(bytes, userBytesOffset, msg.bytes, 1, userBytesRemainLen);
msg.address = new String(address);
msg.port = port;
mArrayListSnd.add(msg);
return true;
}
}
接收类:
public class UdpReceiveThread extends Thread {
private static final String TAG = “UdpReceiveThread”;
//IP地址
private String mAddress;
//端口
private int mPort;
private Handler mHandler;
private DatagramSocket mSocket;
private boolean isRunning = true;
private byte[] packetBuffer;
private int packetBufferLen;
private static final int UDP_PACKET_USER_MAX_SIZE = 65506; //UDP包长65535 – 20Bytes IP header – 8Bytes UDP header – 1Byte自定义的头序列号
public UdpReceiveThread(Handler handler, String address, int port) {
this.mHandler = handler;
this.mAddress = address;
this.mPort = port;
isRunning = true;
}
public void Stop() {
isRunning = false;
if(mSocket != null) {
mSocket.close();
mSocket = null;
}
}
@Override
public void run() {
super.run();
ReceivePacket();
}
/**
* 设置
*/
private void ReceivePacket() {
try {
mSocket = new DatagramSocket(mPort);
}catch (SocketException e){
e.printStackTrace();
Log.e(TAG, “DatagramSocket error !!!”);
}
int packetNo = 0;
while (true) {
if(! isRunning){
break;
}
try {
/*******************接收数据***********************/
//1.构造 DatagramPacket,用来接收长度为 length 的数据包。
final byte[] bytes1 = new byte[UDP_PACKET_USER_MAX_SIZE + 100];
DatagramPacket receiverPacket = new DatagramPacket(bytes1, bytes1.length);
mSocket.receive(receiverPacket);
byte headSerial = receiverPacket.getData()[0];
//Log.e(TAG, “headSerial: ” + headSerial);
if(headSerial == (byte)255) {
packetBufferLen = receiverPacket.getLength() – 1;
packetBuffer = new byte[packetBufferLen];
System.arraycopy(receiverPacket.getData(), 1, packetBuffer, 0, packetBufferLen);
sendMsg(0, new String(packetBuffer, 0, packetBufferLen));
}else if(headSerial == (byte)254){
if(packetBufferLen >= packetBuffer.length){
Log.e(TAG, “ReceivePacket: packetBufferLen is too big: “+ packetBufferLen);
continue;
}
System.arraycopy(receiverPacket.getData(), 1, packetBuffer, packetBufferLen, receiverPacket.getLength() -1);
packetBufferLen = packetBufferLen + receiverPacket.getLength() – 1;
//Log.e(TAG, “Len: ” + packetBufferLen);
sendMsg(0, new String(packetBuffer, 0, packetBufferLen));
}else if(headSerial == (byte)0){
packetBufferLen = UDP_PACKET_USER_MAX_SIZE;
packetBuffer = new byte[UDP_PACKET_USER_MAX_SIZE * 254];
System.arraycopy(receiverPacket.getData(), 1, packetBuffer, 0, UDP_PACKET_USER_MAX_SIZE);
}else{
packetBufferLen += UDP_PACKET_USER_MAX_SIZE;
System.arraycopy(receiverPacket.getData(), 1, packetBuffer, headSerial * UDP_PACKET_USER_MAX_SIZE, UDP_PACKET_USER_MAX_SIZE);
}
} catch (UnknownHostException e) {
e.printStackTrace();
Log.e(TAG, “socket.receive error !!!”);
} catch (SocketException e) {
e.printStackTrace();
Log.e(TAG, “socket.receive error !!!”);
} catch (IOException e) {
e.printStackTrace();
Log.e(TAG, “socket.receive error !!!”);
}
}
if(mSocket != null) {
mSocket.close();
mSocket = null;
}
}
/**
* 发送消息到UI
*/
private void sendMsg(int what, Object object) {
Message msg = new Message();
msg.what = what;
msg.obj = object;
mHandler.sendMessage(msg);
}
}