IM消息超时检测机制
由于websocket是全双工通信的。因此通过同一个websocket通道进行发送和接受数据,相比于http的2xx,3xx,4xx,5xx响应码,websocket协议层并没有类似的code,也就是说,相对应的数据发送成功与否需要我们自己实现。
同时在考虑下另一个业务场景,对于一个发出的请求,等待这个请求的响应是有时间限制的--不可能无限制等待一个请求的响应,换句话说,需要有超时检测机制。
超时检测机制一般有3种实现方式:
- 定期遍历
对于每一条消息,发送时记录发送时间,当被遍历时,检测时间间隔是否已超过超时时间;同时对于收到的响应数据,主动从记录中删除,大致可如下实现
//已发送等待服务器响应队列 private Listqueue = Collections.synchronizedList(new LinkedList ()); scheduledExecutor.scheduleWithFixedDelay( new WaitCallBackScanner(), 2, ImConfig.TIME_OUT_SCANNER_SECONDS, TimeUnit.SECONDS); private class WaitCallBackScanner implements Runnable { @SuppressWarnings("WhileLoopReplaceableByForEach") @Override public void run() { if (queue == null) { L.i("WaitCallBackScanner", "queue is null at:" + System.currentTimeMillis()); } else { //noinspection SynchronizeOnNonFinalField synchronized (queue) { if (queue.size() == 0) { L.i("WaitCallBackScanner", "queue is empty at:" + System.currentTimeMillis()); } else { L.i("WaitCallBackScanner", "queue size at:" + queue.size()); Iterator it = queue.iterator(); while (it.hasNext()) { MessageWrap e = it.next(); long indexTime = IMUtils.getRelativeTime(e.getSendTimestamp(), System.currentTimeMillis()); if (indexTime > ImConfig.TIME_OUT_SECONDS || e.isCanRecycle()) { byte cmd = JsonConverter.getByteFromJsonObject(e.getData(), ImConfig.JSON_TAG_CMD); //滞留消息优先处理 if (e.isCanRecycle()) { executor.submit(new RemoveQueueReturner(e.getData(), e.getMsgId(), RemoveQueueReturner.TYPE_NONE, cmd)); } //处理超时消息,且当前消息不是滞留消息,这里可以不处理,已被处理的信息第二次不会再被处理 if (indexTime > ImConfig.TIME_OUT_SECONDS) { e.setCanRecycle(true); executor.submit(new RemoveQueueReturner(e.getData(), e.getMsgId(), RemoveQueueReturner.TYPE_FALSE, cmd)); } } } } } } } }复制代码
- 延时队列遍历解决方案
将所有发送后消息放入延时队列中,通过获取延时队列的对头来处理超时数据,大致实现如下:
//已发送等待服务器响应队列 private DelayQueuequeue = new DelayQueue<>(); /** * 等待回调的扫描队列 */ private class WaitCallBackRunner implements Runnable { @Override public void run() { synchronized (mLock) { for (; ; ) { try { if (queue == null || queue.size() == 0) { mLock.wait(); } else { final MessageWrap e = queue.poll(); if (e != null) { if (e.getCallBack() != null && !e.isCanRecycle()) { final byte cmd = JsonConverter.getByteFromJsonObject(e.getData(), ImConfig.JSON_TAG_CMD); e.setCanRecycle(true); mainHandler.post(new Runnable() { @Override public void run() { e.getCallBack().onFailureSend(e.getSentTime(), e.getData(), e.getMsgId(), cmd); pool.release(e); } }); }else { pool.release(e); } } } } catch (Exception e) { if (e instanceof InterruptedException) { mLock.notifyAll(); } e.printStackTrace(); } } } } }复制代码
埋炸弹-拆炸弹
方式实现
回想下系统对于anr的检测,先在事件源埋下一颗定时炸弹,如果爆炸期内处理了炸弹,将移除炸弹爆炸事件,否则炸弹被引爆,思路可如下实现:
class C4 implements Runnable { @Override public void run() { //超时处理 } } /** * 生成一个炸弹,埋入当前业务 */ private void produceBomb() { Log.i(TAG, "produceBomb"); if (handler == null) { handler = new Handler(getMainLooper()); } if (c4 == null) { c4 = new C4(); } handler.postDelayed(c4, BOMB_DEAD_TIME); } /** * 拆除该死的C4 */ private void disposalBomb() { if (handler != null) { handler.removeCallbacks(c4); } }复制代码
下期内容:
- 断线重连后,数据的自动重发功能设计