[TOC]

文章参考:http://gityuan.com/2015/12/27/handler-message-native/

概述

前面几章我们讲解了Java层的消息处理机制,其中MessageQueue类里面涉及到多个native方法,除了MessageQueue的native方法,native层本身也有一套完整的消息机制,用于处理native的消息,如下图Native层的消息机制。

在整个消息机制中,而MessageQueue是连接Java层和Native层的纽带,换言之,Java层可以向MessageQueue消息队列中添加消息,Native层也可以向MessageQueue消息队列中添加消息,接下来来看看MessageQueue。

我们再来简单回顾一下MessageQueue的对象。

MessageQueue

在MessageQueue中的native方法如下:

1
2
3
4
5
6
7
// 代码位于:/frameworks/base/core/java/android/os/MessageQueue.java
private native static long nativeInit();
private native static void nativeDestroy(long ptr);
private native void nativePollOnce(long ptr, int timeoutMillis);
private native static void nativeWake(long ptr);
private native static boolean nativeIsPolling(long ptr);
private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);

类图

image-20210725164104352

nativeInit方法

初始化过程的调用链如下:

image-20210725164838149

我们来看Native层的初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 代码位于:/frameworks/base/core/java/android/os/MessageQueue.java
/**
* MessageQueue的消息队列管理的初始化。
*
* @param quitAllowed 是否允许退出 之前咱们在学习Looper对象的时候。
* 我们之后:Looper对象在实例化的时候会创建他绑定的MessageQueue对象
* 同时主线程Looper对象在实例化的时候quitAllowed传入的是false
* 用户自定义的Looper.prepare()的quitAllowed传入是true。允许退出
*/
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
// MessageQueue在实例化的时候调用nativeInit初始化底层
// 返回值是一个MessageQueue指针引用。其实底层也会对应初始化一个MessageQueue
mPtr = nativeInit();
}

我们来看一下nativeInit()是怎么调用到底层的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 // 代码位于:/frameworks/base/core/jni/android_os_MessageQueue.cpp
/**
* Jni方法的的静态方法注册
**/
static const JNINativeMethod gMessageQueueMethods[] = {
/* name, signature, funcPtr */
{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
{ "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
{ "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
{ "nativeSetFileDescriptorEvents", "(JII)V",
(void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
};

我们通过jni的静态注册是调用的android_os_MessageQueue_nativeInit方法。

下面我们看一下这个方法的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 代码位于:/frameworks/base/core/jni/android_os_MessageQueue.cpp
/**
* 上层的MessageQueue在实例化的时候,会调用nativeInit.
**/
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
// 我们看到底层在调用到android_os_MessageQueue_nativeInit方法的时候会实例化NativeMessageQueue指针对象
// 来和上层的MessageQueue的对应起来。
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
// 异常判断
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
// 调用NativeMessageQueue的incStrong强引用增加引用计数
nativeMessageQueue->incStrong(env);
// reinterpret_cast是C++里的强制类型转换符
return reinterpret_cast<jlong>(nativeMessageQueue);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 代码位于:/frameworks/base/core/libutils/RefBase.cpp
void RefBase::incStrong(const void* id) const
{
weakref_impl* const refs = mRefs;
// 增加强引用
refs->incWeak(id);

refs->addStrongRef(id);
const int32_t c = refs->mStrong.fetch_add(1, std::memory_order_relaxed);
ALOG_ASSERT(c > 0, "incStrong() called on %p after last strong ref", refs);
#if PRINT_REFS
ALOGD("incStrong of %p from %p: cnt=%d\n", this, id, c);
#endif
if (c != INITIAL_STRONG_VALUE) {
return;
}

int32_t old __unused = refs->mStrong.fetch_sub(INITIAL_STRONG_VALUE, std::memory_order_relaxed);
// A decStrong() must still happen after us.
ALOG_ASSERT(old > INITIAL_STRONG_VALUE, "0x%x too small", old);
refs->mBase->onFirstRef();
}

我们可以看到Native层的android_os_MessageQueue_nativeInit方法的时候会实例化NativeMessageQueue指针对象来和上层的MessageQueue的对应起来。然后,将NativeMessageQueue的强引用转化为jlong返回给上层。

NativeMessageQueue实例化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 代码位于:代码位于:/frameworks/base/core/jni/android_os_MessageQueue.cpp
*/
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
// 创建一个Looper对象。通过调用getForThread来获取当前线程的Looper对象
// 这里需要注意。这里面的线程其实就是Java层的MessageQueue构造函数调用的时候所在的线程,
// 也就是Looper#prepare()创建对应线程的Looper的时候所在的线程
// 同样,在Native层同一线程里面也会创建对应的Looper对象

// Looper::getForThread()功能类比于Java层的Looper.myLooper()
mLooper = Looper::getForThread();
// 如果当前线程没有Looper对象。我们就是实例化一个Looper
// 放入当前线程中。这样也保证了底层每个线程中只有一个Looper对象
if (mLooper == NULL) {
mLooper = new Looper(false);
// Looper::setForThread(mLooper),功能类比于Java层的ThreadLocal.set();
Looper::setForThread(mLooper);
}
}

  • Looper::getForThread(),功能类比于Java层的Looper.myLooper();
  • Looper::setForThread(mLooper),功能类比于Java层的ThreadLocal.set();

此处Native层的Looper与Java层的Looper没有任何的关系,只是在Native层重实现了一套类似功能的逻辑。

Native层Looper实例化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 代码位于:system/core/libutils/Looper.cpp
*/
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
// 构造唤醒事件的fd句柄
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
AutoMutex _l(mLock);
// 重建Epoll事件。 这个方法比较重要。关系到我们使用Linux的Epoll机制
rebuildEpollLocked();
}

我们看到Native层Looper的实例化主要就是一些对象的初始化。

下面我们看一下,Native层的Looper对象是怎么重建Epoll事件的。

Native层Looper::rebuildEpollLocked方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 代码位于:system/core/libutils/Looper.cpp
*/
void Looper::rebuildEpollLocked() {
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
// 关闭旧的epoll实例
mEpollFd.reset();
}

// Allocate the new epoll instance and register the WakeEventFd.
//创建并分配新的Epoll实例,并注册wake管道
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
// 创建新的Epoll事件
epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ);
// 将唤醒事件的fd句柄传递给Epoll实例
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));

for (const auto& [seq, request] : mRequests) {
epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
// 将Looper对象中的mWakeEventFd添加到epoll监控之后,同时将mRequests也添加到epoll的监控范围内。
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}

这段代码主要就是将Looper对象中的mWakeEventFd添加到epoll监控,以及mRequests也添加到epoll的监控范围内。

nativePollOnce方法

nativePollOnce用于提取消息队列中的消息,提取消息的调用链。提取的调用一般在上层的MessageQueue#next()方法的时候进行调用。

image-20210725175229712

如果所有的消息,都遍历完,没有找到异步消息的话,我们就会nextPollTimeoutMillis设置为-1 及永久阻塞住next的方法,或者是第一个可处理消息的延迟时间。知道我们有新的消息插入的时候,我们调用native层的方法nativeWake(mPtr)进行唤醒。然后才会继续执行。或者就是第一个可处理消息的时间到了之后就是自动唤醒。

MessageQueue.next()方法解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Message next() {
final long ptr = mPtr;
if (ptr == 0) {
return null;
}

for (;;) {
...
// 调用native层进行消息标示,
// nextPollTimeoutMillis 为0立即返回,
// nextPollTimeoutMillis 为-1则阻塞等待。
// nextPollTimeoutMillis 为正整数则阻塞等待对应时长之后返回。
// 就是这行代码进行消息阻塞的。这当代码会调用Native层的Looper::pollOnce()的方法.
// 如果是-1.造成阻塞等待。则会释放出CPU。
nativePollOnce(ptr, nextPollTimeoutMillis);
...
}

android_os_MessageQueue_nativePollOnce方法

我们看一下JNI层的nativePollOnce的方法具体实现:

1
2
3
4
5
6
7
8
9
10
/**
* 代码位于:/frameworks/base/core/jni/android_os_MessageQueue.cpp
*/
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
// 将Java层传递下来的NativeMessageQueue的指针地址mPtr转换为nativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
// 然后执行NativeMessageQueue的pullOnce方法
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

NativeMessageQueue::pollOnce方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 代码位于:/frameworks/base/core/jni/android_os_MessageQueue.cpp
*/
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj; // 其实就是Java层的MessageQueue对象
// 调用当前NativeMessageQueue对象持有的Looper对象的pollOnce的方法。并将唤醒的时间传入
// 从之前的学习我们也知道:-1会一直阻塞,0:会立即唤醒
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;

if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}

我们看到最终这个Native层的方法是调用到了Native层的Looper对象的pollOnce方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
    /**
* 代码位于:system/core/libutils/include/utils/Looper.h
*/
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
}


/**
* 代码位于:system/core/libutils/Looper.cpp
* timeoutMillis:超时时长
* outFd:发生事件的文件描述符
* outEvents:当前outFd上发生的事件,包含以下4类事件
* EVENT_INPUT 可读
* EVENT_OUTPUT 可写
* EVENT_ERROR 错误
* EVENT_HANGUP 中断
* outData:上下文数据
*/
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
// 可以看到Native层的Looper的pullOnce这个地方也是一个死循环
for (;;) {
// 先处理没有Callback方法的 Response事件
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
// ident大于0,则表示没有callback, 因为POLL_CALLBACK = -2,
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
return ident;
}
}

if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
return result;
}
// 再处理内部轮询
result = pollInner(timeoutMillis);
}
}

Looper::pollInner(timeoutMillis)方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// 代码位于:/frameworks/base/core/libutils/Looper.cpp
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
#endif
}

// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;

// We are about to idle.
// 即将处于idle状态
mPolling = true;
// fd最大个数为16
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
//等待事件发生或者超时,在nativeWake()方法,向管道写端写入字符,则该方法会返回;
//或者等待timeoutMillis之后。该方法就会返回
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

// No longer idling.
//不再处于idle状态
mPolling = false;

// Acquire lock.
//请求锁
mLock.lock();

// Rebuild epoll set if needed.
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
// epoll重建,直接跳转Done;
rebuildEpollLocked();
goto Done;
}

// Check for poll error.
if (eventCount < 0) {
// epoll事件个数小于0,发生错误,直接跳转Done;
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
result = POLL_ERROR;
goto Done;
}

// Check for poll timeout.
//epoll事件个数等于0,发生超时,直接跳转Done;
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = POLL_TIMEOUT;
goto Done;
}

// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
//循环遍历,处理所有的事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
if (epollEvents & EPOLLIN) {
//已经唤醒了,则读取并清空管道数据
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
//处理request,生成对应的reponse对象,push到响应数组
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;

// Invoke pending message callbacks.
// 再处理Native的Message,调用相应回调方法
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
//再处理Native的Message,调用相应回调方法
mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
// 处理消息事件
handler->handleMessage(message);
} // release handler
//请求锁
mLock.lock();
// 发生回调
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}

// Release lock.
//释放锁
mLock.unlock();

// Invoke all response callbacks.
// 处理带有Callback()方法的Response事件,执行Reponse相应的回调方法
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
// Invoke the callback. Note that the file descriptor may be closed by
// the callback (and potentially even reused) before the function returns so
// we need to be a little careful when removing the file descriptor afterwards.
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
//移除fd
removeFd(fd, response.request.seq);
}

// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
//清除reponse引用的回调方法
response.request.callback.clear();
// 发生回调
result = POLL_CALLBACK;
}
}
return result;
}

pollOnce返回值说明:

  • POLL_WAKE: 表示由wake()触发,即pipe写端的write事件触发;
  • POLL_CALLBACK: 表示某个被监听fd被触发。
  • POLL_TIMEOUT: 表示等待超时;
  • POLL_ERROR:表示等待期间发生错误;
1
2
3
4
5
6
// 代码位于:/frameworks/base/core/libutils/Looper.cpp
void Looper::awoken() {
uint64_t counter;
//不断读取管道数据,目的就是为了清空管道内容
TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}

pollInner()方法的处理流程:

  1. 先调用epoll_wait(),这是阻塞方法,用于等待事件发生或者超时;
  2. 对于epoll_wait()返回,当且仅当以下3种情况出现:
    • POLL_ERROR,发生错误,直接跳转到Done;
    • POLL_TIMEOUT,发生超时,直接跳转到Done;
    • 检测到管道有事件发生,则再根据情况做相应处理:
      • 如果是管道读端产生事件,则直接读取管道的数据;
      • 如果是其他事件,则处理request,生成对应的reponse对象,push到reponse数组;
  3. 进入Done标记位的代码段:
    • 先处理Native的Message,调用Native 的Handler来处理该Message;
    • 再处理Response数组,POLL_CALLBACK类型的事件;

从上面的流程,可以发现对于Request先收集,一并放入reponse数组,而不是马上执行。真正在Done开始执行的时候,是先处理native Message,再处理Request,说明native Message的优先级高于Request请求的优先级。

另外pollOnce()方法中,先处理Response数组中不带Callback的事件,再调用了pollInner()方法。

nativeWake()方法解析

nativeWake用于唤醒功能,在添加消息到消息队列enqueueMessage(), 或者把消息从消息队列中全部移除quit(),再有需要时都会调用 nativeWake方法。包含唤醒过程的添加消息的调用链,如下:

image-20210725181922243
1
2
3
4
5
6
7
8
9
10
11
12
/**
* 代码位于:frameworks/base/core/java/android/os/MessageQueue.java
* 调用native层的nativeWake唤醒方法。传入的参数是Native层的NativeMessageQueue的指针地址
* 共有三处调用:
* 1. Lopper退出的时候,主线程一般不会调用、这个主要是为了退出之后。通知Native层不再进行阻塞。
* 2. 移除屏障消息的时候。这个主要是考虑,业务层不会调用。主要是页面在进行刷新的完毕之后,移除屏障消息。
* 通知屏障消息后面的消息,可以正常执行。同时唤醒Native的阻塞。
* 3. 插入消息队里的时候,需要判断是否唤醒。防止之前消息队列没有消息。nativePollOnce(-1) 传入-1长时间阻塞没有唤醒的问题。
* 这次调用是非常重要的需要我们关注
* @param ptr
*/
private native static void nativeWake(long ptr);

android_os_MessageQueue_nativeWake()

1
2
3
4
5
6
7
8
9
/**
* 代码位于:/frameworks/base/core/jni/android_os_MessageQueue.cpp
*/
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
// 将Java层传递下来的NativeMessageQueue的指针地址mPtr转换为nativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
// 然后执行NativeMessageQueue的wake方法
nativeMessageQueue->wake();
}
1
2
3
void NativeMessageQueue::wake() {
mLooper->wake();
}

Looper::wake()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 代码位于:system/core/libutils/Looper.cpp
*/
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
// 向管道mWakeEventFd写入字符1
uint64_t inc = 1;
// TEMP_FAILURE_RETRY 是一个宏定义, 当执行write失败后,会不断重复执行,直到执行成功为止。
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
// 异常判断:写入失败,则进行异常打印
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}

nativeDestroy()方法解析

image-20210725182921658

上层Java层的MessageQueue在调用dispose()方法的时候回调Jni层的nativeDestroy

1
2
3
4
5
6
7
8
// Disposes of the underlying message queue.
// Must only be called on the looper thread or the finalizer.
private void dispose() {
if (mPtr != 0) {
nativeDestroy(mPtr);
mPtr = 0;
}
}

android_os_MessageQueue_nativeDestroy()

1
2
3
4
5
6
7
// 代码位于:/frameworks/base/core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {
// 将Java层传递下来的mPtr转换为nativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
// 然后执行NativeMessageQueue的decStrong方法。来减去强引用计数
nativeMessageQueue->decStrong(env);
}

nativeMessageQueue继承自RefBase类,所以decStrong最终调用的是RefBase.decStrong().

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 代码位于:/frameworks/base/core/libutils/RefBase.cpp
void RefBase::decStrong(const void* id) const
{
weakref_impl* const refs = mRefs;
//移除强引用
refs->removeStrongRef(id);
const int32_t c = refs->mStrong.fetch_sub(1, std::memory_order_release);
#if PRINT_REFS
ALOGD("decStrong of %p from %p: cnt=%d\n", this, id, c);
#endif
LOG_ALWAYS_FATAL_IF(BAD_STRONG(c), "decStrong() called on %p too many times",
refs);
if (c == 1) {
std::atomic_thread_fence(std::memory_order_acquire);
refs->mBase->onLastStrongRef(id);
int32_t flags = refs->mFlags.load(std::memory_order_relaxed);
if ((flags&OBJECT_LIFETIME_MASK) == OBJECT_LIFETIME_STRONG) {
delete this;
// The destructor does not delete refs in this case.
}
}
// Note that even with only strong reference operations, the thread
// deallocating this may not be the same as the thread deallocating refs.
// That's OK: all accesses to this happen before its deletion here,
// and all accesses to refs happen before its deletion in the final decWeak.
// The destructor can safely access mRefs because either it's deleting
// mRefs itself, or it's running entirely before the final mWeak decrement.
//
// Since we're doing atomic loads of `flags`, the static analyzer assumes
// they can change between `delete this;` and `refs->decWeak(id);`. This is
// not the case. The analyzer may become more okay with this patten when
// https://bugs.llvm.org/show_bug.cgi?id=34365 gets resolved. NOLINTNEXTLINE
// 移除弱引用
refs->decWeak(id);
}

归纳总结

本节介绍MessageQueue的native()方法,经过层层调用:

  • nativeInit()方法:

    创建了NativeMessageQueue对象,增加其引用计数,并将NativeMessageQueue指针mPtr保存在Java层的MessageQueue

    创建了Native Looper对象

    调用epoll的epoll_create()/epoll_ctl()来完成对mWakeEventFd和mRequests的可读事件监听

  • nativePollOnce()方法

    调用Looper::pollOnce()来完成,空闲时停留在epoll_wait()方法,用于等待事件发生或者超时

  • nativeWake()方法

    调用Looper::wake()来完成,向管道mWakeEventfd写入字符;

  • nativeDestroy()方法

    调用RefBase::decStrong()来减少对象的引用计数

    当引用计数为0时,则删除NativeMessageQueue对象

image-20210725183957675