之前介绍了关于socket的用法,这里在此基础上再来看看跟socket类似的socketpair。

0一个demo

这里以一个例子来说明socketpair的广泛性,这里先介绍一下BitTube类。这里只是简单介绍,具体关于BitTube的使用的原理可以点击这里

0.1本进程多线程通信

0.2跨进程通信

0.2.1发送句柄到其他进程

发送句柄的过程中,使用了fcntl的函数,是用来修改已经打开文件的属性的函数,这里起到了一个dup复制句柄的作用。

fcntl介绍,具体可以点击这里

1#include <fcntl.h>
2int fcntl(int fd, int cmd, ...);

fcntl是用来修改已经打开文件的属性的函数,包含5个功能:

  1. 复制一个已有文件描述符

    功能和dup和dup2相同,对应的cmd:F_DUPFDF_DUPFD_CLOEXEC。当使用这两个cmd时,需要传入第三个参数,fcntl返回复制后的文件描述符,此返回值是之前未被占用的描述符,并且必须一个大于等于第三个参数值。F_DUPFD命令要求返回的文件描述符会清除对应的FD_CLOEXEC标志;F_DUPFD_CLOEXEC要求设置新描述符的FD_CLOEXEC标志。

  2. 获取、设置文件描述符标志

    对应的cmd:F_GETFDF_SETFD。用于设置FD_CLOEXEC标志,此标志的含义是:当进程执行exec系统调用后此文件描述符会被自动关闭。

  3. 获取、设置文件访问状态标志

    对应的cmd:F_GETFLF_SETFL。获取当前打开文件的访问标志,设置对应的访问标志,一般常用来设置做非阻塞读写操作。

  4. 获取、设置记录锁功能

    对应的cmd:F_GETLKF_SETLKF_SETLKW

  5. 获取、设置异步I/O所有权

    对应的cmd:F_GETOWNF_SETOWN。 获取和设置用来接收SIGIO/SIGURG信号的进程id或者进程组id。返回对应的进程id或者进程组id取负值。

0.2.2从其他进程接收句柄

0.3总结

socketpair利用socket为双方建立了全双工的通信管道(communication pipe)。通过文件描述符的复用(dup/dup2),可以传递socket handle到另一个进程,复用它并开启通信。

BitTube使用了Linux/Unix socket中的顺序数据包(sequenced packets,SOCK_SEQPACKET),像SOCK_DGRAM,它只传送整包数据;又像SOCK_STREAM,面向连接且提供有序的数据报传送。

1socketpair

socketpair,跟socket类似,都为套接字。可以用于网络通信,也可以用于本机内的进程通信。

区别于管道pipe是半双工的,pipe两次才能实现全双工,使得代码复杂。socketpair直接就可以实现全双工,socketpair对两个文件描述符中的任何一个都可读和可写,而pipe是一个读,一个写。

2Linux socketpair

创建一对连接的套接字

2.1函数原型

1#include <sys/types.h>          
2#include <sys/socket.h>
3int socketpair(int domain, int type, int protocol, int sv[2]);
  • domain:协议族,可以是AF_UNIX或AF_LOCAL。
  • type:套接字类型,可以是SOCK_STREAM或SOCK_DGRAM。
  • protocol:协议类型,可以是0或IPPROTO_TCP。
  • sv:用于存储返回的套接字描述符的数组,其中sv[0]表示第一个套接字描述符,sv[1]表示第二个套接字描述符。

2.2描述

socketpair调用使用可选指定的套接字类型,在指定的域中以指定的类型创建一对未命名的已连接套接字。有关这些参数的更多详细信息,请参见socket,点击这里

2.3返回值

返回值如果成功,则返回0。如果出现错误,则返回-1,并适当地设置errno。

ERRORNO 解释
EAFNOSUPPORT 在这台计算机上不支持指定的地址族
EFAULT 地址sv没有指定进程地址空间的有效部分
EMFILE 已达到每个进程打开的文件描述符数量限制
ENFILE 已达到打开文件总数的全系统限制
EOPNOTSUPP 指定的协议不支持创建套接字对
EPROTONOSUPPORT 这台机器不支持指定的协议

3源码中的socketpair

本文主要介绍三种套接字类型,SOCK_STREAM, SOCK_SEQPACKET,SOCK_DGRAM。但不代表socketpair只支持这三种,其余的套接字类型可以根据兴趣爱好自我探索。

3.1属性系统源码

属性系统中用到了epoll加上SOCK_SEQPACKET套接字类型的socketpair。这个源码更详细的可以点击这里

3.1.1创建socketpair

创建socketpair,其中套接字类型是SOCK_SEQPACKET,另外sockets[0]为接收端,sockets[1]为发送端

 1//system/core/init/property_service.cpp
 2void StartPropertyService(int* epoll_socket) {
 3    InitPropertySet("ro.property_service.version", "2");
 4
 5    int sockets[2];
 6    if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, sockets) != 0) {
 7        PLOG(FATAL) << "Failed to socketpair() between property_service and init";
 8    }
 9    //epoll_socket和from_init_socket都为接收端,会返回到init.cpp中的StartPropertyService
10    *epoll_socket = from_init_socket = sockets[0];
11    //init_socket为发送端
12    init_socket = sockets[1];
13    StartSendingMessages();
14    //这里创建了套接字类型为SOCK_STREAM的TCP流的socket,句柄为property_set_fd
15    if (auto result = CreateSocket(PROP_SERVICE_NAME, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK,
16                                   /*passcred=*/false, /*should_listen=*/false, 0666, /*uid=*/0,
17                                   /*gid=*/0, /*socketcon=*/{});
18        result.ok()) {
19        property_set_fd = *result;
20    } else {
21        LOG(FATAL) << "start_property_service socket creation failed: " << result.error();
22    }
23
24    listen(property_set_fd, 8);
25
26    auto new_thread = std::thread{PropertyServiceThread};
27    property_service_thread.swap(new_thread);
28}

3.1.1属性系统客户端

属性的服务端是在启动流程中开启的,调用StartPropertyService,并传入一个句柄property_fd,很显然这个句柄获取得到是socketpait对应的socket[0]

1//system/core/init/init.cpp
2int SecondStageMain(int argc, char** argv) {
3    ...
4    StartPropertyService(&property_fd);   
5}

但是这里获取到了property_fd,承载着客户端的操作。

这里调用SendLoadPersistentPropertiesMessage,可以发现关于property_fd从客户端发送到socketpair另外一端

1//system/core/init/init.cpp
2void SendLoadPersistentPropertiesMessage() {
3    //这里使用的跨平台的protobuf,会被解析成c++类型
4    auto init_message = InitMessage{};
5    init_message.set_load_persistent_properties(true);
6    if (auto result = SendMessage(property_fd, init_message); !result.ok()) {
7        LOG(ERROR) << "Failed to send load persistent properties message: " << result.error();
8    }
9}

对应的protobuf文件

1//system/core/init/property_service.proto
2message InitMessage {
3    oneof msg {
4        bool load_persistent_properties = 1;
5        bool stop_sending_messages = 2;
6        bool start_sending_messages = 3;
7    };
8}

其中SendLoadPersistentPropertiesMessage并不是直接启动的,是通过rc文件解析之后Action类型执行的

 1//system/core/init/builtins.cpp
 2const BuiltinFunctionMap& GetBuiltinFunctionMap() {
 3    constexpr std::size_t kMax = std::numeric_limits<std::size_t>::max();
 4    // clang-format off
 5    static const BuiltinFunctionMap builtin_functions = {
 6		...
 7        {"load_persist_props",      {0,     0,    {false,  do_load_persist_props}}},
 8		...
 9    };
10    // clang-format on
11    return builtin_functions;
12}

Action类型执行对应的函数do_load_persist_props

1//system/core/init/builtins.cpp
2static Result<void> do_load_persist_props(const BuiltinArguments& args) {
3    SendLoadPersistentPropertiesMessage();
4	//从这里可以看出实际上常量属性加载完成之后才算完成这一步
5    start_waiting_for_property("ro.persistent_properties.ready", "true");
6    return {};
7}

proto说明

关于proto之前的Android属性系统中文章中提到过,但没有具体深入,当然本文也不会特别深入。如果需要进一步了解语法规则,可以点击这里

Protobuf 是一种与平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储。而且由于和语言无关,proto文件既可以转化成c++文件,也可以转化成javapython这类的文件。当然在源码中,可以直接理解为定义了一种数据类,专门用于序列化存储。

目前我们定义的proto文件,比如上面的property_service.proto,可以在下面的源码路径中找到

11//其中xxx为具体的架构,有arm架构,arm架构等
22/out/soong/.intermediates/system/core/init/libinit/android_xxx_static/gen/proto/system/core/init/property_service.pb.h
33/out/soong/.intermediates/system/core/init/libinit/android_xxx_static/gen/proto/system/core/init/property_service.pb.cc

具体的规则如下,所指定的消息字段修饰符必须是如下之一:

  • required:一个格式良好的消息一定要含有1个这种字段。表示该值是必须要设置的;
  • optional:消息格式中该字段可以有0个或1个值(不超过1个)。
  • repeated:在一个格式良好的消息中,这种字段可以重复任意多次(包括0次)。重复的值的顺序会被保留。表示该值可以重复,相当于java中的List。

关于Oneof

如果你的消息中有很多可选字段, 并且同时至多一个字段会被设置, 你可以加强这个行为,使用oneof特性节省内存.

Oneof字段就像可选字段, 除了它们会共享内存, 至多一个字段会被设置。 设置其中一个字段会清除其它oneof字段。 你可以使用case()或者WhichOneof() 方法检查哪个oneof字段被设置, 看你使用什么语言了.

在产生的代码中, oneof字段拥有同样的 getterssetters, 就像正常的可选字段一样. 也有一个特殊的方法来检查到底那个字段被设置. 你可以在相应的语言API中找到oneof API介绍.

1message SampleMessage {
2	oneof test_oneof {
3		string name = 4;
4		SubMessage sub_message = 9;
5	}
6}

设置oneof会自动清楚其它oneof字段的值. 所以设置多次后,只有最后一次设置的字段有值.

1SampleMessage message;
2message.set_name("name");
3CHECK(message.has_name());
4message.mutable_sub_message(); // Will clear name field.
5CHECK(!message.has_name());

根据上面可以看到protobuf文件,实际上我们获取了序列化的内容,即load_persistent_properties为true。

3.1.2属性系统服务端

服务端的话就是上述的线程,property_service_thread,对应的线程函数为PropertyServiceThread

 1//system/core/init/property_service.cpp
 2static void PropertyServiceThread() {
 3    //源码封装的epoll
 4    Epoll epoll;
 5    //就是创造epoll_fd句柄
 6    if (auto result = epoll.Open(); !result.ok()) {
 7        LOG(FATAL) << result.error();
 8    }
 9	//将property_set_fd注册到epoll中,回调函数为handle_property_set_fd
10    if (auto result = epoll.RegisterHandler(property_set_fd, handle_property_set_fd);
11        !result.ok()) {
12        LOG(FATAL) << result.error();
13    }
14	//将init_socket注册到epoll中,回调函数为HandleInitSocket
15    if (auto result = epoll.RegisterHandler(init_socket, HandleInitSocket); !result.ok()) {
16        LOG(FATAL) << result.error();
17    }
18	//循环等待结果
19    while (true) {
20        //这里默认是无限等待,一直阻塞,直到有事件来的时候返回,并执行对应的回调函数
21        auto pending_functions = epoll.Wait(std::nullopt);
22        if (!pending_functions.ok()) {
23            LOG(ERROR) << pending_functions.error();
24        } else {
25            for (const auto& function : *pending_functions) {
26                (*function)();
27            }
28        }
29    }
30}

可以看到这里面又涉及到了epoll,不过这个epoll在源码中重新封装成了Epoll

 1//system/core/init/epoll.h
 2class Epoll {
 3  public:
 4    Epoll();
 5	//回调函数Handler
 6    typedef std::function<void()> Handler;
 7
 8    Result<void> Open();
 9    //注册的默认事件为EPOLLIN
10    Result<void> RegisterHandler(int fd, Handler handler, uint32_t events = EPOLLIN);
11    Result<void> UnregisterHandler(int fd);
12    Result<std::vector<std::shared_ptr<Handler>>> Wait(
13            std::optional<std::chrono::milliseconds> timeout);
14
15  private:
16    //将回调函数的事件封装到一起
17    struct Info {
18        std::shared_ptr<Handler> handler;
19        uint32_t events;
20    };
21
22    android::base::unique_fd epoll_fd_;
23    std::map<int, Info> epoll_handlers_;
24};
 1//system/core/init/epoll.cpp
 2Result<void> Epoll::Open() {
 3    epoll_fd_.reset(epoll_create1(EPOLL_CLOEXEC));
 4    ...
 5}
 6
 7Result<void> Epoll::RegisterHandler(int fd, Handler handler, uint32_t events) {
 8    Info info;
 9    info.events = events;
10    info.handler = std::make_shared<decltype(handler)>(std::move(handler));
11    
12    auto [it, inserted] = epoll_handlers_.emplace(fd, std::move(info));
13    epoll_event ev;
14    ev.events = events;
15    ev.data.ptr = reinterpret_cast<void*>(&it->second);
16	epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
17}
18
19Result<std::vector<std::shared_ptr<Epoll::Handler>>> Epoll::Wait(
20        std::optional<std::chrono::milliseconds> timeout) {
21    int timeout_ms = -1;
22    
23    const auto max_events = epoll_handlers_.size();
24    epoll_event ev[max_events];
25    //默认无限阻塞,直到有EPOLLIN事件过来
26    auto num_events = TEMP_FAILURE_RETRY(epoll_wait(epoll_fd_, ev, max_events, timeout_ms));
27    
28    std::vector<std::shared_ptr<Handler>> pending_functions;
29    for (int i = 0; i < num_events; ++i) {
30        //取出事件中的ptr,即对应的info指针
31        auto& info = *reinterpret_cast<Info*>(ev[i].data.ptr);
32        //将对应事件放入到数组中,通过数组来依次排队输出
33        pending_functions.emplace_back(info.handler);
34    }
35
36    return pending_functions;
37}

最终会回调到两个函数,这两个函数的先后顺序是由于哪个事件先触发先调用的原则,会被放在pending_functions队列中。

3.1.2.1handle_property_set_fd

这里回调实际上是对应socket,而不是socketpair,这里更多的是IO多路复用的经典场景,监听socket句柄,有新的连接进来了就会回调到这个函数。

 1//system/core/init/property_service.cpp
 2static void handle_property_set_fd() {
 3    static constexpr uint32_t kDefaultSocketTimeout = 2000; /* ms */
 4	//接收客户端的句柄,服务端这里与之连接的句柄为s,
 5    //其实这里可以把s也添加到epoll中,但是源码中并没有这么操作,原因是更高的处理效率,而是用了poll机制
 6    int s = accept4(property_set_fd, nullptr, nullptr, SOCK_CLOEXEC);
 7    if (s == -1) {
 8        return;
 9    }
10	//这里包含权限问题,pid,uid,gid,
11    ucred cr;
12    socklen_t cr_size = sizeof(cr);
13    if (getsockopt(s, SOL_SOCKET, SO_PEERCRED, &cr, &cr_size) < 0) {
14        ...
15    }
16
17    SocketConnection socket(s, cr);
18    uint32_t timeout_ms = kDefaultSocketTimeout;
19
20    uint32_t cmd = 0;
21    if (!socket.RecvUint32(&cmd, &timeout_ms)) {
22        ...
23        return;
24    }
25
26    switch (cmd) {
27    ...
28    //这里默认使用PROP_MSG_SETPROP2 0x00020001
29    case PROP_MSG_SETPROP2: {
30        std::string name;
31        std::string value;
32        if (!socket.RecvString(&name, &timeout_ms) ||
33            !socket.RecvString(&value, &timeout_ms)) {
34          ...
35        }
36
37        std::string source_context;
38        if (!socket.GetSourceContext(&source_context)) {
39            ...
40        }
41
42        const auto& cr = socket.cred();
43        std::string error;
44        uint32_t result = HandlePropertySet(name, value, source_context, cr, &socket, &error);
45        ...
46        socket.SendUint32(result);
47        break;
48      }
49
50    default:
51        ...
52    }
53}

这里还涉及到了系统管理的id问题

这里只是简单的介绍一下UID,PID和GID的关系,具体内容可以点击这里

  1. UID

    在Linux中用户的概念分为:普通用户、根用户和系统用户

    Linux用户 解释说明
    普通用户 表示平时使用的用户概念,在使用Linux时,需要通过用户名和密码登录,获取该用户相应的权限,其权限具体表现在对系统中文件的增删改查和命令执行的限制,不同用户具有不同的权限设置,其UID通常大于500
    根用户 该用户就是ROOT用户,其UID为0,可以对系统中任何文件进行增删改查处理,执行任何命令,因此ROOT用户极其危险,如操作不当,会导致系统彻底崩掉
    系统用户 该用户是系统虚拟出的用户概念,不对使用者开发的用户,其UID范围为1-499,例如运行MySQL数据库服务时,需要使用系统用户mysql来运行mysqld进程
  2. PID

    系统在程序运行时,会为每个可执行程序分配一个唯一的进程ID(PID),PID的直接作用是为了表明该程序所拥有的文件操作权限,不同的可执行程序运行时互不影响,相互之间的数据访问具有权限限制。

  3. GID

    GID顾名思义就是对于UID的封装处理,就是包含多个UID的意思,实际上在Linux下每个UID都对应着一个GID。设计GID是为了便于对系统的统一管理,例如增加某个文件的用户权限时,只对admin组的用户开放,那么在分配权限时,只需对该组分配,其组下的所有用户均获取权限。同样在删除时,也便于统一操作。

    除了UID和GID外,其还包括其扩展的有效的用户、组(euid、egid)、文件系统的用户、组(fsuid、fsgid)和保存的设置用户、组(suid、sgid)等。

  • 不同的应用具有唯一的UID同一个UID可具有不同的PID
  • 针对不同的PID之间数据的暴露可采用私有暴露和权限暴露,针对不同的UID之间可通过完全暴露的方式;
  • 如果一个应用是系统应用,则不需要其他应用暴露,便可直接访问该应用的数据。

另外上面的socket还存在封装,封装成了对应的SocketConnection

  1//system/core/init/property_service.cpp
  2class SocketConnection {
  3  public:
  4    SocketConnection(int socket, const ucred& cred) : socket_(socket), cred_(cred) {}
  5	...
  6    //以接收string为例,另外的int和char都是类似    
  7    bool RecvString(std::string* value, uint32_t* timeout_ms) {
  8        uint32_t len = 0;
  9        /
 10        if (!RecvUint32(&len, timeout_ms)) {
 11            return false;
 12        }
 13
 14        std::vector<char> chars(len);
 15        if (!RecvChars(&chars[0], len, timeout_ms)) {
 16            return false;
 17        }
 18
 19        *value = std::string(&chars[0], len);
 20        return true;
 21    }
 22
 23    bool SendUint32(uint32_t value) {
 24        if (!socket_.ok()) {
 25            return true;
 26        }
 27        //发送的话,直接发送
 28        int result = TEMP_FAILURE_RETRY(send(socket_, &value, sizeof(value), 0));
 29        return result == sizeof(value);
 30    }
 31
 32    const ucred& cred() { return cred_; }
 33
 34  private:
 35    //接收的过程用到了poll机制,这里传递进来的timeout_ms为2s
 36    bool PollIn(uint32_t* timeout_ms) {
 37        struct pollfd ufds[1];
 38        ufds[0].fd = socket_;
 39        ufds[0].events = POLLIN;
 40        ufds[0].revents = 0;
 41        while (*timeout_ms > 0) {
 42            auto start_time = std::chrono::steady_clock::now();
 43            //对socket_,其实就是s进行监听,是否会超时。只有fds中准备好读写,返回值nr大于0为s
 44            int nr = poll(ufds, 1, *timeout_ms);
 45            auto now = std::chrono::steady_clock::now();
 46            auto time_elapsed =
 47                std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time);
 48            uint64_t millis = time_elapsed.count();
 49            *timeout_ms = (millis > *timeout_ms) ? 0 : *timeout_ms - millis;
 50
 51            if (nr > 0) {
 52                return true;
 53            }
 54
 55            if (nr == 0) {
 56                // Timeout
 57                break;
 58            }
 59
 60            if (nr < 0 && errno != EINTR) {
 61                PLOG(ERROR) << "sys_prop: error waiting for uid " << cred_.uid
 62                            << " to send property message";
 63                return false;
 64            } else {  // errno == EINTR
 65                // Timer rounds milliseconds down in case of EINTR we want it to be rounded up
 66                // to avoid slowing init down by causing EINTR with under millisecond timeout.
 67                if (*timeout_ms > 0) {
 68                    --(*timeout_ms);
 69                }
 70            }
 71        }
 72
 73        LOG(ERROR) << "sys_prop: timeout waiting for uid " << cred_.uid
 74                   << " to send property message.";
 75        return false;
 76    }
 77	
 78    bool RecvFully(void* data_ptr, size_t size, uint32_t* timeout_ms) {
 79        size_t bytes_left = size;
 80        char* data = static_cast<char*>(data_ptr);
 81        while (*timeout_ms > 0 && bytes_left > 0) {
 82            //用于判断接收是否超时2s,这里是不允许超时的
 83            if (!PollIn(timeout_ms)) {
 84                return false;
 85            }
 86			//这里实际上是流的形式,所以或循环去读取的操作
 87            int result = TEMP_FAILURE_RETRY(recv(socket_, data, bytes_left, MSG_DONTWAIT));
 88            if (result <= 0) {
 89                PLOG(ERROR) << "sys_prop: recv error";
 90                return false;
 91            }
 92
 93            bytes_left -= result;
 94            data += result;
 95        }
 96
 97        if (bytes_left != 0) {
 98            LOG(ERROR) << "sys_prop: recv data is not properly obtained.";
 99        }
100
101        return bytes_left == 0;
102    }
103
104    unique_fd socket_;
105    ucred cred_;
106
107    DISALLOW_IMPLICIT_CONSTRUCTORS(SocketConnection);
108};

内容比较多

接收。以接收string为例,另外的int和char都是类似,循环recv接收数据。先判断传输的string长度,长度限制为0-2^16-1。然后接收的过程以char类型接收,接收完成之后再转成string类。

发送。已发送int为例,这里发送直接通过send发送数据。

poll机制

poll是Linux的事件轮询机制函数,每个进程都可以管理一个pollfd队列,由poll函数进行事件注册和查询。内核将用户的fds结构体数组拷贝到内核中。当有事件发生时,再将所有事件都返回到fds结构体数组中,poll只返回已就绪事件的个数,所以用户要操作就绪事件就要用轮询的方法。

1)函数原型

1#include <poll.h>
2int poll(struct pollfd* fds, nfds_t nfds, int timeout);
  • **fds:**是一个struct pollfd类型的指针,用于存放需要检测其状态的socket描述符
  • **nfds:**是nfd_t类型的参数,用于标记fds数组中结构体元素的数量
  • **timeout:**没有接受事件时等待的事件,单位毫秒,若值为-1,则永远不会超时

2)pollfd结构体

1struct pollfd
2{
3	int fd; 
4	short events;
5	short revents; 
6}
  • **fd:**文件描述符
  • events:等待发生的事件类型
  • **revents:**检测之后返回的事件,当某个文件描述符有变化时,值就不为空
事件标识 说明
POLLIN 普通或优先级带数据可读
POLLRDNORM 普通数据可读
POLLRDBAND 优先级带数据可读
POLLPRI 高优先级数据可读
POLLOUT 普通数据可写
POLLWRNORM 普通数据可写不会导致阻塞
POLLERR 发生错误
POLLHUP 发生挂起
POLLNVAL 描述字不是一个打开的文件

3)poll返回值

poll机制会判断fds中的文件是否满足条件,如果休眠时间内条件满足则会唤醒进程;超过休眠时间,条件一直不满足则自动唤醒。

  • 返回值>0:fds中准备好读写,或出错状态的那些socket描述符
  • **返回值=0:**fds中没有socket描述符需要读写或出错;此时poll超时,时长为timeout
  • **返回值=-1:**调用失败

poll机制也是IO多路复用的一种方式,这里主要是承接连接的客户端的socket,对应服务端会有一个连接socket句柄s,对其poll轮询机制,可以更好的去获取对应的TCP流。

这里的s接收到的数据,实际上是客户端发送的数据。

3.1.2.2HandleInitSocket

 1//system/core/init/property_service.cpp
 2static void HandleInitSocket() {
 3    //处理socketpair对端传过来的数据
 4    auto message = ReadMessage(init_socket);
 5    if (!message.ok()) {
 6        LOG(ERROR) << "Could not read message from init_dedicated_recv_socket: " << message.error();
 7        return;
 8    }
 9	//初始化proto数据
10    auto init_message = InitMessage{};
11    //将序列化数据转化成string类
12    if (!init_message.ParseFromString(*message)) {
13        LOG(ERROR) << "Could not parse message from init";
14        return;
15    }
16	//判断属性系统传过来的数据类型,是否为kLoadPersistentProperties
17    switch (init_message.msg_case()) {
18        case InitMessage::kLoadPersistentProperties: {
19            load_override_properties();
20            // 导入persist属性,并且初始化
21            auto persistent_properties = LoadPersistentProperties();
22            for (const auto& persistent_property_record : persistent_properties.properties()) {
23                InitPropertySet(persistent_property_record.name(),
24                                persistent_property_record.value());
25            }
26            InitPropertySet("ro.persistent_properties.ready", "true");
27            persistent_properties_loaded = true;
28            break;
29        }
30        default:
31            LOG(ERROR) << "Unknown message type from init: " << init_message.msg_case();
32    }
33}

上面主要做了几件事

  1. 处理socketpair对端传过来的数据
  2. 将proto序列化数据转化成string类
  3. 判断属性系统传过来的数据类型,如果是就加载并初始化persist属性

ReadMessage

 1//system/core/init/proto_utils.h
 2constexpr size_t kBufferSize = 4096;
 3
 4inline Result<std::string> ReadMessage(int socket) {
 5    char buffer[kBufferSize] = {};
 6    auto result = TEMP_FAILURE_RETRY(recv(socket, buffer, sizeof(buffer), 0));
 7    if (result == 0) {
 8        return Error();
 9    } else if (result < 0) {
10        return ErrnoError();
11    }
12    return std::string(buffer, result);
13}

这个具体数据,实际上就是对应的msg_case为InitMessage::kLoadPersistentProperties的数据。

3.1.3总结

实际上,这里的socket的作用意图很明显,将Init进程非Init进程的隔离开来,非Init进程操作属性的写入需要转到Init进程去操作,而非Init进程和Init进程通信这里用到了本地的socket通信。这样,属性的管控者就是Init进程,可以做到过滤隔离作用。

除此之外,这里还涉及到了epoll、poll、protobuf、socket和socketpair等概念,充分理解和掌握这些基本概念之后,对于源码的阅读可以理解的更加深入。

3.2Camera dump数据源码

Camera dump操作中用到了SOCK_STREAM套接字类型的socketpair。

3.2.1创建socketpair

 1//frameworks/base/core/jni/android_hardware_camera2_CameraMetadata.cpp
 2static void CameraMetadata_dump(JNIEnv *env, jclass thiz, jlong ptr) {
 3    ALOGV("%s", __FUNCTION__);
 4    CameraMetadata* metadata = CameraMetadata_getPointerThrow(env, ptr);
 5    if (metadata == NULL) {
 6        return;
 7    }
 8	int writeFd, readFd;
 9    {
10        int sv[2];
11        //这里创建socketpair,使用TCP的形式,一端发送,一端接受
12        if (socketpair(AF_LOCAL, SOCK_STREAM, /*protocol*/0, &sv[0]) < 0) {
13            jniThrowExceptionFmt(env, "java/io/IOException",
14                    "Failed to create socketpair (errno = %#x, message = '%s')",
15                    errno, strerror(errno));
16            return;
17        }
18        writeFd = sv[0];
19        readFd = sv[1];
20    }
21
22    pthread_t writeThread;
23    DumpMetadataParams params = {
24        writeFd,
25        metadata
26    };
27	//开启write线程,作为写入段
28    {
29        int threadRet = pthread_create(&writeThread, /*attr*/NULL,
30                CameraMetadata_writeMetadataThread, (void*)&params);
31
32        if (threadRet != 0) {
33            close(writeFd);
34            close(readFd);
35
36            jniThrowExceptionFmt(env, "java/io/IOException",
37                    "Failed to create thread for writing (errno = %#x, message = '%s')",
38                    threadRet, strerror(threadRet));
39            return;
40        }
41    }
42	...
43
44    int res;
45
46    // 等待数据完成后,退出线程
47    if ((res = pthread_join(writeThread, /*retval*/NULL)) != 0) {
48        ALOGE("%s: Failed to join thread (errno = %#x, message = '%s')",
49                __FUNCTION__, res, strerror(res));
50    }
51}

3.2.2发送端

子线程用来发送数据

 1//frameworks/base/core/jni/android_hardware_camera2_CameraMetadata.cpp
 2struct DumpMetadataParams {
 3    int writeFd;
 4    const CameraMetadata* metadata;
 5};
 6
 7static void* CameraMetadata_writeMetadataThread(void* arg) {
 8    DumpMetadataParams* p = static_cast<DumpMetadataParams*>(arg);
 9
10    p->metadata->dump(p->writeFd, /*verbosity*/2);
11
12    if (close(p->writeFd) < 0) {
13        ALOGE("%s: Failed to close writeFd (errno = %#x, message = '%s')",
14                __FUNCTION__, errno, strerror(errno));
15    }
16
17    return NULL;
18}

这里的p为DumpMetadataParams是结构体类型,实际上是CameraMetadata数据dump

1//frameworks/av/camera/CameraMetadata.cpp
2camera_metadata_t *mBuffer;
3void dump(int fd, int verbosity = 1, int indentation = 0) const;
4void CameraMetadata::dump(int fd, int verbosity, int indentation) const {
5    dump_indented_camera_metadata(mBuffer, fd, verbosity, indentation);
6}

调用到dump_indented_camera_metadata

 1//system/media/camera/src/camera_metadata.c
 2void dump_indented_camera_metadata(const camera_metadata_t *metadata,
 3        int fd,
 4        int verbosity,
 5        int indentation) {
 6    unsigned int i;
 7    dprintf(fd,
 8            "%*sDumping camera metadata array: %" PRIu32 " / %" PRIu32 " entries, "
 9            "%" PRIu32 " / %" PRIu32 " bytes of extra data.\n", indentation, "",
10            metadata->entry_count, metadata->entry_capacity,
11            metadata->data_count, metadata->data_capacity);
12    dprintf(fd, "%*sVersion: %d, Flags: %08x\n",
13            indentation + 2, "",
14            metadata->version, metadata->flags);
15    camera_metadata_buffer_entry_t *entry = get_entries(metadata);
16    for (i=0; i < metadata->entry_count; i++, entry++) {
17
18        const char *tag_name, *tag_section;
19        tag_section = get_local_camera_metadata_section_name(entry->tag, metadata);
20        if (tag_section == NULL) {
21            tag_section = "unknownSection";
22        }
23        tag_name = get_local_camera_metadata_tag_name(entry->tag, metadata);
24        if (tag_name == NULL) {
25            tag_name = "unknownTag";
26        }
27        const char *type_name;
28        if (entry->type >= NUM_TYPES) {
29            type_name = "unknown";
30        } else {
31            type_name = camera_metadata_type_names[entry->type];
32        }
33        dprintf(fd, "%*s%s.%s (%05x): %s[%" PRIu32 "]\n",
34             indentation + 2, "",
35             tag_section,
36             tag_name,
37             entry->tag,
38             type_name,
39             entry->count);
40
41        if (verbosity < 1) continue;
42
43        if (entry->type >= NUM_TYPES) continue;
44
45        size_t type_size = camera_metadata_type_size[entry->type];
46        uint8_t *data_ptr;
47        if ( type_size * entry->count > 4 ) {
48            if (entry->data.offset >= metadata->data_count) {
49                ALOGE("%s: Malformed entry data offset: %" PRIu32 " (max %" PRIu32 ")",
50                        __FUNCTION__,
51                        entry->data.offset,
52                        metadata->data_count);
53                continue;
54            }
55            data_ptr = get_data(metadata) + entry->data.offset;
56        } else {
57            data_ptr = entry->data.value;
58        }
59        int count = entry->count;
60        if (verbosity < 2 && count > 16) count = 16;
61
62        print_data(fd, data_ptr, entry->tag, entry->type, count, indentation);
63    }
64}

可以看到这里没有使用相应的write去发送,而是选择使用dprintf的方式来发送。

关于IO函数比较,write和dprintf区别

write

向已打开文件描述符. 将缓存buf内容写count个字节到fd指向的文件. buf不必以null终结符结尾.

1#include <unistd.h>
2ssize_t write(int fd, const void *buf, size_t count);

示例: 向stdout写入所有缓存字符

1char buf[2] = {'a', 'b'};
2int n = write(STDOUT_FILENO, buf, sizeof(buf));
3if (n < 0) {
4 perror("write error");
5 exit(1);
6}

dprintf

将格式化串输出到已打开文件描述符, 其他同printf.

1#include <stdio.h>
2int sprintf(char *str, const char *format, ...);

示例: 向标准输出(默认已打开, 文件描述符 = STDOUT_FILENO)输出

1int age = 20;
2dprintf(STDOUT_FILENO, "my age is %d\n", age);

区别

write函数和dprintf函数,都可以实现将字符串写入文件,但两者的区别在于,write会写入字符串中的NULL字符,因而以vim打开文件后会出现少量乱码,dprintf则不会。

3.2.3接收端

 1//frameworks/base/core/jni/android_hardware_camera2_CameraMetadata.cpp
 2static void CameraMetadata_dump(JNIEnv *env, jclass thiz, jlong ptr) {
 3	//当前线程读取数据
 4    {
 5        char out[] = {'\0', '\0'}; 
 6        String8 logLine;
 7
 8        //每次读取一个数据,当读取到换行符的时候,就把当前缓冲区数据,打印出来,直到全部读完为止
 9        ssize_t res;
10        while ((res = TEMP_FAILURE_RETRY(read(readFd, &out[0], /*count*/1))) > 0) {
11            if (out[0] == '\n') {
12                ALOGD("%s", logLine.string());
13                logLine.clear();
14            } else {
15                logLine.append(out);
16            }
17        }
18
19        ALOGD("%s", logLine.string());
20        close(readFd);
21    }
22    ...
23}

3.2.4总结

其实这一块内容比较简单,主要这里可能涉及到了Camera相关的CameraMetadata数据结构,这个比较复杂,具体关于CameraMetadata可以在参考中查找,这里不展开说明。

3.3AudioGroup相关源码

Audio相关操作中用到了SOCK_DGRAM套接字类型的socketpair。

3.3.1创建socketpair

 1//frameworks/opt/net/voip/src/jni/rtp/AudioGroup.cpp
 2bool AudioGroup::set(int sampleRate, int sampleCount)
 3{
 4    //这里也涉及到了epoll机制
 5    mEventQueue = epoll_create1(EPOLL_CLOEXEC);
 6    if (mEventQueue == -1) {
 7        ALOGE("epoll_create1: %s", strerror(errno));
 8        return false;
 9    }
10
11    mSampleRate = sampleRate;
12    mSampleCount = sampleCount;
13
14    // 创建socketpair,是UDP的形式
15    int pair[2];
16    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, pair)) {
17        ALOGE("socketpair: %s", strerror(errno));
18        return false;
19    }
20    //会在线程中使用
21    mDeviceSocket = pair[0];
22
23    // Create device stream.
24    mChain = new AudioStream;
25    //把另一端发给mChain保存
26    if (!mChain->set(AudioStream::NORMAL, pair[1], NULL, NULL,
27        sampleRate, sampleCount, -1, -1)) {
28        close(pair[1]);
29        ALOGE("cannot initialize device stream");
30        return false;
31    }
32
33    // Give device socket a reasonable timeout.
34    timeval tv;
35    tv.tv_sec = 0;
36    tv.tv_usec = 1000 * sampleCount / sampleRate * 500;
37    //设置接收时间为微妙
38    if (setsockopt(pair[0], SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
39        ALOGE("setsockopt: %s", strerror(errno));
40        return false;
41    }
42
43    // Add device stream into event queue.
44    epoll_event event;
45    event.events = EPOLLIN;
46    event.data.ptr = mChain;
47    //将pair[1]加入到epoll中,当发生变化,回调给mChain
48    if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, pair[1], &event)) {
49        ALOGE("epoll_ctl: %s", strerror(errno));
50        return false;
51    }
52
53    // Anything else?
54    ALOGD("stream[%d] joins group[%d]", pair[1], pair[0]);
55    return true;
56}

AudioStream::set

这里是一些初始化操作,包括模式,传输头的魔数

 1//frameworks/opt/net/voip/src/jni/rtp/AudioGroup.cpp
 2bool AudioStream::set(int mode, int socket, sockaddr_storage *remote,
 3    AudioCodec *codec, int sampleRate, int sampleCount,
 4    int codecType, int dtmfType)
 5{
 6    if (mode < 0 || mode > LAST_MODE) {
 7        return false;
 8    }
 9    mMode = mode;
10
11    mCodecMagic = (0x8000 | codecType) << 16;
12    mDtmfMagic = (dtmfType == -1) ? 0 : (0x8000 | dtmfType) << 16;
13	...
14    // 传入的pair[1]赋值给mSocket
15    mSocket = socket;
16 	...
17    return true;
18}

pair[0]赋值给了mDeviceSocket,pair[1]最终加入到了epoll中,等到写入操作触发。

3.3.2发送端

发送端主要是NetworkThread线程中的encode和decode

 1//frameworks/opt/net/voip/src/jni/rtp/AudioGroup.cpp
 2bool AudioGroup::NetworkThread::threadLoop()
 3{
 4    AudioStream *chain = mGroup->mChain;
 5    int tick = elapsedRealtime();
 6    int deadline = tick + 10;
 7    int count = 0;
 8	//根据mChain数量来确定epoll等待fd数量
 9    for (AudioStream *stream = chain; stream; stream = stream->mNext) {
10        if (tick - stream->mTick >= 0) {
11            //[1]首先编码传输
12            stream->encode(tick, chain);
13        }
14        if (deadline - stream->mTick > 0) {
15            deadline = stream->mTick;
16        }
17        ++count;
18    }
19
20    int event = mGroup->mDtmfEvent;
21    if (event != -1) {
22        for (AudioStream *stream = chain; stream; stream = stream->mNext) {
23            stream->sendDtmf(event);
24        }
25        mGroup->mDtmfEvent = -1;
26    }
27
28    deadline -= tick;
29    if (deadline < 1) {
30        deadline = 1;
31    }
32
33    epoll_event events[count];
34    //等待时间至少是1s
35    count = epoll_wait(mGroup->mEventQueue, events, count, deadline);
36    if (count == -1) {
37        ALOGE("epoll_wait: %s", strerror(errno));
38        return false;
39    }
40    //有写入事件发生,调用对应的ptr->decode,实际是mChain->decode回调函数
41    //[2]然后解码传输
42    for (int i = 0; i < count; ++i) {
43        ((AudioStream *)events[i].data.ptr)->decode(tick);
44    }
45
46    return true;
47}

上面主要是两个环节,先编码然后解码

3.3.2.1循环调用encode

 1//frameworks/opt/net/voip/src/jni/rtp/AudioGroup.cpp
 2void AudioStream::encode(int tick, AudioStream *chain)
 3{
 4    ...  
 5    if (mMode != RECEIVE_ONLY && mDtmfEvent != -1) {
 6        int duration = mTimestamp - mDtmfStart;
 7        // Make sure duration is reasonable.
 8        if (duration >= 0 && duration < mSampleRate * DTMF_PERIOD) {
 9            duration += mSampleCount;
10            int32_t buffer[4] = {
11                static_cast<int32_t>(htonl(mDtmfMagic | mSequence)),
12                static_cast<int32_t>(htonl(mDtmfStart)),
13                static_cast<int32_t>(mSsrc),
14                static_cast<int32_t>(htonl(mDtmfEvent | duration)),
15            };
16            if (duration >= mSampleRate * DTMF_PERIOD) {
17                buffer[3] |= htonl(1 << 23);
18                mDtmfEvent = -1;
19            }
20            //[1]第一次发送一些关于头部的信息,总共16个字节
21            sendto(mSocket, buffer, sizeof(buffer), MSG_DONTWAIT,
22                (sockaddr *)&mRemote, sizeof(mRemote));
23            return;
24        }
25        mDtmfEvent = -1;
26    }
27    int32_t buffer[mSampleCount + 3];
28    int16_t samples[mSampleCount];
29    // Cook the packet and send it out.
30    buffer[0] = htonl(mCodecMagic | mSequence);
31    buffer[1] = htonl(mTimestamp);
32    buffer[2] = mSsrc;
33    //[2]在发送前先编码,调用AudioCodec编码
34    int length = mCodec->encode(&buffer[3], samples);
35    if (length <= 0) {
36        ALOGV("stream[%d] encoder error", mSocket);
37        return;
38    }
39    //[3]编码之后发送,MSG_DONTWAIT代表无阻塞
40    sendto(mSocket, buffer, length + 12, MSG_DONTWAIT, (sockaddr *)&mRemote,
41        sizeof(mRemote));
42}

3.3.2.2回调到decode函数

当完成发送的时候,epoll会回调到decode函数

 1//frameworks/opt/net/voip/src/jni/rtp/AudioGroup.cpp
 2void AudioStream::decode(int tick)
 3{
 4    ...
 5    int count = (BUFFER_SIZE - (mBufferTail - mBufferHead)) * mSampleRate;
 6    if (count < mSampleCount) {
 7        // 如果发送的数据越界,那么退出
 8        ALOGV("stream[%d] buffer overflow", mSocket);
 9        recv(mSocket, &c, 1, MSG_DONTWAIT);
10        return;
11    }
12
13    // Receive the packet and decode it.
14    int16_t samples[count];
15    if (!mCodec) {
16        ... 
17    } else {
18        //四字节对其
19        __attribute__((aligned(4))) uint8_t buffer[2048];
20        sockaddr_storage remote;
21        socklen_t addrlen = sizeof(remote);
22
23        int bufferSize = sizeof(buffer);
24        //接收数据
25        int length = recvfrom(mSocket, buffer, bufferSize,
26            MSG_TRUNC | MSG_DONTWAIT, (sockaddr *)&remote, &addrlen);
27
28        //进行数据头部校验
29        if (length < 12 || length > bufferSize ||
30            (ntohl(*(uint32_t *)buffer) & 0xC07F0000) != mCodecMagic) {
31            ALOGV("stream[%d] malformed packet", mSocket);
32            return;
33        }
34        int offset = 12 + ((buffer[0] & 0x0F) << 2);
35        if (offset + 2 + (int)sizeof(uint16_t) > length) {
36            ALOGV("invalid buffer offset: %d", offset+2);
37            return;
38        }
39        if ((buffer[0] & 0x10) != 0) {
40            offset += 4 + (ntohs(*(uint16_t *)&buffer[offset + 2]) << 2);
41        }
42        if ((buffer[0] & 0x20) != 0) {
43            length -= buffer[length - 1];
44        }
45        length -= offset;
46        if (length >= 0) {
47            //接收完成后解码,调用AudioCodec
48            length = mCodec->decode(samples, count, &buffer[offset], length);
49        }
50        if (length > 0 && mFixRemote) {
51            mRemote = remote;
52            mFixRemote = false;
53        }
54        count = length;
55    }
56    ...
57}

3.3.3接收端

接收端也是另外一个DeviceThread线程,主要是用于本地的一些实时处理

 1//frameworks/opt/net/voip/src/jni/rtp/AudioGroup.cpp
 2bool AudioGroup::DeviceThread::threadLoop()
 3{
 4    //在线程初始化的时候就赋值了
 5    int deviceSocket = mGroup->mDeviceSocket;
 6    ...
 7    //[0]设置接收和发送的数据为最小帧数
 8    setsockopt(deviceSocket, SOL_SOCKET, SO_RCVBUF, &output, sizeof(output));
 9    setsockopt(deviceSocket, SOL_SOCKET, SO_SNDBUF, &output, sizeof(output));
10
11    // [0]这里主要去除一些错误信息
12    char c;
13    while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);
14	...
15    //本地回声处理的一些初始化操作 
16    while (!exitPending()) {
17        int16_t output[sampleCount];
18        //[1]接收数据,缓冲区大小为2*sampleCount
19        if (recv(deviceSocket, output, sizeof(output), 0) <= 0) {
20            memset(output, 0, sizeof(output));
21        }
22
23        int16_t input[sampleCount];
24        int toWrite = sampleCount;
25        int toRead = (mode == MUTED) ? 0 : sampleCount;
26        int chances = 100;
27		...
28		//处理数据
29        if (mode != MUTED) {
30            if (echo != NULL) {
31                ALOGV("echo->run()");
32                //[2]回声抵消操作
33                echo->run(output, input);
34            }
35            //[3]处理完成之后,再把处理后的数据发送给对端,MSG_DONTWAIT非阻塞
36            send(deviceSocket, input, sizeof(input), MSG_DONTWAIT);
37        }
38    }
39
40exit:
41    delete echo;
42    return true;
43}

3.3.4总结

这里是rtp协议,通过网络形式发送实时流,然后通过编码传输到本地,进行回声抵消等操作,然后处理完成后再从本地发送给网络端,并对其解码,整个流程是UDP的套接字,说明对数据的实时性要求非常高,且允许发送和接收过程丢包。

4总结

总的来说,socketpair就是socket和pipe的结合,具有全双工,并且两端都可读写,不管是在源码中还是平时使用中,特别是多进程/多线程之间的通信,都会涉及到socketpair这一块。本文主要讲了三种类型的套接字类型SOCK_STREAM(TCP流),SOCK_SEQPACKET(整体发送),SOCK_DGRAM(UDP性质的),其实还可以有其他类型,这一块需要读者自己去研究了。耐心阅读,找到最根本的原理,即可一通百通。

参考

[1] 明明1109. Linux C常见数I/O函数比较: printf, sprintf, fprintf, write…, 2021.

[1] Donald_Shallwing. 经验分享:编写简易的温度监控程序(3), 2019.

[1] alibli. Camera MetaData介绍_独家原创, 2023.

[1] IT技术分享网. Android Camera之CameraMetadata分析, 2023.

[1] cfc1243570631. CameraMetadata 知识学习整理, 2022.

[1] armwind. Android Camera之CameraMetadata分析, 2016.

[1] “小夜猫&小懒虫&小财迷"的男人. 【高通SDM660平台】(8) — Camera MetaData介绍, 2020.

[1]程序猿Ricky的日常干货 . Linux fcntl 函数详解, 2019.

[1] 二的次方. Android 图像显示系统 - 基础知识之 BitTube, 2022.

[1] 程序员Android. Android 系统init进程启动流程, 2023.

[1] 晓涵涵. Android中UID、GID和PID的讲解, 2019.

[1] 猿來孺兹. protobuf c/c++详解, 2022.

[1] 书山青鸟叫. 浅谈Linux poll机制, 2022.