关于socketpair学习篇(包含源码对照解析)
3400 Words|Read in about 16 Min|本文总阅读量次
之前介绍了关于socket的用法,这里在此基础上再来看看跟socket类似的socketpair。
0一个demo
这里以一个例子来说明socketpair的广泛性,这里先介绍一下BitTube类。这里只是简单介绍,具体关于BitTube的使用的原理可以点击这里。
0.1本进程多线程通信
0.2跨进程通信
0.2.1发送句柄到其他进程
发送句柄的过程中,使用了fcntl的函数,是用来修改已经打开文件的属性的函数,这里起到了一个dup复制句柄的作用。
fcntl介绍,具体可以点击这里
1#include <fcntl.h> 2int fcntl(int fd, int cmd, ...);
fcntl是用来修改已经打开文件的属性的函数,包含5个功能:
复制一个已有文件描述符
功能和dup和dup2相同,对应的cmd:
F_DUPFD
、F_DUPFD_CLOEXEC
。当使用这两个cmd时,需要传入第三个参数,fcntl返回复制后的文件描述符,此返回值是之前未被占用的描述符,并且必须一个大于等于第三个参数值。F_DUPFD命令要求返回的文件描述符会清除对应的FD_CLOEXEC标志;F_DUPFD_CLOEXEC要求设置新描述符的FD_CLOEXEC标志。获取、设置文件描述符标志
对应的cmd:
F_GETFD
、F_SETFD
。用于设置FD_CLOEXEC标志,此标志的含义是:当进程执行exec系统调用后此文件描述符会被自动关闭。获取、设置文件访问状态标志
对应的cmd:
F_GETFL
、F_SETFL
。获取当前打开文件的访问标志,设置对应的访问标志,一般常用来设置做非阻塞读写操作。获取、设置记录锁功能
对应的cmd:
F_GETLK
、F_SETLK
、F_SETLKW
。获取、设置异步I/O所有权
对应的cmd:
F_GETOWN
、F_SETOWN
。 获取和设置用来接收SIGIO/SIGURG信号的进程id或者进程组id。返回对应的进程id或者进程组id取负值。
0.2.2从其他进程接收句柄
0.3总结
socketpair利用socket为双方建立了全双工的通信管道(communication pipe)。通过文件描述符的复用(dup/dup2),可以传递socket handle到另一个进程,复用它并开启通信。
BitTube使用了Linux/Unix socket中的顺序数据包(sequenced packets,SOCK_SEQPACKET),像SOCK_DGRAM,它只传送整包数据;又像SOCK_STREAM,面向连接且提供有序的数据报传送。
1socketpair
socketpair,跟socket类似,都为套接字。可以用于网络通信,也可以用于本机内的进程通信。
区别于管道pipe是半双工的,pipe两次才能实现全双工,使得代码复杂。socketpair直接就可以实现全双工,socketpair对两个文件描述符中的任何一个都可读和可写,而pipe是一个读,一个写。
2Linux socketpair
创建一对连接的套接字
2.1函数原型
1#include <sys/types.h>
2#include <sys/socket.h>
3int socketpair(int domain, int type, int protocol, int sv[2]);
- domain:协议族,可以是AF_UNIX或AF_LOCAL。
- type:套接字类型,可以是SOCK_STREAM或SOCK_DGRAM。
- protocol:协议类型,可以是0或IPPROTO_TCP。
- sv:用于存储返回的套接字描述符的数组,其中sv[0]表示第一个套接字描述符,sv[1]表示第二个套接字描述符。
2.2描述
socketpair调用使用可选指定的套接字类型,在指定的域中以指定的类型创建一对未命名的已连接套接字。有关这些参数的更多详细信息,请参见socket,点击这里。
2.3返回值
返回值如果成功,则返回0。如果出现错误,则返回-1,并适当地设置errno。
ERRORNO | 解释 |
---|---|
EAFNOSUPPORT |
在这台计算机上不支持指定的地址族 |
EFAULT |
地址sv没有指定进程地址空间的有效部分 |
EMFILE |
已达到每个进程打开的文件描述符数量限制 |
ENFILE |
已达到打开文件总数的全系统限制 |
EOPNOTSUPP |
指定的协议不支持创建套接字对 |
EPROTONOSUPPORT |
这台机器不支持指定的协议 |
3源码中的socketpair
本文主要介绍三种套接字类型,SOCK_STREAM, SOCK_SEQPACKET,SOCK_DGRAM。但不代表socketpair只支持这三种,其余的套接字类型可以根据兴趣爱好自我探索。
3.1属性系统源码
属性系统中用到了epoll加上SOCK_SEQPACKET套接字类型的socketpair。这个源码更详细的可以点击这里。
3.1.1创建socketpair
创建socketpair,其中套接字类型是SOCK_SEQPACKET
,另外sockets[0]为接收端,sockets[1]为发送端
1//system/core/init/property_service.cpp
2void StartPropertyService(int* epoll_socket) {
3 InitPropertySet("ro.property_service.version", "2");
4
5 int sockets[2];
6 if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, sockets) != 0) {
7 PLOG(FATAL) << "Failed to socketpair() between property_service and init";
8 }
9 //epoll_socket和from_init_socket都为接收端,会返回到init.cpp中的StartPropertyService
10 *epoll_socket = from_init_socket = sockets[0];
11 //init_socket为发送端
12 init_socket = sockets[1];
13 StartSendingMessages();
14 //这里创建了套接字类型为SOCK_STREAM的TCP流的socket,句柄为property_set_fd
15 if (auto result = CreateSocket(PROP_SERVICE_NAME, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK,
16 /*passcred=*/false, /*should_listen=*/false, 0666, /*uid=*/0,
17 /*gid=*/0, /*socketcon=*/{});
18 result.ok()) {
19 property_set_fd = *result;
20 } else {
21 LOG(FATAL) << "start_property_service socket creation failed: " << result.error();
22 }
23
24 listen(property_set_fd, 8);
25
26 auto new_thread = std::thread{PropertyServiceThread};
27 property_service_thread.swap(new_thread);
28}
3.1.1属性系统客户端
属性的服务端是在启动流程中开启的,调用StartPropertyService,并传入一个句柄property_fd,很显然这个句柄获取得到是socketpait对应的socket[0]
1//system/core/init/init.cpp
2int SecondStageMain(int argc, char** argv) {
3 ...
4 StartPropertyService(&property_fd);
5}
但是这里获取到了property_fd,承载着客户端的操作。
这里调用SendLoadPersistentPropertiesMessage,可以发现关于property_fd从客户端发送到socketpair另外一端
1//system/core/init/init.cpp
2void SendLoadPersistentPropertiesMessage() {
3 //这里使用的跨平台的protobuf,会被解析成c++类型
4 auto init_message = InitMessage{};
5 init_message.set_load_persistent_properties(true);
6 if (auto result = SendMessage(property_fd, init_message); !result.ok()) {
7 LOG(ERROR) << "Failed to send load persistent properties message: " << result.error();
8 }
9}
对应的protobuf文件
1//system/core/init/property_service.proto 2message InitMessage { 3 oneof msg { 4 bool load_persistent_properties = 1; 5 bool stop_sending_messages = 2; 6 bool start_sending_messages = 3; 7 }; 8}
其中SendLoadPersistentPropertiesMessage并不是直接启动的,是通过rc文件解析之后Action类型执行的
1//system/core/init/builtins.cpp
2const BuiltinFunctionMap& GetBuiltinFunctionMap() {
3 constexpr std::size_t kMax = std::numeric_limits<std::size_t>::max();
4 // clang-format off
5 static const BuiltinFunctionMap builtin_functions = {
6 ...
7 {"load_persist_props", {0, 0, {false, do_load_persist_props}}},
8 ...
9 };
10 // clang-format on
11 return builtin_functions;
12}
Action类型执行对应的函数do_load_persist_props
1//system/core/init/builtins.cpp
2static Result<void> do_load_persist_props(const BuiltinArguments& args) {
3 SendLoadPersistentPropertiesMessage();
4 //从这里可以看出实际上常量属性加载完成之后才算完成这一步
5 start_waiting_for_property("ro.persistent_properties.ready", "true");
6 return {};
7}
proto
说明关于
proto
之前的Android属性系统中文章中提到过,但没有具体深入,当然本文也不会特别深入。如果需要进一步了解语法规则,可以点击这里。
Protobuf
是一种与平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储。而且由于和语言无关,proto
文件既可以转化成c++
文件,也可以转化成java
,python
这类的文件。当然在源码中,可以直接理解为定义了一种数据类,专门用于序列化存储。目前我们定义的
proto
文件,比如上面的property_service.proto
,可以在下面的源码路径中找到11//其中xxx为具体的架构,有arm架构,arm架构等 22/out/soong/.intermediates/system/core/init/libinit/android_xxx_static/gen/proto/system/core/init/property_service.pb.h 33/out/soong/.intermediates/system/core/init/libinit/android_xxx_static/gen/proto/system/core/init/property_service.pb.cc
具体的规则如下,所指定的消息字段修饰符必须是如下之一:
- required:一个格式良好的消息一定要含有1个这种字段。表示该值是必须要设置的;
- optional:消息格式中该字段可以有0个或1个值(不超过1个)。
- repeated:在一个格式良好的消息中,这种字段可以重复任意多次(包括0次)。重复的值的顺序会被保留。表示该值可以重复,相当于java中的List。
关于
Oneof
如果你的消息中有很多可选字段, 并且同时至多一个字段会被设置, 你可以加强这个行为,使用
oneof
特性节省内存.
Oneof
字段就像可选字段, 除了它们会共享内存, 至多一个字段会被设置。 设置其中一个字段会清除其它oneof
字段。 你可以使用case()
或者WhichOneof()
方法检查哪个oneof
字段被设置, 看你使用什么语言了.在产生的代码中,
oneof
字段拥有同样的getters
和setters
, 就像正常的可选字段一样. 也有一个特殊的方法来检查到底那个字段被设置. 你可以在相应的语言API中找到oneof API
介绍.1message SampleMessage { 2 oneof test_oneof { 3 string name = 4; 4 SubMessage sub_message = 9; 5 } 6}
设置
oneof
会自动清楚其它oneof
字段的值. 所以设置多次后,只有最后一次设置的字段有值.1SampleMessage message; 2message.set_name("name"); 3CHECK(message.has_name()); 4message.mutable_sub_message(); // Will clear name field. 5CHECK(!message.has_name());
根据上面可以看到protobuf文件,实际上我们获取了序列化的内容,即load_persistent_properties为true。
3.1.2属性系统服务端
服务端的话就是上述的线程,property_service_thread,对应的线程函数为PropertyServiceThread
1//system/core/init/property_service.cpp
2static void PropertyServiceThread() {
3 //源码封装的epoll
4 Epoll epoll;
5 //就是创造epoll_fd句柄
6 if (auto result = epoll.Open(); !result.ok()) {
7 LOG(FATAL) << result.error();
8 }
9 //将property_set_fd注册到epoll中,回调函数为handle_property_set_fd
10 if (auto result = epoll.RegisterHandler(property_set_fd, handle_property_set_fd);
11 !result.ok()) {
12 LOG(FATAL) << result.error();
13 }
14 //将init_socket注册到epoll中,回调函数为HandleInitSocket
15 if (auto result = epoll.RegisterHandler(init_socket, HandleInitSocket); !result.ok()) {
16 LOG(FATAL) << result.error();
17 }
18 //循环等待结果
19 while (true) {
20 //这里默认是无限等待,一直阻塞,直到有事件来的时候返回,并执行对应的回调函数
21 auto pending_functions = epoll.Wait(std::nullopt);
22 if (!pending_functions.ok()) {
23 LOG(ERROR) << pending_functions.error();
24 } else {
25 for (const auto& function : *pending_functions) {
26 (*function)();
27 }
28 }
29 }
30}
可以看到这里面又涉及到了epoll,不过这个epoll在源码中重新封装成了Epoll
1//system/core/init/epoll.h
2class Epoll {
3 public:
4 Epoll();
5 //回调函数Handler
6 typedef std::function<void()> Handler;
7
8 Result<void> Open();
9 //注册的默认事件为EPOLLIN
10 Result<void> RegisterHandler(int fd, Handler handler, uint32_t events = EPOLLIN);
11 Result<void> UnregisterHandler(int fd);
12 Result<std::vector<std::shared_ptr<Handler>>> Wait(
13 std::optional<std::chrono::milliseconds> timeout);
14
15 private:
16 //将回调函数的事件封装到一起
17 struct Info {
18 std::shared_ptr<Handler> handler;
19 uint32_t events;
20 };
21
22 android::base::unique_fd epoll_fd_;
23 std::map<int, Info> epoll_handlers_;
24};
1//system/core/init/epoll.cpp
2Result<void> Epoll::Open() {
3 epoll_fd_.reset(epoll_create1(EPOLL_CLOEXEC));
4 ...
5}
6
7Result<void> Epoll::RegisterHandler(int fd, Handler handler, uint32_t events) {
8 Info info;
9 info.events = events;
10 info.handler = std::make_shared<decltype(handler)>(std::move(handler));
11
12 auto [it, inserted] = epoll_handlers_.emplace(fd, std::move(info));
13 epoll_event ev;
14 ev.events = events;
15 ev.data.ptr = reinterpret_cast<void*>(&it->second);
16 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
17}
18
19Result<std::vector<std::shared_ptr<Epoll::Handler>>> Epoll::Wait(
20 std::optional<std::chrono::milliseconds> timeout) {
21 int timeout_ms = -1;
22
23 const auto max_events = epoll_handlers_.size();
24 epoll_event ev[max_events];
25 //默认无限阻塞,直到有EPOLLIN事件过来
26 auto num_events = TEMP_FAILURE_RETRY(epoll_wait(epoll_fd_, ev, max_events, timeout_ms));
27
28 std::vector<std::shared_ptr<Handler>> pending_functions;
29 for (int i = 0; i < num_events; ++i) {
30 //取出事件中的ptr,即对应的info指针
31 auto& info = *reinterpret_cast<Info*>(ev[i].data.ptr);
32 //将对应事件放入到数组中,通过数组来依次排队输出
33 pending_functions.emplace_back(info.handler);
34 }
35
36 return pending_functions;
37}
最终会回调到两个函数,这两个函数的先后顺序是由于哪个事件先触发先调用的原则,会被放在pending_functions队列中。
3.1.2.1handle_property_set_fd
这里回调实际上是对应socket,而不是socketpair,这里更多的是IO多路复用的经典场景,监听socket句柄,有新的连接进来了就会回调到这个函数。
1//system/core/init/property_service.cpp
2static void handle_property_set_fd() {
3 static constexpr uint32_t kDefaultSocketTimeout = 2000; /* ms */
4 //接收客户端的句柄,服务端这里与之连接的句柄为s,
5 //其实这里可以把s也添加到epoll中,但是源码中并没有这么操作,原因是更高的处理效率,而是用了poll机制
6 int s = accept4(property_set_fd, nullptr, nullptr, SOCK_CLOEXEC);
7 if (s == -1) {
8 return;
9 }
10 //这里包含权限问题,pid,uid,gid,
11 ucred cr;
12 socklen_t cr_size = sizeof(cr);
13 if (getsockopt(s, SOL_SOCKET, SO_PEERCRED, &cr, &cr_size) < 0) {
14 ...
15 }
16
17 SocketConnection socket(s, cr);
18 uint32_t timeout_ms = kDefaultSocketTimeout;
19
20 uint32_t cmd = 0;
21 if (!socket.RecvUint32(&cmd, &timeout_ms)) {
22 ...
23 return;
24 }
25
26 switch (cmd) {
27 ...
28 //这里默认使用PROP_MSG_SETPROP2 0x00020001
29 case PROP_MSG_SETPROP2: {
30 std::string name;
31 std::string value;
32 if (!socket.RecvString(&name, &timeout_ms) ||
33 !socket.RecvString(&value, &timeout_ms)) {
34 ...
35 }
36
37 std::string source_context;
38 if (!socket.GetSourceContext(&source_context)) {
39 ...
40 }
41
42 const auto& cr = socket.cred();
43 std::string error;
44 uint32_t result = HandlePropertySet(name, value, source_context, cr, &socket, &error);
45 ...
46 socket.SendUint32(result);
47 break;
48 }
49
50 default:
51 ...
52 }
53}
这里还涉及到了系统管理的id问题
这里只是简单的介绍一下UID,PID和GID的关系,具体内容可以点击这里。
UID
在Linux中用户的概念分为:普通用户、根用户和系统用户。
Linux用户 解释说明 普通用户 表示平时使用的用户概念,在使用Linux时,需要通过用户名和密码登录,获取该用户相应的权限,其权限具体表现在对系统中文件的增删改查和命令执行的限制,不同用户具有不同的权限设置,其UID通常大于500 根用户 该用户就是ROOT用户,其UID为0,可以对系统中任何文件进行增删改查处理,执行任何命令,因此ROOT用户极其危险,如操作不当,会导致系统彻底崩掉 系统用户 该用户是系统虚拟出的用户概念,不对使用者开发的用户,其UID范围为1-499,例如运行MySQL数据库服务时,需要使用系统用户mysql来运行mysqld进程 PID
系统在程序运行时,会为每个可执行程序分配一个唯一的进程ID(PID),PID的直接作用是为了表明该程序所拥有的文件操作权限,不同的可执行程序运行时互不影响,相互之间的数据访问具有权限限制。
GID
GID顾名思义就是对于UID的封装处理,就是包含多个UID的意思,实际上在Linux下每个UID都对应着一个GID。设计GID是为了便于对系统的统一管理,例如增加某个文件的用户权限时,只对admin组的用户开放,那么在分配权限时,只需对该组分配,其组下的所有用户均获取权限。同样在删除时,也便于统一操作。
除了UID和GID外,其还包括其扩展的有效的用户、组(euid、egid)、文件系统的用户、组(fsuid、fsgid)和保存的设置用户、组(suid、sgid)等。
- 不同的应用具有唯一的UID,同一个UID可具有不同的PID;
- 针对不同的PID之间数据的暴露可采用私有暴露和权限暴露,针对不同的UID之间可通过完全暴露的方式;
- 如果一个应用是系统应用,则不需要其他应用暴露,便可直接访问该应用的数据。
另外上面的socket还存在封装,封装成了对应的SocketConnection
1//system/core/init/property_service.cpp
2class SocketConnection {
3 public:
4 SocketConnection(int socket, const ucred& cred) : socket_(socket), cred_(cred) {}
5 ...
6 //以接收string为例,另外的int和char都是类似
7 bool RecvString(std::string* value, uint32_t* timeout_ms) {
8 uint32_t len = 0;
9 /
10 if (!RecvUint32(&len, timeout_ms)) {
11 return false;
12 }
13
14 std::vector<char> chars(len);
15 if (!RecvChars(&chars[0], len, timeout_ms)) {
16 return false;
17 }
18
19 *value = std::string(&chars[0], len);
20 return true;
21 }
22
23 bool SendUint32(uint32_t value) {
24 if (!socket_.ok()) {
25 return true;
26 }
27 //发送的话,直接发送
28 int result = TEMP_FAILURE_RETRY(send(socket_, &value, sizeof(value), 0));
29 return result == sizeof(value);
30 }
31
32 const ucred& cred() { return cred_; }
33
34 private:
35 //接收的过程用到了poll机制,这里传递进来的timeout_ms为2s
36 bool PollIn(uint32_t* timeout_ms) {
37 struct pollfd ufds[1];
38 ufds[0].fd = socket_;
39 ufds[0].events = POLLIN;
40 ufds[0].revents = 0;
41 while (*timeout_ms > 0) {
42 auto start_time = std::chrono::steady_clock::now();
43 //对socket_,其实就是s进行监听,是否会超时。只有fds中准备好读写,返回值nr大于0为s
44 int nr = poll(ufds, 1, *timeout_ms);
45 auto now = std::chrono::steady_clock::now();
46 auto time_elapsed =
47 std::chrono::duration_cast<std::chrono::milliseconds>(now - start_time);
48 uint64_t millis = time_elapsed.count();
49 *timeout_ms = (millis > *timeout_ms) ? 0 : *timeout_ms - millis;
50
51 if (nr > 0) {
52 return true;
53 }
54
55 if (nr == 0) {
56 // Timeout
57 break;
58 }
59
60 if (nr < 0 && errno != EINTR) {
61 PLOG(ERROR) << "sys_prop: error waiting for uid " << cred_.uid
62 << " to send property message";
63 return false;
64 } else { // errno == EINTR
65 // Timer rounds milliseconds down in case of EINTR we want it to be rounded up
66 // to avoid slowing init down by causing EINTR with under millisecond timeout.
67 if (*timeout_ms > 0) {
68 --(*timeout_ms);
69 }
70 }
71 }
72
73 LOG(ERROR) << "sys_prop: timeout waiting for uid " << cred_.uid
74 << " to send property message.";
75 return false;
76 }
77
78 bool RecvFully(void* data_ptr, size_t size, uint32_t* timeout_ms) {
79 size_t bytes_left = size;
80 char* data = static_cast<char*>(data_ptr);
81 while (*timeout_ms > 0 && bytes_left > 0) {
82 //用于判断接收是否超时2s,这里是不允许超时的
83 if (!PollIn(timeout_ms)) {
84 return false;
85 }
86 //这里实际上是流的形式,所以或循环去读取的操作
87 int result = TEMP_FAILURE_RETRY(recv(socket_, data, bytes_left, MSG_DONTWAIT));
88 if (result <= 0) {
89 PLOG(ERROR) << "sys_prop: recv error";
90 return false;
91 }
92
93 bytes_left -= result;
94 data += result;
95 }
96
97 if (bytes_left != 0) {
98 LOG(ERROR) << "sys_prop: recv data is not properly obtained.";
99 }
100
101 return bytes_left == 0;
102 }
103
104 unique_fd socket_;
105 ucred cred_;
106
107 DISALLOW_IMPLICIT_CONSTRUCTORS(SocketConnection);
108};
内容比较多
接收。以接收string为例,另外的int和char都是类似,循环recv接收数据。先判断传输的string长度,长度限制为0-2^16-1。然后接收的过程以char类型接收,接收完成之后再转成string类。
发送。已发送int为例,这里发送直接通过send发送数据。
poll机制
poll是Linux的
事件轮询机制
函数,每个进程都可以管理一个pollfd
队列,由poll函数
进行事件注册和查询。内核将用户的fds结构体数组
拷贝到内核中。当有事件发生时,再将所有事件都返回到fds结构体数组
中,poll只返回已就绪事件的个数,所以用户要操作就绪事件就要用轮询
的方法。1)函数原型
1#include <poll.h> 2int poll(struct pollfd* fds, nfds_t nfds, int timeout);
- **fds:**是一个struct pollfd类型的指针,用于存放需要检测其状态的socket描述符
- **nfds:**是nfd_t类型的参数,用于标记fds数组中结构体元素的数量
- **timeout:**没有接受事件时等待的事件,单位毫秒,若值为-1,则永远不会超时
2)pollfd结构体
1struct pollfd 2{ 3 int fd; 4 short events; 5 short revents; 6}
- **fd:**文件描述符
- events:等待发生的事件类型
- **revents:**检测之后返回的事件,当某个文件描述符有变化时,值就不为空
事件标识 说明 POLLIN
普通或优先级带数据可读 POLLRDNORM
普通数据可读 POLLRDBAND
优先级带数据可读 POLLPRI
高优先级数据可读 POLLOUT
普通数据可写 POLLWRNORM
普通数据可写不会导致阻塞 POLLERR
发生错误 POLLHUP
发生挂起 POLLNVAL
描述字不是一个打开的文件 3)poll返回值
poll机制会判断fds中的文件是否满足条件,如果休眠时间内条件满足则会唤醒进程;超过休眠时间,条件一直不满足则自动唤醒。
- 返回值>0:fds中准备好读写,或出错状态的那些socket描述符
- **返回值=0:**fds中没有socket描述符需要读写或出错;此时poll超时,时长为timeout
- **返回值=-1:**调用失败
poll机制也是IO多路复用的一种方式,这里主要是承接连接的客户端的socket,对应服务端会有一个连接socket句柄s,对其poll轮询机制,可以更好的去获取对应的TCP流。
这里的s接收到的数据,实际上是客户端发送的数据。
3.1.2.2HandleInitSocket
1//system/core/init/property_service.cpp
2static void HandleInitSocket() {
3 //处理socketpair对端传过来的数据
4 auto message = ReadMessage(init_socket);
5 if (!message.ok()) {
6 LOG(ERROR) << "Could not read message from init_dedicated_recv_socket: " << message.error();
7 return;
8 }
9 //初始化proto数据
10 auto init_message = InitMessage{};
11 //将序列化数据转化成string类
12 if (!init_message.ParseFromString(*message)) {
13 LOG(ERROR) << "Could not parse message from init";
14 return;
15 }
16 //判断属性系统传过来的数据类型,是否为kLoadPersistentProperties
17 switch (init_message.msg_case()) {
18 case InitMessage::kLoadPersistentProperties: {
19 load_override_properties();
20 // 导入persist属性,并且初始化
21 auto persistent_properties = LoadPersistentProperties();
22 for (const auto& persistent_property_record : persistent_properties.properties()) {
23 InitPropertySet(persistent_property_record.name(),
24 persistent_property_record.value());
25 }
26 InitPropertySet("ro.persistent_properties.ready", "true");
27 persistent_properties_loaded = true;
28 break;
29 }
30 default:
31 LOG(ERROR) << "Unknown message type from init: " << init_message.msg_case();
32 }
33}
上面主要做了几件事
- 处理socketpair对端传过来的数据
- 将proto序列化数据转化成string类
- 判断属性系统传过来的数据类型,如果是就加载并初始化persist属性
ReadMessage
1//system/core/init/proto_utils.h
2constexpr size_t kBufferSize = 4096;
3
4inline Result<std::string> ReadMessage(int socket) {
5 char buffer[kBufferSize] = {};
6 auto result = TEMP_FAILURE_RETRY(recv(socket, buffer, sizeof(buffer), 0));
7 if (result == 0) {
8 return Error();
9 } else if (result < 0) {
10 return ErrnoError();
11 }
12 return std::string(buffer, result);
13}
这个具体数据,实际上就是对应的msg_case为InitMessage::kLoadPersistentProperties的数据。
3.1.3总结
实际上,这里的socket的作用意图很明显,将Init进程和非Init进程的隔离开来,非Init进程操作属性的写入需要转到Init进程去操作,而非Init进程和Init进程通信这里用到了本地的socket通信。这样,属性的管控者就是Init进程,可以做到过滤隔离作用。
除此之外,这里还涉及到了epoll、poll、protobuf、socket和socketpair等概念,充分理解和掌握这些基本概念之后,对于源码的阅读可以理解的更加深入。
3.2Camera dump数据源码
Camera dump操作中用到了SOCK_STREAM套接字类型的socketpair。
3.2.1创建socketpair
1//frameworks/base/core/jni/android_hardware_camera2_CameraMetadata.cpp
2static void CameraMetadata_dump(JNIEnv *env, jclass thiz, jlong ptr) {
3 ALOGV("%s", __FUNCTION__);
4 CameraMetadata* metadata = CameraMetadata_getPointerThrow(env, ptr);
5 if (metadata == NULL) {
6 return;
7 }
8 int writeFd, readFd;
9 {
10 int sv[2];
11 //这里创建socketpair,使用TCP的形式,一端发送,一端接受
12 if (socketpair(AF_LOCAL, SOCK_STREAM, /*protocol*/0, &sv[0]) < 0) {
13 jniThrowExceptionFmt(env, "java/io/IOException",
14 "Failed to create socketpair (errno = %#x, message = '%s')",
15 errno, strerror(errno));
16 return;
17 }
18 writeFd = sv[0];
19 readFd = sv[1];
20 }
21
22 pthread_t writeThread;
23 DumpMetadataParams params = {
24 writeFd,
25 metadata
26 };
27 //开启write线程,作为写入段
28 {
29 int threadRet = pthread_create(&writeThread, /*attr*/NULL,
30 CameraMetadata_writeMetadataThread, (void*)¶ms);
31
32 if (threadRet != 0) {
33 close(writeFd);
34 close(readFd);
35
36 jniThrowExceptionFmt(env, "java/io/IOException",
37 "Failed to create thread for writing (errno = %#x, message = '%s')",
38 threadRet, strerror(threadRet));
39 return;
40 }
41 }
42 ...
43
44 int res;
45
46 // 等待数据完成后,退出线程
47 if ((res = pthread_join(writeThread, /*retval*/NULL)) != 0) {
48 ALOGE("%s: Failed to join thread (errno = %#x, message = '%s')",
49 __FUNCTION__, res, strerror(res));
50 }
51}
3.2.2发送端
子线程用来发送数据
1//frameworks/base/core/jni/android_hardware_camera2_CameraMetadata.cpp
2struct DumpMetadataParams {
3 int writeFd;
4 const CameraMetadata* metadata;
5};
6
7static void* CameraMetadata_writeMetadataThread(void* arg) {
8 DumpMetadataParams* p = static_cast<DumpMetadataParams*>(arg);
9
10 p->metadata->dump(p->writeFd, /*verbosity*/2);
11
12 if (close(p->writeFd) < 0) {
13 ALOGE("%s: Failed to close writeFd (errno = %#x, message = '%s')",
14 __FUNCTION__, errno, strerror(errno));
15 }
16
17 return NULL;
18}
这里的p为DumpMetadataParams是结构体类型,实际上是CameraMetadata数据dump
1//frameworks/av/camera/CameraMetadata.cpp
2camera_metadata_t *mBuffer;
3void dump(int fd, int verbosity = 1, int indentation = 0) const;
4void CameraMetadata::dump(int fd, int verbosity, int indentation) const {
5 dump_indented_camera_metadata(mBuffer, fd, verbosity, indentation);
6}
调用到dump_indented_camera_metadata
1//system/media/camera/src/camera_metadata.c
2void dump_indented_camera_metadata(const camera_metadata_t *metadata,
3 int fd,
4 int verbosity,
5 int indentation) {
6 unsigned int i;
7 dprintf(fd,
8 "%*sDumping camera metadata array: %" PRIu32 " / %" PRIu32 " entries, "
9 "%" PRIu32 " / %" PRIu32 " bytes of extra data.\n", indentation, "",
10 metadata->entry_count, metadata->entry_capacity,
11 metadata->data_count, metadata->data_capacity);
12 dprintf(fd, "%*sVersion: %d, Flags: %08x\n",
13 indentation + 2, "",
14 metadata->version, metadata->flags);
15 camera_metadata_buffer_entry_t *entry = get_entries(metadata);
16 for (i=0; i < metadata->entry_count; i++, entry++) {
17
18 const char *tag_name, *tag_section;
19 tag_section = get_local_camera_metadata_section_name(entry->tag, metadata);
20 if (tag_section == NULL) {
21 tag_section = "unknownSection";
22 }
23 tag_name = get_local_camera_metadata_tag_name(entry->tag, metadata);
24 if (tag_name == NULL) {
25 tag_name = "unknownTag";
26 }
27 const char *type_name;
28 if (entry->type >= NUM_TYPES) {
29 type_name = "unknown";
30 } else {
31 type_name = camera_metadata_type_names[entry->type];
32 }
33 dprintf(fd, "%*s%s.%s (%05x): %s[%" PRIu32 "]\n",
34 indentation + 2, "",
35 tag_section,
36 tag_name,
37 entry->tag,
38 type_name,
39 entry->count);
40
41 if (verbosity < 1) continue;
42
43 if (entry->type >= NUM_TYPES) continue;
44
45 size_t type_size = camera_metadata_type_size[entry->type];
46 uint8_t *data_ptr;
47 if ( type_size * entry->count > 4 ) {
48 if (entry->data.offset >= metadata->data_count) {
49 ALOGE("%s: Malformed entry data offset: %" PRIu32 " (max %" PRIu32 ")",
50 __FUNCTION__,
51 entry->data.offset,
52 metadata->data_count);
53 continue;
54 }
55 data_ptr = get_data(metadata) + entry->data.offset;
56 } else {
57 data_ptr = entry->data.value;
58 }
59 int count = entry->count;
60 if (verbosity < 2 && count > 16) count = 16;
61
62 print_data(fd, data_ptr, entry->tag, entry->type, count, indentation);
63 }
64}
可以看到这里没有使用相应的write去发送,而是选择使用dprintf的方式来发送。
关于IO函数比较,write和dprintf区别
write
向已打开文件描述符. 将缓存buf内容写count个字节到fd指向的文件. buf不必以null终结符结尾.
1#include <unistd.h> 2ssize_t write(int fd, const void *buf, size_t count);
示例: 向stdout写入所有缓存字符
1char buf[2] = {'a', 'b'}; 2int n = write(STDOUT_FILENO, buf, sizeof(buf)); 3if (n < 0) { 4 perror("write error"); 5 exit(1); 6}
dprintf
将格式化串输出到已打开文件描述符, 其他同printf.
1#include <stdio.h> 2int sprintf(char *str, const char *format, ...);
示例: 向标准输出(默认已打开, 文件描述符 = STDOUT_FILENO)输出
1int age = 20; 2dprintf(STDOUT_FILENO, "my age is %d\n", age);
区别
write函数和dprintf函数,都可以实现将字符串写入文件,但两者的区别在于,write会写入字符串中的NULL字符,因而以vim打开文件后会出现少量乱码,dprintf则不会。
3.2.3接收端
1//frameworks/base/core/jni/android_hardware_camera2_CameraMetadata.cpp
2static void CameraMetadata_dump(JNIEnv *env, jclass thiz, jlong ptr) {
3 //当前线程读取数据
4 {
5 char out[] = {'\0', '\0'};
6 String8 logLine;
7
8 //每次读取一个数据,当读取到换行符的时候,就把当前缓冲区数据,打印出来,直到全部读完为止
9 ssize_t res;
10 while ((res = TEMP_FAILURE_RETRY(read(readFd, &out[0], /*count*/1))) > 0) {
11 if (out[0] == '\n') {
12 ALOGD("%s", logLine.string());
13 logLine.clear();
14 } else {
15 logLine.append(out);
16 }
17 }
18
19 ALOGD("%s", logLine.string());
20 close(readFd);
21 }
22 ...
23}
3.2.4总结
其实这一块内容比较简单,主要这里可能涉及到了Camera相关的CameraMetadata数据结构,这个比较复杂,具体关于CameraMetadata可以在参考中查找,这里不展开说明。
3.3AudioGroup相关源码
Audio相关操作中用到了SOCK_DGRAM套接字类型的socketpair。
3.3.1创建socketpair
1//frameworks/opt/net/voip/src/jni/rtp/AudioGroup.cpp
2bool AudioGroup::set(int sampleRate, int sampleCount)
3{
4 //这里也涉及到了epoll机制
5 mEventQueue = epoll_create1(EPOLL_CLOEXEC);
6 if (mEventQueue == -1) {
7 ALOGE("epoll_create1: %s", strerror(errno));
8 return false;
9 }
10
11 mSampleRate = sampleRate;
12 mSampleCount = sampleCount;
13
14 // 创建socketpair,是UDP的形式
15 int pair[2];
16 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, pair)) {
17 ALOGE("socketpair: %s", strerror(errno));
18 return false;
19 }
20 //会在线程中使用
21 mDeviceSocket = pair[0];
22
23 // Create device stream.
24 mChain = new AudioStream;
25 //把另一端发给mChain保存
26 if (!mChain->set(AudioStream::NORMAL, pair[1], NULL, NULL,
27 sampleRate, sampleCount, -1, -1)) {
28 close(pair[1]);
29 ALOGE("cannot initialize device stream");
30 return false;
31 }
32
33 // Give device socket a reasonable timeout.
34 timeval tv;
35 tv.tv_sec = 0;
36 tv.tv_usec = 1000 * sampleCount / sampleRate * 500;
37 //设置接收时间为微妙
38 if (setsockopt(pair[0], SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
39 ALOGE("setsockopt: %s", strerror(errno));
40 return false;
41 }
42
43 // Add device stream into event queue.
44 epoll_event event;
45 event.events = EPOLLIN;
46 event.data.ptr = mChain;
47 //将pair[1]加入到epoll中,当发生变化,回调给mChain
48 if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, pair[1], &event)) {
49 ALOGE("epoll_ctl: %s", strerror(errno));
50 return false;
51 }
52
53 // Anything else?
54 ALOGD("stream[%d] joins group[%d]", pair[1], pair[0]);
55 return true;
56}
AudioStream::set
这里是一些初始化操作,包括模式,传输头的魔数
1//frameworks/opt/net/voip/src/jni/rtp/AudioGroup.cpp
2bool AudioStream::set(int mode, int socket, sockaddr_storage *remote,
3 AudioCodec *codec, int sampleRate, int sampleCount,
4 int codecType, int dtmfType)
5{
6 if (mode < 0 || mode > LAST_MODE) {
7 return false;
8 }
9 mMode = mode;
10
11 mCodecMagic = (0x8000 | codecType) << 16;
12 mDtmfMagic = (dtmfType == -1) ? 0 : (0x8000 | dtmfType) << 16;
13 ...
14 // 传入的pair[1]赋值给mSocket
15 mSocket = socket;
16 ...
17 return true;
18}
pair[0]赋值给了mDeviceSocket,pair[1]最终加入到了epoll中,等到写入操作触发。
3.3.2发送端
发送端主要是NetworkThread线程中的encode和decode
1//frameworks/opt/net/voip/src/jni/rtp/AudioGroup.cpp
2bool AudioGroup::NetworkThread::threadLoop()
3{
4 AudioStream *chain = mGroup->mChain;
5 int tick = elapsedRealtime();
6 int deadline = tick + 10;
7 int count = 0;
8 //根据mChain数量来确定epoll等待fd数量
9 for (AudioStream *stream = chain; stream; stream = stream->mNext) {
10 if (tick - stream->mTick >= 0) {
11 //[1]首先编码传输
12 stream->encode(tick, chain);
13 }
14 if (deadline - stream->mTick > 0) {
15 deadline = stream->mTick;
16 }
17 ++count;
18 }
19
20 int event = mGroup->mDtmfEvent;
21 if (event != -1) {
22 for (AudioStream *stream = chain; stream; stream = stream->mNext) {
23 stream->sendDtmf(event);
24 }
25 mGroup->mDtmfEvent = -1;
26 }
27
28 deadline -= tick;
29 if (deadline < 1) {
30 deadline = 1;
31 }
32
33 epoll_event events[count];
34 //等待时间至少是1s
35 count = epoll_wait(mGroup->mEventQueue, events, count, deadline);
36 if (count == -1) {
37 ALOGE("epoll_wait: %s", strerror(errno));
38 return false;
39 }
40 //有写入事件发生,调用对应的ptr->decode,实际是mChain->decode回调函数
41 //[2]然后解码传输
42 for (int i = 0; i < count; ++i) {
43 ((AudioStream *)events[i].data.ptr)->decode(tick);
44 }
45
46 return true;
47}
上面主要是两个环节,先编码然后解码
3.3.2.1循环调用encode
1//frameworks/opt/net/voip/src/jni/rtp/AudioGroup.cpp
2void AudioStream::encode(int tick, AudioStream *chain)
3{
4 ...
5 if (mMode != RECEIVE_ONLY && mDtmfEvent != -1) {
6 int duration = mTimestamp - mDtmfStart;
7 // Make sure duration is reasonable.
8 if (duration >= 0 && duration < mSampleRate * DTMF_PERIOD) {
9 duration += mSampleCount;
10 int32_t buffer[4] = {
11 static_cast<int32_t>(htonl(mDtmfMagic | mSequence)),
12 static_cast<int32_t>(htonl(mDtmfStart)),
13 static_cast<int32_t>(mSsrc),
14 static_cast<int32_t>(htonl(mDtmfEvent | duration)),
15 };
16 if (duration >= mSampleRate * DTMF_PERIOD) {
17 buffer[3] |= htonl(1 << 23);
18 mDtmfEvent = -1;
19 }
20 //[1]第一次发送一些关于头部的信息,总共16个字节
21 sendto(mSocket, buffer, sizeof(buffer), MSG_DONTWAIT,
22 (sockaddr *)&mRemote, sizeof(mRemote));
23 return;
24 }
25 mDtmfEvent = -1;
26 }
27 int32_t buffer[mSampleCount + 3];
28 int16_t samples[mSampleCount];
29 // Cook the packet and send it out.
30 buffer[0] = htonl(mCodecMagic | mSequence);
31 buffer[1] = htonl(mTimestamp);
32 buffer[2] = mSsrc;
33 //[2]在发送前先编码,调用AudioCodec编码
34 int length = mCodec->encode(&buffer[3], samples);
35 if (length <= 0) {
36 ALOGV("stream[%d] encoder error", mSocket);
37 return;
38 }
39 //[3]编码之后发送,MSG_DONTWAIT代表无阻塞
40 sendto(mSocket, buffer, length + 12, MSG_DONTWAIT, (sockaddr *)&mRemote,
41 sizeof(mRemote));
42}
3.3.2.2回调到decode函数
当完成发送的时候,epoll会回调到decode函数
1//frameworks/opt/net/voip/src/jni/rtp/AudioGroup.cpp
2void AudioStream::decode(int tick)
3{
4 ...
5 int count = (BUFFER_SIZE - (mBufferTail - mBufferHead)) * mSampleRate;
6 if (count < mSampleCount) {
7 // 如果发送的数据越界,那么退出
8 ALOGV("stream[%d] buffer overflow", mSocket);
9 recv(mSocket, &c, 1, MSG_DONTWAIT);
10 return;
11 }
12
13 // Receive the packet and decode it.
14 int16_t samples[count];
15 if (!mCodec) {
16 ...
17 } else {
18 //四字节对其
19 __attribute__((aligned(4))) uint8_t buffer[2048];
20 sockaddr_storage remote;
21 socklen_t addrlen = sizeof(remote);
22
23 int bufferSize = sizeof(buffer);
24 //接收数据
25 int length = recvfrom(mSocket, buffer, bufferSize,
26 MSG_TRUNC | MSG_DONTWAIT, (sockaddr *)&remote, &addrlen);
27
28 //进行数据头部校验
29 if (length < 12 || length > bufferSize ||
30 (ntohl(*(uint32_t *)buffer) & 0xC07F0000) != mCodecMagic) {
31 ALOGV("stream[%d] malformed packet", mSocket);
32 return;
33 }
34 int offset = 12 + ((buffer[0] & 0x0F) << 2);
35 if (offset + 2 + (int)sizeof(uint16_t) > length) {
36 ALOGV("invalid buffer offset: %d", offset+2);
37 return;
38 }
39 if ((buffer[0] & 0x10) != 0) {
40 offset += 4 + (ntohs(*(uint16_t *)&buffer[offset + 2]) << 2);
41 }
42 if ((buffer[0] & 0x20) != 0) {
43 length -= buffer[length - 1];
44 }
45 length -= offset;
46 if (length >= 0) {
47 //接收完成后解码,调用AudioCodec
48 length = mCodec->decode(samples, count, &buffer[offset], length);
49 }
50 if (length > 0 && mFixRemote) {
51 mRemote = remote;
52 mFixRemote = false;
53 }
54 count = length;
55 }
56 ...
57}
3.3.3接收端
接收端也是另外一个DeviceThread线程,主要是用于本地的一些实时处理
1//frameworks/opt/net/voip/src/jni/rtp/AudioGroup.cpp
2bool AudioGroup::DeviceThread::threadLoop()
3{
4 //在线程初始化的时候就赋值了
5 int deviceSocket = mGroup->mDeviceSocket;
6 ...
7 //[0]设置接收和发送的数据为最小帧数
8 setsockopt(deviceSocket, SOL_SOCKET, SO_RCVBUF, &output, sizeof(output));
9 setsockopt(deviceSocket, SOL_SOCKET, SO_SNDBUF, &output, sizeof(output));
10
11 // [0]这里主要去除一些错误信息
12 char c;
13 while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);
14 ...
15 //本地回声处理的一些初始化操作
16 while (!exitPending()) {
17 int16_t output[sampleCount];
18 //[1]接收数据,缓冲区大小为2*sampleCount
19 if (recv(deviceSocket, output, sizeof(output), 0) <= 0) {
20 memset(output, 0, sizeof(output));
21 }
22
23 int16_t input[sampleCount];
24 int toWrite = sampleCount;
25 int toRead = (mode == MUTED) ? 0 : sampleCount;
26 int chances = 100;
27 ...
28 //处理数据
29 if (mode != MUTED) {
30 if (echo != NULL) {
31 ALOGV("echo->run()");
32 //[2]回声抵消操作
33 echo->run(output, input);
34 }
35 //[3]处理完成之后,再把处理后的数据发送给对端,MSG_DONTWAIT非阻塞
36 send(deviceSocket, input, sizeof(input), MSG_DONTWAIT);
37 }
38 }
39
40exit:
41 delete echo;
42 return true;
43}
3.3.4总结
这里是rtp协议,通过网络形式发送实时流,然后通过编码传输到本地,进行回声抵消等操作,然后处理完成后再从本地发送给网络端,并对其解码,整个流程是UDP的套接字,说明对数据的实时性要求非常高,且允许发送和接收过程丢包。
4总结
总的来说,socketpair就是socket和pipe的结合,具有全双工,并且两端都可读写,不管是在源码中还是平时使用中,特别是多进程/多线程之间的通信,都会涉及到socketpair这一块。本文主要讲了三种类型的套接字类型SOCK_STREAM
(TCP流),SOCK_SEQPACKET
(整体发送),SOCK_DGRAM
(UDP性质的),其实还可以有其他类型,这一块需要读者自己去研究了。耐心阅读,找到最根本的原理,即可一通百通。
参考
[1] 明明1109. Linux C常见数I/O函数比较: printf, sprintf, fprintf, write…, 2021.
[1] Donald_Shallwing. 经验分享:编写简易的温度监控程序(3), 2019.
[1] alibli. Camera MetaData介绍_独家原创, 2023.
[1] IT技术分享网. Android Camera之CameraMetadata分析, 2023.
[1] cfc1243570631. CameraMetadata 知识学习整理, 2022.
[1] armwind. Android Camera之CameraMetadata分析, 2016.
[1] “小夜猫&小懒虫&小财迷"的男人. 【高通SDM660平台】(8) — Camera MetaData介绍, 2020.
[1]程序猿Ricky的日常干货 . Linux fcntl 函数详解, 2019.
[1] 二的次方. Android 图像显示系统 - 基础知识之 BitTube, 2022.
[1] 程序员Android. Android 系统init进程启动流程, 2023.
[1] 晓涵涵. Android中UID、GID和PID的讲解, 2019.