Android中大量的用到了消息机制,而最终消息机制都离不开native Looper。

0简介

实际上,除了本身Handler的消息机制外,很多进程也会用到这些消息机制,比如SurfaceFlinger进程。所以说Native的Looper机制就显得更加重要。下面以例子分析Looper原理。

1举例说明

选择源码编译或者NDK编译皆可,这里笔者选择源码编译,编译路径为/external/

 1//external/native_looper_test/main.h
 2#ifndef NATIVE_LOOPER_TEST_MAIN_H
 3#define NATIVE_LOOPER_TEST_MAIN_H
 4#include <utils/Looper.h>
 5#include <utils/Timers.h>
 6#include <utils/Log.h>
 7#include <unistd.h>
 8#include <time.h>
 9#include <utils/threads.h>
10
11//使用Android智能指针或者日志需要使用这个命名空间
12using namespace android;
13using namespace std;
14
15class MyMessageHandler : public MessageHandler {
16public:
17    Vector<Message> messages;
18
19    virtual void handleMessage(const Message& message);
20    virtual ~MyMessageHandler(){}
21};
22
23struct MyLooperThread : public Thread {
24public:
25    MyLooperThread(Looper *looper)
26        : mLooper(looper) {
27    }
28	
29    virtual bool threadLoop();
30
31protected:
32    virtual ~MyLooperThread() {}
33
34private:
35    Looper *mLooper;
36};
37
38class MyPipe {
39public:
40    int sendFd;
41    int receiveFd;
42    
43    MyPipe();
44    ~MyPipe();
45    status_t writeSignal();
46    status_t readSignal();
47};
48
49#endif

定义完成了头文件,还有源文件

  1//external/native_looper_test/main.cpp
  2#ifdef LOG_TAG
  3#undef LOG_TAG
  4#define LOG_TAG "native_looper_test"
  5#endif
  6
  7#include "main.h"
  8
  9void MyMessageHandler::handleMessage(const Message& message) {
 10    ALOGD("[Thread=%d] %s message.what=%d \n", gettid(), __func__, message.what);
 11    messages.push(message);
 12}
 13
 14//使用Android源码中的Thread,run就会调用threadLoop,如果返回true不断循环调用threadLoop,直到返回false,不再调用
 15bool MyLooperThread::threadLoop() {
 16    if(mLooper == NULL)
 17        return false;
 18    //调用pollOnce里面也会循环处理,底层使用了epoll机制,-1的时候表示一直等下去,直到有事件返回
 19    int32_t ret = mLooper->pollOnce(-1);
 20    switch (ret) {
 21        case Looper::POLL_WAKE:
 22        case Looper::POLL_CALLBACK:
 23            return true;
 24        case Looper::POLL_ERROR:
 25            ALOGE("Looper::POLL_ERROR");
 26            return true;
 27        case Looper::POLL_TIMEOUT:
 28            // timeout (should not happen)
 29            return true;
 30        default:
 31            // should not happen
 32            ALOGE("Looper::pollOnce() returned unknown status %d", ret);
 33            return true;
 34    }
 35}
 36
 37MyPipe::MyPipe() {
 38    int fds[2];
 39    //这里调用真正的底层linux管道初始化
 40    ::pipe(fds);
 41
 42    receiveFd = fds[0];
 43    sendFd = fds[1];
 44}
 45
 46MyPipe::~MyPipe() {
 47    if (sendFd != -1) {
 48        ::close(sendFd);
 49    }
 50
 51    if (receiveFd != -1) {
 52        ::close(receiveFd);
 53    }
 54}
 55
 56status_t MyPipe::writeSignal() {
 57    ssize_t nWritten = ::write(sendFd, "1", 1);
 58    return nWritten == 1 ? 0 : -errno;
 59}
 60
 61status_t MyPipe::readSignal() {
 62    char buf[1];
 63    ssize_t nRead = ::read(receiveFd, buf, 1);
 64    return nRead == 1 ? 0 : nRead == 0 ? -EPIPE : -errno;
 65}
 66
 67//回调类
 68class MyCallbackHandler {
 69public:
 70    MyCallbackHandler() : callbackCount(0) {}
 71    void setCallback(const sp<Looper>& looper, int fd, int events) {
 72        //往native looper注册fd,回调为staticHandler,参数为MyCallbackHandler.this
 73        looper->addFd(fd, 0, events, staticHandler, this);
 74    }
 75
 76protected:
 77    int handler(int fd, int events) {
 78        callbackCount++;
 79        ALOGD("[Thread=%d] %s fd=%d, events=%d, callbackCount=%d\n", gettid(), __func__, fd, events, callbackCount);
 80        return 0;
 81    }
 82
 83private:
 84    static int staticHandler(int fd, int events, void* data) {
 85        return static_cast<MyCallbackHandler*>(data)->handler(fd, events);
 86    }
 87    int callbackCount;
 88};
 89
 90int main(int argc, char ** argv)
 91{
 92    //测试消息机制
 93    // Looper的轮询处理工作在新线程中
 94    sp<Looper> mLooper = new Looper(true);
 95    sp<MyLooperThread> mLooperThread = new MyLooperThread(mLooper.get());
 96    mLooperThread->run("MyLooperThread");
 97
 98    // 测试消息的发送与处理
 99    sp<MyMessageHandler> handler = new MyMessageHandler();
100    ALOGD("[Thread=%d] sendMessage message.what=%d \n", gettid(), 1);
101    mLooper->sendMessage(handler, Message(1));
102    ALOGD("[Thread=%d] sendMessage message.what=%d \n", gettid(), 2);
103    mLooper->sendMessage(handler, Message(2));
104    sleep(1);
105    
106    // 测试监测fd与回调callback
107    MyPipe pipe;
108    MyCallbackHandler mCallbackHandler;
109    mCallbackHandler.setCallback(mLooper, pipe.receiveFd, Looper::EVENT_INPUT);
110    ALOGD("[Thread=%d] writeSignal 1\n", gettid());
111    pipe.writeSignal(); // would cause FD to be considered signalled
112    sleep(1);
113    mCallbackHandler.setCallback(mLooper, pipe.receiveFd, Looper::EVENT_INPUT);
114    ALOGD("[Thread=%d] writeSignal 2\n", gettid());
115    pipe.writeSignal();
116    
117    sleep(1);
118    mLooperThread->requestExit();
119    mLooper.clear();
120}

这里出现了继承Thread,这个是system源码中的线程,系统中很多都会用到这个线程使用。

源码所示中并没有实现run方法,说明使用的是Thread父类的run方法

可以看到父类确实有run方法

 1//system/core/libutils/include/utils/Thread.h
 2class Thread : virtual public RefBase
 3{
 4public:
 5    //这里默认传入的参数是true
 6    explicit            Thread(bool canCallJava = true);
 7    virtual             ~Thread();
 8    virtual status_t    run(    const char* name,
 9                                int32_t priority = PRIORITY_DEFAULT,
10                                size_t stack = 0);
11    virtual void        requestExit();
12    virtual status_t    readyToRun();
13            status_t    requestExitAndWait();
14            status_t    join();
15            bool        isRunning() const;
16            pid_t       getTid() const;
17
18protected:
19            bool        exitPending() const;
20    
21private:
22    // 1)循环:如果threadLoop()返回true,如果requestExit()没有被调用,它将被再次调用。
23	// 2)一次:如果threadLoop()返回false,线程返回后退出。
24    virtual bool        threadLoop() = 0;
25
26private:
27    Thread& operator=(const Thread&);
28    static  int             _threadLoop(void* user);
29    const   bool            mCanCallJava;
30            thread_id_t     mThread;
31    mutable Mutex           mLock;
32            Condition       mThreadExitedCondition;
33            status_t        mStatus;
34    volatile bool           mExitPending;
35    volatile bool           mRunning;
36            sp<Thread>      mHoldSelf;
37            pid_t           mTid;
38
39};

真实调用run方法

 1//system/core/libutils/Threads.cpp
 2status_t Thread::run(const char* name, int32_t priority, size_t stack)
 3{
 4    LOG_ALWAYS_FATAL_IF(name == nullptr, "thread name not provided to Thread::run");
 5
 6    Mutex::Autolock _l(mLock);
 7	...
 8    bool res;
 9    if (mCanCallJava) {
10        //这里传入的是_threadLoop为函数指针
11        //this=MyLooperThread
12        //name="MyLooperThread"
13        //priority=PRIORITY_DEFAULT=0
14        //stack=0
15        //mThread=thread_id_t(-1)
16        res = createThreadEtc(_threadLoop,
17                this, name, priority, stack, &mThread);
18    } else {
19        res = androidCreateRawThreadEtc(_threadLoop,
20                this, name, priority, stack, &mThread);
21    }
22    ...
23    return OK;
24}

传入的参数mCanCallJava为true

 1//system/core/libutils/include/utils/AndroidThreads.h
 2inline bool createThreadEtc(thread_func_t entryFunction,
 3                            void *userData,
 4                            const char* threadName = "android:unnamed_thread",
 5                            int32_t threadPriority = PRIORITY_DEFAULT,
 6                            size_t threadStackSize = 0,
 7                            thread_id_t *threadId = nullptr)
 8{
 9    //这里通过返回直来判断创建线程是否成功
10    return androidCreateThreadEtc(entryFunction, userData, threadName,
11        threadPriority, threadStackSize, threadId) ? true : false;
12}

调用到androidCreateThreadEtc

 1//system/core/libutils/Threads.cpp
 2static android_create_thread_fn gCreateThreadFn = androidCreateRawThreadEtc;
 3
 4int androidCreateThreadEtc(android_thread_func_t entryFunction,
 5                            void *userData,
 6                            const char* threadName,
 7                            int32_t threadPriority,
 8                            size_t threadStackSize,
 9                            android_thread_id_t *threadId)
10{
11    return gCreateThreadFn(entryFunction, userData, threadName,
12        threadPriority, threadStackSize, threadId);
13}

调用到androidCreateRawThreadEtc

 1//system/core/libutils/Threads.cpp
 2int androidCreateRawThreadEtc(android_thread_func_t entryFunction,
 3                               void *userData,
 4                               const char* threadName __android_unused,
 5                               int32_t threadPriority,
 6                               size_t threadStackSize,
 7                               android_thread_id_t *threadId)
 8{
 9    //设置线程分离
10    pthread_attr_t attr;
11    pthread_attr_init(&attr);
12    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
13
14    errno = 0;
15    pthread_t thread;
16    //真是创建线程,linux方法
17    int result = pthread_create(&thread, &attr,
18                    (android_pthread_entry)entryFunction, userData);
19    pthread_attr_destroy(&attr);
20    if (result != 0) {
21        ALOGE("androidCreateRawThreadEtc failed (entry=%p, res=%d, %s)\n"
22             "(android threadPriority=%d)",
23            entryFunction, result, strerror(errno), threadPriority);
24        return 0;
25    }
26
27    if (threadId != nullptr) {
28        *threadId = (android_thread_id_t)thread; // XXX: this is not portable
29    }
30    return 1;
31}

最终线程开启,分离线程,会走到entryFunction对应的函数指针_threadLoop,传入的参数为userData,就是传入的MyLooperThread指针

 1//system/core/libutils/Threads.cpp
 2int Thread::_threadLoop(void* user)
 3{
 4    //这里的self实际上指的是MyLooperThread
 5    Thread* const self = static_cast<Thread*>(user);
 6
 7    sp<Thread> strong(self->mHoldSelf);
 8    wp<Thread> weak(strong);
 9    self->mHoldSelf.clear()
10
11    bool first = true;
12
13    do {
14        bool result;
15        if (first) {
16            first = false;
17            self->mStatus = self->readyToRun();
18            result = (self->mStatus == OK);
19
20            if (result && !self->exitPending()) {
21                result = self->threadLoop();
22            }
23        } else {
24            result = self->threadLoop();
25        }
26        ...
27    } while(strong != nullptr);
28
29    return 0;
30}
  1. _threadLoop()这个方法就是Thread的最大秘密,它是一个while循环。创建线程时,会sp和wp一次线程本身
  2. 如果是第一次执行会运行线程的readyToRun()方法,再执行threadLoop(),否则,直接运行threadLoop()
  3. threadLoop()方法有返回值,如果threadLoop()返回false的时候,线程会做清理工作,然后退出while循环,结束运行

因此线程Thread中的threadLoop()能够循环处理数据就到此做了说明。Thread被创 建,Thread中的run被调用,__threadLoop()被调用,readyToRun()被调用,然后循环调用threadLoop()。并且 在threadLoop()返回false时,可以退出循环

总结:threadLoop()方法有返回值,如果threadLoop()返回false的时候,线程会做清理工作,然后退出while循环,结束运行。

最终运行起来的部分日志

106-23 17:00:44.250   1548   1548 D native_looper_test: [Thread=1548] sendMessage message.what=1
206-23 17:00:44.250   1548   1548 D native_looper_test: [Thread=1548] sendMessage message.what=2
306-23 17:00:44.250   1548   1548 D native_looper_test: [Thread=1548] handleMessage message.what=1
406-23 17:00:44.250   1548   1548 D native_looper_test: [Thread=1548] handleMessage message.what=2
506-23 17:00:45.250   1548   1548 D native_looper_test: [Thread=1548] writeSignal 1
606-23 17:00:45.250   1548   1548 D native_looper_test: [Thread=1548] handler fd=6, events=1, callbackCount=1
706-23 17:00:46.250   1548   1548 D native_looper_test: [Thread=1548] writeSignal 2
806-23 17:00:46.250   1548   1548 D native_looper_test: [Thread=1548] handler fd=6, events=2, callbackCount=1

2源码解析

2.1关于Looper的数据结构

直接查看下面的图

名称 含义
Message 消息的载体,代表了一个事件,通过一个what字段来标记是什么事件
MessageHandler/WeakMessageHandler 消息处理的接口(基类), 子类通过实现handleMessage来实现特定Message的处理逻辑。WeakMessageHandler包含了一个MessageHandler的弱指针
LooperCallback/SimpleLooperCallback 用于Looper回调,实际上就是保存一个Looper_callbackFunc指针的包装基类。在Looper::addFd()方法添加监测的fd时来设置回调

2.2创建Looper

2.2.1直接构造

 1//system/core/libutils/Looper.cpp
 2Looper::Looper(bool allowNonCallbacks)
 3    //参数allowNonCallbacks表明是否可以在Looper_addFd时不提供callback
 4    : mAllowNonCallbacks(allowNonCallbacks),
 5      mSendingMessage(false),
 6      mPolling(false),
 7      mEpollRebuildRequired(false),
 8      mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1),
 9      mResponseIndex(0),
10      mNextMessageUptime(LLONG_MAX) {
11    mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
12    LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
13
14    AutoMutex _l(mLock);
15    rebuildEpollLocked();
16}

Looper的构造函数里主要做两件事情

  • 调用eventfd(0, EFD_NONBLOCK)返回mWakeEventFd,用于唤醒epoll_wait()
  • 调用rebuildEpollLocked() 创建epoll 文件描述符,并将mWakeEventFd加入到epoll监听队列中

这里出现了eventfd,简单介绍一下这个函数

1#include <sys/eventfd.h>
2int eventfd(unsigned int initval, int flags);
  • initval

    eventfd()创建了一个“eventfd对象”,它可以被用户空间应用程序用作事件等待/通知机制,也可以被内核用于将事件通知用户空间应用程序。该对象包含一个由内核维护的无符号64位整数(uint64_t)计数器。该计数器使用参数initval中指定的值初始化

    eventfd 是一个计数相关的fd。计数不为零是有可读事件发生,read 之后计数会清零,write 则会递增计数器,因此通常初始化为0。

  • flags

    EFD_CLOEXEC 在新的文件描述符上设置关闭执行(FD_CLOEXEC)标志。

    EFD_NONBLOCK 在新打开的文件描述上设置O_NONBLOCK文件状态标志。使用这个标志可以节省对fcntl(2)的额外调用来达到相同的结果。

    EFD_SEMAPHORE 为从新的文件描述符读取提供类似信号量的语义。

  • 返回值

    作为它的返回值,eventfd()返回一个新的文件描述符,可以用来引用eventfd对象。

 1//system/core/libutils/Looper.cpp
 2constexpr uint64_t WAKE_EVENT_FD_SEQ = 1;
 3
 4void Looper::rebuildEpollLocked() {
 5    // Close old epoll instance if we have one.
 6    if (mEpollFd >= 0) {
 7        mEpollFd.reset();
 8    }
 9
10    // 分配新的epoll实例并注册WakeEventFd
11    mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
12    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
13	//添加了第一个fd到mEpollFd中
14    epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ);
15    int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent);
16    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
17                        strerror(errno));
18	//std::unordered_map<SequenceNumber, Request> mRequests;
19    for (const auto& [seq, request] : mRequests) {
20        epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
21
22        int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
23        if (epollResult < 0) {
24            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
25                  request.fd, strerror(errno));
26        }
27    }
28}
29
30epoll_event createEpollEvent(uint32_t events, uint64_t seq) {
31    return {.events = events, .data = {.u64 = seq}};
32}

这里涉及到IO多路复用的epoll机制

关于epoll机制

关于具体epoll,请参考https://blog.csdn.net/bandaoyu/article/details/89531493

  • epoll高效的核心是:

    1、用户态和内核太共享内存mmap

    2、数据到来采用事件通知机制(而不需要轮询)

  • epoll的接口

    epoll的接口非常简单,一共就三个函数:

  1. epoll_create系统调用

    1int epoll_create(int size);
    

    epoll_create返回一个句柄,之后 epoll的使用都将依靠这个句柄来标识。参数 size是告诉 epoll所要处理的大致事件数目。不再使用 epoll时,必须调用 close关闭这个句柄。

    注意:size参数只是告诉内核这个 epoll对象会处理的事件大致数目,而不是能够处理的事件的最大个数。在 Linux最新的一些内核版本的实现中,这个 size参数没有任何意义。

  2. epoll_ctl系统调用

    1int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
    

    epoll_ctl向 epoll对象中添加、修改或者删除感兴趣的事件,返回0表示成功,否则返回–1,此时需要根据errno错误码判断错误类型。epoll_wait方法返回的事件必然是通过 epoll_ctl添加到 epoll中的。

    名称 解释
    epfd epoll_create返回的句柄
    op EPOLL_CTL_ADD:注册新的fdepfdEPOLL_CTL_MOD:修改已经注册的fd的监听事件EPOLL_CTL_DEL:从epfd中删除一个fd
    fd 需要监听的socket句柄fd
    event 告诉内核需要监听什么事的结构体

    struct epoll_event结构如下所示

    1epoll_data_t;
    2
    3struct epoll_event {
    4 __uint32_t events; /* Epoll events */
    5 epoll_data_t data; /* User data variable */
    6};
    

    events表示时间,具体时间再定义的列表中选取

    名称 解释
    EPOLLIN 表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
    EPOLLOUT 表示对应的文件描述符可以写
    EPOLLPRI 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来
    EPOLLERR 表示对应的文件描述符发生错误
    EPOLLHUP 表示对应的文件描述符被挂断
    EPOLLONESHOT 只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
    EPOLLET EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的

    data成员是一个epoll_data联合,其定义如下

    1typedef union epoll_data {
    2 void *ptr;
    3 int fd;
    4 uint32_t u32;
    5 uint64_t u64;
    6} epoll_data_t;
    
  3. int epoll_wait系统调用

    1int epoll_wait(int epfd,struct epoll_event* events,int maxevents,int timeout);
    

    收集在 epoll监控的事件中已经发生的事件,如果 epoll中没有任何一个事件发生,则最多等待timeout毫秒后返回。epoll_wait的返回值表示当前发生的事件个数,如果返回0,则表示本次调用中没有事件发生,如果返回–1,则表示出现错误,需要检查 errno错误码判断错误类型。

    解释 名称
    epoll的描述符 epfd
    分配好的 epoll_event结构体数组,epoll将会把发生的事件复制到 events数组中(events不可以是空指针,内核只负责把数据复制到这个 events数组中,不会去帮助我们在用户态中分配内存。内核这种做法效率很高) events
    表示本次可以返回的最大事件数目,通常 maxevents参数与预分配的events数组的大小是相等的 maxevents
    表示在没有检测到事件发生时最多等待的时间(单位为毫秒)。一般如果网络主循环是单独的线程的话,可以用-1来等,这样可以保证一些效率,如果是和主逻辑在同一个线程的话,则可以用0来保证主循环的效率0的时候表示马上返回-1的时候表示一直等下去,直到有事件返回为任意正整数的时候表示等这么长的时间,如果一直没有事件,则返回 timeout

2.2.2通过prepare创建

 1//system/core/libutils/Looper.cpp
 2sp<Looper> Looper::prepare(int opts) {
 3    //PREPARE_ALLOW_NON_CALLBACKS = 1
 4    bool allowNonCallbacks = opts & PREPARE_ALLOW_NON_CALLBACKS;
 5    sp<Looper> looper = Looper::getForThread();
 6    if (looper == nullptr) {
 7        looper = sp<Looper>::make(allowNonCallbacks);
 8        Looper::setForThread(looper);
 9    }
10    if (looper->getAllowNonCallbacks() != allowNonCallbacks) {
11        ALOGW("Looper already prepared for this thread with a different value for the "
12                "LOOPER_PREPARE_ALLOW_NON_CALLBACKS option.");
13    }
14    return looper;
15}

这里的创建跟Java层的Handler类型,也是先判断当前线程是否存在Looper,没有Looper就和当前线程绑定一个Looper

 1//system/core/libutils/Looper.cpp
 2static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
 3static pthread_key_t gTLSKey = 0;
 4void Looper::initTLSKey() {
 5    int error = pthread_key_create(&gTLSKey, threadDestructor);
 6    LOG_ALWAYS_FATAL_IF(error != 0, "Could not allocate TLS key: %s", strerror(error));
 7}
 8//现成结束的时候,会调用到对应的threadDestructor,并释放其中对应的Looper指针
 9void Looper::threadDestructor(void *st) {
10    Looper* const self = static_cast<Looper*>(st);
11    if (self != nullptr) {
12        self->decStrong((void*)threadDestructor);
13    }
14}
15
16sp<Looper> Looper::getForThread() {
17    //这个initTLSKey有且仅会执行一次
18    int result = pthread_once(& gTLSOnce, initTLSKey);
19    LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
20	//从这里可以得知pthread_key_create对应传入的函数指针的void*为Looper*
21    Looper* looper = (Looper*)pthread_getspecific(gTLSKey);
22    return sp<Looper>::fromExisting(looper);
23}

1)关于pthread_once解析,第二个参数的函数指针会被执行有且仅有一次

1int pthread_once(pthread_once_t *once_control, void (*init_routine)(void));

pthread_once的作用为,在给定once_control的情况下,在整个程序中仅调用一次init_routine(在多线程中具体哪一个线程执行不一定)。如果再次调用,pthread_once将不会调用init_routine。

该函数执行成功返回0,执行失败返回错误码。

2)另外这里涉及到线程存储的用法,具体可以点击这里

调用 pthread_key_create() 来创建一个类型为 pthread_key_t 类型的变量。

1int pthread_key_create(pthread_key_t *key, void (*destr_function) (void*));

该函数有两个参数,第一个参数就是上面声明的 pthread_key_t 变量,第二个参数是一个清理函数,用来在线程释放该线程存储的时候被调用。该函数指针可以设成 NULL ,这样系统将调用默认的清理函数。那么在线程执行完毕退出时,已key指向的内容为入参调用destr_function(),释放分配的缓冲区以及其他数据

key是全局变量,这个全局变量可以认为是线程内部的私有空间。不论哪个线程调用了pthread_key_create,所创建的key都是所有的线程都可以访问,每个线程根据自己的需求往key中set不同的值,这就形成了同名而不同值,即同key不同value,一键多值。

当线程中需要存储特殊值的时候,可以调用 pthread_setspcific() 。该函数有两个参数,第一个为前面声明的 pthread_key_t 变量,第二个为 void* 变量,这样你可以存储任何类型的值。

1int pthread_setspecific(pthread_key_t key, const void *pointer);

该接口将指针pointer的值(指针值而非其指向的内容)与key相关联,用pthread_setspecific为一个键指定新的线程数据时,线程必须释放原有的数据用以回收空间。这个pointer参数会正好对应上面pthread_key_create函数中第二个函数指针参数的void*。

1void * pthread_getspecific(pthread_key_t key);

读函数就是将与key关联的数据val读出来,数据类型为void * ,所有可以指向任何类型的数据。通过上面我们得知,数据存在一个32 * 32的二维数组中,所以访问的时候,也是通过计算key值得到数据的位置再返回其内容的

设置线程

 1//system/core/libutils/Looper.cpp
 2void Looper::setForThread(const sp<Looper>& looper) {
 3    sp<Looper> old = getForThread(); // also has side-effect of initializing TLS
 4
 5    if (looper != nullptr) {
 6        looper->incStrong((void*)threadDestructor);
 7    }
 8
 9    pthread_setspecific(gTLSKey, looper.get());
10
11    if (old != nullptr) {
12        old->decStrong((void*)threadDestructor);
13    }
14}

这个函数的意义,就是以gTLSKey为key,以传入的Looper的sp对象为value进行数据绑定,即完成当前线程和Looper对象的绑定,然后当线程执行完毕退出的时候,会调用对应的threadDestructor函数去释放对应绑定的Looper的sp对象。

2.3发送消息

 1//system/core/libutils/Looper.cpp
 2//1.直接发消息
 3void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
 4    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
 5    sendMessageAtTime(now, handler, message);
 6}
 7
 8//2.延时发消息
 9void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
10                                const Message& message) {
11    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
12    sendMessageAtTime(now + uptimeDelay, handler, message);
13}

最终都会调用到sendMessageAtTime

 1//system/core/libutils/Looper.cpp
 2//这里的Vector不是stl中的vector,这个是Android自定义的数组,并且是排序好的
 3Vector<MessageEnvelope> mMessageEnvelopes;
 4void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
 5        const Message& message) {
 6    size_t i = 0;
 7    { // acquire lock
 8        AutoMutex _l(mLock);
 9		
10        size_t messageCount = mMessageEnvelopes.size();
11        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
12            i += 1;
13        }
14		//实际上会把消息根据时间优先级去插入到对应的数组中去
15        MessageEnvelope messageEnvelope(uptime, handler, message);
16        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
17
18		//如果loop当前正在发送消息,那么我们可以跳过对wake()的调用
19        //对应后面的轮询机制的操作,循环机制已经在处理消息的时候,不需要再次唤醒
20        if (mSendingMessage) {
21            return;
22        }
23    } // release lock
24
25    //只有当我们在头部排队新消息时,才会唤醒投票循环
26    if (i == 0) {
27        wake();
28    }
29}

根据uptime在mMessageEnvelopes遍历,找到合适的位置,并将message 封装成MessageEnvlope,插入找到的位置上。 然后决定是否要唤醒Looper:

  1. 如果Looper此时正在派发message,则不需要wakeup Looper。因为这一次looper处理完消息之后,会重新估算下一次epoll_wait() 的wakeup时间。
  2. 如果是插在消息队列的头部,则需要立即wakeup Looper

这里的wake的作用,暂时按下不表,在下面的轮询机制会得到解释

2.4注册监听fd的方法

根据上面demo实际上调用的是int addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data);

 1//system/core/libutils/Looper.cpp
 2enum {
 3    POLL_WAKE = -1,
 4    POLL_CALLBACK = -2,
 5    POLL_TIMEOUT = -3,
 6    POLL_ERROR = -4,
 7};
 8//这个mRequests不做排序,可以认为是k-v的键值对
 9std::unordered_map<SequenceNumber, Request> mRequests;
10//这里将自定义的demo中的callback封装成SimpleLooperCallback的callback
11int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
12    sp<SimpleLooperCallback> looperCallback;
13    if (callback) {
14        looperCallback = sp<SimpleLooperCallback>::make(callback);
15    }
16    return addFd(fd, ident, events, looperCallback, data);
17}
18
19int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
20    //可能传入的callback为nullptr
21    if (!callback.get()) {
22        if (! mAllowNonCallbacks) {
23            ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
24            return -1;
25        }
26
27        if (ident < 0) {
28            ALOGE("Invalid attempt to set NULL callback with ident < 0.");
29            return -1;
30        }
31    } else {
32        //从传入的0变成传入的-2
33        ident = POLL_CALLBACK;
34    }
35
36    { // acquire lock
37        AutoMutex _l(mLock);
38        // There is a sequence number reserved for the WakeEventFd.
39        if (mNextRequestSeq == WAKE_EVENT_FD_SEQ) mNextRequestSeq++;
40        const SequenceNumber seq = mNextRequestSeq++;
41
42        Request request;
43        request.fd = fd;
44        request.ident = ident;
45        request.events = events;
46        request.callback = callback;
47        request.data = data;
48		//通过传入的events和序列号来获取fd相关的eventItem
49        epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
50        auto seq_it = mSequenceNumberByFd.find(fd);
51        if (seq_it == mSequenceNumberByFd.end()) {
52            //在缓存中没有找到,就添加一个新的eventItem
53            int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
54            if (epollResult < 0) {
55                ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
56                return -1;
57            }
58            //并且将封装好的request和序列号seq(第一个fd的seq为3)封装成新的mRequests
59            mRequests.emplace(seq, request);
60            mSequenceNumberByFd.emplace(fd, seq);
61        } else {
62            //如果在缓存中找到对应的eventItem,则修改对应的value值
63            int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
64            ...
65            //并且更新对应的seq
66            const SequenceNumber oldSeq = seq_it->second;
67            mRequests.erase(oldSeq);
68            mRequests.emplace(seq, request);
69            seq_it->second = seq;
70        }
71    } // release lock
72    return 1;
73}

2.5轮询机制

看到上面无论是发送消息还是注册监听fd,还没有和我们的demo产生关联,这个时候需要我们的轮询机制来让整个Looper动起来。

demo中启动对应的MyLooperThread现成,根据前文可知threadLoop()返回值为true,说明本身就会一直调用threadLoop。

其中threadLoop中

 1//external/native_looper_test/main.cpp
 2//使用Android源码中的Thread,run就会调用threadLoop,如果返回true不断循环调用threadLoop,直到返回false,不再调用
 3bool MyLooperThread::threadLoop() {
 4    if(mLooper == NULL)
 5        return false;
 6    //调用pollOnce里面也会循环处理,底层使用了epoll机制,-1的时候表示一直等下去,直到有事件返回
 7    int32_t ret = mLooper->pollOnce(-1);
 8    switch (ret) {
 9        case Looper::POLL_WAKE:
10        case Looper::POLL_CALLBACK:
11            return true;
12        case Looper::POLL_ERROR:
13            ALOGE("Looper::POLL_ERROR");
14            return true;
15        case Looper::POLL_TIMEOUT:
16            // timeout (should not happen)
17            return true;
18        default:
19            // should not happen
20            ALOGE("Looper::pollOnce() returned unknown status %d", ret);
21            return true;
22    }
23}

pollOnce(-1)

 1//system/core/libutils/Looper.cpp
 2enum {
 3    POLL_WAKE = -1,
 4    POLL_CALLBACK = -2,
 5    POLL_TIMEOUT = -3,
 6    POLL_ERROR = -4,
 7};
 8
 9int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
10inline int pollOnce(int timeoutMillis) {
11    return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
12}
13//传入的只有timeoutMillis=-1,其余为nullptr
14int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
15    int result = 0;
16    for (;;) {
17        //这里开始判断mResponses,不过这里只看ident大于等于0,而上面有回调的会变成-2就不走这里
18        while (mResponseIndex < mResponses.size()) {
19            const Response& response = mResponses.itemAt(mResponseIndex++);
20            int ident = response.request.ident;
21            if (ident >= 0) {
22                int fd = response.request.fd;
23                int events = response.events;
24                void* data = response.request.data;
25                if (outFd != nullptr) *outFd = fd;
26                if (outEvents != nullptr) *outEvents = events;
27                if (outData != nullptr) *outData = data;
28                return ident;
29            }
30        }
31		//每次的轮询会返回一个值,这个值通常为POLL_WAKE、POLL_CALLBACK、POLL_TIMEOUT、POLL_ERROR
32        if (result != 0) {
33            if (outFd != nullptr) *outFd = 0;
34            if (outEvents != nullptr) *outEvents = 0;
35            if (outData != nullptr) *outData = nullptr;
36            return result;
37        }
38		//开始轮询
39        result = pollInner(timeoutMillis);
40    }
41}

真正开始轮询的地方为pollInner

  1//system/core/libutils/Looper.cpp
  2static const int EPOLL_MAX_EVENTS = 16;
  3Vector<Response> mResponses;//这个是排序的mResponses,根据mResponses中的seq排序
  4
  5int Looper::pollInner(int timeoutMillis) {
  6    //重新调整时间,这里传入的是消息队列等待才会调整,不然消息队列时间没到就会一直等待
  7    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
  8        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
  9        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
 10        if (messageTimeoutMillis >= 0
 11            && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
 12            timeoutMillis = messageTimeoutMillis;
 13        }
 14    }
 15    // Poll.
 16    //默认为唤醒,唤醒的含义是消息队列过来,用于从子线程给主线程发送消息的
 17    int result = POLL_WAKE;
 18    mResponses.clear();
 19    mResponseIndex = 0;
 20
 21    // We are about to idle.
 22    mPolling = true;
 23	//这里最大为16
 24    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
 25    //出去唤醒句柄,实际上其他文件相关的fd注册,一次性不能超过15
 26    //1.根据之前的epoll机制了解到timeoutMillis=-1,一直阻塞直到有唤醒或者注册的fd发生变化(变化包括读写)
 27    //2.只有消息队列时间还没到会调整时间,所以这里timeoutMillis调整过后不会一直阻塞,会循环调用
 28    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
 29
 30    // No longer idling.
 31    mPolling = false;
 32
 33    // Acquire lock.
 34    mLock.lock();
 35	
 36    //如果epoll出现问题,本轮轮询结束
 37    if (eventCount < 0) {
 38        if (errno == EINTR) {
 39            goto Done;
 40        }
 41        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
 42        result = POLL_ERROR;
 43        goto Done;
 44    }
 45
 46    //如果返回0,说明传入的timeoutMillis时间到了,走到这里必须timeoutMillis>0
 47    //由于不会一直阻塞,所以隔段时间就会返回一次超时
 48    if (eventCount == 0) {
 49        result = POLL_TIMEOUT;
 50        goto Done;
 51    }
 52
 53    // 这里开始处理事件了
 54    for (int i = 0; i < eventCount; i++) {
 55        const SequenceNumber seq = eventItems[i].data.u64;
 56        uint32_t epollEvents = eventItems[i].events;
 57        //如果是唤醒的事件响应了,那么执行awoken
 58        if (seq == WAKE_EVENT_FD_SEQ) {
 59            if (epollEvents & EPOLLIN) {
 60                awoken();
 61            } else {
 62                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
 63            }
 64        } else {
 65            //如果不为唤醒事件,在demo中就是想Looper添加监听的fd
 66            const auto& request_it = mRequests.find(seq);
 67            if (request_it != mRequests.end()) {
 68                const auto& request = request_it->second;
 69                int events = 0;
 70                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
 71                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
 72                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
 73                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
 74                //将对应的时间request取出放入到mResponses
 75                mResponses.push({.seq = seq, .events = events, .request = request});
 76            }
 77        }
 78    }
 79Done: ;
 80    //最终都会走到这里
 81    mNextMessageUptime = LLONG_MAX;
 82    //这块属于消息机制发送处理
 83    while (mMessageEnvelopes.size() != 0) {
 84        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
 85        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
 86        //根据当前时间和排序的队列时间判断,如果队列中的时间更早那么处理发送的消息
 87        //如果时间没到等下轮消息轮询
 88        if (messageEnvelope.uptime <= now) {
 89            // we reacquire our lock.
 90            { // obtain handler
 91                //这里的handler就是传入的外部MessageHandler,即为MyMessageHandler
 92                sp<MessageHandler> handler = messageEnvelope.handler;
 93                Message message = messageEnvelope.message;
 94                //先移除队列中的消息,在处理消息
 95                mMessageEnvelopes.removeAt(0);
 96                mSendingMessage = true;
 97                mLock.unlock();
 98				//处理消息,如果是刚加入的队列中的元素也会判断时间执行里面内容
 99                handler->handleMessage(message);
100            } // release handler
101
102            mLock.lock();
103            mSendingMessage = false;
104            //消息机制也是返回一个POLL_CALLBACK
105            result = POLL_CALLBACK;
106        }
107    }
108
109    // Release lock.
110    mLock.unlock();
111
112    // 回调对应的fd监听对应的函数
113    //将刚刚的mResponses中的元素取出
114    for (size_t i = 0; i < mResponses.size(); i++) {
115        Response& response = mResponses.editItemAt(i);
116        //如果存在回调,那么处理这个response
117        if (response.request.ident == POLL_CALLBACK) {
118            int fd = response.request.fd;
119            int events = response.events;
120            void* data = response.request.data;
121            //回调到对应的handleEvent,先处理再移除对应的response
122            int callbackResult = response.request.callback->handleEvent(fd, events, data);
123            if (callbackResult == 0) {
124                AutoMutex _l(mLock);
125                removeSequenceNumberLocked(response.seq);
126            }
127            response.request.callback.clear();
128            result = POLL_CALLBACK;
129        }
130    }
131    return result;
132}

这个方法特别长

  1. 调整时间

    mNextMessageUptime 是 消息队列 mMessageEnvelopes 中最近一个即将要被处理的message的时间点。所以需要根据mNextMessageUptime 与 调用者传下来的timeoutMillis 比较计算出一个最小的timeout。通常是只有消息队列时间还没到的情况

  2. epoll_wait

    处理epoll_wait() 返回的epoll events.判断epoll event 是哪个fd上发生的事件。如果是mWakeEventFd,则执行awoken(). awoken() 只是将数据read出来,然后继续往下处理了。其目的也就是使epoll_wait() 从阻塞中返回。

  3. 监听队列的fd准备

    如果是通过Looper.addFd() 接口加入到epoll监听队列的fd,并不是立马处理,而是先push到mResponses,后面再处理。

  4. 处理消息队列和监听队列的fd

    处理消息队列 mMessageEnvelopes 中的Message.如果还没有到处理时间,就更新一下mNextMessageUptime

    处理刚才放入mResponses中的 事件.只处理ident 为POLL_CALLBACK的事件。其他事件在pollOnce中处理

其中handleEvent

1//system/core/libutils/Looper.cpp
2int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
3    //这里的mCallback就是竹签addFd中的回调
4    //回调会返回fd,event事件还有传入的data,这个data就是demo中的this指针,指代MyCallbackHandler
5    return mCallback(fd, events, data);
6}

另外解释上面的还有一个问题wake

 1//system/core/libutils/Looper.cpp
 2void Looper::wake() {
 3    uint64_t inc = 1;
 4    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
 5    if (nWrite != sizeof(uint64_t)) {
 6        if (errno != EAGAIN) {
 7            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
 8                             mWakeEventFd.get(), nWrite, strerror(errno));
 9        }
10    }
11}

现在看就明白含义了,就是发生一个写操作,让注册到epoll中的mWakeEventFd发生变化,最终破除epoll_wait的阻塞,直接调用到后面的处理过程。

对应唤醒的事件响应了,那么执行awoken

1//system/core/libutils/Looper.cpp
2void Looper::awoken() {
3    uint64_t counter;
4    TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
5}

发生一个读操作,让之前的写操作清除。

2.6图解

2.6.1发送消息

2.6.2注册监听fd

3补充

解释一下为什么上述demo的fd=6

4总结

通过上述直到native Looper的机制之后,可以更加顺利的阅读其他类型的源码,或者可以在自己的demo中使用现成的消息机制。

下载

点击这里

参考文献

[1] 二的次方. Android Native – Message/Handler/Looper机制(应用篇), 2021.

[2] 二的次方,Android Native – Message/Handler/Looper机制,2021.

[3] bandaoyu, 【epoll】epoll使用详解(精髓)–研读和修正, 2019.

[4] 安德路, Android Thread之threadLoop方法, 2018.

[5] 煎鱼(EDDYCJY), Linux fd 系列 — eventfd 是什么?, 2021.

[6] donaldsy, pthread_once使用 – 仅初始化一次, 2021.

[7] cheems~, Linux线程私有数据Thread-specific Data(TSD) 详解, 2022.