forked from cloudevents/sdk-csharp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
JsonEventFormatter.cs
242 lines (218 loc) · 10.5 KB
/
JsonEventFormatter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.
namespace CloudNative.CloudEvents
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Mime;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
/// <summary>
/// Formatter that implements the JSON Event Format
/// </summary>
public class JsonEventFormatter : ICloudEventFormatter
{
public const string MediaTypeSuffix = "+json";
/// <summary>
/// Decodes a structured event from a stream, accepting the extensions as a
/// variable-length argument list.
/// </summary>
/// <param name="data">Stream holding the JSON-encoded event.</param>
/// <param name="extensions">Extensions forwarded to the sequence-based overload.</param>
/// <returns>The event produced by the sequence-based overload.</returns>
public CloudEvent DecodeStructuredEvent(Stream data, params ICloudEventExtension[] extensions)
{
    // Typing the local as IEnumerable<> binds the call to the non-params overload
    // rather than back to this method.
    IEnumerable<ICloudEventExtension> extensionSequence = extensions;
    return DecodeStructuredEvent(data, extensionSequence);
}
/// <summary>
/// Asynchronously reads a JSON object from <paramref name="data"/> and decodes it
/// into a <see cref="CloudEvent"/>.
/// </summary>
/// <param name="data">
/// Stream containing the JSON text. The stream itself is left open when this method returns.
/// </param>
/// <param name="extensions">Extensions passed on to <see cref="DecodeJObject"/>.</param>
/// <returns>A task that produces the decoded event.</returns>
public async Task<CloudEvent> DecodeStructuredEventAsync(Stream data, IEnumerable<ICloudEventExtension> extensions)
{
    // The StreamReader is created with leaveOpen: true, so disposing the readers releases
    // only their own buffers and never closes the caller's stream.
    using (var textReader = new StreamReader(data, Encoding.UTF8, true, 8192, true))
    using (var jsonReader = new JsonTextReader(textReader))
    {
        // Library code has no need to resume on the caller's synchronization context.
        var jObject = await JObject.LoadAsync(jsonReader).ConfigureAwait(false);
        return DecodeJObject(jObject, extensions);
    }
}
/// <summary>
/// Reads a JSON object from <paramref name="data"/> and decodes it into a
/// <see cref="CloudEvent"/>.
/// </summary>
/// <param name="data">
/// Stream containing the JSON text. The stream itself is left open when this method returns.
/// </param>
/// <param name="extensions">Optional extensions passed on to <see cref="DecodeJObject"/>.</param>
/// <returns>The decoded event.</returns>
public CloudEvent DecodeStructuredEvent(Stream data, IEnumerable<ICloudEventExtension> extensions = null)
{
    // The StreamReader is created with leaveOpen: true, so disposing the readers releases
    // only their own buffers and never closes the caller's stream.
    using (var textReader = new StreamReader(data, Encoding.UTF8, true, 8192, true))
    using (var jsonReader = new JsonTextReader(textReader))
    {
        var jObject = JObject.Load(jsonReader);
        return DecodeJObject(jObject, extensions);
    }
}
/// <summary>
/// Decodes a structured event from a byte array, accepting the extensions as a
/// variable-length argument list.
/// </summary>
/// <param name="data">Bytes holding the JSON-encoded event.</param>
/// <param name="extensions">Extensions forwarded to the sequence-based overload.</param>
/// <returns>The event produced by the sequence-based overload.</returns>
public CloudEvent DecodeStructuredEvent(byte[] data, params ICloudEventExtension[] extensions)
{
    // Typing the local as IEnumerable<> binds the call to the non-params overload
    // rather than back to this method.
    IEnumerable<ICloudEventExtension> extensionSequence = extensions;
    return DecodeStructuredEvent(data, extensionSequence);
}
/// <summary>
/// Interprets <paramref name="data"/> as UTF-8 JSON text, parses it and decodes the
/// resulting object into a <see cref="CloudEvent"/>.
/// </summary>
/// <param name="data">UTF-8 encoded JSON text of the event.</param>
/// <param name="extensions">Optional extensions passed on to <see cref="DecodeJObject"/>.</param>
/// <returns>The decoded event.</returns>
public CloudEvent DecodeStructuredEvent(byte[] data, IEnumerable<ICloudEventExtension> extensions = null)
{
    // bytes -> text -> JObject -> CloudEvent, evaluated innermost first.
    return DecodeJObject(JObject.Parse(Encoding.UTF8.GetString(data)), extensions);
}
/// <summary>
/// Builds a <see cref="CloudEvent"/> from an already-parsed JSON object.
/// </summary>
/// <remarks>
/// The spec version is determined first from the version attribute; every other property
/// of <paramref name="jObject"/> is then copied into the event's attribute collection.
/// </remarks>
/// <param name="jObject">Parsed JSON representation of the event.</param>
/// <param name="extensions">Optional extensions handed to the <see cref="CloudEvent"/> constructor.</param>
/// <returns>The populated event.</returns>
/// <exception cref="ArgumentNullException"><paramref name="jObject"/> is null.</exception>
public CloudEvent DecodeJObject(JObject jObject, IEnumerable<ICloudEventExtension> extensions = null)
{
    if (jObject == null)
    {
        throw new ArgumentNullException(nameof(jObject));
    }

    // The attribute names do not change per property, so resolve them once instead of
    // on every loop iteration.
    string v01VersionName = CloudEventAttributes.SpecVersionAttributeName(CloudEventsSpecVersion.V0_1);
    string v02VersionName = CloudEventAttributes.SpecVersionAttributeName(CloudEventsSpecVersion.V0_2);
    string v03VersionName = CloudEventAttributes.SpecVersionAttributeName(CloudEventsSpecVersion.V0_3);
    string v10VersionName = CloudEventAttributes.SpecVersionAttributeName(CloudEventsSpecVersion.V1_0);

    CloudEventsSpecVersion specVersion = CloudEventsSpecVersion.Default;

    // The mere presence of the 0.1-style version attribute selects 0.1.
    if (ReadVersionToken(jObject, v01VersionName) != null)
    {
        specVersion = CloudEventsSpecVersion.V0_1;
    }

    // The later-style attribute carries the version as its value. This check runs after the
    // 0.1 check and therefore wins when both attributes are present.
    if (ReadVersionToken(jObject, v02VersionName) != null)
    {
        // Casting a missing (null) token to string yields null, so absent keys simply
        // fail the comparison.
        if ((string)ReadVersionToken(jObject, v02VersionName) == "0.2")
        {
            specVersion = CloudEventsSpecVersion.V0_2;
        }
        else if ((string)ReadVersionToken(jObject, v03VersionName) == "0.3")
        {
            specVersion = CloudEventsSpecVersion.V0_3;
        }
        else
        {
            specVersion = CloudEventsSpecVersion.Default;
        }
    }

    var cloudEvent = new CloudEvent(specVersion, extensions);
    var attributes = cloudEvent.GetAttributes();

    foreach (var property in jObject)
    {
        string key = property.Key;
        JToken token = property.Value;

        // The version was consumed above; do not copy it as a regular attribute.
        // Keys are identifiers, so compare ordinally rather than with culture rules.
        if (key.Equals(v01VersionName, StringComparison.OrdinalIgnoreCase) ||
            key.Equals(v02VersionName, StringComparison.OrdinalIgnoreCase) ||
            key.Equals(v10VersionName, StringComparison.OrdinalIgnoreCase))
        {
            continue;
        }

        // For 1.0 events, base64-encoded binary payloads are decoded and stored as "data".
        if (specVersion == CloudEventsSpecVersion.V1_0 && key.Equals("data_base64"))
        {
            attributes["data"] = Convert.FromBase64String(token.ToString());
            continue;
        }

        switch (token.Type)
        {
            case JTokenType.String:
                attributes[key] = token.ToObject<string>();
                break;
            case JTokenType.Date:
                attributes[key] = token.ToObject<DateTime>();
                break;
            case JTokenType.Uri:
                attributes[key] = token.ToObject<Uri>();
                break;
            case JTokenType.Null:
                attributes[key] = null;
                break;
            case JTokenType.Integer:
                // Values in the Int32 range are stored as int, exactly as before. Larger
                // values used to fail with an OverflowException and are now kept as long.
                long integerValue = token.ToObject<long>();
                if (integerValue >= int.MinValue && integerValue <= int.MaxValue)
                {
                    attributes[key] = (int)integerValue;
                }
                else
                {
                    attributes[key] = integerValue;
                }
                break;
            default:
                // Any other token kind is stored as the JSON token itself.
                attributes[key] = (dynamic)token;
                break;
        }
    }

    return cloudEvent;
}

/// <summary>
/// Looks up <paramref name="attributeName"/> on <paramref name="source"/>, falling back to
/// the lower-cased spelling of the name; returns null when neither property exists.
/// </summary>
private static JToken ReadVersionToken(JObject source, string attributeName)
{
    return source[attributeName] ?? source[attributeName.ToLowerInvariant()];
}
/// <summary>
/// Serializes the attributes of <paramref name="cloudEvent"/> into a JSON object and
/// returns its text as UTF-8 bytes.
/// </summary>
/// <remarks>
/// Attributes whose value is null are omitted. <see cref="ContentType"/> values with a
/// non-empty media type are written in their string form. For 1.0 events the data attribute
/// is written as base64 under "data_base64" when it is a <see cref="Stream"/> or a byte
/// sequence, and under "data" otherwise.
/// </remarks>
/// <param name="cloudEvent">The event to encode.</param>
/// <param name="contentType">
/// Receives "application/cloudevents+json" with the UTF-8 character set.
/// </param>
/// <returns>The UTF-8 encoded JSON text.</returns>
/// <exception cref="ArgumentNullException"><paramref name="cloudEvent"/> is null.</exception>
public byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType)
{
    if (cloudEvent == null)
    {
        throw new ArgumentNullException(nameof(cloudEvent));
    }

    contentType = new ContentType("application/cloudevents+json")
    {
        CharSet = Encoding.UTF8.WebName
    };

    // The data attribute name is only looked up for 1.0 events, matching the cases in
    // which it was consulted before.
    bool isV1 = cloudEvent.SpecVersion == CloudEventsSpecVersion.V1_0;
    string dataAttributeName = isV1
        ? CloudEventAttributes.DataAttributeName(cloudEvent.SpecVersion)
        : null;

    var jObject = new JObject();
    foreach (var attribute in cloudEvent.GetAttributes())
    {
        object value = attribute.Value;
        if (value == null)
        {
            continue;
        }

        var mediaType = value as ContentType;
        if (mediaType != null && !string.IsNullOrEmpty(mediaType.MediaType))
        {
            jObject[attribute.Key] = JToken.FromObject(mediaType.ToString());
        }
        else if (isV1 && attribute.Key.Equals(dataAttributeName))
        {
            var stream = value as Stream;
            var bytes = value as IEnumerable<byte>;
            if (stream != null)
            {
                // Copy the stream instead of sizing a read from Stream.Length, so that
                // non-seekable streams can be encoded as well.
                // NOTE(review): the source stream is still disposed after reading, as it
                // was before; confirm whether callers rely on that.
                using (stream)
                using (var buffer = new MemoryStream())
                {
                    stream.CopyTo(buffer);
                    jObject["data_base64"] = Convert.ToBase64String(buffer.ToArray());
                }
            }
            else if (bytes != null)
            {
                jObject["data_base64"] = Convert.ToBase64String(bytes.ToArray());
            }
            else
            {
                jObject["data"] = JToken.FromObject(value);
            }
        }
        else
        {
            jObject[attribute.Key] = JToken.FromObject(value);
        }
    }

    return Encoding.UTF8.GetBytes(jObject.ToString());
}
/// <summary>
/// Decodes a single JSON-encoded attribute value into the CLR type associated with the
/// attribute name.
/// </summary>
/// <remarks>
/// Id, type and subject decode to <see cref="string"/>; time to <see cref="DateTime"/>;
/// source and data schema to <see cref="Uri"/>; data content type to
/// <see cref="ContentType"/>. Other names use the type reported by the first extension that
/// knows the attribute, or untyped deserialization when none does.
/// </remarks>
/// <param name="specVersion">Spec version used to resolve the well-known attribute names.</param>
/// <param name="name">Name of the attribute being decoded.</param>
/// <param name="data">UTF-8 encoded JSON text of the attribute value.</param>
/// <param name="extensions">Optional extensions consulted for the attribute's type.</param>
/// <returns>
/// The decoded value. A JSON null yields null for the string, URI and content type attributes.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="name"/> or <paramref name="data"/> is null.
/// </exception>
public object DecodeAttribute(CloudEventsSpecVersion specVersion, string name, byte[] data, IEnumerable<ICloudEventExtension> extensions = null)
{
    if (name == null)
    {
        throw new ArgumentNullException(nameof(name));
    }
    if (data == null)
    {
        throw new ArgumentNullException(nameof(data));
    }

    // Every branch needs the JSON text, so decode the bytes exactly once.
    string json = Encoding.UTF8.GetString(data);

    if (name.Equals(CloudEventAttributes.IdAttributeName(specVersion)) ||
        name.Equals(CloudEventAttributes.TypeAttributeName(specVersion)) ||
        name.Equals(CloudEventAttributes.SubjectAttributeName(specVersion)))
    {
        return JsonConvert.DeserializeObject<string>(json);
    }

    if (name.Equals(CloudEventAttributes.TimeAttributeName(specVersion)))
    {
        return JsonConvert.DeserializeObject(json, typeof(DateTime));
    }

    if (name.Equals(CloudEventAttributes.SourceAttributeName(specVersion)) ||
        name.Equals(CloudEventAttributes.DataSchemaAttributeName(specVersion)))
    {
        // A JSON null used to surface as an ArgumentNullException from the Uri constructor;
        // it now decodes to null, like the string attributes above.
        // NOTE(review): new Uri(string) expects an absolute URI; confirm whether relative
        // references need to be accepted here.
        string uriText = JsonConvert.DeserializeObject<string>(json);
        return uriText == null ? null : new Uri(uriText);
    }

    if (name.Equals(CloudEventAttributes.DataContentTypeAttributeName(specVersion)))
    {
        // Same null handling as for the URI attributes.
        string contentTypeText = JsonConvert.DeserializeObject<string>(json);
        return contentTypeText == null ? null : new ContentType(contentTypeText);
    }

    if (extensions != null)
    {
        foreach (var extension in extensions)
        {
            // The first extension that reports a type for this attribute decides.
            Type attributeType = extension.GetAttributeType(name);
            if (attributeType != null)
            {
                return JsonConvert.DeserializeObject(json, attributeType);
            }
        }
    }

    // Unknown attribute: let the serializer pick the representation.
    return JsonConvert.DeserializeObject(json);
}
/// <summary>
/// Encodes a single attribute value as UTF-8 encoded JSON text.
/// </summary>
/// <remarks>
/// A <see cref="Stream"/> supplied for the data attribute is copied and its bytes are
/// serialized. Otherwise, if an extension reports a type for the attribute, the value is
/// converted to that type before serialization; all remaining values are serialized as-is.
/// </remarks>
/// <param name="specVersion">Spec version used to resolve the data attribute name.</param>
/// <param name="name">Name of the attribute being encoded.</param>
/// <param name="value">The attribute value.</param>
/// <param name="extensions">Optional extensions consulted for the attribute's type.</param>
/// <returns>The UTF-8 encoded JSON text of the value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="name"/> is null.</exception>
public byte[] EncodeAttribute(CloudEventsSpecVersion specVersion, string name, object value, IEnumerable<ICloudEventExtension> extensions = null)
{
    if (name == null)
    {
        throw new ArgumentNullException(nameof(name));
    }

    if (name.Equals(CloudEventAttributes.DataAttributeName(specVersion)))
    {
        var stream = value as Stream;
        if (stream != null)
        {
            using (var buffer = new MemoryStream())
            {
                // Copies from the stream's current position; the source stream is not disposed.
                stream.CopyTo(buffer);
                return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(buffer.ToArray()));
            }
        }
    }

    if (extensions != null)
    {
        foreach (var extension in extensions)
        {
            // The first extension that reports a type for this attribute decides.
            Type attributeType = extension.GetAttributeType(name);
            if (attributeType == null)
            {
                continue;
            }

            // A value that already is an instance of the declared type needs no conversion.
            // Convert.ChangeType would reject it unless it is IConvertible or its runtime
            // type matches exactly. Real conversions use the invariant culture so the
            // encoded output does not depend on the machine's regional settings.
            object converted = attributeType.IsInstanceOfType(value)
                ? value
                : Convert.ChangeType(value, attributeType, System.Globalization.CultureInfo.InvariantCulture);
            return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(converted));
        }
    }

    return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(value));
}
}
}