1/*
2 * Copyright (C) 2018 Apple Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 *
13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23 * THE POSSIBILITY OF SUCH DAMAGE.
24 */
25
26#include "config.h"
27#include "MessagePortChannel.h"
28
29#include "Logging.h"
30#include "MessagePortChannelRegistry.h"
31#include <wtf/CompletionHandler.h>
32#include <wtf/MainThread.h>
33
34namespace WebCore {
35
36Ref<MessagePortChannel> MessagePortChannel::create(MessagePortChannelRegistry& registry, const MessagePortIdentifier& port1, const MessagePortIdentifier& port2)
37{
38 return adoptRef(*new MessagePortChannel(registry, port1, port2));
39}
40
41MessagePortChannel::MessagePortChannel(MessagePortChannelRegistry& registry, const MessagePortIdentifier& port1, const MessagePortIdentifier& port2)
42 : m_registry(registry)
43{
44 ASSERT(isMainThread());
45
46 relaxAdoptionRequirement();
47
48 m_ports[0] = port1;
49 m_processes[0] = port1.processIdentifier;
50 m_entangledToProcessProtectors[0] = this;
51 m_ports[1] = port2;
52 m_processes[1] = port2.processIdentifier;
53 m_entangledToProcessProtectors[1] = this;
54
55 m_registry.messagePortChannelCreated(*this);
56}
57
58MessagePortChannel::~MessagePortChannel()
59{
60 m_registry.messagePortChannelDestroyed(*this);
61}
62
63Optional<ProcessIdentifier> MessagePortChannel::processForPort(const MessagePortIdentifier& port)
64{
65 ASSERT(isMainThread());
66 ASSERT(port == m_ports[0] || port == m_ports[1]);
67 size_t i = port == m_ports[0] ? 0 : 1;
68 return m_processes[i];
69}
70
71bool MessagePortChannel::includesPort(const MessagePortIdentifier& port)
72{
73 ASSERT(isMainThread());
74
75 return m_ports[0] == port || m_ports[1] == port;
76}
77
78void MessagePortChannel::entanglePortWithProcess(const MessagePortIdentifier& port, ProcessIdentifier process)
79{
80 ASSERT(isMainThread());
81
82 ASSERT(port == m_ports[0] || port == m_ports[1]);
83 size_t i = port == m_ports[0] ? 0 : 1;
84
85 LOG(MessagePorts, "MessagePortChannel %s (%p) entangling port %s (that port has %zu messages available)", logString().utf8().data(), this, port.logString().utf8().data(), m_pendingMessages[i].size());
86
87 ASSERT(!m_processes[i] || *m_processes[i] == process);
88 m_processes[i] = process;
89 m_entangledToProcessProtectors[i] = this;
90 m_pendingMessagePortTransfers[i].remove(this);
91}
92
93void MessagePortChannel::disentanglePort(const MessagePortIdentifier& port)
94{
95 ASSERT(isMainThread());
96
97 LOG(MessagePorts, "MessagePortChannel %s (%p) disentangling port %s", logString().utf8().data(), this, port.logString().utf8().data());
98
99 ASSERT(port == m_ports[0] || port == m_ports[1]);
100 size_t i = port == m_ports[0] ? 0 : 1;
101
102 ASSERT(m_processes[i] || m_isClosed[i]);
103 m_processes[i] = WTF::nullopt;
104 m_pendingMessagePortTransfers[i].add(this);
105
106 // This set of steps is to guarantee that the lock is unlocked before the
107 // last ref to this object is released.
108 auto protectedThis = WTFMove(m_entangledToProcessProtectors[i]);
109}
110
111void MessagePortChannel::closePort(const MessagePortIdentifier& port)
112{
113 ASSERT(isMainThread());
114
115 ASSERT(port == m_ports[0] || port == m_ports[1]);
116 size_t i = port == m_ports[0] ? 0 : 1;
117
118 m_processes[i] = WTF::nullopt;
119 m_isClosed[i] = true;
120
121 // This set of steps is to guarantee that the lock is unlocked before the
122 // last ref to this object is released.
123 auto protectedThis = makeRef(*this);
124
125 m_pendingMessages[i].clear();
126 m_pendingMessagePortTransfers[i].clear();
127 m_pendingMessageProtectors[i] = nullptr;
128 m_entangledToProcessProtectors[i] = nullptr;
129}
130
131bool MessagePortChannel::postMessageToRemote(MessageWithMessagePorts&& message, const MessagePortIdentifier& remoteTarget)
132{
133 ASSERT(isMainThread());
134
135 ASSERT(remoteTarget == m_ports[0] || remoteTarget == m_ports[1]);
136 size_t i = remoteTarget == m_ports[0] ? 0 : 1;
137
138 m_pendingMessages[i].append(WTFMove(message));
139 LOG(MessagePorts, "MessagePortChannel %s (%p) now has %zu messages pending on port %s", logString().utf8().data(), this, m_pendingMessages[i].size(), remoteTarget.logString().utf8().data());
140
141 if (m_pendingMessages[i].size() == 1) {
142 m_pendingMessageProtectors[i] = this;
143 return true;
144 }
145
146 ASSERT(m_pendingMessageProtectors[i] == this);
147 return false;
148}
149
150void MessagePortChannel::takeAllMessagesForPort(const MessagePortIdentifier& port, Function<void(Vector<MessageWithMessagePorts>&&, Function<void()>&&)>&& callback)
151{
152 ASSERT(isMainThread());
153
154 LOG(MessagePorts, "MessagePortChannel %p taking all messages for port %s", this, port.logString().utf8().data());
155
156 ASSERT(port == m_ports[0] || port == m_ports[1]);
157 size_t i = port == m_ports[0] ? 0 : 1;
158
159 if (m_pendingMessages[i].isEmpty()) {
160 callback({ }, [] { });
161 return;
162 }
163
164 ASSERT(m_pendingMessageProtectors[i]);
165
166 Vector<MessageWithMessagePorts> result;
167 result.swap(m_pendingMessages[i]);
168
169 ++m_messageBatchesInFlight;
170
171 LOG(MessagePorts, "There are %zu messages to take for port %s. Taking them now, messages in flight is now %" PRIu64, result.size(), port.logString().utf8().data(), m_messageBatchesInFlight);
172
173 auto size = result.size();
174 callback(WTFMove(result), [size, this, port, protectedThis = WTFMove(m_pendingMessageProtectors[i])] {
175 UNUSED_PARAM(port);
176#if LOG_DISABLED
177 UNUSED_PARAM(size);
178#endif
179 --m_messageBatchesInFlight;
180 LOG(MessagePorts, "Message port channel %s was notified that a batch of %zu message port messages targeted for port %s just completed dispatch, in flight is now %" PRIu64, logString().utf8().data(), size, port.logString().utf8().data(), m_messageBatchesInFlight);
181
182 });
183}
184
185void MessagePortChannel::checkRemotePortForActivity(const MessagePortIdentifier& remotePort, CompletionHandler<void(MessagePortChannelProvider::HasActivity)>&& callback)
186{
187 ASSERT(isMainThread());
188 ASSERT(remotePort == m_ports[0] || remotePort == m_ports[1]);
189
190 // If the remote port is closed there is no pending activity.
191 size_t i = remotePort == m_ports[0] ? 0 : 1;
192 if (m_isClosed[i]) {
193 callback(MessagePortChannelProvider::HasActivity::No);
194 return;
195 }
196
197 // If there are any messages in flight between the ports, there is pending activity.
198 if (hasAnyMessagesPendingOrInFlight()) {
199 callback(MessagePortChannelProvider::HasActivity::Yes);
200 return;
201 }
202
203 // If the port is not currently in a process then it's being transferred as part of a postMessage.
204 // We treat these ports as if they do have activity since they will be revived when the message is delivered.
205 if (!m_processes[i]) {
206 callback(MessagePortChannelProvider::HasActivity::Yes);
207 return;
208 }
209
210 auto outerCallback = CompletionHandler<void(MessagePortChannelProvider::HasActivity)> { [this, protectedThis = makeRef(*this), callback = WTFMove(callback)] (MessagePortChannelProvider::HasActivity hasActivity) mutable {
211 if (hasActivity == MessagePortChannelProvider::HasActivity::Yes) {
212 callback(hasActivity);
213 return;
214 }
215
216 // If the remote port said it had no activity, check again for any messages that might be in flight.
217 // This is because it might have asynchronously sent a message just before it was asked about local activity.
218 if (hasAnyMessagesPendingOrInFlight())
219 hasActivity = MessagePortChannelProvider::HasActivity::Yes;
220
221 callback(hasActivity);
222 } };
223
224 m_registry.provider().checkProcessLocalPortForActivity(remotePort, *m_processes[i], WTFMove(outerCallback));
225}
226
227bool MessagePortChannel::hasAnyMessagesPendingOrInFlight() const
228{
229 ASSERT(isMainThread());
230 return m_messageBatchesInFlight || !m_pendingMessages[0].isEmpty() || !m_pendingMessages[1].isEmpty();
231}
232
233} // namespace WebCore
234