Native Looper使用和原理
3100 Words|Read in about 15 Min|本文总阅读量次
Android中大量的用到了消息机制,而最终消息机制都离不开native Looper。
0简介
实际上,除了本身Handler的消息机制外,很多进程也会用到这些消息机制,比如SurfaceFlinger进程。所以说Native的Looper机制就显得更加重要。下面以例子分析Looper原理。
1举例说明
选择源码编译或者NDK编译皆可,这里笔者选择源码编译,编译路径为/external/
1//external/native_looper_test/main.h
2#ifndef NATIVE_LOOPER_TEST_MAIN_H
3#define NATIVE_LOOPER_TEST_MAIN_H
4#include <utils/Looper.h>
5#include <utils/Timers.h>
6#include <utils/Log.h>
7#include <unistd.h>
8#include <time.h>
9#include <utils/threads.h>
10
11//使用Android智能指针或者日志需要使用这个命名空间
12using namespace android;
13using namespace std;
14
15class MyMessageHandler : public MessageHandler {
16public:
17 Vector<Message> messages;
18
19 virtual void handleMessage(const Message& message);
20 virtual ~MyMessageHandler(){}
21};
22
23struct MyLooperThread : public Thread {
24public:
25 MyLooperThread(Looper *looper)
26 : mLooper(looper) {
27 }
28
29 virtual bool threadLoop();
30
31protected:
32 virtual ~MyLooperThread() {}
33
34private:
35 Looper *mLooper;
36};
37
38class MyPipe {
39public:
40 int sendFd;
41 int receiveFd;
42
43 MyPipe();
44 ~MyPipe();
45 status_t writeSignal();
46 status_t readSignal();
47};
48
49#endif
定义完成了头文件,还有源文件
1//external/native_looper_test/main.cpp
2#ifdef LOG_TAG
3#undef LOG_TAG
4#define LOG_TAG "native_looper_test"
5#endif
6
7#include "main.h"
8
9void MyMessageHandler::handleMessage(const Message& message) {
10 ALOGD("[Thread=%d] %s message.what=%d \n", gettid(), __func__, message.what);
11 messages.push(message);
12}
13
14//使用Android源码中的Thread,run就会调用threadLoop,如果返回true不断循环调用threadLoop,直到返回false,不再调用
15bool MyLooperThread::threadLoop() {
16 if(mLooper == NULL)
17 return false;
18 //调用pollOnce里面也会循环处理,底层使用了epoll机制,-1的时候表示一直等下去,直到有事件返回
19 int32_t ret = mLooper->pollOnce(-1);
20 switch (ret) {
21 case Looper::POLL_WAKE:
22 case Looper::POLL_CALLBACK:
23 return true;
24 case Looper::POLL_ERROR:
25 ALOGE("Looper::POLL_ERROR");
26 return true;
27 case Looper::POLL_TIMEOUT:
28 // timeout (should not happen)
29 return true;
30 default:
31 // should not happen
32 ALOGE("Looper::pollOnce() returned unknown status %d", ret);
33 return true;
34 }
35}
36
37MyPipe::MyPipe() {
38 int fds[2];
39 //这里调用真正的底层linux管道初始化
40 ::pipe(fds);
41
42 receiveFd = fds[0];
43 sendFd = fds[1];
44}
45
46MyPipe::~MyPipe() {
47 if (sendFd != -1) {
48 ::close(sendFd);
49 }
50
51 if (receiveFd != -1) {
52 ::close(receiveFd);
53 }
54}
55
56status_t MyPipe::writeSignal() {
57 ssize_t nWritten = ::write(sendFd, "1", 1);
58 return nWritten == 1 ? 0 : -errno;
59}
60
61status_t MyPipe::readSignal() {
62 char buf[1];
63 ssize_t nRead = ::read(receiveFd, buf, 1);
64 return nRead == 1 ? 0 : nRead == 0 ? -EPIPE : -errno;
65}
66
67//回调类
68class MyCallbackHandler {
69public:
70 MyCallbackHandler() : callbackCount(0) {}
71 void setCallback(const sp<Looper>& looper, int fd, int events) {
72 //往native looper注册fd,回调为staticHandler,参数为MyCallbackHandler.this
73 looper->addFd(fd, 0, events, staticHandler, this);
74 }
75
76protected:
77 int handler(int fd, int events) {
78 callbackCount++;
79 ALOGD("[Thread=%d] %s fd=%d, events=%d, callbackCount=%d\n", gettid(), __func__, fd, events, callbackCount);
80 return 0;
81 }
82
83private:
84 static int staticHandler(int fd, int events, void* data) {
85 return static_cast<MyCallbackHandler*>(data)->handler(fd, events);
86 }
87 int callbackCount;
88};
89
90int main(int argc, char ** argv)
91{
92 //测试消息机制
93 // Looper的轮询处理工作在新线程中
94 sp<Looper> mLooper = new Looper(true);
95 sp<MyLooperThread> mLooperThread = new MyLooperThread(mLooper.get());
96 mLooperThread->run("MyLooperThread");
97
98 // 测试消息的发送与处理
99 sp<MyMessageHandler> handler = new MyMessageHandler();
100 ALOGD("[Thread=%d] sendMessage message.what=%d \n", gettid(), 1);
101 mLooper->sendMessage(handler, Message(1));
102 ALOGD("[Thread=%d] sendMessage message.what=%d \n", gettid(), 2);
103 mLooper->sendMessage(handler, Message(2));
104 sleep(1);
105
106 // 测试监测fd与回调callback
107 MyPipe pipe;
108 MyCallbackHandler mCallbackHandler;
109 mCallbackHandler.setCallback(mLooper, pipe.receiveFd, Looper::EVENT_INPUT);
110 ALOGD("[Thread=%d] writeSignal 1\n", gettid());
111 pipe.writeSignal(); // would cause FD to be considered signalled
112 sleep(1);
113 mCallbackHandler.setCallback(mLooper, pipe.receiveFd, Looper::EVENT_INPUT);
114 ALOGD("[Thread=%d] writeSignal 2\n", gettid());
115 pipe.writeSignal();
116
117 sleep(1);
118 mLooperThread->requestExit();
119 mLooper.clear();
120}
这里出现了继承Thread,这个是system源码中的线程,系统中很多都会用到这个线程使用。
源码所示中并没有实现run方法,说明使用的是Thread父类的run方法
可以看到父类确实有run方法
1//system/core/libutils/include/utils/Thread.h 2class Thread : virtual public RefBase 3{ 4public: 5 //这里默认传入的参数是true 6 explicit Thread(bool canCallJava = true); 7 virtual ~Thread(); 8 virtual status_t run( const char* name, 9 int32_t priority = PRIORITY_DEFAULT, 10 size_t stack = 0); 11 virtual void requestExit(); 12 virtual status_t readyToRun(); 13 status_t requestExitAndWait(); 14 status_t join(); 15 bool isRunning() const; 16 pid_t getTid() const; 17 18protected: 19 bool exitPending() const; 20 21private: 22 // 1)循环:如果threadLoop()返回true,如果requestExit()没有被调用,它将被再次调用。 23 // 2)一次:如果threadLoop()返回false,线程返回后退出。 24 virtual bool threadLoop() = 0; 25 26private: 27 Thread& operator=(const Thread&); 28 static int _threadLoop(void* user); 29 const bool mCanCallJava; 30 thread_id_t mThread; 31 mutable Mutex mLock; 32 Condition mThreadExitedCondition; 33 status_t mStatus; 34 volatile bool mExitPending; 35 volatile bool mRunning; 36 sp<Thread> mHoldSelf; 37 pid_t mTid; 38 39};
真实调用run方法
1//system/core/libutils/Threads.cpp 2status_t Thread::run(const char* name, int32_t priority, size_t stack) 3{ 4 LOG_ALWAYS_FATAL_IF(name == nullptr, "thread name not provided to Thread::run"); 5 6 Mutex::Autolock _l(mLock); 7 ... 8 bool res; 9 if (mCanCallJava) { 10 //这里传入的是_threadLoop为函数指针 11 //this=MyLooperThread 12 //name="MyLooperThread" 13 //priority=PRIORITY_DEFAULT=0 14 //stack=0 15 //mThread=thread_id_t(-1) 16 res = createThreadEtc(_threadLoop, 17 this, name, priority, stack, &mThread); 18 } else { 19 res = androidCreateRawThreadEtc(_threadLoop, 20 this, name, priority, stack, &mThread); 21 } 22 ... 23 return OK; 24}
传入的参数mCanCallJava为true
1//system/core/libutils/include/utils/AndroidThreads.h 2inline bool createThreadEtc(thread_func_t entryFunction, 3 void *userData, 4 const char* threadName = "android:unnamed_thread", 5 int32_t threadPriority = PRIORITY_DEFAULT, 6 size_t threadStackSize = 0, 7 thread_id_t *threadId = nullptr) 8{ 9 //这里通过返回直来判断创建线程是否成功 10 return androidCreateThreadEtc(entryFunction, userData, threadName, 11 threadPriority, threadStackSize, threadId) ? true : false; 12}
调用到androidCreateThreadEtc
1//system/core/libutils/Threads.cpp 2static android_create_thread_fn gCreateThreadFn = androidCreateRawThreadEtc; 3 4int androidCreateThreadEtc(android_thread_func_t entryFunction, 5 void *userData, 6 const char* threadName, 7 int32_t threadPriority, 8 size_t threadStackSize, 9 android_thread_id_t *threadId) 10{ 11 return gCreateThreadFn(entryFunction, userData, threadName, 12 threadPriority, threadStackSize, threadId); 13}
调用到androidCreateRawThreadEtc
1//system/core/libutils/Threads.cpp 2int androidCreateRawThreadEtc(android_thread_func_t entryFunction, 3 void *userData, 4 const char* threadName __android_unused, 5 int32_t threadPriority, 6 size_t threadStackSize, 7 android_thread_id_t *threadId) 8{ 9 //设置线程分离 10 pthread_attr_t attr; 11 pthread_attr_init(&attr); 12 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 13 14 errno = 0; 15 pthread_t thread; 16 //真是创建线程,linux方法 17 int result = pthread_create(&thread, &attr, 18 (android_pthread_entry)entryFunction, userData); 19 pthread_attr_destroy(&attr); 20 if (result != 0) { 21 ALOGE("androidCreateRawThreadEtc failed (entry=%p, res=%d, %s)\n" 22 "(android threadPriority=%d)", 23 entryFunction, result, strerror(errno), threadPriority); 24 return 0; 25 } 26 27 if (threadId != nullptr) { 28 *threadId = (android_thread_id_t)thread; // XXX: this is not portable 29 } 30 return 1; 31}
最终线程开启,分离线程,会走到entryFunction对应的函数指针_threadLoop,传入的参数为userData,就是传入的MyLooperThread指针
1//system/core/libutils/Threads.cpp 2int Thread::_threadLoop(void* user) 3{ 4 //这里的self实际上指的是MyLooperThread 5 Thread* const self = static_cast<Thread*>(user); 6 7 sp<Thread> strong(self->mHoldSelf); 8 wp<Thread> weak(strong); 9 self->mHoldSelf.clear() 10 11 bool first = true; 12 13 do { 14 bool result; 15 if (first) { 16 first = false; 17 self->mStatus = self->readyToRun(); 18 result = (self->mStatus == OK); 19 20 if (result && !self->exitPending()) { 21 result = self->threadLoop(); 22 } 23 } else { 24 result = self->threadLoop(); 25 } 26 ... 27 } while(strong != nullptr); 28 29 return 0; 30}
- _threadLoop()这个方法就是Thread的最大秘密,它是一个while循环。创建线程时,会sp和wp一次线程本身
- 如果是第一次执行会运行线程的readyToRun()方法,再执行threadLoop(),否则,直接运行threadLoop()
- threadLoop()方法有返回值,如果threadLoop()返回false的时候,线程会做清理工作,然后退出while循环,结束运行
因此线程Thread中的threadLoop()能够循环处理数据就到此做了说明。Thread被创 建,Thread中的run被调用,__threadLoop()被调用,readyToRun()被调用,然后循环调用threadLoop()。并且 在threadLoop()返回false时,可以退出循环
总结:threadLoop()方法有返回值,如果threadLoop()返回false的时候,线程会做清理工作,然后退出while循环,结束运行。
最终运行起来的部分日志
106-23 17:00:44.250 1548 1548 D native_looper_test: [Thread=1548] sendMessage message.what=1
206-23 17:00:44.250 1548 1548 D native_looper_test: [Thread=1548] sendMessage message.what=2
306-23 17:00:44.250 1548 1548 D native_looper_test: [Thread=1548] handleMessage message.what=1
406-23 17:00:44.250 1548 1548 D native_looper_test: [Thread=1548] handleMessage message.what=2
506-23 17:00:45.250 1548 1548 D native_looper_test: [Thread=1548] writeSignal 1
606-23 17:00:45.250 1548 1548 D native_looper_test: [Thread=1548] handler fd=6, events=1, callbackCount=1
706-23 17:00:46.250 1548 1548 D native_looper_test: [Thread=1548] writeSignal 2
806-23 17:00:46.250 1548 1548 D native_looper_test: [Thread=1548] handler fd=6, events=2, callbackCount=1
2源码解析
2.1关于Looper的数据结构
直接查看下面的图
名称 | 含义 |
---|---|
Message |
消息的载体,代表了一个事件,通过一个what 字段来标记是什么事件 |
MessageHandler/WeakMessageHandler |
消息处理的接口(基类), 子类通过实现handleMessage 来实现特定Message 的处理逻辑。WeakMessageHandler 包含了一个MessageHandler 的弱指针 |
LooperCallback/SimpleLooperCallback |
用于Looper 回调,实际上就是保存一个Looper_callbackFunc 指针的包装基类。在Looper::addFd() 方法添加监测的fd 时来设置回调 |
2.2创建Looper
2.2.1直接构造
1//system/core/libutils/Looper.cpp
2Looper::Looper(bool allowNonCallbacks)
3 //参数allowNonCallbacks表明是否可以在Looper_addFd时不提供callback
4 : mAllowNonCallbacks(allowNonCallbacks),
5 mSendingMessage(false),
6 mPolling(false),
7 mEpollRebuildRequired(false),
8 mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1),
9 mResponseIndex(0),
10 mNextMessageUptime(LLONG_MAX) {
11 mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
12 LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
13
14 AutoMutex _l(mLock);
15 rebuildEpollLocked();
16}
Looper的构造函数里主要做两件事情
- 调用eventfd(0, EFD_NONBLOCK)返回mWakeEventFd,用于唤醒epoll_wait()
- 调用rebuildEpollLocked() 创建epoll 文件描述符,并将mWakeEventFd加入到epoll监听队列中
这里出现了eventfd,简单介绍一下这个函数
1#include <sys/eventfd.h> 2int eventfd(unsigned int initval, int flags);
initval
eventfd()创建了一个“eventfd对象”,它可以被用户空间应用程序用作事件等待/通知机制,也可以被内核用于将事件通知用户空间应用程序。该对象包含一个由内核维护的无符号64位整数(uint64_t)计数器。该计数器使用参数initval中指定的值初始化
eventfd 是一个计数相关的fd。计数不为零是有可读事件发生,
read
之后计数会清零,write
则会递增计数器,因此通常初始化为0。flags
EFD_CLOEXEC 在新的文件描述符上设置关闭执行(FD_CLOEXEC)标志。
EFD_NONBLOCK 在新打开的文件描述上设置O_NONBLOCK文件状态标志。使用这个标志可以节省对fcntl(2)的额外调用来达到相同的结果。
EFD_SEMAPHORE 为从新的文件描述符读取提供类似信号量的语义。
返回值
作为它的返回值,eventfd()返回一个新的文件描述符,可以用来引用eventfd对象。
1//system/core/libutils/Looper.cpp
2constexpr uint64_t WAKE_EVENT_FD_SEQ = 1;
3
4void Looper::rebuildEpollLocked() {
5 // Close old epoll instance if we have one.
6 if (mEpollFd >= 0) {
7 mEpollFd.reset();
8 }
9
10 // 分配新的epoll实例并注册WakeEventFd
11 mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
12 LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
13 //添加了第一个fd到mEpollFd中
14 epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ);
15 int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent);
16 LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
17 strerror(errno));
18 //std::unordered_map<SequenceNumber, Request> mRequests;
19 for (const auto& [seq, request] : mRequests) {
20 epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
21
22 int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
23 if (epollResult < 0) {
24 ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
25 request.fd, strerror(errno));
26 }
27 }
28}
29
30epoll_event createEpollEvent(uint32_t events, uint64_t seq) {
31 return {.events = events, .data = {.u64 = seq}};
32}
这里涉及到IO多路复用的epoll机制
关于epoll机制
关于具体epoll,请参考https://blog.csdn.net/bandaoyu/article/details/89531493
epoll高效的核心是:
1、用户态和内核太共享内存mmap
2、数据到来采用事件通知机制(而不需要轮询)
epoll的接口
epoll的接口非常简单,一共就三个函数:
epoll_create系统调用
1int epoll_create(int size);
epoll_create返回一个句柄,之后 epoll的使用都将依靠这个句柄来标识。参数 size是告诉 epoll所要处理的大致事件数目。不再使用 epoll时,必须调用 close关闭这个句柄。
注意:size参数只是告诉内核这个 epoll对象会处理的事件大致数目,而不是能够处理的事件的最大个数。在 Linux最新的一些内核版本的实现中,这个 size参数没有任何意义。
epoll_ctl系统调用
1int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
epoll_ctl向 epoll对象中添加、修改或者删除感兴趣的事件,返回0表示成功,否则返回–1,此时需要根据errno错误码判断错误类型。epoll_wait方法返回的事件必然是通过 epoll_ctl添加到 epoll中的。
名称 解释 epfd
epoll_create
返回的句柄op
EPOLL_CTL_ADD
:注册新的fd
到epfd
中EPOLL_CTL_MOD
:修改已经注册的fd
的监听事件EPOLL_CTL_DEL
:从epfd
中删除一个fd
fd
需要监听的 socket
句柄fd
event
告诉内核需要监听什么事的结构体 struct epoll_event结构如下所示
1epoll_data_t; 2 3struct epoll_event { 4 __uint32_t events; /* Epoll events */ 5 epoll_data_t data; /* User data variable */ 6};
events表示时间,具体时间再定义的列表中选取
名称 解释 EPOLLIN
表示对应的文件描述符可以读(包括对端SOCKET正常关闭) EPOLLOUT
表示对应的文件描述符可以写 EPOLLPRI
表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来 EPOLLERR
表示对应的文件描述符发生错误 EPOLLHUP
表示对应的文件描述符被挂断 EPOLLONESHOT
只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket
的话,需要再次把这个socket
加入到EPOLL
队列里EPOLLET
将 EPOLL
设为边缘触发(Edge Triggered)
模式,这是相对于水平触发(Level Triggered)
来说的data成员是一个epoll_data联合,其定义如下
1typedef union epoll_data { 2 void *ptr; 3 int fd; 4 uint32_t u32; 5 uint64_t u64; 6} epoll_data_t;
int epoll_wait系统调用
1int epoll_wait(int epfd,struct epoll_event* events,int maxevents,int timeout);
收集在 epoll监控的事件中已经发生的事件,如果 epoll中没有任何一个事件发生,则最多等待timeout毫秒后返回。epoll_wait的返回值表示当前发生的事件个数,如果返回0,则表示本次调用中没有事件发生,如果返回–1,则表示出现错误,需要检查 errno错误码判断错误类型。
解释 名称 epoll
的描述符epfd
分配好的 epoll_event
结构体数组,epoll
将会把发生的事件复制到events
数组中(events
不可以是空指针,内核只负责把数据复制到这个events
数组中,不会去帮助我们在用户态中分配内存。内核这种做法效率很高)events
表示本次可以返回的最大事件数目,通常 maxevents
参数与预分配的events
数组的大小是相等的maxevents
表示在没有检测到事件发生时最多等待的时间(单位为毫秒)。一般如果网络主循环是单独的线程的话,可以用-1来等,这样可以保证一些效率,如果是和主逻辑在同一个线程的话,则可以用0来保证主循环的效率0的时候表示马上返回-1的时候表示一直等下去,直到有事件返回为任意正整数的时候表示等这么长的时间,如果一直没有事件,则返回 timeout
2.2.2通过prepare创建
1//system/core/libutils/Looper.cpp
2sp<Looper> Looper::prepare(int opts) {
3 //PREPARE_ALLOW_NON_CALLBACKS = 1
4 bool allowNonCallbacks = opts & PREPARE_ALLOW_NON_CALLBACKS;
5 sp<Looper> looper = Looper::getForThread();
6 if (looper == nullptr) {
7 looper = sp<Looper>::make(allowNonCallbacks);
8 Looper::setForThread(looper);
9 }
10 if (looper->getAllowNonCallbacks() != allowNonCallbacks) {
11 ALOGW("Looper already prepared for this thread with a different value for the "
12 "LOOPER_PREPARE_ALLOW_NON_CALLBACKS option.");
13 }
14 return looper;
15}
这里的创建跟Java层的Handler类型,也是先判断当前线程是否存在Looper,没有Looper就和当前线程绑定一个Looper
1//system/core/libutils/Looper.cpp
2static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
3static pthread_key_t gTLSKey = 0;
4void Looper::initTLSKey() {
5 int error = pthread_key_create(&gTLSKey, threadDestructor);
6 LOG_ALWAYS_FATAL_IF(error != 0, "Could not allocate TLS key: %s", strerror(error));
7}
8//现成结束的时候,会调用到对应的threadDestructor,并释放其中对应的Looper指针
9void Looper::threadDestructor(void *st) {
10 Looper* const self = static_cast<Looper*>(st);
11 if (self != nullptr) {
12 self->decStrong((void*)threadDestructor);
13 }
14}
15
16sp<Looper> Looper::getForThread() {
17 //这个initTLSKey有且仅会执行一次
18 int result = pthread_once(& gTLSOnce, initTLSKey);
19 LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
20 //从这里可以得知pthread_key_create对应传入的函数指针的void*为Looper*
21 Looper* looper = (Looper*)pthread_getspecific(gTLSKey);
22 return sp<Looper>::fromExisting(looper);
23}
1)关于pthread_once解析,第二个参数的函数指针会被执行有且仅有一次
1int pthread_once(pthread_once_t *once_control, void (*init_routine)(void));
pthread_once的作用为,在给定once_control的情况下,在整个程序中仅调用一次init_routine(在多线程中具体哪一个线程执行不一定)。如果再次调用,pthread_once将不会调用init_routine。
该函数执行成功返回0,执行失败返回错误码。
2)另外这里涉及到线程存储的用法,具体可以点击这里
调用 pthread_key_create() 来创建一个类型为 pthread_key_t 类型的变量。
1int pthread_key_create(pthread_key_t *key, void (*destr_function) (void*));
该函数有两个参数,第一个参数就是上面声明的 pthread_key_t 变量,第二个参数是一个清理函数,用来在线程释放该线程存储的时候被调用。该函数指针可以设成 NULL ,这样系统将调用默认的清理函数。那么在线程执行完毕退出时,已key指向的内容为入参调用destr_function(),释放分配的缓冲区以及其他数据。
key是全局变量,这个全局变量可以认为是线程内部的私有空间。不论哪个线程调用了pthread_key_create,所创建的key都是所有的线程都可以访问,每个线程根据自己的需求往key中set不同的值,这就形成了同名而不同值,即同key不同value,一键多值。
当线程中需要存储特殊值的时候,可以调用 pthread_setspcific() 。该函数有两个参数,第一个为前面声明的 pthread_key_t 变量,第二个为 void* 变量,这样你可以存储任何类型的值。
1int pthread_setspecific(pthread_key_t key, const void *pointer);
该接口将指针pointer的值(指针值而非其指向的内容)与key相关联,用pthread_setspecific为一个键指定新的线程数据时,线程必须释放原有的数据用以回收空间。这个pointer参数会正好对应上面pthread_key_create函数中第二个函数指针参数的void*。
1void * pthread_getspecific(pthread_key_t key);
读函数就是将与key关联的数据val读出来,数据类型为void * ,所有可以指向任何类型的数据。通过上面我们得知,数据存在一个32 * 32的二维数组中,所以访问的时候,也是通过计算key值得到数据的位置再返回其内容的
设置线程
1//system/core/libutils/Looper.cpp
2void Looper::setForThread(const sp<Looper>& looper) {
3 sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
4
5 if (looper != nullptr) {
6 looper->incStrong((void*)threadDestructor);
7 }
8
9 pthread_setspecific(gTLSKey, looper.get());
10
11 if (old != nullptr) {
12 old->decStrong((void*)threadDestructor);
13 }
14}
这个函数的意义,就是以gTLSKey为key,以传入的Looper的sp对象为value进行数据绑定,即完成当前线程和Looper对象的绑定,然后当线程执行完毕退出的时候,会调用对应的threadDestructor函数去释放对应绑定的Looper的sp对象。
2.3发送消息
1//system/core/libutils/Looper.cpp
2//1.直接发消息
3void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
4 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
5 sendMessageAtTime(now, handler, message);
6}
7
8//2.延时发消息
9void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
10 const Message& message) {
11 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
12 sendMessageAtTime(now + uptimeDelay, handler, message);
13}
最终都会调用到sendMessageAtTime
1//system/core/libutils/Looper.cpp
2//这里的Vector不是stl中的vector,这个是Android自定义的数组,并且是排序好的
3Vector<MessageEnvelope> mMessageEnvelopes;
4void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
5 const Message& message) {
6 size_t i = 0;
7 { // acquire lock
8 AutoMutex _l(mLock);
9
10 size_t messageCount = mMessageEnvelopes.size();
11 while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
12 i += 1;
13 }
14 //实际上会把消息根据时间优先级去插入到对应的数组中去
15 MessageEnvelope messageEnvelope(uptime, handler, message);
16 mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
17
18 //如果loop当前正在发送消息,那么我们可以跳过对wake()的调用
19 //对应后面的轮询机制的操作,循环机制已经在处理消息的时候,不需要再次唤醒
20 if (mSendingMessage) {
21 return;
22 }
23 } // release lock
24
25 //只有当我们在头部排队新消息时,才会唤醒投票循环
26 if (i == 0) {
27 wake();
28 }
29}
根据uptime在mMessageEnvelopes遍历,找到合适的位置,并将message 封装成MessageEnvlope,插入找到的位置上。 然后决定是否要唤醒Looper:
- 如果Looper此时正在派发message,则不需要wakeup Looper。因为这一次looper处理完消息之后,会重新估算下一次epoll_wait() 的wakeup时间。
- 如果是插在消息队列的头部,则需要立即wakeup Looper
这里的wake的作用,暂时按下不表,在下面的轮询机制会得到解释
2.4注册监听fd的方法
根据上面demo实际上调用的是int addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data);
1//system/core/libutils/Looper.cpp
2enum {
3 POLL_WAKE = -1,
4 POLL_CALLBACK = -2,
5 POLL_TIMEOUT = -3,
6 POLL_ERROR = -4,
7};
8//这个mRequests不做排序,可以认为是k-v的键值对
9std::unordered_map<SequenceNumber, Request> mRequests;
10//这里将自定义的demo中的callback封装成SimpleLooperCallback的callback
11int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
12 sp<SimpleLooperCallback> looperCallback;
13 if (callback) {
14 looperCallback = sp<SimpleLooperCallback>::make(callback);
15 }
16 return addFd(fd, ident, events, looperCallback, data);
17}
18
19int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
20 //可能传入的callback为nullptr
21 if (!callback.get()) {
22 if (! mAllowNonCallbacks) {
23 ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
24 return -1;
25 }
26
27 if (ident < 0) {
28 ALOGE("Invalid attempt to set NULL callback with ident < 0.");
29 return -1;
30 }
31 } else {
32 //从传入的0变成传入的-2
33 ident = POLL_CALLBACK;
34 }
35
36 { // acquire lock
37 AutoMutex _l(mLock);
38 // There is a sequence number reserved for the WakeEventFd.
39 if (mNextRequestSeq == WAKE_EVENT_FD_SEQ) mNextRequestSeq++;
40 const SequenceNumber seq = mNextRequestSeq++;
41
42 Request request;
43 request.fd = fd;
44 request.ident = ident;
45 request.events = events;
46 request.callback = callback;
47 request.data = data;
48 //通过传入的events和序列号来获取fd相关的eventItem
49 epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
50 auto seq_it = mSequenceNumberByFd.find(fd);
51 if (seq_it == mSequenceNumberByFd.end()) {
52 //在缓存中没有找到,就添加一个新的eventItem
53 int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
54 if (epollResult < 0) {
55 ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
56 return -1;
57 }
58 //并且将封装好的request和序列号seq(第一个fd的seq为3)封装成新的mRequests
59 mRequests.emplace(seq, request);
60 mSequenceNumberByFd.emplace(fd, seq);
61 } else {
62 //如果在缓存中找到对应的eventItem,则修改对应的value值
63 int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
64 ...
65 //并且更新对应的seq
66 const SequenceNumber oldSeq = seq_it->second;
67 mRequests.erase(oldSeq);
68 mRequests.emplace(seq, request);
69 seq_it->second = seq;
70 }
71 } // release lock
72 return 1;
73}
2.5轮询机制
看到上面无论是发送消息还是注册监听fd,还没有和我们的demo产生关联,这个时候需要我们的轮询机制来让整个Looper动起来。
demo中启动对应的MyLooperThread现成,根据前文可知threadLoop()返回值为true,说明本身就会一直调用threadLoop。
其中threadLoop中
1//external/native_looper_test/main.cpp
2//使用Android源码中的Thread,run就会调用threadLoop,如果返回true不断循环调用threadLoop,直到返回false,不再调用
3bool MyLooperThread::threadLoop() {
4 if(mLooper == NULL)
5 return false;
6 //调用pollOnce里面也会循环处理,底层使用了epoll机制,-1的时候表示一直等下去,直到有事件返回
7 int32_t ret = mLooper->pollOnce(-1);
8 switch (ret) {
9 case Looper::POLL_WAKE:
10 case Looper::POLL_CALLBACK:
11 return true;
12 case Looper::POLL_ERROR:
13 ALOGE("Looper::POLL_ERROR");
14 return true;
15 case Looper::POLL_TIMEOUT:
16 // timeout (should not happen)
17 return true;
18 default:
19 // should not happen
20 ALOGE("Looper::pollOnce() returned unknown status %d", ret);
21 return true;
22 }
23}
pollOnce(-1)
1//system/core/libutils/Looper.cpp
2enum {
3 POLL_WAKE = -1,
4 POLL_CALLBACK = -2,
5 POLL_TIMEOUT = -3,
6 POLL_ERROR = -4,
7};
8
9int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
10inline int pollOnce(int timeoutMillis) {
11 return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
12}
13//传入的只有timeoutMillis=-1,其余为nullptr
14int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
15 int result = 0;
16 for (;;) {
17 //这里开始判断mResponses,不过这里只看ident大于等于0,而上面有回调的会变成-2就不走这里
18 while (mResponseIndex < mResponses.size()) {
19 const Response& response = mResponses.itemAt(mResponseIndex++);
20 int ident = response.request.ident;
21 if (ident >= 0) {
22 int fd = response.request.fd;
23 int events = response.events;
24 void* data = response.request.data;
25 if (outFd != nullptr) *outFd = fd;
26 if (outEvents != nullptr) *outEvents = events;
27 if (outData != nullptr) *outData = data;
28 return ident;
29 }
30 }
31 //每次的轮询会返回一个值,这个值通常为POLL_WAKE、POLL_CALLBACK、POLL_TIMEOUT、POLL_ERROR
32 if (result != 0) {
33 if (outFd != nullptr) *outFd = 0;
34 if (outEvents != nullptr) *outEvents = 0;
35 if (outData != nullptr) *outData = nullptr;
36 return result;
37 }
38 //开始轮询
39 result = pollInner(timeoutMillis);
40 }
41}
真正开始轮询的地方为pollInner
1//system/core/libutils/Looper.cpp
2static const int EPOLL_MAX_EVENTS = 16;
3Vector<Response> mResponses;//这个是排序的mResponses,根据mResponses中的seq排序
4
5int Looper::pollInner(int timeoutMillis) {
6 //重新调整时间,这里传入的是消息队列等待才会调整,不然消息队列时间没到就会一直等待
7 if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
8 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
9 int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
10 if (messageTimeoutMillis >= 0
11 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
12 timeoutMillis = messageTimeoutMillis;
13 }
14 }
15 // Poll.
16 //默认为唤醒,唤醒的含义是消息队列过来,用于从子线程给主线程发送消息的
17 int result = POLL_WAKE;
18 mResponses.clear();
19 mResponseIndex = 0;
20
21 // We are about to idle.
22 mPolling = true;
23 //这里最大为16
24 struct epoll_event eventItems[EPOLL_MAX_EVENTS];
25 //出去唤醒句柄,实际上其他文件相关的fd注册,一次性不能超过15
26 //1.根据之前的epoll机制了解到timeoutMillis=-1,一直阻塞直到有唤醒或者注册的fd发生变化(变化包括读写)
27 //2.只有消息队列时间还没到会调整时间,所以这里timeoutMillis调整过后不会一直阻塞,会循环调用
28 int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
29
30 // No longer idling.
31 mPolling = false;
32
33 // Acquire lock.
34 mLock.lock();
35
36 //如果epoll出现问题,本轮轮询结束
37 if (eventCount < 0) {
38 if (errno == EINTR) {
39 goto Done;
40 }
41 ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
42 result = POLL_ERROR;
43 goto Done;
44 }
45
46 //如果返回0,说明传入的timeoutMillis时间到了,走到这里必须timeoutMillis>0
47 //由于不会一直阻塞,所以隔段时间就会返回一次超时
48 if (eventCount == 0) {
49 result = POLL_TIMEOUT;
50 goto Done;
51 }
52
53 // 这里开始处理事件了
54 for (int i = 0; i < eventCount; i++) {
55 const SequenceNumber seq = eventItems[i].data.u64;
56 uint32_t epollEvents = eventItems[i].events;
57 //如果是唤醒的事件响应了,那么执行awoken
58 if (seq == WAKE_EVENT_FD_SEQ) {
59 if (epollEvents & EPOLLIN) {
60 awoken();
61 } else {
62 ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
63 }
64 } else {
65 //如果不为唤醒事件,在demo中就是想Looper添加监听的fd
66 const auto& request_it = mRequests.find(seq);
67 if (request_it != mRequests.end()) {
68 const auto& request = request_it->second;
69 int events = 0;
70 if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
71 if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
72 if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
73 if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
74 //将对应的时间request取出放入到mResponses
75 mResponses.push({.seq = seq, .events = events, .request = request});
76 }
77 }
78 }
79Done: ;
80 //最终都会走到这里
81 mNextMessageUptime = LLONG_MAX;
82 //这块属于消息机制发送处理
83 while (mMessageEnvelopes.size() != 0) {
84 nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
85 const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
86 //根据当前时间和排序的队列时间判断,如果队列中的时间更早那么处理发送的消息
87 //如果时间没到等下轮消息轮询
88 if (messageEnvelope.uptime <= now) {
89 // we reacquire our lock.
90 { // obtain handler
91 //这里的handler就是传入的外部MessageHandler,即为MyMessageHandler
92 sp<MessageHandler> handler = messageEnvelope.handler;
93 Message message = messageEnvelope.message;
94 //先移除队列中的消息,在处理消息
95 mMessageEnvelopes.removeAt(0);
96 mSendingMessage = true;
97 mLock.unlock();
98 //处理消息,如果是刚加入的队列中的元素也会判断时间执行里面内容
99 handler->handleMessage(message);
100 } // release handler
101
102 mLock.lock();
103 mSendingMessage = false;
104 //消息机制也是返回一个POLL_CALLBACK
105 result = POLL_CALLBACK;
106 }
107 }
108
109 // Release lock.
110 mLock.unlock();
111
112 // 回调对应的fd监听对应的函数
113 //将刚刚的mResponses中的元素取出
114 for (size_t i = 0; i < mResponses.size(); i++) {
115 Response& response = mResponses.editItemAt(i);
116 //如果存在回调,那么处理这个response
117 if (response.request.ident == POLL_CALLBACK) {
118 int fd = response.request.fd;
119 int events = response.events;
120 void* data = response.request.data;
121 //回调到对应的handleEvent,先处理再移除对应的response
122 int callbackResult = response.request.callback->handleEvent(fd, events, data);
123 if (callbackResult == 0) {
124 AutoMutex _l(mLock);
125 removeSequenceNumberLocked(response.seq);
126 }
127 response.request.callback.clear();
128 result = POLL_CALLBACK;
129 }
130 }
131 return result;
132}
这个方法特别长
-
调整时间
mNextMessageUptime 是 消息队列 mMessageEnvelopes 中最近一个即将要被处理的message的时间点。所以需要根据mNextMessageUptime 与 调用者传下来的timeoutMillis 比较计算出一个最小的timeout。通常是只有消息队列时间还没到的情况。
-
epoll_wait
处理epoll_wait() 返回的epoll events.判断epoll event 是哪个fd上发生的事件。如果是mWakeEventFd,则执行awoken(). awoken() 只是将数据read出来,然后继续往下处理了。其目的也就是使epoll_wait() 从阻塞中返回。
-
监听队列的fd准备
如果是通过Looper.addFd() 接口加入到epoll监听队列的fd,并不是立马处理,而是先push到mResponses,后面再处理。
-
处理消息队列和监听队列的fd
处理消息队列 mMessageEnvelopes 中的Message.如果还没有到处理时间,就更新一下mNextMessageUptime
处理刚才放入mResponses中的 事件.只处理ident 为POLL_CALLBACK的事件。其他事件在pollOnce中处理
其中handleEvent
1//system/core/libutils/Looper.cpp
2int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
3 //这里的mCallback就是竹签addFd中的回调
4 //回调会返回fd,event事件还有传入的data,这个data就是demo中的this指针,指代MyCallbackHandler
5 return mCallback(fd, events, data);
6}
另外解释上面的还有一个问题wake
1//system/core/libutils/Looper.cpp
2void Looper::wake() {
3 uint64_t inc = 1;
4 ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
5 if (nWrite != sizeof(uint64_t)) {
6 if (errno != EAGAIN) {
7 LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
8 mWakeEventFd.get(), nWrite, strerror(errno));
9 }
10 }
11}
现在看就明白含义了,就是发生一个写操作,让注册到epoll中的mWakeEventFd发生变化,最终破除epoll_wait的阻塞,直接调用到后面的处理过程。
对应唤醒的事件响应了,那么执行awoken
1//system/core/libutils/Looper.cpp
2void Looper::awoken() {
3 uint64_t counter;
4 TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
5}
发生一个读操作,让之前的写操作清除。
2.6图解
2.6.1发送消息
2.6.2注册监听fd
3补充
解释一下为什么上述demo的fd=6
4总结
通过上述直到native Looper的机制之后,可以更加顺利的阅读其他类型的源码,或者可以在自己的demo中使用现成的消息机制。
下载
点击这里
参考文献
[1] 二的次方. Android Native – Message/Handler/Looper机制(应用篇), 2021.
[2] 二的次方,Android Native – Message/Handler/Looper机制,2021.
[3] bandaoyu, 【epoll】epoll使用详解(精髓)–研读和修正, 2019.
[4] 安德路, Android Thread之threadLoop方法, 2018.
[5] 煎鱼(EDDYCJY), Linux fd 系列 — eventfd 是什么?, 2021.