2 using System.Threading;
3 using System.Collections.Generic;
7 using System.Reactive.Linq;
9 namespace Sasa.Reactive
/// <summary>
/// Signals that the current thread should retry its updates.
/// </summary>
public sealed class RetryException : System.Exception
{
    // intentionally empty: the exception type itself is the signal
}
/// <summary>
/// Reports a failure that occurred while an update was being propagated.
/// </summary>
public sealed class UpdateException : System.Exception
{
    /// <summary>
    /// Create an update exception wrapping the underlying failure.
    /// </summary>
    /// <param name="message">The message to report.</param>
    /// <param name="inner">The exception that caused the propagation failure.</param>
    public UpdateException(string message, Exception inner) : base(message, inner) { }
}
36 public sealed class NoValueException : System.Exception
38 internal static NoValueException instance = new NoValueException();
40 : base("The Property<T> currently has no value.")
46 /// An observable property that retains only the current value, and notifies clients only of distinct changes.
48 /// <typeparam name="T">The type of values.</typeparam>
50 /// Clients should first invoke <seealso cref="Ticket.Acquire" /> or <seealso cref="Transaction.Begin" /> before operating on signals.
52 public class Property<T> : IObservable<T>, IObserver<T>, IDisposable, IRef<T>, IValue<T>, IResult<T>
54 // the queue of subscribed observers
57 // the current value or error
59 internal Exception error;
60 // monotonically increasing version number; crude approximation of transactional memory
61 long version = long.MinValue;
62 // the current subscription
63 IDisposable subscription;
67 /// Initialize an empty signal.
69 public Property() : this(NoValueException.instance)
73 /// Initialize a signal with a default value.
75 /// <param name="value">The default value of the signal.</param>
76 public Property(T value)
/// Initialize a signal with a default error.
/// <param name="error">The default error of the signal.</param>
85 public Property(Exception error)
87 head = tail = new Node();
91 /// Initialize the signal with the given observable sequence.
93 /// <param name="observable">The observable to listen to.</param>
94 public Property(IObservable<T> observable)
95 : this(observable.FirstOrDefault())
100 /// Dispose of a <seealso cref="Property{T}"/>.
107 /// The current value of the signal.
109 /// <exception cref="System.Exception">Thrown if the current value is an error.</exception>
114 //FIXME: why lock this?
118 if (error != null) throw error;
125 //FIXME: why lock this?
129 changed = !EqualityComparer<T>.Default.Equals(this.value, this.value = value);
132 if (changed) Notify(head.next, value, null);
136 /// The current error value of the signal, if any.
138 public Exception Error
155 changed = !EqualityComparer<Exception>.Default.Equals(this.error, this.error = value);
158 if (changed) Notify(head.next, ovalue, value);
162 /// True if the <seealso cref="Property{T}"/> has a value, and no error.
// a value is available exactly when no error is stored; this is the same
// condition TryGetValue reports (error == null)
get { return Error == null; }
169 /// Obtains the encapsulated value, if any.
171 /// <param name="value">The value to extract.</param>
172 /// <returns>True if value was available, false otherwise.</returns>
173 public bool TryGetValue(out T value)
176 return this.error == null;
/// <summary>
/// Indicates whether this signal currently holds a subscription to an observable.
/// </summary>
public bool IsSubscribed
{
    get
    {
        // a non-null handle means a subscription has been recorded
        var active = subscription;
        return active != null;
    }
}
186 /// Notify <paramref name="observer"/> of any changes to this signal.
188 /// <param name="observer">The observer to notify.</param>
189 /// <returns>A disposable handle representing the subscription.</returns>
190 public IDisposable Subscribe(IObserver<T> observer)
192 // notify current observer
195 //FIXME: why acquire ticket?
203 var node = Enqueue(observer);
204 // does not notify if property has not yet been initialized
205 Notify(node, value, error);
206 // if ticket acquired locally, then release it immediately after notifications complete
208 return new DisposeObserver { node = node, property = this };
/// <summary>
/// Subscribe a value handler to changes made to this property.
/// </summary>
/// <param name="onNext">The delegate to invoke for a new value.</param>
/// <returns>A disposable handle representing the subscription.</returns>
public IDisposable Subscribe(Action<T> onNext)
{
    // wrap the delegate in an observer and defer to the IObserver<T> overload
    IObserver<T> wrapped = Observer.Create(onNext);
    return Subscribe(wrapped);
}
/// <summary>
/// Subscribe value and error handlers to changes made to this property.
/// </summary>
/// <param name="onNext">The delegate to invoke for a new value.</param>
/// <param name="onError">The delegate to invoke when the property receives an error.</param>
/// <returns>A disposable handle representing the subscription.</returns>
public IDisposable Subscribe(Action<T> onNext, Action<Exception> onError)
{
    // wrap the delegates in an observer and defer to the IObserver<T> overload
    IObserver<T> wrapped = Observer.Create(onNext, onError);
    return Subscribe(wrapped);
}
/// <summary>
/// Subscribe value, error and completion handlers to changes made to this property.
/// </summary>
/// <param name="onNext">The delegate to invoke for a new value.</param>
/// <param name="onError">The delegate to invoke on error.</param>
/// <param name="onDispose">The delegate to invoke when the property is disposed.</param>
/// <returns>A disposable handle representing the subscription.</returns>
public IDisposable Subscribe(Action<T> onNext, Action<Exception> onError, Action onDispose)
{
    // onDispose is passed as the observer's completion callback
    IObserver<T> wrapped = Observer.Create(onNext, onError, onDispose);
    return Subscribe(wrapped);
}
241 /// Subscribe this signal to listen to <paramref name="observable"/>.
243 /// <param name="observable">The observable to listen to.</param>
244 public void Switch(IObservable<T> observable)
246 //FIXME: why acquire this lock?
250 if (subscription != null) subscription.Dispose();
251 subscription = observable.Subscribe(this);
256 /// Dispose of any subscriptions.
258 public void Dispose()
264 if (subscription != null) subscription.Dispose();
265 head = this.head.next;
266 this.head.next = null;
268 for (var y = head; y != null; y = y.next)
272 y.observer.OnCompleted();
/// <summary>
/// Generates a string representation.
/// </summary>
/// <returns>
/// The string form of the current value, or an empty string when the current value is null.
/// </returns>
/// <exception cref="System.Exception">Propagated from <see cref="Value"/> if the current value is an error.</exception>
public override string ToString()
{
    // read Value once so the null check and the call see the same object
    var current = Value;
    // a null value (possible when T is a reference or nullable type) must not
    // surface as a NullReferenceException from ToString
    return current == null ? string.Empty : current.ToString();
}
291 void IObserver<T>.OnNext(T value)
295 void IObserver<T>.OnError(Exception error)
299 void IObserver<T>.OnCompleted()
306 if (version > Ticket.Current) throw new RetryException();
307 version = Ticket.Current;
311 if (tail == null) throw new ObjectDisposedException("Property<T>", "This property has been disposed.");
313 void Notify(Node head, T value, Exception error)
317 if (error is NoValueException) return;
318 for (var x = head; x != null; x = x.next)
320 Notify(x.observer, error);
325 for (var x = head; x != null; x = x.next)
327 Notify(x.observer, value);
331 void Notify(IObserver<T> x, T value)
337 catch (RetryException)
339 //FIXME: should this propagate, ignore, or accumulate errors?
340 throw; // propagate retry exceptions
344 // notify only the current observer of the error
345 Notify(x, new UpdateException("Error propagating value", e));
348 void Notify(IObserver<T> x, Exception error)
357 catch (RetryException)
359 //FIXME: should this propagate, ignore, or accumulate errors?
360 throw; // propagate retry exceptions
364 // try notifying of new update error
365 error = new UpdateException("Error propagating exception.", e);
370 internal Node Enqueue(IObserver<T> o)
374 tail.next = new Node { observer = o };
375 return tail = tail.next;
378 void Remove(Node node)
380 // must lock for thread-safety:
381 // head -> o1 -> o2 -> o3
382 //T1:rem(o1) T2:rem(o2)
383 //head.n = o1.n o1.n = o2.n
386 for (var x = head; x.next != null; x = x.next)
390 // must lock in case node is tail
391 lock (tail) x.next = node.next;
399 #region Public operators
/// <summary>
/// Implicitly wrap a plain value in a <seealso cref="Property{T}"/>.
/// </summary>
/// <param name="value">The default value of the property.</param>
/// <returns>A property with the given value.</returns>
public static implicit operator Property<T>(T value)
{
    Property<T> wrapped = new Property<T>(value);
    return wrapped;
}
/// <summary>
/// Implicitly wrap an error in a <seealso cref="Property{T}"/>.
/// </summary>
/// <param name="error">The starting error of the property.</param>
/// <returns>A property with the given error.</returns>
public static implicit operator Property<T>(Exception error)
{
    Property<T> failed = new Property<T>(error);
    return failed;
}
418 //public static implicit operator Signal<T>(IObservable<T> value)
420 // return new Signal<T>(value);
/// <summary>
/// Merge events from a property and an observable.
/// </summary>
/// <param name="lhs">The property.</param>
/// <param name="rhs">The observable to merge with it.</param>
/// <returns>A stream accumulating the changes from both sources.</returns>
public static IObservable<T> operator |(Property<T> lhs, IObservable<T> rhs)
{
    IObservable<T> merged = lhs.Merge(rhs);
    return merged;
}
/// <summary>
/// Merge events from an observable and a property.
/// </summary>
/// <param name="lhs">The observable.</param>
/// <param name="rhs">The property to merge with it.</param>
/// <returns>A stream accumulating the changes from both sources.</returns>
public static IObservable<T> operator |(IObservable<T> lhs, Property<T> rhs)
{
    IObservable<T> merged = lhs.Merge(rhs);
    return merged;
}
/// <summary>
/// Merge events from two properties.
/// </summary>
/// <param name="lhs">The first property.</param>
/// <param name="rhs">The second property.</param>
/// <returns>A stream accumulating the changes from both properties.</returns>
public static IObservable<T> operator |(Property<T> lhs, Property<T> rhs)
{
    IObservable<T> merged = lhs.Merge(rhs);
    return merged;
}
454 class DisposeObserver : IDisposable
457 internal Property<T> property;
462 public void Dispose()
464 var x = Interlocked.CompareExchange(ref property, null, property);
468 GC.SuppressFinalize(this);
472 internal sealed class Node
474 internal IObserver<T> observer;
478 #region Linq extensions
482 /// <typeparam name="R"></typeparam>
483 /// <param name="selector"></param>
484 /// <returns></returns>
485 public Property<R> Select<R>(Func<T, R> selector)
487 var x = new Property<R>();
488 x.subscription = Subscribe(t =>
492 x.Value = selector(t);
502 public Property<R> SelectMany<R>(Func<T, Property<R>> selector)
504 var x = new Property<R>();
505 x.subscription = Subscribe(t =>
509 x.Value = selector(t).Value;
515 }, e => x.Error = e);
518 public Property<R> SelectMany<U, R>(Func<T, Property<U>> collector, Func<T, U, R> selector)
520 var x = new Property<R>();
521 x.subscription = Subscribe(t =>
525 x.Value = selector(t, collector(t).Value);
531 }, e => x.Error = e);
534 public Property<T> Where(Func<T, bool> predicate)
536 var x = new Property<T>();
537 x.subscription = Subscribe(t =>
541 if (predicate(t)) x.Value = t;
551 public Property<T> Skip(int count)
553 var x = new Property<T>();
554 x.subscription = Subscribe(t =>
556 if (count-- > 0) return;
/// <summary>
/// Register a permanent event handler.
/// </summary>
/// <param name="onNext">The handler to invoke.</param>
public void Register(Action<T> onNext)
{
    // the node returned by Enqueue is discarded, so the caller receives no
    // handle with which to remove this handler later
    IObserver<T> handler = Observer.Create(onNext);
    Enqueue(handler);
}
580 /// Some convenient extensions.
582 public static class Property
585 /// Bind an object property.
587 /// <typeparam name="T">The type of the property value.</typeparam>
588 /// <param name="value">The current property value.</param>
589 /// <param name="setter">A function used to set the underlying property value.</param>
590 /// <returns>A property that's bidirectionally bound to the underlying property.</returns>
592 /// The returned Property<T> must be the ONLY way that the underlying object property is updated,
593 /// otherwise they will become out of sync. Use the more elaborate Bind operation if the underlying
594 /// property is updated by other means.
596 public static Property<T> Bind<T>(this T value, Action<T> setter)
598 var prop = new Property<T>(value);
599 prop.Enqueue(Observer.Create(setter));
603 /// Bind an object property.
605 /// <typeparam name="T">The type of the property value.</typeparam>
606 /// <typeparam name="TEventArgs">The event args.</typeparam>
607 /// <param name="getter">A function used to obtain the underlying property value.</param>
608 /// <param name="setter">A function used to set the underlying property value.</param>
609 /// <param name="register">Registers an event handler that notifies when the underlying property has been updated.</param>
610 /// <param name="unregister">Unregisters an event handler for the underlying property.</param>
611 /// <returns>A property that's bidirectionally bound to an underlying object property.</returns>
612 public static Property<T> Bind<T, TEventArgs>(Func<T> getter, Action<T> setter, Action<EventHandler<TEventArgs>> register, Action<EventHandler<TEventArgs>> unregister = null)
613 where TEventArgs : EventArgs
615 var prop = new Property<T>();
616 EventHandler<TEventArgs> h = (o, e) =>
620 prop.Value = getter();
627 // invoke event handler to set the initial property
628 h(prop, default(TEventArgs));
629 prop.Enqueue(Observer.Create(setter, () => unregister(h)));