Skip to content

Commit

Permalink
re-indented TCA files
Browse files Browse the repository at this point in the history
  • Loading branch information
ab1470 committed Oct 18, 2024
1 parent 8b91668 commit d88655e
Show file tree
Hide file tree
Showing 9 changed files with 3,158 additions and 3,158 deletions.
396 changes: 198 additions & 198 deletions Sources/KlaviyoSwift/Vendor/ComposableArchitecture/Cancellation.swift

Large diffs are not rendered by default.

Large diffs are not rendered by default.

282 changes: 141 additions & 141 deletions Sources/KlaviyoSwift/Vendor/ComposableArchitecture/Create.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,170 +33,170 @@ import Combine
import Darwin

final class DemandBuffer<S: Subscriber>: @unchecked Sendable {
private var buffer = [S.Input]()
private let subscriber: S
private var completion: Subscribers.Completion<S.Failure>?
private var demandState = Demand()
private let lock: os_unfair_lock_t

init(subscriber: S) {
self.subscriber = subscriber
self.lock = os_unfair_lock_t.allocate(capacity: 1)
self.lock.initialize(to: os_unfair_lock())
}

deinit {
self.lock.deinitialize(count: 1)
self.lock.deallocate()
}

func buffer(value: S.Input) -> Subscribers.Demand {
precondition(
self.completion == nil, "How could a completed publisher sent values?! Beats me 🤷‍♂️")

switch demandState.requested {
case .unlimited:
return subscriber.receive(value)
default:
buffer.append(value)
return flush()
private var buffer = [S.Input]()
private let subscriber: S
private var completion: Subscribers.Completion<S.Failure>?
private var demandState = Demand()
private let lock: os_unfair_lock_t

init(subscriber: S) {
self.subscriber = subscriber
self.lock = os_unfair_lock_t.allocate(capacity: 1)
self.lock.initialize(to: os_unfair_lock())
}
}

func complete(completion: Subscribers.Completion<S.Failure>) {
precondition(
self.completion == nil, "Completion have already occurred, which is quite awkward 🥺")

self.completion = completion
_ = flush()
}

func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
flush(adding: demand)
}

private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
self.lock.sync {

if let newDemand = newDemand {
demandState.requested += newDemand
}

// If buffer isn't ready for flushing, return immediately
guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }

while !buffer.isEmpty && demandState.processed < demandState.requested {
demandState.requested += subscriber.receive(buffer.remove(at: 0))
demandState.processed += 1
}

if let completion = completion {
// Completion event was already sent
buffer = []
demandState = .init()
self.completion = nil
subscriber.receive(completion: completion)
return .none
}

let sentDemand = demandState.requested - demandState.sent
demandState.sent += sentDemand
return sentDemand

deinit {
self.lock.deinitialize(count: 1)
self.lock.deallocate()
}
}

struct Demand {
var processed: Subscribers.Demand = .none
var requested: Subscribers.Demand = .none
var sent: Subscribers.Demand = .none
}
}
func buffer(value: S.Input) -> Subscribers.Demand {
precondition(
self.completion == nil, "How could a completed publisher sent values?! Beats me 🤷‍♂️")

switch demandState.requested {
case .unlimited:
return subscriber.receive(value)
default:
buffer.append(value)
return flush()
}
}

extension AnyPublisher {
private init(
_ callback: @escaping (EffectPublisher<Output, Failure>.Subscriber) -> Cancellable
) {
self = Publishers.Create(callback: callback).eraseToAnyPublisher()
}

static func create(
_ factory: @escaping (EffectPublisher<Output, Failure>.Subscriber) -> Cancellable
) -> AnyPublisher<Output, Failure> {
AnyPublisher(factory)
}
}
func complete(completion: Subscribers.Completion<S.Failure>) {
precondition(
self.completion == nil, "Completion have already occurred, which is quite awkward 🥺")

extension Publishers {
fileprivate class Create<Output, Failure: Swift.Error>: Publisher {
private let callback: (EffectPublisher<Output, Failure>.Subscriber) -> Cancellable
self.completion = completion
_ = flush()
}

init(callback: @escaping (EffectPublisher<Output, Failure>.Subscriber) -> Cancellable) {
self.callback = callback
func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
flush(adding: demand)
}

func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
subscriber.receive(subscription: Subscription(callback: callback, downstream: subscriber))
private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
self.lock.sync {

if let newDemand = newDemand {
demandState.requested += newDemand
}

// If buffer isn't ready for flushing, return immediately
guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }

while !buffer.isEmpty && demandState.processed < demandState.requested {
demandState.requested += subscriber.receive(buffer.remove(at: 0))
demandState.processed += 1
}

if let completion = completion {
// Completion event was already sent
buffer = []
demandState = .init()
self.completion = nil
subscriber.receive(completion: completion)
return .none
}

let sentDemand = demandState.requested - demandState.sent
demandState.sent += sentDemand
return sentDemand
}
}

struct Demand {
var processed: Subscribers.Demand = .none
var requested: Subscribers.Demand = .none
var sent: Subscribers.Demand = .none
}
}
}

extension Publishers.Create {
fileprivate final class Subscription<Downstream: Subscriber>: Combine.Subscription
where Downstream.Input == Output, Downstream.Failure == Failure {
private let buffer: DemandBuffer<Downstream>
private var cancellable: Cancellable?

init(
callback: @escaping (EffectPublisher<Output, Failure>.Subscriber) -> Cancellable,
downstream: Downstream
extension AnyPublisher {
private init(
_ callback: @escaping (EffectPublisher<Output, Failure>.Subscriber) -> Cancellable
) {
self.buffer = DemandBuffer(subscriber: downstream)

let cancellable = callback(
.init(
send: { [weak self] in _ = self?.buffer.buffer(value: $0) },
complete: { [weak self] in self?.buffer.complete(completion: $0) }
)
)
self = Publishers.Create(callback: callback).eraseToAnyPublisher()
}

self.cancellable = cancellable
static func create(
_ factory: @escaping (EffectPublisher<Output, Failure>.Subscriber) -> Cancellable
) -> AnyPublisher<Output, Failure> {
AnyPublisher(factory)
}
}

extension Publishers {
fileprivate class Create<Output, Failure: Swift.Error>: Publisher {
private let callback: (EffectPublisher<Output, Failure>.Subscriber) -> Cancellable

init(callback: @escaping (EffectPublisher<Output, Failure>.Subscriber) -> Cancellable) {
self.callback = callback
}

func request(_ demand: Subscribers.Demand) {
_ = self.buffer.demand(demand)
func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
subscriber.receive(subscription: Subscription(callback: callback, downstream: subscriber))
}
}
}

func cancel() {
self.cancellable?.cancel()
extension Publishers.Create {
fileprivate final class Subscription<Downstream: Subscriber>: Combine.Subscription
where Downstream.Input == Output, Downstream.Failure == Failure {
private let buffer: DemandBuffer<Downstream>
private var cancellable: Cancellable?

init(
callback: @escaping (EffectPublisher<Output, Failure>.Subscriber) -> Cancellable,
downstream: Downstream
) {
self.buffer = DemandBuffer(subscriber: downstream)

let cancellable = callback(
.init(
send: { [weak self] in _ = self?.buffer.buffer(value: $0) },
complete: { [weak self] in self?.buffer.complete(completion: $0) }
)
)

self.cancellable = cancellable
}

func request(_ demand: Subscribers.Demand) {
_ = self.buffer.demand(demand)
}

func cancel() {
self.cancellable?.cancel()
}
}
}
}

extension Publishers.Create.Subscription: CustomStringConvertible {
var description: String {
return "Create.Subscription<\(Output.self), \(Failure.self)>"
}
var description: String {
return "Create.Subscription<\(Output.self), \(Failure.self)>"
}
}

extension EffectPublisher {
struct Subscriber {
private let _send: (Action) -> Void
private let _complete: (Subscribers.Completion<Failure>) -> Void

init(
send: @escaping (Action) -> Void,
complete: @escaping (Subscribers.Completion<Failure>) -> Void
) {
self._send = send
self._complete = complete
}

func send(_ value: Action) {
self._send(value)
}

func send(completion: Subscribers.Completion<Failure>) {
self._complete(completion)
struct Subscriber {
private let _send: (Action) -> Void
private let _complete: (Subscribers.Completion<Failure>) -> Void

init(
send: @escaping (Action) -> Void,
complete: @escaping (Subscribers.Completion<Failure>) -> Void
) {
self._send = send
self._complete = complete
}

func send(_ value: Action) {
self._send(value)
}

func send(completion: Subscribers.Completion<Failure>) {
self._complete(completion)
}
}
}
}
Loading

0 comments on commit d88655e

Please sign in to comment.